airflow pool_command 源码
airflow pool_command 代码
文件路径:/airflow/cli/commands/pool_command.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Pools sub-commands"""
from __future__ import annotations
import json
import os
from json import JSONDecodeError
from airflow.api.client import get_current_api_client
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import PoolNotFound
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
def _show_pools(pools, output):
AirflowConsole().print_as(
data=pools,
output=output,
mapper=lambda x: {
"pool": x[0],
"slots": x[1],
"description": x[2],
},
)
@suppress_logs_and_warning
def pool_list(args):
"""Displays info of all the pools"""
api_client = get_current_api_client()
pools = api_client.get_pools()
_show_pools(pools=pools, output=args.output)
@suppress_logs_and_warning
def pool_get(args):
"""Displays pool info by a given name"""
api_client = get_current_api_client()
try:
pools = [api_client.get_pool(name=args.pool)]
_show_pools(pools=pools, output=args.output)
except PoolNotFound:
raise SystemExit(f"Pool {args.pool} does not exist")
@cli_utils.action_cli
@suppress_logs_and_warning
def pool_set(args):
"""Creates new pool with a given name and slots"""
api_client = get_current_api_client()
api_client.create_pool(name=args.pool, slots=args.slots, description=args.description)
print(f"Pool {args.pool} created")
@cli_utils.action_cli
@suppress_logs_and_warning
def pool_delete(args):
"""Deletes pool by a given name"""
api_client = get_current_api_client()
try:
api_client.delete_pool(name=args.pool)
print(f"Pool {args.pool} deleted")
except PoolNotFound:
raise SystemExit(f"Pool {args.pool} does not exist")
@cli_utils.action_cli
@suppress_logs_and_warning
def pool_import(args):
"""Imports pools from the file"""
if not os.path.exists(args.file):
raise SystemExit(f"Missing pools file {args.file}")
pools, failed = pool_import_helper(args.file)
if len(failed) > 0:
raise SystemExit(f"Failed to update pool(s): {', '.join(failed)}")
print(f"Uploaded {len(pools)} pool(s)")
def pool_export(args):
"""Exports all of the pools to the file"""
pools = pool_export_helper(args.file)
print(f"Exported {len(pools)} pools to {args.file}")
def pool_import_helper(filepath):
"""Helps import pools from the json file"""
api_client = get_current_api_client()
with open(filepath) as poolfile:
data = poolfile.read()
try:
pools_json = json.loads(data)
except JSONDecodeError as e:
raise SystemExit(f"Invalid json file: {e}")
pools = []
failed = []
for k, v in pools_json.items():
if isinstance(v, dict) and len(v) == 2:
pools.append(api_client.create_pool(name=k, slots=v["slots"], description=v["description"]))
else:
failed.append(k)
return pools, failed
def pool_export_helper(filepath):
"""Helps export all of the pools to the json file"""
api_client = get_current_api_client()
pool_dict = {}
pools = api_client.get_pools()
for pool in pools:
pool_dict[pool[0]] = {"slots": pool[1], "description": pool[2]}
with open(filepath, 'w') as poolfile:
poolfile.write(json.dumps(pool_dict, sort_keys=True, indent=4))
return pools
相关信息
相关文章
airflow cheat_sheet_command 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦