diff --git a/mitm_tooling/definition/definition_tools.py b/mitm_tooling/definition/definition_tools.py index 5fa9c3e02f6bcf9ec48527e244569729bd440dce..95343f3e3177535b8b9c31ea9c85f072f7c2b321 100644 --- a/mitm_tooling/definition/definition_tools.py +++ b/mitm_tooling/definition/definition_tools.py @@ -1,18 +1,16 @@ from collections.abc import Iterable -from typing import TypeVar, TypedDict, Callable, Generic, Literal, overload +from typing import TypeVar, TypedDict, Callable, Generic from . import MITMDefinition -from .registry import get_mitm_def -from .definition_representation import COLUMN_GROUPS, ConceptKind, ConceptLevel, ConceptName, MITM -from mitm_tooling.utilities.python_utils import elem_wise_eq +from .definition_representation import ConceptName T = TypeVar('T') -def dummy(): return None +def _dummy(): return None -def dummy_list(): return [] +def _dummy_list(): return [] Mapper = Callable[[], T | tuple[str, T]] @@ -29,6 +27,8 @@ class ColGroupMaps(TypedDict, Generic[T], total=False): def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_maps: ColGroupMaps[T], + prepended_cols: MultiMapper | None = None, + appended_cols: MultiMapper | None = None, ensure_unique: bool = True) -> \ tuple[ list[T], dict[str, T]]: @@ -48,19 +48,24 @@ def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_map created_results[name] = result results.append(result) + if prepended_cols: + add_results(prepended_cols()) for column_group in concept_properties.column_group_ordering: + match column_group: case 'kind' if concept_properties.is_abstract or concept_properties.is_sub: - add_results([col_group_maps.get('kind', dummy)()]) + add_results([col_group_maps.get('kind', _dummy)()]) case 'type': - add_results([col_group_maps.get('type', dummy)()]) + add_results([col_group_maps.get('type', _dummy)()]) case 'identity-relations': - add_results(col_group_maps.get('identity', dummy_list)()) + add_results(col_group_maps.get('identity', _dummy_list)()) case 'inline-relations': - add_results(col_group_maps.get('inline', dummy_list)()) + add_results(col_group_maps.get('inline', _dummy_list)()) case 'foreign-relations': - add_results(col_group_maps.get('foreign', dummy_list)()) + add_results(col_group_maps.get('foreign', _dummy_list)()) case 'attributes' if concept_properties.permit_attributes: - add_results(col_group_maps.get('attributes', dummy_list)()) + add_results(col_group_maps.get('attributes', _dummy_list)()) + if appended_cols: + add_results(appended_cols()) return results, created_results diff --git a/mitm_tooling/extraction/sql/data_models/db_meta.py b/mitm_tooling/extraction/sql/data_models/db_meta.py index 6ffcc00082f15f2cd3dcb2e0ea7d78c634da0504..2da47ceb09a8ca0c9d17127b1bc5c3dfd939ec92 100644 --- a/mitm_tooling/extraction/sql/data_models/db_meta.py +++ b/mitm_tooling/extraction/sql/data_models/db_meta.py @@ -7,14 +7,13 @@ from pydantic import Field from sqlalchemy import Table, MetaData from mitm_tooling.data_types import MITMDataType, SA_SQLTypeName, sa_sql_to_mitm_type -from mitm_tooling.representation.sql_representation import ColumnName, TableName, SchemaName, ShortTableIdentifier +from mitm_tooling.representation.sql_representation import ColumnName, TableName, SchemaName, ShortTableIdentifier, Queryable from mitm_tooling.utilities.sql_utils import unqualify from .table_identifiers import LocalTableIdentifier, \ AnyLocalTableIdentifier ExplicitTableSelection = dict[SchemaName, set[TableName]] ExplicitColumnSelection = dict[SchemaName, dict[TableName, 
set[ColumnName]]] -Queryable = sa.FromClause class ExplicitSelectionUtils: diff --git a/mitm_tooling/representation/sql/common.py b/mitm_tooling/representation/sql/common.py index 4c45e44fafb9b87a4eddd69a4b590363af20c92b..aa14dbb3fe0a819f9efe20c0ddb39f8218d851bb 100644 --- a/mitm_tooling/representation/sql/common.py +++ b/mitm_tooling/representation/sql/common.py @@ -1,4 +1,7 @@ +import sqlalchemy as sa + TableName = str SchemaName = str ShortTableIdentifier = tuple[SchemaName, TableName] QualifiedTableName = str +Queryable = sa.FromClause \ No newline at end of file diff --git a/mitm_tooling/representation/sql_representation.py b/mitm_tooling/representation/sql_representation.py index 14cfdddcb9c2827b79796752d359a77def31671b..195cc19bcd59f46f8efd19fe8d78f1540597d5f2 100644 --- a/mitm_tooling/representation/sql_representation.py +++ b/mitm_tooling/representation/sql_representation.py @@ -1,34 +1,55 @@ from __future__ import annotations +from collections.abc import Callable, Iterable +from contextlib import contextmanager +from typing import TYPE_CHECKING, Generator, Any, Sequence + +import pandas as pd import pydantic -import sqlalchemy as sa import sqlalchemy.sql.schema -from collections.abc import Callable, Mapping -from contextlib import contextmanager -from mitm_tooling.definition import MITMDefinition, ConceptProperties, OwnedRelations, RelationName -from mitm_tooling.definition.definition_tools import ColGroupMaps -from mitm_tooling.utilities import python_utils -from mitm_tooling.utilities.io_utils import FilePath -from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify from pydantic import AnyUrl, ConfigDict -from sqlalchemy import Connection, NestedTransaction +from sqlalchemy import func from sqlalchemy.pool import StaticPool from sqlalchemy_utils.view import create_view -from typing import TYPE_CHECKING, Generator +from mitm_tooling.definition import RelationName +from mitm_tooling.definition.definition_tools import ColGroupMaps +from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify +from . 
import Header, MITMData from .common import * -from .intermediate_representation import Header, MITMData from .sql.common import * +from ..utilities.io_utils import FilePath if TYPE_CHECKING: - from mitm_tooling.extraction.sql.data_models import Queryable + pass SQL_REPRESENTATION_DEFAULT_SCHEMA = 'main' -ConceptTablesDict = dict[ConceptName, sa.Table] +ColumnsDict = dict[RelationName, sa.Column] ViewsDict = dict[TableName, sa.Table] +ConceptTablesDict = dict[ConceptName, sa.Table] ConceptTypeTablesDict = dict[ConceptName, dict[str, sa.Table]] +MitMConceptSchemaItemGenerator = Callable[ + [MITM, ConceptName, ColumnsDict, ColumnsDict | None], Generator[sqlalchemy.sql.schema.SchemaItem, None, None]] +MitMConceptColumnGenerator = Callable[ + [MITM, ConceptName], Generator[tuple[str, sa.Column], None, None]] +MitMDBViewsGenerator = Callable[[MITM, ConceptTablesDict, ConceptTypeTablesDict], +Generator[ + tuple[ + TableName, Queryable], None, None]] + +ARTIFICIAL_ROW_ID_PREFIX = 'row' + + +def _get_unique_id_col_name(prefix: str | None = None) -> str: + return '__' + ((prefix + '_') if prefix else '') + 'id' + + +def _within_concept_id_col(mitm: MITM, concept: ConceptName) -> str: + parent_concept = get_mitm_def(mitm).get_parent(concept) + return _get_unique_id_col_name(parent_concept) + class SQLRepresentationSchema(pydantic.BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) @@ -55,76 +76,145 @@ def has_type_tables(mitm: MITM, concept: ConceptName) -> bool: return get_mitm_def(mitm).get_properties(concept).permit_attributes -def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: Mapping[RelationName, sa.Column]) -> list[ - tuple[RelationName, sa.Column]]: +def has_natural_pk(mitm: MITM, concept: ConceptName) -> bool: + return len(get_mitm_def(mitm).get_identity(concept)) > 0 + + +def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict) -> ColumnsDict | None: mitm_def = get_mitm_def(mitm) concept_properties, concept_relations = mitm_def.get(concept) + prepended_cols = lambda: [_within_concept_id_col(mitm, concept)] if not has_natural_pk(mitm, concept) else None names, mapped_names = map_col_groups(mitm_def, concept, { 'kind': lambda: 'kind', 'type': lambda: concept_properties.typing_concept, 'identity': lambda: list(concept_relations.identity) - }) + }, prepended_cols=prepended_cols) - return python_utils.pick_from_mapping(created_columns, names) + return {n: created_columns[n] for n in names} -def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: TableName, col_group_maps: ColGroupMaps, - gen_additional_schema_items: Callable[ - [MITM, ConceptName, ConceptProperties, OwnedRelations, - dict[RelationName, sa.Column], list[tuple[RelationName, sa.Column]]], - Generator[ - sqlalchemy.sql.schema.SchemaItem, None, None]] | None = None, - override_schema: SchemaName | None = None) -> \ - tuple[ - sa.Table, dict[RelationName, sa.Column], list[tuple[RelationName, sa.Column]]]: - mitm_def = get_mitm_def(mitm) - concept_properties, concept_relations = mitm_def.get(concept) - - columns, created_columns = map_col_groups(mitm_def, concept, col_group_maps, ensure_unique=True) +def _gen_pk_constraint(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict, + pk_columns: ColumnsDict | None) -> Generator[ + sa.sql.schema.SchemaItem, None, None]: + yield sa.PrimaryKeyConstraint(*pk_columns.values()) - ref_columns = pick_table_pk(mitm, concept, created_columns) - constraints: list[sa.sql.schema.SchemaItem] = [] - if 
concept_relations.identity: - constraints.append(sa.PrimaryKeyConstraint(*python_utils.i_th(1)(ref_columns))) +def _gen_unique_constraint(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict, + pk_columns: ColumnsDict | None) -> Generator[ + sa.sql.schema.SchemaItem, None, None]: + yield sa.UniqueConstraint(*pk_columns.values()) - if gen_additional_schema_items: - schema_items = gen_additional_schema_items(mitm, concept, concept_properties, concept_relations, - created_columns, - ref_columns) - constraints.extend(schema_items) - return sa.Table(table_name, meta, schema=override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA, - *columns, - *constraints), created_columns, ref_columns +def _gen_index(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict, + pk_columns: ColumnsDict | None) -> Generator[ + sa.sql.schema.SchemaItem, None, None]: + yield sa.Index(*pk_columns.values()) -def gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, concept_properties: ConceptProperties, - concept_relations: OwnedRelations, created_columns: dict[RelationName, sa.Column], - ref_columns: list[tuple[RelationName, sa.Column]]) -> Generator[ +def _gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict, + pk_columns: ColumnsDict | None) -> Generator[ sa.sql.schema.SchemaItem, None, None]: + _, concept_relations = get_mitm_def(mitm).get(concept) + # self_fk - parent_table = mk_concept_table_name(mitm, concept) - cols, refcols = zip( - *((c, qualify(table=parent_table, column=s)) for s, c in ref_columns)) + if pk_columns: + parent_table = mk_concept_table_name(mitm, concept) + cols, refcols = zip( + *((c, qualify(table=parent_table, column=s)) for s, c in pk_columns.items())) + yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols) - yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols) for fk_name, fk_info in concept_relations.foreign.items(): cols, refcols = zip(*fk_info.fk_relations.items()) fkc = sa.ForeignKeyConstraint(name=fk_name, columns=[created_columns[c] for c in cols], refcolumns=[ - # sa.literal_column(qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c)) qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c) for c in refcols]) yield fkc +_schema_item_generators: tuple[MitMConceptSchemaItemGenerator, ...] = ( + _gen_unique_constraint, _gen_pk_constraint, _gen_index, _gen_foreign_key_constraints,) + + +def _gen_within_concept_id_col(mitm: MITM, concept: ConceptName) -> Generator[tuple[str, sa.Column], None, None]: + n = _within_concept_id_col(mitm, concept) + yield n, sa.Column(n, sa.Integer, nullable=False, unique=True) + + +_column_generators: tuple[MitMConceptColumnGenerator, ...] 
= (_gen_within_concept_id_col,) + + +def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: TableName, col_group_maps: ColGroupMaps, + additional_column_generators: Iterable[MitMConceptColumnGenerator] | None = ( + _gen_within_concept_id_col,), + schema_item_generators: Iterable[MitMConceptSchemaItemGenerator] | + None = None, + override_schema: SchemaName | None = None) -> \ + tuple[ + sa.Table, ColumnsDict, ColumnsDict]: + mitm_def = get_mitm_def(mitm) + + prepended_cols = None + if additional_column_generators is not None: + prepended_cols = lambda: [c for generator in additional_column_generators for c in generator(mitm, concept)] + + columns, created_columns = map_col_groups(mitm_def, concept, col_group_maps, prepended_cols=prepended_cols, + ensure_unique=True) + + pk_cols = pick_table_pk(mitm, concept, created_columns) + + schema_items: list[sa.sql.schema.SchemaItem] = [] + if schema_item_generators is not None: + for generator in schema_item_generators: + schema_items.extend(generator(mitm, concept, created_columns, pk_cols)) + + return sa.Table(table_name, meta, schema=override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA, + *columns, + *schema_items), created_columns, pk_cols + + +def _prefix_col_name(prefix: str, name: str) -> str: + return f'{prefix}_{name}' + + +def _gen_denormalized_views(mitm: MITM, concept_tables: ConceptTablesDict, type_tables: ConceptTypeTablesDict) -> \ + Generator[ + tuple[ + TableName, Queryable], None, None]: + mitm_def = get_mitm_def(mitm) + + for concept in mitm_def.main_concepts: + view_name = mk_concept_table_name(mitm, concept) + '_view' + q = None + if concept_t := concept_tables.get(concept): + if has_type_tables(mitm, concept): + shared_cols = {c.name for c in concept_t.columns} + if concept_type_tables := type_tables.get(concept): + selections = [] + for type_name, type_t in concept_type_tables.items(): + selection = (c if c.name in shared_cols else sa.alias(c, _prefix_col_name(type_name, c.name)) + for c in type_t.columns) + selections.append(sa.select(selection)) + q = sa.union(*selections).subquery() + else: + q = sa.select(concept_t) + if q: + yield view_name, q + + for parent_concept, subs in mitm_def.sub_concept_map.items(): + if concept_t := concept_tables.get(parent_concept): + for sub in subs: + view_name = mk_concept_table_name(mitm, sub) + '_view' + k = mitm_def.get_properties(sub).key + q = sa.select(concept_t).where(concept_t.columns['kind'] == k) + yield view_name, q + + +_view_generators: tuple[MitMDBViewsGenerator, ...] 
= (_gen_denormalized_views,) + + def mk_sql_rep_schema(header: Header, - gen_views: Callable[ - [MITM, MITMDefinition, ConceptTablesDict, ConceptTypeTablesDict], - Generator[ - tuple[ - TableName, Queryable], None, None]] | None = None, + view_generators: Iterable[MitMDBViewsGenerator] | None = None, override_schema: SchemaName | None = None) -> SQLRepresentationSchema: mitm_def = get_mitm_def(header.mitm) meta = sa.MetaData(schema=override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA) @@ -151,7 +241,8 @@ def mk_sql_rep_schema(header: Header, 'foreign': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for _, resolved_fk in mitm_def.resolve_foreign_types(concept).items() for name, dt in resolved_fk.items()] - }, override_schema=override_schema) + }, additional_column_generators=(_gen_within_concept_id_col,), schema_item_generators=( + _gen_unique_constraint, _gen_pk_constraint, _gen_index,), override_schema=override_schema) concept_tables[concept] = t for he in header.header_entries: @@ -176,23 +267,20 @@ def mk_sql_rep_schema(header: Header, resolved_fk.items()], 'attributes': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for name, dt in zip(he.attributes, he.attribute_dtypes)], - }, gen_additional_schema_items=gen_foreign_key_constraints, override_schema=override_schema) + }, additional_column_generators=(_gen_within_concept_id_col,), + schema_item_generators=( + _gen_unique_constraint, _gen_pk_constraint, + _gen_foreign_key_constraints, _gen_index), + override_schema=override_schema) if he_concept not in type_tables: type_tables[he_concept] = {} type_tables[he_concept][he.type_name] = t - # for concept, members in concept_level_view_members.items(): - - if gen_views: - for name, queryable in gen_views(header.mitm, mitm_def, concept_tables, type_tables): - views[name] = create_view(name, queryable, meta) - - # view_selection = sa.union_all(*(sa.select(*pk_cols) for pk_cols in members)) - - # views[concept] = view.create_materialized_view(mk_concept_table_name(header.mitm, concept), view_selection, - - # meta) + if view_generators is not None: + for generator in view_generators: + for name, queryable in generator(header.mitm, concept_tables, type_tables): + views[name] = create_view(name, queryable, meta) return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views) @@ -201,7 +289,7 @@ EngineOrConnection = sa.Engine | sa.Connection @contextmanager -def _nested_conn(bind: EngineOrConnection) -> Generator[Connection | NestedTransaction, None, None]: +def _nested_conn(bind: EngineOrConnection) -> Generator[sa.Connection, None, None]: if isinstance(bind, sa.Engine): yield bind.connect() elif isinstance(bind, sa.Connection): @@ -213,25 +301,52 @@ def insert_db_schema(bind: EngineOrConnection, sql_rep_schema: SQLRepresentation sql_rep_schema.meta.create_all(bind=bind, checkfirst=False) +def _df_to_records(df: pd.DataFrame, cols: Sequence[str], additional_cols: dict[str, Any] | None = None) -> list[ + dict[str, Any]]: + if additional_cols: + df = df.assign(**additional_cols) + return df[[c for c in cols if c in df.columns]].to_dict('records') + + +def _df_to_table_records(df: pd.DataFrame, table: sa.Table, additional_cols: dict[str, Any] | None = None) -> list[ + dict[str, Any]]: + return _df_to_records(df, (c.name for c in table.columns), additional_cols=additional_cols) + + def insert_db_instances(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> None: from 
mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes
 
     h = mitm_data.header
     mitm = mitm_data.header.mitm
     mitm_def = get_mitm_def(mitm)
 
-    mitm_dataset = mitm_data_into_mitm_dataframes(mitm_data)
+    mitm_dataframes = mitm_data_into_mitm_dataframes(mitm_data)
     with _nested_conn(bind) as conn:
-        for concept, typed_dfs in mitm_dataset:
+        for concept, typed_dfs in mitm_dataframes:
             concept_properties, concept_relations = mitm_def.get(concept)
             for type_name, type_df in typed_dfs.items():
-                t_concept = sql_rep_schema.concept_tables[mitm_def.get_parent(concept)]
-                ref_cols = pick_table_pk(mitm, concept, t_concept.columns)
-                conn.execute(t_concept.insert(), type_df[[c.name for c in t_concept.columns]].to_dict('records'))
+                parent_concept = mitm_def.get_parent(concept)
+                t_concept = sql_rep_schema.concept_tables[parent_concept]
+
+                # if not has_natural_pk(mitm, concept):
+                # TODO not pretty..
+                #  ideally, I'd use the returned "inserted_pk" values from the bulk insertion
+                #  with an autoincrement id col, but apparently almost no DBAPI drivers support this
+
+                concept_id_col_name = _get_unique_id_col_name(parent_concept)
+                max_id = conn.execute(sa.select(func.max(t_concept.columns[concept_id_col_name]))).scalar() or 0
+                no_rows_to_insert = len(type_df)
+                artificial_ids = pd.RangeIndex(start=max_id + 1, stop=max_id + 1 + no_rows_to_insert,
+                                               name=concept_id_col_name)
+                type_df = type_df.assign(**{concept_id_col_name: artificial_ids})
+
+                conn.execute(t_concept.insert(), _df_to_table_records(type_df, t_concept))
 
                 if has_type_tables(mitm, concept):
+                    # generated_ids = conn.execute(sa.select(t_concept.columns[concept_id_col_name])).scalars()
+
                     t_type = sql_rep_schema.type_tables[concept][type_name]
-                    to_dict = type_df[[c.name for c in t_type.columns]].to_dict('records')
-                    conn.execute(t_type.insert(), to_dict)
+                    conn.execute(t_type.insert(), _df_to_table_records(type_df, t_type))
 
             conn.commit()
diff --git a/mitm_tooling/transformation/superset/definitions/core.py b/mitm_tooling/transformation/superset/definitions/core.py
index e2bee4b68b5f9cd919d400e3443bf2f90de80892..0122835721f73e97b10386945ab25f477562c6a0 100644
--- a/mitm_tooling/transformation/superset/definitions/core.py
+++ b/mitm_tooling/transformation/superset/definitions/core.py
@@ -13,7 +13,7 @@ class SupersetPostProcessing(pydantic.BaseModel, ABC):
 
 
 class DatasourceIdentifier(FrozenSupersetDefinition):
-    id: SupersetId = 'placeholder'
+    id: SupersetId = '-1'  # -1 as a placeholder
     type: Literal['table', 'annotation'] = 'table'
 
     dataset_uuid: StrUUID = pydantic.Field(exclude=True)
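
Note on the `sql_representation.py` refactoring: `mk_table` no longer takes a single `gen_additional_schema_items` callback. It now composes a table from two kinds of small generators: column generators contribute extra `(name, Column)` pairs (fed in through the new `prepended_cols` hook of `map_col_groups`), and schema-item generators turn the created columns and the chosen primary-key columns into constraints and indexes, with `_column_generators` and `_schema_item_generators` as the default tuples. The following standalone sketch illustrates the composition idea with plain SQLAlchemy; `build_table`, `gen_pk`, `gen_index` and the `event` table are illustrative names, not part of `mitm_tooling`.

```python
from collections.abc import Callable, Generator, Iterable

import sqlalchemy as sa

ColumnsDict = dict[str, sa.Column]
SchemaItemGenerator = Callable[[ColumnsDict, ColumnsDict], Generator[sa.sql.schema.SchemaItem, None, None]]


def gen_pk(created: ColumnsDict, pk: ColumnsDict) -> Generator[sa.sql.schema.SchemaItem, None, None]:
    yield sa.PrimaryKeyConstraint(*pk.values())


def gen_index(created: ColumnsDict, pk: ColumnsDict) -> Generator[sa.sql.schema.SchemaItem, None, None]:
    cols = list(pk.values())
    # sa.Index expects a name as its first argument.
    yield sa.Index('ix_' + '_'.join(c.name for c in cols), *cols)


def build_table(meta: sa.MetaData, name: str, columns: ColumnsDict, pk_names: list[str],
                schema_item_generators: Iterable[SchemaItemGenerator]) -> sa.Table:
    # Derive constraints/indexes from the created columns, then hand everything to sa.Table.
    pk = {n: columns[n] for n in pk_names}
    schema_items = [item for gen in schema_item_generators for item in gen(columns, pk)]
    return sa.Table(name, meta, *columns.values(), *schema_items)


meta = sa.MetaData()
cols = {'__event_id': sa.Column('__event_id', sa.Integer, nullable=False),
        'object': sa.Column('object', sa.String)}
events = build_table(meta, 'event', cols, ['__event_id'], (gen_pk, gen_index))
meta.create_all(sa.create_engine('sqlite://'))
```

Keeping each schema item in its own generator lets callers such as `mk_sql_rep_schema` pick a different set per table kind, as the diff does for concept tables versus type tables.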
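Similarly, the view generators (`_gen_denormalized_views`, plugged in via `view_generators`) yield `(name, selectable)` pairs that `mk_sql_rep_schema` registers with `sqlalchemy_utils.view.create_view`, so the `CREATE VIEW` DDL is emitted by `meta.create_all()` together with the tables. A self-contained sketch with invented toy tables; renaming type-specific columns with a type prefix is done here with `Column.label`, which is an assumption about the intent of the aliasing in `_gen_denormalized_views`.

```python
import sqlalchemy as sa
from sqlalchemy_utils.view import create_view

meta = sa.MetaData()
measurement = sa.Table(
    'measurement', meta,
    sa.Column('kind', sa.String),
    sa.Column('object', sa.String),
)
measurement_temperature = sa.Table(
    'measurement_temperature', meta,
    sa.Column('object', sa.String),
    sa.Column('celsius', sa.Float),
)

# Columns shared with the concept table keep their names; type-specific
# columns are renamed with a type prefix (via Column.label in this sketch).
shared = {c.name for c in measurement.columns}
selection = [c if c.name in shared else c.label(f'temperature_{c.name}')
             for c in measurement_temperature.columns]
q = sa.select(*selection)

# create_view attaches the CREATE VIEW DDL to the MetaData, so create_all
# emits it right after the tables it selects from.
measurement_view = create_view('measurement_view', q, meta)

meta.create_all(sa.create_engine('sqlite://'))
```

For sub-concepts the diff takes the simpler route of filtering the parent concept table on its `kind` column, which needs no renaming at all.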
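Finally, `insert_db_instances` now assigns the surrogate `__<parent>_id` values client-side, continuing from the current maximum in the concept table, because (as the TODO notes) few DBAPI drivers report per-row `inserted_pk` values for bulk `executemany` inserts. A minimal standalone version of that pattern, with an invented `observations` table and helper name rather than the `mitm_tooling` API:

```python
import pandas as pd
import sqlalchemy as sa

meta = sa.MetaData()
observations = sa.Table(
    'observations', meta,
    sa.Column('__observation_id', sa.Integer, nullable=False, unique=True),
    sa.Column('object', sa.String),
    sa.Column('value', sa.Float),
)
engine = sa.create_engine('sqlite://')
meta.create_all(engine)


def insert_with_artificial_ids(conn: sa.Connection, table: sa.Table, id_col: str, df: pd.DataFrame) -> None:
    # Continue numbering after whatever is already in the table.
    max_id = conn.execute(sa.select(sa.func.max(table.c[id_col]))).scalar() or 0
    ids = pd.RangeIndex(start=max_id + 1, stop=max_id + 1 + len(df), name=id_col)
    # The column name is only known at runtime, hence assign(**{...}).
    df = df.assign(**{id_col: ids})
    records = df[[c.name for c in table.columns if c.name in df.columns]].to_dict('records')
    conn.execute(table.insert(), records)


with engine.begin() as conn:
    batch = pd.DataFrame({'object': ['a', 'b'], 'value': [1.0, 2.0]})
    insert_with_artificial_ids(conn, observations, '__observation_id', batch)  # ids 1, 2
    insert_with_artificial_ids(conn, observations, '__observation_id', batch)  # ids 3, 4
```

Note the `assign(**{id_col: ids})` form: because the id column name is computed per concept, it cannot be passed as a keyword literal.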