Skip to content
Snippets Groups Projects
Commit 141250aa authored by yifan.he's avatar yifan.he
Browse files

new components

parent 252b63ae
No related branches found
No related tags found
No related merge requests found
"""
MIT License
Copyright (c) 2023 RWTH Aachen University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from Model_Library.component.core import (
AbstractComponent,
ComponentCapacity,
ComponentKind,
ComponentCommodity,
ComponentLink,
ComponentPart,
)
from Model_Library.optimization_model import VariableKind
import pyomo.environ as pyo
class AssetLink(ComponentLink):
def __init__(self, asset, asset_var_name, start, var_name, end):
self.asset = asset
self.asset_var_name = asset_var_name
self.start = start
self.var_name = var_name
self.end = end
class MemberLink(ComponentLink):
def __init__(self, member, member_res_name, local_res_name, end):
self.member = member
self.member_res_name = member_res_name
self.local_res_name = local_res_name
self.end = end
class AssetAdapter(AbstractComponent):
def __init__(self, name, configuration, assets):
self.asset = assets[configuration["asset"]]
self.grid = self.asset._components[configuration["grid"]]
super().__init__(
name=name,
commodity_1=self.grid.commodity,
commodity_2=None,
commodity_3=self.grid.commodity,
commodity_4=None,
configuration=configuration,
capacity=ComponentCapacity.NONE,
)
self.commodity = self.grid.commodity
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
match_kind = kind == ComponentKind.ALL or kind == ComponentKind.ASSET_ADAPTER
match_commodity = (
commodity == ComponentCommodity.ALL
or commodity == self.commodity
or (isinstance(commodity, list) and self.commodity in commodity)
)
return match_kind and match_commodity
def iter_component_parts(self):
yield ComponentPart.NONE_STATE
def iter_links(self):
yield AssetLink(
self.asset,
self.grid.name + ".output_1",
(self.grid.name, ComponentPart.NONE_STATE),
self.name + ".into_asset",
(self.name, ComponentPart.NONE_STATE),
)
yield AssetLink(
self.asset,
self.grid.name + ".input_1",
(self.grid.name, ComponentPart.NONE_STATE),
self.name + ".from_asset",
(self.name, ComponentPart.NONE_STATE),
)
def non_state_base_variable_names(self):
return [
(self.name + ".input_1", VariableKind.INDEXED),
(self.name + ".output_1", VariableKind.INDEXED),
]
def add_non_state_variables(self, o_block):
input = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".input_1", input)
output = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".output_1", output)
def add_non_state_model(self, d_block, o_block):
input = o_block.component_dict[self.name + ".input_1"]
output = o_block.component_dict[self.name + ".output_1"]
into_asset = o_block.component_dict[self.name + ".into_asset"]
from_asset = o_block.component_dict[self.name + ".from_asset"]
def rule(m, t):
return input[t] == into_asset[t]
o_block.add(self.name + ".input_cons", pyo.Constraint(o_block.T, rule=rule))
def rule(m, t):
return output[t] == from_asset[t]
o_block.add(self.name + ".output_cons", pyo.Constraint(o_block.T, rule=rule))
class MemberAdapter(AbstractComponent):
def __init__(self, name, configuration, members):
self.member = members[configuration["member"]]
self.grid = self.member._components[configuration["grid"]]
super().__init__(
name=name,
commodity_1=self.grid.commodity,
commodity_2=None,
commodity_3=self.grid.commodity,
commodity_4=None,
configuration=configuration,
capacity=ComponentCapacity.NONE,
)
self.commodity = self.grid.commodity
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
match_kind = kind == ComponentKind.ALL or kind == ComponentKind.ASSET_ADAPTER
match_commodity = (
commodity == ComponentCommodity.ALL
or commodity == self.commodity
or (isinstance(commodity, list) and self.commodity in commodity)
)
return match_kind and match_commodity
def iter_component_parts(self):
yield ComponentPart.NONE_STATE
def iter_links(self):
yield MemberLink(
self.member,
self.grid.name + ".output_1",
"into_member",
(self.name, ComponentPart.NONE_STATE),
)
yield MemberLink(
self.member,
self.grid.name + ".input_1",
"from_member",
(self.name, ComponentPart.NONE_STATE),
)
def non_state_base_variable_names(self):
return [
(self.name + ".input_1", VariableKind.INDEXED),
(self.name + ".output_1", VariableKind.INDEXED),
]
def add_non_state_variables(self, o_block):
input = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".input_1", input)
output = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".output_1", output)
def add_non_state_model(self, d_block, o_block):
input = o_block.component_dict[self.name + ".input_1"]
output = o_block.component_dict[self.name + ".output_1"]
into_member = self.into_member.resample(o_block.dynamic).values
from_member = self.from_member.resample(o_block.dynamic).values
def rule(m, t):
return input[t] == into_member[t]
o_block.add(self.name + ".input_cons", pyo.Constraint(o_block.T, rule=rule))
def rule(m, t):
return output[t] == from_member[t]
o_block.add(self.name + ".output_cons", pyo.Constraint(o_block.T, rule=rule))
from Model_Library.utility import design_annuity, operational_annuity
from Model_Library.optimization_model import VariableKind
import abc
from enum import Enum
import json
import pyomo.environ as pyo
from typing import List
class ComponentKind(Enum):
ALL = 1
ASSET_ADAPTER = 2
BASE = 3
BUSBAR = 4
CONSUMPTION = 5
GENERATION = 6
GRID = 7
MEMBER_ADAPTER = 8
STORAGE = 9
class ComponentCommodity(Enum):
ALL = 1
ELECTRICITY = 2
HEAT = 3
GAS = 4
COLD = 5
HYDROGEN = 6
SPACE_HEAT = 7
class ComponentPart(Enum):
ALL = (1,)
DESIGN = (2,)
STATE = (3,)
NONE_STATE = 4
class ComponentLink(abc.ABC):
pass
class VariableLink(ComponentLink):
def __init__(self, var_name, start, end):
self.var_name = var_name
self.start = start
self.end = end
class ComponentPartPattern:
def __init__(
self,
kind=ComponentKind.ALL,
type="all",
commodity=ComponentCommodity.ALL,
part=ComponentPart.ALL,
):
if isinstance(kind, List) and ComponentKind.ALL in kind:
self.kind = ComponentKind.ALL
else:
self.kind = kind
if isinstance(type, List) and "all" in type:
self.type = "all"
else:
self.type = type
if isinstance(commodity, List) and ComponentCommodity.ALL in commodity:
self.commodity = ComponentCommodity.ALL
else:
self.commodity = commodity
if isinstance(part, List) and ComponentPart.ALL in part:
self.part = ComponentPart.ALL
else:
self.part = part
def match(
self,
kind=ComponentKind.ALL,
type="all",
commodity=ComponentCommodity.ALL,
part=ComponentPart.ALL,
):
if kind != ComponentKind.ALL and self.kind != ComponentKind.ALL:
if isinstance(self.kind, List):
if kind not in self.kind:
return False
else:
if kind != self.kind:
return False
if type != "all" and self.type != "all":
if isinstance(self.type, List):
if type not in self.type:
return False
else:
if type != self.type:
return False
if (
commodity != ComponentCommodity.ALL
and self.commodity != ComponentCommodity.ALL
):
if isinstance(self.commodity, List):
if commodity not in self.commodity:
return False
else:
if commodity != self.commodity:
return False
if part != ComponentPart.ALL and self.part != ComponentPart.ALL:
if isinstance(self.part, List):
if part not in self.part:
return False
else:
if part != self.part:
return False
return True
@staticmethod
def match_simple(
component_kind,
component_type,
component_commodities,
pattern_kind=ComponentKind.ALL,
pattern_type="all",
pattern_commodity=ComponentCommodity.ALL,
):
match_kind = pattern_kind == ComponentKind.ALL or pattern_kind == component_kind
match_type = pattern_type == "all" or pattern_type == component_type
match_commodity = (
pattern_commodity == ComponentCommodity.ALL
or pattern_commodity in component_commodities
)
return match_kind and match_type and match_commodity
class ComponentCapacity(Enum):
NONE = 1
OPTIONAL = 2
REQUIRED = 3
class AbstractComponent:
def __init__(
self,
name,
commodity_1,
commodity_2,
commodity_3,
commodity_4,
configuration,
capacity,
):
self.name = name
self.input_commodity_1 = commodity_1
self.input_commodity_2 = commodity_2
self.output_commodity_1 = commodity_3
self.output_commodity_2 = commodity_4
if "model" in configuration:
if isinstance(configuration["model"], str):
with open(configuration["model"]) as f:
configuration["model"] = json.load(f)
self._load_model(configuration, configuration.get("model", dict()))
if capacity == ComponentCapacity.NONE:
self.capacity = None
elif capacity == ComponentCapacity.OPTIONAL:
if "capacity" in configuration:
self.capacity = configuration["capacity"]
elif "min_capacity" in configuration or "max_capacity" in configuration:
min_capacity = configuration.get("min_capacity", 0)
max_capacity = configuration.get("max_capacity", None)
if min_capacity == max_capacity:
self.capacity = min_capacity
else:
self.capacity = (min_capacity, max_capacity)
else:
self.capacity = None
elif capacity == ComponentCapacity.REQUIRED:
if "capacity" in configuration:
self.capacity = configuration["capacity"]
else:
min_capacity = configuration.get("min_capacity", 0)
max_capacity = configuration.get("max_capacity", None)
if min_capacity == max_capacity:
self.capacity = min_capacity
else:
self.capacity = (min_capacity, max_capacity)
if self.capacity is not None:
self.specific_capital_expenditure = configuration["model"].get(
"specific_capital_expenditure", 0
)
self.service_life = configuration["model"].get("service_life", None)
self.factor_effort_maintenance_operation = configuration["model"].get(
"factor_effort_maintenance_operation", 0
)
if configuration is not None and "additional_model_logic" in configuration:
self.additional_model_logic = configuration["additional_model_logic"]
else:
self.additional_model_logic = dict()
def _load_model(self, configuration, model):
pass
def commodities_ordered(self):
return (
self.input_commodity_1,
self.input_commodity_2,
self.output_commodity_1,
self.output_commodity_2,
)
def iter_component_parts(self):
if False:
yield
def iter_links(self):
if False:
yield
# TODO make these functions into generator functions
def design_base_variable_names(self):
if self.capacity is None:
return []
else:
return [(self.name + ".capacity", VariableKind.UNINDEXED)]
# TODO make these functions into generator functions
def non_state_base_variable_names(self):
return []
# TODO make these functions into generator functions
def state_base_variable_names(self):
return []
def add_design_variables(self, d_block):
if self.capacity is not None:
if isinstance(self.capacity, tuple):
d_block.add(self.name + ".capacity", pyo.Var(bounds=self.capacity))
elif isinstance(self.capacity, (int, float)):
d_block.add(
self.name + ".capacity", pyo.Param(initialize=self.capacity)
)
else:
raise ValueError(f"Invalid capacity {self.capacity}!")
def add_non_state_variables(self, o_block):
pass
def add_state_variables(self, s_block):
pass
def add_non_state_model(self, d_block, o_block):
pass
def add_state_model(self, d_block, s_block):
pass
def add_additional_model_logic(self, d_block, o_block):
for logic_name, logic in self.additional_model_logic.items():
if logic["type"] == "RampPenalty":
var_name = logic["variable"]
var = o_block.component_dict[self.name + "." + var_name]
var_ub_encoded = logic["variable_ub"]
if isinstance(var_ub_encoded, (int, float)):
var_ub = var_ub_encoded
elif var_ub_encoded == "capacity":
if self.capacity is not None:
if isinstance(self.capacity, tuple):
var_ub = d_block.component_dict[self.name + ".capacity"].ub
elif isinstance(self.capacity, (int, float)):
var_ub = self.capacity
else:
raise ValueError(f"Invalid capacity {self.capacity}!")
else:
raise ValueError(f"Invalid variable_ub {var_ub_encoded}")
else:
raise ValueError(f"Invalid variable_ub {var_ub_encoded}")
z = pyo.Var(o_block.T, domain=pyo.Binary)
o_block.add(self.name + "." + logic_name + "_z", z)
change = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + "." + logic_name + "_change", change)
def rule(m, t):
if t == o_block.T.first():
return pyo.Constraint.Skip
else:
return 0 <= change[t] - (
var[o_block.T[o_block.T.ord(t) - 1]] - var[t]
)
o_block.add(
self.name + "." + logic_name + "_cons_1",
pyo.Constraint(o_block.T, rule=rule),
)
def rule(m, t):
if t == o_block.T.first():
return pyo.Constraint.Skip
else:
return (
change[t] - (var[o_block.T[o_block.T.ord(t) - 1]] - var[t])
<= 2.0 * var_ub * z[t]
)
o_block.add(
self.name + "." + logic_name + "_cons_2",
pyo.Constraint(o_block.T, rule=rule),
)
def rule(m, t):
if t == o_block.T.first():
return pyo.Constraint.Skip
else:
return 0 <= change[t] - (
var[t] - var[o_block.T[o_block.T.ord(t) - 1]]
)
o_block.add(
self.name + "." + logic_name + "_cons_3",
pyo.Constraint(o_block.T, rule=rule),
)
def rule(m, t):
if t == o_block.T.first():
return pyo.Constraint.Skip
else:
return change[t] - (
var[t] - var[o_block.T[o_block.T.ord(t) - 1]]
) <= 2.0 * var_ub * (1.0 - z[t])
o_block.add(
self.name + "." + logic_name + "_cons_4",
pyo.Constraint(o_block.T, rule=rule),
)
objective_term = pyo.quicksum(
change[t] * logic["objective_factor"] for t in o_block.T
) # This objective term should be scaled by the time step, because the change itself is bigger, when the time step is longer, so this implicetly scales the objective term
o_block.add_general_scaled_objective(objective_term)
if logic["type"] == "additional_operational_objective":
factor_encoded, var_name = logic["value"]
if isinstance(factor_encoded, (int, float)):
factor = factor_encoded
else:
raise ValueError(f"Invalid factor {factor_encoded}!")
var = o_block.component_dict[self.name + "." + var_name]
objective_term = pyo.quicksum(
factor * var[t] * o_block.step_size(t) for t in o_block.T
)
o_block.add_general_scaled_objective(objective_term)
def design_annuity(self, model, T, r, q):
if self.capacity is not None:
A_0 = (
model.component_dict[self.name + ".capacity"]
* self.specific_capital_expenditure
)
else:
A_0 = 0.0
if isinstance(A_0, float) and A_0 == 0.0:
self_design_annuity = 0.0
else:
if self.service_life is not None:
T_N = self.service_life
else:
T_N = T
f_b = self.factor_effort_maintenance_operation
self_design_annuity = design_annuity(A_0, T, T_N, r, q, f_b)
return self_design_annuity
# TODO rename model to o_block
def _operational_exprenditure(self, model):
return 0.0
def operational_annuity(
self, model, T, r, q
): # normally is w, to scale the operational expenditure A_V to the operational expenditure in the first year A_V1, but that is done for all operational annuities when right before the objective is added to the model
A_V = self._operational_exprenditure(model)
if isinstance(A_V, float) and A_V == 0.0:
self_operational_annuity = 0.0
else:
self_operational_annuity = operational_annuity(T, r, q, A_V)
return self_operational_annuity
def peak_power_cost(self, model):
return 0.0
def co2_emissions(self, model):
# heat https://www.uni-goettingen.de/de/document/download/e778b3727c64ed6f962e4c1cea80fa2f.pdf/CO2%20Emissionen_2016.pdf
# https://www.umweltbundesamt.de/presse/pressemitteilungen/bilanz-2019-co2-emissionen-pro-kilowattstunde-strom
return 0.0
class BaseBusBar(AbstractComponent):
def __init__(self, name, commodity):
super().__init__(
name=name,
commodity_1=commodity,
commodity_2=None,
commodity_3=commodity,
commodity_4=None,
configuration=dict(),
capacity=ComponentCapacity.NONE,
)
self.commodity = commodity
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
return ComponentPartPattern.match_simple(
ComponentKind.BUSBAR,
self.__class__.__name__,
[self.commodity],
pattern_kind=kind,
pattern_commodity=commodity,
)
def iter_component_parts(self):
yield ComponentPart.NONE_STATE
def non_state_base_variable_names(self):
return [
(self.name + ".input_1", VariableKind.INDEXED),
(self.name + ".output_1", VariableKind.INDEXED),
]
def add_non_state_variables(self, o_block):
input = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".input_1", input)
output = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".output_1", output)
def add_non_state_model(self, d_block, o_block):
input = o_block.component_dict[self.name + ".input_1"]
output = o_block.component_dict[self.name + ".output_1"]
def rule(m, t):
return output[t] == input[t]
o_block.add(self.name + ".sum", pyo.Constraint(o_block.T, rule=rule))
class BaseComponent(AbstractComponent):
def __init__(
self, name, commodity_1, commodity_2, commodity_3, commodity_4, configuration
):
super().__init__(
name=name,
commodity_1=commodity_1,
commodity_2=commodity_2,
commodity_3=commodity_3,
commodity_4=commodity_4,
configuration=configuration,
capacity=ComponentCapacity.REQUIRED,
)
self._setup_conversions(configuration)
def _setup_conversions(self, configuration):
self._load_conversions(configuration)
self.indep_var_1 = configuration["indep_var_1"]
self.indep_var_2 = configuration["indep_var_2"]
self.conversion_1 = configuration["conversion_1"]
self.conversion_2 = configuration["conversion_2"]
dep_var_1 = self.conversion_1[1]
dep_var_2 = self.conversion_2[1] if self.conversion_2 is not None else None
self.operational_variables = [
(self.name + "." + var, VariableKind.INDEXED)
for var in ["input_1", "input_2", "output_1", "output_2"]
if var in [self.indep_var_1, self.indep_var_2, dep_var_1, dep_var_2]
]
def _load_conversions(self, configuration):
efficiency = configuration["efficiency"]
configuration["indep_var_1"] = "output_1"
configuration["indep_var_2"] = None
configuration["conversion_1"] = ("output_1", "input_1", 1.0 / efficiency)
configuration["conversion_2"] = None
def _load_model(self, configuration, model):
configuration["efficiency"] = model["efficiency"]
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
return ComponentPartPattern.match_simple(
ComponentKind.BASE,
self.__class__.__name__,
[
self.input_commodity_1,
self.input_commodity_2,
self.output_commodity_1,
self.output_commodity_2,
],
pattern_kind=kind,
pattern_commodity=commodity,
)
def iter_component_parts(self):
yield ComponentPart.DESIGN
yield ComponentPart.NONE_STATE
def non_state_base_variable_names(self):
return self.operational_variables
def add_non_state_variables(self, o_block):
dep_var_1 = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + "." + self.indep_var_1, dep_var_1)
if self.indep_var_2 is not None:
dep_var_2 = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + "." + self.indep_var_2, dep_var_2)
def add_non_state_model(self, d_block, o_block):
capacity = d_block.component_dict[self.name + ".capacity"]
dep_var_1 = o_block.component_dict[self.name + "." + self.indep_var_1]
def rule(m, t):
return dep_var_1[t] <= capacity
o_block.add(
self.name + "." + self.indep_var_1 + "_capacity_cons",
pyo.Constraint(o_block.T, rule=rule),
)
if self.indep_var_2 is not None:
dep_var_2 = o_block.component_dict[self.name + "." + self.indep_var_2]
def rule(m, t):
return dep_var_2[t] <= capacity
o_block.add(
self.name + "." + self.indep_var_2 + "_capacity_cons",
pyo.Constraint(o_block.T, rule=rule),
)
z_bi_flow = pyo.Var(o_block.T, domain=pyo.Binary)
o_block.add(self.name + ".z_bi_flow", z_bi_flow)
if isinstance(self.capacity, (int, float)):
def rule(m, t):
return dep_var_1[t] <= z_bi_flow[t] * capacity
o_block.add(
self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule)
)
def rule(m, t):
return dep_var_2[t] <= (1.0 - z_bi_flow[t]) * capacity
o_block.add(
self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule)
)
else:
def rule(m, t):
return dep_var_1[t] <= z_bi_flow[t] * capacity.ub
o_block.add(
self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule)
)
def rule(m, t):
return dep_var_2[t] <= (1.0 - z_bi_flow[t]) * capacity.ub
o_block.add(
self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule)
)
self._handle_conversion(d_block, o_block, self.conversion_1)
if self.conversion_2 is not None:
self._handle_conversion(d_block, o_block, self.conversion_2)
def _handle_conversion(self, d_block, o_block, conversion):
indep_var = o_block.component_dict[self.name + "." + conversion[0]]
dep_var = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + "." + conversion[1], dep_var)
if isinstance(conversion[2], (int, float)):
def rule(m, t):
return dep_var[t] == indep_var[t] * conversion[2]
o_block.add(
self.name
+ "."
+ conversion[0]
+ "_to_"
+ conversion[1]
+ "_conversion",
pyo.Constraint(o_block.T, rule=rule),
)
else:
def rule(m, t):
return dep_var[t] == indep_var[t] * conversion[2][t]
o_block.add(
self.name
+ "."
+ conversion[0]
+ "_to_"
+ conversion[1]
+ "_conversion",
pyo.Constraint(o_block.T, rule=rule),
)
class BaseConsumption(AbstractComponent):
def __init__(self, name, commodity, configuration):
super().__init__(
name=name,
commodity_1=commodity,
commodity_2=None,
commodity_3=None,
commodity_4=None,
configuration=configuration,
capacity=ComponentCapacity.NONE,
)
self.commodity = commodity
self.consumption = configuration["consumption"]
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
return ComponentPartPattern.match_simple(
ComponentKind.CONSUMPTION,
self.__class__.__name__,
[self.commodity],
pattern_kind=kind,
pattern_commodity=commodity,
)
def iter_component_parts(self):
yield ComponentPart.NONE_STATE
def non_state_base_variable_names(self):
return [(self.name + ".input_1", VariableKind.INDEXED)]
def add_non_state_variables(self, o_block):
input = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".input_1", input)
def add_non_state_model(self, d_block, o_block):
input = o_block.component_dict[self.name + ".input_1"]
consumption = self.consumption.resample(o_block.dynamic).values
def rule(m, t):
return input[t] == consumption[t]
o_block.add(
self.name + ".consumption_cons", pyo.Constraint(o_block.T, rule=rule)
)
class BaseGrid(AbstractComponent):
def __init__(self, name, commodity, configuration):
super().__init__(
name=name,
commodity_1=commodity,
commodity_2=None,
commodity_3=commodity,
commodity_4=None,
configuration=configuration,
capacity=ComponentCapacity.OPTIONAL,
)
self.commodity = commodity
self.price = configuration.get("price", 0)
self.injection_price = configuration.get("injection_price", 0)
self.peak_power_cost_ = configuration.get("peak_power_cost", 0)
self.co2_emissions_ = configuration.get("co2_emissions", 0)
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
return ComponentPartPattern.match_simple(
ComponentKind.GRID,
self.__class__.__name__,
[self.commodity],
pattern_kind=kind,
pattern_commodity=commodity,
)
def iter_component_parts(self):
if self.capacity is not None:
yield ComponentPart.DESIGN
yield ComponentPart.NONE_STATE
def non_state_base_variable_names(self):
return [
(self.name + ".input_1", VariableKind.INDEXED),
(self.name + ".output_1", VariableKind.INDEXED),
]
def add_non_state_variables(self, o_block):
input = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".input_1", input)
output = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".output_1", output)
def add_non_state_model(self, d_block, o_block):
input = o_block.component_dict[self.name + ".input_1"]
output = o_block.component_dict[self.name + ".output_1"]
if self.capacity is not None:
capacity = d_block.component_dict[self.name + ".capacity"]
def rule(m, t):
return input[t] <= capacity
o_block.add(
self.name + ".capacity_input_cons", pyo.Constraint(o_block.T, rule=rule)
)
def rule(m, t):
return output[t] <= capacity
o_block.add(
self.name + ".capacity_output_cons",
pyo.Constraint(o_block.T, rule=rule),
)
def _operational_exprenditure(self, model):
if isinstance(self.price, (int, float)):
output = model.component_dict[self.name + ".output_1"]
price_function = lambda t: output[t] * self.price
else:
price = self.price.resample(model.dynamic).values
output = model.component_dict[self.name + ".output_1"]
price_function = lambda t: output[t] * price[t]
if isinstance(self.injection_price, (int, float)):
input = model.component_dict[self.name + ".input_1"]
injection_price_function = lambda t: input[t] * self.injection_price
else:
injection_price = self.injection_price.resample(model.dynamic).values
input = model.component_dict[self.name + ".input_1"]
injection_price_function = lambda t: input[t] * injection_price[t]
return pyo.quicksum(
(price_function(t) - injection_price_function(t)) * model.step_size(t)
for t in model.T
)
'''
def limit_flow(flow, max_value):
return min(flow, max_value)
max_output = 500
max_input = 500
return pyo.quicksum(
(limit_flow(price_function(t), max_output) -
limit_flow(injection_price_function(t), max_input)) * model.step_size(t)
for t in model.T
)
'''
def peak_power_cost(self, model):
peak_import = pyo.Var()
model.add(self.name + ".peak_import", peak_import)
output = model.component_dict[self.name + ".output_1"]
def rule(m, t):
return output[t] <= peak_import
model.add(self.name + ".peak_import_cons", pyo.Constraint(model.T, rule=rule))
return peak_import * self.peak_power_cost_
def co2_emissions(self, model):
if isinstance(self.co2_emissions_, (int, float)):
output = model.component_dict[self.name + ".output_1"]
co2_emissions_function = lambda t: output[t] * self.co2_emissions_
else:
co2_emissions_ = self.co2_emissions_.resample(model.dynamic).values
output = model.component_dict[self.name + ".output_1"]
co2_emissions_function = lambda t: output[t] * co2_emissions_[t]
class BaseStorage(AbstractComponent):
def __init__(self, name, commodity, configuration):
super().__init__(
name=name,
commodity_1=commodity,
commodity_2=None,
commodity_3=commodity,
commodity_4=None,
configuration=configuration,
capacity=ComponentCapacity.REQUIRED,
)
self.commodity = commodity
if "first_soe" in configuration:
self.first_soe = configuration["first_soe"]
if "final_soe" in configuration:
self.final_soe = configuration["final_soe"]
def _load_model(self, configuration, model):
self.input_efficiency = model["input_efficiency"]
self.e2p_in = model["e2p_in"]
self.output_efficiency = model["output_efficiency"]
self.e2p_out = model["e2p_out"]
self.self_discharging_loss = model["self_discharging_loss"]
def match(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
return ComponentPartPattern.match_simple(
ComponentKind.STORAGE,
self.__class__.__name__,
[self.commodity],
pattern_kind=kind,
pattern_commodity=commodity,
)
def iter_component_parts(self):
yield ComponentPart.DESIGN
yield ComponentPart.NONE_STATE
yield ComponentPart.STATE
def iter_links(self):
yield VariableLink(
self.name + ".input_1",
(self.name, ComponentPart.NONE_STATE),
(self.name, ComponentPart.STATE),
)
yield VariableLink(
self.name + ".output_1",
(self.name, ComponentPart.NONE_STATE),
(self.name, ComponentPart.STATE),
)
def state_base_variable_names(self):
return [
(self.name + ".input_1", VariableKind.INDEXED),
(self.name + ".output_1", VariableKind.INDEXED),
]
def state_base_variable_names(self):
return [
(self.name + ".energy", VariableKind.EXTENDED_INDEXED),
]
def add_non_state_variables(self, o_block):
input = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".input_1", input)
output = pyo.Var(o_block.T, bounds=(0, None))
o_block.add(self.name + ".output_1", output)
def add_state_variables(self, s_block):
energy = pyo.Var(s_block.T_prime, bounds=(0, None))
s_block.add(self.name + ".energy", energy)
def add_non_state_model(self, d_block, o_block):
capacity = d_block.component_dict[self.name + ".capacity"]
input = o_block.component_dict[self.name + ".input_1"]
output = o_block.component_dict[self.name + ".output_1"]
def rule(m, t):
return input[t] * self.input_efficiency <= capacity / self.e2p_in
o_block.add(
self.name + ".capacity_input_cons", pyo.Constraint(o_block.T, rule=rule)
)
def rule(m, t):
return output[t] / self.output_efficiency <= capacity / self.e2p_out
o_block.add(
self.name + ".capacity_output_cons", pyo.Constraint(o_block.T, rule=rule)
)
z_bi_flow = pyo.Var(o_block.T, domain=pyo.Binary)
o_block.add(self.name + ".z_bi_flow", z_bi_flow)
if isinstance(self.capacity, (int, float)):
def rule(m, t):
return input[t] <= z_bi_flow[t] * (
capacity / (self.e2p_in * self.input_efficiency)
)
o_block.add(
self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule)
)
def rule(m, t):
return output[t] <= (1.0 - z_bi_flow[t]) * (
(capacity * self.output_efficiency) / self.e2p_out
)
o_block.add(
self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule)
)
else:
def rule(m, t):
return input[t] <= z_bi_flow[t] * (
capacity.ub / (self.e2p_in * self.input_efficiency)
)
o_block.add(
self.name + ".bi_flow_cons_1", pyo.Constraint(o_block.T, rule=rule)
)
def rule(m, t):
return output[t] <= (1.0 - z_bi_flow[t]) * (
(capacity.ub * self.output_efficiency) / self.e2p_out
)
o_block.add(
self.name + ".bi_flow_cons_2", pyo.Constraint(o_block.T, rule=rule)
)
def add_state_model(self, d_block, s_block):
capacity = d_block.component_dict[self.name + ".capacity"]
energy = s_block.component_dict[self.name + ".energy"]
input = s_block.component_dict[self.name + ".input_1"]
output = s_block.component_dict[self.name + ".output_1"]
def rule(m, t):
return energy[t] <= capacity
s_block.add(self.name + ".energy_ub", pyo.Constraint(s_block.T, rule=rule))
if hasattr(self, "first_soe"):
factor_encoded, var_name = self.first_soe["value"]
var = s_block.component_dict[self.name + "." + var_name]
if isinstance(factor_encoded, (int, float)):
factor = factor_encoded
else:
raise ValueError(f"Invalid factor {factor}!")
sense = self.first_soe["sense"]
if sense == "==":
expr = energy[s_block.T_prime.first()] == factor * var
elif sense == "<=":
expr = energy[s_block.T_prime.first()] <= factor * var
elif sense == ">=":
expr = energy[s_block.T_prime.first()] >= factor * var
else:
raise ValueError(f"Invalid sense {sense}!")
s_block.add(self.name + ".fix_first_energy", pyo.Constraint(expr=expr))
def rule(m, t):
return energy[t] == energy[s_block.T_prime[s_block.T_prime.ord(t) - 1]] * (
1.0 - self.self_discharging_loss * s_block.step_size(t)
) + input[t] * self.input_efficiency * s_block.step_size(t) - output[
t
] / self.output_efficiency * s_block.step_size(
t
)
s_block.add(self.name + ".state_equation", pyo.Constraint(s_block.T, rule=rule))
if hasattr(self, "final_soe"):
factor_encoded, var = self.final_soe["value"]
if isinstance(factor_encoded, (int, float)):
factor = factor_encoded
else:
raise ValueError(f"Invalid factor {factor}!")
if var == "first_soe":
var = energy[s_block.T_prime.first()]
else:
var = s_block.component_dict[self.name + "." + var]
if self.final_soe["sense"] == "==":
expr = energy[s_block.T_prime.last()] == factor * var
elif self.final_soe["sense"] == "<=":
expr = energy[s_block.T_prime.last()] <= factor * var
elif self.final_soe["sense"] == ">=":
expr = energy[s_block.T_prime.last()] >= factor * var
else:
sense = self.final_soe["sense"]
raise ValueError(f"Invalid sense {sense}!")
s_block.add(self.name + ".fix_final_energy", pyo.Constraint(expr=expr))
from Model_Library.component.core import (
BaseBusBar,
BaseComponent,
BaseConsumption,
BaseGeneration,
BaseGrid,
BaseStorage,
ComponentCommodity,
)
from Model_Library.optimization_model import VariableKind
import pyomo.environ as pyo
class ElectricalBusBar(BaseBusBar):
def __init__(self, name, configuration):
super().__init__(name=name, commodity=ComponentCommodity.ELECTRICITY)
class BiPowerElectronic(BaseComponent):
def __init__(self, name, configuration):
super().__init__(
name=name,
commodity_1=ComponentCommodity.ELECTRICITY,
commodity_2=ComponentCommodity.ELECTRICITY,
commodity_3=ComponentCommodity.ELECTRICITY,
commodity_4=ComponentCommodity.ELECTRICITY,
configuration=configuration,
)
def _load_model(self, configuration, model):
configuration["efficiency_a"] = model["efficiency_a"]
configuration["efficiency_b"] = model["efficiency_b"]
configuration["rated_power_side_a"] = model["rated_power_side_a"]
configuration["rated_power_side_b"] = model["rated_power_side_b"]
def _load_conversions(self, configuration):
efficiency_a = configuration["efficiency_a"] # TODO move comment to docu 1 -> 2
efficiency_b = configuration["efficiency_b"] # TODO move comment to docu 2 -> 1
if configuration["rated_power_side_a"] == "output":
configuration["indep_var_1"] = "output_2"
configuration["conversion_1"] = ("output_2", "input_1", 1.0 / efficiency_a)
else:
configuration["indep_var_1"] = "input_1"
configuration["conversion_1"] = ("input_1", "output_2", efficiency_a)
if configuration["rated_power_side_b"] == "output":
configuration["indep_var_2"] = "output_1"
configuration["conversion_2"] = ("output_1", "input_2", 1.0 / efficiency_b)
else:
configuration["indep_var_2"] = "input_2"
configuration["conversion_2"] = ("input_2", "output_1", efficiency_b)
class ElectricalConsumption(BaseConsumption):
def __init__(self, name, configuration):
super().__init__(
name=name,
commodity=ComponentCommodity.ELECTRICITY,
configuration=configuration,
)
class ElectricalGrid(BaseGrid):
def __init__(self, name, configuration):
super().__init__(
name=name,
commodity=ComponentCommodity.ELECTRICITY,
configuration=configuration,
)
class Battery(BaseStorage):
def __init__(self, name, configuration):
super().__init__(
name=name,
commodity=ComponentCommodity.ELECTRICITY,
configuration=configuration,
)
...@@ -193,7 +193,12 @@ class Battery(BaseStorage): ...@@ -193,7 +193,12 @@ class Battery(BaseStorage):
class EVBattery(BaseStorage): class EVBattery(BaseStorage):
def __init__(self, name, configuration): def __init__(self, name, configuration):
super().__init__(name=name, commodity=ComponentCommodity.ELECTRICITY, configuration=configuration) super().__init__(
name=name,
commodity=ComponentCommodity.ELECTRICITY,
configuration=configuration
)
self.connected_profile = configuration.get("connected_profile", None)
def add_state_model(self, d_block, s_block): def add_state_model(self, d_block, s_block):
super().add_state_model(d_block, s_block) super().add_state_model(d_block, s_block)
...@@ -202,11 +207,12 @@ class EVBattery(BaseStorage): ...@@ -202,11 +207,12 @@ class EVBattery(BaseStorage):
energy = s_block.component_dict[self.name + ".energy"] energy = s_block.component_dict[self.name + ".energy"]
T = s_block.T T = s_block.T
# === SOC 惩罚项 ===
soc_ranges = [ soc_ranges = [
("low_heavy", 0.3, 0.1, "below"), # <30% ("low_heavy", 0.2, 0.005, "below"),
("low_light", 0.5, 0.03, "below"), # 30-50% ("low_light", 0.5, 0.001, "below"),
("high_light", 0.7, 0.03, "above"), # 70-80% ("high_light", 0.7, 0.001, "above"),
("high_heavy", 0.8, 0.1, "above"), # >80% ("high_heavy", 0.9, 0.005, "above"),
] ]
for name, threshold, factor, direction in soc_ranges: for name, threshold, factor, direction in soc_ranges:
...@@ -223,10 +229,27 @@ class EVBattery(BaseStorage): ...@@ -223,10 +229,27 @@ class EVBattery(BaseStorage):
return slack[t] >= energy[t] - thr * capacity return slack[t] >= energy[t] - thr * capacity
s_block.add(con_name, pyo.Constraint(T, rule=rule)) s_block.add(con_name, pyo.Constraint(T, rule=rule))
obj_expr = pyo.quicksum(factor * slack[t] * s_block.step_size(t) for t in T) obj_expr = pyo.quicksum(factor * slack[t] * s_block.step_size(t) for t in T)
s_block.add_general_scaled_objective(obj_expr) s_block.add_general_scaled_objective(obj_expr)
if self.connected_profile is not None:
connected = self.connected_profile.resample(s_block.dynamic).values
lookback_steps = 8
for t in T:
if t > 0 and connected[t - 1] == 1 and connected[t] == 0:
recently_drove = (connected[max(0, t - lookback_steps):t] == 0).any()
if not recently_drove:
s_block.add(
f"{self.name}.soc_departure_strict_{t}",
pyo.Constraint(expr=energy[t] >= 0.7 * capacity)
)
else:
print(f"⚠️ {self.name}: connected_profile NOT SET. No departure SOC constraints added.")
class ElectricalPassThrough(BaseComponent): class ElectricalPassThrough(BaseComponent):
def __init__(self, name, configuration): def __init__(self, name, configuration):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment