
prefect.projects.steps.core

Core primitives for running Prefect project steps.

Project steps are YAML representations of Python functions along with their inputs.

Whenever a step is run, the following actions are taken:

  • The step's inputs and block / variable references are resolved (see the projects concepts documentation for more details)
  • The step's function is imported; if it cannot be found, the requires keyword is used to install the necessary packages
  • The step's function is called with the resolved inputs
  • The step's output is returned and used to resolve inputs for subsequent steps
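The lifecycle above can be sketched with the standard library alone. This is a minimal, synchronous illustration, not Prefect's implementation: `import_function`, `resolve_inputs`, `run_step_sketch`, and the `{{ name }}` placeholder syntax are hypothetical names chosen for this example, block and variable resolution is reduced to a dictionary lookup, and package installation via requires is omitted.

```python
import importlib
from typing import Any, Callable, Dict


def import_function(fqn: str) -> Callable[..., Any]:
    """Import a callable from a dotted path such as 'json.dumps'."""
    module_name, _, attr = fqn.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def resolve_inputs(inputs: Dict[str, Any], upstream: Dict[str, Any]) -> Dict[str, Any]:
    """Swap '{{ name }}' placeholders for earlier outputs (hypothetical syntax)."""
    resolved = {}
    for key, value in inputs.items():
        if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
            value = upstream[value[2:-2].strip()]
        resolved[key] = value
    return resolved


def run_step_sketch(step: Dict[str, Dict[str, Any]], upstream: Dict[str, Any]) -> Any:
    # A step maps exactly one importable function name to its keyword inputs;
    # the unpacking raises ValueError if there are zero or several keys.
    (fqn, inputs), = step.items()
    kwargs = resolve_inputs(inputs, upstream)  # 1. resolve the inputs
    func = import_function(fqn)                # 2. import the function
    return func(**kwargs)                      # 3. call it, 4. return its output


# Two steps, with the first step's output feeding the second step's input.
outputs: Dict[str, Any] = {}
outputs["dumped"] = run_step_sketch({"json.dumps": {"obj": {"a": 1}}}, outputs)
outputs["loaded"] = run_step_sketch({"json.loads": {"s": "{{ dumped }}"}}, outputs)
```

Storing each output under a name lets later steps refer to earlier results, which is the role the last bullet describes.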

run_step async

Runs a step, returns the step's output.

Steps are assumed to be in the format {"importable.func.name": {"kwarg1": "value1", ...}}.

The 'requires' keyword is reserved: it is removed from the inputs before they are passed to the step function. It specifies packages that should be installed before the step runs.
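As an illustration of the step format and the reserved keyword, the sketch below splits a step into its function name, its plain inputs, and its reserved keywords. `split_step` and the `my_pkg.steps.build` step are hypothetical, and `RESERVED_KEYWORDS` is assumed here to contain only 'requires'. Unlike `run_step`, which pops items from the dictionaries it is given, this sketch works on copies so the caller's step is left unchanged.

```python
from typing import Any, Dict, Tuple

# Assumption: only the keyword named in the text above is reserved.
RESERVED_KEYWORDS = {"requires"}


def split_step(step: Dict[str, Dict[str, Any]]) -> Tuple[str, Dict[str, Any], Dict[str, Any]]:
    """Return (function name, plain inputs, reserved keywords) for a step."""
    remaining = dict(step)  # shallow copy so the caller's dict is untouched
    fqn, inputs = remaining.popitem()

    # A step must have exactly one top-level key: the importable function name.
    if remaining:
        raise ValueError(
            f"Step has unexpected additional keys: {', '.join(remaining.keys())}"
        )

    inputs = dict(inputs)
    # Pull reserved keywords out so they are never passed to the step function.
    keywords = {
        keyword: inputs.pop(keyword)
        for keyword in RESERVED_KEYWORDS
        if keyword in inputs
    }
    return fqn, inputs, keywords


example_step = {"my_pkg.steps.build": {"tag": "v1", "requires": "my-pkg>=1.0"}}
fqn, inputs, keywords = split_step(example_step)
```

Here `fqn` is the dotted function name, `inputs` holds only `tag`, and `keywords` holds the `requires` entry.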

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/steps/core.py
async def run_step(step: dict) -> dict:
    """
    Runs a step, returns the step's output.

    Steps are assumed to be in the format `{"importable.func.name": {"kwarg1": "value1", ...}}`.

    The 'requires' keyword is reserved: it is removed from the inputs before they
    are passed to the step function. It specifies packages that should be installed
    before the step runs.
    """
    fqn, inputs = step.popitem()

    if step:
        raise ValueError(
            f"Step has unexpected additional keys: {', '.join(step.keys())}"
        )

    keywords = {
        keyword: inputs.pop(keyword)
        for keyword in RESERVED_KEYWORDS
        if keyword in inputs
    }

    inputs = await resolve_block_document_references(inputs)
    inputs = await resolve_variables(inputs)

    step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
    return await from_async.call_soon_in_new_thread(
        Call.new(step_func, **inputs)
    ).aresult()
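The final call hands the step function to a separate thread and awaits its result, so a blocking step does not stall the event loop. A rough standard-library analog of that pattern is sketched below. It uses `asyncio.to_thread` (Python 3.9+) rather than Prefect's `from_async` and `Call` utilities, whose exact behavior is not shown here, and `blocking_step` is a hypothetical stand-in for a step function.

```python
import asyncio
import threading
import time


def blocking_step(seconds: float) -> str:
    """A synchronous function that blocks, then reports which thread ran it."""
    time.sleep(seconds)
    return threading.current_thread().name


async def main() -> str:
    # Run the blocking call in a worker thread and await its return value.
    return await asyncio.to_thread(blocking_step, seconds=0.01)


worker_name = asyncio.run(main())
```

The returned thread name differs from the thread driving the event loop, which shows the step body ran off the loop's thread.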