Select Git revision
BP_RWTHVRContentExamplesGameModeBase.uasset
topology.py 23.83 KiB
"""
MIT License
Copyright (c) 2023 RWTH Aachen University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from Model_Library.component.core import ComponentCommodity, ComponentKind
from Model_Library.dynamics import PeriodAggregation, TrivialArchitecture
from Model_Library.optimization_model import (
EntityResult,
OptimizationBlock,
OptimizationModel,
VariableKind,
)
from enum import Enum
import importlib
import inspect
import numpy as np
import os
import pandas as pd
import pyomo.environ as pyo
# Registry of every class exposed by the component package, keyed by class
# name. Topology resolves the "type" string of a component configuration
# through this mapping.
module = importlib.import_module(".", package="Model_Library.component")
component_library = dict(inspect.getmembers(module, inspect.isclass))
# class ConnectorMode(Enum):
# EMPTY = 1
# SINGLE_CONTRACTED = 2
# SINGLE = 3
# MULTIPLE = 4
class Connector:
    """Named port that collects the names of the flows attached to it.

    Attributes:
        name: Identifier of the connector.
        flows: Names of the flows attached to this connector. The list starts
            out empty and is filled by the code that wires the connector up.
    """

    def __init__(self, name):
        self.name = name
        # A fresh list per instance so connectors never share their flows.
        self.flows = list()
class Connection:
    """Record tying a group of incoming flows to a group of outgoing flows.

    Attributes:
        in_flows: The flows entering the connection, stored as passed in.
        out_flows: The flows leaving the connection, stored as passed in.
        loss_factor: Loss factor associated with the connection, stored as
            passed in.
    """

    def __init__(self, in_flows, out_flows, loss_factor):
        # The arguments are kept by reference; nothing is copied or validated.
        self.in_flows, self.out_flows = in_flows, out_flows
        self.loss_factor = loss_factor
class Topology:
def __init__(self, name, configuration, members, assets):
    """Create the components, connectors, flows and connections of a topology.

    Args:
        name: Name of the topology.
        configuration: Mapping describing the topology. Required keys:

            * ``"components"``: mapping of component name to component
              configuration; each configuration has a ``"type"`` key that is
              looked up in ``component_library``.
            * ``"connections"``: iterable of connection mappings. A
              ``"OneToOne"`` connection has the keys ``"from"`` and ``"to"``,
              a ``"Sum"`` connection has the key ``"members"``. Both may
              carry an optional ``"loss_factor"`` (default ``0.0``).

            Optional keys: ``"additional_model_logic"`` (default empty),
            ``"planning_horizon"`` (default ``1``), ``"price_change_factor"``
            (default ``1.0``) and ``"interest_factor"`` (default ``1.0``).
        members: Extra constructor argument handed to components of type
            ``"MemberAdapter"``.
        assets: Extra constructor argument handed to components of type
            ``"AssetAdapter"``.

    Raises:
        ValueError: If a connection has a type other than ``"OneToOne"`` or
            ``"Sum"``.
    """
    self._name = name
    self._members = members
    self._assets = assets
    self._components = dict()
    self._connectors = dict()

    # The two adapter types take one additional constructor argument.
    adapter_arguments = {"MemberAdapter": members, "AssetAdapter": assets}

    for component_name, component_configuration in configuration[
        "components"
    ].items():
        component_type = component_configuration["type"]
        component_class = component_library[component_type]
        if component_type in adapter_arguments:
            component = component_class(
                component_name,
                component_configuration,
                adapter_arguments[component_type],
            )
        else:
            component = component_class(component_name, component_configuration)
        self._components[component_name] = component

        (
            input_commodity_1,
            input_commodity_2,
            output_commodity_1,
            output_commodity_2,
        ) = component.get_input_output_commodities()
        ports = (
            ("input_1", input_commodity_1),
            ("input_2", input_commodity_2),
            ("output_1", output_commodity_1),
            ("output_2", output_commodity_2),
        )
        # A connector is only created for ports the component actually has.
        for port, commodity in ports:
            if commodity is not None:
                connector_name = component_name + "." + port
                self._connectors[connector_name] = Connector(connector_name)

    self._flows = []
    self._connections = []
    self._connections_map = dict()

    def register_flow(flow, connector_name):
        # Record the flow in the topology and on the connector it touches.
        self._flows.append(flow)
        self._connectors[connector_name].flows.append(flow)

    for connection in configuration["connections"]:
        # The position in self._connections makes the flow names unique.
        index = len(self._connections)
        connection_type = connection["type"]
        if connection_type == "OneToOne":
            flow_from = connection["from"]
            in_flow = flow_from + "_" + str(index)
            register_flow(in_flow, flow_from)
            in_flows = [in_flow]

            flow_to = connection["to"]
            out_flow = str(index) + "_" + flow_to
            register_flow(out_flow, flow_to)
            out_flows = [out_flow]

            self._connections_map[flow_from, flow_to] = index
        elif connection_type == "Sum":
            in_flows = []
            out_flows = []
            for member in connection["members"]:
                # Connector names are "<component>.<port>". Only the port
                # decides the direction, so a component whose own name
                # contains "output" is not misclassified.
                port = member.rsplit(".", 1)[-1]
                if port.startswith("output"):
                    flow = member + "_" + str(index)
                    in_flows.append(flow)
                else:
                    flow = str(index) + "_" + member
                    out_flows.append(flow)
                register_flow(flow, member)
        else:
            # Without this guard the flows of the previous connection would
            # silently be reused for the unknown connection.
            raise ValueError(f"Invalid connection type {connection_type}")

        loss_factor = connection.get("loss_factor", 0.0)
        self._connections.append(Connection(in_flows, out_flows, loss_factor))

    # NOTE: a disabled (commented-out) connector-contraction prototype used to
    # follow here; it was never executed and has been dropped.

    self._additional_model_logic = configuration.get(
        "additional_model_logic", dict()
    )
    self._planning_horizon = configuration.get("planning_horizon", 1)
    self._price_change_factor = configuration.get("price_change_factor", 1.0)
    self._interest_factor = configuration.get("interest_factor", 1.0)

    # Results are stored per key; the most recent key is remembered so that
    # save_results can default to it.
    self._results = dict()
    self._last_result_key = None
def get_components(self, kind=ComponentKind.ALL, commodity=ComponentCommodity.ALL):
    """Lazily yield the components that match the given kind and commodity.

    Args:
        kind: Forwarded to each component's ``match`` as ``kind``.
        commodity: Forwarded to each component's ``match`` as ``commodity``.

    Returns:
        A generator over the components for which ``match`` is truthy.
    """
    candidates = self._components.values()
    return (
        candidate
        for candidate in candidates
        if candidate.match(kind=kind, commodity=commodity)
    )
def optimize(self, key, architecture, strategy, solver_options=None):
    """Build and solve the model of this topology and store its result.

    Args:
        key: Key under which the result is stored.
        architecture: Forwarded to ``build_model`` and
            ``create_empty_entity_result``.
        strategy: Forwarded to ``build_model``.
        solver_options: Optional mapping of solver options. Its entries are
            merged over the defaults ``MIPGap=0.01``, ``Presolve=2`` and
            ``TimeLimit=200``. When omitted, exactly the defaults are used.

    Raises:
        RuntimeError: If the solved model reports that it is not ok.
    """
    model = self.build_model(architecture, strategy)

    options = {"MIPGap": 0.01, "Presolve": 2, "TimeLimit": 200}
    if solver_options is not None:
        options.update(solver_options)

    # The second positional argument is passed through unchanged as False;
    # presumably a verbosity switch of the solver wrapper - TODO confirm.
    model.solve(options, False)

    if not model.is_ok():
        raise RuntimeError("Model is infeasible or unbounded!")

    self.create_empty_entity_result(key, architecture)
    self.extract_result(model, key)
def build_model(self, architecture, strategy):
    """Create the optimization model of this topology and populate it.

    Args:
        architecture: Supplies the model dynamic and is forwarded to
            ``fill_block``.
        strategy: Forwarded to ``fill_block``.

    Returns:
        The filled ``OptimizationModel`` after its objectives were collected.
    """
    model_dynamic = architecture.model_dynamic()
    optimization_model = OptimizationModel(self._name, model_dynamic)
    self.fill_block(optimization_model, architecture, strategy)
    optimization_model.collect_objectives()
    return optimization_model
def fill_block(self, block, architecture, strategy):
    """Populate ``block`` with the asset blocks, the model and the objectives.

    Args:
        block: Block that receives the design part, the objectives and, for a
            ``TrivialArchitecture``, also the operational part.
        architecture: ``TrivialArchitecture`` or ``PeriodAggregation``.
        strategy: ``None`` (one objective named ``"objective"`` without
            terms), a list (the terms of the objective ``"objective"``) or a
            dict mapping objective names to their terms.

    Raises:
        ValueError: If ``strategy`` or ``architecture`` has an unsupported
            type.
    """
    # Each asset is modelled in its own sub-block.
    for asset in self._assets.values():
        sub_block = OptimizationBlock(asset._name, architecture.model_dynamic())
        block.add(asset._name, sub_block)
        asset.fill_block(sub_block, architecture, strategy)

    # Normalise the strategy to a mapping: objective name -> terms.
    if isinstance(strategy, dict):
        objectives = strategy
    elif isinstance(strategy, list):
        objectives = {"objective": strategy}
    elif strategy is None:
        objectives = {"objective": []}
    else:
        raise ValueError("Invalid strategy type!")

    if isinstance(architecture, TrivialArchitecture):
        design = self.fill_design_block(block, objectives)
        operational = self.fill_operational_block(block, block, objectives)

        # Ratio of 365 * 24 to the summed step lengths divided by 3600
        # (presumably seconds converted to hours - TODO confirm).
        year_scale = (365.0 * 24.0) / (
            np.sum(block.dynamic.step_lengths()) / 3600.0
        )

        for name in objectives:
            scaled, one_time = operational[name]
            general = pyo.quicksum(term for term in block.general_scaled_expression)
            block.add_objective(
                name, design[name] + year_scale * (general + scaled) + one_time
            )
        return

    if isinstance(architecture, PeriodAggregation):
        design = self.fill_design_block(block, objectives)

        # One operational sub-block per period, named after the period index.
        period_blocks = []
        operational = []
        for period_index, period_dynamic in enumerate(architecture.period_dynamics):
            period_name = str(period_index)
            period_block = OptimizationBlock(period_name, period_dynamic)
            block.add(period_name, period_block)
            period_blocks.append(period_block)
            operational.append(
                self.fill_operational_block(block, period_block, objectives)
            )

        year_scale = (365.0 * 24.0) / (
            np.sum(block.dynamic.step_lengths()) / 3600.0
        )

        for name in objectives:
            scaled_total = 0.0
            one_time_total = 0.0
            for period_index in range(architecture.number_of_periods()):
                period_scaled, period_one_time = operational[period_index][name]
                period_general = pyo.quicksum(
                    term
                    for term in period_blocks[period_index].general_scaled_expression
                )
                # Every period is weighted with its entry in architecture.n_p.
                scaled_total += architecture.n_p[period_index] * (
                    period_scaled + period_general
                )
                one_time_total += architecture.n_p[period_index] * period_one_time
            block.add_objective(
                name,
                design[name]
                + year_scale
                * (scaled_total + (1.0 / architecture.n) * one_time_total),
            )
        return

    raise ValueError(f"Invalid architecture type {type(architecture)}")
def fill_design_block(self, d_block, objectives):
    """Add the design model to ``d_block`` and return the design objectives.

    Args:
        d_block: Block that receives the design model of every component and
            the constraints of the ``"EqualCapacity"`` model logic.
        objectives: Mapping of objective name to the terms of that objective.

    Returns:
        Dict mapping each objective name to its design expression: the summed
        design annuity of all components if the objective contains
        ``"annuity"``, otherwise ``0.0``.
    """

    def equal_capacity_rule(first_name, second_name):
        # Factory that binds the two component names when it is called, so
        # the rule does not depend on loop variables that change afterwards.
        def rule(m):
            return (
                d_block.component_dict[first_name + ".capacity"]
                == d_block.component_dict[second_name + ".capacity"]
            )

        return rule

    for component in self._components.values():
        component.build_design_model(d_block)

    for logic_name, logic in self._additional_model_logic.items():
        if logic["type"] == "EqualCapacity":
            components = logic["components"]
            # Chain the equality over consecutive pairs of components.
            for i, (first_name, second_name) in enumerate(
                zip(components, components[1:])
            ):
                d_block.add(
                    logic_name + "_cons_" + str(i),
                    pyo.Constraint(rule=equal_capacity_rule(first_name, second_name)),
                )

    design_objectives = dict()
    for name, objective in objectives.items():
        expression = 0.0
        if "annuity" in objective:
            annuity = 0.0
            for component in self._components.values():
                annuity += component.design_annuity(
                    d_block,
                    self._planning_horizon,
                    self._price_change_factor,
                    self._interest_factor,
                )
            expression += annuity
        design_objectives[name] = expression
    return design_objectives
def fill_operational_block(self, d_block, o_block, objectives):
    """Add the operational model to ``o_block`` and return its objectives.

    The method builds the operational model of every component, adds one
    non-negative variable per flow, ties every connector variable to the sum
    of its flows, balances the in- and out-flows of every connection (the
    in-flow sum is scaled by ``1 - loss_factor``) and applies the
    ``"ConnectorEnable"`` / ``"ConnectionEnable"`` model logic.

    Args:
        d_block: Design block handed to the components.
        o_block: Block that receives the operational variables and
            constraints; its ``T`` is the index set of these.
        objectives: Mapping of objective name to the terms of that objective.

    Returns:
        Dict mapping each objective name to a tuple
        ``(scaled_expression, one_time_expression)``. The operational annuity
        (term ``"annuity"``) and the CO2 emissions (term ``"co2_emissions"``)
        go into the first entry; the peak power cost multiplied by the
        factors ``a`` and ``b`` (term ``"peak_power_cost"``) goes into the
        second.
    """

    # The rule factories below bind their arguments when they are called.
    # Defining the rules directly inside the loops would make them read the
    # loop variables late, which is only correct if every constraint is
    # constructed before the next loop iteration starts.

    def connector_sum_rule(connector_var, flow_names):
        def rule(m, t):
            return connector_var[t] == pyo.quicksum(
                o_block.component_dict[flow][t] for flow in flow_names
            )

        return rule

    def connection_balance_rule(connection):
        def rule(m, t):
            return pyo.quicksum(
                o_block.component_dict[out_flow][t]
                for out_flow in connection.out_flows
            ) == pyo.quicksum(
                o_block.component_dict[in_flow][t]
                for in_flow in connection.in_flows
            ) * (
                1.0 - connection.loss_factor
            )

        return rule

    def disable_rule(variable, enable):
        # Force the variable to zero wherever the enable signal is 0.
        def rule(m, t):
            if enable[t] == 0:
                return variable[t] == 0
            return pyo.Constraint.Skip

        return rule

    for component in self._components.values():
        component.build_operational_model(d_block, o_block)

    for flow in self._flows:
        o_block.add(flow, pyo.Var(o_block.T, bounds=(0, None)))

    for connector in self._connectors.values():
        connector_var = o_block.component_dict[connector.name]
        o_block.add(
            connector.name + "_sum",
            pyo.Constraint(
                o_block.T, rule=connector_sum_rule(connector_var, connector.flows)
            ),
        )

    for i, connection in enumerate(self._connections):
        o_block.add(
            str(i) + "_sum",
            pyo.Constraint(o_block.T, rule=connection_balance_rule(connection)),
        )

    for logic_name, logic in self._additional_model_logic.items():
        if logic["type"] == "ConnectorEnable":
            enable = logic["enable"]
            for i, connector_name in enumerate(logic["connectors"]):
                connector_var = o_block.component_dict[connector_name]
                o_block.add(
                    logic_name + "_cons_" + str(i),
                    pyo.Constraint(
                        o_block.T, rule=disable_rule(connector_var, enable)
                    ),
                )
        elif logic["type"] == "ConnectionEnable":
            enable = logic["enable"]
            for i, entry in enumerate(logic["connections"]):
                pair = (entry["from"], entry["to"])
                # NOTE(review): pairs that are not registered as OneToOne
                # connections are skipped. The original indentation of this
                # branch was not recoverable - confirm that skipping (rather
                # than failing) is the intended behaviour.
                if pair not in self._connections_map:
                    continue
                connection = self._connections[self._connections_map[pair]]
                flow_var = o_block.component_dict[connection.in_flows[0]]
                o_block.add(
                    logic_name + "_cons_" + str(i),
                    pyo.Constraint(o_block.T, rule=disable_rule(flow_var, enable)),
                )

    operational_objectives = dict()
    for name, objective in objectives.items():
        scaled_expression = 0.0
        one_time_expression = 0.0

        if "annuity" in objective:
            annuity = 0.0
            for component in self._components.values():
                annuity += component.operational_annuity(
                    o_block,
                    self._planning_horizon,
                    self._price_change_factor,
                    self._interest_factor,
                )
            scaled_expression += annuity

        if "peak_power_cost" in objective:
            peak_power_cost = 0.0
            for component in self._components.values():
                peak_power_cost += component.peak_power_cost(o_block)
            T = self._planning_horizon
            r = self._price_change_factor
            q = self._interest_factor
            # The special cases avoid the division by zero of the general
            # formulas for q == 1 and q == r.
            if q == 1.0:
                a = 1.0 / T
            else:
                a = (q - 1.0) / (1.0 - q ** (-T))
            if q == r:
                b = T / q
            else:
                b = (1.0 - (r / q) ** T) / (q - r)
            one_time_expression += peak_power_cost * a * b

        if "co2_emissions" in objective:
            co2_emissions = 0.0
            for component in self._components.values():
                co2_emissions += component.co2_emissions(o_block)
            scaled_expression += co2_emissions

        operational_objectives[name] = (scaled_expression, one_time_expression)
    return operational_objectives
def create_empty_entity_result(self, key, architecture):
    """Create and store an empty result for ``key``, also for all assets.

    Args:
        key: Key under which the result is stored; it also becomes the last
            result key.
        architecture: ``TrivialArchitecture`` or ``PeriodAggregation``.

    Raises:
        ValueError: If ``architecture`` has an unsupported type.
    """
    for asset in self._assets.values():
        asset.create_empty_entity_result(key, architecture)

    if isinstance(architecture, TrivialArchitecture):
        # A single dynamic holds design, operational and flow variables.
        variable_names = []
        for component in self._components.values():
            variable_names += component.design_base_variable_names()
            variable_names += component.operational_base_variable_names()
        variable_names += [(flow, VariableKind.INDEXED) for flow in self._flows]

        result = EntityResult(architecture)
        result.register_dynamic(architecture.dynamic, "", variable_names)
    elif isinstance(architecture, PeriodAggregation):
        # Design variables are registered on the architecture's dynamic ...
        design_names = []
        for component in self._components.values():
            design_names += component.design_base_variable_names()

        result = EntityResult(architecture)
        result.register_dynamic(architecture.dynamic, "", design_names)

        # ... operational and flow variables on its value dynamic.
        operational_names = []
        for component in self._components.values():
            operational_names += component.operational_base_variable_names()
        operational_names += [(flow, VariableKind.INDEXED) for flow in self._flows]

        result.register_dynamic(
            architecture.value_dynamic, "aggregated", operational_names
        )
    else:
        raise ValueError(f"Invalid architecture type {type(architecture)}")

    result.compile()
    self._results[key] = result
    self._last_result_key = key
def extract_result(self, block, key):
    """Copy the values of a solved block into the result stored under ``key``.

    Args:
        block: Block to read from. It must contain one sub-block per asset
            (by asset name) and, for a ``PeriodAggregation``, one sub-block
            per period (by period index as string).
        key: Key of a result created by ``create_empty_entity_result``.

    Raises:
        ValueError: If the architecture of the stored result has an
            unsupported type.
    """
    for asset in self._assets.values():
        asset.extract_result(block.blocks[asset._name], key)

    result = self._results[key]
    architecture = result.architecture

    if isinstance(architecture, TrivialArchitecture):
        result.extract_result(block, architecture.dynamic)
        return

    if isinstance(architecture, PeriodAggregation):
        result.extract_result(block, architecture.dynamic)
        # The per-period values are read from the period sub-blocks.
        for period_index in range(architecture.number_of_periods()):
            result.extract_result(
                block.blocks[str(period_index)], architecture.value_dynamic
            )
        return

    raise ValueError(f"Invalid architecture type {type(result.architecture)}")
def save_results(self, path, keys=None):
    """Write results to ``<path>/<name>.xlsx``, one sheet per key.

    The results of all assets are saved first, using the same arguments.

    Args:
        path: Directory for the Excel file; it is created if it is missing.
        keys: ``None`` to save the most recently created result, a single
            key as string, or an iterable of keys.
    """
    for asset in self._assets.values():
        asset.save_results(path, keys)

    if keys is None:
        keys = [self._last_result_key]
    elif isinstance(keys, str):
        keys = [keys]

    # exist_ok=True replaces the former exists()/makedirs() pair, which could
    # fail if the directory appeared between the check and the creation.
    os.makedirs(path, exist_ok=True)

    with pd.ExcelWriter(os.path.join(path, self._name + ".xlsx")) as writer:
        for key in keys:
            self._results[key].to_excel(writer, sheet_name=key)
class Prosumer(Topology):
    """Topology that is created with empty members and empty assets."""

    def __init__(self, name, configuration):
        super().__init__(name, configuration, members={}, assets={})
class DistrictAsset(Topology):
    """Topology that is created with empty members and empty assets."""

    def __init__(self, name, configuration):
        super().__init__(name, configuration, members={}, assets={})
class District(Topology):
    """Topology whose members are prosumers and whose assets are district assets."""

    def __init__(self, name, configuration, prosumers, district_assets):
        super().__init__(
            name, configuration, members=prosumers, assets=district_assets
        )
class CityAsset(Topology):
    """Topology that is created with empty members and empty assets."""

    def __init__(self, name, configuration):
        super().__init__(name, configuration, members={}, assets={})
class City(Topology):
    """Topology whose members are districts and whose assets are city assets."""

    def __init__(self, name, configuration, districts, city_assets):
        super().__init__(name, configuration, members=districts, assets=city_assets)