# Task


class

prefect.core.task.Task

(name=None, slug=None, tags=None, max_retries=None, retry_delay=None, retry_on=None, timeout=None, trigger=None, skip_on_upstream_skip=True, cache_for=None, cache_validator=None, cache_key=None, checkpoint=None, state_handlers=None, on_failure=None, log_stdout=False, result=None, target=None, task_run_name=None, nout=None)[source]

The Task class is the full representation of a unit of work.

This Task class can be used directly as a first-class object, in which case it must be subclassed by a class that implements the run method. For a more functional way of generating Tasks, see the task decorator.

Inheritance example:

class AddTask(Task):
    def run(self, x, y):
        return x + y

Note: The implemented run method cannot have *args in its signature. In addition, the following keywords are reserved: upstream_tasks, task_args and mapped.

An instance of a Task can be used functionally to generate other task instances with the same attributes but with different values bound to their run methods.

Example:

from prefect import Flow, Task

class AddTask(Task):
    def run(self, x, y):
        return x + y

a = AddTask()

with Flow("My Flow") as f:
    t1 = a(1, 2) # t1 != a
    t2 = a(5, 7) # t2 != a

To bind values to a Task's run method imperatively (and without making a copy), see Task.bind.

Args:

  • name (str, optional): The name of this task
  • slug (str, optional): The slug for this task. Slugs provide a stable ID for tasks so that the Prefect API can identify task run states. If a slug is not provided, one will be generated automatically once the task is added to a Flow.
  • tags ([str], optional): A list of tags for this task
  • max_retries (int, optional): The maximum number of times this task can be retried
  • retry_delay (timedelta, optional): The amount of time to wait before the task is retried
  • retry_on (Union[Exception, Iterable[Type[Exception]]], optional): Exception types that will allow retry behavior to occur. If not set, all exceptions will allow retries. If set, retries will only occur if the exception is a subtype of the exception types provided.
  • timeout (Union[int, timedelta], optional): The amount of time (in seconds) to wait while running this task before a timeout occurs; note that sub-second resolution is not supported, even when passing in a timedelta.
  • trigger (callable, optional): a function that determines whether the task should run, based on the states of any upstream tasks.
  • skip_on_upstream_skip (bool, optional): if True and any immediately upstream task is skipped, this task will automatically be skipped as well, regardless of trigger. By default, this prevents tasks from attempting to use either state or data from tasks that didn't run. If False, the task's trigger will be called as normal, with skips considered successes. Defaults to True.
  • cache_for (timedelta, optional): The amount of time to maintain a cache of the outputs of this task. Useful for situations where the containing Flow will be rerun multiple times, but this task doesn't need to be.
  • cache_validator (Callable, optional): Validator that will determine whether the cache for this task is still valid (only required if cache_for is provided; defaults to prefect.engine.cache_validators.duration_only)
  • cache_key (str, optional): if provided, a cache_key serves as a unique identifier for this Task's cache, and can be shared across both Tasks and Flows; if not provided, the Task's name will be used if running locally, or the Task's database ID if running in Cloud
  • checkpoint (bool, optional): if this Task is successful, whether to store its result using the configured result available during the run. Note that checkpointing will only occur locally if prefect.config.flows.checkpointing is set to True
  • result (Result, optional): the result instance used to retrieve and store task results during execution
  • target (Union[str, Callable], optional): location to check for task Result. If a result exists at that location then the task run will enter a cached state. target strings can be templated formatting strings which will be formatted at runtime with values from prefect.context. If a callable function is provided, it should have signature callable(**kwargs) -> str and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that .format(**kwargs) doesn't support.
  • state_handlers (Iterable[Callable], optional): A list of state change handlers that will be called whenever the task changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the task instance, the old (prior) state, and the new (current) state, with the following signature: state_handler(task: Task, old_state: State, new_state: State) -> Optional[State] If multiple functions are passed, then the new_state argument will be the result of the previous handler.
  • on_failure (Callable, optional): A function with signature fn(task: Task, state: State) -> None that will be called anytime this Task enters a failure state
  • log_stdout (bool, optional): Toggle whether or not to send stdout messages to the Prefect logger. Defaults to False.
  • task_run_name (Union[str, Callable], optional): a name to set for this task at runtime. task_run_name strings can be templated formatting strings which will be formatted at runtime with values from task arguments, prefect.context, and flow parameters (in the case of a name conflict between these, earlier values take precedence). If a callable function is provided, it should have signature callable(**kwargs) -> str and at write time all formatting kwargs will be passed and a fully formatted name is expected as the return value. The callable can be used for string formatting logic that .format(**kwargs) doesn't support. Note: this only works for tasks running against a backend API.
  • nout (int, optional): for tasks that return multiple results, the number of outputs to expect. If not provided, will be inferred from the task return annotation, if possible. Note that nout=1 implies the task returns a tuple of one value (leave as None for non-tuple return types).
Raises:
  • TypeError: if tags is of type str
  • TypeError: if timeout is neither an int nor a timedelta
  • TypeError: if positional-only parameters are present in task's signature
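
The chaining rule described for state_handlers can be illustrated with a plain-Python sketch. It does not use Prefect itself: call_state_handlers, the string states, and both handlers are hypothetical stand-ins, and treating a None return as "leave the state unchanged" is an assumption drawn from the Optional[State] return type.

```python
from typing import Callable, Iterable, Optional

# Hypothetical stand-in: states are plain strings here, not Prefect State objects.
State = str
Handler = Callable[[object, State, State], Optional[State]]


def call_state_handlers(task: object, old_state: State, new_state: State,
                        handlers: Iterable[Handler]) -> State:
    """Call each handler in order, feeding it the previous handler's result."""
    for handler in handlers:
        result = handler(task, old_state, new_state)
        # Assumption: a handler that returns None leaves the state unchanged.
        if result is not None:
            new_state = result
    return new_state


def failed_to_retrying(task, old_state, new_state):
    # Replaces a failure with a retry; other states pass through untouched.
    return "Retrying" if new_state == "Failed" else new_state


def log_only(task, old_state, new_state):
    # Inspects the transition but does not modify it.
    seen.append((old_state, new_state))
    return None


seen = []
final_state = call_state_handlers(
    task=None, old_state="Running", new_state="Failed",
    handlers=[failed_to_retrying, log_only],
)
```

Because log_only runs second, it observes the state already rewritten by failed_to_retrying rather than the original new state.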

methods:

prefect.core.task.Task.bind

(*args, mapped=False, upstream_tasks=None, flow=None, **kwargs)[source]

Binding a task to (keyword) arguments creates a keyed edge in the active Flow that will pass data from the arguments (whether Tasks or constants) to the Task's run method under the appropriate key. Once a Task is bound in this manner, the same task instance cannot be bound a second time in the same Flow.

To bind arguments to a copy of this Task instance, see __call__. Additionally, non-keyed edges can be created by passing any upstream dependencies through upstream_tasks.

Args:

  • *args: arguments to bind to the current Task's run method
  • mapped (bool, optional): Whether the results of these tasks should be mapped over with the specified keyword arguments; defaults to False. If True, any arguments contained within a prefect.utilities.edges.unmapped container will not be mapped over.
  • upstream_tasks ([Task], optional): a list of upstream dependencies for the current task.
  • flow (Flow, optional): The flow to set dependencies on, defaults to the current flow in context if no flow is specified
  • **kwargs: keyword arguments to bind to the current Task's run method
Returns:
  • Task: the current Task instance

prefect.core.task.Task.copy

(**task_args)[source]

Creates and returns a copy of the current Task.

Args:

  • **task_args (dict, optional): a dictionary of task attribute keyword arguments; these attributes will be set on the new copy
Raises:
  • AttributeError: if any passed task_args are not attributes of the original
Returns:
  • Task: a copy of the current Task, with any attributes updated from task_args
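
The copy-and-override behaviour can be sketched in plain Python. This is only an illustration under stated assumptions: SketchTask and its attributes are hypothetical, and the real method may copy more state than this stand-in does.

```python
import copy


class SketchTask:
    """Hypothetical stand-in for a task with a few attributes."""

    def __init__(self, name="add", max_retries=0):
        self.name = name
        self.max_retries = max_retries
        self.tags = set()

    def copy(self, **task_args):
        new = copy.copy(self)
        # Give the copy its own tag set so later changes don't leak back.
        new.tags = copy.copy(self.tags)
        for attr, value in task_args.items():
            # Reject names that are not attributes of the original.
            if not hasattr(new, attr):
                raise AttributeError(
                    f"{attr} is not an attribute of {type(self).__name__}"
                )
            setattr(new, attr, value)
        return new


original = SketchTask()
retrying = original.copy(max_retries=3)

try:
    original.copy(no_such_attribute=1)
    rejected = False
except AttributeError:
    rejected = True
```

The original keeps max_retries=0, while the copy carries the overridden value alongside the unchanged name.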

prefect.core.task.Task.inputs

()[source]

Describe the inputs for this task. The result is a dictionary that maps each input to a type, required, and default. All values are inferred from the run() signature; this method can be overridden for more precise control.

Returns:

  • dict
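
As a rough illustration of inferring inputs from a run() signature, the following sketch uses the standard inspect module. AddTask here is a plain class, not a Prefect Task, and describe_inputs is a hypothetical helper; falling back to Any for missing annotations and to None for missing defaults are assumptions of this sketch.

```python
import inspect
from typing import Any, Dict


class AddTask:
    """Stand-in with a run() method; not a real Prefect Task."""

    def run(self, x: int, y: int = 1):
        return x + y


def describe_inputs(task) -> Dict[str, Dict[str, Any]]:
    described = {}
    # Inspecting the bound method omits `self`.
    for name, param in inspect.signature(task.run).parameters.items():
        required = param.default is inspect.Parameter.empty
        annotated = param.annotation is not inspect.Parameter.empty
        described[name] = {
            "type": param.annotation if annotated else Any,
            "required": required,
            "default": None if required else param.default,
        }
    return described


inputs = describe_inputs(AddTask())
```

Here x is reported as required with no default, and y as optional with a default of 1.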

prefect.core.task.Task.is_equal

(other)[source]

Produces a Task that evaluates self == other

This can't be implemented as the __eq__() magic method because of Task comparisons.

Args:

  • other (object): the other operand of the operator. It will be converted to a Task if it isn't one already.
Returns:
  • Task

prefect.core.task.Task.is_not_equal

(other)[source]

Produces a Task that evaluates self != other

This can't be implemented as the __ne__() magic method because of Task comparisons.

Args:

  • other (object): the other operand of the operator. It will be converted to a Task if it isn't one already.
Returns:
  • Task

prefect.core.task.Task.map

(*args, upstream_tasks=None, flow=None, task_args=None, **kwargs)[source]

Map the Task elementwise across one or more Tasks. Arguments that should not be mapped over should be placed in the prefect.utilities.edges.unmapped container.

For example:

    task.map(x=X, y=unmapped(Y))


will map over the values of X, but not over the values of Y
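
The difference between mapped and unmapped arguments can be sketched without Prefect. Unmapped and map_elementwise below are hypothetical stand-ins for the real container and method. This sketch stops at the shortest mapped input, which is a property of zip() and not a claim about Prefect's behaviour.

```python
class Unmapped:
    """Hypothetical marker for a value passed whole to every call."""

    def __init__(self, value):
        self.value = value


def map_elementwise(fn, **kwargs):
    # Split keyword arguments into those to iterate over and those to broadcast.
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    results = []
    # zip() walks the mapped arguments in lockstep, one element per call.
    for values in zip(*mapped.values()):
        call_kwargs = dict(zip(mapped.keys(), values))
        results.append(fn(**call_kwargs, **constant))
    return results


def scale(x, y):
    # x is a single element of X; y is the whole list Y on every call.
    return x * sum(y)


X = [1, 2, 3]
Y = [10, 20]
results = map_elementwise(scale, x=X, y=Unmapped(Y))
```

Each of the three calls receives one element of X and the complete list Y.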


Args:
  • *args: arguments to map over, which will elementwise be bound to the Task's run method
  • upstream_tasks ([Task], optional): a list of upstream dependencies to map over
  • flow (Flow, optional): The flow to set dependencies on, defaults to the current flow in context if no flow is specified
  • task_args (dict, optional): a dictionary of task attribute keyword arguments; these attributes will be set on the new copy
  • **kwargs: keyword arguments to map over, which will elementwise be bound to the Task's run method
Raises:
  • AttributeError: if any passed task_args are not attributes of the original
Returns:
  • Task: a new Task instance

prefect.core.task.Task.not_

()[source]

Produces a Task that evaluates not self

Returns:

  • Task

prefect.core.task.Task.or_

(other)[source]

Produces a Task that evaluates self or other

Args:

  • other (object): the other operand of the operator. It will be converted to a Task if it isn't one already.
Returns:
  • Task

prefect.core.task.Task.outputs

()[source]

Get the output types for this task.

Returns:

  • Any

prefect.core.task.Task.pipe

(_prefect_self, _prefect_task, **kwargs)[source]

"Pipes" the result of this task through another task. some_task().pipe(other_task) is equivalent to other_task(some_task()), but can result in more readable code when used in a long chain of task calls.

Args:

  • _prefect_task: The task to execute after this task.
  • **kwargs: Additional keyword arguments to include as task arguments.
Returns:
  • Task: A new task with the new arguments bound to it.
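
The equivalence between piping and nested calls can be shown with a small plain-Python sketch. Value, add, and double are hypothetical names used only for illustration; no Prefect objects are involved.

```python
class Value:
    """Hypothetical stand-in for a task result that supports pipe()."""

    def __init__(self, data):
        self.data = data

    def pipe(self, func, **kwargs):
        # x.pipe(f, **kw) is f(x, **kw), written left to right.
        return func(self, **kwargs)


def add(v, amount=0):
    return Value(v.data + amount)


def double(v):
    return Value(v.data * 2)


# Nested form reads inside-out; piped form reads in execution order.
nested = double(add(Value(1), amount=2))
piped = Value(1).pipe(add, amount=2).pipe(double)
```

Both forms compute the same result; the piped chain simply lists the steps in the order they run.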

prefect.core.task.Task.run

()[source]

The run() method is called (with arguments, if appropriate) to run a task.

Note: The implemented run method cannot have *args in its signature. In addition, the following keywords are reserved: upstream_tasks, task_args and mapped.

If a task has arguments in its run() method, these can be bound either by using the functional API and calling the task instance, or by using self.bind directly.

In addition to running arbitrary functions, tasks can interact with Prefect in a few ways:

  • Return an optional result. When this function runs successfully, the task is considered successful and the result (if any) can be made available to downstream tasks.
  • Raise an error. Errors are interpreted as failure.
  • Raise a signal. Signals can include FAIL, SUCCESS, RETRY, SKIP, etc. and indicate that the task should be put in the indicated state.
    • FAIL will lead to retries if appropriate
    • SUCCESS will cause the task to be marked successful
    • RETRY will cause the task to be marked for retry, even if max_retries has been exceeded
    • SKIP will skip the task and possibly propagate the skip state through the flow, depending on whether downstream tasks have skip_on_upstream_skip=True.
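
The three ways a run() method can report its outcome can be sketched as follows. This is a simplified plain-Python model: the signal classes are hypothetical stand-ins for Prefect's own, the state names are plain strings, and retry handling is deliberately left out.

```python
class Signal(Exception):
    """Hypothetical base class; each subclass names the state it requests."""
    state = "Failed"


class FAIL(Signal):
    state = "Failed"


class SUCCESS(Signal):
    state = "Success"


class RETRY(Signal):
    state = "Retrying"


class SKIP(Signal):
    state = "Skipped"


def run_once(run, *args, **kwargs):
    """Return (state, result) for a single call to run()."""
    try:
        result = run(*args, **kwargs)
    except Signal as signal:
        # A raised signal puts the task in the state the signal names.
        return signal.state, None
    except Exception:
        # Any other error is interpreted as failure.
        return "Failed", None
    # Returning normally means success, with an optional result.
    return "Success", result


def divide(x, y):
    if y is None:
        raise SKIP("nothing to divide by")
    return x / y


outcomes = [
    run_once(divide, 6, 3),     # returns normally
    run_once(divide, 6, None),  # raises the SKIP signal
    run_once(divide, 6, 0),     # raises ZeroDivisionError
]
```

The signal handler must come before the generic Exception handler, since the signals in this sketch are themselves exceptions.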

prefect.core.task.Task.serialize

()[source]

Creates a serialized representation of this task

Returns:

  • dict representing this task

prefect.core.task.Task.set_dependencies

(flow=None, upstream_tasks=None, downstream_tasks=None, keyword_tasks=None, mapped=False, validate=None)[source]

Sets dependencies for this task in the specified flow, or in the flow in the current context if none is specified.

Args:

  • flow (Flow, optional): The flow to set dependencies on, defaults to the current flow in context if no flow is specified
  • upstream_tasks ([object], optional): A list of upstream tasks for this task
  • downstream_tasks ([object], optional): A list of downstream tasks for this task
  • keyword_tasks ({str: object}, optional): The results of these tasks will be provided to this task under the specified keyword arguments.
  • mapped (bool, optional): Whether the results of the upstream tasks should be mapped over with the specified keyword arguments
  • validate (bool, optional): Whether or not to check the validity of the flow. If not provided, defaults to the value of eager_edge_validation in your Prefect configuration file.
Returns:
  • self
Raises:
  • ValueError: if no flow is specified and no flow can be found in the current context

prefect.core.task.Task.set_downstream

(task, flow=None, key=None, mapped=False)[source]

Sets the provided task as a downstream dependency of this task.

Args:

  • task (Task): A task that will be set as a downstream dependency of this task.
  • flow (Flow, optional): The flow to set dependencies on, defaults to the current flow in context if no flow is specified
  • key (str, optional): The key to be set for the new edge; the result of this task will be passed to the downstream task's run() method under this keyword argument.
  • mapped (bool, optional): Whether this dependency is mapped; defaults to False
Returns:
  • self
Raises:
  • ValueError: if no flow is specified and no flow can be found in the current context

prefect.core.task.Task.set_upstream

(task, flow=None, key=None, mapped=False)[source]

Sets the provided task as an upstream dependency of this task.

Args:

  • task (object): A task, or an object that will be converted to a task, that will be set as an upstream dependency of this task.
  • flow (Flow, optional): The flow to set dependencies on, defaults to the current flow in context if no flow is specified
  • key (str, optional): The key to be set for the new edge; the result of the upstream task will be passed to this task's run() method under this keyword argument.
  • mapped (bool, optional): Whether this dependency is mapped; defaults to False
Returns:
  • self
Raises:
  • ValueError: if no flow is specified and no flow can be found in the current context



This documentation was auto-generated from commit ffa9a6c
on February 1, 2023 at 18:44 UTC