airflow info_command 源码
airflow info_command 代码
文件路径:/airflow/cli/commands/info_command.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Config sub-commands"""
from __future__ import annotations
import locale
import logging
import os
import platform
import subprocess
import sys
from urllib.parse import urlsplit, urlunsplit
import httpx
import tenacity
from airflow import configuration
from airflow.cli.simple_table import AirflowConsole
from airflow.providers_manager import ProvidersManager
from airflow.typing_compat import Protocol
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.platform import getuser
from airflow.version import version as airflow_version
log = logging.getLogger(__name__)
class Anonymizer(Protocol):
"""Anonymizer protocol."""
def process_path(self, value) -> str:
"""Remove pii from paths"""
def process_username(self, value) -> str:
"""Remove pii from username"""
def process_url(self, value) -> str:
"""Remove pii from URL"""
class NullAnonymizer(Anonymizer):
"""Do nothing."""
def _identity(self, value) -> str:
return value
process_path = process_username = process_url = _identity
del _identity
class PiiAnonymizer(Anonymizer):
"""Remove personally identifiable info from path."""
def __init__(self):
home_path = os.path.expanduser("~")
username = getuser()
self._path_replacements = {home_path: "${HOME}", username: "${USER}"}
def process_path(self, value) -> str:
if not value:
return value
for src, target in self._path_replacements.items():
value = value.replace(src, target)
return value
def process_username(self, value) -> str:
if not value:
return value
return value[0] + "..." + value[-1]
def process_url(self, value) -> str:
if not value:
return value
url_parts = urlsplit(value)
netloc = None
if url_parts.netloc:
# unpack
userinfo = None
username = None
password = None
if "@" in url_parts.netloc:
userinfo, _, host = url_parts.netloc.partition("@")
else:
host = url_parts.netloc
if userinfo:
if ":" in userinfo:
username, _, password = userinfo.partition(":")
else:
username = userinfo
# anonymize
username = self.process_username(username) if username else None
password = "PASSWORD" if password else None
# pack
if username and password and host:
netloc = username + ":" + password + "@" + host
elif username and host:
netloc = username + "@" + host
elif password and host:
netloc = ":" + password + "@" + host
elif host:
netloc = host
else:
netloc = ""
return urlunsplit((url_parts.scheme, netloc, url_parts.path, url_parts.query, url_parts.fragment))
class OperatingSystem:
"""Operating system"""
WINDOWS = "Windows"
LINUX = "Linux"
MACOSX = "Mac OS"
CYGWIN = "Cygwin"
@staticmethod
def get_current() -> str | None:
"""Get current operating system"""
if os.name == "nt":
return OperatingSystem.WINDOWS
elif "linux" in sys.platform:
return OperatingSystem.LINUX
elif "darwin" in sys.platform:
return OperatingSystem.MACOSX
elif "cygwin" in sys.platform:
return OperatingSystem.CYGWIN
return None
class Architecture:
"""Compute architecture"""
X86_64 = "x86_64"
X86 = "x86"
PPC = "ppc"
ARM = "arm"
@staticmethod
def get_current():
"""Get architecture"""
return _MACHINE_TO_ARCHITECTURE.get(platform.machine().lower())
_MACHINE_TO_ARCHITECTURE = {
"amd64": Architecture.X86_64,
"x86_64": Architecture.X86_64,
"i686-64": Architecture.X86_64,
"i386": Architecture.X86,
"i686": Architecture.X86,
"x86": Architecture.X86,
"ia64": Architecture.X86, # Itanium is different x64 arch, treat it as the common x86.
"powerpc": Architecture.PPC,
"power macintosh": Architecture.PPC,
"ppc64": Architecture.PPC,
"armv6": Architecture.ARM,
"armv6l": Architecture.ARM,
"arm64": Architecture.ARM,
"armv7": Architecture.ARM,
"armv7l": Architecture.ARM,
}
class AirflowInfo:
"""Renders information about Airflow instance"""
def __init__(self, anonymizer):
self.anonymizer = anonymizer
@staticmethod
def _get_version(cmd: list[str], grep: bytes | None = None):
"""Return tools version."""
try:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
stdoutdata, _ = proc.communicate()
data = [f for f in stdoutdata.split(b"\n") if f]
if grep:
data = [line for line in data if grep in line]
if len(data) != 1:
return "NOT AVAILABLE"
else:
return data[0].decode()
except OSError:
return "NOT AVAILABLE"
@staticmethod
def _task_logging_handler():
"""Returns task logging handler."""
def get_fullname(o):
module = o.__class__.__module__
if module is None or module == str.__class__.__module__:
return o.__class__.__name__ # Avoid reporting __builtin__
else:
return module + '.' + o.__class__.__name__
try:
handler_names = [get_fullname(handler) for handler in logging.getLogger('airflow.task').handlers]
return ", ".join(handler_names)
except Exception:
return "NOT AVAILABLE"
@property
def _airflow_info(self):
executor = configuration.conf.get("core", "executor")
sql_alchemy_conn = self.anonymizer.process_url(
configuration.conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE")
)
dags_folder = self.anonymizer.process_path(
configuration.conf.get("core", "dags_folder", fallback="NOT AVAILABLE")
)
plugins_folder = self.anonymizer.process_path(
configuration.conf.get("core", "plugins_folder", fallback="NOT AVAILABLE")
)
base_log_folder = self.anonymizer.process_path(
configuration.conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE")
)
remote_base_log_folder = self.anonymizer.process_path(
configuration.conf.get("logging", "remote_base_log_folder", fallback="NOT AVAILABLE")
)
return [
("version", airflow_version),
("executor", executor),
("task_logging_handler", self._task_logging_handler()),
("sql_alchemy_conn", sql_alchemy_conn),
("dags_folder", dags_folder),
("plugins_folder", plugins_folder),
("base_log_folder", base_log_folder),
("remote_base_log_folder", remote_base_log_folder),
]
@property
def _system_info(self):
operating_system = OperatingSystem.get_current()
arch = Architecture.get_current()
uname = platform.uname()
_locale = locale.getdefaultlocale()
python_location = self.anonymizer.process_path(sys.executable)
python_version = sys.version.replace("\n", " ")
return [
("OS", operating_system or "NOT AVAILABLE"),
("architecture", arch or "NOT AVAILABLE"),
("uname", str(uname)),
("locale", str(_locale)),
("python_version", python_version),
("python_location", python_location),
]
@property
def _tools_info(self):
git_version = self._get_version(["git", "--version"])
ssh_version = self._get_version(["ssh", "-V"])
kubectl_version = self._get_version(["kubectl", "version", "--short=True", "--client=True"])
gcloud_version = self._get_version(["gcloud", "version"], grep=b"Google Cloud SDK")
cloud_sql_proxy_version = self._get_version(["cloud_sql_proxy", "--version"])
mysql_version = self._get_version(["mysql", "--version"])
sqlite3_version = self._get_version(["sqlite3", "--version"])
psql_version = self._get_version(["psql", "--version"])
return [
("git", git_version),
("ssh", ssh_version),
("kubectl", kubectl_version),
("gcloud", gcloud_version),
("cloud_sql_proxy", cloud_sql_proxy_version),
("mysql", mysql_version),
("sqlite3", sqlite3_version),
("psql", psql_version),
]
@property
def _paths_info(self):
system_path = os.environ.get("PATH", "").split(os.pathsep)
airflow_home = self.anonymizer.process_path(configuration.get_airflow_home())
system_path = [self.anonymizer.process_path(p) for p in system_path]
python_path = [self.anonymizer.process_path(p) for p in sys.path]
airflow_on_path = any(os.path.exists(os.path.join(path_elem, "airflow")) for path_elem in system_path)
return [
("airflow_home", airflow_home),
("system_path", os.pathsep.join(system_path)),
("python_path", os.pathsep.join(python_path)),
("airflow_on_path", str(airflow_on_path)),
]
@property
def _providers_info(self):
return [(p.data['package-name'], p.version) for p in ProvidersManager().providers.values()]
def show(self, output: str, console: AirflowConsole | None = None) -> None:
"""Shows information about Airflow instance"""
all_info = {
"Apache Airflow": self._airflow_info,
"System info": self._system_info,
"Tools info": self._tools_info,
"Paths info": self._paths_info,
"Providers info": self._providers_info,
}
console = console or AirflowConsole(show_header=False)
if output in ("table", "plain"):
# Show each info as table with key, value column
for key, info in all_info.items():
console.print(f"\n[bold][green]{key}[/bold][/green]", highlight=False)
console.print_as(data=[{"key": k, "value": v} for k, v in info], output=output)
else:
# Render info in given format, change keys to snake_case
console.print_as(
data=[{k.lower().replace(" ", "_"): dict(v)} for k, v in all_info.items()], output=output
)
def render_text(self, output: str) -> str:
"""Exports the info to string"""
console = AirflowConsole(record=True)
with console.capture():
self.show(output=output, console=console)
return console.export_text()
class FileIoException(Exception):
"""Raises when error happens in FileIo.io integration"""
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=1, max=10),
retry=tenacity.retry_if_exception_type(FileIoException),
before=tenacity.before_log(log, logging.DEBUG),
after=tenacity.after_log(log, logging.DEBUG),
)
def _upload_text_to_fileio(content):
"""Upload text file to File.io service and return lnk"""
resp = httpx.post("https://file.io", content=content)
if resp.status_code not in [200, 201]:
print(resp.json())
raise FileIoException("Failed to send report to file.io service.")
try:
return resp.json()["link"]
except ValueError as e:
log.debug(e)
raise FileIoException("Failed to send report to file.io service.")
def _send_report_to_fileio(info):
print("Uploading report to file.io service.")
try:
link = _upload_text_to_fileio(str(info))
print("Report uploaded.")
print(link)
print()
except FileIoException as ex:
print(str(ex))
@suppress_logs_and_warning
def show_info(args):
"""Show information related to Airflow, system and other."""
# Enforce anonymization, when file_io upload is tuned on.
anonymizer = PiiAnonymizer() if args.anonymize or args.file_io else NullAnonymizer()
info = AirflowInfo(anonymizer)
if args.file_io:
_send_report_to_fileio(info.render_text(args.output))
else:
info.show(args.output)
相关信息
相关文章
airflow cheat_sheet_command 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦