Select Git revision
measurements.cpython-311.pyc
upload_mitm_dataset.py 2.04 KiB
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from celery.exceptions import SoftTimeLimitExceeded
from superset.extensions import celery_app
from superset.extensions import external_service_call_manager
from superset.utils.core import override_user
from .common import get_service_call_timeout, load_user_from_job_metadata
from ...commands.exceptions import CommandException
from ...commands.mitm.caching.cache_data import ReadCacheCommand
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from superset.customization.external_service_support.common import AsyncJobMetadata
@celery_app.task(name='upload_mitm_dataset',
                 soft_time_limit=get_service_call_timeout(),
                 ignore_result=True)
def upload_mitm_dataset_task(job_metadata: AsyncJobMetadata,
                             dataset_name: str,
                             mitm_zip_cache_key: str,
                             mitm_name: str = 'MAED') -> None:
    """Celery task: read a cached MitM zip and upload it as a MitM dataset.

    The job is reported to ``external_service_call_manager`` as started,
    then as completed or failed depending on the outcome. The work runs
    under the user loaded from ``job_metadata`` via ``override_user``.

    Args:
        job_metadata: Metadata of the async job; passed to the service call
            manager for status reporting and used to load the acting user.
        dataset_name: Name handed to ``UploadMitMDatasetCommand``.
        mitm_zip_cache_key: Key handed to ``ReadCacheCommand`` to fetch the
            cached zip payload.
        mitm_name: MitM identifier handed to ``UploadMitMDatasetCommand``;
            defaults to ``'MAED'``.

    Raises:
        Exception: Any error other than ``CommandException`` or
            ``SoftTimeLimitExceeded`` is re-raised after the job has been
            reported as failed.
    """
    external_service_call_manager.call_started(job_metadata)

    with override_user(load_user_from_job_metadata(job_metadata), force=False):
        try:
            # Function-scope import, as in the original; placed inside the
            # ``try`` so that an import failure is also reported as a failed
            # job rather than leaving it stuck in the "started" state.
            from superset.commands.mitm.mitm_service.upload import UploadMitMDatasetCommand

            mitm_zip = ReadCacheCommand(mitm_zip_cache_key).run()
            UploadMitMDatasetCommand(mitm_name, dataset_name, mitm_zip=mitm_zip).run()
            external_service_call_manager.call_completed(job_metadata)
        except CommandException as ex:
            logger.error(
                'An error occurred while uploading the MitMDataset, error: %s',
                ex)
            # Prefer the exception's ``message`` attribute when it has one.
            msg = str(getattr(ex, 'message', ex))
            external_service_call_manager.call_failed(
                job_metadata, errors=[{'message': msg}])
        except SoftTimeLimitExceeded as ex:
            logger.warning(
                'A timeout occurred while uploading the MitMDataset, error: %s',
                ex)
            # ``str(ex)`` is empty when the exception carries no arguments;
            # fall back to a descriptive text so the reported error is never
            # blank.
            msg = str(ex) or 'A timeout occurred while uploading the MitMDataset'
            external_service_call_manager.call_failed(
                job_metadata, errors=[{'message': msg}])
        except Exception as ex:
            # Task boundary: without this handler an unexpected error would
            # propagate with the job still marked as started. Report the
            # failure, then re-raise so the error still propagates to Celery
            # as it did before.
            logger.exception(
                'An unexpected error occurred while uploading the MitMDataset')
            msg = str(ex) or type(ex).__name__
            external_service_call_manager.call_failed(
                job_metadata, errors=[{'message': msg}])
            raise