Migration from Apache Airflow to Prefect: A Comprehensive How-To Guide
Airflow Concept | Prefect Equivalent | Key Differences |
---|---|---|
DAGs | Flows | Prefect flows are standard Python functions (`@flow`). No DAG classes or `>>` dependencies. |
Operators | Tasks | Prefect tasks (`@task`) replace Airflow Operators, removing the need for specialized classes. |
Executors | Work Pools & Workers | Prefect decouples task execution using lightweight workers polling work pools. |
Scheduling | Deployments | Scheduling is separate from flow code and configured externally. |
XComs | Return Values | Prefect tasks return data directly; no need for XComs or metadata storage. |
Hooks & Connections | Blocks & Integrations | Prefect replaces Hooks with Blocks for secure resource management. |
Sensors | Triggers & Event-Driven Flows | Prefect uses external event triggers or lightweight polling flows. |
Airflow UI | Prefect UI | Prefect provides real-time monitoring, task logs, and automation features. |
Feature | Airflow | Prefect |
---|---|---|
Task Execution | Tasks run as independent processes/pods | Tasks execute in single flow runtime |
Resource Control | Task-level via executor settings | Flow-level via work pools & task runners |
Data Passing | Requires XComs or external storage | Direct in-memory data passing |
Parallelism | Managed by executor configuration | Managed by work pools and task runners |
Task Dependencies | Uses >> operators and set_upstream() | Implicit via Python function calls |
DAG Parsing | Pre-parsed with global variable execution | Standard Python function execution |
State & Retries | Individual task retries, manual DAG fixes | Built-in flow & task retry handling |
Scheduling | Tightly coupled with DAG code | Decoupled via deployments |
Infrastructure | Requires scheduler, metadata DB, workers | Lightweight API server with optional cloud |
First, install Prefect (`pip install prefect`). Then start a local API server (`prefect server start`) or sign up for Prefect Cloud to run flows immediately.

In Airflow, you would wrap each step of the example ETL pipeline in a `PythonOperator`, then specify that they run sequentially (`extract` then `transform` then `load`).
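For reference, a hedged sketch of what that Airflow version might look like (the module name, schedule, and imported functions are illustrative, and passing data between steps would additionally require XComs):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# extract_fn / transform_fn / load_fn are assumed to live in an existing module
from my_etl import extract_fn, transform_fn, load_fn

with DAG("etl_pipeline", start_date=datetime(2024, 1, 1), schedule_interval="@daily") as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_fn)
    transform = PythonOperator(task_id="transform", python_callable=transform_fn)
    load = PythonOperator(task_id="load", python_callable=load_fn)

    extract >> transform >> load  # explicit sequential dependencies
```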
Take the existing callables `extract_fn`, `transform_fn`, and `load_fn` and turn each into a `@task`-decorated function. The code inside can remain largely the same (minus any Airflow-specific cruft). For example:
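A minimal sketch, assuming the original functions fetch, reshape, and write rows; the bodies below are placeholders for your existing logic:

```python
from prefect import task

@task
def extract_data() -> list[dict]:
    # ... pull rows from the source system (placeholder) ...
    return [{"id": 1, "value": 2}]

@task
def transform_data(rows: list[dict]) -> list[dict]:
    # ... apply business logic (placeholder) ...
    return [{**row, "value": row["value"] * 2} for row in rows]

@task
def load_data(rows: list[dict]) -> None:
    # ... write to the destination (placeholder) ...
    print(f"Loaded {len(rows)} rows")
```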
Simply add `@task` to each function. No need for a special operator class or task IDs - the function name serves as an identifier, and Prefect will handle the orchestration.
Next, write a `@flow` function that calls these tasks in the required order:
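For example, using the placeholder tasks from the sketch above:

```python
from prefect import flow

@flow
def etl_pipeline():
    rows = extract_data()              # returns data directly - no XCom push/pull
    transformed = transform_data(rows)
    load_data(transformed)             # runs after transform because it consumes its output
```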
There is no need for `>>` dependencies or XComs. Task results can be stored in variables that are passed directly to other tasks as arguments. By default, tasks are automatically executed in the order they are called.
Unlike Airflow, where testing often requires an Airflow context, Prefect flows run like standard Python code. You can execute `etl_pipeline()` in an interpreter, import it elsewhere, or test tasks individually (`transform_data.fn(sample_data)`).
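For instance, a unit test for the placeholder transform task above could call the undecorated function via `.fn`:

```python
def test_transform_data():
    sample_data = [{"id": 1, "value": 2}]
    # .fn bypasses Prefect orchestration and calls the plain Python function
    result = transform_data.fn(sample_data)
    assert result[0]["value"] == 4
```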
In Airflow, branching typically requires a specialized operator (`BranchPythonOperator`), explicit `>>` dependencies, and relies on XCom for data passing. In Prefect, a branch is just native Python control flow (`if/else`).

Feature | Airflow (BranchPythonOperator) | Prefect (if/else logic) |
---|---|---|
Branching Method | Often uses specialized operators (BranchPythonOperator) | Uses native Python conditionals (if/else) |
Skipped Tasks | Unselected branches are explicitly skipped | Prefect only runs the executed branch—no skipping needed |
Join Behavior | Uses DummyOperator to rejoin paths | Downstream tasks execute automatically after the conditional branch |
There is also no need for a DummyOperator to rejoin branches; downstream code simply runs after the conditional, as the following sketch shows.
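A minimal sketch of Prefect branching with an ordinary conditional (the task names and threshold are illustrative):

```python
from prefect import flow, task

@task
def count_new_rows() -> int:
    return 1200  # placeholder

@task
def full_refresh():
    print("running full refresh")

@task
def incremental_update():
    print("running incremental update")

@flow
def branching_pipeline():
    if count_new_rows() > 1000:   # native if/else replaces BranchPythonOperator
        full_refresh()
    else:
        incremental_update()
    print("this runs after whichever branch executed - no DummyOperator join")
```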
In Airflow, retries are typically configured at the DAG level (via `default_args`) or per task (e.g., `retries=3`). In Prefect, you can specify retries for any task or flow.
Use `@task(retries=2, retry_delay_seconds=60)` to retry a task twice on failure, or `@flow(retries=1)` to retry the entire flow once. Prefect distinguishes flow and task retries: flow retries rerun all tasks, while task retries rerun only the failed task. Replace Airflow-specific error handling (`on_failure_callback`, sensors) with Prefect's retries, state handlers, or built-in failure notifications.
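A minimal sketch of both levels of retry configuration:

```python
from prefect import flow, task

@task(retries=2, retry_delay_seconds=60)  # rerun only this task, twice, 60s apart
def flaky_extract():
    ...

@flow(retries=1)                          # rerun the entire flow once if it fails
def resilient_pipeline():
    flaky_extract()
```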
Remove the DAG definition (`with DAG(...)` blocks), `default_args`, Airflow imports (`from airflow...`), XCom push/pull calls (replace with return values), and Jinja templating in operator arguments (you can often just compute those values in Python directly or use Prefect parameters).
If your DAG used Airflow Variables or Connections (Airflow’s way to store config in the Metastore), you’ll need to supply those to Prefect tasks via another means - for example, as environment variables or using Prefect Blocks (like a Block for a database connection string). Essentially, your Prefect flow code should look like a regular Python script with functions, not like an Airflow DAG file.
As an illustration, here’s how our example ETL pipeline looks after conversion:
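A hedged sketch of the converted flow; the task bodies remain placeholders for the original extract/transform/load logic:

```python
from prefect import flow, task

@task
def extract_data() -> list[dict]:
    return [{"id": 1, "value": 2}]          # placeholder for the real extraction

@task
def transform_data(rows: list[dict]) -> list[dict]:
    return [{**row, "value": row["value"] * 2} for row in rows]

@task
def load_data(rows: list[dict]) -> None:
    print(f"Loaded {len(rows)} rows")

@flow(retries=1, log_prints=True)
def etl_pipeline_flow():
    rows = extract_data()
    transformed = transform_data(rows)
    load_data(transformed)

if __name__ == "__main__":
    etl_pipeline_flow()   # run locally during development
```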
The `if __name__ == "__main__": etl_pipeline_flow()` guard allows running the flow locally during development; in production, Prefect handles scheduling. `retries=1` ensures one retry on failure, and `log_prints=True` sends `print()` output to Prefect's UI. The `@task` decorator serves a similar purpose to Airflow's `PythonOperator` but does not rely on XComs.
After completing these steps, the Prefect flow should accurately replicate the functionality of the Airflow DAG while being more modular and testable. The migration is now complete, and the next step is to focus on deploying and optimizing the new workflows.
1. Create a Prefect Managed Work Pool
2. Deploy a Flow to Managed Execution
3. Run the Deployment via Prefect UI or CLI
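A hedged sketch of these steps in code, assuming the flow code lives in a Git repository and a managed work pool named "managed-pool" has already been created; the repository URL, entrypoint, and names are placeholders:

```python
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/data-pipelines.git",  # placeholder repo
        entrypoint="flows/etl.py:etl_pipeline_flow",               # placeholder path
    ).deploy(
        name="etl-managed",
        work_pool_name="managed-pool",
    )
```

Running this script registers the deployment; you can then start it from the Prefect UI or with something like `prefect deployment run 'etl-pipeline-flow/etl-managed'`.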
For parallel task execution within a flow, switch the flow's task runner, for example to the `DaskTaskRunner`, to enable multi-process execution:
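A minimal sketch, assuming the `prefect-dask` integration is installed (`pip install prefect-dask`); the task and data are placeholders:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def process_chunk(i: int) -> int:
    return i * i

@flow(task_runner=DaskTaskRunner())
def parallel_pipeline():
    # .submit() hands tasks to the runner so chunks can execute concurrently
    futures = [process_chunk.submit(i) for i in range(10)]
    return [f.result() for f in futures]
```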
"prod-work-pool"
):
Instead of an `S3KeySensor`, configure an AWS Lambda or EventBridge rule to call the Prefect API when an S3 file is uploaded. Prefect Cloud and Server provide API endpoints to start flows on demand. Prefect's Automations can also trigger flows based on specific conditions.
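One hedged option is a small Lambda handler that uses Prefect's `run_deployment` helper; the deployment name and parameter are placeholders, and the Lambda environment needs Prefect installed plus `PREFECT_API_URL`/`PREFECT_API_KEY` configured:

```python
from prefect.deployments import run_deployment

def handler(event, context):
    key = event["Records"][0]["s3"]["object"]["key"]  # S3 object that triggered the event
    run_deployment(
        name="etl-pipeline-flow/s3-triggered",  # "<flow name>/<deployment name>" (placeholder)
        parameters={"object_key": key},
        timeout=0,                              # return immediately instead of waiting for the run
    )
```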
Handling Polling Scenarios

Airflow provider operators and hooks can be replaced either with Prefect integration collections (e.g., `prefect-snowflake`, `prefect-gcp`, `prefect-dbt`) or by directly using the relevant Python libraries within tasks.
Migrating Airflow Hooks to Prefect
Airflow Hooks (e.g., `PostgresHook`, `GoogleCloudStorageHook`) wrap connections to external systems. In Prefect, replace them with Blocks that store credentials and connection details, or call the service's Python client directly inside a task, as in the sketch below.
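A minimal sketch using a built-in Secret block in place of a hook; the block name "warehouse-dsn" is a placeholder you would have saved beforehand (in the UI or in code):

```python
from prefect import task
from prefect.blocks.system import Secret

@task
def query_warehouse():
    dsn = Secret.load("warehouse-dsn").get()  # load the stored connection string
    # ... connect with your preferred database client using `dsn` and run queries ...
```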
In Airflow, task states (success, failed, skipped, etc.) are stored in a metadata database and displayed in the Airflow UI's DAG run view. Prefect also tracks state for each task and flow run, but these states are managed by the Prefect backend (Prefect Server or Cloud API) and can be accessed via the Prefect UI, API, or CLI.
After migration, similar visibility is available in Prefect’s UI, where you can track which flows and tasks succeeded or failed. Prefect also includes additional state management features such as:
- Cancelling in-flight runs (which moves them into a Cancelling state).

To capture output, add `@flow(log_prints=True)` or `@task(log_prints=True)`. These flags route `print()` statements to Prefect logs automatically.
For centralized logging (e.g., ElasticSearch, Stackdriver), Prefect supports custom logging handlers and third-party integrations. Logs can be forwarded similarly to how Airflow handled external logging.
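For example, a minimal sketch of writing to Prefect's run logger, which any configured handlers can then forward:

```python
from prefect import flow, get_run_logger

@flow
def logged_flow():
    logger = get_run_logger()
    logger.info("starting run")                     # appears in the Prefect UI and attached handlers
    logger.warning("row count lower than expected")
```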
Debugging and Troubleshooting
Prefect simplifies debugging because tasks are standard Python functions. Instead of analyzing scheduler or worker logs, you can run the flow locally in an interpreter or debugger, call individual tasks as plain functions, and add print statements that appear in the Prefect UI.
Service level agreements (SLAs) can be attached to a deployment in its configuration, via the `.deploy()` method, or the CLI. Violations generate `prefect.sla.violation` events, which can trigger Automations to send notifications or take corrective actions.
For full configuration details, refer to the Measure reliability with Service Level Agreements documentation.
Implementation Considerations
Prefect allows flexible logging and alerting adjustments to match existing monitoring workflows. Logging handlers can integrate with third-party services (e.g., ElasticSearch, Datadog), and Prefect’s API and UI provide real-time state visibility for proactive monitoring.
You can create a deployment programmatically (with `flow.deploy()`), via the CLI (`prefect deployment` commands), or by writing a YAML file (`prefect.yaml`) that describes the deployment.
Key things a Prefect deployment defines:

- which flow to run (the entrypoint)
- when to run it (the schedule, e.g., cron or interval)
- where to run it (the work pool and any infrastructure settings)
- default parameters for the flow

For example, to migrate our `etl_pipeline` DAG to `etl_pipeline_flow` in Prefect, we might write a `prefect.yaml` like:
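A hedged sketch of such a file; the entrypoint path, cron expression, and pool name are illustrative, and exact keys can vary slightly between Prefect versions:

```yaml
deployments:
  - name: etl-pipeline
    entrypoint: flows/etl.py:etl_pipeline_flow   # <path to file>:<flow function>
    schedule:
      cron: "0 6 * * *"                          # run daily at 06:00
    work_pool:
      name: prod-k8s-pool
```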
This deployment runs the flow on a schedule and sends the work to `prod-k8s-pool` (a Kubernetes pool, perhaps). In Airflow, these details were all intertwined in the DAG file (the schedule was in code, the infrastructure maybe in the executor config or in the DAG via `executor_config`). In Prefect, there is a separation of these concerns.
- Build and apply the deployment from the CLI with `prefect deployment build -n <name> -p <work_pool_name> --cron "<schedule>" -q default -o deployment.yaml` (or use `prefect.yaml`) and apply it.
- Test deployment definitions without uploading code (`prefect deployment build --skip-upload`), and unit test tasks as regular Python functions.
- Run a local API server (`prefect server start`), register a deployment, and trigger flows via the Prefect UI to mirror production behavior.
- If you hit an `ImportError`, ensure all required libraries are installed in the execution environment (Docker image, VM, etc.), not just locally.
- If a DAG relied on `trigger_rule="all_done"`, implement equivalent logic in Prefect with `try/except` (see the sketch after this list).
- Add `print` statements, re-deploy (which is quick with the Prefect CLI), and re-run to see output.
- Marvin AI is a tool that can help you debug your Prefect flows.
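As referenced in the list above, a minimal sketch of mimicking `trigger_rule="all_done"` with `try/except` (task names are placeholders):

```python
from prefect import flow, task

@task
def risky_step():
    raise ValueError("upstream failure")

@task
def cleanup():
    print("cleanup runs regardless of the outcome above")

@flow
def pipeline_with_cleanup():
    try:
        risky_step()
    except Exception:
        pass          # swallow the failure so downstream work still runs, like trigger_rule="all_done"
    cleanup()
```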
Use task mapping (`task.map(items)`) to process large datasets efficiently; a minimal sketch follows the table below.

Technique | Description |
---|---|
Scale efficiently | Prefect makes it simple to distribute workloads across work pools and workers, eliminating Airflow’s scheduler bottlenecks. |
Optimize infrastructure | Adjust worker capacity based on usage, scaling vertically (more resources per worker) or horizontally (adding more workers). |
Reduce costs | Consider serverless work pools (AWS ECS, GCP Cloud Run) to avoid idle infrastructure costs. |
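As referenced above, a minimal sketch of task mapping (the task and input data are placeholders):

```python
from prefect import flow, task

@task
def score(record: dict) -> float:
    return record["value"] * 0.5   # placeholder computation

@flow
def batch_scoring():
    items = [{"value": i} for i in range(100)]
    futures = score.map(items)              # one task run per item, executed concurrently
    return [f.result() for f in futures]
```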
Technique | Description |
---|---|
Share best practices | Conduct a team retrospective to refine workflows and establish templates for new flows. |
Embrace Prefect’s flexibility | Now that scheduling and execution are handled seamlessly, focus on building better data workflows, not managing infrastructure. |