airflow cli_parser source code
File path: /airflow/cli/cli_parser.py
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Command-line interface"""
from __future__ import annotations
import argparse
import json
import os
import textwrap
from argparse import Action, ArgumentError, RawTextHelpFormatter
from functools import lru_cache
from typing import Callable, Iterable, NamedTuple, Union
import lazy_object_proxy
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.cli import ColorMode
from airflow.utils.helpers import partition
from airflow.utils.module_loading import import_string
from airflow.utils.timezone import parse as parsedate
BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
def lazy_load_command(import_path: str) -> Callable:
"""Create a lazy loader for command"""
_, _, name = import_path.rpartition('.')
def command(*args, **kwargs):
func = import_string(import_path)
return func(*args, **kwargs)
command.__name__ = name
return command
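# Illustrative note (not part of the original file): the wrapper returned above only
# resolves the import path when the command is actually executed, so building the full
# parser for `airflow --help` stays cheap. A small sketch, using a command referenced
# later in this module:
#
#     version = lazy_load_command("airflow.cli.commands.version_command.version")
#     version.__name__   # 'version' -- taken from the import path, nothing imported yet
#     version(args)      # import_string() runs here, then version(args) is invoked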
class DefaultHelpParser(argparse.ArgumentParser):
"""CustomParser to display help message"""
def _check_value(self, action, value):
"""Override _check_value and check conditionally added command"""
if action.dest == 'subcommand' and value == 'celery':
executor = conf.get('core', 'EXECUTOR')
if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
classes = ()
try:
from airflow.executors.celery_executor import CeleryExecutor
classes += (CeleryExecutor,)
except ImportError:
message = (
"The celery subcommand requires that you pip install the celery module. "
"To do it, run: pip install 'apache-airflow[celery]'"
)
raise ArgumentError(action, message)
try:
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
classes += (CeleryKubernetesExecutor,)
except ImportError:
pass
if not issubclass(executor_cls, classes):
message = (
f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and '
f'executors derived from them, your current executor: {executor}, subclassed from: '
f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
)
raise ArgumentError(action, message)
if action.dest == 'subcommand' and value == 'kubernetes':
try:
import kubernetes.client # noqa: F401
except ImportError:
message = (
"The kubernetes subcommand requires that you pip install the kubernetes python client. "
"To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
)
raise ArgumentError(action, message)
if action.choices is not None and value not in action.choices:
check_legacy_command(action, value)
super()._check_value(action, value)
def error(self, message):
"""Override error and use print_instead of print_usage"""
self.print_help()
self.exit(2, f'\n{self.prog} command error: {message}, see help above.\n')
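# Illustrative note (not part of the original file): because _check_value runs while
# argparse validates the subcommand choice, an unusable subcommand fails fast. A rough
# sketch of the behaviour, assuming a non-Celery executor is configured:
#
#     parser = get_parser()
#     parser.parse_args(["celery", "worker"])
#     # _check_value raises ArgumentError; argparse routes it to error(), which prints
#     # the full help and exits with status 2 instead of printing only a usage line.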
# Used in Arg to enable `None' as a distinct value from "not passed"
_UNSET = object()
class Arg:
"""Class to keep information about command line argument"""
def __init__(
self,
flags=_UNSET,
help=_UNSET,
action=_UNSET,
default=_UNSET,
nargs=_UNSET,
type=_UNSET,
choices=_UNSET,
required=_UNSET,
metavar=_UNSET,
dest=_UNSET,
):
self.flags = flags
self.kwargs = {}
for k, v in locals().items():
if v is _UNSET:
continue
if k in ("self", "flags"):
continue
self.kwargs[k] = v
def add_to_parser(self, parser: argparse.ArgumentParser):
"""Add this argument to an ArgumentParser"""
parser.add_argument(*self.flags, **self.kwargs)
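# Illustrative note (not part of the original file): an Arg is just a recorded bundle of
# argparse options. The _UNSET sentinel (rather than None) marks "not passed", so an
# explicit `default=None` still reaches add_argument. A hypothetical example:
#
#     arg = Arg(("-n", "--num-runs"), help="Number of runs", type=int, default=5)
#     arg.add_to_parser(parser)
#     # equivalent to:
#     # parser.add_argument("-n", "--num-runs", help="Number of runs", type=int, default=5)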
def positive_int(*, allow_zero):
"""Define a positive int type for an argument."""
def _check(value):
try:
value = int(value)
if allow_zero and value == 0:
return value
if value > 0:
return value
except ValueError:
pass
raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")
return _check
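# Illustrative note (not part of the original file): positive_int is a factory that
# returns an argparse `type=` callable, fixing the zero policy per argument:
#
#     positive_int(allow_zero=True)("0")    # -> 0
#     positive_int(allow_zero=False)("3")   # -> 3
#     positive_int(allow_zero=False)("0")   # raises argparse.ArgumentTypeError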
def string_list_type(val):
"""Parses comma-separated list and returns list of string (strips whitespace)"""
return [x.strip() for x in val.split(',')]
def string_lower_type(val):
"""Lowers arg"""
if not val:
return
return val.strip().lower()
# Shared
ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
ARG_EXECUTION_DATE_OPTIONAL = Arg(
("execution_date",), nargs='?', help="The execution date of the DAG (optional)", type=parsedate
)
ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
('execution_date_or_run_id',), help="The execution_date of the DAG or run_id of the DAGRun"
)
ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL = Arg(
('execution_date_or_run_id',),
nargs='?',
help="The execution_date of the DAG or run_id of the DAGRun (optional)",
)
ARG_TASK_REGEX = Arg(
("-t", "--task-regex"), help="The regex to filter specific task_ids to backfill (optional)"
)
ARG_SUBDIR = Arg(
("-S", "--subdir"),
help=(
"File location or directory from which to look for the dag. "
"Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
"value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
),
default='[AIRFLOW_HOME]/dags' if BUILD_DOCS else settings.DAGS_FOLDER,
)
ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate)
ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate)
ARG_OUTPUT_PATH = Arg(
(
"-o",
"--output-path",
),
help="The output for generated yaml files",
type=str,
default="[CWD]" if BUILD_DOCS else os.getcwd(),
)
ARG_DRY_RUN = Arg(
("-n", "--dry-run"),
help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
action="store_true",
)
ARG_PID = Arg(("--pid",), help="PID file location", nargs='?')
ARG_DAEMON = Arg(
("-D", "--daemon"), help="Daemonize instead of running in the foreground", action="store_true"
)
ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
ARG_YES = Arg(
("-y", "--yes"),
help="Do not prompt to confirm. Use with care!",
action="store_true",
default=False,
)
ARG_OUTPUT = Arg(
(
"-o",
"--output",
),
help="Output format. Allowed values: json, yaml, plain, table (default: table)",
metavar="(table, json, yaml, plain)",
choices=("table", "json", "yaml", "plain"),
default="table",
)
ARG_COLOR = Arg(
('--color',),
help="Do emit colored output (default: auto)",
choices={ColorMode.ON, ColorMode.OFF, ColorMode.AUTO},
default=ColorMode.AUTO,
)
# DB args
ARG_VERSION_RANGE = Arg(
("-r", "--range"),
help="Version range(start:end) for offline sql generation. Example: '2.0.2:2.2.3'",
default=None,
)
ARG_REVISION_RANGE = Arg(
('--revision-range',),
help=(
"Migration revision range(start:end) to use for offline sql generation. "
"Example: ``a13f7613ad25:7b2661a43ba3``"
),
default=None,
)
# list_dag_runs
ARG_DAG_ID_REQ_FLAG = Arg(
("-d", "--dag-id"), required=True, help="The id of the dag"
) # TODO: convert this to a positional arg in Airflow 3
ARG_NO_BACKFILL = Arg(
("--no-backfill",), help="filter all the backfill dagruns given the dag id", action="store_true"
)
ARG_STATE = Arg(("--state",), help="Only list the dag runs corresponding to the state")
# list_jobs
ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
ARG_LIMIT = Arg(("--limit",), help="Return a limited number of records")
# next_execution
ARG_NUM_EXECUTIONS = Arg(
("-n", "--num-executions"),
default=1,
type=positive_int(allow_zero=False),
help="The number of next execution datetimes to show",
)
# backfill
ARG_MARK_SUCCESS = Arg(
("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
)
ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more verbose", action="store_true")
ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
ARG_DONOT_PICKLE = Arg(
("-x", "--donot-pickle"),
help=(
"Do not attempt to pickle the DAG object to send over "
"to the workers, just tell the workers to run their version "
"of the code"
),
action="store_true",
)
ARG_BF_IGNORE_DEPENDENCIES = Arg(
("-i", "--ignore-dependencies"),
help=(
"Skip upstream tasks, run only the tasks "
"matching the regexp. Only works in conjunction "
"with task_regex"
),
action="store_true",
)
ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST = Arg(
("-I", "--ignore-first-depends-on-past"),
help=(
"Ignores depends_on_past dependencies for the first "
"set of tasks only (subsequent executions in the backfill "
"DO respect depends_on_past)"
),
action="store_true",
)
ARG_POOL = Arg(("--pool",), "Resource pool to use")
ARG_DELAY_ON_LIMIT = Arg(
("--delay-on-limit",),
help=(
"Amount of time in seconds to wait when the limit "
"on maximum active dag runs (max_active_runs) has "
"been reached before trying to execute a dag run "
"again"
),
type=float,
default=1.0,
)
ARG_RESET_DAG_RUN = Arg(
("--reset-dagruns",),
help=(
"if set, the backfill will delete existing "
"backfill-related DAG runs and start "
"anew with fresh, running DAG runs"
),
action="store_true",
)
ARG_RERUN_FAILED_TASKS = Arg(
("--rerun-failed-tasks",),
help=(
"if set, the backfill will auto-rerun "
"all the failed tasks for the backfill date range "
"instead of throwing exceptions"
),
action="store_true",
)
ARG_CONTINUE_ON_FAILURES = Arg(
("--continue-on-failures",),
help=("if set, the backfill will keep going even if some of the tasks failed"),
action="store_true",
)
ARG_RUN_BACKWARDS = Arg(
(
"-B",
"--run-backwards",
),
help=(
"if set, the backfill will run tasks from the most "
"recent day first. if there are tasks that depend_on_past "
"this option will throw an exception"
),
action="store_true",
)
ARG_TREAT_DAG_AS_REGEX = Arg(
("--treat-dag-as-regex",),
help=("if set, dag_id will be treated as regex instead of an exact string"),
action="store_true",
)
# test_dag
ARG_SHOW_DAGRUN = Arg(
("--show-dagrun",),
help=(
"After completing the backfill, shows the diagram for current DAG Run.\n"
"\n"
"The diagram is in DOT language\n"
),
action='store_true',
)
ARG_IMGCAT_DAGRUN = Arg(
("--imgcat-dagrun",),
help=(
"After completing the dag run, prints a diagram on the screen for the "
"current DAG Run using the imgcat tool.\n"
),
action='store_true',
)
ARG_SAVE_DAGRUN = Arg(
("--save-dagrun",),
help="After completing the backfill, saves the diagram for current DAG Run to the indicated file.\n\n",
)
# list_tasks
ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
# tasks_run
# This is a hidden option -- not meant for users to set or know about
ARG_SHUT_DOWN_LOGGING = Arg(
("--no-shut-down-logging",),
help=argparse.SUPPRESS,
dest="shut_down_logging",
action="store_false",
default=True,
)
# clear
ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs", action="store_true")
ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks", action="store_true")
ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags", action="store_true")
ARG_EXCLUDE_PARENTDAG = Arg(
("-X", "--exclude-parentdag"),
help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
action="store_true",
)
ARG_DAG_REGEX = Arg(
("-R", "--dag-regex"), help="Search dag_id as regex instead of exact string", action="store_true"
)
# show_dag
ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated file.")
ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", action='store_true')
# trigger_dag
ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
ARG_CONF = Arg(('-c', '--conf'), help="JSON string that gets pickled into the DagRun's conf attribute")
ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
# db
ARG_DB_TABLES = Arg(
("-t", "--tables"),
help=lazy_object_proxy.Proxy(
lambda: f"Table names to perform maintenance on (use comma-separated list).\n"
f"Options: {import_string('airflow.cli.commands.db_command.all_tables')}"
),
type=string_list_type,
)
ARG_DB_CLEANUP_TIMESTAMP = Arg(
("--clean-before-timestamp",),
help="The date or timestamp before which data should be purged.\n"
"If no timezone info is supplied then dates are assumed to be in airflow default timezone.\n"
"Example: '2022-01-01 00:00:00+01:00'",
type=parsedate,
required=True,
)
ARG_DB_DRY_RUN = Arg(
("--dry-run",),
help="Perform a dry run",
action="store_true",
)
ARG_DB_SKIP_ARCHIVE = Arg(
("--skip-archive",),
help="Don't preserve purged records in an archive table.",
action="store_true",
)
# pool
ARG_POOL_NAME = Arg(("pool",), metavar='NAME', help="Pool name")
ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
ARG_POOL_IMPORT = Arg(
("file",),
metavar="FILEPATH",
help="Import pools from JSON file. Example format::\n"
+ textwrap.indent(
textwrap.dedent(
'''
{
"pool_1": {"slots": 5, "description": ""},
"pool_2": {"slots": 10, "description": "test"}
}'''
),
" " * 4,
),
)
ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to JSON file")
# variables
ARG_VAR = Arg(("key",), help="Variable key")
ARG_VAR_VALUE = Arg(("value",), metavar='VALUE', help="Variable value")
ARG_DEFAULT = Arg(
("-d", "--default"), metavar="VAL", default=None, help="Default value returned if variable does not exist"
)
ARG_DESERIALIZE_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", action="store_true")
ARG_SERIALIZE_JSON = Arg(("-j", "--json"), help="Serialize JSON variable", action="store_true")
ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs='?')
ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs='?', default=conf.get('kerberos', 'keytab'))
# run
ARG_INTERACTIVE = Arg(
('-N', '--interactive'),
help='Do not capture standard output and error streams (useful for interactive debugging)',
action='store_true',
)
# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
# all dependencies (not just past success), e.g. the ignore_depends_on_past
# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
# the "ignore_all_dependencies" command should be called the"force" command
# instead.
ARG_FORCE = Arg(
("-f", "--force"),
help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
action="store_true",
)
ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
ARG_IGNORE_ALL_DEPENDENCIES = Arg(
("-A", "--ignore-all-dependencies"),
help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
action="store_true",
)
# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
# vague (e.g. a task being in the appropriate state to be run is also a dependency
# but is not ignored by this flag), the name 'ignore_task_dependencies' is
# slightly better (as it ignores all dependencies that are specific to the task),
# so deprecate the old command name and use this instead.
ARG_IGNORE_DEPENDENCIES = Arg(
("-i", "--ignore-dependencies"),
help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
action="store_true",
)
ARG_IGNORE_DEPENDS_ON_PAST = Arg(
("-I", "--ignore-depends-on-past"),
help="Ignore depends_on_past dependencies (but respect upstream dependencies)",
action="store_true",
)
ARG_SHIP_DAG = Arg(
("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
)
ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
ARG_MAP_INDEX = Arg(('--map-index',), type=int, default=-1, help="Mapped task index")
# database
ARG_MIGRATION_TIMEOUT = Arg(
("-t", "--migration-wait-timeout"),
help="timeout to wait for db to migrate ",
type=int,
default=60,
)
ARG_DB_RESERIALIZE_DAGS = Arg(
("--no-reserialize-dags",),
# Not intended for user, so dont show in help
help=argparse.SUPPRESS,
action="store_false",
default=True,
dest="reserialize_dags",
)
ARG_DB_VERSION__UPGRADE = Arg(
("-n", "--to-version"),
help=(
"(Optional) The airflow version to upgrade to. Note: must provide either "
"`--to-revision` or `--to-version`."
),
)
ARG_DB_REVISION__UPGRADE = Arg(
("-r", "--to-revision"),
help="(Optional) If provided, only run migrations up to and including this Alembic revision.",
)
ARG_DB_VERSION__DOWNGRADE = Arg(
("-n", "--to-version"),
help="(Optional) If provided, only run migrations up to this version.",
)
ARG_DB_FROM_VERSION = Arg(
("--from-version",),
help="(Optional) If generating sql, may supply a *from* version",
)
ARG_DB_REVISION__DOWNGRADE = Arg(
("-r", "--to-revision"),
help="The Alembic revision to downgrade to. Note: must provide either `--to-revision` or `--to-version`.",
)
ARG_DB_FROM_REVISION = Arg(
("--from-revision",),
help="(Optional) If generating sql, may supply a *from* Alembic revision",
)
ARG_DB_SQL_ONLY = Arg(
("-s", "--show-sql-only"),
help="Don't actually run migrations; just print out sql scripts for offline migration. "
"Required if using either `--from-revision` or `--from-version`.",
action="store_true",
default=False,
)
ARG_DB_SKIP_INIT = Arg(
("-s", "--skip-init"),
help="Only remove tables; do not perform db init.",
action="store_true",
default=False,
)
# webserver
ARG_PORT = Arg(
("-p", "--port"),
default=conf.get('webserver', 'WEB_SERVER_PORT'),
type=int,
help="The port on which to run the server",
)
ARG_SSL_CERT = Arg(
("--ssl-cert",),
default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
help="Path to the SSL certificate for the webserver",
)
ARG_SSL_KEY = Arg(
("--ssl-key",),
default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
help="Path to the key to use with the SSL certificate",
)
ARG_WORKERS = Arg(
("-w", "--workers"),
default=conf.get('webserver', 'WORKERS'),
type=int,
help="Number of workers to run the webserver on",
)
ARG_WORKERCLASS = Arg(
("-k", "--workerclass"),
default=conf.get('webserver', 'WORKER_CLASS'),
choices=['sync', 'eventlet', 'gevent', 'tornado'],
help="The worker class to use for Gunicorn",
)
ARG_WORKER_TIMEOUT = Arg(
("-t", "--worker-timeout"),
default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'),
type=int,
help="The timeout for waiting on webserver workers",
)
ARG_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get('webserver', 'WEB_SERVER_HOST'),
help="Set the hostname on which to run the web server",
)
ARG_DEBUG = Arg(
("-d", "--debug"), help="Use the server that ships with Flask in debug mode", action="store_true"
)
ARG_ACCESS_LOGFILE = Arg(
("-A", "--access-logfile"),
default=conf.get('webserver', 'ACCESS_LOGFILE'),
help="The logfile to store the webserver access log. Use '-' to print to stderr",
)
ARG_ERROR_LOGFILE = Arg(
("-E", "--error-logfile"),
default=conf.get('webserver', 'ERROR_LOGFILE'),
help="The logfile to store the webserver error log. Use '-' to print to stderr",
)
ARG_ACCESS_LOGFORMAT = Arg(
("-L", "--access-logformat"),
default=conf.get('webserver', 'ACCESS_LOGFORMAT'),
help="The access log format for gunicorn logs",
)
# scheduler
ARG_NUM_RUNS = Arg(
("-n", "--num-runs"),
default=conf.getint('scheduler', 'num_runs'),
type=int,
help="Set the number of runs to execute before exiting",
)
ARG_DO_PICKLE = Arg(
("-p", "--do-pickle"),
default=False,
help=(
"Attempt to pickle the DAG object to send over "
"to the workers, instead of letting workers run their version "
"of the code"
),
action="store_true",
)
# worker
ARG_QUEUES = Arg(
("-q", "--queues"),
help="Comma delimited list of queues to serve",
default=conf.get('operators', 'DEFAULT_QUEUE'),
)
ARG_CONCURRENCY = Arg(
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.get('celery', 'worker_concurrency'),
)
ARG_CELERY_HOSTNAME = Arg(
("-H", "--celery-hostname"),
help="Set the hostname of celery worker if you have multiple workers on a single machine",
)
ARG_UMASK = Arg(
("-u", "--umask"),
help="Set the umask of celery worker in daemon mode",
)
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
default=False,
help="Don't synchronize with other workers at start-up",
action="store_true",
)
ARG_WITHOUT_GOSSIP = Arg(
("--without-gossip",),
default=False,
help="Don't subscribe to other workers events",
action="store_true",
)
# flower
ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
ARG_FLOWER_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get('celery', 'FLOWER_HOST'),
help="Set the hostname on which to run the server",
)
ARG_FLOWER_PORT = Arg(
("-p", "--port"),
default=conf.get('celery', 'FLOWER_PORT'),
type=int,
help="The port on which to run the server",
)
ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
ARG_FLOWER_URL_PREFIX = Arg(
("-u", "--url-prefix"), default=conf.get('celery', 'FLOWER_URL_PREFIX'), help="URL prefix for Flower"
)
ARG_FLOWER_BASIC_AUTH = Arg(
("-A", "--basic-auth"),
default=conf.get('celery', 'FLOWER_BASIC_AUTH'),
help=(
"Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
"Example: flower_basic_auth = user1:password1,user2:password2"
),
)
ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
ARG_POST_MORTEM = Arg(
("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
)
ARG_ENV_VARS = Arg(
("--env-vars",),
help="Set env var in both parsing time and runtime for each of entry supplied in a JSON dict",
type=json.loads,
)
# connections
ARG_CONN_ID = Arg(('conn_id',), help='Connection id, required to get/add/delete a connection', type=str)
ARG_CONN_ID_FILTER = Arg(
('--conn-id',), help='If passed, only items with the specified connection ID will be displayed', type=str
)
ARG_CONN_URI = Arg(
('--conn-uri',), help='Connection URI, required to add a connection without conn_type', type=str
)
ARG_CONN_JSON = Arg(
('--conn-json',), help='Connection JSON, required to add a connection using JSON representation', type=str
)
ARG_CONN_TYPE = Arg(
('--conn-type',), help='Connection type, required to add a connection without conn_uri', type=str
)
ARG_CONN_DESCRIPTION = Arg(
('--conn-description',), help='Connection description, optional when adding a connection', type=str
)
ARG_CONN_HOST = Arg(('--conn-host',), help='Connection host, optional when adding a connection', type=str)
ARG_CONN_LOGIN = Arg(('--conn-login',), help='Connection login, optional when adding a connection', type=str)
ARG_CONN_PASSWORD = Arg(
('--conn-password',), help='Connection password, optional when adding a connection', type=str
)
ARG_CONN_SCHEMA = Arg(
('--conn-schema',), help='Connection schema, optional when adding a connection', type=str
)
ARG_CONN_PORT = Arg(('--conn-port',), help='Connection port, optional when adding a connection', type=str)
ARG_CONN_EXTRA = Arg(
('--conn-extra',), help='Connection `Extra` field, optional when adding a connection', type=str
)
ARG_CONN_EXPORT = Arg(
('file',),
help='Output file path for exporting the connections',
type=argparse.FileType('w', encoding='UTF-8'),
)
ARG_CONN_EXPORT_FORMAT = Arg(
('--format',),
help='Deprecated -- use `--file-format` instead. File format to use for the export.',
type=str,
choices=['json', 'yaml', 'env'],
)
ARG_CONN_EXPORT_FILE_FORMAT = Arg(
('--file-format',), help='File format for the export', type=str, choices=['json', 'yaml', 'env']
)
ARG_CONN_SERIALIZATION_FORMAT = Arg(
('--serialization-format',),
help='When exporting as `.env` format, defines how connections should be serialized. Default is `uri`.',
type=string_lower_type,
choices=['json', 'uri'],
)
ARG_CONN_IMPORT = Arg(("file",), help="Import connections from a file")
# providers
ARG_PROVIDER_NAME = Arg(
('provider_name',), help='Provider name, required to get provider information', type=str
)
ARG_FULL = Arg(
('-f', '--full'),
help='Full information about the provider, including documentation information.',
required=False,
action="store_true",
)
# users
ARG_USERNAME = Arg(('-u', '--username'), help='Username of the user', required=True, type=str)
ARG_USERNAME_OPTIONAL = Arg(('-u', '--username'), help='Username of the user', type=str)
ARG_FIRSTNAME = Arg(('-f', '--firstname'), help='First name of the user', required=True, type=str)
ARG_LASTNAME = Arg(('-l', '--lastname'), help='Last name of the user', required=True, type=str)
ARG_ROLE = Arg(
('-r', '--role'),
help='Role of the user. Existing roles include Admin, User, Op, Viewer, and Public',
required=True,
type=str,
)
ARG_EMAIL = Arg(('-e', '--email'), help='Email of the user', required=True, type=str)
ARG_EMAIL_OPTIONAL = Arg(('-e', '--email'), help='Email of the user', type=str)
ARG_PASSWORD = Arg(
('-p', '--password'),
help='Password of the user, required to create a user without --use-random-password',
type=str,
)
ARG_USE_RANDOM_PASSWORD = Arg(
('--use-random-password',),
help='Do not prompt for password. Use random string instead.'
' Required to create a user without --password ',
default=False,
action='store_true',
)
ARG_USER_IMPORT = Arg(
("import",),
metavar="FILEPATH",
help="Import users from JSON file. Example format::\n"
+ textwrap.indent(
textwrap.dedent(
'''
[
{
"email": "foo@bar.org",
"firstname": "Jon",
"lastname": "Doe",
"roles": ["Public"],
"username": "jondoe"
}
]'''
),
" " * 4,
),
)
ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file")
# roles
ARG_CREATE_ROLE = Arg(('-c', '--create'), help='Create a new role', action='store_true')
ARG_LIST_ROLES = Arg(('-l', '--list'), help='List roles', action='store_true')
ARG_ROLES = Arg(('role',), help='The name of a role', nargs='*')
ARG_PERMISSIONS = Arg(('-p', '--permission'), help='Show role permissions', action='store_true')
ARG_ROLE_RESOURCE = Arg(('-r', '--resource'), help='The name of permissions', nargs='*', required=True)
ARG_ROLE_ACTION = Arg(('-a', '--action'), help='The action of permissions', nargs='*')
ARG_ROLE_ACTION_REQUIRED = Arg(('-a', '--action'), help='The action of permissions', nargs='*', required=True)
ARG_AUTOSCALE = Arg(('-a', '--autoscale'), help="Minimum and Maximum number of worker to autoscale")
ARG_SKIP_SERVE_LOGS = Arg(
("-s", "--skip-serve-logs"),
default=False,
help="Don't start the serve logs process along with the workers",
action="store_true",
)
ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None)
ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None)
ARG_ROLE_EXPORT_FMT = Arg(
('-p', '--pretty'),
help='Format output JSON file by sorting role names and indenting by 4 spaces',
action='store_true',
)
# info
ARG_ANONYMIZE = Arg(
('--anonymize',),
help='Minimize any personal identifiable information. Use it when sharing output with others.',
action='store_true',
)
ARG_FILE_IO = Arg(
('--file-io',), help='Send output to file.io service and returns link.', action='store_true'
)
# config
ARG_SECTION = Arg(
("section",),
help="The section name",
)
ARG_OPTION = Arg(
("option",),
help="The option name",
)
# kubernetes cleanup-pods
ARG_NAMESPACE = Arg(
("--namespace",),
default=conf.get('kubernetes_executor', 'namespace'),
help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.",
)
ARG_MIN_PENDING_MINUTES = Arg(
("--min-pending-minutes",),
default=30,
type=positive_int(allow_zero=False),
help=(
"Pending pods created before the time interval are to be cleaned up, "
"measured in minutes. Default value is 30(m). The minimum value is 5(m)."
),
)
# jobs check
ARG_JOB_TYPE_FILTER = Arg(
('--job-type',),
choices=('BackfillJob', 'LocalTaskJob', 'SchedulerJob', 'TriggererJob'),
action='store',
help='The type of job(s) that will be checked.',
)
ARG_JOB_HOSTNAME_FILTER = Arg(
("--hostname",),
default=None,
type=str,
help="The hostname of job(s) that will be checked.",
)
ARG_JOB_HOSTNAME_CALLABLE_FILTER = Arg(
("--local",),
action='store_true',
help="If passed, this command will only show jobs from the local host "
"(those with a hostname matching what `hostname_callable` returns).",
)
ARG_JOB_LIMIT = Arg(
("--limit",),
default=1,
type=positive_int(allow_zero=True),
help="The number of recent jobs that will be checked. To disable limit, set 0. ",
)
ARG_ALLOW_MULTIPLE = Arg(
("--allow-multiple",),
action='store_true',
help="If passed, this command will be successful even if multiple matching alive jobs are found.",
)
# sync-perm
ARG_INCLUDE_DAGS = Arg(
("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
)
# triggerer
ARG_CAPACITY = Arg(
("--capacity",),
type=positive_int(allow_zero=False),
help="The maximum number of triggers that a Triggerer will run at one time.",
)
# reserialize
ARG_CLEAR_ONLY = Arg(
("--clear-only",),
action="store_true",
help="If passed, serialized DAGs will be cleared but not reserialized.",
)
ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
ARG_CONN_DESCRIPTION,
ARG_CONN_HOST,
ARG_CONN_LOGIN,
ARG_CONN_PASSWORD,
ARG_CONN_SCHEMA,
ARG_CONN_PORT,
]
class ActionCommand(NamedTuple):
"""Single CLI command"""
name: str
help: str
func: Callable
args: Iterable[Arg]
description: str | None = None
epilog: str | None = None
class GroupCommand(NamedTuple):
"""ClI command with subcommands"""
name: str
help: str
subcommands: Iterable
description: str | None = None
epilog: str | None = None
CLICommand = Union[ActionCommand, GroupCommand]
DAGS_COMMANDS = (
ActionCommand(
name='list',
help="List all the DAGs",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_list_dags'),
args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='list-import-errors',
help="List all the DAGs that have import errors",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_list_import_errors'),
args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='report',
help='Show DagBag loading report',
func=lazy_load_command('airflow.cli.commands.dag_command.dag_report'),
args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='list-runs',
help="List DAG runs given a DAG id",
description=(
"List DAG runs given a DAG id. If state option is given, it will only search for all the "
"dagruns with the given state. If no_backfill option is given, it will filter out all "
"backfill dagruns for given dag id. If start_date is given, it will filter out all the "
"dagruns that were executed before this date. If end_date is given, it will filter out "
"all the dagruns that were executed after this date. "
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_list_dag_runs'),
args=(
ARG_DAG_ID_REQ_FLAG,
ARG_NO_BACKFILL,
ARG_STATE,
ARG_OUTPUT,
ARG_VERBOSE,
ARG_START_DATE,
ARG_END_DATE,
),
),
ActionCommand(
name='list-jobs',
help="List the jobs",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_list_jobs'),
args=(ARG_DAG_ID_OPT, ARG_STATE, ARG_LIMIT, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='state',
help="Get the status of a dag run",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_state'),
args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR),
),
ActionCommand(
name='next-execution',
help="Get the next execution datetimes of a DAG",
description=(
"Get the next execution datetimes of a DAG. It returns one execution unless the "
"num-executions option is given"
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_next_execution'),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS),
),
ActionCommand(
name='pause',
help='Pause a DAG',
func=lazy_load_command('airflow.cli.commands.dag_command.dag_pause'),
args=(ARG_DAG_ID, ARG_SUBDIR),
),
ActionCommand(
name='unpause',
help='Resume a paused DAG',
func=lazy_load_command('airflow.cli.commands.dag_command.dag_unpause'),
args=(ARG_DAG_ID, ARG_SUBDIR),
),
ActionCommand(
name='trigger',
help='Trigger a DAG run',
func=lazy_load_command('airflow.cli.commands.dag_command.dag_trigger'),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE),
),
ActionCommand(
name='delete',
help="Delete all DB records related to the specified DAG",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_delete'),
args=(ARG_DAG_ID, ARG_YES),
),
ActionCommand(
name='show',
help="Displays DAG's tasks with their dependencies",
description=(
"The --imgcat option only works in iTerm.\n"
"\n"
"For more information, see: https://www.iterm2.com/documentation-images.html\n"
"\n"
"The --save option saves the result to the indicated file.\n"
"\n"
"The file format is determined by the file extension. "
"For more information about supported "
"format, see: https://www.graphviz.org/doc/info/output.html\n"
"\n"
"If you want to create a PNG file then you should execute the following command:\n"
"airflow dags show <DAG_ID> --save output.png\n"
"\n"
"If you want to create a DOT file then you should execute the following command:\n"
"airflow dags show <DAG_ID> --save output.dot\n"
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_show'),
args=(
ARG_DAG_ID,
ARG_SUBDIR,
ARG_SAVE,
ARG_IMGCAT,
),
),
ActionCommand(
name='show-dependencies',
help="Displays DAGs with their dependencies",
description=(
"The --imgcat option only works in iTerm.\n"
"\n"
"For more information, see: https://www.iterm2.com/documentation-images.html\n"
"\n"
"The --save option saves the result to the indicated file.\n"
"\n"
"The file format is determined by the file extension. "
"For more information about supported "
"format, see: https://www.graphviz.org/doc/info/output.html\n"
"\n"
"If you want to create a PNG file then you should execute the following command:\n"
"airflow dags show-dependencies --save output.png\n"
"\n"
"If you want to create a DOT file then you should execute the following command:\n"
"airflow dags show-dependencies --save output.dot\n"
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_dependencies_show'),
args=(
ARG_SUBDIR,
ARG_SAVE,
ARG_IMGCAT,
),
),
ActionCommand(
name='backfill',
help="Run subsections of a DAG for a specified date range",
description=(
"Run subsections of a DAG for a specified date range. If reset_dag_run option is used, "
"backfill will first prompt users whether airflow should clear all the previous dag_run and "
"task_instances within the backfill date range. If rerun_failed_tasks is used, backfill "
"will auto re-run the previous failed task instances within the backfill date range"
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_backfill'),
args=(
ARG_DAG_ID,
ARG_TASK_REGEX,
ARG_START_DATE,
ARG_END_DATE,
ARG_MARK_SUCCESS,
ARG_LOCAL,
ARG_DONOT_PICKLE,
ARG_YES,
ARG_CONTINUE_ON_FAILURES,
ARG_BF_IGNORE_DEPENDENCIES,
ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST,
ARG_SUBDIR,
ARG_POOL,
ARG_DELAY_ON_LIMIT,
ARG_DRY_RUN,
ARG_VERBOSE,
ARG_CONF,
ARG_RESET_DAG_RUN,
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
ARG_TREAT_DAG_AS_REGEX,
),
),
ActionCommand(
name='test',
help="Execute one single DagRun",
description=(
"Execute one single DagRun for a given DAG and execution date, "
"using the DebugExecutor.\n"
"\n"
"The --imgcat-dagrun option only works in iTerm.\n"
"\n"
"For more information, see: https://www.iterm2.com/documentation-images.html\n"
"\n"
"If --save-dagrun is used, then, after completing the backfill, saves the diagram "
"for current DAG Run to the indicated file.\n"
"The file format is determined by the file extension. "
"For more information about supported format, "
"see: https://www.graphviz.org/doc/info/output.html\n"
"\n"
"If you want to create a PNG file then you should execute the following command:\n"
"airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun output.png\n"
"\n"
"If you want to create a DOT file then you should execute the following command:\n"
"airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun output.dot\n"
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_test'),
args=(
ARG_DAG_ID,
ARG_EXECUTION_DATE_OPTIONAL,
ARG_CONF,
ARG_SUBDIR,
ARG_SHOW_DAGRUN,
ARG_IMGCAT_DAGRUN,
ARG_SAVE_DAGRUN,
),
),
ActionCommand(
name='reserialize',
help="Reserialize all DAGs by parsing the DagBag files",
description=(
"Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
"from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
"version of Airflow that you are running."
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
args=(
ARG_CLEAR_ONLY,
ARG_SUBDIR,
),
),
)
TASKS_COMMANDS = (
ActionCommand(
name='list',
help="List the tasks within a DAG",
func=lazy_load_command('airflow.cli.commands.task_command.task_list'),
args=(ARG_DAG_ID, ARG_TREE, ARG_SUBDIR, ARG_VERBOSE),
),
ActionCommand(
name='clear',
help="Clear a set of task instance, as if they never ran",
func=lazy_load_command('airflow.cli.commands.task_command.task_clear'),
args=(
ARG_DAG_ID,
ARG_TASK_REGEX,
ARG_START_DATE,
ARG_END_DATE,
ARG_SUBDIR,
ARG_UPSTREAM,
ARG_DOWNSTREAM,
ARG_YES,
ARG_ONLY_FAILED,
ARG_ONLY_RUNNING,
ARG_EXCLUDE_SUBDAGS,
ARG_EXCLUDE_PARENTDAG,
ARG_DAG_REGEX,
),
),
ActionCommand(
name='state',
help="Get the status of a task instance",
func=lazy_load_command('airflow.cli.commands.task_command.task_state'),
args=(
ARG_DAG_ID,
ARG_TASK_ID,
ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_VERBOSE,
ARG_MAP_INDEX,
),
),
ActionCommand(
name='failed-deps',
help="Returns the unmet dependencies for a task instance",
description=(
"Returns the unmet dependencies for a task instance from the perspective of the scheduler. "
"In other words, why a task instance doesn't get scheduled and then queued by the scheduler, "
"and then run by an executor."
),
func=lazy_load_command('airflow.cli.commands.task_command.task_failed_deps'),
args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_SUBDIR, ARG_MAP_INDEX),
),
ActionCommand(
name='render',
help="Render a task instance's template(s)",
func=lazy_load_command('airflow.cli.commands.task_command.task_render'),
args=(
ARG_DAG_ID,
ARG_TASK_ID,
ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_VERBOSE,
ARG_MAP_INDEX,
),
),
ActionCommand(
name='run',
help="Run a single task instance",
func=lazy_load_command('airflow.cli.commands.task_command.task_run'),
args=(
ARG_DAG_ID,
ARG_TASK_ID,
ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_MARK_SUCCESS,
ARG_FORCE,
ARG_POOL,
ARG_CFG_PATH,
ARG_LOCAL,
ARG_RAW,
ARG_IGNORE_ALL_DEPENDENCIES,
ARG_IGNORE_DEPENDENCIES,
ARG_IGNORE_DEPENDS_ON_PAST,
ARG_SHIP_DAG,
ARG_PICKLE,
ARG_JOB_ID,
ARG_INTERACTIVE,
ARG_SHUT_DOWN_LOGGING,
ARG_MAP_INDEX,
),
),
ActionCommand(
name='test',
help="Test a task instance",
description=(
"Test a task instance. This will run a task without checking for dependencies or recording "
"its state in the database"
),
func=lazy_load_command('airflow.cli.commands.task_command.task_test'),
args=(
ARG_DAG_ID,
ARG_TASK_ID,
ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL,
ARG_SUBDIR,
ARG_DRY_RUN,
ARG_TASK_PARAMS,
ARG_POST_MORTEM,
ARG_ENV_VARS,
ARG_MAP_INDEX,
),
),
ActionCommand(
name='states-for-dag-run',
help="Get the status of all task instances in a dag run",
func=lazy_load_command('airflow.cli.commands.task_command.task_states_for_dag_run'),
args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_OUTPUT, ARG_VERBOSE),
),
)
POOLS_COMMANDS = (
ActionCommand(
name='list',
help='List pools',
func=lazy_load_command('airflow.cli.commands.pool_command.pool_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='get',
help='Get pool size',
func=lazy_load_command('airflow.cli.commands.pool_command.pool_get'),
args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='set',
help='Configure pool',
func=lazy_load_command('airflow.cli.commands.pool_command.pool_set'),
args=(ARG_POOL_NAME, ARG_POOL_SLOTS, ARG_POOL_DESCRIPTION, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='delete',
help='Delete pool',
func=lazy_load_command('airflow.cli.commands.pool_command.pool_delete'),
args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='import',
help='Import pools',
func=lazy_load_command('airflow.cli.commands.pool_command.pool_import'),
args=(ARG_POOL_IMPORT, ARG_VERBOSE),
),
ActionCommand(
name='export',
help='Export all pools',
func=lazy_load_command('airflow.cli.commands.pool_command.pool_export'),
args=(ARG_POOL_EXPORT,),
),
)
VARIABLES_COMMANDS = (
ActionCommand(
name='list',
help='List variables',
func=lazy_load_command('airflow.cli.commands.variable_command.variables_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='get',
help='Get variable',
func=lazy_load_command('airflow.cli.commands.variable_command.variables_get'),
args=(ARG_VAR, ARG_DESERIALIZE_JSON, ARG_DEFAULT, ARG_VERBOSE),
),
ActionCommand(
name='set',
help='Set variable',
func=lazy_load_command('airflow.cli.commands.variable_command.variables_set'),
args=(ARG_VAR, ARG_VAR_VALUE, ARG_SERIALIZE_JSON),
),
ActionCommand(
name='delete',
help='Delete variable',
func=lazy_load_command('airflow.cli.commands.variable_command.variables_delete'),
args=(ARG_VAR,),
),
ActionCommand(
name='import',
help='Import variables',
func=lazy_load_command('airflow.cli.commands.variable_command.variables_import'),
args=(ARG_VAR_IMPORT,),
),
ActionCommand(
name='export',
help='Export all variables',
func=lazy_load_command('airflow.cli.commands.variable_command.variables_export'),
args=(ARG_VAR_EXPORT,),
),
)
DB_COMMANDS = (
ActionCommand(
name='init',
help="Initialize the metadata database",
func=lazy_load_command('airflow.cli.commands.db_command.initdb'),
args=(),
),
ActionCommand(
name="check-migrations",
help="Check if migration have finished",
description="Check if migration have finished (or continually check until timeout)",
func=lazy_load_command('airflow.cli.commands.db_command.check_migrations'),
args=(ARG_MIGRATION_TIMEOUT,),
),
ActionCommand(
name='reset',
help="Burn down and rebuild the metadata database",
func=lazy_load_command('airflow.cli.commands.db_command.resetdb'),
args=(ARG_YES, ARG_DB_SKIP_INIT),
),
ActionCommand(
name='upgrade',
help="Upgrade the metadata database to latest version",
description=(
"Upgrade the schema of the metadata database. "
"To print but not execute commands, use option ``--show-sql-only``. "
"If using options ``--from-revision`` or ``--from-version``, you must also use "
"``--show-sql-only``, because if actually *running* migrations, we should only "
"migrate from the *current* Alembic revision."
),
func=lazy_load_command('airflow.cli.commands.db_command.upgradedb'),
args=(
ARG_DB_REVISION__UPGRADE,
ARG_DB_VERSION__UPGRADE,
ARG_DB_SQL_ONLY,
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_RESERIALIZE_DAGS,
),
),
ActionCommand(
name='downgrade',
help="Downgrade the schema of the metadata database.",
description=(
"Downgrade the schema of the metadata database. "
"You must provide either `--to-revision` or `--to-version`. "
"To print but not execute commands, use option `--show-sql-only`. "
"If using options `--from-revision` or `--from-version`, you must also use `--show-sql-only`, "
"because if actually *running* migrations, we should only migrate from the *current* Alembic "
"revision."
),
func=lazy_load_command('airflow.cli.commands.db_command.downgrade'),
args=(
ARG_DB_REVISION__DOWNGRADE,
ARG_DB_VERSION__DOWNGRADE,
ARG_DB_SQL_ONLY,
ARG_YES,
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
),
),
ActionCommand(
name='shell',
help="Runs a shell to access the database",
func=lazy_load_command('airflow.cli.commands.db_command.shell'),
args=(),
),
ActionCommand(
name='check',
help="Check if the database can be reached",
func=lazy_load_command('airflow.cli.commands.db_command.check'),
args=(),
),
ActionCommand(
name='clean',
help="Purge old records in metastore tables",
func=lazy_load_command('airflow.cli.commands.db_command.cleanup_tables'),
args=(
ARG_DB_TABLES,
ARG_DB_DRY_RUN,
ARG_DB_CLEANUP_TIMESTAMP,
ARG_VERBOSE,
ARG_YES,
ARG_DB_SKIP_ARCHIVE,
),
),
)
CONNECTIONS_COMMANDS = (
ActionCommand(
name='get',
help='Get a connection',
func=lazy_load_command('airflow.cli.commands.connection_command.connections_get'),
args=(ARG_CONN_ID, ARG_COLOR, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='list',
help='List connections',
func=lazy_load_command('airflow.cli.commands.connection_command.connections_list'),
args=(ARG_OUTPUT, ARG_VERBOSE, ARG_CONN_ID_FILTER),
),
ActionCommand(
name='add',
help='Add a connection',
func=lazy_load_command('airflow.cli.commands.connection_command.connections_add'),
args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA) + tuple(ALTERNATIVE_CONN_SPECS_ARGS),
),
ActionCommand(
name='delete',
help='Delete a connection',
func=lazy_load_command('airflow.cli.commands.connection_command.connections_delete'),
args=(ARG_CONN_ID, ARG_COLOR),
),
ActionCommand(
name='export',
help='Export all connections',
description=(
"All connections can be exported in STDOUT using the following command:\n"
"airflow connections export -\n"
"The file format can be determined by the provided file extension. E.g., The following "
"command will export the connections in JSON format:\n"
"airflow connections export /tmp/connections.json\n"
"The --file-format parameter can be used to control the file format. E.g., "
"the default format is JSON in STDOUT mode, which can be overridden using: \n"
"airflow connections export - --file-format yaml\n"
"The --file-format parameter can also be used for the files, for example:\n"
"airflow connections export /tmp/connections --file-format json.\n"
"When exporting in `env` file format, you control whether URI format or JSON format "
"is used to serialize the connection by passing `uri` or `json` with option "
"`--serialization-format`.\n"
),
func=lazy_load_command('airflow.cli.commands.connection_command.connections_export'),
args=(
ARG_CONN_EXPORT,
ARG_CONN_EXPORT_FORMAT,
ARG_CONN_EXPORT_FILE_FORMAT,
ARG_CONN_SERIALIZATION_FORMAT,
),
),
ActionCommand(
name='import',
help='Import connections from a file',
description=(
"Connections can be imported from the output of the export command.\n"
"The filetype must by json, yaml or env and will be automatically inferred."
),
func=lazy_load_command('airflow.cli.commands.connection_command.connections_import'),
args=(ARG_CONN_IMPORT,),
),
)
PROVIDERS_COMMANDS = (
ActionCommand(
name='list',
help='List installed providers',
func=lazy_load_command('airflow.cli.commands.provider_command.providers_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='get',
help='Get detailed information about a provider',
func=lazy_load_command('airflow.cli.commands.provider_command.provider_get'),
args=(ARG_OUTPUT, ARG_VERBOSE, ARG_FULL, ARG_COLOR, ARG_PROVIDER_NAME),
),
ActionCommand(
name='links',
help='List extra links registered by the providers',
func=lazy_load_command('airflow.cli.commands.provider_command.extra_links_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='widgets',
help='Get information about registered connection form widgets',
func=lazy_load_command('airflow.cli.commands.provider_command.connection_form_widget_list'),
args=(
ARG_OUTPUT,
ARG_VERBOSE,
),
),
ActionCommand(
name='hooks',
help='List registered provider hooks',
func=lazy_load_command('airflow.cli.commands.provider_command.hooks_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='behaviours',
help='Get information about registered connection types with custom behaviours',
func=lazy_load_command('airflow.cli.commands.provider_command.connection_field_behaviours'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='logging',
help='Get information about task logging handlers provided',
func=lazy_load_command('airflow.cli.commands.provider_command.logging_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='secrets',
help='Get information about secrets backends provided',
func=lazy_load_command('airflow.cli.commands.provider_command.secrets_backends_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='auth',
help='Get information about API auth backends provided',
func=lazy_load_command('airflow.cli.commands.provider_command.auth_backend_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
)
USERS_COMMANDS = (
ActionCommand(
name='list',
help='List users',
func=lazy_load_command('airflow.cli.commands.user_command.users_list'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='create',
help='Create a user',
func=lazy_load_command('airflow.cli.commands.user_command.users_create'),
args=(
ARG_ROLE,
ARG_USERNAME,
ARG_EMAIL,
ARG_FIRSTNAME,
ARG_LASTNAME,
ARG_PASSWORD,
ARG_USE_RANDOM_PASSWORD,
),
epilog=(
'examples:\n'
'To create an user with "Admin" role and username equals to "admin", run:\n'
'\n'
' $ airflow users create \\\n'
' --username admin \\\n'
' --firstname FIRST_NAME \\\n'
' --lastname LAST_NAME \\\n'
' --role Admin \\\n'
' --email admin@example.org'
),
),
ActionCommand(
name='delete',
help='Delete a user',
func=lazy_load_command('airflow.cli.commands.user_command.users_delete'),
args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL),
),
ActionCommand(
name='add-role',
help='Add role to a user',
func=lazy_load_command('airflow.cli.commands.user_command.add_role'),
args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE),
),
ActionCommand(
name='remove-role',
help='Remove role from a user',
func=lazy_load_command('airflow.cli.commands.user_command.remove_role'),
args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE),
),
ActionCommand(
name='import',
help='Import users',
func=lazy_load_command('airflow.cli.commands.user_command.users_import'),
args=(ARG_USER_IMPORT,),
),
ActionCommand(
name='export',
help='Export all users',
func=lazy_load_command('airflow.cli.commands.user_command.users_export'),
args=(ARG_USER_EXPORT,),
),
)
ROLES_COMMANDS = (
ActionCommand(
name='list',
help='List roles',
func=lazy_load_command('airflow.cli.commands.role_command.roles_list'),
args=(ARG_PERMISSIONS, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name='create',
help='Create role',
func=lazy_load_command('airflow.cli.commands.role_command.roles_create'),
args=(ARG_ROLES, ARG_VERBOSE),
),
ActionCommand(
name='delete',
help='Delete role',
func=lazy_load_command('airflow.cli.commands.role_command.roles_delete'),
args=(ARG_ROLES, ARG_VERBOSE),
),
ActionCommand(
name='add-perms',
help='Add roles permissions',
func=lazy_load_command('airflow.cli.commands.role_command.roles_add_perms'),
args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION_REQUIRED, ARG_VERBOSE),
),
ActionCommand(
name='del-perms',
help='Delete roles permissions',
func=lazy_load_command('airflow.cli.commands.role_command.roles_del_perms'),
args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION, ARG_VERBOSE),
),
ActionCommand(
name='export',
help='Export roles (without permissions) from db to JSON file',
func=lazy_load_command('airflow.cli.commands.role_command.roles_export'),
args=(ARG_ROLE_EXPORT, ARG_ROLE_EXPORT_FMT, ARG_VERBOSE),
),
ActionCommand(
name='import',
help='Import roles (without permissions) from JSON file to db',
func=lazy_load_command('airflow.cli.commands.role_command.roles_import'),
args=(ARG_ROLE_IMPORT, ARG_VERBOSE),
),
)
CELERY_COMMANDS = (
ActionCommand(
name='worker',
help="Start a Celery worker node",
func=lazy_load_command('airflow.cli.commands.celery_command.worker'),
args=(
ARG_QUEUES,
ARG_CONCURRENCY,
ARG_CELERY_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_UMASK,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_AUTOSCALE,
ARG_SKIP_SERVE_LOGS,
ARG_WITHOUT_MINGLE,
ARG_WITHOUT_GOSSIP,
),
),
ActionCommand(
name='flower',
help="Start a Celery Flower",
func=lazy_load_command('airflow.cli.commands.celery_command.flower'),
args=(
ARG_FLOWER_HOSTNAME,
ARG_FLOWER_PORT,
ARG_FLOWER_CONF,
ARG_FLOWER_URL_PREFIX,
ARG_FLOWER_BASIC_AUTH,
ARG_BROKER_API,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
),
),
ActionCommand(
name='stop',
help="Stop the Celery worker gracefully",
func=lazy_load_command('airflow.cli.commands.celery_command.stop_worker'),
args=(ARG_PID,),
),
)
CONFIG_COMMANDS = (
ActionCommand(
name='get-value',
help='Print the value of the configuration',
func=lazy_load_command('airflow.cli.commands.config_command.get_value'),
args=(
ARG_SECTION,
ARG_OPTION,
),
),
ActionCommand(
name='list',
help='List options for the configuration',
func=lazy_load_command('airflow.cli.commands.config_command.show_config'),
args=(ARG_COLOR,),
),
)
KUBERNETES_COMMANDS = (
ActionCommand(
name='cleanup-pods',
help=(
"Clean up Kubernetes pods "
"(created by KubernetesExecutor/KubernetesPodOperator) "
"in evicted/failed/succeeded/pending states"
),
func=lazy_load_command('airflow.cli.commands.kubernetes_command.cleanup_pods'),
args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES),
),
ActionCommand(
name='generate-dag-yaml',
help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
"launching into a cluster",
func=lazy_load_command('airflow.cli.commands.kubernetes_command.generate_pod_yaml'),
args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH),
),
)
JOBS_COMMANDS = (
ActionCommand(
name='check',
help="Checks if job(s) are still alive",
func=lazy_load_command('airflow.cli.commands.jobs_command.check'),
args=(
ARG_JOB_TYPE_FILTER,
ARG_JOB_HOSTNAME_FILTER,
ARG_JOB_HOSTNAME_CALLABLE_FILTER,
ARG_JOB_LIMIT,
ARG_ALLOW_MULTIPLE,
),
epilog=(
'examples:\n'
'To check if the local scheduler is still working properly, run:\n'
'\n'
' $ airflow jobs check --job-type SchedulerJob --local"\n'
'\n'
'To check if any scheduler is running when you are using high availability, run:\n'
'\n'
' $ airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100'
),
),
)
airflow_commands: list[CLICommand] = [
GroupCommand(
name='dags',
help='Manage DAGs',
subcommands=DAGS_COMMANDS,
),
GroupCommand(
name="kubernetes", help='Tools to help run the KubernetesExecutor', subcommands=KUBERNETES_COMMANDS
),
GroupCommand(
name='tasks',
help='Manage tasks',
subcommands=TASKS_COMMANDS,
),
GroupCommand(
name='pools',
help="Manage pools",
subcommands=POOLS_COMMANDS,
),
GroupCommand(
name='variables',
help="Manage variables",
subcommands=VARIABLES_COMMANDS,
),
GroupCommand(
name='jobs',
help="Manage jobs",
subcommands=JOBS_COMMANDS,
),
GroupCommand(
name='db',
help="Database operations",
subcommands=DB_COMMANDS,
),
ActionCommand(
name='kerberos',
help="Start a kerberos ticket renewer",
func=lazy_load_command('airflow.cli.commands.kerberos_command.kerberos'),
args=(ARG_PRINCIPAL, ARG_KEYTAB, ARG_PID, ARG_DAEMON, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE),
),
ActionCommand(
name='webserver',
help="Start a Airflow webserver instance",
func=lazy_load_command('airflow.cli.commands.webserver_command.webserver'),
args=(
ARG_PORT,
ARG_WORKERS,
ARG_WORKERCLASS,
ARG_WORKER_TIMEOUT,
ARG_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_ACCESS_LOGFILE,
ARG_ERROR_LOGFILE,
ARG_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
),
ActionCommand(
name='scheduler',
help="Start a scheduler instance",
func=lazy_load_command('airflow.cli.commands.scheduler_command.scheduler'),
args=(
ARG_SUBDIR,
ARG_NUM_RUNS,
ARG_DO_PICKLE,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_SKIP_SERVE_LOGS,
),
epilog=(
'Signals:\n'
'\n'
' - SIGUSR2: Dump a snapshot of task state being tracked by the executor.\n'
'\n'
' Example:\n'
' pkill -f -USR2 "airflow scheduler"'
),
),
ActionCommand(
name='triggerer',
help="Start a triggerer instance",
func=lazy_load_command('airflow.cli.commands.triggerer_command.triggerer'),
args=(
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_CAPACITY,
),
),
ActionCommand(
name='dag-processor',
help="Start a standalone Dag Processor instance",
func=lazy_load_command('airflow.cli.commands.dag_processor_command.dag_processor'),
args=(
ARG_PID,
ARG_DAEMON,
ARG_SUBDIR,
ARG_NUM_RUNS,
ARG_DO_PICKLE,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
),
),
ActionCommand(
name='version',
help="Show the version",
func=lazy_load_command('airflow.cli.commands.version_command.version'),
args=(),
),
ActionCommand(
name='cheat-sheet',
help="Display cheat sheet",
func=lazy_load_command('airflow.cli.commands.cheat_sheet_command.cheat_sheet'),
args=(ARG_VERBOSE,),
),
GroupCommand(
name='connections',
help="Manage connections",
subcommands=CONNECTIONS_COMMANDS,
),
GroupCommand(
name='providers',
help="Display providers",
subcommands=PROVIDERS_COMMANDS,
),
GroupCommand(
name='users',
help="Manage users",
subcommands=USERS_COMMANDS,
),
GroupCommand(
name='roles',
help='Manage roles',
subcommands=ROLES_COMMANDS,
),
ActionCommand(
name='sync-perm',
help="Update permissions for existing roles and optionally DAGs",
func=lazy_load_command('airflow.cli.commands.sync_perm_command.sync_perm'),
args=(ARG_INCLUDE_DAGS,),
),
ActionCommand(
name='rotate-fernet-key',
func=lazy_load_command('airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key'),
help='Rotate encrypted connection credentials and variables',
description=(
'Rotate all encrypted connection credentials and variables; see '
'https://airflow.apache.org/docs/apache-airflow/stable/howto/secure-connections.html'
'#rotating-encryption-keys'
),
args=(),
),
GroupCommand(name="config", help='View configuration', subcommands=CONFIG_COMMANDS),
ActionCommand(
name='info',
help='Show information about current Airflow and environment',
func=lazy_load_command('airflow.cli.commands.info_command.show_info'),
args=(
ARG_ANONYMIZE,
ARG_FILE_IO,
ARG_VERBOSE,
ARG_OUTPUT,
),
),
ActionCommand(
name='plugins',
help='Dump information about loaded plugins',
func=lazy_load_command('airflow.cli.commands.plugins_command.dump_plugins'),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
GroupCommand(
name="celery",
help='Celery components',
description=(
'Start celery components. Works only when using CeleryExecutor. For more information, see '
'https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html'
),
subcommands=CELERY_COMMANDS,
),
ActionCommand(
name='standalone',
help='Run an all-in-one copy of Airflow',
func=lazy_load_command('airflow.cli.commands.standalone_command.standalone'),
args=tuple(),
),
]
ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}
def _remove_dag_id_opt(command: ActionCommand):
cmd = command._asdict()
cmd['args'] = (arg for arg in command.args if arg is not ARG_DAG_ID)
return ActionCommand(**cmd)
dag_cli_commands: list[CLICommand] = [
GroupCommand(
name='dags',
help='Manage DAGs',
subcommands=[
_remove_dag_id_opt(sp)
for sp in DAGS_COMMANDS
if sp.name in ['backfill', 'list-runs', 'pause', 'unpause', 'test']
],
),
GroupCommand(
name='tasks',
help='Manage tasks',
subcommands=[_remove_dag_id_opt(sp) for sp in TASKS_COMMANDS if sp.name in ['list', 'test', 'run']],
),
]
DAG_CLI_DICT: dict[str, CLICommand] = {sp.name: sp for sp in dag_cli_commands}
class AirflowHelpFormatter(argparse.HelpFormatter):
"""
Custom help formatter to display help message.
It displays simple commands and groups of commands in separate sections.
"""
def _format_action(self, action: Action):
if isinstance(action, argparse._SubParsersAction):
parts = []
action_header = self._format_action_invocation(action)
action_header = '%*s%s\n' % (self._current_indent, '', action_header)
parts.append(action_header)
self._indent()
subactions = action._get_subactions()
action_subcommands, group_subcommands = partition(
lambda d: isinstance(ALL_COMMANDS_DICT[d.dest], GroupCommand), subactions
)
parts.append("\n")
parts.append('%*s%s:\n' % (self._current_indent, '', "Groups"))
self._indent()
for subaction in group_subcommands:
parts.append(self._format_action(subaction))
self._dedent()
parts.append("\n")
parts.append('%*s%s:\n' % (self._current_indent, '', "Commands"))
self._indent()
for subaction in action_subcommands:
parts.append(self._format_action(subaction))
self._dedent()
self._dedent()
# return a single string
return self._join_parts(parts)
return super()._format_action(action)
@lru_cache(maxsize=None)
def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
"""Creates and returns command line argument parser"""
parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
subparsers = parser.add_subparsers(dest='subcommand', metavar="GROUP_OR_COMMAND")
subparsers.required = True
command_dict = DAG_CLI_DICT if dag_parser else ALL_COMMANDS_DICT
subparser_list = command_dict.keys()
sub_name: str
for sub_name in sorted(subparser_list):
sub: CLICommand = command_dict[sub_name]
_add_command(subparsers, sub)
return parser
def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]:
"""Sort subcommand optional args, keep positional args"""
def get_long_option(arg: Arg):
"""Get long option from Arg.flags"""
return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1]
positional, optional = partition(lambda x: x.flags[0].startswith("-"), args)
yield from positional
yield from sorted(optional, key=lambda x: get_long_option(x).lower())
def _add_command(subparsers: argparse._SubParsersAction, sub: CLICommand) -> None:
sub_proc = subparsers.add_parser(
sub.name, help=sub.help, description=sub.description or sub.help, epilog=sub.epilog
)
sub_proc.formatter_class = RawTextHelpFormatter
if isinstance(sub, GroupCommand):
_add_group_command(sub, sub_proc)
elif isinstance(sub, ActionCommand):
_add_action_command(sub, sub_proc)
else:
raise AirflowException("Invalid command definition.")
def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser) -> None:
for arg in _sort_args(sub.args):
arg.add_to_parser(sub_proc)
sub_proc.set_defaults(func=sub.func)
def _add_group_command(sub: GroupCommand, sub_proc: argparse.ArgumentParser) -> None:
subcommands = sub.subcommands
sub_subparsers = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND")
sub_subparsers.required = True
for command in sorted(subcommands, key=lambda x: x.name):
_add_command(sub_subparsers, command)
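For context, here is a minimal sketch of how this parser is typically consumed by the `airflow` entrypoint. It is illustrative only (the real `airflow/__main__.py` performs additional setup), but the flow follows the functions defined above: build the parser with `get_parser()`, parse the command line, and call the lazily loaded command that `_add_action_command()` stored in `args.func`.

# Illustrative usage sketch, assuming an installed Airflow environment.
import argparse

from airflow.cli import cli_parser

def main() -> None:
    parser: argparse.ArgumentParser = cli_parser.get_parser()
    # e.g. `airflow dags list --output json`
    args = parser.parse_args()
    # _add_action_command() registered the command callable via set_defaults(func=...);
    # calling it triggers the deferred import performed by lazy_load_command().
    args.func(args)

if __name__ == "__main__":
    main()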