Skip to content
Snippets Groups Projects
Select Git revision
  • d82c5b6447b1c81913e92d3fd888fde45c63b5cc
  • master default protected
2 results

cluster-options-example.yml

Blame
  • upload_mitm_dataset.py 2.04 KiB
    from __future__ import annotations
    
    import logging
    from typing import TYPE_CHECKING
    
    from celery.exceptions import SoftTimeLimitExceeded
    
    from superset.extensions import celery_app
    from superset.extensions import external_service_call_manager
    from superset.utils.core import override_user
    from .common import get_service_call_timeout, load_user_from_job_metadata
    from ...commands.exceptions import CommandException
    from ...commands.mitm.caching.cache_data import ReadCacheCommand
    
    logger = logging.getLogger(__name__)
    
    if TYPE_CHECKING:
        from superset.customization.external_service_support.common import AsyncJobMetadata
    
    
    @celery_app.task(name='upload_mitm_dataset',
                     soft_time_limit=get_service_call_timeout(),
                     ignore_result=True)
    def upload_mitm_dataset_task(job_metadata: AsyncJobMetadata,
                                 dataset_name: str,
                                 mitm_zip_cache_key: str,
                                 mitm_name: str = 'MAED') -> None:
        """Upload a cached MitM zip as a MitM dataset and report the job outcome.

        The task marks the job as started, reads the zip stored under
        ``mitm_zip_cache_key`` via ``ReadCacheCommand``, runs
        ``UploadMitMDatasetCommand`` as the user loaded from ``job_metadata``
        and finally reports completion or failure through
        ``external_service_call_manager``.

        Args:
            job_metadata: Metadata of the async job; passed to the call manager
                and used to load the user the upload runs as.
            dataset_name: Name handed to ``UploadMitMDatasetCommand``.
            mitm_zip_cache_key: Cache key under which the zip was stored.
            mitm_name: MitM identifier handed to ``UploadMitMDatasetCommand``.

        Raises:
            Exception: Any error other than ``CommandException`` or
                ``SoftTimeLimitExceeded`` is re-raised after the job has been
                reported as failed.
        """
        external_service_call_manager.call_started(job_metadata)

        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from superset.commands.mitm.mitm_service.upload import UploadMitMDatasetCommand

        with override_user(load_user_from_job_metadata(job_metadata), force=False):
            try:
                mitm_zip = ReadCacheCommand(mitm_zip_cache_key).run()

                UploadMitMDatasetCommand(mitm_name, dataset_name, mitm_zip=mitm_zip).run()

                external_service_call_manager.call_completed(job_metadata)
            except CommandException as ex:
                logger.error(
                    'An error occurred while uploading the MitMDataset, error: %s',
                    ex)
                msg = str(getattr(ex, 'message', ex))
                external_service_call_manager.call_failed(job_metadata,
                                                          errors=[{'message': msg}])
            except SoftTimeLimitExceeded as ex:
                logger.warning(
                    'A timeout occurred while uploading the MitMDataset, error: %s',
                    ex)
                # The exception is usually raised without arguments, which makes
                # str(ex) empty; fall back to a readable message in that case.
                msg = str(ex) or 'The MitMDataset upload exceeded its time limit'
                external_service_call_manager.call_failed(job_metadata,
                                                          errors=[{'message': msg}])
            except Exception as ex:
                # Without this handler an unexpected error would leave the job
                # reported as started forever. Report the failure, then re-raise
                # so the error still propagates exactly as it did before.
                logger.exception(
                    'An unexpected error occurred while uploading the MitMDataset')
                msg = str(ex) or type(ex).__name__
                external_service_call_manager.call_failed(job_metadata,
                                                          errors=[{'message': msg}])
                raise