Commit d60a571f authored by Christoph von Oy

Added and tested new resampling

parent 4570fa91
@@ -66,6 +66,19 @@ class GlobalDynamic:
self.symbol_table[partial_dynamic] = len(self.symbol_table)
return partial_dynamic
def get_assignment(self, dynamic, target_dynamic):
if isinstance(dynamic, PartialDynamic):
non_partial_dynamic = dynamic.reference
else:
non_partial_dynamic = dynamic
if isinstance(target_dynamic, PartialDynamic):
non_partial_target_dynamic = target_dynamic.reference
else:
non_partial_target_dynamic = target_dynamic
if (non_partial_dynamic, non_partial_target_dynamic) not in self.assignments:
self.assignments[non_partial_dynamic, non_partial_target_dynamic] = compute_assignment(non_partial_dynamic, non_partial_target_dynamic)
return self.assignments[non_partial_dynamic, non_partial_target_dynamic]
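get_assignment resolves PartialDynamic views to the dynamics they reference and then caches one Assignment per (source, target) pair, so the alignment between two dynamics is computed once and reused by every later resampling call. A minimal sketch of this memoization pattern follows; AssignmentCache and plan_between are illustrative stand-ins, not classes or functions from this module.

# Sketch only: plan_between stands in for compute_assignment.
class AssignmentCache:
    def __init__(self, plan_between):
        self.plan_between = plan_between
        self.assignments = {}  # (source, target) -> cached plan
    def get(self, source, target):
        key = (source, target)
        if key not in self.assignments:
            self.assignments[key] = self.plan_between(source, target)
        return self.assignments[key]

cache = AssignmentCache(lambda s, t: [s, t])
assert cache.get('hourly', 'daily') is cache.get('hourly', 'daily')  # computed once, then reused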
def display(self):
output = ''
for dynamic, (sub_dynamics, partial_dynamics) in self.dynamics.items():
@@ -501,6 +514,294 @@ class PartialDynamic(Dynamic):
raise IndexError("The dynamic does not have all requested indices for the sub dynamic!")
return self.global_dynamic.partial_dynamic(self, self.position_of(i_start), self.position_of(i_end))
def compute_assignment(dynamic, target_dynamic):
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
if isinstance(dynamic, PartialDynamic):
raise ValueError("The source dyanmic cannot be a PartialDynamic!")
if isinstance(target_dynamic, PartialDynamic):
raise ValueError("The target dyanmic cannot be a PartialDynamic!")
if dynamic == target_dynamic:
return compute_assignment_same(dynamic, target_dynamic)
elif isinstance(target_dynamic, BackedDynamic) and target_dynamic.has_ancestor(dynamic):
return compute_assignment_to_backed(dynamic, target_dynamic)
elif isinstance(dynamic, BackedDynamic) and dynamic.has_ancestor(target_dynamic):
return compute_assignment_from_backed(dynamic, target_dynamic)
else:
return compute_assignment_common_reference(dynamic, target_dynamic)
# dynamic and target_dynamic are the same dynamic
def compute_assignment_same(dynamic, target_dynamic):
assignment = Assignment(dynamic.all_indices(), target_dynamic.all_indices())
assignment.add_bulk(target_dynamic.time_steps())
assignment.compile()
return assignment
# target_dynamic is BackedDynamic and has dynamic as an ancestor
def compute_assignment_to_backed(dynamic, target_dynamic):
assignment = Assignment(dynamic.all_indices(), target_dynamic.indices)
for target_index in target_dynamic.indices[:-1]:
target_position = target_dynamic.position_of(target_index)
source_indices = dynamic.indices_within(target_dynamic.indices[target_position], target_dynamic.indices[target_position + 1])
if len(source_indices) == 1:
assignment.add_individual(source_indices[0], 1, target_index)
else:
acc = []
for source_index in source_indices:
acc.append((source_index, dynamic.step_size(source_index) / target_dynamic.step_size_p(target_position)))
assignment.add_expression(acc, target_index)
assignment.compile()
return assignment
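For every coarse target step, compute_assignment_to_backed collects the finer source steps it spans and weights each one by step_size(source) / step_size(target), i.e. a duration-weighted average. A self-contained sketch of that weighting with hypothetical step boundaries (plain lists, not the Dynamic API):

# Sketch: aggregate fine steps with boundaries [0, 1, 2, 3, 4, 6, 8] onto
# coarse steps with boundaries [0, 4, 8] using duration-weighted averages.
fine = [0, 1, 2, 3, 4, 6, 8]
coarse = [0, 4, 8]
values = {0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0, 4: 5.0, 6: 7.0}  # one value per fine step
for c_start, c_end in zip(coarse[:-1], coarse[1:]):
    inside = [i for i in fine[:-1] if c_start <= i < c_end]
    weights = {i: (fine[fine.index(i) + 1] - i) / (c_end - c_start) for i in inside}
    print(c_start, sum(values[i] * w for i, w in weights.items()))
# prints 0 2.5 (mean of 1..4) and 4 6.0 (5 and 7 each weighted by their length 2 out of 4)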
# dynamic is BackedDynamic and has target_dynamic as an ancestor
def compute_assignment_from_backed(dynamic, target_dynamic):
assignment = Assignment(dynamic.indices, target_dynamic.all_indices())
for source_position in range(dynamic.number_of_steps()):
source_index = dynamic.indices[source_position]
target_indices = target_dynamic.indices_within(source_index, dynamic.indices[source_position + 1])
assignment.add_distribute(source_index, target_indices)
assignment.compile()
return assignment
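compute_assignment_from_backed goes the opposite way: every fine target step lying inside a coarse source step simply receives that step's value, a piecewise-constant distribution without weighting. Sketch with the same hypothetical boundaries as above:

# Sketch: distribute coarse steps [0, 4, 8] onto fine steps [0, 1, 2, 3, 4, 6, 8];
# each fine step copies the value of the coarse step that contains it.
coarse = [0, 4, 8]
fine = [0, 1, 2, 3, 4, 6, 8]
coarse_values = {0: 2.5, 4: 6.0}
fine_values = {}
for c_start, c_end in zip(coarse[:-1], coarse[1:]):
    for f_start in fine[:-1]:
        if c_start <= f_start < c_end:
            fine_values[f_start] = coarse_values[c_start]
print(fine_values)  # {0: 2.5, 1: 2.5, 2: 2.5, 3: 2.5, 4: 6.0, 6: 6.0}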
# dynamic and target_dynamic are BackedDynamic and share the same root dynamic
def compute_assignment_common_reference(dynamic, target_dynamic):
assignment = Assignment(dynamic.indices, target_dynamic.indices)
if dynamic.indices[-1] <= target_dynamic.indices[0] or target_dynamic.indices[-1] <= dynamic.indices[0]:
assignment.compile()
return assignment
target_i_start = target_dynamic.indices[0]
if target_i_start not in dynamic.indices:
source_i_start = dynamic.indices[0]
if source_i_start < target_i_start:
root = dynamic.root()
root_p_start = target_i_start # because root is a TrivialDynamic, positions and indices are equivalent
length = 0
while root_p_start not in dynamic.indices[:-1]: # because root is a TrivialDynamic, root_p_start is equivalent to root_i_start
root_p_start -= 1
length += root.step_size_p(root_p_start)
source_position = dynamic.indices.index(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent
target_position = 0
remaining_length = dynamic.step_size_p(source_position) - length
else: # Here source_i_start > target_i_start because the case of source_i_start == target_i_start is handled in the else branch of target_i_start not in dynamic.indices
root = dynamic.root()
root_p_start = source_i_start # because root is a TrivialDynamic, positions and indices are equivalent
length = 0
source_position = 0
while root_p_start not in target_dynamic.indices:
length += root.step_size_p(root_p_start)
root_p_start += 1
if root_p_start in dynamic.indices[:-1]:
length = 0
source_position += 1
elif root_p_start > dynamic.indices[-1]: # because root is a TrivialDynamic, positions and indices are equivalent
assignment.compile() # here we discover that the entire source dynamic does not cover even one time_step of the target_dynamic
return assignment
target_position = target_dynamic.position_of(root_p_start) # because root is a TrivialDynamic, positions and indices are equivalent
remaining_length = dynamic.step_size_p(source_position) - length
else:
source_position = dynamic.indices.index(target_i_start)
target_position = 0
remaining_length = dynamic.step_size(target_i_start)
while target_position < len(target_dynamic.indices) - 1:
remaining_target_length = target_dynamic.step_size_p(target_position)
acc = []
while remaining_target_length > 0:
if remaining_length == 0:
source_position += 1
if source_position >= len(dynamic.indices) - 1:
assignment.compile()
return assignment
remaining_length = dynamic.step_size_p(source_position)
if remaining_target_length <= remaining_length:
acc.append((dynamic.indices[source_position], remaining_target_length))
remaining_length -= remaining_target_length
remaining_target_length -= remaining_target_length
else:
acc.append((dynamic.indices[source_position], remaining_length))
remaining_target_length -= remaining_length
remaining_length -= remaining_length
for i, (index, factor) in enumerate(acc):
acc[i] = (index, factor / target_dynamic.step_size_p(target_position))
assignment.add_expression(acc, target_dynamic.index_of(target_position))
target_position += 1
assignment.compile()
return assignment
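compute_assignment_common_reference walks both index lists in lockstep, collecting for each target step the lengths of the source steps that overlap it and normalising them by the target step length. The resulting weights are plain interval overlaps expressed in root units; the sketch below recomputes them independently with min/max arithmetic (hypothetical boundaries, not the incremental walk used above):

# Sketch: overlap weights between two partitions that share a common root.
source = [0, 2, 5, 8]   # source step boundaries in root units
target = [1, 4, 8]      # target step boundaries in root units
for t_start, t_end in zip(target[:-1], target[1:]):
    weights = []
    for s_start, s_end in zip(source[:-1], source[1:]):
        overlap = min(s_end, t_end) - max(s_start, t_start)
        if overlap > 0:
            weights.append((s_start, overlap / (t_end - t_start)))
    print(t_start, weights)
# target step 1 -> [(0, 1/3), (2, 2/3)]; target step 4 -> [(2, 0.25), (5, 0.75)]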
class Assignment:
def __init__(self, indices, target_indices):
self.index = list(indices)
self.target_index = list(target_indices)
self.bulk = pd.Series(False, index=indices[:-1])
self.bulk_target = pd.Series(False, index=target_indices[:-1])
self.distributes = []
self.expressions = []
self.data = dict.fromkeys(target_indices[:-1], None)
def add_bulk(self, targets):
self.bulk[targets] = True
self.bulk_target[targets] = True
for target in targets:
self.data[target] = (target, 1.0)
def add_distribute(self, source, targets):
self.bulk[targets[0]] = True
self.bulk_target[targets[0]] = True
if len(targets) > 1:
self.distributes.append((source, targets[1:]))
for target in targets:
self.data[target] = (source, 1.0)
def add_individual(self, source, factor, target):
if target == source and factor == 1.0:
self.bulk[target] = True
self.bulk_target[target] = True
elif factor == 1.0:
if len(self.distributes) == 0 or self.distributes[-1][0] != source:
self.distributes.append((source, []))
self.distributes[-1][1].append(target)
else:
raise ValueError('Tried to add an individual assignment with a factor not equal to 1!')
self.data[target] = (source, factor)
def add_expression(self, expression, target):
if len(expression) == 1:
self.add_individual(expression[0][0], expression[0][1], target)
else:
self.expressions.append((expression, target))
self.data[target] = expression
def compile(self):
index_shift = {self.index[p]: self.index[p + 1] for p in range(len(self.index[:-1]))}
target_index_shift = {self.target_index[p]: self.target_index[p + 1] for p in range(len(self.target_index[:-1]))}
self.source_target_start = dict.fromkeys(self.index[:-1], None)
self.target_source_start = dict.fromkeys(self.target_index[:-1], None)
self.source_target_end = dict.fromkeys(self.index[1:], None)
self.target_source_end = dict.fromkeys(self.target_index[1:], None)
last_un_p_source = 0
last_un_p_source_target_end = None
last_source_target_end = None
for target, data in self.data.items():
if data is not None:
first_source = data[0][0] if isinstance(data, list) else data[0]
last_source = data[-1][0] if isinstance(data, list) else data[0]
p_stop = self.index.index(first_source) + 1
for p in range(last_un_p_source, p_stop):
self.source_target_start[self.index[p]] = target
last_un_p_source = p_stop
self.target_source_start[target] = first_source
if isinstance(data, list):
for source, _f in data[1:]:
self.source_target_end[source] = target
self.source_target_end[index_shift[last_source]] = target_index_shift[target]
self.target_source_end[target_index_shift[target]] = index_shift[last_source]
last_un_p_source_target_end = self.index.index(index_shift[last_source]) + 1
last_source_target_end = target_index_shift[target]
if last_un_p_source_target_end is not None:
for p in range(last_un_p_source_target_end, len(self.index)):
self.source_target_end[self.index[p]] = last_source_target_end
self.first_distribute = dict.fromkeys(self.target_index[:-1], None)
last_un_p_distribute = 0
for n, distribute in enumerate(self.distributes):
for p in range(last_un_p_distribute, self.target_index.index(distribute[1][0])):
self.first_distribute[self.target_index[p]] = (n, 0)
for m, target in enumerate(distribute[1]):
self.first_distribute[target] = (n, m)
last_un_p_distribute = self.target_index.index(distribute[1][-1]) + 1
self.last_distribute = dict.fromkeys(self.target_index[1:], None)
last_un_p_distribute = len(self.target_index)
for n_prime in range(len(self.distributes)):
n = len(self.distributes) - n_prime - 1
distribute = self.distributes[n]
for p in range(self.target_index.index(distribute[1][-1]) + 1, last_un_p_distribute):
self.last_distribute[self.target_index[p]] = (n, len(distribute[1]))
for m, target in enumerate(distribute[1][1:]):
self.last_distribute[target] = (n, m + 1)
last_un_p_distribute = self.target_index.index(distribute[1][0]) + 1
self.first_expression = dict.fromkeys(self.target_index[:-1], None)
last_un_p_expression = 0
for n, expression in enumerate(self.expressions):
for p in range(last_un_p_expression, self.target_index.index(expression[1]) + 1):
self.first_expression[self.target_index[p]] = n
last_un_p_expression = self.target_index.index(expression[1]) + 1
self.last_expression = dict.fromkeys(self.target_index[1:], None)
last_un_p_expression = len(self.target_index)
for n_prime in range(len(self.expressions)):
n = len(self.expressions) - n_prime - 1
expression = self.expressions[n]
for p in range(self.target_index.index(expression[1]) + 1, last_un_p_expression):
self.last_expression[self.target_index[p]] = n
last_un_p_expression = self.target_index.index(expression[1]) + 1
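An Assignment holds three kinds of entries: bulk targets that copy a single source value one-to-one, distributes that repeat one source value over several targets, and expressions that form a weighted sum of several sources; compile() then builds the start/end and first/last lookup tables so resample_new can clip a partially overlapping range without scanning. A toy illustration of the three kinds applied to a pandas Series (hypothetical data, independent of the Assignment class):

import pandas as pd
values = pd.Series({0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0})
target = pd.Series(index=[0, 1, 2, 3], dtype=float)
target.loc[[0]] = values.loc[[0]]                     # bulk: one-to-one copy
target.loc[[1, 2]] = values.loc[1]                    # distribute: repeat one source value
target.loc[3] = sum(values[i] * f for i, f in [(2, 0.5), (3, 0.5)])  # expression: weighted sum
print(target)  # 0 -> 1.0, 1 -> 2.0, 2 -> 2.0, 3 -> 3.5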
def resample_new(values, dynamic, target_dynamic, target_values=None):
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
if target_values is None:
target_values = pd.Series(index = target_dynamic.time_steps())
source_i_start = dynamic.index_of(0)
source_i_end = dynamic.index_of(dynamic.number_of_steps())
target_i_start = target_dynamic.index_of(0)
target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps())
if source_i_end <= target_i_start or target_i_end <= source_i_start:
return target_values
assignment = dynamic.global_dynamic.get_assignment(dynamic, target_dynamic)
if source_i_start < target_i_start:
source_i_start = assignment.target_source_start[target_i_start]
elif source_i_start > target_i_start:
target_i_start = assignment.source_target_start[source_i_start]
if target_i_start is None:
return target_values
if source_i_end > target_i_end:
source_i_end = assignment.target_source_end[target_i_end]
elif source_i_end < target_i_end:
target_i_end = assignment.source_target_end[source_i_end]
if target_i_end is None:
return target_values
if target_i_start >= target_i_end:
return target_values
target_values.loc[(target_i_start <= target_values.index) & (target_values.index < target_i_end) & assignment.bulk_target.loc[target_dynamic.index_of(0):target_dynamic.index_of(target_dynamic.number_of_steps() - 1)].values] = values.loc[(source_i_start <= values.index) & (values.index < source_i_end) & assignment.bulk.loc[dynamic.index_of(0):dynamic.index_of(dynamic.number_of_steps() - 1)].values]
if assignment.first_distribute[target_i_start] is not None and assignment.last_distribute[target_i_end] is not None:
first_distribute = assignment.first_distribute[target_i_start]
last_distribute = assignment.last_distribute[target_i_end]
if first_distribute[0] == last_distribute[0]:
distribute = assignment.distributes[first_distribute[0]]
target_values.loc[distribute[1][first_distribute[1]:last_distribute[1]]] = values.loc[distribute[0]]
elif first_distribute[0] < last_distribute[0]:
distribute = assignment.distributes[first_distribute[0]]
target_values.loc[distribute[1][first_distribute[1]:]] = values.loc[distribute[0]]
for i in range(first_distribute[0] + 1, last_distribute[0]):
distribute = assignment.distributes[i]
target_values.loc[distribute[1]] = values.loc[distribute[0]]
distribute = assignment.distributes[last_distribute[0]]
target_values.loc[distribute[1][:last_distribute[1]]] = values.loc[distribute[0]]
if assignment.first_expression[target_i_start] is not None and assignment.last_expression[target_i_end] is not None:
for i in range(assignment.first_expression[target_i_start], assignment.last_expression[target_i_end] + 1):
expression = assignment.expressions[i]
target_values[expression[1]] = sum(values[i] * f for i, f in expression[0])
return target_values
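resample_new fetches the cached Assignment, clips source and target to their overlap via the start/end tables, and then applies the bulk mask, the distributes, and the expressions in turn. A hedged usage sketch, reusing the constructions from test_resampling further down (a GlobalDynamic of unit steps and a sub-dynamic of the root); the chosen indices are illustrative only:

# Usage sketch in the context of this module; mirrors the test setup below.
global_dynamic = GlobalDynamic([1 for i in range(8)])
root = global_dynamic.root()
coarse = root.sub_dynamic([0, 2, 4, 6])
values = pd.Series(data=[float(i) for i in range(root.number_of_steps())],
                   index=root.time_steps())
coarse_values = resample_new(values, root, coarse)   # aggregate root -> coarse
back = resample_new(coarse_values, coarse, root)     # distribute coarse -> root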
def resample_variable_new(variable, dynamic, target_dynamic, target_set):
if dynamic.root() != target_dynamic.root():
raise ValueError("Both dynamics have to have the same root dynamic!")
source_i_start = dynamic.index_of(0)
source_i_end = dynamic.index_of(dynamic.number_of_steps())
target_i_start = target_dynamic.index_of(0)
target_i_end = target_dynamic.index_of(target_dynamic.number_of_steps())
if target_i_start < source_i_start or source_i_end < target_i_end:
raise ValueError("The dynamic of the source variable has to cover the dynamic of the target variable!")
assignment = dynamic.global_dynamic.get_assignment(dynamic, target_dynamic)
def rule(m, t):
data = assignment.data[t]
if isinstance(data, list):
return pyo.quicksum(variable[variable_index] * factor for variable_index, factor in assignment.data[t])
else:
return variable[data[0]] * data[1]
return pyo.Expression(target_set, rule=rule)
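resample_variable_new returns a Pyomo Expression over target_set in which each target entry is either a single scaled variable or a quicksum of weighted variables, as dictated by assignment.data. A usage sketch mirroring the Pyomo setup in test_single_resampling below; dynamic and target_dynamic are placeholders for two dynamics with the same root where the source covers the target:

# Usage sketch; dynamic and target_dynamic are placeholders for suitable dynamics.
import pyomo.environ as pyo
model = pyo.ConcreteModel()
model.set = pyo.Set(initialize=list(dynamic.time_steps()), ordered=True)
model.target_set = pyo.Set(initialize=list(target_dynamic.time_steps()), ordered=True)
model.variable = pyo.Var(model.set, initialize=lambda m, t: float(t))
model.expression = resample_variable_new(model.variable, dynamic, target_dynamic, model.target_set)
# model.expression[t] evaluates to the duration-weighted combination of variable[...]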
# only works if both dynamics share the same root dynamic
def resample(values, dynamic, target_dynamic):
if dynamic.root() != target_dynamic.root():
@@ -583,6 +884,8 @@ def resample_into_to_backed(values, dynamic, source_p_start, source_p_end, targe
while source_i_start not in target_dynamic.indices:
source_p_start += 1
source_i_start = dynamic.index_of(source_p_start)
if source_i_start > target_dynamic.indices[-1]:
raise SyntaxError
target_p_start = target_dynamic.position_of(source_i_start)
source_i_end = dynamic.index_of(source_p_end)
target_i_end = target_dynamic.indices[target_p_end]
@@ -594,6 +897,8 @@ def resample_into_to_backed(values, dynamic, source_p_start, source_p_end, targe
while source_i_end not in target_dynamic.indices:
source_p_end -= 1
source_i_end = dynamic.index_of(source_p_end)
if source_i_end < target_dynamic.indices[0]:
raise SyntaxError
target_p_end = target_dynamic.position_of(source_i_end)
for target_position in range(target_p_start, target_p_end):
acc = 0
@@ -648,6 +953,8 @@ def resample_into_from_backed(values, dynamic, source_p_start, source_p_end, tar
while target_i_start_succ not in dynamic.indices:
target_p_start_succ += 1
target_i_start_succ = target_dynamic.index_of(target_p_start_succ)
if target_i_start_succ > dynamic.indices[-1]:
raise SyntaxError
source_p_start = dynamic.position_of(target_i_start_succ)
target_indices = target_dynamic.indices_within(target_i_start, target_i_start_succ)
target_values.loc[target_indices] = values[dynamic.indices[source_p_start - 1]]
@@ -662,6 +969,8 @@ def resample_into_from_backed(values, dynamic, source_p_start, source_p_end, tar
while target_i_end_prev not in dynamic.indices:
target_p_end_prev -= 1
target_i_end_prev = target_dynamic.index_of(target_p_end_prev)
if target_i_end_prev < dynamic.indices[0]:
raise SyntaxError
source_p_end = dynamic.position_of(target_i_end_prev)
target_indices = target_dynamic.indices_within(target_i_end_prev, target_i_end)
target_values.loc[target_indices] = values[dynamic.indices[source_p_end]]
@@ -841,63 +1150,231 @@ def resample_variable_common_reference(variable, dynamic, source_p_start, source
return pyo.quicksum(variable * factor for variable, factor in assignment[t])
return pyo.Expression(target_set, rule=rule)
def test_single_resampling(dynamic, target_dynamic):
index = [dynamic.index_of(position) for position in range(dynamic.number_of_steps())]
values = pd.Series(data = [float(i) for i in range(len(index))], index = index)
def test_single_resampling(dynamic_1, dynamic_2, f):
import numpy as np
values = pd.Series(data = [float(i) for i in range(dynamic_1.number_of_steps())], index = dynamic_1.time_steps())
display_alignment = False
output = ''
try:
print(resample(values, dynamic, target_dynamic))
target_values = resample(values, dynamic_1, dynamic_2)
worked = True
output += 'fine |'
except ValueError:
worked = False
output += 'Value|'
except IndexError:
worked = False
output += 'Index|'
except SyntaxError:
worked = False
output += 'Loop |'
except KeyError:
worked = False
output += 'Key |'
try:
target_values_new = resample_new(values, dynamic_1, dynamic_2)
worked_new = True
output += 'fine |'
except ValueError:
worked_new = False
output += 'Value|'
except:
print('hey')
target_index = [target_dynamic.index_of(position) for position in range(target_dynamic.number_of_steps())]
target_values = pd.Series(dtype = float, index = target_index)
worked_new = False
output += 'error|'
if worked and worked_new:
if not all(np.isclose(target_values.values, target_values_new.values, equal_nan=True)):
output += 'error|'
display_alignment = True
else:
output += 'fine |'
else:
output += ' |'
target_values_into = pd.Series(index = dynamic_2.time_steps())
try:
resample_into(values, dynamic_1, dynamic_2, target_values_into)
worked_into = True
output += 'fine |'
except ValueError:
worked_into = False
output += 'Value|'
except IndexError:
worked_into = False
output += 'Index|'
except SyntaxError:
worked_into = False
output += 'Loop |'
except KeyError:
worked_into = False
output += 'Key |'
target_values_into_new = pd.Series(index = dynamic_2.time_steps())
try:
print(resample_into(values, dynamic, target_dynamic, target_values))
resample_new(values, dynamic_1, dynamic_2, target_values=target_values_into_new)
worked_into_new = True
output += 'fine |'
except ValueError:
worked_into_new = False
output += 'Value|'
except:
print('hey')
worked_into_new = False
output += 'error|'
if worked_into and worked_into_new:
if not all(np.isclose(target_values_into.values, target_values_into_new.values, equal_nan=True)):
output += 'error|'
display_alignment = True
else:
output += 'fine |'
else:
output += ' |'
model = pyo.ConcreteModel()
model.set = pyo.Set(initialize = list(dynamic.time_steps()), ordered=True)
model.target_set = pyo.Set(initialize = list(target_dynamic.time_steps()), ordered=True)
model.variable = pyo.Var(model.set)
model.set = pyo.Set(initialize = list(dynamic_1.time_steps()), ordered=True)
model.target_set = pyo.Set(initialize = list(dynamic_2.time_steps()), ordered=True)
def rule(m, t):
return float(t)
model.variable = pyo.Var(model.set, initialize = rule)
try:
model.expression = resample_variable(model.variable, dynamic_1, dynamic_2, model.target_set)
worked_variable = True
output += 'fine |'
except ValueError:
worked_variable = False
output += 'Value|'
except IndexError:
worked_variable = False
output += 'Index|'
except SyntaxError:
worked_variable = False
output += 'Loop |'
except KeyError:
worked_variable = False
output += 'Key |'
try:
model.expression = resample_variable(model.variable, dynamic, target_dynamic, model.target_set)
print(model.expression.pprint())
model.expression_new = resample_variable_new(model.variable, dynamic_1, dynamic_2, model.target_set)
worked_variable_new = True
output += 'fine |'
except ValueError:
worked_variable_new = False
output += 'Value|'
except:
print('hey')
worked_variable_new = False
output += 'error|'
if worked_variable and worked_variable_new:
if not all(np.isclose(np.array(list(i.expr() for i in model.expression.values())), np.array(list(i.expr() for i in model.expression_new.values())))):
output += 'error\n'
display_alignment = True
else:
output += 'fine\n'
else:
output += ' \n'
if display_alignment:
output += dynamic_1.display_alignment(dynamic_2)
f.write(output)
def test_resampling():
import random
random.seed(0)
global_dynamic = GlobalDynamic([1 for i in range(100)])
dynamics = []
dynamics.append(global_dynamic.root())
for i in range(100):
dynamic_number = random.randint(0, len(dynamics) - 1)
dynamic = dynamics[dynamic_number]
if random.random() < 0.75:
original_indices = list(dynamic.all_indices())
number = random.randint(2, len(original_indices))
indices = []
for i in range(number):
choice = random.choice(original_indices)
original_indices.remove(choice)
indices.append(choice)
indices.sort()
sub_dynamic = dynamic.sub_dynamic(indices)
if sub_dynamic not in dynamics:
dynamics.append(sub_dynamic)
else:
original_positions = list(range(dynamic.number_of_steps() + 1))
positions = []
for i in range(2):
choice = random.choice(original_positions)
original_positions.remove(choice)
positions.append(choice)
positions.sort()
p_start = positions[0]
p_end = positions[1]
partial_dynamic = dynamic.partial_dynamic_p(p_start, p_end)
if partial_dynamic not in dynamics:
dynamics.append(partial_dynamic)
print(global_dynamic.display(), end='')
test_single_resampling(dynamics[0], dynamics[0])
print(dynamics[0].display_alignment(dynamics[0]), end='')
# import random
# random.seed(0)
# global_dynamic = GlobalDynamic([1 for i in range(100)])
# dynamics = []
# dynamics.append(global_dynamic.root())
# for i in range(100):
# dynamic_number = random.randint(0, len(dynamics) - 1)
# dynamic = dynamics[dynamic_number]
# if random.random() < 0.75:
# original_indices = list(dynamic.all_indices())
# number = random.randint(2, len(original_indices))
# indices = []
# for i in range(number):
# choice = random.choice(original_indices)
# original_indices.remove(choice)
# indices.append(choice)
# indices.sort()
# sub_dynamic = dynamic.sub_dynamic(indices)
# if sub_dynamic not in dynamics:
# dynamics.append(sub_dynamic)
# else:
# original_positions = list(range(dynamic.number_of_steps() + 1))
# positions = []
# for i in range(2):
# choice = random.choice(original_positions)
# original_positions.remove(choice)
# positions.append(choice)
# positions.sort()
# p_start = positions[0]
# p_end = positions[1]
# partial_dynamic = dynamic.partial_dynamic_p(p_start, p_end)
# if partial_dynamic not in dynamics:
# dynamics.append(partial_dynamic)
# f = open("resampling.txt", "w")
# for i, dynamic_1 in enumerate(dynamics):
# print(f'{i}')
# for j, dynamic_2 in enumerate(dynamics):
# test_single_resampling(dynamic_1, dynamic_2, f)
global_dynamic = GlobalDynamic([1 for i in range(8)])
root = global_dynamic.root()
sub_root = root.sub_dynamic([0, 1, 2, 3])
sub_dynamics = []
for i in range(16):
positions = [j for j in range(4) if (i >> j) % 2 == 0]
if len(positions) >= 2:
sub_dynamics.append(sub_root.sub_dynamic_p(positions))
blocks = []
blocks.append([root.sub_dynamic([0, 1, 2, 3])])
dynamics_2 = []
for i in range(5):
dynamics_2.append(root.sub_dynamic([j for j in range(5) if j != i]))
blocks.append(dynamics_2)
dynamics_3 = []
for i in range(5):
for j in range(i + 1, 6):
dynamics_3.append(root.sub_dynamic([k for k in range(6) if k not in [i, j]]))
blocks.append(dynamics_3)
dynamics_4 = []
for i in range(5):
for j in range(i + 1, 6):
for k in range(j + 1, 7):
dynamics_4.append(root.sub_dynamic([l for l in range(7) if l not in [i, j, k]]))
blocks.append(dynamics_4)
dynamics_5 = []
for i in range(5):
for j in range(i + 1, 6):
for k in range(j + 1, 7):
for l in range(k + 1, 8):
dynamics_5.append(root.sub_dynamic([m for m in range(8) if m not in [i, j, k, l]]))
blocks.append(dynamics_5)
partial_intervalls = []
for i in range(3):
for j in range(i + 1, 4):
partial_intervalls.append((i, j))
f = open("resampling.txt", "w")
f.write(f'{sub_root.all_indices()} -> {sub_root.all_indices()}\n')
for (start_1, end_1) in partial_intervalls:
for (start_2, end_2) in partial_intervalls:
partial_1 = sub_root.partial_dynamic_p(start_1, end_1)
partial_2 = sub_root.partial_dynamic_p(start_2, end_2)
test_single_resampling(partial_1, partial_2, f)
for sub_dynamic in sub_dynamics:
f.write(f'{sub_root.all_indices()} -> {sub_dynamic.all_indices()}\n')
for (start_1, end_1) in partial_intervalls:
if end_1 > sub_root.number_of_steps():
continue
for (start_2, end_2) in partial_intervalls:
if end_2 > sub_dynamic.number_of_steps():
continue
partial_1 = sub_root.partial_dynamic_p(start_1, end_1)
partial_2 = sub_dynamic.partial_dynamic_p(start_2, end_2)
test_single_resampling(partial_1, partial_2, f)
test_single_resampling(partial_2, partial_1, f)
for block_number, block in enumerate(blocks):
print(f'Block {block_number}')
length = block_number + 4
for i, dynamic_1 in enumerate(block):
for j, dynamic_2 in enumerate(block):
if any(k not in dynamic_1.all_indices() and k not in dynamic_2.all_indices() for k in range(length)):
continue
print(f'{i} -> {j}')
f.write(f'{dynamic_1.all_indices()} -> {dynamic_2.all_indices()}\n')
for (start_1, end_1) in partial_intervalls:
for (start_2, end_2) in partial_intervalls:
partial_1 = dynamic_1.partial_dynamic_p(start_1, end_1)
partial_2 = dynamic_2.partial_dynamic_p(start_2, end_2)
test_single_resampling(partial_1, partial_2, f)