# Source code for co2mpas.core.model.physical.vehicle

# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and dsp model to model the mechanic of the vehicle.
"""
import numpy as np
import schedula as sh
from co2mpas.defaults import dfl

dsp = sh.BlueDispatcher(
name='Vehicle free body diagram',
description='Calculates forces and power acting on the vehicle.'
)

def calculate_velocities(times, obd_velocities):
"""
Filters the obd velocities [km/h].

:param times:
Time vector [s].
:type times: numpy.array

:param obd_velocities:
OBD velocity vector [km/h].
:type obd_velocities: numpy.array

:return:
Velocity vector [km/h].
:rtype: numpy.array
"""
from co2mpas.utils import median_filter
dt_window = dfl.functions.calculate_velocities.dt_window
return median_filter(times, obd_velocities, dt_window, np.mean)

def _integral(x, y, y0=.0):
from scipy.interpolate import InterpolatedUnivariateSpline as Spl
return Spl(x, y).antiderivative()(x) + y0

def calculate_distances(times, velocities):
"""
Calculates the cumulative distance vector [m].

:param times:
Time vector [s].
:type times: numpy.array

:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array

:return:
Cumulative distance vector [m].
:rtype: numpy.array
"""
d = _integral(times, velocities / 3.6, 0)
d[1:] = np.cumsum(np.maximum(0, np.diff(d))) + d[0]
return d

dsp, inputs_kwargs=True, inputs_defaults=True, outputs=['velocities']
)
def calculate_velocities_v1(times, accelerations, initial_velocity=.0):
"""
Calculates the velocity from acceleration time series [km/h].

:param times:
Time vector [s].
:type times: numpy.array

:param accelerations:
Acceleration vector [m/s2].
:type accelerations: numpy.array

:param initial_velocity:
Initial velocity [km/h].
:type initial_velocity: float

:return:
Velocity vector [km/h].
:rtype: numpy.array
"""
vel = _integral(times, accelerations, initial_velocity / 3.6) * 3.6
return np.maximum(0, vel)

def calculate_accelerations(times, velocities):
"""
Calculates the acceleration from velocity time series [m/s2].

:param times:
Time vector [s].
:type times: numpy.array

:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array

:return:
Acceleration vector [m/s2].
:rtype: numpy.array
"""
from scipy.interpolate import InterpolatedUnivariateSpline as Spl
acc = Spl(times, velocities / 3.6).derivative()(times)
b = (velocities[:-1] == 0) & (velocities[1:] == velocities[:-1])
acc[:-1][b] = 0
if b[-1]:
acc[-1] = 0
return acc

def calculate_aerodynamic_resistances(f2, velocities):
"""
Calculates the aerodynamic resistances of the vehicle [N].

:param f2:
As used in the dyno and defined by respective guidelines [N/(km/h)^2].
:type f2: float

:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array | float

:return:
Aerodynamic resistance vector [N].
:rtype: numpy.array | float
"""

return f2 * velocities ** 2

def calculate_passengers_mass(n_passengers, passenger_mass):
"""
Calculate passengers mass including driver [kg].

:param n_passengers:
Number of passengers including driver [-].
:type n_passengers: int

:param passenger_mass:
Average passenger mass [kg].
:type passenger_mass: float

:return:
Passengers mass including the driver [kg].
:rtype: float
"""

return passenger_mass * n_passengers

"""
Calculate unladen mass [kg].

:param curb_mass:
Curb mass [kg].
:type curb_mass: float

:param fuel_mass:
Fuel mass [kg].
:type fuel_mass: float

:return:
:rtype: float
"""

return curb_mass + fuel_mass

def calculate_unladen_mass_v1(vehicle_mass, passengers_mass, cargo_mass):
"""
Calculate unladen mass [kg].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param passengers_mass:
Passengers mass including the driver [kg].
:type passengers_mass: float

:param cargo_mass:
Cargo mass [kg].
:type cargo_mass: float

:return:
:rtype: float
"""

return vehicle_mass - passengers_mass - cargo_mass

"""
Calculate unladen mass [kg].

:param fuel_mass:
Fuel mass [kg].
:type fuel_mass: float

:return:
Curb mass [kg].
:rtype: float
"""

return unladen_mass - fuel_mass

def calculate_vehicle_mass(unladen_mass, passengers_mass, cargo_mass):
"""
Calculate vehicle_mass [kg].

:param passengers_mass:
Passengers mass including the driver [kg].
:type passengers_mass: float

:param cargo_mass:
Cargo mass [kg].
:type cargo_mass: float

:return:
Vehicle mass [kg].
:rtype: float
"""

return unladen_mass + passengers_mass + cargo_mass

def calculate_raw_frontal_area(vehicle_height, vehicle_width):
"""
Calculates raw frontal area of the vehicle [m2].

:param vehicle_height:
Vehicle height [m].
:type vehicle_height: float

:param vehicle_width:
Vehicle width [m].
:type vehicle_width: float

:return:
Raw frontal area of the vehicle [m2].
:rtype: float
"""
return vehicle_height * vehicle_width

def calculate_raw_frontal_area_v1(vehicle_mass, vehicle_category):
"""
Calculates raw frontal area of the vehicle [m2].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param vehicle_category:
Vehicle category (i.e., A, B, C, D, E, F, S, M, and J).
:type vehicle_category: str

:return:
Raw frontal area of the vehicle [m2].
:rtype: float
"""
from asteval import Interpreter as Interp
d = dfl.functions.calculate_raw_frontal_area_v1
expr = d.formulas[vehicle_category.upper()]
return Interp(dict(vehicle_mass=vehicle_mass)).eval(expr)

def calculate_frontal_area(raw_frontal_area):
"""
Calculates the vehicle frontal area [m2].

:param raw_frontal_area:
Raw frontal area of the vehicle [m2].
:type raw_frontal_area: float

:return:
Frontal area of the vehicle [m2].
:rtype: float
"""
d = dfl.functions.calculate_frontal_area.projection_factor
return raw_frontal_area * d

def calculate_aerodynamic_drag_coefficient(vehicle_category):
"""
Calculates the aerodynamic drag coefficient [-].

:param vehicle_category:
Vehicle category (i.e., A, B, C, D, E, F, S, M, and J).
:type vehicle_category: str

:return:
Aerodynamic drag coefficient [-].
:rtype: float
"""
d = dfl.functions.calculate_aerodynamic_drag_coefficient
return d.cw[vehicle_category.upper()]

def calculate_aerodynamic_drag_coefficient_v1(vehicle_body):
"""
Calculates the aerodynamic drag coefficient [-].

:param vehicle_body:
Vehicle body (i.e., cabriolet, sedan, hatchback, stationwagon,
suv/crossover, mpv, coupé, bus, bestelwagen, pick-up).
:type vehicle_body: str

:return:
Aerodynamic drag coefficient [-].
:rtype: float
"""
d = dfl.functions.calculate_aerodynamic_drag_coefficient_v1
return d.cw[vehicle_body.lower()]

def calculate_air_density(air_temperature, atmospheric_pressure):
"""
Calculates the air density [kg/m3].

:param air_temperature:
Air temperature [°C].
:type air_temperature: float

:param atmospheric_pressure:
Atmospheric pressure [kPa].
:type atmospheric_pressure: float

:return:
Air density [kg/m3].
:rtype: float
"""
return 3.48 * atmospheric_pressure / (273.16 + air_temperature)

def calculate_f2(
air_density, aerodynamic_drag_coefficient, frontal_area, has_roof_box):
"""
Calculates the f2 coefficient [N/(km/h)^2].

:param air_density:
Air density [kg/m3].
:type air_density: float

:param aerodynamic_drag_coefficient:
Aerodynamic drag coefficient [-].
:type aerodynamic_drag_coefficient: float

:param frontal_area:
Frontal area of the vehicle [m2].
:type frontal_area: float

:param has_roof_box:
Has the vehicle a roof box? [-].
:type has_roof_box: bool

:return:
As used in the dyno and defined by respective guidelines [N/(km/h)^2].
:rtype: float
"""

c = aerodynamic_drag_coefficient * frontal_area * air_density

if has_roof_box:
c *= dfl.functions.calculate_f2.roof_box

return 0.5 * c / 3.6 ** 2

"""
Returns the default static friction coefficient [-].

:param tyre_state:
Tyre state (i.e., new or worm).
:type tyre_state: str

Road state (i.e., dry, wet, rainfall, puddles, ice).

:return:
Static friction coefficient [-].
:rtype: float
"""
coeff = dfl.functions.default_static_friction.coeff

def default_n_wheel(n_wheel_drive):
"""
Returns the default total number of wheels [-].

:param n_wheel_drive:
Number of wheel drive [-].
:type n_wheel_drive: int

:return:
Total number of wheels [-].
:rtype: int
"""
return max(n_wheel_drive, dfl.functions.default_n_wheel.n_wheel)

"""
Calculate the repartition of the load on wheel drive axles [-].

:param n_wheel_drive:
Number of wheel drive [-].
:type n_wheel_drive: int

:param n_wheel:
Total number of wheels [-].
:type n_wheel: int

:return:
Repartition of the load on wheel drive axles [-].
:rtype: float
"""
return n_wheel_drive / n_wheel

def _compile_traction_acceleration_limits(
deceleration = -9.81 * static_friction
acceleration = -deceleration * wheel_drive_load_fraction

def _func(angle_slopes):
slope = np.cos(angle_slopes)
return deceleration * slope, acceleration * slope

return _func

dsp, outputs=['traction_deceleration_limit', 'traction_acceleration_limit']
)
def calculate_traction_acceleration_limits(
"""
Calculates the traction acceleration limits [m/s2].

:param static_friction:
Static friction coefficient [-].
:type static_friction: float

Repartition of the load on wheel drive axles [-].

:param angle_slopes:
Angle slope vector [rad].
:type angle_slopes: numpy.array

:return:
Traction acceleration limits (i.e., deceleration, acceleration) [m/s2].
:rtype: tuple[numpy.array]
"""
return _compile_traction_acceleration_limits(
)(angle_slopes)

def calculate_rolling_resistance_coeff(tyre_class, tyre_category):
"""
Calculates the rolling resistance coefficient [-].

:param tyre_class:
Tyre class (i.e., C1, C2, and C3).
:type tyre_class: str

:param tyre_category:
Tyre category (i.e., A, B, C, D, E, F, and G).
:type tyre_category: str

:return:
Rolling resistance coefficient [-].
:rtype: float
"""
coeff = dfl.functions.calculate_rolling_resistance_coeff.coeff
return coeff[tyre_class.upper()][tyre_category.upper()]

def calculate_f0(vehicle_mass, rolling_resistance_coeff):
"""
Calculates rolling resistance [N].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param rolling_resistance_coeff:
Rolling resistance coefficient [-].
:type rolling_resistance_coeff: float

:return:
Rolling resistance force [N] when angle_slope == 0.
:rtype: float
"""

return vehicle_mass * 9.81 * rolling_resistance_coeff

def calculate_f1(f2):
"""

:param f2:
As used in the dyno and defined by respective guidelines [N/(km/h)^2].
:type f2: float

:return:
Defined by dyno procedure [N/(km/h)].
:rtype: float
"""

q, m = dfl.functions.calculate_f1.qm
return m * f2 + q

dsp, inputs=['path_distances', 'path_elevations'], outputs=['slope_model']
)
def define_slope_model(distances, elevations):
"""
Returns the angle slope model [rad].

:param distances:
Cumulative distance vector [m].
:type distances: numpy.array

:param elevations:
Elevation vector [m].
:type elevations: numpy.array

:return:
Angle slope model [rad].
:rtype: function
"""
from scipy.interpolate import InterpolatedUnivariateSpline as Spl
i = np.append([0], np.where(np.diff(distances) > dfl.EPS)[0] + 1)
func = Spl(distances[i], elevations[i]).derivative()
return lambda d: np.arctan(func(d))

def define_slope_model_v1(angle_slope):
"""
Returns the angle slope model [rad].

:param angle_slope:
:type angle_slope: float

:return:
Angle slope model [rad].
:rtype: function
"""
return np.vectorize(lambda *args: angle_slope, otypes=[float])

def calculate_angle_slopes(slope_model, distances):
"""
Returns the angle slope vector [rad].

:param slope_model:
Angle slope model [rad].
:type slope_model: function

:param distances:
Cumulative distance vector [m].
:type distances: numpy.array

:return:
Angle slope vector [rad].
:rtype: numpy.array
"""
return slope_model(distances)

def calculate_rolling_resistance(f0, angle_slopes):
"""
Calculates rolling resistance [N].

:param f0:
Rolling resistance force [N] when angle_slope == 0.
:type f0: float

:param angle_slopes:
Angle slope vector [rad].
:type angle_slopes: numpy.array

:return:
Rolling resistance force [N].
:rtype: numpy.array
"""
return f0 * np.cos(angle_slopes)

def calculate_velocity_resistances(f1, velocities):
"""
Calculates forces function of velocity [N].

:param f1:
Defined by dyno procedure [N/(km/h)].
:type f1: float

:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array | float

:return:
Forces function of velocity [N].
:rtype: numpy.array | float
"""

return f1 * velocities

def calculate_climbing_force(vehicle_mass, angle_slopes):
"""
Calculates the vehicle climbing resistance [N].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param angle_slopes:
Angle slope vector [rad].
:type angle_slopes: numpy.array

:return:
Vehicle climbing resistance [N].
:rtype: numpy.array
"""
return vehicle_mass * 9.81 * np.sin(angle_slopes)

def select_default_n_dyno_axes(cycle_type, n_wheel_drive):
"""
Selects the default number of dyno axes[-].

:param cycle_type:
Cycle type (WLTP or NEDC).
:type cycle_type: str

:param n_wheel_drive:
Number of wheel drive [-].
:type n_wheel_drive: int

:return:
Number of dyno axes [-].
:rtype: int
"""
par = dfl.functions.select_default_n_dyno_axes

try:
return par.DYNO_AXES[cycle_type.upper()][n_wheel_drive]
except KeyError:
return n_wheel_drive // 2

def select_inertial_factor(n_dyno_axes):
"""
Selects the inertia factor [%] according to the number of dyno axes.

:param n_dyno_axes:
Number of dyno axes [-].
:type n_dyno_axes: int

:return:
Factor that considers the rotational inertia [%].
:rtype: float
"""

return 1.5 * n_dyno_axes

def calculate_rotational_inertia_forces(
vehicle_mass, inertial_factor, accelerations):
"""
Calculate rotational inertia forces [N].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param inertial_factor:
Factor that considers the rotational inertia [%].
:type inertial_factor: float

:param accelerations:
Acceleration vector [m/s2].
:type accelerations: numpy.array | float

:return:
Rotational inertia forces [N].
:rtype: numpy.array | float
"""

return vehicle_mass * inertial_factor * accelerations / 100

def calculate_accelerations_v1(
vehicle_mass, inertial_factor, motive_forces, climbing_force,
aerodynamic_resistances, rolling_resistance, velocity_resistances):
"""
Calculates the acceleration from motive forces [m/s2].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param inertial_factor:
Factor that considers the rotational inertia [%].
:type inertial_factor: float

:param motive_forces:
Motive forces [N].
:type motive_forces: numpy.array | float

:param climbing_force:
Vehicle climbing resistance [N].
:type climbing_force: float | numpy.array

:param aerodynamic_resistances:
Aerodynamic resistance vector [N].
:type aerodynamic_resistances: numpy.array | float

:param rolling_resistance:
Rolling resistance force [N].
:type rolling_resistance: float | numpy.array

:param velocity_resistances:
Forces function of velocity [N].
:type velocity_resistances: numpy.array | float

:return:
Acceleration vector [m/s2].
:rtype: numpy.array
"""
acc = motive_forces - climbing_force - aerodynamic_resistances
acc -= rolling_resistance + velocity_resistances
acc /= vehicle_mass * (1 + inertial_factor / 100)
return acc

# noinspection PyPep8Naming
def calculate_motive_forces(
vehicle_mass, accelerations, climbing_force, aerodynamic_resistances,
rolling_resistance, velocity_resistances, rotational_inertia_forces):
"""
Calculate motive forces [N].

:param vehicle_mass:
Vehicle mass [kg].
:type vehicle_mass: float

:param accelerations:
Acceleration vector [m/s2].
:type accelerations: numpy.array | float

:param climbing_force:
Vehicle climbing resistance [N].
:type climbing_force: float | numpy.array

:param rolling_resistance:
Rolling resistance force [N].
:type rolling_resistance: float | numpy.array

:param aerodynamic_resistances:
Aerodynamic resistance vector [N].
:type aerodynamic_resistances: numpy.array | float

:param velocity_resistances:
Forces function of velocity [N].
:type velocity_resistances: numpy.array | float

:param rotational_inertia_forces:
Rotational inertia forces [N].
:type rotational_inertia_forces: numpy.array | float

:return:
Motive forces [N].
:rtype: numpy.array | float
"""

# namespace shortcuts
Frr = rolling_resistance
Faero = aerodynamic_resistances
Fclimb = climbing_force
Fvel = velocity_resistances
Finertia = rotational_inertia_forces

return vehicle_mass * accelerations + Fclimb + Frr + Faero + Fvel + Finertia

def calculate_motive_powers(motive_forces, velocities):
"""
Calculates motive power [kW].

:param motive_forces:
Motive forces [N].
:type motive_forces: numpy.array | float

:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array | float

:return:
Motive power [kW].
:rtype: numpy.array | float
"""

return motive_forces * velocities / 3600

def calculate_motive_forces_v1(motive_powers, velocities):
"""
Calculate motive forces [N].

:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array | float

:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array | float

:return:
Motive forces [N].
:rtype: numpy.array | float
"""

return motive_powers / velocities * 3600

)

def apply_f0_correction(f0_uncorrected, correct_f0):
"""
Corrects the rolling resistance force [N] if a different preconditioning
cycle was used for WLTP (WLTP precon) and NEDC (NEDC precon).

:param f0_uncorrected:
Uncorrected rolling resistance force [N] when angle_slope == 0.
:type f0_uncorrected: float

:param correct_f0:
A different preconditioning cycle was used for WLTP and NEDC?
:type correct_f0: bool

:return:
Rolling resistance force [N] when angle_slope == 0.
:rtype: float
"""

if correct_f0:
return f0_uncorrected - 6.0
return f0_uncorrected