Source code for tespy.tools.optimization

# -*- coding: utf-8

"""Module for OptimizationProblem class.

This file is part of project TESPy (github.com/oemof/tespy). It's copyrighted
by the contributors recorded in the version control history of the file,
available from its original location tespy/tools/optimization.py

SPDX-License-Identifier: MIT
"""
import warnings

import numpy as np

try:
    # this is to make this import of tespy possible without the optional
    # dependency pymoo available
    from pymoo.core.problem import ElementwiseProblem

except ModuleNotFoundError:
    class ElementwiseProblem:

        def __init__(self, **kwargs):
            pass

from tespy.tools.logger import logger


class _NestedModelAdapter:
    """Wraps a nested-API model to present the flat get_parameter/solve_model interface."""

    def __init__(self, model, param_mapping):
        self._model = model
        self._mapping = param_mapping

    def solve_model(self, **flat_kwargs):
        nested = {}
        for flat_name, value in flat_kwargs.items():
            obj, label, param = self._mapping[flat_name]
            if param is not None:
                nested.setdefault(obj, {}).setdefault(label, {})[param] = value
            else:
                nested.setdefault(obj, {})[label] = value
        self._model.solve_model(**nested)

    def get_parameter(self, name):
        if name in self._mapping:
            obj, label, param = self._mapping[name]
            return self._model.get_param(obj, label, param)
        return self._model.get_objective(name)

    def get_objectives(self, objective_list):
        return self._model.get_objectives(objective_list)

    def penalize(self, fitness, c):
        return self._model.penalize(fitness, c)


def _is_nested(variables, constraints, kpi):
    if variables and not any(
        isinstance(v, dict) and ("min" in v or "max" in v)
        for v in variables.values()
    ):
        return True
    if "lower limits" in constraints or "upper limits" in constraints:
        return True
    if isinstance(kpi, dict):
        return True
    return False


def _translate_nested(variables, constraints, kpi):
    """Translate legacy nested dicts to flat equivalents.

    Returns (flat_vars, flat_constraints, flat_kpi, param_mapping).
    param_mapping: flat_name -> (obj, label, param)  - param is None for Customs.
    """
    param_mapping = {}

    flat_vars = {}
    for obj, data in variables.items():
        if obj in ("Connections", "Components"):
            for label, params in data.items():
                for param, bounds in params.items():
                    flat_name = f"{obj}-{label}-{param}"
                    flat_vars[flat_name] = bounds
                    param_mapping[flat_name] = (obj, label, param)
        else:
            for label, bounds in data.items():
                flat_name = f"{obj}-{label}"
                flat_vars[flat_name] = bounds
                param_mapping[flat_name] = (obj, label, None)

    ref_map = {}
    for key, value in constraints.items():
        if key not in ("lower limits", "upper limits") and isinstance(value, list):
            obj, label, param = value
            ref_flat = f"{obj}-{label}-{param}"
            ref_map[key] = ref_flat
            param_mapping[ref_flat] = (obj, label, param)

    flat_constraints = {}
    for border, direction in [("lower", "min"), ("upper", "max")]:
        limit_key = f"{border} limits"
        if limit_key not in constraints:
            continue
        for obj, data in constraints[limit_key].items():
            if obj in ("Connections", "Components"):
                for label, params in data.items():
                    for param, val in params.items():
                        flat_name = f"{obj}-{label}-{param}"
                        param_mapping.setdefault(flat_name, (obj, label, param))
                        bound = ref_map[val] if isinstance(val, str) and val in ref_map else val
                        flat_constraints.setdefault(flat_name, {})[direction] = bound
            else:
                for label, val in data.items():
                    flat_name = f"{obj}-{label}"
                    param_mapping.setdefault(flat_name, (obj, label, None))
                    bound = ref_map[val] if isinstance(val, str) and val in ref_map else val
                    flat_constraints.setdefault(flat_name, {})[direction] = bound

    flat_kpi = []
    if isinstance(kpi, dict):
        for obj, data in kpi.items():
            if obj in ("Connections", "Components"):
                for label, params in data.items():
                    for param in params:
                        flat_name = f"{obj}-{label}-{param}"
                        flat_kpi.append(flat_name)
                        param_mapping.setdefault(flat_name, (obj, label, param))
            else:
                for label in data:
                    flat_name = f"{obj}-{label}"
                    flat_kpi.append(flat_name)
                    param_mapping.setdefault(flat_name, (obj, label, None))
    else:
        flat_kpi = list(kpi)

    return flat_vars, flat_constraints, flat_kpi, param_mapping


[docs] class OptimizationProblem(ElementwiseProblem): r""" The OptimizationProblem handles the optimization. Parameters ---------- model : tespy.models.ModelTemplate Model instance providing :code:`set_parameters`, :code:`get_parameter`, :code:`get_objectives` and :code:`solve_model`. variables : dict Flat dictionary of decision variables and their bounds, e.g. .. code-block:: python { "extraction pressure 1": {"min": 1, "max": 40}, "extraction pressure 2": {"min": 1, "max": 40}, } constraints : dict Flat dictionary of parameter constraints. Values are numeric bounds or a parameter name string for cross-parameter references, e.g. .. code-block:: python { "extraction pressure 1": {"min": "extraction pressure 2"}, "some temperature": {"min": 20, "max": 200}, } objective : list Names of the objective parameters as defined in the model's :code:`_parameter_lookup`. minimize : list :code:`True` to minimize, :code:`False` to maximize. One entry per objective, in the same order. kpi : list Parameter names to log at each evaluation in addition to the objectives, e.g. :code:`["hpt power", "hpt pressure ratio"]`. penalty_instead_of_constraints : bool If :code:`True`, constraints are passed to :code:`model.penalize` instead of being enforced directly. Default :code:`False`. Example ------- For an example please check out :ref:`this section <tutorial_optimization_label>` in the docs. """ def __init__( self, model, variables=None, constraints=None, objective=None, minimize=None, kpi=None, penalty_instead_of_constraints=False, ): if variables is None: variables = {} if constraints is None: constraints = {} if objective is None: objective = [] if kpi is None: kpi = [] if _is_nested(variables, constraints, kpi): warnings.warn( "Passing nested dictionaries to OptimizationProblem is " "deprecated and will be removed in a future release. Use flat " "parameter name dictionaries instead.", DeprecationWarning, stacklevel=2, ) variables, constraints, kpi, param_mapping = _translate_nested(variables, constraints, kpi) model = _NestedModelAdapter(model, param_mapping) self.model = model self.variables = variables self.constraints = constraints self.kpi = kpi self._build_objective(objective, minimize) self._build_variables() self._build_kpi() self.nic = 0 self.constraint_list = [] self._build_constraints() self.penalty_instead_of_constraints = penalty_instead_of_constraints n_ieq_constr = 0 if self.penalty_instead_of_constraints else len(self.constraint_list) self.log = [] super().__init__( n_var=len(self.variable_list), n_obj=len(self.objective_list), n_ieq_constr=n_ieq_constr, n_eq_constr=0, xl=self._bounds[0], xu=self._bounds[1] ) def _build_objective(self, objective, minimize) -> None: if not isinstance(objective, list): msg = "The objective(s) must be passed as a list." raise TypeError(msg) self.objective_list = objective self.nobj = len(self.objective_list) if minimize is None: self.minimize = [True for _ in self.objective_list] elif len(minimize) != self.nobj: msg = ( "If you supply the minimize argument the number of values in " "the list must be identical to the number of objectives." ) raise ValueError(msg) else: self.minimize = minimize def _build_variables(self) -> None: self.variable_list = [] self._bounds = [[], []] for param_name, bounds in self.variables.items(): self._bounds[0].append(bounds["min"]) self._bounds[1].append(bounds["max"]) self.variable_list.append(param_name) def _build_kpi(self) -> None: self.kpi_list = list(self.kpi) def _build_constraints(self) -> None: for param_name, bounds in self.constraints.items(): if "min" in bounds: right = bounds["min"] if isinstance(bounds["min"], str) else str(bounds["min"]) self.constraint_list.append(f"{param_name}>={right}") self.nic += 1 if "max" in bounds: right = bounds["max"] if isinstance(bounds["max"], str) else str(bounds["max"]) self.constraint_list.append(f"{param_name}<={right}") self.nic += 1 def _evaluate_constraints(self) -> list: evaluation = [] for param_name, bounds in self.constraints.items(): actual = self.model.get_parameter(param_name) if "min" in bounds: val = bounds["min"] limit = self.model.get_parameter(val) if isinstance(val, str) else val evaluation.append(limit - actual) if "max" in bounds: val = bounds["max"] limit = self.model.get_parameter(val) if isinstance(val, str) else val evaluation.append(actual - limit) return evaluation def _evaluate(self, x: np.ndarray, out: dict, *_args, **_kwargs) -> None: self.model.solve_model(**dict(zip(self.variable_list, x))) fitness = self.model.get_objectives(self.objective_list) kpi = [self.model.get_parameter(k) for k in self.kpi_list] _fitness = [(-1) ** (sense + 1) * f for f, sense in zip(fitness, self.minimize)] c = self._evaluate_constraints() if self.penalty_instead_of_constraints: _fitness = self.model.penalize(_fitness, c) else: out["G"] = c out["F"] = [value if not np.isnan(value) else 1e21 for value in _fitness] log_entry = { **{self.variable_list[i]: val for i, val in enumerate(x)}, **{self.objective_list[i]: val for i, val in enumerate(fitness)}, **{self.constraint_list[i]: val for i, val in enumerate(c)}, **{self.kpi_list[i]: val for i, val in enumerate(kpi)}, } self.log.append(log_entry)