Skip to content
Snippets Groups Projects
Commit 70eb851a authored by Leah Tacke genannt Unterberg's avatar Leah Tacke genannt Unterberg
Browse files

added proper streaming capability for export

parent 4c26730a
No related branches found
No related tags found
No related merge requests found
import dataclasses import datetime
import pydantic
import io import io
import logging import logging
import os import os
import zipfile import zipfile
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import BinaryIO, Iterator from typing import Iterable
import pandas as pd import pydantic
from mitm_tooling.definition import MITM, ConceptName, get_mitm_def from mitm_tooling.definition import MITM, get_mitm_def
from mitm_tooling.representation.intermediate_representation import HeaderEntry, Header, StreamingConceptData, MITMData, \
StreamingMITMData
from mitm_tooling.representation.file_representation import write_header_file, write_data_file from mitm_tooling.representation.file_representation import write_header_file, write_data_file
from mitm_tooling.representation.intermediate_representation import Header, MITMData, \
StreamingMITMData
from mitm_tooling.utilities.io_utils import DataSink, ByteSink, use_bytes_io, ensure_ext, FilePath from mitm_tooling.utilities.io_utils import DataSink, ByteSink, use_bytes_io, ensure_ext, FilePath
logger = logging.getLogger('api') logger = logging.getLogger('api')
...@@ -81,9 +78,39 @@ class StreamingZippedExport(FileExport): ...@@ -81,9 +78,39 @@ class StreamingZippedExport(FileExport):
logger.debug(f'Wrote {len(df_chunk)} rows to {fn} (streaming export).') logger.debug(f'Wrote {len(df_chunk)} rows to {fn} (streaming export).')
with zf.open('header.csv', 'w') as hf: with zf.open('header.csv', 'w') as hf:
header_df = Header(mitm=self.mitm, header_entries=tuple(collected_header_entries)).generate_header_df() header_df = Header(mitm=self.mitm,
header_entries=tuple(collected_header_entries)).generate_header_df()
write_header_file(header_df, hf) write_header_file(header_df, hf)
def to_stream(self, chunk_size: int = 65536) -> Iterable[bytes]:
    """Lazily serialize this export as a zip archive, yielded in byte chunks.

    One ``<concept-plural>.csv`` member is emitted per concept found in
    ``self.streaming_mitm_data``, followed by a final ``header.csv`` member
    built from the header entries collected while the concept data streamed.

    :param chunk_size: maximum size in bytes of each yielded chunk
        (passed through to ``stream_zip``).
    :return: an iterable of zip-archive byte chunks.
    """
    # Local import: stream-zip is only needed for this streaming code path.
    from stream_zip import stream_zip, ZIP_64
    from stat import S_IFREG

    mitm_def = get_mitm_def(self.mitm)
    # Accumulated as a side effect while the per-concept CSV generators are
    # consumed; see the ordering note near the header member below.
    collected_header_entries = []

    def files():
        # NOTE(review): naive local timestamp for the zip entries — confirm
        # whether a timezone-aware time is wanted here.
        modified_at = datetime.datetime.now()
        mode = S_IFREG | 0o600  # regular file, read/write for owner only
        for c, concept_data in self.streaming_mitm_data:
            fn = ensure_ext(mitm_def.get_properties(c).plural, '.csv')

            # concept_data is bound as a default argument to avoid the
            # late-binding-closure pitfall across loop iterations.
            def concept_file_data(concept_data=concept_data):
                # First the column structure (the CSV header row) ...
                yield write_data_file(concept_data.structure_df, sink=None, append=False).encode('utf-8')
                # ... then the data chunks, collecting header entries on the way.
                for df_chunks in concept_data.chunk_iterators:
                    for df_chunk, header_entries in df_chunks:
                        collected_header_entries.extend(header_entries)
                        yield write_data_file(df_chunk, sink=None, append=True).encode('utf-8')

            yield fn, modified_at, mode, ZIP_64, concept_file_data()

        # This line only runs after stream_zip has fully drained every
        # concept_file_data() generator yielded above, so
        # collected_header_entries is complete by the time the header
        # member is produced.
        header_df = Header(mitm=self.mitm, header_entries=tuple(collected_header_entries)).generate_header_df()
        yield 'header.csv', modified_at, mode, ZIP_64, (write_header_file(header_df, sink=None).encode('utf-8'),)

    return stream_zip(files(), chunk_size=chunk_size)
def write_zip(target: FilePath, mitm_data: MITMData):
    """Export *mitm_data* as a zip archive at *target*.

    Convenience wrapper around :class:`ZippedExport`; the archive filename is
    taken from the basename of *target*.
    """
    exporter = ZippedExport(
        mitm=mitm_data.header.mitm,
        filename=os.path.basename(target),
        mitm_data=mitm_data,
    )
    return exporter.write(target)
...@@ -7,16 +7,16 @@ from .common import guess_k_of_header_df ...@@ -7,16 +7,16 @@ from .common import guess_k_of_header_df
from .common import mk_header_file_columns, mk_concept_file_header from .common import mk_header_file_columns, mk_concept_file_header
def write_header_file(df: pd.DataFrame, sink: DataSink | None) -> str | None:
    """Serialize the header DataFrame *df* as ';'-separated CSV.

    When *sink* is a file path, missing parent directories are created first.
    Per pandas' ``to_csv`` contract, returns the CSV text when *sink* is
    ``None`` and returns ``None`` after writing to a concrete sink.
    """
    if isinstance(sink, FilePath):
        ensure_directory_exists(sink)
    csv_or_none = df.to_csv(sink, sep=';', index=False, header=True)
    return csv_or_none
def write_data_file(df: pd.DataFrame, sink: DataSink | None, append: bool = False) -> str | None:
    """Serialize the data DataFrame *df* as ';'-separated CSV.

    When *append* is true the CSV header row is suppressed so the output can
    be concatenated onto previously written chunks. When *sink* is a file
    path, missing parent directories are created first. Per pandas'
    ``to_csv`` contract, returns the CSV text when *sink* is ``None`` and
    ``None`` otherwise.
    """
    if isinstance(sink, FilePath):
        ensure_directory_exists(sink)
    emit_header = not append
    return df.to_csv(sink, sep=';', index=False, header=emit_header,
                     date_format='%Y-%m-%dT%H:%M:%S.%f%z')
def read_header_file(source: DataSource, normalize: bool = False) -> pd.DataFrame: def read_header_file(source: DataSource, normalize: bool = False) -> pd.DataFrame:
......
from typing import Type from typing import Type, Union
from mitm_tooling.definition import MITM from mitm_tooling.definition import MITM
from mitm_tooling.representation import Header from mitm_tooling.representation import Header
...@@ -6,7 +6,7 @@ from .maed.registry import MAEDVisualizationType, maed_visualization_creators ...@@ -6,7 +6,7 @@ from .maed.registry import MAEDVisualizationType, maed_visualization_creators
from ..asset_bundles.identifier import MitMDatasetIdentifierBundle from ..asset_bundles.identifier import MitMDatasetIdentifierBundle
from ..visualizations.abstract import VisualizationsCreator, SupersetVisualizationBundle from ..visualizations.abstract import VisualizationsCreator, SupersetVisualizationBundle
VisualizationType = MAEDVisualizationType | None VisualizationType = Union[MAEDVisualizationType]
mitm_visualization_creators = { mitm_visualization_creators = {
MITM.MAED: maed_visualization_creators MITM.MAED: maed_visualization_creators
......
...@@ -6,8 +6,8 @@ authors = [{ name = "Leah Tacke genannt Unterberg", email = "l.tgu@pads.rwth-aac ...@@ -6,8 +6,8 @@ authors = [{ name = "Leah Tacke genannt Unterberg", email = "l.tgu@pads.rwth-aac
requires-python = ">=3.11,<3.14" requires-python = ">=3.11,<3.14"
readme = "README.md" readme = "README.md"
dependencies = [ dependencies = [
"pydantic>=2.9.2", "pydantic>=2.11.4",
"pyyaml==6.0.2", "pyyaml>=6.0",
"genson>=1.3.0", "genson>=1.3.0",
"sqlalchemy[postgresql-psycopg, postgresql]>=2", "sqlalchemy[postgresql-psycopg, postgresql]>=2",
"sqlalchemy-utils>=0.38.0", "sqlalchemy-utils>=0.38.0",
...@@ -15,7 +15,8 @@ dependencies = [ ...@@ -15,7 +15,8 @@ dependencies = [
"pandas[performance, excel, hdf5, output_formatting, computation, postgresql, mysql, sql-other, plot, compression]>=2.0", "pandas[performance, excel, hdf5, output_formatting, computation, postgresql, mysql, sql-other, plot, compression]>=2.0",
"matplotlib", "matplotlib",
"seaborn", "seaborn",
"plotly", "plotly>=6",
"stream-zip"
] ]
[project.optional-dependencies] [project.optional-dependencies]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment