This example shows how to build resilient AI workflows using Prefect and pydantic-ai.
The integration provides automatic retries for LLM calls, full observability of agent decisions,
and durable execution semantics that make workflows idempotent and rerunnable.
The Scenario: AI Data Analyst
You need to analyze datasets programmatically, but writing custom analysis code for each dataset is time-consuming.
Instead, you’ll build an AI agent that:
- Understands your dataset structure
- Decides which analyses are most valuable
- Uses Python tools to calculate statistics and detect anomalies
- Generates actionable insights
All while being resilient to LLM failures, tool errors, and network issues.
This example demonstrates:
- PrefectAgent – Wraps pydantic-ai agents for durable execution
- Agent Tools – Python functions the AI can call, automatically wrapped as Prefect tasks
- TaskConfig – Custom retry policies and timeouts for AI operations
- Durable Execution – Automatic idempotency and failure recovery
Setup
Install dependencies (if not already installed):
uv add "pydantic-ai[prefect]" pandas
# or with pip:
pip install "pydantic-ai[prefect]" pandas
from __future__ import annotations
from typing import Any
import pandas as pd
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig
from prefect import flow, task
These functions are “tools” that the AI agent can call to analyze data.
Once the agent is wrapped with PrefectAgent, each tool call runs as a Prefect task, which gives it observability and retries.
def calculate_statistics(ctx: RunContext[pd.DataFrame], column: str) -> dict[str, Any]:
    """Calculate descriptive statistics for a column.
    The AI agent can call this tool to understand data distribution,
    and Prefect ensures it retries on failure."""
    df = ctx.deps
    if column not in df.columns:
        return {"error": f"Column '{column}' not found. Available: {list(df.columns)}"}
    stats = df[column].describe().to_dict()
    stats["missing_count"] = int(df[column].isna().sum())
    stats["unique_count"] = int(df[column].nunique())
    # describe() can return NumPy scalars (e.g. counts for text columns), which are
    # not JSON-serializable; convert every numeric value to a plain Python float
    return {
        k: (float(v) if pd.api.types.is_number(v) else v) for k, v in stats.items()
    }
def detect_anomalies(
    ctx: RunContext[pd.DataFrame], column: str, threshold: float = 3.0
) -> list[dict[str, Any]]:
    """Detect anomalies using the standard deviation (z-score) method.
    Identifies values that are more than `threshold` standard deviations from the mean.
    This tool demonstrates how complex analysis logic can be made reliable with Prefect."""
    df = ctx.deps
    if column not in df.columns:
        return [{"error": f"Column '{column}' not found"}]
    if not pd.api.types.is_numeric_dtype(df[column]):
        return [{"error": f"Column '{column}' is not numeric"}]
    mean = df[column].mean()
    std = df[column].std()
    # A constant column (std == 0) or a single row (std is NaN) has no outliers
    if std == 0 or pd.isna(std):
        return []
    anomalies = df[(df[column] - mean).abs() > (threshold * std)]
    return [
        {
            "index": int(idx),
            "value": float(row[column]),
            "z_score": float((row[column] - mean) / std),
        }
        for idx, row in anomalies.head(10).iterrows()
    ]
def get_column_info(ctx: RunContext[pd.DataFrame]) -> dict[str, Any]:
    """Get overview of all columns in the dataset.
    Helps the AI agent understand the dataset structure before analysis."""
    df = ctx.deps
    return {
        "columns": list(df.columns),
        "shape": {"rows": len(df), "columns": len(df.columns)},
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
    }
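Because each tool reads its input only from `ctx.deps`, the tools can be sanity-checked without calling an LLM. This minimal sketch re-declares `get_column_info` so it runs on its own and passes a `types.SimpleNamespace` with a `deps` attribute as a stand-in for `RunContext`. That stand-in is an assumption for local testing only; it is not part of pydantic-ai's API, and a real `RunContext` carries more than `deps`.

```python
from types import SimpleNamespace
from typing import Any

import pandas as pd


def get_column_info(ctx: Any) -> dict[str, Any]:
    """Copy of the tool above; it only touches ctx.deps."""
    df = ctx.deps
    return {
        "columns": list(df.columns),
        "shape": {"rows": len(df), "columns": len(df.columns)},
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
    }


# Hypothetical stand-in for RunContext: any object exposing a .deps attribute
fake_ctx = SimpleNamespace(
    deps=pd.DataFrame({"sales": [100, 150], "region": ["North", "South"]})
)
info = get_column_info(fake_ctx)
print(info["columns"])  # ['sales', 'region']
print(info["shape"])  # {'rows': 2, 'columns': 2}
```

The same pattern works for `calculate_statistics` and `detect_anomalies`, which makes the tool logic easy to cover with ordinary unit tests.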
Analysis Results Model
Structured output ensures the AI returns consistent, parseable results.
class DataAnalysis(BaseModel):
    """Structured analysis results from the AI agent."""
    summary: str = Field(description="High-level summary of the dataset")
    key_findings: list[str] = Field(
        description="Key findings discovered from the data", min_length=3, max_length=5
    )
    recommendations: list[str] = Field(
        description="Actionable recommendations based on the findings",
        min_length=3,
        max_length=5,
    )
    columns_analyzed: list[str] = Field(
        description="List of columns that were analyzed"
    )
    def __str__(self) -> str:
        """Format the analysis results for clean display."""
        findings = "\n".join(
            f" {i}. {finding}" for i, finding in enumerate(self.key_findings, 1)
        )
        recommendations = "\n".join(
            f" {i}. {rec}" for i, rec in enumerate(self.recommendations, 1)
        )
        return f"""
{"=" * 80}
ANALYSIS RESULTS
{"=" * 80}
📋 Summary:
{self.summary}
🔑 Key Findings:
{findings}
💡 Recommendations:
{recommendations}
📊 Columns Analyzed: {", ".join(self.columns_analyzed)}
{"=" * 80}
"""
Creating the AI Agent
We configure the agent with tools and wrap it with PrefectAgent for durability.
def create_data_analyst_agent() -> PrefectAgent[pd.DataFrame, DataAnalysis]:
    """Create an AI data analyst with Prefect durability.
    The PrefectAgent wrapper automatically:
    - Wraps agent.run as a Prefect flow
    - Wraps LLM calls as Prefect tasks with retries
    - Wraps tool calls as separate Prefect tasks
    """
    # Create the base pydantic-ai agent
    agent = Agent(
        "openai:gpt-4o",
        name="data-analyst-agent",
        output_type=DataAnalysis,
        deps_type=pd.DataFrame,
        # Register tools that the agent can use
        tools=[calculate_statistics, detect_anomalies, get_column_info],
        system_prompt=(
            "You are an expert data analyst. Analyze the provided dataset using "
            "the available tools. Focus on finding meaningful patterns, anomalies, "
            "and actionable insights. Always start by understanding the dataset "
            "structure with get_column_info."
        ),
    )
    # Wrap with PrefectAgent for durable execution with custom retry policy
    return PrefectAgent(
        agent,
        model_task_config=TaskConfig(
            retries=3,  # Retry LLM calls up to 3 times
            retry_delay_seconds=[1.0, 2.0, 4.0],  # Exponential backoff
            timeout_seconds=60.0,  # 60s timeout for LLM calls
        ),
        tool_task_config=TaskConfig(
            retries=2,  # Retry tool calls up to 2 times
            retry_delay_seconds=[0.5, 1.0],
        ),
    )
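Both `retry_delay_seconds` lists follow a doubling pattern, and together with `timeout_seconds` they bound how long one LLM request can take. The helpers here (`exponential_delays`, `worst_case_seconds`) are hypothetical, not part of Prefect or pydantic-ai, and the bound assumes the 60s timeout applies to each attempt separately and that every attempt times out:

```python
def exponential_delays(base: float, retries: int) -> list[float]:
    """Hypothetical helper: the wait before retry i is base * 2**i."""
    return [base * 2**i for i in range(retries)]


def worst_case_seconds(retries: int, delays: list[float], timeout: float) -> float:
    """Upper bound on wall time for one call if every attempt hits the timeout.

    There are retries + 1 attempts in total, plus the waits between them.
    """
    return (retries + 1) * timeout + sum(delays)


llm_delays = exponential_delays(1.0, 3)
print(llm_delays)  # [1.0, 2.0, 4.0]
print(exponential_delays(0.5, 2))  # [0.5, 1.0], the tool schedule
print(worst_case_seconds(3, llm_delays, 60.0))  # 247.0
```

No equivalent bound exists for tool calls here, because `tool_task_config` sets no timeout.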
Sample Dataset Generator
Create a realistic sales dataset for demonstration.
@task
def create_sample_dataset() -> pd.DataFrame:
    """Generate a sample sales dataset with some anomalies.
    In production, you'd load real data from a file, database, or API."""
    data = {
        "product": ["Widget", "Gadget", "Doohickey", "Widget", "Gadget"] * 20,
        "sales": [100, 150, 200, 110, 145] * 19
        + [100, 150, 200, 1000, 2000],  # Last 2 are anomalies
        "region": ["North", "South", "East", "West", "Central"] * 20,
        "month": [1, 2, 3, 4, 5] * 20,
    }
    return pd.DataFrame(data)
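To check the comment that the last two sales values are anomalies under the default threshold of 3.0, the same z-score rule used by `detect_anomalies` can be applied by hand. This minimal sketch uses pandas only and rebuilds the column rather than calling the task:

```python
import pandas as pd

# Same sales column as create_sample_dataset: 95 regular values, then 5 more
# ending with two large spikes
sales = pd.Series([100, 150, 200, 110, 145] * 19 + [100, 150, 200, 1000, 2000])

mean = sales.mean()
std = sales.std()  # sample standard deviation (ddof=1), as in detect_anomalies
z_scores = (sales - mean) / std

flagged = z_scores[z_scores.abs() > 3.0]
print(flagged.index.tolist())  # [98, 99]
print(flagged.round(1).tolist())  # [4.0, 8.8]
```

Only the rows holding 1000 and 2000 exceed three sample standard deviations (about 207 here); the 100, 150, and 200 appended just before them stay well inside the normal range.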
Main Analysis Flow
Orchestrate the entire AI analysis workflow with Prefect.
@flow(name="ai-data-analyst", log_prints=True)
async def analyze_dataset_with_ai() -> DataAnalysis:
"""Run AI-powered data analysis with automatic retries and observability.
This flow demonstrates how Prefect makes AI workflows production-ready:
1. Dataset preparation is tracked as a task
2. AI agent execution is wrapped for durability
3. All LLM and tool calls are logged and retryable
4. Results are structured and validated with Pydantic
"""
# Prepare the dataset
print("📊 Preparing dataset...")
df = create_sample_dataset()
print(f"Dataset shape: {df.shape}\n")
# Create the AI agent with Prefect durability
print("🤖 Initializing AI data analyst...")
agent = create_data_analyst_agent()
# Run the analysis - all LLM and tool calls are automatically retried on failure
print("🔍 Running AI analysis...\n")
result = await agent.run(
"Analyze this sales dataset. Identify patterns, anomalies, and provide recommendations.",
deps=df,
)
# Display results
print(result.output)
return result.output
Serve the Flow
To get full durable execution with automatic idempotency, serve the flow to create a deployment.
Deployed flows enable Prefect’s transactional semantics for agent operations.
if __name__ == "__main__":
import os
import sys
# Check if OpenAI API key is set
if not os.getenv("OPENAI_API_KEY"):
print("❌ Error: OPENAI_API_KEY environment variable not set")
print("Set it with: export OPENAI_API_KEY='your-key-here'")
sys.exit(1)
# Serve the flow - this creates a deployment and runs a worker process
analyze_dataset_with_ai.serve(
name="ai-data-analyst-deployment",
tags=["ai", "pydantic-ai", "data-analysis"],
)
Triggering Flow Runs
Once served, trigger runs via:
Prefect UI:
- Navigate to http://localhost:4200
- Go to Deployments → “ai-data-analyst-deployment”
- Click “Run” → “Quick Run”
CLI:
prefect deployment run ai-data-analyst/ai-data-analyst-deployment --watch
Local Testing
For quick local testing without deployment:
import asyncio
asyncio.run(analyze_dataset_with_ai())
What Just Happened?
When you serve and trigger this flow, Prefect and pydantic-ai work together to create a resilient AI pipeline:
- Deployment Creation – serve() creates a deployment and starts a long-running process that executes its flow runs
- Durable AI Execution – The PrefectAgent wrapper makes all AI operations retryable:
  - LLM calls retry up to 3 times with exponential backoff (1s, 2s, 4s)
  - Tool calls retry up to 2 times (after 0.5s, then 1s)
  - LLM calls have a 60s timeout; no timeout is configured for tool calls
- Tool Observability – Each time the AI calls a tool (get_column_info, calculate_statistics, detect_anomalies), the call runs as a Prefect task
- Structured Results – Pydantic validates the AI's output, ensuring it matches the expected schema
- Automatic Idempotency – When a deployed flow run is retried, Prefect's transactional semantics ensure that completed tasks are skipped and only failed operations are re-executed. This prevents duplicate API calls and wasted compute.
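The skip-completed-work behavior described above can be pictured with a toy, in-memory model. Everything here (`ToyRun`, `step`, `flaky_llm_call`, `pipeline`) is hypothetical and only illustrates the idea; Prefect's actual mechanism is built on its own result and transaction machinery, not on this class.

```python
from typing import Any, Callable


class ToyRun:
    """Toy model of 'skip completed steps on retry'; not Prefect's implementation.

    Results of successful steps are recorded by name, so a rerun returns the
    recorded value instead of executing the step again.
    """

    def __init__(self) -> None:
        self.completed: dict[str, Any] = {}
        self.calls: dict[str, int] = {}

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name in self.completed:
            return self.completed[name]  # already done: skip re-execution
        self.calls[name] = self.calls.get(name, 0) + 1
        result = fn()  # may raise; nothing is recorded for a failed step
        self.completed[name] = result
        return result


attempts = {"llm": 0}


def flaky_llm_call() -> str:
    """Fails on the first attempt, succeeds afterwards."""
    attempts["llm"] += 1
    if attempts["llm"] == 1:
        raise RuntimeError("simulated provider error")
    return "analysis"


def pipeline(run: ToyRun) -> str:
    run.step("load_data", lambda: "dataframe")
    return run.step("llm_call", flaky_llm_call)


run = ToyRun()
try:
    pipeline(run)  # first attempt fails at llm_call
except RuntimeError:
    pass
print(pipeline(run))  # analysis
print(run.calls)  # {'load_data': 1, 'llm_call': 2}
```

On the rerun, `load_data` is not executed a second time; only the step that failed runs again.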
Key Takeaways
- Deploy for Durability – Use flow.serve() or flow.deploy() to unlock automatic idempotency and transactional semantics
- Retry Intelligence – Failed flow runs can be retried from the UI, skipping already-completed tasks
- Tool Observability – Every AI decision and tool call is tracked, logged, and independently retryable
- Zero Boilerplate – Just wrap your pydantic-ai agent with PrefectAgent
- Customizable Policies – Fine-tune retries, timeouts, and error handling per operation type
Try it yourself:
- Set your OpenAI API key:
  export OPENAI_API_KEY='your-key'
- Start the Prefect server (it keeps running, so use a separate terminal):
  prefect server start
- Serve the flow from the terminal where OPENAI_API_KEY is set:
  uv run -s examples/ai_data_analyst_with_pydantic_ai.py
- Trigger a run from the UI (http://localhost:4200) or CLI
- Watch all AI operations tracked in real time
For more on AI orchestration with Prefect: