In the Build a data pipeline tutorial, you learned how to create resilient and performant data pipelines. Now you’ll learn how to handle data dependencies and ingest large amounts of data by building a GitHub issue analysis pipeline.

The real world can present additional challenges when dealing with web data:

  • API requests can fail or give a response with missing or malformed data.
  • You need to make multiple dependent API calls.
  • You need to ingest data when you don’t know in advance how much data is available.

Set up error handling

Raise and catch exceptions to handle errors gracefully. For example, if an API doesn’t return a 2xx response, raise an exception and log the error.

from typing import List, Optional

import httpx
from prefect import task

@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
    """Fetch a page of issues for a GitHub repository"""
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status() # Raise an exception if the response is not a 2xx status code
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None
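
A successful status code does not guarantee usable data, so it can help to check the shape of the payload before passing it on. The following is a minimal sketch of one way to do that, not part of the tutorial's flow: the helper name `keep_well_formed_issues` and the `REQUIRED_FIELDS` tuple are hypothetical, chosen only to illustrate the idea.

```python
from typing import Any, List

# Hypothetical set of fields that later analysis steps are assumed to rely on.
REQUIRED_FIELDS = ("number", "state", "created_at")


def keep_well_formed_issues(payload: Any) -> List[dict]:
    """Return only the items that look like usable issue records."""
    if not isinstance(payload, list):
        # For example, an error object such as {"message": "Not Found"}
        return []
    return [
        item
        for item in payload
        if isinstance(item, dict) and all(field in item for field in REQUIRED_FIELDS)
    ]


sample = [
    {"number": 1, "state": "open", "created_at": "2024-01-01T00:00:00Z"},
    {"number": 2},   # missing fields
    "not-a-dict",    # malformed entry
]
print(keep_well_formed_issues(sample))
print(keep_well_formed_issues({"message": "Not Found"}))
```

A task could call a helper like this on `response.json()` before returning, so that downstream tasks only ever see records with the fields they expect.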

Ingest large amounts of data

Use pagination to fetch large amounts of data and run tasks concurrently to analyze the data efficiently:

from typing import List

from prefect import flow

@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    all_issues = []

    for repo in repos:
        for page in range(1, 3):  # Get first 2 pages
            issues = fetch_page_of_issues(repo, page)
            if not issues:
                break
            all_issues.extend(issues)
    
    # Run issue analysis tasks concurrently
    issue_details = []
    for issue in all_issues:
        # Submit each task to a task runner (analyze_issue is a task defined elsewhere)
        issue_details.append(analyze_issue.submit(issue))
    
    # Wait for all analysis tasks to complete
    for detail in issue_details:
        result = detail.result() # Block until the task has completed
        print(f"Analyzed issue #{result['number']}")
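
The flow above stops after a fixed two pages. When the total amount of data is not known in advance, a common approach is to keep requesting pages until one comes back empty. Here is a minimal sketch of that loop in plain Python; `fetch_all_pages`, its `max_pages` safety cap, and the `fake_api` stand-in are illustrative assumptions rather than Prefect or GitHub features.

```python
from typing import Callable, List, Optional


def fetch_all_pages(
    fetch_page: Callable[[int], Optional[List[dict]]],
    max_pages: int = 50,  # hypothetical safety cap, not a GitHub or Prefect setting
) -> List[dict]:
    """Request pages 1, 2, 3, ... until one comes back empty or None."""
    items: List[dict] = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page)
        if not batch:
            break  # an empty page (or a failed fetch) ends the loop
        items.extend(batch)
    return items


# Stand-in for an API that has two pages of results followed by an empty page.
fake_api = {1: [{"number": 1}, {"number": 2}], 2: [{"number": 3}], 3: []}
issues = fetch_all_pages(lambda page: fake_api.get(page))
print(len(issues))  # 3
```

Inside a flow, the callable could wrap the task, for example `lambda page: fetch_page_of_issues(repo, page)`. The cap keeps a misbehaving API from causing an unbounded loop.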

Structure your code with dependent nested flows and tasks

Use nested flows and tasks to help distribute tasks more efficiently and aid with debugging.

  • Use nested flows for more complex operations that involve multiple steps.
  • Use tasks for simpler, atomic operations.

Here’s an example of how to use nested flows and tasks:

from typing import List

from prefect import flow, task


@flow
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        
        # Fetch and analyze all issues
        issues = fetch_repo_issues(repo)
        
        # Calculate metrics
        resolution_rate = calculate_resolution_rate(issues)
        # ...


@flow
def fetch_repo_issues(repo: str):
    """Nested flow: Fetch all data for a single repository"""

    # ...


@task
def calculate_resolution_rate(issues: List[dict]) -> float:
    """Task: Calculate the percentage of closed issues"""

    # ...

Put it all together

Here’s the complete flow that combines all of these components. We’ll also add retries, caching, and rate limiting to make the workflow more robust.

repo_analysis.py
from datetime import timedelta, datetime
from statistics import mean
from typing import List, Optional
import httpx

from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
    """Analyze issue health metrics for GitHub repositories"""
    for repo in repos:
        print(f"Analyzing {repo}...")
        
        # Fetch and analyze all issues
        issues = fetch_repo_issues(repo)
        
        # Calculate metrics
        avg_response_time = calculate_response_times(issues)
        resolution_rate = calculate_resolution_rate(issues)
        
        print(f"Average response time: {avg_response_time:.1f} hours")
        print(f"Resolution rate: {resolution_rate:.1f}%")


@flow
def fetch_repo_issues(repo: str):
    """Fetch all issues for a single repository"""
    all_issues = []

    for page in range(1, 3):  # Limit to 2 pages to avoid hitting rate limits
        issues = fetch_page_of_issues(repo, page)
        if not issues:
            break
        all_issues.extend(issues)

    issue_details = []
    for issue in all_issues[:5]:  # Limit to 5 issues to avoid hitting rate limits
        issue_details.append(
            fetch_issue_details.submit(repo, issue['number'])
        )
    
    details = []
    for issue in issue_details:
        details.append(issue.result())

    return details


@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[List[dict]]:
    """Fetch a page of issues for a GitHub repository"""
    rate_limit("github-api")
    try:
        response = httpx.get(
            f"https://api.github.com/repos/{repo}/issues",
            params={"page": page, "state": "all", "per_page": 100}
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching issues for {repo}: {e}")
        return None


@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_issue_details(repo: str, issue_number: int) -> dict:
    """Fetch detailed information about a specific issue"""
    rate_limit("github-api")
    response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
    response.raise_for_status()  # Fail on non-2xx responses so the task's retries apply
    issue_data = response.json()
    
    # Fetch comments for the issue
    comments = fetch_comments(issue_data['comments_url'])
    issue_data['comments_data'] = comments
    
    return issue_data


@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_comments(comments_url: str) -> List[dict]:
    """Fetch comments for an issue"""
    rate_limit("github-api")
    try:
        response = httpx.get(comments_url)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Error fetching comments: {e}")
        return []


@task
def calculate_response_times(issues: List[dict]) -> float:
    """Calculate average time to first response for issues"""
    response_times = []
    
    for issue in issues:
        comments_data = issue.get('comments_data', [])
        if comments_data:  # If there are comments
            created = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
            first_comment = datetime.fromisoformat(
                comments_data[0]['created_at'].replace('Z', '+00:00')
            )
            response_time = (first_comment - created).total_seconds() / 3600
            response_times.append(response_time)

    return mean(response_times) if response_times else 0


@task
def calculate_resolution_rate(issues: List[dict]) -> float:
    """Calculate the percentage of closed issues"""
    if not issues:
        return 0
    closed = sum(1 for issue in issues if issue['state'] == 'closed')
    return (closed / len(issues)) * 100


if __name__ == "__main__":
    analyze_repo_health([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])

Before running this code, make sure to set up the GitHub API rate limit:

# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
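
The decay value in the command comes from converting an hourly quota into a per-second rate. As a quick check of that arithmetic, here is a small sketch; the function names are hypothetical and not part of Prefect.

```python
def slot_decay_per_second(requests_per_hour: int) -> float:
    """Convert an hourly request quota into a per-second rate."""
    return requests_per_hour / 3600


def seconds_between_requests(requests_per_hour: int) -> float:
    """Average spacing between requests that stays within the quota."""
    return 3600 / requests_per_hour


print(f"{slot_decay_per_second(60):.4f}")  # 0.0167
print(seconds_between_requests(60))        # 60.0
```

The command uses 0.016, slightly below 60 / 3600, which errs on the side of staying under the quota.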

Run your analysis:

python repo_analysis.py

The output should look something like this:

10:59:13.933 | INFO    | prefect.engine - Created flow run 'robust-kangaroo' for flow 'analyze-repo-health'
10:59:13.934 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/abdf7f46-6d59-4857-99cd-9e265cadc4a7
10:59:13.954 | INFO    | Flow run 'robust-kangaroo' - Analyzing PrefectHQ/prefect...
...
10:59:27.631 | INFO    | Flow run 'robust-kangaroo' - Average response time: 0.4 hours
10:59:27.631 | INFO    | Flow run 'robust-kangaroo' - Resolution rate: 40.0%
10:59:27.632 | INFO    | Flow run 'robust-kangaroo' - Analyzing pydantic/pydantic...
...
10:59:40.990 | INFO    | Flow run 'robust-kangaroo' - Average response time: 0.0 hours
10:59:40.991 | INFO    | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:40.991 | INFO    | Flow run 'robust-kangaroo' - Analyzing huggingface/transformers...
...
10:59:54.225 | INFO    | Flow run 'robust-kangaroo' - Average response time: 1.1 hours
10:59:54.225 | INFO    | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:54.240 | INFO    | Flow run 'robust-kangaroo' - Finished in state Completed()
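
Because the flow analyzes at most five issues per repository, the metrics are coarse: a resolution rate of 40.0% means two of the five sampled issues were closed, and an average response time of 0.0 hours is what the code returns when none of them has a comment. The sketch below reproduces the two calculations on made-up sample data (the issue records are invented for illustration) so that you can check the arithmetic without calling the API.

```python
from datetime import datetime
from statistics import mean

# Made-up sample data shaped like the records returned by fetch_repo_issues.
issues = [
    {"state": "closed", "created_at": "2024-01-01T10:00:00Z",
     "comments_data": [{"created_at": "2024-01-01T10:30:00Z"}]},
    {"state": "closed", "created_at": "2024-01-02T08:00:00Z", "comments_data": []},
    {"state": "open", "created_at": "2024-01-03T09:00:00Z",
     "comments_data": [{"created_at": "2024-01-03T09:18:00Z"}]},
    {"state": "open", "created_at": "2024-01-04T12:00:00Z", "comments_data": []},
    {"state": "open", "created_at": "2024-01-05T12:00:00Z", "comments_data": []},
]


def parse(timestamp: str) -> datetime:
    """Parse a GitHub-style UTC timestamp such as 2024-01-01T10:00:00Z."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


# Hours from issue creation to first comment, for issues that have comments.
hours = [
    (parse(i["comments_data"][0]["created_at"]) - parse(i["created_at"])).total_seconds() / 3600
    for i in issues
    if i["comments_data"]
]
avg_response_time = mean(hours) if hours else 0
resolution_rate = sum(1 for i in issues if i["state"] == "closed") / len(issues) * 100

print(f"Average response time: {avg_response_time:.1f} hours")  # 0.4 hours
print(f"Resolution rate: {resolution_rate:.1f}%")               # 40.0%
```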

Next steps

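In this tutorial, you built a complex data extraction pipeline that uses the following new techniques:

  • Error handling for failed API requests.
  • Pagination and concurrent task runs to ingest large amounts of data.
  • Nested flows and tasks to structure dependent operations.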

Now that you’ve finished this tutorial series, continue your learning journey by going deeper into the topics it introduced.

Need help? Book a meeting with a Prefect Product Advocate to get your questions answered.