# Source code for co2mpas.core.model.physical.gear_box.at_gear.cmv

# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and dsp model to model the CMV Approach.
"""
import itertools
import collections
import numpy as np
import schedula as sh
from co2mpas.defaults import dfl
from .core import prediction_gears_gsm, define_gear_filter

dsp = sh.BlueDispatcher(name='Corrected Matrix Velocity Approach')

dsp.add_data('stop_velocity', dfl.values.stop_velocity)

def _correct_gsv(gsv, stop_velocity):
"""
Corrects gear shifting velocity matrix from unreliable limits.

:param gsv:
Gear shifting velocity matrix.
:type gsv: dict

:param stop_velocity:
Maximum velocity to consider the vehicle stopped [km/h].
:type stop_velocity: float

:return:
Gear shifting velocity matrix corrected from unreliable limits.
:rtype: dict
"""

gsv[0] = [0, (stop_velocity, (dfl.INF, 0))]

# noinspection PyMissingOrEmptyDocstring
def func(x):
return not x and float('inf') or 1 / x

for v0, v1 in sh.pairwise(gsv.values()):
up0, s0, down1, s1 = v0[1][0], v0[1][1][1], v1[0][0], v1[0][1][1]

if down1 + s1 <= v0[0]:
v0[1], v1[0] = up0 + s0, up0 - s0
elif up0 >= down1:
v0[1], v1[0] = up0 + s0, down1 - s1
continue
elif (v0[1][1][0], func(s0)) >= (v1[0][1][0], func(s1)):
v0[1], v1[0] = up0 + s0, up0 - s0
else:
v0[1], v1[0] = down1 + s1, down1 - s1

v0[1] += stop_velocity

gsv[max(gsv)][1] = dfl.INF

return gsv

def _filter_gear_shifting_velocity(limits, stop_velocity):
def _rjt_out(x, default):
if x:
x = np.asarray(x)

# noinspection PyTypeChecker
m, (n, s) = np.median(x), (len(x), np.std(x))

y = s and 2 > (abs(x - m) / s)

if s and y.any():
y = x[y]

# noinspection PyTypeChecker
m, (n, s) = np.median(y), (len(y), np.std(y))

return m, (n, s)
else:
return default

max_gear = max(limits)
gsv = collections.OrderedDict()
for k in range(max_gear + 1):
v0, v1 = limits.get(k, [[], []])
gsv[k] = [_rjt_out(v0, (-1, (0, 0))), _rjt_out(v1, (dfl.INF, (0, 0)))]

return _correct_gsv(gsv, stop_velocity)

def _identify_gear_shifting_velocity_limits(gears, velocities, stop_velocity):
"""
Identifies gear shifting velocity matrix.

:param gears:
Gear vector [-].
:type gears: numpy.array

:param velocities:
Vehicle velocity [km/h].
:type velocities: numpy.array

:param stop_velocity:
Maximum velocity to consider the vehicle stopped [km/h].
:type stop_velocity: float

:return:
Gear shifting velocity matrix.
:rtype: dict
"""

limits = {}

for v, (g0, g1) in zip(velocities, sh.pairwise(gears)):
if v >= stop_velocity and g0 != g1:
limits[g0] = limits.get(g0, [[], []])
limits[g0][int(g0 < g1)].append(v)
return _filter_gear_shifting_velocity(limits, stop_velocity)

def _interp(r, y, nr):
from scipy.interpolate import InterpolatedUnivariateSpline as Spline
n = len(r)
if n == 0:
return np.nan * nr
elif n == 1:
return (y / r) * nr
else:
return Spline(r, y / r, k=1)(nr) * nr

# noinspection PyPep8Naming
def _convert_limits(lu, vsr, n_vsr, stop_velocity=dfl.values.stop_velocity):
_r, _l, _u = np.array(
sorted([vsr.get(k, 0)] + list(v) for k, v in lu.items())
).T
nk, nr = np.array(sorted(sh.combine_dicts(n_vsr, base={0: 0}).items())).T
klu = sorted(zip(
nk.astype(int),
_interp(_r[2:], _l[2:], nr),
_interp(_r[1:-1], _u[1:-1], nr)
))
res, n0, n1 = {}, min(k for k in n_vsr if k > 0), max(n_vsr)
for k, l, u in klu:
if k == 0:
l, u = list(lu.get(k, [0, stop_velocity + 1]))
else:
if k == n0:
l = lu.get(k, (stop_velocity,))[0]
if k == n1:
u = lu.get(k, (None, dfl.INF))[1]
res[k] = [l, u]
return res

def _grouper(iterable, n):
"""
Collect data into fixed-length chunks or blocks.

:param iterable:
Iterable object.
:param iterable: iter

:param n:
Length chunks or blocks.
:type n: int
"""
args = [iter(iterable)] * n
return zip(*args)

# noinspection PyMissingOrEmptyDocstring,PyUnusedLocal
[docs]class CMV(collections.OrderedDict):
[docs]    def __init__(self, *args, velocity_speed_ratios=None):
super(CMV, self).__init__(*args)
if args and isinstance(args[0], CMV):
if velocity_speed_ratios:
self.convert(velocity_speed_ratios)
else:
velocity_speed_ratios = args[0].velocity_speed_ratios

self.velocity_speed_ratios = velocity_speed_ratios or {}

def __repr__(self):
from pprint import pformat
name = self.__class__.__name__
items = [(k, v) for k, v in self.items()]
vsr = pformat(self.velocity_speed_ratios)
s = '{}({}, velocity_speed_ratios={})'.format(name, items, vsr)
return s.replace('inf', "float('inf')")

[docs]    def fit(self, correct_gear, gears, engine_speeds_out, times, velocities,
accelerations, motive_powers, velocity_speed_ratios, stop_velocity):
from ..mechanical import calculate_gear_box_speeds_in
self.clear()
self.velocity_speed_ratios = velocity_speed_ratios
self.update(_identify_gear_shifting_velocity_limits(
gears, velocities, stop_velocity
))
if dfl.functions.CMV.ENABLE_OPT_LOOP:
from co2mpas.utils import mae
gear_id, velocity_limits = zip(*sorted(self.items())[1:])
max_gear, _inf = gear_id[-1], float('inf')
update, predict = self.update, self.predict

def _update_gvs(vel_limits):
self[0] = (0, vel_limits[0])
self[max_gear] = (vel_limits[-1], _inf)
update(dict(zip(gear_id, _grouper(vel_limits[1:-1], 2))))

def _error_fun(vel_limits):
_update_gvs(vel_limits)

g_pre = predict(
times, velocities, accelerations, motive_powers,
correct_gear=correct_gear
)

speed_pred = calculate_gear_box_speeds_in(
g_pre, velocities, velocity_speed_ratios, stop_velocity
)
return np.float32(mae(speed_pred, engine_speeds_out))

x0 = [self[0][1]].__add__(
list(itertools.chain(*velocity_limits))[:-1]
)
from scipy.optimize import fmin
_update_gvs(fmin(_error_fun, x0, disp=False))

return self

[docs]    def correct_constant_velocity(
self, up_cns_vel=(), up_window=0.0, up_delta=0.0, dn_cns_vel=(),
dn_window=0.0, dn_delta=0.0):
"""
Corrects the gear shifting matrix velocity for constant velocities.

:param up_cns_vel:
Constant velocities to correct the upper limits [km/h].
:type up_cns_vel: tuple[float]

:param up_window:
Window to identify if the shifting matrix has limits close to
up_cns_vel [km/h].
:type up_window: float

:param up_delta:
Delta to add to the limit if this is close to up_cns_vel [km/h].
:type up_delta: float

:param dn_cns_vel:
Constant velocities to correct the bottom limits [km/h].
:type dn_cns_vel: tuple[float]

:param dn_window:
Window to identify if the shifting matrix has limits close to
dn_cns_vel [km/h].
:type dn_window: float

:param dn_delta:
Delta to add to the limit if this is close to dn_cns_vel [km/h].
:type dn_delta: float

:return:
A gear shifting velocity matrix corrected from NEDC velocities.
:rtype: dict
"""

def _set_velocity(velocity, const_steps, window, delta):
for s in const_steps:
if s < velocity < s + window:
return s + delta
return velocity

for k, v in sorted(self.items()):
v = [
_set_velocity(v[0], dn_cns_vel, dn_window, dn_delta),
_set_velocity(v[1], up_cns_vel, up_window, up_delta)
]

if v[0] >= v[1]:
v[0] = v[1] + dn_delta

try:
if self[k - 1][1] <= v[0]:
v[0] = self[k - 1][1] + up_delta
except KeyError:
pass
self[k] = tuple(v)

return self

[docs]    def plot(self):
import matplotlib.pylab as plt
for k, v in self.items():
kv = {}
for (s, l), x in zip((('down', '--'), ('up', '-')), v):
if x < dfl.INF:
kv['label'] = 'Gear %d:%s-shift' % (k, s)
kv['linestyle'] = l
# noinspection PyProtectedMember
kv['color'] = plt.plot([x] * 2, [0, 1], **kv)[0]._color
plt.legend(loc='best')
plt.xlabel('Velocity [km/h]')

def _init_gear(self, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures):

keys = sorted(self.keys())
matrix, c = {}, len(keys) - 1
from co2mpas.utils import List
if isinstance(velocities, List):
for i, k in enumerate(keys):
matrix[k] = self[k], (keys[max(0, i - 1)], keys[min(i + 1, c)])

def _next(gear, index):
v = velocities[index]
(_down, _up), (g0, g1) = matrix[gear]
if v >= _up:
return g1
if v < _down:
return g0
return gear
else:
r = velocities.shape[0]
for i, g in enumerate(keys):
down, up = self[g]
matrix[g] = p = np.tile(g, r)
p[velocities < down] = keys[max(0, i - 1)]
p[velocities >= up] = keys[min(i + 1, c)]

def _next(gear, index):
return matrix[gear][index]
return _next

[docs]    def predict(self, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures=None,
correct_gear=lambda g, *args: g,
gear_filter=define_gear_filter()):
gears = np.zeros_like(times, int)
gen = self.init_gear(
gears, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures, correct_gear
)
for i in range(times.shape[0]):
gears[i] = gen(i)

# if gear_filter is not None:
#    gears[index:times.shape[0]] = gear_filter(times, gears)

return gears

[docs]    def init_gear(self, gears, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures=None,
correct_gear=lambda g, *args: g):

next_g = self._init_gear(
times, velocities, accelerations, motive_powers,
engine_coolant_temperatures
)

valid_gears = np.array(list(getattr(self, 'gears', self)))

def get_valid_gear(g):
if g in valid_gears:
return g
return valid_gears[np.abs(np.subtract(valid_gears, g)).argmin()]

args = (
gears, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures, next_g
)

def _next(i):
if i != 0:
g = gears[i - 1]
return get_valid_gear(correct_gear(next_g(g, i), i, *args))
g = valid_gears.min()
for _ in range(len(valid_gears) + 1):
g0 = g
g = get_valid_gear(correct_gear(next_g(g, i), i, *args))
if g == g0:
break
return g

return _next

[docs]    def init_speed(self, stop_velocity, gears, velocities, *args, **kwargs):
vsr = self.velocity_speed_ratios.get

def _next(i):
v = velocities[i]
r = v > stop_velocity and vsr(gears[i], 0)
return v / r if r else 0

return _next

# noinspection PyPep8Naming
[docs]    def convert(self, velocity_speed_ratios):
if velocity_speed_ratios != self.velocity_speed_ratios:
vsr, n_vsr = self.velocity_speed_ratios, velocity_speed_ratios
new_self = _convert_limits(self, vsr, n_vsr)
reverse = np.diff(list(self))[0] < 0
self.clear()
for k, lu in sorted(new_self.items(), reverse=reverse):
self[k] = lu

self.velocity_speed_ratios = n_vsr

return self

[docs]@sh.add_function(dsp, outputs=['CMV'])
def calibrate_gear_shifting_cmv(
correct_gear, gears, engine_speeds_out, times, velocities,
accelerations, motive_powers, velocity_speed_ratios, stop_velocity):
"""
Calibrates a corrected matrix velocity to predict gears.

:param correct_gear:
A function to correct the predicted gear.
:type correct_gear: callable

:param gears:
Gear vector [-].
:type gears: numpy.array

:param engine_speeds_out:
Engine speed vector [RPM].
:type engine_speeds_out: numpy.array

:param times:
Time vector [s].
:type times: numpy.array

:param velocities:
Vehicle velocity [km/h].
:type velocities: numpy.array

:param accelerations:
Vehicle acceleration [m/s2].
:type accelerations: numpy.array

:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array

:param velocity_speed_ratios:
Constant velocity speed ratios of the gear box [km/(h*RPM)].
:type velocity_speed_ratios: dict[int | float]

:param stop_velocity:
Maximum velocity to consider the vehicle stopped [km/h].
:type stop_velocity: float

:returns:
A corrected matrix velocity to predict gears.
:rtype: dict
"""

cmv = CMV().fit(
correct_gear, gears, engine_speeds_out, times, velocities,
accelerations, motive_powers, velocity_speed_ratios, stop_velocity
)

return cmv

# predict gears with corrected matrix velocity
dsp.add_function(
function=prediction_gears_gsm,
inputs=['correct_gear', 'gear_filter', 'CMV', 'times', 'velocities',
'accelerations', 'motive_powers', 'cycle_type',
'velocity_speed_ratios'],
outputs=['gears']
)