Skip to content
Snippets Groups Projects
Commit b5c360da authored by Münker's avatar Münker
Browse files

Transfer from public branch.

parents
No related branches found
No related tags found
No related merge requests found
Showing
with 1509 additions and 0 deletions
"""
For AOG validation, first it is necessary to verify that the liaison graph
@author: Sören Münker (mnk)
"""
import os
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
folder_path = os.path.abspath('C:/Users/...') #Insert absolute path
product_name = 'ex_rq1_1_centrifugal_pump'
liaison_file_name = product_name + '_Liaisons.xlsx'
bb_file_name = product_name + '_BB Volumes.xlsx'
liaison_file_path = os.path.join(folder_path, liaison_file_name)
liaisons_df = pd.read_excel(liaison_file_path, index_col=None, header=None)
bb_file_path = os.path.join(folder_path, bb_file_name)
part_name_df = pd.read_excel(bb_file_path)
part_names = list(part_name_df['Product'].values)
liaisons_df.index = part_names
liaisons_df.columns = part_names
G = nx.from_pandas_adjacency(liaisons_df)
nx.draw(G, with_labels=True)
plt.savefig(os.path.join(folder_path, product_name + '_liaison_graph.png'))
plt.show()
\ No newline at end of file
import networkx as nx
from src.validation.evaluation_calculation import open_aog_file, sort_by_values_len, calc_number_of_possible_PGs, calc_number_of_possible_sequences, open_assembly_tree
from src.validation.uniqueness_of_aog_pg_sequences import get_unique_assembly_tree_graphs, calc_num_sequences_per_atg
from src.graph_utilities.aog_to_assembly_tree import get_monte_carlo_optimal_assembly_tree
import sys
import matplotlib.pyplot as plt
import numpy as np
def calc_avg_num_sequences(AOG, iterations, n_possible_pg):
unique_atg = get_unique_assembly_tree_graphs(AOG, iterations, n_possible_pg)
num_sequences_block = []
for atg in unique_atg:
num_sequences_block.append(calc_number_of_possible_sequences(atg))
avg_num_sequence = sum(num_sequences_block) / len(num_sequences_block)
return avg_num_sequence
def calc_mean_num_sequences(AOG, iterations, n_possible_pg):
unique_atg = get_unique_assembly_tree_graphs(AOG, iterations, n_possible_pg)
num_sequences_block = []
for atg in unique_atg:
num_sequences_block.append(calc_number_of_possible_sequences(atg))
mean = np.mean(num_sequences_block)
return round(mean)
def calc_std_of_sequences(AOG, iterations, n_possible_pg):
unique_atg = get_unique_assembly_tree_graphs(AOG, iterations, n_possible_pg)
num_sequences_block = []
for atg in unique_atg:
num_sequences_block.append(calc_number_of_possible_sequences(atg))
std = np.std(num_sequences_block)
return std
def plot_histogram_num_sequences(AOG, iterations, n_possible_pg, save_fig=False, save_path="histogram.png"):
plt.close()
unique_atg = get_unique_assembly_tree_graphs(AOG, iterations, n_possible_pg)
num_sequences_block = []
for atg in unique_atg:
num_sequences_block.append(calc_number_of_possible_sequences(atg))
bins = calc_bin_for_histogram(num_sequences_block)
plt.hist(num_sequences_block, bins=bins)
plt.ylabel('counts')
plt.xlabel('sequences')
if save_fig:
plt.savefig(save_path)
plt.show()
plt.close()
def calc_bin_for_histogram(x):
q25, q75 = np.percentile(x, [25, 75])
bin_width = 2 * (q75 - q25) * len(x) ** (-1 / 3)
if bin_width == 0.0:
bin_width = 1
bins = round((max(x) - min(x)) / bin_width)
return bins
def sample_required(n, error_margin):
standard_deviation = 0.5 #according to https://de.wikihow.com/Die-Stichprobengr%C3%B6%C3%9Fe-berechnen
z = 1.96 # for confidency of 95%
upper = (z**2 * standard_deviation*(1-standard_deviation))/error_margin**2
lower = 1 + ((z**2 * standard_deviation*(1-standard_deviation))/(error_margin**2 * n))
sample_size = upper/lower
return round(sample_size)
def estimate_total_num_of_sequences(AOG, num_possible_pg, sample_size):
mean = calc_mean_num_sequences(AOG, sample_size, num_possible_pg)
#print(f'mean_num_sequences: {mean}')
estimate_num_sequences = round(num_possible_pg * mean)
return estimate_num_sequences
def calc_responsiveness(num_sequences, data_size):
return num_sequences / data_size
import sys
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import logging
from statistics import mean
from src.graph_utilities.and_or_graph_generator_bottom_up import *
from src.graph_utilities.graph_helper import sort_by_values_len, get_topological_sort_list_by_assembly_tiers
from src.graph_utilities.pg_to_assembly_tree import *
from src.validation.evaluation_calculation import estimate_number_of_possible_PGs
from src.validation.responsiveness_calculation import *
from src.graph_utilities.graph_helper import estimate_num_sequences, init_assembly_tiers, update_at_df_with_tier_list
import os
import time
def open_aog_pickle(path):
with open(path, 'rb') as input_file:
and_or_graph = pickle.load(input_file)
return and_or_graph
def assembly_tier_pg_xlsx_to_df(path):
pg_df = pd.read_excel(path, header=None)
# add product names to pg_df
at_df = read_assembly_tiers(path)
product_index_list = at_df['Product']
pg_df.columns = product_index_list
pg_df.index = product_index_list
return pg_df
def read_assembly_tiers(path):
at_xl_file = pd.ExcelFile(path)
at_df = at_xl_file.parse('Assembly Directions', header=None, index_col=None)
at_df.fillna(value=0, inplace=True)
at_df = at_df[[0, 7]]
new_header = at_df.iloc[0]
at_df = at_df[1:]
at_df.columns = new_header
return at_df
def pg_df_to_assembly_tree(pg_df, at_df):
pg = nx.from_pandas_adjacency(pg_df, create_using=nx.DiGraph())
# add assembly tiers as layer
#TODO: If modified pg, then generate assembly tiers
initial_part = get_initial_part(pg)
logging.info(f'initial part: {initial_part}')
tier_list = init_assembly_tiers(pg, initial_node=initial_part)
at_df = update_at_df_with_tier_list(at_df, tier_list)
add_assembly_tiers_to_graph(pg, at_df)
mapped_data = init_mapped_data_with_sub_2(pg)
# sort by layer
sorted_node_list = get_topological_sort_list_by_assembly_tiers(pg, initial_node=initial_part)
pg_sorted = nx.DiGraph()
pg_sorted.add_nodes_from(sorted_node_list)
node_attributes = {node:data for node, data in pg.nodes(data=True)}
for key, value in node_attributes.items():
pg_sorted.nodes[key]['layer'] = value['layer']
pg_sorted.add_edges_from(pg.edges())
#nx.draw(pg_sorted, with_labels=True)
#plt.show()
pg_sorted = relabel_all_nodes(pg_sorted)
#nx.draw(pg_sorted, with_labels=True)
#plt.show()
mapped_data = get_first_tier_node_info(pg_sorted, mapped_data, initial_part)
mapped_data = get_node_info_for_other_tiers(pg_sorted, mapped_data)
pg = add_mapped_data_to_nodes(pg_sorted, mapped_data)
return pg
def add_assembly_tiers_to_graph(pg, at_df):
for idx, row in at_df.iterrows():
product = row['Product']
layer = row['Assembly Tier']
pg.nodes[product]['layer'] = layer
pg.nodes[product]['tier'] = layer
def save_graph_to_pickle(path, G):
with open(path, 'wb') as handle:
pickle.dump(G, handle, protocol=pickle.HIGHEST_PROTOCOL)
def plot_and_save_atg_from_pg(atg_from_pg, product_name, write_path):
pos = []
try:
pos = nx.multipartite_layout(atg_from_pg, subset_key='layer')
except:
print("Error while calcing pos for max_flex_atg")
plt.figure(figsize=(8, 8))
if pos != []:
nx.draw(atg_from_pg, pos=pos, with_labels=True)
else:
nx.draw(atg_from_pg, with_labels=True)
plt.axis("equal")
plt.savefig(os.path.join(write_path, product_name + '_atg_from_pg.png'))
plt.show()
plt.close()
def main():
dir_path = os.path.join('C:\\', 'Users', '...') #Insert absolute path
read_path = dir_path
write_path = dir_path
product_name = "centrifugal_pump"
log_filename = os.path.join(write_path, product_name + '_py.log')
logging.basicConfig(
format='%(levelname)s | %(asctime)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler(log_filename),
logging.StreamHandler()
]
)
# -------------------------
# component based PG to ATG
# -------------------------
logging.info('--- PG to ATG analysis started ---')
pg_path = os.path.join(read_path, product_name + '_AssemblyTiers.xlsx')
pg_df = assembly_tier_pg_xlsx_to_df(pg_path)
at_df = read_assembly_tiers(pg_path)
# Check if PG is complete
pg = nx.from_pandas_adjacency(pg_df, create_using=nx.DiGraph())
n_nodes_total = pg.number_of_nodes()
print(f'n_nodes_total: {n_nodes_total}')
nx.draw(pg, with_labels=True)
plt.show()
# count isolated nodes
isolated_nodes = list(nx.isolates(pg))
n_isolates = len(isolated_nodes)
print(f'n_isolates: {n_isolates}')
logging.info('Start transforming pg to atg.')
start = time.perf_counter()
atg_from_pg = pg_df_to_assembly_tree(pg_df, at_df)
stop = time.perf_counter()
t_pg_to_atg = stop - start
print(f't_pg_to_atg: {t_pg_to_atg}')
# Save generated atg graph
save_path = os.path.join(write_path, product_name + '_atg_from_pg.pickle')
save_graph_to_pickle(save_path, atg_from_pg)
# Plot atg_from_pg
plot_and_save_atg_from_pg(atg_from_pg, product_name, write_path)
atg_from_pg_seq = calc_number_of_possible_sequences(atg_from_pg)
print(f'atg_from_pg num of sequences: {atg_from_pg_seq}')
is_num_seq_pg_precise = True
'''
except:
logging.warning('Could not calc num of sequences for pg')
logging.info('Brute force calculation started')
# TODO calc brute force
atg_from_pg_seq = estimate_num_sequences(atg_from_pg, n_nodes_total * n_nodes_total)
print(f'atg_from_pg num of sequences: {atg_from_pg_seq}')
is_num_seq_pg_precise = False
'''
data_atg_from_pg = sys.getsizeof(atg_from_pg)
# ------------------------
# MW & LW to AOG to ATG
# ------------------------
logging.info('--- AOG to ATG analysis started ---')
# create AOG bu from liaison & mw
logging.info('Start generating AOG from MW and Liaisons')
restrict_nonstat_size = False
max_nonstat_parts = 3
result = run_experiment_xlsx(read_path, write_path, product_name, restrict_nonstat_size=restrict_nonstat_size,
max_nonstat_parts=max_nonstat_parts)
print(result)
logging.info('Saved AOG as pickle file.')
# Calc AOG values
logging.info('Load AOG from pickle file.')
aog_path = os.path.join(read_path, product_name + "_AOG.pickle")
G = open_aog_pickle(aog_path)
G_sorted = sort_by_values_len(G)
# Close unused variables to free memory
del(G)
del(pg)
del(pg_df)
del(at_df)
logging.info('Calc number of possible PGs from AOG...')
n_aog_edges = result['edge_count']
logging.info(f'n_aog_edges: {n_aog_edges}')
if n_aog_edges < 2200:
is_n_possible_pg_exact = True
num_possible_pg = calc_number_of_possible_PGs(G_sorted)
else:
logging.warning('number of possible PGs just estimated, because computing complexity too high.')
is_n_possible_pg_exact = False
num_possible_pg = estimate_number_of_possible_PGs(n_aog_edges)
logging.info('Calc sample size for total num_sequence_estimation...')
sample_size = sample_required(num_possible_pg, 0.05)
logging.info(f'sample_size: {sample_size}')
logging.info('Estimate total num_of_sequences for AOG...')
total_num_sequences = estimate_total_num_of_sequences(G_sorted, num_possible_pg, sample_size)
print(f'total_num_sequences: {total_num_sequences}')
data_size_aog = sys.getsizeof(G_sorted)
print(f'data size of AOG: {data_size_aog}')
logging.info('Create histrogram...')
plot_histogram_num_sequences(G_sorted, sample_size, num_possible_pg, save_fig=True,
save_path=os.path.join(write_path, product_name + '_n_seq_histogram.png'))
logging.info('Find an optimal ATG from AOG...')
start = time.perf_counter()
max_flex_atg = get_monte_carlo_optimal_assembly_tree(G_sorted, sample_size, opt_goal='high_flex')
stop = time.perf_counter()
t_aog_to_atg = stop - start
print(f't_aog_to_atg: {t_aog_to_atg}')
# Save generated atg graph
save_path = os.path.join(write_path, product_name + '_atg_from_aog_max_flex.pickle')
save_graph_to_pickle(save_path, max_flex_atg)
max_seq = calc_number_of_possible_sequences(max_flex_atg)
print(f'max number of sequences: {max_seq}')
data_max_flex_atg = sys.getsizeof(max_flex_atg)
print(f'data size of max_flex_atg: {data_max_flex_atg}')
# Plot max_flex_atg
pos = []
try:
pos = nx.multipartite_layout(max_flex_atg, subset_key='layer')
except:
logging.warning('Calcing pos for max_flex_atg not possible.')
plt.figure(figsize=(8, 8))
if pos != []:
nx.draw(max_flex_atg, pos=pos, with_labels=True)
else:
nx.draw(max_flex_atg, with_labels=True)
plt.axis("equal")
plt.savefig(os.path.join(write_path, product_name + '_atg_max_flex.png'))
plt.show()
plt.close()
# Save data to excel
result_data = [
["VERIFICATION", ""],
["n_nodes_total", n_nodes_total],
["n_nodes_isolated", n_isolates],
["n_subgraphs", ""],
["n_nodes_min_subgraph", ""],
["n_nodes_avg_subgraph", ""],
["n_nodes_max_subgraph", ""],
["is_num_seq_pg_precise", ""],
["", ""],
["PG TO ATG ANALYSIS", ""],
["n_seq", atg_from_pg_seq],
["t_gen [s]", t_pg_to_atg],
["data_size [byte]", data_atg_from_pg],
["", ""],
["AOG TO ATG ANALYSIS", ""],
["algorithm_type", "bottom_up"],
["restrict_non_stat_size", restrict_nonstat_size],
["max_nonstat_parts", max_nonstat_parts],
["t_gen_aog [s]", result['runtime']],
["n_nodes_aog", result['node_count']],
["n_edges_aog", result['edge_count']],
["n_possible_pg", num_possible_pg],
["n_possible_pg_exact", is_n_possible_pg_exact],
["sample_size", sample_size],
["n_seq_aog_total", total_num_sequences],
["data_size_aog [byte]", data_size_aog],
["", ""],
["Find optimal ATG from AOG", ""],
["n_seq", max_seq],
["t_gen [s]", t_aog_to_atg],
["data_size", data_max_flex_atg]
]
logging.info('Save results to excel file.')
result_data_df = pd.DataFrame(result_data)
result_data_df.to_excel(os.path.join(write_path, product_name + '_graph_analysis_results.xlsx'))
print("Done.")
if __name__ == '__main__':
main()
import sys
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import logging
from statistics import mean
from src.graph_utilities.and_or_graph_generator_bottom_up import *
from src.graph_utilities.graph_helper import sort_by_values_len, get_topological_sort_list_by_assembly_tiers, \
init_assembly_tiers_multiple_start_nodes
from src.graph_utilities.pg_to_assembly_tree import *
from src.validation.evaluation_calculation import estimate_number_of_possible_PGs
from src.validation.responsiveness_calculation import *
from src.graph_utilities.graph_helper import estimate_num_sequences, init_assembly_tiers, update_at_df_with_tier_list
import os
import time
def open_aog_pickle(path):
with open(path, 'rb') as input_file:
and_or_graph = pickle.load(input_file)
return and_or_graph
def assembly_tier_pg_xlsx_to_df(path):
pg_df = pd.read_excel(path, header=None)
# add product names to pg_df
at_df = read_assembly_tiers(path)
product_index_list = at_df['Product']
pg_df.columns = product_index_list
pg_df.index = product_index_list
return pg_df
def read_assembly_tiers(path):
at_xl_file = pd.ExcelFile(path)
at_df = at_xl_file.parse('Assembly Directions', header=None, index_col=None)
at_df.fillna(value=0, inplace=True)
at_df = at_df[[0, 7]]
new_header = at_df.iloc[0]
at_df = at_df[1:]
at_df.columns = new_header
return at_df
def pg_df_to_assembly_tree(pg_df, at_df):
pg = nx.from_pandas_adjacency(pg_df, create_using=nx.DiGraph())
initial_parts = get_all_initial_parts(pg)
logging.info(f'initial part: {initial_parts}')
for elem in initial_parts:
pg.add_edge('Start', elem)
pg.nodes['Start']['layer'] = 0
pg.nodes['Start']['tier'] = 0
# add assembly tiers as layer
initial_part = 'Start'
tier_list = init_assembly_tiers(pg)
at_df = update_at_df_with_tier_list(at_df, tier_list)
add_assembly_tiers_to_graph(pg, at_df)
mapped_data = init_mapped_data_with_sub_2(pg)
# sort by layer
sorted_node_list = get_topological_sort_list_by_assembly_tiers(pg, initial_node='Start')
pg_sorted = nx.DiGraph()
pg_sorted.add_nodes_from(sorted_node_list)
node_attributes = {node:data for node, data in pg.nodes(data=True)}
for key, value in node_attributes.items():
pg_sorted.nodes[key]['layer'] = value['layer']
pg_sorted.add_edges_from(pg.edges())
pg_sorted = relabel_all_nodes(pg_sorted)
#nx.draw(pg_sorted, with_labels=True)
#plt.show()
mapped_data = get_first_tier_node_info(pg_sorted, mapped_data, 'Start')
mapped_data = get_node_info_for_other_tiers(pg_sorted, mapped_data)
pg = add_mapped_data_to_nodes(pg_sorted, mapped_data)
return pg
def add_assembly_tiers_to_graph(pg, at_df):
for idx, row in at_df.iterrows():
product = row['Product']
layer = row['Assembly Tier']
pg.nodes[product]['layer'] = layer
pg.nodes[product]['tier'] = layer
def save_graph_to_pickle(path, G):
with open(path, 'wb') as handle:
pickle.dump(G, handle, protocol=pickle.HIGHEST_PROTOCOL)
def plot_and_save_atg_from_pg(atg_from_pg, product_name, write_path):
pos = []
try:
pos = nx.multipartite_layout(atg_from_pg, subset_key='layer')
except:
print("Error while calcing pos for max_flex_atg")
plt.figure(figsize=(8, 8))
if pos != []:
nx.draw(atg_from_pg, pos=pos, with_labels=True)
else:
nx.draw(atg_from_pg, with_labels=True)
plt.axis("equal")
plt.savefig(os.path.join(write_path, product_name + '_atg_from_pg.png'))
plt.show()
plt.close()
def main():
dir_path = os.path.join('C:\\', 'Users', 'adam-uqef35nm77fzn5b', 'Sciebo', '03_Dissertation', '50_Versuche', 'data')
read_path = os.path.join(dir_path, "RQ1_v6")
write_path = os.path.join(dir_path, "RQ1_v6")
product_name = "ex_rq1_10_cutting_bend"
log_filename = os.path.join(write_path, product_name + '_reduce_graph_py.log')
logging.basicConfig(
format='%(levelname)s | %(asctime)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler(log_filename),
logging.StreamHandler()
]
)
# -------------------------
# component based PG to ATG
# -------------------------
logging.info('--- PG to ATG analysis started ---')
pg_path = os.path.join(read_path, product_name + '_AssemblyTiers.xlsx')
pg_df = assembly_tier_pg_xlsx_to_df(pg_path)
at_df = read_assembly_tiers(pg_path)
# Check if PG is complete
pg = nx.from_pandas_adjacency(pg_df, create_using=nx.DiGraph())
n_nodes_total = pg.number_of_nodes()
print(f'n_nodes_total: {n_nodes_total}')
isolated_nodes = list(nx.isolates(pg))
n_isolates = len(isolated_nodes)
print(f'n_isolates: {n_isolates}')
print(f'isolate_nodes: {isolated_nodes}')
nx.draw(pg, with_labels=True)
plt.show()
graph_reduction = input("Do you want to eliminate nodes? (y/n): ")
if graph_reduction == 'y':
logging.info('Remove nodes from data manually...')
input_string = input("Enter nodes to remove (separated by space): ")
remove_list = input_string.split(" ")
print(f'remove_list: {remove_list}')
#remove from pg_df, at_df
translation_df = at_df.copy()
print(pg_df)
pg_df.drop(remove_list, axis=0, inplace=True)
pg_df.drop(remove_list, axis=1, inplace=True)
print(pg_df)
print(at_df)
for elem in remove_list:
at_df.drop(at_df.index[at_df['Product']==elem], inplace=True)
print(at_df)
pg.remove_nodes_from(remove_list)
n_nodes_total = pg.number_of_nodes()
print(f'n_nodes_total: {n_nodes_total}')
logging.info('Start transforming pg to atg.')
start = time.perf_counter()
atg_from_pg = pg_df_to_assembly_tree(pg_df, at_df)
stop = time.perf_counter()
t_pg_to_atg = stop - start
print(f't_pg_to_atg: {t_pg_to_atg}')
# Save generated atg graph
save_path = os.path.join(write_path, product_name + '_atg_from_pg.pickle')
save_graph_to_pickle(save_path, atg_from_pg)
# Plot atg_from_pg
plot_and_save_atg_from_pg(atg_from_pg, product_name, write_path)
atg_from_pg_seq = calc_number_of_possible_sequences(atg_from_pg)
print(f'atg_from_pg num of sequences: {atg_from_pg_seq}')
is_num_seq_pg_precise = True
'''
except:
logging.warning('Could not calc num of sequences for pg')
logging.info('Brute force calculation started')
# TODO calc brute force
atg_from_pg_seq = estimate_num_sequences(atg_from_pg, n_nodes_total * n_nodes_total)
print(f'atg_from_pg num of sequences: {atg_from_pg_seq}')
is_num_seq_pg_precise = False
'''
data_atg_from_pg = sys.getsizeof(atg_from_pg)
# ------------------------
# MW & LW to AOG to ATG
# ------------------------
logging.info('--- AOG to ATG analysis started ---')
if graph_reduction == 'y':
logging.info('Remove same nodes from MW and Liaisons.')
liaisons_file = os.path.join(read_path, product_name + "_Liaisons.xlsx")
mw_path = os.path.join(read_path, product_name + "_Moving wedge.xlsx")
liaison_df = read_liaison_data_excel(liaisons_file)
mw_dfs = read_mw_data_excel(mw_path)
# get indexes to remove
remove_list_index = []
for elem in remove_list:
index_to_remove = translation_df.index[translation_df['Product'] == elem].tolist()
remove_list_index.append(index_to_remove[0])
remove_list_index = [x - 1 for x in remove_list_index]
print(f'remove_list_index: {remove_list_index}')
# remove column & rows from df
liaison_df.drop(remove_list_index, axis=0, inplace=True)
liaison_df.drop(remove_list_index, axis=1, inplace=True)
mw_x = mw_dfs['MW_x']
mw_y = mw_dfs['MW_y']
mw_z = mw_dfs['MW_z']
mw = [mw_x, mw_y, mw_z]
for elem in mw:
elem.drop(remove_list_index, axis=0, inplace=True)
elem.drop(remove_list_index, axis=1, inplace=True)
# save reduced matrices as xlsx
liaisons_path_reduced = os.path.join(read_path, product_name + "_Liaisons_reduced.xlsx")
mw_path_reduced = os.path.join(read_path, product_name + "_Moving wedge_reduced.xlsx")
liaison_df.to_excel(liaisons_path_reduced, sheet_name='Liaison Matrix', header=False, index=False)
with pd.ExcelWriter(mw_path_reduced) as writer:
mw[0].to_excel(writer, sheet_name='MW_x', header=False, index=False)
mw[1].to_excel(writer, sheet_name='MW_y', header=False, index=False)
mw[2].to_excel(writer, sheet_name='MW_z', header=False, index=False)
logging.info('Reduced MW and Liaison saved as XLSX.')
# create AOG bu from liaison & mw
logging.info('Start generating AOG from MW and Liaisons')
restrict_nonstat_size = False
max_nonstat_parts = 3
if graph_reduction == 'y':
result = run_experiment_xlsx(read_path, write_path, product_name, restrict_nonstat_size=restrict_nonstat_size, max_nonstat_parts=max_nonstat_parts, reduced=True)
else:
result = run_experiment_xlsx(read_path, write_path, product_name, restrict_nonstat_size=restrict_nonstat_size,
max_nonstat_parts=max_nonstat_parts, reduced=False)
print(result)
logging.info('Saved AOG as pickle file.')
# Calc AOG values
logging.info('Load AOG from pickle file.')
aog_path = os.path.join(read_path, product_name + "_AOG.pickle")
G = open_aog_pickle(aog_path)
G_sorted = sort_by_values_len(G)
n_parts_in_aog = len(G_sorted[0][0])
# Close unused variables to free memory
del(G)
del(pg)
del(pg_df)
del(at_df)
logging.info('Calc number of possible PGs from AOG...')
n_aog_edges = result['edge_count']
logging.info(f'n_aog_edges: {n_aog_edges}')
if n_aog_edges < 2200:
is_n_possible_pg_exact = True
num_possible_pg = calc_number_of_possible_PGs(G_sorted)
else:
logging.warning('number of possible PGs just estimated, because computing complexity too high.')
is_n_possible_pg_exact = False
num_possible_pg = estimate_number_of_possible_PGs(n_aog_edges)
logging.info('Calc sample size for total num_sequence_estimation...')
sample_size = sample_required(num_possible_pg, 0.05)
logging.info(f'sample_size: {sample_size}')
logging.info('Estimate total num_of_sequences for AOG...')
total_num_sequences = estimate_total_num_of_sequences(G_sorted, num_possible_pg, sample_size)
print(f'total_num_sequences: {total_num_sequences}')
data_size_aog = sys.getsizeof(G_sorted)
print(f'data size of AOG: {data_size_aog}')
logging.info('Create histrogram...')
plot_histogram_num_sequences(G_sorted, sample_size, num_possible_pg, save_fig=True,
save_path=os.path.join(write_path, product_name + '_n_seq_histogram.png'))
logging.info('Find an optimal ATG from AOG...')
start = time.perf_counter()
max_flex_atg = get_monte_carlo_optimal_assembly_tree(G_sorted, sample_size, opt_goal='high_flex')
stop = time.perf_counter()
t_aog_to_atg = stop - start
print(f't_aog_to_atg: {t_aog_to_atg}')
# Save generated atg graph
save_path = os.path.join(write_path, product_name + '_atg_from_aog_max_flex.pickle')
save_graph_to_pickle(save_path, max_flex_atg)
max_seq = calc_number_of_possible_sequences(max_flex_atg)
print(f'max number of sequences: {max_seq}')
data_max_flex_atg = sys.getsizeof(max_flex_atg)
print(f'data size of max_flex_atg: {data_max_flex_atg}')
# Plot max_flex_atg
max_flex_atg.nodes['Start']['layer'] = 0
tier_list = init_assembly_tiers(max_flex_atg)
for i, tier in enumerate(tier_list):
for node in tier:
max_flex_atg.nodes[node]['layer'] = i
pos = []
try:
pos = nx.multipartite_layout(max_flex_atg, subset_key='layer')
except:
logging.warning('Calcing pos for max_flex_atg not possible.')
plt.figure(figsize=(8, 8))
if pos != []:
nx.draw(max_flex_atg, pos=pos, with_labels=True)
else:
nx.draw(max_flex_atg, with_labels=True)
plt.axis("equal")
plt.savefig(os.path.join(write_path, product_name + '_atg_max_flex.png'))
plt.show()
plt.close()
# Save data to excel
result_data = [
["VERIFICATION", ""],
["n_nodes_total", n_nodes_total],
["n_nodes_isolated", n_isolates],
["n_subgraphs", ""],
["n_nodes_min_subgraph", ""],
["n_nodes_avg_subgraph", ""],
["n_nodes_max_subgraph", ""],
["is_num_seq_pg_precise", ""],
["", ""],
["PG TO ATG ANALYSIS", ""],
["n_seq", atg_from_pg_seq],
["t_gen [s]", t_pg_to_atg],
["data_size [byte]", data_atg_from_pg],
["", ""],
["AOG TO ATG ANALYSIS", ""],
["algorithm_type", "bottom_up"],
["restrict_non_stat_size", restrict_nonstat_size],
["max_nonstat_parts", max_nonstat_parts],
["t_gen_aog [s]", result['runtime']],
["n_parts_in_aog", n_parts_in_aog],
["n_nodes_aog", result['node_count']],
["n_edges_aog", result['edge_count']],
["n_possible_pg", num_possible_pg],
["n_possible_pg_exact", is_n_possible_pg_exact],
["sample_size", sample_size],
["n_seq_aog_total", total_num_sequences],
["data_size_aog [byte]", data_size_aog],
["", ""],
["Find optimal ATG from AOG", ""],
["n_seq", max_seq],
["t_gen [s]", t_aog_to_atg],
["data_size", data_max_flex_atg]
]
logging.info('Save results to excel file.')
result_data_df = pd.DataFrame(result_data)
result_data_df.to_excel(os.path.join(write_path, product_name + '_graph_analysis_results.xlsx'))
print("Done.")
if __name__ == '__main__':
#import cProfile
#import pstats
#pr = cProfile.Profile()
#pr.enable()
main()
#pr.disable()
#stats = pstats.Stats(pr)
#stats.sort_stats(pstats.SortKey.TIME)
#stats.dump_stats(filename='rq1_script_profiling.prof')
from OCC.Extend.DataExchange import read_step_file_with_names_colors
from OCC.Display.SimpleGui import init_display
from OCC.Core.Quantity import Quantity_Color, Quantity_TOC_RGB
from src.abd_by_occ.basic_cad_functions.part_handling import remove_fasteners, rename_shapes_with_numbers
import pickle
import networkx as nx
import matplotlib.pyplot as plt
from random import shuffle
from itertools import chain
class View():
def setup(self, controller):
self.display, self.start_display, self.add_menu, self.add_function_to_menu = init_display()
self.base_color = Quantity_Color(0.7, 0.7, 0.7, Quantity_TOC_RGB)
self.current_color = Quantity_Color(0.7, 0.1, 0.1, Quantity_TOC_RGB)
self.assembly_compound = {}
self.current_compound = {}
self.current_assembly_tier = 0
self.current_assembly_sequence_index = 0
self.add_menu('Edit')
self.add_function_to_menu('Edit', controller.init_part)
self.add_function_to_menu('Edit', controller.next_tier)
self.add_function_to_menu('Edit', controller.next_part)
def update_compounds_with_sequence(self, assembly_sequence, full_compound, assembly_tree_graph):
self.assembly_compound = {}
self.current_compound = {}
node = assembly_sequence[self.current_assembly_sequence_index]
sub_1 = assembly_tree_graph.nodes[node]['sub_1']
sub_2 = assembly_tree_graph.nodes[node]['sub_2']
for part in sub_1:
self.assembly_compound = append_shape_to_compound(full_compound, self.assembly_compound, part)
for part in sub_2:
self.current_compound = append_shape_to_compound(full_compound, self.current_compound, part)
def update_compounds_with_tiers(self, assembly_tiers, full_compound):
self.assembly_compound.update(self.current_compound)
self.current_compound = {}
for node in assembly_tiers[self.current_assembly_tier]:
self.current_compound = append_shape_to_compound(full_compound, self.current_compound, node)
def display_assembly_compound(self):
for shape in self.assembly_compound:
self.display.DisplayShape(shape, color=self.base_color, transparency=0.7)
for shape in self.current_compound:
self.display.DisplayShape(shape, color=self.current_color)
def display_assembly_graph(self, graph, assembly_sequence):
plt.close()
current_task = assembly_sequence[self.current_assembly_sequence_index]
print(f'current_task: {current_task}')
#todo: highlight current task in graph
self.update_color_map(graph, current_task)
pos = nx.multipartite_layout(graph, subset_key='tier')
nx.draw(graph, pos=pos, node_color=self.color_map, with_labels=True)
plt.show()
def init_color_map(self, graph):
color_map = []
for node in graph:
color_map.append('green')
self.color_map = color_map
def update_color_map(self, graph, node_idx):
for count, node in enumerate(graph):
if node == node_idx:
self.color_map[count] = 'red'
def highlight_node_in_graph(self, graph, node_idx):
color_map = []
for node in graph:
if node == node_idx:
color_map.append('red')
else:
color_map.append('green')
return color_map
def start_main_loop(self):
self.start_display()
class Model():
def __init__(self, full_compound, assembly_tree_graph):
self.full_compound = full_compound
self.assembly_tree_graph = assembly_tree_graph
self.init_assembly_tiers()
self.add_tier_to_nodes()
self.get_random_linear_assembly_sequence()
def init_assembly_tiers(self):
tier_list = []
initial_node = 'Start'
tier_list.append([initial_node])
current_tier = 0
while True:
next_tier_nodes = []
next_neighbor_nodes = get_next_neighbor_nodes(self.assembly_tree_graph, tier_list[current_tier])
if next_neighbor_nodes != []:
for node in next_neighbor_nodes:
if self.predecessors_already_visited(node, tier_list, self.assembly_tree_graph):
next_tier_nodes.append(node)
tier_list.append(next_tier_nodes)
current_tier += 1
else:
break
self.assembly_tiers = tier_list
def predecessors_already_visited(self, node, tier_list, G):
flatten_tier_list = list(chain.from_iterable(tier_list))
for predecessor in G.predecessors(node):
if predecessor in flatten_tier_list:
return True
else:
return False
def add_tier_to_nodes(self):
for counter, tier in enumerate(self.assembly_tiers):
for elem in tier:
if elem in self.assembly_tree_graph.nodes():
self.assembly_tree_graph.nodes[elem]['tier'] = counter
def get_initial_process(self, assembly_compound):
node_id = self.assembly_tiers[1]
node_id = node_id[0]
sub_1 = self.assembly_tree_graph.nodes[node_id]['sub_1']
sub_1 = sub_1[0]
assembly_compound = append_shape_to_compound(self.full_compound, assembly_compound, sub_1)
def get_random_linear_assembly_sequence(self):
assembly_sequence = []
for tier in self.assembly_tiers:
shuffle(tier)
for elem in tier:
assembly_sequence.append(elem)
print(f'random assembly seqeuence: {assembly_sequence}')
self.assembly_sequence = assembly_sequence
class Controller():
def __init__(self, model, view):
self.model = model
self.view = view
def start(self):
self.view.setup(self)
self.view.start_main_loop()
def init_part(self):
self.model.get_initial_process(self.view.assembly_compound)
self.view.display.EraseAll()
self.view.display_assembly_compound()
self.view.init_color_map(self.model.assembly_tree_graph)
self.view.display_assembly_graph(self.model.assembly_tree_graph, self.model.assembly_sequence)
def next_tier(self):
self.view.current_assembly_tier += 1
self.view.update_compounds_with_tiers(assembly_tiers=self.model.assembly_tiers, full_compound=self.model.full_compound)
self.view.display.EraseAll()
self.view.display_assembly_compound()
def next_part(self):
self.view.current_assembly_sequence_index += 1
self.view.update_compounds_with_sequence(assembly_sequence=self.model.assembly_sequence, full_compound=self.model.full_compound, assembly_tree_graph=self.model.assembly_tree_graph)
self.view.display.EraseAll()
self.view.display_assembly_compound()
self.view.display_assembly_graph(self.model.assembly_tree_graph, self.model.assembly_sequence)
#print(f'current_task: {self.model.assembly_sequence[self.view.current_assembly_sequence_index]}')
# ------------------
# Utility functions
# ------------------
def append_shape_to_compound(aResCompound, current_compound, node):
node_id = node
for shape in aResCompound:
label, color = aResCompound[shape]
if str(node_id) in label:
current_compound[shape] = [label, color]
break
return current_compound
# get successors
def get_next_neighbor_nodes(g, current_tier_nodes):
next_tier_nodes = []
for node in current_tier_nodes:
for neighbor in g.neighbors(node):
if neighbor not in next_tier_nodes:
if neighbor != node:
next_tier_nodes.append(neighbor)
return next_tier_nodes
def plot_graph(G):
pos = nx.multipartite_layout(G, subset_key='layer')
plt.figure(figsize=(8, 8))
nx.draw(G, pos, with_labels=True)
plt.axis("equal")
plt.show()
# ----------------
# Main Loop
# ----------------
def main():
product_name = 'centrifugal_pump'
step_file_path = '../../data/stepFiles/' + product_name + '.stp'
# Load 3D model
aResCompound = read_step_file_with_names_colors(step_file_path)
aResCompound = rename_shapes_with_numbers(aResCompound)
aResCompound = remove_fasteners(aResCompound)
# Load precedence graph
pickle_path = '../../out/assembly_tree/' + product_name + '_from_pg.pickle'
#pickle_path = '../../out/assembly_tree/' + product_name + '_translated.pickle'
with open(pickle_path, 'rb') as input_file:
assembly_tree_graph = pickle.load(input_file)
c = Controller(Model(aResCompound, assembly_tree_graph), View())
c.start()
if __name__ == '__main__':
main()
\ No newline at end of file
from src.validation.evaluation_calculation import calc_number_of_possible_PGs, calc_number_of_possible_sequences
from src.graph_utilities.aog_to_assembly_tree import get_assembly_tree_random
from src.graph_utilities.graph_helper import get_random_sequence_from_PG
import networkx as nx
from matplotlib import pyplot as plt
test_graph_3 = {
1: ([1,2,3,4,5], [1,2,3,5], [4]),
2: ([1,2,3,4,5], [1,2,3,4], [5]),
3: ([1,2,3,5], [1,2,3], [5]),
4: ([1,2,3,4], [1,2,3], [4]),
5: ([1,2,3], [1], [2,3]),
6: ([1,2,3], [2], [1,3]),
7: ([2,3], [2], [3]),
8: ([1,3], [2], [3])
}
def all_sequences_unique(AOG, n_possible_pg, max_iter=100):
unique_atg = get_unique_assembly_tree_graphs(AOG, max_iter, n_possible_pg)
num_sequences = calc_num_sequences_per_atg(unique_atg)
all_sequences = get_all_sequences(unique_atg, num_sequences, max_iter)
all_unique = all_elem_unique(all_sequences)
return all_unique
def get_unique_assembly_tree_graphs(AOG, max_iter, n_possible_pg):
unique_atg_jsons = get_all_unique_atg_jsons(AOG, max_iter, n_possible_pg)
unique_atg = get_all_unique_atg(unique_atg_jsons)
return unique_atg
def get_all_unique_atg_jsons(AOG, max_iter, n_possible_pg):
#print(f'num_ATGs: {num_atgs}')
unique_atg_jsons = []
iter_counter = 0
while (len(unique_atg_jsons) < n_possible_pg) and (iter_counter < max_iter):
atg = get_assembly_tree_random(AOG)
atg_json = nx.readwrite.json_graph.adjacency_data(atg)
if atg_json not in unique_atg_jsons:
unique_atg_jsons.append(atg_json)
iter_counter += 1
return unique_atg_jsons
def get_all_unique_atg(unique_atg_jsons):
unique_atg = []
for atg_json in unique_atg_jsons:
graph = nx.readwrite.json_graph.adjacency_graph(atg_json)
#graph.remove_edge('Start', 'Start')
unique_atg.append(graph)
return unique_atg
def calc_num_sequences_per_atg(unique_atg):
num_sequences = []
for PG in unique_atg:
seq = calc_number_of_possible_sequences(PG)
num_sequences.append(seq)
#print(f'num_sequences = {num_sequences}')
return num_sequences
def get_all_sequences(unique_atg, num_sequences, max_iter):
all_sequences = []
for idx, atg in enumerate(unique_atg):
unique_sequences = []
max_sequences = num_sequences[idx]
iter_counter = 0
while (len(unique_sequences) < max_sequences) and (iter_counter < max_iter):
random_sequence = get_random_sequence_from_PG(atg)
if random_sequence not in unique_sequences:
unique_sequences.append(random_sequence)
iter_counter += 1
all_sequences.append(unique_sequences)
#print(f'all_sequences: {all_sequences}')
return all_sequences
def all_elem_unique(all_sequences):
unique_united_list = []
all_sequence_counter = 0
unique_sequence_counter = 0
for atg_seqs in all_sequences:
for seq in atg_seqs:
all_sequence_counter += 1
if seq not in unique_united_list:
unique_united_list.append(seq)
unique_sequence_counter += 1
if all_sequence_counter == unique_sequence_counter:
return True
else:
return False
def main():
all_unique = all_sequences_unique(test_graph_3, max_iter=100)
print(f'all sequences unique? {all_unique}')
if __name__ == '__main__':
main()
File added
import os
import pickle
import networkx as nx
import matplotlib.pyplot as plt
if __name__ == '__main__':
dir_path = os.path.join('C:\\', 'Users', '...')
read_path = dir_path
write_path = dir_path
product_name = "ex_rq1_1_centrifugal_pump"
atg_path = os.path.join(read_path, product_name + '_atg_from_aog_max_flex.pickle')
with open(atg_path, 'rb') as input_file:
atg = pickle.load(input_file)
for node, data in atg.nodes(data=True):
print(node, data)
nx.draw(atg, with_labels=True)
plt.show()
aog_path = os.path.join(read_path, product_name + '_AOG.pickle')
with open(aog_path, 'rb') as input_file:
aog = pickle.load(input_file)
print('AOG')
print(aog)
from OCC.Extend.DataExchange import read_step_file_with_names_colors
from OCC.Core.BRepClass3d import BRepClass3d_SolidExplorer
def main():
step_file_path = '../data/stepFiles/centrifugal_pump.stp'
aResShapeCompound = read_step_file_with_names_colors(step_file_path)
se = BRepClass3d_SolidExplorer(list(aResShapeCompound)[0])
tree = se.GetTree()
print(tree)
if __name__ == '__main__':
main()
print("DONE")
\ No newline at end of file
from unittest import TestCase
from src.validation.evaluation_calculation import *
from src.graph_utilities.aog_to_assembly_tree import get_assembly_tree_random
from networkx.readwrite import json_graph
test_graph = {1: ([1, 2, 3, 4, 5, 6], [1, 2, 4, 5, 6], [3]),
2: ([1, 2, 4, 5, 6], [1, 2, 5, 6], [4]),
3: ([1, 2, 4, 5, 6], [1, 4, 5], [2, 6]),
4: ([1, 2, 5, 6], [1, 5], [2, 6]),
5: ([1, 4, 5], [1, 5], [4]),
6: ([1, 4, 5], [1, 4], [5]),
7: ([1, 4], [1], [4]),
8: ([1, 5], [1], [5]),
9: ([2, 6], [2], [6])}
test_graph_2 = {
1: ([1,2,3,4,5], [1,2], [3,4,5]),
2: ([1,2,3,4,5], [1,2,3,4], [5]),
3: ([1,2,3,4], [1,2,3], [4]),
4: ([1,2,3], [1], [2,3]),
5: ([1,2,3], [1,2], [3]),
6: ([1,2], [1], [2]),
7: ([2,3], [2], [3]),
8: ([3,4,5], [3,4], [5]),
9: ([3,4], [3], [4])
}
test_graph_3 = {
1: ([1,2,3,4,5], [1,2,3,5], [4]),
2: ([1,2,3,4,5], [1,2,3,4], [5]),
3: ([1,2,3,5], [1,2,3], [5]),
4: ([1,2,3,4], [1,2,3], [4]),
5: ([1,2,3], [1], [2,3]),
6: ([1,2,3], [2], [1,3]),
7: ([2,3], [2], [3]),
8: ([1,3], [2], [3])
}
test_graph_4 = {
1: ([1,2,3,4,5,6], [1,2,3,5,6], [4]),
2: ([1,2,3,4,5,6], [1,2,3,4], [5,6]),
3: ([1,2,3,5,6], [1,2,3,5], [5,6]),
4: ([1,2,3,4], [1,2,3], [4]),
5: ([1,2,3,5], [1,2,3], [5]),
6: ([1,2,3], [1], [2,3]),
7: ([1,2,3], [2], [1,3]),
8: ([2,3], [2], [3]),
9: ([1,3], [2], [3]),
10: ([5,6], [5], [6])
}
test_graph_5 = {1: ([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 4, 5, 6, 7, 8], [3]),
2: ([1, 2, 4, 5, 6, 7, 8], [1, 2], [4, 5, 6, 7, 8]),
3: ([1, 2], [1], [2]),
4: ([4, 5, 6, 7, 8], [4], [5, 6, 7, 8]),
5: ([4, 5, 6, 7, 8], [4, 7, 8], [5, 6]),
6: ([4, 7, 8], [4], [7, 8]),
7: ([4, 7, 8], [4, 7], [8]),
8: ([5, 6, 7, 8], [5, 6], [7, 8]),
9: ([5, 6], [5], [6]),
10: ([4, 7], [4], [7]),
11: ([7, 8], [7], [8])}
def get_num_unique_sets_from_random_choice(AOG, sample_size):
random_PGs = []
for i in range(0, sample_size):
atg = get_assembly_tree_random(AOG)
atg_data = json_graph.adjacency_data(atg)
random_PGs.append(atg_data)
unique_sets = [i for n, i in enumerate(random_PGs) if i not in random_PGs[n + 1:]]
num_unique_sets = len(unique_sets)
return num_unique_sets
class Test(TestCase):
def test_graph(self):
G = test_graph
G_sorted = sort_by_values_len(G)
possible_PG_count = calc_number_of_possible_PGs(G_sorted, plot_tree=True)
num_unique_sets = get_num_unique_sets_from_random_choice(G, 100)
self.assertTrue(num_unique_sets <= possible_PG_count)
def test_graph2(self):
G = test_graph_2
G_sorted = sort_by_values_len(G)
possible_PG_count = calc_number_of_possible_PGs(G_sorted, plot_tree=True)
num_unique_sets = get_num_unique_sets_from_random_choice(G, 100)
self.assertTrue(num_unique_sets <= possible_PG_count)
def test_graph3(self):
G = test_graph_3
G_sorted = sort_by_values_len(G)
possible_PG_count = calc_number_of_possible_PGs(G_sorted)
num_unique_sets = get_num_unique_sets_from_random_choice(G, 100)
self.assertTrue(num_unique_sets <= possible_PG_count)
def test_graph4(self):
G = test_graph_4
G_sorted = sort_by_values_len(G)
possible_PG_count = calc_number_of_possible_PGs(G_sorted)
num_unique_sets = get_num_unique_sets_from_random_choice(G, 100)
self.assertTrue(num_unique_sets <= possible_PG_count)
def test_graph5(self):
G = test_graph_5
G_sorted = sort_by_values_len(G)
possible_PG_count = calc_number_of_possible_PGs(G_sorted)
num_unique_sets = get_num_unique_sets_from_random_choice(G, 100)
self.assertTrue(num_unique_sets <= possible_PG_count)
def test_case_centrifugal_pump(self):
product_name = 'centrifugal_pump'
pickle_path = '../data/and_or_pickles/' + product_name + '.pickle'
with open(pickle_path, 'rb') as input_file:
G = pickle.load(input_file)
G_sorted = sort_by_values_len(G)
possible_PG_count = calc_number_of_possible_PGs(G_sorted)
num_unique_sets = get_num_unique_sets_from_random_choice(G, 2000)
self.assertTrue(num_unique_sets <= possible_PG_count)
import unittest
import networkx as nx
from src.validation.evaluation_calculation import calc_number_of_possible_sequences
class TestCalcNumSequences(unittest.TestCase):
def test_calc_number_of_possible_sequences_Ramos_case(self):
g_example_ramos = nx.DiGraph()
g_example_ramos.add_edge('Start', 0)
g_example_ramos.add_edge(0, 1)
g_example_ramos.add_edge(0, 9)
g_example_ramos.add_edge(9, 10)
g_example_ramos.add_edge(10, 11)
g_example_ramos.add_edge(1, 2)
g_example_ramos.add_edge(1, 7)
g_example_ramos.add_edge(1, 8)
g_example_ramos.add_edge(2, 3)
g_example_ramos.add_edge(2, 5)
g_example_ramos.add_edge(2, 6)
g_example_ramos.add_edge(3, 4)
g_example_ramos.add_edge(5, 4)
g_example_ramos.add_edge(6, 4)
num_sequences = calc_number_of_possible_sequences(g_example_ramos)
self.assertTrue(num_sequences == 41580)
def test_calc_number_of_possible_sequences_ex_1(self):
g_example_1 = nx.DiGraph()
g_example_1.add_edge('Start', 0)
g_example_1.add_edge(0, 1)
g_example_1.add_edge(1, 2)
g_example_1.add_edge(1, 3)
g_example_1.add_edge(2, 4)
g_example_1.add_edge(3, 4)
g_example_1.add_edge(0, 5)
g_example_1.add_edge(5, 4)
num_sequences = calc_number_of_possible_sequences(g_example_1)
self.assertTrue(num_sequences == 8)
import unittest
import networkx as nx
from src.validation.evaluation_calculation import calc_num_plans_of_parallel_node, calc_num_plans_of_serial_node
class TestCalc(unittest.TestCase):
def test_calc_num_plans_of_parallel_node(self):
test_tree = nx.DiGraph()
test_tree.add_edge('p_0', 3)
test_tree.add_edge('p_0', 5)
num_plans = calc_num_plans_of_parallel_node(test_tree, 'p_0')
self.assertTrue(num_plans == 2)
def test_calc_num_plans_of_parallel_node2(self):
test_tree2 = nx.DiGraph()
test_tree2.add_edge('p_0', 3)
test_tree2.add_edge('p_0', 5)
test_tree2.add_edge('p_0', 6)
num_plans = calc_num_plans_of_parallel_node(test_tree2, 'p_0')
self.assertTrue(num_plans == 6)
def test_calc_num_plans_of_serial_node(self):
test_tree3 = nx.DiGraph()
test_tree3.add_edge('p_0', 3)
test_tree3.add_edge('p_0', 5)
test_tree3.add_edge('p_0', 6)
test_tree3.add_edge('s_0', 2)
test_tree3.add_edge('s_0', 4)
test_tree3.add_edge('s_0', 'p_0')
num_plans = calc_num_plans_of_serial_node(test_tree3, 's_0')
self.assertTrue(num_plans == 6)
def test_calc_num_plans_of_parallel_node3(self):
test_tree4 = nx.DiGraph()
test_tree4.add_edge('p_0', 3)
test_tree4.add_edge('p_0', 5)
test_tree4.add_edge('p_0', 6)
test_tree4.add_edge('s_0', 2)
test_tree4.add_edge('s_0', 4)
test_tree4.add_edge('s_0', 'p_0')
test_tree4.add_edge('p_1', 7)
test_tree4.add_edge('p_1', 8)
test_tree4.add_edge('p_1', 's_0')
num_plans = calc_num_plans_of_parallel_node(test_tree4, 'p_1')
self.assertTrue(num_plans == 252)
from unittest import TestCase
import pickle
from src.validation.evaluation_calculation import sort_by_values_len
from src.validation.uniqueness_of_aog_pg_sequences import all_sequences_unique
test_graph = {1: ([1, 2, 3, 4, 5, 6], [1, 2, 4, 5, 6], [3]),
2: ([1, 2, 4, 5, 6], [1, 2, 5, 6], [4]),
3: ([1, 2, 4, 5, 6], [1, 4, 5], [2, 6]),
4: ([1, 2, 5, 6], [1, 5], [2, 6]),
5: ([1, 4, 5], [1, 5], [4]),
6: ([1, 4, 5], [1, 4], [5]),
7: ([1, 4], [1], [4]),
8: ([1, 5], [1], [5]),
9: ([2, 6], [2], [6])}
test_graph_2 = {
1: ([1,2,3,4,5], [1,2], [3,4,5]),
2: ([1,2,3,4,5], [1,2,3,4], [5]),
3: ([1,2,3,4], [1,2,3], [4]),
4: ([1,2,3], [1], [2,3]),
5: ([1,2,3], [1,2], [3]),
6: ([1,2], [1], [2]),
7: ([2,3], [2], [3]),
8: ([3,4,5], [3,4], [5]),
9: ([3,4], [3], [4])
}
test_graph_3 = {
1: ([1,2,3,4,5], [1,2,3,5], [4]),
2: ([1,2,3,4,5], [1,2,3,4], [5]),
3: ([1,2,3,5], [1,2,3], [5]),
4: ([1,2,3,4], [1,2,3], [4]),
5: ([1,2,3], [1], [2,3]),
6: ([1,2,3], [2], [1,3]),
7: ([2,3], [2], [3]),
8: ([1,3], [2], [3])
}
test_graph_4 = {
1: ([1,2,3,4,5,6], [1,2,3,5,6], [4]),
2: ([1,2,3,4,5,6], [1,2,3,4], [5,6]),
3: ([1,2,3,5,6], [1,2,3,5], [5,6]),
4: ([1,2,3,4], [1,2,3], [4]),
5: ([1,2,3,5], [1,2,3], [5]),
6: ([1,2,3], [1], [2,3]),
7: ([1,2,3], [2], [1,3]),
8: ([2,3], [2], [3]),
9: ([1,3], [2], [3]),
10: ([5,6], [5], [6])
}
test_graph_5 = {1: ([1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 4, 5, 6, 7, 8], [3]),
2: ([1, 2, 4, 5, 6, 7, 8], [1, 2], [4, 5, 6, 7, 8]),
3: ([1, 2], [1], [2]),
4: ([4, 5, 6, 7, 8], [4], [5, 6, 7, 8]),
5: ([4, 5, 6, 7, 8], [4, 7, 8], [5, 6]),
6: ([4, 7, 8], [4], [7, 8]),
7: ([4, 7, 8], [4, 7], [8]),
8: ([5, 6, 7, 8], [5, 6], [7, 8]),
9: ([5, 6], [5], [6]),
10: ([4, 7], [4], [7]),
11: ([7, 8], [7], [8])}
class TestUniqueness(TestCase):
def test_all_sequences_unique_case1(self):
all_unique = all_sequences_unique(test_graph)
self.assertTrue(all_unique)
def test_all_sequences_unique_case2(self):
all_unique = all_sequences_unique(test_graph_2)
self.assertTrue(all_unique)
def test_all_sequences_unique_case3(self):
all_unique = all_sequences_unique(test_graph_3, max_iter=500)
self.assertTrue(all_unique)
def test_all_sequences_unique_case4(self):
all_unique = all_sequences_unique(test_graph_4, max_iter=500)
self.assertTrue(all_unique)
def test_all_sequences_unique_case5(self):
all_unique = all_sequences_unique(test_graph_5, max_iter=500)
self.assertTrue(all_unique)
def test_all_sequences_unique_centrifugal_pump(self):
product_name = 'centrifugal_pump'
pickle_path = '../data/and_or_pickles/' + product_name + '.pickle'
with open(pickle_path, 'rb') as input_file:
G = pickle.load(input_file)
G_sorted = sort_by_values_len(G)
all_unique = all_sequences_unique(G_sorted, max_iter=500)
self.assertTrue(all_unique)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment