airflow subprocess 源码
airflow subprocess 代码
文件路径:/airflow/hooks/subprocess.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import contextlib
import os
import signal
from collections import namedtuple
from subprocess import PIPE, STDOUT, Popen
from tempfile import TemporaryDirectory, gettempdir
from airflow.hooks.base import BaseHook
# Lightweight record returned by ``SubprocessHook.run_command``: ``exit_code`` is the
# child's return code and ``output`` is the last line the child printed.
SubprocessResult = namedtuple('SubprocessResult', ('exit_code', 'output'))
class SubprocessHook(BaseHook):
    """Hook for running processes with the ``subprocess`` module."""

    def __init__(self) -> None:
        # Handle of the most recently launched child process; stays ``None``
        # until ``run_command`` has started one.
        self.sub_process: Popen[bytes] | None = None
        super().__init__()

    def run_command(
        self,
        command: list[str],
        env: dict[str, str] | None = None,
        output_encoding: str = 'utf-8',
        cwd: str | None = None,
    ) -> SubprocessResult:
        """
        Execute the command and stream its output to the hook's logger.

        If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards.
        If ``env`` is not supplied, ``os.environ`` is passed. An explicitly supplied empty dict is
        honoured and gives the child an empty environment.

        stderr is merged into stdout, and every output line is decoded (undecodable bytes are
        rendered as backslash escapes), right-stripped and logged at INFO level.

        :param command: the command to run
        :param env: Optional dict containing environment variables to be made available to the shell
            environment in which ``command`` will be executed. If omitted, ``os.environ`` will be used.
            Note, that in case you have Sentry configured, original variables from the environment
            will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See
            :doc:`/logging-monitoring/errors` for details.
        :param output_encoding: encoding to use for decoding stdout
        :param cwd: Working directory to run the command in.
            If None (default), the command is run in a temporary directory.
        :return: :class:`namedtuple` containing ``exit_code`` and ``output``, the last line from stderr
            or stdout (an empty string when the command printed nothing)
        :raises RuntimeError: if the subprocess handle is unexpectedly ``None`` after creation
        """
        self.log.info('Tmp dir root location: \n %s', gettempdir())
        with contextlib.ExitStack() as stack:
            if cwd is None:
                # The temporary directory is removed when the stack unwinds.
                cwd = stack.enter_context(TemporaryDirectory(prefix='airflowtmp'))

            def pre_exec():
                # Restore default signal disposition and invoke setsid.
                # Names missing from the ``signal`` module on this platform are skipped.
                for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                # New session => the child leads its own process group, which is
                # what ``send_sigterm`` signals.
                os.setsid()

            self.log.info('Running command: %s', command)

            self.sub_process = Popen(
                command,
                stdout=PIPE,
                stderr=STDOUT,
                cwd=cwd,
                # Only fall back to the parent's environment when no mapping was
                # given at all; an empty dict is a valid (empty) environment.
                env=os.environ if env is None else env,
                preexec_fn=pre_exec,
            )

            self.log.info('Output:')
            line = ''
            if self.sub_process is None:
                raise RuntimeError("The subprocess should be created here and is None!")
            stdout = self.sub_process.stdout
            if stdout is not None:
                # Make sure the pipe's file descriptor is released once we are
                # done, even if decoding or logging raises.
                stack.callback(stdout.close)
                # ``readline`` returns ``b''`` at EOF, which ends the iteration.
                for raw_line in iter(stdout.readline, b''):
                    line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip()
                    self.log.info("%s", line)

            self.sub_process.wait()

            self.log.info('Command exited with return code %s', self.sub_process.returncode)
            return_code: int = self.sub_process.returncode

        return SubprocessResult(exit_code=return_code, output=line)

    def send_sigterm(self):
        """Send SIGTERM signal to the process group of ``self.sub_process`` if one exists."""
        self.log.info('Sending SIGTERM signal to process group')
        if self.sub_process and hasattr(self.sub_process, 'pid'):
            # Signal the whole group so that grandchildren spawned by the command are reached too.
            os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦