Skip to content

Workers

Prerequisites

Docker installed and running on your machine.

Why workers

In the previous section of the tutorial, you learned how work pools are a bridge between the Prefect orchestration layer and infrastructure for flow runs that can be dynamically provisioned. You saw how you can transition from persistent infrastructure to dynamic infrastructure by using flow.deploy instead of flow.serve.

Work pools that rely on client-side workers take this a step further by enabling you to run work flows in your own Docker containers, Kubernetes clusters, and serverless environments such as AWS ECS, Azure Container Instances, and GCP Cloud Run.

The architecture of a worker-based work pool deployment can be summarized with the following diagram:

graph TD
    subgraph your_infra["Your Execution Environment"]
        worker["Worker"]
    subgraph flow_run_infra[Flow Run Infra]
     flow_run_a(("Flow Run A"))
    end
    subgraph flow_run_infra_2[Flow Run Infra]
     flow_run_b(("Flow Run B"))
    end      
    end

    subgraph api["Prefect API"]
    Deployment --> |assigned to| work_pool
        work_pool(["Work Pool"])
    end

    worker --> |polls| work_pool
    worker --> |creates| flow_run_infra
    worker --> |creates| flow_run_infra_2

Notice above that the worker is in charge of provisioning the flow run infrastructure. In context of this tutorial, that flow run infrastructure is an ephemeral Docker container to host each flow run. Different worker types create different types of flow run infrastructure.

Now that we’ve reviewed the concepts of a work pool and worker, let’s create them so that you can deploy your tutorial flow, and execute it later using the Prefect API.

Set up a work pool and worker

For this tutorial you will create a Docker type work pool via the CLI.

Using the Docker work pool type means that all work sent to this work pool will run within a dedicated Docker container using a Docker client available to the worker.

Other work pool types

There are work pool types for serverless computing environments such as AWS ECS, Azure Container Instances, Google Cloud Run, and Vertex AI. Kubernetes is also a popular work pool type.

These options are expanded upon in various How-to Guides.

Create a work pool

In your terminal, run the following command to set up a Docker type work pool.

prefect work-pool create --type docker my-docker-pool

Let’s confirm that the work pool was successfully created by running the following command in the same terminal.

prefect work-pool ls

You should see your new my-docker-pool listed in the output.

Finally, let’s double check that you can see this work pool in your Prefect UI.

Navigate to the Work Pools tab and verify that you see my-docker-pool listed.

When you click into my-docker-pool you should see a red status icon signifying that this work pool is not ready.

To make the work pool ready, you need to start a worker.

Start a worker

Workers are a lightweight polling process that kick off scheduled flow runs on a specific type of infrastructure (such as Docker). To start a worker on your local machine, open a new terminal and confirm that your virtual environment has prefect installed.

Run the following command in this new terminal to start the worker:

prefect worker start --pool my-docker-pool

You should see the worker start. It's now polling the Prefect API to check for any scheduled flow runs it should pick up and then submit for execution. You’ll see your new worker listed in the UI under the Workers tab of the Work Pools page with a recent last polled date.

You should also be able to see a Ready status indicator on your work pool - progress!

You will need to keep this terminal session active for the worker to continue to pick up jobs. Since you are running this worker locally, the worker will terminate if you close the terminal. Therefore, in a production setting this worker should run as a daemonized or managed process.

Now that you’ve set up your work pool and worker, we have what we need to kick off and execute flow runs of flows deployed to this work pool. Let's deploy your tutorial flow to my-docker-pool.

Create the deployment

From our previous steps, we now have:

  1. A flow
  2. A work pool
  3. A worker

Now it’s time to put it all together. We're going to update our repo_info.py file to build a Docker image and update our deployment so our worker can execute it.

The updates that you need to make to repo_info.py are:

  1. Change flow.serve to flow.deploy.
  2. Tell flow.deploy which work pool to deploy to.
  3. Tell flow.deploy the name to use for the Docker image that will be built.

Here's what the updated repo_info.py looks like:

repo_info.py
import httpx
from prefect import flow


@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info.deploy(
        name="my-first-deployment", 
        work_pool_name="my-docker-pool", 
        image="my-first-deployment-image:tutorial",
        push=False
    )

Why the push=False?

For this tutorial, your Docker worker is running on your machine, so we don't need to push the image built by flow.deploy to a registry. When your worker is running on a remote machine, you will need to push the image to a registry that the worker can access.

Remove the push=False argument, include your registry name, and ensure you've authenticated with the Docker CLI to push the image to a registry.

Now that you've updated your script, you can run it to deploy your flow to the work pool:

python repo_info.py

Prefect will build a custom Docker image containing your workflow code that the worker can use to dynamically spawn Docker containers whenever this workflow needs to run.

What Dockerfile?

In this example, Prefect generates a Dockerfile for you that will build an image based off of one of Prefect's published images. The generated Dockerfile will copy the current directory into the Docker image and install any dependencies listed in a requirements.txt file.

If you want to use a custom Dockerfile, you can specify the path to the Dockerfile using the DeploymentImage class:

repo_info.py
import httpx
from prefect import flow
from prefect.deployments import DeploymentImage


@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info.deploy(
        name="my-first-deployment", 
        work_pool_name="my-docker-pool", 
        image=DeploymentImage(
            name="my-first-deployment-image",
            tag="tutorial",
            dockerfile="Dockerfile"
        ),
        push=False
    )

Modify the deployment

If you need to make updates to your deployment, you can do so by modifying your script and rerunning it. You'll need to make one update to specify a value for job_variables to ensure your Docker worker can successfully execute scheduled runs for this flow. See the example below.

The job_variables section allows you to fine-tune the infrastructure settings for a specific deployment. These values override default values in the specified work pool's base job template.

When testing images locally without pushing them to a registry (to avoid potential errors like docker.errors.NotFound), it's recommended to include an image_pull_policy job_variable set to Never. However, for production workflows, always consider pushing images to a remote registry for more reliability and accessibility.

Here's how you can quickly set the image_pull_policy to be Never for this tutorial deployment without affecting the default value set on your work pool:

repo_info.py
import httpx
from prefect import flow


@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info.deploy(
        name="my-first-deployment", 
        work_pool_name="my-docker-pool", 
        job_variables={"image_pull_policy": "Never"},
        image="my-first-deployment-image:tutorial",
        push=False
    )

To register this update to your deployment's parameters with Prefect's API, run:

python repo_info.py

Now everything is set for us to submit a flow-run to the work pool:

prefect deployment run 'get_repo_info/my-deployment'

Common Pitfall

  • Store and run your deploy scripts at the root of your repo, otherwise the built Docker file may be missing files that it needs to execute!

Did you know?

A Prefect flow can have more than one deployment. This pattern can be useful if you want your flow to run in different execution environments.

Next steps

Happy building!