diff --git a/mitm_tooling/io/exporting.py b/mitm_tooling/io/exporting.py index c236c810884fd792fee30153b4f8e3401258efa0..05a7c22da1610c3042b057876ebb88baa86d8d6d 100644 --- a/mitm_tooling/io/exporting.py +++ b/mitm_tooling/io/exporting.py @@ -1,20 +1,17 @@ -import dataclasses - -import pydantic - +import datetime import io import logging import os import zipfile from abc import ABC, abstractmethod -from typing import BinaryIO, Iterator +from typing import Iterable -import pandas as pd +import pydantic -from mitm_tooling.definition import MITM, ConceptName, get_mitm_def -from mitm_tooling.representation.intermediate_representation import HeaderEntry, Header, StreamingConceptData, MITMData, \ - StreamingMITMData +from mitm_tooling.definition import MITM, get_mitm_def from mitm_tooling.representation.file_representation import write_header_file, write_data_file +from mitm_tooling.representation.intermediate_representation import Header, MITMData, \ + StreamingMITMData from mitm_tooling.utilities.io_utils import DataSink, ByteSink, use_bytes_io, ensure_ext, FilePath logger = logging.getLogger('api') @@ -81,9 +78,39 @@ class StreamingZippedExport(FileExport): logger.debug(f'Wrote {len(df_chunk)} rows to {fn} (streaming export).') with zf.open('header.csv', 'w') as hf: - header_df = Header(mitm=self.mitm, header_entries=tuple(collected_header_entries)).generate_header_df() + header_df = Header(mitm=self.mitm, + header_entries=tuple(collected_header_entries)).generate_header_df() write_header_file(header_df, hf) + def to_stream(self, chunk_size: int = 65536) -> Iterable[bytes]: + from stream_zip import stream_zip, ZIP_64 + from stat import S_IFREG + mitm_def = get_mitm_def(self.mitm) + collected_header_entries = [] + + def files(): + modified_at = datetime.datetime.now() + mode = S_IFREG | 0o600 + + for c, concept_data in self.streaming_mitm_data: + fn = ensure_ext(mitm_def.get_properties(c).plural, '.csv') + + def concept_file_data(concept_data=concept_data): + yield write_data_file(concept_data.structure_df, sink=None, append=False).encode('utf-8') + for df_chunks in concept_data.chunk_iterators: + for df_chunk, header_entries in df_chunks: + collected_header_entries.extend(header_entries) + yield write_data_file(df_chunk, sink=None, append=True).encode('utf-8') + + yield fn, modified_at, mode, ZIP_64, concept_file_data() + + header_df = Header(mitm=self.mitm, header_entries=tuple(collected_header_entries)).generate_header_df() + yield 'header.csv', modified_at, mode, ZIP_64, (write_header_file(header_df, sink=None).encode('utf-8'),) + + return stream_zip(files(), chunk_size=chunk_size) + def write_zip(target: FilePath, mitm_data: MITMData): - return ZippedExport(mitm=mitm_data.header.mitm, filename=os.path.basename(target), mitm_data=mitm_data).write(target) + return ZippedExport(mitm=mitm_data.header.mitm, + filename=os.path.basename(target), + mitm_data=mitm_data).write(target) diff --git a/mitm_tooling/representation/file_representation.py b/mitm_tooling/representation/file_representation.py index 70c1e5b0038c80831a9896aa4ded8709ae3ad020..f5b6b1831b61c979263108a91a238bcb40df0520 100644 --- a/mitm_tooling/representation/file_representation.py +++ b/mitm_tooling/representation/file_representation.py @@ -7,16 +7,16 @@ from .common import guess_k_of_header_df from .common import mk_header_file_columns, mk_concept_file_header -def write_header_file(df: pd.DataFrame, sink: DataSink) -> None: +def write_header_file(df: pd.DataFrame, sink: DataSink | None) -> str | None: if isinstance(sink, FilePath): ensure_directory_exists(sink) - df.to_csv(sink, header=True, index=False, sep=';') + return df.to_csv(sink, header=True, index=False, sep=';') -def write_data_file(df: pd.DataFrame, sink: DataSink, append: bool = False) -> None: +def write_data_file(df: pd.DataFrame, sink: DataSink | None, append: bool = False) -> str | None: if isinstance(sink, FilePath): ensure_directory_exists(sink) - df.to_csv(sink, header=not append, index=False, sep=';', date_format='%Y-%m-%dT%H:%M:%S.%f%z') + return df.to_csv(sink, header=not append, index=False, sep=';', date_format='%Y-%m-%dT%H:%M:%S.%f%z') def read_header_file(source: DataSource, normalize: bool = False) -> pd.DataFrame: diff --git a/mitm_tooling/transformation/superset/visualizations/registry.py b/mitm_tooling/transformation/superset/visualizations/registry.py index 48da14020b4396adf0a95aa7933a6eadd7674726..d702d27577ded0b582fcd7845178ee8863940e14 100644 --- a/mitm_tooling/transformation/superset/visualizations/registry.py +++ b/mitm_tooling/transformation/superset/visualizations/registry.py @@ -1,4 +1,4 @@ -from typing import Type +from typing import Type, Union from mitm_tooling.definition import MITM from mitm_tooling.representation import Header @@ -6,7 +6,7 @@ from .maed.registry import MAEDVisualizationType, maed_visualization_creators from ..asset_bundles.identifier import MitMDatasetIdentifierBundle from ..visualizations.abstract import VisualizationsCreator, SupersetVisualizationBundle -VisualizationType = MAEDVisualizationType | None +VisualizationType = Union[MAEDVisualizationType] mitm_visualization_creators = { MITM.MAED: maed_visualization_creators diff --git a/pyproject.toml b/pyproject.toml index 62e7582312ec5c431ac2e4300fe554a5f2ea7521..c1195cd71a68df03fbefd2fde7819852bc77b0cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,8 +6,8 @@ authors = [{ name = "Leah Tacke genannt Unterberg", email = "l.tgu@pads.rwth-aac requires-python = ">=3.11,<3.14" readme = "README.md" dependencies = [ - "pydantic>=2.9.2", - "pyyaml==6.0.2", + "pydantic>=2.11.4", + "pyyaml>=6.0", "genson>=1.3.0", "sqlalchemy[postgresql-psycopg, postgresql]>=2", "sqlalchemy-utils>=0.38.0", @@ -15,7 +15,8 @@ dependencies = [ "pandas[performance, excel, hdf5, output_formatting, computation, postgresql, mysql, sql-other, plot, compression]>=2.0", "matplotlib", "seaborn", - "plotly", + "plotly>=6", + "stream-zip" ] [project.optional-dependencies]