airflow elasticache_replication_group 源码

  • 2022-10-20
  • 浏览 (421)

airflow elasticache_replication_group 代码

文件路径:/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from time import sleep

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class ElastiCacheReplicationGroupHook(AwsBaseHook):
    """
    Interact with AWS ElastiCache

    The hook wraps the create / delete / describe replication group calls of the
    ElastiCache client and adds polling helpers that wait for a replication group
    to become available or to be deleted.

    :param max_retries: Max retries for checking availability of and deleting replication group
            If this is not supplied then this is defaulted to 10
    :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to 1
    :param initial_poke_interval: Initial sleep time in seconds
            If this is not supplied then this is defaulted to 60 seconds
    """

    # Statuses at which ``wait_for_availability`` stops polling.
    TERMINAL_STATES = frozenset({"available", "create-failed", "deleting"})

    def __init__(
        self,
        max_retries: int = 10,
        exponential_back_off_factor: float = 1,
        initial_poke_interval: float = 60,
        *args,
        **kwargs,
    ):
        self.max_retries = max_retries
        self.exponential_back_off_factor = exponential_back_off_factor
        self.initial_poke_interval = initial_poke_interval

        # The client type is always forced to ElastiCache, overriding any caller-supplied value.
        kwargs["client_type"] = "elasticache"
        super().__init__(*args, **kwargs)

    def _resolve_poll_settings(
        self,
        initial_sleep_time: float | None,
        exponential_back_off_factor: float | None,
        max_retries: int | None,
    ) -> tuple[float, float, int]:
        """
        Fill in the hook level defaults for every polling setting left as ``None``

        Only ``None`` triggers the fallback, so an explicit ``0`` (for example
        ``max_retries=0`` to check the status exactly once) is honoured instead of
        being silently replaced by the hook level value.

        :param initial_sleep_time: Per-call initial sleep time in seconds or None
        :param exponential_back_off_factor: Per-call multiplication factor or None
        :param max_retries: Per-call max retries or None
        :return: Resolved (initial sleep time, back off factor, max retries)
        :rtype: (float, float, int)
        """
        if initial_sleep_time is None:
            initial_sleep_time = self.initial_poke_interval
        if exponential_back_off_factor is None:
            exponential_back_off_factor = self.exponential_back_off_factor
        if max_retries is None:
            max_retries = self.max_retries

        return initial_sleep_time, exponential_back_off_factor, max_retries

    def create_replication_group(self, config: dict) -> dict:
        """
        Call ElastiCache API for creating a replication group

        :param config: Configuration for creating the replication group,
            expanded as keyword arguments of the client call
        :return: Response from ElastiCache create replication group API
        :rtype: dict
        """
        return self.conn.create_replication_group(**config)

    def delete_replication_group(self, replication_group_id: str) -> dict:
        """
        Call ElastiCache API for deleting a replication group

        :param replication_group_id: ID of replication group to delete
        :return: Response from ElastiCache delete replication group API
        :rtype: dict
        """
        return self.conn.delete_replication_group(ReplicationGroupId=replication_group_id)

    def describe_replication_group(self, replication_group_id: str) -> dict:
        """
        Call ElastiCache API for describing a replication group

        :param replication_group_id: ID of replication group to describe
        :return: Response from ElastiCache describe replication group API
        :rtype: dict
        """
        return self.conn.describe_replication_groups(ReplicationGroupId=replication_group_id)

    def get_replication_group_status(self, replication_group_id: str) -> str:
        """
        Get current status of replication group

        :param replication_group_id: ID of replication group to check for status
        :return: Current status of replication group, read from the first entry
            of ``ReplicationGroups`` in the describe response
        :rtype: str
        """
        return self.describe_replication_group(replication_group_id)['ReplicationGroups'][0]['Status']

    def is_replication_group_available(self, replication_group_id: str) -> bool:
        """
        Helper for checking if replication group is available or not

        :param replication_group_id: ID of replication group to check for availability
        :return: True if available else False
        :rtype: bool
        """
        return self.get_replication_group_status(replication_group_id) == 'available'

    def wait_for_availability(
        self,
        replication_group_id: str,
        initial_sleep_time: float | None = None,
        exponential_back_off_factor: float | None = None,
        max_retries: int | None = None,
    ) -> bool:
        """
        Check if replication group is available or not by performing a describe over it

        The status is polled until it reaches one of ``TERMINAL_STATES`` or the
        retries are exhausted. The sleep time is multiplied by the back off factor
        after every sleep.

        :param replication_group_id: ID of replication group to check for availability
        :param initial_sleep_time: Initial sleep time in seconds
            If this is not supplied then this is defaulted to class level value
        :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to class level value
        :param max_retries: Max retries for checking availability of replication group
            If this is not supplied then this is defaulted to class level value.
            ``0`` performs a single status check without sleeping
        :return: True if replication is available else False
        :rtype: bool
        """
        sleep_time, exponential_back_off_factor, max_retries = self._resolve_poll_settings(
            initial_sleep_time, exponential_back_off_factor, max_retries
        )
        num_tries = 0
        # Reported only if the loop body never runs (negative ``max_retries``).
        status = 'not-found'
        stop_poking = False

        while not stop_poking and num_tries <= max_retries:
            status = self.get_replication_group_status(replication_group_id=replication_group_id)
            stop_poking = status in self.TERMINAL_STATES

            self.log.info(
                'Current status of replication group with ID %s is %s', replication_group_id, status
            )

            if not stop_poking:
                num_tries += 1

                # No point in sleeping if all tries have exhausted
                if num_tries > max_retries:
                    break

                self.log.info('Poke retry %s. Sleep time %s seconds. Sleeping...', num_tries, sleep_time)

                sleep(sleep_time)

                sleep_time *= exponential_back_off_factor

        # Terminal states other than `available` (and exhausted retries) count as not available.
        if status != 'available':
            self.log.warning('Replication group is not available. Current status is "%s"', status)

            return False

        return True

    def wait_for_deletion(
        self,
        replication_group_id: str,
        initial_sleep_time: float | None = None,
        exponential_back_off_factor: float | None = None,
        max_retries: int | None = None,
    ) -> tuple[dict | None, bool]:
        """
        Helper for deleting a replication group ensuring it is either deleted or can't be deleted

        The group is considered deleted once describing it raises
        ``ReplicationGroupNotFoundFault``. The returned response is ``None`` when
        no delete request was issued, e.g. because the group did not exist at the
        first check or never reached the ``available`` status.

        :param replication_group_id: ID of replication to delete
        :param initial_sleep_time: Initial sleep time in second
            If this is not supplied then this is defaulted to class level value
        :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to class level value
        :param max_retries: Max retries for checking availability of replication group
            If this is not supplied then this is defaulted to class level value.
            ``0`` performs a single attempt without sleeping
        :return: Response from ElastiCache delete replication group API and flag to identify if deleted or not
        :rtype: (dict, bool)
        """
        deleted = False
        sleep_time, exponential_back_off_factor, max_retries = self._resolve_poll_settings(
            initial_sleep_time, exponential_back_off_factor, max_retries
        )
        num_tries = 0
        response = None

        while not deleted and num_tries <= max_retries:
            try:
                status = self.get_replication_group_status(replication_group_id=replication_group_id)

                self.log.info(
                    'Current status of replication group with ID %s is %s', replication_group_id, status
                )

                # Can only delete if status is `available`
                # Status becomes `deleting` after this call so this will only run once
                if status == 'available':
                    self.log.info("Initiating delete and then wait for it to finish")

                    response = self.delete_replication_group(replication_group_id=replication_group_id)

            except self.conn.exceptions.ReplicationGroupNotFoundFault:
                self.log.info("Replication group with ID '%s' does not exist", replication_group_id)

                deleted = True

            # This should never occur as we only issue a delete request when status is `available`
            # which is a valid status for deletion. Still handling for safety.
            except self.conn.exceptions.InvalidReplicationGroupStateFault as exp:
                # status      Error Response
                # creating  - Cache cluster <cluster_id> is not in a valid state to be deleted.
                # deleting  - Replication group <replication_group_id> has status deleting which is not valid
                #             for deletion.
                # modifying - Replication group <replication_group_id> has status deleting which is not valid
                #             for deletion.

                message = exp.response['Error']['Message']

                self.log.warning('Received error message from AWS ElastiCache API : %s', message)

            if not deleted:
                num_tries += 1

                # No point in sleeping if all tries have exhausted
                if num_tries > max_retries:
                    break

                self.log.info('Poke retry %s. Sleep time %s seconds. Sleeping...', num_tries, sleep_time)

                sleep(sleep_time)

                sleep_time *= exponential_back_off_factor

        return response, deleted

    def ensure_delete_replication_group(
        self,
        replication_group_id: str,
        initial_sleep_time: float | None = None,
        exponential_back_off_factor: float | None = None,
        max_retries: int | None = None,
    ) -> dict | None:
        """
        Delete a replication group ensuring it is either deleted or can't be deleted

        :param replication_group_id: ID of replication to delete
        :param initial_sleep_time: Initial sleep time in second
            If this is not supplied then this is defaulted to class level value
        :param exponential_back_off_factor: Multiplication factor for deciding next sleep time
            If this is not supplied then this is defaulted to class level value
        :param max_retries: Max retries for checking availability of replication group
            If this is not supplied then this is defaulted to class level value
        :return: Response from ElastiCache delete replication group API, or None
            when the group was already gone and no delete request was issued
        :rtype: dict
        :raises AirflowException: If replication group is not deleted
        """
        self.log.info('Deleting replication group with ID %s', replication_group_id)

        response, deleted = self.wait_for_deletion(
            replication_group_id=replication_group_id,
            initial_sleep_time=initial_sleep_time,
            exponential_back_off_factor=exponential_back_off_factor,
            max_retries=max_retries,
        )

        if not deleted:
            raise AirflowException(f'Replication group could not be deleted. Response "{response}"')

        return response

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow appflow 源码

airflow athena 源码

airflow base_aws 源码

airflow batch_client 源码

airflow batch_waiters 源码

airflow cloud_formation 源码

airflow datasync 源码

airflow dms 源码

airflow dynamodb 源码

0  赞