from datetime import datetime
import pandas as pd
from my_flask_app import app
from .models.models import CustomTable, CustomColumn, Theme, CompressedDataType, Observation_Spec, RegType, RegRole
from flask_sqlalchemy import SQLAlchemy
from flask import jsonify, redirect, render_template, request, session, url_for, json
from sqlalchemy import (
    ARRAY, BIGINT, BOOLEAN, DOUBLE_PRECISION, FLOAT, INT, INTEGER, JSON,
    NUMERIC, SMALLINT, TIMESTAMP, UUID, VARCHAR, MetaData, String,
    create_engine, text, inspect,
)
import pydot, base64, os, logging, ast
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.dialects.postgresql.base import ischema_names
# PostgreSQL-specific types. The generic SQLAlchemy types above are
# superclasses of both the PostgreSQL and SQLite dialect variants, so one set
# of names serves both backends (the previous per-dialect imports shadowed
# each other and broke the isinstance checks below).
from sqlalchemy.dialects.postgresql import JSONB, TSTZRANGE, INTERVAL, BYTEA
from bs4 import BeautifulSoup
# Set up database (call db.engine)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# app.config['SQLALCHEMY_DATABASE_URI'] = "postgresql://postgres:password@localhost:5432/test"
db = SQLAlchemy()
# db.init_app(app)
app.secret_key = 'my_secret_key' # Needed for session management
# dropped_items = []
# Register the custom type with SQLAlchemy's PostgreSQL dialect
ischema_names['_timescaledb_internal.compressed_data'] = CompressedDataType
ischema_names['regtype'] = RegType
ischema_names['regrole'] = RegRole
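# With these entries registered, reflection resolves columns of these
# PostgreSQL/TimescaleDB-specific types to the custom classes above instead
# of emitting SQLAlchemy's "Did not recognize type" warning. Illustrative
# check (connection URI and table name are hypothetical):
#   insp = inspect(create_engine("postgresql://user:pw@localhost:5432/test"))
#   insp.get_columns("event_data")  # 'regtype' columns -> RegType instances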
@app.errorhandler(400)
def log_bad_request(error):
logging.error(f"Bad request: {request} {request.data} {request.args}")
# You can add more request details to log here
return "Bad request", 400
@app.route('/', methods=['POST', 'GET'])
def index():
    try:
        if request.method == 'POST':
            database_uri = request.form.get('database_uri')
            if database_uri:
                # A new connection string resets all session state.
                if database_uri != session.get('db_uri', ''):
                    session['db_uri'] = database_uri
                    session['target_table_names'] = []
                    session['target_table_name'] = ''
                    session['self_defined_labels'] = []
                    session['schema_selected'] = ''
                    session['show_all'] = False
                    session['object_name'] = {}
                    session['data_header'] = {'event': [], 'measurement': [], 'segment': [], 'segmentData': []}
                    session['current_data_header'] = {'tablename': '', 'type': '', 'label': [], 'features_name': []}
                engine = create_engine(database_uri)
                insp = inspect(engine)
                session_factory = sessionmaker(bind=engine)
                db.session = scoped_session(session_factory)
            else:
                # No URI submitted: fall back to the one stored in the session.
                database_uri = session.get('db_uri', '')
                engine = create_engine(database_uri)
                insp = inspect(engine)
                session_factory = sessionmaker(bind=engine)
                db.session = scoped_session(session_factory)
            database = database_name_from_uri(engine, database_uri) if database_uri != '' else ''
            schemas = getSchema(insp) if session.get('db_uri', '') != '' else []
            themes = getThemes()
            dropped_items = session.get('target_table_names', [])
            self_labels = session.get('self_defined_labels', [])
            schema_Selected = request.form.get('schema', None)
            if schema_Selected is not None:
                session['schema_selected'] = schema_Selected
            show_all = request.form.get('show_all') == 'True'
            if show_all:
                session['show_all'] = show_all
            # ERD of every table in the selected schema.
            tables1 = importMetadata(engine, schema_Selected, None, show_all)
            graph_DOT1 = createGraph(tables1, themes["Blue Navy"], True, True, True)
            image1 = generate_erd(graph_DOT1)
            # ERD of only the tables dropped onto the target area.
            if not dropped_items:
                image2 = ""
            else:
                tables2 = importMetadata(engine, None, dropped_items, False)
                graph_DOT2 = createGraph(tables2, themes["Blue Navy"], True, True, True)
                image2 = generate_erd(graph_DOT2)
            # Debug invocation with hard-coded test values (disabled):
            # extract_ME_table(engine, 'event_data', 'E', 'time', ['col', 'machine_id'], [' col ', ' name ', ' Z_ACTUAL_ZERO_POINT '], ['value(fine, coarse)'], datetime(2022, 11, 1, 17, 5), datetime(2022, 11, 3, 0, 0))
            return render_template('app.html', database=database, schemas=schemas, show_all=show_all, schema_Selected=schema_Selected, tables=tables1, image1=image1, image2=image2, dropped_items=dropped_items, self_labels=self_labels)
        else:
            # Display the form
            return render_template('app.html')
    except Exception as e:
        return f"An error occurred: {str(e)}", 400
@app.route('/handle-drop', methods=['POST'])
def handle_drop():
data = request.json
# if data.get('reset') == True or data.get('reset') != None:
# print(data.get('reset'))
# session['target_table_names'] = []
# dropped_items = []
# graph_DOT2 = "digraph {};"
# image2 = generate_erd(graph_DOT2)
# database_uri = session.get('db_uri', '')
# engine = create_engine(database_uri)
# insp = inspect(engine)
# schema_Selected = session.get('schema', None)
# print("999"+ str(schema_Selected))
# show_all = session.get('show_all', False)
# tables_in_CustomTable = importMetadata(engine, schema_Selected, None, show_all)
# tables1 = list(tables_in_CustomTable.keys()) if tables_in_CustomTable != {} else []
# return jsonify(image2=image2, dropped_items=dropped_items, tables=tables1)
item_name = data.get('item')
action = data.get('action')
    dropped_items = session.get('target_table_names', [])
if action == 'added':
dropped_items.append(item_name)
elif action == 'removed' and item_name in dropped_items:
dropped_items.remove(item_name)
session['target_table_names'] = dropped_items
# Regenerate ERD based on the updated dropped_items
engine = create_engine(session['db_uri'])
themes = getThemes()
tables2 = importMetadata(engine, None, dropped_items, False)
graph_DOT2 = createGraph(tables2, themes["Blue Navy"], True, True, True)
image2 = generate_erd(graph_DOT2)
return jsonify(image2=image2)
@app.route('/get-table-data', methods=['POST'])
def get_table_data():
data = request.json
table_name = data.get('table_name')
    if table_name is None:
        return jsonify({ 'html_table': generate_html_table([]), 'table_columns': '' })
    session['target_table_name'] = table_name
    engine = create_engine(session['db_uri'])
    content = query_database_for_table_content(engine, table_name)
    # Convert content to HTML table format
    html_table = generate_html_table(content)
    # Get the table columns and values
    table_instance = getTableInstance(engine, table_name)
    table_columns = [column.name for column in table_instance.columns]
    sorted_table_columns = sorted(table_columns)
    # De-nest the JSON columns into individual feature columns. Copy the list
    # so that removing JSON columns does not also mutate sorted_table_columns
    # (both names pointed at the same list before).
    feature_columns = list(sorted_table_columns)
    json_column_names = check_json_column(engine, table_name)
    for column_name in json_column_names:
        feature_columns.remove(column_name)
        # e.g. [('line',), ('coarse', 'fine'), ('name',), ('percent',), ('axisID', 'coarse', 'fine')]
        jsonKeys = handle_json_column(engine, table_name, column_name)
        for key in jsonKeys:
            if len(key) > 1:
                feature_columns.append(column_name + str(key))
            else:
                # Single-key tuples render as "('name',)"; drop the trailing comma.
                feature_columns.append(column_name + str(key).replace(',', ''))
    return jsonify({ 'html_table': html_table, 'table_columns': sorted_table_columns, 'feature_columns': feature_columns })
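# Illustrative request/response shape for this endpoint (values hypothetical):
#   POST /get-table-data  {"table_name": "event_data"}
#   -> {"html_table": "<table ...>",
#       "table_columns": ["machine_id", "time", "value"],
#       "feature_columns": ["machine_id", "time", "value('coarse', 'fine')"]}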
@app.route('/get-label-column', methods=['POST'])
def get_label_column():
data = request.json
label_column_name = data.get('label_column_name')
engine = create_engine(session['db_uri'])
table_name = session.get('target_table_name', '')
    session['target_label_column'] = label_column_name
    labels = showDistinctValues(engine, table_name, label_column_name) if label_column_name != '' else []
    # Add the self-defined labels from the session
    target_labels = session.get('self_defined_labels', [])
return jsonify({ 'table_labels': labels, 'defined_labels': target_labels })
@app.route('/add-self-defined-label', methods=['POST'])
def add_label():
data = request.json
self_defined_label = data.get('self_defined_label')
self_defined_labels = session.get('self_defined_labels', [])
if self_defined_label not in self_defined_labels:
self_defined_labels.append(self_defined_label)
session['self_defined_labels'] = self_defined_labels
print("777")
for label in self_defined_labels:
print(label)
return jsonify({'defined_labels': self_defined_labels})
@app.route('/add-data-header', methods=['POST'])
def add_data_header():
data_header = session.get('data_header', {'event':[], 'measurement':[], 'segment':[], 'segmentData':[]})
data = request.json
    header_type = data.get('type') if data.get('type') is not None else ''
    # The label arrives as a list literal; parse it with ast.literal_eval
    # instead of eval() so arbitrary expressions cannot be executed.
    label_list = ast.literal_eval(data.get('label')) if data.get('label') is not None else ['', '', '']
    object_column = data.get('object_column') if data.get('object_column') is not None else ''
    value_list = data.get('value_list') if data.get('value_list') is not None else []
    current_table = session.get('target_table_name', '')
    if header_type == 'event':
        observation = Observation_Spec(current_table, 'E', label_list, value_list)
        if observation.to_dict() not in data_header['event']:
            data_header['event'].append(observation.to_dict())
    elif header_type == 'measurement':
        observation = Observation_Spec(current_table, 'M', label_list, value_list)
        if observation.to_dict() not in data_header['measurement']:
            data_header['measurement'].append(observation.to_dict())
    elif header_type == 'segment':
        observation = Observation_Spec(current_table, 'S', label_list, [])
        if observation.to_dict() not in data_header['segment']:
            data_header['segment'].append(observation.to_dict())
    elif header_type == 'segmentData':
        observation = Observation_Spec(current_table, 'SD', label_list, value_list)
        if observation.to_dict() not in data_header['segmentData']:
            data_header['segmentData'].append(observation.to_dict())
    elif header_type == 'object':
        obj = session.get('object_name', {})
        obj[current_table] = object_column
        session['object_name'] = obj
        return jsonify()
    session['data_header'] = data_header
    data_header_table = generate_html_header_table()
    return jsonify({'data_header': data_header, 'data_header_table': data_header_table})
@app.route('/reset-data-header-table', methods=['POST'])
def reset_data_header_table():
session['data_header'] = {'event':[], 'measurement':[], 'segment':[], 'segmentData':[]}
data_header_table = generate_html_header_table()
return jsonify({'data_header_table': data_header_table})
@app.route('/init-data-header-table', methods=['POST'])
def init_data_header_table():
data_header_table = generate_html_header_table()
return jsonify({'data_header_table': data_header_table})
@app.route('/delete-data-header', methods=['POST'])
def delete_data_header():
data_header = session.get('data_header', {'event':[], 'measurement':[], 'segment':[], 'segmentData':[]})
if data_header == {'event':[], 'measurement':[], 'segment':[], 'segmentData':[]}:
data_header_table = generate_html_header_table()
return jsonify({'data_header_table': data_header_table})
    data = request.json
    row_html = data['value']
    res = readHeaderHTML(row_html)
    header_type = res['type']
    if header_type == 'M':
        data_header['measurement'].remove(res)
    elif header_type == 'E':
        data_header['event'].remove(res)
    elif header_type == 'S':
        data_header['segment'].remove(res)
    elif header_type == 'SD':
        data_header['segmentData'].remove(res)
session['data_header'] = data_header
data_header_table = generate_html_header_table()
return jsonify({'data_header_table': data_header_table})
@app.route('/get-MD-info', methods=['POST'])
def get_MD_info():
    data = request.json
    # res = {'tablename': '', 'type': '', 'label': [], 'features_name': []}
    res = readHeaderHTML(data['value'])
    header_type = res['type']
    session['current_data_header'] = res
    if header_type in ('E', 'M'):
        time_columns = getTimeColumns(res['tablename'])
        object_columns = getObjectColumns(res['tablename'])
        # A 'col' label must not also be offered as the object column.
        if res['label'][0] == 'col' and res['label'][1] in object_columns:
            object_columns.remove(res['label'][1])
        return jsonify({'time': time_columns, 'object': object_columns})
    elif header_type in ('S', 'SD'):
        time_columns = getTimeColumns(res['tablename'])
        object_columns = getObjectColumns(res['tablename'])
        return jsonify({'time': time_columns, 'object': object_columns})
@app.route('/get-ME-table', methods=['POST'])
def get_ME_table():
engine = create_engine(session.get('db_uri', ''))
data = request.json
    current_data_header = session.get('current_data_header', {'tablename': '', 'type': '', 'label': [], 'features_name': []})
    table_name = current_data_header['tablename']
    header_type = current_data_header['type']
    time_column = data['time_column']
    object_column = data['object_column']
    optgroupLabel = data['optgroupLabel']
    object_list = [optgroupLabel, object_column]
    label_list = current_data_header['label']
    features = []
    for feature in current_data_header['features_name']:
        if '(' in feature and ')' in feature:
            # "value('coarse', 'fine')" -> "value(coarse, fine)"
            features.append(feature.replace("'", ""))
        else:
            features.append(feature)
    start_time = datetime.strptime(data['minDatetime'], '%Y-%m-%d %H:%M:%S') if 'minDatetime' in data else None
    end_time = datetime.strptime(data['maxDatetime'], '%Y-%m-%d %H:%M:%S') if 'maxDatetime' in data else None
    query_result = extract_ME_table(engine, table_name, header_type, time_column, object_list, label_list, features, start_time, end_time)
    table_HTML = get_ME_table_HTML(query_result)
    if start_time is None and end_time is None:
min_datetime, max_datetime = get_min_max_datetime(engine, table_name, time_column)
return jsonify({'table_HTML': table_HTML, 'min_datetime': min_datetime, 'max_datetime': max_datetime})
return jsonify({'table_HTML': table_HTML})
def readHeaderHTML(html_str):
    soup = BeautifulSoup(html_str, 'html.parser')
    tds = soup.find_all('td')
    res = {'tablename': '', 'type': '', 'label': [], 'features_name': []}
    res['tablename'] = tds[1].get_text()
    res['type'] = tds[2].get_text()
    res['label'] = json.loads(tds[3]['data-value'])
    features = []
    for td in tds[4:]:
        value_list = json.loads(td['data-value'])
        if value_list[0] == '':
            # Plain column: data-value is ["", "column_name"].
            features.append(value_list[1])
        else:
            # JSON column: data-value is ["column_name", "key1", "key2", ...];
            # rebuild the "column_name('key1', 'key2')" notation.
            a = value_list[0] + '('
            for key in value_list[1:]:
                a += "'" + key + "', "
            a = a[:-2] + ')'
            if a not in features:
                features.append(a)
    res['features_name'] = features  # e.g. ['machine_id', "value('coarse', 'fine')"]
    return res
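# Illustrative example (hypothetical row as produced by generate_html_header_table()):
#   readHeaderHTML("<tr><td></td><td>event_data</td><td>M</td>"
#                  "<td data-value='[\"col\", \"name\", \"temp\"]'>temp</td>"
#                  "<td data-value='[\"value\", \"coarse\", \"fine\"]'>coarse</td></tr>")
#   -> {'tablename': 'event_data', 'type': 'M', 'label': ['col', 'name', 'temp'],
#       'features_name': ["value('coarse', 'fine')"]}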
def check_json_column(engine, table_name) -> list:
    insp = inspect(engine)
    schema = getTableSchema(table_name) if insp.dialect.name == 'postgresql' else insp.default_schema_name
    json_column_names = []
    # Each column is a dict, e.g. {'name': 'machine_id', 'type': INTEGER(), 'nullable': False, ...}
    for column in insp.get_columns(table_name, schema):
        # isinstance() also matches dialect subclasses, unlike the exact
        # type() comparison used before.
        if isinstance(column['type'], (JSON, JSONB)):
            json_column_names.append(column['name'])
    return json_column_names
def handle_json_column(engine, table_name, column_name):
    insp = inspect(engine)
    # Fetch the distinct JSON objects stored in the column and collect the
    # distinct key sets they use.
    with engine.connect() as conn:
        if insp.dialect.name == 'postgresql':
            schema = getTableSchema(table_name)
            query = f"SELECT DISTINCT {column_name} FROM {schema}.{table_name}"
        else:
            query = f"SELECT DISTINCT {column_name} FROM {table_name}"
        result = conn.execute(text(query)).fetchall()
    jsonKeys = []
    for row in result:  # row is a tuple; row[0] is a dict (or a JSON string on SQLite)
        value = row[0] if isinstance(row[0], dict) else json.loads(row[0])
        name = tuple(sorted(value.keys()))
        if name not in jsonKeys:
            jsonKeys.append(name)
    return jsonKeys
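# Illustrative result for a JSON column whose rows use several key sets
# (table/column names hypothetical; output format matches the sample in
# get_table_data() above):
#   handle_json_column(engine, 'event_data', 'value')
#   -> [('line',), ('coarse', 'fine'), ('name',), ('percent',), ('axisID', 'coarse', 'fine')]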
def database_name_from_uri(engine, database_uri: str):
if engine.dialect.name == 'postgresql':
return engine.url.database
elif engine.dialect.name == 'sqlite':
return os.path.splitext(os.path.basename(database_uri.split('///')[-1]))[0]
else:
return 'Unknown'
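# Examples (paths/URIs illustrative): "sqlite:///data/plant.db" -> "plant";
# for PostgreSQL the database name comes straight from engine.url.database.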
def count_feature_columns(data_header) -> int:
    # Return the widest feature row, counting each JSON key inside the
    # "column(key1, key2)" notation as its own column.
    max_count = 0
    for key in data_header.keys():
        for dict_item in data_header[key]:
            count = 0
            for value in dict_item.get('features_name', []):
                if '(' in value and ')' in value:
                    keys = value.split('(')[1].split(')')[0].replace("'", "").split(', ')
                    count += len(keys)
                else:
                    count += 1
            if count > max_count:
                max_count = count
    return max_count
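# Worked example: features_name ['machine_id', "value('coarse', 'fine')"]
# counts as 1 + 2 = 3 feature columns, so the header table is rendered with
# columns F_1..F_3.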
def count_data_header(data_header):
count = 0
for key in data_header.keys():
count += len(data_header[key])
return count
def generate_html_header_table():
    # session['data_header'] = {'event':[], 'measurement':[], 'segment':[], 'segmentData':[]}
    content = session.get('data_header', {'event':[], 'measurement':[], 'segment':[], 'segmentData':[]})
    count_f_columns = count_feature_columns(content)
    # Generate column headers
    table_html = "<table class='uk-table uk-table-small uk-table-hover uk-table-divider uk-table-striped' style='cursor: pointer;'><thead><tr>"
    table_html += "<th></th>"
    table_html += "<th class='uk-table-expand'>Table name</th>"
    table_html += "<th>Type</th>"
    table_html += "<th class='uk-table-expand'>Label</th>"
    for i in range(count_f_columns):
        table_html += f"<th>F_{i+1}</th>"
    table_html += "</tr></thead><tbody>"
    # Generate table rows
    for key in content.keys():
        for dict_item in content[key]:
            table_html += "<tr onclick='getRowData(this)'>"
            table_html += "<td><button type='button' class='btn-close' aria-label='Close' data-bs-dismiss='modal' onclick='deleteRow1(event, this)'></button></td>"
            table_html += f"<td>{dict_item.get('tablename', '')}</td>"
            table_html += f"<td data-id>{dict_item.get('type', '')}</td>"
            label_value = json.dumps(dict_item.get('label', ''))
            table_html += f"<td data-value='{label_value}'>{dict_item.get('label', '')[2]}</td>"
            for value in dict_item.get('features_name', []):
                if '(' in value and ')' in value:
                    # "value('coarse', 'fine')" -> one <td> per JSON key, each
                    # carrying the full ["value", "coarse", "fine"] data-value.
                    feature_column = value.split('(')[0]
                    multiple_columns = value.split('(')[1].split(')')[0].replace("'", "").split(', ')
                    data_value = json.dumps([feature_column, *multiple_columns])
                    for column in multiple_columns:
                        table_html += f"<td data-value='{data_value}'>{column}</td>"
                else:
                    data_value = json.dumps(["", value])
                    table_html += f"<td data-value='{data_value}'>{value}</td>"
            table_html += "</tr>"
    table_html += "</tbody></table>"
    return table_html
def generate_html_table(content):
if not content:
return "No data found."
# Generate column headers
columns = content[0]
table_html = "<table class='uk-table uk-table-small uk-table-hover uk-table-divider'><thead><tr>"
for col in columns:
table_html += f"<th>{col}</th>"
table_html += "</tr></thead><tbody>"
# Generate table rows
for i in range(1, len(content)):
table_html += "<tr>"
for item in content[i]:
table_html += f"<td>{item}</td>"
table_html += "</tr>"
table_html += "</tbody></table>"
return table_html
def getTimeColumns(table_name: str) -> list:
    engine = create_engine(session.get('db_uri', ''))
    insp = inspect(engine)
    schema = getTableSchema(table_name) if insp.dialect.name == 'postgresql' else insp.default_schema_name
    columns = insp.get_columns(table_name, schema)
    return [column['name'] for column in columns if str(column['type']) == 'TIMESTAMP']
def getObjectColumns(table_name: str) -> list:
    engine = create_engine(session.get('db_uri', ''))
    insp = inspect(engine)
    schema = getTableSchema(table_name) if insp.dialect.name == 'postgresql' else insp.default_schema_name
    columns = insp.get_columns(table_name, schema)
    return [column['name'] for column in columns if str(column['type']) in ('VARCHAR', 'INTEGER', 'NUMERIC')]
def query_database_for_table_content(engine, table_name, number=100):
    content_list = []
    insp = inspect(engine)
    # Get the schema of the table and build the preview query.
    if engine.dialect.name == 'postgresql':
        schema = getTableSchema(table_name)
        sql_content = f"SELECT * FROM {schema}.{table_name} LIMIT :number"
    else:
        schema = insp.default_schema_name
        sql_content = f"SELECT * FROM {table_name} LIMIT :number"
    with engine.connect() as conn:
        result = conn.execute(text(sql_content), {'number': number}).fetchall()
    if not result:
        return []
    # The first row holds the column names, the remaining rows the data.
    columns = [col['name'] for col in insp.get_columns(table_name, schema)]
    content_list.append(columns)
    for row in result:
        content_list.append(list(row))
    return content_list
# Only PostgreSQL needs this lookup (database_type = 'postgresql').
# Note: if the same table name exists in several schemas, the first match wins.
def getTableSchema(table_name):
    sql = text("""
        SELECT table_schema
        FROM information_schema.tables
        WHERE table_name = :table_name;
    """)
    schema = db.session.execute(sql, {'table_name': table_name}).fetchone()[0]
    return schema
def getSchema(insp):
# sql = text("""SELECT schema_name FROM information_schema.schemata;""")
# result = db.session.execute(sql)
# schemas = [row[0] for row in result]
schemas = insp.get_schema_names()
return schemas
def getTableInstance(engine, table_name):
    # Reflect just this table (plus its constraints) and return its CustomTable.
    table = importMetadata(engine, None, [table_name], True)[table_name]
    return table
def showDistinctValues(engine, table_name, column_name):
    insp = inspect(engine)
    if insp.dialect.name == 'postgresql':
        schema = getTableSchema(table_name)
        sql = f"SELECT DISTINCT {column_name} FROM {schema}.{table_name}"
    else:
        sql = f"SELECT DISTINCT {column_name} FROM {table_name}"
    with engine.connect() as conn:
        result = conn.execute(text(sql))
        names = [row[0] for row in result.fetchall()]
    return names
def get_min_max_datetime(engine, table_name, time_column):
    schema = getTableSchema(table_name) if engine.dialect.name == 'postgresql' else engine.dialect.default_schema_name
    query = text(f"SELECT MIN({time_column}) AS start_datetime, MAX({time_column}) AS end_datetime FROM {schema}.{table_name};")
    with engine.connect() as connection:
        row = connection.execute(query).mappings().fetchone()
    if row:
        return row['start_datetime'], row['end_datetime']
    return None, None
def extract_ME_table(engine, table_name: str, obs_type: str, time_column: str, object_spec: list, label: list, features_name: list, start_time: datetime = None, end_time: datetime = None) -> list:
    conn = engine.connect()
    table_instance = getTableInstance(engine, table_name)
    label_column = label[1].strip()
    label_value = label[2].strip()
    object_column_value = object_spec[1].strip()
    join_clause = ''
    full_table_name = f"{table_instance.schema}.{table_instance.name}" if table_instance.schema else table_instance.name
    sql_columns = [f"{full_table_name}.{time_column}"]
    # Object column logic: 'col' means the object comes from a column,
    # possibly resolved through its foreign key; 'self' means a fixed value
    # that is inserted into the result rows afterwards.
    if object_spec[0].strip() == 'col':
        object_column = table_instance.getColumn(object_column_value)
        object_column_name = object_column.fkof.table.name if object_column.fkof else ''
        if object_column and object_column.fkof and object_column_name in session.get('object_name', {}):
            related_table_instance = getTableInstance(engine, object_column_name)
            full_related_table_name = f"{related_table_instance.schema}.{related_table_instance.name}" if related_table_instance.schema else related_table_instance.name
            join_clause = f"LEFT JOIN {full_related_table_name} ON {full_table_name}.{object_column.name} = {full_related_table_name}.{object_column.fkof.name}"
            sql_columns.append(f"{full_related_table_name}.{session.get('object_name').get(object_column_name)} AS {object_column_value}")
        else:
            sql_columns.append(f"{full_table_name}.{object_column.name}")
    # If label[0] is not 'self', the label column is selected as well.
    if label[0].strip() != 'self':
        sql_columns.append(f"{full_table_name}.{label_column}")
    # JSON extractions: "value(coarse, fine)" becomes value->>'coarse', value->>'fine'.
    json_extractions = []
    for feature in features_name:
        if '(' in feature and ')' in feature:
            column_name, keys = feature[:-1].split('(')
            for key in keys.split(', '):
                json_extractions.append(f"{full_table_name}.{column_name}->>'{key}' AS {key}")
        else:
            sql_columns.append(f"{full_table_name}.{feature}")
    sql_select = ', '.join(sql_columns + json_extractions)
    # Build the WHERE clause from the applicable conditions so that a missing
    # label filter cannot produce a dangling "AND".
    conditions = []
    params = {}
    if label[0].strip() == 'col':
        conditions.append(f"{full_table_name}.{label_column} = :label_value")
        params['label_value'] = label_value
    if start_time:
        conditions.append(f"{full_table_name}.{time_column} >= :start_time")
        params['start_time'] = start_time
    if end_time:
        conditions.append(f"{full_table_name}.{time_column} <= :end_time")
        params['end_time'] = end_time
    sql_where = f"WHERE {' AND '.join(conditions)}" if conditions else ''
    sql_query = f"SELECT {sql_select} FROM {full_table_name} {join_clause} {sql_where} ORDER BY {time_column} ASC LIMIT 500"
    try:
        res = conn.execute(text(sql_query), params).fetchall()
    except Exception as e:
        logging.error(f"Error executing query: {e}")
        return []
    finally:
        conn.close()
    # Insert fixed object and label values where 'self' was chosen, plus the type tag.
    final_res = []
    for row in res:
        modified_row = list(row)
        if object_spec[0].strip() == 'self':
            modified_row.insert(1, object_column_value)
        if label[0].strip() == 'self':
            label_index = 2 if object_spec[0].strip() != 'self' else 1
            modified_row.insert(label_index, label[2])
        modified_row.insert(2, obs_type)
        final_res.append(modified_row)
    return final_res
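# Illustrative SQL produced for a 'col' label with a time window, using the
# hard-coded sample values from the disabled debug call in index() (the
# 'public' schema and absence of a foreign-key join are assumptions):
#   SELECT public.event_data.time, public.event_data.machine_id,
#          public.event_data.name,
#          public.event_data.value->>'fine' AS fine,
#          public.event_data.value->>'coarse' AS coarse
#   FROM public.event_data
#   WHERE public.event_data.name = :label_value
#     AND public.event_data.time >= :start_time
#     AND public.event_data.time <= :end_time
#   ORDER BY time ASC LIMIT 500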
def get_ME_table_HTML(data: list) -> str:
    # Build only the table rows; the surrounding <table> markup lives in the template.
    html_content = ""
# Iterate over the data to populate the table rows
for row in data:
html_content += "<tr><td><input class='uk-checkbox' type='checkbox' aria-label='Checkbox'></td>"
for cell in row:
if isinstance(cell, datetime):
# cell = cell.isoformat()
cell = cell.strftime('%Y-%m-%d %H:%M:%S:%f')
html_content += f"<td>{cell}</td>"
html_content += "</tr>\n"
return html_content
def importMetadata(engine, schema=None, tables_selected=None, show_all=False):
tables = {}
    if engine is None:
return tables
# Convert tables_selected to a list to ensure compatibility with SQL IN operation.
tables_selected_list = list(tables_selected) if tables_selected else None
# Fetch initial tables based on schema and table_names.
tables = fetch_initial_tables(engine, schema, tables_selected_list)
# If show_all is True, expand the list to include related tables.
if show_all:
tables = expand_to_include_related_tables(engine, tables)
# Fetch columns for each table.
tables = fetch_columns_for_tables(engine, tables)
# Fetch constraints (PK, FK, Unique) for each table.
tables = fetch_constraints_for_tables(engine, tables)
return tables
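# Typical pipeline (engine and schema name illustrative):
#   tables = importMetadata(engine, schema='public', tables_selected=None, show_all=False)
#   dot = createGraph(tables, getThemes()["Blue Navy"], True, True, True)
#   png_b64 = generate_erd(dot)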
def fetch_initial_tables(engine, schema=None, tables_selected_list=None):
if isinstance(engine, str):
engine = create_engine(engine)
tables = {}
insp = inspect(engine)
database_type = engine.dialect.name
# Get all table names in the database (or specific schema for PostgreSQL)
all_tables = []
    if schema is not None and database_type == 'postgresql':
        all_tables = insp.get_table_names(schema=schema)
    elif schema is None and database_type == 'postgresql':
        for schema_name in insp.get_schema_names():
            all_tables.extend(insp.get_table_names(schema=schema_name))
    else:  # For SQLite
        all_tables = insp.get_table_names()
# Filter tables if a specific list is provided
table_names = []
if tables_selected_list:
table_names = [table for table in all_tables if table in tables_selected_list]
else:
table_names = all_tables
for table_name in table_names:
# For PostgreSQL, use the provided schema, otherwise use the default schema
table_schema = getTableSchema(table_name) if database_type == 'postgresql' else insp.default_schema_name
table = CustomTable(table_name, table_schema)
tables[table_name] = table
table.label = f"n{len(tables)}"
return tables
def expand_to_include_related_tables(engine, tables):
if isinstance(engine, str):
engine = create_engine(engine)
# Create an inspector object
insp = inspect(engine)
database_type = engine.dialect.name
# This dictionary will temporarily store related tables to fetch later.
related_tables_to_fetch = {}
# Iterate over initially fetched tables to find foreign key relationships.
for tableName, table in tables.items():
# Fetch foreign key relationships for the current table using the inspector.
fks = insp.get_foreign_keys(tableName, schema=table.schema)
for fk in fks:
referenced_table_name = fk['referred_table']
            # default schema is 'public' for PostgreSQL
            referenced_schema = insp.default_schema_name if fk['referred_schema'] is None else fk['referred_schema']
if referenced_table_name not in tables and referenced_table_name not in related_tables_to_fetch:
related_tables_to_fetch[referenced_table_name] = referenced_schema
# Fetch and add related tables.
for tableName, tableSchema in related_tables_to_fetch.items():
# Create a CustomTable object for each related table.
table = CustomTable(tableName, tableSchema)
tables[tableName] = table
table.label = f"n{len(tables)}"
return tables
def fetch_columns_for_tables(engine, tables):
    if isinstance(engine, str):
        engine = create_engine(engine)
    insp = inspect(engine)
    # Display names for the reflected SQLAlchemy types, checked in order with
    # isinstance(); subclasses (e.g. JSONB before JSON) must come first, so the
    # order mirrors the original if/elif chain.
    TYPE_NAMES = (
        (UUID, "UUID"), (INTEGER, "INTEGER"), (JSONB, "JSONB"), (JSON, "JSON"),
        (TIMESTAMP, "TIMESTAMP"), (TSTZRANGE, "TSTZRANGE"), (VARCHAR, "VARCHAR"),
        (BOOLEAN, "BOOLEAN"), (NUMERIC, "NUMERIC"), (FLOAT, "FLOAT"),
        (String, "String"), (SMALLINT, "SMALLINT"), (BIGINT, "BIGINT"),
        (RegType, "RegType"), (RegRole, "RegRole"),
        (CompressedDataType, "CompressedDataType"), (ARRAY, "ARRAY"),
        (INTERVAL, "INTERVAL"), (DOUBLE_PRECISION, "DOUBLE_PRECISION"),
        (BYTEA, "BYTEA"),
    )
    for tableName, table in tables.items():
        # Use the inspector to get column information for each table
        columns = insp.get_columns(tableName, schema=table.schema)
        for col in columns:
            datatype = col['type']
            for type_class, type_name in TYPE_NAMES:
                if isinstance(datatype, type_class):
                    datatype = type_name
                    break
            # Create a CustomColumn object with the retrieved information
            column = CustomColumn(table, col['name'], '')
            column.setDataType({
                "type": datatype,
                "nullable": col['nullable'],
                "default": col['default']
            })
            # Append the column to the table's columns list
            table.columns.append(column)
    return tables
def fetch_constraints_for_tables(engine, tables):
if isinstance(engine, str):
engine = create_engine(engine)
insp = inspect(engine)
# Fetching Unique Constraints
for tableName, table in tables.items():
unique_constraints = insp.get_unique_constraints(tableName, schema=table.schema)
for uc in unique_constraints:
for column_name in uc['column_names']:
column = table.getColumn(column_name)
if column:
column.isunique = True
if uc['name'] not in table.uniques:
table.uniques[uc['name']] = []
table.uniques[uc['name']].append(column)
# Primary Keys
for tableName, table in tables.items():
pk_constraint = insp.get_pk_constraint(tableName, schema=table.schema)
for column_name in pk_constraint['constrained_columns']:
column = table.getColumn(column_name)
if column:
column.ispk = True
column.pkconstraint = pk_constraint['name']
# Foreign Keys
for tableName, table in tables.items():
fks = insp.get_foreign_keys(tableName, schema=table.schema)
for fk in fks:
fk_columns = fk['constrained_columns']
referred_table = fk['referred_table']
referred_columns = fk['referred_columns']
if referred_table in tables:
for (fk_column, ref_column) in zip(fk_columns, referred_columns):
fkColumn = table.getColumn(fk_column)
pkColumn = tables.get(referred_table).getColumn(ref_column)
if fkColumn and pkColumn:
fkColumn.fkof = pkColumn
if fk['name'] not in table.fks:
table.fks[fk['name']] = []
table.fks[fk['name']].append(fkColumn)
return tables
def createGraph(tables, theme, showColumns, showTypes, useUpperCase):
s = ('digraph {\n'
+ ' graph [ rankdir="LR" bgcolor="#ffffff" ]\n'
+ f' node [ style="filled" shape="{theme.shape}" gradientangle="180" ]\n'
+ ' edge [ arrowhead="none" arrowtail="none" dir="both" ]\n\n')
for name in tables:
s += tables[name].getDotShape(theme, showColumns, showTypes, useUpperCase)
s += "\n"
for name in tables:
s += tables[name].getDotLinks(theme)
s += "}\n"
return s
def generate_erd(graph_DOT):
graph_module = pydot.graph_from_dot_data(graph_DOT)
graph = graph_module[0]
png_image_data = graph.create_png()
encoded_image = base64.b64encode(png_image_data).decode('utf-8')
return encoded_image
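# The base64 string is intended for inline embedding in the template,
# e.g. (illustrative): <img src="data:image/png;base64,{{ image1 }}">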
def getThemes():
return {
"Common Gray": Theme("#6c6c6c", "#e0e0e0", "#f5f5f5",
"#e0e0e0", "#000000", "#000000", "rounded", "Mrecord", "#696969", "1"),
"Blue Navy": Theme("#1a5282", "#1a5282", "#ffffff",
"#1a5282", "#000000", "#ffffff", "rounded", "Mrecord", "#0078d7", "2"),
#"Gradient Green": Theme("#716f64", "#008080:#ffffff", "#008080:#ffffff",
# "transparent", "#000000", "#000000", "rounded", "Mrecord", "#696969", "1"),
#"Blue Sky": Theme("#716f64", "#d3dcef:#ffffff", "#d3dcef:#ffffff",
# "transparent", "#000000", "#000000", "rounded", "Mrecord", "#696969", "1"),
"Common Gray Box": Theme("#6c6c6c", "#e0e0e0", "#f5f5f5",
"#e0e0e0", "#000000", "#000000", "rounded", "record", "#696969", "1")
}
if __name__ == "__main__":
app.run(debug=True)