Skip to main content

prefect.input.run_input

This module contains functions that allow sending type-checked RunInput data to flows at runtime. Flows can send back responses, establishing two-way channels with senders. These functions are particularly useful for systems that require ongoing data transfer or need to react to input quickly. real-time interaction and efficient data handling. It’s designed to facilitate dynamic communication within distributed or microservices-oriented systems, making it ideal for scenarios requiring continuous data synchronization and processing. It’s particularly useful for systems that require ongoing data input and output. The following is an example of two flows. One sends a random number to the other and waits for a response. The other receives the number, squares it, and sends the result back. The sender flow then prints the result. Sender flow:
import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def sender_flow(receiver_flow_run_id: UUID):
    logger = get_run_logger()

    the_number = random.randint(1, 100)

    await NumberData(number=the_number).send_to(receiver_flow_run_id)

    receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
    squared = await receiver.next()

    logger.info(f"{the_number} squared is {squared.number}")
Receiver flow:
import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def receiver_flow():
    async for data in NumberData.receive():
        squared = data.number ** 2
        data.respond(NumberData(number=squared))

Functions

keyset_from_paused_state

keyset_from_paused_state(state: 'State') -> Keyset
Get the keyset for the given Paused state. Args:
  • - state: the state to get the keyset for

keyset_from_base_key

keyset_from_base_key(base_key: str) -> Keyset
Get the keyset for the given base key. Args:
  • - base_key: the base key to get the keyset for
Returns:
    • Dict[str, str]: the keyset

run_input_subclass_from_type

run_input_subclass_from_type(_type: Union[Type[R], Type[T], pydantic.BaseModel]) -> Union[Type[AutomaticRunInput[T]], Type[R]]
Create a new RunInput subclass from the given type.

asend_input

asend_input(run_input: Any, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send input to a flow run asynchronously.

send_input

send_input(run_input: Any, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send input to a flow run.

receive_input

receive_input(input_type: Union[Type[R], Type[T], pydantic.BaseModel], timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> Union[GetAutomaticInputHandler[T], GetInputHandler[R]]

Classes

RunInputMetadata

BaseRunInput

Methods:

aload

aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key asynchronously. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

arespond

arespond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input asynchronously.

asave

asave(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key asynchronously. Args:
  • - keyset: the keyset to save the input for
  • - flow_run_id: the flow run ID to save the input for

asend_to

asend_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run asynchronously.

keyset_from_type

keyset_from_type(cls) -> Keyset

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object. Args:
  • - flow_run_input: the flow run input to load the input for

metadata

metadata(self) -> RunInputMetadata

respond

respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input.

save

save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key. Args:
  • - keyset: the keyset to save the input for
  • - flow_run_id: the flow run ID to save the input for

send_to

send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run.

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults. Args:
  • - description: a description to show when resuming a flow run that requires input
  • - kwargs: the initial data to populate the subclass

RunInput

Methods:

aload

aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key asynchronously. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

arespond

arespond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input asynchronously.

asave

asave(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key asynchronously. Args:
  • - keyset: the keyset to save the input for
  • - flow_run_id: the flow run ID to save the input for

asend_to

asend_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run asynchronously.

keyset_from_type

keyset_from_type(cls) -> Keyset

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object. Args:
  • - flow_run_input: the flow run input to load the input for

metadata

metadata(self) -> RunInputMetadata

receive

receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None) -> GetInputHandler[Self]

respond

respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input.

save

save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key. Args:
  • - keyset: the keyset to save the input for
  • - flow_run_id: the flow run ID to save the input for

send_to

send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run.

subclass_from_base_model_type

subclass_from_base_model_type(cls, model_cls: Type[pydantic.BaseModel]) -> Type['RunInput']
Create a new RunInput subclass from the given pydantic.BaseModel subclass. Args:
  • - model_cls: the class from which to create the new RunInput subclass

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults. Args:
  • - description: a description to show when resuming a flow run that requires input
  • - kwargs: the initial data to populate the subclass

AutomaticRunInput

Methods:

aload

aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T
Load the run input response from the given key asynchronously. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

aload

aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key asynchronously. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

arespond

arespond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input asynchronously.

asave

asave(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key asynchronously. Args:
  • - keyset: the keyset to save the input for
  • - flow_run_id: the flow run ID to save the input for

asend_to

asend_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run asynchronously.

keyset_from_type

keyset_from_type(cls) -> Keyset

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T
Load the run input response from the given key. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

load

load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key. Args:
  • - keyset: the keyset to load the input for
  • - flow_run_id: the flow run ID to load the input for

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object. Args:
  • - flow_run_input: the flow run input to load the input for

metadata

metadata(self) -> RunInputMetadata

receive

receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> GetAutomaticInputHandler[T]

respond

respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input.

save

save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key. Args:
  • - keyset: the keyset to save the input for
  • - flow_run_id: the flow run ID to save the input for

send_to

send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run.

subclass_from_type

subclass_from_type(cls, _type: Type[T]) -> Type['AutomaticRunInput[T]']
Create a new AutomaticRunInput subclass from the given type. This method uses the type’s name as a key prefix to identify related flow run inputs. This helps in ensuring that values saved under a type (like List[int]) are retrievable under the generic type name (like “list”).

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults. Args:
  • - description: a description to show when resuming a flow run that requires input
  • - kwargs: the initial data to populate the subclass

GetInputHandler

Methods:

anext

anext(self) -> R
Get the next input asynchronously.

filter_for_inputs

filter_for_inputs(self) -> list['FlowRunInput']
Filter for inputs asynchronously.

filter_for_inputs_sync

filter_for_inputs_sync(self) -> list['FlowRunInput']
Filter for inputs synchronously.

next

next(self) -> R
Get the next input.

to_instance

to_instance(self, flow_run_input: 'FlowRunInput') -> R

GetAutomaticInputHandler

Methods:

anext

anext(self) -> Union[T, AutomaticRunInput[T]]
Get the next input asynchronously.

filter_for_inputs

filter_for_inputs(self) -> list['FlowRunInput']
Filter for inputs asynchronously.

filter_for_inputs_sync

filter_for_inputs_sync(self) -> list['FlowRunInput']
Filter for inputs synchronously.

next

next(self) -> Union[T, AutomaticRunInput[T]]
Get the next input.

to_instance

to_instance(self, flow_run_input: 'FlowRunInput') -> Union[T, AutomaticRunInput[T]]