diff --git a/mitm_tooling/extraction/sql/data_models/db_meta.py b/mitm_tooling/extraction/sql/data_models/db_meta.py index f73350600fca9478a56cc89119be8815163c2290..db2374d00fbb2c81e806d992e3585d2bc4d07ea4 100644 --- a/mitm_tooling/extraction/sql/data_models/db_meta.py +++ b/mitm_tooling/extraction/sql/data_models/db_meta.py @@ -7,7 +7,8 @@ from pydantic import Field from sqlalchemy import Table, MetaData from mitm_tooling.data_types import MITMDataType, SA_SQLTypeName, sa_sql_to_mitm_type -from mitm_tooling.representation.sql_representation import ColumnName, TableName, SchemaName, ShortTableIdentifier, Queryable +from mitm_tooling.representation.sql_representation import ColumnName, TableName, SchemaName, ShortTableIdentifier, \ + Queryable from mitm_tooling.utilities.sql_utils import unqualify from .table_identifiers import LocalTableIdentifier, \ AnyLocalTableIdentifier @@ -118,7 +119,8 @@ class TableMetaInfo(TableMetaInfoBase): @classmethod def from_sa_table(cls, t: Table, queryable_source: Queryable | None = None, default_schema: str | None = None) -> Self: - fkcs = [ForeignKeyConstraint.from_sa_constraint(fkc, t.schema or default_schema) for fkc in t.foreign_key_constraints] + fkcs = [ForeignKeyConstraint.from_sa_constraint(fkc, t.schema or default_schema) for fkc in + t.foreign_key_constraints] col_props = {c.name: ColumnProperties(nullable=c.nullable, unique=bool(c.unique), part_of_index=any(c.name in ind.columns for ind in t.indexes), part_of_pk=c.primary_key, part_of_fk=len(c.foreign_keys) > 0, @@ -172,7 +174,7 @@ class TableMetaInfo(TableMetaInfoBase): class DBMetaInfoBase(pydantic.BaseModel): db_structure: dict[SchemaName, dict[TableName, TableMetaInfoBase]] - @pydantic.computed_field(repr=False) + @cached_property def tables(self) -> dict[ShortTableIdentifier, TableMetaInfoBase]: return {tm.short_table_identifier: tm for schema, tables in self.db_structure.items() for table, tm in tables.items()} @@ -185,8 +187,7 @@ class DBMetaInfo(DBMetaInfoBase): default_schema: SchemaName sa_meta: MetaData - @pydantic.computed_field(repr=False) - @property + @cached_property def tables(self) -> dict[ShortTableIdentifier, TableMetaInfo]: return {tm.short_table_identifier: tm for schema, tables in self.db_structure.items() for table, tm in tables.items()} diff --git a/mitm_tooling/extraction/sql/data_models/db_probe.py b/mitm_tooling/extraction/sql/data_models/db_probe.py index 40e4543a3144d6446376344f86310de5a6865951..e8c6a54fd67ba5b8e47f28752896c0ead13f3da0 100644 --- a/mitm_tooling/extraction/sql/data_models/db_probe.py +++ b/mitm_tooling/extraction/sql/data_models/db_probe.py @@ -1,14 +1,15 @@ import logging +from functools import cached_property from typing import Any import pydantic from pydantic import NonNegativeInt, Field from mitm_tooling.data_types.data_types import MITMDataType -from .db_meta import TableMetaInfoBase, DBMetaInfoBase, DBMetaInfo from mitm_tooling.representation import ColumnName +from mitm_tooling.representation.sql.common import ShortTableIdentifier, SchemaName, TableName +from .db_meta import TableMetaInfoBase, DBMetaInfoBase, DBMetaInfo from .probe_models import SampleSummary -from mitm_tooling.representation.sql.common import ShortTableIdentifier logger = logging.getLogger('api') @@ -31,34 +32,60 @@ class TableProbe(TableProbeBase): class DBProbeMinimal(pydantic.BaseModel): - table_probes: dict[ShortTableIdentifier, TableProbeMinimal] = Field(default_factory=dict) + db_structured_table_probes: dict[SchemaName, dict[TableName, TableProbeMinimal]] = 
Field(default_factory=dict) + + @cached_property + def table_probes(self) -> dict[ShortTableIdentifier, TableProbeMinimal]: + return {(schema_name, table_name): tp for schema_name, schema_probes in self.db_structured_table_probes.items() + for table_name, tp in schema_probes.items()} class DBProbeBase(DBProbeMinimal): db_meta: DBMetaInfoBase - table_probes: dict[ShortTableIdentifier, TableProbeBase] = Field(default_factory=dict) + db_structured_table_probes: dict[SchemaName, dict[TableName, TableProbeBase]] = Field(default_factory=dict) + + @cached_property + def table_probes(self) -> dict[ShortTableIdentifier, TableProbeBase]: + return {(schema_name, table_name): tp for schema_name, schema_probes in self.db_structured_table_probes.items() + for table_name, tp in schema_probes.items()} class DBProbe(DBProbeBase): db_meta: DBMetaInfo - table_probes: dict[ShortTableIdentifier, TableProbe] = Field(default_factory=dict) + db_structured_table_probes: dict[SchemaName, dict[TableName, TableProbe]] = Field(default_factory=dict) + + @cached_property + def table_probes(self) -> dict[ShortTableIdentifier, TableProbe]: + return {(schema_name, table_name): tp for schema_name, schema_probes in self.db_structured_table_probes.items() + for table_name, tp in schema_probes.items()} def update_meta(self, db_meta: DBMetaInfo): if self.db_meta: - for new_ti, new_table in db_meta.tables.items(): - if (table := self.db_meta.tables.get(new_ti, None)) is not None: - if table != new_table: - # remove associated probe - logger.info(f'Removed table probe of {new_ti} due to metadata refresh.') - self.table_probes.pop(new_ti, None) - # self.table_probes.get(new_ti).table_info = new_table + remaining_probes = {} + for schema_name, existing_tables in self.db_meta.db_structure.items(): + if (schema_name in db_meta.db_structure) and ( + new_tables := db_meta.db_structure[schema_name]) is not None: + schema_local_probes = {} + for table_name, existing_table in existing_tables.items(): + if ((table := new_tables.get(table_name, None)) is not None) and table == existing_table: + schema_local_probes[table_name] = self.db_structured_table_probes[schema_name][table_name] + else: + logger.info(f'Removed table probe of {schema_name}.{table_name} due to metadata refresh.') + + if len(schema_local_probes) > 0: + remaining_probes[schema_name] = schema_local_probes + + self.db_structured_table_probes = remaining_probes self.db_meta = db_meta def update_probes(self, *probes: tuple[ShortTableIdentifier, TableProbe]): for ti, tp in probes: - self.table_probes[ti] = tp + schema_name, table_name = ti + self.db_structured_table_probes[schema_name][table_name] = tp def drop_probes(self, *to_drop: ShortTableIdentifier): for ti in to_drop: - if ti in self.table_probes: - del self.table_probes[ti] + schema_name, table_name = ti + if schema_name in self.db_structured_table_probes: + if table_name in self.db_structured_table_probes[schema_name]: + del self.db_structured_table_probes[schema_name][table_name] diff --git a/mitm_tooling/extraction/sql/db/db_probing.py b/mitm_tooling/extraction/sql/db/db_probing.py index 925bc6a38dfa7289b37707f7dd1df1cdf4c48d8a..c229a160387f877eb62042807dd4f9ab5e72054e 100644 --- a/mitm_tooling/extraction/sql/db/db_probing.py +++ b/mitm_tooling/extraction/sql/db/db_probing.py @@ -244,7 +244,7 @@ def create_table_probe(db_session: Session, table_meta: TableMetaInfo, sample_si row_count = query_row_count(db_session, queryable_source) _, df = sample_queryable(db_session, {queryable_source}, sample_size=sample_size)[0] 
inferred_types, sample_summaries = analyze_queryable_sample(queryable_source, df) - sampled_values = df.to_dict(orient='list') # {str(c): vs for c, vs in df.to_dict(orient='list').items()} + sampled_values = {str(c): [str(v) for v in vs] for c, vs in df.to_dict(orient='list').items()} return TableProbe(table_meta=table_meta, row_count=row_count, sampled_values=sampled_values, inferred_types=inferred_types, sample_summaries=sample_summaries) diff --git a/mitm_tooling/extraction/sql/mapping/export.py b/mitm_tooling/extraction/sql/mapping/export.py index caf71f41b8f616be600e780da9a28de7b729ca7b..539c18850025c38963603b743703f76590c9ea32 100644 --- a/mitm_tooling/extraction/sql/mapping/export.py +++ b/mitm_tooling/extraction/sql/mapping/export.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING import pandas as pd import pydantic +from pydantic import ConfigDict import sqlalchemy as sa from sqlalchemy.orm import Session @@ -18,16 +19,26 @@ from .mapping import ConceptMapping, DataProvider, ConceptMappingException, Inst from mitm_tooling.io import ZippedExport, StreamingZippedExport from mitm_tooling.representation import HeaderEntry, Header, StreamingConceptData, StreamingMITMData, MITMData from mitm_tooling.representation import mk_concept_file_header +from mitm_tooling.utilities.sql_utils import AnyDBBind STREAMING_CHUNK_SIZE = 100_000 class Exportable(pydantic.BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + mitm: MITM data_providers: dict[ConceptName, list[DataProvider]] filename: str | None = None - def export_to_memory(self, db_session: Session, validate: bool = False) -> ZippedExport: + def generate_header(self, bind: AnyDBBind) -> Header: + header_entries = [] + for _, dps in self.data_providers.items(): + for dp in dps: + header_entries.extend(dp.header_entry_provider.apply_db(bind)) + return Header(mitm=self.mitm, header_entries=tuple(header_entries)) + + def export_to_memory(self, bind: AnyDBBind, validate: bool = False) -> ZippedExport: header_entries = [] tables = {} @@ -35,10 +46,10 @@ class Exportable(pydantic.BaseModel): dfs = [] for dp in dps: - df = dp.instance_provider.apply_session(db_session) + df = dp.instance_provider.apply_db(bind) if validate: raise NotImplementedError - df = dp.instance_postprocessor.apply(df) + df = dp.instance_postprocessor.apply_df(df) dfs.append(df) header_entries += dp.header_entry_provider.apply_df(df) @@ -50,8 +61,7 @@ class Exportable(pydantic.BaseModel): return ZippedExport(mitm=self.mitm, filename=filename, mitm_data=MITMData(header=header, concept_dfs=tables)) - def export_as_stream(self, db_session: Session, validate: bool = False) -> StreamingZippedExport: - + def export_as_stream(self, bind: AnyDBBind, validate: bool = False) -> StreamingZippedExport: data_sources = {} for c, dps in self.data_providers.items(): k = max((dp.header_entry_provider.type_arity for dp in dps)) @@ -62,11 +72,11 @@ class Exportable(pydantic.BaseModel): for dp in dps: def local_iter(dp: DataProvider, columns=tuple(concept_file_columns)) -> Iterator[ tuple[pd.DataFrame, list[HeaderEntry]]]: - for df_chunk in dp.instance_provider.apply_session_chunked(db_session, STREAMING_CHUNK_SIZE): + for df_chunk in dp.instance_provider.apply_db_chunked(bind, STREAMING_CHUNK_SIZE): if validate: raise NotImplementedError df_chunk = df_chunk.reindex(columns=list(columns), copy=False) - yield dp.instance_postprocessor.apply(df_chunk), dp.header_entry_provider.apply_df(df_chunk) + yield dp.instance_postprocessor.apply_df(df_chunk), 
dp.header_entry_provider.apply_df(df_chunk) chunk_iterators.append(local_iter(dp)) @@ -87,7 +97,7 @@ class MappingExport(pydantic.BaseModel): data_providers: dict[ConceptName, list[DataProvider]] = {} meta = sa.MetaData(schema='export') - for i, concept_mapping in enumerate(self.mapped_concepts): + for concept_mapping in self.mapped_concepts: if concept_mapping.mitm != self.mitm: continue diff --git a/mitm_tooling/extraction/sql/mapping/mapping.py b/mitm_tooling/extraction/sql/mapping/mapping.py index 54eb83ed63e16ee759efcea0e07895c1e7b5a5fd..07d96a0e0bab063706bfa1569f49f397f58d58f1 100644 --- a/mitm_tooling/extraction/sql/mapping/mapping.py +++ b/mitm_tooling/extraction/sql/mapping/mapping.py @@ -19,6 +19,7 @@ from ..transformation.db_transformation import TableNotFoundException, col_by_na from ..transformation.df_transformation import transform_df from mitm_tooling.utilities.python_utils import normalize_into_dict, elem_wise_eq, ExtraInfoExc +from mitm_tooling.utilities.sql_utils import AnyDBBind, use_db_bind from .validation_models import IndividualMappingValidationContext, \ MappingGroupValidationContext @@ -39,11 +40,13 @@ class ColumnContentProvider(pydantic.BaseModel): static_value: str | None = None) -> Self: return ColumnContentProvider(column_name=tup[0], column_element=tup[1], is_present_in_table=is_present_in_table, static_value=static_value) + @classmethod def from_static(cls, name: str, value: str, dt: MITMDataType = MITMDataType.Text) -> Self: ce = sa.literal(value, dt.sa_sql_type_cls).alias(name) return ColumnContentProvider(column_name=name, column_element=ce, is_present_in_table=False, static_value=value) + @dataclasses.dataclass class HeaderEntryProvider: concept: ConceptName @@ -57,9 +60,10 @@ class HeaderEntryProvider: def type_arity(self): return len(self.attributes) - def apply_session(self, db_session: Session) -> list[HeaderEntry]: - distinct = db_session.execute( - sa.select(self.kind_provider.column_element, self.type_provider.column_element).distinct()).all() + def apply_db(self, bind: AnyDBBind) -> list[HeaderEntry]: + with use_db_bind(bind) as conn: + distinct = conn.execute( + sa.select(self.kind_provider.column_element, self.type_provider.column_element).distinct()).all() return self.apply_iterable(((kind, type_name) for kind, type_name in distinct)) def apply_df(self, df: pd.DataFrame) -> list[HeaderEntry]: @@ -85,7 +89,7 @@ class HeaderEntryProvider: class InstancesPostProcessor: transforms: list[TableTransforms] = dataclasses.field(default_factory=list) - def apply(self, df: pd.DataFrame): + def apply_df(self, df: pd.DataFrame): return transform_df(df, self.transforms) @@ -93,17 +97,19 @@ class InstancesPostProcessor: class InstancesProvider: virtual_view: VirtualView - def apply_session(self, db_session: Session) -> pd.DataFrame: + def apply_db(self, bind: AnyDBBind) -> pd.DataFrame: tm = self.virtual_view.table_meta - results = db_session.execute(tm.queryable_source.select()).all() + with use_db_bind(bind) as conn: + results = conn.execute(tm.queryable_source.select()).all() df = pd.DataFrame.from_records(results, columns=list(tm.columns)) return df - def apply_session_chunked(self, db_session: Session, chunk_size: int) -> Iterable[pd.DataFrame]: + def apply_db_chunked(self, bind: AnyDBBind, chunk_size: int = 100_000) -> Iterable[pd.DataFrame]: tm = self.virtual_view.table_meta - results = db_session.execute(tm.queryable_source.select()).partitions(chunk_size) - for result_chunk in results: - yield pd.DataFrame.from_records(result_chunk, 
columns=list(tm.columns)) + with use_db_bind(bind) as conn: + results = conn.execute(tm.queryable_source.select()).partitions(chunk_size) + for result_chunk in results: + yield pd.DataFrame.from_records(result_chunk, columns=list(tm.columns)) @dataclasses.dataclass diff --git a/mitm_tooling/io/exporting.py b/mitm_tooling/io/exporting.py index 05a7c22da1610c3042b057876ebb88baa86d8d6d..0effd86256e4a5bdefc2502b297a6be11e200b6a 100644 --- a/mitm_tooling/io/exporting.py +++ b/mitm_tooling/io/exporting.py @@ -82,7 +82,7 @@ class StreamingZippedExport(FileExport): header_entries=tuple(collected_header_entries)).generate_header_df() write_header_file(header_df, hf) - def to_stream(self, chunk_size: int = 65536) -> Iterable[bytes]: + def iter_bytes(self, chunk_size: int = 65536) -> Iterable[bytes]: from stream_zip import stream_zip, ZIP_64 from stat import S_IFREG mitm_def = get_mitm_def(self.mitm) @@ -110,6 +110,7 @@ class StreamingZippedExport(FileExport): return stream_zip(files(), chunk_size=chunk_size) + def write_zip(target: FilePath, mitm_data: MITMData): return ZippedExport(mitm=mitm_data.header.mitm, filename=os.path.basename(target), diff --git a/mitm_tooling/representation/df_representation.py b/mitm_tooling/representation/df_representation.py index e0571f07cc469d8ab4f3324c0c42caf2c71c9096..19943041b263159f9e0235fb051dbcac8624e72e 100644 --- a/mitm_tooling/representation/df_representation.py +++ b/mitm_tooling/representation/df_representation.py @@ -4,15 +4,25 @@ import pandas as pd import pydantic from pydantic import ConfigDict -from mitm_tooling.definition import ConceptName +from mitm_tooling.definition import ConceptName, TypeName from .intermediate_representation import Header -class MITMDataFrames(Iterable[tuple[str, dict[str, pd.DataFrame]]], pydantic.BaseModel): +class MITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.DataFrame]]], pydantic.BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) header: Header - dfs: dict[ConceptName, dict[str, pd.DataFrame]] + dfs: dict[ConceptName, dict[TypeName, pd.DataFrame]] def __iter__(self): return iter(self.dfs.items()) + + +class StreamingMITMDataFrames(Iterable[tuple[ConceptName, dict[TypeName, pd.DataFrame]]], pydantic.BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + header: Header + df_iters: dict[ConceptName, dict[TypeName, Iterable[pd.DataFrame]]] + + def __iter__(self): + return iter(self.df_iters.items()) diff --git a/mitm_tooling/representation/sql_representation.py b/mitm_tooling/representation/sql_representation.py index 6ab8a3db689b0524b34a91f470fc3fe1cb4d0c2c..4a88a8a143ec1884e28051c694e418c639d6acae 100644 --- a/mitm_tooling/representation/sql_representation.py +++ b/mitm_tooling/representation/sql_representation.py @@ -1,7 +1,7 @@ from __future__ import annotations +from collections import defaultdict from collections.abc import Callable, Iterable -from contextlib import contextmanager from typing import TYPE_CHECKING, Generator, Any, Sequence import pandas as pd @@ -11,14 +11,15 @@ from pydantic import AnyUrl, ConfigDict from sqlalchemy import func from sqlalchemy.pool import StaticPool -from mitm_tooling.definition import RelationName +from mitm_tooling.definition import RelationName, MITMDefinition from mitm_tooling.definition.definition_tools import ColGroupMaps -from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify +from mitm_tooling.utilities.sql_utils import create_sa_engine, qualify, use_nested_conn from .common import * from 
.intermediate_representation import Header, MITMData, TypeName from .sql.common import * -from ..utilities.io_utils import FilePath from ..utilities.backports.sqlchemy_sql_views import create_view +from ..utilities.io_utils import FilePath +from ..utilities.sql_utils import EngineOrConnection if TYPE_CHECKING: pass @@ -143,7 +144,7 @@ _schema_item_generators: tuple[MitMConceptSchemaItemGenerator, ...] = ( def _gen_within_concept_id_col(mitm: MITM, concept: ConceptName) -> Generator[tuple[str, sa.Column], None, None]: n = _within_concept_id_col(mitm, concept) - yield n, sa.Column(n, sa.Integer, nullable=False, unique=True) + yield n, sa.Column(n, sa.Integer, nullable=False, unique=True, index=True) _column_generators: tuple[MitMConceptColumnGenerator, ...] = (_gen_within_concept_id_col,) @@ -253,20 +254,34 @@ def mk_sql_rep_schema(header: Header, table_name = mk_concept_table_name(header.mitm, concept) - t, t_columns, t_ref_columns = mk_table(meta, header.mitm, concept, table_name, { - 'kind': lambda: ('kind', sa.Column('kind', MITMDataType.Text.sa_sql_type, nullable=False)), - 'type': lambda: (concept_properties.typing_concept, sa.Column(concept_properties.typing_concept, - MITMDataType.Text.sa_sql_type, - nullable=False)), - 'identity': lambda: [(name, sa.Column(name, dt.sa_sql_type, nullable=False)) for - name, dt in - mitm_def.resolve_identity_type(concept).items()], - 'inline': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for name, dt in - mitm_def.resolve_inlined_types(concept).items()], - 'foreign': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for _, resolved_fk in - mitm_def.resolve_foreign_types(concept).items() for name, dt in - resolved_fk.items()] - }, additional_column_generators=(_gen_within_concept_id_col,), schema_item_generators=base_schema_item_generators, override_schema=schema_name) + t, t_columns, t_ref_columns = mk_table(meta, + header.mitm, + concept, + table_name, + { + 'kind': lambda: ('kind', sa.Column('kind', + MITMDataType.Text.sa_sql_type, + nullable=False)), + 'type': lambda: (concept_properties.typing_concept, + sa.Column(concept_properties.typing_concept, + MITMDataType.Text.sa_sql_type, + nullable=False)), + 'identity': lambda: [ + (name, sa.Column(name, dt.sa_sql_type, nullable=False)) for + name, dt in + mitm_def.resolve_identity_type(concept).items()], + 'inline': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for + name, dt in + mitm_def.resolve_inlined_types(concept).items()], + 'foreign': lambda: [(name, sa.Column(name, dt.sa_sql_type)) for + _, resolved_fk in + mitm_def.resolve_foreign_types(concept).items() + for name, dt in + resolved_fk.items()] + }, + additional_column_generators=(_gen_within_concept_id_col,), + schema_item_generators=base_schema_item_generators, + override_schema=schema_name) concept_tables[concept] = t type_table_schema_item_generators = base_schema_item_generators + ( @@ -309,20 +324,8 @@ def mk_sql_rep_schema(header: Header, return SQLRepresentationSchema(meta=meta, concept_tables=concept_tables, type_tables=type_tables, views=views) -EngineOrConnection = sa.Engine | sa.Connection - - -@contextmanager -def _nested_conn(bind: EngineOrConnection) -> Generator[sa.Connection, None, None]: - if isinstance(bind, sa.Engine): - yield bind.connect() - elif isinstance(bind, sa.Connection): - with bind.begin_nested(): - yield bind - - def insert_db_schema(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema) -> None: - sql_rep_schema.meta.create_all(bind=bind, checkfirst=False) + 
sql_rep_schema.meta.create_all(bind=bind, checkfirst=True)


 def _df_to_records(df: pd.DataFrame, cols: Sequence[str], additional_cols: dict[str, Any] | None = None) -> list[
@@ -334,51 +337,98 @@ def _df_to_records(df: pd.DataFrame, cols: Sequence[str], additional_cols: dict[
 def _df_to_table_records(df: pd.DataFrame, table: sa.Table, additional_cols: dict[str, Any] | None = None) -> list[
     dict[str, Any]]:
-    return _df_to_records(df, (c.name for c in table.columns), additional_cols=additional_cols)
+    return _df_to_records(df, [c.name for c in table.columns], additional_cols=additional_cols)
+
+
+def _insert_type_df(conn: sa.Connection,
+                    sql_rep_schema: SQLRepresentationSchema,
+                    mitm: MITM,
+                    mitm_def: MITMDefinition,
+                    concept: ConceptName,
+                    type_name: TypeName,
+                    type_df: pd.DataFrame,
+                    artificial_id_offset: int | None = None) -> tuple[int, int]:
+    parent_concept = mitm_def.get_parent(concept)
+    t_concept = sql_rep_schema.concept_tables[parent_concept]
+    # if not has_natural_pk(mitm, concept):
+    # TODO not pretty..
+    #  ideally, I'd use the returned "inserted_pk"
+    #  values from the bulk insertion with an autoincrement id col
+    #  but apparently almost no DBAPI drivers support this
+    no_instances = len(type_df)
+    concept_id_col_name = _get_unique_id_col_name(parent_concept)
+    max_id = conn.execute(sa.select(func.max(t_concept.columns[concept_id_col_name]))).scalar() or 0
+    start_id = max_id + (artificial_id_offset or 0) + 1
+    artificial_ids = pd.RangeIndex(start=start_id, stop=start_id + no_instances,
+                                   name=concept_id_col_name)
+    # type_df[concept_id_col_name] = artificial_ids
+    type_df = type_df.assign(**{concept_id_col_name: artificial_ids})
+    conn.execute(t_concept.insert(), _df_to_table_records(type_df, t_concept))
+    inserted_rows = no_instances
+    if has_type_tables(mitm, concept):
+        # generated_ids = conn.execute(sa.select(t_concept.columns[concept_id_col_name])).scalars()
+        t_type = sql_rep_schema.type_tables[concept][type_name]
+        conn.execute(t_type.insert(), _df_to_table_records(type_df, t_type))
+        inserted_rows += no_instances
+    return no_instances, inserted_rows
+
+
+def _insert_type_dfs(conn: sa.Connection,
+                     sql_rep_schema: SQLRepresentationSchema,
+                     mitm: MITM,
+                     mitm_def: MITMDefinition,
+                     concept: ConceptName,
+                     typed_dfs: Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]) -> tuple[int, int]:
+    total_inserted_instances, total_inserted_rows = 0, 0
+    offsets = defaultdict(int)
+    for type_name, type_dfs in typed_dfs:
+        for type_df in type_dfs:
+            inserted_instances, inserted_rows = _insert_type_df(conn,
+                                                                sql_rep_schema,
+                                                                mitm,
+                                                                mitm_def,
+                                                                concept,
+                                                                type_name,
+                                                                type_df,
+                                                                artificial_id_offset=offsets[type_name])
+            offsets[type_name] += inserted_instances
+            total_inserted_instances += inserted_instances
+            total_inserted_rows += inserted_rows
+    return total_inserted_instances, total_inserted_rows
+
+
+def insert_instances(bind: AnyDBBind,
+                     sql_rep_schema: SQLRepresentationSchema,
+                     mitm: MITM,
+                     instances: Iterable[tuple[ConceptName, Iterable[tuple[TypeName, Iterable[pd.DataFrame]]]]]) -> \
+        tuple[int, int]:
+    total_inserted_instances, total_inserted_rows = 0, 0
+    mitm_def = get_mitm_def(mitm)
+    with use_nested_conn(bind) as conn:
+        for concept, typed_dfs in instances:
+            inserted_instances, inserted_rows = _insert_type_dfs(conn,
+                                                                 sql_rep_schema,
+                                                                 mitm,
+                                                                 mitm_def,
+                                                                 concept,
+                                                                 typed_dfs)
+            total_inserted_instances += inserted_instances
+            total_inserted_rows += inserted_rows
+        conn.commit()
+    return total_inserted_instances, total_inserted_rows


-def insert_db_instances(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> None:
+def insert_mitm_data_instances(bind: AnyDBBind, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> tuple[
+    int, int]:
     from mitm_tooling.transformation.df import mitm_data_into_mitm_dataframes
-    h = mitm_data.header
-    mitm = mitm_data.header.mitm
-    mitm_def = get_mitm_def(mitm)
     mitm_dataframes = mitm_data_into_mitm_dataframes(mitm_data)
-    with _nested_conn(bind) as conn:
-        for concept, typed_dfs in mitm_dataframes:
-            concept_properties, concept_relations = mitm_def.get(concept)
-            for type_name, type_df in typed_dfs.items():
-
-                parent_concept = mitm_def.get_parent(concept)
-                t_concept = sql_rep_schema.concept_tables[parent_concept]
-
-                # if not has_natural_pk(mitm, concept):
-                # TODO not pretty..
-                #  ideally, I'd use the returned "inserted_pk"
-                #  values from the bulk insertion with an autoincrement id col
-                #  but apparently almost no DBABI drivers support this
-
-                concept_id_col_name = _get_unique_id_col_name(parent_concept)
-                max_id = conn.execute(sa.select(func.max(t_concept.columns[concept_id_col_name]))).scalar() or 0
-                no_rows_to_insert = len(type_df)
-                artificial_ids = pd.RangeIndex(start=max_id + 1, stop=max_id + 1 + no_rows_to_insert,
-                                               name=concept_id_col_name)
-                type_df[concept_id_col_name] = artificial_ids
-                # type_df = type_df.assign({concept_id_col_name : artificial_ids})
-
-                conn.execute(t_concept.insert(), _df_to_table_records(type_df, t_concept))
-
-                if has_type_tables(mitm, concept):
-                    # generated_ids = conn.execute(sa.select(t_concept.columns[concept_id_col_name])).scalars()
-
-                    t_type = sql_rep_schema.type_tables[concept][type_name]
-                    conn.execute(t_type.insert(), _df_to_table_records(type_df, t_type))
-
-        conn.commit()
+    instances = ((c, [(t, (df,)) for t, df in dfs.items()]) for c, dfs in mitm_dataframes)
+    return insert_instances(bind, sql_rep_schema, mitm_data.header.mitm, instances)


-def insert_mitm_data(bind: EngineOrConnection, sql_rep_schema, mitm_data: MITMData) -> None:
+def insert_mitm_data(bind: EngineOrConnection, sql_rep_schema: SQLRepresentationSchema, mitm_data: MITMData) -> None:
     insert_db_schema(bind, sql_rep_schema)
-    insert_db_instances(bind, sql_rep_schema, mitm_data)
+    insert_mitm_data_instances(bind, sql_rep_schema, mitm_data)


 def mk_sqlite(mitm_data: MITMData, file_path: FilePath | None = ':memory:', autoclose: bool = True) -> tuple[
diff --git a/mitm_tooling/transformation/sql/from_exportable.py b/mitm_tooling/transformation/sql/from_exportable.py
new file mode 100644
index 0000000000000000000000000000000000000000..30493bc395a66dc65ea7596965f46baedf30717c
--- /dev/null
+++ b/mitm_tooling/transformation/sql/from_exportable.py
@@ -0,0 +1,57 @@
+from typing import Iterator, Iterable
+
+import pandas as pd
+
+from mitm_tooling.definition import get_mitm_def, TypeName
+from mitm_tooling.extraction.sql.mapping import Exportable, DataProvider
+from mitm_tooling.representation.sql_representation import SQLRepresentationSchema, insert_db_schema, insert_instances
+from mitm_tooling.utilities.sql_utils import AnyDBBind, use_db_bind, use_nested_conn
+
+
+def insert_exportable_instances(
+        source: AnyDBBind,
+        exportable: Exportable,
+        target: AnyDBBind,
+        target_sql_rep_schema: SQLRepresentationSchema,
+        stream_data: bool = False,
+) -> tuple[int, int]:
+    mitm_def = get_mitm_def(exportable.mitm)
+    with use_db_bind(source) as source_conn:
+
+        def instances():
+            for c, dps in exportable.data_providers.items():
+
+                def typed_df_chunks_iter() -> Iterator[tuple[TypeName, Iterable[pd.DataFrame]]]:
+                    
for dp in dps: + def local_iter(dp: DataProvider) -> Iterator[tuple[TypeName, Iterable[pd.DataFrame]]]: + chunks = dp.instance_provider.apply_db_chunked(source_conn) if stream_data else [ + dp.instance_provider.apply_db(source_conn)] + for df_chunk in chunks: + df_chunk = dp.instance_postprocessor.apply_df(df_chunk) + for type_name, type_idx in df_chunk.groupby(mitm_def.get_properties(c).typing_concept).groups.items(): + yield type_name, (df_chunk.loc[type_idx],) + # alternative but probably less efficient + # chunk_hes = dp.header_entry_provider.apply_df(df_chunk) + # for he in chunk_hes: + # df_sub_chunk = df_chunk[df_chunk[mitm_def.get_properties(c).typing_concept] == he.type_name] + # yield he.type_name, df_sub_chunk + + yield from local_iter(dp) + + yield c, typed_df_chunks_iter() + + inserted_instances, inserted_rows = insert_instances(target, + target_sql_rep_schema, + exportable.mitm, + instances()) + + return inserted_instances, inserted_rows + + +def insert_exportable(source: AnyDBBind, + exportable: Exportable, + target: AnyDBBind, + sql_rep_schema: SQLRepresentationSchema, + stream_data: bool = False) -> None: + insert_db_schema(target, sql_rep_schema) + insert_exportable_instances(source, exportable, target, sql_rep_schema, stream_data=stream_data) diff --git a/mitm_tooling/transformation/sql/from_intermediate.py b/mitm_tooling/transformation/sql/from_intermediate.py index b5a3de67b6166872df7e39a7d225446829522556..aae92e9eda12d01a37eb53f452b1080ffb47f83f 100644 --- a/mitm_tooling/transformation/sql/from_intermediate.py +++ b/mitm_tooling/transformation/sql/from_intermediate.py @@ -1,19 +1,14 @@ from mitm_tooling.extraction.sql.data_models import DBMetaInfo from mitm_tooling.representation import MITMData from mitm_tooling.representation.intermediate_representation import Header -from mitm_tooling.representation.sql_representation import mk_sql_rep_schema, SQLRepresentationSchema, \ - SQL_REPRESENTATION_DEFAULT_SCHEMA - - -def sql_rep_schema_to_db_meta(sql_rep_schema: SQLRepresentationSchema) -> DBMetaInfo: - return DBMetaInfo.from_sa_meta(sql_rep_schema.meta, default_schema=SQL_REPRESENTATION_DEFAULT_SCHEMA) +from mitm_tooling.representation.sql_representation import mk_sql_rep_schema def header_into_db_meta(header: Header, override_schema: str | None = None) -> DBMetaInfo: + from .from_sql import sql_rep_schema_into_db_meta sql_rep_schema = mk_sql_rep_schema(header, override_schema=override_schema) - return sql_rep_schema_to_db_meta(sql_rep_schema) + return sql_rep_schema_into_db_meta(sql_rep_schema) def mitm_data_into_db_meta(mitm_data: MITMData, override_schema: str | None = None) -> DBMetaInfo: return header_into_db_meta(mitm_data.header, override_schema=override_schema) - diff --git a/mitm_tooling/transformation/sql/from_sql.py b/mitm_tooling/transformation/sql/from_sql.py new file mode 100644 index 0000000000000000000000000000000000000000..38e0f3a6e8aa11e64fd67ca0b09990e152c42608 --- /dev/null +++ b/mitm_tooling/transformation/sql/from_sql.py @@ -0,0 +1,26 @@ +from typing import Iterator + +import pandas as pd +import sqlalchemy as sa + +from mitm_tooling.definition import MITM, get_mitm_def, TypeName +from mitm_tooling.extraction.sql.data_models import DBMetaInfo +from mitm_tooling.extraction.sql.db import connect_and_reflect +from mitm_tooling.representation import HeaderEntry +from mitm_tooling.representation.sql_representation import SQL_REPRESENTATION_DEFAULT_SCHEMA, SQLRepresentationSchema, \ + EngineOrConnection, insert_db_schema +from 
mitm_tooling.utilities.sql_utils import use_nested_conn, use_db_bind, AnyDBBind
+
+
+def db_engine_into_db_meta(engine: sa.Engine) -> DBMetaInfo:
+    sa_meta, more_meta = connect_and_reflect(engine)
+    return DBMetaInfo.from_sa_meta(sa_meta, default_schema=more_meta.default_schema)
+
+
+def sql_rep_schema_into_db_meta(sql_rep_schema: SQLRepresentationSchema,
+                                default_schema: str = SQL_REPRESENTATION_DEFAULT_SCHEMA) -> DBMetaInfo:
+    return DBMetaInfo.from_sa_meta(sql_rep_schema.meta, default_schema=default_schema)
+
+
+
+
diff --git a/mitm_tooling/utilities/sql_utils.py b/mitm_tooling/utilities/sql_utils.py
index e4c51b8021c991b9b66fb9b24bcf85973be13c74..17679580f23ec99a2d59a871b57af537298e88dd 100644
--- a/mitm_tooling/utilities/sql_utils.py
+++ b/mitm_tooling/utilities/sql_utils.py
@@ -1,8 +1,17 @@
+from __future__ import annotations
+
+from contextlib import contextmanager
+
+import sqlalchemy
 import sqlalchemy as sa
 from pydantic import AnyUrl
 from sqlalchemy import Engine
-from typing import Type
+from typing import Type, Generator
+
+from sqlalchemy.orm import Session
+EngineOrConnection = sa.Engine | sa.Connection
+AnyDBBind = EngineOrConnection | sqlalchemy.orm.Session


 def qualify(*, table: str, schema: str | None = None, column: str | None = None):
     res = table
@@ -33,3 +42,26 @@ def sa_url_into_any_url(url: sa.engine.URL) -> AnyUrl:

 def dialect_cls_from_url(url: AnyUrl) -> Type[sa.engine.Dialect]:
     return any_url_into_sa_url(url).get_dialect()
+
+
+@contextmanager
+def use_nested_conn(bind: AnyDBBind) -> Generator[sa.Connection, None, None]:
+    if isinstance(bind, sa.Engine):
+        yield bind.connect()
+    elif isinstance(bind, sa.Connection):
+        with bind.begin_nested():
+            yield bind
+    elif isinstance(bind, Session):
+        with bind.begin_nested():
+            yield bind.connection()
+
+@contextmanager
+def use_db_bind(bind: AnyDBBind) -> Generator[sa.Connection, None, None]:
+    if isinstance(bind, Session):
+        yield bind.connection()
+    elif isinstance(bind, sa.Connection):
+        yield bind
+    elif isinstance(bind, sa.Engine):
+        yield bind.connect()
+    else:
+        raise TypeError(f"Expected Engine, Connection or Session, got {type(bind)}")
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index c1195cd71a68df03fbefd2fde7819852bc77b0cc..a11967075cc0545e9071bdc8f9cf2c05f13bb842 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "mitm-tooling"
-version = "0.5.3"
+version = "0.6.0"
 description = ""
 authors = [{ name = "Leah Tacke genannt Unterberg", email = "l.tgu@pads.rwth-aachen.de" }]
 requires-python = ">=3.11,<3.14"
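Usage sketch for the new AnyDBBind helpers (illustrative only, not part of the patch; the in-memory SQLite engine is a placeholder):

    import sqlalchemy as sa
    from sqlalchemy.orm import Session

    from mitm_tooling.utilities.sql_utils import use_db_bind

    engine = sa.create_engine('sqlite://')  # hypothetical database

    # Engines, Connections and ORM Sessions are all accepted as an AnyDBBind;
    # use_db_bind normalizes whichever is passed into a sa.Connection for the block.
    with use_db_bind(engine) as conn:
        print(conn.execute(sa.text('select 1')).scalar())

    # With a Session, the helper reuses the session's underlying connection.
    with Session(engine) as session:
        with use_db_bind(session) as conn:
            conn.execute(sa.text('select 1'))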
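End-to-end sketch of the new export path (illustrative only; `exportable` is assumed to be an Exportable obtained from a MappingExport for the source database, and both engine URLs are placeholders):

    import sqlalchemy as sa

    from mitm_tooling.representation.sql_representation import mk_sql_rep_schema
    from mitm_tooling.transformation.sql.from_exportable import insert_exportable

    source_engine = sa.create_engine('sqlite:///source.db')    # hypothetical source DB
    target_engine = sa.create_engine('sqlite:///mitm_rep.db')  # hypothetical target DB

    # The header can now be derived without materializing any instance data.
    header = exportable.generate_header(source_engine)

    # Build the SQL representation schema for that header and copy the instances over,
    # optionally streaming them chunk-wise instead of loading each concept at once.
    sql_rep_schema = mk_sql_rep_schema(header)
    insert_exportable(source_engine, exportable, target_engine, sql_rep_schema, stream_data=True)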