
prefect.tasks

Module containing the base workflow task class and decorator - for most use cases, using the @task decorator is preferred.

Task

A Prefect task definition.

Note

We recommend using the @task decorator for most use-cases.

Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function creates a new task run.

To preserve the input and output types, we use the generic type variables P and R for "Parameters" and "Returns" respectively.

Parameters:

Name Description Default
name

An optional name for the task; if not provided, the name will be inferred from the given function.

None
description

An optional string description for the task.

None
tags

An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime.

None
version

An optional string specifying the version of this task definition.

None
cache_key_fn

An optional callable that, given the task run context and call parameters, generates a string key; if the key matches a previous completed state, that state result will be restored instead of running the task again.

None
cache_expiration

An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire.

None
retries

An optional number of times to retry on task run failure.

0
retry_delay_seconds

An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero.

0
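To make the `cache_key_fn` contract more concrete, the following is a minimal sketch of a function with the documented shape: it takes the task run context and a dictionary of call parameters and returns a string key or `None`. The function name `parameters_cache_key` is hypothetical, the context argument is unused here, and treating a `None` return as "do not use a cache key" is an assumption based on the `Optional[str]` return type rather than documented behavior.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def parameters_cache_key(context: Any, parameters: Dict[str, Any]) -> Optional[str]:
    """Hypothetical cache key function: hash the call parameters.

    `context` stands in for the task run context and is ignored in this sketch.
    """
    try:
        # Sort keys so that argument order does not change the key
        payload = json.dumps(parameters, sort_keys=True)
    except TypeError:
        # Assumption: return None when the parameters cannot be serialized
        return None
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = parameters_cache_key(None, {"x": 1, "y": 2})
key_b = parameters_cache_key(None, {"y": 2, "x": 1})
print(key_a == key_b)
```

A function like this could be passed as `cache_key_fn`, optionally together with a `datetime.timedelta` for `cache_expiration`.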
Source code in prefect/tasks.py
class Task(Generic[P, R]):
    """
    A Prefect task definition.

    !!! note
        We recommend using [the `@task` decorator][prefect.tasks.task] for most use-cases.

    Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function
    creates a new task run.

    To preserve the input and output types, we use the generic type variables P and R for "Parameters" and
    "Returns" respectively.

    Args:
        name: An optional name for the task; if not provided, the name will be inferred
            from the given function.
        description: An optional string description for the task.
        tags: An optional set of tags to be associated with runs of this task. These
            tags are combined with any tags defined by a `prefect.tags` context at
            task runtime.
        version: An optional string specifying the version of this task definition
        cache_key_fn: An optional callable that, given the task run context and call
            parameters, generates a string key; if the key matches a previous completed
            state, that state result will be restored instead of running the task again.
        cache_expiration: An optional amount of time indicating how long cached states
            for this task should be restorable; if not provided, cached states will
            never expire.
        retries: An optional number of times to retry on task run failure.
        retry_delay_seconds: An optional number of seconds to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero.
    """

    # NOTE: These parameters (types, defaults, and docstrings) should be duplicated
    #       exactly in the @task decorator
    def __init__(
        self,
        fn: Callable[P, R],
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        version: str = None,
        cache_key_fn: Callable[
            ["TaskRunContext", Dict[str, Any]], Optional[str]
        ] = None,
        cache_expiration: datetime.timedelta = None,
        retries: int = 0,
        retry_delay_seconds: Union[float, int] = 0,
    ):
        if not callable(fn):
            raise TypeError("'fn' must be callable")

        self.description = description or inspect.getdoc(fn)
        update_wrapper(self, fn)
        self.fn = fn
        self.isasync = inspect.iscoroutinefunction(self.fn)

        self.name = name or self.fn.__name__
        self.version = version

        raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])

        self.tags = set(tags if tags else [])
        self.task_key = to_qualified_name(self.fn)

        self.cache_key_fn = cache_key_fn
        self.cache_expiration = cache_expiration

        # TaskRunPolicy settings
        # TODO: We can instantiate a `TaskRunPolicy` and add Pydantic bound checks to
        #       validate that the user passes positive numbers here
        self.retries = retries
        self.retry_delay_seconds = retry_delay_seconds

        # Warn if this task's `name` conflicts with another task while having a
        # different function. This is to detect the case where two or more tasks
        # share a name or are lambdas, which should result in a warning, and to
        # differentiate it from the case where the task was 'copied' via
        # `with_options`, which should not result in a warning.
        registry = PrefectObjectRegistry.get()

        if registry and any(
            other
            for other in registry.get_instances(Task)
            if other.name == self.name and id(other.fn) != id(self.fn)
        ):
            file = inspect.getsourcefile(self.fn)
            line_number = inspect.getsourcelines(self.fn)[1]
            warnings.warn(
                f"A task named {self.name!r} and defined at '{file}:{line_number}' "
                "conflicts with another task. Consider specifying a unique `name` "
                "parameter in the task definition:\n\n "
                "`@task(name='my_unique_name', ...)`"
            )

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        cache_key_fn: Callable[
            ["TaskRunContext", Dict[str, Any]], Optional[str]
        ] = None,
        cache_expiration: datetime.timedelta = None,
        retries: int = 0,
        retry_delay_seconds: Union[float, int] = 0,
    ):
        """
        Create a new task from the current object, updating provided options.

        Args:
            name: A new name for the task.
            description: A new description for the task.
            tags: A new set of tags for the task. If given, existing tags are ignored,
                not merged.
            cache_key_fn: A new cache key function for the task.
            cache_expiration: A new cache expiration time for the task.
            retries: A new number of times to retry on task run failure.
            retry_delay_seconds: A new number of seconds to wait before retrying the
                task after failure. This is only applicable if `retries` is nonzero.

        Returns:
            A new `Task` instance.

        Examples:

            Create a new task from an existing task and update the name

            >>> @task(name="My task")
            >>> def my_task():
            >>>     return 1
            >>>
            >>> new_task = my_task.with_options(name="My new task")

            Create a new task from an existing task and update the retry settings

            >>> from random import randint
            >>>
            >>> @task(retries=1, retry_delay_seconds=5)
            >>> def my_task():
            >>>     x = randint(0, 5)
            >>>     if x >= 3:  # Make a task that fails sometimes
            >>>         raise ValueError("Retry me please!")
            >>>     return x
            >>>
            >>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)

            Use a task with updated options within a flow

            >>> @task(name="My task")
            >>> def my_task():
            >>>     return 1
            >>>
            >>> @flow
            >>> def my_flow():
            >>>     new_task = my_task.with_options(name="My new task")
            >>>     new_task()
        """
        return Task(
            fn=self.fn,
            name=name or self.name,
            description=description or self.description,
            tags=tags or copy(self.tags),
            cache_key_fn=cache_key_fn or self.cache_key_fn,
            cache_expiration=cache_expiration or self.cache_expiration,
            retries=retries or self.retries,
            retry_delay_seconds=retry_delay_seconds or self.retry_delay_seconds,
        )

    @overload
    def __call__(
        self: "Task[P, NoReturn]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    @overload
    def __call__(
        self: "Task[P, T]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> T:
        ...

    @overload
    def __call__(
        self: "Task[P, T]",
        *args: P.args,
        return_state: Literal[True],
        **kwargs: P.kwargs,
    ) -> State[T]:
        ...

    def __call__(
        self,
        *args: P.args,
        return_state: bool = False,
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        **kwargs: P.kwargs,
    ):
        """
        Run the task and return the result. If `return_state` is True, the
        result is returned wrapped in a Prefect State, which provides error handling.
        """
        from prefect.engine import enter_task_run_engine
        from prefect.task_runners import SequentialTaskRunner

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)

        return_type = "state" if return_state else "result"

        return enter_task_run_engine(
            self,
            parameters=parameters,
            wait_for=wait_for,
            task_runner=SequentialTaskRunner(),
            return_type=return_type,
            mapped=False,
        )

    @overload
    def _run(
        self: "Task[P, NoReturn]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[None, Sync]:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    @overload
    def _run(
        self: "Task[P, Coroutine[Any, Any, T]]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Awaitable[State[T]]:
        ...

    @overload
    def _run(
        self: "Task[P, T]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> State[T]:
        ...

    def _run(
        self,
        *args: P.args,
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        **kwargs: P.kwargs,
    ) -> Union[State, Awaitable[State]]:
        """
        Run the task and return the final state.
        """
        from prefect.engine import enter_task_run_engine
        from prefect.task_runners import SequentialTaskRunner

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)

        return enter_task_run_engine(
            self,
            parameters=parameters,
            wait_for=wait_for,
            return_type="state",
            task_runner=SequentialTaskRunner(),
            mapped=False,
        )

    @overload
    def submit(
        self: "Task[P, NoReturn]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[None, Sync]:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    @overload
    def submit(
        self: "Task[P, Coroutine[Any, Any, T]]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Awaitable[PrefectFuture[T, Async]]:
        ...

    @overload
    def submit(
        self: "Task[P, T]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[T, Sync]:
        ...

    @overload
    def submit(
        self: "Task[P, T]",
        *args: P.args,
        return_state: Literal[True],
        **kwargs: P.kwargs,
    ) -> State[T]:
        ...

    def submit(
        self,
        *args: Any,
        return_state: bool = False,
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        **kwargs: Any,
    ) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
        """
        Submit a run of the task to a worker.

        Must be called within a flow function. If writing an async task, this call must
        be awaited.

        Will create a new task run in the backing API and submit the task to the flow's
        task runner. This call only blocks execution while the task is being submitted;
        once it is submitted, the flow function will continue executing. However, note
        that the `SequentialTaskRunner` does not implement parallel execution for sync tasks,
        so they are fully resolved on submission.

        Args:
            *args: Arguments to run the task with
            return_state: Return the result of the task run wrapped in a
                Prefect State.
            wait_for: Upstream task futures to wait for before starting the task
            **kwargs: Keyword arguments to run the task with

        Returns:
            If `return_state` is False, a future allowing asynchronous access to
                the state of the task.
            If `return_state` is True, a Prefect State wrapping the result of the
                task run.

        Examples:

            Define a task

            >>> from prefect import task
            >>> @task
            >>> def my_task():
            >>>     return "hello"

            Run a task in a flow

            >>> from prefect import flow
            >>> @flow
            >>> def my_flow():
            >>>     my_task.submit()

            Wait for a task to finish

            >>> @flow
            >>> def my_flow():
            >>>     my_task.submit().wait()

            Use the result from a task in a flow

            >>> @flow
            >>> def my_flow():
            >>>     print(my_task.submit().result())
            >>>
            >>> my_flow()
            hello

            Run an async task in an async flow

            >>> @task
            >>> async def my_async_task():
            >>>     pass
            >>>
            >>> @flow
            >>> async def my_flow():
            >>>     await my_async_task.submit()

            Run a sync task in an async flow

            >>> @flow
            >>> async def my_flow():
            >>>     my_task.submit()

            Enforce ordering between tasks that do not exchange data
            >>> @task
            >>> def task_1():
            >>>     pass
            >>>
            >>> @task
            >>> def task_2():
            >>>     pass
            >>>
            >>> @flow
            >>> def my_flow():
            >>>     x = task_1.submit()
            >>>
            >>>     # task 2 will wait for task_1 to complete
            >>>     y = task_2.submit(wait_for=[x])

        """

        from prefect.engine import enter_task_run_engine

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)
        return_type = "state" if return_state else "future"

        return enter_task_run_engine(
            self,
            parameters=parameters,
            wait_for=wait_for,
            return_type=return_type,
            task_runner=None,  # Use the flow's task runner
            mapped=False,
        )

    @overload
    def map(
        self: "Task[P, NoReturn]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> List[PrefectFuture[None, Sync]]:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    @overload
    def map(
        self: "Task[P, Coroutine[Any, Any, T]]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> List[Awaitable[PrefectFuture[T, Async]]]:
        ...

    @overload
    def map(
        self: "Task[P, T]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> List[PrefectFuture[T, Sync]]:
        ...

    @overload
    def map(
        self: "Task[P, T]",
        *args: P.args,
        return_state: Literal[True],
        **kwargs: P.kwargs,
    ) -> List[State[T]]:
        ...

    def map(
        self,
        *args: Any,
        return_state: bool = False,
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        **kwargs: Any,
    ) -> List[Union[PrefectFuture, Awaitable[PrefectFuture]]]:
        """
        Submit a mapped run of the task to a worker.

        Must be called within a flow function. If writing an async task, this
        call must be awaited.

        Must be called with an iterable per task function argument. All
        iterables must be the same length.

        Will create as many task runs as the length of the iterable(s) in the
        backing API and submit the task runs to the flow's task runner. If given
        a future as input, this call blocks while the future is resolved. It
        also blocks while the tasks are being submitted; once they are
        submitted, the flow function will continue executing. However, note
        that the `SequentialTaskRunner` does not implement parallel execution
        for sync tasks, so they are fully resolved on submission.

        Args:
            *args: Iterable arguments to run the tasks with
            return_state: Return a list of Prefect States that wrap the results
                of each task run.
            wait_for: Upstream task futures to wait for before starting the task
            **kwargs: Keyword iterable arguments to run the task with

        Returns:
            A list of futures allowing asynchronous access to the state of the
            tasks

        Examples:

            Define a task

            >>> from prefect import task
            >>> @task
            >>> def my_task(x):
            >>>     return x + 1

            Run a map in a flow

            >>> from prefect import flow
            >>> @flow
            >>> def my_flow():
            >>>     my_task.map([1, 2, 3])

            Wait for mapping to finish

            >>> @flow
            >>> def my_flow():
            >>>     futures = my_task.map([1, 2, 3])
            >>>     for future in futures:
            >>>         future.wait()

            Use the result from a map in a flow

            >>> @flow
            >>> def my_flow():
            >>>     futures = my_task.map([1, 2, 3])
            >>>     for future in futures:
            >>>         print(future.result())
            >>> my_flow()
            2
            3
            4

            Enforce ordering between tasks that do not exchange data
            >>> @task
            >>> def task_1():
            >>>     pass
            >>>
            >>> @task
            >>> def task_2():
            >>>     pass
            >>>
            >>> @flow
            >>> def my_flow():
            >>>     x = task_1.submit()
            >>>
            >>>     # task 2 will wait for task_1 to complete
            >>>     y = task_2.map([1, 2, 3], wait_for=[x])
        """

        from prefect.engine import enter_task_run_engine

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)
        return_type = "state" if return_state else "future"

        return enter_task_run_engine(
            self,
            parameters=parameters,
            wait_for=wait_for,
            return_type=return_type,
            task_runner=None,
            mapped=True,
        )

Task.__call__ special

Run the task and return the result. If return_state is True, the result is returned wrapped in a Prefect State, which provides error handling.

Source code in prefect/tasks.py
def __call__(
    self,
    *args: P.args,
    return_state: bool = False,
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    **kwargs: P.kwargs,
):
    """
    Run the task and return the result. If `return_state` is True, the
    result is returned wrapped in a Prefect State, which provides error handling.
    """
    from prefect.engine import enter_task_run_engine
    from prefect.task_runners import SequentialTaskRunner

    # Convert the call args/kwargs to a parameter dict
    parameters = get_call_parameters(self.fn, args, kwargs)

    return_type = "state" if return_state else "result"

    return enter_task_run_engine(
        self,
        parameters=parameters,
        wait_for=wait_for,
        task_runner=SequentialTaskRunner(),
        return_type=return_type,
        mapped=False,
    )
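The "convert the call args/kwargs to a parameter dict" step in the source can be pictured with the standard library's `inspect` module. The sketch below is an approximation under that assumption; the helper name `call_parameters` is hypothetical, and Prefect's own `get_call_parameters` may differ in details such as default handling or error messages.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def call_parameters(
    fn: Callable[..., Any], args: Tuple[Any, ...], kwargs: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical helper: bind positional and keyword arguments to names."""
    bound = inspect.signature(fn).bind(*args, **kwargs)
    bound.apply_defaults()  # fill in any parameters the caller omitted
    return dict(bound.arguments)


def add(x, y=2):
    return x + y


print(call_parameters(add, (1,), {}))
print(call_parameters(add, (1,), {"y": 5}))
```

Binding up front means a call with missing or unexpected arguments raises a `TypeError` before any run is created, at least in this sketch.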

Task.map

Submit a mapped run of the task to a worker.

Must be called within a flow function. If writing an async task, this call must be awaited.

Must be called with an iterable per task function argument. All iterables must be the same length.

Will create as many task runs as the length of the iterable(s) in the backing API and submit the task runs to the flow's task runner. If given a future as input, this call blocks while the future is resolved. It also blocks while the tasks are being submitted; once they are submitted, the flow function will continue executing. However, note that the SequentialTaskRunner does not implement parallel execution for sync tasks, so they are fully resolved on submission.
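The "one iterable per argument, all of the same length, one task run per element" rule can be sketched in plain Python. The helper `expand_mapped_parameters` below is hypothetical and only illustrates the shape of the expansion; it is not how Prefect's engine is implemented.

```python
from typing import Any, Dict, Iterable, List


def expand_mapped_parameters(
    parameters: Dict[str, Iterable[Any]]
) -> List[Dict[str, Any]]:
    """Hypothetical helper: turn a dict of iterables into per-run parameter dicts."""
    materialized = {name: list(values) for name, values in parameters.items()}
    lengths = {len(values) for values in materialized.values()}
    if len(lengths) > 1:
        raise ValueError("All mapped iterables must be the same length")
    run_count = lengths.pop() if lengths else 0
    return [
        {name: values[index] for name, values in materialized.items()}
        for index in range(run_count)
    ]


runs = expand_mapped_parameters({"x": [1, 2, 3], "y": [10, 20, 30]})
for run_parameters in runs:
    print(run_parameters)
```

Each resulting dictionary corresponds to the parameters of one task run in the mapped call.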

Parameters:

Name Description Default
*args

Iterable arguments to run the tasks with

Any
()
return_state

Return a list of Prefect States that wrap the results of each task run.

bool
False
wait_for

Upstream task futures to wait for before starting the task

Optional[Iterable[prefect.futures.PrefectFuture]]
None
**kwargs

Keyword iterable arguments to run the task with

Any
{}

Returns:

Type Description
List[Union[prefect.futures.PrefectFuture, Awaitable[prefect.futures.PrefectFuture]]]

A list of futures allowing asynchronous access to the state of the tasks

Examples:

Define a task

>>> from prefect import task
>>> @task
>>> def my_task(x):
>>>     return x + 1

Run a map in a flow

>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>>     my_task.map([1, 2, 3])

Wait for mapping to finish

>>> @flow
>>> def my_flow():
>>>     futures = my_task.map([1, 2, 3])
>>>     for future in futures:
>>>         future.wait()

Use the result from a map in a flow

>>> @flow
>>> def my_flow():
>>>     futures = my_task.map([1, 2, 3])
>>>     for future in futures:
>>>         print(future.result())
>>> my_flow()
2
3
4

Enforce ordering between tasks that do not exchange data

>>> @task
>>> def task_1():
>>>     pass
>>>
>>> @task
>>> def task_2():
>>>     pass
>>>
>>> @flow
>>> def my_flow():
>>>     x = task_1.submit()
>>>
>>>     # task 2 will wait for task_1 to complete
>>>     y = task_2.map([1, 2, 3], wait_for=[x])
Source code in prefect/tasks.py
def map(
    self,
    *args: Any,
    return_state: bool = False,
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    **kwargs: Any,
) -> List[Union[PrefectFuture, Awaitable[PrefectFuture]]]:
    """
    Submit a mapped run of the task to a worker.

    Must be called within a flow function. If writing an async task, this
    call must be awaited.

    Must be called with an iterable per task function argument. All
    iterables must be the same length.

    Will create as many task runs as the length of the iterable(s) in the
    backing API and submit the task runs to the flow's task runner. If given
    a future as input, this call blocks while the future is resolved. It
    also blocks while the tasks are being submitted; once they are
    submitted, the flow function will continue executing. However, note
    that the `SequentialTaskRunner` does not implement parallel execution
    for sync tasks, so they are fully resolved on submission.

    Args:
        *args: Iterable arguments to run the tasks with
        return_state: Return a list of Prefect States that wrap the results
            of each task run.
        wait_for: Upstream task futures to wait for before starting the task
        **kwargs: Keyword iterable arguments to run the task with

    Returns:
        A list of futures allowing asynchronous access to the state of the
        tasks

    Examples:

        Define a task

        >>> from prefect import task
        >>> @task
        >>> def my_task(x):
        >>>     return x + 1

        Run a map in a flow

        >>> from prefect import flow
        >>> @flow
        >>> def my_flow():
        >>>     my_task.map([1, 2, 3])

        Wait for mapping to finish

        >>> @flow
        >>> def my_flow():
        >>>     futures = my_task.map([1, 2, 3])
        >>>     for future in futures:
        >>>         future.wait()

        Use the result from a map in a flow

        >>> @flow
        >>> def my_flow():
        >>>     futures = my_task.map([1, 2, 3])
        >>>     for future in futures:
        >>>         print(future.result())
        >>> my_flow()
        2
        3
        4

        Enforce ordering between tasks that do not exchange data
        >>> @task
        >>> def task_1():
        >>>     pass
        >>>
        >>> @task
        >>> def task_2():
        >>>     pass
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     x = task_1.submit()
        >>>
        >>>     # task 2 will wait for task_1 to complete
        >>>     y = task_2.map([1, 2, 3], wait_for=[x])
    """

    from prefect.engine import enter_task_run_engine

    # Convert the call args/kwargs to a parameter dict
    parameters = get_call_parameters(self.fn, args, kwargs)
    return_type = "state" if return_state else "future"

    return enter_task_run_engine(
        self,
        parameters=parameters,
        wait_for=wait_for,
        return_type=return_type,
        task_runner=None,
        mapped=True,
    )

Task.submit

Submit a run of the task to a worker.

Must be called within a flow function. If writing an async task, this call must be awaited.

Will create a new task run in the backing API and submit the task to the flow's task runner. This call only blocks execution while the task is being submitted; once it is submitted, the flow function will continue executing. However, note that the SequentialTaskRunner does not implement parallel execution for sync tasks, so they are fully resolved on submission.
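The submit-then-continue behavior and the `wait_for` ordering resemble the future pattern in the standard library's `concurrent.futures`. The sketch below is an analogy only: `submit_after`, `task_1`, and `task_2` are hypothetical plain functions, and nothing here uses or reproduces Prefect's task runners.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List, Optional


def submit_after(
    executor: ThreadPoolExecutor,
    fn: Callable[..., Any],
    *args: Any,
    wait_for: Optional[Iterable[Future]] = None,
) -> Future:
    """Hypothetical helper: submit `fn`, running it only after upstream futures finish."""
    upstream = list(wait_for or [])

    def run() -> Any:
        for future in upstream:
            future.result()  # block in the worker until the upstream work is done
        return fn(*args)

    return executor.submit(run)  # returns immediately with a future


events: List[str] = []


def task_1() -> int:
    events.append("task_1")
    return 1


def task_2() -> int:
    events.append("task_2")
    return 2


with ThreadPoolExecutor(max_workers=2) as executor:
    x = submit_after(executor, task_1)
    y = submit_after(executor, task_2, wait_for=[x])  # no data passed, only ordering
    result = y.result()

print(events, result)
```

The caller keeps running after each submission and only blocks when it asks a future for its result, which mirrors the description above.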

Parameters:

Name Description Default
*args

Arguments to run the task with

Any
()
return_state

Return the result of the task run wrapped in a Prefect State.

bool
False
wait_for

Upstream task futures to wait for before starting the task

Optional[Iterable[prefect.futures.PrefectFuture]]
None
**kwargs

Keyword arguments to run the task with

Any
{}

Returns:

Type Description
Union[prefect.futures.PrefectFuture, Awaitable[prefect.futures.PrefectFuture]]

If return_state is False, a future allowing asynchronous access to the state of the task. If return_state is True, a Prefect State wrapping the result of the task run.

Examples:

Define a task

>>> from prefect import task
>>> @task
>>> def my_task():
>>>     return "hello"

Run a task in a flow

>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>>     my_task.submit()

Wait for a task to finish

>>> @flow
>>> def my_flow():
>>>     my_task.submit().wait()

Use the result from a task in a flow

>>> @flow
>>> def my_flow():
>>>     print(my_task.submit().result())
>>>
>>> my_flow()
hello

Run an async task in an async flow

>>> @task
>>> async def my_async_task():
>>>     pass
>>>
>>> @flow
>>> async def my_flow():
>>>     await my_async_task.submit()

Run a sync task in an async flow

>>> @flow
>>> async def my_flow():
>>>     my_task.submit()

Enforce ordering between tasks that do not exchange data

>>> @task
>>> def task_1():
>>>     pass
>>>
>>> @task
>>> def task_2():
>>>     pass
>>>
>>> @flow
>>> def my_flow():
>>>     x = task_1.submit()
>>>
>>>     # task 2 will wait for task_1 to complete
>>>     y = task_2.submit(wait_for=[x])
Source code in prefect/tasks.py
def submit(
    self,
    *args: Any,
    return_state: bool = False,
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    **kwargs: Any,
) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
    """
    Submit a run of the task to a worker.

    Must be called within a flow function. If writing an async task, this call must
    be awaited.

    Will create a new task run in the backing API and submit the task to the flow's
    task runner. This call only blocks execution while the task is being submitted;
    once it is submitted, the flow function will continue executing. However, note
    that the `SequentialTaskRunner` does not implement parallel execution for sync tasks,
    so they are fully resolved on submission.

    Args:
        *args: Arguments to run the task with
        return_state: Return the result of the task run wrapped in a
            Prefect State.
        wait_for: Upstream task futures to wait for before starting the task
        **kwargs: Keyword arguments to run the task with

    Returns:
        If `return_state` is False, a future allowing asynchronous access to
            the state of the task.
        If `return_state` is True, a Prefect State wrapping the result of the
            task run.

    Examples:

        Define a task

        >>> from prefect import task
        >>> @task
        >>> def my_task():
        >>>     return "hello"

        Run a task in a flow

        >>> from prefect import flow
        >>> @flow
        >>> def my_flow():
        >>>     my_task.submit()

        Wait for a task to finish

        >>> @flow
        >>> def my_flow():
        >>>     my_task.submit().wait()

        Use the result from a task in a flow

        >>> @flow
        >>> def my_flow():
        >>>     print(my_task.submit().result())
        >>>
        >>> my_flow()
        hello

        Run an async task in an async flow

        >>> @task
        >>> async def my_async_task():
        >>>     pass
        >>>
        >>> @flow
        >>> async def my_flow():
        >>>     await my_async_task.submit()

        Run a sync task in an async flow

        >>> @flow
        >>> async def my_flow():
        >>>     my_task.submit()

        Enforce ordering between tasks that do not exchange data
        >>> @task
        >>> def task_1():
        >>>     pass
        >>>
        >>> @task
        >>> def task_2():
        >>>     pass
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     x = task_1.submit()
        >>>
        >>>     # task 2 will wait for task_1 to complete
        >>>     y = task_2.submit(wait_for=[x])

    """

    from prefect.engine import enter_task_run_engine

    # Convert the call args/kwargs to a parameter dict
    parameters = get_call_parameters(self.fn, args, kwargs)
    return_type = "state" if return_state else "future"

    return enter_task_run_engine(
        self,
        parameters=parameters,
        wait_for=wait_for,
        return_type=return_type,
        task_runner=None,  # Use the flow's task runner
        mapped=False,
    )

Task.with_options

Create a new task from the current object, updating provided options.

Parameters:

Name Description Default
name

A new name for the task.

str
None
description

A new description for the task.

str
None
tags

A new set of tags for the task. If given, existing tags are ignored, not merged.

Iterable[str]
None
cache_key_fn

A new cache key function for the task.

Callable[[TaskRunContext, Dict[str, Any]], Optional[str]]
None
cache_expiration

A new cache expiration time for the task.

timedelta
None
retries

A new number of times to retry on task run failure.

int
0
retry_delay_seconds

A new number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero.

Union[float, int]
0

Returns:

Type Description

A new Task instance.

Examples:

Create a new task from an existing task and update the name

>>> @task(name="My task")
>>> def my_task():
>>>     return 1
>>>
>>> new_task = my_task.with_options(name="My new task")

Create a new task from an existing task and update the retry settings

>>> from random import randint
>>>
>>> @task(retries=1, retry_delay_seconds=5)
>>> def my_task():
>>>     x = randint(0, 5)
>>>     if x >= 3:  # Make a task that fails sometimes
>>>         raise ValueError("Retry me please!")
>>>     return x
>>>
>>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)

Use a task with updated options within a flow

>>> @task(name="My task")
>>> def my_task():
>>>     return 1
>>>
>>> @flow
>>> def my_flow():
>>>     new_task = my_task.with_options(name="My new task")
>>>     new_task()
Source code in prefect/tasks.py
def with_options(
    self,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    cache_key_fn: Callable[
        ["TaskRunContext", Dict[str, Any]], Optional[str]
    ] = None,
    cache_expiration: datetime.timedelta = None,
    retries: int = 0,
    retry_delay_seconds: Union[float, int] = 0,
):
    """
    Create a new task from the current object, updating provided options.

    Args:
        name: A new name for the task.
        description: A new description for the task.
        tags: A new set of tags for the task. If given, existing tags are ignored,
            not merged.
        cache_key_fn: A new cache key function for the task.
        cache_expiration: A new cache expiration time for the task.
        retries: A new number of times to retry on task run failure.
        retry_delay_seconds: A new number of seconds to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero.

    Returns:
        A new `Task` instance.

    Examples:

        Create a new task from an existing task and update the name

        >>> @task(name="My task")
        >>> def my_task():
        >>>     return 1
        >>>
        >>> new_task = my_task.with_options(name="My new task")

        Create a new task from an existing task and update the retry settings

        >>> from random import randint
        >>>
        >>> @task(retries=1, retry_delay_seconds=5)
        >>> def my_task():
        >>>     x = randint(0, 5)
        >>>     if x >= 3:  # Make a task that fails sometimes
        >>>         raise ValueError("Retry me please!")
        >>>     return x
        >>>
        >>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)

        Use a task with updated options within a flow

        >>> @task(name="My task")
        >>> def my_task():
        >>>     return 1
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     new_task = my_task.with_options(name="My new task")
        >>>     new_task()
    """
    return Task(
        fn=self.fn,
        name=name or self.name,
        description=description or self.description,
        tags=tags or copy(self.tags),
        cache_key_fn=cache_key_fn or self.cache_key_fn,
        cache_expiration=cache_expiration or self.cache_expiration,
        retries=retries or self.retries,
        retry_delay_seconds=retry_delay_seconds or self.retry_delay_seconds,
    )
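The `new or current` fallback in the source above can be sketched without Prefect. This is a minimal illustration, assuming a hypothetical `TaskSketch` dataclass that stands in for `Task`; it is not the library's class. It shows that any falsy argument, including `retries=0`, leaves the existing value in place.

```python
from dataclasses import dataclass


@dataclass
class TaskSketch:
    """Hypothetical stand-in for a task; not the Prefect `Task` class."""

    name: str
    retries: int = 0
    retry_delay_seconds: float = 0

    def with_options(self, *, name=None, retries=0, retry_delay_seconds=0):
        # Same `new or current` pattern as the source above: a falsy
        # argument (None, 0, "") keeps the current value.
        return TaskSketch(
            name=name or self.name,
            retries=retries or self.retries,
            retry_delay_seconds=retry_delay_seconds or self.retry_delay_seconds,
        )


original = TaskSketch(name="My task", retries=3, retry_delay_seconds=5)

renamed = original.with_options(name="My new task")  # retries stay at 3
more_retries = original.with_options(retries=5)      # retries become 5
still_three = original.with_options(retries=0)       # 0 is falsy, so 3 is kept
```

With this pattern an option can be raised or replaced, but it cannot be reset to `0` or to an empty value through `with_options`.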

task

Decorator to designate a function as a task in a Prefect workflow.

This decorator may be used for asynchronous or synchronous functions.

Parameters:

Name Description Type Default
name

An optional name for the task; if not provided, the name will be inferred from the given function.

str
None
description

An optional string description for the task.

str
None
tags

An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime.

Iterable[str]
None
version

An optional string specifying the version of this task definition.

str
None
cache_key_fn

An optional callable that, given the task run context and call parameters, generates a string key; if the key matches a previous completed state, that state result will be restored instead of running the task again.

Callable[[TaskRunContext, Dict[str, Any]], Optional[str]]
None
cache_expiration

An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire.

timedelta
None
retries

An optional number of times to retry on task run failure.

int
0
retry_delay_seconds

An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero.

Union[float, int]
0

Returns:

Type Description

A callable Task object which, when called, will submit the task for execution.

Examples:

Define a simple task

>>> @task
>>> def add(x, y):
>>>     return x + y

Define an async task

>>> @task
>>> async def add(x, y):
>>>     return x + y

Define a task with tags and a description

>>> @task(tags={"a", "b"}, description="This task is empty but it's my first!")
>>> def my_task():
>>>     pass

Define a task with a custom name

>>> @task(name="The Ultimate Task")
>>> def my_task():
>>>     pass

Define a task that retries 3 times with a 5 second delay between attempts

>>> from random import randint
>>>
>>> @task(retries=3, retry_delay_seconds=5)
>>> def my_task():
>>>     x = randint(0, 5)
>>>     if x >= 3:  # Make a task that fails sometimes
>>>         raise ValueError("Retry me please!")
>>>     return x

Define a task that is cached for a day based on its inputs

>>> from prefect.tasks import task_input_hash
>>> from datetime import timedelta
>>>
>>> @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
>>> def my_task():
>>>     return "hello"
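A custom `cache_key_fn` only needs to accept the task run context and the call parameters and return a string or `None`. The sketch below is an illustration under assumptions: `cache_by_user`, the `user_id` argument, and the `SimpleNamespace` stand-in for the context are all hypothetical. The only context attribute it reads is `context.task.task_key`, which `task_input_hash` also uses.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def cache_by_user(context: Any, arguments: Dict[str, Any]) -> Optional[str]:
    """Hypothetical cache key function that keys on `user_id` only."""
    user_id = arguments.get("user_id")
    if user_id is None:
        # No usable input: return a null key instead of a string.
        return None
    return f"{context.task.task_key}:{user_id}"


# Stand-in for a real TaskRunContext (assumption, for demonstration only).
fake_context = SimpleNamespace(task=SimpleNamespace(task_key="my_module.my_task"))

key_1 = cache_by_user(fake_context, {"user_id": 7, "verbose": True})
key_2 = cache_by_user(fake_context, {"user_id": 7, "verbose": False})
no_key = cache_by_user(fake_context, {"verbose": True})
```

Here `key_1` and `key_2` are equal because `verbose` is ignored. Such a function would be passed as `@task(cache_key_fn=cache_by_user)`, in the same position as `task_input_hash` above.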
Source code in prefect/tasks.py
def task(
    __fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]] = None,
    cache_expiration: datetime.timedelta = None,
    retries: int = 0,
    retry_delay_seconds: Union[float, int] = 0,
):
    """
    Decorator to designate a function as a task in a Prefect workflow.

    This decorator may be used for asynchronous or synchronous functions.

    Args:
        name: An optional name for the task; if not provided, the name will be inferred
            from the given function.
        description: An optional string description for the task.
        tags: An optional set of tags to be associated with runs of this task. These
            tags are combined with any tags defined by a `prefect.tags` context at
            task runtime.
        version: An optional string specifying the version of this task definition.
        cache_key_fn: An optional callable that, given the task run context and call
            parameters, generates a string key; if the key matches a previous completed
            state, that state result will be restored instead of running the task again.
        cache_expiration: An optional amount of time indicating how long cached states
            for this task should be restorable; if not provided, cached states will
            never expire.
        retries: An optional number of times to retry on task run failure.
        retry_delay_seconds: An optional number of seconds to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero.

    Returns:
        A callable `Task` object which, when called, will submit the task for execution.

    Examples:
        Define a simple task

        >>> @task
        >>> def add(x, y):
        >>>     return x + y

        Define an async task

        >>> @task
        >>> async def add(x, y):
        >>>     return x + y

        Define a task with tags and a description

        >>> @task(tags={"a", "b"}, description="This task is empty but it's my first!")
        >>> def my_task():
        >>>     pass

        Define a task with a custom name

        >>> @task(name="The Ultimate Task")
        >>> def my_task():
        >>>     pass

        Define a task that retries 3 times with a 5 second delay between attempts

        >>> from random import randint
        >>>
        >>> @task(retries=3, retry_delay_seconds=5)
        >>> def my_task():
        >>>     x = randint(0, 5)
        >>>     if x >= 3:  # Make a task that fails sometimes
        >>>         raise ValueError("Retry me please!")
        >>>     return x

        Define a task that is cached for a day based on its inputs

        >>> from prefect.tasks import task_input_hash
        >>> from datetime import timedelta
        >>>
        >>> @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
        >>> def my_task():
        >>>     return "hello"
    """
    if __fn:
        return cast(
            Task[P, R],
            Task(
                fn=__fn,
                name=name,
                description=description,
                tags=tags,
                version=version,
                cache_key_fn=cache_key_fn,
                cache_expiration=cache_expiration,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
            ),
        )
    else:
        return cast(
            Callable[[Callable[P, R]], Task[P, R]],
            partial(
                task,
                name=name,
                description=description,
                tags=tags,
                version=version,
                cache_key_fn=cache_key_fn,
                cache_expiration=cache_expiration,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
            ),
        )
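The two branches above are what let the decorator work both bare (`@task`) and with keyword arguments (`@task(retries=3)`). A minimal sketch of the same pattern follows, using a hypothetical `tagged` decorator and `label` option that are not part of Prefect.

```python
from functools import partial


def tagged(fn=None, *, label="default"):
    """Hypothetical decorator usable as `@tagged` or `@tagged(label=...)`."""
    if fn:
        # Bare form: Python passed the decorated function directly.
        fn.label = label
        return fn
    # Called form: no function yet, so return a partial that remembers
    # the options and waits to receive the function.
    return partial(tagged, label=label)


@tagged
def first():
    return 1


@tagged(label="custom")
def second():
    return 2
```

In the called form, `tagged(label="custom")` returns a partial; Python then calls that partial with `second`, which re-enters `tagged` through the first branch.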

task_input_hash

A task cache key implementation which hashes all inputs to the task using a JSON or cloudpickle serializer. If any arguments are not JSON serializable, the pickle serializer is used as a fallback. If cloudpickle fails, this will return a null key indicating that a cache key could not be generated for the given inputs.

Parameters:

Name Description Type Default
context

the active TaskRunContext

TaskRunContext
required
arguments

a dictionary of arguments to be passed to the underlying task

Dict[str, Any]
required

Returns:

Type Description
Optional[str]

a string hash if hashing succeeded, else None

Source code in prefect/tasks.py
def task_input_hash(
    context: "TaskRunContext", arguments: Dict[str, Any]
) -> Optional[str]:
    """
    A task cache key implementation which hashes all inputs to the task using a JSON or
    cloudpickle serializer. If any arguments are not JSON serializable, the pickle
    serializer is used as a fallback. If cloudpickle fails, this will return a null key
    indicating that a cache key could not be generated for the given inputs.

    Arguments:
        context: the active `TaskRunContext`
        arguments: a dictionary of arguments to be passed to the underlying task

    Returns:
        a string hash if hashing succeeded, else `None`
    """
    return hash_objects(
        # We use the task key to get the qualified name for the task and include the
        # task functions `co_code` bytes to avoid caching when the underlying function
        # changes
        context.task.task_key,
        context.task.fn.__code__.co_code,
        arguments,
    )
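The internals of `hash_objects` are not shown here, so the following is only a sketch of the behavior the docstring describes: try JSON first, fall back to pickling, and return `None` if both fail. The name `hash_inputs_sketch`, the use of the standard-library `pickle` in place of cloudpickle, and the SHA-256 digest are all assumptions made for illustration.

```python
import hashlib
import json
import pickle
from typing import Any, Optional


def hash_inputs_sketch(*parts: Any) -> Optional[str]:
    """Hash `parts` via JSON, falling back to pickle; None if both fail."""
    try:
        # sort_keys makes the key independent of dict insertion order.
        payload = json.dumps(parts, sort_keys=True).encode()
    except (TypeError, ValueError):
        try:
            payload = pickle.dumps(parts)
        except Exception:
            # Neither serializer worked: signal that no key is available.
            return None
    return hashlib.sha256(payload).hexdigest()


key_a = hash_inputs_sketch("my_task", {"x": 1, "y": 2})
key_b = hash_inputs_sketch("my_task", {"y": 2, "x": 1})  # same inputs, reordered
key_c = hash_inputs_sketch("my_task", {"x": 2, "y": 2})  # different inputs
key_bytes = hash_inputs_sketch("my_task", b"\x00\x01")   # not JSON serializable
key_none = hash_inputs_sketch(lambda: None)              # stdlib pickle rejects lambdas
```

In this sketch `key_a` equals `key_b` and differs from `key_c`, and the lambda yields `None`. cloudpickle can serialize more objects than the standard `pickle` module, lambdas included, so the real function would fall through to `None` less often than this stand-in does.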