"""MuMoT view classes"""
import bisect
import copy
import datetime
import math
import sys
from typing import Dict, Optional, Tuple, Union
from IPython.display import display, Math
from IPython.utils import io
from ipywidgets import HTML
import ipywidgets.widgets as widgets
import matplotlib
from matplotlib import pyplot as plt
import matplotlib.patches as mpatch
import matplotlib.ticker as ticker
from mpl_toolkits.mplot3d import proj3d
import numpy as np
import networkx as nx
import PyDSTool as dst
from scipy.integrate import odeint
import sympy
from sympy import (
default_sort_key,
Derivative,
factorial,
Function,
lambdify,
latex,
simplify,
Symbol,
symbols,
)
from sympy.parsing.latex import parse_latex
from warnings import warn, catch_warnings, simplefilter
from . import (
consts,
exceptions,
utils,
)
figureCounter = 1 # global figure counter for model views
[docs]class MuMoTview:
"""A view on a model."""
# Model view is on
_mumotModel = None
# Figure/axis object to plot view to
_figure = None
# Unique figure number
_figureNum = None
# 3d axes? (False => 2d)
_axes3d = None
# Controller that controls this view @todo - could become None
_controller = None
# Summary logs of view behaviour
_logs = None
# parameter values when used without controller
_fixedParams = None
# dictionary of rates and value
_ratesDict = None
# total number of agents in the simulation
_systemSize = None
# silent flag (TRUE = do not try to acquire figure handle from pyplot)
_silent = None
# plot limits (for non-constant system size) @todo: not used?
_plotLimits = None
# command name that generates this view
_generatingCommand = None
# generating keyword arguments
_generatingKwargs = None
# x-label
_xlab = None
# y-label
_ylab = None
# defines fontsize on the axes
_axes_font_size = None
# legend location: combinations like 'upper left', lower right, or 'center center' are allowed (9 options in total)
_legend_loc = None
# legend fontsize, accepts integers
_legend_fontsize = None
# displayed range for vertical axis
_chooseXrange = None
# displayed range for horizontal axis
_chooseYrange = None
[docs] def __init__(self, model, controller, figure=None, params=None, **kwargs):
self._silent = kwargs.get('silent', False)
self._mumotModel = model
self._controller = controller
self._logs = []
self._axes3d = False
self._fixedParams = {}
self._plotLimits = 1
self._generatingKwargs = kwargs
if params is not None:
(paramNames, paramValues) = utils._process_params(params)
self._fixedParams = dict(zip(paramNames, paramValues))
# storing the rates for each rule
if self._mumotModel:
freeParamDict = self._get_argDict()
self._ratesDict = {}
for rule in self._mumotModel._rules:
self._ratesDict[str(rule.rate)] = rule.rate.subs(freeParamDict)
if self._ratesDict[str(rule.rate)] == float('inf') or self._ratesDict[str(rule.rate)] is sympy.zoo:
self._ratesDict[str(rule.rate)] = sys.maxsize
# print(self._ratesDict)
self._systemSize = self._getSystemSize()
# storing the graphics params
if 'fontsize' in kwargs:
self._axes_font_size = kwargs['fontsize']
else:
self._axes_font_size = None
self._legend_loc = kwargs.get('legend_loc', 'upper right')
self._legend_fontsize = kwargs.get('legend_fontsize', None)
self._chooseXrange = kwargs.get('choose_xrange', None)
self._chooseYrange = kwargs.get('choose_yrange', None)
if not self._silent:
_buildFig(self, figure)
def _resetErrorMessage(self) -> None:
if self._controller is not None:
if not self._silent:
self._controller._errorMessage.value = ''
def _showErrorMessage(self, message) -> None:
if self._controller is not None:
self._controller._errorMessage.value = self._controller._errorMessage.value + message
else:
print(message)
def _show_computation_start(self) -> None:
if self._controller is not None:
self._controller._bookmarkWidget.style.button_color = 'pink'
def _show_computation_stop(self) -> None:
# ax = plt.gca()
# ax.set_facecolor('xkcd:white')
# print("pink off")
if self._controller is not None:
self._controller._bookmarkWidget.style.button_color = 'silver'
def _setLog(self, log) -> None:
self._logs = log
def _log(self, analysis) -> None:
print(f"Starting {analysis} with parameters ", end='')
param_names = []
param_values = []
if self._controller is not None:
# @todo: if the alphabetic order is not good,
# the view could store the desired order in (param_names)
# when the controller is constructed
for name in sorted(self._controller._widgetsFreeParams.keys()):
param_names.append(name)
param_values.append(self._controller._widgetsFreeParams[name].value)
for name in sorted(self._controller._widgetsExtraParams.keys()):
param_names.append(name)
param_values.append(self._controller._widgetsExtraParams[name].value)
# if self._param_names is not None:
# param_names += map(str, self._param_names)
# param_values += self._paramValues
# # @todo: in soloView, this does not show the extra parameters
# # (we should make clearer what the use of showLogs)
for key, value in self._fixedParams.items():
param_names.append(str(key))
param_values.append(value)
for i in zip(param_names, param_values):
print('(' + i[0] + '=' + repr(i[1]) + '), ', end='')
print("at", datetime.datetime.now())
def _print_standalone_view_cmd(self, includeParams: bool = True) -> str:
log_str = self._build_bookmark(includeParams)
if not self._silent and log_str is not None:
with io.capture_output() as log:
print(log_str)
self._logs.append(log)
return log_str
def _set_fixedParams(self, paramDict) -> None:
self._fixedParams = paramDict
def _get_params(self, refModel=None):
if refModel is not None:
model = refModel
else:
model = self._mumotModel
params = []
paramInitCheck = []
for reactant in model._reactants:
if reactant not in model._constantReactants:
paramInitCheck.append(latex(sympy.Symbol(f"Phi^0_{reactant}")))
if self._controller:
for name, value in self._controller._widgetsFreeParams.items():
if name in model._ratesLaTeX:
name = model._ratesLaTeX[name]
name = name.replace('(', '')
name = name.replace(')', '')
if name not in paramInitCheck:
params.append((latex(name), value.value))
for name, value in self._fixedParams.items():
# if name == 'systemSize' or name == 'plotLimits':
# continue
if name not in self._mumotModel._get_rates_from_stoichiometry() and name not in self._mumotModel._constantReactants:
continue
name = repr(name)
if name in model._ratesLaTeX:
name = model._ratesLaTeX[name]
name = name.replace('(', '')
name = name.replace(')', '')
params.append((latex(name), value))
params.append(('plotLimits', self._getPlotLimits()))
params.append(('systemSize', self._getSystemSize()))
return params
def _get_bookmarks_params(self, refModel=None) -> str:
params = self._get_params(refModel)
log_str = "params = ["
for name, value in params:
log_str += f"('{name}', {value}), "
log_str = log_str[:-2] # throw away last ", "
log_str += "]"
return log_str
def _build_bookmark(self, _=None) -> None:
self._resetErrorMessage()
self._showErrorMessage(f"Bookmark functionality not implemented for class {self._generatingCommand}")
def _getPlotLimits(self, defaultLimits: int = 1) -> int:
# if self._paramNames is not None and 'plotLimits' in self._paramNames:
if self._fixedParams is not None and 'plotLimits' in self._fixedParams:
# systemSize = self._paramValues[self._paramNames.index('plotLimits')]
plotLimits = self._fixedParams['plotLimits']
elif self._controller is not None and self._controller._plotLimitsWidget is not None:
plotLimits = self._controller._plotLimitsWidget.value
else:
plotLimits = defaultLimits
return plotLimits
def _getSystemSize(self, defaultSize: int = 1) -> int:
# if self._paramNames is not None and 'systemSize' in self._paramNames:
if self._fixedParams is not None and 'systemSize' in self._fixedParams:
# systemSize = self._paramValues[self._paramNames.index('systemSize')]
systemSize = self._fixedParams['systemSize']
elif self._controller is not None and self._controller._systemSizeWidget is not None:
systemSize = self._controller._systemSizeWidget.value
else:
systemSize = defaultSize
return systemSize
def _get_argDict(self):
"""Get names and values from widgets."""
paramNames = []
paramValues = []
if self._controller is not None:
for name, value in self._controller._widgetsFreeParams.items():
# print("wdg-name: " + str(name) + " wdg-val: " + str(value.value))
paramNames.append(name)
paramValues.append(value.value)
# if self._controller._widgetsExtraParams and 'initBifParam' in self._controller._widgetsExtraParams:
# paramNames.append(self._bifurcationParameter_for_get_argDict)
# paramValues.append(self._controller._widgetsExtraParams['initBifParam'].value)
# if self._fixedParams and 'initBifParam' in self._fixedParams:
# paramNames.append(self._bifurcationParameter_for_get_argDict)
# paramValues.append(self._fixedParams['initBifParam'])
if self._fixedParams is not None:
for key, item in self._fixedParams.items():
# print(f"fix-name: {key} fix-val: {item}")
paramNames.append(str(key))
paramValues.append(item)
argNamesSymb = list(map(sympy.Symbol, paramNames))
argDict = dict(zip(argNamesSymb, paramValues))
if self._mumotModel._systemSize:
argDict[self._mumotModel._systemSize] = 1
# @todo: is this necessary? for which view?
systemSize = sympy.Symbol('systemSize')
argDict[systemSize] = self._getSystemSize()
return argDict
def _get_fixedPoints1d(self):
"""Calculate stationary states of 1D system."""
argDict = self._get_argDict()
EQ1 = self._mumotModel._equations[self._stateVariable1].subs(argDict)
eps = 1e-8
EQsol = sympy.solve((EQ1), (self._stateVariable1), dict=True)
realEQsol = [{self._stateVariable1: sympy.re(EQsol[kk][self._stateVariable1])}
for kk in range(len(EQsol))
if sympy.Abs(sympy.im(EQsol[kk][self._stateVariable1])) <= eps]
MAT = sympy.Matrix([EQ1])
JAC = MAT.jacobian([self._stateVariable1])
eigList = []
for nn in range(len(realEQsol)):
evSet = {}
JACsub = JAC.subs([(self._stateVariable1, realEQsol[nn][self._stateVariable1])])
# evSet = JACsub.eigenvals()
eigVects = JACsub.eigenvects()
for kk in range(len(eigVects)):
evSet[eigVects[kk][0]] = (eigVects[kk][1], eigVects[kk][2])
eigList.append(evSet)
return realEQsol, eigList # returns two lists of dictionaries
def _get_fixedPoints2d(self):
"""Calculate stationary states of 2d system."""
argDict = self._get_argDict()
EQ1 = self._mumotModel._equations[self._stateVariable1].subs(argDict)
EQ2 = self._mumotModel._equations[self._stateVariable2].subs(argDict)
eps = 1e-8
EQsolA = sympy.solve((EQ1, EQ2), (self._stateVariable1, self._stateVariable2), dict=True)
EQsol = []
addIndexToSolList = []
for nn in range(len(EQsolA)):
if len(EQsolA[nn]) == 2:
addIndexToSolList.append(nn)
# else:
# self._showErrorMessage('Some solutions for Fixed Points may not be unique.')
for el in addIndexToSolList:
EQsol.append(EQsolA[el])
if len(EQsol) == 0:
self._showErrorMessage('Could not compute any unique solutions for Fixed Points. ')
return None, None
elif len(EQsol) < len(EQsolA):
self._showErrorMessage('Some solutions for Fixed Points may not be unique. ')
# for nn in range(len(EQsol)):
# if len(EQsol[nn]) != 2:
# self._showErrorMessage('Some or all solutions are NOT unique.')
# return None, None
realEQsol = [{self._stateVariable1: sympy.re(EQsol[kk][self._stateVariable1]),
self._stateVariable2: sympy.re(EQsol[kk][self._stateVariable2])}
for kk in range(len(EQsol))
if (sympy.Abs(sympy.im(EQsol[kk][self._stateVariable1])) <= eps and
sympy.Abs(sympy.im(EQsol[kk][self._stateVariable2])) <= eps)]
MAT = sympy.Matrix([EQ1, EQ2])
JAC = MAT.jacobian([self._stateVariable1, self._stateVariable2])
eigList = []
for nn in range(len(realEQsol)):
evSet = {}
JACsub = JAC.subs([(self._stateVariable1, realEQsol[nn][self._stateVariable1]),
(self._stateVariable2, realEQsol[nn][self._stateVariable2])])
# evSet = JACsub.eigenvals()
eigVects = JACsub.eigenvects()
for kk in range(len(eigVects)):
evSet[eigVects[kk][0]] = (eigVects[kk][1], eigVects[kk][2])
eigList.append(evSet)
return realEQsol, eigList # returns two lists of dictionaries
def _get_fixedPoints3d(self):
"""Calculate stationary states of 3d system."""
argDict = self._get_argDict()
EQ1 = self._mumotModel._equations[self._stateVariable1].subs(argDict)
EQ2 = self._mumotModel._equations[self._stateVariable2].subs(argDict)
EQ3 = self._mumotModel._equations[self._stateVariable3].subs(argDict)
eps = 1e-8
EQsolA = sympy.solve((EQ1, EQ2, EQ3),
(self._stateVariable1, self._stateVariable2, self._stateVariable3),
dict=True)
EQsol = []
addIndexToSolList = []
for nn in range(len(EQsolA)):
if len(EQsolA[nn]) == 3:
addIndexToSolList.append(nn)
# else:
# self._showErrorMessage('Some solutions for Fixed Points may not be unique.')
for el in addIndexToSolList:
EQsol.append(EQsolA[el])
if len(EQsol) == 0:
self._showErrorMessage('Could not compute any unique solutions for Fixed Points. ')
return None, None
elif len(EQsol) < len(EQsolA):
self._showErrorMessage('Some solutions for Fixed Points may not be unique. ')
# for nn in range(len(EQsol)):
# if len(EQsol[nn]) != 3:
# self._showErrorMessage('Some or all solutions are NOT unique.')
# return None, None
realEQsol = [{self._stateVariable1: sympy.re(EQsol[kk][self._stateVariable1]),
self._stateVariable2: sympy.re(EQsol[kk][self._stateVariable2]),
self._stateVariable3: sympy.re(EQsol[kk][self._stateVariable3])}
for kk in range(len(EQsol))
if (sympy.Abs(sympy.im(EQsol[kk][self._stateVariable1])) <= eps and
sympy.Abs(sympy.im(EQsol[kk][self._stateVariable2])) <= eps and
sympy.Abs(sympy.im(EQsol[kk][self._stateVariable3])) <= eps)]
MAT = sympy.Matrix([EQ1, EQ2, EQ3])
JAC = MAT.jacobian([self._stateVariable1, self._stateVariable2, self._stateVariable3])
eigList = []
for nn in range(len(realEQsol)):
evSet = {}
JACsub = JAC.subs([(self._stateVariable1, realEQsol[nn][self._stateVariable1]),
(self._stateVariable2, realEQsol[nn][self._stateVariable2]),
(self._stateVariable3, realEQsol[nn][self._stateVariable3])])
# evSet = JACsub.eigenvals()
eigVects = JACsub.eigenvects()
for kk in range(len(eigVects)):
evSet[eigVects[kk][0]] = (eigVects[kk][1], eigVects[kk][2])
eigList.append(evSet)
return realEQsol, eigList # returns two lists of dictionaries
def _update_params(self) -> None:
"""Update parameters from widgets.
If the view requires view-specific params they can be updated implementing _update_view_specific_params().
"""
freeParamDict = self._get_argDict()
if self._controller is not None:
# getting the rates' value
self._ratesDict = {}
for rule in self._mumotModel._rules:
self._ratesDict[str(rule.rate)] = rule.rate.subs(freeParamDict)
if self._ratesDict[str(rule.rate)] == float('inf') or self._ratesDict[str(rule.rate)] is sympy.zoo:
self._ratesDict[str(rule.rate)] = sys.maxsize
errorMsg = (f"WARNING! Rate with division by zero. \n"
f"The rule has a rate with division by zero: \n"
f"{rule.lhsReactants} --> {rule.rhsReactants} with rate {rule.rate}.\n"
f"The system has run the simulation with the maximum system value: {self._ratesDict[str(rule.rate)]}")
self._showErrorMessage(errorMsg)
# print("_ratesDict=" + str(self._ratesDict))
self._systemSize = self._getSystemSize()
self._update_view_specific_params(freeParamDict)
def _getWidgetParamValue(self, key, dict=None):
"""Check fixedParams then generatingKwargs for a key value otherwise return from dict."""
if self._fixedParams.get(key) is not None:
return self._fixedParams[key]
elif self._generatingKwargs.get(key) is not None:
return self._generatingKwargs[key]
elif dict is not None:
if dict.get(key) is not None:
return dict[key].value
else:
raise exceptions.MuMoTValueError(f"Could not find value for key '{key}'; "
"if using a multicontroller try moving keyword definition down to creation of constitutent controllers")
else:
return None
def _getInitialState(self, state, freeParamDict):
"""Get initial state from widgets, otherwise original initial state."""
if state in self._mumotModel._constantReactants:
return freeParamDict[state]
elif self._controller._widgetsExtraParams.get(f"init{state}") is not None:
return self._controller._widgetsExtraParams[f"init{state}"].value
else:
return self._initialState[state]
def _update_view_specific_params(self, freeParamDict=None):
"""Interface method to update view-specific params from widgets.
@todo JARM: I don't see what purpose this serves - it is mostly ignored and I don't think will function as intended
"""
if freeParamDict is None:
freeParamDict = {}
def _safeSymbol(self, item):
"""Used in _update_view_specific_params"""
if type(item) is sympy.Symbol:
return item
else:
return sympy.Symbol(item)
[docs] def showLogs(self, tail: bool = False) -> None:
"""Show logs from view.
Parameters
----------
tail : bool, optional
Flag to show only tail entries from logs. Defaults to False.
"""
if tail:
tailLength = 5
print(f"Showing last {min(tailLength, len(self._logs))} of {len(self._logs)} log entries:")
for log in self._logs[-tailLength:]:
log.show()
else:
for log in self._logs:
log.show()
[docs]class MuMoTmultiView(MuMoTview):
"""Multi-view view.
Tied closely to :class:`MuMoTmultiController`.
"""
# view list
_views = None
# axes are used for subplots ('shareAxes = True')
_axes = None
# number of subplots
_subPlotNum = None
# subplot rows
_numRows = None
# subplot columns
_numColumns = None
# use common axes for all plots (False = use subplots)
_shareAxes = None
# controllers (for building bookmarks)
_controllers = None
[docs] def __init__(self, controller, model, views, controllers, subPlotNum, **kwargs):
super().__init__(model, controller, **kwargs)
self._generatingCommand = "mumot.MuMoTmultiController"
self._views = views
self._controllers = controllers
self._subPlotNum = subPlotNum
self._shareAxes = kwargs.get('shareAxes', False)
for i, view in enumerate(self._views):
view._figure = self._figure
view._figureNum = self._figureNum
view._setLog(self._logs)
view._controller = controller
# MuMoTstochasticSimulationView could be run with the option realtimePlot, this would work only if:
# * this view is the first of the list of views (so that realtimePlot will not erase previous plots)
# * the multiController has shareAxes==False, in which case the update will happen only on the view subplot
if isinstance(view, MuMoTstochasticSimulationView) and self._shareAxes and i>0:
view._allowRealtimePlotting = False
if not self._shareAxes:
self._numColumns = consts.MULTIPLOT_COLUMNS
self._numRows = math.ceil(self._subPlotNum / self._numColumns)
plt.gcf().set_size_inches(9, 4.5)
def _plot(self, _=None) -> None:
fig = plt.figure(self._figureNum)
plt.clf()
self._resetErrorMessage()
if self._shareAxes:
for func, subPlotNum, axes3d in self._controller._replotFunctions:
func()
else:
# subplotNum = 1
for func, subPlotNum, axes3d in self._controller._replotFunctions:
with catch_warnings():
simplefilter("ignore")
if axes3d:
# self._figure.add_subplot(self._numRows, self._numColumns, subPlotNum, projection = '3d')
plt.subplot(self._numRows, self._numColumns, subPlotNum, projection='3d')
else:
plt.subplot(self._numRows, self._numColumns, subPlotNum)
func()
plt.subplots_adjust(left=0.12, bottom=0.25, right=0.98, top=0.9, wspace=0.45, hspace=None)
# plt.tight_layout()
# subplotNum += 1
def _setLog(self, log) -> None:
for view in self._views:
view._setLog(log)
def _print_standalone_view_cmd(self, includeParams: bool = False):
model = self._views[0]._mumotModel # @todo this suppose that all models are the same for all views
with io.capture_output() as log:
if not self._controller._silent:
log_str = "bookmark = "
else:
log_str = ""
log_str += self._generatingCommand + "(["
for controller in self._controllers:
log_str += controller._view._print_standalone_view_cmd(False) + ", "
log_str = log_str[:-2] # throw away last ", "
log_str += "]"
if includeParams:
log_str += ", " + self._get_bookmarks_params(model)
if len(self._generatingKwargs) > 0:
log_str += ", "
for key in self._generatingKwargs:
if type(self._generatingKwargs[key]) == str:
log_str += key + " = " + "\'" + str(self._generatingKwargs[key]) + "\'" + ", "
else:
log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
log_str = log_str[:-2] # throw away last ", "
if 'silent' not in self._generatingKwargs:
log_str += ", silent = " + str(self._silent)
if 'bookmark' not in self._generatingKwargs:
log_str += ", bookmark = False"
log_str += ")"
# log_str = log_str.replace('\\', '\\\\') # @todo is this necessary?
if not self._silent and log_str is not None:
print(log_str)
self._logs.append(log)
return log_str
def _set_fixedParams(self, paramDict):
self._fixedParams = paramDict
for view in self._views:
# view._set_fixedParams(paramDict)
# This operation merges the two dictionaries with the second overriding the values of the first
view._set_fixedParams({**paramDict, **view._fixedParams})
[docs]class MuMoTtimeEvolutionView(MuMoTview):
"""Time evolution view on model including state variables and noise.
Specialised by :class:`MuMoTintegrateView` and :class:`MuMoTnoiseCorrelationsView`.
"""
# list of all state variables
_stateVarList = None
# list of all state variables displayed in figure
_stateVarListDisplay = None
# 1st state variable
_stateVariable1 = None
# 2nd state variable
_stateVariable2 = None
# 3rd state variable
_stateVariable3 = None
# # 4th state variable
# _stateVariable4 = None
# # end time of numerical simulation of ODE system of the state variables
# _tend = None
# simulation length in time units
_maxTime = None
# time step of numerical simulation
_tstep = None
# total number of agents in the simulation
_systemSize = None
# the system state at the start of the simulation (timestep zero)
_initialState = None
# flag to plot proportions or full populations
_plotProportions = None
# Parameters for controller specific to this time evolution view
_tEParams = None
[docs] def __init__(self, model, controller, tEParams, showStateVars=None, figure=None, params=None, **kwargs):
# if model._systemSize is None and model._constantSystemSize:
# print("Cannot construct time evolution -based plot until system size is set, using substitute()")
# return
self._silent = kwargs.get('silent', False)
super().__init__(model=model, controller=controller, figure=figure, params=params, **kwargs)
# super().__init__(model, controller, figure, params, **kwargs)
self._tEParams = tEParams
with io.capture_output() as log:
# if True:
# log=''
self._systemSize = self._getSystemSize()
if self._controller is None:
# storing the initial state
self._initialState = {}
for state, pop in tEParams["initialState"].items():
if isinstance(state, str):
# convert string into SymPy symbol
self._initialState[parse_latex(state)] = pop
else:
self._initialState[state] = pop
# # add to the _initialState the constant reactants
# for constantReactant in self._mumotModel._getAllReactants()[1]:
# self._initialState[constantReactant] = freeParamDict[constantReactant]
# Storing all values of MA-specific parameters
self._maxTime = tEParams["maxTime"]
self._plotProportions = tEParams["plotProportions"]
else:
# storing the initial state
self._initialState = {}
for state, pop in tEParams["initialState"][0].items():
if isinstance(state, str):
# Convert string into SymPy symbol
self._initialState[parse_latex(state)] = pop[0]
else:
self._initialState[state] = pop[0]
# # add to the _initialState the constant reactants
# for constantReactant in self._mumotModel._getAllReactants()[1]:
# self._initialState[constantReactant] = freeParamDict[constantReactant]
# storing fixed params
for key, value in tEParams.items():
if value[-1]:
if key == 'initialState':
self._fixedParams[key] = self._initialState
else:
self._fixedParams[key] = value[0]
self._xlab = kwargs.get('xlab', 'time t')
self._stateVarList = []
for reactant in self._mumotModel._reactants:
if reactant not in self._mumotModel._constantReactants:
self._stateVarList.append(reactant)
if showStateVars:
if type(showStateVars) == list:
self._stateVarListDisplay = showStateVars
for kk in range(len(self._stateVarListDisplay)):
self._stateVarListDisplay[kk] = parse_latex(self._stateVarListDisplay[kk])
else:
self._showErrorMessage('Check input: should be of type list!')
else:
self._stateVarListDisplay = copy.deepcopy(self._stateVarList)
self._stateVarListDisplay = sorted(self._stateVarListDisplay, key=str)
self._stateVariable1 = self._stateVarList[0]
if len(self._stateVarList) == 2 or len(self._stateVarList) == 3:
self._stateVariable2 = self._stateVarList[1]
if len(self._stateVarList) == 3:
self._stateVariable3 = self._stateVarList[2]
self._tstep = kwargs.get('tstep', 0.01)
self._constructorSpecificParams(tEParams)
self._logs.append(log)
if not self._silent:
self._plot_NumSolODE()
def _get_eqsODE(self, y_old, time):
""" Calculates right-hand side of ODE system."""
SVsub = {}
for kk in range(len(self._stateVarList)):
SVsub[self._stateVarList[kk]] = y_old[kk]
argDict = self._get_argDict()
ode_sys = []
for kk in range(len(self._stateVarList)):
EQ = self._mumotModel._equations[self._stateVarList[kk]].subs(argDict)
EQ = EQ.subs(SVsub)
ode_sys.append(EQ)
return ode_sys
def _plot_NumSolODE(self):
if not self._silent: # @todo is this necessary?
plt.figure(self._figureNum)
plt.clf()
self._resetErrorMessage()
self._showErrorMessage(str(self))
self._update_params()
def _update_view_specific_params(self, freeParamDict=None):
"""getting other parameters specific to integrate"""
if freeParamDict is None:
freeParamDict = {}
if self._controller is not None:
if self._getWidgetParamValue('initialState', None) is not None:
# self._initialState = self._getWidgetParamValue('initialState', None)
# self._initialState = {sympy.Symbol(key): self._getWidgetParamValue('initialState', None)[key]
# for key in self._getWidgetParamValue('initialState', None)}
self._initialState = {self._safeSymbol(key): self._getWidgetParamValue('initialState', None)[key]
for key in self._getWidgetParamValue('initialState', None)}
else:
for state in self._initialState.keys():
# add normal and constant reactants to the _initialState
self._initialState[state] = self._getInitialState(state, freeParamDict) # freeParamDict[state] if state in self._mumotModel._constantReactants else self._controller._widgetsExtraParams['init' + str(state)].value
if 'plotProportions' in self._tEParams: # @todo JARM: I don't really understand logic of checking _tEParams but then retrieving the value from elsewhere
self._plotProportions = self._getWidgetParamValue('plotProportions', self._controller._widgetsPlotOnly) # self._fixedParams['plotProportions'] if self._fixedParams.get('plotProportions') is not None else self._controller._widgetsPlotOnly['plotProportions'].value
self._maxTime = self._getWidgetParamValue('maxTime', self._controller._widgetsExtraParams) # self._fixedParams['maxTime'] if self._fixedParams.get('maxTime') is not None else self._controller._widgetsExtraParams['maxTime'].value
[docs]class MuMoTintegrateView(MuMoTtimeEvolutionView):
"""Numerical solution of ODEs plot view on model."""
# initial conditions used for proportion plot
_y0 = None
# save solution for redraw to switch between plotProportions = True and False
_sol_ODE_dict = None
# ordered list of colors to be used
_colors = None
def _constructorSpecificParams(self, _):
if self._controller is not None:
self._generatingCommand = "integrate"
self._colors = []
for idx, state in enumerate(sorted(self._initialState.keys(), key=str)):
if state in self._stateVarListDisplay:
self._colors.append(consts.LINE_COLOR_LIST[idx])
# print(self._colors)
[docs] def __init__(self, *args, **kwargs):
self._ylab = kwargs.get('ylab', 'reactants')
super().__init__(*args, **kwargs)
# self._generatingCommand = "numSimStateVar"
def _plot_NumSolODE(self, _=None):
self._show_computation_start()
super()._plot_NumSolODE()
with io.capture_output() as log:
self._log("numerical integration of ODE system")
self._logs.append(log)
# check input
for nn in range(len(self._stateVarListDisplay)):
if self._stateVarListDisplay[nn] not in self._stateVarList:
self._showErrorMessage(f"Warning: {self._stateVarListDisplay[nn]} is no reactant in the current model.")
return None
# if self._stateVariable1 not in self._mumotModel._reactants:
# self._showErrorMessage('Warning: ' + str(self._stateVariable1) + ' is no reactant in the current model.')
# if self._stateVariable2 not in self._mumotModel._reactants:
# self._showErrorMessage('Warning: ' + str(self._stateVariable2) + ' is no reactant in the current model.')
# if self._stateVariable3:
# if self._stateVariable3 not in self._mumotModel._reactants:
# self._showErrorMessage('Warning: ' + str(self._stateVariable3) + ' is no reactant in the current model.')
# if self._stateVariable4:
# if self._stateVariable4 not in self._mumotModel._reactants:
# self._showErrorMessage('Warning: ' + str(self._stateVariable4) + ' is no reactant in the current model.')
NrDP = int(self._maxTime / self._tstep) + 1
time = np.linspace(0, self._maxTime, NrDP)
initDict = self._initialState # self._get_tEParams() # self._initialState
# if len(initDict) < 2 or len(initDict) > 4:
# self._showErrorMessage("Not implemented: This feature is available only for systems with 2, 3 or 4 time-dependent reactants!")
y0 = []
for nn in range(len(self._stateVarList)):
# SVi0 = initDict[sympy.Symbol(latex(sympy.Symbol('Phi^0_' + str(self._stateVarList[nn]))))]
SVi0 = initDict[sympy.Symbol(str(self._stateVarList[nn]))]
y0.append(SVi0)
self._y0 = y0
sol_ODE = odeint(self._get_eqsODE, y0, time)
sol_ODE_dict = {}
for nn in range(len(self._stateVarList)):
sol_ODE_dict[str(self._stateVarList[nn])] = sol_ODE[:, nn]
self._sol_ODE_dict = sol_ODE_dict
# x_data = [time for kk in range(len(self._get_eqsODE(y0, time)))]
x_data = [time for kk in range(len(self._stateVarListDisplay))]
# y_data = [sol_ODE[:, kk] for kk in range(len(self._get_eqsODE(y0, time)))]
y_data = [sol_ODE_dict[str(self._stateVarListDisplay[kk])]
for kk in range(len(self._stateVarListDisplay))]
if not self._plotProportions:
syst_Size = sympy.Symbol('systemSize')
sysS = syst_Size.subs(self._get_argDict())
# sysS = syst_Size.subs(self._getSystemSize())
sysS = sympy.N(sysS)
# y_scaling = np.sum(np.asarray(y0))
# if y_scaling > 0:
# sysS = sysS/y_scaling
for nn in range(len(y_data)):
y_temp = np.copy(y_data[nn])
for kk in range(len(y_temp)):
y_temp[kk] = y_temp[kk] * sysS
y_data[nn] = y_temp
c_labels = [r'$' + str(self._stateVarListDisplay[nn]) + '$' for nn in range(len(self._stateVarListDisplay))]
else:
c_labels = [r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVarListDisplay[nn]))) + '$' for nn in range(len(self._stateVarListDisplay))]
c_labels = [utils._doubleUnderscorify(utils._greekPrependify(c_labels[jj])) for jj in range(len(c_labels))]
# c_labels = [r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable1))) + '$', r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable2))) + '$']
# if self._stateVariable3:
# c_labels.append(r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable3))) + '$')
# if self._stateVariable4:
# c_labels.append(r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable4))) + '$')
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, self._maxTime]
_fig_formatting_2D(xdata=x_data, ydata=y_data, xlab=self._xlab,
ylab=self._ylab, choose_xrange=choose_xrange,
choose_yrange=self._chooseYrange,
fontsize=self._axes_font_size, curvelab=c_labels,
legend_loc=self._legend_loc, grid=True,
legend_fontsize=self._legend_fontsize,
line_color_list=self._colors)
with io.capture_output() as log:
print('Last point on curve:')
if not self._plotProportions:
for nn in range(len(self._stateVarListDisplay)):
out = latex(str(self._stateVarListDisplay[nn])) + '(t =' + str(_roundNumLogsOut(x_data[nn][-1])) + ') = ' + str(_roundNumLogsOut(y_data[nn][-1]))
out = utils._doubleUnderscorify(utils._greekPrependify(out))
display(Math(out))
else:
for nn in range(len(self._stateVarListDisplay)):
out = latex(sympy.Symbol('Phi_' + str(self._stateVarListDisplay[nn]))) + '(t =' + str(_roundNumLogsOut(x_data[nn][-1])) + ') = ' + str(_roundNumLogsOut(y_data[nn][-1]))
out = utils._doubleUnderscorify(utils._greekPrependify(out))
display(Math(out))
self._logs.append(log)
self._show_computation_stop()
def _redrawOnly(self, _=None):
super()._plot_NumSolODE()
self._update_params()
NrDP = int(self._maxTime / self._tstep) + 1
time = np.linspace(0, self._maxTime, NrDP)
# x_data = [time for kk in range(len(self._get_eqsODE(y0, time)))]
x_data = [time for kk in range(len(self._stateVarListDisplay))]
# y_data = [sol_ODE[:, kk] for kk in range(len(self._get_eqsODE(y0, time)))]
y_data = [self._sol_ODE_dict[str(self._stateVarListDisplay[kk])] for kk in range(len(self._stateVarListDisplay))]
if not self._plotProportions:
syst_Size = sympy.Symbol('systemSize')
sysS = syst_Size.subs(self._get_argDict())
# sysS = syst_Size.subs(self._getSystemSize())
sysS = sympy.N(sysS)
# y_scaling = np.sum(np.asarray(self._y0))
# if y_scaling > 0:
# sysS = sysS / y_scaling
for nn in range(len(y_data)):
y_temp = np.copy(y_data[nn])
for kk in range(len(y_temp)):
y_temp[kk] = y_temp[kk] * sysS
y_data[nn] = y_temp
c_labels = [r'$' + str(self._stateVarListDisplay[nn]) + '$' for nn in range(len(self._stateVarListDisplay))]
else:
c_labels = [r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVarListDisplay[nn]))) + '$' for nn in range(len(self._stateVarListDisplay))]
c_labels = [utils._doubleUnderscorify(utils._greekPrependify(c_labels[jj])) for jj in range(len(c_labels))]
# c_labels = [r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable1))) + '$', r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable2))) + '$']
# if self._stateVariable3:
# c_labels.append(r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable3))) + '$')
# if self._stateVariable4:
# c_labels.append(r'$' + latex(sympy.Symbol('Phi_' + str(self._stateVariable4))) + '$')
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, self._maxTime]
_fig_formatting_2D(xdata=x_data, ydata=y_data, xlab=self._xlab,
ylab=self._ylab, choose_xrange=choose_xrange,
choose_yrange=self._chooseYrange,
fontsize=self._axes_font_size, curvelab=c_labels,
legend_loc=self._legend_loc, grid=True,
legend_fontsize=self._legend_fontsize)
with io.capture_output() as log:
print('Last point on curve:')
if not self._plotProportions:
for nn in range(len(self._stateVarListDisplay)):
out = latex(str(self._stateVarListDisplay[nn])) + '(t =' + str(x_data[nn][-1]) + ') = ' + str(str(y_data[nn][-1]))
out = utils._doubleUnderscorify(utils._greekPrependify(out))
display(Math(out))
else:
for nn in range(len(self._stateVarListDisplay)):
out = latex(sympy.Symbol('Phi_' + str(self._stateVarListDisplay[nn]))) + '(t =' + str(x_data[nn][-1]) + ') = ' + str(str(y_data[nn][-1]))
out = utils._doubleUnderscorify(utils._greekPrependify(out))
display(Math(out))
self._logs.append(log)
def _build_bookmark(self, includeParams=True):
if not self._silent:
log_str = "bookmark = "
else:
log_str = ""
log_str += "<modelName>." + self._generatingCommand + "(showStateVars=["
for nn in range(len(self._stateVarListDisplay)):
if nn == len(self._stateVarListDisplay) - 1:
log_str += "'" + str(self._stateVarListDisplay[nn]) + "'], "
else:
log_str += "'" + str(self._stateVarListDisplay[nn]) + "', "
if "initialState" not in self._generatingKwargs.keys():
initState_str = {latex(state): pop
for state, pop in self._initialState.items()
if state not in self._mumotModel._constantReactants}
log_str += "initialState = " + str(initState_str) + ", "
if "maxTime" not in self._generatingKwargs.keys():
log_str += "maxTime = " + str(self._maxTime) + ", "
if "plotProportions" not in self._generatingKwargs.keys():
log_str += "plotProportions = " + str(self._plotProportions) + ", "
if includeParams:
log_str += self._get_bookmarks_params() + ", "
if len(self._generatingKwargs) > 0:
for key in self._generatingKwargs:
if type(self._generatingKwargs[key]) == str:
log_str += key + " = " + "\'" + str(self._generatingKwargs[key]) + "\'" + ", "
else:
log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
log_str += "bookmark = False"
log_str += ")"
log_str = utils._greekPrependify(log_str)
log_str = log_str.replace('\\', '\\\\')
log_str = log_str.replace('\\\\\\\\', '\\\\')
return log_str
[docs]class MuMoTnoiseCorrelationsView(MuMoTtimeEvolutionView):
"""Noise correlations around fixed points plot view on model."""
# equations of motion for first order moments of noise variables
_EOM_1stOrdMomDict = None
# equations of motion for second order moments of noise variables
_EOM_2ndOrdMomDict = None
# upper bound of simulation time for dynamical system to reach equilibrium (can be set via keyword)
_maxTimeDS = None
# time step of simulation for dynamical system to reach equilibrium (can be set via keyword)
_tstepDS = None
def _constructorSpecificParams(self, _):
if self._controller is not None:
self._generatingCommand = "noiseCorrelations"
[docs] def __init__(self, model, controller, NCParams, EOM_1stOrdMom,
EOM_2ndOrdMom, figure=None, params=None, **kwargs):
self._EOM_1stOrdMomDict = EOM_1stOrdMom
self._EOM_2ndOrdMomDict = EOM_2ndOrdMom
self._maxTimeDS = kwargs.get('maxTimeDS', 50)
self._tstepDS = kwargs.get('tstepDS', 0.01)
self._ylab = kwargs.get('ylab', 'noise correlations')
self._silent = kwargs.get('silent', False)
super().__init__(model=model, controller=controller, tEParams=NCParams,
showStateVars=None, figure=figure, params=params, **kwargs)
# super().__init__(model, controller, None, figure, params, **kwargs)
if len(self._stateVarList) < 1 or len(self._stateVarList) > 3:
self._showErrorMessage("Not implemented: This feature is available only for systems with 1, 2 or 3 time-dependent reactants!")
return None
def _plot_NumSolODE(self, _=None):
self._show_computation_start()
super()._plot_NumSolODE()
with io.capture_output() as log:
self._log("numerical integration of noise correlations")
self._logs.append(log)
# Check input
for nn in range(len(self._stateVarListDisplay)):
if self._stateVarListDisplay[nn] not in self._stateVarList:
self._showErrorMessage(f"Warning: {self._stateVarListDisplay[nn]} is no reactant in the current model.")
return None
eps = 5e-3
systemSize = sympy.Symbol('systemSize')
NrDP = int(self._maxTimeDS / self._tstepDS) + 1
time = np.linspace(0, self._maxTimeDS, NrDP)
# NrDP = int(self._tend / self._tstep) + 1
# time = np.linspace(0, self._tend, NrDP)
initDict = self._initialState
SV1_0 = initDict[sympy.Symbol(str(self._stateVariable1))]
y0 = [SV1_0]
if self._stateVariable2:
SV2_0 = initDict[sympy.Symbol(str(self._stateVariable2))]
y0.append(SV2_0)
if self._stateVariable3:
SV3_0 = initDict[sympy.Symbol(str(self._stateVariable3))]
y0.append(SV3_0)
sol_ODE = odeint(self._get_eqsODE, y0, time)
if self._stateVariable3:
realEQsol, eigList = self._get_fixedPoints3d()
elif self._stateVariable2:
realEQsol, eigList = self._get_fixedPoints2d()
else:
realEQsol, eigList = self._get_fixedPoints1d()
y_stationary = [sol_ODE[-1, kk] for kk in range(len(y0))]
if realEQsol != [] and realEQsol is not None:
steadyStateReached = False
for kk in range(len(realEQsol)):
if self._stateVariable3:
if abs(realEQsol[kk][self._stateVariable1] - y_stationary[0]) <= eps and abs(realEQsol[kk][self._stateVariable2] - y_stationary[1]) <= eps and abs(realEQsol[kk][self._stateVariable3] - y_stationary[2]) <= eps:
if all(sympy.sign(sympy.re(lam)) < 0 for lam in eigList[kk]):
steadyStateReached = True
steadyStateDict = {self._stateVariable1: realEQsol[kk][self._stateVariable1],
self._stateVariable2: realEQsol[kk][self._stateVariable2],
self._stateVariable3: realEQsol[kk][self._stateVariable3]}
elif self._stateVariable2:
if abs(realEQsol[kk][self._stateVariable1] - y_stationary[0]) <= eps and abs(realEQsol[kk][self._stateVariable2] - y_stationary[1]) <= eps:
if all(sympy.sign(sympy.re(lam)) < 0 for lam in eigList[kk]):
steadyStateReached = True
steadyStateDict = {self._stateVariable1: realEQsol[kk][self._stateVariable1],
self._stateVariable2: realEQsol[kk][self._stateVariable2]}
else:
if abs(realEQsol[kk][self._stateVariable1] - y_stationary[0]) <= eps:
if all(sympy.sign(sympy.re(lam)) < 0 for lam in eigList[kk]):
steadyStateReached = True
steadyStateDict = {self._stateVariable1: realEQsol[kk][self._stateVariable1]}
if not steadyStateReached:
self._show_computation_stop()
self._showErrorMessage('ODE system could not reach stable steady state: '
'Try changing the initial conditions or model parameters '
'using the sliders provided, increase simulation time, or decrease timestep tstep.')
return None
else:
steadyStateReached = 'uncertain'
self._showErrorMessage('Warning: ODE system may not have reached a steady state. '
'Values of state variables at t=maxTimeDS were substituted '
"(maxTimeDS can be set via keyword 'maxTimeDS = <number>').")
if self._stateVariable3:
steadyStateDict = {self._stateVariable1: y_stationary[0],
self._stateVariable2: y_stationary[1],
self._stateVariable3: y_stationary[2]}
elif self._stateVariable2:
steadyStateDict = {self._stateVariable1: y_stationary[0],
self._stateVariable2: y_stationary[1]}
else:
steadyStateDict = {self._stateVariable1: y_stationary[0]}
with io.capture_output() as log:
if steadyStateReached == 'uncertain':
print('This plot depicts the noise-noise auto-correlation and '
'cross-correlation functions around the following state (this might NOT be a steady state).')
else:
print('This plot depicts the noise-noise auto-correlation and '
'cross-correlation functions around the following stable steady state:')
for reactant in steadyStateDict:
out = 'Phi^s_{' + latex(str(reactant)) + '} = ' + latex(_roundNumLogsOut(steadyStateDict[reactant]))
out = utils._doubleUnderscorify(utils._greekPrependify(out))
display(Math(out))
self._logs.append(log)
argDict_tmp = self._get_argDict()
# Mutate key names of the argDict and the steadyStateDict so that they indicate concentrations via prefix Phi
argDict = {}
for key, val in argDict_tmp.items():
key_phi = sympy.Symbol(f"Phi_{key}") if key in self._mumotModel._constantReactants else key
argDict[key_phi] = val
steadyStateDictPhi = {}
for key, val in steadyStateDict.items():
key_phi = sympy.Symbol(f"Phi_{key}") if key in self._mumotModel._reactants else key
steadyStateDictPhi[key_phi] = val
EOM_1stOrdMomDict = copy.deepcopy(self._EOM_1stOrdMomDict)
for sol in EOM_1stOrdMomDict:
EOM_1stOrdMomDict[sol] = EOM_1stOrdMomDict[sol].subs(steadyStateDictPhi)
EOM_1stOrdMomDict[sol] = EOM_1stOrdMomDict[sol].subs(argDict)
EOM_2ndOrdMomDict = copy.deepcopy(self._EOM_2ndOrdMomDict)
SOL_2ndOrdMomDict = self._numericSol2ndOrdMoment(EOM_2ndOrdMomDict, steadyStateDictPhi, argDict)
time_depend_noise = []
for reactant in self._mumotModel._reactants:
if reactant not in self._mumotModel._constantReactants:
time_depend_noise.append(sympy.Symbol(f"eta_{reactant}"))
noiseCorrEOM = []
noiseCorrEOMdict = {}
for sym in time_depend_noise:
for key in EOM_1stOrdMomDict:
noiseCorrEOMdict[sym * key] = sympy.expand(sym * EOM_1stOrdMomDict[key])
M_1 = Function('M_1')
M_2 = Function('M_2')
eta_SV1 = sympy.Symbol(f"eta_{self._stateVariable1}")
cVar1 = sympy.symbols('cVar1')
if self._stateVariable2:
eta_SV2 = sympy.Symbol(f"eta_{self._stateVariable2}")
cVar2, cVar3, cVar4 = sympy.symbols('cVar2 cVar3 cVar4')
if self._stateVariable3:
eta_SV3 = sympy.Symbol(f"eta_{self._stateVariable3}")
cVar5, cVar6, cVar7, cVar8, cVar9 = sympy.symbols('cVar5 cVar6 cVar7 cVar8 cVar9')
cVarSubdict = {}
if len(time_depend_noise) == 1:
cVarSubdict[eta_SV1 * M_1(eta_SV1)] = cVar1
# auto-correlation
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV1 * M_1(eta_SV1)])
elif len(time_depend_noise) == 2:
cVarSubdict[eta_SV1 * M_1(eta_SV1)] = cVar1
cVarSubdict[eta_SV2 * M_1(eta_SV2)] = cVar2
cVarSubdict[eta_SV1 * M_1(eta_SV2)] = cVar3
cVarSubdict[eta_SV2 * M_1(eta_SV1)] = cVar4
# auto-correlation
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV1 * M_1(eta_SV1)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV2 * M_1(eta_SV2)])
# cross-correlation
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV1 * M_1(eta_SV2)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV2 * M_1(eta_SV1)])
elif len(time_depend_noise) == 3:
cVarSubdict[eta_SV1 * M_1(eta_SV1)] = cVar1
cVarSubdict[eta_SV2 * M_1(eta_SV2)] = cVar2
cVarSubdict[eta_SV3 * M_1(eta_SV3)] = cVar3
cVarSubdict[eta_SV1 * M_1(eta_SV2)] = cVar4
cVarSubdict[eta_SV2 * M_1(eta_SV1)] = cVar5
cVarSubdict[eta_SV1 * M_1(eta_SV3)] = cVar6
cVarSubdict[eta_SV3 * M_1(eta_SV1)] = cVar7
cVarSubdict[eta_SV2 * M_1(eta_SV3)] = cVar8
cVarSubdict[eta_SV3 * M_1(eta_SV2)] = cVar9
# auto-correlation
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV1 * M_1(eta_SV1)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV2 * M_1(eta_SV2)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV3 * M_1(eta_SV3)])
# cross-correlation
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV1 * M_1(eta_SV2)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV2 * M_1(eta_SV1)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV1 * M_1(eta_SV3)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV3 * M_1(eta_SV1)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV2 * M_1(eta_SV3)])
noiseCorrEOM.append(noiseCorrEOMdict[eta_SV3 * M_1(eta_SV2)])
else:
self._showErrorMessage('Only implemented for 2 or 3 time-dependent noise variables.')
for kk in range(len(noiseCorrEOM)):
noiseCorrEOM[kk] = noiseCorrEOM[kk].subs(cVarSubdict)
def noiseODEsys(yin, t):
dydt = copy.deepcopy(noiseCorrEOM)
for kk in range(len(dydt)):
if self._stateVariable3:
dydt[kk] = dydt[kk].subs({cVar1: yin[0], cVar2: yin[1], cVar3: yin[2], cVar4: yin[3],
cVar5: yin[4], cVar6: yin[5], cVar7: yin[6], cVar8: yin[7], cVar9: yin[8]})
elif self._stateVariable2:
dydt[kk] = dydt[kk].subs({cVar1: yin[0], cVar2: yin[1], cVar3: yin[2], cVar4: yin[3]})
else:
dydt[kk] = dydt[kk].subs({cVar1: yin[0]})
dydt[kk] = dydt[kk].evalf()
return dydt
NrDP = int(self._maxTime / self._tstep) + 1
time = np.linspace(0, self._maxTime, NrDP)
if len(SOL_2ndOrdMomDict) > 0:
if self._stateVariable3:
y0 = [SOL_2ndOrdMomDict[M_2(eta_SV1**2)], SOL_2ndOrdMomDict[M_2(eta_SV2**2)], SOL_2ndOrdMomDict[M_2(eta_SV3**2)],
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)], SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)],
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV3)], SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV3)],
SOL_2ndOrdMomDict[M_2(eta_SV2 * eta_SV3)], SOL_2ndOrdMomDict[M_2(eta_SV2 * eta_SV3)]]
elif self._stateVariable2:
y0 = [SOL_2ndOrdMomDict[M_2(eta_SV1**2)], SOL_2ndOrdMomDict[M_2(eta_SV2**2)],
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)], SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)]]
else:
y0 = [SOL_2ndOrdMomDict[M_2(eta_SV1**2)]]
else:
self._showErrorMessage('Could not compute Second Order Moments. '
'Could not generate figure. '
'Try different initial conditions in the Advanced options tab! ')
return None
sol_ODE = odeint(noiseODEsys, y0, time) # sol_ODE overwritten
x_data = [time for kk in range(len(y0))]
y_data = [sol_ODE[:, kk] for kk in range(len(y0))]
noiseNorm = systemSize.subs(argDict)
noiseNorm = sympy.N(noiseNorm)
for nn in range(len(y_data)):
y_temp = np.copy(y_data[nn])
for kk in range(len(y_temp)):
y_temp[kk] = y_temp[kk] / noiseNorm
y_data[nn] = y_temp
if self._stateVariable3:
c_labels = [r'$<' + latex(eta_SV1) + '(t)' + latex(eta_SV1) + '(0)' + '>$',
r'$<' + latex(eta_SV2) + '(t)' + latex(eta_SV2) + '(0)' + '>$',
r'$<' + latex(eta_SV3) + '(t)' + latex(eta_SV3) + '(0)' + '>$',
r'$<' + latex(eta_SV2) + '(t)' + latex(eta_SV1) + '(0)' + '>$',
r'$<' + latex(eta_SV1) + '(t)' + latex(eta_SV2) + '(0)' + '>$',
r'$<' + latex(eta_SV3) + '(t)' + latex(eta_SV1) + '(0)' + '>$',
r'$<' + latex(eta_SV1) + '(t)' + latex(eta_SV3) + '(0)' + '>$',
r'$<' + latex(eta_SV3) + '(t)' + latex(eta_SV2) + '(0)' + '>$',
r'$<' + latex(eta_SV2) + '(t)' + latex(eta_SV3) + '(0)' + '>$']
elif self._stateVariable2:
c_labels = [r'$<' + latex(eta_SV1) + '(t)' + latex(eta_SV1) + '(0)' + '>$',
r'$<' + latex(eta_SV2) + '(t)' + latex(eta_SV2) + '(0)' + '>$',
r'$<' + latex(eta_SV2) + '(t)' + latex(eta_SV1) + '(0)' + '>$',
r'$<' + latex(eta_SV1) + '(t)' + latex(eta_SV2) + '(0)' + '>$']
else:
c_labels = [r'$<' + latex(eta_SV1) + '(t)' + latex(eta_SV1) + '(0)' + '>$']
c_labels = [utils._doubleUnderscorify(utils._greekPrependify(c_labels[jj]))
for jj in range(len(c_labels))]
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, self._maxTime]
_fig_formatting_2D(xdata=x_data, ydata=y_data, xlab=self._xlab,
ylab=self._ylab, choose_xrange=choose_xrange,
choose_yrange=self._chooseYrange,
fontsize=self._axes_font_size, curvelab=c_labels,
legend_loc=self._legend_loc, grid=True,
legend_fontsize=self._legend_fontsize)
self._show_computation_stop()
def _numericSol2ndOrdMoment(self, EOM_2ndOrdMomDict, steadyStateDict, argDict):
for sol in EOM_2ndOrdMomDict:
EOM_2ndOrdMomDict[sol] = EOM_2ndOrdMomDict[sol].subs(steadyStateDict)
EOM_2ndOrdMomDict[sol] = EOM_2ndOrdMomDict[sol].subs(argDict)
# eta_SV1 = sympy.Symbol('eta_' + str(self._stateVarList[0]))
# eta_SV2 = sympy.Symbol('eta_' + str(self._stateVarList[1]))
# M_1, M_2 = sympy.symbols('M_1 M_2')
# if len(self._stateVarList) == 3:
# eta_SV3 = sympy.Symbol('eta_' + str(self._stateVarList[2]))
M_1 = Function('M_1')
M_2 = Function('M_2')
eta_SV1 = sympy.Symbol('eta_' + str(self._stateVariable1))
if self._stateVariable2:
eta_SV2 = sympy.Symbol('eta_' + str(self._stateVariable2))
if self._stateVariable3:
eta_SV3 = sympy.Symbol('eta_' + str(self._stateVariable3))
SOL_2ndOrdMomDict = {}
EQsys2ndOrdMom = []
if self._stateVariable3:
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV1 * eta_SV1)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV2 * eta_SV2)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV3 * eta_SV3)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV1 * eta_SV3)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV2 * eta_SV3)])
solEQsys2ndOrdMom = sympy.linsolve(EQsys2ndOrdMom, [M_2(eta_SV1 * eta_SV1),
M_2(eta_SV2 * eta_SV2),
M_2(eta_SV3 * eta_SV3),
M_2(eta_SV1 * eta_SV2),
M_2(eta_SV1 * eta_SV3),
M_2(eta_SV2 * eta_SV3)])
if len(list(solEQsys2ndOrdMom)) == 0:
return SOL_2ndOrdMomDict
else:
SOL_2ndOrderMom = list(solEQsys2ndOrdMom)[0] # only one set of solutions (if any) in linear system of equations; hence index [0]
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV1)] = SOL_2ndOrderMom[0]
SOL_2ndOrdMomDict[M_2(eta_SV2 * eta_SV2)] = SOL_2ndOrderMom[1]
SOL_2ndOrdMomDict[M_2(eta_SV3 * eta_SV3)] = SOL_2ndOrderMom[2]
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)] = SOL_2ndOrderMom[3]
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV3)] = SOL_2ndOrderMom[4]
SOL_2ndOrdMomDict[M_2(eta_SV2 * eta_SV3)] = SOL_2ndOrderMom[5]
elif self._stateVariable2:
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV1 * eta_SV1)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV2 * eta_SV2)])
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)])
solEQsys2ndOrdMom = sympy.linsolve(EQsys2ndOrdMom, [M_2(eta_SV1 * eta_SV1),
M_2(eta_SV2 * eta_SV2),
M_2(eta_SV1 * eta_SV2)])
if len(list(solEQsys2ndOrdMom)) == 0:
return SOL_2ndOrdMomDict
else:
SOL_2ndOrderMom = list(solEQsys2ndOrdMom)[0] # only one set of solutions (if any) in linear system of equations
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV1)] = SOL_2ndOrderMom[0]
SOL_2ndOrdMomDict[M_2(eta_SV2 * eta_SV2)] = SOL_2ndOrderMom[1]
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV2)] = SOL_2ndOrderMom[2]
else:
EQsys2ndOrdMom.append(EOM_2ndOrdMomDict[M_2(eta_SV1 * eta_SV1)])
solEQsys2ndOrdMom = sympy.linsolve(EQsys2ndOrdMom, [M_2(eta_SV1 * eta_SV1)])
if len(list(solEQsys2ndOrdMom)) == 0:
return SOL_2ndOrdMomDict
else:
SOL_2ndOrderMom = list(solEQsys2ndOrdMom)[0] # only one set of solutions (if any) in linear system of equations
SOL_2ndOrdMomDict[M_2(eta_SV1 * eta_SV1)] = SOL_2ndOrderMom[0]
return SOL_2ndOrdMomDict
def _build_bookmark(self, includeParams: bool = True):
if not self._silent:
log_str = "bookmark = "
else:
log_str = ""
log_str += "<modelName>." + self._generatingCommand + "("
if "initialState" not in self._generatingKwargs.keys():
initState_str = {latex(state): pop
for state, pop in self._initialState.items()
if state not in self._mumotModel._constantReactants}
log_str += "initialState = " + str(initState_str) + ", "
if "maxTime" not in self._generatingKwargs.keys():
log_str += "maxTime = " + str(self._maxTime) + ", "
if includeParams:
log_str += self._get_bookmarks_params() + ", "
if len(self._generatingKwargs) > 0:
for key in self._generatingKwargs:
if type(self._generatingKwargs[key]) == str:
log_str += key + " = " + "\'" + str(self._generatingKwargs[key]) + "\'" + ", "
else:
log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
log_str += "bookmark = False"
log_str += ")"
log_str = utils._greekPrependify(log_str)
log_str = log_str.replace('\\', '\\\\')
log_str = log_str.replace('\\\\\\\\', '\\\\')
return log_str
[docs]class MuMoTfieldView(MuMoTview):
"""Field view on model.
Specialised by :class:`MuMoTvectorView` and :class:`MuMoTstreamView`.
"""
# 1st state variable (x-dimension)
_stateVariable1 = None
# 2nd state variable (y-dimension)
_stateVariable2 = None
# 3rd state variable (z-dimension)
_stateVariable3 = None
# stores fixed points
_FixedPoints = None
# stores 2nd Order moments of noise-noise correlations
_SOL_2ndOrdMomDict = None
# X ordinates array
_X = None
# Y ordinates array
_Y = None
# Z ordinates array
_Z = None
# X derivatives array
_X = None
# Y derivatives array
_Y = None
# Z derivatives array
_Z = None
# speed array
_speed = None
# class-global dictionary of memoised masks with (mesh size, dimension) as key
_mask = {}
# z-label
_zlab = None
# flag to run SSA simulations to compute noise ellipse
_showSSANoise = None
# flag to show Noise
_showNoise = None
# fixed points for logs
_realEQsol = None
# eigenvalues for logs
_EV = None
# eigenvectors for logs
_Evects = None
# random seed (for computing SSA noise)
_randomSeed = None
# simulation length (for computing SSA noise)
_maxTime = None
# reactants to display on the two axes
# _finalViewAxes = None
# flag to plot proportions or full populations
_plotProportions = None
# number of runs to execute (for computing SSA noise)
_runs = None
# flag to set if the results from multimple runs must be aggregated or not (for computing SSA noise)
_aggregateResults = None
[docs] def __init__(self, model, controller, fieldParams, SOL_2ndOrd,
stateVariable1, stateVariable2=None, stateVariable3=None,
figure=None, params=None, **kwargs):
if model._systemSize is None and model._constantSystemSize:
print("Cannot construct field-based plot until system size is set, using substitute()")
return
self._silent = kwargs.get('silent', False)
super().__init__(model=model, controller=controller, figure=figure, params=params, **kwargs)
with io.capture_output() as log:
self._showFixedPoints = kwargs.get('showFixedPoints', False)
self._xlab = r'' + kwargs.get('xlab', r'$' + r'\Phi_{' + utils._doubleUnderscorify(utils._greekPrependify(str(stateVariable1))) + '}$')
if stateVariable2:
self._ylab = r'' + kwargs.get('ylab', r'$' + r'\Phi_{' + utils._doubleUnderscorify(utils._greekPrependify(str(stateVariable2))) + '}$')
if stateVariable3:
self._zlab = r'' + kwargs.get('zlab', r'$' + r'\Phi_{' + utils._doubleUnderscorify(utils._greekPrependify(str(stateVariable3))) + '}$')
self._stateVariable1 = parse_latex(stateVariable1)
if stateVariable2 is not None:
self._stateVariable2 = parse_latex(stateVariable2)
if stateVariable3 is not None:
self._axes3d = True
self._stateVariable3 = parse_latex(stateVariable3)
_mask = {}
self._SOL_2ndOrdMomDict = SOL_2ndOrd
self._showNoise = kwargs.get('showNoise', False)
if self._showNoise and self._SOL_2ndOrdMomDict is None and self._stateVariable3 is None:
self._showSSANoise = True
else:
self._showSSANoise = False
if stateVariable3 is not None:
self._chooseXrange = None
self._chooseYrange = None
if self._controller is None:
# storing all values of MA-specific parameters
self._maxTime = fieldParams["maxTime"]
self._randomSeed = fieldParams["randomSeed"]
# final_x = str(parse_latex(fieldParams.get("final_x", latex(sorted(self._mumotModel._getAllReactants()[0], key=str)[0]))))
# final_y = str(parse_latex(fieldParams.get("final_y", latex(sorted(self._mumotModel._getAllReactants()[0], key=str)[0]))))
# self._finalViewAxes = (final_x, final_y)
# self._plotProportions = fieldParams["plotProportions"]
self._runs = fieldParams.get('runs', 20)
self._aggregateResults = fieldParams.get('aggregateResults', True)
else:
# storing fixed params
for key, value in fieldParams.items():
if value[-1]:
self._fixedParams[key] = value[0]
self._update_field_params_wdgt()
self._logs.append(log)
if not self._silent:
self._plot_field()
# @todo: this function will be useful to implement a solution to issue #282
def _update_field_params_wdgt(self):
"""Update the widgets related to the _showSSANoise
(it cannot be a :class:`MuMoTcontroller` method because with multi-controller it needs to point to the right ``_controller``)
"""
if self._showSSANoise:
if self._controller._advancedTabWidget is not None:
self._controller._advancedTabWidget.layout.display = 'flex'
if self._controller._widgetsExtraParams.get('maxTime') is not None:
self._controller._widgetsExtraParams['maxTime'].layout.display = 'flex'
if self._controller._widgetsExtraParams.get('maxTime') is not None:
self._controller._widgetsExtraParams['maxTime'].layout.display = 'flex'
if self._controller._widgetsExtraParams.get('randomSeed') is not None:
self._controller._widgetsExtraParams['randomSeed'].layout.display = 'flex'
if self._controller._widgetsExtraParams.get('runs') is not None:
self._controller._widgetsExtraParams['runs'].layout.display = 'flex'
if self._controller._widgetsExtraParams.get('aggregateResults') is not None:
self._controller._widgetsExtraParams['aggregateResults'].layout.display = 'flex'
else:
if self._controller._widgetsExtraParams.get('maxTime') is not None:
self._controller._widgetsExtraParams['maxTime'].layout.display = 'none'
if self._controller._widgetsExtraParams.get('randomSeed') is not None:
self._controller._widgetsExtraParams['randomSeed'].layout.display = 'none'
if self._controller._widgetsExtraParams.get('runs') is not None:
self._controller._widgetsExtraParams['runs'].layout.display = 'none'
if self._controller._widgetsExtraParams.get('aggregateResults') is not None:
self._controller._widgetsExtraParams['aggregateResults'].layout.display = 'none'
if self._controller._advancedTabWidget is not None:
self._controller._advancedTabWidget.layout.display = 'none'
def _update_view_specific_params(self, freeParamDict=None):
"""Getting other parameters specific to the Field view."""
if freeParamDict is None:
freeParamDict = {}
if self._controller is not None:
self._randomSeed = self._getWidgetParamValue('randomSeed', self._controller._widgetsExtraParams) # self._fixedParams['randomSeed'] if self._fixedParams.get('randomSeed') is not None else self._controller._widgetsExtraParams['randomSeed'].value
# self._finalViewAxes = (self._getWidgetParamValue('final_x', self._controller._widgetsPlotOnly), self._getWidgetParamValue('final_y', self._controller._widgetsPlotOnly))
# self._finalViewAxes = (self._fixedParams['final_x'] if self._fixedParams.get('final_x') is not None else self._controller._widgetsPlotOnly['final_x'].value, self._fixedParams['final_y'] if self._fixedParams.get('final_y') is not None else self._controller._widgetsPlotOnly['final_y'].value)
# self._plotProportions = self._getWidgetParamValue('plotProportions', self._controller._widgetsPlotOnly) # self._fixedParams['plotProportions'] if self._fixedParams.get('plotProportions') is not None else self._controller._widgetsPlotOnly['plotProportions'].value
self._maxTime = self._getWidgetParamValue('maxTime', self._controller._widgetsExtraParams) # self._fixedParams['maxTime'] if self._fixedParams.get('maxTime') is not None else self._controller._widgetsExtraParams['maxTime'].value
self._runs = self._getWidgetParamValue('runs', self._controller._widgetsExtraParams) # self._fixedParams['runs'] if self._fixedParams.get('runs') is not None else self._controller._widgetsExtraParams['runs'].value
self._aggregateResults = self._getWidgetParamValue('aggregateResults', self._controller._widgetsExtraParams) # self._fixedParams['aggregateResults'] if self._fixedParams.get('aggregateResults') is not None else self._controller._widgetsPlotOnly['aggregateResults'].value
def _build_bookmark(self, includeParams: bool = True) -> str:
log_str = "bookmark = " if not self._silent else ""
log_str += "<modelName>." + self._generatingCommand + "('" + str(self._stateVariable1) + "', '" + str(self._stateVariable2) + "', "
if self._stateVariable3 is not None:
log_str += "'" + str(self._stateVariable3) + "', "
# todo: plotting parameters are not kept, this could be solved with
# some work on the _generatingKwargs and should be made general for all
# views (similar to _get_bookmarks_params() )
# if includeParams:
# log_str += self._get_bookmarks_params() + ", "
# if len(self._generatingKwargs) > 0:
# for key in self._generatingKwargs:
# if key == 'xlab' or key == 'ylab' or key == 'zlab':
# log_str += key + " = '" + str(self._generatingKwargs[key]) + "', "
# else:
# log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
if includeParams:
log_str += self._get_bookmarks_params()
log_str += ", "
log_str = log_str.replace('\\', '\\\\')
log_str += "showNoise = " + str(self._showNoise)
log_str += ", showFixedPoints = " + str(self._showFixedPoints)
log_str += ", runs = " + str(self._runs)
log_str += ", maxTime = " + str(self._maxTime)
log_str += ", randomSeed = " + str(self._randomSeed)
# todo: Following commented lines are ready to implement issue #95
# if self._visualisationType == 'final':
# # these loops are necessary to return the latex() format of the reactant
# for reactant in self._mumotModel._getAllReactants()[0]:
# if str(reactant) == self._finalViewAxes[0]:
# log_str += ", final_x = '" + latex(reactant).replace('\\', '\\\\') + "'"
# break
# for reactant in self._mumotModel._getAllReactants()[0]:
# if str(reactant) == self._finalViewAxes[1]:
# log_str += ", final_y = '" + latex(reactant).replace('\\', '\\\\') + "'"
# break
# log_str += ", plotProportions = " + str(self._plotProportions) todo: ready to implement issue #283
log_str += ", aggregateResults = " + str(self._aggregateResults)
log_str += ", silent = " + str(self._silent)
log_str += ", bookmark = False"
log_str += ")"
log_str = utils._greekPrependify(log_str)
log_str = log_str.replace('\\', '\\\\')
log_str = log_str.replace('\\\\\\\\', '\\\\')
return log_str
def _plot_field(self) -> None:
self._update_params()
self._show_computation_start()
if not(self._silent): # @todo is this necessary?
plt.figure(self._figureNum)
plt.clf()
self._resetErrorMessage()
self._showErrorMessage(str(self))
realEQsol = None
EV = None
Evects = None
if self._stateVariable2 is None:
if self._showNoise:
print("Please note: Currently 'showNoise' only available for 2D stream and vector plots.")
if self._showFixedPoints:
FixedPoints = [[], []]
realEQsol, eigList = self._get_fixedPoints1d()
for kk in range(len(realEQsol)):
for val1, key2 in zip(realEQsol[kk].values(), eigList[kk].keys()):
FixedPoints[0].append(val1)
FixedPoints[1].append(key2)
else:
FixedPoints = None
self._FixedPoints = FixedPoints
elif self._stateVariable3 is None:
if self._showFixedPoints or self._SOL_2ndOrdMomDict is not None or self._showSSANoise:
Phi_stateVar1 = sympy.Symbol('Phi_' + str(self._stateVariable1))
Phi_stateVar2 = sympy.Symbol('Phi_' + str(self._stateVariable2))
eta_stateVar1 = sympy.Symbol('eta_' + str(self._stateVariable1))
eta_stateVar2 = sympy.Symbol('eta_' + str(self._stateVariable2))
M_2 = sympy.Function('M_2')
systemSize = sympy.Symbol('systemSize')
argDict_tmp = self._get_argDict()
# Mutate key names of argDict so that they indicate concentrations via prefix Phi
argDict = {}
for key, val in argDict_tmp.items():
key_phi = sympy.Symbol(f"Phi_{key}") if key in self._mumotModel._constantReactants else key
argDict[key_phi] = val
realEQsol, eigList = self._get_fixedPoints2d()
PhiSubList = []
for kk in range(len(realEQsol)):
PhiSubDict = {}
for solXi in realEQsol[kk]:
PhiSubDict[sympy.Symbol('Phi_' + str(solXi))] = realEQsol[kk][solXi]
PhiSubList.append(PhiSubDict)
Evects = []
EvectsPlot = []
EV = []
EVplot = []
for kk in range(len(eigList)):
EVsub = []
EvectsSub = []
for key in eigList[kk]:
for jj in range(len(eigList[kk][key][1])):
EvectsSub.append(eigList[kk][key][1][jj].evalf())
if eigList[kk][key][0] > 1:
for jj in range(eigList[kk][key][0]):
EVsub.append(key.evalf())
else:
EVsub.append(key.evalf())
EV.append(EVsub)
Evects.append(EvectsSub)
if self._mumotModel._constantSystemSize:
if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1 and
0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1):
EVplot.append(EVsub)
EvectsPlot.append(EvectsSub)
else:
EVplot.append(EVsub)
EvectsPlot.append(EvectsSub)
# self._EV = EV
# self._realEQsol = realEQsol
# self._Evects = Evects
if self._SOL_2ndOrdMomDict:
eta_cross = eta_stateVar1 * eta_stateVar2
for key in self._SOL_2ndOrdMomDict:
if key == self._SOL_2ndOrdMomDict[key] or key in self._SOL_2ndOrdMomDict[key].args:
for key2 in self._SOL_2ndOrdMomDict:
if key2 != key and key2 != M_2(eta_cross):
self._SOL_2ndOrdMomDict[key] = self._SOL_2ndOrdMomDict[key].subs(key, self._SOL_2ndOrdMomDict[key2])
SOL_2ndOrdMomDictList = []
for nn in range(len(PhiSubList)):
SOL_2ndOrdMomDict = copy.deepcopy(self._SOL_2ndOrdMomDict)
for sol in SOL_2ndOrdMomDict:
SOL_2ndOrdMomDict[sol] = SOL_2ndOrdMomDict[sol].subs(PhiSubList[nn])
SOL_2ndOrdMomDict[sol] = SOL_2ndOrdMomDict[sol].subs(argDict)
SOL_2ndOrdMomDictList.append(SOL_2ndOrdMomDict)
angle_ell_list = []
projection_angle_list = []
vec1 = sympy.Matrix([[0], [1]])
for nn in range(len(EvectsPlot)):
vec2 = EvectsPlot[nn][0]
vec2norm = vec2.norm()
vec2 = vec2 / vec2norm
vec3 = EvectsPlot[nn][0]
vec3norm = vec3.norm()
vec3 = vec3 / vec3norm
if vec2norm >= vec3norm:
angle_ell = sympy.acos(vec1.dot(vec2) / (vec1.norm() * vec2.norm())).evalf()
else:
angle_ell = sympy.acos(vec1.dot(vec3) / (vec1.norm() * vec3.norm())).evalf()
angle_ell = angle_ell.evalf()
projection_angle_list.append(angle_ell)
angle_ell_deg = 180 * angle_ell / (sympy.pi).evalf()
angle_ell_list.append(round(angle_ell_deg, 5))
projection_angle_list = [(abs(projection_angle_list[kk])
if abs(projection_angle_list[kk]) <= sympy.N(sympy.pi / 2)
else sympy.N(sympy.pi) - abs(projection_angle_list[kk]))
for kk in range(len(projection_angle_list))]
if self._showFixedPoints or self._SOL_2ndOrdMomDict is not None or self._showSSANoise:
if self._mumotModel._constantSystemSize:
FixedPoints = [[PhiSubList[kk][Phi_stateVar1]
for kk in range(len(PhiSubList))
if (0 <= sympy.re(PhiSubList[kk][Phi_stateVar1]) <= 1 and
0 <= sympy.re(PhiSubList[kk][Phi_stateVar2]) <= 1)],
[PhiSubList[kk][Phi_stateVar2]
for kk in range(len(PhiSubList))
if (0 <= sympy.re(PhiSubList[kk][Phi_stateVar1]) <= 1 and
0 <= sympy.re(PhiSubList[kk][Phi_stateVar2]) <= 1)]]
if self._SOL_2ndOrdMomDict:
Ell_width = [2.0 * sympy.re(sympy.cos(sympy.N(sympy.pi / 2) - projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar2**2)] / systemSize.subs(argDict)) +
sympy.sin(sympy.N(sympy.pi / 2) - projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar1**2)] / systemSize.subs(argDict)))
for kk in range(len(SOL_2ndOrdMomDictList))
if (0 <= sympy.re(PhiSubList[kk][Phi_stateVar1]) <= 1 and
0 <= sympy.re(PhiSubList[kk][Phi_stateVar2]) <= 1)]
Ell_height = [2.0 * sympy.re(sympy.cos(projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar2**2)] / systemSize.subs(argDict)) +
sympy.sin(projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar1**2)] / systemSize.subs(argDict)))
for kk in range(len(SOL_2ndOrdMomDictList))
if (0 <= sympy.re(PhiSubList[kk][Phi_stateVar1]) <= 1 and
0 <= sympy.re(PhiSubList[kk][Phi_stateVar2]) <= 1)]
# FixedPoints=[[realEQsol[kk][self._stateVariable1]
# for kk in range(len(realEQsol))
# if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1 and
# 0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1)],
# [realEQsol[kk][self._stateVariable2]
# for kk in range(len(realEQsol))
# if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1 and
# 0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1)]]
else:
FixedPoints = [[PhiSubList[kk][Phi_stateVar1]
for kk in range(len(PhiSubList))],
[PhiSubList[kk][Phi_stateVar2]
for kk in range(len(PhiSubList))]]
if self._SOL_2ndOrdMomDict:
Ell_width = [2.0 * sympy.re(sympy.cos(sympy.N(sympy.pi / 2) - projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar2**2)] / systemSize.subs(argDict)) +
sympy.sin(sympy.N(sympy.pi / 2) - projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar1**2)] / systemSize.subs(argDict)))
for kk in range(len(SOL_2ndOrdMomDictList))]
Ell_height = [2.0 * sympy.re(sympy.cos(projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar2**2)] / systemSize.subs(argDict)) +
sympy.sin(projection_angle_list[kk]) *
sympy.sqrt(SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar1**2)] / systemSize.subs(argDict)))
for kk in range(len(SOL_2ndOrdMomDictList))]
# FixedPoints=[[realEQsol[kk][self._stateVariable1] for kk in range(len(realEQsol))],
# [realEQsol[kk][self._stateVariable2] for kk in range(len(realEQsol))]]
# Ell_width = [SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar1**2)] for kk in range(len(SOL_2ndOrdMomDictList))]
# Ell_height = [SOL_2ndOrdMomDictList[kk][M_2(eta_stateVar2**2)] for kk in range(len(SOL_2ndOrdMomDictList))]
FixedPoints.append(EVplot)
else:
FixedPoints = None
self._FixedPoints = FixedPoints
skipEllipse = False
if self._SOL_2ndOrdMomDict:
for kk in range(len(Ell_width)):
if Ell_width[kk] == sympy.nan or Ell_width[kk] == 0 or Ell_height[kk] == sympy.nan or Ell_height[kk] == 0:
skipEllipse = True
self._showErrorMessage('Noise could not be calculated analytically. ')
break
if self._SOL_2ndOrdMomDict and skipEllipse is False:
# swap width and height of ellipse if width > height
for kk in range(len(Ell_width)):
# print(Ell_width[kk])
# print(type(Ell_width[kk]))
# print(Ell_height[kk])
# print(type(Ell_height[kk]))
ell_width_temp = Ell_width[kk]
ell_height_temp = Ell_height[kk]
if ell_width_temp > ell_height_temp:
Ell_height[kk] = ell_width_temp
Ell_width[kk] = ell_height_temp
ells = [mpatch.Ellipse(xy=[self._FixedPoints[0][nn],
self._FixedPoints[1][nn]],
width=Ell_width[nn] / systemSize.subs(argDict),
height=Ell_height[nn] / systemSize.subs(argDict),
angle=round(angle_ell_list[nn], 5))
for nn in range(len(self._FixedPoints[0]))]
ax = plt.gca()
for kk in range(len(ells)):
ax.add_artist(ells[kk])
ells[kk].set_alpha(0.5)
if sympy.re(EVplot[kk][0]) < 0 and sympy.re(EVplot[kk][1]) < 0:
Fcolor = consts.LINE_COLOR_LIST[1]
elif sympy.re(EVplot[kk][0]) > 0 and sympy.re(EVplot[kk][1]) > 0:
Fcolor = consts.LINE_COLOR_LIST[2]
else:
Fcolor = consts.LINE_COLOR_LIST[0]
ells[kk].set_facecolor(Fcolor)
# self._ells = ells
else:
if self._showNoise:
self._showSSANoise = True
if self._showSSANoise:
# print(FixedPoints)
# print(self._stateVariable1)
# print(self._stateVariable2)
# print(realEQsol)
skipList = []
for kk in range(len(realEQsol)):
# print("printing ellipse for point " + str(realEQsol[kk]) )
# skip values out of range [0,1] and unstable equilibria
skip = False
for p in realEQsol[kk].values():
# if p < 0 or p > 1:
if p < 0:
skip = True
# print("Skipping for out range")
break
for eigenV in EV[kk]:
# skip if no stable fixed points detected
if sympy.re(eigenV) >= 0:
skip = True
# print("Skipping for positive eigenvalue")
break
if skip:
break
if skip:
skipList.append('skip')
continue
# Generate proper init reactant list
initState = copy.deepcopy(realEQsol[kk])
for reactant in self._mumotModel._getAllReactants()[0]:
if reactant not in initState.keys():
# initState[reactant] = 1 - sum(initState.values())
iniSum = 0
for val in initState.values():
iniSum += np.real(val)
initState[reactant] = 1 - iniSum
# print(initState)
# print(f"Using params: {self._get_params()}")
getParams = []
for set_item in self._get_params():
getParams.append((utils._greekPrependify(set_item[0].replace('{', '').replace('}', '')), set_item[1]))
SSAView = MuMoTSSAView(self._mumotModel,
None,
params=getParams,
SSParams={'maxTime': self._maxTime,
'runs': self._runs,
'realtimePlot': False,
'plotProportions': True,
'aggregateResults': self._aggregateResults,
'visualisationType': 'final',
'final_x': utils._greekPrependify(latex(self._stateVariable1)),
'final_y': utils._greekPrependify(latex(self._stateVariable2)),
'initialState': initState,
'randomSeed': self._randomSeed},
silent=True)
# print(SSAView._printStandaloneViewCmd())
SSAView._figure = self._figure
SSAView._computeAndPlotSimulation()
if len(realEQsol) == len(skipList):
self._showErrorMessage('No stable fixed points detected. Noise could not be calculated numerically.')
else:
if self._showNoise:
print("Please note: Currently 'showNoise' only available for 2D stream and vector plots.")
if self._showFixedPoints:
realEQsol, eigList = self._get_fixedPoints3d()
EV = []
EVplot = []
for kk in range(len(eigList)):
EVsub = []
for key in eigList[kk]:
if eigList[kk][key][0] > 1:
for jj in range(eigList[kk][key][0]):
EVsub.append(key.evalf())
else:
EVsub.append(key.evalf())
EV.append(EVsub)
if self._mumotModel._constantSystemSize:
if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1 and
0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1 and
0 <= sympy.re(realEQsol[kk][self._stateVariable3]) <= 1):
EVplot.append(EVsub)
else:
EVplot.append(EVsub)
if self._mumotModel._constantSystemSize:
FixedPoints = [[realEQsol[kk][self._stateVariable1]
for kk in range(len(realEQsol))
if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1) and
(0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1) and
(0 <= sympy.re(realEQsol[kk][self._stateVariable3]) <= 1)],
[realEQsol[kk][self._stateVariable2]
for kk in range(len(realEQsol))
if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1) and
(0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1) and
(0 <= sympy.re(realEQsol[kk][self._stateVariable3]) <= 1)],
[realEQsol[kk][self._stateVariable3]
for kk in range(len(realEQsol))
if (0 <= sympy.re(realEQsol[kk][self._stateVariable1]) <= 1) and
(0 <= sympy.re(realEQsol[kk][self._stateVariable2]) <= 1) and
(0 <= sympy.re(realEQsol[kk][self._stateVariable3]) <= 1)]]
else:
FixedPoints = [[realEQsol[kk][self._stateVariable1] for kk in range(len(realEQsol))],
[realEQsol[kk][self._stateVariable2] for kk in range(len(realEQsol))],
[realEQsol[kk][self._stateVariable3] for kk in range(len(realEQsol))]]
FixedPoints.append(EVplot)
# with io.capture_output() as log:
# for kk in range(len(realEQsol)):
# print('Fixed point' + str(kk + 1) + ':', realEQsol[kk], 'with eigenvalues:', str(EV[kk]))
# self._logs.append(log)
else:
FixedPoints = None
self._FixedPoints = FixedPoints
self._realEQsol = realEQsol
self._EV = EV
self._Evects = Evects
self._show_computation_stop()
def _get_field(self):
"""Helper for _get_field_2d() and _get_field_3d()."""
plotLimits = self._getPlotLimits()
# paramNames = []
# paramValues = []
# if self._controller is not None:
# for name, value in self._controller._widgetsFreeParams.items():
# # throw away formatting for constant reactants
# # name = name.replace('(','')
# # name = name.replace(')','')
# paramNames.append(name)
# paramValues.append(value.value)
# if self._paramNames is not None:
# paramNames += map(str, self._paramNames)
# paramValues += self._paramValues
# argNamesSymb = list(map(sympy.Symbol, paramNames))
# argDict = dict(zip(argNamesSymb, paramValues))
argDict = self._get_argDict()
funcs = self._mumotModel._getFuncs()
return (funcs, argDict, plotLimits)
def _get_field1d(self, kind, meshPoints, plotLimits=1):
"""Get 1-dimensional field for plotting."""
(funcs, argDict, plotLimits) = self._get_field()
self._X = np.mgrid[0:plotLimits:complex(0, meshPoints)]
if self._mumotModel._constantSystemSize:
mask = self._mask.get((meshPoints, 2))
if mask is None:
mask = np.zeros(self._X.shape, dtype=bool)
upperright = np.triu_indices(meshPoints, m=1) # @todo: allow user to set mesh points with keyword
mask[upperright[0]] = True
mask = np.flipud(mask)
self._mask[(meshPoints, 2)] = mask
self._Xdot = funcs[self._stateVariable1](*self._mumotModel._getArgTuple1d(argDict, self._stateVariable1, self._X))
try:
# self._speed = np.log(self._Xdot)
# if np.isnan(self._speed).any():
self._speed = None
except:
self._speed = None
if self._mumotModel._constantSystemSize:
self._Xdot = np.ma.array(self._Xdot, mask=mask)
def _get_field2d(self, kind, meshPoints, plotLimits=1):
"""Gget 2-dimensional field for plotting."""
with io.capture_output() as log:
self._log(kind)
(funcs, argDict, plotLimits) = self._get_field()
self._Y, self._X = np.mgrid[0:plotLimits:complex(0, meshPoints), 0:plotLimits:complex(0, meshPoints)]
if self._mumotModel._constantSystemSize:
mask = self._mask.get((meshPoints, 2))
if mask is None:
mask = np.zeros(self._X.shape, dtype=bool)
upperright = np.triu_indices(meshPoints) # @todo: allow user to set mesh points with keyword
mask[upperright] = True
np.fill_diagonal(mask, False)
mask = np.flipud(mask)
self._mask[(meshPoints, 2)] = mask
self._Xdot = funcs[self._stateVariable1](*self._mumotModel._getArgTuple2d(argDict, self._stateVariable1, self._stateVariable2, self._X, self._Y))
self._Ydot = funcs[self._stateVariable2](*self._mumotModel._getArgTuple2d(argDict, self._stateVariable1, self._stateVariable2, self._X, self._Y))
try:
self._speed = np.log(np.sqrt(self._Xdot ** 2 + self._Ydot ** 2))
if np.isnan(self._speed).any():
self._speed = None
except:
# self._speed = np.ones(self._X.shape, dtype=float)
self._speed = None
if self._mumotModel._constantSystemSize:
self._Xdot = np.ma.array(self._Xdot, mask=mask)
self._Ydot = np.ma.array(self._Ydot, mask=mask)
# if len(self._logs) > 0:
# self._logs.insert(0, log)
# else:
self._logs.append(log)
# get 3-dimensional field for plotting
def _get_field3d(self, kind, meshPoints, plotLimits=1):
with io.capture_output() as log:
self._log(kind)
(funcs, argDict, plotLimits) = self._get_field()
self._Z, self._Y, self._X = np.mgrid[0:plotLimits:complex(0, meshPoints),
0:plotLimits:complex(0, meshPoints),
0:plotLimits:complex(0, meshPoints)]
if self._mumotModel._constantSystemSize:
mask = self._mask.get((meshPoints, 3))
if mask is None:
# mask = np.zeros(self._X.shape, dtype=bool)
# upperright = np.triu_indices(meshPoints) # @todo: allow user to set mesh points with keyword
# mask[upperright] = True
# np.fill_diagonal(mask, False)
# mask = np.flipud(mask)
mask = self._X + self._Y + self._Z >= 1
# mask = mask.astype(int)
self._mask[(meshPoints, 3)] = mask
self._Xdot = funcs[self._stateVariable1](*self._mumotModel._getArgTuple3d(argDict, self._stateVariable1, self._stateVariable2, self._stateVariable3, self._X, self._Y, self._Z))
self._Ydot = funcs[self._stateVariable2](*self._mumotModel._getArgTuple3d(argDict, self._stateVariable1, self._stateVariable2, self._stateVariable3, self._X, self._Y, self._Z))
self._Zdot = funcs[self._stateVariable3](*self._mumotModel._getArgTuple3d(argDict, self._stateVariable1, self._stateVariable2, self._stateVariable3, self._X, self._Y, self._Z))
try:
self._speed = np.log(np.sqrt(self._Xdot ** 2 + self._Ydot ** 2 + self._Zdot ** 2))
except:
self._speed = None
# self._Xdot = self._Xdot * mask
# self._Ydot = self._Ydot * mask
# self._Zdot = self._Zdot * mask
if self._mumotModel._constantSystemSize:
self._Xdot = np.ma.array(self._Xdot, mask=mask)
self._Ydot = np.ma.array(self._Ydot, mask=mask)
self._Zdot = np.ma.array(self._Zdot, mask=mask)
# if len(self._logs) > 0:
# self._logs.insert(0, log)
# else:
self._logs.append(log)
def _appendFixedPointsToLogs(self, realEQsol, EV, Evects):
if realEQsol is not None:
for kk in range(len(realEQsol)):
realEQsolStr = ""
for key, val in realEQsol[kk].items():
realEQsolStr += str(key) + ' = ' + str(_roundNumLogsOut(val)) + ', '
evalString = ""
for val in EV[kk]:
evalString += str(_roundNumLogsOut(val)) + ', '
if Evects is None:
print('Fixed point' + str(kk + 1) + ': ', realEQsolStr, 'with eigenvalues: ', evalString)
else:
evecString = ""
for matrix in Evects[kk]:
evecStringPart = "["
for entry in matrix.col(0):
if evecStringPart == "[":
evecStringPart += str(_roundNumLogsOut(entry))
else:
evecStringPart += ', ' + str(_roundNumLogsOut(entry))
evecStringPart += "]"
if evecString == "":
evecString += evecStringPart
else:
evecString += ' and ' + evecStringPart
print('Fixed point' + str(kk + 1) + ': ', realEQsolStr, 'with eigenvalues: ', evalString,
'and eigenvectors: ', evecString)
[docs]class MuMoTvectorView(MuMoTfieldView):
"""Vector plot view on model."""
# dictionary containing the solutions of the second order noise moments in the stationary state
_SOL_2ndOrdMomDict = None
# set of all reactants
_checkReactants = None
# set of all constant reactants to get intersection with _checkReactants
_checkConstReactants = None
[docs] def __init__(self, model, controller, fieldParams, SOL_2ndOrd, stateVariable1, stateVariable2, stateVariable3=None, figure=None, params=None, **kwargs):
# if model._systemSize is None and model._constantSystemSize:
# self._showErrorMessage("Cannot construct field-based plot until system size is set, using substitute()")
# return
# if self._SOL_2ndOrdMomDict is None:
# self._showErrorMessage('Noise in the system could not be calculated: \'showNoise\' automatically disabled.')
super().__init__(model=model, controller=controller, fieldParams=fieldParams, SOL_2ndOrd=SOL_2ndOrd, stateVariable1=stateVariable1, stateVariable2=stateVariable2, stateVariable3=stateVariable3, figure=figure, params=params, **kwargs)
self._generatingCommand = "vector"
def _plot_field(self, _=None):
super()._plot_field()
if self._stateVariable3 is None:
self._get_field2d("2d vector plot", 10) # @todo: allow user to set mesh points with keyword
fig_vector = plt.quiver(self._X, self._Y, self._Xdot, self._Ydot, units='width', color='black') # @todo: define colormap by user keyword
if self._mumotModel._constantSystemSize:
plt.fill_between([0, 1], [1, 0], [1, 1], color='grey', alpha=0.25)
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, 1]
if self._chooseYrange:
choose_yrange = self._chooseYrange
else:
choose_yrange = [0, 1]
else:
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, self._X.max()]
if self._chooseYrange:
choose_yrange = self._chooseYrange
else:
choose_yrange = [0, self._Y.max()]
# plt.xlim(0,self._X.max())
# plt.ylim(0,self._Y.max())
_fig_formatting_2D(figure=fig_vector, xlab=self._xlab,
specialPoints=self._FixedPoints,
showFixedPoints=self._showFixedPoints,
ax_reformat=False, curve_replot=False,
ylab=self._ylab, fontsize=self._axes_font_size,
aspectRatioEqual=True,
choose_xrange=choose_xrange,
choose_yrange=choose_yrange)
else:
self._get_field3d("3d vector plot", 10)
ax = self._figure.gca(projection='3d')
# @todo: define colormap by user keyword; normalise off maximum value
# in self._speed, and meshpoints?
fig_vec3d = ax.quiver(self._X, self._Y, self._Z, self._Xdot,
self._Ydot, self._Zdot, length=0.01, color='black')
_fig_formatting_3D(figure=fig_vec3d, xlab=self._xlab,
ylab=self._ylab, zlab=self._zlab,
specialPoints=self._FixedPoints,
showFixedPoints=self._showFixedPoints,
ax_reformat=True,
showPlane=self._mumotModel._constantSystemSize,
fontsize=self._axes_font_size)
with io.capture_output() as log:
self._appendFixedPointsToLogs(self._realEQsol, self._EV, self._Evects)
self._logs.append(log)
[docs]class MuMoTstreamView(MuMoTfieldView):
"""Stream plot view on model."""
# dictionary containing the solutions of the second order noise moments in the stationary state
_SOL_2ndOrdMomDict = None
# set of all reactants
_checkReactants = None
# set of all constant reactants to get intersection with _checkReactants
_checkConstReactants = None
# sets time of integration for 3d streams
maxTime = None
# time of integration for 3d streams
_integrationTime = None
# set number of 3d streams
setNumPoints = None
# number of 3d streams
_numPoints = None
[docs] def __init__(self, model, controller, fieldParams, SOL_2ndOrd,
stateVariable1, stateVariable2, stateVariable3=None,
figure=None, params=None, **kwargs):
# if model._systemSize is None and model._constantSystemSize:
# self._showErrorMessage("Cannot construct field-based plot until system size is set, using substitute()")
# return
# if self._SOL_2ndOrdMomDict is None:
# self._showErrorMessage('Noise in the system could not be calculated: \'showNoise\' automatically disabled.')
# These might be better somewhere else?
self._numPoints = kwargs.get('setNumPoints', 30)
self._integrationTime = kwargs.get('maxTime', 1.0)
self._checkReactants = model._reactants
if model._constantReactants:
self._checkConstReactants = model._constantReactants
else:
self._checkConstReactants = None
super().__init__(model=model, controller=controller,
fieldParams=fieldParams, SOL_2ndOrd=SOL_2ndOrd,
stateVariable1=stateVariable1, stateVariable2=stateVariable2,
stateVariable3=stateVariable3, figure=figure, params=params,
**kwargs)
self._generatingCommand = "stream"
def _plot_field(self, _=None):
# check number of time-dependent reactants
checkReactants = copy.deepcopy(self._checkReactants)
if self._checkConstReactants:
checkConstReactants = copy.deepcopy(self._checkConstReactants)
for reactant in checkReactants:
if reactant in checkConstReactants:
checkReactants.remove(reactant)
if len(checkReactants) > 3:
self._showErrorMessage("Not implemented: This feature is available only for systems with 1,2 or 3 time-dependent reactants!")
super()._plot_field()
# if model has 1 dimension
if self._stateVariable2 is None:
_mesh_points = 100
self._get_field1d("1d stream plot", _mesh_points)
# Since plot is 2d, need every element of x-data to be plotted against 0
ys = np.zeros(self._X.shape)
# length of negative section of stream
_neg_length = 0
# length of positive section of stream
_pos_length = 0
# point along line where sign changes
_sign_change_point = 0.0
# index where sign changes
_sign_change_index = 0
# represents whether previous point was positive or negative
_prev_point = 0
# has the sign of the point changed from the previous point
_sign_change = False
for i in range(self._X.shape[0]):
_Xdot_temp = np.absolute(self._Xdot[i]) # used for line shading
if self._Xdot[i] < 0:
# if this is the first point, set prev point to -1
if (_prev_point == 0):
_prev_point = -1
# if previous _Xdot value was > 0, the sign has changed
elif (_prev_point == 1):
_prev_point = -1
_sign_change = True
# plot line segment, with color based on absolute value of _Xdot
fig_stream_1d = plt.plot(self._X[i:i + 2], ys[i:i + 2],
color=plt.cm.Reds(_Xdot_temp))
_neg_length += 1
elif self._Xdot[i] >= 0:
# if this is the first point, set prev point to 1
if (_prev_point == 0):
_prev_point = 1
# if previous _Xdot value was < 0, the sign has changed
elif (_prev_point == -1):
_prev_point = 1
_sign_change = True
# plot line segment, with color based on absolute value of _Xdot
fig_stream_1d = plt.plot(self._X[i:i + 2], ys[i:i + 2], color=plt.cm.Blues(_Xdot_temp))
_pos_length += 1
# if sign has changed, record where sign changed
if _sign_change:
_sign_change_index = i
_sign_change_point = self._X[i]
_sign_change = False
# These values are used to plot each arrows start point
_neg_arrow_start = 0.0
_pos_arrow_start = 0.0
# These values are used for the color of each arrow so it matches the color of the point of line its on
_neg_arrow_color = 0.0
_pos_arrow_color = 0.0
# Currently only works if there is one fixed point within the range 0 - 1
# If _prev_point == 1, then the last point was positive and so the pos arrow goes on the right
if (_prev_point == 1):
_pos_arrow_start = _sign_change_point + (_pos_length / (2 * _mesh_points))
_neg_arrow_start = (_neg_length / (2 * _mesh_points))
_pos_arrow_color = self._Xdot[_sign_change_index + int(_pos_length / 2)]
_neg_arrow_color = self._Xdot[int(_neg_length / 2)]
# If _prev_point == -1, then the last point was negative and so the neg arrow goes on the right
else:
_neg_arrow_start = _sign_change_point + (_neg_length / (2 * _mesh_points))
_pos_arrow_start = (_pos_length / (2 * _mesh_points))
_neg_arrow_color = self._Xdot[_sign_change_index + int(_neg_length / 2)]
_pos_arrow_color = self._Xdot[int(_pos_length / 2)]
# Need to get absolute values so color value isn't negative
_neg_arrow_color = np.absolute(_neg_arrow_color)
_pos_arrow_color = np.absolute(_pos_arrow_color)
# if either length segment is zero, no arrow will be plotted
if _neg_length != 0:
plt.arrow(_neg_arrow_start + 0.05, 0, -0.01, 0,
width=0.000001, head_width=0.04, head_length=0.025,
color=plt.cm.Reds(_neg_arrow_color))
if _pos_length != 0:
plt.arrow(_pos_arrow_start - 0.05, 0, 0.01, 0,
width=0.000001, head_width=0.04, head_length=0.025,
color=plt.cm.Blues(_pos_arrow_color))
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, 1]
if self._chooseYrange:
choose_yrange = self._chooseYrange
else:
choose_yrange = [-0.1, 0.1]
# Format the plot
_fig_formatting_1D(figure=fig_stream_1d,
choose_xrange=choose_xrange,
choose_yrange=choose_yrange,
showFixedPoints=self._showFixedPoints,
specialPoints=self._FixedPoints,
xlab=self._xlab)
# elif model has 2 dimensions
elif self._stateVariable3 is None:
self._get_field2d("2d stream plot", 100) # @todo: allow user to set mesh points with keyword
if self._speed is not None:
with io.capture_output() as log: # catch warnings from streamplot
# @todo: define colormap by user keyword
fig_stream = plt.streamplot(self._X, self._Y, self._Xdot,
self._Ydot, color=self._speed, cmap='gray')
self._logs.append(log)
else:
# @todo: define colormap by user keyword
fig_stream = plt.streamplot(self._X, self._Y, self._Xdot,
self._Ydot, color='k')
if self._mumotModel._constantSystemSize:
plt.fill_between([0, 1], [1, 0], [1, 1], color='grey', alpha=0.25)
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, 1]
if self._chooseYrange:
choose_yrange = self._chooseYrange
else:
choose_yrange = [0, 1]
else:
if self._chooseXrange:
choose_xrange = self._chooseXrange
else:
choose_xrange = [0, self._X.max()]
if self._chooseYrange:
choose_yrange = self._chooseYrange
else:
choose_yrange = [0, self._Y.max()]
# plt.xlim(0,self._X.max())
# plt.ylim(0,self._Y.max())
_fig_formatting_2D(figure=fig_stream, xlab=self._xlab,
specialPoints=self._FixedPoints,
showFixedPoints=self._showFixedPoints,
ax_reformat=False, curve_replot=False,
ylab=self._ylab, fontsize=self._axes_font_size,
aspectRatioEqual=True,
choose_xrange=choose_xrange,
choose_yrange=choose_yrange)
with io.capture_output() as log:
self._appendFixedPointsToLogs(self._realEQsol, self._EV,
self._Evects)
self._logs.append(log)
else:
self._get_field3d("3d stream plot", 10)
ax = self._figure.gca(projection='3d')
argDict = self._get_argDict()
# Derived model equations stored with parameter values substituted
# in from widgets
eqA = self._mumotModel._equations[self._stateVariable1].subs(argDict)
eqB = self._mumotModel._equations[self._stateVariable2].subs(argDict)
eqC = self._mumotModel._equations[self._stateVariable3].subs(argDict)
# Model of ODEs from model equations
# Needed for odeint() method to integrate streams from start points
# N is needed for the equations
def modelODEs(states, t, eqA, eqB, eqC):
A = states[0]
B = states[1]
C = states[2]
# eval is needed to run the derived equation as a mathematical
# formula rather than a string
dAdt = eval(str(eqA))
dBdt = eval(str(eqB))
dCdt = eval(str(eqC))
return [dAdt, dBdt, dCdt]
# Time over which streams are integrated, longer time gives a longer stream.
t = np.linspace(0, self._integrationTime, 20)
# This empty array will store start points of streams
start_points = np.empty([0, 3])
# This is used to calculate speed value at each randomly selected point.
# Each column and row in self._X contains the same values so we only need 1.
# Used to find corresponding speed value for each starting point.
speed_points = self._X[0, 0, :]
j = 0
# Builds array start_points of starting points for streams
# _numPoints is a keyword for the number of start points (number of streams)
while j < self._numPoints:
# Randomly choose points to start streams from
p = np.random.choice(self._X[0, 0, :])
q = np.random.choice(self._Y[0, :, 0])
r = np.random.choice(self._Z[:, 0, 0])
# Use selected point as start point if they meet following criteria
# If not, then reselect start point
# Method is ineffecient for high number of start points
if (p + q + r <= 1):
temp = np.array([[p, q, r]])
start_points = np.concatenate((start_points, temp), axis=0)
j += 1
# Plot streams from each start point
for i in range(start_points[:, 0].shape[0]):
x = start_points[i, 0]
y = start_points[i, 1]
z = start_points[i, 2]
# Finds the speed of stream at each start point
# Finds index of each start point in speed_points
# Uses indexes to find corresponding speed for start point in self._speed
speed_x = np.where(speed_points == x)[0]
speed_y = np.where(speed_points == y)[0]
speed_z = np.where(speed_points == z)[0]
speed = np.absolute(self._speed[speed_x, speed_y, speed_z])[0]
# Initial conditions for integrations
state0 = [x, y, z]
state = odeint(modelODEs, state0, t, args=(eqA, eqB, eqC))
fig_stream3d = ax.plot(state[:, 0],
state[:, 1],
state[:, 2],
color=plt.cm.Greys(speed))
# Adds arrow halfway along the length of each stream
# mutation_scale is the size of the arrow head
arrow_point = int(t.shape[0] / 2)
arrow = Arrow3D([state[:, 0][arrow_point], state[:, 0][arrow_point + 2]],
[state[:, 1][arrow_point], state[:, 1][arrow_point + 2]],
[state[:, 2][arrow_point], state[:, 2][arrow_point + 2]],
mutation_scale=7,
color=plt.cm.Greys(speed))
ax.add_artist(arrow)
_fig_formatting_3D(figure=fig_stream3d, xlab=self._xlab,
ylab=self._ylab, zlab=self._zlab,
specialPoints=self._FixedPoints,
showFixedPoints=self._showFixedPoints,
ax_reformat=True,
showPlane=self._mumotModel._constantSystemSize,
fontsize=self._axes_font_size)
[docs]class MuMoTbifurcationView(MuMoTview):
"""Bifurcation view on model."""
# model for bifurcation analysis
_pyDSmodel = None
# critical parameter for bifurcation analysis
_bifurcationParameter = None
# first state variable of 2D system
_stateVariable1 = None
# second state variable of 2D system
_stateVariable2 = None
# state variable of 2D system used for bifurcation analysis, can either be _stateVariable1 or _stateVariable2
_stateVarBif1 = None
# state variable of 2D system used for bifurcation analysis, can either be _stateVariable1 or _stateVariable2
_stateVarBif2 = None
# state variable 1 for logs output
_stateVarBif1Print = None
# state variable 2 for logs output
_stateVarBif2Print = None
# generates command for bookmark functionality
_generatingCommand = None
# maximum number of points in one continuation calculation
_MaxNumPoints = None
# information about the mathematical expression displayed on vertical axis; can be 'None', '+' or '-'
_SVoperation = None
# initial conditions specified on corresponding sliders, will be used when calculation of fixed points fails
_pyDSmodel_ics = None
# Parameters for controller specific to this MuMoTbifurcationView
_BfcParams = None
# bifurcation parameter prepared for use in _get_argDict function in MuMoTView
_bifurcationParameter_for_get_argDict = None
# bifurcation parameter to be used in bookmark function
_bifurcationParameter_for_bookmark = None
# the system state at the start of the simulation (timestep zero)
_initialState = None
# list of state variables
_stateVariableList = None
# list of symbols protected in PyDSTool
_pydsProtected = ['gamma', 'Gamma']
# bifurcation parameter symbol passsed to PyDSTool
_bifurcationParameterPyDS = None
def _constructorSpecificParams(self, _):
if self._controller is not None:
self._generatingCommand = "bifurcation"
[docs] def __init__(self, model, controller, BfcParams, bifurcationParameter,
stateVarExpr1, stateVarExpr2=None, figure=None, params=None,
**kwargs):
self._silent = kwargs.get('silent', False)
self._bifurcationParameter_for_get_argDict = str(parse_latex(bifurcationParameter))
# self._bifurcationParameter_for_bookmark = utils._greekPrependify(utils._doubleUnderscorify(self._bifurcationParameter_for_get_argDict))
self._bifurcationParameter_for_bookmark = bifurcationParameter
super().__init__(model=model, controller=controller, figure=figure, params=params, **kwargs)
self._axes_font_size = kwargs.get('fontsize', None)
if '-' in str(stateVarExpr1):
self._ylab = kwargs.get('ylab', r'$' + r'\Phi_{' + str(stateVarExpr1)[:str(stateVarExpr1).index('-')].replace('\\\\', '\\') + '}' + '-' + r'\Phi_{' + str(stateVarExpr1)[str(stateVarExpr1).index('-') + 1:].replace('\\\\', '\\') + '}' + '$')
elif '+' in str(stateVarExpr1):
self._ylab = kwargs.get('ylab', r'$' + r'\Phi_{' + str(stateVarExpr1)[:str(stateVarExpr1).index('-')].replace('\\\\', '\\') + '}' + '+' + r'\Phi_{' + str(stateVarExpr1)[str(stateVarExpr1).index('-') + 1:].replace('\\\\', '\\') + '}' + '$')
else:
self._ylab = kwargs.get('ylab', r'$' + r'\Phi_{' + str(stateVarExpr1).replace('\\\\', '\\') + '}$')
self._xlab = kwargs.get('xlab', r'$' + bifurcationParameter + '$')
self._MaxNumPoints = kwargs.get('contMaxNumPoints', 100)
self._bifurcationParameter = _pydstoolify(bifurcationParameter)
replBifParam = {}
if self._bifurcationParameter in self._pydsProtected:
self._bifurcationParameterPyDS = 'A' + self._bifurcationParameter
replBifParam[self._bifurcationParameter] = self._bifurcationParameterPyDS
else:
self._bifurcationParameterPyDS = self._bifurcationParameter
self._stateVarExpr1 = stateVarExpr1
stateVarExpr1 = _pydstoolify(stateVarExpr1)
if stateVarExpr2:
stateVarExpr2 = _pydstoolify(stateVarExpr2)
self._SVoperation = None
try:
stateVarExpr1.index('-')
self._stateVarBif1 = stateVarExpr1[:stateVarExpr1.index('-')]
self._stateVarBif2 = stateVarExpr1[stateVarExpr1.index('-') + 1:]
self._SVoperation = '-'
except ValueError:
try:
stateVarExpr1.index(' + ')
self._stateVarBif1 = stateVarExpr1[:stateVarExpr1.index('+')]
self._stateVarBif2 = stateVarExpr1[stateVarExpr1.index('+') + 1:]
self._SVoperation = '+'
except ValueError:
self._stateVarBif1 = stateVarExpr1
self._stateVarBif2 = stateVarExpr2
# print(self._stateVarBif1)
# print(self._stateVarBif2)
self._BfcParams = BfcParams
if self._controller is None:
# storing the initial state
self._initialState = {}
for state, pop in BfcParams["initialState"].items():
if isinstance(state, str):
self._initialState[parse_latex(state)] = pop # convert string into SymPy symbol
else:
self._initialState[state] = pop
# storing all values of Bfc-specific parameters
self._initBifParam = BfcParams["initBifParam"]
else:
# storing the initial state
self._initialState = {}
for state, pop in BfcParams["initialState"][0].items():
if isinstance(state, str):
self._initialState[parse_latex(state)] = pop[0] # convert string into SymPy symbol
else:
self._initialState[state] = pop[0]
# storing fixed params
for key, value in BfcParams.items():
if value[-1]:
if key == 'initialState':
self._fixedParams[key] = self._initialState
else:
self._fixedParams[key] = value[0]
self._constructorSpecificParams(BfcParams)
# self._logs.append(log)
self._pyDSmodel = dst.args(name='MuMoT Model' + str(id(self)))
varspecs = {}
stateVariableList = []
replaceSV = {}
for reactant in self._mumotModel._reactants:
if reactant not in self._mumotModel._constantReactants:
stateVariableList.append(reactant)
reactantString = _pydstoolify(reactant)
if reactantString[0].islower() or reactantString in self._pydsProtected:
replaceSV[reactantString] = 'A' + reactantString
varspecs['A' + reactantString] = _pydstoolify(self._mumotModel._equations[reactant])
else:
varspecs[reactantString] = _pydstoolify(self._mumotModel._equations[reactant])
for key, equation in varspecs.items():
for replKey, replVal in replaceSV.items():
equationNew = equation.replace(replKey, replVal)
varspecs[key] = equationNew
for key, equation in varspecs.items():
for replKey, replVal in replBifParam.items():
equationNew = equation.replace(replKey, replVal)
varspecs[key] = equationNew
self._pyDSmodel.varspecs = varspecs
if len(stateVariableList) > 2:
self._showErrorMessage('Bifurcation diagrams are currently only supported for 1D and 2D systems (1 or 2 time-dependent variables in the ODE system)!')
return None
self._stateVariableList = stateVariableList
self._stateVariable1 = stateVariableList[0]
if len(stateVariableList) == 2:
self._stateVariable2 = stateVariableList[1]
if self._stateVarBif2 is None:
if self._stateVariable2:
if self._stateVarBif1 == _pydstoolify(self._stateVariable1):
self._stateVarBif2 = _pydstoolify(self._stateVariable2)
elif self._stateVarBif1 == _pydstoolify(self._stateVariable2):
self._stateVarBif2 = _pydstoolify(self._stateVariable1)
self._stateVarBif2Print = self._stateVarBif2
if self._stateVarBif2[0].islower() or self._stateVarBif2 in self._pydsProtected:
self._stateVarBif2 = 'A' + self._stateVarBif2
else:
self._stateVarBif2Print = self._stateVarBif2
if self._stateVarBif2[0].islower() or self._stateVarBif2 in self._pydsProtected:
self._stateVarBif2 = 'A' + self._stateVarBif2
self._stateVarBif1Print = self._stateVarBif1
if self._stateVarBif1[0].islower() or self._stateVarBif1 in self._pydsProtected:
self._stateVarBif1 = 'A' + self._stateVarBif1
if not self._silent:
self._plot_bifurcation()
def _plot_bifurcation(self, _=None):
self._show_computation_start()
self._initFigure()
self._update_params()
with io.capture_output() as log:
self._log("bifurcation plot")
if self._stateVariable2:
print(f"State variables are: {self._stateVarBif1Print} and {self._stateVarBif2Print}.")
else:
print("{State variable is: {self._stateVarBif1Print}.")
print(f"The bifurcation parameter chosen is: {self._bifurcationParameter}.")
self._logs.append(log)
argDict = self._get_argDict()
paramDict = {}
replaceRates = {}
for arg in argDict:
if arg in self._mumotModel._rates or arg in self._mumotModel._constantReactants or arg == self._mumotModel._systemSize:
if _pydstoolify(arg) in self._pydsProtected:
paramDict['A' + _pydstoolify(arg)] = argDict[arg]
if _pydstoolify(arg) != self._bifurcationParameter:
replaceRates[_pydstoolify(arg)] = 'A' + _pydstoolify(arg)
else:
paramDict[_pydstoolify(arg)] = argDict[arg]
for key, equation in self._pyDSmodel.varspecs.items():
for replKey, replVal in replaceRates.items():
equationNew = equation.replace(replKey, replVal)
self._pyDSmodel.varspecs[key] = equationNew
with io.capture_output() as log:
self._pyDSmodel.pars = paramDict
xdata = [] # list of arrays containing the bifurcation-parameter data for bifurcation diagram data
ydata = [] # list of arrays containing the state variable data (either one variable, or the sum or difference of the two SVs) for bifurcation diagram data
initDictList = []
self._pyDSmodel_ics = {}
for inState in self._initialState:
if inState in self._stateVariableList:
self._pyDSmodel_ics[inState] = self._initialState[inState]
# print(self._pyDSmodel_ics
# for ic in self._pyDSmodel_ics:
# if 'Phi0' in _pydstoolify(ic):
# self._pyDSmodel_ics[_pydstoolify(ic)[_pydstoolify(ic).index('0') + 1:]] = self._pyDSmodel_ics.pop(ic) # {'A': 0.1, 'B': 0.9 }
if len(self._stateVariableList) == 1:
realEQsol, eigList = self._get_fixedPoints1d()
elif len(self._stateVariableList) == 2:
realEQsol, eigList = self._get_fixedPoints2d()
if realEQsol != [] and realEQsol is not None:
for kk in range(len(realEQsol)):
if all(sympy.sign(sympy.re(lam)) < 0 for lam in eigList[kk]):
initDictList.append(realEQsol[kk])
# self._showErrorMessage('Stationary state(s) detected and continuated.'
# 'Initial conditions for state variables specified on sliders in Advanced options tab were not used.'
# '(Those are only used in case the calculation of fixed points fails.) ')
print(f"{len(initDictList)} stable steady state(s) detected and continuated. "
'Initial conditions for state variables specified on sliders in Advanced options tab were not used. '
'Those are only used in case the calculation of fixed points fails.')
else:
initDictList.append(self._pyDSmodel_ics)
# self._showErrorMessage('Stationary states could not be calculated;'
# 'used initial conditions specified on sliders in Advanced options tab instead. '
# 'This means only one branch was attempted to be continuated '
# 'and the starting point might not have been a stationary state. ')
print('Stationary states could not be calculated; '
f"used initial conditions specified on sliders in Advanced options tab instead: {self._pyDSmodel_ics}."
'This means only one branch was continuated and the starting point might not have been a stationary state.')
specialPoints = [] # list of special points: LP and BP
sPoints_X = [] # bifurcation parameter
sPoints_Y = [] # stateVarBif1
sPoints_Labels = []
eigenvalues = []
sPoints_Z = [] # stateVarBif2
k_iter_BPlabel = 0
k_iter_LPlabel = 0
for nn, init_dict in enumerate(initDictList):
# Mutate key names so they are in a form that is compatible
# with PyDSTool
init_dict_pyds = {}
for k, v in init_dict.items():
k_pyds = _pydstoolify(k)
if k_pyds.islower() or k_pyds in self._pydsProtected:
k_pyds = 'A' + k_pyds
init_dict_pyds[k_pyds] = v
#for key in initDictList[nn]:
# old_key = key
# new_key = _pydstoolify(key)
# if new_key[0].islower() or new_key in self._pydsProtected:
# new_key = 'A' + new_key
# initDictList[nn][new_key] = initDictList[nn].pop(old_key)
# self._pyDSmodel.ics = init_dict_pyds
pyDSode = dst.Generator.Vode_ODEsystem(self._pyDSmodel)
pyDSode.set(ics=init_dict_pyds)
# pyDSode.set(pars = self._getBifParInitCondFromSlider())
pyDSode.set(pars={self._bifurcationParameterPyDS: self._initBifParam})
# print(self._getBifParInitCondFromSlider())
pyDScont = dst.ContClass(pyDSode)
EQ_iter = 1 + nn
k_iter_BP = 1
k_iter_LP = 1
# 'EP-C' stands for Equilibrium Point Curve. The branch will be labelled with the string aftr name='name'.
pyDScontArgs = dst.args(name='EQ' + str(EQ_iter), type='EP-C')
# control parameter(s) (should be among those specified in self._pyDSmodel.pars)
pyDScontArgs.freepars = [self._bifurcationParameterPyDS]
# The following 3 parameters should work for most cases, as
# there should be a step-size adaption within PyDSTool.
pyDScontArgs.MaxNumPoints = self._MaxNumPoints
pyDScontArgs.MaxStepSize = 1e-1
pyDScontArgs.MinStepSize = 1e-5
pyDScontArgs.StepSize = 2e-3
# 'Limit Points' and 'Branch Points may be detected'
pyDScontArgs.LocBifPoints = ['LP', 'BP']
# to tell unstable from stable branches
pyDScontArgs.SaveEigen = True
pyDScont.newCurve(pyDScontArgs)
try:
try:
pyDScont['EQ' + str(EQ_iter)].backward()
except:
self._showErrorMessage('Continuation failure (backward) on initial branch<br>')
try:
pyDScont['EQ' + str(EQ_iter)].forward()
except:
self._showErrorMessage('Continuation failure (forward) on initial branch<br>')
except ZeroDivisionError:
self._show_computation_stop()
self._showErrorMessage('Division by zero<br>')
# pyDScont['EQ' + str(EQ_iter)].info()
if self._stateVarBif2 is not None:
try:
xdata.append(pyDScont['EQ' + str(EQ_iter)].sol[self._bifurcationParameterPyDS])
if self._SVoperation:
if self._SVoperation == '-':
ydata.append(pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif1] -
pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif2])
elif self._SVoperation == '+':
ydata.append(pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif1] +
pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif2])
else:
self._showErrorMessage("Only '+' and '-' are supported operations between state variables.")
else:
ydata.append(pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif1])
eigenvalues.append(np.array([pyDScont['EQ' + str(EQ_iter)].sol[kk].labels['EP']['data'].evals
for kk in range(len(pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif1]))]))
while pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP)):
if (round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._bifurcationParameterPyDS], 4) not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._stateVarBif1], 4) not in [round(kk, 4) for kk in sPoints_Y]
and round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._stateVarBif2], 4) not in [round(kk, 4) for kk in sPoints_Z]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._stateVarBif1])
sPoints_Z.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._stateVarBif2])
k_iter_LPlabel += 1
sPoints_Labels.append('LP' + str(k_iter_LPlabel))
k_iter_LP += 1
k_iter_BPlabel_previous = k_iter_BPlabel
while pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP)):
if (round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._bifurcationParameterPyDS], 4) not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._stateVarBif1], 4) not in [round(kk, 4) for kk in sPoints_Y]
and round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._stateVarBif2], 4) not in [round(kk, 4) for kk in sPoints_Z]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._stateVarBif1])
sPoints_Z.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._stateVarBif2])
k_iter_BPlabel += 1
sPoints_Labels.append('BP' + str(k_iter_BPlabel))
k_iter_BP += 1
for jj in range(1, k_iter_BP):
if 'BP' + str(jj + k_iter_BPlabel_previous) in sPoints_Labels:
EQ_iter_BP = jj
# print(EQ_iter_BP)
k_iter_next = 1
# 'EP-C' stands for Equilibrium Point Curve. The branch will be labelled with the string aftr name='name'.
pyDScontArgs = dst.args(name='EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP), type='EP-C')
# control parameter(s) (should be among those specified in self._pyDSmodel.pars)
pyDScontArgs.freepars = [self._bifurcationParameterPyDS]
# The following 3 parameters should work for most cases, as there should be a step-size adaption within PyDSTool.
pyDScontArgs.MaxNumPoints = self._MaxNumPoints
pyDScontArgs.MaxStepSize = 1e-1
pyDScontArgs.MinStepSize = 1e-5
pyDScontArgs.StepSize = 5e-3
# 'Limit Points' and 'Branch Points may be detected'
pyDScontArgs.LocBifPoints = ['LP', 'BP']
# To tell unstable from stable branches
pyDScontArgs.SaveEigen = True
pyDScontArgs.initpoint = 'EQ' + str(EQ_iter) + ':BP' + str(jj)
pyDScont.newCurve(pyDScontArgs)
try:
try:
pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].backward()
except:
self._showErrorMessage('Continuation failure (backward) starting from branch point<br>')
try:
pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].forward()
except:
self._showErrorMessage('Continuation failure (forward) starting from branch point<br>')
except ZeroDivisionError:
self._show_computation_stop()
self._showErrorMessage('Division by zero<br>')
xdata.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._bifurcationParameterPyDS])
if self._SVoperation:
if self._SVoperation == '-':
ydata.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif1] -
pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif2])
elif self._SVoperation == '+':
ydata.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif1] +
pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif2])
else:
self._showErrorMessage('Only \' +\' and \'-\' are supported operations between state variables.')
else:
ydata.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif1])
eigenvalues.append(np.array([pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[kk].labels['EP']['data'].evals
for kk in range(len(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif1]))]))
while pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next)):
if (round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._bifurcationParameterPyDS], 4)
not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._stateVarBif1], 4)
not in [round(kk, 4) for kk in sPoints_Y]
and round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._stateVarBif2], 4)
not in [round(kk, 4) for kk in sPoints_Z]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._stateVarBif1])
sPoints_Z.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._stateVarBif2])
sPoints_Labels.append('EQ_BP_BP' + str(k_iter_next))
k_iter_next += 1
k_iter_next = 1
while pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next)):
if (round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._bifurcationParameterPyDS], 4)
not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._stateVarBif1], 4)
not in [round(kk, 4) for kk in sPoints_Y]
and round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._stateVarBif2], 4)
not in [round(kk, 4) for kk in sPoints_Z]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._stateVarBif1])
sPoints_Z.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._stateVarBif2])
sPoints_Labels.append('EQ_BP_LP' + str(k_iter_next))
k_iter_next += 1
except TypeError:
self._show_computation_stop()
print("Continuation failed; "
"try with different parameters - use sliders. "
"If that does not work, try changing maximum number of continuation points using the keyword 'contMaxNumPoints'. "
"If not set, default value is contMaxNumPoints=100.")
# Bifurcation routine for 1D system
else:
try:
xdata.append(pyDScont['EQ' + str(EQ_iter)].sol[self._bifurcationParameterPyDS])
ydata.append(pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif1])
eigenvalues.append(np.array([pyDScont['EQ' + str(EQ_iter)].sol[kk].labels['EP']['data'].evals
for kk in range(len(pyDScont['EQ' + str(EQ_iter)].sol[self._stateVarBif1]))]))
while pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP)):
if (round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._bifurcationParameterPyDS], 4)
not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._stateVarBif1], 4)
not in [round(kk, 4) for kk in sPoints_Y]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('LP' + str(k_iter_LP))[self._stateVarBif1])
k_iter_LPlabel += 1
sPoints_Labels.append('LP' + str(k_iter_LPlabel))
k_iter_LP += 1
k_iter_BPlabel_previous = k_iter_BPlabel
while pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP)):
if (round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._bifurcationParameterPyDS], 4)
not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._stateVarBif1], 4)
not in [round(kk, 4) for kk in sPoints_Y]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter)].getSpecialPoint('BP' + str(k_iter_BP))[self._stateVarBif1])
k_iter_BPlabel += 1
sPoints_Labels.append('BP' + str(k_iter_BPlabel))
k_iter_BP += 1
for jj in range(1, k_iter_BP):
if 'BP' + str(jj + k_iter_BPlabel_previous) in sPoints_Labels:
EQ_iter_BP = jj
print(EQ_iter_BP)
k_iter_next = 1
# 'EP-C' stands for Equilibrium Point Curve. The branch will be labelled with the string aftr name='name'.
pyDScontArgs = dst.args(name='EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP), type='EP-C')
# Control parameter(s) (should be among those specified in self._pyDSmodel.pars)
pyDScontArgs.freepars = [self._bifurcationParameterPyDS]
# The following 3 parameters should work for most cases, as there should be a step-size adaption within PyDSTool.
pyDScontArgs.MaxNumPoints = self._MaxNumPoints
pyDScontArgs.MaxStepSize = 1e-1
pyDScontArgs.MinStepSize = 1e-5
pyDScontArgs.StepSize = 5e-3
# 'Limit Points' and 'Branch Points may be detected'
pyDScontArgs.LocBifPoints = ['LP', 'BP']
# To tell unstable from stable branches
pyDScontArgs.SaveEigen = True
pyDScontArgs.initpoint = 'EQ' + str(EQ_iter) + ':BP' + str(jj)
pyDScont.newCurve(pyDScontArgs)
try:
try:
pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].backward()
except:
self._showErrorMessage('Continuation failure (backward) starting from branch point<br>')
try:
pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].forward()
except:
self._showErrorMessage('Continuation failure (forward) starting from branch point<br>')
except ZeroDivisionError:
self._show_computation_stop()
self._showErrorMessage('Division by zero<br>')
xdata.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._bifurcationParameterPyDS])
ydata.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif1])
eigenvalues.append(np.array([pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[kk].labels['EP']['data'].evals
for kk in range(len(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].sol[self._stateVarBif1]))]))
while pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next)):
if (round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._bifurcationParameterPyDS], 4)
not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._stateVarBif1], 4)
not in [round(kk, 4) for kk in sPoints_Y]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('BP' + str(k_iter_next))[self._stateVarBif1])
sPoints_Labels.append('EQ_BP_BP' + str(k_iter_next))
k_iter_next += 1
k_iter_next = 1
while pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next)):
if (round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._bifurcationParameterPyDS], 4)
not in [round(kk, 4) for kk in sPoints_X]
and round(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._stateVarBif1], 4)
not in [round(kk, 4) for kk in sPoints_Y]):
sPoints_X.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._bifurcationParameterPyDS])
sPoints_Y.append(pyDScont['EQ' + str(EQ_iter) + 'BP' + str(EQ_iter_BP)].getSpecialPoint('LP' + str(k_iter_next))[self._stateVarBif1])
sPoints_Labels.append('EQ_BP_LP' + str(k_iter_next))
k_iter_next += 1
except TypeError:
self._show_computation_stop()
print("Continuation failed; "
"try with different parameters - use sliders. "
"If that does not work, try changing maximum number of continuation points using the keyword 'contMaxNumPoints'. "
"If not set, default value is contMaxNumPoints=100.")
del(pyDScontArgs)
del(pyDScont)
del(pyDSode)
if self._SVoperation:
if self._SVoperation == '-':
specialPoints = [sPoints_X, np.asarray(sPoints_Y) - np.asarray(sPoints_Z), sPoints_Labels]
elif self._SVoperation == '+':
specialPoints = [sPoints_X, np.asarray(sPoints_Y) + np.asarray(sPoints_Z), sPoints_Labels]
else:
self._showErrorMessage("Only '+' and '-' are supported operations between state variables.")
else:
specialPoints = [sPoints_X, np.asarray(sPoints_Y), sPoints_Labels]
# print('Special Points on curve: ', specialPoints)
print('Special Points on curve:')
if len(specialPoints[0]) == 0:
print('No special points could be detected.')
else:
for kk in range(len(specialPoints[0])):
print(specialPoints[2][kk], ': ', self._bifurcationParameter, '=', str(_roundNumLogsOut(specialPoints[0][kk])), ',',
self._stateVarExpr1, '=', str(_roundNumLogsOut(specialPoints[1][kk])))
if self._chooseYrange is None:
if self._mumotModel._systemSize:
self._chooseYrange = [-self._getSystemSize(), self._getSystemSize()]
if xdata != [] and self._chooseXrange is None:
xmaxbif = np.max([np.max(xdata[kk]) for kk in range(len(xdata))])
self._chooseXrange = [0, xmaxbif]
if xdata != [] and ydata != []:
# plt.clf()
_fig_formatting_2D(xdata=xdata,
ydata=ydata,
xlab=self._xlab,
ylab=self._ylab,
specialPoints=specialPoints,
eigenvalues=eigenvalues,
choose_xrange=self._chooseXrange, choose_yrange=self._chooseYrange,
ax_reformat=False, curve_replot=False, fontsize=self._axes_font_size)
else:
self._showErrorMessage("Bifurcation diagram could not be computed. "
"Try changing parameter values on the sliders.")
return None
self._logs.append(log)
self._show_computation_stop()
def _initFigure(self) -> None:
# self._show_computation_start()
if not self._silent:
plt.figure(self._figureNum)
plt.clf()
self._resetErrorMessage()
self._showErrorMessage(str(self))
def _build_bookmark(self, includeParams: bool = True) -> str:
if not self._silent:
log_str = "bookmark = "
else:
log_str = ""
log_str += "<modelName>." + self._generatingCommand + "('" + str(self._bifurcationParameter_for_bookmark) + "', '" + str(self._stateVarExpr1) + "', "
# if self._stateVarBif2 is not None:
# log_str += "'" + str(self._stateVarBif2) + "', "
if "initialState" not in self._generatingKwargs.keys():
initState_str = {latex(state): pop for state, pop in self._initialState.items() if state not in self._mumotModel._constantReactants}
log_str += "initialState = " + str(initState_str) + ", "
if "initBifParam" not in self._generatingKwargs.keys():
log_str += "initBifParam = " + str(self._initBifParam) + ", "
if includeParams:
log_str += self._get_bookmarks_params() + ", "
if len(self._generatingKwargs) > 0:
for key in self._generatingKwargs:
if type(self._generatingKwargs[key]) == str:
log_str += key + " = " + "\'" + str(self._generatingKwargs[key]) + "\'" + ", "
else:
log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
log_str += "bookmark = False"
log_str += ")"
log_str = utils._greekPrependify(log_str)
log_str = log_str.replace('\\', '\\\\')
log_str = log_str.replace('\\\\\\\\', '\\\\')
return log_str
def _update_view_specific_params(self, freeParamDict: Optional[Dict[object, object]] = None) -> None:
"""Get other parameters specific to bifurcation()"""
if freeParamDict is None:
freeParamDict = {}
if self._controller is not None:
if self._getWidgetParamValue('initialState', None) is not None:
# self._initialState = self._getWidgetParamValue('initialState', None)
# self._initialState = {sympy.Symbol(key): self._getWidgetParamValue('initialState', None)[key]
# for key in self._getWidgetParamValue('initialState', None)}
self._initialState = {self._safeSymbol(key): self._getWidgetParamValue('initialState', None)[key]
for key in self._getWidgetParamValue('initialState', None)}
else:
for state in self._initialState.keys():
# add normal and constant reactants to the _initialState
self._initialState[state] = self._getInitialState(state, freeParamDict) # freeParamDict[state] if state in self._mumotModel._constantReactants else self._controller._widgetsExtraParams['init'+str(state)].value
self._initBifParam = self._getWidgetParamValue('initBifParam', self._controller._widgetsExtraParams) # self._fixedParams['initBifParam'] if self._fixedParams.get('initBifParam') is not None else self._controller._widgetsExtraParams['initBifParam'].value
def _get_argDict(self):
"""Get and return names and values from widgets, overrides method defined in parent class MuMoTview."""
paramNames = []
paramValues = []
if self._controller is not None:
for name, value in self._controller._widgetsFreeParams.items():
# print("wdg-name: " + str(name) + " wdg-val: " + str(value.value))
paramNames.append(name)
paramValues.append(value.value)
if self._controller._widgetsExtraParams and 'initBifParam' in self._controller._widgetsExtraParams:
paramNames.append(self._bifurcationParameter_for_get_argDict)
paramValues.append(self._controller._widgetsExtraParams['initBifParam'].value)
if self._fixedParams and 'initBifParam' in self._fixedParams:
paramNames.append(self._bifurcationParameter_for_get_argDict)
paramValues.append(self._fixedParams['initBifParam'])
if self._fixedParams is not None:
for key, item in self._fixedParams.items():
# print("fix-name: " + str(key) + " fix-val: " + str(item))
paramNames.append(str(key))
paramValues.append(item)
argNamesSymb = list(map(sympy.Symbol, paramNames))
argDict = dict(zip(argNamesSymb, paramValues))
if self._mumotModel._systemSize:
argDict[self._mumotModel._systemSize] = 1
# @todo: is this necessary? for which view?
systemSize = sympy.Symbol('systemSize')
argDict[systemSize] = self._getSystemSize()
return argDict
[docs]class MuMoTstochasticSimulationView(MuMoTview):
"""Stochastic-simulations-view view.
For views that allow for multiple runs with different random-seeds.
"""
# the system state at the start of the simulation (timestep zero) described as proportion of _systemSize
_initialState = None
# variable to link a color to each reactant
_colors = None
# list of colors to pass to the _fig_formatting method (that does not include constant reactants)
_colors_list = None
# random seed
_randomSeed = None
# simulation length (in the same time unit of the rates)
_maxTime = None
# visualisation type
_visualisationType = None
# reactants to display on the two axes
_finalViewAxes = None
# flag to plot proportions or full populations
_plotProportions = None
# realtimePlot flag (TRUE = the plot is updated each timestep of the simulation; FALSE = it is updated once at the end of the simulation)
_realtimePlot = None
# latest computed results
_latestResults = None
# number of runs to execute
_runs = None
# flag to set if the results from multimple runs must be aggregated or not
_aggregateResults = None
# variable to store simulation time during simulation
_t = 0
# variable to store the current simulation state
_currentState = 0
# variable to store the time evolution of the simulation
_evo = None
# progress bar
_progressBar = None
# variable that is set to False only by the multiController managing this view (when shareAxes == True and not first view to be run)
_allowRealtimePlotting = True
[docs] def __init__(self, model, controller, SSParams, figure=None, params=None, **kwargs):
# Loading bar (useful to give user progress status for long executions)
self._progressBar = widgets.FloatProgress(
value=0,
min=0,
max=1,
description='Loading:',
bar_style='success', # 'success', 'info', 'warning', 'danger' or ''
style={'description_width': 'initial'},
orientation='horizontal'
)
self._silent = kwargs.get('silent', False)
self._xlab = kwargs.get('xlab', 'time t')
self._ylab = kwargs.get('ylab', 'reactants')
if not self._silent:
display(self._progressBar)
super().__init__(model=model, controller=controller, figure=figure, params=params, **kwargs)
with io.capture_output() as log:
freeParamDict = self._get_argDict()
if self._controller is None:
# Storing the initial state
self._initialState = {}
for state, pop in SSParams["initialState"].items():
if isinstance(state, str):
self._initialState[parse_latex(state)] = pop # convert string into SymPy symbol
else:
self._initialState[state] = pop
# Add to the _initialState the constant reactants
for constantReactant in self._mumotModel._getAllReactants()[1]:
self._initialState[constantReactant] = freeParamDict[constantReactant]
# Storing all values of MA-specific parameters
self._maxTime = SSParams["maxTime"]
self._randomSeed = SSParams["randomSeed"]
self._visualisationType = SSParams["visualisationType"]
final_x = str(parse_latex(SSParams.get("final_x", latex(sorted(self._mumotModel._getAllReactants()[0], key=str)[0]))))
# if isinstance(final_x, str): final_x = parse_latex(final_x)
final_y = str(parse_latex(SSParams.get("final_y", latex(sorted(self._mumotModel._getAllReactants()[0], key=str)[0]))))
# if isinstance(final_y, str): final_y = parse_latex(final_y)
self._finalViewAxes = (final_x, final_y)
self._plotProportions = SSParams["plotProportions"]
self._realtimePlot = SSParams.get('realtimePlot', False)
self._runs = SSParams.get('runs', 1)
self._aggregateResults = SSParams.get('aggregateResults', True)
else:
# storing the initial state
self._initialState = {}
for state, pop in SSParams["initialState"][0].items():
if isinstance(state, str):
self._initialState[parse_latex(state)] = pop[0] # convert string into SymPy symbol
else:
self._initialState[state] = pop[0]
# add to the _initialState the constant reactants
for constantReactant in self._mumotModel._getAllReactants()[1]:
self._initialState[constantReactant] = freeParamDict[constantReactant]
# storing fixed params
for key, value in SSParams.items():
if value[-1]:
if key == 'initialState':
self._fixedParams[key] = self._initialState
else:
self._fixedParams[key] = value[0]
self._constructorSpecificParams(SSParams)
# map colouts to each reactant
# colors = cm.rainbow(np.linspace(0, 1, len(self._mumotModel._reactants) )) # @UndefinedVariable
self._colors = {}
self._colors_list = []
for idx, state in enumerate(sorted(self._initialState.keys(), key=str)):
self._colors[state] = consts.LINE_COLOR_LIST[idx]
if state not in self._mumotModel._constantReactants:
self._colors_list.append(consts.LINE_COLOR_LIST[idx])
self._logs.append(log)
if not self._silent:
self._computeAndPlotSimulation()
def _constructorSpecificParams(self, _) -> None:
pass
def _computeAndPlotSimulation(self, _=None) -> None:
with io.capture_output() as log:
self._show_computation_start()
self._update_params()
self._log("Stochastic Simulation")
if not self._allowRealtimePlotting: self._realtimePlot = False # if realtimePlot is not allowed, then the _realtimePlot param is forced to False (regardless the widget or input value)
# if you need to access the standalone view, you can use the
# command self._printStandaloneViewCmd(), this is very useful for
# developer and advanced users as indicated in issue #92
# Clearing the plot and setting the axes
self._initFigure()
self._latestResults = []
for r in range(self._runs):
runID = f"[{r + 1}/{self._runs}] " if self._runs > 1 else ''
self._latestResults.append(self._runSingleSimulation(self._randomSeed + r,
runID=runID))
# Final plot
if not self._realtimePlot or self._aggregateResults:
# for results in self._latestResults:
# self._updateSimultationFigure(results, fullPlot=True)
self._updateSimultationFigure(self._latestResults, fullPlot=True)
self._show_computation_stop()
self._logs.append(log)
if self._controller is not None:
self._updateDownloadLink()
def _update_view_specific_params(self, freeParamDict: Optional[Dict[object, object]] = None) -> None:
"""Get other parameters specific to SSA."""
if freeParamDict is None:
freeParamDict = {}
if self._controller is not None:
if self._getWidgetParamValue('initialState', None) is not None:
# self._initialState = self._getWidgetParamValue('initialState', None)
# self._initialState = {sympy.Symbol(key): self._getWidgetParamValue('initialState', None)[key]
# for key in self._getWidgetParamValue('initialState', None)}
self._initialState = {self._safeSymbol(key): self._getWidgetParamValue('initialState', None)[key]
for key in self._getWidgetParamValue('initialState', None)}
else:
for state in self._initialState.keys():
# Add normal and constant reactants to the _initialState
self._initialState[state] = self._getInitialState(state, freeParamDict) # freeParamDict[state] if state in self._mumotModel._constantReactants else self._controller._widgetsExtraParams['init'+str(state)].value
self._randomSeed = self._getWidgetParamValue('randomSeed', self._controller._widgetsExtraParams) # self._fixedParams['randomSeed'] if self._fixedParams.get('randomSeed') is not None else self._controller._widgetsExtraParams['randomSeed'].value
self._visualisationType = self._getWidgetParamValue('visualisationType', self._controller._widgetsPlotOnly) # self._fixedParams['visualisationType'] if self._fixedParams.get('visualisationType') is not None else self._controller._widgetsPlotOnly['visualisationType'].value
if self._visualisationType == 'final':
self._finalViewAxes = (self._getWidgetParamValue('final_x',
self._controller._widgetsPlotOnly),
self._getWidgetParamValue('final_y',
self._controller._widgetsPlotOnly))
# self._finalViewAxes = (self._fixedParams['final_x'] if self._fixedParams.get('final_x') is not None else self._controller._widgetsPlotOnly['final_x'].value, self._fixedParams['final_y'] if self._fixedParams.get('final_y') is not None else self._controller._widgetsPlotOnly['final_y'].value)
self._plotProportions = self._getWidgetParamValue('plotProportions', self._controller._widgetsPlotOnly) # self._fixedParams['plotProportions'] if self._fixedParams.get('plotProportions') is not None else self._controller._widgetsPlotOnly['plotProportions'].value
self._maxTime = self._getWidgetParamValue('maxTime', self._controller._widgetsExtraParams) # self._fixedParams['maxTime'] if self._fixedParams.get('maxTime') is not None else self._controller._widgetsExtraParams['maxTime'].value
self._realtimePlot = self._getWidgetParamValue('realtimePlot', self._controller._widgetsExtraParams) # self._fixedParams['realtimePlot'] if self._fixedParams.get('realtimePlot') is not None else self._controller._widgetsExtraParams['realtimePlot'].value
self._runs = self._getWidgetParamValue('runs', self._controller._widgetsExtraParams) # self._fixedParams['runs'] if self._fixedParams.get('runs') is not None else self._controller._widgetsExtraParams['runs'].value
self._aggregateResults = self._getWidgetParamValue('aggregateResults', self._controller._widgetsPlotOnly) # self._fixedParams['aggregateResults'] if self._fixedParams.get('aggregateResults') is not None else self._controller._widgetsPlotOnly['aggregateResults'].value
def _initSingleSimulation(self) -> None:
self._progressBar.max = self._maxTime
# Initialise populations by multiplying proportion with _systemSize
# currentState = copy.deepcopy(self._initialState)
# currentState = {s:p * self._systemSize for s,p in self._initialState.items()}
self._currentState = {}
leftOvers = {}
for state, prop in self._initialState.items():
pop = prop * self._systemSize
if (not utils._almostEqual(pop, math.floor(pop))) and (state not in self._mumotModel._constantReactants):
leftOvers[state] = pop - math.floor(pop)
self._currentState[state] = math.floor(pop)
# If approximations resulted in one agent less, it is added randomly (with probability proportional to the rounding quantities)
sumReactants = sum([self._currentState[state]
for state in self._currentState.keys()
if state not in self._mumotModel._constantReactants])
if sumReactants < self._systemSize:
rnd = np.random.rand() * sum(leftOvers.values())
bottom = 0.0
for state, prob in leftOvers.items():
if rnd >= bottom and rnd < (bottom + prob):
self._currentState[state] += 1
break
bottom += prob
# Create logging structs
self._evo = {}
self._evo['time'] = [0]
for state, pop in self._currentState.items():
self._evo[state] = []
self._evo[state].append(pop)
# initialise time
self._t = 0
def _runSingleSimulation(self, randomSeed, runID=''):
# init the random seed
np.random.seed(randomSeed)
self._initSingleSimulation()
while self._t < self._maxTime:
# Update progress bar
self._progressBar.value = self._t
self._progressBar.description = f"Loading {runID}{round(self._t / self._maxTime*100)}%:"
timeInterval, self._currentState = self._simulationStep()
# increment time
self._t += timeInterval
# log step
for state, pop in self._currentState.items():
if state in self._mumotModel._constantReactants:
continue
self._evo[state].append(pop)
self._evo['time'].append(self._t)
# Print (self._evo)
# Plotting each timestep
if self._realtimePlot:
self._updateSimultationFigure(allResults=self._latestResults,
fullPlot=False,
currentEvo=self._evo)
self._progressBar.value = self._progressBar.max
self._progressBar.description = "Completed 100%:"
# print("Temporal evolution per state: " + str(self._evo))
return self._evo
def _updateSimultationFigure(self, allResults, fullPlot: bool = True, currentEvo: Optional[Dict] = None) -> None:
if (self._visualisationType == "evo"):
# if incremental plot is requested, but it's the first item, we operate as fullPlot (to allow legend)
# if not fullPlot and len(currentEvo['time']) <= 2:
# fullPlot = True
# If fullPlot, plot all time-evolution
if fullPlot or len(currentEvo['time']) <= 2:
y_max = 1.0 if self._plotProportions else self._systemSize
# plot in aggregate mode only if there's enough data
if self._aggregateResults and len(allResults) > 1:
self._initFigure()
steps = 10
timesteps = list(np.arange(0, self._maxTime, step=self._maxTime / steps))
# if timesteps[-1] - self._maxTime > self._maxTime / (steps * 2):
# timesteps.append(self._maxTime)
# else:
# timesteps[-1] = self._maxTime
if not utils._almostEqual(timesteps[-1], self._maxTime):
timesteps.append(self._maxTime)
for state in sorted(self._initialState.keys(), key=str):
if state == 'time' or state in self._mumotModel._constantReactants:
continue
boxesData = []
avgs = []
for timestep in timesteps:
boxData = []
for results in allResults:
idx = max(0, bisect.bisect_left(results['time'], timestep))
if self._plotProportions:
boxData.append(results[state][idx] / self._systemSize)
else:
boxData.append(results[state][idx])
y_max = max(y_max, max(boxData))
boxesData.append(boxData)
avgs.append(np.mean(boxData))
# bplot = plt.boxplot(boxData, patch_artist=True, positions=[timestep],
# manage_ticks=False, widths=self._maxTime / (steps * 3) )
# print(f"Plotting bxplt at positions {timestep} generated from idx = {idx}")
plt.plot(timesteps, avgs, color=self._colors[state])
bplots = plt.boxplot(boxesData, patch_artist=True, positions=timesteps,
manage_ticks=False, widths=self._maxTime / (steps * 3))
# for patch, color in zip(bplots['boxes'], [self._colors[state]] * len(timesteps)):
# patch.set_facecolor(color)
# bplot['boxes'].set_facecolor(self._colors[state])
# plt.setp(bplots['boxes'], color=self._colors[state])
wdt = 2
for box in bplots['boxes']:
# change outline color
box.set(color=self._colors[state], linewidth=wdt)
# box.set( color='black', linewidth=2)
# change fill color
box.set(facecolor='None')
# box.set( facecolor = self._colors[state] )
plt.setp(bplots['whiskers'], color=self._colors[state], linewidth=wdt)
plt.setp(bplots['caps'], color=self._colors[state], linewidth=wdt)
plt.setp(bplots['medians'], color=self._colors[state], linewidth=wdt)
# plt.setp(bplots['fliers'], color=self._colors[state], marker='o', alpha=0.5)
for flier in bplots['fliers']:
flier.set_markerfacecolor(self._colors[state])
flier.set_markeredgecolor("None")
flier.set(marker='o', alpha=0.5)
padding_x = self._maxTime / 20.0
padding_y = y_max / 20.0
else:
for state in sorted(self._initialState.keys(), key=str):
if (state == 'time') or state in self._mumotModel._constantReactants:
continue
# xdata = []
# xdata.append( results['time'] )
for results in allResults:
# ydata = []
if self._plotProportions:
ydata = [y / self._systemSize for y in results[state]]
# ydata.append(ytmp)
y_max = max(y_max, max(ydata))
else:
ydata = results[state]
y_max = max(y_max, max(results[state]))
# xdata=[list(np.arange(len(list(evo.values())[0])))] * len(evo.values()), ydata=list(evo.values()), curvelab=list(evo.keys())
plt.plot(results['time'], ydata, color=self._colors[state], lw=2)
# _fig_formatting_2D(xdata=xdata, ydata=ydata, curvelab=labels, curve_replot=False,
# choose_xrange=(0, self._maxTime), choose_yrange=(0, y_max))
padding_x = self._maxTime / 100.0
padding_y = y_max / 100.0
# plot legend
if self._plotProportions:
stateNamesLabel = [r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(sympy.Symbol('Phi_{' + str(state) + '}')))) + '$'
for state in sorted(self._initialState.keys(), key=str)
if state not in self._mumotModel._constantReactants]
# stateNamesLabel = [r'$'+latex(sympy.Symbol('Phi_'+str(state))) +'$'
# for state in sorted(self._initialState.keys(), key=str)
# if state not in self._mumotModel._constantReactants]
else:
stateNamesLabel = [r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(sympy.Symbol(str(state))))) + '$'
for state in sorted(self._initialState.keys(), key=str)
if state not in self._mumotModel._constantReactants]
# stateNamesLabel = [r'$'+latex(sympy.Symbol(str(state)))+'$'
# for state in sorted(self._initialState.keys(), key=str)
# if state not in self._mumotModel._constantReactants]
markers = [plt.Line2D([0, 0], [0, 0], color=self._colors[state], marker='s', linestyle='', markersize=10)
for state in sorted(self._initialState.keys(), key=str)
if state not in self._mumotModel._constantReactants]
plt.legend(markers, stateNamesLabel, loc=self._legend_loc, borderaxespad=0., numpoints=1, fontsize=self._legend_fontsize) # bbox_to_anchor=(0.885, 1),
xrange = (0 - padding_x, self._maxTime + padding_x) if self._chooseXrange is None else self._chooseXrange
yrange = (0 - padding_y, y_max + padding_y) if self._chooseYrange is None else self._chooseYrange
_fig_formatting_2D(figure=self._figure, xlab=self._xlab, ylab=self._ylab,
choose_xrange=xrange,
choose_yrange=yrange,
#choose_xrange=choose_xrange,
#choose_yrange=self._chooseYrange,
fontsize=self._axes_font_size,
aspectRatioEqual=False, grid=True)
if not fullPlot: # If realtime-plot mode, draw only the last timestep rather than overlay all
xdata = []
ydata = []
y_max = 1.0 if self._plotProportions else self._systemSize
for state in sorted(self._initialState.keys(), key=str):
if (state == 'time') or self._mumotModel._constantReactants:
continue
xdata.append(currentEvo['time'][-2:])
# modify if plotProportions
ytmp = ([y / self._systemSize
for y in currentEvo[state][-2:]]
if self._plotProportions else currentEvo[state][-2:])
y_max = max(y_max, max(ytmp))
ydata.append(ytmp)
xrange = (0, self._maxTime) if self._chooseXrange is None else self._chooseXrange
yrange = (0, y_max) if self._chooseYrange is None else self._chooseYrange
_fig_formatting_2D(xdata=xdata, ydata=ydata, curve_replot=False,
choose_xrange=xrange,
choose_yrange=yrange,
aspectRatioEqual=False, LineThickness=2,
fontsize=self._axes_font_size,
grid=True, line_color_list=self._colors_list)
# y_max = 1.0 if self._plotProportions else self._systemSize
# for state in sorted(self._initialState.keys(), key=str):
# if (state == 'time'):
# continue
# # modify if plotProportions
# ytmp = [y / self._systemSize for y in currentEvo[state][-2:] ] if self._plotProportions else currentEvo[state][-2:]
# y_max = max(y_max, max(ytmp))
# plt.plot(currentEvo['time'][-2:], ytmp, color=self._colors[state], lw=2)
# padding_x = 0
# padding_y = 0
# _fig_formatting_2D(figure=self._figure, xlab="Time", ylab="Reactants",
# choose_xrange=(0-padding_x, self._maxTime+padding_x),
# choose_yrange=(0-padding_y, y_max+padding_y),
# aspectRatioEqual=False)
elif (self._visualisationType == "final"):
points_x = []
points_y = []
if not fullPlot: # if it's a runtime plot
self._initFigure() # the figure must be cleared each timestep
for state in self._mumotModel._getAllReactants()[0]: # the current point added to the list of points
if str(state) == self._finalViewAxes[0]:
points_x.append(currentEvo[state][-1] / self._systemSize
if self._plotProportions
else currentEvo[state][-1])
trajectory_x = ([x / self._systemSize
for x in currentEvo[state]]
if self._plotProportions
else currentEvo[state])
if str(state) == self._finalViewAxes[1]:
points_y.append(currentEvo[state][-1] / self._systemSize
if self._plotProportions
else currentEvo[state][-1])
trajectory_y = ([y / self._systemSize
for y in currentEvo[state]]
if self._plotProportions
else currentEvo[state])
if self._aggregateResults and len(allResults) > 2: # plot in aggregate mode only if there's enough data
self._initFigure()
samples_x = []
samples_y = []
for state in self._mumotModel._getAllReactants()[0]:
if str(state) == self._finalViewAxes[0]:
for results in allResults:
samples_x.append(results[state][-1] / self._systemSize
if self._plotProportions
else results[state][-1])
if str(state) == self._finalViewAxes[1]:
for results in allResults:
samples_y.append(results[state][-1] / self._systemSize
if self._plotProportions
else results[state][-1])
samples = np.column_stack((samples_x, samples_y))
_plot_point_cov(samples, nstd=1, alpha=0.5, color='green')
else:
for state in self._mumotModel._getAllReactants()[0]:
if str(state) == self._finalViewAxes[0]:
for results in allResults:
points_x.append(results[state][-1] / self._systemSize
if self._plotProportions
else results[state][-1])
if str(state) == self._finalViewAxes[1]:
for results in allResults:
points_y.append(results[state][-1] / self._systemSize
if self._plotProportions
else results[state][-1])
# _fig_formatting_2D(xdata=[xdata], ydata=[ydata], curve_replot=False,
# xlab=self._finalViewAxes[0], ylab=self._finalViewAxes[1])
if not fullPlot:
plt.plot(trajectory_x, trajectory_y, '-', c='0.6')
plt.plot(points_x, points_y, 'ro')
if self._plotProportions:
xlab = r'$' + r'\Phi_{' + utils._doubleUnderscorify(utils._greekPrependify(str(self._finalViewAxes[0]))) + '}$'
ylab = r'$' + r'\Phi_{' + utils._doubleUnderscorify(utils._greekPrependify(str(self._finalViewAxes[1]))) + '}$'
else:
xlab = r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(self._finalViewAxes[0]))) + '$'
ylab = r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(self._finalViewAxes[1]))) + '$'
_fig_formatting_2D(figure=self._figure, aspectRatioEqual=True, xlab=xlab, ylab=ylab, fontsize=self._axes_font_size,
choose_xrange=self._chooseXrange, choose_yrange=self._chooseYrange)
elif self._visualisationType == "barplot":
self._initFigure()
finaldata = []
colors = []
stdev = []
y_max = 1.0 if self._plotProportions else self._systemSize
if fullPlot:
for state in sorted(self._initialState.keys(), key=str):
if state == 'time' or self._mumotModel._constantReactants:
continue
if self._aggregateResults and len(allResults) > 0:
points = []
for results in allResults:
points.append(results[state][-1] / self._systemSize
if self._plotProportions
else results[state][-1])
avg = np.mean(points)
stdev.append(np.std(points))
else:
if allResults:
avg = (allResults[-1][state][-1] / self._systemSize
if self._plotProportions
else allResults[-1][state][-1])
else:
avg = 0
stdev.append(0)
finaldata.append(avg)
# labels.append(state)
colors.append(self._colors[state])
else:
for state in sorted(self._initialState.keys(), key=str):
if state == 'time' or self._mumotModel._constantReactants:
continue
finaldata.append(currentEvo[state][-1] / self._systemSize
if self._plotProportions
else currentEvo[state][-1])
stdev.append(0)
# labels.append(state)
colors.append(self._colors[state])
# plt.pie(finaldata, labels=labels, autopct=utils._make_autopct(piedata),
# colors=colors) # shadow=True, startangle=90,
xpos = np.arange(len(finaldata)) # the x locations for the bars
width = 1 # the width of the bars
plt.bar(xpos, finaldata, width, color=colors, yerr=stdev, ecolor='black')
# set axes
ax = plt.gca()
ax.set_xticks(xpos) # for matplotlib < 2 ---> ax.set_xticks(xpos - (width / 2) )
y_max = max(y_max, max(finaldata))
padding_y = y_max / 100.0 if self._runs <= 1 else y_max / 20.0
# set lables
if self._plotProportions:
stateNamesLabel = [r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(sympy.Symbol('Phi_{' + str(state) + '}')))) + '$'
for state in sorted(self._initialState.keys(), key=str)
if state not in self._mumotModel._constantReactants]
# stateNamesLabel = [r'$'+latex(sympy.Symbol('Phi_'+str(state))) +'$'
# for state in sorted(self._initialState.keys(), key=str)]
else:
stateNamesLabel = [r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(sympy.Symbol(str(state))))) + '$'
for state in sorted(self._initialState.keys(), key=str)
if state not in self._mumotModel._constantReactants]
# stateNamesLabel = [r'$'+latex(sympy.Symbol(str(state)))+'$'
# for state in sorted(self._initialState.keys(), key=str)]
ax.set_xticklabels(stateNamesLabel)
_fig_formatting_2D(figure=self._figure, xlab=self._ylab, ylab="population proportion"
if self._plotProportions
else "population size", aspectRatioEqual=False, fontsize=self._axes_font_size)
# @todo: to fix the choose_yrange of _fig_formatting_2D (issue #104)
plt.ylim((0, y_max + padding_y))
# update the figure
if not self._silent or self._realtimePlot:
self._figure.canvas.draw()
def _redrawOnly(self, _=None):
self._update_params()
self._initFigure()
self._updateSimultationFigure(self._latestResults, fullPlot=True)
# for results in self._latestResults:
# self._updateSimultationFigure(results, fullPlot=True)
def _initFigure(self):
if not self._silent or self._realtimePlot:
plt.figure(self._figureNum)
plt.cla()
if (self._visualisationType == 'evo'):
pass
elif (self._visualisationType == "final"):
# plt.axes().set_aspect('equal')
if self._plotProportions:
plt.xlim((0, 1.0))
plt.ylim((0, 1.0))
else:
plt.xlim((0, self._systemSize))
plt.ylim((0, self._systemSize))
# plt.axes().set_xlabel(self._finalViewAxes[0])
# plt.axes().set_ylabel(self._finalViewAxes[1])
elif (self._visualisationType == "barplot"):
# plt.axes().set_aspect('equal') # for piechart
# plt.axes().set_aspect('auto') # for barchart
pass
def _convertLatestDataIntoCSV(self):
"""Formatting of the latest simulation data in the CSV format"""
csv_results = ""
line = 'runID' + ',' + 'time'
for state in sorted(self._initialState.keys(), key=str):
line += ',' + str(state)
line += '\n'
csv_results += line
for runID, runData in enumerate(self._latestResults):
for t, timestep in enumerate(runData['time']):
line = str(runID) + ',' + str(timestep)
for state in sorted(self._initialState.keys(), key=str):
if state in self._mumotModel._constantReactants:
continue
line += ',' + str(runData[state][t])
line += '\n'
csv_results += line
return csv_results
def _updateDownloadLink(self):
"""Update the link with the latest results"""
self._controller._downloadWidgetLink.value = self._controller._create_download_link(self._convertLatestDataIntoCSV(), title="Download results", filename="simulationData.txt")
[docs] def downloadResults(self):
"""Create a download link to access the latest results."""
return HTML(self._controller._create_download_link(self._convertLatestDataIntoCSV(), title="Download results", filename="simulationData.txt"))
[docs]class MuMoTmultiagentView(MuMoTstochasticSimulationView):
"""Agent on networks view on model."""
# structure to store the communication network
_graph = None
# type of network used in the M-A simulation
_netType = None
# network parameter which varies with respect to each type of network
# (for Erdos-Renyi is linking probability, for Barabasi-Albert is the number of edge per new node, for spatial is the communication range)
_netParam = None
# speed of the particle on one timestep (only for dynamic netType)
_particleSpeed = None
# corelatedness in the random walk motion (only for dynamic netType)
_motionCorrelatedness = None
# list of agents involved in the simulation
_agents = None
# list of agents' positions
_positions = None
_positionHistory = None
# Arena size: width
_arena_width = 1
# Arena size: height
_arena_height = 1
# number of simulation timesteps
_maxTimeSteps = None
# time scaling (i.e. lenght of each timestep)
_timestepSize = None
# visualise the agent trace (on moving particles)
_showTrace = None
# visualise the agent trace (on moving particles)
_showInteractions = None
def _constructorSpecificParams(self, MAParams):
if self._controller is None:
self._timestepSize = MAParams.get('timestepSize', 1)
self._netType = utils._decodeNetworkTypeFromString(MAParams['netType'])
if self._netType != consts.NetworkType.FULLY_CONNECTED:
self._netParam = MAParams['netParam']
if self._netType == consts.NetworkType.DYNAMIC:
self._motionCorrelatedness = MAParams['motionCorrelatedness']
self._particleSpeed = MAParams['particleSpeed']
self._showTrace = MAParams.get('showTrace', False)
self._showInteractions = MAParams.get('showInteractions', False)
else:
# needed for bookmarking
self._generatingCommand = "multiagent"
# storing fixed params
if self._fixedParams.get('netType') is not None:
self._fixedParams['netType'] = utils._decodeNetworkTypeFromString(self._fixedParams['netType'])
self._netType = utils._decodeNetworkTypeFromString(MAParams['netType'][0])
self._update_net_params(False)
self._mumotModel._getSingleAgentRules()
# print(self._mumotModel._agentProbabilities)
# check if any network is available or only moving particles
onlyDynamic = False
(_, allConstantReactants) = self._mumotModel._getAllReactants()
for rule in self._mumotModel._rules:
if consts.EMPTYSET_SYMBOL in rule.lhsReactants + rule.rhsReactants:
onlyDynamic = True
break
for react in rule.lhsReactants + rule.rhsReactants:
if react in allConstantReactants:
onlyDynamic = True
if onlyDynamic:
break
if onlyDynamic:
# if (not self._controller) and (not self._netType == consts.NetworkType.DYNAMIC): # if the user has specified the network type, we notify him/her through error-message
# self._errorMessage.value = "Only Moving-Particle netType is available when rules contain the emptyset."
if not self._netType == consts.NetworkType.DYNAMIC:
wrnMsg = "Only Moving-Particle netType is available when rules contain the emptyset or constant reactants."
warn(wrnMsg, exceptions.MuMoTWarning)
self._netType = consts.NetworkType.DYNAMIC
if self._controller: # updating value and disabling widget
if self._controller._widgetsExtraParams.get('netType') is not None:
self._controller._widgetsExtraParams['netType'].value = consts.NetworkType.DYNAMIC
self._controller._widgetsExtraParams['netType'].disabled = True
else:
self._fixedParams['netType'] = consts.NetworkType.DYNAMIC
self._controller._update_net_params()
else: # this is a standalone view
# if the assigned value of net-param is not consistent with the input, raise a WARNING and set the default value to 0.1
if self._netParam < 0 or self._netParam > 1:
wrnMsg = "WARNING! net-param value " + str(self._netParam) + " is invalid for Moving-Particles. Valid range is [0,1] indicating the particles' communication range. \n"
self._netParam = 0.1
wrnMsg += "New default values is '_netParam'=" + str(self._netParam)
warn(wrnMsg, exceptions.MuMoTWarning)
def _build_bookmark(self, includeParams=True) -> str:
log_str = "bookmark = " if not self._silent else ""
log_str += "<modelName>." + self._generatingCommand + "("
# log_str += _find_obj_names(self._mumotModel)[0] + "." + self._generatingCommand + "("
if includeParams:
log_str += self._get_bookmarks_params()
log_str += ", "
log_str = log_str.replace('\\', '\\\\')
init_state_str = {latex(state): pop
for state, pop in self._initialState.items()
if state not in self._mumotModel._constantReactants}
log_str += "initialState = " + str(init_state_str)
log_str += ", maxTime = " + str(self._maxTime)
log_str += ", timestepSize = " + str(self._timestepSize)
log_str += ", randomSeed = " + str(self._randomSeed)
log_str += ", netType = '" + utils._encodeNetworkTypeToString(self._netType) + "'"
if not self._netType == consts.NetworkType.FULLY_CONNECTED:
log_str += ", netParam = " + str(self._netParam)
if self._netType == consts.NetworkType.DYNAMIC:
log_str += ", motionCorrelatedness = " + str(self._motionCorrelatedness)
log_str += ", particleSpeed = " + str(self._particleSpeed)
log_str += ", showTrace = " + str(self._showTrace)
log_str += ", showInteractions = " + str(self._showInteractions)
log_str += ", visualisationType = '" + str(self._visualisationType) + "'"
if self._visualisationType == 'final':
# these loops are necessary to return the latex() format of the reactant
for reactant in self._mumotModel._getAllReactants()[0]:
if str(reactant) == self._finalViewAxes[0]:
log_str += ", final_x = '" + latex(reactant).replace('\\', '\\\\') + "'"
break
for reactant in self._mumotModel._getAllReactants()[0]:
if str(reactant) == self._finalViewAxes[1]:
log_str += ", final_y = '" + latex(reactant).replace('\\', '\\\\') + "'"
break
log_str += ", plotProportions = " + str(self._plotProportions)
log_str += ", realtimePlot = " + str(self._realtimePlot)
log_str += ", runs = " + str(self._runs)
log_str += ", aggregateResults = " + str(self._aggregateResults)
log_str += ", silent = " + str(self._silent)
log_str += ", bookmark = False"
# if len(self._generatingKwargs) > 0:
# for key in self._generatingKwargs:
# if type(self._generatingKwargs[key]) == str:
# log_str += key + " = " + "\'"+ str(self._generatingKwargs[key]) + "\'" + ", "
# else:
# log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
log_str += ")"
return log_str
def _printStandaloneViewCmd(self) -> None:
MAParams = {}
MAParams["initialState"] = {latex(state): pop
for state, pop in self._initialState.items()
if state not in self._mumotModel._constantReactants}
MAParams["maxTime"] = self._maxTime
MAParams['timestepSize'] = self._timestepSize
MAParams["randomSeed"] = self._randomSeed
MAParams['netType'] = utils._encodeNetworkTypeToString(self._netType)
if self._netType != consts.NetworkType.FULLY_CONNECTED:
MAParams['netParam'] = self._netParam
if self._netType == consts.NetworkType.DYNAMIC:
MAParams['motionCorrelatedness'] = self._motionCorrelatedness
MAParams['particleSpeed'] = self._particleSpeed
MAParams['showTrace'] = self._showTrace
MAParams['showInteractions'] = self._showInteractions
MAParams["visualisationType"] = self._visualisationType
if self._visualisationType == 'final':
# this loop is necessary to return the latex() format of the reactant
for reactant in self._mumotModel._getAllReactants()[0]:
if str(reactant) == self._finalViewAxes[0]:
MAParams['final_x'] = latex(reactant)
if str(reactant) == self._finalViewAxes[1]:
MAParams['final_y'] = latex(reactant)
MAParams["plotProportions"] = self._plotProportions
MAParams["realtimePlot"] = self._realtimePlot
MAParams["runs"] = self._runs
MAParams["aggregateResults"] = self._aggregateResults
# sortedDict = "{"
# for key,value in sorted(MAParams.items()):
# sortedDict += "'" + key + "': " + str(value) + ", "
# sortedDict += "}"
print("mumot.MuMoTmultiagentView(<modelName>, None, " + self._get_bookmarks_params().replace('\\', '\\\\') + ", SSParams = " + str(MAParams) + " )")
def _update_view_specific_params(self, freeParamDict=None) -> None:
"""Read the new parameters (in case they changed in the controller) specific to multiagent().
This function should only update local parameters and not compute data.
"""
if freeParamDict is None:
freeParamDict = {}
super()._update_view_specific_params(freeParamDict)
self._adjust_barabasi_network_range()
if self._controller is not None:
self._netType = self._getWidgetParamValue('netType', self._controller._widgetsExtraParams)
if self._netType != consts.NetworkType.FULLY_CONNECTED: # this used to refer only to value in self._fixedParams; possible bug?
self._netParam = self._getWidgetParamValue('netParam', self._controller._widgetsExtraParams) # self._fixedParams['netParam'] if self._fixedParams.get('netParam') is not None else self._controller._widgetsExtraParams['netParam'].value
if self._netType is None or self._netType == consts.NetworkType.DYNAMIC: # this used to refer only to value in self._fixedParams; possible bug?
self._motionCorrelatedness = self._getWidgetParamValue('motionCorrelatedness', self._controller._widgetsExtraParams) # self._fixedParams['motionCorrelatedness'] if self._fixedParams.get('motionCorrelatedness') is not None else self._controller._widgetsExtraParams['motionCorrelatedness'].value
self._particleSpeed = self._getWidgetParamValue('particleSpeed', self._controller._widgetsExtraParams) # self._fixedParams['particleSpeed'] if self._fixedParams.get('particleSpeed') is not None else self._controller._widgetsExtraParams['particleSpeed'].value
self._showTrace = self._getWidgetParamValue('showTrace', self._controller._widgetsPlotOnly) # self._fixedParams['showTrace'] if self._fixedParams.get('showTrace') is not None else self._controller._widgetsPlotOnly['showTrace'].value
self._showInteractions = self._getWidgetParamValue('showInteractions', self._controller._widgetsPlotOnly) # self._fixedParams['showInteractions'] if self._fixedParams.get('showInteractions') is not None else self._controller._widgetsPlotOnly['showInteractions'].value
self._timestepSize = self._getWidgetParamValue('timestepSize', self._controller._widgetsExtraParams) # self._fixedParams['timestepSize'] if self._fixedParams.get('timestepSize') is not None else self._controller._widgetsExtraParams['timestepSize'].value
self._computeScalingFactor()
def _initSingleSimulation(self) -> None:
super()._initSingleSimulation()
# init the network
self._initGraph()
# init the agents
self._initMultiagent()
def _initFigure(self) -> None:
super()._initFigure()
if self._visualisationType == "graph":
# plt.axes().set_aspect('equal')
ax = plt.gca()
ax.set_aspect('equal')
# def _updateSimultationFigure(self, evo, fullPlot=True):
def _updateSimultationFigure(self, allResults, fullPlot=True, currentEvo=None):
if (self._visualisationType == "graph"):
self._initFigure()
# plt.clf()
# plt.axes().set_aspect('equal')
if self._netType == consts.NetworkType.DYNAMIC:
plt.xlim((0, 1))
plt.ylim((0, 1))
# plt.axes().set_aspect('equal')
# xs = [p[0] for p in positions]
# ys = [p[1] for p in positions]
# plt.plot(xs, ys, 'o' )
xs = {}
ys = {}
for state in self._initialState.keys():
xs[state] = []
ys[state] = []
for a in np.arange(len(self._positions)):
xs[self._agents[a]].append(self._positions[a][0])
ys[self._agents[a]].append(self._positions[a][1])
if self._showInteractions:
agent_p = [self._positions[a][0], self._positions[a][1]]
for n in self._getNeighbours(a, self._positions, self._netParam):
neigh_p = [self._positions[n][0], self._positions[n][1]]
jump_boudaries = False
if abs(agent_p[0] - neigh_p[0]) > self._netParam:
jump_boudaries = True
if agent_p[0] > neigh_p[0]:
neigh_p[0] += self._arena_width
else:
neigh_p[0] -= self._arena_width
if abs(agent_p[1] - neigh_p[1]) > self._netParam:
jump_boudaries = True
if agent_p[1] > neigh_p[1]:
neigh_p[1] += self._arena_height
else:
neigh_p[1] -= self._arena_height
plt.plot((agent_p[0], neigh_p[0]), (agent_p[1], neigh_p[1]), '-', c='orange' if jump_boudaries else 'y')
# plt.plot((self._positions[a][0], self._positions[n][0]),(self._positions[a][1], self._positions[n][1]), '-', c='y')
if self._showTrace:
trace_xs = []
trace_ys = []
trace_xs.append(self._positions[a][0])
trace_ys.append(self._positions[a][1])
for p in reversed(self._positionHistory[a]):
# check if the trace is making a jump from one side to the other of the screen
if abs(trace_xs[-1] - p[0]) > self._particleSpeed or abs(trace_ys[-1] - p[1]) > self._particleSpeed:
tmp_start = [trace_xs[-1], trace_ys[-1]]
if abs(trace_xs[-1] - p[0]) > self._particleSpeed:
if trace_xs[-1] > p[0]:
trace_xs.append(self._arena_width)
tmp_start[0] = 0
else:
trace_xs.append(0)
tmp_start[0] = self._arena_width
else:
trace_xs.append(p[0])
if abs(trace_ys[-1] - p[1]) > self._particleSpeed:
if trace_ys[-1] > p[1]:
trace_ys.append(self._arena_height)
tmp_start[1] = 0
else:
trace_ys.append(0)
tmp_start[1] = self._arena_height
else:
trace_ys.append(p[1])
plt.plot(trace_xs, trace_ys, '-', c='0.6')
trace_xs = []
trace_ys = []
trace_xs.append(tmp_start[0])
trace_ys.append(tmp_start[1])
trace_xs.append(p[0])
trace_ys.append(p[1])
plt.plot(trace_xs, trace_ys, '-', c='0.6')
for state in self._initialState.keys():
plt.plot(xs.get(state, []), ys.get(state, []), 'o', c=self._colors[state])
else:
stateColors = []
for n in self._graph.nodes():
stateColors.append(self._colors.get(self._agents[n], 'w'))
nx.draw_networkx(self._graph, self._positionHistory, node_color=stateColors, with_labels=True)
plt.axis('off')
# plot legend
stateNamesLabel = [r'$' + utils._doubleUnderscorify(utils._greekPrependify(str(sympy.Symbol(str(state))))) + '$' for state in sorted(self._initialState.keys(), key=str) if state not in self._mumotModel._constantReactants]
# stateNamesLabel = [r'$' + latex(sympy.Symbol(str(state))) + '$' for state in sorted(self._initialState.keys(), key=str)]
markers = [plt.Line2D([0, 0], [0, 0], color=self._colors[state], marker='o', linestyle='', markersize=10) for state in sorted(self._initialState.keys(), key=str)]
plt.legend(markers, stateNamesLabel, bbox_to_anchor=(1, 1), loc=self._legend_loc, borderaxespad=0., numpoints=1, fontsize=self._legend_fontsize)
super()._updateSimultationFigure(allResults, fullPlot, currentEvo)
def _computeScalingFactor(self):
# Determining the minimum speed of the process (thus the max-scaling factor)
maxRatesAll = 0
for reactant, reactions in self._mumotModel._agentProbabilities.items():
if reactant == consts.EMPTYSET_SYMBOL:
continue # not considering the spontaneous births as limiting component for simulation step
sumRates = 0
for reaction in reactions:
sumRates += self._ratesDict[str(reaction[1])]
# print("self._ratesDict " + str(self._ratesDict) )
# print("reactant " + str(reactant) + " has sum rates: " + str(sumRates))
if sumRates > maxRatesAll:
maxRatesAll = sumRates
if maxRatesAll > 0:
maxTimestepSize = 1 / maxRatesAll
else:
maxTimestepSize = 1
# if the timestep size is too small (and generated a too large number of timesteps, it returns an error!)
if math.ceil(self._maxTime / maxTimestepSize) > 10000000:
errorMsg = "ERROR! Invalid rate values. The current rates limit the agent timestep to be too small and would correspond to more than 10 milions simulation timesteps.\n"\
"Please modify the free parameters value to allow quicker simulations."
self._showErrorMessage(errorMsg)
raise exceptions.MuMoTValueError(errorMsg)
if self._timestepSize > maxTimestepSize:
self._timestepSize = maxTimestepSize
self._maxTimeSteps = math.ceil(self._maxTime / self._timestepSize)
if self._controller is not None and self._controller._widgetsExtraParams.get('timestepSize'):
self._update_timestepSize_widget(self._timestepSize, maxTimestepSize, self._maxTimeSteps)
else:
if self._fixedParams.get('timestepSize') != self._timestepSize:
self._showErrorMessage(f"Time step size was fixed to {self._fixedParams.get('timestepSize')} but needs to be updated to {self._timestepSize}")
self._fixedParams['timestepSize'] = self._timestepSize
def _update_timestepSize_widget(self, timestepSize, maxTimestepSize, maxTimeSteps):
if not self._controller._widgetsExtraParams['timestepSize'].value == timestepSize:
if self._controller._replotFunction:
self._controller._widgetsExtraParams['timestepSize'].unobserve(self._controller._replotFunction, 'value')
if (self._controller._widgetsExtraParams['timestepSize'].max < timestepSize):
self._controller._widgetsExtraParams['timestepSize'].max = maxTimestepSize
if (self._controller._widgetsExtraParams['timestepSize'].min > timestepSize):
self._controller._widgetsExtraParams['timestepSize'].min = timestepSize
self._controller._widgetsExtraParams['timestepSize'].value = timestepSize
if self._controller._replotFunction:
self._controller._widgetsExtraParams['timestepSize'].observe(self._controller._replotFunction, 'value')
if not self._controller._widgetsExtraParams['timestepSize'].max == maxTimestepSize:
self._controller._widgetsExtraParams['timestepSize'].max = maxTimestepSize
self._controller._widgetsExtraParams['timestepSize'].min = min(maxTimestepSize / 100, timestepSize)
self._controller._widgetsExtraParams['timestepSize'].step = self._controller._widgetsExtraParams['timestepSize'].min
self._controller._widgetsExtraParams['timestepSize'].readout_format = '.' + str(utils._count_sig_decimals(str(self._controller._widgetsExtraParams['timestepSize'].step))) + 'f'
if self._controller._widgetsExtraParams.get('maxTime'):
self._controller._widgetsExtraParams['maxTime'].description = f"Simulation time (equivalent to {maxTimeSteps} simulation timesteps)"
self._controller._widgetsExtraParams['maxTime'].layout = widgets.Layout(width='70%')
else:
self._controller._widgetsExtraParams['timestepSize'].description = f"Timestep size (total time is {self._fixedParams['maxTime']} = {maxTimeSteps} timesteps)"
self._controller._widgetsExtraParams['timestepSize'].layout = widgets.Layout(width='70%')
def _initGraph(self):
numNodes = sum(self._currentState.values())
if (self._netType == consts.NetworkType.FULLY_CONNECTED):
# print("Generating full graph")
self._graph = nx.complete_graph(numNodes) # np.repeat(0, self.numNodes)
elif (self._netType == consts.NetworkType.ERSOS_RENYI):
# print("Generating Erdos-Renyi graph (connected)")
if self._netParam is not None and self._netParam > 0 and self._netParam <= 1:
self._graph = nx.erdos_renyi_graph(numNodes, self._netParam, np.random.randint(consts.MAX_RANDOM_SEED))
i = 0
while (not nx.is_connected(self._graph)):
if i > 100000:
errorMsg = (f"ERROR! Invalid network parameter (link probability={self._netParam} for E-R networks."
f"After {i} attempts of network initialisation, the network is never connected.\n"
"Please increase the network parameter value.")
print(errorMsg)
raise exceptions.MuMoTValueError(errorMsg)
# print("Graph was not connected; Resampling!")
i = i + 1
self._graph = nx.erdos_renyi_graph(numNodes, self._netParam, np.random.randint(consts.MAX_RANDOM_SEED))
else:
errorMsg = ("ERROR! Invalid network parameter (link probability) for E-R networks. "
f"It must be between 0 and 1; input is {self._netParam}")
raise exceptions.MuMoTValueError(errorMsg)
elif (self._netType == consts.NetworkType.BARABASI_ALBERT):
# print("Generating Barabasi-Albert graph")
netParam = int(self._netParam)
if netParam is not None and netParam > 0 and netParam <= numNodes:
self._graph = nx.barabasi_albert_graph(numNodes, netParam, np.random.randint(consts.MAX_RANDOM_SEED))
else:
errorMsg = ("ERROR! Invalid network parameter (number of edges per new node) for B-A networks."
f"It must be an integer between 1 and {numNodes}; input is {self._netParam}")
raise exceptions.MuMoTValueError(errorMsg)
elif (self._netType == consts.NetworkType.SPACE):
# @todo: implement network generate by placing points (with local communication range) randomly in 2D space
errorMsg = "ERROR: Graphs of type SPACE are not implemented yet."
raise exceptions.MuMoTValueError(errorMsg)
elif (self._netType == consts.NetworkType.DYNAMIC):
self._positions = []
for _ in range(numNodes):
x = np.random.rand() * self._arena_width
y = np.random.rand() * self._arena_height
o = np.random.rand() * np.pi * 2.0
self._positions.append((x, y, o))
return
def _initMultiagent(self):
# init the agents list
self._agents = []
for state, pop in self._currentState.items():
self._agents.extend([state] * pop)
self._agents = np.random.permutation(self._agents).tolist() # random shuffling of elements (useful to avoid initial clusters in networks)
# init the positionHistory lists
dynamicNetwork = self._netType == consts.NetworkType.DYNAMIC
if dynamicNetwork:
self._positionHistory = []
for _ in np.arange(sum(self._currentState.values())):
self._positionHistory.append([])
else: # store the graph layout (only for 'graph' visualisation)
self._positionHistory = nx.circular_layout(self._graph)
def _simulationStep(self):
tmp_agents = copy.deepcopy(self._agents)
dynamic = self._netType == consts.NetworkType.DYNAMIC
if dynamic:
tmp_positions = copy.deepcopy(self._positions)
communication_range = self._netParam
# store the position history
for idx, _ in enumerate(self._agents): # second element _ is the agent (unused)
self._positionHistory[idx].append(self._positions[idx])
children = []
activeAgents = [True] * len(self._agents)
# for idx, a in enumerate(self._agents):
# to execute in random order the agents I just create a shuffled list of idx and I follow that
indexes = np.arange(0, len(self._agents))
indexes = np.random.permutation(indexes).tolist() # shuffle the indexes
for idx in indexes:
a = self._agents[idx]
# if moving-particles the agent moves
if dynamic:
# print("Agent " + str(idx) + " moved from " + str(self._positions[idx]) )
self._positions[idx] = self._updatePosition(self._positions[idx][0], self._positions[idx][1], self._positions[idx][2], self._particleSpeed, self._motionCorrelatedness)
# print("to position " + str(self._positions[idx]) )
# the step is executed only if the agent is active
if not activeAgents[idx]:
continue
# computing the list of neighbours for the given agent
if dynamic:
neighNodes = self._getNeighbours(idx, tmp_positions, communication_range)
else:
neighNodes = list(nx.all_neighbors(self._graph, idx))
neighNodes = np.random.permutation(neighNodes).tolist() # random shuffling of neighNodes (to randomise interactions)
neighAgents = [tmp_agents[x] for x in neighNodes] # creating the list of neighbours' states
neighActive = [activeAgents[x] for x in neighNodes] # creating the list of neighbour' activity-status
# print("Neighs of agent " + str(idx) + " are " + str(neighNodes) + " with states " + str(neighAgents) )
# run one simulation step for agent a
oneStepOutput = self._stepOneAgent(a, neighAgents, neighActive)
self._agents[idx] = oneStepOutput[0][0]
# check for new particles generated in the step
if len(oneStepOutput[0]) > 1: # new particles must be created
for particle in oneStepOutput[0][1:]:
children.append((particle, tmp_positions[idx]))
for idx_c, neighChange in enumerate(oneStepOutput[1]):
if neighChange:
activeAgents[neighNodes[idx_c]] = False
self._agents[neighNodes[idx_c]] = neighChange
# add the new agents coming from splitting (possible only for moving-particles view)
for child in children:
self._agents.append(child[0])
self._positions.append(child[1])
self._positionHistory.append([])
idx = len(self._positions) - 1
self._positionHistory[idx].append(self._positions[idx])
self._positions[idx] = (self._positions[idx][0], self._positions[idx][1], np.random.rand() * np.pi * 2.0) # set random orientation
# self._positions[idx][2] = np.random.rand() * np.pi * 2.0 # set random orientation
self._positions[idx] = self._updatePosition(self._positions[idx][0], self._positions[idx][1], self._positions[idx][2], self._particleSpeed, self._motionCorrelatedness)
# compute self birth (possible only for moving-particles view)
for birth in self._mumotModel._agentProbabilities[consts.EMPTYSET_SYMBOL]:
birthRate = self._ratesDict[str(birth[1])] * self._timestepSize # scale the rate
decimal = birthRate % 1
birthsNum = int(birthRate - decimal)
np.random.rand()
if (np.random.rand() < decimal):
birthsNum += 1
# print ( "Birth rate " + str(birth[1]) + " triggers " + str(birthsNum) + " newborns")
for _ in range(birthsNum):
for newborn in birth[2]:
self._agents.append(newborn)
self._positions.append((np.random.rand() * self._arena_width, np.random.rand() * self._arena_height, np.random.rand() * np.pi * 2.0))
self._positionHistory.append([])
self._positionHistory[len(self._positions) - 1].append(self._positions[len(self._positions) - 1])
# Remove from lists (_agents, _positions, and _positionHistory) the 'dead' agents (possible only for moving-particles view)
deads = [idx for idx, a in enumerate(self._agents) if a == consts.EMPTYSET_SYMBOL]
# print("Dead list is " + str(deads))
for dead in reversed(deads):
del self._agents[dead]
del self._positions[dead]
del self._positionHistory[dead]
currentState = {state: self._agents.count(state) for state in self._initialState.keys()} # if state not in self._mumotModel._constantReactants} # self._mumotModel._reactants | self._mumotModel._constantReactants}
return (self._timestepSize, currentState)
def _stepOneAgent(self, agent, neighs, activeNeighs):
"""One timestep for one agent."""
rnd = np.random.rand()
lastVal = 0
neighChanges = [None] * len(neighs)
# counting how many neighbours for each state (to be uses for the interaction probabilities)
neighCount = {x: neighs.count(x) for x in self._initialState.keys()} # self._mumotModel._reactants | self._mumotModel._constantReactants}
for idx, neigh in enumerate(neighs):
if not activeNeighs[idx]:
neighCount[neigh] -= 1
# print("Agent " + str(agent) + " with probSet=" + str(probSets))
# print("nc:" + str(neighCount))
for reaction in self._mumotModel._agentProbabilities[agent]:
popScaling = 1
rate = self._ratesDict[str(reaction[1])] * self._timestepSize # scaling the rate by the timeStep size
if len(neighs) >= len(reaction[0]):
j = 0
for reagent in reaction[0]:
popScaling *= (neighCount[reagent] / (len(neighs) - j)
if neighCount[reagent] >= reaction[0].count(reagent)
else 0)
j += 1
else:
popScaling = 0
val = popScaling * rate
# print(f"For reaction: {agent}+{reaction[0]} the popScaling is {popScaling}")
if (rnd < val + lastVal):
# A state change happened!
# print(f"Reaction: {reaction[1]} by agent {agent} with agent(s) {reaction[0]} becomes {reaction[2]} &others: {reaction[3]}")
# print("Val was: {val} lastVal: {lastVal} and rand: {rnd}")
# Locking the other reagents involved in the reaction
for idx_r, reagent in enumerate(reaction[0]):
for idx_n, neigh in enumerate(neighs):
if neigh == reagent and activeNeighs[idx_n] and neighChanges[idx_n] is None:
neighChanges[idx_n] = reaction[3][idx_r]
break
return (reaction[2], neighChanges)
else:
lastVal += val
# No state change happened
return ([agent], neighChanges)
def _updatePosition(self, x, y, o, speed, correlatedness):
# random component
rand_o = np.random.rand() * np.pi * 2.0
rand_x = speed * np.cos(rand_o) * (1 - correlatedness)
rand_y = speed * np.sin(rand_o) * (1 - correlatedness)
# persistance component
corr_x = speed * np.cos(o) * correlatedness
corr_y = speed * np.sin(o) * correlatedness
# movement
move_x = rand_x + corr_x
move_y = rand_y + corr_y
# new orientation
o = np.arctan2(move_y, move_x)
# new position
x = x + move_x
y = y + move_y
# Implement the periodic boundary conditions
x = x % self._arena_width
y = y % self._arena_height
# CODE FOR A BOUNDED ARENA
# if a.position.x < 0:
# a.position.x = 0
# elif a.position.x > self.dimensions.x:
# a.position.x = self.dimensions.x
# if a.position.y < 0:
# a.position.y = 0
# elif a.position.y > self.dimensions.y:
# a.position.x = self.dimensions.x
return (x, y, o)
def _getNeighbours(self, agent, positions, distance_range):
"""Return the (index) list of neighbours of ``agent``."""
neighbour_list = []
for neigh in np.arange(len(positions)):
if (not neigh == agent) and (self._distance_on_torus(positions[agent][0], positions[agent][1], positions[neigh][0], positions[neigh][1]) < distance_range):
neighbour_list.append(neigh)
return neighbour_list
def _distance_on_torus(self, x_1, y_1, x_2, y_2):
"""Returns the minimum distance calculated on the torus given by periodic boundary conditions."""
return np.sqrt(min(abs(x_1 - x_2), self._arena_width - abs(x_1 - x_2))**2 +
min(abs(y_1 - y_2), self._arena_height - abs(y_1 - y_2))**2)
def _update_net_params(self, resetValueAndRange):
"""Update the widgets related to the netType
(it cannot be a :class:`MuMoTcontroller` method because with multi-controller it needs to point to the right ``_controller``)
"""
# if netType is fixed, no update is necessary
if self._fixedParams.get('netParam') is not None:
return
self._netType = (self._fixedParams['netType']
if self._fixedParams.get('netType') is not None
else self._controller._widgetsExtraParams['netType'].value)
# oder of assignment is important (first, update the min and max, later, the value)
toLinkPlotFunction = False
if self._controller._replotFunction:
try:
self._controller._widgetsExtraParams['netParam'].unobserve(self._controller._replotFunction, 'value')
toLinkPlotFunction = True
except ValueError:
pass
if resetValueAndRange:
self._controller._widgetsExtraParams['netParam'].max = float("inf") # temp to avoid min > max exception
if (self._netType == consts.NetworkType.FULLY_CONNECTED):
# self._controller._widgetsExtraParams['netParam'].min = 0
# self._controller._widgetsExtraParams['netParam'].max = 1
# self._controller._widgetsExtraParams['netParam'].step = 1
# self._controller._widgetsExtraParams['netParam'].value = 0
# self._controller._widgetsExtraParams['netParam'].disabled = True
# self._controller._widgetsExtraParams['netParam'].description = "None"
self._controller._widgetsExtraParams['netParam'].layout.display = 'none'
elif (self._netType == consts.NetworkType.ERSOS_RENYI):
# self._controller._widgetsExtraParams['netParam'].disabled = False
self._controller._widgetsExtraParams['netParam'].layout.display = 'flex'
if resetValueAndRange:
self._controller._widgetsExtraParams['netParam'].min = 0.1
self._controller._widgetsExtraParams['netParam'].max = 1
self._controller._widgetsExtraParams['netParam'].step = 0.1
self._controller._widgetsExtraParams['netParam'].value = 0.5
self._controller._widgetsExtraParams['netParam'].description = "Network connectivity parameter (link probability)"
elif (self._netType == consts.NetworkType.BARABASI_ALBERT):
# self._controller._widgetsExtraParams['netParam'].disabled = False
self._controller._widgetsExtraParams['netParam'].layout.display = 'flex'
maxVal = self._systemSize - 1
if resetValueAndRange:
self._controller._widgetsExtraParams['netParam'].min = 1
self._controller._widgetsExtraParams['netParam'].max = maxVal
self._controller._widgetsExtraParams['netParam'].step = 1
self._controller._widgetsExtraParams['netParam'].value = min(maxVal, 3)
self._controller._widgetsExtraParams['netParam'].description = "Network connectivity parameter (new edges)"
elif (self._netType == consts.NetworkType.SPACE):
self._controller._widgetsExtraParams['netParam'].value = -1
if (self._netType == consts.NetworkType.DYNAMIC):
# self._controller._widgetsExtraParams['netParam'].disabled = False
self._controller._widgetsExtraParams['netParam'].layout.display = 'flex'
if resetValueAndRange:
self._controller._widgetsExtraParams['netParam'].min = 0.0
self._controller._widgetsExtraParams['netParam'].max = 1
self._controller._widgetsExtraParams['netParam'].step = 0.05
self._controller._widgetsExtraParams['netParam'].value = 0.1
self._controller._widgetsExtraParams['netParam'].description = "Interaction range"
# self._controller._widgetsExtraParams['particleSpeed'].disabled = False
# self._controller._widgetsExtraParams['motionCorrelatedness'].disabled = False
# self._controller._widgetsPlotOnly['showTrace'].disabled = False
# self._controller._widgetsPlotOnly['showInteractions'].disabled = False
if self._controller._widgetsExtraParams.get('particleSpeed') is not None:
self._controller._widgetsExtraParams['particleSpeed'].layout.display = 'flex'
if self._controller._widgetsExtraParams.get('motionCorrelatedness') is not None:
self._controller._widgetsExtraParams['motionCorrelatedness'].layout.display = 'flex'
if self._controller._widgetsPlotOnly.get('showTrace') is not None:
self._controller._widgetsPlotOnly['showTrace'].layout.display = 'flex'
if self._controller._widgetsPlotOnly.get('showInteractions') is not None:
self._controller._widgetsPlotOnly['showInteractions'].layout.display = 'flex'
else:
# self._controller._widgetsExtraParams['particleSpeed'].disabled = True
# self._controller._widgetsExtraParams['motionCorrelatedness'].disabled = True
# self._controller._widgetsPlotOnly['showTrace'].disabled = True
# self._controller._widgetsPlotOnly['showInteractions'].disabled = True
if self._controller._widgetsExtraParams.get('particleSpeed') is not None:
self._controller._widgetsExtraParams['particleSpeed'].layout.display = 'none'
if self._controller._widgetsExtraParams.get('motionCorrelatedness') is not None:
self._controller._widgetsExtraParams['motionCorrelatedness'].layout.display = 'none'
if self._controller._widgetsPlotOnly.get('showTrace') is not None:
self._controller._widgetsPlotOnly['showTrace'].layout.display = 'none'
if self._controller._widgetsPlotOnly.get('showInteractions') is not None:
self._controller._widgetsPlotOnly['showInteractions'].layout.display = 'none'
self._controller._widgetsExtraParams['netParam'].readout_format = \
'.' + str(utils._count_sig_decimals(str(self._controller._widgetsExtraParams['netParam'].step))) + 'f'
if toLinkPlotFunction:
self._controller._widgetsExtraParams['netParam'].observe(self._controller._replotFunction, 'value')
def _adjust_barabasi_network_range(self) -> None:
"""Adjust the widget of the number of edges of the Barabasi-Albert network when the system size slider is changed."""
if self._controller is None or not self._netType == consts.NetworkType.BARABASI_ALBERT or self._controller._widgetsExtraParams.get('netParam') is None:
return
maxVal = self._systemSize - 1
if self._controller._widgetsExtraParams['netParam'].max == maxVal:
# the value is correct and no action is necessary
return
toLinkPlotFunction = False
if self._controller._replotFunction:
try:
self._controller._widgetsExtraParams['netParam'].unobserve(self._controller._replotFunction, 'value')
toLinkPlotFunction = True
except ValueError:
pass
self._controller._widgetsExtraParams['netParam'].max = maxVal
if toLinkPlotFunction:
self._controller._widgetsExtraParams['netParam'].observe(self._controller._replotFunction, 'value')
[docs]class MuMoTSSAView(MuMoTstochasticSimulationView):
"""View for computational simulations of the Gillespie algorithm to approximate the Master Equation solution."""
# A matrix form of the left-handside of the rules
_reactantsMatrix = None
# The effect of each rule
_ruleChanges = None
def _constructorSpecificParams(self, _) -> None:
if self._controller is not None:
self._generatingCommand = "SSA"
def _build_bookmark(self, includeParams=True) -> str:
log_str = "bookmark = " if not self._silent else ""
log_str += "<modelName>." + self._generatingCommand + "("
if includeParams:
log_str += self._get_bookmarks_params()
log_str += ", "
log_str = log_str.replace('\\', '\\\\')
# initState_str = {self._mumotModel._reactantsLaTeX.get(str(state), str(state)): pop
# for state,pop in self._initialState.items()
# if state not in self._mumotModel._constantReactants}
initState_str = {latex(state): pop
for state, pop in self._initialState.items()
if state not in self._mumotModel._constantReactants}
log_str += "initialState = " + str(initState_str)
log_str += ", maxTime = " + str(self._maxTime)
log_str += ", randomSeed = " + str(self._randomSeed)
log_str += ", visualisationType = '" + str(self._visualisationType) + "'"
if self._visualisationType == 'final':
# these loops are necessary to return the latex() format of the reactant
for reactant in self._mumotModel._getAllReactants()[0]:
if str(reactant) == self._finalViewAxes[0]:
log_str += ", final_x = '" + latex(reactant).replace('\\', '\\\\') + "'"
break
for reactant in self._mumotModel._getAllReactants()[0]:
if str(reactant) == self._finalViewAxes[1]:
log_str += ", final_y = '" + latex(reactant).replace('\\', '\\\\') + "'"
break
log_str += ", plotProportions = " + str(self._plotProportions)
log_str += ", realtimePlot = " + str(self._realtimePlot)
log_str += ", runs = " + str(self._runs)
log_str += ", aggregateResults = " + str(self._aggregateResults)
log_str += ", silent = " + str(self._silent)
log_str += ", bookmark = False"
# if len(self._generatingKwargs) > 0:
# for key in self._generatingKwargs:
# if type(self._generatingKwargs[key]) == str:
# log_str += key + " = " + "\'"+ str(self._generatingKwargs[key]) + "\'" + ", "
# else:
# log_str += key + " = " + str(self._generatingKwargs[key]) + ", "
log_str += ")"
return log_str
def _printStandaloneViewCmd(self) -> None:
ssa_params = {}
# initState_str = {}
# for state,pop in self._initialState.items():
# initState_str[str(state)] = pop
ssa_params["initialState"] = {latex(state): pop
for state, pop in self._initialState.items()
if state not in self._mumotModel._constantReactants}
ssa_params["maxTime"] = self._maxTime
ssa_params["randomSeed"] = self._randomSeed
ssa_params["visualisationType"] = self._visualisationType
if self._visualisationType == 'final':
# this loop is necessary to return the latex() format of the reactant
for reactant in self._mumotModel._getAllReactants()[0]:
if str(reactant) == self._finalViewAxes[0]:
ssa_params['final_x'] = latex(reactant)
if str(reactant) == self._finalViewAxes[1]:
ssa_params['final_y'] = latex(reactant)
ssa_params["plotProportions"] = self._plotProportions
ssa_params['realtimePlot'] = self._realtimePlot
ssa_params['runs'] = self._runs
ssa_params['aggregateResults'] = self._aggregateResults
# str( list(self._ratesDict.items()) )
print("mumot.MuMoTSSAView(<modelName>, None, " + str(self._get_bookmarks_params().replace('\\', '\\\\')) + ", SSParams = " + str(ssa_params) + " )")
def _simulationStep(self) -> Tuple[float, object]:
"""Update transition probabilities accounting for the current state."""
probabilitiesOfChange = {}
for reaction_id, reaction in self._mumotModel._stoichiometry.items():
prob = self._ratesDict[str(reaction["rate"])]
numReagents = 0
for reactant, re_stoch in reaction.items():
if reactant == 'rate':
continue
if re_stoch == 'const':
reactantOccurencies = 1
else:
reactantOccurencies = re_stoch[0]
if reactantOccurencies > 0:
prob *= self._currentState[reactant] ** reactantOccurencies
numReagents += reactantOccurencies
# print("for reaction " + str(reaction) + " counted " + str(numReagents) + " reagents (prob:" + str(prob) +" ) (rate: " + str(self._ratesDict[str(reaction["rate"])]) + ")")
if prob > 0 and numReagents > 1:
prob /= sum(self._currentState.values())**(numReagents - 1)
probabilitiesOfChange[reaction_id] = prob
# for rule in self._reactantsMatrix:
# prob = sum([a * b for a,b in zip(rule,currentState)])
# numReagents = sum(x > 0 for x in rule)
# if numReagents > 1:
# prob /= sum(currentState)**( numReagents -1 )
# probabilitiesOfChange.append(prob)
probSum = sum(probabilitiesOfChange.values())
if probSum == 0: # no reaction are possible (the execution terminates with this population)
infiniteTime = self._maxTime - self._t
return (infiniteTime, self._currentState)
# computing when is happening next reaction
timeInterval = np.random.exponential(1 / probSum)
# Selecting the occurred reaction at random, with probability proportional to each reaction probabilities
bottom = 0.0
# Get a random between [0,1) (but we don't want 0!)
reaction = 0.0
while reaction == 0.0:
reaction = np.random.random_sample()
# Normalising probOfChange in the range [0,1]
# probabilitiesOfChange = [pc / probSum for pc in probabilitiesOfChange]
probabilitiesOfChange = {r_id: pc / probSum
for r_id, pc in probabilitiesOfChange.items()}
# print("Prob of Change: " + str(probabilitiesOfChange))
# index = -1
# for i, prob in enumerate(probabilitiesOfChange):
# if ( reaction >= bottom and reaction < (bottom + prob)):
# index = i
# break
# bottom += prob
# if (index == -1):
# print("ERROR! Transition not found. Error in the algorithm execution.")
# sys.exit()
reaction_id = -1
for r_id, prob in probabilitiesOfChange.items():
if bottom <= reaction < (bottom + prob):
reaction_id = r_id
break
bottom += prob
if reaction_id == -1:
raise exceptions.MuMoTError("ERROR! Transition not found. Error in the algorithm execution.")
sys.exit()
# print("current state: " + str(self._currentState))
# print("triggered change: " + str(self._mumotModel._stoichiometry[reaction_id]))
# # apply the change
# currentState = list(np.array(self._currentState) + np.array(self._ruleChanges[index]) )
for reactant, re_stoch in self._mumotModel._stoichiometry[reaction_id].items():
if reactant == 'rate' or re_stoch == 'const':
continue
self._currentState[reactant] += re_stoch[1] - re_stoch[0]
if self._currentState[reactant] < 0:
raise exceptions.MuMoTError(f"ERROR! Population size became negative: {self._currentState}; Error in the algorithm execution.")
sys.exit()
# print(self._currentState)
return (timeInterval, self._currentState)
class Arrow3D(mpatch.FancyArrowPatch):
"""Enable arrows to be added to 3D stream plot.
Adapted from https://stackoverflow.com/questions/22867620/putting-arrowheads-on-vectors-in-matplotlibs-3d-plot
"""
def __init__(self, xs, ys, zs, *args, **kwargs):
mpatch.FancyArrowPatch.__init__(self, (0, 0), (0, 0), *args, **kwargs)
self._verts3d = xs, ys, zs
def draw(self, renderer) -> None:
xs3d, ys3d, zs3d = self._verts3d
xs, ys, zs = proj3d.proj_transform(xs3d, ys3d, zs3d, renderer.M)
self.set_positions((xs[0], ys[0]), (xs[1], ys[1]))
mpatch.FancyArrowPatch.draw(self, renderer)
def _roundNumLogsOut(number: Union[sympy.Add, float]) -> str:
""" Round numerical output in Logs to 3 decimal places. """
# if number is complex
if isinstance(number, sympy.Add):
return str(sympy.re(number).round(4)) + str(sympy.im(number).round(4)) + 'j'
# if number is real
else:
return str(number.round(4))
def _buildFig(object, figure=None):
"""Generic function for constructing figure objects in :class:`MuMoTview` and :class:`MuMoTmultiController` classes.
This constructs figure objects with a global figure number.
To avoid superfluous drawings, ``plt.ion`` and ``plt.ioff``
are used to turn interactive mode on and off,
i.e. choosing when data and figure objects are drawn and displayed or not
to avoid replotting of previous figure windows in addition to updated plots.
Parameters
----------
object : MuMoTview figure object
Generated internally.
figure : optional
If ``None` then switch interactive mode on ONLY when global figure number larger than 2
to avoid superfluous matplotlib figure.
Returns
-------
value : Numbered figure object
"""
global figureCounter
object._figureNum = figureCounter
if figureCounter == 1:
plt.ion()
else:
plt.ioff()
figureCounter += 1
if figure is None:
if figureCounter > 2:
plt.ion()
object._figure = plt.figure(object._figureNum)
else:
object._figure = figure
def _fig_formatting_3D(figure, xlab=None, ylab=None, zlab=None, ax_reformat=False,
specialPoints=None, showFixedPoints=False, **kwargs):
"""Function for editing properties of 3D plots.
Called by :class:`MuMoTvectorView` and :class:`MuMoTstreamView`
"""
fig = plt.gcf()
# fig.set_size_inches(10,8)
ax = fig.gca(projection='3d')
if kwargs.get('showPlane', False) is True:
# pointsMesh = np.linspace(0, 1, 11)
# Xdat, Ydat = np.meshgrid(pointsMesh, pointsMesh)
# Zdat = 1 - Xdat - Ydat
# Zdat[Zdat<0] = 0
# ax.plot_surface(Xdat, Ydat, Zdat, rstride=20, cstride=20, color='grey', alpha=0.25)
# ax.plot_wireframe(Xdat, Ydat, Zdat, rstride=1, cstride=1, color='grey', alpha=0.5)
ax.plot([1, 0, 0, 1], [0, 1, 0, 0], [0, 0, 1, 0], linewidth=2, c='k')
if xlab is None:
try:
xlabelstr = ax.xaxis.get_label_text()
if len(ax.xaxis.get_label_text()) == 0:
xlabelstr = 'choose x-label'
except:
xlabelstr = 'choose x-label'
else:
xlabelstr = xlab
if ylab is None:
try:
ylabelstr = ax.yaxis.get_label_text()
if len(