prefect.utilities.processutils
¶
open_process
¶
Like anyio.open_process
but with:
- Support for Windows command joining
- Termination of the process on exception during yield
- Forced cleanup of process resources during cancellation
Source code in prefect/utilities/processutils.py
@asynccontextmanager
async def open_process(command: List[str], **kwargs):
"""
Like `anyio.open_process` but with:
- Support for Windows command joining
- Termination of the process on exception during yield
- Forced cleanup of process resources during cancellation
"""
# Passing a string to open_process is equivalent to shell=True which is
# generally necessary for Unix-like commands on Windows but otherwise should
# be avoided
if sys.platform == "win32":
command = " ".join(command)
process = await anyio.open_process(command, **kwargs)
try:
async with process:
yield process
finally:
try:
process.terminate()
except ProcessLookupError:
# Occurs if the process is already terminated
pass
# Ensure the process resource is closed. If not shielded from cancellation,
# this resource an be left open and the subprocess output can be appear after
# the parent process has exited.
with anyio.CancelScope(shield=True):
await process.aclose()
run_process
async
¶
Like anyio.run_process
but with:
- Use of our
open_process
utility to ensure resources are cleaned up - Simple
stream_output
support to connect the subprocess to the parent stdout/err - Support for submission with
TaskGroup.start
marking as 'started' after the process has been created. When used, the PID is returned to the task status.
Source code in prefect/utilities/processutils.py
async def run_process(
command: List[str],
stream_output: Union[bool, Tuple[Optional[TextSink], Optional[TextSink]]] = False,
task_status: Optional[anyio.abc.TaskStatus] = None,
**kwargs,
):
"""
Like `anyio.run_process` but with:
- Use of our `open_process` utility to ensure resources are cleaned up
- Simple `stream_output` support to connect the subprocess to the parent stdout/err
- Support for submission with `TaskGroup.start` marking as 'started' after the
process has been created. When used, the PID is returned to the task status.
"""
if stream_output is True:
stream_output = (sys.stdout, sys.stderr)
async with open_process(
command,
stdout=subprocess.PIPE if stream_output else subprocess.DEVNULL,
stderr=subprocess.PIPE if stream_output else subprocess.DEVNULL,
**kwargs,
) as process:
if task_status is not None:
task_status.started(process.pid)
if stream_output:
await consume_process_output(
process, stdout_sink=stream_output[0], stderr_sink=stream_output[1]
)
await process.wait()
return process