# -*- coding: utf-8
"""Module for tespy network class.
The network is the container for every TESPy simulation. The network class
automatically creates the system of equations describing topology and
parametrization of a specific model and solves it.
This file is part of project TESPy (github.com/oemof/tespy). It's copyrighted
by the contributors recorded in the version control history of the file,
available from its original location tespy/networks/networks.py
SPDX-License-Identifier: MIT
"""
import importlib
import json
import math
import os
import warnings
from pathlib import Path
from time import time
import numpy as np
import pandas as pd
from numpy.linalg import norm
from scipy.optimize import brentq
from tabulate import tabulate
from tespy.components import CycleCloser
from tespy.components import FuelCell
from tespy.components import Source
from tespy.components import WaterElectrolyzer
from tespy.components.component import component_registry
from tespy.connections.connection import ConnectionBase
from tespy.connections.connection import connection_registry
from tespy.tools import helpers as hlp
from tespy.tools import logger
from tespy.tools.characteristics import CharLine
from tespy.tools.characteristics import CharMap
from tespy.tools.data_containers import ComponentCharacteristicMaps as dc_cm
from tespy.tools.data_containers import ComponentCharacteristics as dc_cc
from tespy.tools.data_containers import ComponentProperties as dc_cp
from tespy.tools.data_containers import DataContainer as dc
from tespy.tools.data_containers import FluidProperties as dc_prop
from tespy.tools.data_containers import ScalarVariable as dc_scavar
from tespy.tools.data_containers import VectorVariable as dc_vecvar
from tespy.tools.global_vars import ERR
from tespy.tools.units import SI_UNITS
from tespy.tools.units import Units
# Only require cupy if Cuda shall be used
try:
import cupy as cu
except ModuleNotFoundError:
cu = None
[docs]
class Network:
r"""
Class component is the base class of all TESPy components.
Parameters
----------
iterinfo : boolean
Print convergence progress to console.
h_range : list
List with minimum and maximum values for enthalpy value range.
m_range : list
List with minimum and maximum values for mass flow value range.
p_range : list
List with minimum and maximum values for pressure value range.
Note
----
Units are specified via the :code:`Network.units.set_defaults` interface.
The specification is optional and will use SI units by default.
Range specification is optional, too. The value range is used to stabilize
the newton algorithm. For more information see the "getting started"
section in the online-documentation.
Example
-------
Basic example for a setting up a :code:`tespy.networks.network.Network`
object.
Standard value for iterinfo is :code:`True`. This will print out
convergence progress to the console. You can stop the printouts by setting
this property to :code:`False`.
>>> from tespy.networks import Network
>>> mynetwork = Network()
>>> mynetwork.units.set_defaults(**{
... "pressure": "bar", "pressure_difference": "bar",
... "temperature": "degC"
... })
>>> mynetwork.p_range = [1, 10]
>>> type(mynetwork)
<class 'tespy.networks.network.Network'>
>>> mynetwork.iterinfo = False
>>> mynetwork.iterinfo
False
>>> mynetwork.iterinfo = True
>>> mynetwork.iterinfo
True
A simple network consisting of a source, a pipe and a sink. This example
shows how the printout parameter can be used. We specify
:code:`printout=False` for both connections, the pipe as well as the power
connection. Therefore the :code:`.print_results()` method should not print
any results.
>>> from tespy.networks import Network
>>> from tespy.components import Source, Sink, Pipe, HeatSink
>>> from tespy.connections import Connection, HeatConnection
>>> nw = Network()
>>> nw.units.set_defaults(**{
... "pressure": "bar", "pressure_difference": "bar",
... "temperature": "degC"
... })
>>> so = Source('source')
>>> si = Sink('sink')
>>> p = Pipe('pipe', Q=0, pr=0.95, printout=False)
>>> h = HeatSink('heat to ambient')
>>> a = Connection(so, 'out1', p, 'in1')
>>> b = Connection(p, 'out1', si, 'in1')
>>> nw.add_conns(a, b)
>>> a.set_attr(fluid={'CH4': 1}, T=30, p=10, m=10, printout=False)
>>> b.set_attr(printout=False)
>>> e = HeatConnection(p, 'heat', h, 'heat', printout=False)
>>> nw.add_conns(e)
>>> nw.iterinfo = False
>>> nw.solve('design')
>>> nw.print_results()
"""
def __init__(self, iterinfo=True, units=None, m_range=None, p_range=None, h_range=None, **kwargs):
self._set_defaults()
self.iterinfo = iterinfo
if units is not None:
self.units = units
self.set_attr(**kwargs)
# because the units can still be specified via the deprecated API of
# set_attr, ranges need to be updated after set_attr!
if m_range is not None:
self.m_range = m_range
if p_range is not None:
self.p_range = p_range
if h_range is not None:
self.h_range = h_range
def _serialize(self):
return {
"m_range": list(self.m_range.magnitude),
"p_range": list(self.p_range.magnitude),
"h_range": list(self.h_range.magnitude),
"units": self.units._serialize()
}
def _set_defaults(self):
"""Set default network properties."""
# connection dataframe
dtypes={
"object": object,
"source": object,
"source_id": str,
"target": object,
"target_id": str,
"conn_type": str
}
self.conns = pd.DataFrame(columns=list(dtypes.keys())).astype(dtypes)
self.all_fluids = set()
# component dataframe
dtypes = {
"comp_type": str,
"object": object,
}
self.comps = pd.DataFrame(columns=list(dtypes.keys())).astype(dtypes)
# user defined function dictionary for fast access
self.user_defined_eq = {}
self.subsystems = {}
# results and specification dictionary
self.results = {}
# in case of a design calculation after an offdesign calculation
self.redesign = False
self.checked = False
self.design_path = None
self.iterinfo = True
self.units = Units()
msg = 'Default unit specifications:\n'
for prop, unit in self.units.default.items():
# standard unit set
msg += f"{prop}: {unit}" + "\n"
# don't need the last newline
logger.debug(msg[:-1])
# generic value range
self.m_range_SI = [-1e12, 1e12]
self.p_range_SI = [2e2, 300e5]
self.h_range_SI = [1e3, 7e6]
self.m_range = self.m_range_SI
self.p_range = self.p_range_SI
self.h_range = self.h_range_SI
[docs]
def set_attr(self, **kwargs):
r"""
Set, resets or unsets attributes of a network.
Parameters
----------
iterinfo : boolean
Print convergence progress to console.
h_range : list
List with minimum and maximum values for enthalpy value range.
m_range : list
List with minimum and maximum values for mass flow value range.
p_range : list
List with minimum and maximum values for pressure value range.
"""
if kwargs:
msg = (
"The set_attr method of Network is deprecated and will be "
"removed in the next major release. Please explicitly call "
"the respective set methods for specification of value "
"ranges, units or iterinfo."
)
self.units = kwargs.get('units', self.units)
for key in kwargs:
if "_unit" in key:
msg = (
f"Passing '{key}' to Network.set_attr is no longer "
"supported. Use Network.units.set_defaults() instead."
)
raise TypeError(msg)
for prop in ['m', 'p', 'h']:
key = f"{prop}_range"
if key in kwargs:
msg = (
"Setting variable ranges through the Network.set_attr "
f"is deprecated. Please use Network.set_{key} in the "
"future."
)
warnings.warn(msg, FutureWarning)
logger.warning(msg)
if key == "m_range":
self.m_range = kwargs[key]
elif key == "p_range":
self.p_range = kwargs[key]
else:
self.h_range = kwargs[key]
self.iterinfo = kwargs.get('iterinfo', self.iterinfo)
if "iterinfo" in kwargs:
msg = (
"Setting iterinfo through the Network.set_attr is deprecated. "
"Please directly specify Network.iterinfo=True/False in the "
"future."
)
warnings.warn(msg, FutureWarning)
logger.warning(msg)
def _set_iterinfo(self, value):
if not isinstance(value, bool):
msg = 'Network parameter iterinfo must be True or False!'
logger.error(msg)
raise TypeError(msg)
else:
self._iterinfo = value
def _get_iterinfo(self):
return self._iterinfo
def _set_units(self, value):
if not isinstance(value, Units):
msg = (
"The units must be an instance of class "
"tespy.tools.units.Units."
)
logger.error(msg)
raise TypeError(msg)
else:
self._units = value
def _get_units(self):
return self._units
def _set_m_range(self, value):
self._check_range_dtype(value, "mass flow")
quantity = "mass_flow"
unit = self.units.default[quantity]
self._m_range = self.units.ureg.Quantity(np.array(value), unit)
self.m_range_SI = self.m_range.to(SI_UNITS[quantity]).magnitude
def _get_m_range(self):
return self._m_range
def _set_p_range(self, value):
self._check_range_dtype(value, "pressure")
quantity = "pressure"
unit = self.units.default[quantity]
self._p_range = self.units.ureg.Quantity(np.array(value), unit)
self.p_range_SI = self.p_range.to(SI_UNITS[quantity]).magnitude
def _get_p_range(self):
return self._p_range
def _set_h_range(self, value):
self._check_range_dtype(value, "enthalpy")
quantity = "enthalpy"
unit = self.units.default[quantity]
self._h_range = self.units.ureg.Quantity(np.array(value), unit)
self.h_range_SI = self.h_range.to(SI_UNITS[quantity]).magnitude
def _get_h_range(self):
return self._h_range
@staticmethod
def _check_range_dtype(value, property):
if isinstance(value, list) or isinstance(value, np.ndarray):
return
else:
msg = (
f"Specify the range for {property} as list: [min, max]."
)
logger.error(msg)
raise TypeError(msg)
iterinfo = property(_get_iterinfo, _set_iterinfo)
units = property(_get_units, _set_units)
m_range = property(_get_m_range, _set_m_range)
p_range = property(_get_p_range, _set_p_range)
h_range = property(_get_h_range, _set_h_range)
[docs]
def get_attr(self, key):
r"""
Get the value of a network attribute.
Parameters
----------
key : str
The attribute you want to retrieve.
Returns
-------
out :
Specified attribute.
"""
msg = (
"The Network.get_attr method is deprecated and will be removed "
"in the next major release."
)
warnings.warn(msg, FutureWarning)
logger.warning(msg)
if key in self.__dict__:
return self.__dict__[key]
else:
msg = f"Network has no attribute '{key}'."
logger.error(msg)
raise KeyError(msg)
[docs]
def add_subsystems(self, *args):
r"""
Add one or more subsystems to the network.
Parameters
----------
c : tespy.components.subsystem.Subsystem
The subsystem to be added to the network, subsystem objects si
:code:`network.add_subsystems(s1, s2, s3, ...)`.
"""
for subsystem in args:
if subsystem.label in self.subsystems:
msg = (
'There is already a subsystem with the label '
f'{subsystem.label}. The labels must be unique!'
)
logger.error(msg)
raise ValueError(msg)
self.subsystems[subsystem.label] = subsystem
for c in subsystem.conns.values():
self.add_conns(c)
[docs]
def del_subsystems(self, *args):
r"""
Delete one or more subsystems from the network.
Parameters
----------
c : tespy.components.subsystem.Subsystem
The subsystem to be deleted from the network, subsystem objects si
:code:`network.del_subsystems(s1, s2, s3, ...)`.
"""
for subsystem in args:
if subsystem.label in self.subsystems:
for c in subsystem.conns.values():
self.del_conns(c)
del self.subsystems[subsystem.label]
[docs]
def get_subsystem(self, label):
r"""
Get Subsystem via label.
Parameters
----------
label : str
Label of the Subsystem object.
Returns
-------
tespy.components.subsystem.Subsystem
Subsystem objectt with specified label, None if no Subsystem of
the network has this label.
"""
try:
return self.subsystems[label]
except KeyError:
logger.warning(f"Subsystem with label {label} not found.")
return None
[docs]
def get_conn(self, label):
r"""
Get Connection via label.
Parameters
----------
label : str
Label of the Connection object.
Returns
-------
c : tespy.connections.connection.Connection
Connection object with specified label, None if no Connection of
the network has this label.
"""
try:
return self.conns.loc[label, 'object']
except KeyError:
warnings.warn(
f"Connection with label {label} not found. Returning None is "
"deprecated and will raise a KeyError in a future version.",
FutureWarning,
stacklevel=2,
)
return None
[docs]
def get_comp(self, label):
r"""
Get Component via label.
Parameters
----------
label : str
Label of the Component object.
Returns
-------
c : tespy.components.component.Component
Component object with specified label, None if no Component of
the network has this label.
"""
try:
return self.comps.loc[label, 'object']
except KeyError:
warnings.warn(
f"Component with label {label} not found. Returning None is "
"deprecated and will raise a KeyError in a future version.",
FutureWarning,
stacklevel=2,
)
return None
[docs]
def add_conns(self, *args):
r"""
Add one or more connections to the network.
Parameters
----------
c : tespy.connections.connection.Connection
The connection to be added to the network, connections objects ci
:code:`add_conns(c1, c2, c3, ...)`.
"""
for c in args:
if not isinstance(c, ConnectionBase):
msg = (
'Must provide tespy.connections.connection.Connection '
'objects as parameters.'
)
logger.error(msg)
raise TypeError(msg)
elif c.label in self.conns.index:
msg = (
'There is already a connection with the label '
f'{c.label}. The connection labels must be unique!'
)
logger.error(msg)
raise ValueError(msg)
c.good_starting_values = False
conn_type = c.__class__.__name__
self.conns.loc[c.label] = [
c, c.source, c.source_id, c.target, c.target_id, conn_type
]
msg = f'Added connection {c.label} to network.'
logger.debug(msg)
# set status "checked" to false, if connection is added to network.
self.checked = False
self.conns = self.conns.sort_index()
self._add_comps(*args)
[docs]
def del_conns(self, *args):
"""
Remove one or more connections from the network.
Parameters
----------
c : tespy.connections.connection.Connection
The connection to be removed from the network, connections objects
ci :code:`del_conns(c1, c2, c3, ...)`.
"""
comps = list({cp for c in args for cp in [c.source, c.target]})
for c in args:
self.conns.drop(c.label, inplace=True)
if c.__class__.__name__ in self.results:
self.results[c.__class__.__name__].drop(
c.label, inplace=True, errors="ignore"
)
msg = f'Deleted connection {c.label} from network.'
logger.debug(msg)
self._del_comps(comps)
# set status "checked" to false, if connection is deleted from network.
self.checked = False
def _add_comps(self, *args):
r"""
Add to network's component DataFrame from added connections.
Parameters
----------
c : tespy.connections.connection.Connection
The connections, which have been added to the network. The
components are extracted from these information.
"""
# get unique components in new connections
comps = list({cp for c in args for cp in [c.source, c.target]})
# add to the dataframe of components
for comp in comps:
if comp.label in self.comps.index:
if self.comps.loc[comp.label, 'object'] == comp:
continue
else:
comp_type = comp.__class__.__name__
other_obj = self.comps.loc[comp.label, "object"]
other_comp_type = other_obj.__class__.__name__
msg = (
f"The component with the label {comp.label} of type "
f"{comp_type} cannot be added to the network as a "
f"different component of type {other_comp_type} with "
"the same label has already been added. All "
"components must have unique values!"
)
raise hlp.TESPyNetworkError(msg)
comp_type = comp.__class__.__name__
self.comps.loc[comp.label, 'comp_type'] = comp_type
self.comps.loc[comp.label, 'object'] = comp
self.comps = self.comps.sort_index()
def _del_comps(self, comps):
r"""
Delete from network's component DataFrame from deleted connections.
For every component it is checked, if it is still part of other
connections, which have not been deleted. The component is only
removed if it cannot be found int the remaining connections.
Parameters
----------
comps : list
List of components to potentially be deleted.
"""
for comp in comps:
if (
comp not in self.conns["source"].values and
comp not in self.conns["target"].values
):
self.comps.drop(comp.label, inplace=True)
comp_type = comp.__class__.__name__
if comp_type in self.results:
self.results[comp_type].drop(
comp.label, inplace=True, errors="ignore"
)
msg = f"Deleted component {comp.label} from network."
logger.debug(msg)
[docs]
def add_ude(self, *args):
r"""
Add a user defined function to the network.
Parameters
----------
c : tespy.tools.helpers.UserDefinedEquation
The objects to be added to the network, UserDefinedEquation objects
ci :code:`add_ude(c1, c2, c3, ...)`.
"""
for c in args:
if not isinstance(c, hlp.UserDefinedEquation):
msg = (
'Must provide tespy.tools.helpers.UserDefinedEquation '
'objects as parameters.'
)
logger.error(msg)
raise TypeError(msg)
elif c.label in self.user_defined_eq:
msg = (
'There is already a UserDefinedEquation with the label '
f'{c.label} . The UserDefinedEquation labels must be '
'unique within a network'
)
logger.error(msg)
raise ValueError(msg)
self.user_defined_eq[c.label] = c
msg = f"Added UserDefinedEquation {c.label} to network."
logger.debug(msg)
[docs]
def del_ude(self, *args):
"""
Remove a user defined function from the network.
Parameters
----------
c : tespy.tools.helpers.UserDefinedEquation
The objects to be deleted from the network,
UserDefinedEquation objects ci :code:`del_ude(c1, c2, c3, ...)`.
"""
for c in args:
del self.user_defined_eq[c.label]
msg = f"Deleted UserDefinedEquation {c.label} from network."
logger.debug(msg)
[docs]
def get_ude(self, label):
r"""
Get UserDefinedEquation via label.
Parameters
----------
label : str
Label of the UserDefinedEquation object.
Returns
-------
c : tespy.tools.helpers.UserDefinedEquation
UserDefinedEquation object with specified label, None if no
UserDefinedEquation of the network has this label.
"""
try:
return self.user_defined_eq[label]
except KeyError:
warnings.warn(
f"UserDefinedEquation with label {label} not found. Returning "
"None is deprecated and will raise a KeyError in a future version.",
FutureWarning,
stacklevel=2,
)
return None
[docs]
def assert_convergence(self):
"""Check convergence status of a simulation."""
msg = 'Calculation did not converge!'
assert self.converged, msg
@property
def converged(self):
if hasattr(self, "status"):
return self.status == 0 or self.status == 1
else:
msg = (
"The converged attribute can only be accessed after the first "
"call of the solve method"
)
raise AttributeError(msg)
[docs]
def check_topology(self):
r"""Check if components are connected properly within the network."""
if len(self.conns) == 0:
msg = (
'No connections have been added to the network, please make '
'sure to add your connections with the .add_conns() method.'
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
self._check_connections()
self._init_components()
self._check_components()
self._create_fluid_wrapper_branches()
# network checked
self.checked = True
msg = 'Networkcheck successful.'
logger.info(msg)
def _check_connections(self):
r"""Check connections for multiple usage of inlets or outlets."""
dub = self.conns.loc[self.conns.duplicated(["source", "source_id"])]
for c in dub['object']:
targets = []
mask = (
(self.conns["source"].values == c.source)
& (self.conns["source_id"].values == c.source_id)
)
for conns in self.conns.loc[mask, "object"]:
targets += [f"\"{conns.target.label}\" ({conns.target_id})"]
targets = ", ".join(targets)
msg = (
f"The source \"{c.source.label}\" ({c.source_id}) is attached "
f"to more than one component on the target side: {targets}. "
"Please check your network configuration."
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
dub = self.conns.loc[
self.conns.duplicated(['target', 'target_id'])
]
for c in dub['object']:
sources = []
mask = (
(self.conns["target"].values == c.target)
& (self.conns["target_id"].values == c.target_id)
)
for conns in self.conns.loc[mask, "object"]:
sources += [f"\"{conns.source.label}\" ({conns.source_id})"]
sources = ", ".join(sources)
msg = (
f"The target \"{c.target.label}\" ({c.target_id}) is attached "
f"to more than one component on the source side: {sources}. "
"Please check your network configuration."
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
def _init_connection_result_datastructure(self):
for conn_type in self.conns["conn_type"].unique():
if conn_type in self.results:
del self.results[conn_type]
for conn in self.conns["object"]:
conn_type = conn.__class__.__name__
# this will move somewhere else!
# set up results dataframe for connections
# this should be done based on the connections
if conn_type not in self.results:
cols = conn._get_result_cols(set(self.all_fluids))
self.results[conn_type] = pd.DataFrame(columns=cols, dtype='float64')
def _init_components(self):
r"""Set up necessary component information."""
for comp in self.comps["object"]:
source_mask = self.conns["source"] == comp
target_mask = self.conns["target"] == comp
comp.inl, comp.outl = self._resolve_comp_conn_domain(
source_mask, target_mask, comp.inlets(), comp.outlets()
)
comp.power_inl, comp.power_outl = self._resolve_comp_conn_domain(
source_mask, target_mask,
comp.powerinlets(), comp.poweroutlets(), "PowerConnection"
)
comp.num_power_i = len(comp.powerinlets())
comp.num_power_o = len(comp.poweroutlets())
comp.heat_inl, comp.heat_outl = self._resolve_comp_conn_domain(
source_mask, target_mask,
comp.heatinlets(), comp.heatoutlets(), "HeatConnection"
)
comp.num_heat_i = len(comp.heatinlets())
comp.num_heat_o = len(comp.heatoutlets())
# set up results and specification dataframes
comp_type = comp.__class__.__name__
if comp_type not in self.results:
cols = [
c for col, data in comp.parameters.items()
if isinstance(data, dc_cp)
for c in [col, f"{col}_unit"]
]
self.results[comp_type] = pd.DataFrame(
columns=cols, dtype='float64'
)
def _resolve_comp_conn_domain(
self, source_mask, target_mask, inlet_ids, outlet_ids, conn_type=None
):
"""Return :code:`(inl, outl)` connection lists for one domain.
Parameters
----------
source_mask, target_mask : boolean Series
Rows in :code:`self.conns` where the component is source / target.
inlet_ids, outlet_ids : list[str]
Port IDs returned by the component's :code:`*inlets()` / :code:`*outlets()`.
conn_type : str, optional
If given, further restrict to rows whose :code:`conn_type` column
matches this class name (e.g. :code:`"PowerConnection"`).
"""
if conn_type is not None:
type_mask = self.conns["conn_type"] == conn_type
src = self.conns[source_mask & self.conns["source_id"].isin(outlet_ids) & type_mask]
tgt = self.conns[target_mask & self.conns["target_id"].isin(inlet_ids) & type_mask]
else:
src = self.conns[source_mask & self.conns["source_id"].isin(outlet_ids)]
tgt = self.conns[target_mask & self.conns["target_id"].isin(inlet_ids)]
return (
self.conns.loc[tgt["target_id"].sort_values().index, "object"].tolist(),
self.conns.loc[src["source_id"].sort_values().index, "object"].tolist(),
)
def _check_components(self):
for comp in self.comps['object']:
comp._validate_connections()
def _prepare_problem(self):
r"""
Initialise the network depending on calculation mode.
Design
- Generic fluid composition and fluid property initialisation.
- Starting values from initialisation path if provided.
Offdesign
- Check offdesign path specification.
- Set component and connection design point properties.
- Switch from design/offdesign parameter specification.
"""
# keep track of the number of component and connection equations
# as well as number of component variables
self.num_comp_eq = 0
self.num_conn_eq = 0
self.variable_counter = 0
self.variables_dict = {}
# in multiprocessing copies are made of all connections
# the mass flow branches and fluid branches hold references to
# connections from the original run (where network.checked is False)
# The assignment of variable spaces etc. is however made on the
# copies of the connections which do not correspond to the mass flow
# branches and fluid branches anymore. So the topology simplification
# does not actually apply to the copied network, therefore the
# branches have to be recreated for this case. We can detect that by
# checking whether a network holds a massflow branch with some
# connections and compare that with the connection object actually
# present in the network
for k, v in self.fluid_wrapper_branches.items():
if self.conns.loc[v["connections"][0].label, "object"] != v["connections"][0]:
self._create_fluid_wrapper_branches()
continue
self._propagate_fluid_wrappers()
self._init_connection_result_datastructure()
self._prepare_solve_mode()
# this method will distribute units and set SI values from given values
# and units
self._transform_user_input_to_SI()
self._create_structure_matrix()
self._presolve()
self._prepare_for_solver()
# generic fluid property initialisation
self._set_starting_values()
msg = 'Network initialised.'
logger.info(msg)
def _propagate_fluid_wrappers(self):
connections_in_wrapper_branches = []
self.all_fluids = []
for branch_data in self.fluid_wrapper_branches.values():
all_connections = [c for c in branch_data["connections"]]
any_fluids_set = []
engines = {}
back_ends = {}
wrapper_kwargs = {}
any_fluids = []
all_components = [c for c in branch_data["components"]]
for cp in all_components:
any_fluids += cp._add_missing_fluids(all_connections)
any_fluids0 = []
mixing_rules = []
for c in all_connections:
any_fluids_set += list(c.fluid.is_set)
any_fluids += list(c.fluid.val.keys())
any_fluids0 += list(c.fluid.val0.keys())
if c.mixing_rule is not None:
mixing_rules += [c.mixing_rule]
for c in all_connections:
for f in set(any_fluids):
if f in c.fluid.engine:
if f in engines and engines[f] != c.fluid.engine[f]:
raise ValueError("")
engines[f] = c.fluid.engine[f]
if f in c.fluid.back_end:
if f in back_ends and back_ends[f] != c.fluid.back_end[f]:
raise ValueError("")
back_ends[f] = c.fluid.back_end[f]
if f in c.fluid.wrapper_kwargs:
if f in wrapper_kwargs and wrapper_kwargs[f] != c.fluid.wrapper_kwargs[f]:
raise ValueError("")
wrapper_kwargs[f] = c.fluid.wrapper_kwargs[f]
mixing_rule = list(set(mixing_rules))
if len(mixing_rule) > 1:
msg = (
"You have provided more than one mixing rule in the "
"branches including the following connections: "
f"{', '.join([c.label for c in all_connections])}"
)
raise hlp.TESPyNetworkError(msg)
elif len(mixing_rule) == 0:
mixing_rule = "ideal-cond"
else:
mixing_rule = mixing_rules[0]
if not any_fluids_set:
msg = "You are missing fluid specifications."
potential_fluids = set(any_fluids_set + any_fluids + any_fluids0)
self.all_fluids += list(potential_fluids)
num_potential_fluids = len(potential_fluids)
if num_potential_fluids == 0:
msg = (
"The following connections of your network are missing any "
"kind of fluid composition information:"
f"{', '.join([c.label for c in all_connections])}."
)
raise hlp.TESPyNetworkError(msg)
for c in all_connections:
c.mixing_rule = mixing_rule
c._potential_fluids = potential_fluids
if num_potential_fluids == 1:
f = list(potential_fluids)[0]
c.fluid.val[f] = 1
else:
for f in potential_fluids:
if (f not in c.fluid.is_set and f not in c.fluid.val and f not in c.fluid.val0):
c.fluid.val[f] = 1 / len(potential_fluids)
elif f not in c.fluid.is_set and f not in c.fluid.val and f in c.fluid.val0:
c.fluid.val[f] = c.fluid.val0[f]
for f, engine in engines.items():
c.fluid.engine[f] = engine
for f, back_end in back_ends.items():
c.fluid.back_end[f] = back_end
for f, w_kwargs in wrapper_kwargs.items():
c.fluid.wrapper_kwargs[f] = w_kwargs
c._create_fluid_wrapper()
connections_in_wrapper_branches += all_connections
mask = self.conns["conn_type"] == "Connection"
missing_wrappers = (
set(self.conns.loc[mask, "object"].tolist())
- set(connections_in_wrapper_branches)
)
if len(missing_wrappers) > 0:
msg = (
f"The fluid information propagation for the connections "
f"{', '.join([c.label for c in missing_wrappers])} failed. "
"The reason for this is likely, that these connections do not "
"have any Sources or a CycleCloser attached to them."
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
self.all_fluids = set(self.all_fluids)
def _prepare_solve_mode(self):
if self.mode == 'offdesign':
self.redesign = True
if self.design_path is None:
# must provide design_path
msg = "Please provide a design_path for offdesign mode."
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
# load design case
if self.new_design:
self._load_offdesign_state()
self._prepare_offdesign()
else:
# reset any preceding offdesign calculation
self._prepare_design()
def _presolve_fluid_vectors(self):
# right now, this ignores potential factors and offsets between the
# fluids.
# On top of that, branches of constant fluid composition are not
# caught, if they only consist of a single connection!
for linear_dependents in self._variable_dependencies:
reference = linear_dependents["reference"]
if self._variable_lookup[reference]["property"] != "fluid":
continue
all_connections = [
self._variable_lookup[var]["object"]
for var in linear_dependents["variables"]
]
reference_container = self._reference_container_lookup[reference]
reference_conn = all_connections[0]
fluid_specs = [f for c in all_connections for f in c.fluid.is_set]
fluid0 = {
f: value for c in all_connections
for f, value in c.fluid.val0.items()
}
if len(fluid_specs) == 0:
if len(reference_conn._potential_fluids) > 1:
reference_container.is_var = {
f for f in reference_conn._potential_fluids
}
reference_container.val = {
f: 1 / len(reference_container.is_var)
for f in reference_container.is_var
}
# load up specification of starting values if any are
# available
reference_container.val.update(fluid0)
else:
reference_container.val[
list(reference_conn._potential_fluids)[0]
] = 1
elif len(fluid_specs) != len(set(fluid_specs)):
msg = (
"The mass fraction of a single fluid has been been "
"specified more than once in the following linear branch "
"of connections: "
f"{', '.join([c.label for c in all_connections])}."
)
raise hlp.TESPyNetworkError(msg)
else:
fixed_fractions = {
f: c.fluid._val[f]
for c in all_connections
for f in fluid_specs
if f in c.fluid._is_set
}
mass_fraction_sum = sum(fixed_fractions.values())
if mass_fraction_sum > 1 + ERR:
msg = (
"The mass fraction of fluids within a linear branch "
"of connections cannot exceed 1: "
f"{', '.join([c.label for c in all_connections])}."
)
raise ValueError(msg)
elif mass_fraction_sum < 1 - ERR:
# set the fluids with specified mass fraction
# remaining fluids are variable, create wrappers for them
all_fluids = reference_conn._potential_fluids
num_remaining_fluids = len(all_fluids) - len(fixed_fractions)
if num_remaining_fluids == 1:
missing_fluid = list(
set(all_fluids) - set(fixed_fractions.keys())
)[0]
fixed_fractions[missing_fluid] = 1 - mass_fraction_sum
variable = set()
else:
missing_fluids = (
set(all_fluids) - set(fixed_fractions.keys())
)
variable = {f for f in missing_fluids}
else:
# fluid mass fraction is 100 %, all other fluids are 0 %
all_fluids = reference_container.val.keys()
remaining_fluids = (
reference_container.val.keys() - fixed_fractions.keys()
)
for f in remaining_fluids:
fixed_fractions[f] = 0
variable = set()
reference_container.val.update(fixed_fractions)
reference_container.is_set = {f for f in fixed_fractions}
reference_container.is_var = variable
# this seems to be a problem in some cases, e.g. the basic
# gas turbine tutorial
num_var = len(variable)
for f in variable:
reference_container.val[f] = (1 - mass_fraction_sum) / num_var
if f in fluid0:
reference_container.val[f] = fluid0[f]
for fluid in reference_container.is_var:
reference_container._J_col[fluid] = self.variable_counter
self.variables_dict[self.variable_counter] = {
"obj": reference_container,
"variable": "fluid",
"fluid": fluid,
"_represents": linear_dependents["variables"]
}
self.variable_counter += 1
def _create_structure_matrix(self):
self._structure_matrix = {}
self._rhs = {}
self._variable_lookup = {}
self._object_to_variable_lookup = {}
self._equation_set_lookup = {}
self._presolved_equations = []
self._reference_container_lookup = {}
self._equation_lookup = {}
self._equation_obj_lookup = {}
self._incidence_matrix = {}
num_vars = self._prepare_variables()
self._reassign_ude_objects()
sum_eq = 0
sum_eq = self._preprocess_network_parts(self.conns["object"], sum_eq)
sum_eq = self._preprocess_network_parts(self.comps["object"], sum_eq)
sum_eq = self._preprocess_network_parts(self.user_defined_eq.values(), sum_eq)
_linear_dependencies = self._find_linear_dependent_variables(
self._structure_matrix, self._rhs
)
_linear_dependent_variables = [
var for linear_dependents in _linear_dependencies
for var in linear_dependents["variables"]
]
_missing_variables = [
{
"variables": [var],
"reference": var,
"factors": {var: 1.0},
"offsets": {var: 0.0},
"equation_indices": {}
}
for var in set(range(num_vars)) - set(_linear_dependent_variables)
]
self._variable_dependencies = _missing_variables + _linear_dependencies
for linear_dependents in self._variable_dependencies:
reference_variable = self._variable_lookup[
linear_dependents["reference"]
]
reference = reference_variable["object"].get_attr(
reference_variable["property"]
)
d = reference.d
if hasattr(reference, "min_val"):
min_val = reference.min_val
max_val = reference.max_val
else:
min_val = None
max_val = None
if reference_variable["property"] != "fluid":
DataContainer = dc_scavar
reference_container = DataContainer(
_is_var=True,
_d=d,
min_val=min_val,
max_val=max_val,
)
else:
DataContainer = dc_vecvar
reference_container = DataContainer(
_d=d
)
self._reference_container_lookup[
linear_dependents['reference']
] = reference_container
for variable in linear_dependents["variables"]:
variable_data = self._variable_lookup[variable]
if variable_data["property"] != reference_variable["property"]:
msg = (
"There is a direct linear dependency between two "
"variables of different properties. This is unexpected "
"and might not work as intended: "
f"{reference_variable['object'].label}: "
f"{reference_variable['property']} and "
f"{variable_data['object'].label}: "
f"{variable_data['property']}."
)
logger.warning(msg)
container = variable_data["object"].get_attr(variable_data["property"])
container._reference_container = reference_container
container._factor = linear_dependents["factors"][variable]
container._offset = linear_dependents["offsets"][variable]
# impose set values in the reference containers
for conn in self.conns["object"]:
for prop, container in conn.get_variables().items():
if conn.get_attr(prop).is_set:
conn.get_attr(prop).set_reference_val_SI(conn.get_attr(prop)._val_SI)
# collect all presolved equations
self._presolved_equations = [
indices
for dependents in self._variable_dependencies
for indices in dependents["equation_indices"].values()
]
def _prepare_variables(self):
num_vars = 0
for conn in self.conns["object"]:
for prop, container in conn.get_variables().items():
# flag potential variables
container._potential_var = not container.is_set
container.sm_col = num_vars
num_vars += 1
self._variable_lookup[container.sm_col] = {
"object": conn, "property": prop
}
if conn not in self._object_to_variable_lookup:
self._object_to_variable_lookup[conn] = {}
self._object_to_variable_lookup[conn].update(
{prop: container.sm_col}
)
if hasattr(conn, "fluid"):
# fluid is handled separately
container = conn.fluid
container.sm_col = num_vars
num_vars += 1
self._variable_lookup[container.sm_col] = {
"object": conn, "property": "fluid"
}
if conn not in self._object_to_variable_lookup:
self._object_to_variable_lookup[conn] = {}
self._object_to_variable_lookup[conn].update(
{"fluid": container.sm_col}
)
for comp in self.comps["object"]:
for prop, container in comp.get_variables().items():
container.sm_col = num_vars
num_vars += 1
self._variable_lookup[container.sm_col] = {
"object": comp, "property": prop
}
if comp not in self._object_to_variable_lookup:
self._object_to_variable_lookup[comp] = {}
self._object_to_variable_lookup[comp].update(
{prop: container.sm_col}
)
return num_vars
def _reassign_ude_objects(self):
for ude in self.user_defined_eq.values():
ude.conns = [self.get_conn(c.label) for c in ude.conns]
ude.comps = [self.get_comp(c.label) for c in ude.comps]
def _preprocess_network_parts(self, parts, eq_counter):
for obj in parts:
obj._preprocess(eq_counter)
self._structure_matrix.update(obj._structure_matrix)
self._rhs.update(obj._rhs)
eq_map = {
eq_num: (obj.label, eq_name)
for eq_num, eq_name in obj._equation_set_lookup.items()
}
self._equation_set_lookup.update(eq_map)
eq_counter += obj.num_eq
return eq_counter
def _find_linear_dependent_variables(self, sparse_matrix, rhs):
if len(sparse_matrix) == 0:
return []
adjacency_list, eq_idx, edges_with_factors, rhs_offsets = (
self._build_graph(sparse_matrix, rhs)
)
# Detect cycles (to check for circular dependencies)
cycle = self._find_cycles_in_graph(
{k: [x[0] for x in v] for k, v in adjacency_list.items()}
)
if cycle is not None:
self._raise_error_if_cycle(cycle, edges_with_factors, eq_idx)
# Find connected components and compute factors/offsets
visited = set()
variables_factors_offsets = []
def dfs_component(node, current_factor, current_offset):
"""DFS to calculate factors and offsets relative to the reference
variable."""
stack = [(node, current_factor, current_offset)]
factors = {node: current_factor}
offsets = {node: current_offset}
equation_indices = {}
while stack:
curr_node, curr_factor, curr_offset = stack.pop()
visited.add(curr_node)
for neighbor, edge_factor in adjacency_list.get(curr_node, []):
if neighbor not in factors: # Process unvisited neighbor
# Calculate edge offset
idx_f = neighbor, curr_node
idx_b = curr_node, neighbor
edge_offset = (
rhs_offsets.get(idx_b, 0.0)
or -rhs_offsets.get(idx_f, 0.0)
)
# Determine which equation to use
if idx_f in rhs_offsets:
equation_indices[idx_f] = eq_idx[idx_f]
else:
equation_indices[idx_b] = eq_idx[idx_b]
# Compute new factor and offset
new_factor = curr_factor * edge_factor
new_offset = curr_offset * edge_factor + edge_offset
# Store and continue traversal
factors[neighbor] = new_factor
offsets[neighbor] = new_offset
stack.append((neighbor, new_factor, new_offset))
return factors, offsets, equation_indices
# Process each connected component
for node in adjacency_list:
if node not in visited:
reference = node
factors, offsets, equation_indices = dfs_component(
reference, 1.0, 0.0
)
variables_factors_offsets.append({
'variables': list(factors.keys()),
'reference': reference,
'factors': factors,
'offsets': offsets,
'equation_indices': equation_indices
})
return variables_factors_offsets
def _build_graph(self, sparse_matrix, rhs):
edges_with_factors = []
rhs_offsets = {}
eq_idx = {}
# The equation indices keep track of which equations to eliminate
# Extract edges and offsets from rows with two non-zero entries
rows = {k[0] for k in sparse_matrix}
# sorting needs to be applied to always have same orientation on edges
# otherwise duplicate edges are not found if one is just in reverse
rows_with_cols = {
row: sorted([k[1] for k in sparse_matrix if k[0] == row])
for row in rows
}
for row, cols in rows_with_cols.items():
if len(cols) == 2:
non_zero_values = (
sparse_matrix[(row, cols[0])], sparse_matrix[(row, cols[1])]
)
col1, col2 = cols
val1, val2 = non_zero_values
factor = -val1 / val2
offset = rhs[row] / val2
edges_with_factors.append((col1, col2, factor))
rhs_offsets[(col1, col2)] = offset
if (col1, col2) in eq_idx:
variables = self._get_variables_before_presolve_by_number([col1, col2])
equations = self._get_equation_sets_by_eq_set_number(
[eq_idx[(col1, col2)], row]
)
var_str = ", ".join(f"{lbl} ({prop})" for lbl, prop in variables)
eq_str = ", ".join(f"{lbl}.{eq}" for lbl, eq in equations)
msg = (
"Two variables are directly linked by two equations. "
"This overdetermines the problem.\n"
f" Variables: {var_str}\n"
f" Equations: {eq_str}"
)
raise hlp.TESPyNetworkError(msg)
eq_idx[(col1, col2)] = row
# Build adjacency list for the graph
adjacency_list = {}
for col1, col2, factor in edges_with_factors:
if col1 not in adjacency_list:
adjacency_list[col1] = []
if col2 not in adjacency_list:
adjacency_list[col2] = []
# Add edge with factor and reverse edge with reciprocal value
adjacency_list[col1].append((col2, factor))
adjacency_list[col2].append((col1, 1 / factor))
return adjacency_list, eq_idx, edges_with_factors, rhs_offsets
def _find_cycles_in_graph(self, graph):
visited = set()
parent = {}
def dfs(node, prev):
visited.add(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
parent[neighbor] = node
result = dfs(neighbor, node)
if result:
return result
elif neighbor != prev:
# Cycle found, reconstruct it
cycle = [neighbor, node]
while cycle[-1] != neighbor:
cycle.append(parent[cycle[-1]])
cycle.reverse()
return cycle
return None
for node in graph:
if node not in visited:
parent[node] = None
cycle = dfs(node, None)
if cycle:
return set(cycle)
return None
def _raise_error_if_cycle(self, cycle, edges_with_factors, eq_idx):
edge_list = [
e[:2] for e in edges_with_factors
if e[0] in cycle or e[1] in cycle
]
cycling_eqs = [v for k, v in eq_idx.items() if k in edge_list]
variable_names = self._get_variables_before_presolve_by_number(cycle)
equations = self._get_equation_sets_by_eq_set_number(cycling_eqs)
var_str = ", ".join(f"{lbl} ({prop})" for lbl, prop in variable_names)
eq_str = ", ".join(f"{lbl}.{eq}" for lbl, eq in equations)
msg = (
"A circular dependency has been detected. This overdetermines the problem.\n"
f" Variables: {var_str}\n"
f" Equations: {eq_str}"
)
raise hlp.TESPyNetworkError(msg)
def _create_fluid_wrapper_branches(self):
self.fluid_wrapper_branches = {}
mask = self.comps["object"].apply(
lambda x:
isinstance(x, Source)
or isinstance(x, CycleCloser)
or isinstance(x, WaterElectrolyzer)
or isinstance(x, FuelCell)
)
start_components = self.comps["object"].loc[mask]
for start in start_components:
self.fluid_wrapper_branches.update(start.start_fluid_wrapper_branch())
merged = self.fluid_wrapper_branches.copy()
for branch_name, branch_data in self.fluid_wrapper_branches.items():
if branch_name not in merged:
continue
for ob_name, ob_data in self.fluid_wrapper_branches.items():
if ob_name != branch_name:
common_connections = list(
set(branch_data["connections"])
& set(ob_data["connections"])
)
if len(common_connections) > 0 and ob_name in merged:
merged[branch_name]["connections"] = list(
set(branch_data["connections"] + ob_data["connections"])
)
merged[branch_name]["components"] = list(
set(branch_data["components"] + ob_data["components"])
)
del merged[ob_name]
break
self.fluid_wrapper_branches = merged
def _presolve(self):
# handle the fluid vector variables
self._presolve_fluid_vectors()
# set up the actual list of equations for connections, components,
for c in self.conns['object']:
self._presolved_equations += c._presolve()
self._presolve_linear_dependents()
# iteratively check presolvable fluid properties
# and distribute presolved variables to all linear dependents
# until the number of variables does not change anymore
number_variables = sum([
variable.is_var
for conn in self.conns['object']
for variable in conn.get_variables().values()
])
while True:
for c in self.conns['object']:
self._presolved_equations += c._presolve()
self._presolve_linear_dependents()
reduced_variables = [
variable.is_var
for conn in self.conns['object']
for variable in conn.get_variables().values()
]
reduced_variables = sum(reduced_variables)
if reduced_variables == number_variables:
break
number_variables = reduced_variables
def _prepare_for_solver(self):
for variable in self._variable_dependencies:
reference = self._variable_lookup[variable["reference"]]
represents = variable["variables"]
if reference["property"] != "fluid":
self._assign_variable_space(reference, represents)
eq_counter = 0
_eq_counter = self._prepare_network_parts(self.comps["object"], eq_counter)
self.num_comp_eq = _eq_counter - eq_counter
eq_counter = _eq_counter
_eq_counter = self._prepare_network_parts(self.conns["object"], eq_counter)
self.num_conn_eq = _eq_counter - eq_counter
eq_counter = _eq_counter
_eq_counter = self._prepare_network_parts(self.user_defined_eq.values(), eq_counter)
self.num_ude_eq = _eq_counter - eq_counter
eq_counter = _eq_counter
def _prepare_network_parts(self, parts, eq_counter):
for obj in parts:
eq_counter = obj._prepare_for_solver(self._presolved_equations, eq_counter)
eq_map = {
eq_num: (obj.label, eq_name)
for eq_num, eq_name in obj._equation_lookup.items()
}
self._equation_lookup.update(eq_map)
self._equation_obj_lookup.update(
{eq_num: obj for eq_num in obj._equation_lookup}
)
dependents_map = {
eq_num: [dependent.J_col for dependent in dependents]
for eq_num, dependents in obj._equation_scalar_dependents_lookup.items()
}
self._incidence_matrix.update(dependents_map)
dependents_map = {
eq_num: [
dependent.J_col[key]
for dependent, keys in dependents.items()
for key in keys
]
for eq_num, dependents in obj._equation_vector_dependents_lookup.items()
}
for eq_num, dependents in dependents_map.items():
if eq_num in self._incidence_matrix:
self._incidence_matrix[eq_num] += dependents
else:
self._incidence_matrix[eq_num] = dependents
return eq_counter
def _presolve_linear_dependents(self):
for linear_dependents in self._variable_dependencies:
reference = linear_dependents["reference"]
if self._variable_lookup[reference]["property"] == "fluid":
continue
all_containers = [
self._variable_lookup[var]["object"].get_attr(
self._variable_lookup[var]["property"]
) for var in linear_dependents["variables"]
]
number_specifications = sum(
[not c._potential_var for c in all_containers]
)
if number_specifications > 1:
variables_properties = [
f"{self._variable_lookup[var]['object'].label} "
f"({self._variable_lookup[var]['property']})"
for var in linear_dependents["variables"]
]
var_str = ", ".join(variables_properties)
msg = (
"You specified more than one variable within a set of "
"linearly dependent variables.\n"
f" Variables: {var_str}"
)
raise hlp.TESPyNetworkError(msg)
elif number_specifications == 1:
reference_data = self._variable_lookup[reference]
reference_container = reference_data["object"].get_attr(
reference_data["property"]
)._reference_container
reference_container.is_var = False
def _assign_variable_space(self, reference, represents):
container = reference["object"].get_attr(reference["property"])._reference_container
if container.is_var:
container.J_col = self.variable_counter
self.variables_dict[self.variable_counter] = {
"obj": container,
"variable": reference["property"],
"fluid": None,
"_represents": represents
}
self.variable_counter += 1
def _transform_user_input_to_SI(self):
"""Specification of SI values for user set values."""
# fluid property values
for c in self.conns['object']:
if not self.init_previous:
c.good_starting_values = False
for key in c.property_data:
if "fluid" in key:
continue
param = c.get_attr(key)
if param.is_set:
if "ref" in key:
unit = self.units.default[param.quantity]
param.ref.delta_SI = self.units.ureg.Quantity(
param.ref.delta,
unit
).to(SI_UNITS[param.quantity]).magnitude
else:
param.set_SI_from_val(self.units)
msg = (
"Updated fluid property SI values and fluid mass fraction for user "
"specified connection parameters."
)
logger.debug(msg)
for cp in self.comps["object"]:
for param, value in cp.parameters.items():
if isinstance(value, dc_prop) and value.is_set:
value.set_SI_from_val(self.units)
def _prepare_design(self):
r"""
Initialise a design calculation.
Offdesign parameters are unset, design parameters are set. If
:code:`local_offdesign` is :code:`True` for connections or components,
the design point information are read from the .csv-files in the
respective :code:`design_path`. In this case, the design values are
unset, the offdesign values set.
"""
# connections
self._conn_variables = []
for c in self.conns['object']:
# read design point information of connections with
# local_offdesign activated from their respective design path
if c.local_offdesign:
path = c.design_path
if path is None:
msg = (
"The parameter local_offdesign is True for the "
f"connection {c.label}, an individual design_path must "
"be specified in this case!"
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
# unset design parameters
for var in c.design:
c.get_attr(var).is_set = False
# set offdesign parameters
for var in c.offdesign:
c.get_attr(var).is_set = True
entries = self._load_network_state(path)[c.__class__.__name__]
# write data to connections
self._write_design_state_to_connection(c, entries)
else:
c._reset_design(self.redesign)
# unset all design values
series = pd.Series(dtype='float64')
for cp in self.comps['object']:
c = cp.__class__.__name__
# read design point information of components with
# local_offdesign activated from their respective design path
if cp.local_offdesign:
path = cp.design_path
if path is None:
msg = (
"The parameter local_offdesign is True for the "
f"component {cp.label}, an individual design_path must "
"be specified in this case!"
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
local_design = self._load_network_state(path)
data = local_design[c]
# resolve design label (may differ from cp.label)
label = self._find_isolated_comp_label(cp, data)
# write data
self._write_design_state_to_component(cp, data, label)
# store adjacent connection design values from the component's
# own design_path for use in offdesign equations
cp._local_connection_design_state = {}
for adj_conn in cp.all_connections:
conn_type = adj_conn.__class__.__name__
if conn_type in local_design:
conn_entries = local_design[conn_type]
matched_row = self._find_conn_in_isolated_design(
adj_conn, cp, label, conn_entries
)
if matched_row is not None:
cp._local_connection_design_state[adj_conn.label] = (
adj_conn._get_design_state_SI(matched_row, self.units)
)
else:
msg = (
"Could not retrieve connection design point "
"data in local_offdesign of component "
f"{cp.label} for the connections adjacent to "
"the component."
)
raise KeyError(msg)
# unset design parameters
for var in cp.design:
cp.get_attr(var).is_set = False
# set offdesign parameters
switched = False
msg = 'Set component attributes '
for var in cp.offdesign:
# set variables provided in .offdesign attribute
data = cp.get_attr(var)
data.is_set = True
# take nominal values from design point
if isinstance(data, dc_cp):
cp.get_attr(var).val = cp.get_attr(var).design
switched = True
msg += var + ", "
if switched:
msg = f"{msg[:-2]} to design value at component {cp.label}."
logger.debug(msg)
cp.new_design = True
else:
# switch connections to design mode
if self.redesign:
for var in cp.design:
cp.get_attr(var).is_set = True
for var in cp.offdesign:
cp.get_attr(var).is_set = False
cp._set_design_parameters(self.mode, series)
def _prepare_offdesign(self):
r"""
Switch components and connections from design to offdesign mode.
Note
----
**components**
All parameters stated in the component's attribute :code:`cp.design`
will be unset and all parameters stated in the component's attribute
:code:`cp.offdesign` will be set instead.
Additionally, all component parameters specified as variables are
unset and the values from design point are set.
**connections**
All parameters given in the connection's attribute :code:`c.design`
will be unset and all parameters stated in the connections's attribute
:code:`cp.offdesign` will be set instead. This does also affect
referenced values!
"""
self._conn_variables = []
for c in self.conns['object']:
if not c.local_design:
# switch connections to offdesign mode
for var in c.design:
param = c.get_attr(var)
param.is_set = False
if f"{var}_ref" in c.property_data:
c.get_attr(f"{var}_ref").is_set = False
for var in c.offdesign:
param = c.get_attr(var)
param.is_set = True
param.val_SI = param.design
param.set_val_from_SI(self.units)
c.new_design = False
msg = 'Switched connections from design to offdesign.'
logger.debug(msg)
for cp in self.comps['object']:
if not cp.local_design:
# unset variables provided in .design attribute
for var in cp.design:
cp.get_attr(var).is_set = False
switched = False
msg = 'Set component attributes '
for var in cp.offdesign:
# set variables provided in .offdesign attribute
data = cp.get_attr(var)
data.is_set = True
# take nominal values from design point
if isinstance(data, dc_cp):
data.val_SI = data.design
data.set_val_from_SI(self.units)
switched = True
msg += var + ', '
if switched:
msg = f"{msg[:-2]} to design value at component {cp.label}."
logger.debug(msg)
cp.new_design = False
msg = 'Switched components from design to offdesign.'
logger.debug(msg)
def _load_offdesign_state(self):
r"""
Read design point information from specified :code:`design_path`.
If a :code:`design_path` has been specified individually for components
or connections, the data will be read from the specified individual
path instead.
"""
# components with offdesign parameters
components_with_parameters = [
cp.label for cp in self.comps["object"] if len(cp.parameters) > 0
]
# fetch all components, reindex with label
df_comps = self.comps.loc[components_with_parameters].copy()
# iter through unique types of components (class names)
state = self._load_network_state(self.design_path)
# iter through all components of this type and set data
for _, row in df_comps.iterrows():
entries = state[row["comp_type"]]
comp = row["object"]
path = comp.design_path
# in offdesign mode any individually specified design_path is used
# to load this component's design reference, regardless of
# local_offdesign
if path is not None:
_individual_design = self._load_network_state(path)
data = _individual_design[row["comp_type"]]
label = self._find_isolated_comp_label(comp, data)
self._write_design_state_to_component(comp, data, label)
# write adjacent connections design state from individual
# design_path to the component
comp._local_connection_design_state = {}
for adj_conn in comp.all_connections:
conn_type = adj_conn.__class__.__name__
if conn_type in _individual_design:
conn_entries = _individual_design[conn_type]
matched_row = self._find_conn_in_isolated_design(
adj_conn, comp, label, conn_entries
)
if matched_row is not None:
comp._local_connection_design_state[adj_conn.label] = (
adj_conn._get_design_state_SI(matched_row, self.units)
)
else:
msg = (
"Could not retrieve connection design point "
f"data for component {comp.label}, connection "
f"{adj_conn.label}."
)
raise KeyError(msg)
else:
# write data to components
self._write_design_state_to_component(comp, entries, comp.label)
msg = 'Done reading design point information for components.'
logger.debug(msg)
# iter through connections
for c in self.conns['object']:
conn_type = c.__class__.__name__
entries = state[conn_type]
# read data of connections with individual design_path
path = c.design_path
if path is not None:
entries = self._load_network_state(path)[conn_type]
self._write_design_state_to_connection(c, entries)
msg = 'Done reading design point information for connections.'
logger.debug(msg)
def _find_isolated_comp_label(self, comp, comp_entries):
"""
Resolve which label in *comp_entries* corresponds to *comp* for
isolated design loading.
- Exact match -> return :code:`comp.label`
- Single-type fallback: label not found but exactly one entry ->
return that entry's label (the isolated design contains exactly one
component of that type, so it is unambiguous)
- Ambiguous (multiple entries, no exact match) -> raise error
"""
if comp.label in comp_entries:
return comp.label
elif len(comp_entries) == 1:
return next(iter(comp_entries))
return None
def _find_conn_in_isolated_design(self, adj_conn, comp, comp_label, conn_entries):
"""
Find the entry in *conn_entries* that corresponds to *adj_conn* when
loading an isolated design file.
Matching strategy (in order):
1. Direct label match (:code:`adj_conn.label` in :code:`conn_entries`).
2. Port-based topology match using the :code:`source` / :code:`target` /
:code:`source_id` / :code:`target_id` fields stored by
:py:meth:`tespy.connections.connection.Connection.collect_results`.
Parameters
----------
adj_conn : tespy.connections.connection.BaseConnection
BaseConnection type object
comp : tespy.components.component.Component
Component type object
comp_label : str
Label of the component to look for inside the connection entries.
conn_entries : dict
Mapping of connection labels to their data dicts.
Returns
-------
dict or None
Data dict for the matched connection, or None if not found.
"""
# --- direct label match ---
if adj_conn.label in conn_entries:
return conn_entries[adj_conn.label]
# --- port-based topology match ---
if comp_label is None or not conn_entries:
return None
any_row = next(iter(conn_entries.values()))
if 'source' not in any_row or 'target' not in any_row:
return None
if adj_conn in comp.all_inlets:
matches = [
row for row in conn_entries.values()
if row.get('target') == comp_label
and row.get('target_id') == adj_conn.target_id
]
else:
matches = [
row for row in conn_entries.values()
if row.get('source') == comp_label
and row.get('source_id') == adj_conn.source_id
]
if len(matches) == 1:
return matches[0]
return None
def _write_design_state_to_component(self, c, entries, label):
r"""
Write design point information to components.
Parameters
----------
c : tespy.components.component.Component
Write design point information to this component.
entries : dict
Mapping of component labels to their design point data dicts.
label : str
Label of the component inside the data. It can differ under the
condition of an individual design_path specified for that
component.
"""
if label not in entries:
# no matches in the connections of the network and the design files
msg = (
f"Could not find component '{label}' in design case file. "
"This is is critical only to components, which need to load "
"design values from this case."
)
logger.debug(msg)
return
# write component design data
c._set_design_parameters(self.mode, entries[label])
def _write_design_state_to_connection(self, c, entries):
r"""
Write design point information to connections.
Parameters
----------
c : tespy.connections.connection.Connection
Write design point information to this connection.
entries : dict
Mapping of connection labels to their design point data dicts.
"""
if c.label not in entries:
# no matches in the connections of the network and the design files
msg = (
f"Could not find connection '{c.label}' in design case. "
"Please make sure no connections have been modified or "
"components have been relabeled for your offdesign "
"calculation."
)
logger.exception(msg)
raise hlp.TESPyNetworkError(msg)
c._set_design_params(entries[c.label], self.units)
def _write_starting_values_to_connection(self, c, entries):
r"""
Write parameter information from init_path to connections.
Parameters
----------
c : tespy.connections.connection.Connection
Write init path information to this connection.
entries : dict
Mapping of connection labels to their state data dicts.
"""
if c.label not in entries:
# no matches in the connections of the network and the design files
msg = f"Could not find connection {c.label} in init path file."
logger.debug(msg)
return
c._set_starting_values(entries[c.label], self.units)
c.good_starting_values = True
def _set_starting_values(self):
"""
Initialise the fluid properties on every connection of the network.
- Set generic starting values for mass flow, enthalpy and pressure if
not user specified, read from :code:`init_path` or available from
previous calculation.
- For generic starting values precalculate enthalpy value at points of
given temperature, vapor mass fraction, temperature difference to
boiling point or fluid state.
"""
if self.init_path is not None:
state = self._load_network_state(self.init_path)
# improved starting values for referenced connections,
# specified vapour content values, temperature values as well as
# subccooling/overheating and state specification
for c in self.conns['object']:
if self.init_path is not None:
self._write_starting_values_to_connection(c, state[c.__class__.__name__])
c._guess_starting_values(self.units)
# here reference values can be updated, e.g. a reference temperature
# if the starting value of the reference connection is not yet updated
# then the calculation of the reference can cause issues, therefore:
# first update all of the starting values and only then to
# precalculation of reference values
for c in self.conns["object"]:
c._precalc_guess_values_for_references()
for cp in self.comps["object"]:
for key, variable in cp.get_variables().items():
# for components every variable should be an actual variable
# if variable.is_var:
if np.isnan(variable.val):
variable.val = (variable.min_val + variable.max_val) / 2
variable.set_SI_from_val(self.units)
variable.set_reference_val_SI(variable._val_SI)
msg = 'Generic fluid property specification complete.'
logger.debug(msg)
@staticmethod
def _load_network_state(json_path: str | bytes | bytearray | Path | dict):
r"""
Read network state from given file or in-memory dict.
Parameters
----------
json_path : str | bytes | bytearray | Path | dict
Path to a saved network state file, a JSON string, or a state
dict as returned by :meth:`Network.save` with no arguments.
"""
if isinstance(json_path, dict):
data = json_path
else:
data = None
if not isinstance(json_path, Path):
try:
data = json.loads(json_path)
except json.JSONDecodeError as e:
msg = (
"The provided json_path could not be decoded. If this is not "
"a valid json string, please provide a valid file path instead of "
"%s"
)
logger.debug(msg, str(json_path))
pass
if data is None:
with open(json_path, "r") as f:
data = json.load(f)
def _row(d):
return {col: np.nan if val is None else val for col, val in d.items()}
state = {}
if any(k in data["Connection"] for k in ("Connection", "PowerConnection", "HeatConnection")):
for key, value in data["Connection"].items():
state[key] = {str(k): _row(v) for k, v in value.items()}
# TODO: deprecate
# this is for compatibility of older savestates
else:
state["Connection"] = {str(k): _row(v) for k, v in data["Connection"].items()}
for key, value in data["Component"].items():
state[key] = {str(k): _row(v) for k, v in value.items()}
return state
[docs]
def get_linear_dependent_variables(self) -> list:
"""Get a list with sublists containing linear dependent variables
Returns
-------
list
List of lists of linear dependent variables
"""
variable_list = []
for dependents in self._variable_dependencies:
variables = [
self._variable_lookup[v] for v in dependents["variables"]
]
variable_list += [
[(v["object"].label, v["property"]) for v in variables]
]
return variable_list
def _get_equation_sets_by_eq_set_number(self, number_list) -> list:
return [self._equation_set_lookup[num] for num in number_list]
def _get_variables_before_presolve_by_number(self, number_list) -> list:
return [
(v["object"].label, v["property"])
for k, v in self._variable_lookup.items()
if k in number_list
]
[docs]
def get_presolved_equations(self) -> list:
"""Get the list of equations, that has been presolved with their
respective parent object
Returns
-------
list
list of presolved equations
"""
return [
v for k, v in self._equation_set_lookup.items()
if k in self._presolved_equations
]
[docs]
def print_presolved_equations(self):
"""Print a formatted table of presolved equations."""
rows = self.get_presolved_equations()
print(f"Presolved equations ({len(rows)} total):")
if rows:
print(tabulate(rows, headers=["Object", "Equation"], tablefmt="simple"))
[docs]
def get_variables_before_presolve(self) -> list:
"""Get the list of variables before presolving.
Returns
-------
list
list of original variables
"""
return [
(v["object"].label, v["property"])
for v in self._variable_lookup.values()
]
[docs]
def print_variables_before_presolve(self):
"""Print a formatted table of all variables before presolving."""
rows = self.get_variables_before_presolve()
print(f"Variables before presolving ({len(rows)} total):")
if rows:
print(tabulate(rows, headers=["Object", "Property"], tablefmt="simple"))
[docs]
def get_presolved_variables(self) -> list:
"""Get the list of presolved variables with their respective parent
object and property.
Returns
-------
list
list of presolved variables
"""
represented_variables = []
for v in self.variables_dict.values():
represented_variables += v["_represents"]
if len(self.variables_dict) == 0 and len(self._presolved_equations) == 0:
return []
return [
(v["object"].label, v["property"])
for key, v in self._variable_lookup.items()
if key not in represented_variables
]
[docs]
def print_presolved_variables(self):
"""Print a formatted table of presolved variables."""
rows = self.get_presolved_variables()
print(f"Presolved variables ({len(rows)} total):")
if rows:
print(tabulate(rows, headers=["Object", "Property"], tablefmt="simple"))
[docs]
def get_variables(self) -> dict:
"""Get all variables of the presolved problem with their respective
represented original variables.
Returns
-------
dict
variable number and property with the list of represented variables
"""
return {
(key, data["variable"]):
[
(
self._variable_lookup[v]["object"].label,
self._variable_lookup[v]["property"]
) for v in data["_represents"]
]
for key, data in self.variables_dict.items()
}
[docs]
def print_variables(self):
"""Print a formatted table of variables after presolving."""
variables = self.get_variables()
print(f"Variables after presolving ({len(variables)} total):")
rows = [
(
var_idx,
var_type,
", ".join(f"{lbl} ({prop})" for lbl, prop in represents),
)
for (var_idx, var_type), represents in variables.items()
]
if rows:
print(tabulate(rows, headers=["#", "Property", "Represents"], tablefmt="simple"))
def _get_variables_by_number(self, number_list) -> dict:
"""Get all variables of the presolved problem by variable numbers.
Returns
-------
dict
variable number and property with the list of represented variables
"""
return {
(key, data["variable"]):
[
(
self._variable_lookup[v]["object"].label,
self._variable_lookup[v]["property"]
) for v in data["_represents"]
]
for key, data in self.variables_dict.items()
if key in number_list
}
[docs]
def get_equations(self) -> dict:
"""Get the actual equations after presolving the problem
Returns
-------
dict
Lookup with equation number as index and tuple of label and
parameter defining the equation. In case one parameter defines
multiple equations, the same equation is repeated.
"""
return self._equation_lookup
def _format_var_label(self, v_idx):
v_data = self.variables_dict[v_idx]
v_type = v_data["variable"]
if v_type == "fluid" and v_data["fluid"] is not None:
return f"{v_data['fluid']}{v_idx}"
return f"{v_type}{v_idx}"
@staticmethod
def _format_eq_name(eq_name):
if isinstance(eq_name, tuple):
name, sub_idx = eq_name
return f"{name}{{{sub_idx}}}" if sub_idx > 0 else name
return eq_name
[docs]
def print_equations(self):
"""Print a formatted table of equations after presolving."""
equations = self.get_equations()
print(f"Equations after presolving ({len(equations)} total):")
rows = [
(eq_num, label, self._format_eq_name(eq_name))
for eq_num, (label, eq_name) in sorted(equations.items())
]
if rows:
print(tabulate(rows, headers=["Eq#", "Object", "Equation"], tablefmt="simple"))
[docs]
def get_equations_with_dependents(self) -> dict:
"""Get the equations together with the variables they depend on.
Returns
-------
dict
Lookup with equation (component, (parameter_label, number)) and
the variables it depends on as a list
(variable number, variable type)
"""
dependencies = {}
for eq_idx, dependents in self._incidence_matrix.items():
dependencies.update({
self._equation_lookup[eq_idx]:
list(self._get_variables_by_number(dependents).keys())
})
return dependencies
[docs]
def print_equations_with_dependents(self):
"""Print a formatted table of equations and the variables they depend on."""
print(f"Equations with dependent variables ({len(self._incidence_matrix)} total):")
rows = []
for eq_idx, dependents in sorted(self._incidence_matrix.items()):
label, eq_name = self._equation_lookup[eq_idx]
dep_str = ", ".join(
self._format_var_label(v_idx)
for v_idx, _ in self._get_variables_by_number(dependents).keys()
)
rows.append((eq_idx, label, self._format_eq_name(eq_name), dep_str))
if rows:
print(tabulate(
rows,
headers=["Eq#", "Object", "Equation", "Dependent variables"],
tablefmt="simple",
))
[docs]
def print_incidence_matrix(self):
"""Print the incidence matrix with equation rows and variable columns."""
eq_indices = sorted(self._incidence_matrix.keys())
all_var_indices = sorted({
v_idx
for deps in self._incidence_matrix.values()
for v_idx in deps
})
col_labels = [self._format_var_label(v_idx) for v_idx in all_var_indices]
rows = []
for eq_idx in eq_indices:
label, eq_name = self._equation_lookup[eq_idx]
row_label = f"{label}.{self._format_eq_name(eq_name)}"
dep_set = set(self._incidence_matrix[eq_idx])
rows.append(
[row_label] + ["x" if v in dep_set else "-" for v in all_var_indices]
)
print("Incidence matrix:")
print(tabulate(rows, headers=[""] + col_labels, tablefmt="simple"))
[docs]
def print_residuals(self):
"""Print a formatted table of equation residuals, sorted by magnitude."""
if not hasattr(self, "residual"):
print("Residuals are not available before the first solve call.")
return
rows = []
for eq_idx in self.get_sorted_residual_index():
label, eq_name = self._equation_lookup[eq_idx]
rows.append((eq_idx, label, self._format_eq_name(eq_name), self.residual[eq_idx]))
print(f"Residuals per equation ({len(rows)} total, sorted by magnitude):")
if rows:
print(tabulate(
rows,
headers=["Eq#", "Object", "Equation", "Residual"],
tablefmt="simple",
floatfmt=".3e",
))
def _get_equations_by_number(self, number_list) -> dict:
"""Get the actual equations after presolving the problem by equation
number
Returns
-------
dict
Lookup with equation number as index and tuple of label and
parameter defining the equation. In case one parameter defines
multiple equations, the same equation is repeated.
"""
return {
k: v for k, v in self._equation_lookup.items() if k in number_list
}
[docs]
def get_linear_dependents_by_object(self, obj, prop) -> list:
"""Get the list of linear dependent variables for a specified variable
Parameters
----------
obj : object
Parent object holding a variable
prop : str
Name of the variable (e.g. 'm' or 'h')
Returns
-------
list
list of linear dependent variables
Raises
------
KeyError
In case the object does not have any variables
KeyError
In case the specified property is not a variable
"""
if obj not in self._object_to_variable_lookup:
msg = f"The object {obj.label} does not have any variables."
raise KeyError(msg)
if prop not in self._object_to_variable_lookup[obj]:
msg = f"The object {obj.label} does not have a variable {prop}."
raise KeyError(msg)
variable_idx = self._object_to_variable_lookup[obj][prop]
return self._get_linear_dependents_by_variable_index(variable_idx)
def _get_linear_dependents_by_variable_index(self, idx) -> list:
"""Get the list of linear dependent variables for a specified variable
Parameters
----------
idx : object
Index of the variable
Returns
-------
list
list of linear dependent variables
"""
for dependents in self._variable_dependencies:
if idx in dependents["variables"]:
break
variables = [self._variable_lookup[v] for v in dependents["variables"]]
variable_list = [(v["object"].label, v["property"]) for v in variables]
return variable_list
[docs]
def get_sorted_residual_index(self) -> list[int]:
"""Get the sorted array of residual indices.
Returns
-------
list[int]
List of variable numbers, the index values.
"""
# vars: dict[tuple[int, str], dict] = self.get_variables()
sidx: list[int] = list(np.argsort(np.abs(self.residual))[::-1])
# sres = np.array([self.residual[i] for i in sidx])
# chis = self.residual_history.shape[1]
# for i in range(2, n):
# sres = np.vstack((sres, [self.residual_history[i-2][j] for j in sidx]))
# sres = np.vstack((sres, self.residual_history[-n+1:, :][:, sidx].T))
return sidx
[docs]
def solve(self, mode, init_path=None, design_path=None,
max_iter=50, min_iter=4, init_only=False, init_previous=True,
use_cuda=False, print_results=True, robust_relax=False, skip_postprocess=False):
r"""
Solve the network.
- Check network consistency.
- Initialise calculation and preprocessing.
- Perform actual calculation.
- Postprocessing.
It is possible to check programmatically, if a network was solved
successfully with the `.converged` attribute.
Parameters
----------
mode : str
Choose from 'design' and 'offdesign'.
init_path : str | Path | dict
Path to a previously saved network state (e.g.
:code:`nw.save('myplant/test.json')`), or the dict returned by
:code:`nw.save(as_dict=True)`.
design_path : str | Path | dict
Path to the saved design-case state (e.g.
:code:`nw.save('myplant/test.json')`), or the dict returned by
:code:`nw.save(as_dict=True)`.
max_iter : int
Maximum number of iterations before calculation stops, default: 50.
min_iter : int
Minimum number of iterations before calculation stops, default: 4.
init_only : boolean
Perform initialisation only, default: :code:`False`.
init_previous : boolean
Initialise the calculation with values from the previous
calculation, default: :code:`True`.
use_cuda : boolean
Use cuda instead of numpy for matrix inversion, default:
:code:`False`.
Note
----
For more information on the solution process have a look at the online
documentation at tespy.readthedocs.io in the section "TESPy modules".
"""
## to own function
self.status = 99
self.new_design = False
if self.design_path == design_path and design_path is not None:
for c in self.conns['object']:
if c.new_design:
self.new_design = True
break
if not self.new_design:
for cp in self.comps['object']:
if cp.new_design:
self.new_design = True
break
else:
self.new_design = True
self.init_path = init_path
self.design_path = design_path
self.max_iter = max_iter
self.min_iter = min_iter
self.init_previous = init_previous
self.iter = 0
self.use_cuda = use_cuda
self.robust_relax = robust_relax
self.skip_postprocess = skip_postprocess
if self.skip_postprocess:
msg = (
"Postprocessing will be skipped, violations of "
"physical/operational are not reported or logged!"
)
logger.debug(msg)
if self.use_cuda and cu is None:
msg = (
'Specifying use_cuda=True requires cupy to be installed on '
'your machine. Numpy will be used instead.'
)
logger.warning(msg)
self.use_cuda = False
if mode not in ['offdesign', 'design']:
msg = 'Mode must be "design" or "offdesign".'
logger.error(msg)
raise ValueError(msg)
else:
self.mode = mode
if not self.checked:
self.check_topology()
msg = (
"Network information:\n"
f" - Number of components: {len(self.comps)}\n"
f" - Number of connections: {len(self.conns)}\n"
)
logger.debug(msg)
self._prepare_problem()
if init_only:
return
msg = 'Starting solver.'
logger.info(msg)
self._check_determination()
n = self.variable_counter
self._incidence_matrix_dense = np.zeros((n, n))
for row, cols in self._incidence_matrix.items():
self._incidence_matrix_dense[row, cols] = 1
try:
self._solve_loop(print_results=print_results)
except ValueError as e:
self.status = 99
msg = f"Simulation crashed due to an unexpected error:\n{e}"
logger.exception(msg)
self._unload_variables()
return
self._unload_variables()
if self.status == 3:
logger.error(self.singularity_msg)
return
elif self.status == 2:
msg = (
"The solver does not seem to make any progress, aborting "
"calculation. Residual value is "
"{:.2e}".format(norm(self.residual)) +
"\nPossible reasons include:\n"
" - fluid properties moving outside the valid range of the "
"property database (consider adjusting p_range or h_range),\n"
" - an impossible constraint that can never be satisfied \n"
" - bad starting values causing the Newton solver to diverge.\n"
"Use nw.print_residuals() to identify which equations have "
"the largest residuals."
)
logger.warning(msg)
return
self._postprocess()
msg = 'Calculation complete.'
logger.info(msg)
return
def _solve_loop(self, print_results=True):
r"""Loop of the newton algorithm."""
# parameter definitions
self.residual_history = np.array([])
self.residual = np.zeros([self.variable_counter])
self.increment = np.ones([self.variable_counter])
self.jacobian = np.zeros((self.variable_counter, self.variable_counter))
self.start_time = time()
if self.iterinfo:
self._print_iterinfo_head(print_results)
for self.iter in range(self.max_iter):
self.increment_filter = np.absolute(self.increment) < ERR ** 2
self._solve_iteration()
self.residual_history = np.append(
self.residual_history, norm(self.residual)
)
if self.iterinfo:
self._print_iterinfo_body(print_results)
if self.lin_dep:
self.status = 3
break
elif self.iter > 40:
if (
all(
self.residual_history[(self.iter - 3):] >= self.residual_history[-3] * 0.95
) and self.residual_history[-1] >= self.residual_history[-2] * 0.95
):
self.status = 2
break
if (
self.iter >= self.min_iter - 1
and (self.residual_history[-2:] < ERR ** 0.5).all()
# the increment should also be small
and (abs(self.increment) < ERR ** 0.5).all()
):
self.status = 0
break
self.end_time = time()
if self.iterinfo:
self._print_iterinfo_tail(print_results)
if self.iter == self.max_iter - 1:
msg = (
f"Reached maximum iteration count ({self.max_iter}), "
"calculation stopped. Residual value is "
"{:.2e}. ".format(norm(self.residual)) +
"\nPossible reasons include:\n"
" - fluid properties moving outside the valid range of the "
"property database (consider adjusting p_range or h_range),\n"
" - an impossible constraint that can never be satisfied \n"
" - bad starting values causing the Newton solver to diverge.\n"
"Use nw.print_residuals() to identify which equations have "
"the largest residuals."
)
logger.warning(msg)
self.status = 2
def _check_determination(self):
r"""Check, if the number of supplied parameters is sufficient."""
msg = f'Number of connection equations: {self.num_conn_eq}.'
logger.debug(msg)
msg = f'Number of component equations: {self.num_comp_eq}.'
logger.debug(msg)
msg = f'Number of user defined equations: {self.num_ude_eq}.'
logger.debug(msg)
msg = f'Total number of variables: {self.variable_counter}.'
logger.debug(msg)
_hint = (
"\nUse nw.print_variables() and nw.print_equations() to inspect "
"which variables and equations are present, "
"nw.print_equations_with_dependents() to see which variables each "
"equation depends on, or nw.print_incidence_matrix() for a compact "
"overview."
)
n = self.num_comp_eq + self.num_conn_eq + self.num_ude_eq
if n > self.variable_counter:
msg = (
f"You have provided too many parameters: {self.variable_counter} "
f"required, {n} supplied. Aborting calculation!{_hint}"
)
logger.error(msg)
self.status = 12
raise hlp.TESPyNetworkError(msg)
elif n < self.variable_counter:
msg = (
f"You have not provided enough parameters: {self.variable_counter} "
f"required, {n} supplied. Aborting calculation!{_hint}"
)
logger.error(msg)
self.status = 11
raise hlp.TESPyNetworkError(msg)
def _print_iterinfo_head(self, print_results=True):
"""Print head of convergence progress."""
# Start with defining the format here
self.iterinfo_fmt = ' {iter:5s} | {residual:10s} | {progress:10s} '
self.iterinfo_fmt += '| {massflow:10s} | {pressure:10s} | {enthalpy:10s} '
self.iterinfo_fmt += '| {fluid:10s} | {component:10s} '
# Use the format to create the first logging entry
msg = self.iterinfo_fmt.format(
iter='iter',
residual='residual',
progress='progress',
massflow='massflow',
pressure='pressure',
enthalpy='enthalpy',
fluid='fluid',
component='component'
)
logger.progress(0, msg)
msg2 = '-' * 7 + '+------------' * 7
logger.progress(0, msg2)
if print_results:
print('\n' + msg + '\n' + msg2)
return
def _print_iterinfo_body(self, print_results=True):
"""Print convergence progress."""
m = [k for k, v in self.variables_dict.items() if v["variable"] == "m"]
p = [k for k, v in self.variables_dict.items() if v["variable"] == "p"]
h = [k for k, v in self.variables_dict.items() if v["variable"] == "h"]
fl = [k for k, v in self.variables_dict.items() if v["variable"] == "fluid"]
e = [k for k, v in self.variables_dict.items() if v["variable"] == "E"]
cp = [k for k in self.variables_dict if k not in m + p + h + fl]
iter_str = str(self.iter + 1)
residual_norm = norm(self.residual)
residual = 'NaN'
progress = 'NaN'
massflow = 'NaN'
pressure = 'NaN'
enthalpy = 'NaN'
fluid = 'NaN'
energy = 'NaN'
component = 'NaN'
progress_val = -1
if not np.isnan(residual_norm):
residual = '{:.2e}'.format(residual_norm)
if not self.lin_dep:
massflow = '{:.2e}'.format(norm(self.increment[m]))
pressure = '{:.2e}'.format(norm(self.increment[p]))
enthalpy = '{:.2e}'.format(norm(self.increment[h]))
fluid = '{:.2e}'.format(norm(self.increment[fl]))
energy = '{:.2e}'.format(norm(self.increment[e]))
component = '{:.2e}'.format(norm(self.increment[cp]))
# This should not be hardcoded here.
if residual_norm > np.finfo(float).eps * 100:
progress_min = math.log(ERR)
progress_max = math.log(ERR ** 0.5) * -1
progress_val = math.log(max(residual_norm, ERR)) * -1
# Scale to 0-1
progres_scaled = (
(progress_val - progress_min)
/ (progress_max - progress_min)
)
progress_val = max(0, min(1, progres_scaled))
# Scale to 100%
progress_val = int(progress_val * 100)
else:
progress_val = 100
progress = '{:d} %'.format(progress_val)
msg = self.iterinfo_fmt.format(
iter=iter_str,
residual=residual,
progress=progress,
massflow=massflow,
pressure=pressure,
enthalpy=enthalpy,
fluid=fluid,
energy=energy,
component=component
)
logger.progress(progress_val, msg)
if print_results:
print(msg)
return
def _print_iterinfo_tail(self, print_results=True):
"""Print tail of convergence progress."""
num_iter = self.iter + 1
clc_time = self.end_time - self.start_time
num_ips = num_iter / clc_time if clc_time > 1e-10 else np.inf
msg = '-' * 7 + '+------------' * 7
logger.progress(100, msg)
msg = (
"Total iterations: {0:d}, Calculation time: {1:.2f} s, "
"Iterations per second: {2:.2f}"
).format(num_iter, clc_time, num_ips)
logger.debug(msg)
if print_results:
print(msg)
return
def _search_reducing_step(self, row, col):
"""Find the increment for variable col that reduces equation row's
residual.
Searches both +/- directions with geometrically growing step sizes
(x2 per iteration, up to 20 iterations each). Works for both scalar
variables (m, h, p, E) and vector variables (fluid mass fractions).
Prefers the side that produces a sign change in the residual, which
guarantees a root in the bracket [x0, x0±d] by the IVT, and refines
its location with Brent's method. If both sides bracket a root, the
tighter one (smaller |r| at the probe point) is used. Falls back to a
secant step if brentq raises, and to the lower-magnitude heuristic
when neither side yields a sign change.
Returns the step to add to the variable, or None if neither direction
improves the residual.
"""
obj = self._equation_obj_lookup.get(row)
if obj is None:
return None
_, (param_name, sub_idx) = self._equation_lookup[row]
if param_name not in obj.equations:
return None
data = obj.equations[param_name]
var_data = self.variables_dict[col]
container = var_data["obj"]
if var_data["variable"] == "fluid":
fluid_key = var_data["fluid"]
x0 = container.val[fluid_key]
# Maintain sum=1 by adjusting the largest other variable fluid by
# the same delta in the opposite direction.
other_var_fluids = [f for f in container.is_var if f != fluid_key]
if other_var_fluids:
companion = max(other_var_fluids, key=lambda f: container.val.get(f, 0))
companion_x0 = container.val[companion]
else:
companion = None
companion_x0 = None
def set_x(v):
container.val[fluid_key] = v
if companion is not None:
container.val[companion] = companion_x0 - (v - x0)
else:
x0 = container._val_SI
def set_x(v):
container._val_SI = v
r0 = self.residual[row]
abs_r0 = abs(r0)
def eval_r(x):
set_x(x)
try:
result = data.func(**data.func_params)
except Exception:
return None
finally:
set_x(x0)
if hasattr(result, '__iter__'):
result = list(result)
return result[sub_idx] if sub_idx < len(result) else result[0]
return result
# Guard against x0 == 0 producing a zero initial step
d = max(abs(x0) * 0.1, 1e-3)
found_plus = None
found_minus = None
for _ in range(20):
if found_plus is None:
r = eval_r(x0 + d)
if r is not None and r != r0:
found_plus = (d, r)
if found_minus is None:
r = eval_r(x0 - d)
if r is not None and r != r0:
found_minus = (d, r)
if found_plus is not None and found_minus is not None:
break
d *= 2
plus_sign_change = found_plus is not None and r0 * found_plus[1] < 0
minus_sign_change = found_minus is not None and r0 * found_minus[1] < 0
if plus_sign_change or minus_sign_change:
# Both sides bracket a root: prefer the tighter probe (smaller |r|)
if plus_sign_change and minus_sign_change:
plus_d, plus_r = found_plus
minus_d, minus_r = found_minus
if abs(plus_r) <= abs(minus_r):
sign, step_d, r_val = +1, plus_d, plus_r
else:
sign, step_d, r_val = -1, minus_d, minus_r
elif plus_sign_change:
sign, step_d, r_val = +1, found_plus[0], found_plus[1]
else:
sign, step_d, r_val = -1, found_minus[0], found_minus[1]
a = x0
b = x0 + sign * step_d
try:
tol = max(abs(x0) * 1e-6, 1e-10)
x_root = brentq(
eval_r, min(a, b), max(a, b), xtol=tol, maxiter=5
)
return x_root - x0
except Exception:
pass
# Secant fallback: linear interpolation between x0 and the probe
return sign * step_d * (-r0) / (r_val - r0)
# No sign change found - fall back to lower-magnitude direction
if found_plus is None and found_minus is None:
return None
if found_plus is None:
step_d, r_val = found_minus
return -step_d if abs(r_val) < abs_r0 else None
if found_minus is None:
step_d, r_val = found_plus
return +step_d if abs(r_val) < abs_r0 else None
plus_d, plus_r = found_plus
minus_d, minus_r = found_minus
if abs(plus_r) <= abs(minus_r):
return +plus_d if abs(plus_r) < abs_r0 else None
else:
return -minus_d if abs(minus_r) < abs_r0 else None
def _fill_jacobian_surrogates(self):
"""Restore invertibility for all-zero rows and find better steps.
For each row that is entirely zero but expected to have non-zero
entries (per the incidence matrix), inserts 1 in the expected positions
so the Jacobian can be inverted for all other variables.
Subsequently searches value of associated variable(s) to find the
increment for the affected variable(s) that reduces that equation's
residual.
Returns a dict {col: step} of increment overrides to apply after the
inversion.
"""
overrides = {}
for row in self._check_all_zero_rows(self.jacobian):
for col in self._incidence_matrix.get(row, []):
if self.jacobian[row, col] == 0.0:
self.jacobian[row, col] = 1.0
step = self._search_reducing_step(row, col)
if step is not None:
overrides[col] = step
return overrides
def _invert_jacobian(self):
"""Compute Newton step, storing result in self.increment. Sets self.lin_dep."""
self.lin_dep = False
self.increment = self.residual * 0
if len(self.variables_dict) == 0:
return
overrides = self._fill_jacobian_surrogates()
try:
if self.use_cuda:
self.increment = cu.asnumpy(cu.dot(
cu.linalg.inv(cu.asarray(self.jacobian)),
-cu.asarray(self.residual)
))
else:
row_scales = np.abs(self.jacobian).max(axis=1)
row_scales[row_scales == 0] = 1.0
J_eq = self.jacobian / row_scales[:, None]
col_scales = np.maximum(np.abs(J_eq).max(axis=0), 1e-10)
J_sc = J_eq / col_scales[None, :]
r_eq = -self.residual / row_scales
self.increment = np.linalg.solve(J_sc, r_eq) / col_scales
except np.linalg.LinAlgError:
self.lin_dep = True
return
for col, step in overrides.items():
self.increment[col] = step
def _diagnose_singularity(self):
"""Build singularity_msg after a failed matrix solve."""
if self.iter == 0 and np.linalg.matrix_rank(self._incidence_matrix_dense) < self._incidence_matrix_dense.shape[0]:
self.singularity_msg = (
"Detected singularity in Jacobian matrix. This singularity "
"is most likely caused by the parametrization of your "
"problem and NOT a numerical issue. Double check your "
"setup.\n"
)
self._find_linear_dependencies(self.jacobian)
return
expected_entries = self._incidence_matrix_dense.astype(bool)
actual_entries = self.jacobian.astype(bool)
rows, cols = np.where(expected_entries != actual_entries)
missing_entries = []
for row, col in zip(rows, cols):
lbl, eq_name = self._equation_lookup[row]
eq_str = f"{lbl}.{self._format_eq_name(eq_name)}"
var_str = self._format_var_label(col)
missing_entries += [f"{eq_str}: {var_str}"]
entries_str = ", ".join(missing_entries)
self.singularity_msg = (
"Found singularity in Jacobian matrix, calculation aborted! "
"The setup of your problem seems to be solvable. It failed "
"due to partial derivatives in the Jacobian being zero where "
"a non-zero was expected, or vice versa. This usually lies in "
"starting value selection or bad convergence.\n"
" The following equation/variable pairs may have an "
f"unexpected zero/non-zero partial derivative: {entries_str}\n"
)
self._find_linear_dependencies(self.jacobian)
def _find_linear_dependencies(self, matrix):
all_zero_cols = self._check_all_zero_columns(matrix)
all_zero_rows = self._check_all_zero_rows(matrix)
if len(all_zero_cols) + len(all_zero_rows) == 0:
eq_indices = self._cauchy_schwarz_inequality(matrix)
eq_str = ", ".join(
f"{lbl}.{self._format_eq_name(eq_name)}"
for lbl, eq_name in self._get_equations_by_number(eq_indices).values()
)
self.singularity_msg += (
"The following equations form a linear dependency:\n"
f" {eq_str}\n"
)
else:
if len(all_zero_cols) > 0:
var_str = ", ".join(self._format_var_label(i) for i in all_zero_cols)
self.singularity_msg += (
"The following variables are not associated with any equation:\n"
f" {var_str}\n"
)
if len(all_zero_rows) > 0:
eq_str = ", ".join(
f"{lbl}.{self._format_eq_name(eq_name)}"
for lbl, eq_name in self._get_equations_by_number(all_zero_rows).values()
)
self.singularity_msg += (
"The following equations do not depend on any variable:\n"
f" {eq_str}\n"
)
def _check_all_zero_columns(self, matrix):
return np.where((matrix == 0).all(axis=0))[0]
def _check_all_zero_rows(self, matrix):
return np.where((matrix == 0).all(axis=1))[0]
def _cauchy_schwarz_inequality(self, matrix):
n = matrix.shape[0]
dependent_equations = []
for i in range(n):
for j in range(n):
if i != j:
inner_product = np.inner(
matrix[i,:],
matrix[j,:]
)
norm_i = np.linalg.norm(matrix[i,:])
norm_j = np.linalg.norm(matrix[j,:])
if np.abs(inner_product - norm_j * norm_i) < 1e-5:
dependent_equations += [i]
return list(set(dependent_equations))
def _update_variables(self):
# cast dtype to float from numpy float64
# this is necessary to keep the doctests running and note make them
# look ugly all over the place
# I have yet to come up with a better idea, or vectorize all operations
# which requires major changes in tespy
increment = [float(val) for val in self.increment]
# the J_cols here point to actual variables, no need to call to
# get_J_col yet
relax = 1
if self.robust_relax:
relax = 0.05 + 0.95 * min(1, self.iter / (0.25 * self.max_iter))
for _, data in self.variables_dict.items():
if data["variable"] in ["m", "h", "E"]:
container = data["obj"]
container._val_SI += increment[container.J_col] * relax
elif data["variable"] in ["p"]:
container = data["obj"]
p_relax = max(
1, -2 * increment[container.J_col] / container.val_SI
)
container._val_SI += increment[container.J_col] / p_relax
elif data["variable"] == "fluid":
container = data["obj"]
inc = increment[container.J_col[data["fluid"]]]
value = container.val[data["fluid"]]
if value > ERR:
f_relax = max(1, -2 * inc / value)
else:
f_relax = 1
container.val[data["fluid"]] += inc / f_relax
if container.val[data["fluid"]] < ERR:
container.val[data["fluid"]] = 0
elif container.val[data["fluid"]] > 1 - ERR:
container.val[data["fluid"]] = 1
else:
# add increment
data["obj"]._val_SI += increment[data["obj"].J_col] * relax
# keep value within specified value range
if data["obj"].val_SI < data["obj"].min_val:
data["obj"].val_SI = data["obj"].min_val
elif data["obj"].val_SI > data["obj"].max_val:
data["obj"].val_SI = data["obj"].max_val
def _adapt_to_variable_bounds(self):
# this could be in a different place, its kind of in between
# network and connection
if self.iter < 10:
for data in self.variables_dict.values():
if type(data["obj"]) == dc_vecvar:
total_mass_fractions = sum(data["obj"].val.values())
for fluid in data["obj"].is_var:
data["obj"]._val[fluid] /= total_mass_fractions
if norm(self.increment) > 1e-1:
for c in self.conns['object']:
# check the fluid properties for physical ranges
c._adjust_to_property_limits(self)
# second check based on component heuristics
# - for first three iterations
# - only if the increment is sufficiently large
# - only in design case
if (
self.iter < 3
and norm(self.increment) > 1e-1
and self.mode == "design"
):
for cp in self.comps['object']:
cp.convergence_check()
for c in self.conns['object']:
c._adjust_to_property_limits(self)
def _solve_iteration(self):
r"""
Control iteration step of the newton algorithm.
- Calculate the residual value for each equation
- Calculate the jacobian matrix
- Calculate new values for variables
- Restrict fluid properties to value ranges
- Check component parameters for consistency
"""
self._solve_equations()
self._invert_jacobian()
if self.lin_dep:
self._diagnose_singularity()
return
self._update_variables()
self._adapt_to_variable_bounds()
def _solve_equations(self):
r"""
Calculate the residual and derivatives of all equations.
"""
to_solve = (
self.comps["object"].tolist()
+ self.conns["object"].tolist()
+ list(self.user_defined_eq.values())
)
for obj in to_solve:
hlp.solve(obj, self.increment_filter)
if len(obj.jacobian) > 0:
rows = list(obj.residual.keys())
data = list(obj.residual.values())
self.residual[rows] = data
rows = [k[0] for k in obj.jacobian]
columns = [k[1] for k in obj.jacobian]
data = list(obj.jacobian.values())
self.jacobian[rows, columns] = data
obj.it += 1
def _postprocess(self):
r"""Calculate connection and component parameters."""
_converged = self._postprocess_connections()
_converged = self._postprocess_components() and _converged
if self.status == 0 and not _converged:
self.status = 1
msg = 'Postprocessing complete.'
logger.info(msg)
def _unload_variables(self):
for dependents in self._variable_dependencies:
for variable_num in dependents["variables"]:
variable_dict = self._variable_lookup[variable_num]
variable = variable_dict["object"].get_attr(variable_dict["property"])
if variable_dict["property"] != "fluid":
variable.val_SI = variable.val_SI
else:
variable.val = variable.val
variable._reference_container = None
def _postprocess_connections(self):
"""Process the Connection results."""
_converged = True
buckets = {}
for c in self.conns['object']:
c.good_starting_values = True
_converged = c.calc_results(self.units, self.skip_postprocess) and _converged
if self.skip_postprocess:
continue
conn_type = c.__class__.__name__
if conn_type not in buckets:
buckets[conn_type] = ([], [])
buckets[conn_type][0].append(c.label)
buckets[conn_type][1].append(c.collect_results(self.all_fluids))
for conn_type, (labels, rows) in buckets.items():
cols = self.results[conn_type].columns
self.results[conn_type] = pd.DataFrame(rows, index=labels, columns=cols)
return _converged
def _postprocess_components(self):
"""Process the component results."""
# components
_converged = True
if self.skip_postprocess:
return _converged
for cp in self.comps['object']:
cp.calc_parameters()
_converged = _converged and cp.check_parameter_bounds()
# this thing could be somewhere else
for key, value in cp.parameters.items():
if isinstance(value, dc_prop):
result = value._get_val_from_SI(self.units)
if (
value.is_set
and not value.is_var
and not np.isclose(result.magnitude, value.val, 1e-3, 1e-3)
and not cp.bypass
):
_converged = False
msg = (
"The simulation converged but the calculated "
f"result {result} for the fixed input parameter "
f"{key} is not equal to the originally specified "
f"value: {value.val}. Usually, this can happen, "
"when a method internally manipulates the "
"associated equation during iteration in order to "
"allow progress in situations, when the equation "
"is otherwise not well defined for the current"
"values of the variables, e.g. in case a negative "
"root would need to be evaluated. Often, this "
"can happen during the first iterations and then "
"will resolve itself as convergence progresses. "
"In this case it did not, meaning convergence was "
"not actually achieved."
)
logger.warning(msg)
self.status = 2
else:
if not value.is_set or value.is_var:
value.set_val_from_SI(self.units)
if self.status == 2:
return False
buckets = {}
for cp in self.comps['object']:
result = cp.collect_results()
if len(result) == 0:
continue
key = cp.__class__.__name__
if key not in buckets:
buckets[key] = ([], [])
buckets[key][0].append(cp.label)
buckets[key][1].append(result)
for key, (labels, rows) in buckets.items():
cols = self.results[key].columns
self.results[key] = pd.DataFrame(rows, index=labels, columns=cols)
return _converged
[docs]
def print_results(self, colored=True, colors=None, print_results=True, subsystem=None):
r"""Print the calculations results to prompt."""
# Define colors for highlighting values in result table
if colors is None:
colors = {}
result = ""
coloring = {
'end': '\033[0m',
'set': '\033[94m',
'err': '\033[31m',
'var': '\033[32m'
}
coloring.update(colors)
if not hasattr(self, 'results'):
msg = (
'It is not possible to print the results of a network, that '
'has never been solved successfully. Results DataFrames are '
'only available after a full simulation run is performed.'
)
raise hlp.TESPyNetworkError(msg)
result += self._print_components(colored, coloring, subsystem)
result += self._print_connections(colored, coloring, subsystem)
if len(str(result)) > 0:
logger.result(result)
if print_results:
print(result)
return
def _print_components(self, colored, coloring, subsystem) -> str:
result = ""
for cp in self.comps['comp_type'].unique():
df = self.results[cp].copy()
for c in df.index:
if not self.get_comp(c).printout:
df = df.drop(c)
# are there any parameters to print?
if df.size > 0:
if subsystem is not None:
component_labels = [
c.label for c in subsystem.comps.values()
if c.label in df.index
]
df = df.loc[component_labels]
c = self.comps.loc[self.comps["comp_type"] == cp, "object"]
cols = [
col for col in c.iloc[0]._get_result_attributes()
if not col.endswith("_unit")
]
if len(cols) > 0:
df = df[cols].dropna(axis=1, how="all")
for col in df.columns:
df[col] = df.apply(
self._color_component_prints, axis=1,
args=(col, colored, coloring))
df.dropna(how='all', inplace=True)
if len(df) > 0:
# printout with tabulate
result += f"\n##### RESULTS ({cp}) #####\n"
result += (
tabulate(
df, headers='keys', tablefmt='psql',
floatfmt='.2e'
)
)
return result
def _print_connections(self, colored, coloring, subsystem) -> str:
result = ""
# connection properties
for c_type in self.conns["conn_type"].unique():
cols = connection_registry.items[c_type]._print_attributes()
df = self.results[c_type].copy().loc[:, cols]
if subsystem is not None:
connection_labels = [c.label for c in subsystem.conns.values()]
df = df.loc[connection_labels]
df = df.astype(str)
for c in df.index:
if not self.get_conn(c).printout:
df.drop([c], axis=0, inplace=True)
elif colored:
conn = self.get_conn(c)
for col in df.columns:
if conn.get_attr(col).is_set:
value = conn.get_attr(col).val
df.loc[c, col] = (
f"{coloring['set']}{value}{coloring['end']}"
)
if len(df) > 0:
result += (f'\n##### RESULTS ({c_type}) #####\n')
result += (
tabulate(df, headers='keys', tablefmt='psql', floatfmt='.3e')
)
return result
def _color_component_prints(self, c, *args):
"""
Get the print values for the component data.
Parameters
----------
c : pandas.core.series.Series
Series containing the component data.
param : str
Component parameter to print.
colored : bool
Color the printout.
coloring : dict
Coloring information for colored printout.
Returns
----------
value : str
String representation of the value to print.
"""
param, colored, coloring = args
comp = self.get_comp(c.name)
if comp.printout:
# select parameter from results DataFrame
param_obj = comp.get_attr(param)
val = param_obj.val
val_SI = param_obj.val_SI
if not colored:
return str(val)
# else part
if val_SI < param_obj.min_val - ERR or val_SI > param_obj.max_val + ERR:
return f"{coloring['err']}{val}{coloring['end']}"
if param_obj.is_var:
return f"{coloring['var']}{val}{coloring['end']}"
if param_obj.is_set:
return f"{coloring['set']}{val}{coloring['end']}"
return str(val)
else:
return np.nan
[docs]
@classmethod
def from_dict(cls, network_data):
# create network
# get method to ensure compatibility with old style export
units = Units.from_json(network_data["Network"].get("units", {}))
network_data["Network"]["units"] = units
nw = cls(**network_data["Network"])
# load components
comps = {}
module_name = "tespy.components"
_ = importlib.import_module(module_name)
for component, data in network_data["Component"].items():
if component not in component_registry.items:
msg = (
f"A class {component} is not available through the "
"tespy.components.component.component_registry decorator. "
"If you are using a custom component make sure to "
"decorate the class."
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
target_class = component_registry.items[component]
comps.update(_construct_components(target_class, data, nw))
msg = 'Created network components.'
logger.info(msg)
conns = {}
# load connections
if "Connection" not in network_data["Connection"]:
# v0.8 compatibility
target_class = connection_registry.items["Connection"]
conns.update(_construct_connections(
target_class, network_data["Connection"], comps)
)
else:
for connection, data in network_data["Connection"].items():
if connection not in connection_registry.items:
msg = (
f"A class {connection} is not available through the "
"tespy.connections.connection.connection_registry "
"decorator. If you are using a custom connection make "
"sure to decorate the class."
)
logger.error(msg)
raise hlp.TESPyNetworkError(msg)
target_class = connection_registry.items[connection]
conns.update(_construct_connections(target_class, data, comps))
# add connections to network
for c in conns.values():
nw.add_conns(c)
msg = 'Created connections.'
logger.info(msg)
msg = 'Created network.'
logger.info(msg)
nw.check_topology()
return nw
[docs]
@classmethod
def from_json(cls, json_file_path):
r"""
Load a network from a base path.
Parameters
----------
path : str
The path to the network data.
Returns
-------
nw : tespy.networks.network.Network
TESPy networks object.
Note
----
If you export the network structure of an existing TESPy network, it
will be saved to the path you specified. The structure of the saved
data in that path is the structure you need to provide in the path for
loading the network.
The structure of the path must be as follows:
- Folder: path (e.g. 'mynetwork')
- Component.json
- Connection.json
- Network.json
Example
-------
Create a network and export it. This is followed by loading the network
from the exported json file. All network information stored will be
passed to a new network object. Components and connections will be
accessible by label. The following example setup is simple gas
turbine setup with compressor, combustion chamber and turbine. The fuel
is fed from a pipeline and throttled to the required pressure while
keeping the temperature at a constant value.
>>> from tespy.components import (
... Sink, Source, CombustionChamber, TurboCompressor, Turbine,
... SimpleHeatExchanger, PowerBus, PowerSink, Generator
... )
>>> from tespy.connections import Connection, Ref, PowerConnection
>>> from tespy.networks import Network
>>> import os
>>> nw = Network()
>>> nw.iterinfo = False
>>> nw.units.set_defaults(**{
... "pressure": "bar", "pressure_difference": "bar",
... "temperature": "degC", "enthalpy": "kJ/kg",
... "power": "MW"
... })
>>> air = Source('air')
>>> f = Source('fuel')
>>> compressor = TurboCompressor('compressor')
>>> combustion = CombustionChamber('combustion')
>>> turbine = Turbine('turbine')
>>> preheater = SimpleHeatExchanger('fuel preheater')
>>> si = Sink('sink')
>>> shaft = PowerBus('shaft', num_in=1, num_out=2)
>>> generator = Generator('generator')
>>> grid = PowerSink('grid')
>>> c1 = Connection(air, 'out1', compressor, 'in1', label='c01')
>>> c2 = Connection(compressor, 'out1', combustion, 'in1', label='c02')
>>> c11 = Connection(f, 'out1', preheater, 'in1', label='c11')
>>> c12 = Connection(preheater, 'out1', combustion, 'in2', label='c12')
>>> c3 = Connection(combustion, 'out1', turbine, 'in1', label='c03')
>>> c4 = Connection(turbine, 'out1', si, 'in1', label='c04')
>>> nw.add_conns(c1, c2, c11, c12, c3, c4)
>>> e1 = PowerConnection(turbine, 'power', shaft, 'power_in1', label='e1')
>>> e2 = PowerConnection(shaft, 'power_out1', compressor, 'power', label='e2')
>>> e3 = PowerConnection(shaft, 'power_out2', generator, 'power_in', label='e3')
>>> e4 = PowerConnection(generator, 'power_out', grid, 'power', label='e4')
>>> nw.add_conns(e1, e2, e3, e4)
Specify component and connection properties. The intlet pressure at the
compressor and the outlet pressure after the turbine are identical. For
the compressor, the pressure ratio and isentropic efficiency are design
parameters. A compressor map (efficiency vs. mass flow and pressure
rise vs. mass flow) is selected for the compressor. Fuel is Methane.
>>> compressor.set_attr(
... pr=10, eta_s=0.88, design=['eta_s', 'pr'],
... offdesign=['char_map_eta_s', 'char_map_pr']
... )
>>> turbine.set_attr(
... eta_s=0.9, design=['eta_s'],
... offdesign=['eta_s_char', 'cone']
... )
>>> combustion.set_attr(lamb=2)
>>> c1.set_attr(
... fluid={'N2': 0.7556, 'O2': 0.2315, 'Ar': 0.0129}, T=25, p=1
... )
>>> c11.set_attr(fluid={'CH4': 0.96, 'CO2': 0.04}, T=25, p=40)
>>> c12.set_attr(T=25)
>>> c4.set_attr(p=Ref(c1, 1, 0))
>>> generator.set_attr(eta=1)
For a stable start, we specify the fresh air mass flow.
>>> c1.set_attr(m=3)
>>> nw.solve('design')
>>> nw.assert_convergence()
The total power output is set to 1 MW, electrical or mechanical
efficiencies are not considered in this example. See
:py:class:`tespy.components.power.motor.Motor` and
:py:class:`tespy.components.power.generator.Generator` for modelling
conversion efficiencies between mechanical and electrical power.
>>> combustion.set_attr(lamb=None)
>>> c3.set_attr(T=1100)
>>> c1.set_attr(m=None)
>>> e4.set_attr(E=1)
>>> nw.solve('design')
>>> nw.assert_convergence()
>>> design_state = nw.save(as_dict=True)
>>> _ = nw.export('exported_nwk.json')
>>> mass_flow = round(nw.get_conn('c01').m.val_SI, 1)
>>> compressor.set_attr(igva='var')
>>> nw.solve('offdesign', design_path=design_state)
>>> round(turbine.eta_s.val, 1)
0.9
>>> e4.set_attr(E=0.75)
>>> nw.solve('offdesign', design_path=design_state)
>>> nw.assert_convergence()
>>> eta_s_t = round(turbine.eta_s.val, 3)
>>> igva = round(compressor.igva.val, 3)
>>> eta_s_t
0.898
>>> igva
20.138
The designed network is exported to the path 'exported_nwk'. Now import
the network and recalculate. Check if the results match with the
previous calculation in design and offdesign case.
>>> imported_nwk = Network.from_json('exported_nwk.json')
>>> imported_nwk.iterinfo = False
>>> imported_nwk.solve('design')
>>> imported_nwk.lin_dep
False
>>> round(imported_nwk.get_conn('c01').m.val_SI, 1) == mass_flow
True
>>> round(imported_nwk.get_comp('turbine').eta_s.val, 3)
0.9
>>> imported_nwk.get_comp('compressor').set_attr(igva='var')
>>> imported_nwk.solve('offdesign', design_path=design_state)
>>> round(imported_nwk.get_comp('turbine').eta_s.val, 3)
0.9
>>> imported_nwk.get_conn('e4').set_attr(E=0.75)
>>> imported_nwk.solve('offdesign', design_path=design_state)
>>> round(imported_nwk.get_comp('turbine').eta_s.val, 3) == eta_s_t
True
>>> round(imported_nwk.get_comp('compressor').igva.val, 3) == igva
True
>>> os.remove('exported_nwk.json')
"""
msg = f'Reading network data from base path {json_file_path}.'
logger.info(msg)
with open(json_file_path, "r") as f:
network_data = json.load(f)
return cls.from_dict(network_data)
[docs]
def export(self, json_file_path=None):
"""Export the parametrization and structure of the Network instance
Parameters
----------
json_file_path : str, optional
Path for exporting to filesystem. If path is None, the data are
only returned and not written to the filesystem, by default None.
Returns
-------
dict
Parametrization and structure of the Network instance.
"""
export = {}
export["Network"] = self._export_network()
export["Connection"] = self._export_connections()
export["Component"] = self._export_components()
if json_file_path:
os.makedirs(os.path.dirname(os.path.abspath(json_file_path)), exist_ok=True)
with open(json_file_path, "w") as f:
json.dump(export, f, indent=2)
logger.debug(f'Model information saved to {json_file_path}.')
return export
[docs]
def save(self, json_file_path: str | Path | None = None, as_dict: bool = False) -> None | dict | str:
r"""
Dump the results to a json style output.
Parameters
----------
json_file_path : str | Path | None
Filename to dump results into. If :code:`None`, the state is returned
in-memory (as dict when :code:`as_dict=True`, otherwise as JSON string).
as_dict : bool
If :code:`True` and :code:`json_file_path` is :code:`None`, return the state as
a dict that can be passed directly as :code:`design_path` or
:code:`init_path` in a subsequent :meth:`solve` call. Default
:code:`False`; the :code:`False` behaviour (returning a JSON string) is
deprecated and will be removed in a future release.
Returns
-------
None
If a file path is provided, results are saved to file.
dict
If :code:`json_file_path` is :code:`None` and :code:`as_dict=True`.
str
If :code:`json_file_path` is :code:`None` and :code:`as_dict=False`
(deprecated).
"""
dump = {}
# save relevant state information only
dump["Connection"] = self._save_connections()
dump["Component"] = self._save_components()
dump = hlp._nested_dict_of_dataframes_to_dict(dump)
if json_file_path is None:
if as_dict:
return dump
msg = (
"Calling Network.save() without a file path returns a JSON "
"string, which is deprecated and will be removed in a future "
"release. Use Network.save(as_dict=True) to get a dict that "
"can be passed directly as design_path or init_path."
)
warnings.warn(msg, FutureWarning)
return json.dumps(dump, indent=2)
os.makedirs(os.path.dirname(os.path.abspath(json_file_path)), exist_ok=True)
with open(json_file_path, "w") as f:
json.dump(dump, f)
[docs]
def save_csv(self, folder_path):
"""Export the results in multiple csv files in a folder structure
- Connection.csv
- Component/
- Compressor.csv
- ....
Parameters
----------
folder_path : str
Path to dump results to
"""
dump = {}
# save relevant state information only
dump["Connection"] = self._save_connections()
dump["Component"] = self._save_components()
hlp._nested_dict_of_dataframes_to_filetree(dump, folder_path)
def _save_connections(self):
"""Save the connection properties.
Returns
-------
pandas.DataFrame
pandas.Dataframe of the connection results
"""
dump = {}
for c in self.conns["conn_type"].unique():
dump[c] = self.results[c].replace(np.nan, None)
return dump
def _save_components(self):
r"""
Save the component properties.
Returns
-------
dump : dict
Dump of the component information.
"""
dump = {}
for c in self.comps['comp_type'].unique():
dump[c] = self.results[c].replace(np.nan, None)
return dump
def _export_network(self):
r"""Export network information
Returns
-------
dict
Serialization of network object.
"""
return self._serialize()
def _export_connections(self):
"""Export connection information
Returns
-------
dict
Serialization of connection objects.
"""
connections = {}
for c in self.conns["object"]:
conn_type = c.__class__.__name__
if conn_type not in connections:
connections[conn_type] = {}
connections[conn_type].update(c._serialize())
return connections
def _export_components(self):
"""Export component information
Returns
-------
dict
Dict of dicts with per class serialization of component objects.
"""
components = {}
for c in self.comps["comp_type"].unique():
components[c] = {}
for cp in self.comps.loc[self.comps["comp_type"] == c, "object"]:
components[c].update(cp._serialize())
return components
def _construct_components(target_class, data, nw):
r"""
Create TESPy component from class name and set parameters.
Parameters
----------
component : str
Name of the component class to be constructed.
data : dict
Dictionary with component information.
Returns
-------
dict
Dictionary of all components of the specified type.
"""
instances = {}
for cp, cp_data in data.items():
instances[cp] = target_class(cp)
for param, param_data in cp_data.items():
container = instances[cp].get_attr(param)
if isinstance(container, dc):
if "char_func" in param_data:
if isinstance(container, dc_cc):
param_data["char_func"] = CharLine(**param_data["char_func"])
elif isinstance(container, dc_cm):
param_data["char_func"] = CharMap(**param_data["char_func"])
if "val" in param_data:
if "unit" in param_data and param_data["unit"] is not None:
param_data["val"] = nw.units.ureg.Quantity(
param_data["val"], param_data["unit"]
)
if "val0" in param_data:
param_data["val0"] = param_data["val"]
container.set_attr(**param_data)
else:
instances[cp].set_attr(**{param: param_data})
return instances
def _construct_connections(target_class, data, comps):
r"""
Create TESPy connection from data in the .json-file and its parameters.
Parameters
----------
data : dict
Dictionary with connection data.
comps : dict
Dictionary of constructed components.
Returns
-------
dict
Dictionary of TESPy connection objects.
"""
conns = {}
module_name = "tespy.tools.fluid_properties.wrappers"
_ = importlib.import_module(module_name)
for label, conn_data in data.items():
conns[label] = target_class(
comps[conn_data["source"]], conn_data["source_id"],
comps[conn_data["target"]], conn_data["target_id"],
label=label
)
for label, conn_data in data.items():
conns[label]._deserialize(conn_data, conns)
return conns