Source code for tespy.models.template

import numpy as np
import pandas as pd
from fluprodia import FluidPropertyDiagram
from matplotlib import pyplot as plt
from scipy.spatial.distance import cdist

from tespy.networks import Network
from tespy.tools import OptimizationProblem
from tespy.tools import get_plotting_data
from tespy.tools.fluid_properties import single_fluid


[docs] class ModelTemplate(): def __init__(self) -> None: self._diagram_cache = {} self._stable_solution = None self._ean = None self.parameter_lookup = self._parameter_lookup() self._create_network() def _create_network(self) -> None: self.nw = Network() def _parameter_lookup(self) -> dict: """ Define the mapping between flat parameter names and their location in the model. Three entry forms are supported: - :code:`["Connections", "label", "attr"]` or :code:`["Components", "label", "attr"]`: maps to a network object attribute. The parameter is both settable and readable. - :code:`{"get": callable}`: read-only. The callable takes no arguments and returns the current value. - :code:`{"set": callable}`: write-only. The callable accepts a single value argument. Use this for parameters that cannot be associated with a single Connection or Component attribute, e.g. a UserDefinedEquation target value. - :code:`{"get": callable, "set": callable}`: custom readable and settable, e.g. a UDE target that can also be read back. Returns ------- dict e.g. { "evaporator pinch": ["Components", "evaporator", "td_pinch"], "cop": {"get": self.calc_cop}, "ude target": {"get": self.get_ude_target, "set": self.set_ude_target}, } """ return {}
[docs] def get_results(self, labels): return {label: self.get_parameter(label) for label in labels}
def _map_parameter(self, parameter: str) -> list: mapped = self.parameter_lookup[parameter] if isinstance(mapped, dict): raise TypeError( f"'{parameter}' uses a custom get/set spec and cannot be " "translated to a Connections/Components path." ) return mapped
[docs] def get_parameter(self, parameter: str) -> float: if parameter not in self.parameter_lookup: raise KeyError(f"'{parameter}' is not in parameter_lookup.") mapped = self.parameter_lookup[parameter] if isinstance(mapped, dict): if "get" not in mapped: raise AttributeError(f"'{parameter}' is write-only.") return mapped["get"]() if mapped[0] == "Connections": return self.nw.get_conn(mapped[1]).get_attr(mapped[2]).val elif mapped[0] == "Components": return self.nw.get_comp(mapped[1]).get_attr(mapped[2]).val
[docs] def set_parameters(self, **kwargs) -> None: conn_params = {} comp_params = {} for param, value in kwargs.items(): if param not in self.parameter_lookup: raise KeyError( f"The parameter '{param}' is not in parameter_lookup. " f"Available: {', '.join(self.parameter_lookup)}." ) mapped = self.parameter_lookup[param] if isinstance(mapped, dict): if "set" not in mapped: raise AttributeError(f"'{param}' is read-only.") mapped["set"](value) else: obj_type, label, attr = mapped if obj_type == "Connections": conn_params.setdefault(label, {})[attr] = value elif obj_type == "Components": comp_params.setdefault(label, {})[attr] = value for label, params in conn_params.items(): self.nw.get_conn(label).set_attr(**params) for label, params in comp_params.items(): self.nw.get_comp(label).set_attr(**params)
[docs] def solve_model_design(self, **kwargs) -> None: self.set_parameters(**kwargs) self._solved = False self.nw.solve("design") if self.nw.status == 0: self._solved = True # path = # self.nw.export("stable_solution") # self._stable_solution = path # save the stable solution for later use in case of model corruption # is not required in this example, but could lead to handling some # stuff elif self.nw.status == 1: self._solved = False elif self.nw.status in [2, 3, 99]: # in this case model is very likely corrupted!! # fix it by running a presolve using the stable solution self._solved = False self.nw.solve("design", init_only=True, init_path=self._stable_solution)
[docs] def solve_model_offdesign(self, **kwargs) -> None: self.set_parameters(**kwargs) self._solved = False # Check whether the design path is available self.nw.solve("offdesign", design_path=self._design_path) if self.nw.status == 0: self._solved = True elif self.nw.status in [1, 2, 3, 99]: # in this case model is very likely corrupted!! # fix it by running a presolve using the stable solution self._solved = False # check whether the design path and stable solution path are available self.nw.solve( "offdesign", init_only=True, design_path=self._design_path, init_path=self._stable_solution )
def _get_diagram(self, fluid_name): if fluid_name in self._diagram_cache: return self._diagram_cache[fluid_name] else: diagram = FluidPropertyDiagram(fluid_name) diagram.set_unit_system(self.nw.units) diagram.set_isolines_subcritical(-20, 200) diagram.calc_isolines() self._diagram_cache[fluid_name] = diagram return diagram def _prepare_diagram_and_process_data(self, connection_label): if connection_label not in self.nw.conns.index: msg = f"There is no connection with the label {connection_label}." raise KeyError(msg) fluid_name = single_fluid(self.nw.get_conn(connection_label).fluid_data) if fluid_name is None: raise ValueError( "Fluid is mixture, and not supported by fluprodia" ) diagram = self._get_diagram(fluid_name) processes, points = get_plotting_data(self.nw, connection_label) processes = { key: diagram.calc_individual_isoline(**value) for key, value in processes.items() if value is not None } return processes, points, diagram def _plot_processes_and_states(self, ax, processes, points, x_property, y_property): for label, values in processes.items(): _ = ax.plot(values[x_property], values[y_property], label=label, color="tab:red", zorder=10000) for label, point in points.items(): _ = ax.scatter(point[x_property], point[y_property], label=label, color="tab:red", zorder=10000)
[docs] def plot_Ts_diagram_matplotlib(self, connection_label, ax=None, save_dir=None, figsize=None, xlim=None, ylim=None): if ax is None: fig, ax = plt.subplots(figsize=figsize or (10, 6)) else: fig = ax.get_figure() processes, points, diagram = self._prepare_diagram_and_process_data(connection_label) x_min, x_max = xlim or self._make_cycle_plot_limits(points, "s", "lin") if ylim: y_min, y_max = ylim else: conn = self.nw.get_conn(connection_label) fluid_name = single_fluid(conn.fluid_data) ureg = self.nw.units.get_ureg() T_unit = self.nw.units.get_default('temperature') T_crit = ureg.Quantity(conn.fluid.wrapper[fluid_name]._T_crit, 'K').to(T_unit).magnitude y_min, y_max = self._make_cycle_plot_limits(points, "T", "lin", clamp_max=T_crit) diagram.draw_isolines( fig, ax, "Ts", x_min, x_max, y_min, y_max, isoline_data={ "Q": {"values": np.array([0.0, 0.2, 0.4, 0.6, 0.8, 1.0])}, } ) self._plot_processes_and_states(ax, processes, points, "s", "T") if save_dir: fig.savefig(f"{save_dir}/ts_diagram.svg", bbox_inches="tight") return fig, ax
[docs] def plot_logph_diagram_matplotlib(self, connection_label, ax=None, save_dir=None, figsize=None, xlim=None, ylim=None): if ax is None: fig, ax = plt.subplots(figsize=figsize or (10, 6)) else: fig = ax.get_figure() processes, points, diagram = self._prepare_diagram_and_process_data(connection_label) x_min, x_max = xlim or self._make_cycle_plot_limits(points, "h", "lin") if ylim: y_min, y_max = ylim else: conn = self.nw.get_conn(connection_label) fluid_name = single_fluid(conn.fluid_data) ureg = self.nw.units.get_ureg() p_unit = self.nw.units.get_default('pressure') p_crit = ureg.Quantity(conn.fluid.wrapper[fluid_name]._p_crit, 'Pa').to(p_unit).magnitude y_min, y_max = self._make_cycle_plot_limits(points, "p", "log", clamp_max=p_crit) diagram.draw_isolines( fig, ax, "logph", x_min, x_max, y_min, y_max, isoline_data={ "s": {"values": np.array([])}, "Q": {"values": np.array([0.0, 0.2, 0.4, 0.6, 0.8, 1.0])}, } ) self._plot_processes_and_states(ax, processes, points, "h", "p") if save_dir: fig.savefig(f"{save_dir}/log_ph_diagram.svg", bbox_inches="tight") return fig, ax
[docs] def plot_QT_diagram_matplotlib(self, heatexchanger_label, ax=None, save_dir=None, figsize=None): if ax is None: fig, ax = plt.subplots(figsize=figsize or (10, 6)) else: fig = ax.get_figure() heatex = self.nw.get_comp(heatexchanger_label) heat, T_hot, T_cold, _, _ = heatex.calc_sections() ax.plot(heat, T_hot, "o-", color="red") ax.plot(heat, T_cold, "o-", color="blue") if save_dir: fig.savefig(f"{save_dir}/qt_diagram.svg", bbox_inches="tight") return fig, ax
def _make_cycle_plot_limits(self, states: list, quantity: str, scale: str, padding_rel=0.1, clamp_max=None) -> tuple: """Automatically retrieve the limits for an axes based on the process point limits in one axis Parameters ---------- states : list List of process states quantity : str Name of the quantity, e.g. :code:`T`, :code:`h` scale : str Scale of the axis to plot on padding_rel : float, optional relative difference to overall distance between min and max value, by default 0.1 clamp_max : float, optional if provided, the upper bound is computed using :code:`max(max_val, clamp_max)` so the axis always reaches at least this value (plus padding) Returns ------- tuple minimum and maximum value for axis """ all_values = [point[quantity] for point in states.values()] min_val = min(all_values) max_val = max(all_values) if clamp_max is not None: max_val = max(max_val, clamp_max) if scale == 'lin': delta_val = max_val - min_val ax_min_val = min_val - padding_rel * delta_val ax_max_val = max_val + padding_rel * delta_val elif scale == 'log': delta_val = np.log10(max_val) - np.log10(min_val) ax_min_val = 10 ** (np.log10(min_val) - padding_rel * delta_val) ax_max_val = 10 ** (np.log10(max_val) + padding_rel * delta_val) return ax_min_val, ax_max_val
[docs] def save_design(self, path=None) -> None: """Save current network state as the design reference.""" if path is not None: self.nw.save(path) self._design_path = path else: self._design_path = self.nw.save(as_dict=True)
[docs] def run_exergy_analysis(self, Tamb, pamb, E_F, E_P, E_L=None): """Run an exergy analysis via exerpy and cache the result. Parameters ---------- Tamb : float Ambient temperature in °C. pamb : float Ambient pressure in bar. E_F : dict Fuel exergy definition, e.g. :code:`{'inputs': [...], 'outputs': [...]}`. E_P : dict Product exergy definition. E_L : dict, optional Loss exergy definition. Returns ------- exerpy.ExergyAnalysis """ from exerpy import ExergyAnalysis kwargs = {'E_F': E_F, 'E_P': E_P} if E_L is not None: kwargs['E_L'] = E_L self._ean = ExergyAnalysis.from_tespy(self.nw, Tamb + 273.15, pamb * 1e5) self._ean.analyse(**kwargs) return self._ean
[docs] def sensitivity_analysis(self, param_dict=None, result_param_list=None, mode='design', postproc_func=None) -> pd.DataFrame: """ Parameters ---------- param_dict : dict A dictionary of parameter names and lists of values to be used in the sensitivity analysis. All lists must have the same length, which determines the number of simulations to be run. result_param_list : list, optional Names of model parameters (from :meth:`_parameter_lookup`) to record after each simulation step. mode : str, optional :code:`'design'` or :code:`'offdesign'`. Default is :code:`'design'`. postproc_func : callable, optional A function :code:`postproc_func(model) -> dict | None` called after each successful solve. Use it to run any postprocessing - e.g. an exergy analysis, custom KPI calculation, or result export. The returned dict (if any) is merged into the result row as additional columns alongside :code:`result_param_list`. If the function returns :code:`None` no extra columns are added. Example - running an exergy analysis after each offdesign solve:: def run_exergy(model): model.run_exergy_analysis(Tamb, pamb, E_F, E_P) hp.sensitivity_analysis( param_dict={"T_geo": [8, 9, 10, 11]}, result_param_list=["epsilon"], mode="offdesign", postproc_func=run_exergy, ) Returns ------- pandas.core.frame.DataFrame DataFrame with input parameter columns and result columns. """ if param_dict is None: raise ValueError( "Parameters need to be provided for the sensitivity analysis." ) # never enter kwarg=[] in function signature! if result_param_list is None: result_param_list = [] self._check_parameter_lengths(param_dict) result_rows = [] keys = list(param_dict.keys()) input_values = np.array(list(param_dict.values())).T # check current state of model and get nearest to current state current_values = np.array([self.get_parameter(k) for k in keys]) start_idx = int(np.argmin(cdist([current_values], input_values)[0])) # sort following simulations and force starting with the nearest to # current state order = self._order_min_change(input_values, start_idx=start_idx) sorted_input = input_values[order] # Sensitivity analysis loop: for row_num, row in enumerate(sorted_input): input_dict = {keys[i]: row[i] for i in range(len(keys))} if mode == 'offdesign': self.solve_model_offdesign(**input_dict) else: self.solve_model_design(**input_dict) if self.nw.status > 1: # TODO: get an example, which actually would trigger this # and then implement a respective logic # previous_input = { # keys[i]: sorted_input[row_num - 1][i] # for i in range(len(keys)) # } # self._intermediate_simulations(previous_input, input_dict) # if self.nw.status > 1: result_rows.append({ **input_dict, **{param: None for param in result_param_list} }) continue if postproc_func is not None: postproc_func(self) result_rows.append({ **input_dict, **self.get_results(result_param_list), }) results = pd.DataFrame(result_rows) results["_idx"] = order return results.sort_values(by="_idx").drop(columns="_idx").reset_index(drop=True)
# Method for checking the parameter lenghts def _check_parameter_lengths(self, param_dict=None): lengths = [len(v) for v in param_dict.values()] if len(set(lengths)) != 1: raise ValueError( "All parameters in the sensitivity dictionary must have " "the same number of values." ) # Method for ordering function - @staticmethod def _order_min_change(points: np.ndarray, start_idx: int = 0) -> np.ndarray: """Greedy heuristic: always go to the nearest unvisited point.""" n = len(points) dist = cdist(points, points) order = [start_idx] visited = set(order) while len(order) < n: last = order[-1] # pick nearest unvisited candidates = [(i, dist[last, i]) for i in range(n) if i not in visited] next_idx = min(candidates, key=lambda x: x[1])[0] order.append(next_idx) visited.add(next_idx) return order # Method for handling large step changes # def _intermediate_simulations(self, start: dict, end: dict) -> None: # """Run simulation at intermediate design points for more stability.""" # n=0 # num_steps= 2 # max_n=10 # while n< max_n: # intermediate_points = { # param: np.linspace(start[param], end[param], num_steps).tolist() # for param in range(len(start)) # } # for step in range(num_steps-1): # try: # self.solve_model_design( # {key: value[step] for key, value in intermediate_points.items()} # ) # except Exception as e: # print(f"Error solving model at intermediate point {step}: {e}") # for param, value in enumerate(end): # start[param]= intermediate_points[param][step] # num_steps*=2 # n+=1 # continue # else: # continue # else: # raise Exception("Simulation failed. Max iteration reached for intermediate simulation step increase.")
[docs] def solve_model(self, **kwargs) -> None: msg = ( "For the usage of the optimization please implement a concrete " "method called 'solve_model' in your class. For example, you " "could directly wire to the 'solve_model_design' method." ) raise NotImplementedError(msg)
[docs] def get_objectives(self, objective_list: list) -> list: return [self.get_parameter(obj) for obj in objective_list]
[docs] def optimize(self, algorithm, termination, variables: dict, constraints: dict = None, objective: list = None, minimize_flags: list = None, kpi: list = None) -> pd.DataFrame: from pymoo.optimize import minimize as pymoo_minimize problem = OptimizationProblem( self, variables=variables, constraints=constraints, objective=objective, minimize=minimize_flags, kpi=kpi, ) pymoo_minimize( problem=problem, algorithm=algorithm, termination=termination, ) return pd.DataFrame(problem.log)