Commit 2fc46b05 authored by Leah Tacke genannt Unterberg

squashed some bugs and added MITM_SUPPORT_INFO.md

parent a096af56
Showing with 187 additions and 26 deletions
# mitm-superset
## General Superset Architecture
In the following, I want to give a short overview of the superset application architecture, in particular with respect to the docker compose setup.
### React Frontend
The frontend is defined in `superset-frontend/`. Depending on whether the mode in the docker-compose file is set to `development` or `production`, it is either built "statically" via webpack and served by the nginx container (port 80), or served with hot reloading by the superset_node container (port 9000).
I think it is also served by the flask backend itself (8080) if built beforehand.
I've had issues with the hot reloading. When using it, calls that fail on the node server due to, e.g., syntax errors, are proxied to the flask backend and will show up as 404s.
A relevant setting is "BUILD_FRONTEND_IN_DOCKER" in the docker-compose file.
Given possible differences between the host OS and the container OS, I would recommend caution here.
If built in docker, the startup time is even longer.
I've also had issues where folders inside the mounted `superset-frontend/node_modules` directory became inaccessible (access denied even to SYSTEM) on my host OS (Windows).
Frontend routes have to be registered in `superset-frontend/src/views/routes.tsx`.
In addition to the chart components, which can also be defined via plugins, and special components like the SQL lab or dashboard/explore, there are basically CRUD tables for the main assets.
These components can reuse the `useListViewResource` hook that wraps the API calls to the backend, as long as the resource follows the conventions.
### Flask Backend
Backend (API) routes are registered in `superset/init/__init__.py` with a prefix of `api/v1/<resource_name>`.
They typically use `BaseSupersetModelRestApi` as a base class, which has quite a lot of potential for trip-ups.
From what I understand, historically, the frontend views, including charts, were defined in `superset/views.py` and served by flask.
Currently, the flask backend provides mostly (if not only) headless API routes, which are consumed by the frontend.
Still, the tabs and links that appear in the menu bar (e.g., "Dashboards") are defined in the backend init.
API endpoints are generally defined in the following structure: `superset/<resource_name_plural>/api.py`.
- Note the route method whitelist `include_route_methods`: if a method is not listed there, the route is not accessible even when it is defined with `@expose`, which results in 404s.
- (At least) when using the `@protect` decorator, the route method may also have to be added to `method_permission_name`. When testing a newly added route, consider that per-method permissions may not be added automatically (even for the admin) until a full restart, or even a recreation of the mounted superset_db volume. This may lead to 405s.
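A minimal sketch of how these pieces fit together, loosely modeled on the `MitMDatasetRestApi` added in this commit (the attribute values and the permission mapping below are illustrative, not copied from the real class):

```python
# Hedged sketch of a resource API; attribute values are illustrative only.
from flask import Response
from flask_appbuilder.api import expose, protect, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface

from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.models.mitm import MitMDataset  # model added by this fork
from superset.views.base_api import BaseSupersetModelRestApi


class ExampleMitMDatasetRestApi(BaseSupersetModelRestApi):
    datamodel = SQLAInterface(MitMDataset)
    resource_name = "mitm_dataset"  # -> routes under /api/v1/mitm_dataset/

    # Methods missing from this whitelist return 404 even if decorated with @expose.
    include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {"upload"}

    # @protect() resolves a per-method permission; unmapped methods can fail
    # (e.g. with 405s) until permissions are synced after a restart.
    method_permission_name = {
        **MODEL_API_RW_METHOD_PERMISSION_MAP,
        "upload": "write",
    }

    @expose("/upload/", methods=("POST",))
    @protect()
    @safe
    def upload(self) -> Response:
        # Custom endpoint at POST /api/v1/mitm_dataset/upload/
        return self.response(200, message="ok")
```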
The MitM routes are defined in `superset/customization/`.
Application DB Models (User, Role, etc.) are defined in `superset/models`.
They usually have an accompanying DAO (Data Access Object) class in `superset/daos`.
The main models corresponding to managed assets are `Database`, `Datasource` (actually also `Dataset` and particularly `SqlaTable`), `Chart` (actually `Slice`), and `Dashboard`.
Such models inherit from the `ImportExportMixin`, which gives them some automatic import and export functionality, although much of it is overridden manually.
Generally, ("marshmallow") schemas are used for input validation in routes and commands; they are defined manually and mostly align with the sqlalchemy models themselves.
There are quite a few things to consider here, particularly, _related models_ (e.g., `slices` in `Dashboard`), which are defined via `relationship()` in the sqlalchemy models.
When importing a dashboard, the `slices` field has a different structure than when retrieving it via the API.
It may contain a list of ids or a list of objects, depending on the context.
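As a purely illustrative marshmallow sketch of that difference (these schemas are made up, not Superset's actual dashboard schemas), an import schema might accept bare ids while the API response schema nests objects:

```python
# Hypothetical schemas showing the two shapes the `slices` field can take.
from marshmallow import Schema, fields


class SliceRefSchema(Schema):
    id = fields.Integer()
    slice_name = fields.String()


class DashboardImportSchema(Schema):
    dashboard_title = fields.String(required=True)
    # On import, related slices may be referenced by id only.
    slices = fields.List(fields.Integer())


class DashboardResponseSchema(Schema):
    dashboard_title = fields.String()
    # When retrieved via the API, related slices come back as nested objects.
    slices = fields.List(fields.Nested(SliceRefSchema))
```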
It is important to note that superset maintains a full history of `alembic` DB migrations in `superset/migrations`.
This is important to consider when creating new models or modifying existing ones, as the database (e.g., postgres `superset_db` container) is initialized with this history of revisions.
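For illustration, adding a new model means appending a revision to that history; a hedged, minimal sketch (revision ids, table name, and columns are placeholders):

```python
# Hypothetical revision file under superset/migrations/versions/; the revision
# ids, table name, and columns are illustrative only.
import sqlalchemy as sa
from alembic import op

revision = "abc123addmyassets"        # placeholder revision id
down_revision = "def456currenthead"   # must point at the current migration head


def upgrade():
    op.create_table(
        "my_assets",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("uuid", sa.String(36), unique=True),
        sa.Column("name", sa.String(250), nullable=False),
        sa.Column("database_id", sa.Integer(), sa.ForeignKey("dbs.id")),
    )


def downgrade():
    op.drop_table("my_assets")
```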
Generally, logic, such as an asset importer, is defined via _commands_ in `superset/commands`.
However, a lot of "smaller" logic ends up duplicated in local `utils.py` files, which is not ideal. You may even find duplicated asset import code (`AssetImportCommand` and any `<Asset>ImportCommand`).
Some commands are also defined in `superset/cli/main.py` and are accessible via `superset <command>` in a shell on the `superset_app` container.
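A hedged sketch of the command pattern (the concrete command, its checks, and the logging are made up; only `BaseCommand` and `CommandException` are Superset's):

```python
# Hypothetical command following the validate()/run() convention; the checks and
# logging stand in for the DAO/database work a real command would do.
import logging

from superset.commands.base import BaseCommand
from superset.commands.exceptions import CommandException

logger = logging.getLogger(__name__)


class ArchiveMyAssetsCommand(BaseCommand):
    """Takes only serializable inputs (ids), per the convention described below."""

    def __init__(self, model_ids: list[int]) -> None:
        self._model_ids = model_ids

    def validate(self) -> None:
        # Real commands resolve the ids via a DAO here and raise if anything is missing.
        if not self._model_ids:
            raise CommandException("No asset ids were provided.")

    def run(self) -> None:
        self.validate()
        # Real commands perform the actual DAO/database work here.
        logger.info("Archiving assets %s", self._model_ids)
```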
To support long-running or regularly scheduled background tasks, superset uses `celery`, with `redis` as the message broker.
All celery tasks are defined in `superset/tasks/`.
Importantly, every single file with a task has to be listed in the imports of the `CELERY_CONFIG` to be picked up by the celery worker container `superset_worker`.
Note that the arguments for tasks have to be serialized and sent to the worker by celery.
The workers have the same flask app config, so they can seamlessly access the same database via the same means as the main backend with the routes.
Presumably, this is one of the reasons why CRUD operations are also defined as commands that take as input only ids (ints) and not any non-serializable objects like sqlalchemy model instances or connections/sessions etc.
This way, the commands can be executed in the worker context as well.
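A hedged sketch of such a task (the task and module names are illustrative; only `celery_app` from `superset.extensions` is Superset's):

```python
# Hypothetical task module, e.g. superset/tasks/my_tasks.py; its module path must
# also appear in the CELERY_CONFIG imports so the worker picks it up.
import logging

from superset.extensions import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(name="my_tasks.archive_my_assets", ignore_result=True)
def archive_my_assets_task(model_ids: list[int]) -> None:
    # Only serializable arguments (plain ids) cross the redis broker; the worker
    # shares the flask app config, so a command can re-resolve them against the app DB.
    logger.info("Worker archiving assets %s", model_ids)
    # e.g. ArchiveMyAssetsCommand(model_ids).run()  # hypothetical command, see sketch above
```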
### Celery Workers
As mentioned, celery workers are used for long-running tasks.
In theory, celery supports retrieving task results via a result backend (configured to be redis), but superset does not appear to use it.
Tasks are defined without a return value and instead, if necessary, manually write their results into redis via flask caching, for example async query results or chart data prefetches.
Additionally, the tasks write their task metadata events (including state changes) to a redis event stream, which is somehow connected to a websocket server that the frontend can listen to.
### Websocket
This minimal node server is used for a websocket connection to the frontend.
To my understanding, this is only actually used if the feature flag `GLOBAL_ASYNC_QUERIES` is set, and the mechanism is set to "ws" (vs. "polling").
It needs to be properly configured in the superset config, including setting the redis db used for caching and for communicating asynchronously computed results.
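A hedged configuration sketch (the key names follow the upstream `superset/config.py` defaults as far as I can tell, but may differ between versions; all values are placeholders):

```python
# Hypothetical superset_config.py excerpt enabling the websocket transport for
# global async queries; host names, db numbers, and the secret are placeholders.
FEATURE_FLAGS = {
    "GLOBAL_ASYNC_QUERIES": True,
}
GLOBAL_ASYNC_QUERIES_TRANSPORT = "ws"  # alternative: "polling"
GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://localhost:8080/"
GLOBAL_ASYNC_QUERIES_REDIS_CONFIG = {
    "host": "redis",
    "port": 6379,
    "db": 2,  # must match the redis db configured for the websocket server
}
GLOBAL_ASYNC_QUERIES_JWT_SECRET = "change-me-to-a-long-random-secret"
```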
### Configuration
Relevant for the docker compose setup are the `docker/` and `requirements/` folders.
The former contains `pythonpath_dev/superset_config.py` and my `pythonpath_dev/superset_config_docker.py`, which are the main configuration files that are loaded by the flask app.
The merge behavior is quite basic from what I can tell, so one has to be careful when redefining complex keys such as the `CELERY_CONFIG` object, where one cannot simply change a single key.
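For example, adding a task module means restating the whole celery configuration in `superset_config_docker.py` rather than patching a single key; a hedged sketch (broker/result URLs and the exact imports tuple are illustrative):

```python
# Hypothetical superset_config_docker.py excerpt: CELERY_CONFIG is replaced
# wholesale because the config merge does not deep-merge nested structures.
class CeleryConfig:  # the default docker config defines it as a class like this
    broker_url = "redis://redis:6379/0"
    result_backend = "redis://redis:6379/1"
    imports = (
        "superset.sql_lab",
        "superset.tasks.scheduler",
        "superset.tasks.mitm.upload_mitm_dataset",  # new task modules must be listed here
    )


CELERY_CONFIG = CeleryConfig
```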
Also, `.env`, `.env-local`, and my `.env-mitm` are located there.
Some of the python config loads environment variables.
The `requirements/` folder contains the python package requirements for backend and worker containers.
The files are generated by the `scripts/uv-pip-compile.sh` script, which has to be run manually after changing (extra) dependencies in `pyproject.toml` or when you want development packages like `pydebug` to be available.
For example, the `mitm` dependency group has to be mentioned in the first line of the `development.in` file (`-e .[...,mitm]`).
This installs the superset package itself (path: `.`) as editable (`-e`) with the `mitm` dependency group.
There are also many dependency groups for different database dialects, such as `postgres`, `mysql`, `mssql`, etc.
Without manually adding a dialect-implementing package, having a sqlalchemy uri like `postgresql+psycopg2://...` will not work.
## MitM Additions
The mitm support touches all parts of the superset architecture.
It is gated with the feature flag "MITM_SUPPORT" in the superset config.
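Assuming it is toggled like other superset feature flags, that amounts to roughly:

```python
# superset_config_docker.py (sketch): enabling the MitM additions
FEATURE_FLAGS = {
    "MITM_SUPPORT": True,
}
```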
### MitMDataset Asset
A new kind of asset: `MitMDataset`.
It represents a coherent collection of datasets/datasources, which, to my understanding, can only be individual tables.
Specifically, it is related to exactly one database, multiple datasets (corresponding to tables in the relational representation of a Model-in-the-Middle), slices (charts) and dashboards.
It is introduced in its own db migration revision.
It has its own DAO, API route, CRUD commands, and importer.
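A hedged sketch of the model's shape (column names and details are illustrative; the real definition lives in `superset/models/mitm.py`):

```python
# Illustrative shape of the MitMDataset model; the real definition differs in details.
import sqlalchemy as sa
from flask_appbuilder import Model
from sqlalchemy.orm import relationship

from superset.models.helpers import AuditMixinNullable, ImportExportMixin


class MitMDatasetSketch(Model, AuditMixinNullable, ImportExportMixin):
    __tablename__ = "mitm_datasets_sketch"  # hypothetical table name

    id = sa.Column(sa.Integer, primary_key=True)
    dataset_name = sa.Column(sa.String(250), nullable=False)
    mitm = sa.Column(sa.String(50), nullable=False)  # e.g. "MAED"

    # Exactly one database ...
    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
    database = relationship("Database")

    # ... plus related datasets, charts, and dashboards, exposed as .tables,
    # .slices, and .dashboards (wired via association tables in the real model,
    # omitted in this sketch).
```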
The frontend is extended with a new tab in the menu bar and a new list component.
Note that this asset is just a "database metadata" asset; like a dataset, which merely represents the existence of some kind of table on a database, it does not contain the data itself.
Ordinarily, superset just manages links to external databases that live outside its own deployment.
For seamless integration, the MitM files (e.g., .maed/.zip) themselves need to be handled.
While superset has some support for uploading (csv) files to databases that are configured to permit it, this is insufficient for proper MitM support.
To have a separation of concerns, I decided to implement MitM-supporting functionality as (tightly coupled) "microservices".
For example, logic relating to the handling of MitM files (e.g., .maed/.zip), including upload and insertion into a managed relational database, is implemented in a separate microservice: `superset-mitm-service`.
### External Service Support
To support this externalization, a new extension called `external_service_manager` was introduced (`superset/extensions/`).
It is supposed to allow adding new external services to superset so that they can easily be accessed internally by other extensions and exposed directly via a thin API endpoint (`ext_service`).
Request forwarding uses asynchronous celery tasks in combination with the shared (redis) cache.
Also, the async query job metadata event stream is supposed to be reused to communicate job status updates and, potentially, a result URL.
In addition to full request forwarding, the external service manager can be used from within the backend to realize a feature via an external service.
For example, the aforementioned upload of MitM files makes multiple requests to the supporting service in the `upload_mitm_dataset_task`.
To make passing around large results more feasible, there are some new commands for (streaming) writes to and reads from the cache.
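Putting these pieces together, the upload endpoint roughly works as sketched below (condensed from the diff in this commit; `start_upload` is a hypothetical wrapper around what the actual `upload` route method does, with error handling, schemas, and logging omitted):

```python
# Condensed sketch of the upload flow added in this commit.
from superset.commands.mitm.caching.cache_data import WriteCacheCommand
from superset.commands.mitm.caching.utils import random_cache_key
from superset.tasks.mitm.upload_mitm_dataset import upload_mitm_dataset_task


def start_upload(dataset_name: str, mitm_name: str, mitm_bytes: bytes):
    # 1. Write the uploaded zip into the shared (redis) cache instead of pushing
    #    the raw bytes through the celery broker.
    cache_key = random_cache_key()
    WriteCacheCommand(cache_key, mitm_bytes).run()

    # 2. Build the async job handler; the worker reads the bytes back via
    #    ReadCacheCommand, forwards them to superset-mitm-service, and imports
    #    the returned asset bundle.
    def handler(job_metadata):
        return upload_mitm_dataset_task.delay(
            job_metadata,
            dataset_name=dataset_name,
            mitm_zip_cache_key=cache_key,
            mitm_name=mitm_name,
        )

    # The handler is then passed to
    # external_service_call_manager.submit_async_internally_handled(...).
    return handler
```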
@@ -17,6 +17,10 @@ specific language governing permissions and limitations
under the License.
-->
# (MitM) Superset
You can read about the modifications made to add explicit support for MitM data here: [MitM Support](MITM_SUPPORT_INFO.md).
# Superset
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/license/apache-2-0)
......
@@ -8,7 +8,7 @@ from superset.models.mitm import MitMDataset
from superset.utils.decorators import transaction, on_error
from .utils import *
from ..exceptions import MitMDatasetCreationError, \
MitMDatasetInvalidError
MitMDatasetInvalidError, ValidationError
class CreateMitMDatasetCommand(CreateMixin, BaseCommand):
......
import logging
from functools import partial
from superset import security_manager
@@ -13,6 +14,8 @@ from ...chart.delete import DeleteChartCommand
from ...database.delete import DeleteDatabaseCommand
from ...dashboard.delete import DeleteDashboardCommand
logger = logging.getLogger(__name__)
class DeleteMitMDatasetsCommand(BaseCommand):
def __init__(self, model_ids: list[int], delete_related: bool = False):
self._model_ids = model_ids
@@ -26,8 +29,15 @@ class DeleteMitMDatasetsCommand(BaseCommand):
if self.delete_related:
dbs = []
for item in self._models:
logger.info(f"Deleting related objects for {item}")
logger.info(f"Deleting {[d.id for d in item.dashboards]} dashboards")
logger.info(f"Deleting {[c.id for c in item.slices]} charts")
logger.info(f"Deleting {[ds.id for ds in item.tables]} datasets")
if item.dashboards:
DeleteDashboardCommand([d.id for d in item.dashboards]).run()
if item.slices:
DeleteChartCommand([c.id for c in item.slices]).run()
if item.tables:
DeleteDatasetCommand([ds.id for ds in item.tables]).run()
# DashboardDAO.delete(item.dashboards)
# ChartDAO.delete(item.slices)
......
@@ -11,7 +11,7 @@ from superset.utils.decorators import transaction, on_error
from .common import MitMDatasetBaseCommand
from .schemas.mitm_service_schema import ListTrackedMitMDataset, TrackedMitMDataset
from .schemas.utils import cast_json
from ..exceptions import *
from ..exceptions import ValidationError, MitMDatasetGetError
from ...base import BaseCommand
......
@@ -576,7 +576,7 @@ class GenerateVisualizationsRequest(BaseModel):
visualization_types: list[MAEDVisualizationType] | None = Field(
['baseline'], title='Visualization Types'
)
override_metadata_type: str | None = None
class HTTPValidationError(BaseModel):
detail: list[ValidationError] | None = Field(None, title='Detail')
......
@@ -18,12 +18,20 @@ from ...importers.v1.utils import get_contents_from_bundle
class UploadMitMDatasetCommand(BaseCommand):
def __init__(self, mitm: str, dataset_name: str, mitm_zip: bytes) -> None:
def __init__(self, mitm: str, dataset_name: str, mitm_zip: bytes | None = None, mitm_zip_cache_key: str | None = None) -> None:
self.mitm = mitm
self.dataset_name = dataset_name
self.mitm_zip = mitm_zip
self.mitm_zip_cache_key = mitm_zip_cache_key
def _run(self) -> Any:
files = {}
if self.mitm_zip is None and self.mitm_zip_cache_key is not None:
files = { 'cached_files': {'mitm_zip': self.mitm_zip_cache_key}}
elif self.mitm_zip is not None and self.mitm_zip_cache_key is None:
files = { 'raw_files': {'mitm_zip': io.BytesIO(self.mitm_zip)}}
upload_request = FormDataRequest(method='POST',
headers=[],
path='/mitm_dataset/upload',
@@ -31,10 +39,11 @@ class UploadMitMDatasetCommand(BaseCommand):
'dataset_name': self.dataset_name,
'mitm': self.mitm,
},
raw_files={'mitm_zip': self.mitm_zip},
files_meta={
'mitm_zip': (
'mitm.zip', 'application/zip', {})})
'mitm.zip', 'application/zip', {})},
**files
)
upload_response = external_service_call_manager.sync_service_request(
ExternalService.MITM, upload_request)
@@ -51,7 +60,7 @@ class UploadMitMDatasetCommand(BaseCommand):
# encoding='application/json',
path=f'/definitions/mitm_dataset/{uuid}/import/zip',
url_params={
'include_visualizations': False,
'include_visualizations': True,
})
definition_response = external_service_call_manager.sync_service_request(
@@ -59,7 +68,8 @@ class UploadMitMDatasetCommand(BaseCommand):
contents = None
if definition_response.status_code == 200 and definition_response.content is not None:
with zipfile.ZipFile(definition_response.content) as bundle:
with io.BytesIO(definition_response.content) as bio:
with zipfile.ZipFile(bio) as bundle:
contents = get_contents_from_bundle(bundle)
if contents:
from superset.commands.mitm.mitm_dataset.importers.v1 import \
@@ -74,5 +84,7 @@ class UploadMitMDatasetCommand(BaseCommand):
return self._run()
def validate(self) -> None:
if not self.mitm or not self.dataset_name or not self.mitm_zip:
if not self.mitm_zip and not self.mitm_zip_cache_key:
raise MitMDatasetUploadError('Either `mitm_zip` or `mitm_zip_cache_key` must be provided.')
if not self.mitm or not self.dataset_name:
raise MitMDatasetUploadError()
from __future__ import annotations
import io
import zipfile
from shillelagh.fields import External
@@ -29,14 +31,12 @@ class GenerateVisualizationsCommand(MitMDatasetBaseCommand):
json_data = mitm_service_schema.GenerateVisualizationsRequest(
reuse_existing_identifiers=True,
track_identifiers=True,
visualization_types=self.visualization_types)
visualization_types=self.visualization_types,
override_metadata_type='Dashboard')
viz_request = JsonRequest(method='POST',
headers=[],
path=f'/definitions/mitm_dataset/viz/{uuid}/import/zip',
url_params={
'as_assets': True
},
json_data=json_data)
definition_response = external_service_call_manager.sync_service_request(
@@ -45,7 +45,8 @@ class GenerateVisualizationsCommand(MitMDatasetBaseCommand):
contents = None
if definition_response.status_code == 200 and definition_response.content is not None:
with zipfile.ZipFile(definition_response.content) as bundle:
with io.BytesIO(definition_response.content) as bio:
with zipfile.ZipFile(bio) as bundle:
contents = get_contents_from_bundle(bundle)
if contents:
from superset.commands.dashboard.importers.dispatcher import ImportDashboardsCommand
......
@@ -148,7 +148,7 @@ class FormDataRequest(TempDirMixin, ForwardableRequestBase):
form_data: dict[str, Any] = pydantic.Field(default_factory=dict)
raw_files: dict[str, bytes] = pydantic.Field(repr=False, default_factory=dict)
raw_files: dict[str, bytes | io.BytesIO] = pydantic.Field(repr=False, default_factory=dict)
filesystem_files: dict[str, str | os.PathLike[str]] = pydantic.Field(default_factory=dict)
cached_files: dict[str, CacheKey] = pydantic.Field(default_factory=dict)
......
@@ -30,12 +30,15 @@ from .schemas import MitMDatasetPostSchema, MitMDatasetPutSchema, get_export_ids
from ... import event_logger
from ...commands.importers.exceptions import NoValidFilesFoundError
from ...commands.importers.v1.utils import get_contents_from_bundle
from ...commands.mitm.caching.cache_data import WriteCacheCommand
from ...commands.mitm.caching.utils import random_cache_key
from ...commands.mitm.exceptions import *
from ...commands.mitm.mitm_dataset.create import CreateMitMDatasetCommand
from ...commands.mitm.mitm_dataset.delete import DeleteMitMDatasetsCommand
from ...commands.mitm.mitm_dataset.export import ExportMitMDatasetsCommand
from ...commands.mitm.mitm_dataset.importers.v1 import ImportMitMDatasetsCommand
from ...commands.mitm.mitm_dataset.update import UpdateMitMDatasetCommand
from ...commands.mitm.mitm_service.schemas.mitm_service_schema import ValidationError
from ...commands.mitm.mitm_service.utils import test_access
from ...utils import json
from ...utils.core import get_user_id
@@ -550,7 +553,15 @@ class MitMDatasetRestApi(BaseSupersetModelRestApi):
log_to_statsd=False,
)
@requires_form_data
def upload(self, **kwargs: Any) -> Response:
def upload(self) -> Response:
logger.info('Received upload request')
logger.info(f"Request headers: {request.headers}")
logger.info(f"Request form data: {request.form}")
logger.info(f"Request files: {[file.filename for file in request.files.values()]}")
# logger.info('Content Length: %s', request.content_length)
from flask import current_app
logger.info('Max Content Length: %s', current_app.config['MAX_CONTENT_LENGTH'])
dataset_name = request.form.get('dataset_name')
mitm_name = request.form.get('mitm')
zip = request.form.get('mitm_zip')
@@ -563,14 +574,16 @@ class MitMDatasetRestApi(BaseSupersetModelRestApi):
mitm_zip = mitm_zip or zip
mitm_bytes = mitm_zip.read()
ck = random_cache_key()
WriteCacheCommand(ck, mitm_bytes).run()
from superset.customization.external_service_support.service_registry import \
ExternalService
from superset.tasks.mitm.upload_mitm_dataset import upload_mitm_dataset_task
def handler(jm):
return upload_mitm_dataset_task.delay(jm, dataset_name, mitm_name,
mitm_bytes)
return upload_mitm_dataset_task.delay(jm, dataset_name=dataset_name, mitm_zip_cache_key=ck,
mitm_name=mitm_name)
from superset.extensions import external_service_call_manager
job_metadata = external_service_call_manager.submit_async_internally_handled(
......
@@ -10,6 +10,7 @@ from superset.extensions import external_service_call_manager
from superset.utils.core import override_user
from .common import get_service_call_timeout, load_user_from_job_metadata
from ...commands.exceptions import CommandException
from ...commands.mitm.caching.cache_data import ReadCacheCommand
logger = logging.getLogger(__name__)
@@ -22,7 +23,7 @@ if TYPE_CHECKING:
ignore_result=True)
def upload_mitm_dataset_task(job_metadata: AsyncJobMetadata,
dataset_name: str,
mitm_zip: bytes,
mitm_zip_cache_key: str,
mitm_name: str = 'MAED') -> None:
external_service_call_manager.call_started(job_metadata)
@@ -30,7 +31,9 @@ def upload_mitm_dataset_task(job_metadata: AsyncJobMetadata,
with override_user(load_user_from_job_metadata(job_metadata), force=False):
try:
UploadMitMDatasetCommand(mitm_name, dataset_name, mitm_zip).run()
mitm_zip = ReadCacheCommand(mitm_zip_cache_key).run()
UploadMitMDatasetCommand(mitm_name, dataset_name, mitm_zip=mitm_zip).run()
external_service_call_manager.call_completed(job_metadata)
except CommandException as ex:
......
@@ -41,7 +41,7 @@ Content-Disposition: form-data; name="dataset_name";
My New Dataset
--boundary
Content-Disposition: form-data; name="mitm_name";
Content-Disposition: form-data; name="mitm";
MAED
--boundary
......