Skip to content

prefect.utilities.callables

Utilities for working with Python callables.

ParameterSchema

Bases: BaseModel

Simple data model corresponding to an OpenAPI Schema.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class ParameterSchema(pydantic.BaseModel):
    """Simple data model corresponding to an OpenAPI `Schema`."""

    # Pinned by their `Literal` types: the only accepted values are the defaults.
    title: Literal["Parameters"] = "Parameters"
    type: Literal["object"] = "object"
    # `default_factory` gives every instance its own empty dict.
    properties: Dict[str, Any] = pydantic.Field(default_factory=dict)
    # Both default to `None`, so they are annotated as optional; while unset
    # they are dropped from the output of `dict()` below.
    required: Optional[List[str]] = None
    definitions: Optional[Dict[str, Any]] = None

    def dict(self, *args, **kwargs):
        """Exclude `None` fields by default to comply with
        the OpenAPI spec.

        `exclude_none` is only defaulted; a value passed explicitly by the
        caller is respected.
        """
        kwargs.setdefault("exclude_none", True)
        return super().dict(*args, **kwargs)

dict

Exclude None fields by default to comply with the OpenAPI spec.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
217
218
219
220
221
222
def dict(self, *args, **kwargs):
    """Serialize the model, leaving out `None` fields unless asked otherwise.

    `exclude_none` is switched on by default to comply with the OpenAPI spec;
    an `exclude_none` value supplied by the caller is kept as given.
    """
    if "exclude_none" not in kwargs:
        kwargs["exclude_none"] = True
    return super().dict(*args, **kwargs)

call_with_parameters

Call a function with parameters extracted with get_call_parameters

The function must have an identical signature to the original function or this will fail. If you need to send to a function with a different signature, extract the args/kwargs using parameters_to_args_kwargs directly.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
171
172
173
174
175
176
177
178
179
180
def call_with_parameters(fn: Callable, parameters: Dict[str, Any]):
    """
    Invoke `fn` with a parameter mapping extracted with `get_call_parameters`.

    The mapping is split into positional and keyword arguments by
    `parameters_to_args_kwargs` and the result of calling `fn` with them is
    returned.

    The function _must_ have an identical signature to the original function or
    this will fail. To target a function with a different signature, call
    `parameters_to_args_kwargs` directly and adapt the args/kwargs yourself.
    """
    positional, keyword = parameters_to_args_kwargs(fn, parameters)
    return fn(*positional, **keyword)

cloudpickle_wrapped_call

Serializes a function call using cloudpickle then returns a callable which will execute that call and return a cloudpickle serialized return value

This is particularly useful for sending calls to libraries that only use the Python built-in pickler (e.g. anyio.to_process and multiprocessing) but may require a wider range of pickling support.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
183
184
185
186
187
188
189
190
191
192
193
194
195
def cloudpickle_wrapped_call(
    __fn: Callable, *args: Any, **kwargs: Any
) -> Callable[[], bytes]:
    """
    Package a function call with cloudpickle and return a zero-argument callable
    that will execute that call and return a cloudpickle serialized return value.

    Useful when handing calls to libraries that rely solely on the built-in Python
    pickler (e.g. `anyio.to_process` and `multiprocessing`) while the call itself
    may need a wider range of pickling support.
    """
    # The function and its arguments travel together as a single pickled tuple
    call = (__fn, args, kwargs)
    serialized_call = cloudpickle.dumps(call)
    return partial(_run_serialized_call, serialized_call)

collapse_variadic_parameters

Given a parameter dictionary, move any parameters that are not present in the signature into the variadic keyword argument.

Example:

```python
def foo(a, b, **kwargs):
    pass

parameters = {"a": 1, "b": 2, "c": 3, "d": 4}
collapse_variadic_parameters(foo, parameters)
# {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
```
Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def collapse_variadic_parameters(
    fn: Callable, parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Given a parameter dictionary, move any parameters that are not present in the
    signature into the variadic keyword argument.

    Example:

        ```python
        def foo(a, b, **kwargs):
            pass

        parameters = {"a": 1, "b": 2, "c": 3, "d": 4}
        collapse_variadic_parameters(foo, parameters)
        # {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
        ```

    Args:
        fn: The callable whose signature decides which keys are "extra".
        parameters: Mapping of parameter names to values. It is never mutated.

    Returns:
        `parameters` itself when `fn` has a variadic keyword argument and there
        is nothing to collapse; otherwise a shallow copy in which the extra keys
        have been moved, in their original order, into a new dict stored under
        the variadic keyword name. Any value previously stored under that name
        is replaced by the new dict.

    Raises:
        ValueError: If extra keys are present but `fn` has no variadic keyword
            argument to receive them.
    """
    signature_parameters = inspect.signature(fn).parameters
    variadic_key = next(
        (
            name
            for name, parameter in signature_parameters.items()
            if parameter.kind == parameter.VAR_KEYWORD
        ),
        None,
    )

    # Walk `parameters` instead of taking a set difference so the collapsed keys
    # keep the caller's insertion order rather than an arbitrary hash order.
    missing_parameters = [key for key in parameters if key not in signature_parameters]

    if not variadic_key and missing_parameters:
        raise ValueError(
            f"Signature for {fn} does not include any variadic keyword argument "
            "but parameters were given that are not present in the signature."
        )

    if variadic_key and not missing_parameters:
        # variadic key is present but no missing parameters, return parameters unchanged
        return parameters

    new_parameters = parameters.copy()
    if variadic_key:
        collapsed = {}
        new_parameters[variadic_key] = collapsed
        for key in missing_parameters:
            collapsed[key] = new_parameters.pop(key)

    return new_parameters

explode_variadic_parameter

Given a parameter dictionary, move any parameters stored in a variadic keyword argument parameter (i.e. **kwargs) into the top level.

Example:

```python
def foo(a, b, **kwargs):
    pass

parameters = {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
explode_variadic_parameter(foo, parameters)
# {"a": 1, "b": 2, "c": 3, "d": 4}
```
Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def explode_variadic_parameter(
    fn: Callable, parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Flatten the variadic keyword argument (i.e. **kwargs) of a parameter
    dictionary, lifting the entries stored under it to the top level.

    Example:

        ```python
        def foo(a, b, **kwargs):
            pass

        parameters = {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
        explode_variadic_parameter(foo, parameters)
        # {"a": 1, "b": 2, "c": 3, "d": 4}
        ```

    When `fn` declares no variadic keyword argument, `parameters` is returned
    as-is; otherwise a shallow copy is returned and the input is left untouched.
    """
    variadic_key = next(
        (
            name
            for name, spec in inspect.signature(fn).parameters.items()
            if spec.kind == inspect.Parameter.VAR_KEYWORD
        ),
        None,
    )
    if not variadic_key:
        return parameters

    flattened = parameters.copy()
    # Drop the nested mapping (if any) and lift its entries to the top level
    nested = flattened.pop(variadic_key, {})
    flattened.update(nested.items())
    return flattened

get_call_parameters

Bind a call to a function to get parameter/value mapping. Default values on the signature will be included if not overridden.

Raises a ParameterBindError if the arguments/kwargs are not valid for the function

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def get_call_parameters(
    fn: Callable,
    call_args: Tuple[Any, ...],
    call_kwargs: Dict[str, Any],
    apply_defaults: bool = True,
) -> Dict[str, Any]:
    """
    Bind a call to a function to get parameter/value mapping. Default values on the
    signature will be included if not overridden.

    Args:
        fn: The callable whose signature the call is bound against.
        call_args: Positional arguments of the call.
        call_kwargs: Keyword arguments of the call.
        apply_defaults: When true (the default), parameters that were not passed
            are filled in with their signature defaults.

    Returns:
        A plain `dict` mapping parameter names to the bound values.

    Raises:
        ParameterBindError: If the arguments/kwargs are not valid for the function.
    """
    try:
        bound_signature = inspect.signature(fn).bind(*call_args, **call_kwargs)
    except TypeError as exc:
        # Chain explicitly so the original binding failure is reported as the
        # direct cause of the raised error.
        raise ParameterBindError.from_bind_failure(
            fn, exc, call_args, call_kwargs
        ) from exc

    if apply_defaults:
        bound_signature.apply_defaults()

    # We cast from `OrderedDict` to `dict` because Dask will not convert futures in an
    # ordered dictionary to values during execution; this is the default behavior in
    # Python 3.9 anyway.
    return dict(bound_signature.arguments)

get_parameter_defaults

Get default parameter values for a callable.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def get_parameter_defaults(
    fn: Callable,
) -> Dict[str, Any]:
    """
    Collect the default values declared on a callable's signature.

    Only parameters that actually declare a default are included, keyed by
    parameter name in signature order.
    """
    return {
        name: spec.default
        for name, spec in inspect.signature(fn).parameters.items()
        if spec.default is not inspect.Parameter.empty
    }

parameter_docstrings

Given a docstring in Google docstring format, parse the parameter section and return a dictionary that maps parameter names to docstring.

Parameters:

Name Type Description Default
docstring Optional[str]

The function's docstring.

required

Returns:

Type Description
Dict[str, str]

Mapping from parameter names to docstrings.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def parameter_docstrings(docstring: Optional[str]) -> Dict[str, str]:
    """
    Extract per-parameter descriptions from a Google-format docstring.

    Args:
        docstring: The function's docstring.

    Returns:
        Mapping from parameter names to docstrings. Empty when `docstring` is
        missing or blank, or when it has no parameters section.
    """
    if not docstring:
        return {}

    descriptions = {}
    # Both griffe loggers stay disabled while the docstring is parsed and read
    with disable_logger("griffe.docstrings.google"):
        with disable_logger("griffe.agents.nodes"):
            for section in parse(Docstring(docstring), Parser.google):
                if section.kind == DocstringSectionKind.parameters:
                    # A later parameters section replaces an earlier one
                    descriptions = {
                        entry.name: entry.description for entry in section.value
                    }

    return descriptions

parameter_schema

Given a function, generates an OpenAPI-compatible description of the function's arguments, including: the name, typing information, whether it is required, a default value, and additional constraints (like possible enum values).

Parameters:

Name Type Description Default
fn function

The function whose arguments will be serialized

required

Returns:

Name Type Description
dict ParameterSchema

the argument schema

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def parameter_schema(fn: Callable) -> ParameterSchema:
    """Build an OpenAPI-compatible description of a function's arguments.

    For every parameter the schema records:
        - name
        - typing information
        - whether it is required
        - a default value
        - additional constraints (like possible enum values)

    Args:
        fn (function): The function whose arguments will be serialized

    Returns:
        ParameterSchema: the argument schema
    """

    class ModelConfig:
        arbitrary_types_allowed = True

    signature = inspect.signature(fn)
    docstrings = parameter_docstrings(inspect.getdoc(fn))
    aliases = {}
    model_fields = {}

    for position, param in enumerate(signature.parameters.values()):
        # Names that collide with `BaseModel` attributes break model creation, so
        # they get a suffixed field name and keep the real name as an alias
        name = param.name
        if hasattr(pydantic.BaseModel, param.name):
            name = param.name + "__"
            aliases[name] = param.name

        # Unannotated parameters accept anything; parameters without a default
        # are marked as required with `...`
        type_ = Any if param.annotation is param.empty else param.annotation
        default = ... if param.default is param.empty else param.default
        field = pydantic.Field(
            default=default,
            title=param.name,
            description=docstrings.get(param.name),
            alias=aliases.get(name),
            position=position,
        )

        # Try a throwaway single-field model first so a type that is not
        # supported for schema generation can be detected per parameter
        try:
            check_model = pydantic.create_model(
                "CheckParameter", __config__=ModelConfig, **{name: (type_, field)}
            )
            check_model.schema(by_alias=True)
        except ValueError:
            # Schema creation rejected this type, fall back to `Any`
            type_ = Any

        model_fields[name] = (type_, field)

    # Build the real model from all collected fields and export its schema
    model = pydantic.create_model("Parameters", __config__=ModelConfig, **model_fields)
    return ParameterSchema(**model.schema(by_alias=True))

parameters_to_args_kwargs

Convert a parameters dictionary to positional and keyword arguments

The function must have an identical signature to the original function; if the parameters include names that are not present in the function's signature, a SignatureMismatchError is raised.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def parameters_to_args_kwargs(
    fn: Callable,
    parameters: Dict[str, Any],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """
    Convert a `parameters` dictionary to positional and keyword arguments

    The function _must_ have an identical signature to the original function:
    every key in `parameters` has to be a parameter name of `fn`.

    Args:
        fn: The callable the arguments are destined for.
        parameters: Mapping of parameter names to values.

    Returns:
        A `(args, kwargs)` pair suitable for `fn(*args, **kwargs)`.

    Raises:
        SignatureMismatchError: If `parameters` contains names that are not
            present in the signature of `fn`.
    """
    # Inspect once and reuse the signature for both validation and binding
    signature = inspect.signature(fn)
    function_params = signature.parameters.keys()
    # Check for parameters that are not present in the function signature
    unknown_params = parameters.keys() - function_params
    if unknown_params:
        raise SignatureMismatchError.from_bad_params(
            list(function_params), list(parameters.keys())
        )
    # Start from an empty binding and install the mapping directly; the bound
    # arguments object then splits it into positional and keyword parts
    bound_signature = signature.bind_partial()
    bound_signature.arguments = parameters

    return bound_signature.args, bound_signature.kwargs

raise_for_reserved_arguments

Raise a ReservedArgumentError if fn has any parameters that conflict with the names contained in reserved_arguments.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/utilities/callables.py
315
316
317
318
319
320
321
322
323
324
def raise_for_reserved_arguments(fn: Callable, reserved_arguments: Iterable[str]):
    """Reject callables whose parameters use a reserved name.

    Raises a ReservedArgumentError for the first entry of `reserved_arguments`
    that is also a parameter name of `fn`; returns `None` when none conflict.
    """
    declared = inspect.signature(fn).parameters

    for argument in reserved_arguments:
        if argument not in declared:
            continue
        raise ReservedArgumentError(
            f"{argument!r} is a reserved argument name and cannot be used."
        )