prefect.deployments.steps.core

Core primitives for running Prefect project steps.

Project steps are YAML representations of Python functions along with their inputs.

Whenever a step is run, the following actions are taken:

  • The step's inputs and block / variable references are resolved (see the projects concepts documentation for more details)
  • The step's function is imported; if it cannot be found, the requires keyword is used to install the necessary packages
  • The step's function is called with the resolved inputs
  • The step's output is returned and used to resolve inputs for subsequent steps
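
The four actions above can be sketched in plain Python. This is a simplified illustration, not Prefect's implementation: `resolve_inputs`, `import_step_function`, and `run_step_sketch` are hypothetical names, the `{{ name.key }}` placeholder handling is a reduced stand-in for Prefect's templating, and block references, variables, and `requires`-based installation are left out. `builtins.dict` stands in for a real step function only because it accepts keyword arguments and returns a dictionary.

```python
import importlib
import re
from typing import Any, Callable, Dict, Optional

# Matches "{{ some.dotted.name }}"; a simplified stand-in for real templating.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.-]+)\s*\}\}")


def resolve_inputs(inputs: Dict[str, Any], values: Dict[str, Any]) -> Dict[str, Any]:
    """Fill placeholders in top-level string inputs from a nested dict."""

    def lookup(match: "re.Match[str]") -> str:
        current: Any = values
        for part in match.group(1).split("."):
            current = current[part]  # KeyError if the reference is unknown
        return str(current)

    return {
        key: PLACEHOLDER.sub(lookup, value) if isinstance(value, str) else value
        for key, value in inputs.items()
    }


def import_step_function(fqn: str) -> Callable[..., Any]:
    """Import "package.module.func" and return the function object."""
    module_name, _, func_name = fqn.rpartition(".")
    return getattr(importlib.import_module(module_name), func_name)


def run_step_sketch(
    step: Dict[str, Dict[str, Any]],
    upstream_outputs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Resolve inputs, import the function, call it, and return its output."""
    fqn, inputs = next(iter(step.items()))
    resolved = resolve_inputs(inputs, upstream_outputs or {})
    step_func = import_step_function(fqn)
    return step_func(**resolved)


# Output of an earlier (hypothetical) step, keyed by a name chosen for this example.
upstream = {"clone": {"directory": "my-repo"}}
step = {"builtins.dict": {"path": "{{ clone.directory }}/flows"}}
result = run_step_sketch(step, upstream)
```

Here `result` holds the dictionary built from the resolved input, which a later step could reference in the same way `upstream` is referenced above.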

StepExecutionError

Bases: Exception

Raised when a step fails to execute.

Source code in prefect/deployments/steps/core.py
class StepExecutionError(Exception):
    """
    Raised when a step fails to execute.
    """
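
As a hedged illustration of where such an exception might appear, the sketch below wraps a failing step call and re-raises the failure as `StepExecutionError`. The class is redefined locally so the example runs without Prefect, and `failing_step` and `call_step` are hypothetical helpers; how Prefect itself raises this exception is not shown in this section.

```python
class StepExecutionError(Exception):
    """Local copy of the exception shown above, so the sketch is self-contained."""


def failing_step(path: str) -> dict:
    # Hypothetical step function that always fails.
    raise FileNotFoundError(f"no such directory: {path}")


def call_step(func, **inputs):
    """Call a step function and wrap any failure in StepExecutionError."""
    try:
        return func(**inputs)
    except Exception as exc:
        # `from exc` keeps the original error available as __cause__.
        raise StepExecutionError(
            f"Encountered error while running {func.__name__}"
        ) from exc


try:
    call_step(failing_step, path="missing")
except StepExecutionError as err:
    caught = err
```

Chaining with `from exc` lets a caller handle one exception type while still being able to inspect the underlying error.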

run_step async

Runs a step, returns the step's output.

Steps are assumed to be in the format {"importable.func.name": {"kwarg1": "value1", ...}}.

The 'id' and 'requires' keywords are reserved for specific purposes and are removed from the inputs before they are passed to the step function.

The 'requires' keyword specifies packages that should be installed before running the step.
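
A minimal sketch of how a step in this format could be split into its function name, its ordinary inputs, and its reserved keywords. The set of reserved keywords is taken from the text above; `split_step`, the step name `my_package.steps.build_image`, and the requirement string are hypothetical and exist only for illustration.

```python
from typing import Any, Dict, Tuple

# Assumed from the text above: 'id' and 'requires' are the reserved keywords.
RESERVED_KEYWORDS = {"id", "requires"}


def split_step(
    step: Dict[str, Dict[str, Any]],
) -> Tuple[str, Dict[str, Any], Dict[str, Any]]:
    """Return (function name, inputs without reserved keywords, reserved keywords)."""
    if len(step) != 1:
        raise ValueError(
            f"Expected exactly one function name, got: {', '.join(step)}"
        )
    fqn, raw_inputs = next(iter(step.items()))
    inputs = dict(raw_inputs)  # copy so the caller's step is not mutated
    keywords = {k: inputs.pop(k) for k in RESERVED_KEYWORDS if k in inputs}
    return fqn, inputs, keywords


step = {
    "my_package.steps.build_image": {  # hypothetical step function
        "id": "build",
        "requires": "my-package>=1.0",  # hypothetical requirement
        "tag": "latest",
    }
}
fqn, inputs, keywords = split_step(step)
```

After the split, only `tag` remains in `inputs`, so the step function never sees `id` or `requires` as arguments.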

Source code in prefect/deployments/steps/core.py
async def run_step(step: Dict, upstream_outputs: Optional[Dict] = None) -> Dict:
    """
    Runs a step, returns the step's output.

    Steps are assumed to be in the format `{"importable.func.name": {"kwarg1": "value1", ...}}`.

    The 'id' and 'requires' keywords are reserved for specific purposes and will be removed from the
    inputs before passing to the step function.

    The 'requires' keyword is used to specify packages that should be installed before running the step.
    """
    fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
    upstream_outputs = upstream_outputs or {}

    if len(step.keys()) > 1:
        raise ValueError(
            f"Step has unexpected additional keys: {', '.join(list(step.keys())[1:])}"
        )

    keywords = {
        keyword: inputs.pop(keyword)
        for keyword in RESERVED_KEYWORDS
        if keyword in inputs
    }

    inputs = apply_values(inputs, upstream_outputs)
    inputs = await resolve_block_document_references(inputs)
    inputs = await resolve_variables(inputs)
    inputs = apply_values(inputs, os.environ)
    step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
    result = await from_async.call_soon_in_new_thread(
        Call.new(step_func, **inputs)
    ).aresult()
    return result
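
The final call above hands the step function to a separate thread and awaits its result. A rough standard-library analogue, not Prefect's `from_async` / `Call` machinery, is sketched below. `call_in_worker_thread`, `sync_step`, and `async_step` are hypothetical names, and the handling of coroutine functions is an assumption about what a step function might be.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_in_worker_thread(func: Callable[..., Any], **inputs: Any) -> Any:
    """Run a sync or async function off the event loop thread and await its result."""

    def runner() -> Any:
        if inspect.iscoroutinefunction(func):
            # Give the coroutine its own event loop inside the worker thread.
            return asyncio.run(func(**inputs))
        return func(**inputs)

    # asyncio.to_thread (Python 3.9+) runs `runner` in a thread from the default pool.
    return await asyncio.to_thread(runner)


def sync_step(name: str) -> dict:
    return {"greeting": f"hello {name}"}


async def async_step(name: str) -> dict:
    await asyncio.sleep(0)
    return {"greeting": f"hello {name}"}


async def main() -> list:
    return await asyncio.gather(
        call_in_worker_thread(sync_step, name="a"),
        call_in_worker_thread(async_step, name="b"),
    )


results = asyncio.run(main())
```

Running the function in another thread keeps a blocking step from stalling the caller's event loop, which is the main reason to prefer this shape over calling the function directly.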