From 020cff8f291881587d9c1d3ef953cf9a6a15e767 Mon Sep 17 00:00:00 2001 From: "christoph.von.oy" <christoph.von.oy@rwth-aachen.de> Date: Fri, 20 Dec 2024 15:58:39 +0100 Subject: [PATCH] Removed old Architecture --- component/adapter.py | 5 +- dynamics.py | 522 +------------------------------------------ topology.py | 370 +----------------------------- 3 files changed, 17 insertions(+), 880 deletions(-) diff --git a/component/adapter.py b/component/adapter.py index 0120523..5b8e0aa 100644 --- a/component/adapter.py +++ b/component/adapter.py @@ -30,7 +30,6 @@ from Model_Library.component.core import ( ComponentLink, ComponentPart, ) -from Model_Library.dynamics import resample_variable from Model_Library.optimization_model import VariableKind import pyomo.environ as pyo @@ -165,13 +164,13 @@ class MemberAdapter(AbstractComponent): self.member, self.grid.name + ".output_1", "into_member", - (self.name, ComponentPart.NONE_STATE) + (self.name, ComponentPart.NONE_STATE), ) yield MemberLink( self.member, self.grid.name + ".input_1", "from_member", - (self.name, ComponentPart.NONE_STATE) + (self.name, ComponentPart.NONE_STATE), ) def non_state_base_variable_names(self): diff --git a/dynamics.py b/dynamics.py index 50ea22c..e3b7f41 100644 --- a/dynamics.py +++ b/dynamics.py @@ -811,11 +811,6 @@ def compute_assignment_common_reference( class Assignment(abc.ABC): - # - @abc.abstractmethod - def is_view_resample(self) -> bool: - pass - # proviedes a view indexed by target into the values indexed by source @abc.abstractmethod def resample( @@ -828,19 +823,6 @@ class Assignment(abc.ABC): ): # values: type hinting a np-array, type hinting a np-array pass - # adds the values indexed by source into the target values indexed by target - @abc.abstractmethod - def resample_into( - self, - values, - source_start: int, - source_end: int, - target_values, - target_start: int, - target_end: int, - ): # values: type hinting a np-array, target_values: type hinting a np-array - pass - # generates expressions representing the resampling @abc.abstractmethod def resample_variable( @@ -853,9 +835,6 @@ class AssignmentSame(Assignment): def __init__(self): pass - def is_view_resample(self) -> bool: - return True - def resample( self, values, @@ -870,23 +849,6 @@ class AssignmentSame(Assignment): raise IndexError("Source values do not cover all target time steps!") return values[:, target_start - source_start : target_end - source_start] - def resample_into( - self, - values, - source_start: int, - source_end: int, - target_values, - target_start: int, - target_end: int, - ): # values: type hinting a np-array, target_values: type hinting a np-array - if source_start < target_start: - raise IndexError("Target values do not cover all source time steps!") - if target_end < source_end: - raise IndexError("Target values do not cover all source time steps!") - target_values[ - :, source_start - target_start : source_end - target_start - ] = values - def resample_variable( self, variable, source_start, source_end, target_start, target_end ): @@ -939,9 +901,6 @@ class AssignmentToBacked(Assignment): self.expressions ) - def is_view_resample(self) -> bool: - return False - def resample( self, values, @@ -970,38 +929,6 @@ class AssignmentToBacked(Assignment): target_values[:, local_target_position] = acc return target_values - def resample_into( - self, - values, - source_start: int, - source_end: int, - target_values, - target_start: int, - target_end: int, - ): # values: type hinting a np-array, target_values: type hinting a np-array - source_i_start = self.indices[source_start] - source_i_end = self.indices[source_end] - target_i_start = self.target_indices[target_start] - target_i_end = self.target_indices[target_end] - if source_i_start < target_i_start: - raise IndexError("Target values do not cover all source time steps!") - if target_i_end < source_i_end: - raise IndexError("Target values do not cover all source time steps!") - first_complete_expression = self.first_complete_expression[source_start] - for local_target_position, expression in enumerate( - self.expressions[ - max(first_complete_expression, target_start) : min( - self.last_complete_expression[source_end], target_end - ) - ] - ): - acc = 0.0 - for source_position, factor in expression: - acc += factor * values[:, source_position - source_start] - target_values[ - :, local_target_position + first_complete_expression - target_start - ] = acc - def resample_variable( self, variable, source_start, source_end, target_start, target_end ): @@ -1056,9 +983,6 @@ class AssignmentFromBacked(Assignment): self.source_to_target[source_position] = first_target_position self.source_to_target[-1] = self.distributions[-1][1] - def is_view_resample(self) -> bool: - return True - def resample( self, values, @@ -1079,36 +1003,6 @@ class AssignmentFromBacked(Assignment): :, self.distribution_positions[target_start:target_end] - source_start ] - def resample_into( - self, - values, - source_start: int, - source_end: int, - target_values, - target_start: int, - target_end: int, - ): # values: type hinting a np-array, target_values: type hinting a np-array - source_i_start = self.indices[source_start] - source_i_end = self.indices[source_end] - target_i_start = self.target_indices[target_start] - target_i_end = self.target_indices[target_end] - if source_i_start < target_i_start: - raise IndexError("Target values do not cover all source time steps!") - if target_i_end < source_i_end: - raise IndexError("Target values do not cover all source time steps!") - target_values[ - :, - self.source_to_target[source_start] - - target_start : self.source_to_target[source_end] - - target_start, - ] = values[ - :, - self.distribution_positions[ - self.source_to_target[source_start] : self.source_to_target[source_end] - ] - - source_start, - ] - def resample_variable( self, variable, source_start, source_end, target_start, target_end ): @@ -1175,9 +1069,6 @@ class AssignmentCommon(Assignment): last_existing_expression + 1 ) - def is_view_resample(self) -> bool: - return False - def resample( self, values, @@ -1206,38 +1097,6 @@ class AssignmentCommon(Assignment): target_values[:, local_target_position] = acc return target_values - def resample_into( - self, - values, - source_start: int, - source_end: int, - target_values, - target_start: int, - target_end: int, - ): # values: type hinting a np-array, target_values: type hinting a np-array - source_i_start = self.indices[source_start] - source_i_end = self.indices[source_end] - target_i_start = self.target_indices[target_start] - target_i_end = self.target_indices[target_end] - if source_i_start < target_i_start: - raise IndexError("Target values do not cover all source time steps!") - if target_i_end < source_i_end: - raise IndexError("Target values do not cover all source time steps!") - first_complete_expression = self.first_complete_expression[source_start] - for local_target_position, expression in enumerate( - self.expressions[ - max(first_complete_expression, target_start) : min( - self.last_complete_expression[source_end], target_end - ) - ] - ): - acc = 0.0 - for source_position, factor in expression: - acc += factor * values[:, source_position - source_start] - target_values[ - :, local_target_position + first_complete_expression - target_start - ] = acc - def resample_variable( self, variable, source_start, source_end, target_start, target_end ): @@ -1279,9 +1138,6 @@ class AssignmentWrapper: self.target_start = target_start self.target_end = target_end - def is_view_resample(self): - return self.assignment.is_view_resample() - def resample( self, values ): # values: type hinting a np-array, type hinting a np-array @@ -1293,20 +1149,6 @@ class AssignmentWrapper: self.target_end, ) - def resample_into( - self, - values, - target_values, - ): # values: type hinting a np-array, target_values: type hinting a np-array - return self.assignment.resample_into( - values, - self.source_start, - self.source_end, - target_values, - self.target_start, - self.target_end, - ) - def resample_variable(self, variable): return self.assignment.resample_variable( variable, @@ -1317,345 +1159,6 @@ class AssignmentWrapper: ) -# hold information about the architecture of a model or result -class Architecture(abc.ABC): - # returns the dynamic that a model constructed using this architecture should hold - @abc.abstractmethod - def model_dynamic(self) -> Dynamic: - pass - - -class TrivialArchitecture(Architecture): - def __init__(self, dynamic): - self.dynamic = dynamic - - def model_dynamic(self) -> Dynamic: - return self.dynamic - - -class PeriodAggregation(Architecture): - # dynamic: the dynamic that is aggregated - # periods: the segment lengths of all periods - # period_order: the order the periods have to be arranged in order to reconstruct the original dynamic - def __init__( - self, dynamic: Dynamic, periods: List[List[int]], period_order: List[int] - ): - # sanity check 1: every time step of dynamic has to have the same length - step_length = dynamic.step_lengths()[0] - if np.any(dynamic.step_lengths() != step_length): - raise ValueError("Every time step of dynamic has to have the same length!") - # sanity check 2: every period has to have the same length - period_length = step_length * sum(segment_size for segment_size in periods[0]) - if any( - step_length * sum(segment_size for segment_size in period) != period_length - for period in periods[1:] - ): - raise ValueError("Every period has to have the same length!") - # sanity check 3: the time step length has to divide the period length - number_of_time_steps_per_period = int(period_length / step_length) - if number_of_time_steps_per_period * step_length != period_length: - raise ValueError("The time step lengh has to divide the period length!") - # sanity check 4: the period length has to divide the dynamic length into the number of elements in the period order - dynamic_length = np.sum(dynamic.step_lengths()) - if period_length * len(period_order) != dynamic_length: - raise ValueError( - "The period length has to divide the dynamic length into the number of elements in the period order!" - ) - - self.dynamic = dynamic - root_dynamic = DynamicTree( - np.full(number_of_time_steps_per_period, step_length, dtype=int) - ).root() - self.period_dynamics = [] - self.n_p = [] - running_index = 0 - offsets = [] - for i, period in enumerate(periods): - index = 0 - period_indices = [0] - for segment_size in period: - index += segment_size - period_indices.append(index) - self.period_dynamics.append( - PeriodDynamic( - self, i, root_dynamic.sub_dynamic(np.array(period_indices)) - ) - ) - self.n_p.append( - sum(1 for period_index in period_order if period_index == i) - ) - offsets.append((running_index, running_index + len(period))) - running_index += len(period) - self.n = len(period_order) - - self.value_dynamic = AggregatedDynamic( - self, len(periods), running_index, offsets - ) - - # sanity check 5: all elements in period order are valid period indices (above n_p[i] is the number of times that period index i appears in period order -> sum(n_p) == len(period_order)) - if sum(n_p for n_p in self.n_p) != len(period_order): - raise ValueError( - "All elements in period order have to be valid period indices!" - ) - - def number_of_periods(self) -> int: - return len(self.period_dynamics) - - def model_dynamic(self) -> Dynamic: - return self.dynamic - - -class AggregatedDynamic(Dynamic): - def __init__( - self, - period_aggregation: PeriodAggregation, - number_of_periods: int, - length: int, - offsets: List[Tuple[int, int]], - ): - self.period_aggregation = period_aggregation - self.number_of_periods = number_of_periods - self.length = length - self.offsets = offsets - - def number_of_steps(self) -> int: - raise NotImplementedError - - def shape(self) -> int: - return self.length - - def first_state_shape(self) -> int: - return self.number_of_periods - - def pandas_index(self) -> pd.Index: - first_level = np.empty(self.shape(), dtype=int) - second_level = np.empty(self.shape(), dtype=int) - for i, (start, end) in enumerate(self.offsets): - first_level[start:end] = i - second_level[start:end] = np.arange(0, end - start, dtype=int) - return pd.MultiIndex.from_arrays([first_level, second_level]) - - def first_state_pandas_index(self) -> pd.Index: - first_level = np.arange(0, self.first_state_shape(), dtype=int) - second_level = np.full(self.first_state_shape(), -1, dtype=int) - return pd.MultiIndex.from_arrays([first_level, second_level]) - - def step_size(self, position) -> float: - raise NotImplementedError - - def step_lengths(self): # type hinting a np-array of ints - raise NotImplementedError - - def _all_indices(self): # type hinting a np-array of ints - raise NotImplementedError - - -class PeriodDynamic(Dynamic): - # period_aggregation: the period aggregation that is period originates from - # period_index: the index of this period in the period aggregation - # dynamic: the dynamic of this period - def __init__( - self, - period_aggregation: PeriodAggregation, - period_index: int, - dynamic: BackedDynamic, - ): - self.period_aggregation = period_aggregation - self.period_index = period_index - self.dynamic = dynamic - - def number_of_steps(self) -> int: - return self.dynamic.number_of_steps() - - def shape(self) -> int: - return self.dynamic.shape() - - def first_state_shape(self) -> int: - return self.dynamic.first_state_shape() - - def pandas_index(self) -> pd.Index: - return self.dynamic.pandas_index() - - def first_state_pandas_index(self) -> pd.Index: - return self.dynamic.first_state_pandas_index() - - def step_size(self, position) -> float: - return self.dynamic.step_size(position) - - def step_lengths(self): # type hinting a np-array of ints - return self.dynamic.step_lengths() - - def _all_indices(self): # type hinting a np-array of ints - return self.dynamic._all_indices() - - -def is_view_resample(dynamic: Dynamic, target_dynamic: Dynamic) -> bool: - if dynamic == target_dynamic: - return True - elif isinstance(dynamic, TreeDynamic) and isinstance(target_dynamic, TreeDynamic): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to be part of the same dynamic tree!") - return dynamic.dynamic_tree.get_assignment( - dynamic, target_dynamic - ).is_view_resample() - elif isinstance(dynamic, AggregatedDynamic) and isinstance( - target_dynamic, PeriodDynamic - ): - if dynamic.period_aggregation != target_dynamic.period_aggregation: - raise ValueError( - f"The aggregated dynamic and the period dynamic have to be part of the same period aggregation!" - ) - return True - else: - raise ValueError( - f"Invalid dynamic type combination {type(dynamic)} -> {type(target_dynamic)}!" - ) - - -def resample( - values, dynamic: Dynamic, target_dynamic: Dynamic -): # values: type hinting a np-array, type hinting a np-array - if dynamic == target_dynamic: - return values - elif isinstance(dynamic, TreeDynamic) and isinstance(target_dynamic, TreeDynamic): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to be part of the same dynamic tree!") - return dynamic.dynamic_tree.get_assignment(dynamic, target_dynamic).resample( - values - ) - elif isinstance(dynamic, AggregatedDynamic) and isinstance( - target_dynamic, PeriodDynamic - ): - if dynamic.period_aggregation != target_dynamic.period_aggregation: - raise ValueError( - f"The aggregated dynamic and the period dynamic have to be part of the same period aggregation!" - ) - offset = dynamic.offsets[target_dynamic.period_index] - return values[:, offset[0] : offset[1]] - else: - raise ValueError( - f"Invalid dynamic type combination {type(dynamic)} -> {type(target_dynamic)}!" - ) - - -def resample_into( - values, dynamic: Dynamic, target_values, target_dynamic: Dynamic -): # values: type hinting a np-array, target_values: type hinting a np-array - if dynamic == target_dynamic: - target_values[:] = values - elif isinstance(dynamic, TreeDynamic) and isinstance(target_dynamic, TreeDynamic): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to be part of the same dynamic tree!") - dynamic.dynamic_tree.get_assignment(dynamic, target_dynamic).resample_into( - values, target_values - ) - elif isinstance(dynamic, PeriodDynamic) and isinstance( - target_dynamic, AggregatedDynamic - ): - if dynamic.period_aggregation != target_dynamic.period_aggregation: - raise ValueError( - f"The period dynamic and the aggregated dynamic have to be part of the same period aggregation!" - ) - offset = target_dynamic.offsets[dynamic.period_index] - target_values[:, offset[0] : offset[1]] = values - else: - raise ValueError( - f"Invalid dynamic type combination {type(dynamic)} -> {type(target_dynamic)}!" - ) - - -def resample_first_state_into( - values, dynamic: Dynamic, target_values, target_dynamic: Dynamic -): # values: type hinting a np-array, target_values: type hinting a np-array - if dynamic == target_dynamic: - target_values[:] = values - elif isinstance(dynamic, TreeDynamic) and isinstance(target_dynamic, TreeDynamic): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to be part of the same dynamic tree!") - target_values[:] = values - elif isinstance(dynamic, PeriodDynamic) and isinstance( - target_dynamic, AggregatedDynamic - ): - if dynamic.period_aggregation != target_dynamic.period_aggregation: - raise ValueError( - f"The period dynamic and the aggregated dynamic have to be part of the same period aggregation!" - ) - target_values[:, dynamic.period_index] = values[ - :, 0 - ] # TODO: the first dimension of values and target_values is 0, somehow values[:,0] is necessary, and values on its own does not work - - -def resample_variable(variable, dynamic: Dynamic, target_dynamic: Dynamic): - if dynamic == target_dynamic: - return variable - elif isinstance(dynamic, TreeDynamic) and isinstance(target_dynamic, TreeDynamic): - return dynamic.dynamic_tree.get_assignment( - dynamic, target_dynamic - ).resample_variable(variable) - else: - raise ValueError( - f"Invalid dynamic type combination {type(dynamic)} -> {type(target_dynamic)}!" - ) - - -class Profile: - def __init__(self, values, dynamic: Dynamic): # values: type hinting np-array - if isinstance(values[0], np.number): - self.is_bool = False - elif isinstance(values[0], np.bool_): - self.is_bool = True - else: - raise ValueError(f"Invalid data type {type(values[0])}!") - self.values = values - self.dynamic = dynamic - - @staticmethod - def from_csv( - path, dynamic: Dynamic - ): # path: [path to a file], type hinting dict[str, Profile]] - if isinstance(dynamic, (TrivialDynamic, TreeDynamic)): - df = pd.read_csv(path) - if len(df) != dynamic.number_of_steps(): - raise ValueError( - f"The number of rows in the csv file and the number of steps in the dynamic have to be the same!" - ) - return { - column: Profile(df[column].values, dynamic) for column in df.columns - } - elif isinstance(dynamic, AggregatedDynamic): - df = pd.read_csv(path, index_col=[0, 1]) - if len(df.index.levels[0]) != dynamic.number_of_periods: - raise ValueError( - f"The number of periods in the csv file and the number of periods in the dynamic have the be the same!" - ) - if any( - len(df.loc[period_index]) != period_dynamic.number_of_steps() - for period_index, period_dynamic in enumerate( - dynamic.period_aggregation.period_dynamics - ) - ): - raise ValueError( - f"The number of rows in the csv file for a period and the number of segments in that period have the be the same!" - ) - return { - column: Profile(df[column].values, dynamic) for column in df.columns - } - else: - raise ValueError(f"Invalid dynamic type {type(dynamic)}!") - - def resample(self, dynamic: Dynamic) -> "Profile": - if dynamic == self.dynamic: - return self - if self.is_bool and not is_view_resample(self.dynamic, dynamic): - raise ValueError( - f"A bool profile can only be resampled with a view resampling!" - ) - return Profile( - resample(np.expand_dims(self.values, axis=0), self.dynamic, dynamic)[0], - dynamic, - ) - - def test_single_resampling( dynamic: TreeDynamic, target_dynamic: TreeDynamic, @@ -1670,27 +1173,15 @@ def test_single_resampling( target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps()) resample_possible = i_start <= target_i_start and target_i_end <= i_end try: - result = resample(np.expand_dims(values, axis=0), dynamic, target_dynamic)[0] + result = dynamic.dynamic_tree.get_assignment(dynamic, target_dynamic).resample( + np.expand_dims(values, axis=0) + )[0] except Exception as error: if str(error) == "Source values do not cover all target time steps!": f.write("olap # ") else: f.write(str(error) + " # ") - resample_into_possible = target_i_start <= i_start and i_end <= target_i_end - try: - result_into = np.full( - (1, target_dynamic.number_of_steps()), np.nan, dtype=float - ) - resample_into( - np.expand_dims(values, axis=0), dynamic, target_dynamic, result_into - ) - except Exception as error: - if str(error) == "Target values do not cover all source time steps!": - f.write("olap # ") - else: - f.write(str(error) + " # ") - result_into = result_into[0] - if not resample_possible and not resample_into_possible: + if not resample_possible: return target_values = np.full(target_dynamic.number_of_steps(), np.nan, dtype=float) # resample from source to ancestor @@ -1723,11 +1214,6 @@ def test_single_resampling( f.write("fine # ") else: f.write("math # ") - if resample_into_possible: - if all(np.isclose(target_values, result_into, equal_nan=True)): - f.write("fine # ") - else: - f.write("math # ") ancestor_values[:] = np.nan diff --git a/topology.py b/topology.py index b3b13e0..37b0263 100644 --- a/topology.py +++ b/topology.py @@ -38,7 +38,6 @@ from Model_Library.component.core import ( VariableLink, ) from Model_Library.component.adapter import AssetLink, MemberLink -from Model_Library.dynamics import PeriodAggregation, TrivialArchitecture from Model_Library.optimization_model import ( EntityResult, OptimizationBlock, @@ -284,7 +283,7 @@ class Topology: for name in sub_configurations } - model = self.build_model_new( + model = self.build_model( architecture, component_iden_strs, strategy, @@ -301,23 +300,23 @@ class Topology: if not model.is_ok(): raise RuntimeError("Model is infeasible or unbounded!") - self.create_empty_entity_result_new( + self.create_empty_entity_result( key, architecture, component_iden_strs, _extract_sub_configuration(sub_configurations, [0, 1, 2]), ) - self.extract_result_new( + self.extract_result( model, key, _extract_sub_configuration(sub_configurations, [0]) ) - def build_model_new( + def build_model( self, architecture, component_ident_strs, strategy, sub_model_configurations ): model = OptimizationModel(self._name, architecture.get_dynamic("")) - self._build_model_new( + self._build_model( architecture, component_ident_strs, strategy, @@ -330,7 +329,7 @@ class Topology: return model - def _build_model_new( + def _build_model( self, architecture, input_component_ident_strs, @@ -345,7 +344,7 @@ class Topology: sub_strategy = sub_model_configurations[name][2] sub_sub_model_configutation = sub_model_configurations[name][3] sub_block = block.add_block(name, sub_architecture.get_dynamic("")) - asset._build_model_new( + asset._build_model( sub_architecture, sub_component_ident_strs, sub_strategy, @@ -895,270 +894,7 @@ class Topology: return objective_exprs - def build_model(self, architecture, strategy): - model = OptimizationModel(self._name, architecture.model_dynamic()) - - self.fill_block(model, architecture, strategy) - - model.collect_objectives() - - return model - - def fill_block(self, block, architecture, strategy): - for asset in self._assets.values(): - asset_block = OptimizationBlock(asset._name, architecture.model_dynamic()) - block.add(asset._name, asset_block) - asset.fill_block(asset_block, architecture, strategy) - - if strategy is None: - objectives = {"objective": []} - elif isinstance(strategy, list): - objectives = {"objective": strategy} - elif isinstance(strategy, dict): - objectives = strategy - else: - raise ValueError(f"Invalid strategy type!") - - if isinstance(architecture, TrivialArchitecture): - design_objectives = self.fill_design_block(block, objectives) - - operational_objectives = self.fill_operational_blocks( - block, block, block, objectives - ) - - w = (365.0 * 24.0) / (np.sum(block.dynamic.step_lengths()) / 3600.0) - - for name in objectives: - scaled_expression, one_time_expression = operational_objectives[name] - objective = ( - design_objectives[name] - + w - * ( - pyo.quicksum(term for term in block.general_scaled_expression) - + scaled_expression - ) - + one_time_expression - ) - block.add_objective(name, objective) - - elif isinstance(architecture, PeriodAggregation): - design_objectives = self.fill_design_block(block, objectives) - - period_blocks = [] - operational_objectives = [] - for period_index, period_dynamic in enumerate(architecture.period_dynamics): - period_block = OptimizationBlock(str(period_index), period_dynamic) - block.add(str(period_index), period_block) - period_blocks.append(period_block) - - operational_objectives.append( - self.fill_operational_blocks( - block, period_block, period_block, objectives - ) - ) - - w = (365.0 * 24.0) / (np.sum(block.dynamic.step_lengths()) / 3600.0) - - for name in objectives: - scaled_expression = 0.0 - one_time_expression = 0.0 - for period_index in range(architecture.number_of_periods()): - ( - period_scaled_expression, - period_one_time_expression, - ) = operational_objectives[period_index][name] - scaled_expression += architecture.n_p[period_index] * ( - period_scaled_expression - + pyo.quicksum( - term - for term in period_blocks[ - period_index - ].general_scaled_expression - ) - ) - one_time_expression += ( - architecture.n_p[period_index] * period_one_time_expression - ) - objective = design_objectives[name] + w * ( - scaled_expression + (1.0 / architecture.n) * one_time_expression - ) - block.add_objective(name, objective) - - else: - raise ValueError(f"Invalid architecture type {type(architecture)}") - - def fill_design_block(self, d_block, objectives): - for component in self._components: - self._components[component].add_design_variables(d_block) - - for logic_name, logic in self._additional_model_logic.items(): - if logic["type"] == "EqualCapacity": - components = logic["components"] - for i in range(len(components) - 1): - - def rule(m): - return ( - d_block.component_dict[components[i] + ".capacity"] - == d_block.component_dict[components[i + 1] + ".capacity"] - ) - - d_block.add( - logic_name + "_cons_" + str(i), pyo.Constraint(rule=rule) - ) - - design_objectives = dict() - for name, objective in objectives.items(): - expression = 0.0 - if "annuity" in objective: - annuity = 0.0 - for component in self._components.values(): - annuity += component.design_annuity( - d_block, - self._planning_horizon, - self._price_change_factor, - self._interest_factor, - ) - expression += annuity - design_objectives[name] = expression - - return design_objectives - - def fill_operational_blocks(self, d_block, o_block, s_block, objectives): - for component in self._components: - self._components[component].add_non_state_variables(o_block) - self._components[component].add_state_variables(s_block) - self._components[component].add_non_state_model(d_block, o_block) - self._components[component].add_state_model(d_block, o_block, s_block) - self._components[component].add_additional_model_logic(d_block, o_block) - - for connector in self._connectors.values(): - for flow in connector.flows: - o_block.add(flow, pyo.Var(o_block.T, bounds=(0, None))) - - connector_var = o_block.component_dict[connector.name] - - def rule(m, t): - return connector_var[t] == pyo.quicksum( - o_block.component_dict[flow][t] for flow in connector.flows - ) - - o_block.add(connector.name + "_sum", pyo.Constraint(o_block.T, rule=rule)) - - for i, connection in enumerate(self._connections): - - def rule(m, t): - return pyo.quicksum( - o_block.component_dict[out_flow][t] - for out_flow in connection.out_flows - ) == pyo.quicksum( - o_block.component_dict[in_flow][t] - for in_flow in connection.in_flows - ) * ( - 1.0 - connection.loss_factor - ) - - o_block.add(str(i) + "_sum", pyo.Constraint(o_block.T, rule=rule)) - - if connection.capacity is not None: - capacity = connection.capacity - - def rule(m, t): - return ( - pyo.quicksum( - o_block.component_dict[in_flow][t] - for in_flow in connection.in_flows - ) - <= capacity - ) - - o_block.add(str(i) + "_capacity", pyo.Constraint(o_block.T, rule=rule)) - - for logic_name, logic in self._additional_model_logic.items(): - if logic["type"] == "ConnectorEnable": - enable = logic["enable"].resample(o_block.dynamic).values - for i, connector in enumerate(logic["connectors"]): - connector_var = o_block.component_dict[connector] - - def rule(m, t): - if not enable[t]: - return connector_var[t] == 0 - else: - return pyo.Constraint.Skip - - o_block.add( - logic_name + "_cons_" + str(i), - pyo.Constraint(o_block.T, rule=rule), - ) - - if logic["type"] == "ConnectionEnable": - enable = logic["enable"].resample(o_block.dynamic).values - for i, connection in enumerate(logic["connections"]): - flow_from = connection["from"] - flow_to = connection["to"] - if (flow_from, flow_to) in self._connections_map: - connection = self._connections[ - self._connections_map[flow_from, flow_to] - ] - # if flow in self._removed_flows: - # flow = self._removed_flows[flow] - flow_var = o_block.component_dict[connection.in_flows[0]] - - def rule(m, t): - if not enable[t]: - return flow_var[t] == 0 - else: - return pyo.Constraint.Skip - - o_block.add( - logic_name + "_cons_" + str(i), - pyo.Constraint(o_block.T, rule=rule), - ) - - operational_objectives = dict() - for name, objective in objectives.items(): - scaled_expression = 0.0 - one_time_expression = 0.0 - if "annuity" in objective: - annuity = 0.0 - for component in self._components.values(): - annuity += component.operational_annuity( - o_block, - self._planning_horizon, - self._price_change_factor, - self._interest_factor, - ) - scaled_expression += annuity - - if "peak_power_cost" in objective: - peak_power_cost = 0.0 - for component in self._components.values(): - peak_power_cost += component.peak_power_cost(o_block) - - T = self._planning_horizon - r = self._price_change_factor - q = self._interest_factor - if q == 1.0: - a = 1.0 / T - else: - a = (q - 1.0) / (1.0 - q ** (-T)) - if q == r: - b = T / q - else: - b = (1.0 - (r / q) ** T) / (q - r) - - one_time_expression += peak_power_cost * a * b - - if "co2_emissions" in objective: - co2_emissions = 0.0 - for component in self._components.values(): - co2_emissions += component.co2_emissions(o_block) - scaled_expression += co2_emissions - - operational_objectives[name] = (scaled_expression, one_time_expression) - - return operational_objectives - - def create_empty_entity_result_new( + def create_empty_entity_result( self, key, architecture, input_component_ident_strs, sub_result_configurations ): for name, asset in self._assets.items(): @@ -1166,7 +902,7 @@ class Topology: sub_architecture = sub_result_configurations[name][1] sub_component_ident_strs = sub_result_configurations[name][2] sub_sub_result_configurations = sub_result_configurations[name][3] - asset.create_empty_entity_result_new( + asset.create_empty_entity_result( sub_key, sub_architecture, sub_component_ident_strs, @@ -1264,74 +1000,11 @@ class Topology: for dist, _ in dists.values(): dist.up() - def create_empty_entity_result(self, key, architecture): - for asset in self._assets.values(): - asset.create_empty_entity_result(key, architecture) - - if isinstance(architecture, TrivialArchitecture): - base_variable_names = [] - for component in self._components: - base_variable_names.extend( - self._components[component].design_base_variable_names() - ) - base_variable_names.extend( - self._components[component].non_state_base_variable_names() - ) - base_variable_names.extend( - self._components[component].state_base_variable_names() - ) - - for connector in self._connectors.values(): - for flow in connector.flows: - base_variable_names.append((flow, VariableKind.INDEXED)) - - result = EntityResult(architecture) - result.register_dynamic(architecture.dynamic, "", base_variable_names) - - elif isinstance(architecture, PeriodAggregation): - design_base_variable_names = [] - for component in self._components: - design_base_variable_names.extend( - self._components[component].design_base_variable_names() - ) - - result = EntityResult(architecture) - result.register_dynamic( - architecture.dynamic, "", design_base_variable_names - ) - - operational_base_variable_names = [] - for component in self._components: - operational_base_variable_names.extend( - self._components[component].non_state_base_variable_names() - ) - operational_base_variable_names.extend( - self._components[component].state_base_variable_names() - ) - - for connector in self._connectors.values(): - for flow in connector.flows: - operational_base_variable_names.append((flow, VariableKind.INDEXED)) - - result.register_dynamic( - architecture.value_dynamic, - "aggregated", - operational_base_variable_names, - ) - - else: - raise ValueError(f"Invalid architecture type {type(architecture)}") - - result.compile() - - self._results[key] = result - self._last_result_key = key - - def extract_result_new(self, model, key, sub_keys): + def extract_result(self, model, key, sub_keys): for name, asset in self._assets.items(): sub_key = sub_keys[name][0] sub_sub_keys = sub_keys[name][1] - asset.extract_result_new(model, sub_key, sub_sub_keys) + asset.extract_result(model, sub_key, sub_sub_keys) result = self._results[key] @@ -1342,27 +1015,6 @@ class Topology: model.all_blocks[block_prefix + local_block_prefix] ) - def extract_result(self, block, key): - for asset in self._assets.values(): - asset_block = block.blocks[asset._name] - asset.extract_result(asset_block, key) - - result = self._results[key] - - if isinstance(result.architecture, TrivialArchitecture): - result.extract_result(block, result.architecture.dynamic) - - elif isinstance(result.architecture, PeriodAggregation): - result.extract_result(block, result.architecture.dynamic) - - for period_index in range(result.architecture.number_of_periods()): - period_block = block.blocks[str(period_index)] - - result.extract_result(period_block, result.architecture.value_dynamic) - - else: - raise ValueError(f"Invalid architecture type {type(result.architecture)}") - def save_results(self, path, keys=None): for asset in self._assets.values(): asset.save_results(path, keys) -- GitLab