superset __init__ 源码

  • 2022-10-20
  • 浏览 (485)

superset init 代码

文件路径:/superset/db_engine_specs/init.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Compatibility layer for different database engines

This modules stores logic specific to different database engines. Things
like time-related functions that are similar but not identical, or
information as to expose certain features or not and how to expose them.

For instance, Hive/Presto supports partitions and have a specific API to
list partitions. Other databases like Vertica also support partitions but
have different API to get to them. Other databases don't support partitions
at all. The classes here will use a common interface to specify all this.

The general idea is to use static classes and an inheritance scheme.
"""
import inspect
import logging
import pkgutil
from collections import defaultdict
from importlib import import_module
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Type

import sqlalchemy.databases
import sqlalchemy.dialects
from pkg_resources import iter_entry_points
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.engine.url import URL

from superset import app
from superset.db_engine_specs.base import BaseEngineSpec

logger = logging.getLogger(__name__)


def is_engine_spec(obj: Any) -> bool:
    """
    Return true if a given object is a DB engine spec.
    """
    return (
        inspect.isclass(obj)
        and issubclass(obj, BaseEngineSpec)
        and obj != BaseEngineSpec
    )


def load_engine_specs() -> List[Type[BaseEngineSpec]]:
    """
    Load all engine specs, native and 3rd party.
    """
    engine_specs: List[Type[BaseEngineSpec]] = []

    # load standard engines
    db_engine_spec_dir = str(Path(__file__).parent)
    for module_info in pkgutil.iter_modules([db_engine_spec_dir], prefix="."):
        module = import_module(module_info.name, package=__name__)
        engine_specs.extend(
            getattr(module, attr)
            for attr in module.__dict__
            if is_engine_spec(getattr(module, attr))
        )
    # load additional engines from external modules
    for ep in iter_entry_points("superset.db_engine_specs"):
        try:
            engine_spec = ep.load()
        except Exception:  # pylint: disable=broad-except
            logger.warning("Unable to load Superset DB engine spec: %s", ep.name)
            continue
        engine_specs.append(engine_spec)

    return engine_specs


def get_engine_spec(backend: str, driver: Optional[str] = None) -> Type[BaseEngineSpec]:
    """
    Return the DB engine spec associated with a given SQLAlchemy URL.

    Note that if a driver is not specified the function returns the first DB engine spec
    that supports the backend. Also, if a driver is specified but no DB engine explicitly
    supporting that driver exists then a backend-only match is done, in order to allow new
    drivers to work with Superset even if they are not listed in the DB engine spec
    drivers.
    """
    engine_specs = load_engine_specs()

    if driver is not None:
        for engine_spec in engine_specs:
            if engine_spec.supports_backend(backend, driver):
                return engine_spec

    # check ignoring the driver, in order to support new drivers; this will return a
    # random DB engine spec that supports the engine
    for engine_spec in engine_specs:
        if engine_spec.supports_backend(backend):
            return engine_spec

    # default to the generic DB engine spec
    return BaseEngineSpec


# there's a mismatch between the dialect name reported by the driver in these
# libraries and the dialect name used in the URI
backend_replacements = {
    "drilldbapi": "drill",
    "exasol": "exa",
}


def get_available_engine_specs() -> Dict[Type[BaseEngineSpec], Set[str]]:
    """
    Return available engine specs and installed drivers for them.
    """
    drivers: Dict[str, Set[str]] = defaultdict(set)

    # native SQLAlchemy dialects
    for attr in sqlalchemy.databases.__all__:
        dialect = getattr(sqlalchemy.dialects, attr)
        for attribute in dialect.__dict__.values():
            if (
                hasattr(attribute, "dialect")
                and inspect.isclass(attribute.dialect)
                and issubclass(attribute.dialect, DefaultDialect)
                # adodbapi dialect is removed in SQLA 1.4 and doesn't implement the
                # `dbapi` method, hence needs to be ignored to avoid logging a warning
                and attribute.dialect.driver != "adodbapi"
            ):
                try:
                    attribute.dialect.dbapi()
                except ModuleNotFoundError:
                    continue
                except Exception as ex:  # pylint: disable=broad-except
                    logger.warning(
                        "Unable to load dialect %s: %s", attribute.dialect, ex
                    )
                    continue
                drivers[attr].add(attribute.dialect.driver)

    # installed 3rd-party dialects
    for ep in iter_entry_points("sqlalchemy.dialects"):
        try:
            dialect = ep.load()
        except Exception as ex:  # pylint: disable=broad-except
            logger.warning("Unable to load SQLAlchemy dialect %s: %s", dialect, ex)
        else:
            backend = dialect.name
            if isinstance(backend, bytes):
                backend = backend.decode()
            backend = backend_replacements.get(backend, backend)

            driver = getattr(dialect, "driver", dialect.name)
            if isinstance(driver, bytes):
                driver = driver.decode()
            drivers[backend].add(driver)

    available_engines = {}
    for engine_spec in load_engine_specs():
        driver = drivers[engine_spec.engine]

        # do not add denied db engine specs to available list
        dbs_denylist = app.config["DBS_AVAILABLE_DENYLIST"]
        dbs_denylist_engines = dbs_denylist.keys()

        if (
            engine_spec.engine in dbs_denylist_engines
            and hasattr(engine_spec, "default_driver")
            and engine_spec.default_driver in dbs_denylist[engine_spec.engine]
        ):
            continue

        # lookup driver by engine aliases.
        if not driver and engine_spec.engine_aliases:
            for alias in engine_spec.engine_aliases:
                driver = drivers[alias]
                if driver:
                    break

        available_engines[engine_spec] = driver

    return available_engines

相关信息

superset 源码目录

相关文章

superset ascend 源码

superset athena 源码

superset aurora 源码

superset base 源码

superset bigquery 源码

superset clickhouse 源码

superset cockroachdb 源码

superset crate 源码

superset databricks 源码

superset db2 源码

0  赞