diff --git a/dynamics/Dynamic.py b/dynamics/Dynamic.py deleted file mode 100644 index 19f12f8f30bfc2295c8721493d9ee1eccbdeaa62..0000000000000000000000000000000000000000 --- a/dynamics/Dynamic.py +++ /dev/null @@ -1,1084 +0,0 @@ -""" -MIT License - -Copyright (c) 2023 RWTH Aachen University - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" - -import pandas as pd -import pyomo.environ as pyo - -class GlobalDynamic: - # d_steps: length of each time step in seconds - def __init__(self, d_steps): - self.d_steps = d_steps - self.root_dynamic = TrivialDynamic(self.d_steps, self) - self.dynamics = dict() - self.dynamics[self.root_dynamic] = (dict(), dict()) - self.symbol_table = dict() - self.symbol_table[self.root_dynamic] = 0 - self.assignments = dict() - - def root(self): - return self.root_dynamic - - # indices are relative to the root dynamic - def sub_dynamic(self, dynamic, indices): - if isinstance(dynamic, PartialDynamic): - dynamic = dynamic.reference - if tuple(indices) in self.dynamics[dynamic][0].keys(): - return self.dynamics[dynamic][0][tuple(indices)] - sub_dynamic = BackedDynamic(dynamic, indices, self) - self.dynamics[dynamic][0][tuple(indices)] = sub_dynamic - self.dynamics[sub_dynamic] = (dict(), dict()) - self.symbol_table[sub_dynamic] = len(self.symbol_table) - return sub_dynamic - - # start/end are relative to dynamic - def partial_dynamic(self, dynamic, start, end): - if start == 0 and end == dynamic.number_of_steps(): - return dynamic - if isinstance(dynamic, PartialDynamic): - start += dynamic.start - end += dynamic.start - dynamic = dynamic.reference - if (start, end) in self.dynamics[dynamic][1].keys(): - return self.dynamics[dynamic][1][(start, end)] - partial_dynamic = PartialDynamic(dynamic, start, end, self) - self.dynamics[dynamic][1][(start, end)] = partial_dynamic - self.dynamics[partial_dynamic] = (dict(), dict()) - self.symbol_table[partial_dynamic] = len(self.symbol_table) - return partial_dynamic - - def get_assignment(self, dynamic, target_dynamic): - if isinstance(dynamic, PartialDynamic): - non_partial_dynamic = dynamic.reference - else: - non_partial_dynamic = dynamic - if isinstance(target_dynamic, PartialDynamic): - non_partial_target_dynamic = target_dynamic.reference - else: - non_partial_target_dynamic = target_dynamic - if (non_partial_dynamic, non_partial_target_dynamic) not in self.assignments: - self.assignments[non_partial_dynamic, non_partial_target_dynamic] = compute_assignment(non_partial_dynamic, non_partial_target_dynamic) - return self.assignments[non_partial_dynamic, non_partial_target_dynamic] - - def display(self): - output = '' - for dynamic, (sub_dynamics, partial_dynamics) in self.dynamics.items(): - if len(sub_dynamics) == 0 and len(partial_dynamics) == 0: - continue - output += f'{self.symbol_table[dynamic]}:\n' - for indices, sub_dynamic in sub_dynamics.items(): - output += f'\t{indices}: {self.symbol_table[sub_dynamic]}\n' - for (start, end), partial_dynamic in partial_dynamics.items(): - output += f'\t{start}-{end}: {self.symbol_table[partial_dynamic]}\n' - return output - -# The root dynamic is -# for TrivialDynamic: self -# for BackedDynamic: root dynamic of its reference -# for PartialDynamic: root dynamic of its reference - -# represents a continuus set of time steps relative to the global simulation start t_start -class Dynamic: - # returns the root dynamic of this dynamic - def root(self): - pass - - # return true if other is an ancestor of this dynamic - def has_ancestor(self, other): - pass - - # returns the number of steps in this dynamic - def number_of_steps(self): - pass - - # returns all time steps in this dynamic - def time_steps(self): - pass - - # returns all indices in this dynamic - def all_indices(self): - pass - - # return true if the dynamic contains the given index - # index is relative to the root dynamic - def has_index(self, index): - pass - - # returns the indices between the given start and end positions - # p_start and p_end are relative to this dynamic - def indices_within_p(self, p_start, p_end): - pass - - # returns the indices between the given start and end indices - # i_start and i_end are relative to the root dynamic - def indices_within(self, i_start, i_end): - pass - - # returns the index of the given position - # position is relative to this dynamic - def index_of(self, position): - pass - - # returns the position of the given index - # index is relative to the root dynamic - def position_of(self, index): - pass - - # returns the length of the time step at the given position - # position is relative to this dynamic - def step_length_p(self, position): - pass - - # returns the length of the time step at the given position in hours - # position is relative to this dynamic - def step_size_p(self, position): - pass - - # returns the length of the time step at the given index - # index is relative to the root dynamic - def step_length(self, index): - pass - - # returns the length of the time step at the given index in hours - # index is relative to the root dynamic - def step_size(self, index): - pass - - # returns the length af all time steps in this dynamic - def step_lengths(self): - pass - - # returns the length af all time steps in this dynamic in hours - def step_sizes(self): - pass - - # constructs a sub dynamic containing time steps starting at the time steps at the given positions with the last position representing the end of the last time step - # positions are relative to this dynamic - def sub_dynamic_p(self, positions): - pass - - # constructs a sub dynamic containing time steps starting at the time steps at the given indices with the last index representing the end of the last time step - # indices are relative to the root dynamic - def sub_dynamic(self, indices): - pass - - # construct a sub dynamic containing time steps between the given positions - # p_start and p_end are relative to this dynamic - def partial_dynamic_p(self, p_start, p_end): - pass - - # construct a sub dynamic containing time steps between the given indices - # i_start and i_end are relative to the root dynamic - def partial_dynamic(self, i_start, i_end): - pass - - def display(self): - numbers = self.all_indices() - rows = [[]] - row_ends = [0] - for number in numbers: - local_position = number - numbers[0] - row_index = 0 - while row_index < len(rows) and row_ends[row_index] > local_position: - row_index += 1 - if row_index == len(rows): - rows.append([]) - row_ends.append(0) - rows[row_index].append(number) - row_ends[row_index] = local_position + len(str(number)) + 1 - row_strings = [] - for items in rows: - row_string = '' - string_end = 0 - for number in items: - local_position = number - numbers[0] - row_string += ' ' * (local_position - string_end) - row_string += str(number) - string_end = local_position + len(str(number)) - row_strings.append(row_string) - output = '' - for row_index in range(len(row_strings)): - output += row_strings[len(row_strings) - row_index - 1] + '\n' - string_end = 0 - for number in numbers: - local_position = number - numbers[0] - output += ' ' * (local_position - string_end) - output += '|' - string_end = local_position + 1 - output += '\n' - return output - - def display_alignment(self, other): - p_self = 0 - p_other = 0 - numbers = [] - while p_self <= self.number_of_steps() and p_other <= other.number_of_steps(): - i_self = self.index_of(p_self) - i_other = other.index_of(p_other) - if i_self < i_other: - numbers.append(i_self) - p_self += 1 - elif i_self > i_other: - numbers.append(i_other) - p_other += 1 - else: - numbers.append(i_self) - p_self += 1 - p_other += 1 - while p_self < self.number_of_steps(): - numbers.append(self.index_of(p_self)) - p_self += 1 - else: - while p_other < other.number_of_steps(): - numbers.append(other.index_of(p_other)) - p_other += 1 - rows = [[]] - row_ends = [0] - for number in numbers: - local_position = number - numbers[0] - row_index = 0 - while row_index < len(rows) and row_ends[row_index] > local_position: - row_index += 1 - if row_index == len(rows): - rows.append([]) - row_ends.append(0) - rows[row_index].append(number) - row_ends[row_index] = local_position + len(str(number)) + 1 - row_strings = [] - for items in rows: - row_string = '' - self_string_end = 0 - for number in items: - local_position = number - numbers[0] - row_string += ' ' * (local_position - self_string_end) - row_string += str(number) - self_string_end = local_position + len(str(number)) - row_strings.append(row_string) - output = '' - for row_index in range(len(row_strings)): - output += row_strings[len(row_strings) - row_index - 1] + '\n' - self_string = ' ' * (self.index_of(0) - numbers[0]) - self_string_end = self.index_of(0) - numbers[0] - for index in self.all_indices(): - local_position = index - numbers[0] - self_string += ' ' * (local_position - self_string_end) - self_string += '|' - self_string_end = local_position + 1 - output += self_string + '\n' - other_string = ' ' * (other.index_of(0) - numbers[0]) - other_string_end = other.index_of(0) - numbers[0] - for index in other.all_indices(): - local_position = index - numbers[0] - other_string += ' ' * (local_position - other_string_end) - other_string += '|' - other_string_end = local_position + 1 - output += other_string + '\n' - return output - -# a dynamic defined by the length of each time step -class TrivialDynamic(Dynamic): - # d_steps: length of each time step in seconds - def __init__(self, d_steps, global_dynamic): - self.d_steps = d_steps - self.global_dynamic = global_dynamic - - def root(self): - return self - - def has_ancestor(self, other): - return self == other - - def number_of_steps(self): - return len(self.d_steps) - - def time_steps(self): - return range(len(self.d_steps)) - - def all_indices(self): - return range(len(self.d_steps) + 1) - - def has_index(self, index): - return 0 <= index and index <= len(self.d_steps) - - def indices_within_p(self, p_start, p_end): - return pd.RangeIndex(p_start, p_end) - - def indices_within(self, i_start, i_end): - return pd.RangeIndex(i_start, i_end) - - def index_of(self, position): - return position - - def position_of(self, index): - return index - - def step_length_p(self, position): - return self.d_steps[position] - - def step_size_p(self, position): - return self.d_steps[position] / 3600 - - def step_length(self, index): - return self.d_steps[index] - - def step_size(self, index): - return self.d_steps[index] / 3600 - - def step_lengths(self): - return self.d_steps - - def step_sizes(self): - return [d_step / 3600 for d_step in self.d_steps] - - def sub_dynamic_p(self, positions): - return self.global_dynamic.sub_dynamic(self, positions) - - def sub_dynamic(self, indices): - return self.global_dynamic.sub_dynamic(self, indices) - - def partial_dynamic_p(self, p_start, p_end): - return self.global_dynamic.partial_dynamic(self, p_start, p_end) - - def partial_dynamic(self, i_start, i_end): - return self.global_dynamic.partial_dynamic(self, i_start, i_end) - -# a dynamic definied by taking certain time steps form a reference dynamic -class BackedDynamic(Dynamic): - # reference: the dynamic that backs this dynamic - # indices: the indicies of the time steps contained in this dynamic with the last index representing the end of the last time step - # indices are relative to the reference dynamic - def __init__(self, reference, indices, global_dynamic): - self.reference = reference - self.indices = indices - self.global_dynamic = global_dynamic - - def root(self): - return self.reference.root() - - def has_ancestor(self, other): - return self == other or self.reference.has_ancestor(other) - - def number_of_steps(self): - return len(self.indices) - 1 - - def time_steps(self): - return self.indices[:-1] - - def all_indices(self): - return self.indices - - def has_index(self, index): - return index in self.indices - - def indices_within_p(self, p_start, p_end): - if p_start < 0 or len(self.indices) <= p_start or p_end < 0 or len(self.indices) <= p_end: - raise IndexError("The dynamic does not have indices at the positions!") - return self.indices[p_start:p_end] - - def indices_within(self, i_start, i_end): - p_start = self.position_of(i_start) - p_end = self.position_of(i_end) - return self.indices[p_start:p_end] - - def index_of(self, position): - if position < 0 or len(self.indices) <= position: - raise IndexError("The dynamic does not have a index for this position!") - return self.indices[position] - - def position_of(self, index): - if index not in self.indices: - raise IndexError('The dynamic does not have a position for this index!') - return self.indices.index(index) - - def step_length_p(self, position): - if position < 0 or len(self.indices) - 1 <= position: - raise IndexError("The dynamic does not have a time step at this position!") - return sum(self.reference.step_length(index) for index in self.reference.indices_within(self.indices[position], self.indices[position + 1])) - - def step_size_p(self, position): - if position < 0 or len(self.indices) - 1 <= position: - raise IndexError("The dynamic does not have a time step at this position!") - return sum(self.reference.step_length(index) for index in self.reference.indices_within(self.indices[position], self.indices[position + 1])) / 3600 - - def step_length(self, index): - if index not in self.indices[:-1]: - raise IndexError("The dynamic does not have a time step at this index!") - return self.step_length_p(self.indices.index(index)) - - def step_size(self, index): - if index not in self.indices[:-1]: - raise IndexError("The dynamic does not have a time step at this index!") - return self.step_length_p(self.indices.index(index)) / 3600 - - def step_lengths(self): - return [self.step_length_p(position) for position in range(self.number_of_steps())] - - def step_sizes(self): - return [self.step_length_p(position) / 3600 for position in range(self.number_of_steps())] - - def sub_dynamic_p(self, positions): - if any(position < 0 or len(self.indices) <= position for position in positions): - raise IndexError("The dynamic does not have all requested indices!") - return self.global_dynamic.sub_dynamic(self, [self.indices[position] for position in positions]) - - def sub_dynamic(self, indices): - if any(index not in self.indices for index in indices): - raise IndexError("The dynamic does not have all requested indices for the sub dynamic!") - return self.global_dynamic.sub_dynamic(self, indices) - - def partial_dynamic_p(self, p_start, p_end): - if p_start < 0 or len(self.indices) <= p_start or p_end < 0 or len(self.indices) <= p_end: - raise IndexError("The dynamic does not have all requested positions for the sub dynamic!") - return self.global_dynamic.partial_dynamic(self, p_start, p_end) - - def partial_dynamic(self, i_start, i_end): - if i_start not in self.indices or i_end not in self.indices: - raise IndexError("The dynamic does not have all requested indices for the sub dynamic!") - return self.global_dynamic.partial_dynamic(self, self.indices.index(i_start), self.indices.index(i_end)) - -# a dynamic defined by taking a continuus intervall of time steps from a reference dynamic -class PartialDynamic(Dynamic): - # reference: the dynamic from which the intervall is taken - # start: the position in the reference dynamic of the first time step contained in this dynamic - # end: the position in the reference dynamic of the end of the last time step contained in this dynamic - # start/end are relative to the reference dynamic - def __init__(self, reference, start, end, global_dynamic): - self.reference = reference - self.start = start - self.end = end - self.global_dynamic = global_dynamic - - def root(self): - return self.reference.root() - - def has_ancestor(self, other): - return self == other or self.reference.has_ancestor(other) - - def number_of_steps(self): - return self.end - self.start - - def time_steps(self): - return self.reference.time_steps()[self.start:self.end] - - def all_indices(self): - return self.reference.all_indices()[self.start:self.end + 1] - - def has_index(self, index): - if self.reference.has_index(index): - reference_position = self.reference.position_of(index) - return self.start <= reference_position and reference_position <= self.end - return False - - def indices_within_p(self, p_start, p_end): - if p_start < self.start or self.end < p_start or p_end < self.start or self.end < p_end: - raise IndexError("The dynamic does not have all requested indices!") - return self.reference.indices_within_p(self.start + p_start, self.start + p_end) - - def indices_within(self, i_start, i_end): - p_start = self.reference.position_of(i_start) - p_end = self.reference.position_of(i_end) - if p_start < self.start or self.end < p_start or p_end < self.start or self.end < p_end: - raise IndexError("The dynamic does not have all requested indices!") - return self.reference.indices_within_p(self.start + p_start, self.start + p_end) - - def index_of(self, position): - if position < 0 or self.end - self.start < position: - raise IndexError("The dynamic does not have a index for this position!") - return self.reference.index_of(self.start + position) - - def position_of(self, index): - reference_position = self.reference.position_of(index) - if reference_position < self.start or self.end < reference_position: - raise IndexError('The dynamic does not have a position for this index!') - return reference_position - self.start - - def step_length_p(self, position): - if position < 0 or self.end - self.start <= position: - raise IndexError("The dynamic does not have a time step at this position!") - return self.reference.step_length_p(self.start + position) - - def step_size_p(self, position): - if position < 0 or self.end - self.start <= position: - raise IndexError("The dynamic does not have a time step at this position!") - return self.reference.step_size_p(self.start + position) - - def step_length(self, index): - reference_position = self.reference.position_of(index) - if reference_position < self.start or self.end <= reference_position: - raise IndexError('The dynamic does not have a time step at this index!') - return self.step_length_p(reference_position - self.start) - - def step_size(self, index): - reference_position = self.reference.position_of(index) - if reference_position < self.start or self.end <= reference_position: - raise IndexError('The dynamic does not have a time step at this index!') - return self.step_size_p(reference_position - self.start) - - def step_lengths(self): - return [self.reference.step_length_p(position) for position in range(self.start, self.end)] - - def step_sizes(self): - return [self.reference.step_size_p(position) for position in range(self.start, self.end)] - - def sub_dynamic_p(self, positions): - if any(position < 0 or self.end - self.start < position for position in positions): - raise IndexError("The dynamic does not have all requested positions for the sub dynamic!") - return self.global_dynamic.sub_dynamic(self, [self.index_of(position) for position in positions]) - - def sub_dynamic(self, indices): - def filter(index): - reference_position = self.reference.position_of(index) - return reference_position < self.start or self.end < reference_position - if any(filter(index) for index in indices): - raise IndexError("The does not have all requested indices for the sub dynamic!") - return self.global_dynamic.sub_dynamic(self, indices) - - def partial_dynamic_p(self, p_start, p_end): - if p_start < 0 or self.end - self.start < p_start or p_end < 0 or self.end - self.start < p_end: - raise IndexError("The dynamic does not have all requested positions for the sub dynamic!") - return self.global_dynamic.partial_dynamic(self, p_start, p_end) - - def partial_dynamic(self, i_start, i_end): - reference_start = self.reference.position_of(i_start) - reference_end = self.reference.position_of(i_end) - if reference_start < self.start or self.end < reference_start or reference_end < self.start or self.end < reference_end: - raise IndexError("The dynamic does not have all requested indices for the sub dynamic!") - return self.global_dynamic.partial_dynamic(self, self.position_of(i_start), self.position_of(i_end)) - -def compute_assignment(dynamic, target_dynamic): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to have the same root dynamic!") - if isinstance(dynamic, PartialDynamic): - raise ValueError("The source dyanmic cannot be a PartialDynamic!") - if isinstance(target_dynamic, PartialDynamic): - raise ValueError("The target dyanmic cannot be a PartialDynamic!") - if dynamic == target_dynamic: - return compute_assignment_same(dynamic, target_dynamic) - elif isinstance(target_dynamic, BackedDynamic) and target_dynamic.has_ancestor(dynamic): - return compute_assignment_to_backed(dynamic, target_dynamic) - elif isinstance(dynamic, BackedDynamic) and dynamic.has_ancestor(target_dynamic): - return compute_assignment_from_backed(dynamic, target_dynamic) - else: - return compute_assignment_common_reference(dynamic, target_dynamic) - -# source_dynamic and target_dynamic are the same dynamic -def compute_assignment_same(dynamic, target_dynamic): - assignment = Assignment(dynamic.all_indices(), target_dynamic.all_indices()) - assignment.add_bulk(target_dynamic.time_steps()) - assignment.compile() - return assignment - -# target_dynamic is BackedDynamic and has dynamic as an ancestor -def compute_assignment_to_backed(dynamic, target_dynamic): - assignment = Assignment(dynamic.all_indices(), target_dynamic.indices) - for target_index in target_dynamic.indices[:-1]: - target_position = target_dynamic.position_of(target_index) - source_indices = dynamic.indices_within(target_dynamic.indices[target_position], target_dynamic.indices[target_position + 1]) - if len(source_indices) == 1: - assignment.add_individual(source_indices[0], 1, target_index) - else: - acc = [] - for source_index in source_indices: - acc.append((source_index, dynamic.step_length(source_index) / target_dynamic.step_length_p(target_position))) - assignment.add_expression(acc, target_index) - assignment.compile() - return assignment - -# dynamic is BackedDynamic and has target_dynamic as an ancestor -def compute_assignment_from_backed(dynamic, target_dynamic): - assignment = Assignment(dynamic.indices, target_dynamic.all_indices()) - for source_position in range(dynamic.number_of_steps()): - source_index = dynamic.indices[source_position] - target_indices = target_dynamic.indices_within(source_index, dynamic.indices[source_position + 1]) - assignment.add_distribute(source_index, target_indices) - assignment.compile() - return assignment - -# dynamic and target_dynamic are BackedDynamic and share the same root dynamic -def compute_assignment_common_reference(dynamic, target_dynamic): - assignment = Assignment(dynamic.indices, target_dynamic.indices) - if dynamic.indices[-1] <= target_dynamic.indices[0] or target_dynamic.indices[-1] <= dynamic.indices[0]: - assignment.compile() - return assignment - target_i_start = target_dynamic.indices[0] - if target_i_start not in dynamic.indices: - source_i_start = dynamic.indices[0] - if source_i_start < target_i_start: - root = dynamic.root() - root_p_start = target_i_start # because root is a TrivialDynamic, positions and indices are equivalent - length = 0 - while root_p_start not in dynamic.indices[:-1]: # because root is a TrivialDynamic, root_p_start is equivalent to root_i_start - root_p_start -= 1 - length += root.step_length_p(root_p_start) - source_position = dynamic.indices.index(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent - target_position = 0 - remaining_length = dynamic.step_length_p(source_position) - length - else: # Here source_i_start > target_i_start becuase the case of source_i_start == target_i_start is handled in the else branch of target_i_srat not in dynamic.indices - root = dynamic.root() - root_p_start = source_i_start # because root is a TrivialDynamic, positions and indices are equivalent - length = 0 - source_position = 0 - while root_p_start not in target_dynamic.indices: - length += root.step_length_p(root_p_start) - root_p_start += 1 - if root_p_start in dynamic.indices[:-1]: - length = 0 - source_position += 1 - elif root_p_start > dynamic.indices[-1]: # because root is a TrivialDynamic, positions and indices are equivalent - assignment.compile() # here, we discover that the entire source_dynamic does not cover one time_step of the target_dynamic - return assignment - target_position = target_dynamic.position_of(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent - remaining_length = dynamic.step_length_p(source_position) - length - else: - source_position = dynamic.indices.index(target_i_start) - target_position = 0 - remaining_length = dynamic.step_length(target_i_start) - while target_position < len(target_dynamic.indices) - 1: - remaining_target_length = target_dynamic.step_length_p(target_position) - acc = [] - while remaining_target_length > 0: - if remaining_length == 0: - source_position += 1 - if source_position >= len(dynamic.indices) - 1: - assignment.compile() - return assignment - remaining_length = dynamic.step_length_p(source_position) - if remaining_target_length <= remaining_length: - acc.append((dynamic.indices[source_position], remaining_target_length)) - remaining_length -= remaining_target_length - remaining_target_length -= remaining_target_length - else: - acc.append((dynamic.indices[source_position], remaining_length)) - remaining_target_length -= remaining_length - remaining_length -= remaining_length - for i, (index, factor) in enumerate(acc): - acc[i] = (index, factor / target_dynamic.step_length_p(target_position)) - assignment.add_expression(acc, target_dynamic.index_of(target_position)) - target_position += 1 - assignment.compile() - return assignment - -class Assignment: - def __init__(self, indices, target_indices): - self.index = list(indices) - self.target_index = list(target_indices) - self.bulk = pd.Series(False, index=indices[:-1]) - self.bulk_target = pd.Series(False, index=target_indices[:-1]) - self.distributes = [] - self.expressions = [] - self.data = dict.fromkeys(target_indices[:-1], None) - - def add_bulk(self, targets): - self.bulk[targets] = True - self.bulk_target[targets] = True - for target in targets: - self.data[target] = (target, 1.0) - - def add_distribute(self, source, targets): - self.bulk[targets[0]] = True - self.bulk_target[targets[0]] = True - if len(targets) > 1: - self.distributes.append((source, targets[1:])) - for target in targets: - self.data[target] = (source, 1.0) - - def add_individual(self, source, factor, target): - if target == source and factor == 1.0: - self.bulk[target] = True - self.bulk_target[target] = True - elif factor == 1.0: - if len(self.distributes) == 0 or self.distributes[-1][0] != source: - self.distributes.append((source, [])) - self.distributes[-1][1].append(target) - else: - raise ValueError(f'Tried to add a individual with a factor unequal to 1!') - self.data[target] = (source, factor) - - def add_expression(self, expression, target): - if len(expression) == 1: - self.add_individual(expression[0][0], expression[0][1], target) - else: - self.expressions.append((expression, target)) - self.data[target] = expression - - def compile(self): - index_shift = {self.index[p]: self.index[p + 1] for p in range(len(self.index[:-1]))} - target_index_shift = {self.target_index[p]: self.target_index[p + 1] for p in range(len(self.target_index[:-1]))} - self.source_target_start = dict.fromkeys(self.index[:-1], None) - self.target_source_start = dict.fromkeys(self.target_index[:-1], None) - self.source_target_end = dict.fromkeys(self.index[1:], None) - self.target_source_end = dict.fromkeys(self.target_index[1:], None) - last_un_p_source = 0 - last_un_p_source_target_end = None - last_source_target_end = None - for target, data in self.data.items(): - if data is not None: - first_source = data[0][0] if isinstance(data, list) else data[0] - last_source = data[-1][0] if isinstance(data, list) else data[0] - p_stop = self.index.index(first_source) + 1 - for p in range(last_un_p_source, p_stop): - self.source_target_start[self.index[p]] = target - last_un_p_source = p_stop - self.target_source_start[target] = first_source - if isinstance(data, list): - for source, _f in data[1:]: - self.source_target_end[source] = target - self.source_target_end[index_shift[last_source]] = target_index_shift[target] - self.target_source_end[target_index_shift[target]] = index_shift[last_source] - last_un_p_source_target_end = self.index.index(index_shift[last_source]) + 1 - last_source_target_end = target_index_shift[target] - if last_un_p_source_target_end is not None: - for p in range(last_un_p_source_target_end, len(self.index)): - self.source_target_end[self.index[p]] = last_source_target_end - - self.first_distribute = dict.fromkeys(self.target_index[:-1], None) - last_un_p_distribute = 0 - for n, distribute in enumerate(self.distributes): - for p in range(last_un_p_distribute, self.target_index.index(distribute[1][0])): - self.first_distribute[self.target_index[p]] = (n, 0) - for m, target in enumerate(distribute[1]): - self.first_distribute[target] = (n, m) - last_un_p_distribute = self.target_index.index(distribute[1][-1]) + 1 - - self.last_distribute = dict.fromkeys(self.target_index[1:], None) - last_un_p_distribute = len(self.target_index) - for n_prime in range(len(self.distributes)): - n = len(self.distributes) - n_prime - 1 - distribute = self.distributes[n] - for p in range(self.target_index.index(distribute[1][-1]) + 1, last_un_p_distribute): - self.last_distribute[self.target_index[p]] = (n, len(distribute[1])) - for m, target in enumerate(distribute[1][1:]): - self.last_distribute[target] = (n, m + 1) - last_un_p_distribute = self.target_index.index(distribute[1][0]) + 1 - - self.first_expression = dict.fromkeys(self.target_index[:-1], None) - last_un_p_expression = 0 - for n, expression in enumerate(self.expressions): - for p in range(last_un_p_expression, self.target_index.index(expression[1]) + 1): - self.first_expression[self.target_index[p]] = n - last_un_p_expression = self.target_index.index(expression[1]) + 1 - - self.last_expression = dict.fromkeys(self.target_index[1:], None) - last_un_p_expression = len(self.target_index) - for n_prime in range(len(self.expressions)): - n = len(self.expressions) - n_prime - 1 - expression = self.expressions[n] - for p in range(self.target_index.index(expression[1]) + 1, last_un_p_expression): - self.last_expression[self.target_index[p]] = n - last_un_p_expression = self.target_index.index(expression[1]) + 1 - -def resample(values, dynamic, target_dynamic, target_values=None): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to have the same root dynamic!") - if target_values is None: - target_values = pd.Series(index = target_dynamic.time_steps()) - source_i_start = dynamic.index_of(0) - source_i_end = dynamic.index_of(dynamic.number_of_steps()) - target_i_start = target_dynamic.index_of(0) - target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps()) - if source_i_end <= target_i_start or target_i_end <= source_i_start: - return target_values - assignment = dynamic.global_dynamic.get_assignment(dynamic, target_dynamic) - if source_i_start < target_i_start: - source_i_start = assignment.target_source_start[target_i_start] - elif source_i_start > target_i_start: - target_i_start = assignment.source_target_start[source_i_start] - if target_i_start is None: - return target_values - if source_i_end > target_i_end: - source_i_end = assignment.target_source_end[target_i_end] - elif source_i_end < target_i_end: - target_i_end = assignment.source_target_end[source_i_end] - if target_i_end is None: - return target_values - if target_i_start >= target_i_end: - return target_values - - target_values.loc[(target_i_start <= target_values.index) & (target_values.index < target_i_end) & assignment.bulk_target.loc[target_dynamic.index_of(0):target_dynamic.index_of(target_dynamic.number_of_steps() - 1)].values] = values.loc[(source_i_start <= values.index) & (values.index < source_i_end) & assignment.bulk.loc[dynamic.index_of(0):dynamic.index_of(dynamic.number_of_steps() - 1)].values] - if assignment.first_distribute[target_i_start] is not None and assignment.last_distribute[target_i_end] is not None: - first_distribute = assignment.first_distribute[target_i_start] - last_distribute = assignment.last_distribute[target_i_end] - if first_distribute[0] == last_distribute[0]: - distribute = assignment.distributes[first_distribute[0]] - target_values.loc[distribute[1][first_distribute[1]:last_distribute[1]]] = values.loc[distribute[0]] - elif first_distribute[0] < last_distribute[0]: - distribute = assignment.distributes[first_distribute[0]] - target_values.loc[distribute[1][first_distribute[1]:]] = values.loc[distribute[0]] - for i in range(first_distribute[0] + 1, last_distribute[0]): - distribute = assignment.distributes[i] - target_values.loc[distribute[1]] = values.loc[distribute[0]] - distribute = assignment.distributes[last_distribute[0]] - target_values.loc[distribute[1][:last_distribute[1]]] = values.loc[distribute[0]] - if assignment.first_expression[target_i_start] is not None and assignment.last_expression[target_i_end] is not None: - for i in range(assignment.first_expression[target_i_start], assignment.last_expression[target_i_end] + 1): - expression = assignment.expressions[i] - target_values[expression[1]] = sum(values[i] * f for i, f in expression[0]) - return target_values - -def resample_variable(variable, dynamic, target_dynamic, target_set): - if dynamic.root() != target_dynamic.root(): - raise ValueError("Both dynamics have to have the same root dynamic!") - source_i_start = dynamic.index_of(0) - source_i_end = dynamic.index_of(dynamic.number_of_steps()) - target_i_start = target_dynamic.index_of(0) - target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps()) - if target_i_start < source_i_start or source_i_end < target_i_end: - raise ValueError("The dynamic of the source variable has to cover the dynamic of the target variable!") - assignment = dynamic.global_dynamic.get_assignment(dynamic, target_dynamic) - target_variable = dict() - for t in target_set: - data = assignment.data[t] - if isinstance(data, list): - target_variable[t] = pyo.quicksum(variable[variable_index] * factor for variable_index, factor in assignment.data[t]) - else: - target_variable[t] = variable[data[0]] * data[1] - return target_variable - -def test_single_resampling(dynamic_1, dynamic_2, f): - import numpy as np - values = pd.Series(data = [float(i) for i in range(dynamic_1.number_of_steps())], index = dynamic_1.time_steps()) - display_alignment = False - output = '' - # try: - # target_values = resample(values, dynamic_1, dynamic_2) - # worked = True - # output += 'fine |' - # except ValueError: - # worked = False - # output += 'Value|' - # except IndexError: - # worked = False - # output += 'Index|' - # except SyntaxError: - # worked = False - # output += 'Loop |' - # except KeyError: - # worked = False - # output += 'Key |' - try: - target_values_new = resample(values, dynamic_1, dynamic_2) - worked_new = True - output += 'fine |' - except ValueError: - worked_new = False - output += 'Value|' - except: - worked_new = False - output += 'error|' - # if worked and worked_new: - # if not all(np.isclose(target_values.values, target_values_new.values, equal_nan=True)): - # output += 'error|' - # display_alignment = True - # else: - # output += 'fine |' - # else: - # output += ' |' - # target_values_into = pd.Series(index = dynamic_2.time_steps()) - # try: - # resample_into(values, dynamic_1, dynamic_2, target_values_into) - # worked_into = True - # output += 'fine |' - # except ValueError: - # worked_into = False - # output += 'Value|' - # except IndexError: - # worked_into = False - # output += 'Index|' - # except SyntaxError: - # worked_into = False - # output += 'Loop |' - # except KeyError: - # worked_into = False - # output += 'Key |' - target_values_into_new = pd.Series(index = dynamic_2.time_steps()) - try: - resample(values, dynamic_1, dynamic_2, target_values=target_values_into_new) - worked_into_new = True - output += 'fine |' - except ValueError: - worked_into_new = False - output += 'Value|' - except: - worked_into_new = False - output += 'error|' - # if worked_into and worked_into_new: - # if not all(np.isclose(target_values_into.values, target_values_into_new.values, equal_nan=True)): - # output += 'error|' - # display_alignment = True - # else: - # output += 'fine |' - # else: - # output += ' |' - model = pyo.ConcreteModel() - model.set = pyo.Set(initialize = list(dynamic_1.time_steps()), ordered=True) - model.target_set = pyo.Set(initialize = list(dynamic_2.time_steps()), ordered=True) - def rule(m, t): - return float(t) - model.variable = pyo.Var(model.set, initialize = rule) - # try: - # model.expression = resample_variable(model.variable, dynamic_1, dynamic_2, model.target_set) - # worked_variable = True - # output += 'fine |' - # except ValueError: - # worked_variable = False - # output += 'Value|' - # except IndexError: - # worked_variable = False - # output += 'Index|' - # except SyntaxError: - # worked_variable = False - # output += 'Loop |' - # except KeyError: - # worked_variable = False - # output += 'Key |' - try: - model.expression_new = resample_variable(model.variable, dynamic_1, dynamic_2, model.target_set) - worked_variable_new = True - output += 'fine |' - except ValueError: - worked_variable_new = False - output += 'Value|' - except: - worked_variable_new = False - output += 'error|' - # if worked_variable and worked_variable_new: - # if not all(np.isclose(np.array(list(i.expr() for i in model.expression.values())), np.array(list(i.expr() for i in model.expression_new.values())))): - # output += 'error\n' - # display_alignment = True - # else: - # output += 'fine\n' - # else: - # output += ' \n' - # if display_alignment: - # output += dynamic_1.display_alignment(dynamic_2) - f.write(output) - -def test_resampling(): - # import random - # random.seed(0) - # global_dynamic = GlobalDynamic([1 for i in range(100)]) - # dynamics = [] - # dynamics.append(global_dynamic.root()) - # for i in range(100): - # dynamic_number = random.randint(0, len(dynamics) - 1) - # dynamic = dynamics[dynamic_number] - # if random.random() < 0.75: - # original_indices = list(dynamic.all_indices()) - # number = random.randint(2, len(original_indices)) - # indices = [] - # for i in range(number): - # choice = random.choice(original_indices) - # original_indices.remove(choice) - # indices.append(choice) - # indices.sort() - # sub_dynamic = dynamic.sub_dynamic(indices) - # if sub_dynamic not in dynamics: - # dynamics.append(sub_dynamic) - # else: - # original_positions = list(range(dynamic.number_of_steps() + 1)) - # positions = [] - # for i in range(2): - # choice = random.choice(original_positions) - # original_positions.remove(choice) - # positions.append(choice) - # positions.sort() - # p_start = positions[0] - # p_end = positions[1] - # partial_dynamic = dynamic.partial_dynamic_p(p_start, p_end) - # if partial_dynamic not in dynamics: - # dynamics.append(partial_dynamic) - # f = open("resampling.txt", "w") - # for i, dynamic_1 in enumerate(dynamics): - # print(f'{i}') - # for j, dynamic_2 in enumerate(dynamics): - # test_single_resampling(dynamic_1, dynamic_2, f) - - global_dynamic = GlobalDynamic([1 for i in range(8)]) - root = global_dynamic.root() - sub_root = root.sub_dynamic([0, 1, 2, 3]) - sub_dynamics = [] - for i in range(16): - positions = [j for j in range(4) if (i >> j) % 2 == 0] - if len(positions) >= 2: - sub_dynamics.append(sub_root.sub_dynamic_p(positions)) - blocks = [] - blocks.append([root.sub_dynamic([0, 1, 2, 3])]) - dynamics_2 = [] - for i in range(5): - dynamics_2.append(root.sub_dynamic([j for j in range(5) if j != i])) - blocks.append(dynamics_2) - dynamics_3 = [] - for i in range(5): - for j in range(i + 1, 6): - dynamics_3.append(root.sub_dynamic([k for k in range(6) if k not in [i, j]])) - blocks.append(dynamics_3) - dynamics_4 = [] - for i in range(5): - for j in range(i + 1, 6): - for k in range(j + 1, 7): - dynamics_4.append(root.sub_dynamic([l for l in range(7) if l not in [i, j, k]])) - blocks.append(dynamics_4) - dynamics_5 = [] - for i in range(5): - for j in range(i + 1, 6): - for k in range(j + 1, 7): - for l in range(k + 1, 8): - dynamics_5.append(root.sub_dynamic([m for m in range(8) if m not in [i, j, k, l]])) - blocks.append(dynamics_5) - partial_intervalls = [] - for i in range(3): - for j in range(i + 1, 4): - partial_intervalls.append((i, j)) - f = open("resampling.txt", "w") - f.write(f'{sub_root.all_indices()} -> {sub_root.all_indices()}\n') - for (start_1, end_1) in partial_intervalls: - for (start_2, end_2) in partial_intervalls: - partial_1 = sub_root.partial_dynamic_p(start_1, end_1) - partial_2 = sub_root.partial_dynamic_p(start_2, end_2) - test_single_resampling(partial_1, partial_2, f) - for sub_dynamic in sub_dynamics: - f.write(f'{sub_root.all_indices()} -> {sub_dynamic.all_indices()}\n') - for (start_1, end_1) in partial_intervalls: - if end_1 > sub_root.number_of_steps(): - continue - for (start_2, end_2) in partial_intervalls: - if end_2 > sub_dynamic.number_of_steps(): - continue - partial_1 = sub_root.partial_dynamic_p(start_1, end_1) - partial_2 = sub_dynamic.partial_dynamic_p(start_2, end_2) - test_single_resampling(partial_1, partial_2, f) - test_single_resampling(partial_2, partial_1, f) - for block_number, block in enumerate(blocks): - print(f'Block {block_number}') - length = block_number + 4 - for i, dynamic_1 in enumerate(block): - for j, dynamic_2 in enumerate(block): - if any(k not in dynamic_1.all_indices() and k not in dynamic_2.all_indices() for k in range(length)): - continue - print(f'{i} -> {j}') - f.write(f'{dynamic_1.all_indices()} -> {dynamic_2.all_indices()}\n') - for (start_1, end_1) in partial_intervalls: - for (start_2, end_2) in partial_intervalls: - partial_1 = dynamic_1.partial_dynamic_p(start_1, end_1) - partial_2 = dynamic_2.partial_dynamic_p(start_2, end_2) - test_single_resampling(partial_1, partial_2, f)