prefect.input.run_input

This module contains functions that allow sending type-checked RunInput data to flows at runtime. Flows can send back responses, establishing two-way channels with senders. These functions are particularly useful for systems that require ongoing data transfer or need to react to input quickly, enabling real-time interaction and efficient data handling. The module is designed to facilitate dynamic communication within distributed or microservices-oriented systems, making it ideal for scenarios requiring continuous data synchronization and processing, including systems that require ongoing data input and output. The following is an example of two flows. One sends a random number to the other and waits for a response. The other receives the number, squares it, and sends the result back. The sender flow then logs the result. Sender flow:
import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

# Typed payload exchanged between the two example flows.
class NumberData(RunInput):
    # The integer being sent; the receiver's response reuses this field
    # to carry the squared result.
    number: int


@flow
async def sender_flow(receiver_flow_run_id: UUID):
    # Sends a random integer to the given flow run, waits for one
    # NumberData input in return, and logs it as the squared value.
    run_logger = get_run_logger()

    the_number = random.randint(1, 100)

    # Wrap the number in the typed payload and deliver it to the receiver run.
    outgoing = NumberData(number=the_number)
    await outgoing.send_to(receiver_flow_run_id)

    # Wait for the next NumberData input; the example treats it as the reply.
    squared = await NumberData.receive(flow_run_id=receiver_flow_run_id).next()

    run_logger.info(f"{the_number} squared is {squared.number}")
Receiver flow:
import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

# Typed payload exchanged between the two example flows.
class NumberData(RunInput):
    # The integer received from the sender; the response sent back reuses
    # this field to carry the squared result.
    number: int


@flow
async def receiver_flow():
    # Squares the number in every NumberData input this flow run receives
    # and replies to the sender with the result.
    async for data in NumberData.receive():
        squared = data.number ** 2
        # `respond` must be awaited, just as `send_to` is awaited in the
        # sender flow; an un-awaited call would only create a coroutine.
        await data.respond(NumberData(number=squared))

Functions

keyset_from_paused_state

keyset_from_paused_state(state: 'State') -> Keyset
Get the keyset for the given Paused state. Args:
  • state: the state to get the keyset for

keyset_from_base_key

keyset_from_base_key(base_key: str) -> Keyset
Get the keyset for the given base key. Args:
  • base_key: the base key to get the keyset for
Returns:
    • Dict[str, str]: the keyset

run_input_subclass_from_type

run_input_subclass_from_type(_type: Union[Type[R], Type[T], pydantic.BaseModel]) -> Union[Type[AutomaticRunInput[T]], Type[R]]
Create a new RunInput subclass from the given type.

send_input

send_input(run_input: Any, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None)

receive_input

receive_input(input_type: Union[Type[R], Type[T], pydantic.BaseModel], timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> Union[GetAutomaticInputHandler[T], GetInputHandler[R]]

Classes

RunInputMetadata

BaseRunInput

Methods:

keyset_from_type

keyset_from_type(cls) -> Keyset

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • keyset: the keyset to load the input for
  • flow_run_id: the flow run ID to load the input for

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object. Args:
  • flow_run_input: the flow run input to load the input for

metadata

metadata(self) -> RunInputMetadata

respond

respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None)

save

save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None)
Save the run input response to the given key. Args:
  • keyset: the keyset to save the input for
  • flow_run_id: the flow run ID to save the input for

send_to

send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None)

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults. Args:
  • description: a description to show when resuming a flow run that requires input
  • **kwargs: the initial data to populate the subclass

RunInput

Methods:

keyset_from_type

keyset_from_type(cls) -> Keyset

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • keyset: the keyset to load the input for
  • flow_run_id: the flow run ID to load the input for

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object. Args:
  • flow_run_input: the flow run input to load the input for

metadata

metadata(self) -> RunInputMetadata

receive

receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None) -> GetInputHandler[Self]

respond

respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None)

save

save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None)
Save the run input response to the given key. Args:
  • keyset: the keyset to save the input for
  • flow_run_id: the flow run ID to save the input for

send_to

send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None)

subclass_from_base_model_type

subclass_from_base_model_type(cls, model_cls: Type[pydantic.BaseModel]) -> Type['RunInput']
Create a new RunInput subclass from the given pydantic.BaseModel subclass. Args:
  • model_cls: the class from which to create the new RunInput subclass

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults. Args:
  • description: a description to show when resuming a flow run that requires input
  • **kwargs: the initial data to populate the subclass

AutomaticRunInput

Methods:

keyset_from_type

keyset_from_type(cls) -> Keyset

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • keyset: the keyset to load the input for
  • flow_run_id: the flow run ID to load the input for

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • keyset: the keyset to load the input for
  • flow_run_id: the flow run ID to load the input for

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object. Args:
  • flow_run_input: the flow run input to load the input for

metadata

metadata(self) -> RunInputMetadata

receive

receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> GetAutomaticInputHandler[T]

respond

respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None)

save

save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None)
Save the run input response to the given key. Args:
  • keyset: the keyset to save the input for
  • flow_run_id: the flow run ID to save the input for

send_to

send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None)

subclass_from_type

subclass_from_type(cls, _type: Type[T]) -> Type['AutomaticRunInput[T]']
Create a new AutomaticRunInput subclass from the given type. This method uses the type’s name as a key prefix to identify related flow run inputs. This helps in ensuring that values saved under a type (like List[int]) are retrievable under the generic type name (like “list”).

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults. Args:
  • description: a description to show when resuming a flow run that requires input
  • **kwargs: the initial data to populate the subclass

GetInputHandler

Methods:

filter_for_inputs

filter_for_inputs(self) -> list['FlowRunInput']

next

next(self) -> R

to_instance

to_instance(self, flow_run_input: 'FlowRunInput') -> R

GetAutomaticInputHandler

Methods:

filter_for_inputs

filter_for_inputs(self) -> list['FlowRunInput']

next

next(self) -> Union[T, AutomaticRunInput[T]]

to_instance

to_instance(self, flow_run_input: 'FlowRunInput') -> Union[T, AutomaticRunInput[T]]