Skip to content
Snippets Groups Projects
Commit 3e220d97 authored by Christoph von Oy's avatar Christoph von Oy
Browse files

First implementation

parent f82ba6f7
No related branches found
No related tags found
No related merge requests found
"""
MIT License
Copyright (c) 2023 RWTH Aachen University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import pandas as pd
# The root dynamic is
# for TrivialDynamic: self
# for BackedDynamic: root dynamic of its reference
# for PartialDynamic: root dynamic of its reference
# represents a continuus set of time steps relative to the global simulation start t_start
class Dynamic:
# returns the root dynamic of this dynamic
def get_root_dynamic(self):
pass
# return true if other is an ancestor of this dynamic
def has_ancestor(self, other):
pass
# returns the number of steps in this dynamic
def number_of_steps(self):
pass
# returns all time steps in this dynamic
def time_steps(self):
pass
# return true if the dynamic contains the given index
# index is relative to the root dynamic
def has_index(self, index):
pass
# returns the indices between the given start and end positions
# p_start and p_end are relative to this dynamic
def get_indices_within_p(self, p_start, p_end):
pass
# returns the indices between the given start and end indices
# i_start and i_end are relative to the root dynamic
def get_indices_within(self, i_start, i_end):
pass
# returns the index of the given position
# position is relative to this dynamic
def get_index_of(self, position):
pass
# returns the position of the given index
# index is relative to the root dynamic
def get_position_of(self, index):
pass
# returns the length of the time step at the given position
# position is relative to this dynamic
def step_size_p(self, position):
pass
# returns the length of the time step at the given index
# index is relative to the root dynamic
def step_size(self, index):
pass
# returns the length af all time steps in this dynamic
def step_sizes(self):
pass
# constructs a sub dynamic containing time steps starting at the time steps at the given positions with the last position representing the end of the last time step
# positions are relative to this dynamic
def get_sub_dynamic_p(self, positions):
pass
# constructs a sub dynamic containing time steps starting at the time steps at the given indices with the last index representing the end of the last time step
# indices are relative to the root dynamic
def get_sub_dynamic(self, indices):
pass
# construct a sub dynamic containing time steps between the given positions
# p_start and p_end are relative to the root dynamic
def get_partial_dynamic_p(self, p_start, p_end):
pass
# construct a sub dynamic containing time steps between the given indices
# i_start and i_end are relative to the root dynamic
def get_partial_dynamic(self, i_start, i_end):
pass
def to_string(self):
to_print = "["
for p in range(self.number_of_steps()):
to_print += "(" + str(self.get_index_of(p)) + ", " + str(self.step_size_p(p)) + ")"
if p != self.number_of_steps() - 1:
to_print += ", "
to_print += "]"
return to_print
# a dynamic defined by the length of each time step
class TrivialDynamic(Dynamic):
# d_step: the length all time_steps
def __init__(self, d_steps):
self.d_steps = d_steps
def get_root_dynamic(self):
return self
def has_ancestor(self, other):
return self == other
def number_of_steps(self):
return len(self.d_steps)
def time_steps(self):
return list(range(len(self.d_steps)))
def has_index(self, index):
return 0 <= index and index <= len(self.d_steps)
def get_indices_within_p(self, p_start, p_end):
return range(p_start, p_end)
def get_indices_within(self, i_start, i_end):
return range(i_start, i_end)
def get_index_of(self, position):
return position
def get_position_of(self, index):
return index
def step_size_p(self, position):
return self.d_steps[position]
def step_size(self, index):
return self.d_steps[index]
def step_sizes(self):
return self.d_steps
def get_sub_dynamic_p(self, positions):
return BackedDynamic(self, positions)
def get_sub_dynamic(self, indices):
return BackedDynamic(self, indices)
def get_partial_dynamic_p(self, p_start, p_end):
return PartialDynamic(self, p_start, p_end)
def get_partial_dynamic(self, i_start, i_end):
return PartialDynamic(self, i_start, i_end)
# a dynamic definied by taking certain time steps form a reference dynamic
class BackedDynamic(Dynamic):
# reference: the dynamic that backs this dynamic
# indices: the indicies of the time steps contained in this dynamic with the last index representing the end of the last time step
# indices are relative to the reference dynamic
def __init__(self, reference, indices):
self.reference = reference
self.indices = indices
def get_root_dynamic(self):
return self.reference.get_root_dynamic()
def has_ancestor(self, other):
return self == other or self.reference.has_ancestor(other)
def number_of_steps(self):
return len(self.indices) - 1
def time_steps(self):
return self.indices
def has_index(self, index):
return index in self.indices
def get_indices_within_p(self, p_start, p_end):
if p_start < 0 or len(self.indices) <= p_start or p_end < 0 or len(self.indices) <= p_end:
raise IndexError("The dynamic does not have indices at the positions!")
return self.indices[p_start:p_end]
def get_indices_within(self, i_start, i_end):
p_start = self.get_position_of(i_start)
p_end = self.get_position_of(i_end)
return self.indices[p_start:p_end]
def get_index_of(self, position):
if position < 0 or len(self.indices) <= position:
raise IndexError("The dynamic does not have a index for this position!")
return self.indices[position]
def get_position_of(self, index):
if index not in self.indices:
raise IndexError('The dynamic does not have a position for this index!')
return self.indices.index(index)
def step_size_p(self, position):
if position < 0 or len(self.indices) - 1 <= position:
raise IndexError("The dynamic does not have a time step at this position!")
return sum(self.reference.step_size(index) for index in self.reference.get_indices_within(self.indices[position], self.indices[position + 1]))
def step_size(self, index):
if index not in self.indices[:-1]:
raise IndexError("The dynamic does not have a time step at this index!")
return self.step_size_p(self.indices.index(index))
def step_sizes(self):
return [self.step_size_p(position) for position in range(self.number_of_steps)]
def get_sub_dynamic_p(self, positions):
if any(position < 0 or len(self.indices) <= position for position in positions):
raise IndexError("The dynamic does not have all requested indices!")
return BackedDynamic(self.reference, [self.indices[position] for position in positions])
def get_sub_dynamic(self, indices):
if any(index not in self.indices for index in indices):
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return BackedDynamic(self.reference, indices)
def get_partial_dynamic_p(self, p_start, p_end):
if p_start < 0 or len(self.indices) - 1 <= p_start or p_end < 0 or len(self.indices) - 1 <= p_end:
raise IndexError("The dynamic does not have all requested positions for the sub dynamic!")
return PartialDynamic(self, p_start, p_end)
def get_partial_dynamic(self, i_start, i_end):
if i_start not in self.indices or i_end not in self.indices:
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return PartialDynamic(self, self.indices.index(i_start), self.indices.index(i_end))
# a dynamic defined by taking a continuus intervall of time steps from a reference dynamic
class PartialDynamic(Dynamic):
# reference: the dynamic from which the intervall is taken
# start: the position in the reference dynamic of the first time step contained in this dynamic
# end: the position in the reference dynamic of the end of the last time step contained in this dynamic
# start/end are relative to the reference dynamic
def __init__(self, reference, start, end):
self.reference = reference
self.start = start
self.end = end
def get_root_dynamic(self):
return self.reference.get_root_dynamic()
def has_ancestor(self, other):
return self == other or self.reference.has_ancestor(other)
def number_of_steps(self):
return self.end - self.start
def time_steps(self):
return self.reference.time_steps()[self.start:self.end]
def has_index(self, index):
reference_position = self.reference.get_position_of(index)
return self.start <= reference_position or reference_position <= self.end
def get_indices_within_p(self, p_start, p_end):
if p_start < self.start or self.end < p_start or p_end < self.start or self.end < p_end:
raise IndexError("The dynamic does not have all requested indices!")
return self.reference.get_indices_within_p(self.start + p_start, self.start + p_end)
def get_indices_within(self, i_start, i_end):
p_start = self.reference.get_position_of(i_start)
p_end = self.reference.get_position_of(i_end)
if p_start < self.start or self.end < p_start or p_end < self.start or self.end < p_end:
raise IndexError("The dynamic does not have all requested indices!")
return self.reference.get_indices_within_p(self.start + p_start, self.start + p_end)
def get_index_of(self, position):
if position < 0 or self.end - self.start < position:
raise IndexError("The dynamic does not have a index for this position!")
return self.reference.get_index_of(self.start + position)
def get_position_of(self, index):
reference_position = self.reference.get_position_of(index)
if reference_position < self.start or self.end < reference_position:
raise IndexError('The dynamic does not have a position for this index!')
return reference_position - self.start
def step_size_p(self, position):
if position < 0 or self.end - self.start <= position:
raise IndexError("The dynamic does not have a time step at this position!")
return self.reference.step_size_p(self.start + position)
def step_size(self, index):
reference_position = self.reference.get_position_of(index)
if reference_position < self.start or self.end <= reference_position:
raise IndexError('The dynamic does not have a time step at this index!')
return self.step_size_p(reference_position - self.start)
def step_sizes(self):
return [self.reference.step_size_p(position) for position in range(self.start, self.end)]
def get_sub_dynamic_p(self, positions):
if any(position < 0 or self.end - self.start < position for position in positions):
raise IndexError("The dynamic does not have all requested positions for the sub dynamic!")
return self.reference.get_sub_dynamic_p([self.start + position for position in positions])
def get_sub_dynamic(self, indices):
def filter(self, index):
reference_position = self.reference.get_position_of(index)
return reference_position < self.start or self.end < reference_position
if any(filter(index) for index in indices):
raise IndexError("The does not have all requested indices for the sub dynamic!")
return self.reference.get_sub_dynamic(indices)
def get_partial_dynamic_p(self, p_start, p_end):
if p_start < 0 or self.end - self.start < p_start or p_end < 0 or self.end - self.start < p_end:
raise IndexError("The dynamic does not have all requested positions for the sub dynamic!")
return PartialDynamic(self.reference, self.start + p_start, self.start + p_end)
def get_partial_dynamic(self, i_start, i_end):
reference_start = self.reference.get_position_of(i_start)
reference_end = self.reference.get_position_of(i_end)
if reference_start < self.start or self.end < reference_start or reference_end < self.start or self.end < reference_end:
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return PartialDynamic(self.reference, reference_start, reference_end)
# only works if both dynamics share the same root dynamic
def resample(values, dynamic, target_dynamic):
if dynamic.get_root_dynamic() != target_dynamic.get_root_dynamic():
raise ValueError("Both dynamics have to have the same root dynamic!")
if target_dynamic.get_index_of(0) < dynamic.get_index_of(0) or dynamic.get_index_of(dynamic.number_of_steps()) < target_dynamic.get_index_of(target_dynamic.number_of_steps()):
raise ValueError("The source dynamic does not cover the time steps contained in target_dynamic!")
non_partial_dynamic = dynamic
while isinstance(non_partial_dynamic, PartialDynamic):
non_partial_dynamic = non_partial_dynamic.reference
source_p_start = non_partial_dynamic.get_position_of(dynamic.get_index_of(0))
source_p_end = non_partial_dynamic.get_position_of(dynamic.get_index_of(dynamic.number_of_steps()))
non_partial_target_dynamic = target_dynamic
while isinstance(non_partial_target_dynamic, PartialDynamic):
non_partial_target_dynamic = non_partial_target_dynamic.reference
target_p_start = non_partial_target_dynamic.get_position_of(target_dynamic.get_index_of(0))
target_p_end = non_partial_target_dynamic.get_position_of(target_dynamic.get_index_of(target_dynamic.number_of_steps()))
if non_partial_dynamic == non_partial_target_dynamic:
return resample_same(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end)
elif isinstance(non_partial_target_dynamic, BackedDynamic) and non_partial_target_dynamic.has_ancestor(non_partial_dynamic):
return resample_to_backed(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end)
elif isinstance(non_partial_dynamic, BackedDynamic) and non_partial_dynamic.has_ancestor(non_partial_target_dynamic):
return resample_from_backed(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end)
else:
return resample_common_reference(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end)
def resample_into(values, dynamic, target_dynamic, target_values):
if dynamic.get_root_dynamic() != target_dynamic.get_root_dynamic():
raise ValueError("Both dynamics have to have the same root dynamic!")
non_partial_dynamic = dynamic
while isinstance(non_partial_dynamic, PartialDynamic):
non_partial_dynamic = non_partial_dynamic.reference
source_p_start = non_partial_dynamic.get_position_of(dynamic.get_index_of(0))
source_p_end = non_partial_dynamic.get_position_of(dynamic.get_index_of(dynamic.number_of_steps()))
non_partial_target_dynamic = target_dynamic
while isinstance(non_partial_target_dynamic, PartialDynamic):
non_partial_target_dynamic = non_partial_target_dynamic.reference
target_p_start = non_partial_target_dynamic.get_position_of(target_dynamic.get_index_of(0))
target_p_end = non_partial_target_dynamic.get_position_of(target_dynamic.get_index_of(target_dynamic.number_of_steps()))
if non_partial_dynamic == non_partial_target_dynamic:
return resample_into_same(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end, target_values)
elif isinstance(non_partial_target_dynamic, BackedDynamic) and non_partial_target_dynamic.has_ancestor(non_partial_dynamic):
return resample_into_to_backed(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end, target_values)
elif isinstance(non_partial_dynamic, BackedDynamic) and non_partial_dynamic.has_ancestor(non_partial_target_dynamic):
return resample_into_from_backed(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end, target_values)
else:
return resample_into_common_reference(values, non_partial_dynamic, source_p_start, source_p_end, non_partial_target_dynamic, target_p_start, target_p_end, target_values)
# source_dynamic and target_dynamic are the same dynamic
def resample_same(values, dynamic, source_p_start, source_p_end, target_dnymiac, target_p_start, target_p_end):
return values.loc[target_dnymiac.get_indices_within_p(target_p_start, target_p_end)]
def resample_into_same(values, dynamic, source_p_start, source_p_end, target_dnymiac, target_p_start, target_p_end, target_values):
i_start = max(dynamic.get_index_of(source_p_start), target_dnymiac.get_index_of(target_p_start))
i_end = min(dynamic.get_index_of(source_p_end), target_dnymiac.get_index_of(target_p_end))
temp_values = values.loc[dynamic.get_indices_within(i_start, i_end)]
if i_start == target_dnymiac.get_index_of(target_p_start) and i_end == target_dnymiac.get_index_of(target_p_end):
target_values.loc[:] = temp_values
else:
target_values.loc[target_dnymiac.get_indices_within(i_start, i_end)] = temp_values
return target_values
# target_dynamic is BackedDynamic and has dynamic as an ancestor
def resample_to_backed(values, dynamic, source_p_start, source_p_end, target_dynamic, target_p_start, target_p_end):
target_values = values.loc[target_dynamic.indices[target_p_start:target_p_end]]
for target_position in range(target_p_start, target_p_end):
acc = 0
sum = 0
for source_index in dynamic.get_indices_within(target_dynamic.indices[target_position], target_dynamic.indices[target_position + 1]):
acc += values.loc[source_index] * dynamic.step_size(source_index)
sum += dynamic.step_size(source_index)
target_values.loc[target_dynamic.indices[target_position]] = acc / float(sum)
return target_values
def resample_into_to_backed(values, dynamic, source_p_start, source_p_end, target_dynamic, target_p_start, target_p_end, target_values):
source_i_start = dynamic.get_index_of(source_p_start)
target_i_start = target_dynamic.indices[target_p_start]
if source_i_start == target_i_start:
target_p_start = target_dynamic.get_position_of(source_i_start)
# elif source_i_start < target_i_start: # target_p_start assigned here is the same as the input target_p_start so skip this branch
# target_p_start = target_dynamic.get_position_of(target_i_start)
elif source_i_start > target_i_start:
while source_i_start not in target_dynamic.indices:
source_p_start += 1
source_i_start = dynamic.get_index_of(source_p_start)
target_p_start = target_dynamic.get_position_of(source_i_start)
source_i_end = dynamic.get_index_of(source_p_end)
target_i_end = target_dynamic.indices[target_p_end]
if source_i_end == target_i_end:
target_p_end = target_dynamic.get_position_of(source_i_end)
# elif source_i_end > target_i_end: # target_p_end assigned here is the same as the input target_p_end so skip this branch
# target_p_end = target_dynamic.get_position_of(target_i_end)
elif source_i_end < target_i_end:
while source_i_end not in target_dynamic.indices:
source_p_end -= 1
source_i_end = dynamic.get_index_of(source_p_end)
target_p_end = target_dynamic.get_position_of(source_i_end)
for target_position in range(target_p_start, target_p_end):
acc = 0
sum = 0
for source_index in dynamic.get_indices_within(target_dynamic.indices[target_position], target_dynamic.indices[target_position + 1]):
acc += values.loc[source_index] * dynamic.step_size(source_index)
sum += dynamic.step_size(source_index)
target_values.loc[target_dynamic.indices[target_position]] = acc / float(sum)
return target_values
# dynamic is BackedDynamic and has target_dynamic as an ancestor
def resample_from_backed(values, dynamic, source_p_start, source_p_end, target_dynamic, target_p_start, target_p_end):
target_values = pd.Series(dtype = values.dtype, index = (target_dynamic.get_index_of(position) for position in range(target_p_start, target_p_end)))
source_i_start = dynamic.indices[source_p_start]
target_i_start = target_dynamic.get_index_of(target_p_start)
if source_i_start < target_i_start:
if target_i_start not in dynamic.indices:
target_p_start_succ = target_p_start + 1
target_i_start_succ = target_dynamic.get_index_of(target_p_start_succ)
while target_i_start_succ not in dynamic.indices:
target_p_start_succ += 1
target_i_start_succ = target_dynamic.get_index_of(target_p_start_succ)
source_p_start = dynamic.get_position_of(target_i_start_succ)
target_indices = target_dynamic.get_indices_within(target_i_start, target_i_start_succ)
target_values.loc[target_indices] = values[dynamic.indices[source_p_start - 1]]
else:
source_p_start = dynamic.get_position_of(target_i_start)
source_i_end = dynamic.indices[source_p_end]
target_i_end = target_dynamic.get_index_of(target_p_end)
if target_i_end < source_i_end:
if target_i_end not in dynamic.indices:
target_p_end_prev = target_p_end - 1
target_i_end_prev = target_dynamic.get_index_of(target_p_end_prev)
while target_i_end_prev not in dynamic.indices:
target_p_end_prev -= 1
target_i_end_prev = target_dynamic.get_index_of(target_p_end_prev)
source_p_end = dynamic.get_position_of(target_i_end_prev)
target_indices = target_dynamic.get_indices_within(target_i_end_prev, target_i_end)
target_values.loc[target_indices] = values[dynamic.indices[source_p_end]]
for source_position in range(source_p_start, source_p_end):
target_indices = target_dynamic.get_indices_within(dynamic.indices[source_position], dynamic.indices[source_position + 1])
target_values.loc[target_indices] = values[dynamic.indices[source_position]]
return target_values
def resample_into_from_backed(values, dynamic, source_p_start, source_p_end, target_dynamic, target_p_start, target_p_end, target_values):
source_i_start = dynamic.indices[source_p_start]
target_i_start = target_dynamic.get_index_of(target_p_start)
if source_i_start < target_i_start:
if target_i_start not in dynamic.indices:
target_p_start_succ = target_p_start + 1
target_i_start_succ = target_dynamic.get_index_of(target_p_start_succ)
while target_i_start_succ not in dynamic.indices:
target_p_start_succ += 1
target_i_start_succ = target_dynamic.get_index_of(target_p_start_succ)
source_p_start = dynamic.get_position_of(target_i_start_succ)
target_indices = target_dynamic.get_indices_within(target_i_start, target_i_start_succ)
target_values.loc[target_indices] = values[dynamic.indices[source_p_start - 1]]
else:
source_p_start = dynamic.get_position_of(target_i_start)
source_i_end = dynamic.indices[source_p_end]
target_i_end = target_dynamic.get_index_of(target_p_end)
if target_i_end < source_i_end:
if target_i_end not in dynamic.indices:
target_p_end_prev = target_p_end - 1
target_i_end_prev = target_dynamic.get_index_of(target_p_end_prev)
while target_i_end_prev not in dynamic.indices:
target_p_end_prev -= 1
target_i_end_prev = target_dynamic.get_index_of(target_p_end_prev)
source_p_end = dynamic.get_position_of(target_i_end_prev)
target_indices = target_dynamic.get_indices_within(target_i_end_prev, target_i_end)
target_values.loc[target_indices] = values[dynamic.indices[source_p_end]]
for source_position in range(source_p_start, source_p_end):
target_indices = target_dynamic.get_indices_within(dynamic.indices[source_position], dynamic.indices[source_position + 1])
target_values.loc[target_indices] = values[dynamic.indices[source_position]]
return target_values
# dynamic and target_dynamic share the same root dynamic
def resample_common_reference(values, dynamic, source_p_start, source_p_end, target_dynamic, target_p_start, target_p_end):
target_values = pd.Series(dtype = values.dtype, index = (target_dynamic.get_index_of(position) for position in range(target_p_start, target_p_end)))
target_i_start = target_dynamic.get_index_of(target_p_start)
root_dynamic = dynamic.get_root_dynamic()
root_p_start = root_dynamic.get_position_of(target_i_start)
root_i_start = target_i_start
length = 0
while not dynamic.has_index(root_i_start):
root_p_start -= 1
root_i_start = root_dynamic.get_index_of(root_p_start)
length += root_dynamic.step_size(root_i_start)
source_position = dynamic.get_position_of(root_i_start)
target_position = target_p_start
remaining_length = dynamic.step_size_p(source_position) - length
while target_position < target_p_end:
remaining_target_length = target_dynamic.step_size_p(target_position)
acc = 0
while remaining_target_length > 0:
if remaining_length == 0:
source_position += 1
remaining_length = dynamic.step_size_p(source_position)
if remaining_target_length <= remaining_length:
acc += values[dynamic.get_index_of(source_position)] * remaining_target_length
remaining_length -= remaining_target_length
remaining_target_length -= remaining_target_length
else:
acc += values[dynamic.get_index_of(source_position)] * remaining_length
remaining_target_length -= remaining_length
remaining_length -= remaining_length
target_values[target_dynamic.get_index_of(target_position)] = acc / target_dynamic.step_size_p(target_position)
target_position += 1
return target_values
def resample_into_common_reference(values, dynamic, source_p_start, source_p_end, target_dynamic, target_p_start, target_p_end, target_values):
source_i_start = dynamic.get_index_of(source_p_start)
target_i_start = target_dynamic.get_index_of(target_p_start)
if source_i_start > target_i_start:
root_dynamic = dynamic.get_root_dynamic()
root_p_start = root_dynamic.get_position_of(source_i_start)
root_i_start = source_i_start
length = 0
while not target_dynamic.has_index(root_i_start):
length += root_dynamic.step_size(root_i_start)
root_p_start += 1
root_i_start = root_dynamic.get_index_of(root_p_start)
source_position = source_p_start
target_position = target_dynamic.get_position_of(root_i_start)
else:
root_dynamic = dynamic.get_root_dynamic()
root_p_start = root_dynamic.get_position_of(target_i_start)
root_i_start = target_i_start
length = 0
while not dynamic.has_index(root_i_start):
root_p_start -= 1
root_i_start = root_dynamic.get_index_of(root_p_start)
length += root_dynamic.step_size(root_i_start)
source_position = dynamic.get_position_of(root_i_start)
target_position = target_p_start
remaining_length = dynamic.step_size_p(source_position) - length
while target_position < target_p_end:
remaining_target_length = target_dynamic.step_size_p(target_position)
acc = 0
while remaining_target_length > 0:
if remaining_length == 0:
source_position += 1
if source_position >= source_p_end:
return target_values
remaining_length = dynamic.step_size_p(source_position)
if remaining_target_length <= remaining_length:
acc += values[dynamic.get_index_of(source_position)] * remaining_target_length
remaining_length -= remaining_target_length
remaining_target_length -= remaining_target_length
else:
acc += values[dynamic.get_index_of(source_position)] * remaining_length
remaining_target_length -= remaining_length
remaining_length -= remaining_length
target_values[target_dynamic.get_index_of(target_position)] = acc / target_dynamic.step_size_p(target_position)
target_position += 1
return target_values
def test_single_resampling(dynamic, target_dynamic):
index = [dynamic.get_index_of(position) for position in range(dynamic.number_of_steps())]
values = pd.Series(data = [float(i) for i in range(len(index))], index = index)
try:
print(resample(values, dynamic, target_dynamic))
except:
print('hey')
target_index = [target_dynamic.get_index_of(position) for position in range(target_dynamic.number_of_steps())]
target_values = pd.Series(dtype = float, index = target_index)
print(resample_into(values, dynamic, target_dynamic, target_values))
def test_resampling():
dynamics = {}
dynamics['d'] = TrivialDynamic([1 for i in range(36)])
dynamics['d_1'] = PartialDynamic(dynamics['d'], 0, 36)
dynamics['d_2'] = PartialDynamic(dynamics['d'], 1, 35)
dynamics['d_3'] = BackedDynamic(dynamics['d'], [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36])
dynamics['d_3_1'] = PartialDynamic(dynamics['d_3'], 0, 18)
dynamics['d_3_2'] = PartialDynamic(dynamics['d_3'], 1, 17)
dynamics['d_3_3'] = BackedDynamic(dynamics['d_3'], [0, 4, 8, 12, 16, 20, 24, 28, 32, 36])
dynamics['d_3_3_1'] = PartialDynamic(dynamics['d_3_3'], 0, 9)
dynamics['d_3_3_2'] = PartialDynamic(dynamics['d_3_3'], 1, 8)
dynamics['d_3_4'] = BackedDynamic(dynamics['d_3'], [2, 6, 10, 14, 18, 22, 26, 30, 34])
dynamics['d_3_4_1'] = PartialDynamic(dynamics['d_3_4'], 0, 8)
dynamics['d_3_4_2'] = PartialDynamic(dynamics['d_3_4'], 1, 7)
dynamics['d_4'] = BackedDynamic(dynamics['d'], [3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33])
dynamics['d_4_1'] = PartialDynamic(dynamics['d_4'], 0, 10)
dynamics['d_4_2'] = PartialDynamic(dynamics['d_4'], 1, 9)
dynamics['d_4_3'] = BackedDynamic(dynamics['d_4'], [3, 9, 15, 21, 27, 33])
dynamics['d_4_3_1'] = PartialDynamic(dynamics['d_4_3'], 0, 5)
dynamics['d_4_3_2'] = PartialDynamic(dynamics['d_4_3'], 1, 4)
dynamics['d_4_4'] = BackedDynamic(dynamics['d_4'], [6, 12, 18, 24, 30])
dynamics['d_4_4_1'] = PartialDynamic(dynamics['d_4_4'], 0, 4)
dynamics['d_4_4_2'] = PartialDynamic(dynamics['d_4_4'], 1, 3)
test_resampling(dynamics['d_1'], dynamics['d_2'])
test_resampling(dynamics['d_3_4'], dynamics['d_4'])
test_resampling(dynamics['d_3_4_2'], dynamics['d_4'])
test_resampling(dynamics['d_3_4'], dynamics['d_4_2'])
test_resampling(dynamics['d_3_4_2'], dynamics['d_4_2'])
...@@ -118,8 +118,8 @@ def calc_total_irradiance(irradiance, timer, beta, psi_f, phi, lambda_st, lambda ...@@ -118,8 +118,8 @@ def calc_total_irradiance(irradiance, timer, beta, psi_f, phi, lambda_st, lambda
g_total = g_diffuse + g_reflected + g_beam g_total = g_diffuse + g_reflected + g_beam
return g_total return g_total
def generate_g_t_series(irradiance, beta, psi_f, phi, lambda_st, lambda_1, t_start, t_horizon, t_step): def generate_g_t_series(irradiance, beta, psi_f, phi, lambda_st, lambda_1, t_start, dynamic):
T = list(pd.date_range(pd.Timestamp(t_start), pd.Timestamp(t_start) + timedelta(hours=t_horizon * t_step - t_step), freq=str(t_step) + 'H')) T = list(irradiance.index)
g_total_lst = [] g_total_lst = []
for t in T: for t in T:
g_total_lst.append(calc_total_irradiance(irradiance.loc[t], t, beta, psi_f, phi, lambda_st, lambda_1)) g_total_lst.append(calc_total_irradiance(irradiance.loc[t], t, beta, psi_f, phi, lambda_st, lambda_1))
......
...@@ -30,23 +30,39 @@ from Tooling.input_profile_processor.calc_irradiance import generate_g_t_series ...@@ -30,23 +30,39 @@ from Tooling.input_profile_processor.calc_irradiance import generate_g_t_series
from Tooling.modifier import Modifier from Tooling.modifier import Modifier
def process_input_profiles(input_profile_dict, t_start, t_horizon, t_step): def process_input_profiles(input_profile_dict, t_start, dynamic):
# This entire process assumes three things: d_step_min = min(dynamic.step_size_p(position) for position in range(dynamic.number_of_steps()))
# 1. t_step devides a day into an integer amount of steps # This entire process assumes four things:
# 2. t_step devides the intervall from YYYY.MM.DD 00:00:00 of the day containing t_start to t_start into an integer amount of steps # 1. d_step_min devides a day into an integer amount of steps
# 2. d_step_min devides the intervall from YYYY.MM.DD 00:00:00 of the day containing t_start to t_start into an integer amount of steps
# 3. The timestamps of the desired timeseries either line up the with timestamps provided by the profiles in the files or the other way around # 3. The timestamps of the desired timeseries either line up the with timestamps provided by the profiles in the files or the other way around
# 4. The sizes of all time steps in the given dynamic are devisible by
input_profiles = {} input_profiles = {}
for input_profile_name, input_profile_config in input_profile_dict.items(): for input_profile_name, input_profile_config in input_profile_dict.items():
if 'file' in input_profile_config: if 'file' in input_profile_config:
profile = load_profile(input_profile_name, input_profile_config['file'], t_start, t_horizon, t_step) profile = load_profile(input_profile_name, input_profile_config['file'], t_start, dynamic, d_step_min)
elif 'generate' in input_profile_config: elif 'generate' in input_profile_config:
profile = generate_profile(input_profile_name, input_profile_config['type'], input_profile_config['generate'], input_profiles, t_start, t_horizon, t_step) profile = generate_profile(input_profile_name, input_profile_config['type'], input_profile_config['generate'], input_profiles, t_start, dynamic, d_step_min)
elif 'modify' in input_profile_config: elif 'modify' in input_profile_config:
profile = modify_profile(input_profile_name, input_profile_config['type'], input_profile_config['modify'], input_profiles, t_start, t_horizon, t_step) profile = modify_profile(input_profile_name, input_profile_config['type'], input_profile_config['modify'], input_profiles, t_start, dynamic, d_step_min)
input_profiles[input_profile_name] = (input_profile_config['type'], profile) input_profiles[input_profile_name] = (input_profile_config['type'], profile)
for input_profile_name, (input_profile_type, input_profile) in input_profiles.items(): for input_profile_name, (input_profile_type, input_profile) in input_profiles.items():
input_profile = input_profile[t_start:t_start + pd.Timedelta(hours = t_horizon * t_step - t_step)] input_profile.drop(pd.date_range(input_profile.index[0], t_start - timedelta(hours = d_step_min), freq = str(d_step_min) + 'H'), inplace = True)
input_profile.drop(pd.date_range(t_start + pd.Timedelta(hours = sum(dynamic.step_size_p(position) for position in range(dynamic.number_of_steps()))), input_profile.index[-1], freq = str(d_step_min) + 'H'), inplace = True)
if len(input_profile) != dynamic.number_of_steps():
index = 0
drop = []
for position in range(dynamic.number_of_steps()):
if dynamic.step_size_p(position) != d_step_min:
number_of_steps = int(dynamic.step_size_p(position) / d_step_min)
input_profile.iloc[index] = sum(d_step_min * input_profile.iloc[index + i] for i in range(number_of_steps)) / dynamic.step_size_p(position)
drop.extend(index + i + 1 for i in range(number_of_steps - 1))
index += number_of_steps
else:
index += 1
input_profile.drop((input_profile.index[i] for i in drop), inplace = True)
if input_profile_type == 'irradiance': if input_profile_type == 'irradiance':
lambda_1 = 14.122 lambda_1 = 14.122
...@@ -54,21 +70,21 @@ def process_input_profiles(input_profile_dict, t_start, t_horizon, t_step): ...@@ -54,21 +70,21 @@ def process_input_profiles(input_profile_dict, t_start, t_horizon, t_step):
phi = 52.21 phi = 52.21
psi_f = 0 psi_f = 0
beta = 30 beta = 30
input_profile = generate_g_t_series(input_profile, beta, psi_f, phi, lambda_st, lambda_1, t_start, t_horizon, t_step) input_profile = generate_g_t_series(input_profile, beta, psi_f, phi, lambda_st, lambda_1, t_start, dynamic)
input_profile = input_profile.squeeze() input_profile = input_profile.squeeze()
input_profile.set_axis(list(range(t_horizon)), inplace = True) input_profile.set_axis(list(range(dynamic.number_of_steps())), inplace = True)
input_profiles[input_profile_name] = input_profile input_profiles[input_profile_name] = input_profile
return input_profiles return input_profiles
def resample_profile(name, profile, t_start, t_horizon, t_step, t_last): def resample_profile(name, profile, t_start, dynamic, d_step_min, t_last):
profile = profile.resample(str(t_step) + 'H').mean().interpolate('linear') profile = profile.resample(str(d_step_min) + 'H').mean().interpolate('linear')
if t_start < profile.index[0]: if t_start < profile.index[0]:
missing_indices = pd.date_range(t_start, profile.index[0] - timedelta(hours = t_step), freq = str(t_step) + 'H') missing_indices = pd.date_range(t_start, profile.index[0] - timedelta(hours = d_step_min), freq = str(d_step_min) + 'H')
print(f'For input profile {name} {len(missing_indices)} {"values are" if len(missing_indices) > 1 else "value is"} extrapolated to the beginning of the profile!') print(f'For input profile {name} {len(missing_indices)} {"values are" if len(missing_indices) > 1 else "value is"} extrapolated to the beginning of the profile!')
profile = pd.concat([pd.DataFrame([profile.iloc[0]], index = missing_indices), profile]) profile = pd.concat([pd.DataFrame([profile.iloc[0]], index = missing_indices), profile])
if profile.index[-1] < t_last: if profile.index[-1] < t_last:
missing_indices = pd.date_range(profile.index[-1] + timedelta(hours = t_step), t_last, freq = str(t_step) + 'H') missing_indices = pd.date_range(profile.index[-1] + timedelta(hours = d_step_min), t_last, freq = str(d_step_min) + 'H')
print(f'For input profile {name} {len(missing_indices)} {"values are" if len(missing_indices) > 1 else "value is"} extrapolated to the end of the profile!') print(f'For input profile {name} {len(missing_indices)} {"values are" if len(missing_indices) > 1 else "value is"} extrapolated to the end of the profile!')
profile = pd.concat([profile, pd.DataFrame([profile.iloc[-1]], index = missing_indices)]) profile = pd.concat([profile, pd.DataFrame([profile.iloc[-1]], index = missing_indices)])
return profile return profile
...@@ -82,8 +98,8 @@ def expand_profile_to_year(profile, year, t_step): ...@@ -82,8 +98,8 @@ def expand_profile_to_year(profile, year, t_step):
profile = pd.concat([profile, pd.DataFrame([profile.iloc[-1]], index = missing_indices)]) profile = pd.concat([profile, pd.DataFrame([profile.iloc[-1]], index = missing_indices)])
return profile return profile
def load_profile(name, file, t_start, t_horizon, t_step): def load_profile(name, file, t_start, dynamic, d_step_min):
t_last = t_start + pd.Timedelta(hours = t_horizon * t_step - t_step) t_last = t_start + pd.Timedelta(hours = sum(dynamic.step_size_p(position) for position in range(dynamic.number_of_steps() - 1)))
profile = pd.read_csv(file, index_col = 0) profile = pd.read_csv(file, index_col = 0)
try: try:
file_start = pd.to_datetime(profile.index[0], format = '%d-%m-%Y %H:%M:%S') file_start = pd.to_datetime(profile.index[0], format = '%d-%m-%Y %H:%M:%S')
...@@ -132,14 +148,14 @@ def load_profile(name, file, t_start, t_horizon, t_step): ...@@ -132,14 +148,14 @@ def load_profile(name, file, t_start, t_horizon, t_step):
# left_index = middle_index # left_index = middle_index
# middle_index = left_index + int((right_index - left_index) / 2) # middle_index = left_index + int((right_index - left_index) / 2)
# middle_time = pd.to_datetime(profile.index[middle_index], format = format) # middle_time = pd.to_datetime(profile.index[middle_index], format = format)
first_index = 0 # first_index = 0
last_index = len(profile) - 1 # last_index = len(profile) - 1
profile = profile[first_index:last_index + 1] # profile = profile[first_index:last_index + 1]
profile.index = pd.to_datetime(profile.index, format = format) profile.index = pd.to_datetime(profile.index, format = format)
return resample_profile(name, profile, t_start, t_horizon, t_step, t_last) return resample_profile(name, profile, t_start, dynamic, d_step_min, t_last)
def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_horizon, t_step): def generate_profile(name, profile_type, parameters, input_profiles, t_start, dynamic, d_step_min):
t_last = t_start + pd.Timedelta(hours = t_horizon * t_step - t_step) t_last = t_start + pd.Timedelta(hours = sum(dynamic.step_size_p(position) for position in range(dynamic.number_of_steps() - 1)))
years = range(t_start.year, t_last.year + 1) years = range(t_start.year, t_last.year + 1)
if profile_type == 'elec_demand': if profile_type == 'elec_demand':
...@@ -148,20 +164,20 @@ def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_ ...@@ -148,20 +164,20 @@ def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_
year_profile = ElectricalDemand(year).get_profile(parameters['yearly_demand'], 'h0', True) # True: with smoothing function for household profiles, False: no smoothing function for household profiles year_profile = ElectricalDemand(year).get_profile(parameters['yearly_demand'], 'h0', True) # True: with smoothing function for household profiles, False: no smoothing function for household profiles
profiles.append(year_profile) profiles.append(year_profile)
profile = pd.concat(profiles) profile = pd.concat(profiles)
return resample_profile(name, profile, t_start, t_horizon, t_step, t_last) return resample_profile(name, profile, t_start, dynamic, d_step_min, t_last)
elif profile_type == 'therm_demand': elif profile_type == 'therm_demand':
profiles = [] profiles = []
for year in years: for year in years:
demand_df = pd.DataFrame(index=pd.date_range(datetime(year, 1, 1, 0), demand_df = pd.DataFrame(index=pd.date_range(datetime(year, 1, 1, 0),
periods=8760 / t_step, periods=8760 / d_step_min,
freq=str(t_step) + 'H')) freq=str(d_step_min) + 'H'))
# Fixme: non-residential building only have building_class = 0 # Fixme: non-residential building only have building_class = 0
# residential building could varies from 1 to 11 # residential building could varies from 1 to 11
# same problem for hot water demand. # same problem for hot water demand.
# temporary fix # temporary fix
if len(input_profiles[parameters['temperature']][1]) < len(demand_df.index): if len(input_profiles[parameters['temperature']][1]) < len(demand_df.index):
temp = list(input_profiles[parameters['temperature']]) temp = list(input_profiles[parameters['temperature']])
temp[1] = expand_profile_to_year(input_profiles[parameters['temperature']][1], year, t_step) temp[1] = expand_profile_to_year(input_profiles[parameters['temperature']][1], year, d_step_min)
input_profiles[parameters['temperature']] = tuple(temp) input_profiles[parameters['temperature']] = tuple(temp)
year_profile = ThermalDemand(demand_df.index, year_profile = ThermalDemand(demand_df.index,
shlp_type='EFH', shlp_type='EFH',
...@@ -178,12 +194,12 @@ def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_ ...@@ -178,12 +194,12 @@ def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_
profiles = [] profiles = []
for year in years: for year in years:
demand_df = pd.DataFrame(index=pd.date_range(datetime(year, 1, 1, 0), demand_df = pd.DataFrame(index=pd.date_range(datetime(year, 1, 1, 0),
periods=8760 / t_step, periods=8760 / d_step_min,
freq=str(t_step) + 'H')) freq=str(d_step_min) + 'H'))
# temporary fix # temporary fix
if len(input_profiles[parameters['temperature']][1]) < len(demand_df.index): if len(input_profiles[parameters['temperature']][1]) < len(demand_df.index):
temp = list(input_profiles[parameters['temperature']]) temp = list(input_profiles[parameters['temperature']])
temp[1] = expand_profile_to_year(input_profiles[parameters['temperature']][1], year, t_step) temp[1] = expand_profile_to_year(input_profiles[parameters['temperature']][1], year, d_step_min)
input_profiles[parameters['temperature']] = tuple(temp) input_profiles[parameters['temperature']] = tuple(temp)
year_profile = ThermalDemand(demand_df.index, year_profile = ThermalDemand(demand_df.index,
shlp_type='EFH', shlp_type='EFH',
...@@ -200,7 +216,7 @@ def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_ ...@@ -200,7 +216,7 @@ def generate_profile(name, profile_type, parameters, input_profiles, t_start, t_
raise Exception("Generator for profile type " + str(profile_type) + " is not implemented!") raise Exception("Generator for profile type " + str(profile_type) + " is not implemented!")
def modify_profile(name, mod_type: str, parameters, input_profiles, t_start, t_horizon, t_step): def modify_profile(name, mod_type: str, parameters, input_profiles, t_start, dynamic, d_step_min):
# Example of the parameters for use in the runme # Example of the parameters for use in the runme
# ------------------------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------------------------
# 'temperature_1': ['prophet', 'modify', 'input_files/data/temperature/temperature.csv', 'temperature', # 'temperature_1': ['prophet', 'modify', 'input_files/data/temperature/temperature.csv', 'temperature',
......
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import math import math
from Tooling.dynamics.Dynamic import Dynamic
# ---------------------------------------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------------------------------------
# Functions which can be used as predictors for rolling horizon steps in timeseries # Functions which can be used as predictors for rolling horizon steps in timeseries
...@@ -22,11 +22,11 @@ class Predictor: ...@@ -22,11 +22,11 @@ class Predictor:
Name of the prediction method to be used. The default method is "same_as_last_day". Name of the prediction method to be used. The default method is "same_as_last_day".
""" """
def __init__(self, profile, type: str, method: str, t_step: float): def __init__(self, profile, type: str, method: str, dynamic: Dynamic):
self.profile = profile self.profile = profile
self.type = type self.type = type
self.method = method self.method = method
self.t_step = t_step self.dynamic = dynamic
def predict(self, time_steps): def predict(self, time_steps):
if self.method == "perfect_foresight": if self.method == "perfect_foresight":
...@@ -36,6 +36,7 @@ class Predictor: ...@@ -36,6 +36,7 @@ class Predictor:
print('Requested time forward prediction for time steps that include the first time step! Using original data for the first time step.') print('Requested time forward prediction for time steps that include the first time step! Using original data for the first time step.')
return pd.Series(self.profile[time_steps[0]], index = time_steps) return pd.Series(self.profile[time_steps[0]], index = time_steps)
else: else:
raise("Requested a prediction, this feature is currently not supported, so use original data!")
return pd.Series(self.profile[time_steps[0]] - 1, index = time_steps) return pd.Series(self.profile[time_steps[0]] - 1, index = time_steps)
elif self.method == "same_as_last_day": elif self.method == "same_as_last_day":
time_steps_per_day = int(24 / self.t_step) time_steps_per_day = int(24 / self.t_step)
...@@ -45,6 +46,7 @@ class Predictor: ...@@ -45,6 +46,7 @@ class Predictor:
previous_day_data[:time_steps_per_day - time_steps[0]] = self.profile[time_steps[0]:time_steps_per_day] previous_day_data[:time_steps_per_day - time_steps[0]] = self.profile[time_steps[0]:time_steps_per_day]
previous_day_data[time_steps_per_day - time_steps[0]:] = self.profile[:time_steps[0]] previous_day_data[time_steps_per_day - time_steps[0]:] = self.profile[:time_steps[0]]
else: else:
raise("Requested a prediction, this feature is currently not supported, so use original data!")
previous_day_data = np.array(self.profile[time_steps[0] - len(time_steps_per_day):time_steps[0]]) previous_day_data = np.array(self.profile[time_steps[0] - len(time_steps_per_day):time_steps[0]])
days_in_prediction = [(t * time_steps_per_day, (t + 1) * time_steps_per_day) for t in range(math.ceil(len(time_steps) / time_steps_per_day))] days_in_prediction = [(t * time_steps_per_day, (t + 1) * time_steps_per_day) for t in range(math.ceil(len(time_steps) / time_steps_per_day))]
days_in_prediction[-1] = (days_in_prediction[-1][0], time_steps[-1] - time_steps[0] + 1) days_in_prediction[-1] = (days_in_prediction[-1][0], time_steps[-1] - time_steps[0] + 1)
...@@ -61,6 +63,7 @@ class Predictor: ...@@ -61,6 +63,7 @@ class Predictor:
previous_week_data[:time_steps_per_week - time_steps[0]] = self.profile[time_steps[0]:time_steps_per_week] previous_week_data[:time_steps_per_week - time_steps[0]] = self.profile[time_steps[0]:time_steps_per_week]
previous_week_data[time_steps_per_week - time_steps[0]:] = self.profile[:time_steps[0]] previous_week_data[time_steps_per_week - time_steps[0]:] = self.profile[:time_steps[0]]
else: else:
raise("Requested a prediction, this feature is currently not supported, so use original data!")
previous_week_data = np.array(self.profile[time_steps[0] - len(time_steps_per_week):time_steps[0]]) previous_week_data = np.array(self.profile[time_steps[0] - len(time_steps_per_week):time_steps[0]])
weeks_in_prediction = [(t * time_steps_per_week, (t + 1) * time_steps_per_week) for t in range(math.ceil(len(time_steps) / time_steps_per_week))] weeks_in_prediction = [(t * time_steps_per_week, (t + 1) * time_steps_per_week) for t in range(math.ceil(len(time_steps) / time_steps_per_week))]
weeks_in_prediction[-1] = (weeks_in_prediction[-1][0], time_steps[-1] - time_steps[0] + 1) weeks_in_prediction[-1] = (weeks_in_prediction[-1][0], time_steps[-1] - time_steps[0] + 1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment