Flow


class prefect.core.flow.Flow(name, schedule=None, environment=None, storage=None, tasks=None, edges=None, reference_tasks=None, state_handlers=None, on_failure=None, validate=None, result_handler=None)

The Flow class represents a collection of dependent Tasks. Flows track Task dependencies and parameters, and provide the main API for constructing and managing workflows.

Example of initializing a Flow directly:

from prefect import Flow, Task

class MyTask(Task):
    def run(self):
        return "hello"

task_1 = MyTask()
flow = Flow(name="my_flow", tasks=[task_1])

flow.run()

Example of initializing a Flow as a context manager:

from prefect import Flow, task

@task
def my_task():
    return "hello"

with Flow("my_flow") as flow:
    task_1 = my_task()

flow.run()

Args:

  • name (str): The name of the flow. Cannot be None or an empty string
  • schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
  • environment (prefect.environments.Environment, optional): The environment that the flow should be run in. If None, a RemoteEnvironment will be created.
  • storage (prefect.environments.storage.Storage, optional): The unit of storage that the flow will be written into.
  • tasks ([Task], optional): If provided, a list of tasks that will initialize the flow
  • edges ([Edge], optional): A list of edges between tasks
  • reference_tasks ([Task], optional): A list of tasks that determine the final state of a flow
  • state_handlers (Iterable[Callable], optional): A list of state change handlers that will be called whenever the flow changes state, providing an opportunity to inspect or modify the new state. Each handler is passed the flow instance, the old (prior) state, and the new (current) state, with the signature state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]. If multiple functions are passed, each handler receives the result of the previous handler as its new_state argument.
  • on_failure (Callable, optional): A function with signature fn(flow: Flow, state: State) -> None which will be called any time this Flow enters a failure state
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles and illegal keys) after adding the edges passed in the edges argument. Defaults to the value of eager_edge_validation in your prefect configuration file.
  • result_handler (ResultHandler, optional): the handler to use for retrieving and storing state results during execution; if not provided, will default to the one specified in your config
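The chaining rule for state_handlers can be modeled in a few lines of plain Python. This is a simplified sketch, not Prefect's runner code: states are plain strings, the flow argument is a placeholder, and call_state_handlers, log_transition, and soften_failure are hypothetical names. Treating a None return as "keep the current state" is an assumption based on the Optional[State] return type.

```python
from typing import Callable, Iterable, Optional

# Simplified model: states are plain strings and the flow argument is a
# placeholder. Real Prefect handlers receive Flow and State objects.
Handler = Callable[[object, str, str], Optional[str]]


def call_state_handlers(flow: object, old_state: str, new_state: str,
                        handlers: Iterable[Handler]) -> str:
    """Pass each handler the state produced by the previous handler."""
    for handler in handlers:
        result = handler(flow, old_state, new_state)
        # Assumption: a handler that returns None leaves the state unchanged.
        if result is not None:
            new_state = result
    return new_state


def log_transition(flow, old_state, new_state):
    # Inspect only: print the transition and return nothing.
    print(f"{old_state} -> {new_state}")
    return None


def soften_failure(flow, old_state, new_state):
    # Modify: swap a "Failed" state for a hypothetical custom state.
    return "HandledFailure" if new_state == "Failed" else new_state


final_state = call_state_handlers(None, "Running", "Failed",
                                  [log_transition, soften_failure])
print(final_state)  # HandledFailure
```

Because the handlers run in order, a handler placed after soften_failure would see "HandledFailure" rather than the original "Failed" state.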

Methods:

prefect.core.flow.Flow.add_edge(upstream_task, downstream_task, key=None, mapped=False, validate=None)

Add an edge in the flow between two tasks. All edges are directed beginning with an upstream task and ending with a downstream task.

Args:

  • upstream_task (Task): The task that the edge should start from
  • downstream_task (Task): The task that the edge should end with
  • key (str, optional): The key to be set for the new edge; the result of the upstream task will be passed to the downstream task's run() method under this keyword argument
  • mapped (bool, optional): Whether this edge represents a call to Task.map(); defaults to False
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles and illegal keys). Defaults to the value of eager_edge_validation in your prefect configuration file.
Returns:
  • prefect.core.edge.Edge: The Edge object that was successfully added to the flow
Raises:
  • ValueError: if the downstream_task is of type Parameter
  • ValueError: if the edge exists with this key and downstream_task
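The two ValueError conditions listed above can be illustrated with a small model. This is a sketch, not Prefect's implementation: Task, Parameter, Edge, and MiniFlow here are hypothetical stand-in classes, and cycle validation (the validate argument) is left out.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass(frozen=True)
class Task:
    name: str


@dataclass(frozen=True)
class Parameter(Task):
    """Stand-in for a Parameter: it supplies values, so it cannot be downstream."""


@dataclass(frozen=True)
class Edge:
    upstream_task: Task
    downstream_task: Task
    key: Optional[str] = None
    mapped: bool = False


@dataclass
class MiniFlow:
    tasks: Set[Task] = field(default_factory=set)
    edges: Set[Edge] = field(default_factory=set)

    def add_edge(self, upstream_task: Task, downstream_task: Task,
                 key: Optional[str] = None, mapped: bool = False) -> Edge:
        if isinstance(downstream_task, Parameter):
            raise ValueError("Parameters cannot have upstream dependencies.")
        # Each keyword argument of a downstream task can be fed by only one edge.
        if key is not None and any(
            e.downstream_task == downstream_task and e.key == key
            for e in self.edges
        ):
            raise ValueError(f"{downstream_task.name} already has an edge for key {key!r}.")
        edge = Edge(upstream_task, downstream_task, key, mapped)
        self.tasks.update({upstream_task, downstream_task})
        self.edges.add(edge)
        return edge


flow = MiniFlow()
numbers, total = Parameter("numbers"), Task("total")
flow.add_edge(numbers, total, key="x")
try:
    flow.add_edge(Task("other"), total, key="x")  # second edge for "x"
except ValueError as exc:
    print(exc)  # total already has an edge for key 'x'.
```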

prefect.core.flow.Flow.add_task(task)

Add a task to the flow if the task does not already exist. The tasks are uniquely identified by their slug.

Args:

  • task (Task): the new Task to be added to the flow
Returns:
  • Task: the Task object passed in if the task was successfully added
Raises:
  • TypeError: if the task is not of type Task
  • ValueError: if the task.slug matches that of a task already in the flow
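The slug rule can be sketched with hypothetical stand-in classes (not Prefect's Task or Flow). In this model tasks compare by object identity, so adding the same task object twice is harmless, while a different task that reuses an existing slug is rejected.

```python
class Task:
    """Minimal stand-in: tasks compare by identity and carry an optional slug."""

    def __init__(self, name, slug=None):
        self.name = name
        self.slug = slug


class MiniFlow:
    def __init__(self):
        self.tasks = set()

    def add_task(self, task):
        if not isinstance(task, Task):
            raise TypeError(f"Tasks must be Task instances (received {type(task)}).")
        # A *different* task object with the same slug is a conflict.
        if task.slug and any(t.slug == task.slug for t in self.tasks if t is not task):
            raise ValueError(f"A task with the slug {task.slug!r} already exists in this flow.")
        self.tasks.add(task)  # re-adding the same object is a no-op
        return task


flow = MiniFlow()
extract = flow.add_task(Task("extract", slug="extract-1"))
flow.add_task(extract)  # already present: nothing changes
print(len(flow.tasks))  # 1
```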

prefect.core.flow.Flow.all_downstream_edges()

Returns a dictionary relating each task in the Flow to the set of all downstream edges for the task

Returns:

  • dict mapping each task to the set of its downstream edges

prefect.core.flow.Flow.all_upstream_edges()

Returns a dictionary relating each task in the Flow to the set of all upstream edges for the task

Returns:

  • dict mapping each task to the set of its upstream edges
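The two edge dictionaries described above can be built from a flat edge list. This sketch uses plain strings for tasks and a hypothetical Edge namedtuple rather than prefect.core.edge.Edge. In this model every task gets a key, so tasks without edges in a given direction map to an empty set.

```python
from collections import namedtuple

# Hypothetical edge record; tasks are plain strings in this sketch.
Edge = namedtuple("Edge", ["upstream_task", "downstream_task", "key"])


def all_downstream_edges(tasks, edges):
    """Map every task to the set of edges that start at it."""
    result = {task: set() for task in tasks}
    for edge in edges:
        result[edge.upstream_task].add(edge)
    return result


def all_upstream_edges(tasks, edges):
    """Map every task to the set of edges that end at it."""
    result = {task: set() for task in tasks}
    for edge in edges:
        result[edge.downstream_task].add(edge)
    return result


tasks = ["extract", "transform", "load"]
edges = [Edge("extract", "transform", "data"), Edge("transform", "load", "data")]

print(all_downstream_edges(tasks, edges)["load"])  # set()
print(all_upstream_edges(tasks, edges)["load"])
# {Edge(upstream_task='transform', downstream_task='load', key='data')}
```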

prefect.core.flow.Flow.chain(*tasks, validate=None)

Adds a sequence of dependent tasks to the flow; each task should be provided as an argument (or splatted from a list).

Args:

  • *tasks (list): A list of tasks to chain together
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles). Defaults to the value of eager_edge_validation in your prefect configuration file.
Returns:
  • A list of Edge objects added to the flow
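The shape of a chain can be shown without Prefect: each task is linked to the one after it, so n tasks yield n - 1 edges. This hypothetical chain helper only returns (upstream, downstream) pairs; the real method adds Edge objects to the flow.

```python
from typing import List, Tuple


def chain(*tasks: str) -> List[Tuple[str, str]]:
    """Link each task to the next one: n tasks produce n - 1 edges."""
    return list(zip(tasks, tasks[1:]))


print(chain("extract", "transform", "load"))
# [('extract', 'transform'), ('transform', 'load')]

steps = ["a", "b"]
print(chain(*steps))  # [('a', 'b')]
```

Splatting a list, as the description suggests, gives the same result as passing the tasks individually.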

prefect.core.flow.Flow.copy()

Creates and returns a copy of the current Flow.

prefect.core.flow.Flow.deploy(project_name, build=True, set_schedule_active=True, **kwargs)

Deploy the flow to Prefect Cloud; if no storage is present on the Flow, the default value from your config will be used and initialized with **kwargs.

Args:

  • project_name (str): the project that should contain this flow.
  • build (bool, optional): if True, the flow's environment is built prior to serialization; defaults to True
  • set_schedule_active (bool, optional): if False, will set the schedule to inactive in the database to prevent auto-scheduling runs (if the Flow has a schedule). Defaults to True. This can be changed later.
  • **kwargs (Any): if instantiating a Storage object from default settings, these keyword arguments will be passed to the initialization method of the default Storage class
Returns:
  • str: the ID of the flow that was deployed

prefect.core.flow.Flow.downstream_tasks(task)

Get all of the tasks downstream of a task

Args:

  • task (Task): The task that we want to find downstream tasks from
Returns:
  • set of Task objects which are downstream of task
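Downstream and upstream task sets can be derived from the edges leading from or to a task. This is a sketch over (upstream, downstream) string pairs, not Prefect's code; it follows only direct edges, so "load", two steps away from "extract", is not returned for "extract". The upstream_tasks helper is included because that method is the mirror image of this one.

```python
def downstream_tasks(task, edges):
    """Tasks at the end of an edge that starts at `task`."""
    return {down for up, down in edges if up == task}


def upstream_tasks(task, edges):
    """Tasks at the start of an edge that ends at `task`."""
    return {up for up, down in edges if down == task}


edges = [("extract", "transform"), ("transform", "load"), ("extract", "audit")]
print(sorted(downstream_tasks("extract", edges)))  # ['audit', 'transform']
print(sorted(upstream_tasks("load", edges)))  # ['transform']
```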

prefect.core.flow.Flow.edges_from(task)

Get all of the edges leading from a task (i.e., the downstream edges)

Args:

  • task (Task): The task that we want to find edges leading from
Returns:
  • dict with the key as the task passed in and the value as a set of all edges leading from that task
Raises:
  • ValueError: if task is not found in this flow

prefect.core.flow.Flow.edges_to(task)

Get all of the edges leading to a task (i.e., the upstream edges)

Args:

  • task (Task): The task that we want to find edges leading to
Returns:
  • dict with the key as the task passed in and the value as a set of all edges leading to that task
Raises:
  • ValueError: if task is not found in this flow

prefect.core.flow.Flow.get_tasks(name=None, slug=None, tags=None, task_type=None)

Helper method for retrieving tasks from this flow based on certain attributes. The intersection of all provided attributes is taken, i.e., only those tasks which match all provided conditions are returned.

Args:

  • name (str, optional): the name of the task
  • slug (str, optional): the slug of the task
  • tags ([str], optional): an iterable of task tags
  • task_type (type, optional): a possible task class type
Returns:
  • [Task]: a list of tasks that meet the required conditions
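The "intersection of all provided attributes" rule amounts to skipping a task as soon as any provided condition fails. This is a sketch with a hypothetical Task dataclass and ExtractTask subclass; treating tags as "the task must carry all of the requested tags" is an assumption of the sketch, not something stated above.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional, Set


@dataclass
class Task:
    name: str
    slug: Optional[str] = None
    tags: Set[str] = field(default_factory=set)


class ExtractTask(Task):
    """Hypothetical Task subclass used to demonstrate task_type filtering."""


def get_tasks(tasks: Iterable[Task], name: Optional[str] = None,
              slug: Optional[str] = None, tags: Optional[Iterable[str]] = None,
              task_type: Optional[type] = None) -> List[Task]:
    """Return the tasks that satisfy every condition that was provided."""
    wanted_tags = set(tags) if tags else set()
    matches = []
    for task in tasks:
        if name is not None and task.name != name:
            continue
        if slug is not None and task.slug != slug:
            continue
        # Assumption: a task must carry *all* requested tags to match.
        if not wanted_tags.issubset(task.tags):
            continue
        if task_type is not None and not isinstance(task, task_type):
            continue
        matches.append(task)
    return matches


tasks = [
    Task("clean", tags={"etl"}),
    ExtractTask("pull_users", tags={"etl", "users"}),
    ExtractTask("pull_orders", tags={"etl"}),
]
print([t.name for t in get_tasks(tasks, tags=["etl"], task_type=ExtractTask)])
# ['pull_users', 'pull_orders']
print([t.name for t in get_tasks(tasks, tags=["etl", "users"])])  # ['pull_users']
```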

prefect.core.flow.Flow.parameters()

Returns any parameters of the flow.

Returns:

  • set: a set of any Parameters in this flow

prefect.core.flow.Flow.reference_tasks()

A flow's "reference tasks" are used to determine its state when it runs. If all the reference tasks are successful, then the flow run is considered successful. However, if any of the reference tasks fail, the flow is considered to fail. (Note that skips are counted as successes; see the state documentation for a full description of what is considered failure, success, etc.)

By default, a flow's reference tasks are its terminal tasks. This means the state of a flow is determined by those tasks that have no downstream dependencies.

In some situations, users may want to customize this behavior; for example, if a flow's terminal tasks are "clean up" tasks for the rest of the flow that only run if certain (more relevant) tasks fail, we might not want them determining the overall state of the flow run. The flow.set_reference_tasks() method can be used to set such custom reference_tasks.

Please note that even if the provided reference_tasks are not terminal tasks, the flow will not be considered "finished" until all terminal tasks have completed. Only then will its state be determined, using the reference tasks.

Returns:

  • set of Task objects which are the reference tasks in the flow
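The reference-task rules above can be modeled with string states. This is a simplified sketch, not Prefect's state machinery: flow_state and terminal_tasks are hypothetical helpers, "Pending" stands in for any unfinished state, and only "Success" and "Skipped" count as successes. The example mirrors the clean-up scenario described above.

```python
from typing import Dict, Iterable, Optional, Set, Tuple

# Simplified state names; skips count as successes, as described above.
SUCCESS_LIKE = {"Success", "Skipped"}


def terminal_tasks(tasks: Iterable[str], edges: Iterable[Tuple[str, str]]) -> Set[str]:
    """Tasks with no downstream edges: the default reference tasks."""
    has_downstream = {up for up, _ in edges}
    return {t for t in tasks if t not in has_downstream}


def flow_state(task_states: Dict[str, str], edges: Iterable[Tuple[str, str]],
               reference_tasks: Optional[Iterable[str]] = None) -> str:
    edges = list(edges)
    terminal = terminal_tasks(task_states, edges)
    # The run only finishes once every terminal task is finished.
    if any(task_states[t] == "Pending" for t in terminal):
        return "Running"
    refs = set(reference_tasks) if reference_tasks else terminal
    return "Success" if all(task_states[t] in SUCCESS_LIKE for t in refs) else "Failed"


# "cleanup" runs only because "load" failed.
edges = [("extract", "load"), ("load", "cleanup")]
states = {"extract": "Success", "load": "Failed", "cleanup": "Success"}

print(flow_state(states, edges))  # Success: judged by the terminal "cleanup" task
print(flow_state(states, edges, reference_tasks=["load"]))  # Failed
```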

prefect.core.flow.Flow.replace(old, new, validate=True)

Performs an in-place replacement of the old task with the provided new task.

Args:

  • old (Task): the old task to replace
  • new (Task): the new task to replace the old with; if not a Prefect Task, Prefect will attempt to convert it to one
  • validate (bool, optional): whether to validate the Flow after the replacement has been completed; defaults to True
Raises:
  • ValueError: if the old task is not a part of this flow
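An in-place replacement can be pictured as rewiring every edge that touches the old task. This sketch works on a set of task names and a set of (upstream, downstream, key) tuples; it is not Prefect's implementation, and it does not model reference tasks, mapped flags, or the validate step.

```python
from typing import Hashable, Set, Tuple

Edge = Tuple[Hashable, Hashable, object]  # (upstream, downstream, key)


def replace(tasks: Set[Hashable], edges: Set[Edge], old: Hashable, new: Hashable) -> None:
    """Swap `old` for `new` in place, rewiring every edge that touches `old`."""
    if old not in tasks:
        raise ValueError(f"{old!r} is not in this flow.")
    # Keys are kept, so `new` receives the same keyword arguments as `old`.
    rewired = {
        (new if up == old else up, new if down == old else down, key)
        for up, down, key in edges
    }
    edges.clear()
    edges.update(rewired)
    tasks.discard(old)
    tasks.add(new)


tasks = {"extract", "transform", "load"}
edges = {("extract", "transform", "data"), ("transform", "load", "data")}
replace(tasks, edges, "transform", "transform_v2")
print(sorted(edges))
# [('extract', 'transform_v2', 'data'), ('transform_v2', 'load', 'data')]
```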

prefect.core.flow.Flow.root_tasks()

Get the tasks in the flow that have no upstream dependencies; these are the tasks that, by default, flow execution begins with.

Returns:

  • set of Task objects that have no upstream dependencies
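Root tasks can be computed as the tasks that never appear at the downstream end of an edge. In this sketch (plain strings, hypothetical root_tasks helper) a task with no edges at all, such as "notify", also counts as a root task.

```python
from typing import Iterable, Set, Tuple


def root_tasks(tasks: Iterable[str], edges: Iterable[Tuple[str, str]]) -> Set[str]:
    """Tasks that never appear at the downstream end of an edge."""
    has_upstream = {down for _, down in edges}
    return {t for t in tasks if t not in has_upstream}


tasks = ["extract", "transform", "load", "notify"]
edges = [("extract", "transform"), ("transform", "load")]
print(sorted(root_tasks(tasks, edges)))  # ['extract', 'notify']
```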

prefect.core.flow.Flow.run(parameters=None, run_on_schedule=None, runner_cls=None, **kwargs)

Run the flow on its schedule using an instance of a FlowRunner. If the Flow has no schedule, a single stateful run will occur (including retries).

Note that this command will block and run this Flow on its schedule indefinitely (if it has one); all task states will be stored in memory, and task retries will not occur until every Task in the Flow has had a chance to run.

Args:

  • parameters (Dict[str, Any], optional): values to pass into the runner
  • run_on_schedule (bool, optional): whether to run this flow on its schedule, or simply run a single execution; if not provided, will default to the value set in your user config
  • runner_cls (type, optional): a FlowRunner class to use; the default FlowRunner is used if not provided
  • **kwargs: additional keyword arguments; if any provided keywords match known parameter names, they will be used as such. Otherwise they will be passed to the FlowRunner.run() method
Raises:
  • ValueError: if this Flow has a Schedule with no more scheduled runs
  • ValueError: if the return_tasks keyword argument is provided
Returns:
  • State: the state of the flow after its final run

prefect.core.flow.Flow.serialize(build=False)

Creates a serialized representation of the flow.

Args:

  • build (bool, optional): if True, the flow's environment is built prior to serialization
Returns:
  • dict representing the flow

prefect.core.flow.Flow.set_dependencies(task, upstream_tasks=None, downstream_tasks=None, keyword_tasks=None, mapped=False, validate=None)

Convenience function for adding task dependencies.

Args:

  • task (object): a Task that will become part of the Flow. If the task is not a Task subclass, Prefect will attempt to convert it to one.
  • upstream_tasks ([object], optional): Tasks that will run before the task runs. If any task is not a Task subclass, Prefect will attempt to convert it to one.
  • downstream_tasks ([object], optional): Tasks that will run after the task runs. If any task is not a Task subclass, Prefect will attempt to convert it to one.
  • keyword_tasks ({key: object}, optional): The results of these tasks will be provided to the task under the specified keyword arguments. If any task is not a Task subclass, Prefect will attempt to convert it to one.
  • mapped (bool, optional): Whether the upstream tasks (both keyed and non-keyed) should be mapped over; defaults to False. If True, any tasks wrapped in the prefect.utilities.tasks.unmapped container will not be mapped over.
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles). Defaults to the value of eager_edge_validation in your Prefect configuration file.
Returns:
  • None
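The way the set_dependencies arguments translate into edges can be sketched without Prefect. dependency_edges, Edge, and Unmapped below are hypothetical names; Unmapped only imitates the role of the prefect.utilities.tasks.unmapped container. Treating edges to downstream_tasks as never mapped is an assumption of this sketch, based on mapped being described only for upstream tasks.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Tuple


@dataclass(frozen=True)
class Unmapped:
    """Hypothetical stand-in for the prefect.utilities.tasks.unmapped container."""
    task: Any


class Edge(NamedTuple):
    upstream_task: Any
    downstream_task: Any
    key: Optional[str] = None
    mapped: bool = False


def dependency_edges(task: Any,
                     upstream_tasks: Optional[Iterable[Any]] = None,
                     downstream_tasks: Optional[Iterable[Any]] = None,
                     keyword_tasks: Optional[Dict[str, Any]] = None,
                     mapped: bool = False) -> List[Edge]:
    """List the edges a set_dependencies-style call would describe."""

    def unwrap(t: Any) -> Tuple[Any, bool]:
        # Tasks wrapped in Unmapped are never mapped over.
        if isinstance(t, Unmapped):
            return t.task, False
        return t, mapped

    edges = []
    for t in upstream_tasks or []:
        up, is_mapped = unwrap(t)
        edges.append(Edge(up, task, None, is_mapped))
    for t in downstream_tasks or []:
        # Assumption: edges to downstream tasks are never mapped.
        edges.append(Edge(task, t))
    for key, t in (keyword_tasks or {}).items():
        up, is_mapped = unwrap(t)
        edges.append(Edge(up, task, key, is_mapped))
    return edges


for edge in dependency_edges(
    "add",
    upstream_tasks=["setup"],
    downstream_tasks=["report"],
    keyword_tasks={"x": "numbers", "y": Unmapped("offset")},
    mapped=True,
):
    print(edge)
```

With mapped=True, every upstream edge is mapped except the one whose task was wrapped in Unmapped.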

prefect.core.flow.Flow.set_reference_tasks(tasks)

Sets the reference_tasks for the flow. See flow.reference_tasks for more details.

Args:

  • tasks ([Task]): the tasks that should be set as a flow's reference tasks
Returns:
  • None

prefect.core.flow.Flow.sorted_tasks(root_tasks=None)

Get the tasks in this flow in topologically sorted order, so that each task appears after all of its upstream tasks. Sorting also reveals whether any cycles exist in this flow's DAG.

Args:

  • root_tasks ([Task], optional): an iterable of Task objects to start the sorting from
Returns:
  • tuple of task objects that were sorted
Raises:
  • ValueError: if a cycle is found in the flow's DAG
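Topological sorting with cycle detection can be illustrated with Kahn's algorithm. This is a swapped-in textbook technique, not necessarily the algorithm Flow.sorted_tasks uses, and it ignores the root_tasks argument. Tasks are plain strings; any task that never reaches zero remaining upstream edges lies on or behind a cycle.

```python
from collections import deque
from typing import Hashable, Iterable, List, Tuple


def sorted_tasks(tasks: List[Hashable],
                 edges: Iterable[Tuple[Hashable, Hashable]]) -> Tuple[Hashable, ...]:
    """Topologically sort tasks (upstream before downstream) with Kahn's algorithm."""
    indegree = {t: 0 for t in tasks}
    children = {t: [] for t in tasks}
    for up, down in edges:
        children[up].append(down)
        indegree[down] += 1

    # Start from tasks with no upstream edges, in their given order.
    queue = deque(t for t in tasks if indegree[t] == 0)
    order = []
    while queue:
        current = queue.popleft()
        order.append(current)
        for child in children[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    # Tasks left unvisited sit on (or behind) a cycle.
    if len(order) != len(indegree):
        raise ValueError("Flow is not a DAG: a cycle was found.")
    return tuple(order)


print(sorted_tasks(["load", "extract", "transform"],
                   [("extract", "transform"), ("transform", "load")]))
# ('extract', 'transform', 'load')
```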

prefect.core.flow.Flow.terminal_tasks()

Get the tasks in the flow that have no downstream dependencies

Returns:

  • set of Task objects that have no downstream dependencies

prefect.core.flow.Flow.update(flow, validate=None)

Take all tasks and edges in another flow and add them to this flow

Args:

  • flow (Flow): A flow which is used to update this flow
  • validate (bool, optional): Whether or not to check the validity of the flow
Returns:
  • None

prefect.core.flow.Flow.upstream_tasks(task)

Get all of the tasks upstream of a task

Args:

  • task (Task): The task that we want to find upstream tasks of
Returns:
  • set of Task objects which are upstream of task

prefect.core.flow.Flow.validate()

Checks that the flow is valid.

Returns:

  • None
Raises:
  • ValueError: if edges refer to tasks that are not in this flow
  • ValueError: if specified reference tasks are not in this flow
  • ValueError: if any tasks do not have assigned IDs
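The first two validation checks can be expressed as membership tests. This sketch uses a hypothetical validate helper over plain task names and (upstream, downstream) pairs; the task ID check has no counterpart here because the model has no IDs.

```python
from typing import Hashable, Iterable, Set, Tuple


def validate(tasks: Set[Hashable], edges: Iterable[Tuple[Hashable, Hashable]],
             reference_tasks: Iterable[Hashable] = ()) -> None:
    """Raise ValueError if edges or reference tasks point outside the flow."""
    if any(up not in tasks or down not in tasks for up, down in edges):
        raise ValueError("Some edges refer to tasks not contained in this flow.")
    if any(t not in tasks for t in reference_tasks):
        raise ValueError("Some reference tasks are not contained in this flow.")


validate({"extract", "load"}, [("extract", "load")], reference_tasks=["load"])  # passes
try:
    validate({"extract"}, [("extract", "load")])
except ValueError as exc:
    print(exc)  # Some edges refer to tasks not contained in this flow.
```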

prefect.core.flow.Flow.visualize(flow_state=None, filename=None)

Creates a graphviz object representing the current flow; this graphviz object will be rendered inline if called from an IPython notebook, otherwise it will be rendered in a new window. If a filename is provided, the object will not be rendered and will instead be saved to the location specified.

Args:

  • flow_state (State, optional): flow state object used to optionally color the nodes
  • filename (str, optional): a filename specifying a location to save this visualization to; if provided, the visualization will not be rendered automatically
Raises:
  • ImportError: if graphviz is not installed



This documentation was auto-generated from commit 424be6b on October 21, 2019 at 15:48 UTC.