Skip to content
Snippets Groups Projects
Commit 4915e75b authored by Chieh Lin's avatar Chieh Lin
Browse files

multiple database

parent efc59f34
No related branches found
No related tags found
No related merge requests found
Pipeline #366576 failed
No preview for this file type
from my_flask_app import app from my_flask_app import app
from .models.models import Table, Column, Theme from .models.models import CustomTable, CustomColumn, Theme
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask import jsonify, render_template, request, session, send_file from flask import jsonify, redirect, render_template, request, url_for, session
from sqlalchemy import text from sqlalchemy import Inspector, MetaData, create_engine, text, inspect
import re, pydot, base64 import pydot, base64, os
import pydot from sqlalchemy.orm import scoped_session, sessionmaker
# Set up database # Set up database (call db.engine)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = "postgresql://postgres:password@localhost:5432/test" # app.config['SQLALCHEMY_DATABASE_URI'] = "postgresql://postgres:password@localhost:5432/test"
db = SQLAlchemy() db = SQLAlchemy()
db.init_app(app) # db.init_app(app)
app.secret_key = 'my_secret_key' # Needed for session management app.secret_key = 'my_secret_key' # Needed for session management
dropped_items = [] dropped_items = []
@app.route('/', methods=['POST', 'GET']) @app.route('/', methods=['POST', 'GET'])
def index(): def index():
if request.method == 'POST':
# Handle the form submission
database_uri = request.form['database_uri']
session['db_uri'] = database_uri
engine = create_engine(database_uri)
session_factory = sessionmaker(bind=engine)
db.session = scoped_session(session_factory)
insp = inspect(engine)
metadata_obj = MetaData()
# Determine the type and naem of database
database_type = engine.dialect.name # postgresql, sqlite
database = database_name_from_uri(engine, database_uri)
# Initialize variables # Initialize variables
database = db.engine.url.database
tables_selected = [] tables_selected = []
# tables_selected = ['machine_sensor', 'machine_tool', 'machine_trace'] schemas = getSchema(insp)
schemas = getSchema()
themes = getThemes() themes = getThemes()
schema_Selected = request.form.get('schema', None) schema_Selected = request.form.get('schema', None)
show_all = request.form.get('show_all') == 'True' show_all = request.form.get('show_all') == 'True'
tables1 = importMetadata(database, schema_Selected, tables_selected, show_all) tables1 = importMetadata(engine, schema_Selected, tables_selected, show_all)
graph_DOT1 = createGraph(tables1, themes["Blue Navy"], True, True, True) # graph_DOT1 = createGraph(tables1, themes["Blue Navy"], True, True, True)
image1 = generate_erd(graph_DOT1) # image1 = generate_erd(graph_DOT1)
if dropped_items==[]: if dropped_items==[]:
image2 = "" image2 = ""
else: # else:
tables2 = importMetadata(database, None, dropped_items, False) # tables2 = importMetadata(engine, None, dropped_items, False)
graph_DOT2 = createGraph(tables2, themes["Blue Navy"], True, True, True) # graph_DOT2 = createGraph(tables2, themes["Blue Navy"], True, True, True)
image2 = generate_erd(graph_DOT2) # image2 = generate_erd(graph_DOT2)
# return render_template('app.html', database=database, schemas=schemas, show_all=show_all, schema_Selected=schema_Selected, tables=tables1, image1=image1, image2=image2, dropped_items=dropped_items)
print(getTableSchema('event_data'))
return render_template('app.html', database=database, schemas=schemas, show_all=show_all, schema_Selected=schema_Selected, tables=tables1, image1=image1, image2=image2, dropped_items=dropped_items) # print(insp.get_foreign_keys('machine_sensor'))
# print(insp.get_unique_constraints('segmentation_data', schema='segmentation'))
return render_template('app.html', database=database, schemas=schemas, show_all=show_all, schema_Selected=schema_Selected, tables=tables1, dropped_items=dropped_items)
else:
# Display the form
return render_template('app.html')
@app.route('/handle-drop', methods=['POST']) @app.route('/handle-drop', methods=['POST'])
...@@ -67,8 +85,6 @@ def handle_drop(): ...@@ -67,8 +85,6 @@ def handle_drop():
return jsonify(image2=image2) return jsonify(image2=image2)
@app.route('/get-table-data', methods=['POST']) @app.route('/get-table-data', methods=['POST'])
def get_table_data(): def get_table_data():
data = request.json data = request.json
...@@ -83,6 +99,15 @@ def get_table_data(): ...@@ -83,6 +99,15 @@ def get_table_data():
return jsonify({'html_table': html_table}) return jsonify({'html_table': html_table})
def database_name_from_uri(engine, database_uri: str):
if engine.dialect.name == 'postgresql':
return engine.url.database
elif engine.dialect.name == 'sqlite':
return os.path.splitext(os.path.basename(database_uri.split('///')[-1]))[0]
else:
return 'Unknown'
def generate_html_table(content): def generate_html_table(content):
if not content: if not content:
return "No data found." return "No data found."
...@@ -104,7 +129,6 @@ def generate_html_table(content): ...@@ -104,7 +129,6 @@ def generate_html_table(content):
return table_html return table_html
def query_database_for_table_content(table_name, number=20): def query_database_for_table_content(table_name, number=20):
# Initialize content list # Initialize content list
content_list = [] content_list = []
...@@ -136,6 +160,7 @@ def query_database_for_table_content(table_name, number=20): ...@@ -136,6 +160,7 @@ def query_database_for_table_content(table_name, number=20):
return content_list return content_list
# Only postgresql needs this function (database_type = 'postgresql')
def getTableSchema(table_name): def getTableSchema(table_name):
sql= text(f""" sql= text(f"""
SELECT table_schema SELECT table_schema
...@@ -146,212 +171,254 @@ def getTableSchema(table_name): ...@@ -146,212 +171,254 @@ def getTableSchema(table_name):
return schema return schema
def getSchema(insp):
def getSchema(): # sql = text("""SELECT schema_name FROM information_schema.schemata;""")
sql = text("""SELECT schema_name FROM information_schema.schemata;""") # result = db.session.execute(sql)
result = db.session.execute(sql) # schemas = [row[0] for row in result]
schemas = [row[0] for row in result] schemas = insp.get_schema_names()
return schemas return schemas
def importMetadata(database, schema=None, tables_selected=None, show_all=False): def importMetadata(engine, schema=None, tables_selected=None, show_all=False):
tables = {} tables = {}
if database == '': if engine == None:
return tables return tables
# Convert tables_selected to a list to ensure compatibility with SQL IN operation. # Convert tables_selected to a list to ensure compatibility with SQL IN operation.
tables_selected_list = list(tables_selected) if tables_selected else None tables_selected_list = list(tables_selected) if tables_selected else None
# Fetch initial tables based on schema and table_names. # Fetch initial tables based on schema and table_names.
tables = fetch_initial_tables(schema, tables_selected_list) tables = fetch_initial_tables(engine, schema, tables_selected_list)
# If show_all is True, expand the list to include related tables. # If show_all is True, expand the list to include related tables.
if show_all: if show_all:
expand_to_include_related_tables(tables) expand_to_include_related_tables(engine, tables)
# Fetch columns for each table. # Fetch columns for each table.
fetch_columns_for_tables(tables) fetch_columns_for_tables(engine, tables)
# Fetch constraints (PK, FK, Unique) for each table. # Fetch constraints (PK, FK, Unique) for each table.
fetch_constraints_for_tables(tables) fetch_constraints_for_tables(engine, tables)
return tables return tables
def fetch_initial_tables(schema, tables_selected_list): def fetch_initial_tables(engine, schema=None, tables_selected_list=None):
if isinstance(engine, str):
engine = create_engine(engine)
tables = {} tables = {}
insp = inspect(engine)
# Construct WHERE clauses based on input parameters. database_type = engine.dialect.name
schema_condition = "AND table_schema = :schema" if schema else ""
tables_selected_condition = "AND table_name = ANY(:tables_selected)" if tables_selected_list else "" # Get all table names in the database (or specific schema for PostgreSQL)
all_tables = []
# Fetching tables with dynamic schema and table name filtering. if schema!=None and database_type == 'postgresql':
sql_tables = text(f""" all_tables = insp.get_table_names(schema=schema)
SELECT table_name, table_schema elif schema==None and database_type == 'postgresql':
FROM information_schema.tables for schema_of_schemas in insp.get_schema_names():
WHERE table_type = 'BASE TABLE' for table_name in insp.get_table_names(schema=schema_of_schemas):
{schema_condition} {tables_selected_condition}; all_tables.append(table_name)
""") else: # For SQLite
all_tables = insp.get_table_names()
# Adjust parameters based on the conditions.
params = {} # Filter tables if a specific list is provided
if schema:
params['schema'] = schema
if tables_selected_list: if tables_selected_list:
params['tables_selected'] = tables_selected_list table_names = [table for table in all_tables if table in tables_selected_list]
else:
result = db.session.execute(sql_tables, params) table_names = all_tables
for row in result: for table_name in table_names:
tableName, tableSchema = row # For PostgreSQL, use the provided schema, otherwise use the default schema
table = Table(tableName, tableSchema) # adding schema to the table comment table_schema = getTableSchema(table_name) if database_type == 'postgresql' else insp.default_schema_name
tables[tableName] = table table = CustomTable(table_name, table_schema)
tables[table_name] = table
table.label = f"n{len(tables)}" table.label = f"n{len(tables)}"
return tables return tables
def expand_to_include_related_tables(tables): def expand_to_include_related_tables(engine, tables):
if isinstance(engine, str):
engine = create_engine(engine)
# Create an inspector object
insp = inspect(engine)
# This dictionary will temporarily store related tables to fetch later. # This dictionary will temporarily store related tables to fetch later.
related_tables_to_fetch = {} related_tables_to_fetch = {}
# Iterate over initially fetched tables to find foreign key relationships. # Iterate over initially fetched tables to find foreign key relationships.
for tableName, table in tables.items(): for tableName, table in tables.items():
# Fetch foreign key relationships for the current table. # Fetch foreign key relationships for the current table using the inspector.
sql_fk = text(""" fks = insp.get_foreign_keys(tableName, schema=table.schema)
SELECT for fk in fks:
ccu.table_name AS pk_table_name, referenced_table_name = fk['referred_table']
ccu.table_schema AS pk_table_schema referenced_schema = fk['referred_schema']
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema AND tc.table_name = kcu.table_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE
tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = :table_name
""")
fk_result = db.session.execute(sql_fk, {'table_name': tableName})
for referenced_table_name, referenced_schema in fk_result:
if referenced_table_name not in tables and referenced_table_name not in related_tables_to_fetch: if referenced_table_name not in tables and referenced_table_name not in related_tables_to_fetch:
related_tables_to_fetch[referenced_table_name] = referenced_schema related_tables_to_fetch[referenced_table_name] = referenced_schema
# Fetch and add related tables. # Fetch and add related tables.
for tableName, tableSchema in related_tables_to_fetch.items(): for tableName, tableSchema in related_tables_to_fetch.items():
# Assuming a function fetch_table_details(tableName, tableSchema) that fetches and returns # Create a CustomTable object for each related table.
# a Table object with columns and constraints populated. table = CustomTable(tableName, tableSchema)
table = Table(tableName, tableSchema)
tables[tableName] = table tables[tableName] = table
return tables return tables
def fetch_columns_for_tables(tables): def fetch_columns_for_tables(engine, tables):
if isinstance(engine, str):
engine = create_engine(engine)
insp = inspect(engine)
for tableName, table in tables.items(): for tableName, table in tables.items():
sql_columns = text(""" # Use the inspector to get column information for each table
SELECT column_name, data_type, is_nullable, column_default columns = insp.get_columns(tableName, schema=table.schema)
FROM information_schema.columns for col in columns:
WHERE table_name = :table_name; name = col['name']
""") datatype = col['type']
column_result = db.session.execute(sql_columns, {'table_name': tableName}) nullable = col['nullable']
default = col['default']
for col in column_result: # Create a CustomColumn object with the retrieved information
name, datatype, nullable, default = col column = CustomColumn(table, name, '')
column = Column(table, name, '')
column.setDataType({ column.setDataType({
"type": datatype, "type": str(datatype),
"nullable": nullable == 'YES', "nullable": nullable,
"default": default "default": default
}) })
# Append the column to the table's columns list
table.columns.append(column) table.columns.append(column)
return tables return tables
def fetch_constraints_for_tables(tables): def fetch_constraints_for_tables(engine, tables):
if isinstance(engine, str):
engine = create_engine(engine)
insp = inspect(engine)
# Fetching Unique Constraints # Fetching Unique Constraints
for tableName, table in tables.items(): for tableName, table in tables.items():
sql_unique = text(""" unique_constraints = insp.get_unique_constraints(tableName, schema=table.schema)
SELECT kcu.column_name, tc.constraint_name for uc in unique_constraints:
FROM information_schema.table_constraints AS tc for column_name in uc['column_names']:
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name column = table.getColumn(column_name)
WHERE tc.table_name = :table_name AND tc.constraint_type = 'UNIQUE';
""")
unique_result = db.session.execute(sql_unique, {'table_name': tableName})
for col in unique_result:
name, constraintName = col
column = table.getColumn(name)
if column: if column:
column.isunique = True column.isunique = True
if constraintName not in table.uniques: if uc['name'] not in table.uniques:
table.uniques[constraintName] = [] table.uniques[uc['name']] = []
table.uniques[constraintName].append(column) table.uniques[uc['name']].append(column)
# Primary Keys # Primary Keys
for tableName, table in tables.items(): for tableName, table in tables.items():
sql_pk = text(""" pk_constraint = insp.get_pk_constraint(tableName, schema=table.schema)
SELECT kcu.column_name, tc.constraint_name, kcu.ordinal_position for column_name in pk_constraint['constrained_columns']:
FROM information_schema.table_constraints AS tc column = table.getColumn(column_name)
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
WHERE tc.table_name = :table_name AND tc.constraint_type = 'PRIMARY KEY';
""")
pk_result = db.session.execute(sql_pk, {'table_name': tableName})
for col in pk_result:
name, constraintName, ordinal_position = col
column = table.getColumn(name)
if column: if column:
column.ispk = True column.ispk = True
column.pkconstraint = constraintName column.pkconstraint = pk_constraint['name']
# Assuming you want to order PKs, though not directly used in provided class
# Fetching Foreign Keys for each table # Foreign Keys
for tableName, table in tables.items(): for tableName, table in tables.items():
sql_fk = text(""" fks = insp.get_foreign_keys(tableName, schema=table.schema)
SELECT for fk in fks:
tc.constraint_name, fk_columns = fk['constrained_columns']
tc.table_name AS fk_table_name, referred_table = fk['referred_table']
kcu.column_name AS fk_column_name, referred_columns = fk['referred_columns']
ccu.table_name AS pk_table_name, for fk_column, ref_column in zip(fk_columns, referred_columns):
ccu.column_name AS pk_column_name, column = table.getColumn(fk_column)
ccu.table_schema AS pk_table_schema if column:
FROM column.fkof = f"{referred_table}.{ref_column}"
information_schema.table_constraints AS tc if fk['name'] not in table.fks:
JOIN information_schema.key_column_usage AS kcu table.fks[fk['name']] = []
ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema AND tc.table_name = kcu.table_name table.fks[fk['name']].append(column)
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE
tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = :table_name
""")
fk_result = db.session.execute(sql_fk, {'table_name': tableName})
for row in fk_result:
constraintName, fkTableName, fkColumnName, pkTableName, pkColumnName, pkTableSchema = row
# Ensure the foreign key table is the current table being processed
if fkTableName != tableName:
continue
fkTable = tables.get(fkTableName) return tables
pkTable = tables.get(pkTableName)
if fkTable and pkTable:
fkColumn = fkTable.getColumn(fkColumnName)
pkColumn = pkTable.getColumn(pkColumnName)
if fkColumn and pkColumn: # def fetch_constraints_for_tables(engine, tables):
# Here, instead of assigning pkColumn directly, store relevant info # # Fetching Unique Constraints
fkColumn.fkof = pkColumn # Adjust based on your application's needs # for tableName, table in tables.items():
if constraintName not in fkTable.fks: # sql_unique = text("""
fkTable.fks[constraintName] = [] # SELECT kcu.column_name, tc.constraint_name
fkTable.fks[constraintName].append(fkColumn) # FROM information_schema.table_constraints AS tc
return tables # JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
# WHERE tc.table_name = :table_name AND tc.constraint_type = 'UNIQUE';
# """)
# unique_result = db.session.execute(sql_unique, {'table_name': tableName})
# for col in unique_result:
# name, constraintName = col
# column = table.getColumn(name)
# if column:
# column.isunique = True
# if constraintName not in table.uniques:
# table.uniques[constraintName] = []
# table.uniques[constraintName].append(column)
# # Primary Keys
# for tableName, table in tables.items():
# sql_pk = text("""
# SELECT kcu.column_name, tc.constraint_name, kcu.ordinal_position
# FROM information_schema.table_constraints AS tc
# JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
# WHERE tc.table_name = :table_name AND tc.constraint_type = 'PRIMARY KEY';
# """)
# pk_result = db.session.execute(sql_pk, {'table_name': tableName})
# for col in pk_result:
# name, constraintName, ordinal_position = col
# column = table.getColumn(name)
# if column:
# column.ispk = True
# column.pkconstraint = constraintName
# # Assuming you want to order PKs, though not directly used in provided class
# # Fetching Foreign Keys for each table
# for tableName, table in tables.items():
# sql_fk = text("""
# SELECT
# tc.constraint_name,
# tc.table_name AS fk_table_name,
# kcu.column_name AS fk_column_name,
# ccu.table_name AS pk_table_name,
# ccu.column_name AS pk_column_name,
# ccu.table_schema AS pk_table_schema
# FROM
# information_schema.table_constraints AS tc
# JOIN information_schema.key_column_usage AS kcu
# ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema AND tc.table_name = kcu.table_name
# JOIN information_schema.constraint_column_usage AS ccu
# ON ccu.constraint_name = tc.constraint_name
# WHERE
# tc.constraint_type = 'FOREIGN KEY'
# AND tc.table_name = :table_name
# """)
# fk_result = db.session.execute(sql_fk, {'table_name': tableName})
# for row in fk_result:
# constraintName, fkTableName, fkColumnName, pkTableName, pkColumnName, pkTableSchema = row
# # Ensure the foreign key table is the current table being processed
# if fkTableName != tableName:
# continue
# fkTable = tables.get(fkTableName)
# pkTable = tables.get(pkTableName)
# if fkTable and pkTable:
# fkColumn = fkTable.getColumn(fkColumnName)
# pkColumn = pkTable.getColumn(pkColumnName)
# if fkColumn and pkColumn:
# # Here, instead of assigning pkColumn directly, store relevant info
# fkColumn.fkof = pkColumn # Adjust based on your application's needs
# if constraintName not in fkTable.fks:
# fkTable.fks[constraintName] = []
# fkTable.fks[constraintName].append(fkColumn)
# return tables
def createGraph(tables, theme, showColumns, showTypes, useUpperCase): def createGraph(tables, theme, showColumns, showTypes, useUpperCase):
...@@ -393,139 +460,4 @@ def getThemes(): ...@@ -393,139 +460,4 @@ def getThemes():
if __name__ == "__main__": if __name__ == "__main__":
app.run(port=5001) app.run(debug=True)
\ No newline at end of file
class Theme:
def __init__(self, color, fillcolor, fillcolorC,
bgcolor, icolor, tcolor, style, shape, pencolor, penwidth):
self.color = color
self.fillcolor = fillcolor
self.fillcolorC = fillcolorC
self.bgcolor = bgcolor
self.icolor = icolor
self.tcolor = tcolor
self.style = style
self.shape = shape
self.pencolor = pencolor
self.penwidth = penwidth
class Table:
def __init__(self, name, comment):
self.name = name
self.comment = comment if comment is not None and comment != 'None' else ''
self.label = None
self.columns = [] # list of all columns
self.uniques = {} # dictionary with UNIQUE constraints, by name + list of columns
self.pks = [] # list of PK columns (if any)
self.fks = {} # dictionary with FK constraints, by name + list of FK columns
@classmethod
def getClassName(cls, name, useUpperCase, withQuotes=True):
if re.match("^[A-Z_0-9]*$", name) == None:
return f'"{name}"' if withQuotes else name
return name.upper() if useUpperCase else name.lower()
def getName(self, useUpperCase, withQuotes=True):
return Table.getClassName(self.name, useUpperCase, withQuotes)
def getColumn(self, name):
for column in self.columns:
if column.name == name:
return column
return None
def getDotShape(self, theme, showColumns, showTypes, useUpperCase):
fillcolor = theme.fillcolorC if showColumns else theme.fillcolor
colspan = "2" if showTypes else "1"
tableName = self.getName(useUpperCase, False)
s = (f' {self.label} [\n'
+ f' fillcolor="{fillcolor}" color="{theme.color}" penwidth="1"\n'
+ f' label=<<table style="{theme.style}" border="0" cellborder="0" cellspacing="0" cellpadding="1">\n'
+ f' <tr><td bgcolor="{theme.bgcolor}" align="center"'
+ f' colspan="{colspan}"><font color="{theme.tcolor}"><b>{tableName}</b></font></td></tr>\n')
if showColumns:
for column in self.columns:
name = column.getName(useUpperCase, False)
if column.ispk: name = f"<u>{name}</u>"
if column.fkof != None: name = f"<i>{name}</i>"
if column.nullable: name = f"{name}*"
if column.identity: name = f"{name} I"
if column.isunique: name = f"{name} U"
datatype = column.datatype
if useUpperCase: datatype = datatype.upper()
if showTypes:
s += (f' <tr><td align="left"><font color="{theme.icolor}">{name}&nbsp;</font></td>\n'
+ f' <td align="left"><font color="{theme.icolor}">{datatype}</font></td></tr>\n')
else:
s += f' <tr><td align="left"><font color="{theme.icolor}">{name}</font></td></tr>\n'
return s + ' </table>>\n ]\n'
def getDotLinks(self, theme):
s = ""
for constraint in self.fks:
fks = self.fks[constraint]
fk1 = fks[0]
dashed = "" if not fk1.nullable else ' style="dashed"'
arrow = "" if fk1.ispk and len(self.pks) == len(fk1.fkof.table.pks) else ' arrowtail="crow"'
s += (f' {self.label} -> {fk1.fkof.table.label}'
+ f' [ penwidth="{theme.penwidth}" color="{theme.pencolor}"{dashed}{arrow} ]\n')
return s
class Column:
def __init__(self, table, name, comment):
self.table = table
self.name = name
self.comment = comment if comment is not None and comment != 'None' else ''
self.nullable = True
self.datatype = None # with (length, or precision/scale)
self.identity = False
self.isunique = False
self.ispk = False
self.pkconstraint = None
self.fkof = None # points to the PK column on the other side
def getName(self, useUpperCase, withQuotes=True):
return Table.getClassName(self.name, useUpperCase, withQuotes)
def setDataType(self, datatype):
self.datatype = datatype["type"]
self.nullable = bool(datatype["nullable"])
if self.datatype == "FIXED":
self.datatype = "NUMBER"
elif "fixed" in datatype:
fixed = bool(datatype["fixed"])
if self.datatype == "TEXT":
self.datatype = "CHAR" if fixed else "VARCHAR"
if "length" in datatype:
self.datatype += f"({str(datatype['length'])})"
elif "scale" in datatype:
if int(datatype['precision']) == 0:
self.datatype += f"({str(datatype['scale'])})"
if self.datatype == "TIMESTAMP_NTZ(9)":
self.datatype = "TIMESTAMP"
elif "scale" in datatype and int(datatype['scale']) == 0:
self.datatype += f"({str(datatype['precision'])})"
if self.datatype == "NUMBER(38)":
self.datatype = "INT"
elif self.datatype.startswith("NUMBER("):
self.datatype = f"INT({str(datatype['precision'])})"
elif "scale" in datatype:
self.datatype += f"({str(datatype['precision'])},{str(datatype['scale'])})"
#if column.datatype.startswith("NUMBER("):
# column.datatype = f"FLOAT({str(datatype['precision'])},{str(datatype['scale'])})"
self.datatype = self.datatype.lower()
No preview for this file type
...@@ -15,10 +15,10 @@ class Theme: ...@@ -15,10 +15,10 @@ class Theme:
self.penwidth = penwidth self.penwidth = penwidth
class Table(): class CustomTable():
def __init__(self, name, comment): def __init__(self, name, schema):
self.name = name self.name = name
self.comment = comment if comment is not None and comment != 'None' else '' self.schema = schema if schema is not None and schema != 'None' else '' # table schema
self.label = None self.label = None
self.columns = [] # list of all columns self.columns = [] # list of all columns
...@@ -34,7 +34,7 @@ class Table(): ...@@ -34,7 +34,7 @@ class Table():
return name.upper() if useUpperCase else name.lower() return name.upper() if useUpperCase else name.lower()
def getName(self, useUpperCase, withQuotes=True): def getName(self, useUpperCase, withQuotes=True):
return Table.getClassName(self.name, useUpperCase, withQuotes) return CustomTable.getClassName(self.name, useUpperCase, withQuotes)
def getColumn(self, name): def getColumn(self, name):
for column in self.columns: for column in self.columns:
...@@ -85,7 +85,7 @@ class Table(): ...@@ -85,7 +85,7 @@ class Table():
return s return s
class Column: class CustomColumn:
def __init__(self, table, name, comment): def __init__(self, table, name, comment):
self.table = table self.table = table
self.name = name self.name = name
...@@ -101,7 +101,7 @@ class Column: ...@@ -101,7 +101,7 @@ class Column:
def getName(self, useUpperCase, withQuotes=True): def getName(self, useUpperCase, withQuotes=True):
return Table.getClassName(self.name, useUpperCase, withQuotes) return CustomTable.getClassName(self.name, useUpperCase, withQuotes)
def setDataType(self, datatype): def setDataType(self, datatype):
......
...@@ -384,6 +384,12 @@ ...@@ -384,6 +384,12 @@
<!-- Add more items as needed --> <!-- Add more items as needed -->
</ul> </ul>
<!-- set_database.html -->
<form method="post" action="/">
<input type="text" name="database_uri" placeholder="Enter Database URI">
<input type="submit" value="Set Database">
</form>
<div id="resize-handle-left" class="resize-handle-left"></div> <div id="resize-handle-left" class="resize-handle-left"></div>
</div> </div>
......
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Select Database</title>
<!-- Add some CSS for styling -->
</head>
<body>
<div>
<!-- <h3>Current Database: {{ current_db }}</h3> -->
<h3>Select a Database</h3>
<ul>
{% for db in databases %}
<li>{{ db }}</li>
{% endfor %}
</ul>
</div>
</body>
</html>
<!DOCTYPE html>
<html>
<head>
<title>Set Database</title>
</head>
<body>
<form method="POST" action="/set-database">
<label for="database_uri">Database URI:</label>
<input type="text" name="database_uri" id="database_uri">
<input type="submit" value="Set Database">
</form>
</body>
</html>
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment