Use this file to discover all available pages before exploring further.
View full project on GitHub
Database cleanup is critical for self-hosted Prefect deployments (see database maintenance guide),
but it's risky: too automated and you might delete important data, too manual and it becomes a constant burden.
This example shows how to build a cleanup workflow that evolves with your confidence:
Start with human approval: Preview what will be deleted, pause the flow, and manually approve/reject via a UI form
Graduate to AI autonomy: Switch to an AI agent that investigates system health using Prefect MCP tools and returns structured decisions with confidence scores
Build trust incrementally by monitoring decisions in lower-risk environments before enabling AI autonomy in production.
For a full deployment example with scheduling and environment configuration, see:
github.com/zzstoatzz/prefect-mcp-server-demo
# For human approval onlyuv add prefect# For AI approval, add pydantic-aiuv add 'pydantic-ai[prefect]'export ANTHROPIC_API_KEY='your-key'
The Prefect MCP server provides AI agents with read-only tools for investigating your Prefect instance.
See How to use the Prefect MCP server for setup.
Instead of scattering configuration across your code, define it as a structured Pydantic model.
This becomes a UI form automatically - see form building guide.
class RetentionConfig(BaseModel):
    """Define what to clean and how."""

    # Age threshold: the cleanup flow subtracts this from "now" to get the
    # cutoff, and only runs that started before the cutoff are candidates.
    retention_period: timedelta = Field(
        timedelta(days=30),
        description="How far back to keep flow runs",
    )

    # State names a run must be in to be eligible for deletion.
    states_to_clean: list[str] = Field(
        ["Completed", "Failed", "Cancelled"],
        description="Which states to clean",
    )

    # Runs deleted between rate-limit pauses; constrained to 10..1000.
    batch_size: int = Field(100, ge=10, le=1000)

    # When True the flow stops after approval and reports what it would delete.
    dry_run: bool = Field(False, description="Preview without deleting")

    # Selects which approval sub-flow gates the deletion.
    approval_type: Literal["human", "ai"] = Field(
        "human",
        description="Human form or AI agent approval",
    )
Human Approval: Pause and Review
When using approval_type="human", the flow pauses and shows a form in the UI.
class CleanupApproval(RunInput):
    """Human approval form for cleanup operations."""

    # Defaults to False, so a form submitted untouched rejects the cleanup.
    approve: bool = Field(default=False)
    # Free-text justification; handed back to the caller with the decision.
    notes: str = Field(default="", description="Why approve/reject?")


@flow(name="human-approval")
def get_human_approval(preview: str, count: int) -> tuple[bool, str]:
    """Pause and wait for human decision via UI form.

    Args:
        preview: Text summary of the flow runs proposed for deletion; shown
            in the form description.
        count: Number of flow runs covered by the proposal.

    Returns:
        A ``(approved, notes)`` tuple taken from the submitted form.
    """
    # The pause emoji was mis-encoded in the original string; restored here.
    print(f"⏸️ pausing for human review of {count} flow runs...")
    approval = pause_flow_run(
        wait_for_input=CleanupApproval.with_initial_data(
            description=f"**Preview ({count} runs):**\n{preview}"
        ),
        # Give the reviewer up to an hour (3600 s) to respond.
        timeout=3600,
    )
    return approval.approve, approval.notes
AI Approval: Autonomous Investigation
When using approval_type="ai", a pydantic-ai agent investigates using Prefect MCP tools to decide if cleanup is safe.
# System prompt for the approval agent. Line breaks are restored so the
# instructions and the bullet list reach the model as separate lines instead
# of one run-on sentence.
AGENT_PROMPT = """you are a prefect infrastructure operations agent reviewing a proposed database cleanup.

use the prefect mcp tools to investigate system health:
- query recent flow run patterns
- check deployment schedules
- review system status

return your decision with confidence (0-1), reasoning, and any concerns.
approve routine cleanup unless you detect risks like ongoing incidents or critical deployments needing history."""


class CleanupDecision(BaseModel):
    """Structured AI decision."""

    # Whether the agent allows the cleanup to proceed.
    approved: bool
    # Agent's self-reported confidence, constrained to [0, 1].
    confidence: float = Field(ge=0.0, le=1.0)
    # Explanation returned to the caller as the approval notes.
    reasoning: str
    # Optional list of risks the agent noticed.
    concerns: list[str] | None = None


def create_cleanup_agent() -> PrefectAgent[None, CleanupDecision]:
    """Create AI agent with Prefect MCP tools for autonomous approval.

    Returns:
        A ``PrefectAgent`` wrapping a pydantic-ai ``Agent`` whose output type
        is ``CleanupDecision``.
    """
    # Connect to Prefect MCP server - provides read-only Prefect tools
    # NOTE(review): confirm the positional arguments against the installed
    # pydantic-ai ``MCPServerStdio`` signature (command, args) - TODO verify.
    mcp_server = MCPServerStdio(
        "prefect", "uvx", args=["--from", "prefect-mcp", "prefect-mcp-server"]
    )

    # NOTE(review): newer pydantic-ai releases may prefer ``toolsets=`` over
    # ``mcp_servers=`` - confirm for the pinned version.
    agent = Agent(
        model=AnthropicModel("claude-sonnet-4-5-20250929"),
        output_type=CleanupDecision,
        system_prompt=AGENT_PROMPT,
        mcp_servers=[mcp_server],
    )

    # Wrap with PrefectAgent for retry/timeout handling
    return PrefectAgent(
        agent,
        model_task_config=TaskConfig(retries=2, timeout_seconds=120.0),
    )


@flow(name="ai-approval", log_prints=True)
async def get_ai_approval(
    preview: str, count: int, config: RetentionConfig
) -> tuple[bool, str]:
    """Use AI agent to autonomously decide approval.

    Args:
        preview: Text summary of the flow runs proposed for deletion.
        count: Number of flow runs covered by the proposal.
        config: Retention settings; the period and state names are included
            in the message sent to the agent.

    Returns:
        A ``(approved, reasoning)`` tuple taken from the agent's decision.
    """
    print("🤖 requesting ai agent decision...")
    agent = create_cleanup_agent()

    states = ", ".join(config.states_to_clean)
    context = f"""proposed cleanup:
- retention: {config.retention_period}
- states: {states}
- count: {count} flow runs

preview:
{preview}

investigate using your prefect mcp tools and decide if safe to proceed."""

    result = await agent.run(context)
    decision = result.output

    print(f"decision: {'✅ approved' if decision.approved else '❌ rejected'}")
    print(f"confidence: {decision.confidence:.0%}")
    print(f"reasoning: {decision.reasoning}")
    # Surface any concerns so they are visible in the run logs; previously
    # the field was collected but never reported.
    if decision.concerns:
        print(f"concerns: {'; '.join(decision.concerns)}")

    return decision.approved, decision.reasoning
Main Cleanup Flow
def _format_preview(flow_runs, max_items: int = 5) -> str:
    """Render the first ``max_items`` runs as bullets plus a count of the rest."""
    lines = [
        # Guard against a run without a state object so the preview cannot crash.
        f"- {r.name} "
        f"({r.state.type.value if r.state is not None else 'unknown'}) "
        f"- {r.start_time}"
        for r in flow_runs[:max_items]
    ]
    preview = "\n".join(lines)
    if len(flow_runs) > max_items:
        preview += f"\n... and {len(flow_runs) - max_items} more"
    return preview


async def _delete_in_batches(client, flow_runs, batch_size: int) -> tuple[int, int]:
    """Delete runs batch by batch and return ``(deleted, failed)`` counts."""
    deleted = 0
    failed = 0
    for start in range(0, len(flow_runs), batch_size):
        for run in flow_runs[start : start + batch_size]:
            try:
                await client.delete_flow_run(run.id)
            except ObjectNotFound:
                # Already deleted (e.g., by concurrent cleanup) - treat as success
                deleted += 1
            except Exception as e:
                # Best effort: log, count the failure, and keep going.
                failed += 1
                print(f"failed to delete {run.id}: {e}")
            else:
                deleted += 1
        await asyncio.sleep(0.1)  # rate limiting
    return deleted, failed


@flow(name="database-cleanup", log_prints=True)
async def database_cleanup_flow(config: RetentionConfig | None = None) -> dict:
    """Database cleanup with configurable approval workflow.

    Args:
        config: Retention settings. When omitted, ``RetentionConfig()``
            defaults are used.

    Returns:
        A dict whose ``status`` is one of ``"no_action"``, ``"rejected"``,
        ``"dry_run"`` or ``"completed"``, plus status-specific keys
        (``deleted``, ``reason``, ``would_delete``, ``failed``).
    """
    if config is None:
        config = RetentionConfig()

    print(f"🚀 starting cleanup (approval: {config.approval_type})")

    # Fetch flow runs matching retention policy
    async with get_client() as client:
        cutoff = datetime.now(timezone.utc) - config.retention_period
        # NOTE(review): confirm FlowRunFilter.state accepts a
        # FlowRunFilterStateName directly; it may need to be wrapped in a
        # state filter object - TODO verify against the client schema.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(before_=cutoff),
                state=FlowRunFilterStateName(any_=config.states_to_clean),
            ),
            # Cap one invocation at five batches' worth of runs.
            limit=config.batch_size * 5,
        )

    if not flow_runs:
        print("✨ nothing to clean")
        return {"status": "no_action", "deleted": 0}

    # Preview what will be deleted
    preview = _format_preview(flow_runs)
    print(f"\n📋 preview:\n{preview}\n")

    # Get approval (human or AI based on config)
    if config.approval_type == "human":
        approved, notes = get_human_approval(preview, len(flow_runs))
    else:
        approved, notes = await get_ai_approval(preview, len(flow_runs), config)

    if not approved:
        print(f"❌ cleanup rejected: {notes}")
        return {"status": "rejected", "reason": notes}

    print(f"✅ cleanup approved: {notes}")

    if config.dry_run:
        print("🔍 dry run - no deletions")
        return {"status": "dry_run", "would_delete": len(flow_runs)}

    # Perform deletion with batching and rate limiting
    async with get_client() as client:
        deleted, failed = await _delete_in_batches(
            client, flow_runs, config.batch_size
        )

    print(f"✅ deleted {deleted}/{len(flow_runs)} flow runs")
    if failed:
        print(f"{failed} flow runs could not be deleted")
    # "failed" is an additive key so callers can detect partial cleanups.
    return {"status": "completed", "deleted": deleted, "failed": failed}
if __name__ == "__main__":
    # Start with human approval in production
    prod_config = RetentionConfig(
        retention_period=timedelta(days=30),
        dry_run=False,
        approval_type="human",
    )

    # Graduate to AI approval in dev/staging
    dev_config = RetentionConfig(
        retention_period=timedelta(minutes=5),
        dry_run=False,
        approval_type="ai",  # requires ANTHROPIC_API_KEY
    )

    # NOTE(review): neither config above is passed to serve(); they only
    # illustrate the two approval modes. A run started without a `config`
    # parameter makes the flow fall back to RetentionConfig() defaults.
    database_cleanup_flow.serve(
        name="database-cleanup-deployment",
        tags=["database-maintenance", "cleanup"],
    )