airflow kubernetes 源码

  • 2022-10-20
  • 浏览 (408)

airflow kubernetes 代码

文件路径:/airflow/providers/cncf/kubernetes/hooks/kubernetes.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import tempfile
import warnings
from typing import TYPE_CHECKING, Any, Generator

from kubernetes import client, config, watch
from kubernetes.config import ConfigException

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.utils import yaml


def _load_body_to_dict(body):
    try:
        body_dict = yaml.safe_load(body)
    except yaml.YAMLError as e:
        raise AirflowException(f"Exception when loading resource definition: {e}\n")
    return body_dict


class KubernetesHook(BaseHook):
    """
    Creates Kubernetes API connection.

    - use in cluster configuration by using ``extra__kubernetes__in_cluster`` in connection
    - use custom config by providing path to the file using ``extra__kubernetes__kube_config_path``
    - use custom configuration by providing content of kubeconfig file via
        ``extra__kubernetes__kube_config`` in connection
    - use default config by providing no extras

    This hook check for configuration option in the above order. Once an option is present it will
    use this configuration.

    .. seealso::
        For more information about Kubernetes connection:
        :doc:`/connections/kubernetes`

    :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
        to Kubernetes cluster.
    :param client_configuration: Optional dictionary of client configuration params.
        Passed on to kubernetes client.
    :param cluster_context: Optionally specify a context to use (e.g. if you have multiple
        in your kubeconfig.
    :param config_file: Path to kubeconfig file.
    :param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
    :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
    :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
    """

    conn_name_attr = 'kubernetes_conn_id'
    default_conn_name = 'kubernetes_default'
    conn_type = 'kubernetes'
    hook_name = 'Kubernetes Cluster Connection'

    @staticmethod
    def get_connection_form_widgets() -> dict[str, Any]:
        """Returns connection widgets to add to connection form"""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import BooleanField, StringField

        return {
            "extra__kubernetes__in_cluster": BooleanField(lazy_gettext('In cluster configuration')),
            "extra__kubernetes__kube_config_path": StringField(
                lazy_gettext('Kube config path'), widget=BS3TextFieldWidget()
            ),
            "extra__kubernetes__kube_config": StringField(
                lazy_gettext('Kube config (JSON format)'), widget=BS3TextFieldWidget()
            ),
            "extra__kubernetes__namespace": StringField(
                lazy_gettext('Namespace'), widget=BS3TextFieldWidget()
            ),
            "extra__kubernetes__cluster_context": StringField(
                lazy_gettext('Cluster context'), widget=BS3TextFieldWidget()
            ),
            "extra__kubernetes__disable_verify_ssl": BooleanField(lazy_gettext('Disable SSL')),
            "extra__kubernetes__disable_tcp_keepalive": BooleanField(lazy_gettext('Disable TCP keepalive')),
        }

    @staticmethod
    def get_ui_field_behaviour() -> dict[str, Any]:
        """Returns custom field behaviour"""
        return {
            "hidden_fields": ['host', 'schema', 'login', 'password', 'port', 'extra'],
            "relabeling": {},
        }

    def __init__(
        self,
        conn_id: str | None = default_conn_name,
        client_configuration: client.Configuration | None = None,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        disable_verify_ssl: bool | None = None,
        disable_tcp_keepalive: bool | None = None,
    ) -> None:
        super().__init__()
        self.conn_id = conn_id
        self.client_configuration = client_configuration
        self.cluster_context = cluster_context
        self.config_file = config_file
        self.in_cluster = in_cluster
        self.disable_verify_ssl = disable_verify_ssl
        self.disable_tcp_keepalive = disable_tcp_keepalive
        self._is_in_cluster: bool | None = None

    @staticmethod
    def _coalesce_param(*params):
        for param in params:
            if param is not None:
                return param

    @cached_property
    def conn_extras(self):
        if self.conn_id:
            connection = self.get_connection(self.conn_id)
            extras = connection.extra_dejson
        else:
            extras = {}
        return extras

    def _get_field(self, field_name):
        if field_name.startswith('extra_'):
            raise ValueError(
                f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
                f"when using this method."
            )
        if field_name in self.conn_extras:
            return self.conn_extras[field_name] or None
        prefixed_name = f"extra__kubernetes__{field_name}"
        return self.conn_extras.get(prefixed_name) or None

    @staticmethod
    def _deprecation_warning_core_param(deprecation_warnings):
        settings_list_str = ''.join([f"\n\t{k}={v!r}" for k, v in deprecation_warnings])
        warnings.warn(
            f"\nApplying core Airflow settings from section [kubernetes] with the following keys:"
            f"{settings_list_str}\n"
            "In a future release, KubernetesPodOperator will no longer consider core\n"
            "Airflow settings; define an Airflow connection instead.",
            DeprecationWarning,
        )

    def get_conn(self) -> client.ApiClient:
        """Returns kubernetes api session for use with requests"""
        in_cluster = self._coalesce_param(
            self.in_cluster, self.conn_extras.get("extra__kubernetes__in_cluster") or None
        )
        cluster_context = self._coalesce_param(
            self.cluster_context, self.conn_extras.get("extra__kubernetes__cluster_context") or None
        )
        kubeconfig_path = self._coalesce_param(
            self.config_file, self.conn_extras.get("extra__kubernetes__kube_config_path") or None
        )

        kubeconfig = self.conn_extras.get("extra__kubernetes__kube_config") or None
        num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])

        if num_selected_configuration > 1:
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        disable_verify_ssl = self._coalesce_param(
            self.disable_verify_ssl, _get_bool(self._get_field("disable_verify_ssl"))
        )
        disable_tcp_keepalive = self._coalesce_param(
            self.disable_tcp_keepalive, _get_bool(self._get_field("disable_tcp_keepalive"))
        )

        if disable_verify_ssl is True:
            _disable_verify_ssl()
        if disable_tcp_keepalive is not True:
            _enable_tcp_keepalive()

        if in_cluster:
            self.log.debug("loading kube_config from: in_cluster configuration")
            self._is_in_cluster = True
            config.load_incluster_config()
            return client.ApiClient()

        if kubeconfig_path is not None:
            self.log.debug("loading kube_config from: %s", kubeconfig_path)
            self._is_in_cluster = False
            config.load_kube_config(
                config_file=kubeconfig_path,
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
            return client.ApiClient()

        if kubeconfig is not None:
            with tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug("loading kube_config from: connection kube_config")
                temp_config.write(kubeconfig.encode())
                temp_config.flush()
                self._is_in_cluster = False
                config.load_kube_config(
                    config_file=temp_config.name,
                    client_configuration=self.client_configuration,
                    context=cluster_context,
                )
            return client.ApiClient()

        return self._get_default_client(cluster_context=cluster_context)

    def _get_default_client(self, *, cluster_context: str | None = None) -> client.ApiClient:
        # if we get here, then no configuration has been supplied
        # we should try in_cluster since that's most likely
        # but failing that just load assuming a kubeconfig file
        # in the default location
        try:
            config.load_incluster_config(client_configuration=self.client_configuration)
            self._is_in_cluster = True
        except ConfigException:
            self.log.debug("loading kube_config from: default file")
            self._is_in_cluster = False
            config.load_kube_config(
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
        return client.ApiClient()

    @property
    def is_in_cluster(self) -> bool:
        """Expose whether the hook is configured with ``load_incluster_config`` or not"""
        if self._is_in_cluster is not None:
            return self._is_in_cluster
        self.api_client  # so we can determine if we are in_cluster or not
        if TYPE_CHECKING:
            assert self._is_in_cluster is not None
        return self._is_in_cluster

    @cached_property
    def api_client(self) -> client.ApiClient:
        """Cached Kubernetes API client"""
        return self.get_conn()

    @cached_property
    def core_v1_client(self) -> client.CoreV1Api:
        return client.CoreV1Api(api_client=self.api_client)

    def create_custom_object(
        self, group: str, version: str, plural: str, body: str | dict, namespace: str | None = None
    ):
        """
        Creates custom resource definition object in Kubernetes

        :param group: api group
        :param version: api version
        :param plural: api plural
        :param body: crd object definition
        :param namespace: kubernetes namespace
        """
        api = client.CustomObjectsApi(self.api_client)
        if namespace is None:
            namespace = self.get_namespace()
        if isinstance(body, str):
            body_dict = _load_body_to_dict(body)
        else:
            body_dict = body
        try:
            api.delete_namespaced_custom_object(
                group=group,
                version=version,
                namespace=namespace,
                plural=plural,
                name=body_dict["metadata"]["name"],
            )
            self.log.warning("Deleted SparkApplication with the same name.")
        except client.rest.ApiException:
            self.log.info("SparkApp %s not found.", body_dict['metadata']['name'])

        try:
            response = api.create_namespaced_custom_object(
                group=group, version=version, namespace=namespace, plural=plural, body=body_dict
            )
            self.log.debug("Response: %s", response)
            return response
        except client.rest.ApiException as e:
            raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")

    def get_custom_object(
        self, group: str, version: str, plural: str, name: str, namespace: str | None = None
    ):
        """
        Get custom resource definition object from Kubernetes

        :param group: api group
        :param version: api version
        :param plural: api plural
        :param name: crd object name
        :param namespace: kubernetes namespace
        """
        api = client.CustomObjectsApi(self.api_client)
        if namespace is None:
            namespace = self.get_namespace()
        try:
            response = api.get_namespaced_custom_object(
                group=group, version=version, namespace=namespace, plural=plural, name=name
            )
            return response
        except client.rest.ApiException as e:
            raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")

    def get_namespace(self) -> str | None:
        """Returns the namespace that defined in the connection"""
        if self.conn_id:
            connection = self.get_connection(self.conn_id)
            extras = connection.extra_dejson
            namespace = extras.get("extra__kubernetes__namespace", "default")
            return namespace
        return None

    def get_pod_log_stream(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> tuple[watch.Watch, Generator[str, None, None]]:
        """
        Retrieves a log stream for a container in a kubernetes pod.

        :param pod_name: pod name
        :param container: container name
        :param namespace: kubernetes namespace
        """
        watcher = watch.Watch()
        return (
            watcher,
            watcher.stream(
                self.core_v1_client.read_namespaced_pod_log,
                name=pod_name,
                container=container,
                namespace=namespace if namespace else self.get_namespace(),
            ),
        )

    def get_pod_logs(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ):
        """
        Retrieves a container's log from the specified pod.

        :param pod_name: pod name
        :param container: container name
        :param namespace: kubernetes namespace
        """
        return self.core_v1_client.read_namespaced_pod_log(
            name=pod_name,
            container=container,
            _preload_content=False,
            namespace=namespace if namespace else self.get_namespace(),
        )


def _get_bool(val) -> bool | None:
    """
    Converts val to bool if can be done with certainty.
    If we cannot infer intention we return None.
    """
    if isinstance(val, bool):
        return val
    elif isinstance(val, str):
        if val.strip().lower() == 'true':
            return True
        elif val.strip().lower() == 'false':
            return False
    return None

相关信息

airflow 源码目录

相关文章

airflow init 源码

0  赞