Skip to content
Snippets Groups Projects
Select Git revision
  • d10e237644da2e12fac9dc6f4d8d87c8f6b96f74
  • master default protected
  • develop
3 results

sql_representation.py

Blame
  • sql_representation.py 19.87 KiB
    from __future__ import annotations
    
    from collections.abc import Callable, Iterable
    from enum import StrEnum
    from typing import TYPE_CHECKING, Generator, TypedDict
    
    import pydantic
    import sqlalchemy as sa
    from pydantic import ConfigDict
    from sqlalchemy.sql.schema import SchemaItem
    
    from mitm_tooling.definition import RelationName, MITMDefinition, ConceptName, TypeName, MITM, get_mitm_def
    from mitm_tooling.definition.definition_tools import ColGroupMaps, map_col_groups
    from mitm_tooling.utilities.backports.sqlchemy_sql_views import create_view
    from mitm_tooling.utilities.sql_utils import qualify
    from .common import *
    from ..intermediate.header import Header
    from ...data_types import MITMDataType
    
    if TYPE_CHECKING:
        pass
    
    # Schema name used by the table/schema builders in this module whenever the caller passes no override schema.
    SQL_REPRESENTATION_DEFAULT_SCHEMA = 'main'
    
    
    class HeaderMetaTableName(StrEnum):
        """Names of the header metadata tables of the SQL representation."""
        KeyValue = 'header_meta_key_value'
        # NOTE(review): `mk_header_tables` creates no table with this name — confirm whether it is still needed.
        HeaderMetaDefinition = 'header_meta_definition'
        Types = 'header_meta_types'
        TypeAttributes = 'header_meta_type_attributes'
    
    
    class HeaderMetaTables(pydantic.BaseModel):
        """Bundle of the three header metadata tables, as returned by `mk_header_tables`."""
        # `sa.Table` is not a pydantic-native type, hence arbitrary types have to be allowed.
        model_config = ConfigDict(arbitrary_types_allowed=True)
    
        # table named `HeaderMetaTableName.KeyValue`
        key_value: sa.Table
        # table named `HeaderMetaTableName.Types`
        types: sa.Table
        # table named `HeaderMetaTableName.TypeAttributes`
        type_attributes: sa.Table
    
    
    # Column name -> SQLAlchemy column.
    ColumnsDict = dict[RelationName, sa.Column]
    # View name -> table object representing the view.
    ViewsDict = dict[TableName, sa.Table]
    # Concept -> its concept table.
    ConceptTablesDict = dict[ConceptName, sa.Table]
    # Concept -> (type name -> type table).
    ConceptTypeTablesDict = dict[ConceptName, dict[TypeName, sa.Table]]
    
    # Callable(mitm, concept, schema name, table name, created columns, pk columns) that yields schema items
    # (constraints, indexes) to attach to a table.
    MitMConceptSchemaItemGenerator = Callable[
        [MITM, ConceptName, SchemaName, TableName, ColumnsDict, ColumnsDict | None], Generator[SchemaItem, None, None]]
    # Callable(mitm, concept) that yields additional (column name, column) pairs for a table.
    MitMConceptColumnGenerator = Callable[
        [MITM, ConceptName], Generator[tuple[str, sa.Column], None, None]]
    # Callable(mitm, concept tables, type tables) that yields (view name, queryable) pairs.
    MitMDBViewsGenerator = Callable[[MITM, ConceptTablesDict, ConceptTypeTablesDict],
    Generator[
        tuple[
            TableName, Queryable], None, None]]
    
    # NOTE(review): not referenced anywhere in the visible part of this module — presumably a prefix for
    # artificial row id columns; confirm against external usages.
    ARTIFICIAL_ROW_ID_PREFIX = 'row'
    
    
    def _prefix_col_name(prefix: str, name: str) -> str:
        """Return `name` with `prefix` and an underscore put in front of it."""
        return '{}_{}'.format(prefix, name)
    
    
    def _get_unique_id_col_name(prefix: str | None = None) -> str:
        return '__' + ((prefix + '_') if prefix else '') + 'id'
    
    
    def _within_concept_id_col(mitm: MITM, concept: ConceptName) -> str:
        """Return the artificial id column name for `concept`, prefixed with the concept's parent concept."""
        mitm_def = get_mitm_def(mitm)
        return _get_unique_id_col_name(mitm_def.get_parent(concept))
    
    
    class SQLRepresentationSchema(pydantic.BaseModel):
        """The SQLAlchemy artifacts that make up the SQL representation of a MITM data set.

        Holds the metadata object together with the header meta tables (if any), the
        per-concept tables, the per-concept-and-type tables and the views.
        """
        # SQLAlchemy objects are not pydantic-native types.
        model_config = ConfigDict(arbitrary_types_allowed=True)

        mitm: MITM
        sa_meta: sa.MetaData
        meta_tables: HeaderMetaTables | None = None
        concept_tables: ConceptTablesDict = pydantic.Field(default_factory=ConceptTablesDict)
        type_tables: ConceptTypeTablesDict = pydantic.Field(default_factory=ConceptTypeTablesDict)
        views: ViewsDict = pydantic.Field(default_factory=ViewsDict)

        def get_concept_table(self, concept: ConceptName) -> sa.Table | None:
            """Return the table registered for `concept`, or `None` if there is none."""
            tables_by_concept = self.concept_tables
            return tables_by_concept.get(concept, None)

        def get_type_table(self, concept: ConceptName, type_name: TypeName) -> sa.Table | None:
            """Return the table registered for `type_name` of `concept`, or `None` if there is none."""
            tables_by_type = self.type_tables.get(concept, {})
            return tables_by_type.get(type_name, None)
    
    def mk_concept_table_name(mitm: MITM, concept: ConceptName) -> TableName:
        """Return the name of the concept table of `concept`: the `plural` of its concept properties."""
        concept_properties = get_mitm_def(mitm).get_properties(concept)
        return concept_properties.plural
    
    
    def mk_type_table_name(mitm: MITM, concept: ConceptName, type_name: RelationName) -> TableName:
        """Return the type table name ``<concept key>_<lower-cased type name>``."""
        concept_key = get_mitm_def(mitm).get_properties(concept).key
        lowered_type_name = type_name.lower()
        return concept_key + '_' + lowered_type_name
    
    
    def mk_link_table_name(mitm: MITM, concept: ConceptName, type_name: RelationName, fk_name: RelationName) -> TableName:
        """Return the link table name: the type table name followed by ``_<lower-cased fk name>``."""
        type_table_name = mk_type_table_name(mitm, concept, type_name)
        lowered_fk_name = fk_name.lower()
        return type_table_name + '_' + lowered_fk_name
    
    
    def has_type_tables(mitm_def: MITMDefinition, concept: ConceptName) -> bool:
        """Return the `permit_attributes` flag of `concept`; concepts with it set get per-type tables."""
        concept_properties = mitm_def.get_properties(concept)
        return concept_properties.permit_attributes
    
    
    def has_natural_pk(mitm_def: MITMDefinition, concept: ConceptName) -> bool:
        """Return whether `concept` defines at least one identity column."""
        identity = mitm_def.get_identity(concept)
        return len(identity) > 0
    
    
    def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict) -> ColumnsDict | None:
        """Select the columns of `created_columns` that make up the primary key of a table of `concept`.

        The key column names are gathered via `map_col_groups` from the 'kind', 'type' and
        'identity' column groups. For a concept without identity columns, the artificial
        within-concept id column is prepended.

        :param mitm: the MITM the concept belongs to
        :param concept: the concept whose table key is picked
        :param created_columns: all columns created for the table, by name
        :return: the key columns by name
        :raises KeyError: if a key column name is missing from `created_columns`
        """
        mitm_def = get_mitm_def(mitm)
        concept_properties, concept_relations = mitm_def.get(concept)

        if has_natural_pk(mitm_def, concept):
            prepended_cols = None
        else:
            # no identity columns: fall back to the artificial id column
            prepended_cols = lambda: [_within_concept_id_col(mitm, concept)]

        pk_col_group_maps = {
            'kind': lambda: 'kind',
            'type': lambda: concept_properties.typing_concept,
            'identity': lambda: list(concept_relations.identity),
        }
        pk_names, _ = map_col_groups(mitm_def, concept, pk_col_group_maps, prepended_cols=prepended_cols)

        return {pk_name: created_columns[pk_name] for pk_name in pk_names}
    
    
    def _gen_unique_constraint(mitm: MITM,
                               concept: ConceptName,
                               schema_name: SchemaName,
                               table_name: TableName,
                               created_columns: ColumnsDict,
                               pk_columns: ColumnsDict | None) -> Generator[
        sa.sql.schema.SchemaItem, None, None]:
        """Yield a unique constraint over the key columns of a table.

        Only `pk_columns` is used; the remaining parameters exist to satisfy the
        `MitMConceptSchemaItemGenerator` signature.

        :param pk_columns: the key columns by name; nothing is yielded if it is `None` or empty
        """
        # `pk_columns` is optional; a constraint without columns would be meaningless.
        if not pk_columns:
            return
        yield sa.UniqueConstraint(*pk_columns.values())
    
    
    def _gen_pk_constraint(mitm: MITM,
                           concept: ConceptName,
                           schema_name: SchemaName,
                           table_name: TableName,
                           created_columns: ColumnsDict,
                           pk_columns: ColumnsDict | None) -> Generator[
        sa.sql.schema.SchemaItem, None, None]:
        """Yield a primary key constraint over the key columns of a table.

        Only `pk_columns` is used; the remaining parameters exist to satisfy the
        `MitMConceptSchemaItemGenerator` signature.

        :param pk_columns: the key columns by name; nothing is yielded if it is `None` or empty
        """
        # `pk_columns` is optional; without key columns there is no constraint to declare.
        if not pk_columns:
            return
        yield sa.PrimaryKeyConstraint(*pk_columns.values())
    
    
    def _gen_index(mitm: MITM,
                   concept: ConceptName,
                   schema_name: SchemaName,
                   table_name: TableName,
                   created_columns: ColumnsDict,
                   pk_columns: ColumnsDict | None) -> Generator[
        sa.sql.schema.SchemaItem, None, None]:
        """Yield a unique index named ``<table_name>.index`` over the key columns of a table.

        Only `table_name` and `pk_columns` are used; the remaining parameters exist to
        satisfy the `MitMConceptSchemaItemGenerator` signature.

        :param table_name: the name of the table, used to derive the index name
        :param pk_columns: the key columns by name; nothing is yielded if it is `None` or empty
        """
        # `pk_columns` is optional; an index needs at least one column.
        if not pk_columns:
            return
        yield sa.Index(f'{table_name}.index', *pk_columns.values(), unique=True)
    
    
    def _gen_foreign_key_constraints(mitm: MITM,
                                     concept: ConceptName,
                                     schema_name: SchemaName,
                                     table_name: TableName,
                                     created_columns: ColumnsDict,
                                     pk_columns: ColumnsDict | None) -> Generator[
        sa.sql.schema.SchemaItem, None, None]:
        """Yield the foreign key constraints of a table of `concept`.

        First, if `pk_columns` is non-empty, a constraint named 'parent' that points the key
        columns at the same-named columns of the parent concept's table. Then one constraint
        per foreign relation of the concept, named after the relation and pointing at the
        target concept's table.

        :param schema_name: the schema used to qualify the referenced columns
        :param created_columns: all columns created for the table, by name
        :param pk_columns: the key columns by name
        """
        mitm_def = get_mitm_def(mitm)
        _, concept_relations = mitm_def.get(concept)

        # reference from the key columns to the parent concept table
        if pk_columns:
            parent_table = mk_concept_table_name(mitm, mitm_def.get_parent(concept))
            local_cols = tuple(pk_columns.values())
            parent_refs = tuple(qualify(schema=schema_name, table=parent_table, column=col_name)
                                for col_name in pk_columns)
            yield sa.ForeignKeyConstraint(name='parent', columns=local_cols, refcolumns=parent_refs)

        for fk_name, fk_info in concept_relations.foreign.items():
            local_names, target_names = zip(*fk_info.fk_relations.items())
            target_table = mk_concept_table_name(mitm, fk_info.target_concept)
            yield sa.ForeignKeyConstraint(
                name=fk_name,
                columns=[created_columns[local_name] for local_name in local_names],
                refcolumns=[qualify(schema=schema_name, table=target_table, column=target_name)
                            for target_name in target_names])
    
    
    # All schema item generators defined above: unique constraint, primary key, index and foreign keys.
    # NOTE(review): not referenced in the visible code — `mk_table` and `mk_sql_rep_schema` spell out their own tuples.
    _schema_item_generators: tuple[MitMConceptSchemaItemGenerator, ...] = (
        _gen_unique_constraint, _gen_pk_constraint, _gen_index, _gen_foreign_key_constraints,)
    
    
    def _gen_within_concept_id_col(mitm: MITM, concept: ConceptName) -> Generator[tuple[str, sa.Column], None, None]:
        """Yield the (name, column) pair of the artificial id column: a non-nullable, unique, indexed integer."""
        col_name = _within_concept_id_col(mitm, concept)
        id_col = sa.Column(col_name, sa.Integer, nullable=False, unique=True, index=True)
        yield col_name, id_col
    
    
    # All additional-column generators defined above.
    # NOTE(review): not referenced in the visible code — `mk_table` spells out its own default tuple.
    _column_generators: tuple[MitMConceptColumnGenerator, ...] = (_gen_within_concept_id_col,)
    
    
    def mk_table(meta: sa.MetaData,
                 mitm: MITM,
                 concept: ConceptName,
                 table_name: TableName,
                 col_group_maps: ColGroupMaps[sa.Column],
                 additional_column_generators: Iterable[MitMConceptColumnGenerator] | None = (
                         _gen_within_concept_id_col,),
                 schema_item_generators: Iterable[MitMConceptSchemaItemGenerator] |
                                         None = (_gen_unique_constraint, _gen_pk_constraint, _gen_index,),
                 override_schema: SchemaName | None = None) -> \
            tuple[
                sa.Table, ColumnsDict, ColumnsDict]:
        """Create a table for `concept` and register it in `meta`.

        :param meta: the metadata object the table is registered in
        :param mitm: the MITM the concept belongs to
        :param concept: the concept the table is created for
        :param table_name: the name of the table
        :param col_group_maps: the column factories per column group, passed on to `map_col_groups`
        :param additional_column_generators: generators of (name, column) pairs that are prepended
            to the mapped columns; `None` prepends nothing
        :param schema_item_generators: generators of constraints/indexes to attach to the table;
            `None` attaches nothing
        :param override_schema: the schema of the table; falls back to
            `SQL_REPRESENTATION_DEFAULT_SCHEMA` when `None` or empty
        :return: the table, all of its created columns by name, and its key columns by name
        """
        schema = override_schema or SQL_REPRESENTATION_DEFAULT_SCHEMA
        mitm_def = get_mitm_def(mitm)

        if additional_column_generators is None:
            prepended_cols = None
        else:
            def prepended_cols():
                extra_cols = []
                for column_generator in additional_column_generators:
                    extra_cols.extend(column_generator(mitm, concept))
                return extra_cols

        columns, created_columns = map_col_groups(mitm_def, concept, col_group_maps,
                                                  prepended_cols=prepended_cols, ensure_unique=True)

        pk_cols = pick_table_pk(mitm, concept, created_columns)

        schema_items: list[sa.sql.schema.SchemaItem] = []
        if schema_item_generators is not None:
            schema_items = [item
                            for item_generator in schema_item_generators
                            for item in item_generator(mitm, concept, schema, table_name, created_columns, pk_cols)]

        table = sa.Table(table_name, meta, *columns, *schema_items, schema=schema)
        return table, created_columns, pk_cols
    
    
    def mk_header_tables(meta: sa.MetaData,
                         override_schema: SchemaName | None = None) -> HeaderMetaTables:
        """Create the header metadata tables and register them in `meta`.

        Three tables are created, in this order: the types table (key: kind, type), the type
        attributes table (key: kind, type, attribute_order; foreign key 'header_meta_type'
        to the types table) and the key-value table (key: key).

        :param meta: the metadata object the tables are registered in
        :param override_schema: the schema of the tables; falls back to
            `SQL_REPRESENTATION_DEFAULT_SCHEMA` when `None` or empty
        :return: the created tables
        """
        schema = override_schema or SQL_REPRESENTATION_DEFAULT_SCHEMA

        def text_col(name: str, **col_kwargs) -> sa.Column:
            return sa.Column(name, MITMDataType.Text.sa_sql_type, **col_kwargs)

        types_table = sa.Table(
            HeaderMetaTableName.Types, meta,
            text_col('kind', primary_key=True),
            text_col('type', primary_key=True),
            text_col('concept'),
            schema=schema,
        )

        # each attribute row refers to its (kind, type) row in the types table
        type_fk = sa.ForeignKeyConstraint(name='header_meta_type',
                                          columns=['kind', 'type'],
                                          refcolumns=[types_table.c.kind, types_table.c.type])
        type_attributes_table = sa.Table(
            HeaderMetaTableName.TypeAttributes, meta,
            text_col('kind', primary_key=True),
            text_col('type', primary_key=True),
            sa.Column('attribute_order', MITMDataType.Integer.sa_sql_type, primary_key=True),
            text_col('attribute_name'),
            text_col('attribute_dtype'),
            type_fk,
            schema=schema,
        )

        key_value_table = sa.Table(
            HeaderMetaTableName.KeyValue, meta,
            text_col('key', primary_key=True),
            sa.Column('value', MITMDataType.Json.sa_sql_type),
            schema=schema,
        )

        return HeaderMetaTables(key_value=key_value_table,
                                types=types_table,
                                type_attributes=type_attributes_table)
    
    
    def _gen_denormalized_views(mitm: MITM, concept_tables: ConceptTablesDict, type_tables: ConceptTypeTablesDict) -> \
            Generator[
                tuple[
                    TableName, Queryable], None, None]:
        """Yield (view name, queryable) pairs for the default views of the SQL representation.

        For every leaf concept of every main concept, a view named
        ``<concept table name>_denormalized_view`` is yielded:

        * for a concept with type tables, the UNION ALL of all its type tables, where each
          table is padded with NULL columns to the union of all (column name, column type)
          pairs; nothing is yielded if the concept has no type tables in `type_tables`
        * otherwise, a plain select of the concept table, if it exists in `concept_tables`

        Afterwards, for every sub concept of a parent concept that has a concept table, a view
        named ``<sub concept table name>_view`` is yielded that filters the parent table by
        the sub concept's kind.

        :param mitm: the MITM whose definition drives the view generation
        :param concept_tables: the concept tables by concept
        :param type_tables: the type tables by concept and type name
        """
        mitm_def = get_mitm_def(mitm)

        for main_concept in mitm_def.main_concepts:
            for concept in mitm_def.get_leafs(main_concept):
                view_name = mk_concept_table_name(mitm, concept) + '_denormalized_view'
                q = None
                if has_type_tables(mitm_def, concept):
                    selections = []

                    for leaf_concept in mitm_def.get_leafs(concept):
                        concept_type_tables = type_tables.get(leaf_concept)
                        if not concept_type_tables:
                            continue

                        # Union of all (name, type) pairs in first-seen order. A dict is used
                        # instead of a set so that the column order of the view does not depend
                        # on string hash randomization, i.e. it is stable across interpreter runs.
                        # NOTE(review): a column name that occurs with different types yields one
                        # output column per (name, type) pair, all carrying the same label —
                        # confirm that this is intended.
                        all_cols: dict[tuple[str, str], None] = {}
                        for type_t in concept_type_tables.values():
                            for c in type_t.columns:
                                all_cols.setdefault((c.name, str(c.type)), None)

                        for type_t in concept_type_tables.values():
                            selection = []
                            for col_name, col_type in all_cols:
                                c = type_t.columns.get(col_name)
                                if c is not None and str(c.type) == col_type:
                                    selection.append(c)
                                else:
                                    # pad columns this type table does not have
                                    selection.append(sa.null().label(col_name))
                            selections.append(sa.select(*selection))

                    if selections:
                        q = sa.union_all(*selections).subquery()
                elif (concept_t := concept_tables.get(concept)) is not None:
                    q = sa.select(concept_t)

                if q is not None:
                    yield view_name, q

        for parent_concept, subs in mitm_def.sub_concept_map.items():
            if (concept_t := concept_tables.get(parent_concept)) is not None:
                for sub in subs:
                    view_name = mk_concept_table_name(mitm, sub) + '_view'
                    k = mitm_def.get_properties(sub).key
                    q = sa.select(concept_t).where(concept_t.columns['kind'] == k)
                    yield view_name, q
    
    
    # All view generators defined above.
    # NOTE(review): not referenced in the visible code — `mk_sql_rep_schema` spells out its own default tuple.
    _view_generators: tuple[MitMDBViewsGenerator, ...] = (_gen_denormalized_views,)
    
    
    def _mk_base_col_group_maps(mitm_def: MITMDefinition, concept: ConceptName) -> ColGroupMaps[sa.Column]:
        """Build the column factories of the column groups shared by concept tables and type tables."""
        concept_properties, _ = mitm_def.get(concept)

        return {
            'kind': lambda: ('kind', sa.Column('kind', MITMDataType.Text.sa_sql_type, nullable=False)),
            'type': lambda: (concept_properties.typing_concept,
                             sa.Column(concept_properties.typing_concept,
                                       MITMDataType.Text.sa_sql_type,
                                       nullable=False)),
            'identity': lambda: [(name, sa.Column(name, dt.sa_sql_type, nullable=False))
                                 for name, dt in mitm_def.resolve_identity_type(concept).items()],
            'inline': lambda: [(name, sa.Column(name, dt.sa_sql_type))
                               for name, dt in mitm_def.resolve_inlined_types(concept).items()],
            'foreign': lambda: [(name, sa.Column(name, dt.sa_sql_type))
                                for _, resolved_fk in mitm_def.resolve_foreign_types(concept).items()
                                for name, dt in resolved_fk.items()],
        }


    def mk_sql_rep_schema(header: Header,
                          view_generators: Iterable[MitMDBViewsGenerator] | None = (_gen_denormalized_views,),
                          override_schema: SchemaName | None = None,
                          skip_fk_constraints: bool = False,
                          include_meta_tables: bool = True) -> SQLRepresentationSchema:
        """Build the SQL representation schema for the given header.

        Creates, in this order: one concept table per main concept of the header's MITM, one
        type table per header entry whose concept permits attributes, the views produced by
        `view_generators`, and (optionally) the header meta tables.

        :param header: the header that specifies the MITM and the types to create tables for
        :param view_generators: generators of (view name, queryable) pairs; `None` creates no views
        :param override_schema: the schema of all created objects; falls back to
            `SQL_REPRESENTATION_DEFAULT_SCHEMA` when `None` or empty
        :param skip_fk_constraints: if true, type tables get no foreign key constraints
            (concept tables never get any)
        :param include_meta_tables: if true, the header meta tables are created as well
        :return: the schema object holding the metadata and all created tables and views
        """
        schema_name = override_schema or SQL_REPRESENTATION_DEFAULT_SCHEMA
        mitm = header.mitm
        mitm_def = get_mitm_def(mitm)
        meta = sa.MetaData(schema=schema_name)

        concept_tables: ConceptTablesDict = {}
        type_tables: ConceptTypeTablesDict = {}
        views: dict[str, sa.Table] = {}

        base_schema_item_generators = (_gen_unique_constraint, _gen_pk_constraint, _gen_index,)
        for concept in mitm_def.main_concepts:
            concept_table, _, _ = mk_table(meta,
                                           mitm,
                                           concept,
                                           mk_concept_table_name(mitm, concept),
                                           col_group_maps=_mk_base_col_group_maps(mitm_def, concept),
                                           additional_column_generators=(_gen_within_concept_id_col,),
                                           schema_item_generators=base_schema_item_generators,
                                           override_schema=schema_name)
            concept_tables[concept] = concept_table

        if skip_fk_constraints:
            type_table_schema_item_generators = base_schema_item_generators
        else:
            type_table_schema_item_generators = base_schema_item_generators + (_gen_foreign_key_constraints,)

        for he in header.header_entries:
            he_concept = he.concept
            if not has_type_tables(mitm_def, he_concept):
                continue

            col_group_maps = {
                **_mk_base_col_group_maps(mitm_def, he_concept),
                # `he` is bound as a default so that the factory does not depend on the loop variable
                'attributes': lambda he=he: [(name, sa.Column(name, dt.sa_sql_type))
                                             for name, dt in he.iter_attr_dtype_pairs()],
            }
            type_table, _, _ = mk_table(meta,
                                        mitm,
                                        he_concept,
                                        mk_type_table_name(mitm, he_concept, he.type_name),
                                        col_group_maps,
                                        additional_column_generators=(_gen_within_concept_id_col,),
                                        schema_item_generators=type_table_schema_item_generators,
                                        override_schema=schema_name)
            type_tables.setdefault(he_concept, {})[he.type_name] = type_table

        if view_generators is not None:
            for generator in view_generators:
                for name, queryable in generator(mitm, concept_tables, type_tables):
                    views[name] = create_view(name, queryable, meta, schema=schema_name)

        meta_tables = None
        if include_meta_tables:
            meta_tables = mk_header_tables(meta, override_schema=schema_name)

        return SQLRepresentationSchema(mitm=mitm,
                                       sa_meta=meta,
                                       meta_tables=meta_tables,
                                       concept_tables=concept_tables,
                                       type_tables=type_tables,
                                       views=views)