prefect.core.flow.Flow(name, schedule=None, environment=None, storage=None, tasks=None, edges=None, reference_tasks=None, state_handlers=None, on_failure=None, validate=None, result_handler=None)
The Flow class represents a collection of dependent Tasks. Flows track Task dependencies and parameters, and provide the main API for constructing and managing workflows.
Initializing Flow example:
    from prefect import Flow, Task

    class MyTask(Task):
        def run(self):
            return "hello"

    task_1 = MyTask()
    flow = Flow(name="my_flow", tasks=[task_1])
    flow.run()
Initializing Flow as context manager example:
    from prefect import Flow, task

    @task
    def my_task():
        return "hello"

    with Flow("my_flow") as flow:
        task_1 = my_task()

    flow.run()
name (str): The name of the flow. Cannot be None or an empty string
schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
environment (prefect.environments.Environment, optional): The environment that the flow should be run in. If None, a RemoteEnvironment will be created.
storage (prefect.environments.storage.Storage, optional): The unit of storage that the flow will be written into.
tasks ([Task], optional): If provided, a list of tasks that will initialize the flow
edges ([Edge], optional): A list of edges between tasks
reference_tasks ([Task], optional): A list of tasks that determine the final state of a flow
state_handlers (Iterable[Callable], optional): A list of state change handlers that will be called whenever the flow changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the flow instance, the old (prior) state, and the new (current) state, with the following signature:
    state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]
If multiple functions are passed, then the new_state argument will be the result of the previous handler.
on_failure (Callable, optional): A function with signature fn(flow: Flow, state: State) -> None, which will be called any time this Flow enters a failure state
validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles and illegal keys) after adding the edges passed in the edges argument. Defaults to the value of eager_edge_validation in your prefect configuration file.
result_handler (ResultHandler, optional): the handler to use for retrieving and storing state results during execution; if not provided, will default to the one specified in your config
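The chaining rule for state_handlers described above can be illustrated with a small stand-alone sketch. It does not use Prefect itself: states are modelled as plain strings, call_state_handlers is a hypothetical helper, and treating a None return value as "keep the current state" is an assumption suggested by the Optional[State] return type rather than something this page confirms.

```python
from typing import Callable, Iterable, Optional

# Stand-in for Prefect State objects (assumption: plain strings for illustration).
State = str
Handler = Callable[[str, State, State], Optional[State]]


def call_state_handlers(
    flow: str, old_state: State, new_state: State, handlers: Iterable[Handler]
) -> State:
    """Hypothetical helper: pass new_state through each handler in order."""
    for handler in handlers:
        result = handler(flow, old_state, new_state)
        # Assumption: a handler that returns None leaves the state unchanged.
        if result is not None:
            new_state = result
    return new_state


def log_handler(flow: str, old_state: State, new_state: State) -> Optional[State]:
    # Inspects the transition without modifying it.
    print(f"{flow}: {old_state} -> {new_state}")
    return None


def retry_on_failure(flow: str, old_state: State, new_state: State) -> Optional[State]:
    # Modifies the new state; any later handler would receive this result.
    return "Retrying" if new_state == "Failed" else new_state


final_state = call_state_handlers(
    "my_flow", "Running", "Failed", [log_handler, retry_on_failure]
)
```

Because each handler receives the previous handler's result, the order of the list matters: a handler placed after retry_on_failure would see "Retrying", not "Failed".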
prefect.core.flow.Flow.add_edge(upstream_task, downstream_task, key=None, mapped=False, validate=None)
Add an edge in the flow between two tasks. All edges are directed beginning with an upstream task and ending with a downstream task.
Add a task to the flow if the task does not already exist. The tasks are uniquely identified by their
Returns a dictionary relating each task in the Flow to the set of all downstream edges for the task
Returns a dictionary relating each task in the Flow to the set of all upstream edges for the task
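As a rough illustration of the shape of these two dictionaries, here is a stand-alone sketch that does not import Prefect. The Edge namedtuple, the string task names, and both helper functions are simplified stand-ins introduced for this example, not the library's own classes.

```python
from collections import namedtuple

# Simplified stand-in for an edge (assumption: only these three fields matter here).
Edge = namedtuple("Edge", ["upstream_task", "downstream_task", "key"])


def all_downstream_edges(tasks, edges):
    """Map each task to the set of edges that start at it."""
    result = {task: set() for task in tasks}
    for edge in edges:
        result[edge.upstream_task].add(edge)
    return result


def all_upstream_edges(tasks, edges):
    """Map each task to the set of edges that end at it."""
    result = {task: set() for task in tasks}
    for edge in edges:
        result[edge.downstream_task].add(edge)
    return result


tasks = ["extract", "transform", "load"]
edges = [
    Edge("extract", "transform", key="data"),
    Edge("transform", "load", key="rows"),
]

downstream = all_downstream_edges(tasks, edges)
upstream = all_upstream_edges(tasks, edges)
```

Every task appears as a key in both dictionaries, so a task with no edges in one direction maps to an empty set rather than being absent.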
Adds a sequence of dependent tasks to the flow; each task should be provided as an argument (or splatted from a list).
Creates and returns a copy of the current Flow.
prefect.core.flow.Flow.deploy(project_name, build=True, set_schedule_active=True, **kwargs)
Deploy the flow to Prefect Cloud; if no storage is present on the Flow, the default value from your config will be used and initialized with
Get all of the tasks downstream of a task
Get all of the edges leading from a task (i.e., the downstream edges)
Get all of the edges leading to a task (i.e., the upstream edges)
prefect.core.flow.Flow.get_tasks(name=None, slug=None, tags=None, task_type=None)
Helper method for retrieving tasks from this flow based on certain attributes. The intersection of all provided attributes is taken, i.e., only those tasks which match all provided conditions are returned.
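The "intersection of all provided attributes" rule can be sketched without Prefect as follows. FakeTask is a hypothetical stand-in for a task, the free function get_tasks only mirrors the method's parameter names, and requiring every requested tag to be present on the task is an assumption about how tags are matched.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class FakeTask:
    """Hypothetical stand-in for a task with a name, slug, and tags."""
    name: str
    slug: str
    tags: frozenset = field(default_factory=frozenset)


def get_tasks(tasks, name=None, slug=None, tags=None, task_type=None):
    """Return the tasks that satisfy every condition that was provided."""
    def matches(task):
        return (
            (name is None or task.name == name)
            and (slug is None or task.slug == slug)
            # Assumption: all requested tags must be present on the task.
            and (tags is None or set(tags) <= task.tags)
            and (task_type is None or isinstance(task, task_type))
        )
    return [task for task in tasks if matches(task)]


tasks = [
    FakeTask("load", "load-1", frozenset({"db", "prod"})),
    FakeTask("load", "load-2", frozenset({"db"})),
    FakeTask("extract", "extract-1", frozenset({"prod"})),
]

prod_loaders = get_tasks(tasks, name="load", tags=["prod"])
```

Conditions left as None are ignored, so calling get_tasks with no filters returns every task.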
Returns any parameters of the flow.
A flow's "reference tasks" are used to determine its state when it runs. If all the reference tasks are successful, then the flow run is considered successful. However, if any of the reference tasks fail, the flow is considered to fail. (Note that skips are counted as successes; see the state documentation for a full description of what is considered failure, success, etc.)
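The rule above can be written down as a small stand-alone function. This is a sketch, not Prefect's implementation: states are plain strings, flow_state is a hypothetical helper, and returning "Pending" when the reference tasks are neither all successful nor failed is an assumption, since this page does not describe that case.

```python
def flow_state(task_states, reference_tasks):
    """Derive a flow's state from the states of its reference tasks."""
    states = [task_states[task] for task in reference_tasks]
    if any(state == "Failed" for state in states):
        return "Failed"
    # Skips are counted as successes.
    if all(state in ("Success", "Skipped") for state in states):
        return "Success"
    # Assumption: anything else means the run is not finished yet.
    return "Pending"


task_states = {
    "extract": "Success",
    "transform": "Skipped",
    "cleanup": "Failed",
}

# Only the reference tasks are consulted, so the failed "cleanup" task
# does not affect the first result.
state_without_cleanup = flow_state(task_states, ["extract", "transform"])
state_with_cleanup = flow_state(task_states, ["extract", "transform", "cleanup"])
```

This is why choosing reference tasks matters: a failing task outside the reference set does not by itself make the flow run fail.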
prefect.core.flow.Flow.replace(old, new, validate=True)
Performs an in-place replacement of the old task with the provided new task.
Get the tasks in the flow that have no upstream dependencies; these are the tasks that, by default, flow execution begins with.
prefect.core.flow.Flow.run(parameters=None, run_on_schedule=None, runner_cls=None, **kwargs)
Run the flow on its schedule using an instance of a FlowRunner. If the Flow has no schedule, a single stateful run will occur (including retries).
Creates a serialized representation of the flow.
prefect.core.flow.Flow.set_dependencies(task, upstream_tasks=None, downstream_tasks=None, keyword_tasks=None, mapped=False, validate=None)
Convenience function for adding task dependencies.
Get the tasks in this flow in topologically sorted order, so that every task appears after its upstream tasks. Sorting also reveals whether any cycles exist in the flow's DAG.
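To show why sorting the tasks doubles as a cycle check, here is a minimal stand-alone sketch using Kahn's algorithm. It is one common way to order a DAG and is not taken from Prefect's source; the function name mirrors the method only for readability, and tasks are modelled as strings with edges as (upstream, downstream) pairs.

```python
from collections import deque


def sorted_tasks(tasks, edges):
    """Return tasks ordered so that upstream tasks come first.

    Raises ValueError if the edges contain a cycle.
    """
    downstream = {task: [] for task in tasks}
    in_degree = {task: 0 for task in tasks}
    for upstream_task, downstream_task in edges:
        downstream[upstream_task].append(downstream_task)
        in_degree[downstream_task] += 1

    # Start from the tasks that have no upstream dependencies.
    queue = deque(task for task in tasks if in_degree[task] == 0)
    order = []
    while queue:
        task = queue.popleft()
        order.append(task)
        for child in downstream[task]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    # Tasks on a cycle never reach in-degree zero, so they are never emitted.
    if len(order) != len(tasks):
        raise ValueError("Cycle found; the task graph must be acyclic.")
    return order


order = sorted_tasks(
    ["load", "extract", "transform"],
    [("extract", "transform"), ("transform", "load")],
)
```

If an edge from "load" back to "extract" were added, no task would have an in-degree of zero, the output would be shorter than the task list, and the function would raise.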
Get the tasks in the flow that have no downstream dependencies
Take all tasks and edges in another flow and add them to this flow
Get all of the tasks upstream of a task
Checks that the flow is valid.
Creates graphviz object for representing the current flow; this graphviz object will be rendered inline if called from an IPython notebook, otherwise it will be rendered in a new window. If a
This documentation was auto-generated from commit 424be6b on October 21, 2019 at 15:48 UTC