prefect.cli.shell

Provides tools for executing shell commands as Prefect flows. Commands can be run ad hoc or served as Prefect flows, with options for logging output, scheduling, and deployment customization.

Functions

output_stream

output_stream(pipe: IO[str], logger_function: Callable[[str], None]) -> None
Reads from a pipe line by line and logs each line using the provided logging function. Args:
  • pipe: A file-like object for reading process output.
  • logger_function: A callable that accepts a single string, such as a logger method, and is invoked for each line.
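
The read-and-log loop described above can be sketched as follows. This is a minimal illustration, not Prefect's implementation: `stream_lines` is a hypothetical stand-in for `output_stream`, and stripping the trailing newline before logging is an assumption.

```python
import io
from typing import IO, Callable, List


def stream_lines(pipe: IO[str], logger_function: Callable[[str], None]) -> None:
    """Hypothetical stand-in for output_stream: pass each line to the logger."""
    with pipe:
        # iter(readline, "") keeps reading until readline returns "" at EOF.
        for line in iter(pipe.readline, ""):
            # Assumption: drop the trailing newline before logging.
            logger_function(line.rstrip("\n"))


# Any callable that takes a string works; here a list's append method
# plays the role of a logger method such as logger.info.
logged: List[str] = []
stream_lines(io.StringIO("first\nsecond\n"), logged.append)
print(logged)
```

Because the function only needs a callable that accepts a string, the same loop works with `logger.info`, `logger.error`, or a test double.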

output_collect

output_collect(pipe: IO[str], container: list[str]) -> None
Collects output from a subprocess pipe and stores it in a container list. Args:
  • pipe: The output pipe of the subprocess, either stdout or stderr.
  • container: A list to store the collected output lines.
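
A collector of this shape is typically run on one thread per pipe so that stdout and stderr are drained concurrently. The sketch below assumes that usage; `collect_lines` is a hypothetical stand-in for `output_collect`, and the child process is a small Python one-liner chosen only so the example is portable.

```python
import subprocess
import sys
import threading
from typing import IO, List


def collect_lines(pipe: IO[str], container: List[str]) -> None:
    """Hypothetical stand-in for output_collect: append each line to container."""
    with pipe:
        for line in pipe:
            # Assumption: store lines without their trailing newline.
            container.append(line.rstrip("\n"))


# A child process that writes one line to each stream.
child_code = "import sys; print('to stdout'); print('to stderr', file=sys.stderr)"
proc = subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)

stdout_lines: List[str] = []
stderr_lines: List[str] = []
readers = [
    threading.Thread(target=collect_lines, args=(proc.stdout, stdout_lines)),
    threading.Thread(target=collect_lines, args=(proc.stderr, stderr_lines)),
]
for reader in readers:
    reader.start()
proc.wait()
for reader in readers:
    reader.join()

print(stdout_lines, stderr_lines)
```

Reading each pipe on its own thread avoids the deadlock that can occur when one pipe's buffer fills while the parent is blocked reading the other.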

run_shell_process

run_shell_process(command: str, log_output: bool = True, stream_stdout: bool = False, log_stderr: bool = False, popen_kwargs: Optional[Dict[str, Any]] = None)
Asynchronously executes the specified shell command and logs its output. This function is designed to be used within Prefect flows to run shell commands as part of task execution. It handles both the execution of the command and the collection of its output for logging purposes. Args:
  • command: The shell command to execute.
  • log_output: If True, the output of the command (both stdout and stderr) is logged to Prefect.
  • stream_stdout: If True, the stdout of the command is streamed to Prefect logs.
  • log_stderr: If True, the stderr of the command is logged to Prefect logs.
  • popen_kwargs: Additional keyword arguments to pass to the subprocess.Popen call.
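
The interplay between `log_output` and `popen_kwargs` can be sketched with the standard library alone. This is an illustration under stated assumptions, not Prefect's code: `run_and_log` is a hypothetical helper, it runs synchronously rather than asynchronously, it merges stderr into stdout for brevity, and it logs through the standard `logging` module instead of a Prefect run logger.

```python
import logging
import subprocess
from typing import Any, Dict, List, Optional

logger = logging.getLogger("shell_sketch")


def run_and_log(
    command: str,
    log_output: bool = True,
    popen_kwargs: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Hypothetical synchronous sketch: run a shell command and log its output."""
    # Caller-supplied kwargs go first so the pipe settings this sketch
    # depends on cannot be overridden by accident.
    kwargs: Dict[str, Any] = {
        **(popen_kwargs or {}),
        "shell": True,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,  # assumption: merge stderr into stdout
        "text": True,
    }
    lines: List[str] = []
    with subprocess.Popen(command, **kwargs) as proc:
        for line in proc.stdout:
            line = line.rstrip("\n")
            lines.append(line)
            if log_output:
                logger.info(line)
    # The context manager waits for the process, so returncode is set here.
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, command)
    return lines


result = run_and_log("echo hello", popen_kwargs={"cwd": "."})
print(result)
```

Passing extra options such as `cwd` or `env` through a dictionary keeps the function signature small while still exposing the full `subprocess.Popen` surface.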

watch

watch(command: str, log_output: bool = typer.Option(True, help='Log the output of the command to Prefect logs.'), flow_run_name: str = typer.Option(None, help='Name of the flow run.'), flow_name: str = typer.Option('Shell Command', help='Name of the flow.'), stream_stdout: bool = typer.Option(True, help='Stream the output of the command.'), tag: Annotated[Optional[List[str]], typer.Option(help='Optional tags for the flow run.')] = None)
Executes a shell command and observes it as a Prefect flow. Args:
  • command: The shell command to be executed.
  • log_output: If True, logs the command’s output. Defaults to True.
  • flow_run_name: An optional name for the flow run.
  • flow_name: An optional name for the flow. Useful for identification in the Prefect UI.
  • stream_stdout: If True, streams the output of the command. Defaults to True.
  • tag: An optional list of tags for categorizing and filtering flows in the Prefect UI.

serve

serve(command: str, flow_name: str = typer.Option(..., help='Name of the flow'), deployment_name: str = typer.Option('CLI Runner Deployment', help='Name of the deployment'), deployment_tags: List[str] = typer.Option(None, '--tag', help='Tag for the deployment (can be provided multiple times)'), log_output: bool = typer.Option(True, help='Stream the output of the command', hidden=True), stream_stdout: bool = typer.Option(True, help='Stream the output of the command'), cron_schedule: str = typer.Option(None, help='Cron schedule for the flow'), timezone: str = typer.Option(None, help='Timezone for the schedule'), concurrency_limit: int = typer.Option(None, min=1, help='The maximum number of flow runs that can execute at the same time'), run_once: bool = typer.Option(False, help='Run the agent loop once, instead of forever.'))
Creates and serves a Prefect deployment that runs a specified shell command according to a cron schedule or ad hoc. This function allows users to integrate shell command execution into Prefect workflows seamlessly. It provides options for scheduled execution via cron expressions, flow and deployment naming for better management, and the application of tags for easier categorization and filtering within the Prefect UI. Additionally, it supports streaming command output to Prefect logs, setting concurrency limits to control flow execution, and optionally running the deployment once for ad-hoc tasks. Args:
  • command: The shell command the flow will execute.
  • flow_name: The name assigned to the flow. This is required.
  • deployment_tags: Optional tags for the deployment to facilitate filtering and organization.
  • log_output: If True, streams the output of the shell command to the Prefect logs. Defaults to True.
  • stream_stdout: If True, streams the output of the command. Defaults to True.
  • cron_schedule: A cron expression that defines when the flow will run. If not provided, the flow can be triggered manually.
  • timezone: The timezone for the cron schedule. This is important if the schedule should align with local time.
  • concurrency_limit: The maximum number of instances of the flow that can run simultaneously.
  • deployment_name: The name of the deployment. This helps distinguish deployments within the Prefect platform.
  • run_once: When True, the flow will only run once upon deployment initiation, rather than continuously.
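
To show how these arguments map onto a command line, the sketch below assembles an argument list for `prefect shell serve` without executing it. `build_serve_argv` is a hypothetical helper; the flag names other than `--tag` are inferred from the signature above, assuming Typer's default conversion of parameter names such as `cron_schedule` to `--cron-schedule`.

```python
from typing import List, Optional


def build_serve_argv(
    command: str,
    flow_name: str,
    cron_schedule: Optional[str] = None,
    timezone: Optional[str] = None,
    tags: Optional[List[str]] = None,
    run_once: bool = False,
) -> List[str]:
    """Hypothetical helper: build (but do not run) a `prefect shell serve` call."""
    argv = ["prefect", "shell", "serve", command, "--flow-name", flow_name]
    if cron_schedule is not None:
        argv += ["--cron-schedule", cron_schedule]
    if timezone is not None:
        argv += ["--timezone", timezone]
    # --tag may be given multiple times, once per tag.
    for tag in tags or []:
        argv += ["--tag", tag]
    if run_once:
        argv.append("--run-once")
    return argv


argv = build_serve_argv(
    "echo hello",
    "Hello Flow",
    cron_schedule="0 9 * * *",
    timezone="America/Chicago",
    tags=["demo"],
)
print(argv)
```

The resulting list could be passed to `subprocess.run` in an environment where the Prefect CLI is installed; omitting `cron_schedule` leaves the deployment to be triggered manually.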