prefect.context

Async- and thread-safe models for passing runtime context data.

These contexts should never be directly mutated by the user.

For more user-accessible information about the current run, see prefect.runtime.

Functions

serialize_context

serialize_context(asset_ctx_kwargs: Union[dict[str, Any], None] = None) -> dict[str, Any]

Serialize the current context for use in a remote execution environment.

Optionally, provide asset_ctx_kwargs to create a new AssetContext that will be used in the remote execution environment. This is useful for TaskRunners, which rely on creating the task run in the remote environment.

hydrated_context

hydrated_context(serialized_context: Optional[dict[str, Any]] = None, client: Union[PrefectClient, SyncPrefectClient, None] = None) -> Generator[None, Any, None]
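hydrated_context has no description in this reference. Its name and signature suggest it is the counterpart to serialize_context, re-establishing a serialized context on the receiving side, but that reading is an assumption. The sketch below illustrates the general capture-and-restore pattern with the standard library only. The names snapshot_context, restored_context, and remote_worker are hypothetical, and none of this is Prefect's implementation.

```python
import contextvars
import threading
from contextlib import contextmanager

# Hypothetical stand-ins for run context data; these are not Prefect objects.
_tags = contextvars.ContextVar("tags", default=frozenset())
_run_name = contextvars.ContextVar("run_name", default=None)


def snapshot_context():
    """Capture the current values as a plain dictionary that can be pickled."""
    return {"tags": sorted(_tags.get()), "run_name": _run_name.get()}


@contextmanager
def restored_context(snapshot):
    """Re-establish the captured values for the duration of the block."""
    tag_token = _tags.set(frozenset(snapshot["tags"]))
    name_token = _run_name.set(snapshot["run_name"])
    try:
        yield
    finally:
        # Reset in reverse order so the previous values come back.
        _run_name.reset(name_token)
        _tags.reset(tag_token)


def remote_worker(snapshot, out):
    # The worker restores the snapshot instead of relying on inherited state.
    with restored_context(snapshot):
        out["run_name"] = _run_name.get()
        out["tags"] = sorted(_tags.get())


_tags.set(frozenset({"a", "b"}))
_run_name.set("my-flow-run")

result = {}
worker = threading.Thread(target=remote_worker, args=(snapshot_context(), result))
worker.start()
worker.join()
```

The worker thread stands in for a remote execution environment: it sees the caller's values only because the snapshot was passed to it explicitly.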

get_run_context

get_run_context() -> Union[FlowRunContext, TaskRunContext]

Get the current run context from within a task or flow function.

Returns:

  • A FlowRunContext or TaskRunContext depending on the function type.

Raises:

  • RuntimeError: If called outside of a flow or task run.
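The return and error behavior described above can be pictured as a lookup over the active contexts. This is a minimal standard-library sketch, assuming the task context is checked before the flow context. FlowCtx, TaskCtx, and current_run_context are hypothetical stand-ins, not Prefect classes or functions.

```python
import contextvars
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowCtx:
    """Hypothetical stand-in for FlowRunContext."""
    name: str


@dataclass(frozen=True)
class TaskCtx:
    """Hypothetical stand-in for TaskRunContext."""
    name: str


_flow_ctx = contextvars.ContextVar("flow_ctx", default=None)
_task_ctx = contextvars.ContextVar("task_ctx", default=None)


def current_run_context():
    """Return the innermost active run context or raise RuntimeError."""
    # Assumption: a task context, when present, takes priority over the flow's.
    task_ctx = _task_ctx.get()
    if task_ctx is not None:
        return task_ctx
    flow_ctx = _flow_ctx.get()
    if flow_ctx is not None:
        return flow_ctx
    raise RuntimeError("No run context available: not inside a flow or task run.")


def demo():
    """Enter a flow context, then a task context, and record each lookup."""
    flow_token = _flow_ctx.set(FlowCtx("my_flow"))
    try:
        inside_flow = current_run_context()
        task_token = _task_ctx.set(TaskCtx("my_task"))
        try:
            inside_task = current_run_context()
        finally:
            _task_ctx.reset(task_token)
    finally:
        _flow_ctx.reset(flow_token)
    return inside_flow, inside_task
```

Calling current_run_context() with neither context active raises RuntimeError, which mirrors the documented failure mode for calls made outside a run.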

get_settings_context

get_settings_context() -> SettingsContext

Get the current settings context which contains profile information and the settings that are being used.

Generally, the settings that are being used are a combination of values from the profile and environment. See prefect.context.use_profile for more details.

tags

tags(*new_tags: str) -> Generator[set[str], None, None]

Context manager to add tags to flow and task run calls.

Tags are always combined with any existing tags.

Examples:

    from prefect import tags, task, flow

    @task
    def my_task():
        pass

Run a task with tags:

    @flow
    def my_flow():
        with tags("a", "b"):
            my_task()  # has tags: a, b

Run a flow with tags:

    @flow
    def my_flow():
        pass

    with tags("a", "b"):
        my_flow()  # has tags: a, b

Run a task with nested tag contexts:

    @flow
    def my_flow():
        with tags("a", "b"):
            with tags("c", "d"):
                my_task()  # has tags: a, b, c, d
            my_task()  # has tags: a, b

Inspect the current tags:

    @flow
    def my_flow():
        with tags("c", "d"):
            with tags("e", "f") as current_tags:
                print(current_tags)

    with tags("a", "b"):
        my_flow()

use_profile

use_profile(profile: Union[Profile, str], override_environment_variables: bool = False, include_current_context: bool = True) -> Generator[SettingsContext, Any, None]

Switch to a profile for the duration of this context.

Profile contexts are confined to an async context in a single thread.

Args:

  • profile: The name of the profile to load or an instance of a Profile.
  • override_environment_variables: If set, variables in the profile will take precedence over current environment variables. By default, environment variables will override profile settings.
  • include_current_context: If set, the new settings will be constructed with the current settings context as a base. If not set, the base settings will be loaded from the environment and defaults.
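The effect of override_environment_variables can be illustrated as a layered merge in which later layers win. This is a sketch of the documented precedence only, not of how Prefect builds its settings. resolve_settings is a hypothetical helper, the setting values are made up for illustration, and include_current_context is not modeled.

```python
def resolve_settings(defaults, profile, environment,
                     override_environment_variables=False):
    """Merge setting sources; later layers overwrite earlier ones."""
    if override_environment_variables:
        # Profile values take precedence over environment variables.
        layers = [defaults, environment, profile]
    else:
        # Default: environment variables override profile settings.
        layers = [defaults, profile, environment]
    merged = {}
    for layer in layers:
        merged.update(layer)
    return merged


# Illustrative values only.
defaults = {"PREFECT_API_URL": None, "PREFECT_LOGGING_LEVEL": "INFO"}
profile = {"PREFECT_LOGGING_LEVEL": "DEBUG"}
environment = {"PREFECT_LOGGING_LEVEL": "WARNING"}

default_behavior = resolve_settings(defaults, profile, environment)
profile_wins = resolve_settings(
    defaults, profile, environment, override_environment_variables=True
)
```

With the default flag, the environment's value is kept; with the flag set, the profile's value replaces it. Settings that appear in neither source fall through to the defaults.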

root_settings_context

root_settings_context() -> SettingsContext

Return the settings context that will exist as the root context for the module.

The profile to use is determined with the following precedence:

  • Command line via 'prefect --profile <name>'
  • Environment variable via 'PREFECT_PROFILE'
  • Profiles file via the 'active' key
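The precedence list above amounts to picking the first source that provides a name. A minimal sketch follows, in which resolve_profile_name and its parameters are hypothetical. What happens when no source names a profile is not stated here, so the sketch simply returns None in that case.

```python
def resolve_profile_name(cli_profile=None, environ=None, profiles_file_active=None):
    """Return the first profile name found, checking sources in precedence order."""
    environ = environ or {}
    candidates = (
        cli_profile,                     # 1. prefect --profile <name>
        environ.get("PREFECT_PROFILE"),  # 2. PREFECT_PROFILE environment variable
        profiles_file_active,            # 3. 'active' key in the profiles file
    )
    for candidate in candidates:
        if candidate:
            return candidate
    # Fallback behavior is not covered by this sketch.
    return None


from_cli = resolve_profile_name(
    cli_profile="staging",
    environ={"PREFECT_PROFILE": "ci"},
    profiles_file_active="local",
)
from_env = resolve_profile_name(
    environ={"PREFECT_PROFILE": "ci"},
    profiles_file_active="local",
)
from_file = resolve_profile_name(profiles_file_active="local")
```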

Classes

ContextModel

A base model for context data that forbids mutation and extra data while providing a context manager.

Methods:

get

get(cls: type[Self]) -> Optional[Self]

Get the current context instance.

model_copy

model_copy(self: Self) -> Self

Duplicate the context model, optionally choosing which fields to include, exclude, or change.

Returns:

  • A new model instance.

serialize

serialize(self, include_secrets: bool = True) -> dict[str, Any]

Serialize the context model to a dictionary that can be pickled with cloudpickle.
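ContextModel is described as forbidding mutation and extra data while providing a context manager, with get, model_copy, and serialize methods. The sketch below shows one way those properties can fit together, using a frozen dataclass and a ContextVar from the standard library. DemoContext, activate, and copy_with are hypothetical names. This is not Prefect's implementation, and the include_secrets option is not modeled.

```python
import contextvars
from contextlib import contextmanager
from dataclasses import FrozenInstanceError, asdict, dataclass, replace

_current = contextvars.ContextVar("demo_context", default=None)


@dataclass(frozen=True)
class DemoContext:
    """Hypothetical immutable context object."""
    name: str
    retries: int = 0

    @classmethod
    def get(cls):
        """Return the active instance, or None when no context is entered."""
        return _current.get()

    @contextmanager
    def activate(self):
        """Make this instance the current context for the enclosed block."""
        token = _current.set(self)
        try:
            yield self
        finally:
            _current.reset(token)

    def copy_with(self, **changes):
        """Return a new instance with selected fields replaced."""
        return replace(self, **changes)

    def serialize(self):
        """Return the fields as a plain dictionary."""
        return asdict(self)


ctx = DemoContext(name="outer")

with ctx.activate() as active:
    seen_inside = DemoContext.get()

try:
    ctx.name = "changed"          # mutation is rejected
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True

try:
    DemoContext(name="x", extra=1)  # unknown fields are rejected
    extra_blocked = False
except TypeError:
    extra_blocked = True
```

Because instances cannot be changed in place, any modification goes through copy_with, which leaves the original untouched.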

SyncClientContext

A context for managing the sync Prefect client instances.

Clients were formerly tracked on the TaskRunContext and FlowRunContext, but having two separate places and the addition of both sync and async clients made it difficult to manage. This context is intended to be the single source for sync clients.

This context creates a sync client, which can either be read directly from the context object OR loaded with get_client, inject_client, or other Prefect utilities.

    with SyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=True)
        c2 = get_client(sync_client=True)
        assert c1 is c2
        assert c1 is ctx.client

Methods:

get_or_create

get_or_create(cls) -> Generator[Self, None, None]

AsyncClientContext

A context for managing the async Prefect client instances.

Clients were formerly tracked on the TaskRunContext and FlowRunContext, but having two separate places and the addition of both sync and async clients made it difficult to manage. This context is intended to be the single source for async clients.

This context creates an async client, which can either be read directly from the context object OR loaded with get_client, inject_client, or other Prefect utilities.

    async with AsyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=False)
        c2 = get_client(sync_client=False)
        assert c1 is c2
        assert c1 is ctx.client

RunContext

The base context for a flow or task run. Data in this context will always be available when get_run_context is called.

Methods:

serialize

serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]

EngineContext

The context for a flow run. Data in this context is only available from within a flow run function.

Methods:

serialize

serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]

TaskRunContext

The context for a task run. Data in this context is only available from within a task run function.

Methods:

serialize

serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]

AssetContext

The asset context for a materializing task run. Contains all asset-related information needed for asset event emission and downstream asset dependency propagation.

Methods:

from_task_and_inputs

from_task_and_inputs(cls, task: 'Task[Any, Any]', task_run_id: UUID, task_inputs: Optional[dict[str, set[Any]]] = None, copy_to_child_ctx: bool = False) -> 'AssetContext'

Create an AssetContext from a task and its resolved inputs.

Args:

  • task: The task instance
  • task_run_id: The task run ID
  • task_inputs: The resolved task inputs (TaskRunResult objects)
  • copy_to_child_ctx: Whether this context should be copied on a child AssetContext

Returns:

  • Configured AssetContext

add_asset_metadata

add_asset_metadata(self, asset_key: str, metadata: dict[str, Any]) -> None

Add metadata for a materialized asset.

Args:

  • asset_key: The asset key
  • metadata: Metadata dictionary to add

Raises:

  • ValueError: If asset_key is not in downstream_assets
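The validation described above can be sketched as a membership check before the metadata is stored. DemoAssetContext is a hypothetical stand-in that tracks asset keys as plain strings. The metadata_by_key attribute and the choice to merge repeated calls for the same key are assumptions of this sketch, not documented behavior.

```python
class DemoAssetContext:
    """Hypothetical stand-in for AssetContext that tracks asset keys as strings."""

    def __init__(self, downstream_assets):
        self.downstream_assets = set(downstream_assets)
        self.metadata_by_key = {}

    def add_asset_metadata(self, asset_key, metadata):
        """Attach metadata to a known downstream asset."""
        if asset_key not in self.downstream_assets:
            raise ValueError(f"Asset key {asset_key!r} is not in downstream_assets")
        # Assumption: repeated calls for the same key merge their metadata.
        self.metadata_by_key.setdefault(asset_key, {}).update(metadata)


ctx = DemoAssetContext(downstream_assets={"s3://bucket/report.csv"})
ctx.add_asset_metadata("s3://bucket/report.csv", {"rows": 120})
ctx.add_asset_metadata("s3://bucket/report.csv", {"owner": "data-team"})

try:
    ctx.add_asset_metadata("s3://bucket/unknown.csv", {"rows": 1})
    unknown_rejected = False
except ValueError:
    unknown_rejected = True
```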

asset_as_resource

asset_as_resource(asset: Asset) -> dict[str, str]

Convert Asset to event resource format.

asset_as_related

asset_as_related(asset: Asset) -> dict[str, str]

Convert Asset to event related format.

related_materialized_by

related_materialized_by(by: str) -> dict[str, str]

Create a related resource for the tool that performed the materialization.

emit_events

emit_events(self, state: State) -> None

Emit asset events.

update_tracked_assets

update_tracked_assets(self) -> None

Update the flow run context with assets that should be propagated downstream.

serialize

serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]

Serialize the AssetContext for distributed execution.

TagsContext

The context for prefect.tags management.

Methods:

get

get(cls) -> Self

SettingsContext

The context for Prefect settings.

This allows for safe concurrent access and modification of settings.

Methods:

get

get(cls) -> Optional['SettingsContext']