Source code for pymoskito.simulation_core

# -*- coding: utf-8 -*-

import logging
import sys

import numpy as np
from copy import deepcopy
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot

from .simulation_modules import SimulationException

class SimulationStateChange(object):
    """
    Object that is emitted when the Simulator changes its state.

    All keyword arguments are stored as attributes of the same name on the
    instance.

    Keyword Args:
        type: Keyword describing the state change (mandatory). Can be one of
            the following:

            * `init` : Initialisation
            * `start` : Start of simulation
            * `time` : Accomplishment of a new progress step
            * `finish` : Finish of simulation
            * `abort` : Abortion of simulation

        data: Data that is emitted on state change.
        info: Further information.
    """

    def __init__(self, **kwargs):
        # every state change has to announce what kind of change it is
        assert "type" in kwargs

        # expose each keyword argument as an instance attribute
        for name in kwargs:
            setattr(self, name, kwargs[name])
class SimulationSettings(object):
    """
    Plain container for the global timing parameters of a simulation run.

    Args:
        start_time: Time at which the simulation starts.
        end_time: Time at which the simulation ends.
        step_size: Base step size of the simulation.
        measure_rate: Rate at which results are recorded.
    """

    def __init__(self, start_time, end_time, step_size, measure_rate):
        self.start_time, self.end_time = start_time, end_time
        self.step_size, self.measure_rate = step_size, measure_rate

    def to_dict(self):
        """
        Return the settings as a dictionary with human readable keys.
        """
        labels = ("start time", "end time", "step size", "measure rate")
        values = (
            self.start_time,
            self.end_time,
            self.step_size,
            self.measure_rate,
        )
        return dict(zip(labels, values))
class Simulator(QObject):
    """
    This class executes the time-step integration.

    It forms the core of the physical simulation and interacts with the GUI
    via the :py:class:`SimulationInterface`.

    Calculated values will be stored every 1 / measure rate seconds.

    Signals:
        work_done: Emitted once after the simulation finished or was aborted.
        state_changed: Emitted with a `SimulationStateChange` whenever the
            simulation starts, makes progress, finishes or aborts.

    Args:
        settings (SimulationSettings): Global timing settings of the run.
        modules (dict): Mapping from module name to simulation module
            instance. Has to contain at least "Model" and "Solver".
    """

    work_done = pyqtSignal()
    state_changed = pyqtSignal(SimulationStateChange)

    # list of modules that have to appear in every run
    static_module_list = [
        "Model",
        "Solver"
    ]

    # list of modules that might not always appear but have to be calculated
    # in a special order
    _dynamic_module_list = [
        "Disturbance",
        "Sensor",
        "ObserverMixer",
        "Observer",
        "Trajectory",
        "Feedforward",
        "Controller",
        "ModelMixer",
        "Limiter",
    ]

    module_list = static_module_list + _dynamic_module_list

    def __init__(self, settings, modules):
        QObject.__init__(self, None)
        self._run = False
        self._logger = logging.getLogger(self.__class__.__name__)

        assert isinstance(settings, SimulationSettings)
        self._settings = settings
        assert isinstance(modules, dict)
        self._simulation_modules = modules

        self._init_states()
        self._init_settings()

        # simulation time of the last progress notification
        self.updated_time = 0
        # recorded outputs, keyed by module name (plus "time" and "finished")
        self._storage = dict()

    def _init_states(self):
        """
        Reset input vector, tick counters and current outputs.
        """
        self._input_vector = {}
        self._counter = {}
        self._current_outputs = {"time": self._settings.start_time}

        for mod_name, obj in self._simulation_modules.items():
            # starting at tick_divider makes every module compute its output
            # on the very first call of _calc_module
            self._counter[mod_name] = obj.tick_divider
            self._current_outputs[mod_name] = []

        # init model output with current state
        self._simulation_modules["Solver"].next_output = np.array(
            self._simulation_modules["Model"].initial_state)

    def _init_settings(self):
        """
        Initialize module settings that depend on other modules.
        """
        # calculate the correct step width for every block
        for obj in self._simulation_modules.values():
            obj.step_width = obj.tick_divider * self._settings.step_size

    def _calc_module(self, module_name):
        """
        Calculate the output of a simulation module.

        Modules that are not part of the current simulation are skipped. A
        module is only evaluated on every `tick_divider`-th call; in between
        its last output is handed on unchanged.

        Args:
            module_name (str): Name of the module to evaluate.
        """
        if module_name not in self._simulation_modules:
            return

        module = self._simulation_modules[module_name]
        if self._counter[module_name] == module.tick_divider:
            self._current_outputs[module_name] = np.atleast_1d(
                module.calc_output(self._input_vector))
            self._counter[module_name] = 1
        else:
            self._counter[module_name] += 1

        # update input vector
        self._input_vector[module_name] = self._current_outputs[module_name]

    def _calc_step(self):
        """
        Calculate one step in simulation.

        Warn:
            Due to the observers need for the last system input, the values of
            the last step are kept in the input vector until they are
            overridden. Be careful about which value is needed at which place
            or otherwise you end up using a value from the last step.
        """
        solver = self._simulation_modules["Solver"]
        model = self._simulation_modules["Model"]

        # update time and current state
        self._current_outputs["time"] = solver.t
        self._input_vector.update(
            time=self._current_outputs["time"],
            system_state=np.atleast_1d(solver.next_output)
        )

        # apply new output
        self._current_outputs["Model"] = np.atleast_1d(
            model.calc_output(self._input_vector["system_state"]))
        self._input_vector.update(system_output=self._current_outputs["Model"])

        # compute all dynamic modules
        for mod in self._dynamic_module_list:
            self._calc_module(mod)

        # integrate model
        self._choose_system_input(self._input_vector)
        self._calc_module("Solver")

    def _choose_system_input(self, input_vector):
        """
        Select the value that is fed into the system as its input.

        This is mainly done for convenience. The first available entry of
        "Limiter", "ModelMixer", "Controller" and "Feedforward" (in this
        order) is stored as "system_input" in the internal input vector.

        Args:
            input_vector (dict): Current outputs of all calculated modules.

        Raises:
            SimulationException: If neither of the candidates is present or
                if Controller and Feedforward are both present without a
                ModelMixer that combines them.
        """
        if "Limiter" in input_vector:
            _input = input_vector["Limiter"]
        elif "ModelMixer" in input_vector:
            _input = input_vector["ModelMixer"]
        elif "Controller" in input_vector:
            if "Feedforward" in input_vector:
                raise SimulationException(
                    "Controller and Feedforward present but no "
                    "ModelMixer. Ambiguous Situation")
            _input = input_vector["Controller"]
        elif "Feedforward" in input_vector:
            _input = input_vector["Feedforward"]
        else:
            raise SimulationException("No system input given.")

        self._input_vector["system_input"] = _input

    def _store_values(self):
        """
        Store all values of the finished integration step.
        """
        for key, val in self._current_outputs.items():
            # np.array copies, so later in-place changes do not alter history
            self._storage.setdefault(key, []).append(np.array(val))

    def _check_time(self):
        """
        Send an update notification whenever the simulation time advanced by
        more than one since the last notification.
        """
        t = self._current_outputs["time"]
        if t - self.updated_time > 1:
            self.state_changed.emit(SimulationStateChange(type="time", t=t))
            self.updated_time = t

    @pyqtSlot()
    def run(self):
        """
        Start the simulation.

        Integrates until the end time is reached, storing the results every
        1 / measure rate. The run ends with exactly one final state change:
        `finish` on success or `abort` if it was stopped by the user or a
        simulation step raised an exception.
        """
        self._run = True
        self.state_changed.emit(SimulationStateChange(type="start"))

        first_run = True
        rate = 1 / self._settings.measure_rate
        solver = self._simulation_modules["Solver"]

        while self._current_outputs["time"] < self._settings.end_time:
            t = solver.t
            dt = 0
            while dt < rate:
                if not self._run:
                    # leave immediately, otherwise the shortened end time
                    # would terminate the outer loop and a second, wrong
                    # "finish" notification would follow the "abort"
                    self._abort("Simulation aborted by user")
                    return

                try:
                    self._calc_step()
                except Exception:
                    # catch all to avoid losing data
                    self._abort(sys.exc_info())
                    return
                dt = solver.t - t

                if dt < rate and first_run:
                    # record the initial values once
                    self._store_values()
                    first_run = False

            self._store_values()
            self._check_time()

        self._finish()

    def _abort(self, info):
        """
        Terminate the run unsuccessfully and publish the data gained so far.

        Overwrites the end time with the reached time.

        Args:
            info: Reason for the abortion, forwarded in the state change.
        """
        self._settings.end_time = self._current_outputs["time"]
        self._storage.update(finished=False)
        end_state = "abort"
        self.state_changed.emit(SimulationStateChange(type=end_state,
                                                      data=self.output,
                                                      info=info))
        self.work_done.emit()

    def _finish(self):
        """
        Terminate the run successfully and publish the results.
        """
        self._storage.update(finished=True)
        end_state = "finish"
        self.state_changed.emit(SimulationStateChange(type=end_state,
                                                      data=self.output,
                                                      info="Success"))
        self.work_done.emit()

    @pyqtSlot(name="stop")
    def stop(self):
        """
        Stop the simulation.

        Only clears the run flag; the running loop in `run` notices this
        before its next step and aborts.
        """
        self._run = False

    @property
    def output(self):
        """
        Results gathered so far.

        Returns:
            dict: With the entries `modules` (settings of every module that
            produced results), `simulation` (simulator settings) and
            `results` (stored values, lists converted into numpy arrays).
        """
        out = dict(modules={}, simulation={}, results={})
        for mod, results in self._storage.items():
            # grab module settings
            if mod in self._simulation_modules:
                out["modules"][mod] = self._simulation_modules[mod].settings

            # grab module data
            if isinstance(results, list):
                # list of 1d-arrays -> 2d-array, list of scalars -> 1d-array
                entry = np.array(results)
            else:
                # flag or string -> nothing to convert
                entry = results
            out["results"][mod] = entry

        # grab simulator settings
        out.update({"simulation": self._settings.to_dict()})
        return out

    @property
    def settings(self):
        """
        Simulator settings as a dictionary.
        """
        return self._settings.to_dict()