diff --git "a/component/adapter - \345\211\257\346\234\254.py" "b/component/adapter - \345\211\257\346\234\254.py" deleted file mode 100644 index 34b290a52bca49a48925b59dc613b094cb095ae0..0000000000000000000000000000000000000000 --- "a/component/adapter - \345\211\257\346\234\254.py" +++ /dev/null @@ -1,206 +0,0 @@ -""" -MIT License - -Copyright (c) 2023 RWTH Aachen University - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -from Model_Library.component.core import ( - AbstractComponent, - ComponentCapacity, - ComponentKind, - ComponentCommodity, - ComponentLink, - ComponentPart, -) -from Model_Library.optimization_model import VariableKind - -import pyomo.environ as pyo - - -class AssetLink(ComponentLink): - def __init__(self, asset, asset_var_name, start, var_name, end): - self.asset = asset - self.asset_var_name = asset_var_name - self.start = start - self.var_name = var_name - self.end = end - - -class MemberLink(ComponentLink): - def __init__(self, member, member_res_name, local_res_name, end): - self.member = member - self.member_res_name = member_res_name - self.local_res_name = local_res_name - self.end = end - - -class AssetAdapter(AbstractComponent): - def __init__(self, name, configuration, assets): - self.asset = assets[configuration["asset"]] - self.grid = self.asset._components[configuration["grid"]] - - super().__init__( - name=name, - commodity_1=self.grid.commodity, - commodity_2=None, - commodity_3=self.grid.commodity, - commodity_4=None, - configuration=configuration, - capacity=ComponentCapacity.NONE, - ) - - self.commodity = self.grid.commodity - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - match_kind = kind == ComponentKind.ALL or kind == ComponentKind.ASSET_ADAPTER - match_commodity = ( - commodity == ComponentCommodity.ALL - or commodity == self.commodity - or (isinstance(commodity, list) and self.commodity in commodity) - ) - return match_kind and match_commodity - - def iter_component_parts(self): - yield ComponentPart.NONE_STATE - - def iter_links(self): - yield AssetLink( - self.asset, - self.grid.name + ".output_1", - (self.grid.name, ComponentPart.NONE_STATE), - self.name + ".into_asset", - (self.name, ComponentPart.NONE_STATE), - ) - yield AssetLink( - self.asset, - self.grid.name + ".input_1", - (self.grid.name, ComponentPart.NONE_STATE), - self.name + ".from_asset", - (self.name, ComponentPart.NONE_STATE), - ) - - def non_state_base_variable_names(self): - return [ - (self.name + ".input_1", VariableKind.INDEXED), - (self.name + ".output_1", VariableKind.INDEXED), - ] - - def add_non_state_variables(self, o_block): - input = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".input_1", input) - - output = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".output_1", output) - - def add_non_state_model(self, d_block, o_block): - input = o_block.component_dict[self.name + ".input_1"] - - output = o_block.component_dict[self.name + ".output_1"] - - into_asset = o_block.component_dict[self.name + ".into_asset"] - - from_asset = o_block.component_dict[self.name + ".from_asset"] - - def rule(m, t): - return input[t] == into_asset[t] - - o_block.add(self.name + ".input_cons", pyo.Constraint(o_block.T, rule=rule)) - - def rule(m, t): - return output[t] == from_asset[t] - - o_block.add(self.name + ".output_cons", pyo.Constraint(o_block.T, rule=rule)) - - -class MemberAdapter(AbstractComponent): - def __init__(self, name, configuration, members): - self.member = members[configuration["member"]] - self.grid = self.member._components[configuration["grid"]] - - super().__init__( - name=name, - commodity_1=self.grid.commodity, - commodity_2=None, - commodity_3=self.grid.commodity, - commodity_4=None, - configuration=configuration, - capacity=ComponentCapacity.NONE, - ) - - self.commodity = self.grid.commodity - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - match_kind = kind == ComponentKind.ALL or kind == ComponentKind.ASSET_ADAPTER - match_commodity = ( - commodity == ComponentCommodity.ALL - or commodity == self.commodity - or (isinstance(commodity, list) and self.commodity in commodity) - ) - return match_kind and match_commodity - - def iter_component_parts(self): - yield ComponentPart.NONE_STATE - - def iter_links(self): - yield MemberLink( - self.member, - self.grid.name + ".output_1", - "into_member", - (self.name, ComponentPart.NONE_STATE), - ) - yield MemberLink( - self.member, - self.grid.name + ".input_1", - "from_member", - (self.name, ComponentPart.NONE_STATE), - ) - - def non_state_base_variable_names(self): - return [ - (self.name + ".input_1", VariableKind.INDEXED), - (self.name + ".output_1", VariableKind.INDEXED), - ] - - def add_non_state_variables(self, o_block): - input = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".input_1", input) - - output = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".output_1", output) - - def add_non_state_model(self, d_block, o_block): - input = o_block.component_dict[self.name + ".input_1"] - - output = o_block.component_dict[self.name + ".output_1"] - - into_member = self.into_member.resample(o_block.dynamic).values - - from_member = self.from_member.resample(o_block.dynamic).values - - def rule(m, t): - return input[t] == into_member[t] - - o_block.add(self.name + ".input_cons", pyo.Constraint(o_block.T, rule=rule)) - - def rule(m, t): - return output[t] == from_member[t] - - o_block.add(self.name + ".output_cons", pyo.Constraint(o_block.T, rule=rule)) diff --git "a/component/core - \345\211\257\346\234\254.py" "b/component/core - \345\211\257\346\234\254.py" deleted file mode 100644 index 52f222b802e05a7b55d3da5db4174e32bc918ea0..0000000000000000000000000000000000000000 --- "a/component/core - \345\211\257\346\234\254.py" +++ /dev/null @@ -1,1004 +0,0 @@ - -from Model_Library.utility import design_annuity, operational_annuity -from Model_Library.optimization_model import VariableKind - -import abc -from enum import Enum -import json -import pyomo.environ as pyo -from typing import List - - -class ComponentKind(Enum): - ALL = 1 - ASSET_ADAPTER = 2 - BASE = 3 - BUSBAR = 4 - CONSUMPTION = 5 - GENERATION = 6 - GRID = 7 - MEMBER_ADAPTER = 8 - STORAGE = 9 - - -class ComponentCommodity(Enum): - ALL = 1 - ELECTRICITY = 2 - HEAT = 3 - GAS = 4 - COLD = 5 - HYDROGEN = 6 - SPACE_HEAT = 7 - - -class ComponentPart(Enum): - ALL = (1,) - DESIGN = (2,) - STATE = (3,) - NONE_STATE = 4 - - -class ComponentLink(abc.ABC): - pass - - -class VariableLink(ComponentLink): - def __init__(self, var_name, start, end): - self.var_name = var_name - self.start = start - self.end = end - - -class ComponentPartPattern: - def __init__( - self, - kind=ComponentKind.ALL, - type="all", - commodity=ComponentCommodity.ALL, - part=ComponentPart.ALL, - ): - if isinstance(kind, List) and ComponentKind.ALL in kind: - self.kind = ComponentKind.ALL - else: - self.kind = kind - - if isinstance(type, List) and "all" in type: - self.type = "all" - else: - self.type = type - - if isinstance(commodity, List) and ComponentCommodity.ALL in commodity: - self.commodity = ComponentCommodity.ALL - else: - self.commodity = commodity - - if isinstance(part, List) and ComponentPart.ALL in part: - self.part = ComponentPart.ALL - else: - self.part = part - - def match( - self, - kind=ComponentKind.ALL, - type="all", - commodity=ComponentCommodity.ALL, - part=ComponentPart.ALL, - ): - if kind != ComponentKind.ALL and self.kind != ComponentKind.ALL: - if isinstance(self.kind, List): - if kind not in self.kind: - return False - else: - if kind != self.kind: - return False - - if type != "all" and self.type != "all": - if isinstance(self.type, List): - if type not in self.type: - return False - else: - if type != self.type: - return False - - if ( - commodity != ComponentCommodity.ALL - and self.commodity != ComponentCommodity.ALL - ): - if isinstance(self.commodity, List): - if commodity not in self.commodity: - return False - else: - if commodity != self.commodity: - return False - - if part != ComponentPart.ALL and self.part != ComponentPart.ALL: - if isinstance(self.part, List): - if part not in self.part: - return False - else: - if part != self.part: - return False - - return True - - @staticmethod - def match_simple( - component_kind, - component_type, - component_commodities, - pattern_kind=ComponentKind.ALL, - pattern_type="all", - pattern_commodity=ComponentCommodity.ALL, - ): - match_kind = pattern_kind == ComponentKind.ALL or pattern_kind == component_kind - match_type = pattern_type == "all" or pattern_type == component_type - match_commodity = ( - pattern_commodity == ComponentCommodity.ALL - or pattern_commodity in component_commodities - ) - return match_kind and match_type and match_commodity - - -class ComponentCapacity(Enum): - NONE = 1 - OPTIONAL = 2 - REQUIRED = 3 - - -class AbstractComponent: - def __init__( - self, - name, - commodity_1, - commodity_2, - commodity_3, - commodity_4, - configuration, - capacity, - ): - self.name = name - self.input_commodity_1 = commodity_1 - self.input_commodity_2 = commodity_2 - self.output_commodity_1 = commodity_3 - self.output_commodity_2 = commodity_4 - - if "model" in configuration: - if isinstance(configuration["model"], str): - with open(configuration["model"]) as f: - configuration["model"] = json.load(f) - self._load_model(configuration, configuration.get("model", dict())) - - if capacity == ComponentCapacity.NONE: - self.capacity = None - elif capacity == ComponentCapacity.OPTIONAL: - if "capacity" in configuration: - self.capacity = configuration["capacity"] - elif "min_capacity" in configuration or "max_capacity" in configuration: - min_capacity = configuration.get("min_capacity", 0) - max_capacity = configuration.get("max_capacity", None) - if min_capacity == max_capacity: - self.capacity = min_capacity - else: - self.capacity = (min_capacity, max_capacity) - else: - self.capacity = None - elif capacity == ComponentCapacity.REQUIRED: - if "capacity" in configuration: - self.capacity = configuration["capacity"] - else: - min_capacity = configuration.get("min_capacity", 0) - max_capacity = configuration.get("max_capacity", None) - if min_capacity == max_capacity: - self.capacity = min_capacity - else: - self.capacity = (min_capacity, max_capacity) - - if self.capacity is not None: - self.specific_capital_expenditure = configuration["model"].get( - "specific_capital_expenditure", 0 - ) - self.service_life = configuration["model"].get("service_life", None) - self.factor_effort_maintenance_operation = configuration["model"].get( - "factor_effort_maintenance_operation", 0 - ) - - if configuration is not None and "additional_model_logic" in configuration: - self.additional_model_logic = configuration["additional_model_logic"] - else: - self.additional_model_logic = dict() - - def _load_model(self, configuration, model): - pass - - def commodities_ordered(self): - return ( - self.input_commodity_1, - self.input_commodity_2, - self.output_commodity_1, - self.output_commodity_2, - ) - - def iter_component_parts(self): - if False: - yield - - def iter_links(self): - if False: - yield - - # TODO make these functions into generator functions - def design_base_variable_names(self): - if self.capacity is None: - return [] - else: - return [(self.name + ".capacity", VariableKind.UNINDEXED)] - - # TODO make these functions into generator functions - def non_state_base_variable_names(self): - return [] - - # TODO make these functions into generator functions - def state_base_variable_names(self): - return [] - - def add_design_variables(self, d_block): - if self.capacity is not None: - if isinstance(self.capacity, tuple): - d_block.add(self.name + ".capacity", pyo.Var(bounds=self.capacity)) - elif isinstance(self.capacity, (int, float)): - d_block.add( - self.name + ".capacity", pyo.Param(initialize=self.capacity) - ) - else: - raise ValueError(f"Invalid capacity {self.capacity}!") - - def add_non_state_variables(self, o_block): - pass - - def add_state_variables(self, s_block): - pass - - def add_non_state_model(self, d_block, o_block): - pass - - def add_state_model(self, d_block, s_block): - pass - - def add_additional_model_logic(self, d_block, o_block): - for logic_name, logic in self.additional_model_logic.items(): - if logic["type"] == "RampPenalty": - var_name = logic["variable"] - var = o_block.component_dict[self.name + "." + var_name] - - var_ub_encoded = logic["variable_ub"] - if isinstance(var_ub_encoded, (int, float)): - var_ub = var_ub_encoded - elif var_ub_encoded == "capacity": - if self.capacity is not None: - if isinstance(self.capacity, tuple): - var_ub = d_block.component_dict[self.name + ".capacity"].ub - elif isinstance(self.capacity, (int, float)): - var_ub = self.capacity - else: - raise ValueError(f"Invalid capacity {self.capacity}!") - else: - raise ValueError(f"Invalid variable_ub {var_ub_encoded}") - else: - raise ValueError(f"Invalid variable_ub {var_ub_encoded}") - - z = pyo.Var(o_block.T, domain=pyo.Binary) - o_block.add(self.name + "." + logic_name + "_z", z) - change = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + "." + logic_name + "_change", change) - - def rule(m, t): - if t == o_block.T.first(): - return pyo.Constraint.Skip - else: - return 0 <= change[t] - ( - var[o_block.T[o_block.T.ord(t) - 1]] - var[t] - ) - - o_block.add( - self.name + "." + logic_name + "_cons_1", - pyo.Constraint(o_block.T, rule=rule), - ) - - def rule(m, t): - if t == o_block.T.first(): - return pyo.Constraint.Skip - else: - return ( - change[t] - (var[o_block.T[o_block.T.ord(t) - 1]] - var[t]) - <= 2.0 * var_ub * z[t] - ) - - o_block.add( - self.name + "." + logic_name + "_cons_2", - pyo.Constraint(o_block.T, rule=rule), - ) - - def rule(m, t): - if t == o_block.T.first(): - return pyo.Constraint.Skip - else: - return 0 <= change[t] - ( - var[t] - var[o_block.T[o_block.T.ord(t) - 1]] - ) - - o_block.add( - self.name + "." + logic_name + "_cons_3", - pyo.Constraint(o_block.T, rule=rule), - ) - - def rule(m, t): - if t == o_block.T.first(): - return pyo.Constraint.Skip - else: - return change[t] - ( - var[t] - var[o_block.T[o_block.T.ord(t) - 1]] - ) <= 2.0 * var_ub * (1.0 - z[t]) - - o_block.add( - self.name + "." + logic_name + "_cons_4", - pyo.Constraint(o_block.T, rule=rule), - ) - - objective_term = pyo.quicksum( - change[t] * logic["objective_factor"] for t in o_block.T - ) # This objective term should be scaled by the time step, because the change itself is bigger, when the time step is longer, so this implicetly scales the objective term - - o_block.add_general_scaled_objective(objective_term) - - if logic["type"] == "additional_operational_objective": - factor_encoded, var_name = logic["value"] - if isinstance(factor_encoded, (int, float)): - factor = factor_encoded - else: - raise ValueError(f"Invalid factor {factor_encoded}!") - var = o_block.component_dict[self.name + "." + var_name] - - objective_term = pyo.quicksum( - factor * var[t] * o_block.step_size(t) for t in o_block.T - ) - - o_block.add_general_scaled_objective(objective_term) - - def design_annuity(self, model, T, r, q): - if self.capacity is not None: - A_0 = ( - model.component_dict[self.name + ".capacity"] - * self.specific_capital_expenditure - ) - else: - A_0 = 0.0 - if isinstance(A_0, float) and A_0 == 0.0: - self_design_annuity = 0.0 - else: - if self.service_life is not None: - T_N = self.service_life - else: - T_N = T - f_b = self.factor_effort_maintenance_operation - self_design_annuity = design_annuity(A_0, T, T_N, r, q, f_b) - - return self_design_annuity - - # TODO rename model to o_block - def _operational_exprenditure(self, model): - return 0.0 - - def operational_annuity( - self, model, T, r, q - ): # normally is w, to scale the operational expenditure A_V to the operational expenditure in the first year A_V1, but that is done for all operational annuities when right before the objective is added to the model - A_V = self._operational_exprenditure(model) - if isinstance(A_V, float) and A_V == 0.0: - self_operational_annuity = 0.0 - else: - self_operational_annuity = operational_annuity(T, r, q, A_V) - - return self_operational_annuity - - def peak_power_cost(self, model): - return 0.0 - - def co2_emissions(self, model): - # heat https://www.uni-goettingen.de/de/document/download/e778b3727c64ed6f962e4c1cea80fa2f.pdf/CO2%20Emissionen_2016.pdf - # https://www.umweltbundesamt.de/presse/pressemitteilungen/bilanz-2019-co2-emissionen-pro-kilowattstunde-strom - return 0.0 - - -class BaseBusBar(AbstractComponent): - def __init__(self, name, commodity): - super().__init__( - name=name, - commodity_1=commodity, - commodity_2=None, - commodity_3=commodity, - commodity_4=None, - configuration=dict(), - capacity=ComponentCapacity.NONE, - ) - - self.commodity = commodity - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - return ComponentPartPattern.match_simple( - ComponentKind.BUSBAR, - self.__class__.__name__, - [self.commodity], - pattern_kind=kind, - pattern_commodity=commodity, - ) - - def iter_component_parts(self): - yield ComponentPart.NONE_STATE - - def non_state_base_variable_names(self): - return [ - (self.name + ".input_1", VariableKind.INDEXED), - (self.name + ".output_1", VariableKind.INDEXED), - ] - - def add_non_state_variables(self, o_block): - input = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".input_1", input) - - output = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".output_1", output) - - def add_non_state_model(self, d_block, o_block): - input = o_block.component_dict[self.name + ".input_1"] - output = o_block.component_dict[self.name + ".output_1"] - - def rule(m, t): - return output[t] == input[t] - - o_block.add(self.name + ".sum", pyo.Constraint(o_block.T, rule=rule)) - - -class BaseComponent(AbstractComponent): - def __init__( - self, name, commodity_1, commodity_2, commodity_3, commodity_4, configuration - ): - super().__init__( - name=name, - commodity_1=commodity_1, - commodity_2=commodity_2, - commodity_3=commodity_3, - commodity_4=commodity_4, - configuration=configuration, - capacity=ComponentCapacity.REQUIRED, - ) - - self._setup_conversions(configuration) - - def _setup_conversions(self, configuration): - self._load_conversions(configuration) - - self.indep_var_1 = configuration["indep_var_1"] - self.indep_var_2 = configuration["indep_var_2"] - self.conversion_1 = configuration["conversion_1"] - self.conversion_2 = configuration["conversion_2"] - dep_var_1 = self.conversion_1[1] - dep_var_2 = self.conversion_2[1] if self.conversion_2 is not None else None - - self.operational_variables = [ - (self.name + "." + var, VariableKind.INDEXED) - for var in ["input_1", "input_2", "output_1", "output_2"] - if var in [self.indep_var_1, self.indep_var_2, dep_var_1, dep_var_2] - ] - - def _load_conversions(self, configuration): - efficiency = configuration["efficiency"] - - configuration["indep_var_1"] = "output_1" - configuration["indep_var_2"] = None - configuration["conversion_1"] = ("output_1", "input_1", 1.0 / efficiency) - configuration["conversion_2"] = None - - def _load_model(self, configuration, model): - configuration["efficiency"] = model["efficiency"] - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - return ComponentPartPattern.match_simple( - ComponentKind.BASE, - self.__class__.__name__, - [ - self.input_commodity_1, - self.input_commodity_2, - self.output_commodity_1, - self.output_commodity_2, - ], - pattern_kind=kind, - pattern_commodity=commodity, - ) - - def iter_component_parts(self): - yield ComponentPart.DESIGN - yield ComponentPart.NONE_STATE - - def non_state_base_variable_names(self): - return self.operational_variables - - def add_non_state_variables(self, o_block): - dep_var_1 = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + "." + self.indep_var_1, dep_var_1) - - if self.indep_var_2 is not None: - dep_var_2 = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + "." + self.indep_var_2, dep_var_2) - - def add_non_state_model(self, d_block, o_block): - capacity = d_block.component_dict[self.name + ".capacity"] - - dep_var_1 = o_block.component_dict[self.name + "." + self.indep_var_1] - - def rule(m, t): - return dep_var_1[t] <= capacity - - o_block.add( - self.name + "." + self.indep_var_1 + "_capacity_cons", - pyo.Constraint(o_block.T, rule=rule), - ) - - if self.indep_var_2 is not None: - dep_var_2 = o_block.component_dict[self.name + "." + self.indep_var_2] - - def rule(m, t): - return dep_var_2[t] <= capacity - - o_block.add( - self.name + "." + self.indep_var_2 + "_capacity_cons", - pyo.Constraint(o_block.T, rule=rule), - ) - - z_bi_flow = pyo.Var(o_block.T, domain=pyo.Binary) - o_block.add(self.name + ".z_bi_flow", z_bi_flow) - - if isinstance(self.capacity, (int, float)): - - def rule(m, t): - return dep_var_1[t] <= z_bi_flow[t] * capacity - - o_block.add( - self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule) - ) - - def rule(m, t): - return dep_var_2[t] <= (1.0 - z_bi_flow[t]) * capacity - - o_block.add( - self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule) - ) - else: - - def rule(m, t): - return dep_var_1[t] <= z_bi_flow[t] * capacity.ub - - o_block.add( - self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule) - ) - - def rule(m, t): - return dep_var_2[t] <= (1.0 - z_bi_flow[t]) * capacity.ub - - o_block.add( - self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule) - ) - - self._handle_conversion(d_block, o_block, self.conversion_1) - - if self.conversion_2 is not None: - self._handle_conversion(d_block, o_block, self.conversion_2) - - def _handle_conversion(self, d_block, o_block, conversion): - indep_var = o_block.component_dict[self.name + "." + conversion[0]] - - dep_var = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + "." + conversion[1], dep_var) - - if isinstance(conversion[2], (int, float)): - - def rule(m, t): - return dep_var[t] == indep_var[t] * conversion[2] - - o_block.add( - self.name - + "." - + conversion[0] - + "_to_" - + conversion[1] - + "_conversion", - pyo.Constraint(o_block.T, rule=rule), - ) - else: - - def rule(m, t): - return dep_var[t] == indep_var[t] * conversion[2][t] - - o_block.add( - self.name - + "." - + conversion[0] - + "_to_" - + conversion[1] - + "_conversion", - pyo.Constraint(o_block.T, rule=rule), - ) - - -class BaseConsumption(AbstractComponent): - def __init__(self, name, commodity, configuration): - super().__init__( - name=name, - commodity_1=commodity, - commodity_2=None, - commodity_3=None, - commodity_4=None, - configuration=configuration, - capacity=ComponentCapacity.NONE, - ) - - self.commodity = commodity - - self.consumption = configuration["consumption"] - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - return ComponentPartPattern.match_simple( - ComponentKind.CONSUMPTION, - self.__class__.__name__, - [self.commodity], - pattern_kind=kind, - pattern_commodity=commodity, - ) - - def iter_component_parts(self): - yield ComponentPart.NONE_STATE - - def non_state_base_variable_names(self): - return [(self.name + ".input_1", VariableKind.INDEXED)] - - def add_non_state_variables(self, o_block): - input = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".input_1", input) - - def add_non_state_model(self, d_block, o_block): - input = o_block.component_dict[self.name + ".input_1"] - - consumption = self.consumption.resample(o_block.dynamic).values - - def rule(m, t): - return input[t] == consumption[t] - - o_block.add( - self.name + ".consumption_cons", pyo.Constraint(o_block.T, rule=rule) - ) - - -class BaseGrid(AbstractComponent): - def __init__(self, name, commodity, configuration): - super().__init__( - name=name, - commodity_1=commodity, - commodity_2=None, - commodity_3=commodity, - commodity_4=None, - configuration=configuration, - capacity=ComponentCapacity.OPTIONAL, - ) - - self.commodity = commodity - - self.price = configuration.get("price", 0) - self.injection_price = configuration.get("injection_price", 0) - self.peak_power_cost_ = configuration.get("peak_power_cost", 0) - self.co2_emissions_ = configuration.get("co2_emissions", 0) - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - return ComponentPartPattern.match_simple( - ComponentKind.GRID, - self.__class__.__name__, - [self.commodity], - pattern_kind=kind, - pattern_commodity=commodity, - ) - - def iter_component_parts(self): - if self.capacity is not None: - yield ComponentPart.DESIGN - - yield ComponentPart.NONE_STATE - - def non_state_base_variable_names(self): - return [ - (self.name + ".input_1", VariableKind.INDEXED), - (self.name + ".output_1", VariableKind.INDEXED), - ] - - def add_non_state_variables(self, o_block): - input = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".input_1", input) - - output = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".output_1", output) - - def add_non_state_model(self, d_block, o_block): - input = o_block.component_dict[self.name + ".input_1"] - - output = o_block.component_dict[self.name + ".output_1"] - - if self.capacity is not None: - capacity = d_block.component_dict[self.name + ".capacity"] - - def rule(m, t): - return input[t] <= capacity - - o_block.add( - self.name + ".capacity_input_cons", pyo.Constraint(o_block.T, rule=rule) - ) - - def rule(m, t): - return output[t] <= capacity - - o_block.add( - self.name + ".capacity_output_cons", - pyo.Constraint(o_block.T, rule=rule), - ) - - def _operational_exprenditure(self, model): - if isinstance(self.price, (int, float)): - output = model.component_dict[self.name + ".output_1"] - price_function = lambda t: output[t] * self.price - else: - price = self.price.resample(model.dynamic).values - - output = model.component_dict[self.name + ".output_1"] - price_function = lambda t: output[t] * price[t] - - if isinstance(self.injection_price, (int, float)): - input = model.component_dict[self.name + ".input_1"] - injection_price_function = lambda t: input[t] * self.injection_price - else: - injection_price = self.injection_price.resample(model.dynamic).values - - input = model.component_dict[self.name + ".input_1"] - injection_price_function = lambda t: input[t] * injection_price[t] - - return pyo.quicksum( - (price_function(t) - injection_price_function(t)) * model.step_size(t) - for t in model.T - ) - - ''' - def limit_flow(flow, max_value): - return min(flow, max_value) - - max_output = 500 - max_input = 500 - - return pyo.quicksum( - (limit_flow(price_function(t), max_output) - - limit_flow(injection_price_function(t), max_input)) * model.step_size(t) - for t in model.T - ) - ''' - - def peak_power_cost(self, model): - peak_import = pyo.Var() - model.add(self.name + ".peak_import", peak_import) - - output = model.component_dict[self.name + ".output_1"] - - def rule(m, t): - return output[t] <= peak_import - - model.add(self.name + ".peak_import_cons", pyo.Constraint(model.T, rule=rule)) - - return peak_import * self.peak_power_cost_ - - def co2_emissions(self, model): - if isinstance(self.co2_emissions_, (int, float)): - output = model.component_dict[self.name + ".output_1"] - co2_emissions_function = lambda t: output[t] * self.co2_emissions_ - else: - co2_emissions_ = self.co2_emissions_.resample(model.dynamic).values - - output = model.component_dict[self.name + ".output_1"] - co2_emissions_function = lambda t: output[t] * co2_emissions_[t] - - -class BaseStorage(AbstractComponent): - def __init__(self, name, commodity, configuration): - super().__init__( - name=name, - commodity_1=commodity, - commodity_2=None, - commodity_3=commodity, - commodity_4=None, - configuration=configuration, - capacity=ComponentCapacity.REQUIRED, - ) - - self.commodity = commodity - if "first_soe" in configuration: - self.first_soe = configuration["first_soe"] - if "final_soe" in configuration: - self.final_soe = configuration["final_soe"] - - def _load_model(self, configuration, model): - self.input_efficiency = model["input_efficiency"] - self.e2p_in = model["e2p_in"] - self.output_efficiency = model["output_efficiency"] - self.e2p_out = model["e2p_out"] - self.self_discharging_loss = model["self_discharging_loss"] - - def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL): - return ComponentPartPattern.match_simple( - ComponentKind.STORAGE, - self.__class__.__name__, - [self.commodity], - pattern_kind=kind, - pattern_commodity=commodity, - ) - - def iter_component_parts(self): - yield ComponentPart.DESIGN - yield ComponentPart.NONE_STATE - yield ComponentPart.STATE - - def iter_links(self): - yield VariableLink( - self.name + ".input_1", - (self.name, ComponentPart.NONE_STATE), - (self.name, ComponentPart.STATE), - ) - yield VariableLink( - self.name + ".output_1", - (self.name, ComponentPart.NONE_STATE), - (self.name, ComponentPart.STATE), - ) - - def state_base_variable_names(self): - return [ - (self.name + ".input_1", VariableKind.INDEXED), - (self.name + ".output_1", VariableKind.INDEXED), - ] - - def state_base_variable_names(self): - return [ - (self.name + ".energy", VariableKind.EXTENDED_INDEXED), - ] - - def add_non_state_variables(self, o_block): - input = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".input_1", input) - - output = pyo.Var(o_block.T, bounds=(0, None)) - o_block.add(self.name + ".output_1", output) - - def add_state_variables(self, s_block): - energy = pyo.Var(s_block.T_prime, bounds=(0, None)) - s_block.add(self.name + ".energy", energy) - - def add_non_state_model(self, d_block, o_block): - capacity = d_block.component_dict[self.name + ".capacity"] - input = o_block.component_dict[self.name + ".input_1"] - output = o_block.component_dict[self.name + ".output_1"] - - def rule(m, t): - return input[t] * self.input_efficiency <= capacity / self.e2p_in - - o_block.add( - self.name + ".capacity_input_cons", pyo.Constraint(o_block.T, rule=rule) - ) - - def rule(m, t): - return output[t] / self.output_efficiency <= capacity / self.e2p_out - - o_block.add( - self.name + ".capacity_output_cons", pyo.Constraint(o_block.T, rule=rule) - ) - - z_bi_flow = pyo.Var(o_block.T, domain=pyo.Binary) - o_block.add(self.name + ".z_bi_flow", z_bi_flow) - - if isinstance(self.capacity, (int, float)): - - def rule(m, t): - return input[t] <= z_bi_flow[t] * ( - capacity / (self.e2p_in * self.input_efficiency) - ) - - o_block.add( - self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule) - ) - - def rule(m, t): - return output[t] <= (1.0 - z_bi_flow[t]) * ( - (capacity * self.output_efficiency) / self.e2p_out - ) - - o_block.add( - self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule) - ) - else: - - def rule(m, t): - return input[t] <= z_bi_flow[t] * ( - capacity.ub / (self.e2p_in * self.input_efficiency) - ) - - o_block.add( - self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule) - ) - - def rule(m, t): - return output[t] <= (1.0 - z_bi_flow[t]) * ( - (capacity.ub * self.output_efficiency) / self.e2p_out - ) - - o_block.add( - self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule) - ) - - def add_state_model(self, d_block, s_block): - capacity = d_block.component_dict[self.name + ".capacity"] - energy = s_block.component_dict[self.name + ".energy"] - input = s_block.component_dict[self.name + ".input_1"] - output = s_block.component_dict[self.name + ".output_1"] - - def rule(m, t): - return energy[t] <= capacity - - s_block.add(self.name + ".energy_ub", pyo.Constraint(s_block.T, rule=rule)) - - if hasattr(self, "first_soe"): - factor_encoded, var_name = self.first_soe["value"] - var = s_block.component_dict[self.name + "." + var_name] - if isinstance(factor_encoded, (int, float)): - factor = factor_encoded - else: - raise ValueError(f"Invalid factor {factor}!") - sense = self.first_soe["sense"] - if sense == "==": - expr = energy[s_block.T_prime.first()] == factor * var - elif sense == "<=": - expr = energy[s_block.T_prime.first()] <= factor * var - elif sense == ">=": - expr = energy[s_block.T_prime.first()] >= factor * var - else: - raise ValueError(f"Invalid sense {sense}!") - s_block.add(self.name + ".fix_first_energy", pyo.Constraint(expr=expr)) - - def rule(m, t): - return energy[t] == energy[s_block.T_prime[s_block.T_prime.ord(t) - 1]] * ( - 1.0 - self.self_discharging_loss * s_block.step_size(t) - ) + input[t] * self.input_efficiency * s_block.step_size(t) - output[ - t - ] / self.output_efficiency * s_block.step_size( - t - ) - - s_block.add(self.name + ".state_equation", pyo.Constraint(s_block.T, rule=rule)) - - if hasattr(self, "final_soe"): - factor_encoded, var = self.final_soe["value"] - if isinstance(factor_encoded, (int, float)): - factor = factor_encoded - else: - raise ValueError(f"Invalid factor {factor}!") - if var == "first_soe": - var = energy[s_block.T_prime.first()] - else: - var = s_block.component_dict[self.name + "." + var] - if self.final_soe["sense"] == "==": - expr = energy[s_block.T_prime.last()] == factor * var - elif self.final_soe["sense"] == "<=": - expr = energy[s_block.T_prime.last()] <= factor * var - elif self.final_soe["sense"] == ">=": - expr = energy[s_block.T_prime.last()] >= factor * var - else: - sense = self.final_soe["sense"] - raise ValueError(f"Invalid sense {sense}!") - s_block.add(self.name + ".fix_final_energy", pyo.Constraint(expr=expr)) diff --git "a/component/electricity - \345\211\257\346\234\254.py" "b/component/electricity - \345\211\257\346\234\254.py" deleted file mode 100644 index c21f20a3952fbfb694ee644121187c73d86feebc..0000000000000000000000000000000000000000 --- "a/component/electricity - \345\211\257\346\234\254.py" +++ /dev/null @@ -1,79 +0,0 @@ - -from Model_Library.component.core import ( - BaseBusBar, - BaseComponent, - BaseConsumption, - BaseGeneration, - BaseGrid, - BaseStorage, - ComponentCommodity, -) -from Model_Library.optimization_model import VariableKind - -import pyomo.environ as pyo - - -class ElectricalBusBar(BaseBusBar): - def __init__(self, name, configuration): - super().__init__(name=name, commodity=ComponentCommodity.ELECTRICITY) - - -class BiPowerElectronic(BaseComponent): - def __init__(self, name, configuration): - super().__init__( - name=name, - commodity_1=ComponentCommodity.ELECTRICITY, - commodity_2=ComponentCommodity.ELECTRICITY, - commodity_3=ComponentCommodity.ELECTRICITY, - commodity_4=ComponentCommodity.ELECTRICITY, - configuration=configuration, - ) - - def _load_model(self, configuration, model): - configuration["efficiency_a"] = model["efficiency_a"] - configuration["efficiency_b"] = model["efficiency_b"] - configuration["rated_power_side_a"] = model["rated_power_side_a"] - configuration["rated_power_side_b"] = model["rated_power_side_b"] - - def _load_conversions(self, configuration): - efficiency_a = configuration["efficiency_a"] # TODO move comment to docu 1 -> 2 - efficiency_b = configuration["efficiency_b"] # TODO move comment to docu 2 -> 1 - - if configuration["rated_power_side_a"] == "output": - configuration["indep_var_1"] = "output_2" - configuration["conversion_1"] = ("output_2", "input_1", 1.0 / efficiency_a) - else: - configuration["indep_var_1"] = "input_1" - configuration["conversion_1"] = ("input_1", "output_2", efficiency_a) - - if configuration["rated_power_side_b"] == "output": - configuration["indep_var_2"] = "output_1" - configuration["conversion_2"] = ("output_1", "input_2", 1.0 / efficiency_b) - else: - configuration["indep_var_2"] = "input_2" - configuration["conversion_2"] = ("input_2", "output_1", efficiency_b) - -class ElectricalConsumption(BaseConsumption): - def __init__(self, name, configuration): - super().__init__( - name=name, - commodity=ComponentCommodity.ELECTRICITY, - configuration=configuration, - ) - -class ElectricalGrid(BaseGrid): - def __init__(self, name, configuration): - super().__init__( - name=name, - commodity=ComponentCommodity.ELECTRICITY, - configuration=configuration, - ) - - -class Battery(BaseStorage): - def __init__(self, name, configuration): - super().__init__( - name=name, - commodity=ComponentCommodity.ELECTRICITY, - configuration=configuration, - ) diff --git a/component/electricity.py b/component/electricity.py index e8dccaf30e1a74f8db10b2b47e5a77677926397c..0825f1de336334df3da5ab1e35a0fc299a996dc8 100644 --- a/component/electricity.py +++ b/component/electricity.py @@ -193,7 +193,12 @@ class Battery(BaseStorage): class EVBattery(BaseStorage): def __init__(self, name, configuration): - super().__init__(name=name, commodity=ComponentCommodity.ELECTRICITY, configuration=configuration) + super().__init__( + name=name, + commodity=ComponentCommodity.ELECTRICITY, + configuration=configuration + ) + self.connected_profile = configuration.get("connected_profile", None) def add_state_model(self, d_block, s_block): super().add_state_model(d_block, s_block) @@ -202,11 +207,12 @@ class EVBattery(BaseStorage): energy = s_block.component_dict[self.name + ".energy"] T = s_block.T + # === SOC 惩罚项 === soc_ranges = [ - ("low_heavy", 0.3, 0.1, "below"), # <30% - ("low_light", 0.5, 0.03, "below"), # 30-50% - ("high_light", 0.7, 0.03, "above"), # 70-80% - ("high_heavy", 0.8, 0.1, "above"), # >80% + ("low_heavy", 0.2, 0.005, "below"), + ("low_light", 0.5, 0.001, "below"), + ("high_light", 0.7, 0.001, "above"), + ("high_heavy", 0.9, 0.005, "above"), ] for name, threshold, factor, direction in soc_ranges: @@ -223,10 +229,27 @@ class EVBattery(BaseStorage): return slack[t] >= energy[t] - thr * capacity s_block.add(con_name, pyo.Constraint(T, rule=rule)) - obj_expr = pyo.quicksum(factor * slack[t] * s_block.step_size(t) for t in T) s_block.add_general_scaled_objective(obj_expr) + if self.connected_profile is not None: + connected = self.connected_profile.resample(s_block.dynamic).values + + lookback_steps = 8 + for t in T: + if t > 0 and connected[t - 1] == 1 and connected[t] == 0: + + recently_drove = (connected[max(0, t - lookback_steps):t] == 0).any() + + if not recently_drove: + s_block.add( + f"{self.name}.soc_departure_strict_{t}", + pyo.Constraint(expr=energy[t] >= 0.7 * capacity) + ) + else: + print(f"⚠️ {self.name}: connected_profile NOT SET. No departure SOC constraints added.") + + class ElectricalPassThrough(BaseComponent): def __init__(self, name, configuration):