Skip to content
Snippets Groups Projects
Commit 0e393a62 authored by Christoph von Oy's avatar Christoph von Oy
Browse files

Moved dynamics to Model_Library

parent 72c42127
No related branches found
No related tags found
No related merge requests found
"""
MIT License
Copyright (c) 2023 RWTH Aachen University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import pandas as pd
import pyomo.environ as pyo
class GlobalDynamic:
# d_steps: length of each time step in seconds
def __init__(self, d_steps):
self.d_steps = d_steps
self.root_dynamic = TrivialDynamic(self.d_steps, self)
self.dynamics = dict()
self.dynamics[self.root_dynamic] = (dict(), dict())
self.symbol_table = dict()
self.symbol_table[self.root_dynamic] = 0
self.assignments = dict()
def root(self):
return self.root_dynamic
# indices are relative to the root dynamic
def sub_dynamic(self, dynamic, indices):
if isinstance(dynamic, PartialDynamic):
dynamic = dynamic.reference
if tuple(indices) in self.dynamics[dynamic][0].keys():
return self.dynamics[dynamic][0][tuple(indices)]
sub_dynamic = BackedDynamic(dynamic, indices, self)
self.dynamics[dynamic][0][tuple(indices)] = sub_dynamic
self.dynamics[sub_dynamic] = (dict(), dict())
self.symbol_table[sub_dynamic] = len(self.symbol_table)
return sub_dynamic
# start/end are relative to dynamic
def partial_dynamic(self, dynamic, start, end):
if start == 0 and end == dynamic.number_of_steps():
return dynamic
if isinstance(dynamic, PartialDynamic):
start += dynamic.start
end += dynamic.start
dynamic = dynamic.reference
if (start, end) in self.dynamics[dynamic][1].keys():
return self.dynamics[dynamic][1][(start, end)]
partial_dynamic = PartialDynamic(dynamic, start, end, self)
self.dynamics[dynamic][1][(start, end)] = partial_dynamic
self.dynamics[partial_dynamic] = (dict(), dict())
self.symbol_table[partial_dynamic] = len(self.symbol_table)
return partial_dynamic
def get_assignment(self, dynamic, target_dynamic):
if isinstance(dynamic, PartialDynamic):
non_partial_dynamic = dynamic.reference
else:
non_partial_dynamic = dynamic
if isinstance(target_dynamic, PartialDynamic):
non_partial_target_dynamic = target_dynamic.reference
else:
non_partial_target_dynamic = target_dynamic
if (non_partial_dynamic, non_partial_target_dynamic) not in self.assignments:
self.assignments[non_partial_dynamic, non_partial_target_dynamic] = compute_assignment(non_partial_dynamic, non_partial_target_dynamic)
return self.assignments[non_partial_dynamic, non_partial_target_dynamic]
def display(self):
output = ''
for dynamic, (sub_dynamics, partial_dynamics) in self.dynamics.items():
if len(sub_dynamics) == 0 and len(partial_dynamics) == 0:
continue
output += f'{self.symbol_table[dynamic]}:\n'
for indices, sub_dynamic in sub_dynamics.items():
output += f'\t{indices}: {self.symbol_table[sub_dynamic]}\n'
for (start, end), partial_dynamic in partial_dynamics.items():
output += f'\t{start}-{end}: {self.symbol_table[partial_dynamic]}\n'
return output
# The root dynamic is
# for TrivialDynamic: self
# for BackedDynamic: root dynamic of its reference
# for PartialDynamic: root dynamic of its reference
# represents a continuus set of time steps relative to the global simulation start t_start
class Dynamic:
# returns the root dynamic of this dynamic
def root(self):
pass
# return true if other is an ancestor of this dynamic
def has_ancestor(self, other):
pass
# returns the number of steps in this dynamic
def number_of_steps(self):
pass
# returns all time steps in this dynamic
def time_steps(self):
pass
# returns all indices in this dynamic
def all_indices(self):
pass
# return true if the dynamic contains the given index
# index is relative to the root dynamic
def has_index(self, index):
pass
# returns the indices between the given start and end positions
# p_start and p_end are relative to this dynamic
def indices_within_p(self, p_start, p_end):
pass
# returns the indices between the given start and end indices
# i_start and i_end are relative to the root dynamic
def indices_within(self, i_start, i_end):
pass
# returns the index of the given position
# position is relative to this dynamic
def index_of(self, position):
pass
# returns the position of the given index
# index is relative to the root dynamic
def position_of(self, index):
pass
# returns the length of the time step at the given position
# position is relative to this dynamic
def step_length_p(self, position):
pass
# returns the length of the time step at the given position in hours
# position is relative to this dynamic
def step_size_p(self, position):
pass
# returns the length of the time step at the given index
# index is relative to the root dynamic
def step_length(self, index):
pass
# returns the length of the time step at the given index in hours
# index is relative to the root dynamic
def step_size(self, index):
pass
# returns the length af all time steps in this dynamic
def step_lengths(self):
pass
# returns the length af all time steps in this dynamic in hours
def step_sizes(self):
pass
# constructs a sub dynamic containing time steps starting at the time steps at the given positions with the last position representing the end of the last time step
# positions are relative to this dynamic
def sub_dynamic_p(self, positions):
pass
# constructs a sub dynamic containing time steps starting at the time steps at the given indices with the last index representing the end of the last time step
# indices are relative to the root dynamic
def sub_dynamic(self, indices):
pass
# construct a sub dynamic containing time steps between the given positions
# p_start and p_end are relative to this dynamic
def partial_dynamic_p(self, p_start, p_end):
pass
# construct a sub dynamic containing time steps between the given indices
# i_start and i_end are relative to the root dynamic
def partial_dynamic(self, i_start, i_end):
pass
def display(self):
numbers = self.all_indices()
rows = [[]]
row_ends = [0]
for number in numbers:
local_position = number - numbers[0]
row_index = 0
while row_index < len(rows) and row_ends[row_index] > local_position:
row_index += 1
if row_index == len(rows):
rows.append([])
row_ends.append(0)
rows[row_index].append(number)
row_ends[row_index] = local_position + len(str(number)) + 1
row_strings = []
for items in rows:
row_string = ''
string_end = 0
for number in items:
local_position = number - numbers[0]
row_string += ' ' * (local_position - string_end)
row_string += str(number)
string_end = local_position + len(str(number))
row_strings.append(row_string)
output = ''
for row_index in range(len(row_strings)):
output += row_strings[len(row_strings) - row_index - 1] + '\n'
string_end = 0
for number in numbers:
local_position = number - numbers[0]
output += ' ' * (local_position - string_end)
output += '|'
string_end = local_position + 1
output += '\n'
return output
def display_alignment(self, other):
p_self = 0
p_other = 0
numbers = []
while p_self <= self.number_of_steps() and p_other <= other.number_of_steps():
i_self = self.index_of(p_self)
i_other = other.index_of(p_other)
if i_self < i_other:
numbers.append(i_self)
p_self += 1
elif i_self > i_other:
numbers.append(i_other)
p_other += 1
else:
numbers.append(i_self)
p_self += 1
p_other += 1
while p_self < self.number_of_steps():
numbers.append(self.index_of(p_self))
p_self += 1
else:
while p_other < other.number_of_steps():
numbers.append(other.index_of(p_other))
p_other += 1
rows = [[]]
row_ends = [0]
for number in numbers:
local_position = number - numbers[0]
row_index = 0
while row_index < len(rows) and row_ends[row_index] > local_position:
row_index += 1
if row_index == len(rows):
rows.append([])
row_ends.append(0)
rows[row_index].append(number)
row_ends[row_index] = local_position + len(str(number)) + 1
row_strings = []
for items in rows:
row_string = ''
self_string_end = 0
for number in items:
local_position = number - numbers[0]
row_string += ' ' * (local_position - self_string_end)
row_string += str(number)
self_string_end = local_position + len(str(number))
row_strings.append(row_string)
output = ''
for row_index in range(len(row_strings)):
output += row_strings[len(row_strings) - row_index - 1] + '\n'
self_string = ' ' * (self.index_of(0) - numbers[0])
self_string_end = self.index_of(0) - numbers[0]
for index in self.all_indices():
local_position = index - numbers[0]
self_string += ' ' * (local_position - self_string_end)
self_string += '|'
self_string_end = local_position + 1
output += self_string + '\n'
other_string = ' ' * (other.index_of(0) - numbers[0])
other_string_end = other.index_of(0) - numbers[0]
for index in other.all_indices():
local_position = index - numbers[0]
other_string += ' ' * (local_position - other_string_end)
other_string += '|'
other_string_end = local_position + 1
output += other_string + '\n'
return output
# a dynamic defined by the length of each time step
class TrivialDynamic(Dynamic):
# d_steps: length of each time step in seconds
def __init__(self, d_steps, global_dynamic):
self.d_steps = d_steps
self.global_dynamic = global_dynamic
def root(self):
return self
def has_ancestor(self, other):
return self == other
def number_of_steps(self):
return len(self.d_steps)
def time_steps(self):
return range(len(self.d_steps))
def all_indices(self):
return range(len(self.d_steps) + 1)
def has_index(self, index):
return 0 <= index and index <= len(self.d_steps)
def indices_within_p(self, p_start, p_end):
return pd.RangeIndex(p_start, p_end)
def indices_within(self, i_start, i_end):
return pd.RangeIndex(i_start, i_end)
def index_of(self, position):
return position
def position_of(self, index):
return index
def step_length_p(self, position):
return self.d_steps[position]
def step_size_p(self, position):
return self.d_steps[position] / 3600
def step_length(self, index):
return self.d_steps[index]
def step_size(self, index):
return self.d_steps[index] / 3600
def step_lengths(self):
return self.d_steps
def step_sizes(self):
return [d_step / 3600 for d_step in self.d_steps]
def sub_dynamic_p(self, positions):
return self.global_dynamic.sub_dynamic(self, positions)
def sub_dynamic(self, indices):
return self.global_dynamic.sub_dynamic(self, indices)
def partial_dynamic_p(self, p_start, p_end):
return self.global_dynamic.partial_dynamic(self, p_start, p_end)
def partial_dynamic(self, i_start, i_end):
return self.global_dynamic.partial_dynamic(self, i_start, i_end)
# a dynamic definied by taking certain time steps form a reference dynamic
class BackedDynamic(Dynamic):
# reference: the dynamic that backs this dynamic
# indices: the indicies of the time steps contained in this dynamic with the last index representing the end of the last time step
# indices are relative to the reference dynamic
def __init__(self, reference, indices, global_dynamic):
self.reference = reference
self.indices = indices
self.global_dynamic = global_dynamic
def root(self):
return self.reference.root()
def has_ancestor(self, other):
return self == other or self.reference.has_ancestor(other)
def number_of_steps(self):
return len(self.indices) - 1
def time_steps(self):
return self.indices[:-1]
def all_indices(self):
return self.indices
def has_index(self, index):
return index in self.indices
def indices_within_p(self, p_start, p_end):
if p_start < 0 or len(self.indices) <= p_start or p_end < 0 or len(self.indices) <= p_end:
raise IndexError("The dynamic does not have indices at the positions!")
return self.indices[p_start:p_end]
def indices_within(self, i_start, i_end):
p_start = self.position_of(i_start)
p_end = self.position_of(i_end)
return self.indices[p_start:p_end]
def index_of(self, position):
if position < 0 or len(self.indices) <= position:
raise IndexError("The dynamic does not have a index for this position!")
return self.indices[position]
def position_of(self, index):
if index not in self.indices:
raise IndexError('The dynamic does not have a position for this index!')
return self.indices.index(index)
def step_length_p(self, position):
if position < 0 or len(self.indices) - 1 <= position:
raise IndexError("The dynamic does not have a time step at this position!")
return sum(self.reference.step_length(index) for index in self.reference.indices_within(self.indices[position], self.indices[position + 1]))
def step_size_p(self, position):
if position < 0 or len(self.indices) - 1 <= position:
raise IndexError("The dynamic does not have a time step at this position!")
return sum(self.reference.step_length(index) for index in self.reference.indices_within(self.indices[position], self.indices[position + 1])) / 3600
def step_length(self, index):
if index not in self.indices[:-1]:
raise IndexError("The dynamic does not have a time step at this index!")
return self.step_length_p(self.indices.index(index))
def step_size(self, index):
if index not in self.indices[:-1]:
raise IndexError("The dynamic does not have a time step at this index!")
return self.step_length_p(self.indices.index(index)) / 3600
def step_lengths(self):
return [self.step_length_p(position) for position in range(self.number_of_steps())]
def step_sizes(self):
return [self.step_length_p(position) / 3600 for position in range(self.number_of_steps())]
def sub_dynamic_p(self, positions):
if any(position < 0 or len(self.indices) <= position for position in positions):
raise IndexError("The dynamic does not have all requested indices!")
return self.global_dynamic.sub_dynamic(self, [self.indices[position] for position in positions])
def sub_dynamic(self, indices):
if any(index not in self.indices for index in indices):
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return self.global_dynamic.sub_dynamic(self, indices)
def partial_dynamic_p(self, p_start, p_end):
if p_start < 0 or len(self.indices) <= p_start or p_end < 0 or len(self.indices) <= p_end:
raise IndexError("The dynamic does not have all requested positions for the sub dynamic!")
return self.global_dynamic.partial_dynamic(self, p_start, p_end)
def partial_dynamic(self, i_start, i_end):
if i_start not in self.indices or i_end not in self.indices:
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return self.global_dynamic.partial_dynamic(self, self.indices.index(i_start), self.indices.index(i_end))
# a dynamic defined by taking a continuus intervall of time steps from a reference dynamic
class PartialDynamic(Dynamic):
# reference: the dynamic from which the intervall is taken
# start: the position in the reference dynamic of the first time step contained in this dynamic
# end: the position in the reference dynamic of the end of the last time step contained in this dynamic
# start/end are relative to the reference dynamic
def __init__(self, reference, start, end, global_dynamic):
self.reference = reference
self.start = start
self.end = end
self.global_dynamic = global_dynamic
def root(self):
return self.reference.root()
def has_ancestor(self, other):
return self == other or self.reference.has_ancestor(other)
def number_of_steps(self):
return self.end - self.start
def time_steps(self):
return self.reference.time_steps()[self.start:self.end]
def all_indices(self):
return self.reference.all_indices()[self.start:self.end + 1]
def has_index(self, index):
if self.reference.has_index(index):
reference_position = self.reference.position_of(index)
return self.start <= reference_position and reference_position <= self.end
return False
def indices_within_p(self, p_start, p_end):
if p_start < self.start or self.end < p_start or p_end < self.start or self.end < p_end:
raise IndexError("The dynamic does not have all requested indices!")
return self.reference.indices_within_p(self.start + p_start, self.start + p_end)
def indices_within(self, i_start, i_end):
p_start = self.reference.position_of(i_start)
p_end = self.reference.position_of(i_end)
if p_start < self.start or self.end < p_start or p_end < self.start or self.end < p_end:
raise IndexError("The dynamic does not have all requested indices!")
return self.reference.indices_within_p(self.start + p_start, self.start + p_end)
def index_of(self, position):
if position < 0 or self.end - self.start < position:
raise IndexError("The dynamic does not have a index for this position!")
return self.reference.index_of(self.start + position)
def position_of(self, index):
reference_position = self.reference.position_of(index)
if reference_position < self.start or self.end < reference_position:
raise IndexError('The dynamic does not have a position for this index!')
return reference_position - self.start
def step_length_p(self, position):
if position < 0 or self.end - self.start <= position:
raise IndexError("The dynamic does not have a time step at this position!")
return self.reference.step_length_p(self.start + position)
def step_size_p(self, position):
if position < 0 or self.end - self.start <= position:
raise IndexError("The dynamic does not have a time step at this position!")
return self.reference.step_size_p(self.start + position)
def step_length(self, index):
reference_position = self.reference.position_of(index)
if reference_position < self.start or self.end <= reference_position:
raise IndexError('The dynamic does not have a time step at this index!')
return self.step_length_p(reference_position - self.start)
def step_size(self, index):
reference_position = self.reference.position_of(index)
if reference_position < self.start or self.end <= reference_position:
raise IndexError('The dynamic does not have a time step at this index!')
return self.step_size_p(reference_position - self.start)
def step_lengths(self):
return [self.reference.step_length_p(position) for position in range(self.start, self.end)]
def step_sizes(self):
return [self.reference.step_size_p(position) for position in range(self.start, self.end)]
def sub_dynamic_p(self, positions):
if any(position < 0 or self.end - self.start < position for position in positions):
raise IndexError("The dynamic does not have all requested positions for the sub dynamic!")
return self.global_dynamic.sub_dynamic(self, [self.index_of(position) for position in positions])
def sub_dynamic(self, indices):
def filter(index):
reference_position = self.reference.position_of(index)
return reference_position < self.start or self.end < reference_position
if any(filter(index) for index in indices):
raise IndexError("The does not have all requested indices for the sub dynamic!")
return self.global_dynamic.sub_dynamic(self, indices)
def partial_dynamic_p(self, p_start, p_end):
if p_start < 0 or self.end - self.start < p_start or p_end < 0 or self.end - self.start < p_end:
raise IndexError("The dynamic does not have all requested positions for the sub dynamic!")
return self.global_dynamic.partial_dynamic(self, p_start, p_end)
def partial_dynamic(self, i_start, i_end):
reference_start = self.reference.position_of(i_start)
reference_end = self.reference.position_of(i_end)
if reference_start < self.start or self.end < reference_start or reference_end < self.start or self.end < reference_end:
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return self.global_dynamic.partial_dynamic(self, self.position_of(i_start), self.position_of(i_end))
def compute_assignment(dynamic, target_dynamic):
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
if isinstance(dynamic, PartialDynamic):
raise ValueError("The source dyanmic cannot be a PartialDynamic!")
if isinstance(target_dynamic, PartialDynamic):
raise ValueError("The target dyanmic cannot be a PartialDynamic!")
if dynamic == target_dynamic:
return compute_assignment_same(dynamic, target_dynamic)
elif isinstance(target_dynamic, BackedDynamic) and target_dynamic.has_ancestor(dynamic):
return compute_assignment_to_backed(dynamic, target_dynamic)
elif isinstance(dynamic, BackedDynamic) and dynamic.has_ancestor(target_dynamic):
return compute_assignment_from_backed(dynamic, target_dynamic)
else:
return compute_assignment_common_reference(dynamic, target_dynamic)
# source_dynamic and target_dynamic are the same dynamic
def compute_assignment_same(dynamic, target_dynamic):
assignment = Assignment(dynamic.all_indices(), target_dynamic.all_indices())
assignment.add_bulk(target_dynamic.time_steps())
assignment.compile()
return assignment
# target_dynamic is BackedDynamic and has dynamic as an ancestor
def compute_assignment_to_backed(dynamic, target_dynamic):
assignment = Assignment(dynamic.all_indices(), target_dynamic.indices)
for target_index in target_dynamic.indices[:-1]:
target_position = target_dynamic.position_of(target_index)
source_indices = dynamic.indices_within(target_dynamic.indices[target_position], target_dynamic.indices[target_position + 1])
if len(source_indices) == 1:
assignment.add_individual(source_indices[0], 1, target_index)
else:
acc = []
for source_index in source_indices:
acc.append((source_index, dynamic.step_length(source_index) / target_dynamic.step_length_p(target_position)))
assignment.add_expression(acc, target_index)
assignment.compile()
return assignment
# dynamic is BackedDynamic and has target_dynamic as an ancestor
def compute_assignment_from_backed(dynamic, target_dynamic):
assignment = Assignment(dynamic.indices, target_dynamic.all_indices())
for source_position in range(dynamic.number_of_steps()):
source_index = dynamic.indices[source_position]
target_indices = target_dynamic.indices_within(source_index, dynamic.indices[source_position + 1])
assignment.add_distribute(source_index, target_indices)
assignment.compile()
return assignment
# dynamic and target_dynamic are BackedDynamic and share the same root dynamic
def compute_assignment_common_reference(dynamic, target_dynamic):
assignment = Assignment(dynamic.indices, target_dynamic.indices)
if dynamic.indices[-1] <= target_dynamic.indices[0] or target_dynamic.indices[-1] <= dynamic.indices[0]:
assignment.compile()
return assignment
target_i_start = target_dynamic.indices[0]
if target_i_start not in dynamic.indices:
source_i_start = dynamic.indices[0]
if source_i_start < target_i_start:
root = dynamic.root()
root_p_start = target_i_start # because root is a TrivialDynamic, positions and indices are equivalent
length = 0
while root_p_start not in dynamic.indices[:-1]: # because root is a TrivialDynamic, root_p_start is equivalent to root_i_start
root_p_start -= 1
length += root.step_length_p(root_p_start)
source_position = dynamic.indices.index(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent
target_position = 0
remaining_length = dynamic.step_length_p(source_position) - length
else: # Here source_i_start > target_i_start becuase the case of source_i_start == target_i_start is handled in the else branch of target_i_srat not in dynamic.indices
root = dynamic.root()
root_p_start = source_i_start # because root is a TrivialDynamic, positions and indices are equivalent
length = 0
source_position = 0
while root_p_start not in target_dynamic.indices:
length += root.step_length_p(root_p_start)
root_p_start += 1
if root_p_start in dynamic.indices[:-1]:
length = 0
source_position += 1
elif root_p_start > dynamic.indices[-1]: # because root is a TrivialDynamic, positions and indices are equivalent
assignment.compile() # here, we discover that the entire source_dynamic does not cover one time_step of the target_dynamic
return assignment
target_position = target_dynamic.position_of(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent
remaining_length = dynamic.step_length_p(source_position) - length
else:
source_position = dynamic.indices.index(target_i_start)
target_position = 0
remaining_length = dynamic.step_length(target_i_start)
while target_position < len(target_dynamic.indices) - 1:
remaining_target_length = target_dynamic.step_length_p(target_position)
acc = []
while remaining_target_length > 0:
if remaining_length == 0:
source_position += 1
if source_position >= len(dynamic.indices) - 1:
assignment.compile()
return assignment
remaining_length = dynamic.step_length_p(source_position)
if remaining_target_length <= remaining_length:
acc.append((dynamic.indices[source_position], remaining_target_length))
remaining_length -= remaining_target_length
remaining_target_length -= remaining_target_length
else:
acc.append((dynamic.indices[source_position], remaining_length))
remaining_target_length -= remaining_length
remaining_length -= remaining_length
for i, (index, factor) in enumerate(acc):
acc[i] = (index, factor / target_dynamic.step_length_p(target_position))
assignment.add_expression(acc, target_dynamic.index_of(target_position))
target_position += 1
assignment.compile()
return assignment
class Assignment:
def __init__(self, indices, target_indices):
self.index = list(indices)
self.target_index = list(target_indices)
self.bulk = pd.Series(False, index=indices[:-1])
self.bulk_target = pd.Series(False, index=target_indices[:-1])
self.distributes = []
self.expressions = []
self.data = dict.fromkeys(target_indices[:-1], None)
def add_bulk(self, targets):
self.bulk[targets] = True
self.bulk_target[targets] = True
for target in targets:
self.data[target] = (target, 1.0)
def add_distribute(self, source, targets):
self.bulk[targets[0]] = True
self.bulk_target[targets[0]] = True
if len(targets) > 1:
self.distributes.append((source, targets[1:]))
for target in targets:
self.data[target] = (source, 1.0)
def add_individual(self, source, factor, target):
if target == source and factor == 1.0:
self.bulk[target] = True
self.bulk_target[target] = True
elif factor == 1.0:
if len(self.distributes) == 0 or self.distributes[-1][0] != source:
self.distributes.append((source, []))
self.distributes[-1][1].append(target)
else:
raise ValueError(f'Tried to add a individual with a factor unequal to 1!')
self.data[target] = (source, factor)
def add_expression(self, expression, target):
if len(expression) == 1:
self.add_individual(expression[0][0], expression[0][1], target)
else:
self.expressions.append((expression, target))
self.data[target] = expression
def compile(self):
index_shift = {self.index[p]: self.index[p + 1] for p in range(len(self.index[:-1]))}
target_index_shift = {self.target_index[p]: self.target_index[p + 1] for p in range(len(self.target_index[:-1]))}
self.source_target_start = dict.fromkeys(self.index[:-1], None)
self.target_source_start = dict.fromkeys(self.target_index[:-1], None)
self.source_target_end = dict.fromkeys(self.index[1:], None)
self.target_source_end = dict.fromkeys(self.target_index[1:], None)
last_un_p_source = 0
last_un_p_source_target_end = None
last_source_target_end = None
for target, data in self.data.items():
if data is not None:
first_source = data[0][0] if isinstance(data, list) else data[0]
last_source = data[-1][0] if isinstance(data, list) else data[0]
p_stop = self.index.index(first_source) + 1
for p in range(last_un_p_source, p_stop):
self.source_target_start[self.index[p]] = target
last_un_p_source = p_stop
self.target_source_start[target] = first_source
if isinstance(data, list):
for source, _f in data[1:]:
self.source_target_end[source] = target
self.source_target_end[index_shift[last_source]] = target_index_shift[target]
self.target_source_end[target_index_shift[target]] = index_shift[last_source]
last_un_p_source_target_end = self.index.index(index_shift[last_source]) + 1
last_source_target_end = target_index_shift[target]
if last_un_p_source_target_end is not None:
for p in range(last_un_p_source_target_end, len(self.index)):
self.source_target_end[self.index[p]] = last_source_target_end
self.first_distribute = dict.fromkeys(self.target_index[:-1], None)
last_un_p_distribute = 0
for n, distribute in enumerate(self.distributes):
for p in range(last_un_p_distribute, self.target_index.index(distribute[1][0])):
self.first_distribute[self.target_index[p]] = (n, 0)
for m, target in enumerate(distribute[1]):
self.first_distribute[target] = (n, m)
last_un_p_distribute = self.target_index.index(distribute[1][-1]) + 1
self.last_distribute = dict.fromkeys(self.target_index[1:], None)
last_un_p_distribute = len(self.target_index)
for n_prime in range(len(self.distributes)):
n = len(self.distributes) - n_prime - 1
distribute = self.distributes[n]
for p in range(self.target_index.index(distribute[1][-1]) + 1, last_un_p_distribute):
self.last_distribute[self.target_index[p]] = (n, len(distribute[1]))
for m, target in enumerate(distribute[1][1:]):
self.last_distribute[target] = (n, m + 1)
last_un_p_distribute = self.target_index.index(distribute[1][0]) + 1
self.first_expression = dict.fromkeys(self.target_index[:-1], None)
last_un_p_expression = 0
for n, expression in enumerate(self.expressions):
for p in range(last_un_p_expression, self.target_index.index(expression[1]) + 1):
self.first_expression[self.target_index[p]] = n
last_un_p_expression = self.target_index.index(expression[1]) + 1
self.last_expression = dict.fromkeys(self.target_index[1:], None)
last_un_p_expression = len(self.target_index)
for n_prime in range(len(self.expressions)):
n = len(self.expressions) - n_prime - 1
expression = self.expressions[n]
for p in range(self.target_index.index(expression[1]) + 1, last_un_p_expression):
self.last_expression[self.target_index[p]] = n
last_un_p_expression = self.target_index.index(expression[1]) + 1
def resample(values, dynamic, target_dynamic, target_values=None):
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
if target_values is None:
target_values = pd.Series(index = target_dynamic.time_steps())
source_i_start = dynamic.index_of(0)
source_i_end = dynamic.index_of(dynamic.number_of_steps())
target_i_start = target_dynamic.index_of(0)
target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps())
if source_i_end <= target_i_start or target_i_end <= source_i_start:
return target_values
assignment = dynamic.global_dynamic.get_assignment(dynamic, target_dynamic)
if source_i_start < target_i_start:
source_i_start = assignment.target_source_start[target_i_start]
elif source_i_start > target_i_start:
target_i_start = assignment.source_target_start[source_i_start]
if target_i_start is None:
return target_values
if source_i_end > target_i_end:
source_i_end = assignment.target_source_end[target_i_end]
elif source_i_end < target_i_end:
target_i_end = assignment.source_target_end[source_i_end]
if target_i_end is None:
return target_values
if target_i_start >= target_i_end:
return target_values
target_values.loc[(target_i_start <= target_values.index) & (target_values.index < target_i_end) & assignment.bulk_target.loc[target_dynamic.index_of(0):target_dynamic.index_of(target_dynamic.number_of_steps() - 1)].values] = values.loc[(source_i_start <= values.index) & (values.index < source_i_end) & assignment.bulk.loc[dynamic.index_of(0):dynamic.index_of(dynamic.number_of_steps() - 1)].values]
if assignment.first_distribute[target_i_start] is not None and assignment.last_distribute[target_i_end] is not None:
first_distribute = assignment.first_distribute[target_i_start]
last_distribute = assignment.last_distribute[target_i_end]
if first_distribute[0] == last_distribute[0]:
distribute = assignment.distributes[first_distribute[0]]
target_values.loc[distribute[1][first_distribute[1]:last_distribute[1]]] = values.loc[distribute[0]]
elif first_distribute[0] < last_distribute[0]:
distribute = assignment.distributes[first_distribute[0]]
target_values.loc[distribute[1][first_distribute[1]:]] = values.loc[distribute[0]]
for i in range(first_distribute[0] + 1, last_distribute[0]):
distribute = assignment.distributes[i]
target_values.loc[distribute[1]] = values.loc[distribute[0]]
distribute = assignment.distributes[last_distribute[0]]
target_values.loc[distribute[1][:last_distribute[1]]] = values.loc[distribute[0]]
if assignment.first_expression[target_i_start] is not None and assignment.last_expression[target_i_end] is not None:
for i in range(assignment.first_expression[target_i_start], assignment.last_expression[target_i_end] + 1):
expression = assignment.expressions[i]
target_values[expression[1]] = sum(values[i] * f for i, f in expression[0])
return target_values
def resample_variable(variable, dynamic, target_dynamic, target_set):
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
source_i_start = dynamic.index_of(0)
source_i_end = dynamic.index_of(dynamic.number_of_steps())
target_i_start = target_dynamic.index_of(0)
target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps())
if target_i_start < source_i_start or source_i_end < target_i_end:
raise ValueError("The dynamic of the source variable has to cover the dynamic of the target variable!")
assignment = dynamic.global_dynamic.get_assignment(dynamic, target_dynamic)
target_variable = dict()
for t in target_set:
data = assignment.data[t]
if isinstance(data, list):
target_variable[t] = pyo.quicksum(variable[variable_index] * factor for variable_index, factor in assignment.data[t])
else:
target_variable[t] = variable[data[0]] * data[1]
return target_variable
def test_single_resampling(dynamic_1, dynamic_2, f):
import numpy as np
values = pd.Series(data = [float(i) for i in range(dynamic_1.number_of_steps())], index = dynamic_1.time_steps())
display_alignment = False
output = ''
# try:
# target_values = resample(values, dynamic_1, dynamic_2)
# worked = True
# output += 'fine |'
# except ValueError:
# worked = False
# output += 'Value|'
# except IndexError:
# worked = False
# output += 'Index|'
# except SyntaxError:
# worked = False
# output += 'Loop |'
# except KeyError:
# worked = False
# output += 'Key |'
try:
target_values_new = resample(values, dynamic_1, dynamic_2)
worked_new = True
output += 'fine |'
except ValueError:
worked_new = False
output += 'Value|'
except:
worked_new = False
output += 'error|'
# if worked and worked_new:
# if not all(np.isclose(target_values.values, target_values_new.values, equal_nan=True)):
# output += 'error|'
# display_alignment = True
# else:
# output += 'fine |'
# else:
# output += ' |'
# target_values_into = pd.Series(index = dynamic_2.time_steps())
# try:
# resample_into(values, dynamic_1, dynamic_2, target_values_into)
# worked_into = True
# output += 'fine |'
# except ValueError:
# worked_into = False
# output += 'Value|'
# except IndexError:
# worked_into = False
# output += 'Index|'
# except SyntaxError:
# worked_into = False
# output += 'Loop |'
# except KeyError:
# worked_into = False
# output += 'Key |'
target_values_into_new = pd.Series(index = dynamic_2.time_steps())
try:
resample(values, dynamic_1, dynamic_2, target_values=target_values_into_new)
worked_into_new = True
output += 'fine |'
except ValueError:
worked_into_new = False
output += 'Value|'
except:
worked_into_new = False
output += 'error|'
# if worked_into and worked_into_new:
# if not all(np.isclose(target_values_into.values, target_values_into_new.values, equal_nan=True)):
# output += 'error|'
# display_alignment = True
# else:
# output += 'fine |'
# else:
# output += ' |'
model = pyo.ConcreteModel()
model.set = pyo.Set(initialize = list(dynamic_1.time_steps()), ordered=True)
model.target_set = pyo.Set(initialize = list(dynamic_2.time_steps()), ordered=True)
def rule(m, t):
return float(t)
model.variable = pyo.Var(model.set, initialize = rule)
# try:
# model.expression = resample_variable(model.variable, dynamic_1, dynamic_2, model.target_set)
# worked_variable = True
# output += 'fine |'
# except ValueError:
# worked_variable = False
# output += 'Value|'
# except IndexError:
# worked_variable = False
# output += 'Index|'
# except SyntaxError:
# worked_variable = False
# output += 'Loop |'
# except KeyError:
# worked_variable = False
# output += 'Key |'
try:
model.expression_new = resample_variable(model.variable, dynamic_1, dynamic_2, model.target_set)
worked_variable_new = True
output += 'fine |'
except ValueError:
worked_variable_new = False
output += 'Value|'
except:
worked_variable_new = False
output += 'error|'
# if worked_variable and worked_variable_new:
# if not all(np.isclose(np.array(list(i.expr() for i in model.expression.values())), np.array(list(i.expr() for i in model.expression_new.values())))):
# output += 'error\n'
# display_alignment = True
# else:
# output += 'fine\n'
# else:
# output += ' \n'
# if display_alignment:
# output += dynamic_1.display_alignment(dynamic_2)
f.write(output)
def test_resampling():
# import random
# random.seed(0)
# global_dynamic = GlobalDynamic([1 for i in range(100)])
# dynamics = []
# dynamics.append(global_dynamic.root())
# for i in range(100):
# dynamic_number = random.randint(0, len(dynamics) - 1)
# dynamic = dynamics[dynamic_number]
# if random.random() < 0.75:
# original_indices = list(dynamic.all_indices())
# number = random.randint(2, len(original_indices))
# indices = []
# for i in range(number):
# choice = random.choice(original_indices)
# original_indices.remove(choice)
# indices.append(choice)
# indices.sort()
# sub_dynamic = dynamic.sub_dynamic(indices)
# if sub_dynamic not in dynamics:
# dynamics.append(sub_dynamic)
# else:
# original_positions = list(range(dynamic.number_of_steps() + 1))
# positions = []
# for i in range(2):
# choice = random.choice(original_positions)
# original_positions.remove(choice)
# positions.append(choice)
# positions.sort()
# p_start = positions[0]
# p_end = positions[1]
# partial_dynamic = dynamic.partial_dynamic_p(p_start, p_end)
# if partial_dynamic not in dynamics:
# dynamics.append(partial_dynamic)
# f = open("resampling.txt", "w")
# for i, dynamic_1 in enumerate(dynamics):
# print(f'{i}')
# for j, dynamic_2 in enumerate(dynamics):
# test_single_resampling(dynamic_1, dynamic_2, f)
global_dynamic = GlobalDynamic([1 for i in range(8)])
root = global_dynamic.root()
sub_root = root.sub_dynamic([0, 1, 2, 3])
sub_dynamics = []
for i in range(16):
positions = [j for j in range(4) if (i >> j) % 2 == 0]
if len(positions) >= 2:
sub_dynamics.append(sub_root.sub_dynamic_p(positions))
blocks = []
blocks.append([root.sub_dynamic([0, 1, 2, 3])])
dynamics_2 = []
for i in range(5):
dynamics_2.append(root.sub_dynamic([j for j in range(5) if j != i]))
blocks.append(dynamics_2)
dynamics_3 = []
for i in range(5):
for j in range(i + 1, 6):
dynamics_3.append(root.sub_dynamic([k for k in range(6) if k not in [i, j]]))
blocks.append(dynamics_3)
dynamics_4 = []
for i in range(5):
for j in range(i + 1, 6):
for k in range(j + 1, 7):
dynamics_4.append(root.sub_dynamic([l for l in range(7) if l not in [i, j, k]]))
blocks.append(dynamics_4)
dynamics_5 = []
for i in range(5):
for j in range(i + 1, 6):
for k in range(j + 1, 7):
for l in range(k + 1, 8):
dynamics_5.append(root.sub_dynamic([m for m in range(8) if m not in [i, j, k, l]]))
blocks.append(dynamics_5)
partial_intervalls = []
for i in range(3):
for j in range(i + 1, 4):
partial_intervalls.append((i, j))
f = open("resampling.txt", "w")
f.write(f'{sub_root.all_indices()} -> {sub_root.all_indices()}\n')
for (start_1, end_1) in partial_intervalls:
for (start_2, end_2) in partial_intervalls:
partial_1 = sub_root.partial_dynamic_p(start_1, end_1)
partial_2 = sub_root.partial_dynamic_p(start_2, end_2)
test_single_resampling(partial_1, partial_2, f)
for sub_dynamic in sub_dynamics:
f.write(f'{sub_root.all_indices()} -> {sub_dynamic.all_indices()}\n')
for (start_1, end_1) in partial_intervalls:
if end_1 > sub_root.number_of_steps():
continue
for (start_2, end_2) in partial_intervalls:
if end_2 > sub_dynamic.number_of_steps():
continue
partial_1 = sub_root.partial_dynamic_p(start_1, end_1)
partial_2 = sub_dynamic.partial_dynamic_p(start_2, end_2)
test_single_resampling(partial_1, partial_2, f)
test_single_resampling(partial_2, partial_1, f)
for block_number, block in enumerate(blocks):
print(f'Block {block_number}')
length = block_number + 4
for i, dynamic_1 in enumerate(block):
for j, dynamic_2 in enumerate(block):
if any(k not in dynamic_1.all_indices() and k not in dynamic_2.all_indices() for k in range(length)):
continue
print(f'{i} -> {j}')
f.write(f'{dynamic_1.all_indices()} -> {dynamic_2.all_indices()}\n')
for (start_1, end_1) in partial_intervalls:
for (start_2, end_2) in partial_intervalls:
partial_1 = dynamic_1.partial_dynamic_p(start_1, end_1)
partial_2 = dynamic_2.partial_dynamic_p(start_2, end_2)
test_single_resampling(partial_1, partial_2, f)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment