Run flows in local processes
The simplest way to create a deployment for your flow is by calling its `serve` method.
Serve a flow
The serve method creates a deployment for the flow and starts a long-running process that monitors for work from the Prefect server. When work is found, it is executed within its own isolated subprocess.
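Running each unit of work in its own subprocess means a crashing flow run cannot take down the serving process. The following is a minimal stdlib sketch of that isolation. It uses a hypothetical `run_isolated` helper that executes a Python snippet in a fresh interpreter. This is not Prefect's implementation.

```python
import subprocess
import sys


def run_isolated(code: str, timeout: float = 60.0) -> int:
    """Run a Python snippet in a fresh interpreter and return its exit code.

    Hypothetical helper: Prefect's runner manages its own subprocesses; this
    only illustrates why a failure in the child leaves the parent running.
    """
    completed = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,  # keep the child's output (and tracebacks) out of ours
        timeout=timeout,
    )
    return completed.returncode


if __name__ == "__main__":
    print(run_isolated("print('hello from the child')"))  # 0
    print(run_isolated("raise RuntimeError('boom')"))  # 1, and this process keeps going
```

In Prefect itself, `serve` takes care of launching and tracking these subprocesses for you.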
```python
from prefect import flow


@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")


if __name__ == "__main__":
    # creates a deployment and stays running to monitor for work instructions
    # generated on the server
    hello_world.serve(name="my-first-deployment",
                      tags=["onboarding"],
                      parameters={"goodbye": True},
                      interval=60)
```
This interface provides the configuration for a deployment (with no strong infrastructure requirements), such as:
- schedules
- event triggers
- metadata such as tags and description
- default parameter values
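Deployment parameters layer on top of the flow function's own defaults. In the example above, `parameters={"goodbye": True}` overrides `goodbye=False`, and `name` keeps its default of `"world"`. The sketch below shows that precedence using `inspect.signature`. As a simplification, it assumes that values supplied for an individual run take precedence over deployment defaults. `resolve_parameters` is a hypothetical helper, not a Prefect API.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def hello_world(name: str = "world", goodbye: bool = False):
    """Plain-function stand-in for the flow above (no Prefect needed)."""


def resolve_parameters(
    fn: Callable[..., Any],
    deployment_params: Dict[str, Any],
    run_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    sig = inspect.signature(fn)
    # 1. Start from the defaults declared on the function itself.
    resolved = {
        name: p.default
        for name, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    }
    # 2. Deployment-level defaults override function defaults.
    resolved.update(deployment_params)
    # 3. Values supplied for a single run override both (assumed precedence).
    resolved.update(run_params or {})
    # Raises TypeError for unknown names or missing required parameters.
    sig.bind(**resolved)
    return resolved


print(resolve_parameters(hello_world, {"goodbye": True}))
# {'name': 'world', 'goodbye': True}
```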
Schedules are auto-paused on shutdown
By default, stopping the process running `flow.serve` pauses the schedule for the deployment (if it has one). When running this in environments where restarts are expected, use the `pause_on_shutdown=False` flag to prevent this behavior:
```python
if __name__ == "__main__":
    hello_world.serve(name="my-first-deployment",
                      tags=["onboarding"],
                      parameters={"goodbye": True},
                      pause_on_shutdown=False,
                      interval=60)
```
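The shutdown behavior follows a familiar pattern: poll until interrupted, then clean up in a `finally` block. The sketch below models it with plain Python. `Schedule`, `serve_forever`, and `interrupt_after` are all hypothetical stand-ins, not Prefect objects. The sketch only mirrors the documented effect of `pause_on_shutdown`.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Schedule:
    """Hypothetical stand-in for a deployment's schedule state."""
    active: bool = True


def serve_forever(
    poll_for_work: Callable[[], None],
    schedule: Schedule,
    pause_on_shutdown: bool = True,
) -> None:
    """Poll until interrupted, then optionally pause the schedule."""
    try:
        while True:
            poll_for_work()
    except KeyboardInterrupt:
        pass  # CTRL+C ends the loop cleanly
    finally:
        if pause_on_shutdown:
            schedule.active = False


def interrupt_after(n: int) -> Callable[[], None]:
    """Return a poller that simulates CTRL+C on its n-th call."""
    calls = 0

    def poll() -> None:
        nonlocal calls
        calls += 1
        if calls >= n:
            raise KeyboardInterrupt

    return poll
```

With `pause_on_shutdown=False`, the schedule stays active after the loop exits. This is what you want when a supervisor is expected to restart the process.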
Additional serve options
The `serve` method on flows exposes many options for the deployment. Here's how to use some of those options:
- `cron`: a keyword that allows you to set a cron string schedule for the deployment; see schedules for more advanced scheduling options
- `tags`: a keyword that allows you to tag this deployment and its runs for bookkeeping and filtering purposes
- `description`: a keyword that allows you to document what this deployment does; by default the description is set from the docstring of the flow function (if documented)
- `version`: a keyword that allows you to track changes to your deployment; uses a hash of the file containing the flow by default; popular options include semver tags or git commit hashes
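A cron string has five space-separated fields: minute, hour, day of month, month, and day of week. So `"* * * * *"`, used below, fires every minute. The matcher below is deliberately minimal. It supports only `*` and single numbers in each field, while real cron also accepts ranges, lists, and steps. `cron_matches` is hypothetical and is not how Prefect evaluates schedules.

```python
from datetime import datetime


def _field_matches(field: str, value: int) -> bool:
    # Only "*" (any value) and a single integer are supported in this sketch.
    return field == "*" or int(field) == value


def cron_matches(expr: str, when: datetime) -> bool:
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (when.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    if not (
        _field_matches(minute, when.minute)
        and _field_matches(hour, when.hour)
        and _field_matches(month, when.month)
    ):
        return False
    dom_ok = _field_matches(dom, when.day)
    dow_ok = _field_matches(dow, cron_dow)
    # Classic cron rule: if both day fields are restricted, either may match.
    if dom != "*" and dow != "*":
        return dom_ok or dow_ok
    return dom_ok and dow_ok


print(cron_matches("* * * * *", datetime(2024, 1, 1, 12, 30)))  # True
print(cron_matches("0 9 * * 1", datetime(2024, 1, 1, 9, 0)))  # True: a Monday at 09:00
```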
Next, add these options to your deployment:
```python
if __name__ == "__main__":
    # assumes a `get_repo_info` flow is defined earlier in this file
    get_repo_info.serve(
        name="my-first-deployment",
        cron="* * * * *",
        tags=["testing", "tutorial"],
        description="Given a GitHub repository, logs repository statistics for that repo.",
        version="tutorial/deployments",
    )
```
When you rerun this script, you will find an updated deployment in the UI that is actively scheduling work. Stop the script with CTRL+C, and the schedule pauses automatically.
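If you omit `version`, the default is a hash of the file containing the flow, so the version changes whenever that file does. The sketch below illustrates the idea with `hashlib`. `file_version` is hypothetical, and SHA-256 is an arbitrary choice here, not necessarily the algorithm Prefect uses.

```python
import hashlib
import tempfile
from pathlib import Path


def file_version(path: Path, chunk_size: int = 65536) -> str:
    """Return a hex digest of the file's contents (hypothetical helper)."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        flow_file = Path(tmp) / "flow.py"
        flow_file.write_bytes(b"print('v1')\n")
        before = file_version(flow_file)
        flow_file.write_bytes(b"print('v2')\n")
        print(before != file_version(flow_file))  # True: edits change the version
```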
`serve()` is a long-running process
To execute remotely triggered or scheduled runs, your script with `flow.serve` must be actively running.
Serve multiple flows at once
Serve multiple flows with the same process using the `serve` utility along with the `to_deployment` method of flows:
```python
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)
```
The behavior and interfaces are identical to the single flow case. A few things to note:
- the `flow.to_deployment` interface exposes the exact same options as `flow.serve`; this method produces a deployment object
- the deployments are only registered with the API once `serve(...)` is called
- when serving multiple deployments, the only requirement is that they share a Python environment; they can be executed and scheduled independently of each other
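Here is what "scheduled independently" means in the example above. `sleeper` runs every 45 seconds, while `fast` has no schedule and runs only when triggered. In Prefect, the server creates the scheduled runs and the serving process picks them up. The stdlib sketch below only shows how independent interval schedules interleave, assuming each schedule's first run falls at a shared start time. `upcoming_runs` is hypothetical.

```python
import heapq
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple


def upcoming_runs(
    intervals: Dict[str, Optional[int]],
    start: datetime,
    until: datetime,
) -> List[Tuple[datetime, str]]:
    """List (time, deployment) pairs for every interval schedule up to `until`.

    Deployments whose interval is None have no schedule and never appear.
    """
    if any(s is not None and s <= 0 for s in intervals.values()):
        raise ValueError("intervals must be positive")
    heap = [(start, name, secs) for name, secs in intervals.items() if secs is not None]
    heapq.heapify(heap)
    runs = []
    while heap and heap[0][0] <= until:
        when, name, secs = heapq.heappop(heap)
        runs.append((when, name))
        # Each deployment's next run depends only on its own interval.
        heapq.heappush(heap, (when + timedelta(seconds=secs), name, secs))
    return runs


start = datetime(2024, 1, 1)
for when, name in upcoming_runs({"sleeper": 45, "fast": None}, start, start + timedelta(seconds=100)):
    print(when.time(), name)
# 00:00:00 sleeper
# 00:00:45 sleeper
# 00:01:30 sleeper
```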
A few optional steps for exploration include:
- pause and unpause the schedule for the “sleeper” deployment
- use the UI to submit ad-hoc runs for the “sleeper” deployment with different values for `sleep`
- cancel an active run for the “sleeper” deployment from the UI
Hybrid execution option
Prefect’s deployment interface allows you to choose a hybrid execution model. Whether you use Prefect Cloud or self-host Prefect server, you can run workflows in the environments best suited to their execution. This model enables efficient use of your infrastructure resources while maintaining the privacy of your code and data. There is no ingress required. Read more about our hybrid model.
Retrieve a flow from remote storage
You can retrieve flows from remote storage with the `flow.from_source` method. `flow.from_source` accepts a git repository URL and an entrypoint pointing to the flow to load from the repository:
```python
from prefect import flow

my_flow = flow.from_source(
    source="https://github.com/PrefectHQ/prefect.git",
    entrypoint="flows/hello_world.py:hello"
)

if __name__ == "__main__":
    my_flow()
```

```
16:40:33.818 | INFO | prefect.engine - Created flow run 'muscular-perch' for flow 'hello'
16:40:34.048 | INFO | Flow run 'muscular-perch' - Hello world!
16:40:34.706 | INFO | Flow run 'muscular-perch' - Finished in state Completed()
```
A flow entrypoint is the path to the file containing the flow and the name of the flow function, separated by a colon.
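The following minimal sketch splits an entrypoint into its two parts. `parse_entrypoint` is hypothetical, and Prefect's own parsing may handle more forms than this.

```python
from typing import NamedTuple


class Entrypoint(NamedTuple):
    path: str      # file containing the flow, relative to the repository root
    function: str  # name of the flow function in that file


def parse_entrypoint(entrypoint: str) -> Entrypoint:
    """Split 'path/to/file.py:function' at the last colon (hypothetical helper)."""
    # rpartition splits on the *last* colon, so a Windows drive letter survives.
    path, sep, function = entrypoint.rpartition(":")
    if not sep or not path or not function:
        raise ValueError(f"expected '<path>:<function>', got {entrypoint!r}")
    return Entrypoint(path, function)


print(parse_entrypoint("flows/hello_world.py:hello"))
# Entrypoint(path='flows/hello_world.py', function='hello')
```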
For additional configuration, such as specifying a private repository, provide a `GitRepository` object instead of a URL:
```python
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="https://github.com/org/private-repo.git",
        branch="dev",
        credentials={
            "access_token": Secret.load("github-access-token").get()
        }
    ),
    entrypoint="flows.py:my_flow"
)

if __name__ == "__main__":
    my_flow()
```
You can serve loaded flows
You can serve a flow loaded from remote storage with the same `serve` method as a local flow:
```python
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",
        entrypoint="flows.py:my_flow"
    ).serve(name="my-deployment")
```
When you serve a flow loaded from remote storage, the serving process periodically polls your remote storage for updates to the flow’s code. This pattern allows you to update your flow code without restarting the serving process.
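At its core, this polling detects whether the code changed between checks. The stdlib sketch below shows that comparison. It assumes a hypothetical `fetch` callable that returns the current flow source, and it does not reproduce how Prefect actually fetches or reloads code.

```python
import hashlib
from typing import Callable, Optional


class ChangeDetector:
    """Report when fetched flow code differs from the previous poll."""

    def __init__(self, fetch: Callable[[], bytes]):
        self._fetch = fetch  # hypothetical: returns the current flow source
        self._last_digest: Optional[str] = None

    def poll(self) -> bool:
        digest = hashlib.sha256(self._fetch()).hexdigest()
        # The first poll only records a baseline; later polls compare to it.
        changed = self._last_digest is not None and digest != self._last_digest
        self._last_digest = digest
        return changed


remote = {"flows.py": b"def my_flow(): ...\n"}
detector = ChangeDetector(lambda: remote["flows.py"])
print(detector.poll())  # False: baseline
remote["flows.py"] = b"def my_flow(): return 1\n"
print(detector.poll())  # True: code changed
```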