Quickstart
Get started with Prefect, the easiest way to orchestrate and observe your data pipelines
Prefect is an orchestration and observability platform that empowers developers to build and scale workflows quickly. In this quickstart, you will use Prefect to convert the following Python script to a schedulable, observable, resilient, and deployable workflow in minutes:
import httpx


def get_repo_info():
    """Fetch statistics about the Prefect repository"""
    url = "https://api.github.com/repos/PrefectHQ/prefect"
    response = httpx.get(url)
    repo = response.json()
    print("PrefectHQ/prefect repository statistics 🤓:")
    print(f"Stars 🌠: {repo['stargazers_count']}")


if __name__ == "__main__":
    get_repo_info()
Install Prefect
To install Prefect with pip, run:
pip install -U prefect
See Install Prefect for more details on installation.
Connect to a Prefect API
- Start a local API server:
  prefect server start
- Open the Prefect dashboard in your browser at http://localhost:4200.
Convert your script to a Prefect workflow
The easiest way to convert a Python script into a workflow is to add a @flow decorator to the script’s entrypoint.
This will create a corresponding flow.
Adding @task decorators to any functions called by the flow converts them to tasks.
Tasks receive metadata about upstream dependencies and the state of those dependencies before they run.
Prefect will then record these dependencies and states as it orchestrates these tasks.
import httpx  # an HTTP client library and dependency of Prefect
from prefect import flow, task


@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
    """Get info about a repo - will retry twice after failing"""
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    repo_info = api_response.json()
    return repo_info


@task
def get_contributors(repo_info: dict):
    """Get contributors for a repo"""
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    contributors = response.json()
    return contributors


@flow(log_prints=True)
def log_repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
    """
    Given a GitHub repository, logs the number of stargazers
    and contributors for that repo.
    """
    repo_info = get_repo_info(repo_owner, repo_name)
    print(f"Stars 🌠: {repo_info['stargazers_count']}")

    contributors = get_contributors(repo_info)
    print(f"Number of contributors 👷: {len(contributors)}")


if __name__ == "__main__":
    log_repo_info()
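To make the retries=2 setting on get_repo_info concrete: a failing task is attempted up to two more times, so at most three attempts in total. The following is a plain-Python sketch of that behavior under simplified assumptions, not Prefect's implementation; with_retries, flaky_fetch, and calls are hypothetical names used only for illustration.

```python
import functools


def with_retries(retries: int):
    """Hypothetical decorator: re-run a function up to `retries` extra times."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            attempts = retries + 1  # the first call plus the retries
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of retries: surface the last error

        return wrapper

    return decorator


calls = []  # records each attempt so the example can count them


@with_retries(retries=2)
def flaky_fetch() -> str:
    """Hypothetical stand-in for an HTTP call that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = flaky_fetch()
print(result, "after", len(calls), "attempts")
```

In this sketch the third attempt succeeds, so no exception reaches the caller. Had the function failed a third time, the last error would have been raised.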
The log_prints=True argument provided to the @flow decorator automatically converts any print statements within the function to INFO level logs.
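As a rough illustration of what turning print statements into INFO level logs means, the plain-Python sketch below captures a function's printed output and re-emits each line through the standard logging module. It is not how Prefect implements log_prints; ListHandler, prints_as_info_logs, and show_stars are hypothetical names, and the star count is a made-up value.

```python
import contextlib
import io
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that stores log records in a list for inspection."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def prints_as_info_logs(fn, logger):
    """Run fn, then re-emit every line it printed as an INFO log."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        result = fn()
    for line in buffer.getvalue().splitlines():
        logger.info(line)
    return result


def show_stars():
    print("Stars: 123")  # made-up value for the example


logger = logging.getLogger("log_prints_sketch")
logger.setLevel(logging.INFO)
handler = ListHandler()
logger.addHandler(handler)

prints_as_info_logs(show_stars, logger)
messages = [(r.levelname, r.getMessage()) for r in handler.records]
```

The point of routing prints through a logger is that the output gains a level and can be collected alongside other logs, rather than existing only on standard output.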
Run your flow
Save the code above as my_gh_workflow.py. You can then run your Prefect flow just as you would a Python script:
python my_gh_workflow.py
Prefect automatically tracks the state of the flow run and logs the output, which can be viewed directly in the terminal or in the UI.
Create a work pool
Running a flow locally is a good start, but most use cases require a remote execution environment. A work pool is the most common interface for deploying flows to remote infrastructure.
Deploy your flow to a self-hosted Prefect server instance using a Process work pool.
All flow runs submitted to this work pool will run in a local subprocess (the mechanics are similar for other work pool types that run on remote infrastructure).
- Create a Process work pool:
  prefect work-pool create --type process my-work-pool
- Verify that the work pool exists:
  prefect work-pool ls
- Start a worker to poll the work pool:
  prefect worker start --pool my-work-pool
You can also choose from other work pool types.
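Conceptually, a worker for a Process work pool repeatedly checks the pool for submitted runs and executes each one in a local subprocess. The sketch below imitates that loop with the standard library only. It is not Prefect's worker code: pending_runs and run_in_subprocess are hypothetical stand-ins for the work pool and the worker's execution step, and the "flow runs" are just one-line Python snippets.

```python
import queue
import subprocess
import sys

# Hypothetical stand-in for a work pool: a queue of Python snippets to run.
pending_runs = queue.Queue()
pending_runs.put("print('flow run 1 finished')")
pending_runs.put("print('flow run 2 finished')")


def run_in_subprocess(code: str) -> str:
    """Execute one submitted run in a separate local Python process."""
    completed = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,  # raise if the subprocess exits with a non-zero status
    )
    return completed.stdout.strip()


outputs = []
while True:
    try:
        code = pending_runs.get_nowait()  # poll the pool for work
    except queue.Empty:
        break  # a long-running worker would wait and poll again instead
    outputs.append(run_in_subprocess(code))

print(outputs)
```

Running each piece of work in its own process keeps a crash in one run from taking down the polling loop, which is one reason a subprocess is a natural unit of isolation here.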
Deploy and schedule your flow
A deployment is used to determine when, where, and how a flow should run. Deployments elevate flows to remotely configurable entities that have their own API.
- Create a deployment in code, in a file named create_deployment.py:
  from prefect import flow

  # Source for the code to deploy (here, a GitHub repo)
  SOURCE_REPO = "https://github.com/prefecthq/demos.git"

  if __name__ == "__main__":
      flow.from_source(
          source=SOURCE_REPO,
          entrypoint="my_gh_workflow.py:repo_info",  # Specific flow to run
      ).deploy(
          name="my-first-deployment",
          work_pool_name="my-work-pool",  # Work pool target
          cron="0 1 * * *",  # Cron schedule (1am every day)
      )
  You can store your flow code in nearly any location as long as Prefect can access it. See Where to store your flow code for more details.
- Run the script to create the deployment:
  python create_deployment.py
  Check the logs to ensure your deployment was created:
  Successfully created/updated all deployments!
  ______________________________________________________
  |                    Deployments                     |
  ______________________________________________________
  | Name                          | Status  | Details  |
  ______________________________________________________
  | repo-info/my-first-deployment | applied |          |
  ______________________________________________________
- Schedule a run for the deployment:
  prefect deployment run 'repo-info/my-first-deployment'
Soon you should see the flow run graph and logs on the Flow Run page in the UI. Logs are also streamed to the terminal.
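The cron expression "0 1 * * *" used in the deployment reads as minute 0, hour 1, every day of the month, every month, every day of the week. As a small worked example of what that schedule implies, the sketch below computes the next matching time for a given moment. It handles only this "fixed time every day" form and ignores time zones and daylight saving. Prefect computes schedules itself, and next_daily_run is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 1, minute: int = 0) -> datetime:
    """Return the next time strictly after `now` matching 'minute hour * * *'."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


# At 00:30 the 01:00 slot is still ahead on the same day.
before = next_daily_run(datetime(2024, 1, 1, 0, 30))
# At 09:00 the slot has passed, so the next run falls on the following day.
after = next_daily_run(datetime(2024, 1, 1, 9, 0))

print(before.isoformat(), after.isoformat())
```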
Next steps
You’ve seen how to move from a Python script to a scheduled, observable, remotely orchestrated workflow with Prefect. Now consider reading:
- Write flows
- Write tasks
- Cloud and server
- Manage infrastructure with work pools to learn about running workflows on Kubernetes, Docker, and serverless infrastructure.
Need help? Book a meeting with a Prefect Product Advocate to get your questions answered.