Skip to content
Snippets Groups Projects
Commit 9c293d53 authored by Christoph von Oy's avatar Christoph von Oy
Browse files

Removed unused files

parent 6bb636ea
No related branches found
No related tags found
No related merge requests found
def compute_assignment(dynamic: Union[RootDynamic, BackedDynamic], target_dynamic: Union[RootDynamic, BackedDynamic]) -> 'Assignment':
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
if dynamic == target_dynamic:
return compute_assignment_same(dynamic, target_dynamic)
elif isinstance(target_dynamic, BackedDynamic) and target_dynamic.has_ancestor(dynamic):
return compute_assignment_to_backed(dynamic, target_dynamic)
elif isinstance(dynamic, BackedDynamic) and dynamic.has_ancestor(target_dynamic):
return compute_assignment_from_backed(dynamic, target_dynamic)
else:
return compute_assignment_common_reference(dynamic, target_dynamic)
# source_dynamic and target_dynamic are the same dynamic
def compute_assignment_same(dynamic: Union[RootDynamic, BackedDynamic], target_dynamic: Union[RootDynamic, BackedDynamic]) -> 'Assignment':
assignment = Assignment(dynamic._all_indices(), target_dynamic._all_indices())
assignment.add_bulk(list(target_dynamic.time_steps()))
assignment.compile()
return assignment
# target_dynamic is BackedDynamic and has dynamic as an ancestor
def compute_assignment_to_backed(dynamic: Union[RootDynamic, BackedDynamic], target_dynamic: BackedDynamic) -> 'Assignment':
assignment = Assignment(dynamic._all_indices(), target_dynamic.indices)
for target_index in target_dynamic.indices[:-1]:
target_position = target_dynamic.position_of(target_index)
source_indices = dynamic.indices_within(target_dynamic.indices[target_position], target_dynamic.indices[target_position + 1])
if len(source_indices) == 1:
assignment.add_individual(source_indices[0], 1, target_index)
else:
acc = []
for source_index in source_indices:
acc.append((source_index, dynamic.step_length(source_index) / target_dynamic.step_length_p(target_position)))
assignment.add_expression(acc, target_index)
assignment.compile()
return assignment
# dynamic is BackedDynamic and has target_dynamic as an ancestor
def compute_assignment_from_backed(dynamic: BackedDynamic, target_dynamic: Union[RootDynamic, BackedDynamic]) -> 'Assignment':
assignment = Assignment(dynamic.indices, target_dynamic._all_indices())
for source_position in range(dynamic.number_of_steps()):
source_index = dynamic.indices[source_position]
target_indices = target_dynamic.indices_within(source_index, dynamic.indices[source_position + 1])
assignment.add_distribute(source_index, target_indices)
assignment.compile()
return assignment
# dynamic and target_dynamic are BackedDynamic and share the same root dynamic
def compute_assignment_common_reference(dynamic: BackedDynamic, target_dynamic: BackedDynamic) -> 'Assignment':
assignment = Assignment(dynamic.indices, target_dynamic.indices)
if dynamic.indices[-1] <= target_dynamic.indices[0] or target_dynamic.indices[-1] <= dynamic.indices[0]:
assignment.compile()
return assignment
target_i_start = target_dynamic.indices[0]
if target_i_start not in dynamic.indices:
source_i_start = dynamic.indices[0]
if source_i_start < target_i_start:
root = dynamic.root()
root_p_start = target_i_start # because root is a TrivialDynamic, positions and indices are equivalent
length = 0
while root_p_start not in dynamic.indices[:-1]: # because root is a TrivialDynamic, root_p_start is equivalent to root_i_start
root_p_start -= 1
length += root.step_length_p(root_p_start)
source_position = dynamic.indices.index(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent
target_position = 0
remaining_length = dynamic.step_length_p(source_position) - length
else: # Here source_i_start > target_i_start becuase the case of source_i_start == target_i_start is handled in the else branch of target_i_srat not in dynamic.indices
root = dynamic.root()
root_p_start = source_i_start # because root is a TrivialDynamic, positions and indices are equivalent
length = 0
source_position = 0
while root_p_start not in target_dynamic.indices:
length += root.step_length_p(root_p_start)
root_p_start += 1
if root_p_start in dynamic.indices[:-1]:
length = 0
source_position += 1
elif root_p_start > dynamic.indices[-1]: # because root is a TrivialDynamic, positions and indices are equivalent
assignment.compile() # here, we discover that the entire source_dynamic does not cover one time_step of the target_dynamic
return assignment
target_position = target_dynamic.position_of(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent
remaining_length = dynamic.step_length_p(source_position) - length
else:
source_position = dynamic.indices.index(target_i_start)
target_position = 0
remaining_length = dynamic.step_length(target_i_start)
while target_position < len(target_dynamic.indices) - 1:
remaining_target_length = target_dynamic.step_length_p(target_position)
acc = []
while remaining_target_length > 0:
if remaining_length == 0:
source_position += 1
if source_position >= len(dynamic.indices) - 1:
assignment.compile()
return assignment
remaining_length = dynamic.step_length_p(source_position)
if remaining_target_length <= remaining_length:
acc.append((dynamic.indices[source_position], remaining_target_length))
remaining_length -= remaining_target_length
remaining_target_length -= remaining_target_length
else:
acc.append((dynamic.indices[source_position], remaining_length))
remaining_target_length -= remaining_length
remaining_length -= remaining_length
for i, (index, factor) in enumerate(acc):
acc[i] = (index, factor / target_dynamic.step_length_p(target_position))
assignment.add_expression(acc, target_dynamic.index_of(target_position))
target_position += 1
assignment.compile()
return assignment
class Assignment:
def __init__(self, indices, target_indices):
self.index = indices
self.target_index = target_indices
self.bulk = pd.Series(False, index=indices[:-1])
self.bulk_target = pd.Series(False, index=target_indices[:-1])
self.distributes = []
self.expressions = []
self.data = dict.fromkeys(target_indices[:-1], None)
def add_bulk(self, targets):
self.bulk[targets] = True
self.bulk_target[targets] = True
for target in targets:
self.data[target] = (target, 1.0)
def add_distribute(self, source, targets):
self.bulk[targets[0]] = True
self.bulk_target[targets[0]] = True
if len(targets) > 1:
self.distributes.append((source, targets[1:]))
for target in targets:
self.data[target] = (source, 1.0)
def add_individual(self, source, factor, target):
if target == source and factor == 1.0:
self.bulk[target] = True
self.bulk_target[target] = True
elif factor == 1.0:
if len(self.distributes) == 0 or self.distributes[-1][0] != source:
self.distributes.append((source, []))
self.distributes[-1][1].append(target)
else:
raise ValueError(f'Tried to add a individual with a factor unequal to 1!')
self.data[target] = (source, factor)
def add_expression(self, expression, target):
if len(expression) == 1:
self.add_individual(expression[0][0], expression[0][1], target)
else:
self.expressions.append((expression, target))
self.data[target] = expression
def compile(self):
index_shift = {self.index[p]: self.index[p + 1] for p in range(len(self.index[:-1]))}
target_index_shift = {self.target_index[p]: self.target_index[p + 1] for p in range(len(self.target_index[:-1]))}
self.source_target_start = dict.fromkeys(self.index[:-1], None)
self.target_source_start = dict.fromkeys(self.target_index[:-1], None)
self.source_target_end = dict.fromkeys(self.index[1:], None)
self.target_source_end = dict.fromkeys(self.target_index[1:], None)
last_un_p_source = 0
last_un_p_source_target_end = None
last_source_target_end = None
for target, data in self.data.items():
if data is not None:
first_source = data[0][0] if isinstance(data, list) else data[0]
last_source = data[-1][0] if isinstance(data, list) else data[0]
p_stop = self.index.index(first_source) + 1
for p in range(last_un_p_source, p_stop):
self.source_target_start[self.index[p]] = target
last_un_p_source = p_stop
self.target_source_start[target] = first_source
if isinstance(data, list):
for source, _f in data[1:]:
self.source_target_end[source] = target
self.source_target_end[index_shift[last_source]] = target_index_shift[target]
self.target_source_end[target_index_shift[target]] = index_shift[last_source]
last_un_p_source_target_end = self.index.index(index_shift[last_source]) + 1
last_source_target_end = target_index_shift[target]
if last_un_p_source_target_end is not None:
for p in range(last_un_p_source_target_end, len(self.index)):
self.source_target_end[self.index[p]] = last_source_target_end
self.first_distribute = dict.fromkeys(self.target_index[:-1], None)
last_un_p_distribute = 0
for n, distribute in enumerate(self.distributes):
for p in range(last_un_p_distribute, self.target_index.index(distribute[1][0])):
self.first_distribute[self.target_index[p]] = (n, 0)
for m, target in enumerate(distribute[1]):
self.first_distribute[target] = (n, m)
last_un_p_distribute = self.target_index.index(distribute[1][-1]) + 1
self.last_distribute = dict.fromkeys(self.target_index[1:], None)
last_un_p_distribute = len(self.target_index)
for n_prime in range(len(self.distributes)):
n = len(self.distributes) - n_prime - 1
distribute = self.distributes[n]
for p in range(self.target_index.index(distribute[1][-1]) + 1, last_un_p_distribute):
self.last_distribute[self.target_index[p]] = (n, len(distribute[1]))
for m, target in enumerate(distribute[1][1:]):
self.last_distribute[target] = (n, m + 1)
last_un_p_distribute = self.target_index.index(distribute[1][0]) + 1
self.first_expression = dict.fromkeys(self.target_index[:-1], None)
last_un_p_expression = 0
for n, expression in enumerate(self.expressions):
for p in range(last_un_p_expression, self.target_index.index(expression[1]) + 1):
self.first_expression[self.target_index[p]] = n
last_un_p_expression = self.target_index.index(expression[1]) + 1
self.last_expression = dict.fromkeys(self.target_index[1:], None)
last_un_p_expression = len(self.target_index)
for n_prime in range(len(self.expressions)):
n = len(self.expressions) - n_prime - 1
expression = self.expressions[n]
for p in range(self.target_index.index(expression[1]) + 1, last_un_p_expression):
self.last_expression[self.target_index[p]] = n
last_un_p_expression = self.target_index.index(expression[1]) + 1
def resample_tree(values, dynamic: TreeDynamic, target_dynamic: TreeDynamic) -> pd.Series:
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
target_values = pd.Series(np.nan, index = target_dynamic.time_steps())
source_i_start = dynamic.index_of(0)
source_i_end = dynamic.index_of(dynamic.number_of_steps())
target_i_start = target_dynamic.index_of(0)
target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps())
if source_i_end <= target_i_start or target_i_end <= source_i_start:
return target_values
assignment = dynamic.dynamic_tree.get_assignment(dynamic, target_dynamic)
if source_i_start < target_i_start:
source_i_start = assignment.target_source_start[target_i_start]
elif source_i_start > target_i_start:
target_i_start = assignment.source_target_start[source_i_start]
if target_i_start is None:
return target_values
if source_i_end > target_i_end:
source_i_end = assignment.target_source_end[target_i_end]
elif source_i_end < target_i_end:
target_i_end = assignment.source_target_end[source_i_end]
if target_i_end is None:
return target_values
if target_i_start >= target_i_end:
return target_values
target_values.loc[(target_i_start <= target_values.index) & (target_values.index < target_i_end) & assignment.bulk_target.loc[target_dynamic.index_of(0):target_dynamic.index_of(target_dynamic.number_of_steps() - 1)].values] = values.loc[(source_i_start <= values.index) & (values.index < source_i_end) & assignment.bulk.loc[dynamic.index_of(0):dynamic.index_of(dynamic.number_of_steps() - 1)].values]
if assignment.first_distribute[target_i_start] is not None and assignment.last_distribute[target_i_end] is not None:
first_distribute = assignment.first_distribute[target_i_start]
last_distribute = assignment.last_distribute[target_i_end]
if first_distribute[0] == last_distribute[0]:
distribute = assignment.distributes[first_distribute[0]]
target_values.loc[distribute[1][first_distribute[1]:last_distribute[1]]] = values.loc[distribute[0]]
elif first_distribute[0] < last_distribute[0]:
distribute = assignment.distributes[first_distribute[0]]
target_values.loc[distribute[1][first_distribute[1]:]] = values.loc[distribute[0]]
for i in range(first_distribute[0] + 1, last_distribute[0]):
distribute = assignment.distributes[i]
target_values.loc[distribute[1]] = values.loc[distribute[0]]
distribute = assignment.distributes[last_distribute[0]]
target_values.loc[distribute[1][:last_distribute[1]]] = values.loc[distribute[0]]
if assignment.first_expression[target_i_start] is not None and assignment.last_expression[target_i_end] is not None:
for i in range(assignment.first_expression[target_i_start], assignment.last_expression[target_i_end] + 1):
expression = assignment.expressions[i]
target_values[expression[1]] = sum(values[i] * f for i, f in expression[0])
return target_values
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment