from collections import OrderedDict
import os
import pickle
from scipy.integrate import ode
from scipy.signal import StateSpace
import sympy as sp
import numpy as np
from .simulation_modules import (
Model, Solver, SolverException, Trajectory, TrajectoryException, Controller,
Feedforward, SignalMixer, ModelMixer, ObserverMixer, Limiter, Sensor,
Disturbance
)
from .controltools import calc_prefilter, place_siso
__all__ = ["LinearStateSpaceModel", "ODEInt", "ModelInputLimiter",
"Setpoint", "HarmonicTrajectory", "SmoothTransition",
"PIDController", "LinearStateSpaceController",
"DeadTimeSensor", "GaussianNoise",
"AdditiveMixer"]
"""
Ready to go implementations of simulation modules.
"""
[docs]class LinearStateSpaceModel(Model):
"""
The state space model of a linear system.
The parameters of this model can be provided in form of a file whose path is
given by the setting ``config file`` .
This path should point to a pickled dict holding the following keys:
`system`:
An Instance of :py:class:`scipy.signal.StateSpace` (from scipy)
representing the system,
`op_inputs`:
An array-like object holding the operational point's inputs,
`op_outputs`:
An array-like object holding the operational point's outputs.
"""
public_settings = OrderedDict([
("config file", None),
("initial state", None),
("initial output", None),
])
def __init__(self, settings):
file = settings["config file"]
assert os.path.isfile(file)
with open(file, "rb") as f:
data = pickle.load(f)
if "system" not in data:
raise ValueError("Config file lacks mandatory settings.")
self.ss = data["system"]
# no feedthrough possible
np.testing.assert_array_equal(self.ss.D, np.zeros_like(self.ss.D))
settings["state_count"] = self.ss.B.shape[0]
settings["input_count"] = self.ss.B.shape[1]
self.input_offset = data.get("op_inputs",
np.zeros((self.ss.B.shape[1], )))
if len(self.input_offset) != self.ss.B.shape[1]:
raise ValueError("Provided input offset does not match input "
"dimensions.")
self.output_offset = data.get("op_outputs",
np.zeros((self.ss.C.shape[0], )))
if len(self.output_offset) != self.ss.C.shape[0]:
raise ValueError("Length of provided output offset does not match "
"output dimensions ({} != {}).".format(
len(self.output_offset),
self.ss.C.shape[0]
))
if settings["initial state"] is None:
if settings["initial output"] is None:
raise ValueError("Neither 'initial state' nor 'initial output'"
"given.")
settings["initial state"] = \
np.linalg.pinv(self.ss.C) @ (settings["initial output"]
- self.output_offset)
super().__init__(settings)
[docs] def state_function(self, t, x, args):
return np.squeeze(self.ss.A @ x + self.ss.B @ (
args[0] - self.input_offset))
def calc_output(self, input_vector):
return (self.ss.C @ input_vector
+ self.output_offset)
[docs]class ODEInt(Solver):
"""
Wrapper for ode_int from Scipy project
"""
public_settings = OrderedDict([
("Mode", "vode"),
("Method", "adams"),
("measure rate", 500),
("step size", 1e-3),
("rTol", 1e-6),
("aTol", 1e-9),
("start time", 0),
("end time", 5)
])
def __init__(self, settings):
Solver.__init__(self, settings)
# setup solver
if hasattr(self._model, "jacobian"):
self._solver = ode(self._model.state_function,
jac=self._model.jacobian)
else:
self._solver = ode(self._model.state_function)
self._solver.set_integrator(self._settings["Mode"],
method=self._settings["Method"],
rtol=self._settings["rTol"],
atol=self._settings["aTol"],
max_step=self._settings["step size"]
)
self._solver.set_initial_value(np.atleast_1d(self._model.initial_state),
t=self._settings["start time"])
@property
def t(self):
return self._solver.t
@property
def successful(self):
return self._solver.successful()
[docs] def integrate(self, t):
"""
Integrate until target step is reached.
:param t: target time
:return: system state at target time
"""
state = self._solver.integrate(t + self._settings["step size"])
# check model constraints
new_state = self._model.root_function(state)
if new_state[0]:
# reset solver since discontinuous change in equations happened
self._solver.set_initial_value(new_state[1], self.t)
if not self._solver.successful():
raise SolverException("Integration has not been successful.")
return state
[docs]class SmoothTransition(Trajectory):
"""
provides (differential) smooth transition between two scalar states
"""
# TODO enable generation of transitions for state vector
public_settings = {"states": [0, 1],
"start time": 0,
"delta t": 5,
}
def __init__(self, settings):
Trajectory.__init__(self, settings)
# setup symbolic expressions
tau, k = sp.symbols('tau, k')
gamma = self._settings["differential_order"] + 1
alpha = sp.factorial(2 * gamma + 1)
f = sp.binomial(gamma, k) * (-1) ** k * tau ** (gamma + k + 1) / (gamma + k + 1)
phi = alpha / sp.factorial(gamma) ** 2 * sp.summation(f, (k, 0, gamma))
# differentiate phi(tau), index in list corresponds to order
dphi_sym = [phi] # init with phi(tau)
for order in range(self._settings["differential_order"]):
dphi_sym.append(dphi_sym[-1].diff(tau))
# lambdify
self.dphi_num = []
for der in dphi_sym:
self.dphi_num.append(sp.lambdify(tau, der, 'numpy'))
def _desired_values(self, t):
"""
Calculates desired trajectory
"""
y = np.zeros((len(self.dphi_num),))
yd = self._settings['states']
t0 = self._settings['start time']
dt = self._settings['delta t']
if t < t0:
y[0] = yd[0]
elif t > t0 + dt:
y[0] = yd[1]
else:
for order, dphi in enumerate(self.dphi_num):
if order == 0:
ya = yd[0]
else:
ya = 0
y[order] = ya + (yd[1] - yd[0]) * dphi((t - t0) / dt) * 1 / dt ** order
return y
[docs]class HarmonicTrajectory(Trajectory):
"""
This generator provides a scalar harmonic sinus signal
with derivatives up to order n
"""
public_settings = OrderedDict([("Amplitude", 0.25),
("Frequency", 0.1),
("Offset", 0.75),
("Phase in degree", 0)])
def __init__(self, settings):
Trajectory.__init__(self, settings)
# calculate symbolic derivatives up to order n
t, a, f, off, p = sp.symbols("t, a, f, off, p")
self.yd_sym = []
harmonic = a * sp.sin(2 * sp.pi * f * t + p) + off
for idx in range(settings["differential_order"] + 1):
self.yd_sym.append(harmonic.diff(t, idx))
# lambdify
for idx, val in enumerate(self.yd_sym):
self.yd_sym[idx] = sp.lambdify((t, a, f, off, p), val, "numpy")
def _desired_values(self, t):
# yd = []
yd = np.zeros((self._settings['differential_order'] + 1), )
a = self._settings['Amplitude']
f = self._settings['Frequency']
off = self._settings["Offset"]
p = self._settings["Phase in degree"] * np.pi / 180
for idx, val in enumerate(self.yd_sym):
yd[idx] = val(t, a, f, off, p)
# yd.append(val(t, a, f, off, p))
return yd
[docs]class Setpoint(Trajectory):
"""
Provides setpoints for every output component.
If the output is not scalar, just add more entries to the list.
By querying the differential order from the controller (if available) the
required derivatives are given.
Note:
Keep in mind that while this class provides zeros for all derivatives
of the desired value, they actually strive to infinity for :math:`t=0` .
"""
public_settings = OrderedDict([("Setpoint", [0])])
def __init__(self, settings):
Trajectory.__init__(self, settings)
self.yd = np.zeros((len(self._settings["Setpoint"]),
self._settings["differential_order"] + 1))
for idx, val in enumerate(self._settings["Setpoint"]):
self.yd[idx, 0] = val
def _desired_values(self, t):
return self.yd
[docs]class LinearStateSpaceController(Controller):
"""
A controller that is based on a state space model of a linear system.
This controller needs a linear statespace model, just as the
:py:class:`LinearStateSpaceModel` . The file provided in ``config file``
should therefore contain a dict holding the entries: ``model``,
``op_inputs`` and ``op_outputs`` .
If poles is given (differing from `None` ) the state-feedback will be
computed using :py:func:`pymoskito.place_siso` .
Furthermore an appropriate prefilter is calculated, which establishes
stationary attainment of the desired output values.
Note:
If a SIMO or MIMO system is given, the control_ package as well as the
slycot_ package are needed the perform the pole placement.
.. _control: https://github.com/python-control/python-control
.. _slycot: https://github.com/python-control/Slycot
"""
public_settings = OrderedDict([
("input source", "system_state"),
("config file", None),
("poles", None),
])
def __init__(self, settings):
file = settings["config file"]
assert os.path.isfile(file)
with open(file, "rb") as f:
data = pickle.load(f)
if "system" not in data:
raise ValueError("Config file lacks mandatory settings.")
self.ss = data["system"]
self.input_offset = data.get("op_inputs", None)
self.output_offset = data.get("op_outputs", None)
if self.input_offset is None:
self.input_offset = np.zeros((self.ss.B.shape[1], ))
if len(self.input_offset) != self.ss.B.shape[1]:
raise ValueError("Provided input offset does not match input "
"dimensions.")
if self.output_offset is None:
self.output_offset = np.zeros((self.ss.C.shape[0], ))
if len(self.output_offset) != self.ss.C.shape[0]:
raise ValueError("Length of provided output offset does not match "
"output dimensions ({} != {}).".format(
len(self.output_offset),
self.ss.C.shape[0]
))
# add specific "private" settings
settings.update(input_order=0)
settings.update(output_dim=self.ss.C.shape[0])
settings.update(input_type=settings["input source"])
super().__init__(settings)
if settings.get("poles", None) is None:
# pretty useless but hey why not.
self.feedback = np.zeros((self.ss.B.shape[1], self.ss.A.shape[0]))
else:
if self.ss.B.shape[1] == 1:
# save the control/slycot dependency
self.feedback = place_siso(self.ss.A,
self.ss.B,
self.settings["poles"])
else:
import control
self.feedback = control.place(self.ss.A,
self.ss.B,
self.settings["poles"])
self.prefilter = calc_prefilter(self.ss.A, self.ss.B, self.ss.C,
self.feedback)
def _control(self, time, trajectory_values=None, feedforward_values=None,
input_values=None, **kwargs):
return (-self.feedback @ input_values
+ self.prefilter @ (trajectory_values[:, 0] -
self.output_offset)
+ self.input_offset)
[docs]class PIDController(Controller):
"""
PID Controller
"""
public_settings = OrderedDict([("Kp", 700),
("Ki", 500),
("Kd", 200),
("output_limits", [0, 255]),
("input_state", [2]),
("tick divider", 1)])
last_time = 0
def __init__(self, settings):
# add specific "private" settings
settings.update(input_order=0)
settings.update(output_dim=len(self.public_settings["input_state"]))
settings.update(input_type="system_state")
Controller.__init__(self, settings)
# define variables for data saving in the right dimension
self.e_old = np.zeros((len(self._settings["input_state"]), )) # column vector
self.integral_old = np.zeros((len(self._settings["input_state"]), )) # column vector
self.last_u = np.zeros((len(self._settings["input_state"]), )) # column vector
self.output = np.zeros((len(self._settings["input_state"]), )) # column vector
def _control(self, time, trajectory_values=None, feedforward_values=None, input_values=None, **kwargs):
# input abbreviations
x = np.zeros((len(self._settings["input_state"]), ))
for idx, state in enumerate(self._settings["input_state"]):
x[idx] = input_values[int(state)]
yd = trajectory_values
# step size
dt = time - self.last_time
# save current time
self.last_time = time
if dt != 0:
for i in range(len(x)):
# error
e = yd[i] - x[i]
integral = e * dt + self.integral_old[i]
if integral > self._settings["output_limits"][1]:
integral = self._settings["output_limits"][1]
elif integral < self._settings["output_limits"][0]:
integral = self._settings["output_limits"][0]
differential = (e - self.e_old[i]) / dt
self.output[i] = (self._settings["Kp"] * e
+ self._settings["Ki"] * integral
+ self._settings["Kd"] * differential)
if self.output[i] > self._settings["output_limits"][1]:
self.output[i] = self._settings["output_limits"][1]
elif self.output[i] < self._settings["output_limits"][0]:
self.output[i] = self._settings["output_limits"][0]
# save data for new calculation
self.e_old[i] = e
self.integral_old[i] = integral
u = self.output
else:
u = self.last_u
self.last_u = u
return u
[docs]class AdditiveMixer(SignalMixer):
"""
Signal Mixer that accumulates all input signals.
Processing is done according to rules of numpy broadcasting.
"""
public_settings = OrderedDict([("Input A", None),
("Input B", None)])
def __init__(self, settings):
settings.update([("input signals", [settings["Input A"],
settings["Input B"]])])
SignalMixer.__init__(self, settings)
def _mix(self, signal_values):
return sum(signal_values)
[docs]class DeadTimeSensor(Sensor):
"""
Sensor that adds a measurement delay on chosen states
"""
public_settings = OrderedDict([("states to delay", [0]),
("delay", 1)])
def __init__(self, settings):
settings.update([("input signal", "system_state")])
Sensor.__init__(self, settings)
self._storage = None
def _measure(self, value):
if self._storage is None:
# create storage with length "delay"
# initial values are the first input
self._storage = [value]*int(self._settings["delay"])
# save current values
measurement = value.copy()
# add new measurement
self._storage.append(value)
# get delayed measurements
delayed_measurement = self._storage.pop(0)
# replace current values with delayed values, if it is chosen
for i in self._settings["states to delay"]:
measurement[i] = delayed_measurement[i]
return measurement
[docs]class GaussianNoise(Disturbance):
"""
Noise generator for gaussian noise
"""
public_settings = OrderedDict([("sigma", 1),
("mean", 0)])
def __init__(self, settings):
settings.update([("input signal", "Sensor")])
Disturbance.__init__(self, settings)
def _disturb(self, value):
return np.random.normal(self._settings['mean'],
self._settings['sigma'],
value.output_dim)