prefect.tasks
Module containing the base workflow task class and decorator. For most use cases, using the @task decorator is preferred.
Task
A Prefect task definition.
Note
We recommend using the @task decorator for most use-cases.
Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function creates a new task run.
To preserve the input and output types, we use the generic type variables P and R for "Parameters" and "Returns" respectively.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | An optional name for the task; if not provided, the name will be inferred from the given function. | `None` |
| `description` | `str` | An optional string description for the task. | `None` |
| `tags` | `Iterable[str]` | An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a `prefect.tags` context at task runtime. | `None` |
| `version` | `str` | An optional string specifying the version of this task definition. | `None` |
| `cache_key_fn` | `Callable[[TaskRunContext, Dict[str, Any]], Optional[str]]` | An optional callable that, given the task run context and call parameters, generates a string key; if the key matches a previous completed state, that state result will be restored instead of running the task again. | `None` |
| `cache_expiration` | `datetime.timedelta` | An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire. | `None` |
| `retries` | `int` | An optional number of times to retry on task run failure. | `0` |
| `retry_delay_seconds` | `Union[float, int]` | An optional number of seconds to wait before retrying the task after failure. This is only applicable if `retries` is nonzero. | `0` |
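As an illustration of how the table above maps onto the constructor, here is a minimal sketch that builds a `Task` directly from a plain function, which is what the `@task` decorator does for you. The function names and the `x_only_key` cache key function are hypothetical:

```python
import datetime

from prefect import flow
from prefect.tasks import Task

def add(x, y):
    return x + y

# Hypothetical cache key function; the signature follows the documented
# (TaskRunContext, Dict[str, Any]) -> Optional[str] contract. It caches
# per distinct value of `x`, ignoring `y`.
def x_only_key(context, parameters):
    return str(parameters["x"])

# Constructing a Task directly is equivalent to decorating `add` with
# @task(...) using the same options.
add_task = Task(
    fn=add,
    name="add",
    retries=2,
    retry_delay_seconds=1,
    cache_key_fn=x_only_key,
    cache_expiration=datetime.timedelta(hours=1),
)

@flow
def my_flow():
    # Calling the task inside a flow creates a new task run.
    return add_task(1, 2)
```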
Source code in prefect/tasks.py
class Task(Generic[P, R]):
"""
A Prefect task definition.
!!! note
We recommend using [the `@task` decorator][prefect.tasks.task] for most use-cases.
Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function
creates a new task run.
To preserve the input and output types, we use the generic type variables P and R for "Parameters" and
"Returns" respectively.
Args:
name: An optional name for the task; if not provided, the name will be inferred
from the given function.
description: An optional string description for the task.
tags: An optional set of tags to be associated with runs of this task. These
tags are combined with any tags defined by a `prefect.tags` context at
task runtime.
version: An optional string specifying the version of this task definition
cache_key_fn: An optional callable that, given the task run context and call
parameters, generates a string key; if the key matches a previous completed
state, that state result will be restored instead of running the task again.
cache_expiration: An optional amount of time indicating how long cached states
for this task should be restorable; if not provided, cached states will
never expire.
retries: An optional number of times to retry on task run failure.
retry_delay_seconds: An optional number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero.
"""
# NOTE: These parameters (types, defaults, and docstrings) should be duplicated
# exactly in the @task decorator
def __init__(
self,
fn: Callable[P, R],
name: str = None,
description: str = None,
tags: Iterable[str] = None,
version: str = None,
cache_key_fn: Callable[
["TaskRunContext", Dict[str, Any]], Optional[str]
] = None,
cache_expiration: datetime.timedelta = None,
retries: int = 0,
retry_delay_seconds: Union[float, int] = 0,
):
if not callable(fn):
raise TypeError("'fn' must be callable")
self.description = description or inspect.getdoc(fn)
update_wrapper(self, fn)
self.fn = fn
self.isasync = inspect.iscoroutinefunction(self.fn)
self.name = name or self.fn.__name__
self.version = version
raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])
self.tags = set(tags if tags else [])
self.task_key = to_qualified_name(self.fn)
self.cache_key_fn = cache_key_fn
self.cache_expiration = cache_expiration
# TaskRunPolicy settings
# TODO: We can instantiate a `TaskRunPolicy` and add Pydantic bound checks to
# validate that the user passes positive numbers here
self.retries = retries
self.retry_delay_seconds = retry_delay_seconds
# Warn if this task's `name` conflicts with another task while having a
# different function. This is to detect the case where two or more tasks
# share a name or are lambdas, which should result in a warning, and to
# differentiate it from the case where the task was 'copied' via
# `with_options`, which should not result in a warning.
registry = PrefectObjectRegistry.get()
if registry and any(
other
for other in registry.get_instances(Task)
if other.name == self.name and id(other.fn) != id(self.fn)
):
file = inspect.getsourcefile(self.fn)
line_number = inspect.getsourcelines(self.fn)[1]
warnings.warn(
f"A task named {self.name!r} and defined at '{file}:{line_number}' "
"conflicts with another task. Consider specifying a unique `name` "
"parameter in the task definition:\n\n "
"`@task(name='my_unique_name', ...)`"
)
def with_options(
self,
*,
name: str = None,
description: str = None,
tags: Iterable[str] = None,
cache_key_fn: Callable[
["TaskRunContext", Dict[str, Any]], Optional[str]
] = None,
cache_expiration: datetime.timedelta = None,
retries: int = 0,
retry_delay_seconds: Union[float, int] = 0,
):
"""
Create a new task from the current object, updating provided options.
Args:
name: A new name for the task.
description: A new description for the task.
tags: A new set of tags for the task. If given, existing tags are ignored,
not merged.
cache_key_fn: A new cache key function for the task.
cache_expiration: A new cache expiration time for the task.
retries: A new number of times to retry on task run failure.
retry_delay_seconds: A new number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero.
Returns:
A new `Task` instance.
Examples:
Create a new task from an existing task and update the name
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> new_task = my_task.with_options(name="My new task")
Create a new task from an existing task and update the retry settings
>>> from random import randint
>>>
>>> @task(retries=1, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
>>>
>>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)
Use a task with updated options within a flow
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> @flow
>>> def my_flow():
>>> new_task = my_task.with_options(name="My new task")
>>> new_task()
"""
return Task(
fn=self.fn,
name=name or self.name,
description=description or self.description,
tags=tags or copy(self.tags),
cache_key_fn=cache_key_fn or self.cache_key_fn,
cache_expiration=cache_expiration or self.cache_expiration,
retries=retries or self.retries,
retry_delay_seconds=retry_delay_seconds or self.retry_delay_seconds,
)
@overload
def __call__(
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> None:
# `NoReturn` matches if a type can't be inferred for the function which stops a
# sync function from matching the `Coroutine` overload
...
@overload
def __call__(
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> T:
...
@overload
def __call__(
self: "Task[P, T]",
*args: P.args,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> State[T]:
...
def __call__(
self,
*args: P.args,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: P.kwargs,
):
"""
Run the task and return the result. If `return_state` is True, the result is
wrapped in a Prefect State which provides error handling.
"""
from prefect.engine import enter_task_run_engine
from prefect.task_runners import SequentialTaskRunner
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return_type = "state" if return_state else "result"
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
task_runner=SequentialTaskRunner(),
return_type=return_type,
mapped=False,
)
@overload
def _run(
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFuture[None, Sync]:
# `NoReturn` matches if a type can't be inferred for the function which stops a
# sync function from matching the `Coroutine` overload
...
@overload
def _run(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
**kwargs: P.kwargs,
) -> Awaitable[State[T]]:
...
@overload
def _run(
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> State[T]:
...
def _run(
self,
*args: P.args,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: P.kwargs,
) -> Union[State, Awaitable[State]]:
"""
Run the task and return the final state.
"""
from prefect.engine import enter_task_run_engine
from prefect.task_runners import SequentialTaskRunner
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
return_type="state",
task_runner=SequentialTaskRunner(),
mapped=False,
)
@overload
def submit(
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFuture[None, Sync]:
# `NoReturn` matches if a type can't be inferred for the function which stops a
# sync function from matching the `Coroutine` overload
...
@overload
def submit(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
**kwargs: P.kwargs,
) -> Awaitable[PrefectFuture[T, Async]]:
...
@overload
def submit(
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFuture[T, Sync]:
...
@overload
def submit(
self: "Task[P, T]",
*args: P.args,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> State[T]:
...
def submit(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: Any,
) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
"""
Submit a run of the task to a worker.
Must be called within a flow function. If writing an async task, this call must
be awaited.
Will create a new task run in the backing API and submit the task to the flow's
task runner. This call only blocks execution while the task is being submitted;
once it is submitted, the flow function will continue executing. However, note
that the `SequentialTaskRunner` does not implement parallel execution for sync
tasks; they are fully resolved on submission.
Args:
*args: Arguments to run the task with
return_state: Return the result of the task run wrapped in a
Prefect State.
wait_for: Upstream task futures to wait for before starting the task
**kwargs: Keyword arguments to run the task with
Returns:
If `return_state` is False a future allowing asynchronous access to
the state of the task
If `return_state` is True a future wrapped in a Prefect State allowing asynchronous access to
the state of the task
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task():
>>> return "hello"
Run a task in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task.submit()
Wait for a task to finish
>>> @flow
>>> def my_flow():
>>> my_task.submit().wait()
Use the result from a task in a flow
>>> @flow
>>> def my_flow():
>>> print(my_task.submit().result())
>>>
>>> my_flow()
hello
Run an async task in an async flow
>>> @task
>>> async def my_async_task():
>>> pass
>>>
>>> @flow
>>> async def my_flow():
>>> await my_async_task.submit()
Run a sync task in an async flow
>>> @flow
>>> async def my_flow():
>>> my_task.submit()
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1.submit()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2.submit(wait_for=[x])
"""
from prefect.engine import enter_task_run_engine
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return_type = "state" if return_state else "future"
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
return_type=return_type,
task_runner=None, # Use the flow's task runner
mapped=False,
)
@overload
def map(
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> List[PrefectFuture[None, Sync]]:
# `NoReturn` matches if a type can't be inferred for the function which stops a
# sync function from matching the `Coroutine` overload
...
@overload
def map(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
**kwargs: P.kwargs,
) -> List[Awaitable[PrefectFuture[T, Async]]]:
...
@overload
def map(
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> List[PrefectFuture[T, Sync]]:
...
@overload
def map(
self: "Task[P, T]",
*args: P.args,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> List[State[T]]:
...
def map(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: Any,
) -> List[Union[PrefectFuture, Awaitable[PrefectFuture]]]:
"""
Submit a mapped run of the task to a worker.
Must be called within a flow function. If writing an async task, this
call must be awaited.
Must be called with an iterable per task function argument. All
iterables must be the same length.
Will create as many task runs as the length of the iterable(s) in the
backing API and submit the task runs to the flow's task runner. This
call blocks if given a future as input while the future is resolved. It
also blocks while the tasks are being submitted; once they are
submitted, the flow function will continue executing. However, note
that the `SequentialTaskRunner` does not implement parallel execution
for sync tasks; they are fully resolved on submission.
Args:
*args: Iterable arguments to run the tasks with
return_state: Return a list of Prefect States that wrap the results
of each task run.
wait_for: Upstream task futures to wait for before starting the task
**kwargs: Keyword iterable arguments to run the task with
Returns:
A list of futures allowing asynchronous access to the state of the
tasks
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task(x):
>>> return x + 1
Run a map in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task.map([1, 2, 3])
Wait for mapping to finish
>>> @flow
>>> def my_flow():
>>> futures = my_task.map([1, 2, 3])
>>> for future in futures:
>>> future.wait()
Use the result from a map in a flow
>>> @flow
>>> def my_flow():
>>> futures = my_task.map([1, 2, 3])
>>> for future in futures:
>>> future.result()
>>> my_flow()
[2, 3, 4]
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1.submit()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2.map([1, 2, 3], wait_for=[x])
"""
from prefect.engine import enter_task_run_engine
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return_type = "state" if return_state else "future"
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
return_type=return_type,
task_runner=None,
mapped=True,
)
Task.__call__
Run the task and return the result. If return_state is True, the result is wrapped in a Prefect State which provides error handling.
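A short sketch of the `return_state` option; this assumes the returned `State` exposes `is_completed()`, `result()`, and a `name` attribute, as Prefect 2.x states do:

```python
from prefect import flow, task

@task
def divide(x, y):
    return x / y

@flow
def my_flow():
    # With return_state=True the call returns a State instead of raising
    # on failure, so the flow can handle the error itself.
    state = divide(1, 0, return_state=True)
    if state.is_completed():
        print(state.result())
    else:
        print(f"task run finished as {state.name}")
```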
Source code in prefect/tasks.py
def __call__(
self,
*args: P.args,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: P.kwargs,
):
"""
Run the task and return the result. If `return_state` is True, the result is
wrapped in a Prefect State which provides error handling.
"""
from prefect.engine import enter_task_run_engine
from prefect.task_runners import SequentialTaskRunner
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return_type = "state" if return_state else "result"
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
task_runner=SequentialTaskRunner(),
return_type=return_type,
mapped=False,
)
Task.map
Submit a mapped run of the task to a worker.
Must be called within a flow function. If writing an async task, this call must be awaited.
Must be called with an iterable per task function argument. All iterables must be the same length.
Will create as many task runs as the length of the iterable(s) in the backing API and submit the task runs to the flow's task runner. This call blocks if given a future as input while the future is resolved. It also blocks while the tasks are being submitted; once they are submitted, the flow function will continue executing. However, note that the SequentialTaskRunner does not implement parallel execution for sync tasks; they are fully resolved on submission.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Iterable arguments to run the tasks with. | `()` |
| `return_state` | `bool` | Return a list of Prefect States that wrap the results of each task run. | `False` |
| `wait_for` | `Optional[Iterable[prefect.futures.PrefectFuture]]` | Upstream task futures to wait for before starting the task. | `None` |
| `**kwargs` | `Any` | Keyword iterable arguments to run the task with. | `{}` |

Returns:

| Type | Description |
|---|---|
| `List[Union[prefect.futures.PrefectFuture, Awaitable[prefect.futures.PrefectFuture]]]` | A list of futures allowing asynchronous access to the state of the tasks. |
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task(x):
>>> return x + 1
Run a map in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task.map([1, 2, 3])
Wait for mapping to finish
>>> @flow
>>> def my_flow():
>>> futures = my_task.map([1, 2, 3])
>>> for future in futures:
>>> future.wait()
Use the result from a map in a flow
>>> @flow
>>> def my_flow():
>>> futures = my_task.map([1, 2, 3])
>>> for future in futures:
>>> future.result()
>>> my_flow()
[2, 3, 4]
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1.submit()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2.map([1, 2, 3], wait_for=[x])
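Building on the examples above: because `map` takes one iterable per task function argument, a two-argument task is mapped over two equal-length iterables. A minimal sketch:

```python
from prefect import flow, task

@task
def add(x, y):
    return x + y

@flow
def my_flow():
    # Creates three task runs: add(1, 4), add(2, 5), add(3, 6)
    futures = add.map([1, 2, 3], [4, 5, 6])
    return [future.result() for future in futures]
```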
Source code in prefect/tasks.py
def map(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: Any,
) -> List[Union[PrefectFuture, Awaitable[PrefectFuture]]]:
"""
Submit a mapped run of the task to a worker.
Must be called within a flow function. If writing an async task, this
call must be awaited.
Must be called with an iterable per task function argument. All
iterables must be the same length.
Will create as many task runs as the length of the iterable(s) in the
backing API and submit the task runs to the flow's task runner. This
call blocks if given a future as input while the future is resolved. It
also blocks while the tasks are being submitted; once they are
submitted, the flow function will continue executing. However, note
that the `SequentialTaskRunner` does not implement parallel execution
for sync tasks; they are fully resolved on submission.
Args:
*args: Iterable arguments to run the tasks with
return_state: Return a list of Prefect States that wrap the results
of each task run.
wait_for: Upstream task futures to wait for before starting the task
**kwargs: Keyword iterable arguments to run the task with
Returns:
A list of futures allowing asynchronous access to the state of the
tasks
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task(x):
>>> return x + 1
Run a map in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task.map([1, 2, 3])
Wait for mapping to finish
>>> @flow
>>> def my_flow():
>>> futures = my_task.map([1, 2, 3])
>>> for future in futures:
>>> future.wait()
Use the result from a map in a flow
>>> @flow
>>> def my_flow():
>>> futures = my_task.map([1, 2, 3])
>>> for future in futures:
>>> future.result()
>>> my_flow()
[2, 3, 4]
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1.submit()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2.map([1, 2, 3], wait_for=[x])
"""
from prefect.engine import enter_task_run_engine
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return_type = "state" if return_state else "future"
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
return_type=return_type,
task_runner=None,
mapped=True,
)
Task.submit
Submit a run of the task to a worker.
Must be called within a flow function. If writing an async task, this call must be awaited.
Will create a new task run in the backing API and submit the task to the flow's task runner. This call only blocks execution while the task is being submitted; once it is submitted, the flow function will continue executing. However, note that the SequentialTaskRunner does not implement parallel execution for sync tasks; they are fully resolved on submission.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Arguments to run the task with. | `()` |
| `return_state` | `bool` | Return the result of the task run wrapped in a Prefect State. | `False` |
| `wait_for` | `Optional[Iterable[prefect.futures.PrefectFuture]]` | Upstream task futures to wait for before starting the task. | `None` |
| `**kwargs` | `Any` | Keyword arguments to run the task with. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Union[prefect.futures.PrefectFuture, Awaitable[prefect.futures.PrefectFuture]]` | If `return_state` is False, a future allowing asynchronous access to the state of the task; if `return_state` is True, the final Prefect State of the task run. |
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task():
>>> return "hello"
Run a task in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task.submit()
Wait for a task to finish
>>> @flow
>>> def my_flow():
>>> my_task.submit().wait()
Use the result from a task in a flow
>>> @flow
>>> def my_flow():
>>> print(my_task.submit().result())
>>>
>>> my_flow()
hello
Run an async task in an async flow
>>> @task
>>> async def my_async_task():
>>> pass
>>>
>>> @flow
>>> async def my_flow():
>>> await my_async_task.submit()
Run a sync task in an async flow
>>> @flow
>>> async def my_flow():
>>> my_task.submit()
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1.submit()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2.submit(wait_for=[x])
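Futures returned by `submit` can also be passed directly as inputs to downstream tasks; as the `map` documentation notes, the engine resolves a future given as input, which likewise records the dependency. A minimal sketch:

```python
from prefect import flow, task

@task
def extract():
    return [1, 2, 3]

@task
def total(numbers):
    return sum(numbers)

@flow
def my_flow():
    data = extract.submit()      # PrefectFuture
    result = total.submit(data)  # future is resolved to [1, 2, 3] before the run
    return result.result()       # 6
```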
Source code in prefect/tasks.py
def submit(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
**kwargs: Any,
) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
"""
Submit a run of the task to a worker.
Must be called within a flow function. If writing an async task, this call must
be awaited.
Will create a new task run in the backing API and submit the task to the flow's
task runner. This call only blocks execution while the task is being submitted;
once it is submitted, the flow function will continue executing. However, note
that the `SequentialTaskRunner` does not implement parallel execution for sync
tasks; they are fully resolved on submission.
Args:
*args: Arguments to run the task with
return_state: Return the result of the task run wrapped in a
Prefect State.
wait_for: Upstream task futures to wait for before starting the task
**kwargs: Keyword arguments to run the task with
Returns:
If `return_state` is False a future allowing asynchronous access to
the state of the task
If `return_state` is True a future wrapped in a Prefect State allowing asynchronous access to
the state of the task
Examples:
Define a task
>>> from prefect import task
>>> @task
>>> def my_task():
>>> return "hello"
Run a task in a flow
>>> from prefect import flow
>>> @flow
>>> def my_flow():
>>> my_task.submit()
Wait for a task to finish
>>> @flow
>>> def my_flow():
>>> my_task.submit().wait()
Use the result from a task in a flow
>>> @flow
>>> def my_flow():
>>> print(my_task.submit().result())
>>>
>>> my_flow()
hello
Run an async task in an async flow
>>> @task
>>> async def my_async_task():
>>> pass
>>>
>>> @flow
>>> async def my_flow():
>>> await my_async_task.submit()
Run a sync task in an async flow
>>> @flow
>>> async def my_flow():
>>> my_task.submit()
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>> pass
>>>
>>> @task
>>> def task_2():
>>> pass
>>>
>>> @flow
>>> def my_flow():
>>> x = task_1.submit()
>>>
>>> # task 2 will wait for task_1 to complete
>>> y = task_2.submit(wait_for=[x])
"""
from prefect.engine import enter_task_run_engine
# Convert the call args/kwargs to a parameter dict
parameters = get_call_parameters(self.fn, args, kwargs)
return_type = "state" if return_state else "future"
return enter_task_run_engine(
self,
parameters=parameters,
wait_for=wait_for,
return_type=return_type,
task_runner=None, # Use the flow's task runner
mapped=False,
)
Task.with_options
Create a new task from the current object, updating provided options.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | A new name for the task. | `None` |
| `description` | `str` | A new description for the task. | `None` |
| `tags` | `Iterable[str]` | A new set of tags for the task. If given, existing tags are ignored, not merged. | `None` |
| `cache_key_fn` | `Callable[[TaskRunContext, Dict[str, Any]], Optional[str]]` | A new cache key function for the task. | `None` |
| `cache_expiration` | `timedelta` | A new cache expiration time for the task. | `None` |
| `retries` | `int` | A new number of times to retry on task run failure. | `0` |
| `retry_delay_seconds` | `Union[float, int]` | A new number of seconds to wait before retrying the task after failure. This is only applicable if `retries` is nonzero. | `0` |

Returns:

| Type | Description |
|---|---|
| `Task` | A new `Task` instance. |
Examples:
Create a new task from an existing task and update the name
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> new_task = my_task.with_options(name="My new task")
Create a new task from an existing task and update the retry settings
>>> from random import randint
>>>
>>> @task(retries=1, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
>>>
>>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)
Use a task with updated options within a flow
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> @flow
>>> def my_flow():
>>> new_task = my_task.with_options(name="My new task")
>>> new_task()
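Because tags passed to `with_options` replace the existing set rather than merging with it, while options that are not passed carry over from the original task, here is a small sketch of the distinction:

```python
from prefect import task

@task(tags={"db"}, retries=1)
def my_task():
    return 1

# The new task's tags are exactly {"reporting"}; "db" is discarded.
# Unspecified options (here, retries=1) are carried over.
new_task = my_task.with_options(tags={"reporting"})
```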
Source code in prefect/tasks.py
def with_options(
self,
*,
name: str = None,
description: str = None,
tags: Iterable[str] = None,
cache_key_fn: Callable[
["TaskRunContext", Dict[str, Any]], Optional[str]
] = None,
cache_expiration: datetime.timedelta = None,
retries: int = 0,
retry_delay_seconds: Union[float, int] = 0,
):
"""
Create a new task from the current object, updating provided options.
Args:
name: A new name for the task.
description: A new description for the task.
tags: A new set of tags for the task. If given, existing tags are ignored,
not merged.
cache_key_fn: A new cache key function for the task.
cache_expiration: A new cache expiration time for the task.
retries: A new number of times to retry on task run failure.
retry_delay_seconds: A new number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero.
Returns:
A new `Task` instance.
Examples:
Create a new task from an existing task and update the name
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> new_task = my_task.with_options(name="My new task")
Create a new task from an existing task and update the retry settings
>>> from random import randint
>>>
>>> @task(retries=1, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
>>>
>>> new_task = my_task.with_options(retries=5, retry_delay_seconds=2)
Use a task with updated options within a flow
>>> @task(name="My task")
>>> def my_task():
>>> return 1
>>>
>>> @flow
>>> def my_flow():
>>> new_task = my_task.with_options(name="My new task")
>>> new_task()
"""
return Task(
fn=self.fn,
name=name or self.name,
description=description or self.description,
tags=tags or copy(self.tags),
cache_key_fn=cache_key_fn or self.cache_key_fn,
cache_expiration=cache_expiration or self.cache_expiration,
retries=retries or self.retries,
retry_delay_seconds=retry_delay_seconds or self.retry_delay_seconds,
)
task
Decorator to designate a function as a task in a Prefect workflow.
This decorator may be used for asynchronous or synchronous functions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | An optional name for the task; if not provided, the name will be inferred from the given function. | `None` |
| `description` | `str` | An optional string description for the task. | `None` |
| `tags` | `Iterable[str]` | An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a `prefect.tags` context at task runtime. | `None` |
| `version` | `str` | An optional string specifying the version of this task definition. | `None` |
| `cache_key_fn` | `Callable[[TaskRunContext, Dict[str, Any]], Optional[str]]` | An optional callable that, given the task run context and call parameters, generates a string key; if the key matches a previous completed state, that state result will be restored instead of running the task again. | `None` |
| `cache_expiration` | `timedelta` | An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire. | `None` |
| `retries` | `int` | An optional number of times to retry on task run failure. | `0` |
| `retry_delay_seconds` | `Union[float, int]` | An optional number of seconds to wait before retrying the task after failure. This is only applicable if `retries` is nonzero. | `0` |

Returns:

| Type | Description |
|---|---|
| `Task` | A callable `Task` object which, when called, will submit the task for execution. |
Examples:
Define a simple task
>>> @task
>>> def add(x, y):
>>> return x + y
Define an async task
>>> @task
>>> async def add(x, y):
>>> return x + y
Define a task with tags and a description
>>> @task(tags={"a", "b"}, description="This task is empty but it's my first!")
>>> def my_task():
>>> pass
Define a task with a custom name
>>> @task(name="The Ultimate Task")
>>> def my_task():
>>> pass
Define a task that retries 3 times with a 5 second delay between attempts
>>> from random import randint
>>>
>>> @task(retries=3, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
Define a task that is cached for a day based on its inputs
>>> from prefect.tasks import task_input_hash
>>> from datetime import timedelta
>>>
>>> @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
>>> def my_task():
>>> return "hello"
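The decorator supports both a bare and a parameterized form; the `__fn` branch in the source below dispatches between them. A quick sketch:

```python
from prefect import task

@task  # bare form: the function is passed directly as __fn
def plain(x):
    return x

@task(name="fancy-add", retries=2)  # parameterized form: task(...) returns a decorator
def fancy(x):
    return x + 1
```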
Source code in prefect/tasks.py
def task(
__fn=None,
*,
name: str = None,
description: str = None,
tags: Iterable[str] = None,
version: str = None,
cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]] = None,
cache_expiration: datetime.timedelta = None,
retries: int = 0,
retry_delay_seconds: Union[float, int] = 0,
):
"""
Decorator to designate a function as a task in a Prefect workflow.
This decorator may be used for asynchronous or synchronous functions.
Args:
name: An optional name for the task; if not provided, the name will be inferred
from the given function.
description: An optional string description for the task.
tags: An optional set of tags to be associated with runs of this task. These
tags are combined with any tags defined by a `prefect.tags` context at
task runtime.
version: An optional string specifying the version of this task definition
cache_key_fn: An optional callable that, given the task run context and call
parameters, generates a string key; if the key matches a previous completed
state, that state result will be restored instead of running the task again.
cache_expiration: An optional amount of time indicating how long cached states
for this task should be restorable; if not provided, cached states will
never expire.
retries: An optional number of times to retry on task run failure
retry_delay_seconds: An optional number of seconds to wait before retrying the
task after failure. This is only applicable if `retries` is nonzero.
Returns:
A callable `Task` object which, when called, will submit the task for execution.
Examples:
Define a simple task
>>> @task
>>> def add(x, y):
>>> return x + y
Define an async task
>>> @task
>>> async def add(x, y):
>>> return x + y
Define a task with tags and a description
>>> @task(tags={"a", "b"}, description="This task is empty but it's my first!")
>>> def my_task():
>>> pass
Define a task with a custom name
>>> @task(name="The Ultimate Task")
>>> def my_task():
>>> pass
Define a task that retries 3 times with a 5 second delay between attempts
>>> from random import randint
>>>
>>> @task(retries=3, retry_delay_seconds=5)
>>> def my_task():
>>> x = randint(0, 5)
>>> if x >= 3: # Make a task that fails sometimes
>>> raise ValueError("Retry me please!")
>>> return x
Define a task that is cached for a day based on its inputs
>>> from prefect.tasks import task_input_hash
>>> from datetime import timedelta
>>>
>>> @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
>>> def my_task():
>>> return "hello"
"""
if __fn:
return cast(
Task[P, R],
Task(
fn=__fn,
name=name,
description=description,
tags=tags,
version=version,
cache_key_fn=cache_key_fn,
cache_expiration=cache_expiration,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
),
)
else:
return cast(
Callable[[Callable[P, R]], Task[P, R]],
partial(
task,
name=name,
description=description,
tags=tags,
version=version,
cache_key_fn=cache_key_fn,
cache_expiration=cache_expiration,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
),
)
task_input_hash
A task cache key implementation which hashes all inputs to the task using a JSON or cloudpickle serializer. If any arguments are not JSON serializable, the pickle serializer is used as a fallback. If cloudpickle fails, this will return a null key indicating that a cache key could not be generated for the given inputs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `TaskRunContext` | The active `TaskRunContext`. | required |
| `arguments` | `Dict[str, Any]` | A dictionary of arguments to be passed to the underlying task. | required |

Returns:

| Type | Description |
|---|---|
| `Optional[str]` | A string hash if hashing succeeded, else `None`. |
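Any callable matching the documented `(TaskRunContext, Dict[str, Any]) -> Optional[str]` signature can serve as a `cache_key_fn`. As a hypothetical alternative to `task_input_hash`, this sketch keys on all arguments except a `debug` flag and returns `None` (no cache key) when the arguments are not JSON-serializable:

```python
import hashlib
import json
from typing import Any, Dict, Optional

def cache_except_debug(context, arguments: Dict[str, Any]) -> Optional[str]:
    # Hypothetical: exclude `debug` so toggling it does not invalidate
    # previously cached results.
    keyed = {k: v for k, v in arguments.items() if k != "debug"}
    try:
        return hashlib.sha256(
            json.dumps(keyed, sort_keys=True).encode()
        ).hexdigest()
    except TypeError:
        # Not JSON-serializable: return a null key so no cache is used.
        return None
```

It would be wired up the same way as `task_input_hash`, e.g. `@task(cache_key_fn=cache_except_debug)`.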
Source code in prefect/tasks.py
def task_input_hash(
context: "TaskRunContext", arguments: Dict[str, Any]
) -> Optional[str]:
"""
A task cache key implementation which hashes all inputs to the task using a JSON or
cloudpickle serializer. If any arguments are not JSON serializable, the pickle
serializer is used as a fallback. If cloudpickle fails, this will return a null key
indicating that a cache key could not be generated for the given inputs.
Arguments:
context: the active `TaskRunContext`
arguments: a dictionary of arguments to be passed to the underlying task
Returns:
a string hash if hashing succeeded, else `None`
"""
return hash_objects(
# We use the task key to get the qualified name for the task and include the
# task function's `co_code` bytes to avoid caching when the underlying function
# changes
context.task.task_key,
context.task.fn.__code__.co_code,
arguments,
)