Abstract base class for task runners.

A task runner is responsible for submitting tasks to the task run engine running
in an execution environment. Submitted tasks are non-blocking and return a future
object that can be used to wait for the task to complete and retrieve the result.

Task runners are context managers and should be used in a with block to ensure
proper cleanup of resources.

Methods:

A task runner that executes tasks in a separate thread pool.

Attributes:
max_workers: The maximum number of threads to use for executing tasks.
Defaults to PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS or sys.maxsize.

Examples:

Use a thread pool task runner with a flow:

    from prefect import flow, task
    from prefect.task_runners import ThreadPoolTaskRunner

    @task
    def some_io_bound_task(x: int) -> int:
        # making a query to a database, reading a file, etc.
        return x * 2

    @flow(task_runner=ThreadPoolTaskRunner(max_workers=3))  # use at most 3 threads at a time
    def my_io_bound_flow():
        futures = []
        for i in range(10):
            future = some_io_bound_task.submit(i * 100)
            futures.append(future)

        return [future.result() for future in futures]


Use a thread pool task runner as a context manager:

    from prefect import task
    from prefect.task_runners import ThreadPoolTaskRunner

    @task
    def some_io_bound_task(x: int) -> int:
        # making a query to a database, reading a file, etc.
        return x * 2

    # Use the runner directly
    with ThreadPoolTaskRunner(max_workers=2) as runner:
        future1 = runner.submit(some_io_bound_task, {"x": 1})
        future2 = runner.submit(some_io_bound_task, {"x": 2})

        result1 = future1.result()  # 2
        result2 = future2.result()  # 4


Configure max workers via settings:

    # Set via environment variable
    # export PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS=8

    from prefect import flow
    from prefect.task_runners import ThreadPoolTaskRunner

    @flow(task_runner=ThreadPoolTaskRunner())  # Uses 8 workers from setting
    def my_flow():
        ...

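
For intuition about the submit / future / with-block pattern described above, the following is a minimal sketch built only on the standard library's `concurrent.futures`. The class name `SketchThreadRunner`, the helper `double`, and the `submit(fn, parameters)` signature are hypothetical and chosen to mirror the examples; this is not Prefect's implementation, which also manages task run state, context, and settings.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional


class SketchThreadRunner:
    """Hypothetical stand-in for a thread pool task runner (not Prefect's class)."""

    def __init__(self, max_workers: int = 2) -> None:
        self.max_workers = max_workers
        self._executor: Optional[ThreadPoolExecutor] = None

    def __enter__(self) -> "SketchThreadRunner":
        # Start the pool when the with block is entered.
        self._executor = ThreadPoolExecutor(max_workers=self.max_workers)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Wait for outstanding work, then release the threads.
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None

    def submit(self, fn: Callable[..., Any], parameters: Dict[str, Any]) -> Future:
        # Non-blocking: hands the call to the pool and returns a future at once.
        if self._executor is None:
            raise RuntimeError("runner must be used inside a with block")
        return self._executor.submit(fn, **parameters)


def double(x: int) -> int:
    return x * 2


with SketchThreadRunner(max_workers=2) as runner:
    futures = [runner.submit(double, {"x": i}) for i in range(3)]
    # result() blocks until the corresponding call has finished.
    results = [future.result() for future in futures]
```

In this sketch, leaving the with block shuts the pool down, which is the kind of resource cleanup the context manager protocol is meant to guarantee; calling `submit` outside the block raises instead of silently leaking threads.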

A task runner that executes tasks in a separate process pool.

This task runner uses ProcessPoolExecutor to run tasks in separate processes,
providing true parallelism for CPU-bound tasks and process isolation. Tasks
are executed with proper context propagation and error handling.

Attributes:
max_workers: The maximum number of processes to use for executing tasks.
Defaults to multiprocessing.cpu_count() if PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS is not set.

Examples:

Use a process pool task runner with a flow:

    from prefect import flow, task
    from prefect.task_runners import ProcessPoolTaskRunner

    @task
    def compute_heavy_task(n: int) -> int:
        # CPU-intensive computation that benefits from process isolation
        return sum(i ** 2 for i in range(n))

    @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
    def my_flow():
        futures = []
        for i in range(10):
            future = compute_heavy_task.submit(i * 1000)
            futures.append(future)

        return [future.result() for future in futures]


Use a process pool task runner as a context manager:

    from prefect import task
    from prefect.task_runners import ProcessPoolTaskRunner

    @task
    def my_task(x: int) -> int:
        return x * 2

    # Use the runner directly
    with ProcessPoolTaskRunner(max_workers=2) as runner:
        future1 = runner.submit(my_task, {"x": 1})
        future2 = runner.submit(my_task, {"x": 2})

        result1 = future1.result()  # 2
        result2 = future2.result()  # 4


Configure max workers via settings:

    # Set via environment variable
    # export PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS=8

    from prefect import flow
    from prefect.task_runners import ProcessPoolTaskRunner

    @flow(task_runner=ProcessPoolTaskRunner())  # Uses 8 workers from setting
    def my_flow():
        ...

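
The fallback rule for `max_workers` described above can be sketched as a small standalone function. The helper name `resolve_max_workers` and its `env` parameter are hypothetical, and the assumption that an explicit argument takes precedence over the setting is ours; the source only states that the default is `multiprocessing.cpu_count()` when the setting is not set. Prefect reads this value through its own settings system, not through a helper like this one.

```python
import multiprocessing
import os
from typing import Mapping, Optional

# Setting name as given in the documentation above.
ENV_VAR = "PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS"


def resolve_max_workers(
    explicit: Optional[int] = None,
    env: Optional[Mapping[str, str]] = None,
) -> int:
    """Hypothetical helper: explicit argument, then setting, then CPU count."""
    if explicit is not None:
        # Assumed precedence: a value passed by the caller wins.
        return explicit
    env = os.environ if env is None else env
    raw = env.get(ENV_VAR)
    if raw:
        # The setting is present and non-empty, so use it.
        return int(raw)
    # Setting not set: fall back to the number of CPUs.
    return multiprocessing.cpu_count()


from_setting = resolve_max_workers(env={ENV_VAR: "8"})
from_argument = resolve_max_workers(4, env={ENV_VAR: "8"})
from_default = resolve_max_workers(env={})
```

Here `from_setting` is 8 and `from_argument` is 4, while `from_default` depends on the machine the code runs on.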