prefect.input.run_input
This module contains functions that allow sending type-checked RunInput data
to flows at runtime. Flows can send back responses, establishing two-way
channels with senders. These functions are particularly useful for systems that
require ongoing data transfer or need to react to input quickly. They are
designed to facilitate dynamic communication within distributed or
microservices-oriented systems, which suits scenarios requiring continuous data
synchronization and processing.
The following is an example of two flows. One sends a random number to the
other and waits for a response. The other receives the number, squares it, and
sends the result back. The sender flow then prints the result.
Sender flow:
import random
from uuid import UUID

from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput


class NumberData(RunInput):
    number: int


@flow
async def sender_flow(receiver_flow_run_id: UUID):
    logger = get_run_logger()

    # Pick a number and send it to the receiver flow run.
    the_number = random.randint(1, 100)
    await NumberData(number=the_number).send_to(receiver_flow_run_id)

    # Wait for the receiver to respond with the squared value.
    receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
    squared = await receiver.next()

    logger.info(f"{the_number} squared is {squared.number}")
Receiver flow:
from prefect import flow
from prefect.input import RunInput


class NumberData(RunInput):
    number: int


@flow
async def receiver_flow():
    # Iterate over NumberData inputs as they arrive for this flow run.
    async for data in NumberData.receive():
        squared = data.number ** 2
        # Await the response so it is actually sent back to the sender.
        await data.respond(NumberData(number=squared))
Functions
keyset_from_paused_state
keyset_from_paused_state(state: 'State') -> Keyset
Get the keyset for the given Paused state.
Args:
- state: the state to get the keyset for
keyset_from_base_key
keyset_from_base_key(base_key: str) -> Keyset
Get the keyset for the given base key.
Args:
- base_key: the base key to get the keyset for
Returns:
Dict[str, str]: the keyset
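To make the shape of a keyset concrete, the following is a minimal sketch of how a keyset might be derived from a base key. It assumes each entry is the base key plus a role suffix. The role names (description, response, schema) and the helper name sketch_keyset_from_base_key are illustrative assumptions and are not taken from this reference.

```python
from typing import Dict

# Assumed role names; the real keyset may use different ones.
ASSUMED_ROLES = ("description", "response", "schema")


def sketch_keyset_from_base_key(base_key: str) -> Dict[str, str]:
    """Map each assumed role to a storage key derived from base_key."""
    return {role: f"{base_key}-{role}" for role in ASSUMED_ROLES}
```

Under these assumptions, every key belonging to one pause or input request shares the same base key, so related entries can be looked up together.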
run_input_subclass_from_type
run_input_subclass_from_type(_type: Union[Type[R], Type[T], pydantic.BaseModel]) -> Union[Type[AutomaticRunInput[T]], Type[R]]
Create a new RunInput subclass from the given type.
asend_input
asend_input(run_input: Any, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send input to a flow run asynchronously.
send_input
send_input(run_input: Any, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send input to a flow run.
receive_input
receive_input(input_type: Union[Type[R], Type[T], pydantic.BaseModel], timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> Union[GetAutomaticInputHandler[T], GetInputHandler[R]]
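The key_prefix and exclude_keys parameters above suggest that inputs are stored under keys and filtered on retrieval. The sketch below models that idea with a plain in-memory dictionary. Everything in it (_STORE, send_sketch, receive_sketch, and the key format) is a hypothetical stand-in for illustration and is not Prefect's implementation.

```python
import uuid
from typing import Any, Dict, List, Optional, Set, Tuple

# Hypothetical in-memory stand-in for wherever flow run inputs are stored.
_STORE: Dict[Tuple[str, str], Any] = {}


def send_sketch(value: Any, flow_run_id: str, key_prefix: str) -> str:
    """Store a value for a flow run under a unique key that starts with key_prefix."""
    key = f"{key_prefix}-{uuid.uuid4()}"
    _STORE[(flow_run_id, key)] = value
    return key


def receive_sketch(
    flow_run_id: str,
    key_prefix: str,
    exclude_keys: Optional[Set[str]] = None,
) -> List[Tuple[str, Any]]:
    """Return (key, value) pairs for one flow run, skipping excluded keys."""
    excluded = exclude_keys or set()
    return [
        (key, value)
        for (run_id, key), value in _STORE.items()
        if run_id == flow_run_id
        and key.startswith(key_prefix)
        and key not in excluded
    ]
```

In this model, a receiver that remembers the keys it has already handled can pass them as exclude_keys so the same input is not processed twice.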
Classes
BaseRunInput
Methods:
aload
aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key asynchronously.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
arespond
arespond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input asynchronously.
asave
asave(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key asynchronously.
Args:
- keyset: the keyset to save the input for
- flow_run_id: the flow run ID to save the input for
asend_to
asend_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run asynchronously.
keyset_from_type
keyset_from_type(cls) -> Keyset
load
load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
load_from_flow_run_input
load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object.
Args:
- flow_run_input: the flow run input to load the input for
metadata
metadata(self) -> RunInputMetadata
respond
respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input.
save
save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key.
Args:
- keyset: the keyset to save the input for
- flow_run_id: the flow run ID to save the input for
send_to
send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run.
with_initial_data
with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults.
Args:
- description: a description to show when resuming a flow run that requires input
- kwargs: the initial data to populate the subclass
RunInput
Methods:
aload
aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key asynchronously.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
arespond
arespond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input asynchronously.
asave
asave(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key asynchronously.
Args:
- keyset: the keyset to save the input for
- flow_run_id: the flow run ID to save the input for
asend_to
asend_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run asynchronously.
keyset_from_type
keyset_from_type(cls) -> Keyset
load
load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
load_from_flow_run_input
load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object.
Args:
- flow_run_input: the flow run input to load the input for
metadata
metadata(self) -> RunInputMetadata
receive
receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None) -> GetInputHandler[Self]
respond
respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input.
save
save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key.
Args:
- keyset: the keyset to save the input for
- flow_run_id: the flow run ID to save the input for
send_to
send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run.
subclass_from_base_model_type
subclass_from_base_model_type(cls, model_cls: Type[pydantic.BaseModel]) -> Type['RunInput']
Create a new RunInput subclass from the given pydantic.BaseModel subclass.
Args:
- model_cls: the class from which to create the new RunInput subclass
with_initial_data
with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults.
Args:
- description: a description to show when resuming a flow run that requires input
- kwargs: the initial data to populate the subclass
AutomaticRunInput
Methods:
aload
aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T
Load the run input response from the given key asynchronously.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
aload
aload(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key asynchronously.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
arespond
arespond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input asynchronously.
asave
asave(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key asynchronously.
Args:
- keyset: the keyset to save the input for
- flow_run_id: the flow run ID to save the input for
asend_to
asend_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run asynchronously.
keyset_from_type
keyset_from_type(cls) -> Keyset
load
load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T
Load the run input response from the given key.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
load
load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self
Load the run input response from the given key.
Args:
- keyset: the keyset to load the input for
- flow_run_id: the flow run ID to load the input for
load_from_flow_run_input
load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self
Load the run input from a FlowRunInput object.
Args:
- flow_run_input: the flow run input to load the input for
metadata
metadata(self) -> RunInputMetadata
receive
receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> GetAutomaticInputHandler[T]
respond
respond(self, run_input: 'RunInput', sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Respond to the sender of this input.
save
save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> None
Save the run input response to the given key.
Args:
- keyset: the keyset to save the input for
- flow_run_id: the flow run ID to save the input for
send_to
send_to(self, flow_run_id: UUID, sender: Optional[str] = None, key_prefix: Optional[str] = None) -> None
Send this input to a flow run.
subclass_from_type
subclass_from_type(cls, _type: Type[T]) -> Type['AutomaticRunInput[T]']
Create a new AutomaticRunInput subclass from the given type.
This method uses the type’s name as a key prefix to identify related
flow run inputs. This ensures that values saved under a parameterized type
(like List[int]) are retrievable under the generic type name (like “list”).
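One plausible way to turn a type into such a prefix is to strip the type parameters and lower-case the remaining name. The helper below is a hypothetical sketch of that idea using only the typing module. The name key_prefix_sketch is an assumption, and the actual method may derive the prefix differently.

```python
from typing import Any, Dict, List, get_origin


def key_prefix_sketch(tp: Any) -> str:
    """Derive a lower-case prefix from a type, ignoring its type parameters."""
    # get_origin(List[int]) is list; for a plain type such as int it is None.
    origin = get_origin(tp) or tp
    return getattr(origin, "__name__", str(origin)).lower()
```

Under this sketch, List[int] and List[str] both map to the prefix "list", which matches the behavior described above: values are grouped by the generic type name and not by the full parameterized type.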
with_initial_data
with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]
Create a new RunInput subclass with the given initial data as field defaults.
Args:
- description: a description to show when resuming a flow run that requires input
- kwargs: the initial data to populate the subclass
GetInputHandler
Methods:
anext
Get the next input asynchronously.
filter_for_inputs
filter_for_inputs(self) -> list['FlowRunInput']
Filter for inputs asynchronously.
filter_for_inputs_sync
filter_for_inputs_sync(self) -> list['FlowRunInput']
Filter for inputs synchronously.
next
Get the next input.
to_instance
to_instance(self, flow_run_input: 'FlowRunInput') -> R
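The timeout and poll_interval parameters of receive suggest that a handler polls for new inputs until one arrives or the timeout elapses. The following is a minimal sketch of such a loop, assuming a caller-supplied fetch function that returns None when nothing is available. next_input_sketch and fetch are hypothetical names, and raise_timeout_error is not modeled here.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def next_input_sketch(
    fetch: Callable[[], Optional[T]],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
) -> T:
    """Poll fetch() until it returns a value or the timeout elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        item = fetch()
        if item is not None:
            return item
        # Give up once the deadline has passed; timeout=None waits forever.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("no input arrived before the timeout")
        time.sleep(poll_interval)
```

A shorter poll_interval lowers latency at the cost of more frequent lookups, which is the trade-off the default of 10 seconds appears to balance.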
GetAutomaticInputHandler
Methods:
anext
anext(self) -> Union[T, AutomaticRunInput[T]]
Get the next input asynchronously.
filter_for_inputs
filter_for_inputs(self) -> list['FlowRunInput']
Filter for inputs asynchronously.
filter_for_inputs_sync
filter_for_inputs_sync(self) -> list['FlowRunInput']
Filter for inputs synchronously.
next
next(self) -> Union[T, AutomaticRunInput[T]]
Get the next input.
to_instance
to_instance(self, flow_run_input: 'FlowRunInput') -> Union[T, AutomaticRunInput[T]]