prefect_dask.utils

Utils to use alongside prefect-dask.

Functions

get_dask_client

get_dask_client(timeout: Optional[Union[int, float, str, timedelta]] = None, **client_kwargs: Dict[str, Any]) -> Generator[Client, None, None]
Yields a temporary synchronous dask client; this is useful for parallelizing operations on dask collections, such as a dask.DataFrame or dask.Bag. Without invoking this, workers do not automatically get a client to connect to the full cluster; instead, the work is attempted serially within the worker itself, potentially overwhelming that single worker. When in an async context, we recommend using get_async_dask_client instead.

Args:
  • timeout: Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the distributed.comm.timeouts.connect configuration value.
  • client_kwargs: Additional keyword arguments to pass to distributed.Client; these override keyword arguments inherited from the task runner, if any.
Examples:
Use get_dask_client to distribute work across workers.
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

dask_flow()
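
Since client_kwargs is forwarded to distributed.Client, the temporary client can also be customized per task. A minimal sketch, assuming the same runner setup as above (set_as_default is a standard distributed.Client keyword; customized_task and customized_flow are illustrative names):

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def customized_task():
    # client_kwargs pass straight through to distributed.Client;
    # here the temporary client is not installed as the global default.
    with get_dask_client(timeout="120s", set_as_default=False) as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=DaskTaskRunner())
def customized_flow():
    return customized_task.submit().result()

customized_flow()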

get_async_dask_client

get_async_dask_client(timeout: Optional[Union[int, float, str, timedelta]] = None, **client_kwargs: Dict[str, Any]) -> AsyncGenerator[Client, None]
Yields a temporary asynchronous dask client; this is useful for parallelizing operations on dask collections, such as a dask.DataFrame or dask.Bag. Without invoking this, workers do not automatically get a client to connect to the full cluster; instead, the work is attempted serially within the worker itself, potentially overwhelming that single worker.

Args:
  • timeout: Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the distributed.comm.timeouts.connect configuration value.
  • client_kwargs: Additional keyword arguments to pass to distributed.Client; these override keyword arguments inherited from the task runner, if any.
Examples:
Use get_async_dask_client to distribute work across workers.
import asyncio

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()

asyncio.run(dask_flow())
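
The asynchronous client can also drive several computations at once, since client.gather awaits a list of futures concurrently in async mode. A minimal sketch, assuming the same runner setup (multi_compute_task and multi_flow are illustrative names):

import asyncio

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def multi_compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # Submit two independent computations, then await both together.
        futures = [client.compute(df.x.mean()), client.compute(df.x.std())]
        mean_x, std_x = await client.gather(futures)
    return mean_x, std_x

@flow(task_runner=DaskTaskRunner())
async def multi_flow():
    prefect_future = await multi_compute_task.submit()
    return await prefect_future.result()

asyncio.run(multi_flow())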