Skip to content

prefect.input.run_input

This module contains functions that allow sending type-checked RunInput data to flows at runtime. Flows can send back responses, establishing two-way channels with senders. These functions are particularly useful for systems that require ongoing data transfer or need to react to input quickly. real-time interaction and efficient data handling. It's designed to facilitate dynamic communication within distributed or microservices-oriented systems, making it ideal for scenarios requiring continuous data synchronization and processing. It's particularly useful for systems that require ongoing data input and output.

The following is an example of two flows. One sends a random number to the other and waits for a response. The other receives the number, squares it, and sends the result back. The sender flow then prints the result.

Sender flow:

import random
from uuid import UUID
from prefect import flow, get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def sender_flow(receiver_flow_run_id: UUID):
    logger = get_run_logger()

    the_number = random.randint(1, 100)

    await NumberData(number=the_number).send_to(receiver_flow_run_id)

    receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
    squared = await receiver.next()

    logger.info(f"{the_number} squared is {squared.number}")

Receiver flow:

import random
from uuid import UUID
from prefect import flow, get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def receiver_flow():
    async for data in NumberData.receive():
        squared = data.number ** 2
        data.respond(NumberData(number=squared))

AutomaticRunInput

Bases: RunInput, Generic[T]

Source code in prefect/input/run_input.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class AutomaticRunInput(RunInput, Generic[T]):
    value: T

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T:
        """
        Load the run input response from the given key.

        Args:
            - keyset (Keyset): the keyset to load the input for
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        instance = await super().load(keyset, flow_run_id=flow_run_id)
        return instance.value

    @classmethod
    def subclass_from_type(cls, _type: Type[T]) -> Type["AutomaticRunInput[T]"]:
        """
        Create a new `AutomaticRunInput` subclass from the given type.

        This method uses the type's name as a key prefix to identify related
        flow run inputs. This helps in ensuring that values saved under a type
        (like List[int]) are retrievable under the generic type name (like "list").
        """
        fields: Dict[str, Any] = {"value": (_type, ...)}

        # Explanation for using getattr for type name extraction:
        # - "__name__": This is the usual attribute for getting the name of
        #   most types.
        # - "_name": Used as a fallback, some type annotations in Python 3.9
        #   and earlier might only have this attribute instead of __name__.
        # - If neither is available, defaults to an empty string to prevent
        #   errors, but typically we should find at least one valid name
        #   attribute. This will match all automatic inputs sent to the flow
        #   run, rather than a specific type.
        #
        # This approach ensures compatibility across Python versions and
        # handles various edge cases in type annotations.

        type_prefix: str = getattr(
            _type, "__name__", getattr(_type, "_name", "")
        ).lower()

        class_name = f"{type_prefix}AutomaticRunInput"

        # Creating a new Pydantic model class dynamically with the name based
        # on the type prefix.
        new_cls: Type["AutomaticRunInput"] = pydantic.create_model(
            class_name, **fields, __base__=AutomaticRunInput
        )
        return new_cls

    @classmethod
    def receive(cls, *args, **kwargs):
        if kwargs.get("key_prefix") is None:
            kwargs["key_prefix"] = f"{cls.__name__.lower()}-auto"

        return GetAutomaticInputHandler(run_input_cls=cls, *args, **kwargs)

load async classmethod

Load the run input response from the given key.

Parameters:

Name Type Description Default
- keyset (Keyset

the keyset to load the input for

required
- flow_run_id (UUID

the flow run ID to load the input for

required
Source code in prefect/input/run_input.py
327
328
329
330
331
332
333
334
335
336
337
338
@classmethod
@sync_compatible
async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> T:
    """
    Load the run input response from the given key.

    Args:
        - keyset (Keyset): the keyset to load the input for
        - flow_run_id (UUID, optional): the flow run ID to load the input for
    """
    instance = await super().load(keyset, flow_run_id=flow_run_id)
    return instance.value

subclass_from_type classmethod

Create a new AutomaticRunInput subclass from the given type.

This method uses the type's name as a key prefix to identify related flow run inputs. This helps in ensuring that values saved under a type (like List[int]) are retrievable under the generic type name (like "list").

Source code in prefect/input/run_input.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
@classmethod
def subclass_from_type(cls, _type: Type[T]) -> Type["AutomaticRunInput[T]"]:
    """
    Create a new `AutomaticRunInput` subclass from the given type.

    This method uses the type's name as a key prefix to identify related
    flow run inputs. This helps in ensuring that values saved under a type
    (like List[int]) are retrievable under the generic type name (like "list").
    """
    fields: Dict[str, Any] = {"value": (_type, ...)}

    # Explanation for using getattr for type name extraction:
    # - "__name__": This is the usual attribute for getting the name of
    #   most types.
    # - "_name": Used as a fallback, some type annotations in Python 3.9
    #   and earlier might only have this attribute instead of __name__.
    # - If neither is available, defaults to an empty string to prevent
    #   errors, but typically we should find at least one valid name
    #   attribute. This will match all automatic inputs sent to the flow
    #   run, rather than a specific type.
    #
    # This approach ensures compatibility across Python versions and
    # handles various edge cases in type annotations.

    type_prefix: str = getattr(
        _type, "__name__", getattr(_type, "_name", "")
    ).lower()

    class_name = f"{type_prefix}AutomaticRunInput"

    # Creating a new Pydantic model class dynamically with the name based
    # on the type prefix.
    new_cls: Type["AutomaticRunInput"] = pydantic.create_model(
        class_name, **fields, __base__=AutomaticRunInput
    )
    return new_cls

RunInput

Bases: BaseModel

Source code in prefect/input/run_input.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
class RunInput(pydantic.BaseModel):
    class Config:
        extra = "forbid"

    _description: Optional[str] = pydantic.PrivateAttr(default=None)
    _metadata: RunInputMetadata = pydantic.PrivateAttr()

    @property
    def metadata(self) -> RunInputMetadata:
        return self._metadata

    @classmethod
    def keyset_from_type(cls) -> Keyset:
        return keyset_from_base_key(cls.__name__.lower())

    @classmethod
    @sync_compatible
    async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Save the run input response to the given key.

        Args:
            - keyset (Keyset): the keyset to save the input for
            - flow_run_id (UUID, optional): the flow run ID to save the input for
        """

        if HAS_PYDANTIC_V2:
            schema = create_v2_schema(cls.__name__, model_base=cls)
        else:
            schema = cls.schema(by_alias=True)

        await create_flow_run_input(
            key=keyset["schema"], value=schema, flow_run_id=flow_run_id
        )

        description = cls._description if isinstance(cls._description, str) else None
        if description:
            await create_flow_run_input(
                key=keyset["description"],
                value=description,
                flow_run_id=flow_run_id,
            )

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Load the run input response from the given key.

        Args:
            - keyset (Keyset): the keyset to load the input for
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        flow_run_id = ensure_flow_run_id(flow_run_id)
        value = await read_flow_run_input(keyset["response"], flow_run_id=flow_run_id)
        if value:
            instance = cls(**value)
        else:
            instance = cls()
        instance._metadata = RunInputMetadata(
            key=keyset["response"], sender=None, receiver=flow_run_id
        )
        return instance

    @classmethod
    def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput"):
        """
        Load the run input from a FlowRunInput object.

        Args:
            - flow_run_input (FlowRunInput): the flow run input to load the input for
        """
        instance = cls(**flow_run_input.decoded_value)
        instance._metadata = RunInputMetadata(
            key=flow_run_input.key,
            sender=flow_run_input.sender,
            receiver=flow_run_input.flow_run_id,
        )
        return instance

    @classmethod
    def with_initial_data(
        cls: Type[R], description: Optional[str] = None, **kwargs: Any
    ) -> Type[R]:
        """
        Create a new `RunInput` subclass with the given initial data as field
        defaults.

        Args:
            - description (str, optional): a description to show when resuming
                a flow run that requires input
            - kwargs (Any): the initial data to populate the subclass
        """
        fields: Dict[str, Any] = {}
        for key, value in kwargs.items():
            fields[key] = (type(value), value)
        model = pydantic.create_model(cls.__name__, **fields, __base__=cls)

        if description is not None:
            model._description = description

        return model

    @sync_compatible
    async def respond(
        self,
        run_input: "RunInput",
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        flow_run_id = None
        if self.metadata.sender and self.metadata.sender.startswith("prefect.flow-run"):
            _, _, id = self.metadata.sender.rpartition(".")
            flow_run_id = UUID(id)

        if not flow_run_id:
            raise RuntimeError(
                "Cannot respond to an input that was not sent by a flow run."
            )

        await _send_input(
            flow_run_id=flow_run_id,
            run_input=run_input,
            sender=sender,
            key_prefix=key_prefix,
        )

    @sync_compatible
    async def send_to(
        self,
        flow_run_id: UUID,
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        await _send_input(
            flow_run_id=flow_run_id,
            run_input=self,
            sender=sender,
            key_prefix=key_prefix,
        )

    @classmethod
    def receive(
        cls,
        timeout: Optional[float] = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        key_prefix: Optional[str] = None,
        flow_run_id: Optional[UUID] = None,
    ):
        if key_prefix is None:
            key_prefix = f"{cls.__name__.lower()}-auto"

        return GetInputHandler(
            run_input_cls=cls,
            key_prefix=key_prefix,
            timeout=timeout,
            poll_interval=poll_interval,
            raise_timeout_error=raise_timeout_error,
            exclude_keys=exclude_keys,
            flow_run_id=flow_run_id,
        )

    @classmethod
    def subclass_from_base_model_type(
        cls, model_cls: Type[pydantic.BaseModel]
    ) -> Type["RunInput"]:
        """
        Create a new `RunInput` subclass from the given `pydantic.BaseModel`
        subclass.

        Args:
            - model_cls (pydantic.BaseModel subclass): the class from which
                to create the new `RunInput` subclass
        """
        return type(f"{model_cls.__name__}RunInput", (RunInput, model_cls), {})  # type: ignore

load async classmethod

Load the run input response from the given key.

Parameters:

Name Type Description Default
- keyset (Keyset

the keyset to load the input for

required
- flow_run_id (UUID

the flow run ID to load the input for

required
Source code in prefect/input/run_input.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@classmethod
@sync_compatible
async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
    """
    Load the run input response from the given key.

    Args:
        - keyset (Keyset): the keyset to load the input for
        - flow_run_id (UUID, optional): the flow run ID to load the input for
    """
    flow_run_id = ensure_flow_run_id(flow_run_id)
    value = await read_flow_run_input(keyset["response"], flow_run_id=flow_run_id)
    if value:
        instance = cls(**value)
    else:
        instance = cls()
    instance._metadata = RunInputMetadata(
        key=keyset["response"], sender=None, receiver=flow_run_id
    )
    return instance

load_from_flow_run_input classmethod

Load the run input from a FlowRunInput object.

Parameters:

Name Type Description Default
- flow_run_input (FlowRunInput

the flow run input to load the input for

required
Source code in prefect/input/run_input.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@classmethod
def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput"):
    """
    Load the run input from a FlowRunInput object.

    Args:
        - flow_run_input (FlowRunInput): the flow run input to load the input for
    """
    instance = cls(**flow_run_input.decoded_value)
    instance._metadata = RunInputMetadata(
        key=flow_run_input.key,
        sender=flow_run_input.sender,
        receiver=flow_run_input.flow_run_id,
    )
    return instance

save async classmethod

Save the run input response to the given key.

Parameters:

Name Type Description Default
- keyset (Keyset

the keyset to save the input for

required
- flow_run_id (UUID

the flow run ID to save the input for

required
Source code in prefect/input/run_input.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@classmethod
@sync_compatible
async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
    """
    Save the run input response to the given key.

    Args:
        - keyset (Keyset): the keyset to save the input for
        - flow_run_id (UUID, optional): the flow run ID to save the input for
    """

    if HAS_PYDANTIC_V2:
        schema = create_v2_schema(cls.__name__, model_base=cls)
    else:
        schema = cls.schema(by_alias=True)

    await create_flow_run_input(
        key=keyset["schema"], value=schema, flow_run_id=flow_run_id
    )

    description = cls._description if isinstance(cls._description, str) else None
    if description:
        await create_flow_run_input(
            key=keyset["description"],
            value=description,
            flow_run_id=flow_run_id,
        )

subclass_from_base_model_type classmethod

Create a new RunInput subclass from the given pydantic.BaseModel subclass.

Parameters:

Name Type Description Default
- model_cls (pydantic.BaseModel subclass

the class from which to create the new RunInput subclass

required
Source code in prefect/input/run_input.py
309
310
311
312
313
314
315
316
317
318
319
320
321
@classmethod
def subclass_from_base_model_type(
    cls, model_cls: Type[pydantic.BaseModel]
) -> Type["RunInput"]:
    """
    Create a new `RunInput` subclass from the given `pydantic.BaseModel`
    subclass.

    Args:
        - model_cls (pydantic.BaseModel subclass): the class from which
            to create the new `RunInput` subclass
    """
    return type(f"{model_cls.__name__}RunInput", (RunInput, model_cls), {})  # type: ignore

with_initial_data classmethod

Create a new RunInput subclass with the given initial data as field defaults.

Parameters:

Name Type Description Default
- description (str

a description to show when resuming a flow run that requires input

required
- kwargs (Any

the initial data to populate the subclass

required
Source code in prefect/input/run_input.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
@classmethod
def with_initial_data(
    cls: Type[R], description: Optional[str] = None, **kwargs: Any
) -> Type[R]:
    """
    Create a new `RunInput` subclass with the given initial data as field
    defaults.

    Args:
        - description (str, optional): a description to show when resuming
            a flow run that requires input
        - kwargs (Any): the initial data to populate the subclass
    """
    fields: Dict[str, Any] = {}
    for key, value in kwargs.items():
        fields[key] = (type(value), value)
    model = pydantic.create_model(cls.__name__, **fields, __base__=cls)

    if description is not None:
        model._description = description

    return model

keyset_from_base_key

Get the keyset for the given base key.

Parameters:

Name Type Description Default
- base_key (str

the base key to get the keyset for

required

Returns:

Type Description
Keyset
  • Dict[str, str]: the keyset
Source code in prefect/input/run_input.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def keyset_from_base_key(base_key: str) -> Keyset:
    """
    Get the keyset for the given base key.

    Args:
        - base_key (str): the base key to get the keyset for

    Returns:
        - Dict[str, str]: the keyset
    """
    return {
        "description": f"{base_key}-description",
        "response": f"{base_key}-response",
        "schema": f"{base_key}-schema",
    }

keyset_from_paused_state

Get the keyset for the given Paused state.

Parameters:

Name Type Description Default
- state (State

the state to get the keyset for

required
Source code in prefect/input/run_input.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def keyset_from_paused_state(state: "State") -> Keyset:
    """
    Get the keyset for the given Paused state.

    Args:
        - state (State): the state to get the keyset for
    """

    if not state.is_paused():
        raise RuntimeError(f"{state.type.value!r} is unsupported.")

    state_name = state.name or ""
    base_key = f"{state_name.lower()}-{str(state.state_details.pause_key)}"
    return keyset_from_base_key(base_key)

run_input_subclass_from_type

Create a new RunInput subclass from the given type.

Source code in prefect/input/run_input.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def run_input_subclass_from_type(
    _type: Union[Type[R], Type[T], pydantic.BaseModel],
) -> Union[Type[AutomaticRunInput[T]], Type[R]]:
    """
    Create a new `RunInput` subclass from the given type.
    """
    if isclass(_type):
        if issubclass(_type, RunInput):
            return cast(Type[R], _type)
        elif issubclass(_type, pydantic.BaseModel):
            return cast(Type[R], RunInput.subclass_from_base_model_type(_type))

    # Could be something like a typing._GenericAlias or any other type that
    # isn't a `RunInput` subclass or `pydantic.BaseModel` subclass. Try passing
    # it to AutomaticRunInput to see if we can create a model from it.
    return cast(
        Type[AutomaticRunInput[T]],
        AutomaticRunInput.subclass_from_type(cast(Type[T], _type)),
    )