Commit b2754f3a authored by Leah Tacke genannt Unterberg

refactored sql representation and improved superset definition generation

parent b8caffe4
Showing changed files with 458 additions and 107 deletions
@@ -241,3 +241,4 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+sandbox/
\ No newline at end of file
@@ -4,7 +4,7 @@ lock:
 	@poetry lock --no-update
 update:
-	@poetry update
+	@poetry update --with dev
 build:
 	@poetry build
...
@@ -7,8 +7,19 @@ class ColumnDataTypeConversionException(Exception):
     pass

-def convert_df(df: pd.DataFrame, data_types: dict[str, MITMDataType], inplace=False):
-    res = pd.DataFrame(index=df.index) if not inplace else df
+def convert_df(df: pd.DataFrame, data_types: dict[str, MITMDataType], inplace=False, skip_unmapped: bool = False):
+    unconverted_columns = [c for c in df.columns if c not in data_types]
+    if inplace:
+        if skip_unmapped:
+            res = df.drop(unconverted_columns, axis='columns', inplace=True)
+        else:
+            res = df
+    else:
+        if skip_unmapped:
+            data = None
+        else:
+            data = df[unconverted_columns]
+        res = pd.DataFrame(data=data, index=df.index)
     for col, dt in data_types.items():
         try:
...
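
Aside: the effect of the new skip_unmapped flag, as a minimal sketch (the convert module path is assumed for illustration):

    import pandas as pd
    from mitm_tooling.data_types import MITMDataType
    from mitm_tooling.data_types.convert import convert_df  # module path assumed

    df = pd.DataFrame({'a': ['1', '2'], 'b': ['x', 'y']})
    # 'a' has a declared target type; 'b' is unmapped:
    # skip_unmapped=False carries 'b' over unconverted, skip_unmapped=True drops it
    res = convert_df(df, {'a': MITMDataType.Numeric}, skip_unmapped=True)
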
@@ -66,29 +66,46 @@ class ConceptProperties(pydantic.BaseModel):
         del props['key']
         return props

+    @property
+    def is_abstract(self) -> bool:
+        return self.nature[1] == ConceptKind.Abstract
+
+    @property
+    def is_main(self) -> bool:
+        return self.nature[0] == ConceptLevel.Main
+
+    @property
+    def is_sub(self) -> bool:
+        return self.nature[0] == ConceptLevel.Sub
+
+    @property
+    def is_weak(self) -> bool:
+        return self.nature[0] == ConceptLevel.Weak
+

 class MITMDefinition(pydantic.BaseModel):
     main_concepts: set[ConceptName]
     weak_concepts: dict[ConceptName, MITMDataType]
-    sub_concepts: dict[ConceptName, set[ConceptName]]
-    concept_relations: dict[ConceptName, OwnedRelations]
-    concept_properties: dict[ConceptName, ConceptProperties]
+    sub_concept_map: dict[ConceptName, set[ConceptName]]
+    concept_relations: dict[ConceptName, OwnedRelations]  # only defined on the main_concepts level
+    concept_properties: dict[ConceptName, ConceptProperties]  # available for each individual concept

     @pydantic.computed_field()
     @cached_property
     def leaf_concepts(self) -> set[ConceptName]:
-        return {c for c in self.main_concepts if c not in self.sub_concepts} | {sc for c in self.main_concepts for sc in
-                                                                                self.sub_concepts.get(c, [])}
+        return {c for c in self.main_concepts if c not in self.sub_concept_map} | {sc for c in self.main_concepts for sc
+                                                                                   in self.sub_concept_map.get(c, [])}

     @pydantic.computed_field()
     @cached_property
     def abstract_concepts(self) -> set[ConceptName]:
-        return {c for c in self.sub_concepts}
+        return {c for c in self.sub_concept_map}

     @pydantic.computed_field()
     @cached_property
-    def parent_concepts_map(self) -> dict[ConceptName, ConceptName]:
-        return {sub: c for c, subs in self.sub_concepts.items() for sub in subs}
+    def parent_concept_map(self) -> dict[ConceptName, ConceptName]:
+        return {sub: c for c, subs in self.sub_concept_map.items() for sub in subs}

     @property
     def inverse_concept_key_map(self) -> dict[str, ConceptName]:
@@ -97,8 +114,14 @@ class MITMDefinition(pydantic.BaseModel):
     def get_parent(self, concept: ConceptName) -> ConceptName | None:
         if concept in self.main_concepts:
             return concept
-        elif concept in (pm := self.parent_concepts_map):
-            return pm[concept]
+        elif concept in (pcm := self.parent_concept_map):
+            return pcm[concept]
+
+    def get_leafs(self, concept: ConceptName) -> set[ConceptName] | None:
+        if concept in (scm := self.sub_concept_map):
+            return scm[concept]
+        elif concept in self.leaf_concepts:
+            return {concept}

     def get(self, concept: ConceptName) -> tuple[ConceptProperties, OwnedRelations]:
         a, b = self.get_properties(concept), self.get_relations(concept)
@@ -228,14 +251,14 @@ class MITMDefinitionFile(pydantic.BaseModel):
     def to_definition(self) -> MITMDefinition:
         main_concepts = set()
-        sub_concepts = {}
+        sub_concept_map = {}
         for c in self.concepts:
             if isinstance(c, str):
                 main_concepts.add(c)
             elif isinstance(c, dict):
                 concept, sub = next(iter(c.items()))
                 main_concepts.add(concept)
-                sub_concepts[concept] = set(sub)
+                sub_concept_map[concept] = set(sub)
         concept_relations = {c: self.owned_relations[c].to_definition() for c in main_concepts if
                              c in self.owned_relations}
         del c
@@ -248,7 +271,7 @@ class MITMDefinitionFile(pydantic.BaseModel):
         concept_properties = {}
         for c in main_concepts:
             parent_props = handle_properties(c)
-            for sc in sub_concepts.get(c, []):
+            for sc in sub_concept_map.get(c, []):
                 child_props = []
                 props = handle_properties(sc, **parent_props.inheritable_props())
                 if not parent_props.permit_attributes:
@@ -257,5 +280,6 @@ class MITMDefinitionFile(pydantic.BaseModel):
                 child_props.append(props)
         assert all(parent_props.typing_concept == props.typing_concept for props in child_props)
-        return MITMDefinition(main_concepts=main_concepts, weak_concepts=self.weak_concepts, sub_concepts=sub_concepts,
+        return MITMDefinition(main_concepts=main_concepts, weak_concepts=self.weak_concepts,
+                              sub_concept_map=sub_concept_map,
                               concept_relations=concept_relations, concept_properties=concept_properties)
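
Aside: how the renamed helpers fit together; a sketch assuming mitm_def is a MITMDefinition instance (e.g. from get_mitm_def; the concept name is illustrative):

    # sub_concept_map maps a main concept to its sub-concepts;
    # parent_concept_map is the inverse, keyed per sub-concept
    for sub, parent in mitm_def.parent_concept_map.items():
        assert sub in mitm_def.sub_concept_map[parent]

    # get_leafs() resolves an abstract concept to its leaf concepts and
    # returns {concept} when the concept is itself already a leaf
    leaves = mitm_def.get_leafs('observation')  # e.g. MAED's abstract observation concept
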
@@ -50,7 +50,7 @@ def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_map
     for column_group in concept_properties.column_group_ordering:
         match column_group:
-            case 'kind' if any(elem_wise_eq(concept_properties.nature, (ConceptLevel.Sub, ConceptKind.Abstract))):
+            case 'kind' if concept_properties.is_abstract or concept_properties.is_sub:
                 add_results([col_group_maps.get('kind', dummy)()])
             case 'type':
                 add_results([col_group_maps.get('type', dummy)()])
...
-from . import sql
-__all__ = ['sql']
\ No newline at end of file
+from . import sql, df
+__all__ = ['sql', 'df']
\ No newline at end of file
-from .unpack_intermediate import unpack_mitm_data
\ No newline at end of file
+from .intermediate_transformation import unpack_mitm_data, pack_mitm_dataset
+from . import intermediate_transformation
\ No newline at end of file
@@ -40,7 +40,7 @@ def unpack_concept_table_as_typed_dfs(header: Header, concept: ConceptName, df:
     concept_properties, concept_relations = mitm_def.get(concept)

     with_header_entry = {}
-    if concept in mitm_def.sub_concepts:
+    if concept_properties.is_abstract:  # e.g. MAED.observation
         for (key, typ), idx in df.groupby(['kind', concept_properties.typing_concept]).groups.items():
             key, type_name = str(key), str(typ)
             specific_concept = mitm_def.inverse_concept_key_map[key]
@@ -71,6 +71,15 @@ def unpack_concept_table_as_typed_dfs(header: Header, concept: ConceptName, df:

 def unpack_mitm_data(mitm_data: MITMData) -> MITMDataset:
+    mitm_data = mitm_data.as_specialized()
     return MITMDataset(header=mitm_data.header,
                        dfs={concept: unpack_concept_table_as_typed_dfs(mitm_data.header, concept, df) for concept, df in
                             mitm_data})
+
+
+def pack_mitm_dataset(mitm_dataset: MITMDataset) -> MITMData:
+    return MITMData(header=mitm_dataset.header,
+                    concept_dfs={concept: pack_typed_dfs_as_concept_table(mitm_dataset.header.mitm, concept,
+                                                                          typed_dfs.values())
+                                 for concept, typed_dfs in mitm_dataset if len(typed_dfs) > 1}).as_generalized()
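
Aside: unpack_mitm_data and the new pack_mitm_dataset are intended as approximate inverses between the table-per-concept and table-per-type representations; a sketch, assuming mitm_data is an existing MITMData instance (the package path is assumed):

    from mitm_tooling.transformation.df import unpack_mitm_data, pack_mitm_dataset  # path assumed

    mitm_dataset = unpack_mitm_data(mitm_data)  # specializes first, then splits each concept table into typed dfs
    repacked = pack_mitm_dataset(mitm_dataset)  # packs typed dfs per concept and re-generalizes
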
@@ -11,12 +11,12 @@ class TableDoesNotExist(Exception):
     pass

-class OtherMeta(TypedDict):
+class AdditionalMeta(TypedDict):
     default_schema: SchemaName

 def connect_and_reflect(engine: Engine, meta: MetaData | None = None, allowed_schemas: Collection[str] | None = None,
-                        reflect_kwargs: dict[str, Any] | None = None) -> tuple[MetaData, OtherMeta]:
+                        reflect_kwargs: dict[str, Any] | None = None) -> tuple[MetaData, AdditionalMeta]:
     inspector = inspect(engine)
     schemas = inspector.get_schema_names()
@@ -33,7 +33,7 @@ def connect_and_reflect(engine: Engine, meta: MetaData | None = None, allowed_sc
     else:
         meta.reflect(engine, **kwargs)

-    return meta, OtherMeta(default_schema=(inspector.default_schema_name if inspector.default_schema_name else next(iter(meta._schemas))))
+    return meta, AdditionalMeta(default_schema=(inspector.default_schema_name if inspector.default_schema_name else next(iter(meta._schemas))))

 def derive_table_meta_info(sa_meta: MetaData, name: str, schema: SchemaName | None = None,
...
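
Aside: the renamed AdditionalMeta is a plain TypedDict, so the second return value is accessed by key; a sketch, assuming engine is an existing SQLAlchemy Engine:

    meta, additional_meta = connect_and_reflect(engine)
    default_schema = additional_meta['default_schema']  # falls back to a reflected schema if the dialect reports none
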
@@ -74,7 +74,7 @@ class Exportable(pydantic.BaseModel):
         filename = self.filename if self.filename else f'{self.mitm}.zip'
         return StreamingZippedExport(mitm=self.mitm, filename=filename,
-                                     streaming_mitm_data=StreamingMITMData(data_sources=data_sources))
+                                     streaming_mitm_data=StreamingMITMData(mitm=self.mitm, data_sources=data_sources))

 class MappingExport(pydantic.BaseModel):
...
@@ -45,7 +45,7 @@ class IndividualMappingValidationContext:
     @property
     def is_sub_concept(self) -> bool:
-        return self.claimed_concept in self.mitm_def.parent_concepts_map
+        return self.claimed_concept in self.mitm_def.parent_concept_map

     @property
     def is_weak_concept(self) -> bool:
...
-from .exporting import ZippedExport, StreamingZippedExport
-from .importing import ZippedImport, FolderImport
+from .exporting import ZippedExport, StreamingZippedExport, write_zip
+from .importing import ZippedImport, FolderImport, read_zip
 from . import exporting
 from . import importing
\ No newline at end of file
@@ -15,7 +15,7 @@ from mitm_tooling.definition import MITM, ConceptName, get_mitm_def
 from mitm_tooling.representation.intermediate_representation import HeaderEntry, Header, StreamingConceptData, MITMData, \
     StreamingMITMData
 from mitm_tooling.representation.file_representation import write_header_file, write_data_file
-from mitm_tooling.utilities.io_utils import DataSink, ByteSink, use_bytes_io, ensure_ext
+from mitm_tooling.utilities.io_utils import DataSink, ByteSink, use_bytes_io, ensure_ext, FilePath

 logger = logging.getLogger('api')
@@ -83,3 +83,7 @@ class StreamingZippedExport(FileExport):
             with zf.open('header.csv', 'w') as hf:
                 header_df = Header(mitm=self.mitm, header_entries=collected_header_entries).generate_header_df()
                 write_header_file(header_df, hf)
+
+
+def write_zip(target: FilePath, mitm_data: MITMData):
+    return ZippedExport(mitm=mitm_data.header.mitm, filename=os.path.basename(target), mitm_data=mitm_data).write(target)
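
Aside: the new write_zip helper wraps ZippedExport, deriving the MitM type from the data's own header and the filename from the target path; a minimal sketch, assuming mitm_data is a MITMData instance:

    write_zip('out/export.zip', mitm_data)
    # equivalent to ZippedExport(mitm=mitm_data.header.mitm, filename='export.zip',
    #                            mitm_data=mitm_data).write('out/export.zip')
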
@@ -6,4 +6,4 @@ from .file_representation import write_header_file, write_data_file, read_data_f
 from .common import mk_concept_file_header
 from .intermediate_representation import HeaderEntry, Header, MITMData, StreamingMITMData, StreamingConceptData, ColumnName
 from .df_representation import MITMDataset
-from .sql_representation import mk_db_schema, insert_mitm_data, mk_sqlite
+from .sql_representation import mk_db_schema, insert_mitm_data, mk_sqlite, SQLRepresentationSchema
@@ -8,11 +8,14 @@ from typing import TYPE_CHECKING, Self, Any

 import pandas as pd
 import pydantic
+from numba.np.npdatetime_helpers import combine_datetime_timedelta_units
 from pydantic import ConfigDict

 from mitm_tooling.definition import get_mitm_def
 from mitm_tooling.data_types.data_types import MITMDataType
 from mitm_tooling.definition.definition_representation import ConceptName, MITM
+from utilities.python_utils import take_first
 from .common import guess_k_of_header_df, mk_header_file_columns

 logger = logging.getLogger('api')
@@ -97,6 +100,26 @@ class MITMData(Iterable[tuple[ConceptName, pd.DataFrame]], pydantic.BaseModel):
     def __iter__(self):
         return iter(self.concept_dfs.items())

+    def as_generalized(self) -> Self:
+        mitm_def = get_mitm_def(self.header.mitm)
+        dfs = defaultdict(list)
+        for c, df in self:
+            c = mitm_def.get_parent(c)
+            dfs[c].append(df)
+        return MITMData(header=self.header, dfs=dict(dfs))
+
+    def as_specialized(self) -> Self:
+        mitm_def = get_mitm_def(self.header.mitm)
+        dfs = defaultdict(list)
+        for c, df in self:
+            if mitm_def.get_properties(c).is_abstract:
+                leaf_concepts = mitm_def.get_leafs(c)
+                for sub_c, idx in df.groupby('kind').groups.items():
+                    dfs[sub_c].append(df.loc[idx])
+            else:
+                dfs[c].append(df)
+        return MITMData(header=self.header, dfs=dict(dfs))
+

 class StreamingConceptData(pydantic.BaseModel):
     model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -108,7 +131,24 @@ class StreamingConceptData(pydantic.BaseModel):

 class StreamingMITMData(Iterable[tuple[ConceptName, StreamingConceptData]], pydantic.BaseModel):
     model_config = ConfigDict(arbitrary_types_allowed=True)

+    mitm: MITM
     data_sources: dict[ConceptName, StreamingConceptData] = pydantic.Field(default_factory=dict)

     def __iter__(self):
         return iter(self.data_sources.items())

+    def as_generalized(self) -> Self:
+        mitm_def = get_mitm_def(self.mitm)
+        combined_data_sources = defaultdict(list)
+        for c, ds in self:
+            combined_data_sources[mitm_def.get_parent(c)].append(ds)
+        data_sources = {}
+        for c, ds_list in combined_data_sources.items():
+            structure_dfs = [ds.structure_df for ds in ds_list]
+            assert all(a.equals(b) for a, b in zip(structure_dfs[:-1], structure_dfs[1:])), \
+                f'concept {c} not generalizable in {self} (structure_dfs differ)'
+            data_sources[c] = StreamingConceptData(structure_df=take_first(structure_dfs),
+                                                   chunk_iterators=[it for ds in ds_list for it in ds.chunk_iterators])
+        return StreamingMITMData(mitm=self.mitm, data_sources=data_sources)
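
Aside: the intended round trip between the two shapes of MITMData; a sketch, assuming mitm_data holds an abstract concept table with a 'kind' column:

    specialized = mitm_data.as_specialized()    # abstract concept tables are split by 'kind' into leaf-concept dfs
    generalized = specialized.as_generalized()  # dfs are regrouped under their parent (main) concepts
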
 from collections import defaultdict
 from collections.abc import Callable, Iterator, Generator, Mapping

+import pydantic
 import sqlalchemy as sa
 import sqlalchemy.sql.schema
-from pydantic_core import Url
+from pydantic import AnyUrl

-from mitm_tooling.definition import MITMDefinition, ConceptProperties, OwnedRelations
 from mitm_tooling.data_types import MITMDataType
-from mitm_tooling.definition import ConceptName, MITM, get_mitm_def, ConceptKind, ConceptLevel, RelationName
+from mitm_tooling.definition import MITMDefinition, ConceptProperties, OwnedRelations, ConceptName, MITM, get_mitm_def, \
+    ConceptKind, ConceptLevel, RelationName
 from mitm_tooling.definition.definition_tools import map_col_groups, ColGroupMaps
+from mitm_tooling.extraction.sql.data_models import Queryable, TableName, ColumnName
 from .df_representation import MITMDataset
-from .intermediate_representation import Header, MITMData, ColumnName
-from mitm_tooling.utilities.sql_utils import create_sa_engine
+from .intermediate_representation import Header, MITMData
+from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify
 from mitm_tooling.utilities import python_utils
-from mitm_tooling.utilities.sql_utils import qualify
+from sqlalchemy_utils.view import create_view

+SQL_REPRESENTATION_DEFAULT_SCHEMA = 'main'

-def mk_concept_table_name(mitm: MITM, concept: ConceptName) -> str:
+def mk_concept_table_name(mitm: MITM, concept: ConceptName) -> TableName:
     return get_mitm_def(mitm).get_properties(concept).plural

-def mk_type_table_name(mitm: MITM, concept: ConceptName, type_name: RelationName) -> str:
+def mk_type_table_name(mitm: MITM, concept: ConceptName, type_name: RelationName) -> TableName:
     return get_mitm_def(mitm).get_properties(concept).key + '_' + type_name.lower()

-def mk_link_table_name(mitm: MITM, concept: ConceptName, type_name: RelationName, fk_name: RelationName) -> str:
+def mk_link_table_name(mitm: MITM, concept: ConceptName, type_name: RelationName, fk_name: RelationName) -> TableName:
     return mk_type_table_name(mitm, concept, type_name) + '_' + fk_name.lower()
@@ -46,8 +50,8 @@ def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: Mapping[Rel
     return python_utils.pick_from_mapping(created_columns, names)

-def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: str, col_group_maps: ColGroupMaps,
-             additional_schema_item_maker: Callable[
+def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: TableName, col_group_maps: ColGroupMaps,
+             gen_additional_schema_items: Callable[
                  [MITM, ConceptName, ConceptProperties, OwnedRelations,
                   dict[RelationName, sa.Column], list[tuple[RelationName, sa.Column]]],
                  Generator[
@@ -65,21 +69,58 @@ def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: st
     if concept_relations.identity:
         constraints.append(sa.PrimaryKeyConstraint(*python_utils.i_th(1)(ref_columns)))

-    if additional_schema_item_maker:
-        constraints.extend(iter(
-            additional_schema_item_maker(mitm, concept, concept_properties, concept_relations, created_columns,
-                                         ref_columns)))
+    if gen_additional_schema_items:
+        schema_items = gen_additional_schema_items(mitm, concept, concept_properties, concept_relations,
+                                                   created_columns,
+                                                   ref_columns)
+        constraints.extend(schema_items)
         print(constraints)

-    return sa.Table(table_name, meta, *columns, *constraints), created_columns, ref_columns
+    return sa.Table(table_name, meta, schema=SQL_REPRESENTATION_DEFAULT_SCHEMA, *columns,
+                    *constraints), created_columns, ref_columns

+def gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, concept_properties: ConceptProperties,
+                                concept_relations: OwnedRelations, created_columns: dict[RelationName, sa.Column],
+                                ref_columns: list[tuple[RelationName, sa.Column]]) -> Generator[
+    sa.sql.schema.SchemaItem, None, None]:
+    # self_fk
+    parent_table = mk_concept_table_name(mitm, concept)
+    cols, refcols = zip(
+        *((c, qualify(table=parent_table, column=s)) for s, c in ref_columns))
+    yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols)
+    for fk_name, fk_info in concept_relations.foreign.items():
+        cols, refcols = zip(*fk_info.fk_relations.items())
+        fkc = sa.ForeignKeyConstraint(name=fk_name, columns=[created_columns[c] for c in cols], refcolumns=[
+            # sa.literal_column(qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c))
+            qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c)
+            for c in refcols])
+        yield fkc

+ConceptTablesDict = dict[ConceptName, sa.Table]
+ViewsDict = dict[TableName, sa.Table]
+ConceptTypeTablesDict = dict[ConceptName, dict[TableName, sa.Table]]

-def mk_db_schema(header: Header) -> tuple[sa.MetaData, dict[ConceptName, dict[str, sa.Table]]]:
+class SQLRepresentationSchema(pydantic.BaseModel):
+    meta: sa.MetaData
+    concept_tables: ConceptTablesDict
+    type_tables: ConceptTypeTablesDict
+    views: ViewsDict

+def mk_db_schema(header: Header, gen_views: Callable[
+    [MITM, MITMDefinition, ConceptTablesDict, ConceptTypeTablesDict],
+    Generator[tuple[TableName, Queryable], None, None]] | None = None) -> SQLRepresentationSchema:
     mitm_def = get_mitm_def(header.mitm)
-    meta = sa.MetaData()
-    tables: dict[ConceptName, dict[str, sa.Table]] = {}
-    views: dict[ConceptName, sa.Table] = {}
+    meta = sa.MetaData(schema=SQL_REPRESENTATION_DEFAULT_SCHEMA)
+    concept_tables: ConceptTablesDict = {}
+    type_tables: ConceptTypeTablesDict = {}
+    views: dict[str, sa.Table] = {}

     for concept in mitm_def.main_concepts:
         concept_properties, concept_relations = mitm_def.get(concept)
@@ -106,21 +147,6 @@ def mk_db_schema(header: Header) -> tuple[sa.MetaData, dict[ConceptName, dict[st
         if has_type_tables(header.mitm, he_concept):
             concept_properties, concept_relations = mitm_def.get(he_concept)

-            def foreign_key_constraints(mitm, concept, concept_properties, concept_relations, created_columns,
-                                        ref_columns):
-                # self_fk
-                parent_table = mk_concept_table_name(mitm, concept)
-                cols, refcols = zip(
-                    *((c, qualify(table=parent_table, column=s)) for s, c in ref_columns))
-                yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols)
-                for fk_name, fk_info in concept_relations.foreign.items():
-                    cols, refcols = zip(*fk_info.fk_relations.items())
-                    fkc = sa.ForeignKeyConstraint(name=fk_name, columns=[created_columns[c] for c in cols], refcolumns=[
-                        # sa.literal_column(qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c))
-                        qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c)
-                        for c in refcols])
-                    yield fkc

             table_name = mk_type_table_name(header.mitm, he_concept, he.type_name)
             t, t_columns, t_ref_columns = mk_table(meta, header.mitm, he_concept, table_name, {
@@ -138,21 +164,25 @@ def mk_db_schema(header: Header) -> tuple[sa.MetaData, dict[ConceptName, dict[st
                     resolved_fk.items()],
                 'attributes': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for name, dt in
                                        zip(he.attributes, he.attribute_dtypes)],
-            }, additional_schema_item_maker=foreign_key_constraints)
+            }, gen_additional_schema_items=gen_foreign_key_constraints)

-            if he_concept not in tables:
-                tables[he_concept] = {}
-            tables[he_concept][he.type_name] = t
+            if he_concept not in type_tables:
+                type_tables[he_concept] = {}
+            type_tables[he_concept][he.type_name] = t

     # for concept, members in concept_level_view_members.items():
+    if gen_views:
+        for name, queryable in gen_views(header.mitm, mitm_def, concept_tables, type_tables):
+            views[name] = create_view(name, queryable, meta)
     # view_selection = sa.union_all(*(sa.select(*pk_cols) for pk_cols in members))
     # views[concept] = view.create_materialized_view(mk_concept_table_name(header.mitm, concept), view_selection,
     #                                                meta)

-    return meta, tables  # , views
+    return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views)

 def insert_db_instances(engine: sa.Engine, meta: sa.MetaData, mitm_data: MITMData):
@@ -160,35 +190,30 @@ def insert_db_instances(engine: sa.Engine, meta: sa.MetaData, mitm_data: MITMDat
     h = mitm_data.header
     mitm = mitm_data.header.mitm
-    for concept, df in mitm_data:
+    for concept, df in mitm_data.as_specialized():
         concept_table = mk_concept_table_name(mitm, concept)
         t_concept = meta.tables[concept_table]
         ref_cols = pick_table_pk(mitm, concept, t_concept.columns)
-        parent_insert = t_concept.insert().values(df[[c.name for c in t_concept.columns]].to_dict('records'))
-        conn.execute(parent_insert)
+        conn.execute(t_concept.insert(), df[[c.name for c in t_concept.columns]].to_dict('records'))

         if has_type_tables(mitm, concept):
             concept_properties, concept_relations = get_mitm_def(mitm).get(concept)
             for typ, idx in df.groupby(concept_properties.typing_concept).groups.items():
                 type_df = df.loc[idx]
                 t_type = meta.tables[mk_type_table_name(mitm, concept, str(typ))]
-                sub_insert = t_type.insert().values(type_df[[c.name for c in t_type.columns]].to_dict('records'))
-                conn.execute(sub_insert)
+                conn.execute(t_type.insert(), type_df[[c.name for c in t_type.columns]].to_dict('records'))
     conn.commit()

-def insert_mitm_data(engine: sa.Engine, mitm_data: MITMData) -> tuple[
-    sa.MetaData, dict[ConceptName, dict[str, sa.Table]]]:
-    meta, tables = mk_db_schema(mitm_data.header)
-    meta.create_all(engine)
-    insert_db_instances(engine, meta, mitm_data)
-    return meta, tables
+def insert_mitm_data(engine: sa.Engine, mitm_data: MITMData) -> SQLRepresentationSchema:
+    sql_rep_schema = mk_db_schema(mitm_data.header)
+    sql_rep_schema.meta.create_all(engine)
+    insert_db_instances(engine, sql_rep_schema.meta, mitm_data)
+    return sql_rep_schema

-def mk_sqlite(mitm_data: MITMData, file_path: str | None = ':memory:') -> tuple[
-    sa.Engine, sa.MetaData, dict[ConceptName, dict[str, sa.Table]]]:
-    engine = create_sa_engine(Url(f'sqlite:///{file_path}'))
-    meta, tables = insert_mitm_data(engine, mitm_data)
-    print(meta.tables)
-    print([f'{t.name}: {t.columns} {t.constraints}' for ts in tables.values() for t in ts.values()])
-    return engine, meta, tables
+def mk_sqlite(mitm_data: MITMData, file_path: str | None = ':memory:') -> tuple[sa.Engine, SQLRepresentationSchema]:
+    engine = create_sa_engine(AnyUrl(f'sqlite:///{file_path}'))
+    sql_rep_schema = insert_mitm_data(engine, mitm_data)
+    # print([f'{t.name}: {t.columns} {t.constraints}' for ts in sql_rep_schema.type_tables.values() for t in ts.values()])
+    return engine, sql_rep_schema
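
Aside: the refactored entry points now hand back a single SQLRepresentationSchema instead of a (meta, tables) tuple; a usage sketch, assuming mitm_data is a MITMData instance (the concept name is illustrative):

    from mitm_tooling.representation import mk_sqlite, SQLRepresentationSchema

    engine, sql_rep_schema = mk_sqlite(mitm_data, file_path='data.sqlite')
    meta = sql_rep_schema.meta                                       # the sa.MetaData, under schema 'main'
    t_concept = sql_rep_schema.concept_tables.get('observation')     # per-concept tables
    type_tables = sql_rep_schema.type_tables.get('observation', {})  # per-type tables, keyed by type name
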
+from . import superset
+__all__ = ['superset']
\ No newline at end of file
+from .superset_representation import mk_inferred_superset_dataset_def, mk_superset_dataset_def
+from . import dataset_definition, superset_representation
\ No newline at end of file
+from abc import ABC, abstractmethod
+from datetime import datetime, tzinfo
+from typing import Any, Annotated
+
+import pydantic
+from uuid import UUID
+
+from mitm_tooling.data_types import MITMDataType
+
+BetterUUID = Annotated[
+    UUID,
+    pydantic.BeforeValidator(lambda x: UUID(x) if isinstance(x, str) else x),
+    pydantic.PlainSerializer(lambda x: str(x)),
+    pydantic.Field(
+        description="Better annotation for UUID, parses from string format. Serializes to string format."
+    ),
+]
+
+
+class SupersetDefFile(pydantic.BaseModel, ABC):
+    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)
+
+    @property
+    @abstractmethod
+    def filename(self) -> str:
+        pass
+
+
+class SupersetDatabaseDef(SupersetDefFile):
+    database_name: str
+    sqlalchemy_uri: pydantic.AnyUrl
+    uuid: BetterUUID
+    cache_timeout: str | None = None
+    expose_in_sqllab: bool = True
+    allow_run_async: bool = False
+    allow_ctas: bool = False
+    allow_cvas: bool = False
+    allow_dml: bool = False
+    allow_file_upload: bool = False
+    extra: dict[str, Any] = pydantic.Field(default_factory=lambda: {
+        'allows_virtual_table_explore': True
+    })
+    impersonate_user: bool = False
+    version: str = '1.0.0'
+
+    @property
+    def filename(self):
+        return self.database_name
+
+
+class SupersetMetricDef(pydantic.BaseModel):
+    metric_name: str
+    verbose_name: str
+    expression: str
+    metric_type: str | None = None
+    description: str | None = None
+    d3format: str | None = None
+    currency: str | None = None
+    extra: dict[str, Any] = pydantic.Field(default_factory=dict)
+    warning_text: str | None = None
+
+
+class SupersetColumnDef(pydantic.BaseModel):
+    column_name: str
+    verbose_name: str | None = None
+    is_dttm: bool = False
+    is_active: bool = True
+    type: str = str(MITMDataType.Text.sa_sql_type)
+    advanced_data_type: str | None = None
+    groupby: bool = True
+    filterable: bool = True
+    expression: str | None = None
+    description: str | None = None
+    python_date_format: str = None
+    extra: dict[str, Any] = pydantic.Field(default_factory=dict)
+
+
+class SupersetTableDef(SupersetDefFile):
+    model_config = pydantic.ConfigDict(populate_by_name=True)
+
+    table_name: str
+    schema_name: str = pydantic.Field(alias='schema')
+    uuid: BetterUUID
+    main_dttm_col: str | None = None
+    description: str | None = None
+    default_endpoint: str | None = None
+    offset: int = 0
+    cache_timeout: str | None = None
+    catalog: str | None = None
+    sql: str = ''
+    params: Any = None
+    template_params: Any = None
+    filter_select_enabled: bool = True
+    fetch_values_predicate: str | None = None
+    extra: dict[str, Any] = pydantic.Field(default_factory=dict)
+    normalize_columns: bool = False
+    always_filter_main_dttm: bool = False
+    metrics: list[SupersetMetricDef] = pydantic.Field(default_factory=list)
+    columns: list[SupersetColumnDef] = pydantic.Field(default_factory=list)
+
+    @property
+    def filename(self):
+        return self.table_name
+
+
+BetterDatetime = Annotated[
+    datetime,
+    pydantic.BeforeValidator(lambda x: datetime.fromisoformat(x) if isinstance(x, str) else x),
+    pydantic.PlainSerializer(lambda x: str(x)),
+    pydantic.Field(
+        description="Better annotation for datetime, parses from string format. Serializes to string format."
+    )]
+
+
+class SupersetMetadataDef(SupersetDefFile):
+    version: str = '1.0.0'
+    type: str = 'Database'
+    timestamp: BetterDatetime = pydantic.Field(default_factory=datetime.utcnow)
+
+    @property
+    def filename(self) -> str:
+        return 'metadata'
+
+
+class SupersetDef(pydantic.BaseModel):
+    database: SupersetDatabaseDef
+    datasets: list[SupersetTableDef]
+    metadata: SupersetMetadataDef = pydantic.Field(default_factory=SupersetMetadataDef)
+
+    def to_folder_structure(self) -> dict[str, Any]:
+        db_name = self.database.database_name
+        db_folder = {db_name: self.database}
+        datasets = list(self.datasets)
+        dataset_folder = {db_name: datasets}
+        return {'databases': [self.database], 'datasets': dataset_folder, '.': self.metadata}
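
Aside: these models serialize one-to-one into the YAML files of a Superset import bundle; a minimal hand-built example (all values illustrative):

    import uuid
    from pydantic import AnyUrl

    db = SupersetDatabaseDef(database_name='mitm_db',
                             sqlalchemy_uri=AnyUrl('sqlite:////mounted-files/mitm_db.sqlite'),
                             uuid=uuid.uuid4())
    table = SupersetTableDef(table_name='observations', schema='main', uuid=uuid.uuid4(),
                             columns=[SupersetColumnDef(column_name='time', is_dttm=True)])
    superset_def = SupersetDef(database=db, datasets=[table])
    folders = superset_def.to_folder_structure()  # {'databases': [...], 'datasets': {'mitm_db': [...]}, '.': metadata}
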
+import os.path
+import uuid
+import zipfile
+
+import sqlalchemy as sa
+import yaml
+from pydantic import AnyUrl
+
+from mitm_tooling.extraction.sql.data_models import DBMetaInfo
+from mitm_tooling.extraction.sql.db import create_sa_engine, connect_and_reflect
+from mitm_tooling.utilities.io_utils import DataSink, FilePath, ByteSink, use_bytes_io
+from mitm_tooling.representation import MITMData, mk_sqlite, mk_db_schema
+from mitm_tooling.representation.sql_representation import MITMData, mk_sqlite, mk_db_schema, \
+    SQL_REPRESENTATION_DEFAULT_SCHEMA
+from mitm_tooling.data_types import MITMDataType
+from .dataset_definition import SupersetTableDef, SupersetColumnDef, SupersetDatabaseDef, SupersetDef, SupersetDefFile
+
+
+def tentative_superset_mount_url(db_name: str) -> AnyUrl:
+    return AnyUrl(f'sqlite:////mounted-files/{db_name}.sqlite?check_same_thread=false')
+
+
+def write_superset_def_as_zip(target: ByteSink, superset_def: SupersetDef):
+    folder_structure = superset_def.to_folder_structure()
+    with use_bytes_io(target, expected_file_ext='.zip', mode='wb', create_file_if_necessary=True) as f:
+        with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zf:
+
+            def mk_node(arg, prefix: str | None = None):
+                if isinstance(arg, SupersetDefFile):
+                    fn = f'{arg.filename}.yaml'
+                    if prefix:
+                        fn = os.path.join(prefix, fn)
+                    dump = arg.model_dump(by_alias=True, mode='python')
+                    s = yaml.dump(dump, default_flow_style=False)
+                    zf.writestr(fn, s)
+                    # with zf.open(fn, 'w') as df:
+                    #     yaml.dump(dump, df)
+                elif isinstance(arg, list):
+                    for arg in arg:
+                        mk_node(arg, prefix=prefix)
+                elif isinstance(arg, dict):
+                    for folder, folder_content in arg.items():
+                        path = None
+                        if folder != '.' and prefix:
+                            path = os.path.join(prefix, folder)
+                        elif prefix:
+                            path = prefix
+                        elif folder != '.':
+                            path = folder
+                        if folder != '.':
+                            zf.mkdir(path)
+                        mk_node(folder_content, prefix=path)
+
+            mk_node(folder_structure)
+
+
+def write_superset_def(output_path: FilePath, superset_def: SupersetDef):
+    write_superset_def_as_zip(output_path, superset_def)
+
+
+def infer_superset_dataset_def(sqlite_file_path: FilePath) -> SupersetDef:
+    engine = create_sa_engine(AnyUrl(f'sqlite:///{str(sqlite_file_path)}'))
+    meta, _ = connect_and_reflect(engine)
+    db_meta = DBMetaInfo.from_sa_meta(meta, default_schema=SQL_REPRESENTATION_DEFAULT_SCHEMA)
+
+    datasets = []
+    for schema_name, schema_tables in db_meta.db_structure.items():
+        for table_name, table in schema_tables.items():
+            cols = []
+            for c in table.columns:
+                dt = table.column_properties[c].mitm_data_type
+                cols.append(
+                    SupersetColumnDef(column_name=c,
+                                      is_dttm=dt is MITMDataType.Datetime,
+                                      groupby=dt not in {MITMDataType.Json,
+                                                         MITMDataType.Numeric,
+                                                         MITMDataType.Datetime},
+                                      type=str(dt.sa_sql_type)  # .as_generic()) #.dialect_impl(sa.Dialect.get_dialect_cls(sa.URL.create(drivername='sqlite', database=':memory:'))()
+                                      ))
+            datasets.append(
+                SupersetTableDef(table_name=table_name, schema_name=schema_name, uuid=uuid.uuid4(), columns=cols))
+
+    db_name = os.path.splitext(os.path.basename(sqlite_file_path))[0]
+    return SupersetDef(
+        database=SupersetDatabaseDef(database_name=db_name,
+                                     sqlalchemy_uri=tentative_superset_mount_url(db_name),
+                                     uuid=uuid.uuid4()),
+        datasets=datasets)
+
+
+def mk_inferred_superset_dataset_def(output_path: FilePath, sqlite_file_path: FilePath):
+    write_superset_def(output_path, infer_superset_dataset_def(sqlite_file_path))
+
+
+def mk_superset_dataset_def(mitm_data: MITMData, sqlite_file_path: str | None = ':memory:',
+                            definition_file_path: str | None = 'superset_definition.zip'):
+    engine, sql_rep_schema = mk_sqlite(mitm_data, file_path=sqlite_file_path)
+    mk_inferred_superset_dataset_def(definition_file_path, sqlite_file_path)
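
Aside: end to end, the SQLite file and the matching Superset import bundle come out of one call; a sketch, assuming mitm_data is a MITMData instance:

    mk_superset_dataset_def(mitm_data,
                            sqlite_file_path='data.sqlite',
                            definition_file_path='superset_definition.zip')
    # 'superset_definition.zip' can then be imported through Superset's database/dataset import
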