# ResourceManager Tasks


# ResourceManager

class prefect.tasks.core.resource_manager.ResourceManager(resource_class, name=None, init_task_kwargs=None, setup_task_kwargs=None, cleanup_task_kwargs=None)

An object for managing temporary resources.

Used as a context manager, ResourceManager objects create tasks to set up and clean up temporary objects used within a block of tasks. Examples include temporary Dask/Spark clusters and Docker containers.

ResourceManager objects are usually created using the resource_manager decorator, but can be created directly using this class if desired.

For more information, see the docs for resource_manager.

Args:

  • resource_class (Callable): A callable (usually the class itself) for creating an object that follows the ResourceManager protocol.
  • name (str, optional): The resource name - defaults to the name of the decorated class.
  • init_task_kwargs (dict, optional): keyword arguments that will be passed to the Task constructor for the init task.
  • setup_task_kwargs (dict, optional): keyword arguments that will be passed to the Task constructor for the setup task.
  • cleanup_task_kwargs (dict, optional): keyword arguments that will be passed to the Task constructor for the cleanup task.
Example:

Here's an example resource manager for creating a temporary local dask cluster as part of the flow.

from prefect import resource_manager
from dask.distributed import Client

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()

To use the DaskCluster resource manager as part of your Flow, you can use DaskCluster as a context manager:

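from prefect import Flow, Parameter

# some_task and some_other_task stand for tasks defined elsewhere
with Flow("example") as flow:
    n_workers = Parameter("n_workers")

    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)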



# Functions

Top-level functions:

prefect.tasks.core.resource_manager.resource_manager(resource_class=None, name=None, init_task_kwargs=None, setup_task_kwargs=None, cleanup_task_kwargs=None)

A decorator for creating a ResourceManager object.

Used as a context manager, ResourceManager objects create tasks to set up and/or clean up temporary objects used within a block of tasks. Examples include temporary Dask/Spark clusters and Docker containers.

When used, a ResourceManager object adds up to three tasks to the graph:

  • An init task, which returns an object that meets the ResourceManager protocol. This protocol contains two methods:
      • setup(self) -> resource: A method for creating the resource. The return value from this will be available to user tasks. If no setup is required, the setup method may be left undefined.
      • cleanup(self, resource) -> None: A method for cleaning up the resource. This takes the return value from setup (or None if there is no setup method) and shouldn't return anything.
  • A setup task, which calls the optional setup method on the ResourceManager.
  • A cleanup task, which calls the cleanup method on the ResourceManager.
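The protocol and the order of the three steps can be illustrated without Prefect. The following is a minimal plain-Python sketch, not Prefect's implementation: run_with_resource, TempList, and CleanupOnly are hypothetical names introduced only for illustration, and the steps run eagerly here rather than as tasks in a flow graph.

```python
from typing import Any, Callable, List


def run_with_resource(manager: Any, user_steps: List[Callable[[Any], Any]]) -> List[Any]:
    """Hypothetical helper: call setup, run the user steps, then call cleanup."""
    # setup is optional in the protocol; use None when it is undefined.
    setup = getattr(manager, "setup", None)
    resource = setup() if setup is not None else None
    results = [step(resource) for step in user_steps]
    # cleanup receives whatever setup returned (or None).
    manager.cleanup(resource)
    return results


class TempList:
    """Defines both protocol methods: setup creates a list, cleanup empties it."""

    def __init__(self, size: int):
        self.size = size
        self.cleaned = False

    def setup(self) -> list:
        return list(range(self.size))

    def cleanup(self, resource: list) -> None:
        resource.clear()
        self.cleaned = True


class CleanupOnly:
    """Leaves setup undefined, so cleanup is called with None."""

    def __init__(self):
        self.received = "unset"

    def cleanup(self, resource: None) -> None:
        self.received = resource


manager = TempList(size=3)  # corresponds to the init step
results = run_with_resource(manager, [sum, len])

cleanup_only = CleanupOnly()
run_with_resource(cleanup_only, [])
```

In this sketch the user steps see the list created by setup, and CleanupOnly shows the case where no setup method exists and cleanup is handed None.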

Args:

  • resource_class (Callable): The decorated class.
  • name (str, optional): The resource name - defaults to the name of the decorated class.
  • init_task_kwargs (dict, optional): keyword arguments that will be passed to the Task constructor for the init task.
  • setup_task_kwargs (dict, optional): keyword arguments that will be passed to the Task constructor for the optional setup task.
  • cleanup_task_kwargs (dict, optional): keyword arguments that will be passed to the Task constructor for the cleanup task.
Returns:
  • ResourceManager: the created ResourceManager object.
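The resource_class=None default in the signature suggests that the decorator can be applied either bare or with keyword arguments. A minimal sketch of that general Python pattern follows; ManagerSketch and resource_manager_sketch are hypothetical stand-ins, not the Prefect objects, and max_retries appears only as an illustrative dictionary key.

```python
from typing import Any, Callable, Optional


class ManagerSketch:
    """Hypothetical stand-in for ResourceManager; it only stores its arguments."""

    def __init__(
        self,
        resource_class: Callable,
        name: Optional[str] = None,
        setup_task_kwargs: Optional[dict] = None,
    ):
        self.resource_class = resource_class
        # Default the name to the decorated class's name.
        self.name = name or getattr(resource_class, "__name__", "resource")
        self.setup_task_kwargs = setup_task_kwargs or {}


def resource_manager_sketch(resource_class: Optional[Callable] = None, **kwargs: Any):
    def wrap(cls: Callable) -> ManagerSketch:
        return ManagerSketch(cls, **kwargs)

    # Bare use passes the class directly; parameterized use returns the wrapper,
    # which Python then applies to the class.
    return wrap if resource_class is None else wrap(resource_class)


@resource_manager_sketch
class Plain:
    def cleanup(self, resource):
        pass


@resource_manager_sketch(name="custom", setup_task_kwargs={"max_retries": 2})
class Configured:
    def cleanup(self, resource):
        pass
```

After decoration, both Plain and Configured are manager objects rather than classes, which matches the Returns entry above: the decorator hands back the created manager, not the original class.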
Example:

Here's an example resource manager for creating a temporary local dask cluster as part of the flow.


from prefect import resource_manager
from dask.distributed import Client

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()



To use the DaskCluster resource manager as part of your Flow, you can use DaskCluster as a context manager:


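from prefect import Flow, Parameter

# some_task and some_other_task stand for tasks defined elsewhere
with Flow("example") as flow:
    n_workers = Parameter("n_workers")

    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)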



The Task returned by entering the DaskCluster context (i.e. the client in "as client") represents the output of the setup method on the ResourceManager class. A Task is automatically added to call the cleanup method (closing the Dask cluster) after all tasks under the context have completed. By default, this cleanup task is configured with a trigger so that it always runs if the setup task succeeds, and it is not set as a reference task.
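The default cleanup behavior described above is loosely analogous to a try/finally block entered only after setup succeeds. The sketch below is plain Python under that analogy: Prefect expresses this with task triggers in the flow graph rather than try/finally, and Recorder, run_block, and failing_body are hypothetical names used only here.

```python
from typing import Any, Callable, List

events: List[str] = []


class Recorder:
    """Hypothetical resource manager that records lifecycle events."""

    def __init__(self, fail_setup: bool = False):
        self.fail_setup = fail_setup

    def setup(self) -> str:
        if self.fail_setup:
            raise RuntimeError("setup failed")
        events.append("setup")
        return "resource"

    def cleanup(self, resource: str) -> None:
        events.append("cleanup")


def run_block(manager: Recorder, body: Callable[[str], Any]) -> None:
    resource = manager.setup()  # if this raises, cleanup is never reached
    try:
        body(resource)
    finally:
        manager.cleanup(resource)  # runs whether or not body raised


def failing_body(resource: str) -> None:
    events.append("body")
    raise ValueError("task failed")


# Case 1: a step inside the block fails; cleanup still runs.
try:
    run_block(Recorder(), failing_body)
except ValueError:
    pass
case_1 = list(events)

# Case 2: setup itself fails; there is nothing to clean up, so cleanup is skipped.
events.clear()
try:
    run_block(Recorder(fail_setup=True), failing_body)
except RuntimeError:
    pass
case_2 = list(events)
```

The first case records setup, body, and cleanup in that order. The second records nothing, mirroring a cleanup that is conditioned on the setup step having succeeded.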

This documentation was auto-generated from commit ffa9a6c
on February 1, 2023 at 18:44 UTC