airflow user_command 源码
airflow user_command 代码
文件路径:/airflow/cli/commands/user_command.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""User sub-commands"""
from __future__ import annotations
import functools
import getpass
import json
import os
import random
import re
import string
from typing import Any
from marshmallow import Schema, fields, validate
from marshmallow.exceptions import ValidationError
from airflow.cli.simple_table import AirflowConsole
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.www.app import cached_app
class UserSchema(Schema):
"""user collection item schema"""
id = fields.Int()
firstname = fields.Str(required=True)
lastname = fields.Str(required=True)
username = fields.Str(required=True)
email = fields.Email(required=True)
roles = fields.List(fields.Str, required=True, validate=validate.Length(min=1))
@suppress_logs_and_warning
def users_list(args):
"""Lists users at the command line"""
appbuilder = cached_app().appbuilder
users = appbuilder.sm.get_all_users()
fields = ['id', 'username', 'email', 'first_name', 'last_name', 'roles']
AirflowConsole().print_as(
data=users, output=args.output, mapper=lambda x: {f: x.__getattribute__(f) for f in fields}
)
@cli_utils.action_cli(check_db=True)
def users_create(args):
"""Creates new user in the DB"""
appbuilder = cached_app().appbuilder
role = appbuilder.sm.find_role(args.role)
if not role:
valid_roles = appbuilder.sm.get_all_roles()
raise SystemExit(f'{args.role} is not a valid role. Valid roles are: {valid_roles}')
if args.use_random_password:
password = ''.join(random.choice(string.printable) for _ in range(16))
elif args.password:
password = args.password
else:
password = getpass.getpass('Password:')
password_confirmation = getpass.getpass('Repeat for confirmation:')
if password != password_confirmation:
raise SystemExit('Passwords did not match')
if appbuilder.sm.find_user(args.username):
print(f'{args.username} already exist in the db')
return
user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname, args.email, role, password)
if user:
print(f'User "{args.username}" created with role "{args.role}"')
else:
raise SystemExit('Failed to create user')
def _find_user(args):
if not args.username and not args.email:
raise SystemExit('Missing args: must supply one of --username or --email')
if args.username and args.email:
raise SystemExit('Conflicting args: must supply either --username or --email, but not both')
appbuilder = cached_app().appbuilder
user = appbuilder.sm.find_user(username=args.username, email=args.email)
if not user:
raise SystemExit(f'User "{args.username or args.email}" does not exist')
return user
@cli_utils.action_cli
def users_delete(args):
"""Deletes user from DB"""
user = _find_user(args)
appbuilder = cached_app().appbuilder
if appbuilder.sm.del_register_user(user):
print(f'User "{user.username}" deleted')
else:
raise SystemExit('Failed to delete user')
@cli_utils.action_cli
def users_manage_role(args, remove=False):
"""Deletes or appends user roles"""
user = _find_user(args)
appbuilder = cached_app().appbuilder
role = appbuilder.sm.find_role(args.role)
if not role:
valid_roles = appbuilder.sm.get_all_roles()
raise SystemExit(f'"{args.role}" is not a valid role. Valid roles are: {valid_roles}')
if remove:
if role not in user.roles:
raise SystemExit(f'User "{user.username}" is not a member of role "{args.role}"')
user.roles = [r for r in user.roles if r != role]
appbuilder.sm.update_user(user)
print(f'User "{user.username}" removed from role "{args.role}"')
else:
if role in user.roles:
raise SystemExit(f'User "{user.username}" is already a member of role "{args.role}"')
user.roles.append(role)
appbuilder.sm.update_user(user)
print(f'User "{user.username}" added to role "{args.role}"')
def users_export(args):
"""Exports all users to the json file"""
appbuilder = cached_app().appbuilder
users = appbuilder.sm.get_all_users()
fields = ['id', 'username', 'email', 'first_name', 'last_name', 'roles']
# In the User model the first and last name fields have underscores,
# but the corresponding parameters in the CLI don't
def remove_underscores(s):
return re.sub("_", "", s)
users = [
{
remove_underscores(field): user.__getattribute__(field)
if field != 'roles'
else [r.name for r in user.roles]
for field in fields
}
for user in users
]
with open(args.export, 'w') as file:
file.write(json.dumps(users, sort_keys=True, indent=4))
print(f"{len(users)} users successfully exported to {file.name}")
@cli_utils.action_cli
def users_import(args):
"""Imports users from the json file"""
json_file = getattr(args, 'import')
if not os.path.exists(json_file):
raise SystemExit(f"File '{json_file}' does not exist")
users_list = None
try:
with open(json_file) as file:
users_list = json.loads(file.read())
except ValueError as e:
raise SystemExit(f"File '{json_file}' is not valid JSON. Error: {e}")
users_created, users_updated = _import_users(users_list)
if users_created:
print("Created the following users:\n\t{}".format("\n\t".join(users_created)))
if users_updated:
print("Updated the following users:\n\t{}".format("\n\t".join(users_updated)))
def _import_users(users_list: list[dict[str, Any]]):
appbuilder = cached_app().appbuilder
users_created = []
users_updated = []
try:
UserSchema(many=True).load(users_list)
except ValidationError as e:
msg = []
for row_num, failure in e.normalized_messages().items():
msg.append(f'[Item {row_num}]')
for key, value in failure.items():
msg.append(f'\t{key}: {value}')
raise SystemExit("Error: Input file didn't pass validation. See below:\n{}".format('\n'.join(msg)))
for user in users_list:
roles = []
for rolename in user['roles']:
role = appbuilder.sm.find_role(rolename)
if not role:
valid_roles = appbuilder.sm.get_all_roles()
raise SystemExit(f'Error: "{rolename}" is not a valid role. Valid roles are: {valid_roles}')
roles.append(role)
existing_user = appbuilder.sm.find_user(email=user['email'])
if existing_user:
print(f"Found existing user with email '{user['email']}'")
if existing_user.username != user['username']:
raise SystemExit(
f"Error: Changing the username is not allowed - please delete and recreate the user with"
f" email {user['email']!r}"
)
existing_user.roles = roles
existing_user.first_name = user['firstname']
existing_user.last_name = user['lastname']
appbuilder.sm.update_user(existing_user)
users_updated.append(user['email'])
else:
print(f"Creating new user with email '{user['email']}'")
appbuilder.sm.add_user(
username=user['username'],
first_name=user['firstname'],
last_name=user['lastname'],
email=user['email'],
role=roles,
)
users_created.append(user['email'])
return users_created, users_updated
add_role = functools.partial(users_manage_role, remove=False)
remove_role = functools.partial(users_manage_role, remove=True)
相关信息
相关文章
airflow cheat_sheet_command 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦