superset dao 源码

  • 2022-10-20
superset dao 代码


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from sqlalchemy.exc import SQLAlchemyError

from superset import security_manager
from superset.dao.base import BaseDAO
from superset.dashboards.commands.exceptions import DashboardNotFoundError
from superset.dashboards.filters import DashboardAccessFilter
from superset.extensions import db
from superset.models.core import FavStar, FavStarClassName
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
from superset.utils.core import get_user_id
from superset.utils.dashboard_filter_scopes_converter import copy_filter_scopes

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

class DashboardDAO(BaseDAO):
    """
    Data-access helpers for :class:`Dashboard` records.

    Every method is a ``staticmethod``: none of them takes ``self``/``cls`` and
    they call one another through the ``DashboardDAO`` class itself.
    """

    model_cls = Dashboard
    base_filter = DashboardAccessFilter

    @staticmethod
    def get_by_id_or_slug(id_or_slug: str) -> Dashboard:
        """
        Fetch a dashboard by its ID or slug.

        :param id_or_slug: The ID or slug of the dashboard.
        :returns: The matching dashboard.
        :raises DashboardNotFoundError: If ``Dashboard.get`` returns nothing.
        """
        dashboard = Dashboard.get(id_or_slug)
        if not dashboard:
            raise DashboardNotFoundError()
        # NOTE(review): ``security_manager`` is imported by this module but is not
        # referenced anywhere in this class; confirm whether an access check
        # belongs here before relying on this method for authorization.
        return dashboard

    @staticmethod
    def get_datasets_for_dashboard(id_or_slug: str) -> List[Any]:
        """
        Return ``datasets_trimmed_for_slices()`` of the given dashboard.

        :param id_or_slug: The ID or slug of the dashboard.
        :raises DashboardNotFoundError: If the dashboard does not exist.
        """
        dashboard = DashboardDAO.get_by_id_or_slug(id_or_slug)
        return dashboard.datasets_trimmed_for_slices()

    @staticmethod
    def get_charts_for_dashboard(id_or_slug: str) -> List[Slice]:
        """
        Return the slices (charts) attached to the given dashboard.

        :param id_or_slug: The ID or slug of the dashboard.
        :raises DashboardNotFoundError: If the dashboard does not exist.
        """
        return DashboardDAO.get_by_id_or_slug(id_or_slug).slices

    @staticmethod
    def get_dashboard_changed_on(
        id_or_slug_or_dashboard: Union[str, Dashboard]
    ) -> datetime:
        """
        Get latest changed datetime for a dashboard.

        :param id_or_slug_or_dashboard: A dashboard or the ID or slug of the dashboard.
        :returns: The datetime the dashboard was last changed.
        """
        dashboard = (
            DashboardDAO.get_by_id_or_slug(id_or_slug_or_dashboard)
            if isinstance(id_or_slug_or_dashboard, str)
            else id_or_slug_or_dashboard
        )
        # drop microseconds in datetime to match with last_modified header
        return dashboard.changed_on.replace(microsecond=0)

    @staticmethod
    def get_dashboard_and_slices_changed_on(  # pylint: disable=invalid-name
        id_or_slug_or_dashboard: Union[str, Dashboard]
    ) -> datetime:
        """
        Get latest changed datetime for a dashboard. The change could be a dashboard
        metadata change, or a change to one of its dependent slices.

        :param id_or_slug_or_dashboard: A dashboard or the ID or slug of the dashboard.
        :returns: The datetime the dashboard was last changed.
        """
        dashboard = (
            DashboardDAO.get_by_id_or_slug(id_or_slug_or_dashboard)
            if isinstance(id_or_slug_or_dashboard, str)
            else id_or_slug_or_dashboard
        )
        dashboard_changed_on = DashboardDAO.get_dashboard_changed_on(dashboard)
        # With no slices, fall back to the epoch so the dashboard's own
        # timestamp wins the comparison below.
        slices_changed_on = max(
            (slc.changed_on for slc in dashboard.slices),
            default=datetime.fromtimestamp(0),
        )
        # drop microseconds in datetime to match with last_modified header
        return max(dashboard_changed_on, slices_changed_on).replace(microsecond=0)

    @staticmethod
    def get_dashboard_and_datasets_changed_on(  # pylint: disable=invalid-name
        id_or_slug_or_dashboard: Union[str, Dashboard]
    ) -> datetime:
        """
        Get latest changed datetime for a dashboard. The change could be a dashboard
        metadata change, or a change to one of its dependent datasets.

        :param id_or_slug_or_dashboard: A dashboard or the ID or slug of the dashboard.
        :returns: The datetime the dashboard was last changed.
        """
        dashboard = (
            DashboardDAO.get_by_id_or_slug(id_or_slug_or_dashboard)
            if isinstance(id_or_slug_or_dashboard, str)
            else id_or_slug_or_dashboard
        )
        dashboard_changed_on = DashboardDAO.get_dashboard_changed_on(dashboard)
        # With no datasources, fall back to the epoch so the dashboard's own
        # timestamp wins the comparison below.
        datasources_changed_on = max(
            (datasource.changed_on for datasource in dashboard.datasources),
            default=datetime.fromtimestamp(0),
        )
        # drop microseconds in datetime to match with last_modified header
        return max(dashboard_changed_on, datasources_changed_on).replace(microsecond=0)

    @staticmethod
    def validate_slug_uniqueness(slug: str) -> bool:
        """
        Check that no dashboard already uses ``slug``.

        :param slug: The slug to check; an empty slug is always accepted.
        :returns: ``True`` when the slug is empty or unused, ``False`` otherwise.
        """
        if not slug:
            return True
        dashboard_query = db.session.query(Dashboard).filter(Dashboard.slug == slug)
        return not db.session.query(dashboard_query.exists()).scalar()

    @staticmethod
    def validate_update_slug_uniqueness(dashboard_id: int, slug: Optional[str]) -> bool:
        """
        Check that no dashboard other than ``dashboard_id`` uses ``slug``.

        :param dashboard_id: The ID of the dashboard being updated.
        :param slug: The new slug, or ``None`` when the slug is not being changed.
        :returns: ``True`` when ``slug`` is ``None`` or unused by other dashboards.
        """
        if slug is not None:
            dashboard_query = db.session.query(Dashboard).filter(
                Dashboard.slug == slug, Dashboard.id != dashboard_id
            )
            return not db.session.query(dashboard_query.exists()).scalar()
        return True

    @staticmethod
    def update_charts_owners(model: Dashboard, commit: bool = True) -> Dashboard:
        """
        Add every owner of the dashboard to the owners of each of its slices.

        :param model: The dashboard whose owners are propagated.
        :param commit: Whether to commit the session afterwards.
        :returns: The same dashboard instance.
        """
        owners = set(model.owners)
        for slc in model.slices:
            slc.owners = list(owners | set(slc.owners))
        if commit:
            db.session.commit()
        return model

    @staticmethod
    def bulk_delete(models: Optional[List[Dashboard]], commit: bool = True) -> None:
        """
        Delete several dashboards at once.

        :param models: The dashboards to delete; ``None`` or empty deletes nothing.
        :param commit: Whether to commit the session after the delete.
        :raises SQLAlchemyError: Re-raised after rolling the session back.
        """
        item_ids = [model.id for model in models] if models else []
        # bulk delete, first delete related data
        for model in models or []:
            model.slices = []
            model.owners = []
        # bulk delete itself
        try:
            db.session.query(Dashboard).filter(Dashboard.id.in_(item_ids)).delete(
                synchronize_session="fetch"
            )
            if commit:
                db.session.commit()
        except SQLAlchemyError:
            # Leave the session usable for the caller, then surface the error.
            db.session.rollback()
            raise

    @staticmethod
    def set_dash_metadata(  # pylint: disable=too-many-locals
        dashboard: Dashboard,
        data: Dict[Any, Any],
        old_to_new_slice_ids: Optional[Dict[int, int]] = None,
        commit: bool = False,
    ) -> Dashboard:
        """
        Apply layout and metadata from ``data`` onto ``dashboard``.

        When ``data`` carries ``positions``, the dashboard's slices, its
        ``position_json``, filter scopes and default filters are rebuilt from the
        chart IDs found in the layout. The remaining metadata keys are then copied
        from ``data`` (with defaults) into ``json_metadata``.

        :param dashboard: The dashboard to update in place.
        :param data: The payload holding positions, css, title and metadata keys.
        :param old_to_new_slice_ids: Optional mapping of old to new slice IDs, used
            to rewrite filter scopes.
        :param commit: Whether to commit the session afterwards.
        :returns: The same dashboard instance.
        """
        positions = data.get("positions")
        new_filter_scopes = {}
        md = dashboard.params_dict

        if positions is not None:
            # find slices in the position data
            slice_ids = [
                value.get("meta", {}).get("chartId")
                for value in positions.values()
                if isinstance(value, dict)
            ]

            session = db.session()
            current_slices = session.query(Slice).filter(Slice.id.in_(slice_ids)).all()

            dashboard.slices = current_slices

            # add UUID to positions
            uuid_map = {slc.id: str(slc.uuid) for slc in current_slices}
            for obj in positions.values():
                if (
                    isinstance(obj, dict)
                    and obj["type"] == "CHART"
                    and obj["meta"]["chartId"]
                ):
                    chart_id = obj["meta"]["chartId"]
                    obj["meta"]["uuid"] = uuid_map.get(chart_id)

            # remove leading and trailing white spaces in the dumped json
            dashboard.position_json = json.dumps(
                positions, indent=None, separators=(",", ":"), sort_keys=True
            )

            if "filter_scopes" in data:
                # replace filter_id and immune ids from old slice id to new slice id:
                # and remove slice ids that are not in dash anymore
                slc_id_dict: Dict[int, int] = {}
                if old_to_new_slice_ids:
                    slc_id_dict = {
                        old: new
                        for old, new in old_to_new_slice_ids.items()
                        if new in slice_ids
                    }
                else:
                    slc_id_dict = {sid: sid for sid in slice_ids}
                # NOTE(review): the keyword name for the ID mapping is not visible
                # in this file; confirm it against ``copy_filter_scopes``.
                new_filter_scopes = copy_filter_scopes(
                    old_to_new_slc_id_dict=slc_id_dict,
                    old_filter_scopes=json.loads(data["filter_scopes"] or "{}")
                    if isinstance(data["filter_scopes"], str)
                    else data["filter_scopes"],
                )

            # ``or "{}"`` also covers an explicit ``None`` value, which
            # ``json.loads`` would otherwise reject.
            default_filters_data = json.loads(data.get("default_filters") or "{}")
            applicable_filters = {
                key: v
                for key, v in default_filters_data.items()
                if int(key) in slice_ids
            }
            md["default_filters"] = json.dumps(applicable_filters)

            # positions have its own column, no need to store it in metadata
            md.pop("positions", None)

        # The css and dashboard_title properties are not part of the metadata
        # TODO (geido): remove by refactoring/deprecating save_dash endpoint
        if data.get("css") is not None:
            dashboard.css = data.get("css")
        if data.get("dashboard_title") is not None:
            dashboard.dashboard_title = data.get("dashboard_title")

        if new_filter_scopes:
            md["filter_scopes"] = new_filter_scopes
        else:
            md.pop("filter_scopes", None)

        md.setdefault("timed_refresh_immune_slices", [])

        if data.get("color_namespace") is None:
            md.pop("color_namespace", None)
        else:
            md["color_namespace"] = data.get("color_namespace")

        md["expanded_slices"] = data.get("expanded_slices", {})
        md["refresh_frequency"] = data.get("refresh_frequency", 0)
        md["color_scheme"] = data.get("color_scheme", "")
        md["label_colors"] = data.get("label_colors", {})
        md["shared_label_colors"] = data.get("shared_label_colors", {})
        md["color_scheme_domain"] = data.get("color_scheme_domain", [])
        dashboard.json_metadata = json.dumps(md)

        if commit:
            db.session.commit()
        return dashboard

    @staticmethod
    def favorited_ids(dashboards: List[Dashboard]) -> List[FavStar]:
        """
        Return which of the given dashboards the current user has favorited.

        :param dashboards: The dashboards to check.
        :returns: The ``obj_id`` of each matching ``FavStar`` row for the user
            returned by ``get_user_id()``.
        """
        ids = [dash.id for dash in dashboards]
        return [
            star.obj_id
            for star in db.session.query(FavStar.obj_id)
            .filter(
                FavStar.class_name == FavStarClassName.DASHBOARD,
                FavStar.obj_id.in_(ids),
                FavStar.user_id == get_user_id(),
            )
            .all()
        ]


