The simplest way to create a deployment for your flow is by calling its serve method.

Serve a flow

The serve method creates a deployment for the flow and starts a long-running process that monitors for work from the Prefect server. When work is found, it is executed within its own isolated subprocess.

hello_world.py
from prefect import flow


@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")


if __name__ == "__main__":
    # creates a deployment and starts a long-running
    # process that listens for scheduled work
    hello_world.serve(name="my-first-deployment",
        tags=["onboarding"],
        parameters={"goodbye": True},
        interval=60
    )

This interface provides the configuration for a deployment (with no strong infrastructure requirements), such as:

  • schedules
  • event triggers
  • metadata such as tags and description
  • default parameter values

Schedules are auto-paused on shutdown

By default, stopping the process running flow.serve will pause the schedule for the deployment (if it has one).

When running this in environments where restarts are expected use the

pause_on_shutdown=False flag to prevent this behavior:

if __name__ == "__main__":
    hello_world.serve(
        name="my-first-deployment",
        tags=["onboarding"],
        parameters={"goodbye": True},
        pause_on_shutdown=False,
        interval=60
    )

Additional serve options

The serve method on flows exposes many options for the deployment. Here’s how to use some of those options:

  • cron: a keyword that allows you to set a cron string schedule for the deployment; see schedules for more advanced scheduling options
  • tags: a keyword that allows you to tag this deployment and its runs for bookkeeping and filtering purposes
  • description: a keyword that allows you to document what this deployment does; by default the description is set from the docstring of the flow function (if documented)
  • version: a keyword that allows you to track changes to your deployment; uses a hash of the file containing the flow by default; popular options include semver tags or git commit hashes
  • triggers: a keyword that allows you to define a set of conditions for when the deployment should run; see triggers for more on Prefect Events concepts

Next, add these options to your deployment:

if __name__ == "__main__":
    get_repo_info.serve(
        name="my-first-deployment",
        cron="* * * * *",
        tags=["testing", "tutorial"],
        description="Given a GitHub repository, logs repository statistics for that repo.",
        version="tutorial/deployments",
    )

Triggers with .serve

See this example that triggers downstream work on upstream events.

When you rerun this script, you will find an updated deployment in the UI that is actively scheduling work. Stop the script in the CLI using CTRL+C and your schedule automatically pauses.

serve() is a long-running process

To execute remotely triggered or scheduled runs, your script with flow.serve must be actively running.

Serve multiple flows at once

Serve multiple flows with the same process using the serve utility along with the to_deployment method of flows:

serve_two_flows.py
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)

The behavior and interfaces are identical to the single flow case. A few things to note:

  • the flow.to_deployment interface exposes the exact same options as flow.serve; this method produces a deployment object
  • the deployments are only registered with the API once serve(...) is called
  • when serving multiple deployments, the only requirement is that they share a Python environment; they can be executed and scheduled independently of each other

A few optional steps for exploration include:

  • pause and unpause the schedule for the "sleeper" deployment
  • use the UI to submit ad-hoc runs for the "sleeper" deployment with different values for sleep
  • cancel an active run for the "sleeper" deployment from the UI

Hybrid execution option

Prefect’s deployment interface allows you to choose a hybrid execution model. Whether you use Prefect Cloud or self-host Prefect server, you can run workflows in the environments best suited to their execution. This model enables efficient use of your infrastructure resources while maintaining the privacy of your code and data. There is no ingress required. Read more about our hybrid model.

Retrieve a flow from remote storage

Just like the .deploy method, the flow.from_source method is used to define how to retrieve the flow that you want to serve.

from_source

The flow.from_source method on Flow objects requires a source and an entrypoint.

source

The source of your deployment can be:

  • a path to a local directory such as path/to/a/local/directory
  • a repository URL such as https://github.com/org/repo.git
  • a GitRepository object that accepts
    • a repository URL
    • a reference to a branch, tag, or commit hash
    • GitCredentials for private repositories

entrypoint

A flow entrypoint is the path to the file where the flow is located within that source, in the form

{path}:{flow_name}

For example, the following code will load the hello flow from the flows/hello_world.py file in the PrefectHQ/prefect repository:

load_from_url.py
from prefect import flow


my_flow = flow.from_source(
    source="https://github.com/PrefectHQ/prefect.git",
    entrypoint="flows/hello_world.py:hello"
)


if __name__ == "__main__":
    my_flow()
16:40:33.818 | INFO    | prefect.engine - Created flow run 'muscular-perch' for flow 'hello'
16:40:34.048 | INFO    | Flow run 'muscular-perch' - Hello world!
16:40:34.706 | INFO    | Flow run 'muscular-perch' - Finished in state Completed()

For more ways to store and access flow code, see the Retrieve code from storage page.

You can serve loaded flows

You can serve a flow loaded from remote storage with the same serve method as a local flow:

serve_loaded_flow.py
from prefect import flow


if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",
        entrypoint="flows.py:my_flow"
    ).serve(name="my-deployment")

Remote storage polling

When you serve a flow loaded from remote storage, the serving process periodically polls your remote storage for updates to the flow’s code. This pattern allows you to update your flow code without restarting the serving process. Note that if you change metadata associated with your flow’s deployment such as parameters, you will need to restart the serve process.

Containerize your serve process

A simple deployment strategy is to define a serve process as the ENTRYPOINT of a container image.

See Serve flows in a long-lived Docker container for a guide.

If you need dynamic infrastructure

For more configuration, you can create a deployment that uses a work pool. Reasons to create a work-pool based deployment include:

  • Wanting to run your flow on dynamically provisioned infrastructure
  • Needing more control over the execution environment on a per-flow run basis
  • Creating an infrastructure template to use across deployments

Work pools are popular with data platform teams because they allow you to manage infrastructure configuration across an organization.

Learn more about work-pool based deployments on the Configure dynamic infrastructure with work pools page