prefect_shell.commands

Tasks for interacting with shell commands

Functions

shell_run_command

shell_run_command(command: str, env: dict[str, str] | None = None, helper_command: str | None = None, shell: str | None = None, extension: str | None = None, return_all: bool = False, stream_level: int = logging.INFO, cwd: str | bytes | os.PathLike[str] | None = None) -> list[str] | str
Runs arbitrary shell commands. Args:
  • command: Shell command to be executed; can also be provided post-initialization by calling this task instance.
  • env: Dictionary of environment variables to use for the subprocess; can also be provided at runtime.
  • helper_command: String representing a shell command, which will be executed prior to the command in the same process. Can be used to change directories, define helper functions, etc. for different commands in a flow.
  • shell: Shell to run the command with.
  • extension: File extension to be appended to the command to be executed.
  • return_all: Whether this task should return all lines of stdout as a list, or just the last line as a string.
  • stream_level: The logging level of the stream; defaults to 20, equivalent to logging.INFO.
  • cwd: The working directory context the command will be executed within.
Returns:
  • If return_all is True, all lines of stdout as a list; otherwise, the last line as a string.
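
The effect of return_all and helper_command can be illustrated without Prefect. The sketch below is a hypothetical stand-in (run_and_collect is not part of prefect_shell and is not the library's implementation); it assumes a POSIX sh is available and only mirrors the behavior described above: the helper runs first in the same shell, and the result is either every stdout line or just the last one.

```python
from __future__ import annotations

import subprocess


def run_and_collect(
    command: str,
    helper_command: str | None = None,
    return_all: bool = False,
) -> list[str] | str:
    """Hypothetical illustration of the documented return shapes."""
    # Put the helper ahead of the command in one script so that its
    # side effects (cd, shell functions, variables) carry over.
    script = f"{helper_command}\n{command}" if helper_command else command
    completed = subprocess.run(
        ["sh", "-c", script],  # assumes a POSIX shell named "sh"
        capture_output=True,
        text=True,
        check=True,
    )
    lines = completed.stdout.splitlines()
    if return_all:
        return lines
    # Without return_all, only the final line of stdout is kept.
    return lines[-1] if lines else ""


all_lines = run_and_collect("echo first; echo second", return_all=True)
last_line = run_and_collect("echo first; echo second")
in_root = run_and_collect("pwd", helper_command="cd /")
```

Here all_lines is a list, last_line is a single string, and in_root shows the helper's directory change applying to the main command.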

Classes

ShellProcess

A class representing a shell process. Supports both async (anyio.abc.Process) and sync (subprocess.Popen) processes. Methods:

afetch_result

afetch_result(self) -> list[str]
Retrieve the output of the shell operation (async version). Returns:
  • The lines output from the shell operation as a list.

await_for_completion

await_for_completion(self) -> None
Wait for the shell command to complete after a process is triggered (async version).

fetch_result

fetch_result(self) -> list[str]
Retrieve the output of the shell operation (sync version). Returns:
  • The lines output from the shell operation as a list.

pid

pid(self) -> int
The PID of the process. Returns:
  • The PID of the process.

return_code

return_code(self) -> int | None
The return code of the process. Returns:
  • The return code of the process, or None if the process is still running.

wait_for_completion

wait_for_completion(self) -> None
Wait for the shell command to complete after a process is triggered (sync version).
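
Since the sync variant is described as wrapping subprocess.Popen, the semantics of pid, return_code, wait_for_completion, and fetch_result can be sketched with Popen alone. The mapping in the comments is an assumption made for illustration, not a statement of how ShellProcess is implemented.

```python
import subprocess
import sys

# Start a child process that prints one line and then stays alive briefly.
process = subprocess.Popen(
    [sys.executable, "-c", "import time; print('ready', flush=True); time.sleep(0.5)"],
    stdout=subprocess.PIPE,
    text=True,
)

pid = process.pid  # comparable to ShellProcess.pid
code_while_running = process.poll()  # None while the child is still running

# Comparable to wait_for_completion() followed by fetch_result():
# block until the child exits, then split its stdout into lines.
stdout_text, _ = process.communicate()
lines = stdout_text.splitlines()

code_after_exit = process.returncode  # set once the child has exited
```

The key point is that the return code is None until the process has finished, which is why it should be read after waiting for completion.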

ShellOperation

A block representing a shell operation, containing multiple commands. For long-running operations, use the trigger method and use the block as a context manager so that processes are closed automatically when the context exits; otherwise, call the close method manually to close processes. For short-lived operations, use the run method, which manages the context automatically. Attributes:
  • commands: A list of commands to execute sequentially.
  • stream_output: Whether to stream output.
  • env: A dictionary of environment variables to set for the shell operation.
  • working_dir: The working directory context the commands will be executed within.
  • shell: The shell to use to execute the commands.
  • extension: The extension to use for the temporary file. If unset, defaults to .ps1 on Windows and .sh on other platforms.
Examples: Load a configured block:
from prefect_shell import ShellOperation

shell_operation = ShellOperation.load("BLOCK_NAME")
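
The attributes above suggest that the commands are written to a temporary script file (hence extension) and handed to the shell. The following is a minimal sketch of that idea using only the standard library. The function run_commands_via_script and its defaults are hypothetical, the mechanism is an assumption rather than the library's actual code, and a POSIX sh is assumed.

```python
from __future__ import annotations

import os
import subprocess
import tempfile


def run_commands_via_script(
    commands: list[str],
    shell: str = "sh",          # assumed POSIX shell
    extension: str = ".sh",
    env: dict[str, str] | None = None,
    working_dir: str | None = None,
) -> list[str]:
    """Hypothetical sketch: run commands sequentially from a temp script."""
    # Write one command per line so the shell executes them in order.
    with tempfile.NamedTemporaryFile("w", suffix=extension, delete=False) as script:
        script.write("\n".join(commands))
        script_path = script.name
    try:
        completed = subprocess.run(
            [shell, script_path],
            capture_output=True,
            text=True,
            check=True,
            env={**os.environ, **(env or {})},  # extra variables on top of the current environment
            cwd=working_dir,
        )
    finally:
        os.remove(script_path)  # always clean up the temporary file
    return completed.stdout.splitlines()


output = run_commands_via_script(
    ['echo "$GREETING"', "echo done"],
    env={"GREETING": "hello"},
)
```

Running the commands from one script means they share shell state, and the output comes back as a list of lines, matching the return type documented for run.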
Methods:

aclose

aclose(self)
Close the job block (async version).

arun

arun(self, **open_kwargs: dict[str, Any]) -> list[str]
Runs the shell commands (async version). Unlike the atrigger method, it also waits for completion and fetches the result directly, managing the context automatically. This method is ideal for short-lived shell commands; for long-running ones, use the atrigger method instead. Args:
  • **open_kwargs: Additional keyword arguments to pass to open_process.
Returns:
  • The lines output from the shell command as a list.
Examples: Sleep for 5 seconds and then print “Hello, world!”:
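import asyncio

from prefect_shell import ShellOperation


async def main() -> list[str]:
    # arun waits for the commands to finish and returns the output lines
    return await ShellOperation(
        commands=["sleep 5", "echo 'Hello, world!'"]
    ).arun()


shell_output = asyncio.run(main())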

atrigger

atrigger(self, **open_kwargs: dict[str, Any]) -> ShellProcess
Triggers the shell commands and returns a ShellProcess object for tracking the run (async version). This method is ideal for long-running shell commands; for short-lived ones, use the arun method instead. Args:
  • **open_kwargs: Additional keyword arguments to pass to open_process.
Returns:
  • A ShellProcess object.
Examples: Sleep for 5 seconds and then print “Hello, world!”:
import asyncio

from prefect_shell import ShellOperation


async def main() -> list[str]:
    # The async context manager closes the processes on exit
    async with ShellOperation(
        commands=["sleep 5", "echo 'Hello, world!'"],
    ) as shell_operation:
        shell_process = await shell_operation.atrigger()
        await shell_process.await_for_completion()
        return await shell_process.afetch_result()


shell_output = asyncio.run(main())

close

close(self)
Close the job block (sync version).

run

run(self, **open_kwargs: dict[str, Any]) -> list[str]
Runs the shell commands (sync version). Unlike the trigger method, it also waits for completion and fetches the result directly, managing the context automatically. This method is ideal for short-lived shell commands; for long-running ones, use the trigger method instead. Args:
  • **open_kwargs: Additional keyword arguments to pass to subprocess.Popen.
Returns:
  • The lines output from the shell command as a list.
Examples: Sleep for 5 seconds and then print “Hello, world!”:
from prefect_shell import ShellOperation

shell_output = ShellOperation(
    commands=["sleep 5", "echo 'Hello, world!'"]
).run()

trigger

trigger(self, **open_kwargs: dict[str, Any]) -> ShellProcess
Triggers the shell commands and returns a ShellProcess object for tracking the run (sync version). This method is ideal for long-running shell commands; for short-lived ones, use the run method instead. Args:
  • **open_kwargs: Additional keyword arguments to pass to subprocess.Popen.
Returns:
  • A ShellProcess object.
Examples: Sleep for 5 seconds and then print “Hello, world!”:
from prefect_shell import ShellOperation

with ShellOperation(
    commands=["sleep 5", "echo 'Hello, world!'"],
) as shell_operation:
    shell_process = shell_operation.trigger()
    shell_process.wait_for_completion()
    shell_output = shell_process.fetch_result()