prefect_ray.task_runners

Interface and implementations of the Ray Task Runner.

Task runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow.

Example, using the default task runner:
import time

from prefect import flow, task

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow
def count_to(highest_number):
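    futures = [shout.submit(number) for number in range(highest_number)]
    # Wait for every submitted task run to finish before the flow returns.
    for future in futures:
        future.wait()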

if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
Switching to a RayTaskRunner:
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9

Classes

PrefectRayFuture

Methods:

add_done_callback

add_done_callback(self, fn: Callable[['PrefectRayFuture[R]'], Any])

result

result(self, timeout: float | None = None, raise_on_failure: bool = True) -> R

wait

wait(self, timeout: float | None = None) -> None

RayTaskRunner

A parallel task runner that submits tasks to Ray. By default, a temporary Ray cluster is created for the duration of the flow run. Alternatively, if you already have a Ray instance running, you can provide the connection URL via the address kwarg.

Args:
    address (string, optional): Address of a currently running Ray instance; if one is not provided, a temporary instance will be created.
    init_kwargs (dict, optional): Additional kwargs to use when calling ray.init.

Examples:

Using a temporary local Ray cluster:
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ...
Connecting to an existing Ray instance:
RayTaskRunner(address="ray://<head_node_host>:10001")
Methods:

duplicate

duplicate(self)
Return a new instance with the same settings as this one.

map

map(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]

map

map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]

map

map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]

submit

submit(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectRayFuture[R]

submit

submit(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectRayFuture[R]

submit

submit(self, task: Task[P, R], parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None)