Commit b8caffe4 authored by Leah Tacke genannt Unterberg

work on sql representation and data conversion

parent 934b5d1f
mitm_tooling/extraction/df/__init__.py

from .unpack_intermediate import unpack_mitm_data
\ No newline at end of file
mitm_tooling/extraction/df/unpack_intermediate.py

import itertools
from collections import defaultdict
from collections.abc import Sequence

import pandas as pd

from mitm_tooling.data_types import convert, MITMDataType
from mitm_tooling.definition import get_mitm_def, MITM, ConceptName
from mitm_tooling.definition.definition_tools import map_col_groups
from mitm_tooling.representation import MITMData, MITMDataset, Header
from mitm_tooling.representation import mk_concept_file_header
from mitm_tooling.representation.common import guess_k_of_header_df, mk_header_file_columns
def pack_typed_dfs_as_concept_table(mitm: MITM, concept: ConceptName, dfs: Sequence[pd.DataFrame]) -> pd.DataFrame:
    assert len(dfs) > 0

    normalized_dfs = []
    for df in dfs:
        base_cols, col_dts = mk_concept_file_header(mitm, concept, 0)
        attr_cols = set(df.columns) - set(base_cols)
        k = len(attr_cols)
        normal_form_cols = list(base_cols) + list(attr_cols)
        df = df.reindex(columns=normal_form_cols)
        df = convert.convert_df(df, col_dts | {c: MITMDataType.Unknown for c in attr_cols})
        squashed_form_cols = mk_concept_file_header(mitm, concept, k)[0]
        df.columns = squashed_form_cols
        normalized_dfs.append((df, k))

    max_k = max(normalized_dfs, key=lambda x: x[1])[1]
    squashed_form_cols = mk_concept_file_header(mitm, concept, max_k)[0]
    return pd.concat([df for df, _ in normalized_dfs], axis='rows', ignore_index=True).reindex(
        columns=squashed_form_cols)
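
# Illustrative aside: the normalization above right-aligns frames with
# differing attribute counts onto the widest squashed header before
# concatenating; missing attribute columns simply become NA. A minimal
# plain-pandas sketch of that effect (illustrative column names, not the
# library API):
#
#   df_a = pd.DataFrame({'kind': ['E'], 'type': ['t1'], 'a_1': [1]})
#   df_b = pd.DataFrame({'kind': ['E'], 'type': ['t2'], 'a_1': [2], 'a_2': [3]})
#   widest = ['kind', 'type', 'a_1', 'a_2']
#   pd.concat([df_a, df_b], ignore_index=True).reindex(columns=widest)
#   # -> df_a's row carries NaN in a_2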
def unpack_concept_table_as_typed_dfs(header: Header, concept: ConceptName, df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    mitm_def = get_mitm_def(header.mitm)
    concept_properties, concept_relations = mitm_def.get(concept)

    with_header_entry = {}
    if concept in mitm_def.sub_concepts:
        for (key, typ), idx in df.groupby(['kind', concept_properties.typing_concept]).groups.items():
            key, type_name = str(key), str(typ)
            specific_concept = mitm_def.inverse_concept_key_map[key]
            he = header.get(specific_concept, type_name)
            assert he is not None, 'missing type entry in header'
            with_header_entry[(specific_concept, type_name)] = (he, df.loc[idx])
    else:
        for typ, idx in df.groupby(concept_properties.typing_concept).groups.items():
            type_name = str(typ)
            he = header.get(concept, type_name)
            assert he is not None, 'missing type entry in header'
            with_header_entry[(concept, type_name)] = (he, df.loc[idx])

    res = {}
    for (concept, type_name), (he, type_df) in with_header_entry.items():
        k = he.get_k()
        base_cols, base_dts = mk_concept_file_header(header.mitm, concept, 0)
        normal_form_cols, _ = mk_concept_file_header(header.mitm, concept, k)
        type_df = type_df.reindex(columns=normal_form_cols)
        unpacked_cols = list(base_cols) + list(he.attributes)
        unpacked_dts = base_dts | dict(zip(he.attributes, he.attribute_dtypes))
        type_df.columns = unpacked_cols
        res[he.type_name] = convert.convert_df(type_df, unpacked_dts)
    return res
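
# Illustrative aside: .groups maps each group key to the row Index of its
# members, which is exactly what the df.loc[idx] lookups above consume, e.g.
#
#   d = pd.DataFrame({'kind': ['E', 'M', 'E'], 'type': ['a', 'b', 'a']})
#   d.groupby(['kind', 'type']).groups
#   # -> {('E', 'a'): [0, 2], ('M', 'b'): [1]}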
def unpack_mitm_data(mitm_data: MITMData) -> MITMDataset:
    return MITMDataset(header=mitm_data.header,
                       dfs={concept: unpack_concept_table_as_typed_dfs(mitm_data.header, concept, df)
                            for concept, df in mitm_data})
@@ -16,7 +16,8 @@ from ..transformation import PostProcessing
 from .mapping import ConceptMapping, DataProvider, ConceptMappingException, InstancesProvider, \
     InstancesPostProcessor
 from mitm_tooling.io import ZippedExport, StreamingZippedExport
-from mitm_tooling.representation import HeaderEntry, Header, StreamingConceptData, StreamingMITMData, MITMData, mk_concept_file_header
+from mitm_tooling.representation import HeaderEntry, Header, StreamingConceptData, StreamingMITMData, MITMData
+from mitm_tooling.representation import mk_concept_file_header

 STREAMING_CHUNK_SIZE = 100_000

@@ -49,7 +50,6 @@ class Exportable(pydantic.BaseModel):
         return ZippedExport(mitm=self.mitm, filename=filename, mitm_data=MITMData(header=header, concept_dfs=tables))

     def stream_to_file(self, db_session: Session, validate: bool = False) -> StreamingZippedExport:
         data_sources = {}

@@ -60,12 +60,12 @@ class Exportable(pydantic.BaseModel):
             chunk_iterators = []
             for dp in dps:
-                def local_iter(dp: DataProvider, columns=concept_file_columns) -> Iterator[
+                def local_iter(dp: DataProvider, columns=tuple(concept_file_columns)) -> Iterator[
                     tuple[pd.DataFrame, list[HeaderEntry]]]:
                     for df_chunk in dp.instance_provider.from_session_chunked(db_session, STREAMING_CHUNK_SIZE):
                         if validate:
                             raise NotImplementedError
-                        df_chunk = df_chunk.reindex(columns=columns, copy=False)
+                        df_chunk = df_chunk.reindex(columns=list(columns), copy=False)
                         yield dp.instance_postprocessor.apply(df_chunk), dp.header_entry_provider.from_df(df_chunk)

                 chunk_iterators.append(local_iter(dp))
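The switch to columns=tuple(concept_file_columns) leans on Python's early binding of default arguments: the default is evaluated once when local_iter is defined inside the loop, so each generator snapshots the column list rather than sharing a late-bound closure variable, and the tuple keeps that snapshot immutable across chunks. A minimal sketch of the idiom (generic Python, not this codebase's API):

fns = [lambda i=i: i for i in range(3)]   # default binds i at definition time
assert [f() for f in fns] == [0, 1, 2]    # without the default, all would see 2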
mitm_tooling/representation/__init__.py

 from . import intermediate_representation
 from . import file_representation
+from . import df_representation
 from . import sql_representation
-from .file_representation import mk_concept_file_header, write_header_file, write_data_file, read_data_file, read_header_file
+from .file_representation import write_header_file, write_data_file, read_data_file, read_header_file
+from .common import mk_concept_file_header
 from .intermediate_representation import HeaderEntry, Header, MITMData, StreamingMITMData, StreamingConceptData, ColumnName
+from .df_representation import MITMDataset
 from .sql_representation import mk_db_schema, insert_mitm_data, mk_sqlite
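Because __init__.py now re-exports mk_concept_file_header from .common, the public import path survives the helper's move out of file_representation; a quick equivalence check (assuming the package is installed):

from mitm_tooling.representation import mk_concept_file_header
from mitm_tooling.representation.common import mk_concept_file_header as _impl
assert mk_concept_file_header is _impl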
mitm_tooling/representation/common.py

-def guess_k(df):
+import itertools
+
+from mitm_tooling.data_types import MITMDataType
+from mitm_tooling.definition import MITM, ConceptName, get_mitm_def
+from mitm_tooling.definition.definition_tools import map_col_groups
+
+
+def guess_k_of_header_df(df):
     return sum((1 for c in df.columns if c.startswith('a_') and not c.startswith('a_dt')))
+
+
+def mk_header_file_columns(k: int) -> list[str]:
+    return ['kind', 'type'] + list(
+        itertools.chain(*((f'a_{i}', f'a_dt_{i}') for i in range(1, k + 1))))
+
+
+def mk_concept_file_header(mitm: MITM, concept: ConceptName, k: int) -> tuple[list[str], dict[str, MITMDataType]]:
+    mitm_def = get_mitm_def(mitm)
+    _, dts = map_col_groups(mitm_def, concept, {
+        'kind': lambda: ('kind', MITMDataType.Text),
+        'type': lambda: (mitm_def.get_properties(concept).typing_concept, MITMDataType.Text),
+        'identity': lambda: mitm_def.resolve_identity_type(concept).items(),
+        'inline': lambda: mitm_def.resolve_inlined_types(concept).items(),
+        'foreign': lambda: [
+            (name, dt) for fk_types in mitm_def.resolve_foreign_types(concept).values() for name, dt in
+            fk_types.items()],
+        'attributes': lambda: [(f'a_{i}', MITMDataType.Unknown) for i in range(1, k + 1)],
+    })
+    return list(dts.keys()), dict(dts)
+
+
+def mk_attr_columns(k: int) -> list[str]:
+    return [f'a_{i}' for i in range(1, k + 1)]
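For reference, the column layouts these helpers yield, shown doctest-style (values derived directly from the definitions above):

>>> mk_header_file_columns(2)
['kind', 'type', 'a_1', 'a_dt_1', 'a_2', 'a_dt_2']
>>> mk_attr_columns(3)
['a_1', 'a_2', 'a_3']
>>> import pandas as pd
>>> guess_k_of_header_df(pd.DataFrame(columns=mk_header_file_columns(2)))
2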
mitm_tooling/representation/df_representation.py

+from collections.abc import Iterable
+
 import pandas as pd
 import pydantic
+from pydantic import ConfigDict

 from mitm_tooling.definition import ConceptName
 from .intermediate_representation import Header


-class MITMDataset(pydantic.BaseModel):
+class MITMDataset(Iterable[tuple[str, dict[str, pd.DataFrame]]], pydantic.BaseModel):
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
     header: Header
     dfs: dict[ConceptName, dict[str, pd.DataFrame]]
+
+    def __iter__(self):
+        return iter(self.dfs.items())
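Since __iter__ delegates to dfs.items(), a MITMDataset unpacks directly into (concept, {type_name: df}) pairs. A minimal consumption sketch (dataset contents assumed, e.g. built by unpack_mitm_data):

for concept, typed_dfs in mitm_dataset:
    for type_name, df in typed_dfs.items():
        print(concept, type_name, len(df))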
mitm_tooling/representation/file_representation.py

-import itertools
-import os
-from collections.abc import Iterable
-from typing import BinaryIO, TextIO
-
 import pandas as pd

-from mitm_tooling.data_types import MITMDataType
 from mitm_tooling.data_types.convert import convert_df
-from mitm_tooling.definition import get_mitm_def, MITM, ConceptName
-from mitm_tooling.definition.definition_tools import map_col_groups
-from mitm_tooling.representation.common import guess_k
-from mitm_tooling.utilities.io_utils import DataSink, DataSource, use_for_pandas_io, FilePath, ensure_directory_exists, ensure_ext
-from mitm_tooling.utilities.python_utils import i_th
-
-
-def mk_header_file_columns(k: int) -> list[str]:
-    return ['kind', 'type'] + list(
-        itertools.chain(*((f'a_{i}', f'a_dt_{i}') for i in range(1, k + 1))))
-
-
-def mk_concept_file_header(mitm: MITM, concept: ConceptName, k: int) -> tuple[list[str], dict[str, MITMDataType]]:
-    mitm_def = get_mitm_def(mitm)
-    _, dts = map_col_groups(mitm_def, concept, {
-        'kind': lambda: ('kind', MITMDataType.Text),
-        'type': lambda: (mitm_def.get_properties(concept).typing_concept, MITMDataType.Text),
-        'identity': lambda: mitm_def.resolve_identity_type(concept).items(),
-        'inline': lambda: mitm_def.resolve_inlined_types(concept).items(),
-        'foreign': lambda: [
-            (name, dt) for fk_types in mitm_def.resolve_foreign_types(concept).values() for name, dt in
-            fk_types.items()],
-        'attributes': lambda: [(f'a_{i}', MITMDataType.Unknown) for i in range(1, k + 1)],
-    })
-    return list(dts.keys()), dict(dts)
+from mitm_tooling.definition import MITM, ConceptName
+from mitm_tooling.utilities.io_utils import DataSink, DataSource, use_for_pandas_io, FilePath, ensure_directory_exists
+from .common import guess_k_of_header_df
+from .common import mk_header_file_columns, mk_concept_file_header


 def write_header_file(df: pd.DataFrame, sink: DataSink) -> None:

@@ -52,7 +23,7 @@ def read_header_file(source: DataSource, normalize: bool = False) -> pd.DataFram
     with use_for_pandas_io(source) as f:
         df = pd.read_csv(f, sep=';')
     if normalize:
-        k = guess_k(df)
+        k = guess_k_of_header_df(df)
         df = df.astype(pd.StringDtype()).reindex(columns=mk_header_file_columns(k))
     return df

@@ -62,7 +33,7 @@ def read_data_file(source: DataSource, target_mitm: MITM | None = None, target_c
     with use_for_pandas_io(source) as f:
         df = pd.read_csv(f, sep=';', date_format='%Y-%m-%dT%H:%M:%S.%f%z', low_memory=False)
     if normalize and target_mitm and target_concept:
-        k = guess_k(df)
+        k = guess_k_of_header_df(df)
         cols, column_dts = mk_concept_file_header(target_mitm, target_concept, k)
         df = df.reindex(columns=cols)
         convert_df(df, column_dts, inplace=True)
mitm_tooling/representation/intermediate_representation.py

@@ -2,8 +2,9 @@ from __future__ import annotations

 import itertools
 import logging
-from collections.abc import Iterator, Iterable, Sequence
-from typing import TYPE_CHECKING, Self
+from collections import defaultdict
+from collections.abc import Iterator, Iterable, Sequence, Mapping
+from typing import TYPE_CHECKING, Self, Any

 import pandas as pd
 import pydantic

@@ -12,8 +13,7 @@ from pydantic import ConfigDict
 from mitm_tooling.definition import get_mitm_def
 from mitm_tooling.data_types.data_types import MITMDataType
 from mitm_tooling.definition.definition_representation import ConceptName, MITM
-from .common import guess_k
-from .file_representation import mk_header_file_columns
+from .common import guess_k_of_header_df, mk_header_file_columns

 logger = logging.getLogger('api')

 ColumnName = str

@@ -77,6 +77,16 @@ class Header(pydantic.BaseModel):
         lol = [he.to_row() for he in deduplicated.values()]
         return pd.DataFrame(data=lol, columns=mk_header_file_columns(k))

+    def get(self, concept: ConceptName, type_name: str) -> HeaderEntry | None:
+        return self._map.get(concept, {}).get(type_name)
+
+    @property
+    def _map(self) -> Mapping[ConceptName, Mapping[str, HeaderEntry]]:
+        res = defaultdict(dict)
+        for he in self.header_entries:
+            res[he.concept][he.type_name] = he
+        return dict(res)
+

 class MITMData(Iterable[tuple[ConceptName, pd.DataFrame]], pydantic.BaseModel):
     model_config = ConfigDict(arbitrary_types_allowed=True)

@@ -102,5 +112,3 @@ class StreamingMITMData(Iterable[tuple[ConceptName, StreamingConceptData]], pyda
     def __iter__(self):
         return iter(self.data_sources.items())
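One caveat on the new lookup API: _map is a plain property, so the concept-to-type index is rebuilt from header_entries on every Header.get call; a hot loop like unpack_concept_table_as_typed_dfs could hoist it into a local if that ever shows up in profiles. Usage sketch (concept and type names hypothetical):

he = header.get('measurement', 'temperature')
if he is not None:
    print(he.get_k(), he.attributes, he.attribute_dtypes)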
mitm_tooling/representation/sql_representation.py

@@ -9,6 +9,7 @@ from mitm_tooling.definition import MITMDefinition, ConceptProperties, OwnedRela
 from mitm_tooling.data_types import MITMDataType
 from mitm_tooling.definition import ConceptName, MITM, get_mitm_def, ConceptKind, ConceptLevel, RelationName
 from mitm_tooling.definition.definition_tools import map_col_groups, ColGroupMaps
+from .df_representation import MITMDataset
 from .intermediate_representation import Header, MITMData, ColumnName
 from mitm_tooling.utilities.sql_utils import create_sa_engine
 from mitm_tooling.utilities import python_utils

@@ -156,6 +157,7 @@ def mk_db_schema(header: Header) -> tuple[sa.MetaData, dict[ConceptName, dict[st

 def insert_db_instances(engine: sa.Engine, meta: sa.MetaData, mitm_data: MITMData):
     with engine.connect() as conn:
+        h = mitm_data.header
         mitm = mitm_data.header.mitm
         for concept, df in mitm_data:

@@ -189,5 +191,4 @@ def mk_sqlite(mitm_data: MITMData, file_path: str | None = ':memory:') -> tuple[
     meta, tables = insert_mitm_data(engine, mitm_data)
     print(meta.tables)
     print([f'{t.name}: {t.columns} {t.constraints}' for ts in tables.values() for t in ts.values()])
-    meta.create_all(engine)
     return engine, meta, tables
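With meta.create_all(engine) dropped from mk_sqlite, table creation presumably now happens inside insert_mitm_data. Usage sketch matching the signature and return value visible above (output path hypothetical):

engine, meta, tables = mk_sqlite(mitm_data, file_path='synthetic.sqlite')
# tables: {concept: {type_name: sa.Table}}, per the debug prints above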
import pandas as pd

from mitm_tooling.extraction.df import unpack_mitm_data


def test_to_df():
    from mitm_tooling.io import importing
    from mitm_tooling.definition import MITM

    syn = importing.read_zip('synthetic.maed', MITM.MAED)
    mitm_dataset = unpack_mitm_data(syn)
    for c, typed_dfs in mitm_dataset:
        for type_name, df in typed_dfs.items():
            print(df.head())