Skip to content
Snippets Groups Projects
Select Git revision
  • 02c154a1e596ae97d4af50f8594644e0cf137cf5
  • master default protected
  • v0.5
  • v0.4
  • v0.3.1
  • v0.3
  • v0.2.3
  • v0.2.2
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1.0
  • v0.0.1
13 results

setup.py

Blame
  • asyncEvent.ts 7.04 KiB
    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    import {
      ensureIsArray,
      FeatureFlag,
      getClientErrorObject,
      isFeatureEnabled,
      logging,
      makeApi,
      parseErrorJson,
      SupersetClient,
      SupersetError,
    } from '@superset-ui/core';
    import getBootstrapData from 'src/utils/getBootstrapData';
    
    /**
     * A single async-query event as handled by this module.
     *
     * Within this file only `id`, `job_id`, `status` and `result_url` are read
     * directly; the remaining fields are carried along untouched.
     */
    type AsyncEvent = {
      // Event identifier; stored as the "last received" cursor by setLastId.
      id?: string | null;
      channel_id: string;
      // Key used to look up the listener registered for this job.
      job_id: string;
      user_id?: string;
      // Compared against the JOB_STATUS values.
      status: string;
      errors?: SupersetError[];
      // Endpoint from which the cached result is fetched once the job is done.
      result_url: string | null;
    };
    
    /** Outcome of fetching cached data: `status` is 'success' or 'error'. */
    type CachedDataResponse = {
      status: string;
      data: any;
    };
    /** Application configuration bag (read for the GLOBAL_ASYNC_QUERIES_* keys). */
    type AppConfig = Record<string, any>;
    /** Callback invoked with the event received for a job. */
    type ListenerFn = (asyncEvent: AsyncEvent) => Promise<any>;
    
    // Supported values of the GLOBAL_ASYNC_QUERIES_TRANSPORT config entry.
    const TRANSPORT_POLLING = 'polling';
    const TRANSPORT_WS = 'ws';
    // Job status values compared against AsyncEvent.status. Only DONE and ERROR
    // are handled explicitly in this file.
    const JOB_STATUS = {
      PENDING: 'pending',
      RUNNING: 'running',
      ERROR: 'error',
      DONE: 'done',
    };
    // localStorage key under which the id of the last received event is persisted.
    const LOCALSTORAGE_KEY = 'last_async_event_id';
    // Endpoint queried by fetchEvents when polling for events.
    const POLLING_URL = '/api/v1/async_event/';
    // Maximum number of re-delivery attempts for an event that has no listener yet.
    const MAX_RETRIES = 6;
    // Base delay (ms, passed to setTimeout) between re-delivery attempts; it is
    // multiplied by the attempt count.
    const RETRY_DELAY = 100;
    
    // Module-level state; (re)initialised by init().
    let config: AppConfig;
    // Active transport: TRANSPORT_POLLING or TRANSPORT_WS.
    let transport: string;
    // Delay between polling requests, in milliseconds.
    let pollingDelayMs: number;
    // Handle of the pending polling timer, cleared on re-initialisation.
    let pollingTimeoutId: number;
    // Registered listeners, keyed by job id.
    let listenersByJobId: Record<string, ListenerFn>;
    // Re-delivery attempt counters for events that arrived before their listener.
    let retriesByJobId: Record<string, number>;
    // Id of the most recently processed event; sent as `last_id` when fetching.
    let lastReceivedEventId: string | null | undefined;
    
    /**
     * Registers `fn` as the listener for the job identified by `id`,
     * replacing any listener previously stored under the same id.
     */
    const addListener = (id: string, fn: any) => {
      Object.assign(listenersByJobId, { [id]: fn });
    };
    
    /**
     * Unregisters the listener stored for job `id`; does nothing when no
     * listener is registered under that id.
     */
    const removeListener = (id: string) => {
      if (listenersByJobId[id]) {
        delete listenersByJobId[id];
      }
    };
    
    /** Exposes the listener registration helpers of this module. */
    export const useAsyncEventHandling = () => ({ addListener, removeListener });
    
    /**
     * Fetches the cached result referenced by `asyncEvent.result_url`.
     *
     * @returns `{ status: 'success', data }` where `data` is the `result`
     *   property of the JSON body when present (otherwise the whole body), or
     *   `{ status: 'error', data }` with the client error object when the
     *   request fails.
     */
    const fetchCachedData = async (
      asyncEvent: AsyncEvent,
    ): Promise<CachedDataResponse> => {
      try {
        const response = await SupersetClient.get({
          endpoint: String(asyncEvent.result_url),
        });
        const payload = response.json;
        return {
          status: 'success',
          data: 'result' in payload ? payload.result : payload,
        };
      } catch (response) {
        const errorObject = await getClientErrorObject(response);
        return { status: 'error', data: errorObject };
      }
    };
    
    /**
     * Waits for the terminal event of the job described by `asyncResponse`.
     *
     * A listener is registered under the job id. When a `done` event arrives the
     * cached result is fetched and the promise resolves with it (wrapped in an
     * array), or rejects with the error payload if the fetch failed. When an
     * `error` event arrives the promise rejects with the parsed error.
     *
     * Events with any other status are logged and ignored: the listener stays
     * registered so that a later terminal event can still settle the promise.
     *
     * @param asyncResponse event/job descriptor whose `job_id` is awaited
     * @returns a promise settled by the job's terminal event
     */
    export const waitForAsyncData = async (asyncResponse: AsyncEvent) =>
      new Promise((resolve, reject) => {
        const jobId = asyncResponse.job_id;
        const listener = async (asyncEvent: AsyncEvent) => {
          switch (asyncEvent.status) {
            case JOB_STATUS.DONE: {
              const { data, status } = await fetchCachedData(asyncEvent);
              const payload = ensureIsArray(data);
              if (status === 'success') {
                resolve(payload);
              } else {
                reject(payload);
              }
              break;
            }
            case JOB_STATUS.ERROR: {
              reject(parseErrorJson(asyncEvent));
              break;
            }
            default: {
              logging.warn('received event with status', asyncEvent.status);
              // Non-terminal status: keep listening, otherwise the promise
              // could never be resolved or rejected.
              return;
            }
          }
          // The promise has been settled; the listener is no longer needed.
          removeListener(jobId);
        };
        addListener(jobId, listener);
      });
    
    /**
     * GET request against POLLING_URL. Accepts an optional `last_id` argument
     * and yields an object whose `result` is the list of events.
     */
    const fetchEvents = makeApi<
      { last_id?: string | null },
      { result: AsyncEvent[] }
    >({
      method: 'GET',
      endpoint: POLLING_URL,
    });
    
    /**
     * Records the id of `asyncEvent` as the last received event id, both in
     * memory and (best effort) in localStorage.
     *
     * Events without an id are ignored: storing them would reset the in-memory
     * cursor and persist the literal string "undefined"/"null", which would
     * later be read back as a seemingly valid last id.
     */
    const setLastId = (asyncEvent: AsyncEvent) => {
      const { id } = asyncEvent;
      if (id == null) return;

      lastReceivedEventId = id;
      try {
        localStorage.setItem(LOCALSTORAGE_KEY, id);
      } catch (err) {
        // localStorage may be unavailable or full; persistence is optional.
        logging.warn('Error saving event Id to localStorage', err);
      }
    };
    
    /**
     * Hands a single event to the listener registered for its job.
     *
     * If no listener is registered yet (the event may arrive before the
     * listener is added), delivery is retried up to MAX_RETRIES times with a
     * delay that grows linearly with the attempt number. After that the event
     * is dropped with a warning.
     */
    const deliverAsyncEvent = (asyncEvent: AsyncEvent): void => {
      const jobId = asyncEvent.job_id;
      const listener = listenersByJobId[jobId];
      if (listener) {
        listener(asyncEvent);
        delete retriesByJobId[jobId];
        return;
      }

      const attempt = (retriesByJobId[jobId] ?? 0) + 1;
      retriesByJobId[jobId] = attempt;

      if (attempt <= MAX_RETRIES) {
        setTimeout(() => {
          deliverAsyncEvent(asyncEvent);
        }, RETRY_DELAY * attempt);
      } else {
        delete retriesByJobId[jobId];
        logging.warn('listener not found for job_id', asyncEvent.job_id);
      }
    };

    /**
     * Dispatches each event to its job listener and records its id as the last
     * received event id.
     *
     * The id is recorded only once, when the event is first processed. Retried
     * deliveries do not touch it, so a delayed retry of an older event cannot
     * move the cursor back behind newer events that were processed meanwhile.
     *
     * @param events events to process, in the order they were received
     */
    export const processEvents = async (events: AsyncEvent[]) => {
      events.forEach((asyncEvent: AsyncEvent) => {
        deliverAsyncEvent(asyncEvent);
        setLastId(asyncEvent);
      });
    };
    
    /**
     * Fetches new events from the polling endpoint and processes them.
     *
     * The request is only made while at least one listener is registered;
     * failures are logged and otherwise ignored. When the active transport is
     * polling, the function re-schedules itself after `pollingDelayMs`.
     */
    const loadEventsFromApi = async () => {
      const query = lastReceivedEventId ? { last_id: lastReceivedEventId } : {};
      const hasListeners = Object.keys(listenersByJobId).length > 0;

      if (hasListeners) {
        try {
          const response = await fetchEvents(query);
          const events = response.result;
          if (events?.length) {
            await processEvents(events);
          }
        } catch (err) {
          logging.warn(err);
        }
      }

      if (transport !== TRANSPORT_POLLING) return;
      pollingTimeoutId = window.setTimeout(loadEventsFromApi, pollingDelayMs);
    };
    
    // Maximum number of reconnection attempts before falling back to the API.
    const wsConnectMaxRetries = 6;
    // Delay (ms) between a socket closing and the next reconnection attempt.
    const wsConnectErrorDelay = 2500;
    // Reconnection attempts made since the last successful connection.
    let wsConnectRetries = 0;
    // Handle of the pending reconnection timer.
    let wsConnectTimeout: any;
    // The most recently created WebSocket.
    let ws: WebSocket;
    
    /**
     * Opens a WebSocket to `config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL` and wires
     * up its handlers.
     *
     * - The last received event id, when known, is passed as the `last_id`
     *   query parameter (URL-encoded, and appended with `&` when the configured
     *   URL already contains a query string).
     * - On `close`, a reconnect is scheduled after `wsConnectErrorDelay`; after
     *   `wsConnectMaxRetries` failed attempts events are loaded via the API.
     * - On `error`, the socket is closed if it is still connecting or open,
     *   which in turn triggers the `close` handling above.
     * - On `message`, the payload is parsed as a single JSON event and
     *   processed; parsing/processing failures are logged.
     */
    const wsConnect = (): void => {
      let url = config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL;
      if (lastReceivedEventId) {
        const separator = String(url).includes('?') ? '&' : '?';
        url += `${separator}last_id=${encodeURIComponent(lastReceivedEventId)}`;
      }
      // Keep a local reference so the handlers below always act on the socket
      // they were attached to, even after `ws` is replaced by a reconnect.
      const socket = new WebSocket(url);
      ws = socket;

      socket.addEventListener('open', () => {
        logging.log('WebSocket connected');
        clearTimeout(wsConnectTimeout);
        wsConnectRetries = 0;
      });

      socket.addEventListener('close', () => {
        wsConnectTimeout = setTimeout(() => {
          wsConnectRetries += 1;
          if (wsConnectRetries <= wsConnectMaxRetries) {
            wsConnect();
          } else {
            logging.warn('WebSocket not available, falling back to async polling');
            // NOTE(review): loadEventsFromApi only re-arms its timer when
            // `transport` equals TRANSPORT_POLLING, so with the WebSocket
            // transport this performs a single fetch — confirm this is intended.
            loadEventsFromApi();
          }
        }, wsConnectErrorDelay);
      });

      socket.addEventListener('error', () => {
        // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
        // readyState 0 (CONNECTING) or 1 (OPEN): close explicitly.
        if (socket.readyState < 2) socket.close();
      });

      socket.addEventListener('message', async event => {
        try {
          const events: AsyncEvent[] = [JSON.parse(event.data)];
          await processEvents(events);
        } catch (err) {
          logging.warn(err);
        }
      });
    };
    
    /**
     * (Re)initialises async event handling.
     *
     * Does nothing unless the GlobalAsyncQueries feature flag is enabled.
     * Otherwise it cancels any pending polling timer, resets the listener and
     * retry registries, loads the configuration (from `appConfig` or the
     * bootstrap data), restores the last received event id from localStorage
     * and starts the configured transport.
     *
     * @param appConfig optional configuration overriding the bootstrap data
     */
    export const init = (appConfig?: AppConfig) => {
      if (!isFeatureEnabled(FeatureFlag.GlobalAsyncQueries)) {
        return;
      }
      if (pollingTimeoutId) {
        clearTimeout(pollingTimeoutId);
      }

      // Start from a clean slate.
      listenersByJobId = {};
      retriesByJobId = {};
      lastReceivedEventId = null;

      config = appConfig || getBootstrapData().common.conf;
      transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
      pollingDelayMs = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;

      // Resume from the persisted cursor when localStorage is accessible.
      try {
        lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
      } catch (err) {
        logging.warn('Failed to fetch last event Id from localStorage');
      }

      switch (transport) {
        case TRANSPORT_POLLING:
          loadEventsFromApi();
          break;
        case TRANSPORT_WS:
          wsConnect();
          break;
        default:
          // Unrecognised transport: nothing is started.
          break;
      }
    };
    
    // Start event handling as a side effect of loading this module, using the
    // bootstrap configuration (init returns early when the feature is disabled).
    init();