Skip to content
Snippets Groups Projects
Commit cdee0a42 authored by Leah Tacke genannt Unterberg
Browse files

sql representation rework

parent f21b8087
No related branches found
No related tags found
No related merge requests found
from collections.abc import Iterable
from typing import TypeVar, TypedDict, Callable, Generic, Literal, overload
from typing import TypeVar, TypedDict, Callable, Generic
from . import MITMDefinition
from .registry import get_mitm_def
from .definition_representation import COLUMN_GROUPS, ConceptKind, ConceptLevel, ConceptName, MITM
from mitm_tooling.utilities.python_utils import elem_wise_eq
from .definition_representation import ConceptName
T = TypeVar('T')
def dummy(): return None
def _dummy(): return None
def dummy_list(): return []
def _dummy_list(): return []
Mapper = Callable[[], T | tuple[str, T]]
......@@ -29,6 +27,8 @@ class ColGroupMaps(TypedDict, Generic[T], total=False):
def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_maps: ColGroupMaps[T],
prepended_cols: MultiMapper | None = None,
appended_cols: MultiMapper | None = None,
ensure_unique: bool = True) -> \
tuple[
list[T], dict[str, T]]:
......@@ -48,19 +48,24 @@ def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_map
created_results[name] = result
results.append(result)
if prepended_cols:
add_results(prepended_cols())
for column_group in concept_properties.column_group_ordering:
match column_group:
case 'kind' if concept_properties.is_abstract or concept_properties.is_sub:
add_results([col_group_maps.get('kind', dummy)()])
add_results([col_group_maps.get('kind', _dummy)()])
case 'type':
add_results([col_group_maps.get('type', dummy)()])
add_results([col_group_maps.get('type', _dummy)()])
case 'identity-relations':
add_results(col_group_maps.get('identity', dummy_list)())
add_results(col_group_maps.get('identity', _dummy_list)())
case 'inline-relations':
add_results(col_group_maps.get('inline', dummy_list)())
add_results(col_group_maps.get('inline', _dummy_list)())
case 'foreign-relations':
add_results(col_group_maps.get('foreign', dummy_list)())
add_results(col_group_maps.get('foreign', _dummy_list)())
case 'attributes' if concept_properties.permit_attributes:
add_results(col_group_maps.get('attributes', dummy_list)())
add_results(col_group_maps.get('attributes', _dummy_list)())
if appended_cols:
add_results(appended_cols())
return results, created_results
......@@ -7,14 +7,13 @@ from pydantic import Field
from sqlalchemy import Table, MetaData
from mitm_tooling.data_types import MITMDataType, SA_SQLTypeName, sa_sql_to_mitm_type
from mitm_tooling.representation.sql_representation import ColumnName, TableName, SchemaName, ShortTableIdentifier
from mitm_tooling.representation.sql_representation import ColumnName, TableName, SchemaName, ShortTableIdentifier, Queryable
from mitm_tooling.utilities.sql_utils import unqualify
from .table_identifiers import LocalTableIdentifier, \
AnyLocalTableIdentifier
ExplicitTableSelection = dict[SchemaName, set[TableName]]
ExplicitColumnSelection = dict[SchemaName, dict[TableName, set[ColumnName]]]
Queryable = sa.FromClause
class ExplicitSelectionUtils:
......
import sqlalchemy as sa
TableName = str
SchemaName = str
ShortTableIdentifier = tuple[SchemaName, TableName]
QualifiedTableName = str
Queryable = sa.FromClause
\ No newline at end of file
from __future__ import annotations
from collections.abc import Callable, Iterable
from contextlib import contextmanager
from typing import TYPE_CHECKING, Generator, Any, Sequence
import pandas as pd
import pydantic
import sqlalchemy as sa
import sqlalchemy.sql.schema
from collections.abc import Callable, Mapping
from contextlib import contextmanager
from mitm_tooling.definition import MITMDefinition, ConceptProperties, OwnedRelations, RelationName
from mitm_tooling.definition.definition_tools import ColGroupMaps
from mitm_tooling.utilities import python_utils
from mitm_tooling.utilities.io_utils import FilePath
from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify
from pydantic import AnyUrl, ConfigDict
from sqlalchemy import Connection, NestedTransaction
from sqlalchemy import func
from sqlalchemy.pool import StaticPool
from sqlalchemy_utils.view import create_view
from typing import TYPE_CHECKING, Generator
from mitm_tooling.definition import RelationName
from mitm_tooling.definition.definition_tools import ColGroupMaps
from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify
from . import Header, MITMData
from .common import *
from .intermediate_representation import Header, MITMData
from .sql.common import *
from ..utilities.io_utils import FilePath
if TYPE_CHECKING:
from mitm_tooling.extraction.sql.data_models import Queryable
pass
SQL_REPRESENTATION_DEFAULT_SCHEMA = 'main'
ConceptTablesDict = dict[ConceptName, sa.Table]
ColumnsDict = dict[RelationName, sa.Column]
ViewsDict = dict[TableName, sa.Table]
ConceptTablesDict = dict[ConceptName, sa.Table]
ConceptTypeTablesDict = dict[ConceptName, dict[str, sa.Table]]
MitMConceptSchemaItemGenerator = Callable[
[MITM, ConceptName, ColumnsDict, ColumnsDict | None], Generator[sqlalchemy.sql.schema.SchemaItem, None, None]]
MitMConceptColumnGenerator = Callable[
[MITM, ConceptName], Generator[tuple[str, sa.Column], None, None]]
MitMDBViewsGenerator = Callable[[MITM, ConceptTablesDict, ConceptTypeTablesDict],
Generator[
tuple[
TableName, Queryable], None, None]]
ARTIFICIAL_ROW_ID_PREFIX = 'row'
def _get_unique_id_col_name(prefix: str | None = None) -> str:
return '__' + ((prefix + '_') if prefix else '') + 'id'
def _within_concept_id_col(mitm: MITM, concept: ConceptName) -> str:
    """Name of the synthetic id column for `concept`, derived from its parent concept."""
    return _get_unique_id_col_name(get_mitm_def(mitm).get_parent(concept))
class SQLRepresentationSchema(pydantic.BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
......@@ -55,76 +76,145 @@ def has_type_tables(mitm: MITM, concept: ConceptName) -> bool:
return get_mitm_def(mitm).get_properties(concept).permit_attributes
def has_natural_pk(mitm: MITM, concept: ConceptName) -> bool:
    """True iff `concept` declares at least one identity relation of its own."""
    return len(get_mitm_def(mitm).get_identity(concept)) > 0


def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict) -> ColumnsDict | None:
    """Select the primary-key columns for `concept` out of `created_columns`.

    Uses the concept's kind/type/identity column groups; when the concept has no
    natural identity, the synthetic per-concept id column is prepended instead.
    Returns a name -> sa.Column mapping (insertion-ordered).
    """
    mitm_def = get_mitm_def(mitm)
    concept_properties, concept_relations = mitm_def.get(concept)

    def prepended_cols():
        # Without a natural identity, the synthetic concept id acts as the pk.
        return [_within_concept_id_col(mitm, concept)] if not has_natural_pk(mitm, concept) else None

    names, mapped_names = map_col_groups(mitm_def, concept, {
        'kind': lambda: 'kind',
        'type': lambda: concept_properties.typing_concept,
        'identity': lambda: list(concept_relations.identity)
    }, prepended_cols=prepended_cols)
    return {n: created_columns[n] for n in names}
def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: TableName, col_group_maps: ColGroupMaps,
gen_additional_schema_items: Callable[
[MITM, ConceptName, ConceptProperties, OwnedRelations,
dict[RelationName, sa.Column], list[tuple[RelationName, sa.Column]]],
Generator[
sqlalchemy.sql.schema.SchemaItem, None, None]] | None = None,
override_schema: SchemaName | None = None) -> \
tuple[
sa.Table, dict[RelationName, sa.Column], list[tuple[RelationName, sa.Column]]]:
mitm_def = get_mitm_def(mitm)
concept_properties, concept_relations = mitm_def.get(concept)
columns, created_columns = map_col_groups(mitm_def, concept, col_group_maps, ensure_unique=True)
def _gen_pk_constraint(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict,
                       pk_columns: ColumnsDict | None) -> Generator[
    sa.sql.schema.SchemaItem, None, None]:
    """Yield a PrimaryKeyConstraint over the picked pk columns, if any.

    `pk_columns` is declared `| None` (see pick_table_pk); guard instead of
    dereferencing unconditionally, which would raise AttributeError on None.
    """
    if pk_columns:
        yield sa.PrimaryKeyConstraint(*pk_columns.values())
ref_columns = pick_table_pk(mitm, concept, created_columns)
constraints: list[sa.sql.schema.SchemaItem] = []
if concept_relations.identity:
constraints.append(sa.PrimaryKeyConstraint(*python_utils.i_th(1)(ref_columns)))
def _gen_unique_constraint(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict,
                           pk_columns: ColumnsDict | None) -> Generator[
    sa.sql.schema.SchemaItem, None, None]:
    """Yield a UniqueConstraint over the picked pk columns, if any.

    `pk_columns` is declared `| None` (see pick_table_pk); guard instead of
    dereferencing unconditionally, which would raise AttributeError on None.
    """
    if pk_columns:
        yield sa.UniqueConstraint(*pk_columns.values())
if gen_additional_schema_items:
schema_items = gen_additional_schema_items(mitm, concept, concept_properties, concept_relations,
created_columns,
ref_columns)
constraints.extend(schema_items)
return sa.Table(table_name, meta, schema=override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA,
*columns,
*constraints), created_columns, ref_columns
def _gen_index(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict,
               pk_columns: ColumnsDict | None) -> Generator[
    sa.sql.schema.SchemaItem, None, None]:
    """Yield an Index over the picked pk columns, if any.

    Fixes: sa.Index takes the index *name* as its first positional argument;
    passing the columns directly would hand a Column object in as the name.
    Passing None lets SQLAlchemy auto-generate a name. Also guards against
    pk_columns being None (see pick_table_pk's return type).
    """
    if pk_columns:
        yield sa.Index(None, *pk_columns.values())
def gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, concept_properties: ConceptProperties,
concept_relations: OwnedRelations, created_columns: dict[RelationName, sa.Column],
ref_columns: list[tuple[RelationName, sa.Column]]) -> Generator[
def _gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict,
                                 pk_columns: ColumnsDict | None) -> Generator[
    sa.sql.schema.SchemaItem, None, None]:
    """Yield FK constraints for `concept`: a self-FK to the parent concept table
    (over the pk columns) plus one constraint per declared foreign relation.

    Note: this span contained an interleaved pre-commit diff line; the
    reconstructed version below uses pk_columns.items(), matching the added line.
    """
    _, concept_relations = get_mitm_def(mitm).get(concept)
    # self_fk: link this table back to the parent concept table via the pk columns
    if pk_columns:
        parent_table = mk_concept_table_name(mitm, concept)
        cols, refcols = zip(
            *((c, qualify(table=parent_table, column=s)) for s, c in pk_columns.items()))
        yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols)
    for fk_name, fk_info in concept_relations.foreign.items():
        cols, refcols = zip(*fk_info.fk_relations.items())
        fkc = sa.ForeignKeyConstraint(name=fk_name, columns=[created_columns[c] for c in cols], refcolumns=[
            qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c)
            for c in refcols])
        yield fkc
def mk_sql_rep_schema(header: Header,
gen_views: Callable[
[MITM, MITMDefinition, ConceptTablesDict, ConceptTypeTablesDict],
# Default schema-item generators applied to concept/type tables.
# NOTE(review): ordering appears intentional (constraints before index) — confirm.
_schema_item_generators: tuple[MitMConceptSchemaItemGenerator, ...] = (
    _gen_unique_constraint, _gen_pk_constraint, _gen_index, _gen_foreign_key_constraints,)


def _gen_within_concept_id_col(mitm: MITM, concept: ConceptName) -> Generator[tuple[str, sa.Column], None, None]:
    """Yield the (name, column) pair for the synthetic per-concept id column.

    The column is a non-nullable, unique integer named after the parent concept
    (see _within_concept_id_col).
    """
    n = _within_concept_id_col(mitm, concept)
    yield n, sa.Column(n, sa.Integer, nullable=False, unique=True)


# Default additional-column generators prepended when building concept tables.
_column_generators: tuple[MitMConceptColumnGenerator, ...] = (_gen_within_concept_id_col,)
def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: TableName, col_group_maps: ColGroupMaps,
             additional_column_generators: Iterable[MitMConceptColumnGenerator] | None = (
                     _gen_within_concept_id_col,),
             schema_item_generators: Iterable[MitMConceptSchemaItemGenerator] |
                                     None = None,
             override_schema: SchemaName | None = None) -> \
        tuple[
            sa.Table, ColumnsDict, ColumnsDict]:
    """Build the sa.Table for `concept` inside `meta`.

    Columns come from `col_group_maps` (plus any generated extra columns,
    prepended); schema items (constraints, indexes) come from the given
    generators. Returns (table, created columns by name, picked pk columns).
    """
    mitm_def = get_mitm_def(mitm)

    prepended_cols = None
    if additional_column_generators is not None:
        def prepended_cols():
            # Materialize every (name, column) pair the generators produce.
            return [pair for gen in additional_column_generators for pair in gen(mitm, concept)]

    columns, created_columns = map_col_groups(mitm_def, concept, col_group_maps,
                                              prepended_cols=prepended_cols, ensure_unique=True)
    pk_cols = pick_table_pk(mitm, concept, created_columns)

    schema_items: list[sa.sql.schema.SchemaItem] = []
    for gen in (schema_item_generators or ()):
        schema_items.extend(gen(mitm, concept, created_columns, pk_cols))

    target_schema = override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA
    return sa.Table(table_name, meta, schema=target_schema,
                    *columns, *schema_items), created_columns, pk_cols
def _prefix_col_name(prefix: str, name: str) -> str:
    """Join a type prefix and a column name with an underscore."""
    return '_'.join((prefix, name))
def _gen_denormalized_views(mitm: MITM, concept_tables: ConceptTablesDict, type_tables: ConceptTypeTablesDict) -> \
        Generator[
            tuple[
                TableName, Queryable], None, None]:
    """Yield (view_name, selectable) pairs denormalizing the concept tables.

    For each main concept: either a union over its type tables (non-shared
    columns prefixed with the type name) or a plain select of the concept
    table. Additionally, one filtered view per sub-concept (kind == key).

    This span contained one interleaved pre-commit diff line that broke the
    signature; it is dropped in this reconstruction.
    """
    mitm_def = get_mitm_def(mitm)
    for concept in mitm_def.main_concepts:
        view_name = mk_concept_table_name(mitm, concept) + '_view'
        q = None
        if concept_t := concept_tables.get(concept):
            if has_type_tables(mitm, concept):
                shared_cols = {c.name for c in concept_t.columns}
                if concept_type_tables := type_tables.get(concept):
                    selections = []
                    for type_name, type_t in concept_type_tables.items():
                        # NOTE(review): sa.alias targets FROM clauses; for renaming a
                        # single column, c.label(...) is the conventional call — confirm.
                        selection = (c if c.name in shared_cols else sa.alias(c, _prefix_col_name(type_name, c.name))
                                     for c in type_t.columns)
                        selections.append(sa.select(selection))
                    q = sa.union(*selections).subquery()
            else:
                q = sa.select(concept_t)
        if q:
            yield view_name, q
    for parent_concept, subs in mitm_def.sub_concept_map.items():
        if concept_t := concept_tables.get(parent_concept):
            for sub in subs:
                view_name = mk_concept_table_name(mitm, sub) + '_view'
                k = mitm_def.get_properties(sub).key
                q = sa.select(concept_t).where(concept_t.columns['kind'] == k)
                yield view_name, q


# Default view generators used by mk_sql_rep_schema.
_view_generators: tuple[MitMDBViewsGenerator, ...] = (_gen_denormalized_views,)
def mk_sql_rep_schema(header: Header,
view_generators: Iterable[MitMDBViewsGenerator] | None = None,
override_schema: SchemaName | None = None) -> SQLRepresentationSchema:
mitm_def = get_mitm_def(header.mitm)
meta = sa.MetaData(schema=override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA)
......@@ -151,7 +241,8 @@ def mk_sql_rep_schema(header: Header,
'foreign': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for _, resolved_fk in
mitm_def.resolve_foreign_types(concept).items() for name, dt in
resolved_fk.items()]
}, override_schema=override_schema)
}, additional_column_generators=(_gen_within_concept_id_col,), schema_item_generators=(
_gen_unique_constraint, _gen_pk_constraint, _gen_index,), override_schema=override_schema)
concept_tables[concept] = t
for he in header.header_entries:
......@@ -176,24 +267,21 @@ def mk_sql_rep_schema(header: Header,
resolved_fk.items()],
'attributes': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for name, dt in
zip(he.attributes, he.attribute_dtypes)],
}, gen_additional_schema_items=gen_foreign_key_constraints, override_schema=override_schema)
}, additional_column_generators=(_gen_within_concept_id_col,),
schema_item_generators=(
_gen_unique_constraint, _gen_pk_constraint,
_gen_foreign_key_constraints, _gen_index),
override_schema=override_schema)
if he_concept not in type_tables:
type_tables[he_concept] = {}
type_tables[he_concept][he.type_name] = t
# for concept, members in concept_level_view_members.items():
if gen_views:
for name, queryable in gen_views(header.mitm, mitm_def, concept_tables, type_tables):
if view_generators is not None:
for generator in view_generators:
for name, queryable in generator(header.mitm, concept_tables, type_tables):
views[name] = create_view(name, queryable, meta)
# view_selection = sa.union_all(*(sa.select(*pk_cols) for pk_cols in members))
# views[concept] = view.create_materialized_view(mk_concept_table_name(header.mitm, concept), view_selection,
# meta)
return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views)
......@@ -201,7 +289,7 @@ EngineOrConnection = sa.Engine | sa.Connection
@contextmanager
def _nested_conn(bind: EngineOrConnection) -> Generator[Connection | NestedTransaction, None, None]:
def _nested_conn(bind: EngineOrConnection) -> Generator[sa.Connection, None, None]:
if isinstance(bind, sa.Engine):
yield bind.connect()
elif isinstance(bind, sa.Connection):
......@@ -213,25 +301,52 @@ def insert_db_schema(bind: EngineOrConnection, sql_rep_schema: SQLRepresentation
sql_rep_schema.meta.create_all(bind=bind, checkfirst=False)
def _df_to_records(df: pd.DataFrame, cols: Sequence[str], additional_cols: dict[str, Any] | None = None) -> list[
dict[str, Any]]:
if additional_cols:
df = df.assign(**additional_cols)
return df[[c for c in cols if c in df.columns]].to_dict('records')
def _df_to_table_records(df: pd.DataFrame, table: sa.Table, additional_cols: dict[str, Any] | None = None) -> list[
    dict[str, Any]]:
    """Row dicts of `df` restricted to the columns of the given table."""
    column_names = (col.name for col in table.columns)
    return _df_to_records(df, column_names, additional_cols=additional_cols)
def insert_db_instances(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> None:
    """Insert the dataframes derived from `mitm_data` into the SQL representation.

    For every (concept, type) dataframe: assign artificial surrogate ids
    (continuing from the current max in the parent concept table), insert into
    the parent concept table, and — when the concept has type tables — also
    into the matching type table.

    This span contained interleaved pre-commit diff lines; reconstructed here.
    """
    from mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes
    mitm = mitm_data.header.mitm
    mitm_def = get_mitm_def(mitm)
    mitm_dataframes = mitm_data_into_mitm_dataframes(mitm_data)
    with _nested_conn(bind) as conn:
        for concept, typed_dfs in mitm_dataframes:
            concept_properties, concept_relations = mitm_def.get(concept)
            for type_name, type_df in typed_dfs.items():
                parent_concept = mitm_def.get_parent(concept)
                t_concept = sql_rep_schema.concept_tables[parent_concept]
                # TODO not pretty..
                # ideally, I'd use the returned "inserted_pk"
                # values from the bulk insertion with an autoincrement id col
                # but apparently almost no DBAPI drivers support this
                concept_id_col_name = _get_unique_id_col_name(parent_concept)
                max_id = conn.execute(sa.select(func.max(t_concept.columns[concept_id_col_name]))).scalar() or 0
                no_rows_to_insert = len(type_df)
                artificial_ids = pd.RangeIndex(start=max_id + 1, stop=max_id + 1 + no_rows_to_insert,
                                               name=concept_id_col_name)
                # BUG FIX: assign(concept_id_col_name=...) would create a column
                # literally named 'concept_id_col_name'; unpack a dict so the
                # dynamic column name is used instead.
                type_df = type_df.assign(**{concept_id_col_name: artificial_ids})
                conn.execute(t_concept.insert(), _df_to_table_records(type_df, t_concept))
                if has_type_tables(mitm, concept):
                    t_type = sql_rep_schema.type_tables[concept][type_name]
                    conn.execute(t_type.insert(), _df_to_table_records(type_df, t_type))
        conn.commit()
......
......@@ -13,7 +13,7 @@ class SupersetPostProcessing(pydantic.BaseModel, ABC):
class DatasourceIdentifier(FrozenSupersetDefinition):
    """Identifier for a Superset datasource (table or annotation).

    This span contained the pre-commit duplicate of the `id` field; the
    reconstructed class keeps only the post-commit default.
    """
    # '-1' is a placeholder id, expected to be replaced on import into Superset.
    id: SupersetId = '-1'
    type: Literal['table', 'annotation'] = 'table'
    # Excluded from serialization; used only for internal linking.
    dataset_uuid: StrUUID = pydantic.Field(exclude=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment