Skip to content
Snippets Groups Projects
Commit d85ecf1d authored by Leah Tacke genannt Unterberg's avatar Leah Tacke genannt Unterberg
Browse files

Added header meta tables to the SQL representation

parent 17a2122b
Branches
Tags
No related merge requests found
...@@ -8,7 +8,7 @@ lock: ...@@ -8,7 +8,7 @@ lock:
uv lock uv lock
sync: sync:
uv sync uv sync --group dev
build: build:
uv build uv build
......
...@@ -18,12 +18,12 @@ MultiMapper = Callable[[], list[T] | Iterable[tuple[str, T]]] ...@@ -18,12 +18,12 @@ MultiMapper = Callable[[], list[T] | Iterable[tuple[str, T]]]
class ColGroupMaps(TypedDict, Generic[T], total=False): class ColGroupMaps(TypedDict, Generic[T], total=False):
kind: Mapper | None kind: Mapper[T] | None
type: Mapper | None type: Mapper[T] | None
identity: MultiMapper | None identity: MultiMapper[T] | None
inline: MultiMapper | None inline: MultiMapper[T] | None
foreign: MultiMapper | None foreign: MultiMapper[T] | None
attributes: MultiMapper | None attributes: MultiMapper[T] | None
def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_maps: ColGroupMaps[T], def map_col_groups(mitm_def: MITMDefinition, concept: ConceptName, col_group_maps: ColGroupMaps[T],
......
...@@ -5,7 +5,7 @@ import pydantic ...@@ -5,7 +5,7 @@ import pydantic
from pydantic import ConfigDict from pydantic import ConfigDict
from mitm_tooling.definition import ConceptName, TypeName from mitm_tooling.definition import ConceptName, TypeName
from .intermediate_representation import Header from .intermediate_representation import Header, HeaderEntry
class MITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.DataFrame]]], pydantic.BaseModel): class MITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.DataFrame]]], pydantic.BaseModel):
...@@ -26,3 +26,6 @@ class StreamingMITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.Data ...@@ -26,3 +26,6 @@ class StreamingMITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.Data
def __iter__(self): def __iter__(self):
return iter(self.df_iters.items()) return iter(self.df_iters.items())
MitMDataFrameStream = Iterable[tuple[ConceptName, Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]]]
TypedMitMDataFrameStream = Iterable[tuple[ConceptName, Iterable[tuple[TypeName, HeaderEntry, Iterable[pd.DataFrame]]]]]
\ No newline at end of file
...@@ -2,7 +2,7 @@ from __future__ import annotations ...@@ -2,7 +2,7 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from collections.abc import Callable, Iterable from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING, Generator, Any, Sequence from typing import TYPE_CHECKING, Generator, Any, Sequence, TypedDict
import pandas as pd import pandas as pd
import pydantic import pydantic
...@@ -15,7 +15,8 @@ from mitm_tooling.definition import RelationName, MITMDefinition ...@@ -15,7 +15,8 @@ from mitm_tooling.definition import RelationName, MITMDefinition
from mitm_tooling.definition.definition_tools import ColGroupMaps from mitm_tooling.definition.definition_tools import ColGroupMaps
from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify, use_nested_conn from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify, use_nested_conn
from .common import * from .common import *
from .intermediate_representation import Header, MITMData, TypeName from .df_representation import TypedMitMDataFrameStream
from .intermediate_representation import Header, MITMData, TypeName, HeaderEntry
from .sql.common import * from .sql.common import *
from ..utilities.backports.sqlchemy_sql_views import create_view from ..utilities.backports.sqlchemy_sql_views import create_view
from ..utilities.io_utils import FilePath from ..utilities.io_utils import FilePath
...@@ -26,13 +27,20 @@ if TYPE_CHECKING: ...@@ -26,13 +27,20 @@ if TYPE_CHECKING:
SQL_REPRESENTATION_DEFAULT_SCHEMA = 'main' SQL_REPRESENTATION_DEFAULT_SCHEMA = 'main'
class HeaderMetaTables(TypedDict):
header_meta_definition: sa.Table
header_meta_types: sa.Table
header_meta_type_attributes: sa.Table
ColumnsDict = dict[RelationName, sa.Column] ColumnsDict = dict[RelationName, sa.Column]
ViewsDict = dict[TableName, sa.Table] ViewsDict = dict[TableName, sa.Table]
ConceptTablesDict = dict[ConceptName, sa.Table] ConceptTablesDict = dict[ConceptName, sa.Table]
ConceptTypeTablesDict = dict[ConceptName, dict[TypeName, sa.Table]] ConceptTypeTablesDict = dict[ConceptName, dict[TypeName, sa.Table]]
MitMConceptSchemaItemGenerator = Callable[ MitMConceptSchemaItemGenerator = Callable[
[MITM, ConceptName, TableName, ColumnsDict, ColumnsDict | None], Generator[ [MITM, ConceptName, SchemaName, TableName, ColumnsDict, ColumnsDict | None], Generator[
sqlalchemy.sql.schema.SchemaItem, None, None]] sqlalchemy.sql.schema.SchemaItem, None, None]]
MitMConceptColumnGenerator = Callable[ MitMConceptColumnGenerator = Callable[
[MITM, ConceptName], Generator[tuple[str, sa.Column], None, None]] [MITM, ConceptName], Generator[tuple[str, sa.Column], None, None]]
...@@ -44,6 +52,10 @@ Generator[ ...@@ -44,6 +52,10 @@ Generator[
ARTIFICIAL_ROW_ID_PREFIX = 'row' ARTIFICIAL_ROW_ID_PREFIX = 'row'
def _prefix_col_name(prefix: str, name: str) -> str:
return f'{prefix}_{name}'
def _get_unique_id_col_name(prefix: str | None = None) -> str: def _get_unique_id_col_name(prefix: str | None = None) -> str:
return '__' + ((prefix + '_') if prefix else '') + 'id' return '__' + ((prefix + '_') if prefix else '') + 'id'
...@@ -56,10 +68,11 @@ def _within_concept_id_col(mitm: MITM, concept: ConceptName) -> str: ...@@ -56,10 +68,11 @@ def _within_concept_id_col(mitm: MITM, concept: ConceptName) -> str:
class SQLRepresentationSchema(pydantic.BaseModel): class SQLRepresentationSchema(pydantic.BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
meta: sa.MetaData sa_meta: sa.MetaData
concept_tables: ConceptTablesDict meta_tables: HeaderMetaTables | None = None
type_tables: ConceptTypeTablesDict concept_tables: ConceptTablesDict = pydantic.Field(default_factory=ConceptTablesDict)
views: ViewsDict type_tables: ConceptTypeTablesDict = pydantic.Field(default_factory=ConceptTypeTablesDict)
views: ViewsDict = pydantic.Field(default_factory=ViewsDict)
def mk_concept_table_name(mitm: MITM, concept: ConceptName) -> TableName: def mk_concept_table_name(mitm: MITM, concept: ConceptName) -> TableName:
...@@ -98,25 +111,41 @@ def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict ...@@ -98,25 +111,41 @@ def pick_table_pk(mitm: MITM, concept: ConceptName, created_columns: ColumnsDict
return {n: created_columns[n] for n in names} return {n: created_columns[n] for n in names}
def _gen_unique_constraint(mitm: MITM, concept: ConceptName, table_name: TableName, created_columns: ColumnsDict, def _gen_unique_constraint(mitm: MITM,
concept: ConceptName,
schema_name: SchemaName,
table_name: TableName,
created_columns: ColumnsDict,
pk_columns: ColumnsDict | None) -> Generator[ pk_columns: ColumnsDict | None) -> Generator[
sa.sql.schema.SchemaItem, None, None]: sa.sql.schema.SchemaItem, None, None]:
yield sa.UniqueConstraint(*pk_columns.values()) yield sa.UniqueConstraint(*pk_columns.values())
def _gen_pk_constraint(mitm: MITM, concept: ConceptName, table_name: TableName, created_columns: ColumnsDict, def _gen_pk_constraint(mitm: MITM,
concept: ConceptName,
schema_name: SchemaName,
table_name: TableName,
created_columns: ColumnsDict,
pk_columns: ColumnsDict | None) -> Generator[ pk_columns: ColumnsDict | None) -> Generator[
sa.sql.schema.SchemaItem, None, None]: sa.sql.schema.SchemaItem, None, None]:
yield sa.PrimaryKeyConstraint(*pk_columns.values()) yield sa.PrimaryKeyConstraint(*pk_columns.values())
def _gen_index(mitm: MITM, concept: ConceptName, table_name: TableName, created_columns: ColumnsDict, def _gen_index(mitm: MITM,
concept: ConceptName,
schema_name: SchemaName,
table_name: TableName,
created_columns: ColumnsDict,
pk_columns: ColumnsDict | None) -> Generator[ pk_columns: ColumnsDict | None) -> Generator[
sa.sql.schema.SchemaItem, None, None]: sa.sql.schema.SchemaItem, None, None]:
yield sa.Index(f'{table_name}.index', *pk_columns.values(), unique=True) yield sa.Index(f'{table_name}.index', *pk_columns.values(), unique=True)
def _gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, table_name: TableName, created_columns: ColumnsDict, def _gen_foreign_key_constraints(mitm: MITM,
concept: ConceptName,
schema_name: SchemaName,
table_name: TableName,
created_columns: ColumnsDict,
pk_columns: ColumnsDict | None) -> Generator[ pk_columns: ColumnsDict | None) -> Generator[
sa.sql.schema.SchemaItem, None, None]: sa.sql.schema.SchemaItem, None, None]:
mitm_def = get_mitm_def(mitm) mitm_def = get_mitm_def(mitm)
...@@ -127,13 +156,13 @@ def _gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, table_name: T ...@@ -127,13 +156,13 @@ def _gen_foreign_key_constraints(mitm: MITM, concept: ConceptName, table_name: T
parent_concept = mitm_def.get_parent(concept) parent_concept = mitm_def.get_parent(concept)
parent_table = mk_concept_table_name(mitm, parent_concept) parent_table = mk_concept_table_name(mitm, parent_concept)
cols, refcols = zip( cols, refcols = zip(
*((c, qualify(table=parent_table, column=s)) for s, c in pk_columns.items())) *((c, qualify(schema=schema_name, table=parent_table, column=s)) for s, c in pk_columns.items()))
yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols) yield sa.ForeignKeyConstraint(name='parent', columns=cols, refcolumns=refcols)
for fk_name, fk_info in concept_relations.foreign.items(): for fk_name, fk_info in concept_relations.foreign.items():
cols, refcols = zip(*fk_info.fk_relations.items()) cols, refcols = zip(*fk_info.fk_relations.items())
fkc = sa.ForeignKeyConstraint(name=fk_name, columns=[created_columns[c] for c in cols], refcolumns=[ fkc = sa.ForeignKeyConstraint(name=fk_name, columns=[created_columns[c] for c in cols], refcolumns=[
qualify(table=mk_concept_table_name(mitm, fk_info.target_concept), column=c) qualify(schema=schema_name, table=mk_concept_table_name(mitm, fk_info.target_concept), column=c)
for c in refcols]) for c in refcols])
yield fkc yield fkc
...@@ -150,7 +179,11 @@ def _gen_within_concept_id_col(mitm: MITM, concept: ConceptName) -> Generator[tu ...@@ -150,7 +179,11 @@ def _gen_within_concept_id_col(mitm: MITM, concept: ConceptName) -> Generator[tu
_column_generators: tuple[MitMConceptColumnGenerator, ...] = (_gen_within_concept_id_col,) _column_generators: tuple[MitMConceptColumnGenerator, ...] = (_gen_within_concept_id_col,)
def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: TableName, col_group_maps: ColGroupMaps, def mk_table(meta: sa.MetaData,
mitm: MITM,
concept: ConceptName,
table_name: TableName,
col_group_maps: ColGroupMaps[sa.Column],
additional_column_generators: Iterable[MitMConceptColumnGenerator] | None = ( additional_column_generators: Iterable[MitMConceptColumnGenerator] | None = (
_gen_within_concept_id_col,), _gen_within_concept_id_col,),
schema_item_generators: Iterable[MitMConceptSchemaItemGenerator] | schema_item_generators: Iterable[MitMConceptSchemaItemGenerator] |
...@@ -159,6 +192,7 @@ def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: Ta ...@@ -159,6 +192,7 @@ def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: Ta
tuple[ tuple[
sa.Table, ColumnsDict, ColumnsDict]: sa.Table, ColumnsDict, ColumnsDict]:
mitm_def = get_mitm_def(mitm) mitm_def = get_mitm_def(mitm)
schema = override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA
prepended_cols = None prepended_cols = None
if additional_column_generators is not None: if additional_column_generators is not None:
...@@ -172,15 +206,45 @@ def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: Ta ...@@ -172,15 +206,45 @@ def mk_table(meta: sa.MetaData, mitm: MITM, concept: ConceptName, table_name: Ta
schema_items: list[sa.sql.schema.SchemaItem] = [] schema_items: list[sa.sql.schema.SchemaItem] = []
if schema_item_generators is not None: if schema_item_generators is not None:
for generator in schema_item_generators: for generator in schema_item_generators:
schema_items.extend(generator(mitm, concept, table_name, created_columns, pk_cols)) schema_items.extend(generator(mitm, concept, schema, table_name, created_columns, pk_cols))
return sa.Table(table_name, meta, schema=override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA, return sa.Table(table_name, meta,
*columns, *columns,
*schema_items), created_columns, pk_cols *schema_items,
schema=schema), created_columns, pk_cols
def _prefix_col_name(prefix: str, name: str) -> str:
return f'{prefix}_{name}' def mk_header_tables(meta: sa.MetaData,
override_schema: SchemaName | None = None) -> HeaderMetaTables:
schema = override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA
header_meta_types = sa.Table('header_meta_types', meta,
sa.Column('kind', MITMDataType.Text.sa_sql_type, primary_key=True),
sa.Column('type', MITMDataType.Text.sa_sql_type, primary_key=True),
sa.Column('concept', MITMDataType.Text.sa_sql_type),
schema=schema
)
header_meta_type_attributes = sa.Table('header_meta_type_attributes', meta,
sa.Column('kind', MITMDataType.Text.sa_sql_type, primary_key=True),
sa.Column('type', MITMDataType.Text.sa_sql_type, primary_key=True),
sa.Column('attribute_order', MITMDataType.Integer.sa_sql_type, primary_key=True),
sa.Column('attribute_name', MITMDataType.Text.sa_sql_type),
sa.Column('attribute_data_type', MITMDataType.Text.sa_sql_type),
sa.ForeignKeyConstraint(name='header_meta_type',
columns=['kind', 'type'],
refcolumns=[header_meta_types.c.kind,
header_meta_types.c.type]),
schema=schema
)
header_meta_definition = sa.Table('header_meta_definition',
meta,
sa.Column('mitm', MITMDataType.Text.sa_sql_type, primary_key=True),
sa.Column('mitm_def', MITMDataType.Json.sa_sql_type),
schema=schema)
return HeaderMetaTables(header_meta_definition=header_meta_definition,
header_meta_types=header_meta_types,
header_meta_type_attributes=header_meta_type_attributes)
def _gen_denormalized_views(mitm: MITM, concept_tables: ConceptTablesDict, type_tables: ConceptTypeTablesDict) -> \ def _gen_denormalized_views(mitm: MITM, concept_tables: ConceptTablesDict, type_tables: ConceptTypeTablesDict) -> \
...@@ -239,7 +303,8 @@ _view_generators: tuple[MitMDBViewsGenerator, ...] = (_gen_denormalized_views,) ...@@ -239,7 +303,8 @@ _view_generators: tuple[MitMDBViewsGenerator, ...] = (_gen_denormalized_views,)
def mk_sql_rep_schema(header: Header, def mk_sql_rep_schema(header: Header,
view_generators: Iterable[MitMDBViewsGenerator] | None = (_gen_denormalized_views,), view_generators: Iterable[MitMDBViewsGenerator] | None = (_gen_denormalized_views,),
override_schema: SchemaName | None = None, override_schema: SchemaName | None = None,
skip_fk_constraints: bool = False) -> SQLRepresentationSchema: skip_fk_constraints: bool = False,
include_meta_tables: bool = True) -> SQLRepresentationSchema:
schema_name = override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA schema_name = override_schema if override_schema else SQL_REPRESENTATION_DEFAULT_SCHEMA
mitm_def = get_mitm_def(header.mitm) mitm_def = get_mitm_def(header.mitm)
meta = sa.MetaData(schema=schema_name) meta = sa.MetaData(schema=schema_name)
...@@ -258,7 +323,7 @@ def mk_sql_rep_schema(header: Header, ...@@ -258,7 +323,7 @@ def mk_sql_rep_schema(header: Header,
header.mitm, header.mitm,
concept, concept,
table_name, table_name,
{ col_group_maps={
'kind': lambda: ('kind', sa.Column('kind', 'kind': lambda: ('kind', sa.Column('kind',
MITMDataType.Text.sa_sql_type, MITMDataType.Text.sa_sql_type,
nullable=False)), nullable=False)),
...@@ -321,11 +386,58 @@ def mk_sql_rep_schema(header: Header, ...@@ -321,11 +386,58 @@ def mk_sql_rep_schema(header: Header,
for name, queryable in generator(header.mitm, concept_tables, type_tables): for name, queryable in generator(header.mitm, concept_tables, type_tables):
views[name] = create_view(name, queryable, meta, schema=schema_name) views[name] = create_view(name, queryable, meta, schema=schema_name)
return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views) meta_tables = None
if include_meta_tables:
meta_tables = mk_header_tables(meta, override_schema=schema_name)
return SQLRepresentationSchema(sa_meta=meta,
meta_tables=meta_tables,
concept_tables=concept_tables,
type_tables=type_tables,
views=views)
def insert_db_schema(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema) -> None: def insert_db_schema(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema) -> None:
sql_rep_schema.meta.create_all(bind=bind, checkfirst=True) sql_rep_schema.sa_meta.create_all(bind=bind, checkfirst=True)
def insert_header_data(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, header: Header) -> None:
if (meta_tables := sql_rep_schema.meta_tables):
mitm_def_json = header.mitm_def.model_dump(mode='json', by_alias=True, exclude_unset=True, exclude_none=True)
with use_nested_conn(bind) as conn:
conn.execute(
meta_tables['header_meta_definition'].insert().values([{'mitm': header.mitm, 'mitm_def': mitm_def_json}]))
conn.execute(meta_tables['header_meta_types'].insert().values(
[{'kind': he.kind, 'type': he.type_name, 'concept': he.concept}
for
he
in header.header_entries]))
conn.execute(meta_tables['header_meta_type_attributes'].insert().values([{
'kind': he.kind,
'type': he.type_name,
'attribute_order': i,
'attribute_name': a,
'attribute_data_type': str(dt)} for he in header.header_entries for
i, (a, dt) in enumerate(zip(he.attributes,
he.attribute_dtypes))]))
conn.commit()
def drop_header_data(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema) -> None:
if (meta_tables := sql_rep_schema.meta_tables):
with use_nested_conn(bind) as conn:
meta_tables['header_meta_definition'].drop(conn)
meta_tables['header_meta_type_attributes'].drop(conn)
meta_tables['header_meta_types'].drop(conn)
conn.commit()
def update_header_data(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, header: Header) -> None:
drop_header_data(bind, sql_rep_schema)
insert_header_data(bind, sql_rep_schema, header)
def _df_to_records(df: pd.DataFrame, cols: Sequence[str], additional_cols: dict[str, Any] | None = None) -> list[ def _df_to_records(df: pd.DataFrame, cols: Sequence[str], additional_cols: dict[str, Any] | None = None) -> list[
...@@ -376,10 +488,13 @@ def _insert_type_dfs(conn: sa.Connection, ...@@ -376,10 +488,13 @@ def _insert_type_dfs(conn: sa.Connection,
sql_rep_schema: SQLRepresentationSchema, sql_rep_schema: SQLRepresentationSchema,
mitm_def: MITMDefinition, mitm_def: MITMDefinition,
concept: ConceptName, concept: ConceptName,
typed_dfs: Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]) -> tuple[int, int]: typed_dfs: Iterable[tuple[TypeName, HeaderEntry, Iterable[pd.DataFrame]]]) -> tuple[
list[HeaderEntry], int, int]:
total_inserted_instances, total_inserted_rows = 0, 0 total_inserted_instances, total_inserted_rows = 0, 0
offsets = defaultdict(int) offsets = defaultdict(int)
for type_name, type_dfs in typed_dfs: inserted_types = []
for type_name, he, type_dfs in typed_dfs:
inserted_types.append(he)
for type_df in type_dfs: for type_df in type_dfs:
inserted_instances, inserted_rows = _insert_type_df(conn, inserted_instances, inserted_rows = _insert_type_df(conn,
sql_rep_schema, sql_rep_schema,
...@@ -391,40 +506,54 @@ def _insert_type_dfs(conn: sa.Connection, ...@@ -391,40 +506,54 @@ def _insert_type_dfs(conn: sa.Connection,
offsets[type_name] += inserted_instances offsets[type_name] += inserted_instances
total_inserted_instances += inserted_instances total_inserted_instances += inserted_instances
total_inserted_rows += inserted_rows total_inserted_rows += inserted_rows
return total_inserted_instances, total_inserted_rows return inserted_types, total_inserted_instances, total_inserted_rows
class SQLRepInsertionResult(pydantic.BaseModel):
inserted_types: list[HeaderEntry]
inserted_instances: int
inserted_rows: int
def insert_instances(bind: AnyDBBind, def insert_instances(bind: AnyDBBind,
sql_rep_schema: SQLRepresentationSchema, sql_rep_schema: SQLRepresentationSchema,
mitm: MITM, mitm: MITM,
instances: Iterable[tuple[ConceptName, Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]]]) -> \ instances: TypedMitMDataFrameStream) -> \
tuple[int, int]: SQLRepInsertionResult:
total_inserted_instances, total_inserted_rows = 0, 0 total_inserted_instances, total_inserted_rows = 0, 0
total_inserted_types = []
mitm_def = get_mitm_def(mitm) mitm_def = get_mitm_def(mitm)
with use_nested_conn(bind) as conn: with use_nested_conn(bind) as conn:
for concept, typed_dfs in instances: for concept, typed_dfs in instances:
inserted_instances, inserted_rows = _insert_type_dfs(conn, inserted_types, inserted_instances, inserted_rows = _insert_type_dfs(conn,
sql_rep_schema, sql_rep_schema,
mitm_def, mitm_def,
concept, concept,
typed_dfs) typed_dfs)
total_inserted_instances += inserted_instances total_inserted_instances += inserted_instances
total_inserted_rows += inserted_rows total_inserted_rows += inserted_rows
total_inserted_types.extend(inserted_types)
conn.commit() conn.commit()
return inserted_instances, inserted_rows return SQLRepInsertionResult(inserted_instances=total_inserted_instances,
inserted_rows=total_inserted_rows,
inserted_types=total_inserted_types)
def insert_mitm_data_instances(bind: AnyDBBind, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> tuple[ def insert_mitm_data_instances(bind: AnyDBBind,
int, int]: sql_rep_schema: SQLRepresentationSchema,
mitm_data: MITMData) -> SQLRepInsertionResult:
from mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes from mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes
mitm_dataframes = mitm_data_into_mitm_dataframes(mitm_data) mitm_dataframes = mitm_data_into_mitm_dataframes(mitm_data)
instances: Iterable[tuple[ConceptName, Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]]] = ((c, ((t, (df,)) for t, df in dfs.items())) for c, dfs in mitm_dataframes) he_dict = mitm_dataframes.header.as_dict
instances: TypedMitMDataFrameStream = (
(c, ((t, he_dict[c][t], (df,)) for t, df in dfs.items())) for c, dfs in mitm_dataframes)
return insert_instances(bind, sql_rep_schema, mitm_data.header.mitm, instances) return insert_instances(bind, sql_rep_schema, mitm_data.header.mitm, instances)
def insert_mitm_data(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> None: def insert_mitm_data(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> None:
insert_db_schema(bind, sql_rep_schema) insert_db_schema(bind, sql_rep_schema)
insert_mitm_data_instances(bind, sql_rep_schema, mitm_data) insert_mitm_data_instances(bind, sql_rep_schema, mitm_data)
insert_header_data(bind, sql_rep_schema, mitm_data.header)
def mk_sqlite(mitm_data: MITMData, file_path: FilePath | None = ':memory:', autoclose: bool = True) -> tuple[ def mk_sqlite(mitm_data: MITMData, file_path: FilePath | None = ':memory:', autoclose: bool = True) -> tuple[
......
...@@ -3,9 +3,12 @@ from typing import Iterator, Iterable ...@@ -3,9 +3,12 @@ from typing import Iterator, Iterable
import pandas as pd import pandas as pd
from mitm_tooling.definition import get_mitm_def, TypeName from mitm_tooling.definition import get_mitm_def, TypeName
from mitm_tooling.extraction.sql.mapping import Exportable, DataProvider from mitm_tooling.extraction.sql.mapping import Exportable
from mitm_tooling.representation.sql_representation import SQLRepresentationSchema, insert_db_schema, insert_instances from mitm_tooling.representation import HeaderEntry, Header
from mitm_tooling.utilities.sql_utils import AnyDBBind, use_db_bind, use_nested_conn from mitm_tooling.representation.df_representation import TypedMitMDataFrameStream
from mitm_tooling.representation.sql_representation import SQLRepresentationSchema, insert_db_schema, insert_instances, \
SQLRepInsertionResult, insert_header_data
from mitm_tooling.utilities.sql_utils import AnyDBBind, use_db_bind
def insert_exportable_instances( def insert_exportable_instances(
...@@ -14,44 +17,40 @@ def insert_exportable_instances( ...@@ -14,44 +17,40 @@ def insert_exportable_instances(
target: AnyDBBind, target: AnyDBBind,
target_sql_rep_schema: SQLRepresentationSchema, target_sql_rep_schema: SQLRepresentationSchema,
stream_data: bool = False, stream_data: bool = False,
) -> tuple[int, int]: ) -> SQLRepInsertionResult:
mitm_def = get_mitm_def(exportable.mitm) mitm_def = get_mitm_def(exportable.mitm)
with use_db_bind(source) as source_conn:
def instances(): def instances() -> TypedMitMDataFrameStream:
with use_db_bind(source) as source_conn:
for c, dps in exportable.data_providers.items(): for c, dps in exportable.data_providers.items():
def typed_df_chunks_iter(c=c, dps=dps) -> Iterator[
def typed_df_chunks_iter() -> Iterator[tuple[TypeName, Iterable[pd.DataFrame]]]: tuple[TypeName, HeaderEntry, Iterable[pd.DataFrame]]]:
for dp in dps: for dp in dps:
def local_iter(dp: DataProvider) -> Iterator[tuple[TypeName, Iterable[pd.DataFrame]]]:
chunks = dp.instance_provider.apply_db_chunked(source_conn) if stream_data else [ chunks = dp.instance_provider.apply_db_chunked(source_conn) if stream_data else [
dp.instance_provider.apply_db(source_conn)] dp.instance_provider.apply_db(source_conn)]
for df_chunk in chunks: for df_chunk in chunks:
df_chunk = dp.instance_postprocessor.apply_df(df_chunk) df_chunk = dp.instance_postprocessor.apply_df(df_chunk)
for type_name, type_idx in df_chunk.groupby(mitm_def.get_properties(c).typing_concept).groups.items(): for type_name, type_idx in df_chunk.groupby(mitm_def.get_properties(c).typing_concept).groups.items():
yield type_name, (df_chunk.loc[type_idx],) # exactly one header entry per type
# alternative but probably less efficient hes = dp.header_entry_provider.apply_df(df_chunk.loc[type_idx])
# chunk_hes = dp.header_entry_provider.apply_df(df_chunk) assert len(hes) == 1, f'expected exactly one header entry per type, got {len(hes)}'
# for he in chunk_hes: he = hes[0]
# df_sub_chunk = df_chunk[df_chunk[mitm_def.get_properties(c).typing_concept] == he.type_name]
# yield he.type_name, df_sub_chunk
yield from local_iter(dp) yield str(type_name), he, (df_chunk.loc[type_idx],)
yield c, typed_df_chunks_iter() yield c, typed_df_chunks_iter(c, dps)
inserted_instances, inserted_rows = insert_instances(target, return insert_instances(target,
target_sql_rep_schema, target_sql_rep_schema,
exportable.mitm, exportable.mitm,
instances()) instances())
return inserted_instances, inserted_rows
def insert_exportable(source: AnyDBBind, def insert_exportable(target: AnyDBBind,
exportable: Exportable,
target: AnyDBBind,
sql_rep_schema: SQLRepresentationSchema, sql_rep_schema: SQLRepresentationSchema,
source: AnyDBBind,
exportable: Exportable,
stream_data: bool = False) -> None: stream_data: bool = False) -> None:
insert_db_schema(target, sql_rep_schema) insert_db_schema(target, sql_rep_schema)
insert_exportable_instances(source, exportable, target, sql_rep_schema, stream_data=stream_data) insertion_result = insert_exportable_instances(source, exportable, target, sql_rep_schema, stream_data=stream_data)
insert_header_data(target, sql_rep_schema, Header(mitm=exportable.mitm, header_entries=tuple(insertion_result.inserted_types)))
...@@ -19,7 +19,7 @@ def db_engine_into_db_meta(engine: sa.Engine) -> DBMetaInfo: ...@@ -19,7 +19,7 @@ def db_engine_into_db_meta(engine: sa.Engine) -> DBMetaInfo:
def sql_rep_schema_into_db_meta(sql_rep_schema: SQLRepresentationSchema, def sql_rep_schema_into_db_meta(sql_rep_schema: SQLRepresentationSchema,
default_schema: str = SQL_REPRESENTATION_DEFAULT_SCHEMA) -> DBMetaInfo: default_schema: str = SQL_REPRESENTATION_DEFAULT_SCHEMA) -> DBMetaInfo:
return DBMetaInfo.from_sa_meta(sql_rep_schema.meta, default_schema=default_schema) return DBMetaInfo.from_sa_meta(sql_rep_schema.sa_meta, default_schema=default_schema)
......
[project] [project]
name = "mitm-tooling" name = "mitm-tooling"
version = "0.6.0" version = "0.6.1"
description = "" description = ""
authors = [{ name = "Leah Tacke genannt Unterberg", email = "l.tgu@pads.rwth-aachen.de" }] authors = [{ name = "Leah Tacke genannt Unterberg", email = "l.tgu@pads.rwth-aachen.de" }]
requires-python = ">=3.11,<3.14" requires-python = ">=3.11,<3.14"
......
import os
import unittest
from pydantic import AnyUrl
from mitm_tooling.transformation.superset.common import DBConnectionInfo
from mitm_tooling.transformation.superset.visualizations.maed.registry import MAEDVisualizationType
from mitm_tooling.utilities.identifiers import name_plus_uuid
class MyTestCase(unittest.TestCase):
    """Smoke tests for the MitM SQL representation and Superset export tooling.

    These tests exercise project code end-to-end (schema construction, SQLite
    writing, Superset bundle export); most require local fixture files
    ('synthetic.maed', 'synthetic-variation.maed') and print rather than assert.
    """

    def test_something(self):
        # Intentional placeholder so the suite is never empty.
        pass

    def test_sql_repr(self):
        """Build a SQLRepresentationSchema from a synthetic MAED header and print its parts."""
        from mitm_tooling.representation import Header, HeaderEntry, mk_sql_rep_schema
        from mitm_tooling.definition import MITM
        from mitm_tooling.data_types import MITMDataType
        h = Header(mitm=MITM.MAED, header_entries=(
            HeaderEntry(concept='measurement', kind='M', type_name='A', attributes=('x',),
                        attribute_dtypes=(MITMDataType.Numeric,)),
            HeaderEntry(concept='segment', kind='S', type_name='annotation', attributes=(),
                        attribute_dtypes=()),
            HeaderEntry(concept='segment_data', kind='SD', type_name='annotation_info', attributes=('y',),
                        attribute_dtypes=(MITMDataType.Json,)),
        ))
        sql_rep = mk_sql_rep_schema(h)
        # SQLRepresentationSchema renamed its 'meta' field to 'sa_meta' in this
        # revision; the old attribute access would raise AttributeError.
        print(sql_rep.sa_meta)
        print()
        print(sql_rep.concept_tables)
        print(sql_rep.type_tables)
        print()

    def test_writing_sqlite(self):
        """Write an (instance-less) MITMData with a synthetic header to a SQLite file."""
        from mitm_tooling.representation import Header, HeaderEntry, MITMData, mk_sqlite
        from mitm_tooling.definition import MITM
        from mitm_tooling.data_types import MITMDataType
        h = Header(mitm=MITM.MAED, header_entries=(
            HeaderEntry(concept='measurement', kind='M', type_name='A', attributes=('x',),
                        attribute_dtypes=(MITMDataType.Numeric,)),
            HeaderEntry(concept='segment', kind='S', type_name='annotation', attributes=(),
                        attribute_dtypes=()),
            HeaderEntry(concept='segment_data', kind='SD', type_name='annotation_info', attributes=('y',),
                        attribute_dtypes=(MITMDataType.Json,)),
        ))
        mk_sqlite(MITMData(header=h), file_path='gendb.sqlite')

    def test_with_synthetic(self):
        """Round-trip the 'synthetic.maed' fixture into a fresh SQLite database."""
        from mitm_tooling.representation import mk_sqlite
        from mitm_tooling.io import importing
        from mitm_tooling.definition import MITM
        syn = importing.read_zip('synthetic.maed', MITM.MAED)
        # Remove any stale output so mk_sqlite always starts from a clean file.
        if os.path.exists('synthetic.sqlite'):
            os.remove('synthetic.sqlite')
        mk_sqlite(syn, 'synthetic.sqlite')

    def test_with_synthetic_variation(self):
        """Round-trip the 'synthetic-variation.maed' fixture into a fresh SQLite database."""
        from mitm_tooling.representation import mk_sqlite
        from mitm_tooling.io import importing
        from mitm_tooling.definition import MITM
        syn = importing.read_zip('synthetic-variation.maed', MITM.MAED)
        if os.path.exists('synthetic-variation.sqlite'):
            os.remove('synthetic-variation.sqlite')
        mk_sqlite(syn, 'synthetic-variation.sqlite')

    def test_mitm_datasource_assets(self):
        """Export Superset datasource/dataset import bundles for the synthetic header."""
        from mitm_tooling.io import importing, MITM
        syn = importing.read_zip('synthetic.maed', MITM.MAED, header_only=True)
        from mitm_tooling.transformation.superset import mk_superset_mitm_dataset_bundle, write_superset_import_as_zip
        from mitm_tooling.transformation.superset.definitions import MetadataType
        # NOTE(review): 'sqlite://synthetic-variation.sqlite' parses the file
        # name as a URL host; a relative file path is usually spelled
        # 'sqlite:///synthetic-variation.sqlite' — confirm what Superset expects.
        dataset_bundle = mk_superset_mitm_dataset_bundle(syn.header,
                                                         DBConnectionInfo(
                                                             sql_alchemy_uri=AnyUrl(
                                                                 'sqlite://synthetic-variation.sqlite'),
                                                             explicit_db_name='SyntheticExampleDB',
                                                             schema_name='main'),
                                                         name_plus_uuid(
                                                             'SyntheticExampleDataset'),
                                                         )
        write_superset_import_as_zip('superset_datasource_import',
                                     dataset_bundle.datasource_bundle.to_import(MetadataType.SqlaTable))
        write_superset_import_as_zip('superset_mitm_dataset_import', dataset_bundle.to_import())

    def test_superset_mitm_dataset(self):
        """Export the full Superset bundle (db, datasource, charts, dashboards, viz)."""
        from mitm_tooling.io import importing, MITM
        syn = importing.read_zip('synthetic.maed', MITM.MAED, header_only=True)
        from mitm_tooling.transformation.superset import mk_superset_mitm_dataset_bundle, write_superset_import_as_zip
        from mitm_tooling.transformation.superset.definitions import MetadataType
        full_bundle = mk_superset_mitm_dataset_bundle(syn.header,
                                                      DBConnectionInfo(
                                                          sql_alchemy_uri=AnyUrl(
                                                              'postgresql://mitm-pg-user:superweirdpasswordpleasedonotcrack@mitm-db:5432/mitm_db'),
                                                          explicit_db_name='MitM DB',
                                                          schema_name='myname_0_2c3d7f1584b1'),  # myname_0_39f207b32588
                                                      name_plus_uuid(
                                                          'SyntheticExampleDataset'),
                                                      visualization_types=[MAEDVisualizationType.Baseline]
                                                      )
        write_superset_import_as_zip('superset_mitm_dataset_db_import',
                                     full_bundle.datasource_bundle.to_import(MetadataType.Database))
        write_superset_import_as_zip('superset_mitm_dataset_datasource_import',
                                     full_bundle.datasource_bundle.to_import(MetadataType.SqlaTable))
        write_superset_import_as_zip('superset_mitm_dataset_chart_import',
                                     full_bundle.visualization_bundle.to_import(MetadataType.Slice))
        write_superset_import_as_zip('superset_mitm_dataset_dashboard_import',
                                     full_bundle.visualization_bundle.to_import(MetadataType.Dashboard))
        write_superset_import_as_zip('superset_mitm_dataset_with_viz_import', full_bundle.to_import())
        write_superset_import_as_zip('superset_viz_import', full_bundle.visualization_bundle.to_import())
# Allow running this test module directly via `python <file>`.
if __name__ == '__main__':
    unittest.main()
from mitm_tooling.transformation.sql.from_exportable import insert_exportable
from mitm_tooling.utilities.sql_utils import create_sa_engine
def test_exportable_sql_rep_import():
    # TODO(review): placeholder — insert_exportable() is called with no arguments,
    # although it presumably needs an exportable and an engine/connection (note the
    # unused create_sa_engine import above); flesh out once the fixture exists.
    insert_exportable()
    ...
\ No newline at end of file
import os
import pytest
def test_sql_repr():
    """Build a minimal MAED header and print its derived SQL-representation schema."""
    from mitm_tooling.data_types import MITMDataType
    from mitm_tooling.definition import MITM
    from mitm_tooling.representation import Header, HeaderEntry, mk_sql_rep_schema

    entries = (
        HeaderEntry(concept='measurement', kind='M', type_name='A',
                    attributes=('x',), attribute_dtypes=(MITMDataType.Numeric,)),
        HeaderEntry(concept='segment', kind='S', type_name='annotation',
                    attributes=(), attribute_dtypes=()),
        HeaderEntry(concept='segment_data', kind='SD', type_name='annotation_info',
                    attributes=('y',), attribute_dtypes=(MITMDataType.Json,)),
    )
    header = Header(mitm=MITM.MAED, header_entries=entries)
    schema = mk_sql_rep_schema(header)

    # Dump the SQLAlchemy metadata and the three table groups for manual inspection.
    print(schema.sa_meta)
    print()
    print(schema.meta_tables)
    print(schema.concept_tables)
    print(schema.type_tables)
    print()
def test_writing_sqlite():
    """Materialize a header-only MITMData object into a fresh SQLite database."""
    from mitm_tooling.data_types import MITMDataType
    from mitm_tooling.definition import MITM
    from mitm_tooling.representation import Header, HeaderEntry, MITMData, mk_sqlite

    entries = (
        HeaderEntry(concept='measurement', kind='M', type_name='A',
                    attributes=('x',), attribute_dtypes=(MITMDataType.Numeric,)),
        HeaderEntry(concept='segment', kind='S', type_name='annotation',
                    attributes=(), attribute_dtypes=()),
        HeaderEntry(concept='segment_data', kind='SD', type_name='annotation_info',
                    attributes=('y',), attribute_dtypes=(MITMDataType.Json,)),
    )
    header = Header(mitm=MITM.MAED, header_entries=entries)

    target_db = 'gendb.sqlite'
    # Remove any artifact from a previous run so the file is regenerated.
    if os.path.exists(target_db):
        os.remove(target_db)
    mk_sqlite(MITMData(header=header), file_path=target_db)
def test_with_synthetic_trimmed():
    """Round-trip the trimmed synthetic MAED archive into a fresh SQLite file."""
    from mitm_tooling.definition import MITM
    from mitm_tooling.io import importing
    from mitm_tooling.representation import mk_sqlite

    target_db = 'synthetic-trimmed.sqlite'
    mitm_data = importing.read_zip('synthetic-trimmed.maed', MITM.MAED)
    # Remove any leftover database so mk_sqlite writes a new one.
    if os.path.exists(target_db):
        os.remove(target_db)
    print(mitm_data.header)
    mk_sqlite(mitm_data, target_db)
@pytest.mark.skip(reason="This test is too slow")
def test_with_synthetic():
    """Round-trip the full synthetic MAED archive into a fresh SQLite file (slow)."""
    from mitm_tooling.definition import MITM
    from mitm_tooling.io import importing
    from mitm_tooling.representation import mk_sqlite

    target_db = 'synthetic.sqlite'
    mitm_data = importing.read_zip('synthetic.maed', MITM.MAED)
    # Remove any leftover database so mk_sqlite writes a new one.
    if os.path.exists(target_db):
        os.remove(target_db)
    mk_sqlite(mitm_data, target_db)
# @pytest.mark.skip(reason="This test is too slow")
def test_with_synthetic_variation():
    """Round-trip the synthetic-variation MAED archive into a fresh SQLite file."""
    from mitm_tooling.definition import MITM
    from mitm_tooling.io import importing
    from mitm_tooling.representation import mk_sqlite

    target_db = 'synthetic-variation.sqlite'
    mitm_data = importing.read_zip('synthetic-variation.maed', MITM.MAED)
    # Remove any leftover database so mk_sqlite writes a new one.
    if os.path.exists(target_db):
        os.remove(target_db)
    mk_sqlite(mitm_data, target_db)
import pytest
from pydantic import AnyUrl
from mitm_tooling.utilities.identifiers import name_plus_uuid
def test_mitm_datasource_assets():
    """Build a Superset dataset bundle from the synthetic header and write it out as importable zips."""
    from mitm_tooling.io import importing, MITM
    syn = importing.read_zip('synthetic.maed', MITM.MAED, header_only=True)
    from mitm_tooling.transformation.superset import mk_superset_mitm_dataset_bundle, write_superset_import_as_zip
    from mitm_tooling.transformation.superset.definitions import MetadataType
    from mitm_tooling.transformation.superset.common import DBConnectionInfo
    # FIX: a relative-file SQLite URL needs three slashes ('sqlite:///file.sqlite');
    # with only two, SQLAlchemy parses the file name as a host and the URL is unusable.
    # NOTE(review): the bundle is built from 'synthetic.maed' but the URI points at
    # 'synthetic-variation.sqlite' — presumably intentional, verify.
    dataset_bundle = mk_superset_mitm_dataset_bundle(
        syn.header,
        DBConnectionInfo(
            sql_alchemy_uri=AnyUrl('sqlite:///synthetic-variation.sqlite'),
            explicit_db_name='SyntheticExampleDB',
            schema_name='main'),
        name_plus_uuid('SyntheticExampleDataset'),
    )
    write_superset_import_as_zip('superset_datasource_import',
                                 dataset_bundle.datasource_bundle.to_import(MetadataType.SqlaTable))
    write_superset_import_as_zip('superset_mitm_dataset_import', dataset_bundle.to_import())
def test_superset_mitm_dataset():
    # Builds the full Superset asset bundle (database, datasets, charts,
    # dashboards) for the synthetic MAED header and writes each asset class
    # out as a separately importable zip, plus two combined bundles.
    from mitm_tooling.io import importing, MITM
    # header_only: only the header is needed to derive the Superset assets.
    syn = importing.read_zip('synthetic.maed', MITM.MAED, header_only=True)
    from mitm_tooling.transformation.superset import mk_superset_mitm_dataset_bundle, write_superset_import_as_zip
    from mitm_tooling.transformation.superset.definitions import MetadataType
    from mitm_tooling.transformation.superset.common import DBConnectionInfo
    from mitm_tooling.transformation.superset import MAEDVisualizationType
    # NOTE(review): credentials are hard-coded in the URI below — acceptable
    # for a local docker-compose test DB only; must never point at a real DB.
    # NOTE(review): schema_name is a fixed generated identifier — this test
    # presumably depends on a pre-populated schema; verify it still exists.
    full_bundle = mk_superset_mitm_dataset_bundle(syn.header,
                                                  DBConnectionInfo(
                                                      sql_alchemy_uri=AnyUrl(
                                                          'postgresql://mitm-pg-user:superweirdpasswordpleasedonotcrack@mitm-db:5432/mitm_db'),
                                                      explicit_db_name='MitM DB',
                                                      schema_name='myname_0_2c3d7f1584b1'),  # myname_0_39f207b32588
                                                  name_plus_uuid(
                                                      'SyntheticExampleDataset'),
                                                  visualization_types=[MAEDVisualizationType.Baseline]
                                                  )
    # One zip per Superset metadata type ...
    write_superset_import_as_zip('superset_mitm_dataset_db_import',
                                 full_bundle.datasource_bundle.to_import(MetadataType.Database))
    write_superset_import_as_zip('superset_mitm_dataset_datasource_import',
                                 full_bundle.datasource_bundle.to_import(MetadataType.SqlaTable))
    write_superset_import_as_zip('superset_mitm_dataset_chart_import',
                                 full_bundle.visualization_bundle.to_import(MetadataType.Slice))
    write_superset_import_as_zip('superset_mitm_dataset_dashboard_import',
                                 full_bundle.visualization_bundle.to_import(MetadataType.Dashboard))
    # ... and the combined bundles (datasource + viz, and viz-only).
    write_superset_import_as_zip('superset_mitm_dataset_with_viz_import', full_bundle.to_import())
    write_superset_import_as_zip('superset_viz_import', full_bundle.visualization_bundle.to_import())
# FIX: the following lines were a side-by-side diff whose old/new columns were
# fused into single (syntactically invalid) lines; reconstructed as the
# post-change version of the module.
import unittest

from mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes


class MyTestCase(unittest.TestCase):
    def test_to_df(self):
        """Unpack the synthetic MAED archive into per-concept, per-type DataFrames and preview them."""
        from mitm_tooling.io import importing
        from mitm_tooling.definition import MITM
        syn = importing.read_zip('synthetic.maed', MITM.MAED)
        mitm_dataframes = mitm_data_into_mitm_dataframes(syn)
        for c, typed_dfs in mitm_dataframes:
            for type_name, df in typed_dfs.items():
                print(c, type_name)
                print(df.head())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment