Skip to main content
Test Prefect flows and tasks by running them against an isolated, temporary backend or by calling the underlying function directly.

Run flows against a temporary database

Use prefect_test_harness as a context manager to run flows and tasks against a temporary local SQLite database:
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
  with prefect_test_harness():
      # run the flow against a temporary testing database
      assert my_favorite_flow() == 42
For more extensive testing, use prefect_test_harness as a fixture in your unit testing framework. For example, when using pytest:
from prefect import flow
import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    assert my_favorite_flow() == 42
Session-scoped fixtureIn this example, the fixture is scoped to run once for the entire test session. In most cases, you do not need a clean database for each test. Just isolate your test runs to a test database. Creating a new test database per test creates significant overhead, so scope the fixture to the session. If you need to isolate some tests fully, place them in a separate module with a function-scoped harness fixture (see Stale state between tests).

Understand what the test harness does

When you enter the prefect_test_harness context manager, the following happens:
1
Create a temporary database
2
A temporary directory is created and PREFECT_API_DATABASE_CONNECTION_URL is overridden to point at a new SQLite file inside it. The database interface is reset through temporary_database_interface() so the server uses this temporary database instead of any previously configured one.
3
Start an ephemeral API server
4
A SubprocessASGIServer starts in a child process on a random available port (in the 8000-9000 range). This is a real Prefect API server backed by the temporary database. PREFECT_API_URL is then overridden to point at this server so all client calls route to it.
5
Run your tests
6
Your flows and tasks execute the same code paths they would in production, including state transitions, result persistence, and API interactions.
7
Clean up on exit
8
When you exit the context manager, the API log worker and events worker are drained to flush pending data and prevent stale events from leaking into subsequent test harness invocations. The subprocess server is stopped and the temporary directory is cleaned up at process exit.

Test the underlying function without the engine

To test the function decorated with @task or @flow without running it through the Prefect engine, use .fn() to call the wrapped function directly:
from prefect import flow, task

@task
def my_favorite_task():
    return 42

@flow
def my_favorite_flow():
    val = my_favorite_task()
    return val

def test_my_favorite_task():
    assert my_favorite_task.fn() == 42
This bypasses state tracking, retries, and logging. It is useful for fast unit tests that only need to verify business logic. If your flow or task calls get_run_logger(), calling .fn() outside of a run context raises a MissingContextError. Disable the run logger to avoid this:
from prefect.logging import disable_run_logger

def test_my_favorite_task():
    with disable_run_logger():
        assert my_favorite_task.fn() == 42

Capture log output in tests

To test log output from flows and tasks, use pytest’s caplog fixture to capture log messages:
import logging
from typing import Any

import pytest
from prefect import flow, get_run_logger, task
from prefect.testing.utilities import prefect_test_harness


@task
def log_message() -> None:
    logger = get_run_logger()
    logger.info("Logging from task")


@flow
def parent_flow() -> None:
    logger = get_run_logger()
    logger.info("Logging from flow")
    log_message()


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield


def test_flow_log_message(caplog: Any) -> None:
    caplog.set_level(logging.INFO)
    parent_flow()

    assert "Logging from flow" in caplog.messages


def test_task_log_message(caplog: Any) -> None:
    caplog.set_level(logging.INFO)
    parent_flow()

    assert "Logging from task" in caplog.messages
caplog requires the run logger to be enabledThe caplog fixture only captures logs when the run logger is active. If you disable the run logger with disable_run_logger(), caplog does not capture any log output from flows or tasks.

Test async flows and tasks

If your flows or tasks are async, use pytest-asyncio together with the test harness. The harness itself is a synchronous context manager, so set it up in a synchronous session-scoped fixture and write your test functions as async:
import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield


@flow
async def my_async_flow():
    return "async result"


@pytest.mark.asyncio
async def test_my_async_flow():
    result = await my_async_flow()
    assert result == "async result"
You can also use .fn() for async functions:
from prefect import task


@task
async def fetch_data():
    return {"key": "value"}


@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data.fn()
    assert result == {"key": "value"}

Troubleshoot common issues

Server startup timeout

The test harness starts a subprocess server and waits for it to become healthy. By default it waits 30 seconds. If your environment is slow (CI runners, constrained resources), increase the timeout:
with prefect_test_harness(server_startup_timeout=60):
    ...
If the server fails to start, check for:
  • Port conflicts: The harness selects a random port in the 8000-9000 range. If many ports are in use, it retries up to 10 times. Close other services using those ports or ensure parallel test processes each get their own harness instance.
  • Missing dependencies: The server requires SQLite support. Verify your Python installation includes the sqlite3 module.

Stale state between tests

The session-scoped fixture reuses a single database across all tests. If a test creates flows or deployments that interfere with other tests, you have two options:
  • Use a function-scoped fixture for tests that need a clean database. Do not nest prefect_test_harness inside a test that already runs under a session-scoped harness, because the inner harness exit stops the shared subprocess server and leaves later tests pointing at a dead API URL. Instead, put those tests in a separate file or module with its own function-scoped fixture:
import pytest
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="function")
def clean_harness():
    with prefect_test_harness():
        yield


def test_needs_clean_database(clean_harness):
    # fresh temporary database for this test only
    ...
  • Clean up explicitly by deleting test data through the Prefect client after each test.

MissingContextError from get_run_logger

Calling get_run_logger() outside of a flow or task run raises a MissingContextError. This commonly occurs when calling .fn() on a decorated function that uses the run logger. Wrap the call with disable_run_logger():
from prefect.logging import disable_run_logger

def test_task_with_logging():
    with disable_run_logger():
        result = my_task.fn()
Alternatively, run the flow or task through the test harness so that a proper run context is available.

Parallel test processes

If you use pytest-xdist or another parallel test runner, each worker process should create its own prefect_test_harness instance. Because each harness starts its own server on a random port with its own SQLite database, parallel workers do not conflict with each other. Make sure the fixture is session-scoped per worker (the default behavior with pytest-xdist).

Next steps