Skip to content
Snippets Groups Projects
Commit c75acc01 authored by Simon Wolf's avatar Simon Wolf
Browse files

Added loadability functionality to import OCEL from Cortado and pass own data...

Added loadability functionality to import OCEL from Cortado and pass its own data structure from the backend to Cortado. Furthermore, adapted the evaluation package of pm4py and integrated it into the project to get rid of a dependency mismatch between Cortado and ocpa/ocsv.
parent ddea8c87
Branches
No related tags found
1 merge request!1Added loadability functionality to import OECL from Cortado and pass own data...
Showing
with 470 additions and 128 deletions
No preview for this file type
...@@ -12,6 +12,10 @@ from endpoints.load_event_log import calculate_event_log_properties ...@@ -12,6 +12,10 @@ from endpoints.load_event_log import calculate_event_log_properties
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from pydantic import BaseModel from pydantic import BaseModel
from backend_utilities import mine_log as ml
import tempfile
import os
router = APIRouter(tags=["importing"], prefix="/importing") router = APIRouter(tags=["importing"], prefix="/importing")
...@@ -39,6 +43,44 @@ class FilePathInput(BaseModel): ...@@ -39,6 +43,44 @@ class FilePathInput(BaseModel):
file_path: str file_path: str
@router.post("/loadOCELFromFile")
async def load_ocel_from_file(
    file: UploadFile = File(...),
    config_repo: ConfigurationRepository = Depends(get_config_repo),
):
    """Import an uploaded OCEL event log and run the mining backend on it.

    The upload is spooled to a temporary file because the mining backend
    (``ml.process_ocel``) takes a filesystem path, not a stream.

    Raises:
        HTTPException: 404 when the mining backend cannot find the file.
    """
    # A new log invalidates any previously cached results.
    cache.pcache = {}

    # delete=False so the file survives the `with` block and can be handed
    # to the miner by path; it is removed in the `finally` below.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(await file.read())
        tmp_file_path = tmp_file.name

    try:
        ocel = await ml.process_ocel(filename=tmp_file_path)
    except FileNotFoundError as e:
        # Chain the original error so the root cause stays in tracebacks.
        raise HTTPException(
            status_code=404, detail=f"Event log not found ({tmp_file_path})"
        ) from e
    finally:
        # Clean up the temporary file even when mining fails.
        os.remove(tmp_file_path)

    return ocel
# NOTE(review): this re-declares FilePathInput, which is already defined
# earlier in this module; the later definition shadows the earlier one —
# consider removing the duplicate.
class FilePathInput(BaseModel):
    # Filesystem path of the event log to load.
    file_path: str
@router.post("/loadEventLogFromFilePath") @router.post("/loadEventLogFromFilePath")
async def load_event_log_from_file_path( async def load_event_log_from_file_path(
d: FilePathInput, config_repo: ConfigurationRepository = Depends(get_config_repo) d: FilePathInput, config_repo: ConfigurationRepository = Depends(get_config_repo)
......
...@@ -2,15 +2,18 @@ from fastapi.routing import APIRouter ...@@ -2,15 +2,18 @@ from fastapi.routing import APIRouter
from typing import Callable from typing import Callable
from starlette.websockets import WebSocket, WebSocketState, WebSocketDisconnect from starlette.websockets import WebSocket, WebSocketState, WebSocketDisconnect
from api.routes.conformance.variantConformance import ( from api.routes.conformance.variantConformance import (
calculate_alignment_intern_with_timeout, calculate_alignment_intern_with_timeout,
get_alignment_callback, get_alignment_callback,
) )
from api.routes.variants.subvariantMining import ( from api.routes.variants.subvariantMining import (
mine_repetition_patterns_with_timeout, mine_repetition_patterns_with_timeout,
RepetitionsMiningConfig, RepetitionsMiningConfig,
get_repetition_mining_callback, get_repetition_mining_callback,
) )
from backend_utilities.configuration.repository import ConfigurationRepositoryFactory from backend_utilities.configuration.repository import ConfigurationRepositoryFactory
from backend_utilities.multiprocessing.pool_factory import PoolFactory from backend_utilities.multiprocessing.pool_factory import PoolFactory
from cache import cache from cache import cache
......
from fastapi import FastAPI, UploadFile, File
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import uvicorn
from ocpa.objects.log.importer.ocel import factory as ocel_import_factory
from ocpa.visualization.log.variants import factory as variants_visualization_factory
from ocpa.algo.util.filtering.log import case_filtering
from ocpa.objects.log.exporter.ocel import factory as ocel_export_factory
import ocsv.Input_Extraction_Definition as IED # Seems like a helper class to define data structures and querying lanes
import ocsv.Super_Variant_Definition as SVD # Super Variant Definition, super lane definition
import ocsv.Super_Variant_Visualization as SVV # Visualization of super variants
import ocsv.Intra_Variant_Summarization as IAVS #
import ocsv.Summarization_Selection as SS
import ocsv.Intra_Variant_Generation as IAVG
import ocsv.Inter_Variant_Summarization as IEVS
import ocsv.Inter_Variant_Generation as IEVG
import ocsv.Super_Variant_Hierarchy as SVH
import time
import numpy as np
app = FastAPI()
class Parameters(BaseModel):
    """Request parameters controlling OCEL import and super-variant mining."""

    # Strategy used by the ocpa importer to extract process executions.
    execution_extraction: Optional[str] = "leading_type"
    # Object type used as the leading type for execution extraction.
    leading_type: Optional[str] = "application"
    # Maximum depth of the generated super-variant hierarchy.
    max_levels: Optional[int] = 4
    # Name of an IEVG.Distribution member — resolved via getattr below;
    # an unknown name raises AttributeError at request time.
    frequency_distribution_type: Optional[str] = "NORMAL"
@app.post("/process_ocel/")
#async def process_ocel(file: UploadFile = File(...), parameters: Parameters = None):
async def process_ocel(parameters: Parameters = Parameters()):
    """Load a (currently hard-coded) OCEL and compute super-variant summaries.

    Pipeline: intra-variant summarization generation (IAVG), summarization
    selection (SS), then inter-variant hierarchy generation (IEVG).

    Returns:
        dict with:
            super_variants_dict: group index -> list of SVD.SuperVariant
            hierarchy_info_list: one info dict per generated hierarchy
    """
    '''
    # Save the uploaded file
    file_location = f"/tmp/{file.filename}"
    with open(file_location, "wb+") as file_object:
        file_object.write(file.file.read())
    '''
    # Predefined filename.
    # TODO(review): the path is hard-coded and the `file` upload parameter is
    # commented out above, so the endpoint currently ignores client input.
    filename = "../ocsv/ocsv/EventLogs/BPI2017-Top10.jsonocel"
    #parameters = {"execution_extraction": "leading_type",
    #"leading_type": "application"}
    # Load the OCEL file.
    # NOTE(review): ocpa importers conventionally take a plain dict of
    # parameters — confirm that passing the pydantic model works as intended.
    ocel = ocel_import_factory.apply(file_path=filename, parameters=parameters)
    # Step 1: Summarization Generation
    all_summarizations, per_variant_dict, per_encoding_dict = IAVG.complete_intra_variant_summarization(ocel)
    # Step 2: Summarization Matching
    summarizations = SS.intra_variant_summarization_selection(all_summarizations, per_variant_dict, per_encoding_dict)
    # Step 3: Inter-Variant Summarization
    # Module-level flag in IEVG: enables nested structures globally.
    IEVG.NESTED_STRUCTURES = True
    initial_set = [summarizations[i] for i in range(len(summarizations))]
    hierarchies, final_super_variants = IEVG.generate_super_variant_hierarchy(
        initial_set, parameters.max_levels, frequency_distribution_type=getattr(IEVG.Distribution, parameters.frequency_distribution_type)
    )
    # Extract super variants: build a flat list where each
    # "NEW SUPER VARIANT" sentinel string starts a new group.
    values = []
    for super_variant in final_super_variants[0]:
        values.append("NEW SUPER VARIANT")
        values.append(super_variant)
    def extract_super_variants(nested_structure):
        """Recursively extract SuperVariant objects from a nested structure."""
        super_variants = []
        if isinstance(nested_structure, tuple):
            for item in nested_structure:
                super_variants.extend(extract_super_variants(item))
        elif isinstance(nested_structure, list):
            for item in nested_structure:
                super_variants.extend(extract_super_variants(item))
        elif isinstance(nested_structure, SVD.SuperVariant):
            super_variants.append(nested_structure)
        return super_variants
    # Create a useful data structure out of the nested list of super variants
    # and tuples containing super variants.
    super_variants_dict = {}
    current_super_variant = None
    for line in values:
        if line == "NEW SUPER VARIANT":
            # Sentinel: close the current group; the next real entry lazily
            # opens a fresh one.
            current_super_variant = None
        else:
            if current_super_variant is None:
                current_super_variant = []
                super_variants_dict[len(super_variants_dict)] = current_super_variant
            current_super_variant.extend(extract_super_variants(line))
    def extract_hierarchy_info(hierarchy, level=0, info=None):
        """Recursively extract information about the hierarchical structure."""
        # `info` accumulates across recursive calls; created once at the root.
        if info is None:
            info = {"levels": {}, "super_variants": []}
        if isinstance(hierarchy, dict):
            for key, value in hierarchy.items():
                if key not in info["levels"]:
                    info["levels"][key] = []
                info["levels"][key].append(value)
                extract_hierarchy_info(value, level + 1, info)
        elif isinstance(hierarchy, list):
            for item in hierarchy:
                extract_hierarchy_info(item, level, info)
        elif isinstance(hierarchy, tuple):
            for item in hierarchy:
                extract_hierarchy_info(item, level, info)
        elif isinstance(hierarchy, SVD.SuperVariant):
            info["super_variants"].append(hierarchy)
        return info
    # Extract hierarchy information for every generated hierarchy.
    hierarchy_info_list = []
    for hierarchy in hierarchies:
        hierarchy_info = extract_hierarchy_info(hierarchy)
        hierarchy_info_list.append(hierarchy_info)
    return {
        "super_variants_dict": super_variants_dict,
        "hierarchy_info_list": hierarchy_info_list
    }
# Run a standalone development server when executed directly
# (not when hosted by an external ASGI server).
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from evaluation import generalization, precision, replay_fitness, simplicity, evaluator, wf_net
import pkgutil

# Optional sub-packages: only expose them when their third-party
# dependencies are importable, so the base package works without extras.
# NOTE(review): pkgutil.find_loader is deprecated in modern Python
# (importlib.util.find_spec is the replacement) — kept as-is to match
# the vendored pm4py code.
if pkgutil.find_loader("pyemd"):
    # import the EMD only if the pyemd package is installed
    from evaluation import earth_mover_distance
if pkgutil.find_loader("networkx") and pkgutil.find_loader("sympy"):
    # import the Woflan package only if NetworkX and sympy are installed
    from evaluation import soundness
File added
File added
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.evaluation.earth_mover_distance import evaluator, variants
File added
File added
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.evaluation.earth_mover_distance.variants import pyemd
from enum import Enum
from pm4py.util import exec_utils
import deprecation
from pm4py.meta import VERSION
import warnings
class Variants(Enum):
    """Available EMD computation variants."""

    # pyemd-based implementation (see .variants.pyemd).
    PYEMD = pyemd


# Variant used when the caller does not specify one.
DEFAULT_VARIANT = Variants.PYEMD
@deprecation.deprecated(deprecated_in="2.2.5", removed_in="3.0",
                        current_version=VERSION,
                        details="Use the pm4py.algo.evaluation.earth_mover_distance package")
def apply(lang1, lang2, variant=Variants.PYEMD, parameters=None):
    """
    Gets the EMD language between the two languages

    Parameters
    -------------
    lang1
        First language
    lang2
        Second language
    variant
        Variant of the algorithm to use, including:
            Variants.PYEMD: pyemd based distance
    parameters
        Parameters of the algorithm

    Returns
    -------------
    dist
        EMD distance
    """
    warnings.warn("Use the pm4py.algo.evaluation.earth_mover_distance package")
    # Resolve the variant enum to its implementation module, then delegate.
    implementation = exec_utils.get_variant(variant)
    return implementation.apply(lang1, lang2, parameters=parameters)
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.evaluation.earth_mover_distance.variants import pyemd
File added
File added
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.util.regex import SharedObj, get_new_char
from pm4py.util import string_distance
import numpy as np
from pyemd import emd
from pm4py.util import exec_utils
class Parameters:
    """Parameter keys accepted by this EMD variant."""

    # NOTE(review): unlike the sibling pm4py modules, this is a plain class
    # rather than an Enum — confirm exec_utils.get_param_value accepts the
    # bare string key here.
    # Key for a custom string-distance function (callable of two strings).
    STRING_DISTANCE = "string_distance"
def normalized_levensthein(s1, s2):
    """
    Normalized Levensthein distance

    The raw Levenshtein distance is divided by the length of the longer
    string, mapping the result into [0, 1].

    Parameters
    -------------
    s1
        First string
    s2
        Second string

    Returns
    --------------
    dist
        Distance (0.0 when both strings are empty)
    """
    longest = max(len(s1), len(s2))
    # Two empty strings are identical; the original division would raise
    # ZeroDivisionError in this case.
    if longest == 0:
        return 0.0
    return float(string_distance.levenshtein(s1, s2)) / float(longest)
def get_act_correspondence(activities, parameters=None):
    """
    Map each activity onto a short encoded character.

    Parameters
    --------------
    activities
        Activities of the two languages
    parameters
        Parameters

    Returns
    -------------
    encoding
        Dictionary from activity to its encoded character
    """
    if parameters is None:
        parameters = {}

    shared_obj = SharedObj()
    mapping = {}
    for activity in activities:
        # get_new_char registers the activity in the shared mapping table;
        # the assigned character is then read back from mapping_dictio.
        get_new_char(activity, shared_obj)
        mapping[activity] = shared_obj.mapping_dictio[activity]
    return mapping
def encode_two_languages(lang1, lang2, parameters=None):
    """
    Encode two stochastic languages over a shared character alphabet.

    Each trace (tuple of activities) becomes a string by concatenating the
    per-activity codes; traces missing from one language are padded with
    probability 0.0 so both encodings share the same key set.

    Parameters
    --------------
    lang1
        Language 1
    lang2
        Language 2
    parameters
        Parameters of the algorithm

    Returns
    --------------
    enc1
        Encoding of the first language (list of (key, prob), keys descending)
    enc2
        Encoding of the second language (same key set, same order)
    """
    if parameters is None:
        parameters = {}

    all_activities = sorted(
        {activity for trace in lang1 for activity in trace}
        | {activity for trace in lang2 for activity in trace}
    )
    codes = get_act_correspondence(all_activities, parameters=parameters)

    encoded1 = {"".join(codes[a] for a in trace): prob for trace, prob in lang1.items()}
    encoded2 = {"".join(codes[a] for a in trace): prob for trace, prob in lang2.items()}

    # Each language should have the same keys, even if not present.
    for key in encoded1:
        encoded2.setdefault(key, 0.0)
    for key in encoded2:
        encoded1.setdefault(key, 0.0)

    # Sort the keys in a decreasing way so both lists align index-by-index.
    enc1 = sorted(encoded1.items(), reverse=True, key=lambda kv: kv[0])
    enc2 = sorted(encoded2.items(), reverse=True, key=lambda kv: kv[0])
    return enc1, enc2
def apply(lang1, lang2, parameters=None):
    """
    Calculates the EMD distance between the two stochastic languages

    Parameters
    -------------
    lang1
        First language
    lang2
        Second language
    parameters
        Parameters of the algorithm, including:
            Parameters.STRING_DISTANCE: function that accepts two strings and returns a distance

    Returns
    ---------------
    emd_dist
        EMD distance
    """
    if parameters is None:
        parameters = {}

    distance_function = exec_utils.get_param_value(Parameters.STRING_DISTANCE, parameters, normalized_levensthein)

    encoded1, encoded2 = encode_two_languages(lang1, lang2, parameters=parameters)

    # Probability masses of the two languages, aligned on the common
    # (descending) key order produced by the encoder.
    first_histogram = np.array([probability for _, probability in encoded1])
    second_histogram = np.array([probability for _, probability in encoded2])

    # Ground-distance matrix: (normalized) string distance between every
    # pair of encoded traces.
    distance_matrix = np.array(
        [
            [float(distance_function(key1, key2)) for key2, _ in encoded2]
            for key1, _ in encoded1
        ]
    )

    return emd(first_histogram, second_histogram, distance_matrix)
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py import util as pmutil
from pm4py.algo.conformance.tokenreplay.variants import token_replay
from evaluation.generalization.variants import token_based as generalization_token_based
from evaluation.precision.variants import etconformance_token as precision_token_based
from evaluation.replay_fitness.variants import token_replay as fitness_token_based
from evaluation.simplicity.variants import arc_degree as simplicity_arc_degree
from pm4py.objects import log as log_lib
from pm4py.objects.conversion.log import converter as log_conversion
from pm4py.util import xes_constants as xes_util
from pm4py.util import constants
from enum import Enum
from pm4py.util import exec_utils
import deprecation
from pm4py.meta import VERSION
import warnings
class Parameters(Enum):
    """Parameter keys for the token-based evaluation."""

    # Event attribute holding the activity name.
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # Relative weights of the four quality dimensions in the averaged score;
    # they are normalized to sum to 1 inside apply().
    PARAM_FITNESS_WEIGHT = 'fitness_weight'
    PARAM_PRECISION_WEIGHT = 'precision_weight'
    PARAM_SIMPLICITY_WEIGHT = 'simplicity_weight'
    PARAM_GENERALIZATION_WEIGHT = 'generalization_weight'
@deprecation.deprecated(deprecated_in="2.2.5", removed_in="3.0",
                        current_version=VERSION,
                        details="Use the pm4py.algo.evaluation.evaluator class")
def apply(log, net, initial_marking, final_marking, parameters=None):
    """
    Calculates all metrics based on token-based replay and returns a unified dictionary

    Parameters
    -----------
    log
        Log
    net
        Petri net
    initial_marking
        Initial marking
    final_marking
        Final marking
    parameters
        Parameters

    Returns
    -----------
    dictionary
        Dictionary containing fitness, precision, generalization and simplicity; along with the average weight of
        these metrics
    """
    warnings.warn("Use the pm4py.algo.evaluation.evaluator class")
    if parameters is None:
        parameters = {}
    # Normalize the input to an EventLog object regardless of input format.
    log = log_conversion.apply(log, parameters, log_conversion.TO_EVENT_LOG)
    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, log_lib.util.xes.DEFAULT_NAME_KEY)
    fitness_weight = exec_utils.get_param_value(Parameters.PARAM_FITNESS_WEIGHT, parameters, 0.25)
    precision_weight = exec_utils.get_param_value(Parameters.PARAM_PRECISION_WEIGHT, parameters, 0.25)
    simplicity_weight = exec_utils.get_param_value(Parameters.PARAM_SIMPLICITY_WEIGHT, parameters, 0.25)
    generalization_weight = exec_utils.get_param_value(Parameters.PARAM_GENERALIZATION_WEIGHT, parameters, 0.25)
    # Normalize the four weights so they always sum to 1.
    sum_of_weights = (fitness_weight + precision_weight + simplicity_weight + generalization_weight)
    fitness_weight = fitness_weight / sum_of_weights
    precision_weight = precision_weight / sum_of_weights
    simplicity_weight = simplicity_weight / sum_of_weights
    generalization_weight = generalization_weight / sum_of_weights
    # Token-based replay runs once; its result feeds fitness and generalization.
    parameters_tr = {token_replay.Parameters.ACTIVITY_KEY: activity_key}
    aligned_traces = token_replay.apply(log, net, initial_marking, final_marking, parameters=parameters_tr)
    # NOTE(review): this rebinds `parameters`, discarding any caller-supplied
    # entries before the precision computation — confirm that is intentional.
    parameters = {
        token_replay.Parameters.ACTIVITY_KEY: activity_key
    }
    fitness = fitness_token_based.evaluate(aligned_traces)
    precision = precision_token_based.apply(log, net, initial_marking, final_marking, parameters=parameters)
    generalization = generalization_token_based.get_generalization(net, aligned_traces)
    simplicity = simplicity_arc_degree.apply(net)
    # Weighted average of the four quality dimensions.
    metrics_average_weight = fitness_weight * fitness["log_fitness"] + precision_weight * precision \
                             + generalization_weight * generalization + simplicity_weight * simplicity
    # Harmonic mean of fitness and precision, guarded against division by zero.
    fscore = 0.0
    if (fitness['log_fitness'] + precision) > 0:
        fscore = (2*fitness['log_fitness']*precision)/(fitness['log_fitness']+precision)
    dictionary = {
        "fitness": fitness,
        "precision": precision,
        "generalization": generalization,
        "simplicity": simplicity,
        "metricsAverageWeight": metrics_average_weight,
        "fscore": fscore
    }
    return dictionary
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
from evaluation.generalization import evaluator, variants
File added
File added
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment