# prefect-ray

> Accelerate your workflows by running tasks in parallel with Ray

[Ray](https://docs.ray.io/en/latest/index.html) can run your tasks in parallel by distributing them over multiple machines.
The `prefect-ray` integration makes it easy to accelerate your flow runs with Ray.

## Install `prefect-ray`

The following command will install a version of `prefect-ray` compatible with your installed version of `prefect`.
If you don't already have `prefect` installed, it will install the newest version of `prefect` as well.

```bash  theme={null}
pip install "prefect[ray]"
```

Upgrade to the latest versions of `prefect` and `prefect-ray`:

```bash  theme={null}
pip install -U "prefect[ray]"
```

<Warning>
  **Ray limitations**

  There are a few limitations with Ray:

  * Ray has [experimental](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies) support for Python 3.13, but Prefect [does *not* currently support](https://github.com/PrefectHQ/prefect/issues/16910) Python 3.13.
  * Ray support for non-x86/64 architectures, such as ARM/M1 processors, is limited when installing from `pip` alone, and Ray will be skipped during installation of Prefect on those platforms. It is possible to manually install the blocking component with `conda`. See the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions.
  * Ray support for Windows is currently in beta.

  See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.
</Warning>

## Run tasks on Ray

The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/develop/task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution.
By default, a temporary Ray instance is created for the duration of the flow run.
For example, this flow prints the numbers 0 through 9 in parallel, so the order of the output varies between runs:

```python  theme={null}
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```

If you already have a Ray instance running, you can provide the connection URL via an `address` argument.

To configure your flow to use the `RayTaskRunner`:

1. Make sure `prefect-ray` is installed as described earlier: `pip install "prefect[ray]"`.
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Assign it as the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument.

For example, this flow uses the `RayTaskRunner` with a local, temporary Ray instance created by Prefect at flow run time.

```python  theme={null}
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ...
```

This flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://<head_node_host>:10001`.

```python  theme={null}
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://<head_node_host>:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
    ...
```

`RayTaskRunner` accepts the following optional parameters:

| Parameter    | Description                                                                                                                         |
| ------------ | ----------------------------------------------------------------------------------------------------------------------------------- |
| address      | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. |
| init\_kwargs | Additional kwargs to use when calling `ray.init`.                                                                                   |

The Ray client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance.
If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.
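
One way to switch between a temporary instance and an existing cluster without editing flow code is to build the `RayTaskRunner` keyword arguments from the environment. The sketch below is illustrative only: the helper `ray_task_runner_kwargs` and the `FLOW_RAY_ADDRESS` variable are hypothetical names chosen for this example, and neither Prefect nor Ray reads that variable.

```python
import os
from typing import Dict, Mapping, Optional


def ray_task_runner_kwargs(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build keyword arguments for RayTaskRunner from the environment.

    FLOW_RAY_ADDRESS is a hypothetical variable name used only in this sketch.
    """
    environ = os.environ if environ is None else environ
    address = environ.get("FLOW_RAY_ADDRESS", "").strip()
    # With no address configured, pass no arguments at all so that Prefect
    # creates a temporary Ray instance for the flow run.
    return {"address": address} if address else {}
```

A flow module could then use `@flow(task_runner=RayTaskRunner(**ray_task_runner_kwargs()))`, which falls back to the temporary instance whenever the variable is unset.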

## Where the flow's driver runs

When you connect `RayTaskRunner` to a shared Ray cluster, the `address` you pass determines where the *driver* — the process actually running the flow engine and calling `.submit()` — sits relative to that cluster. This choice matters more than it looks.

### Driver outside the cluster (`ray://`)

```python  theme={null}
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://<head_node_host>:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
    ...
```

Your Prefect worker runs off-cluster and reaches the Ray head over the Ray Client protocol. This is easy to set up from a laptop or from a worker that lives outside Kubernetes.

<Warning>
  **`runtime_env` only covers workers here**

  Ray's [dependency docs](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html) spell out that a `runtime_env` passed to `ray.init(...)` — which is what `init_kwargs` feeds into — "is only applied to all children Tasks and Actors, not the entrypoint script (Driver) itself." Whatever your flow module imports at the top (including `prefect` and `prefect-ray`) must already be installed on the machine running the driver; you cannot ship driver dependencies via `init_kwargs`.
</Warning>
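
Because driver dependencies cannot be shipped through `init_kwargs`, it can help to fail fast on the driver machine before the flow starts. The following is a minimal sketch using only the standard library; the helper name `missing_driver_packages` is hypothetical and not part of `prefect-ray`.

```python
import importlib.util
from typing import Iterable, List


def missing_driver_packages(module_names: Iterable[str]) -> List[str]:
    """Return the top-level modules that this Python environment cannot import.

    Pass top-level import names (for example "prefect_ray"), not pip
    distribution names (for example "prefect-ray").
    """
    return [name for name in module_names if importlib.util.find_spec(name) is None]
```

Calling `missing_driver_packages(["prefect", "prefect_ray"])` on the machine that runs the driver returns an empty list when both are installed there, and the names of whatever is missing otherwise.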

A couple of other sharp edges to know about when running real workloads over `ray://`:

* **The Ray Client connection is not durable.** Per Ray's [Ray Client docs](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/ray-client.html), a network interruption of 30+ seconds will terminate the workload. Ray recommends the Jobs API over Ray Client for long-running and ML workloads; the same page labels Ray Client "(For Experts only)" for interactive use.
* **The Ray head node needs `prefect-ray` and any package your tasks expose in their signatures or close over.** When you `.submit()` a task over `ray://`, the Ray Client server on the head has to prepare the pickled task before scheduling it onto a worker. Anything in that pickled object graph that the head's Python environment cannot import surfaces as a `ModuleNotFoundError` returned from the head, even when your workers would handle the task fine. In practice this means the head image must have:

  * `prefect-ray` itself installed — `RayTaskRunner` submissions reference `prefect_ray` classes, so a head without it fails before a task ever runs.
  * Any package whose classes appear in a task's parameter or return type annotations.
  * Any package whose objects are passed as task arguments or captured as module-level closures.

  A plain `import pandas` at the top of your flow file is not enough to trip this on its own — the head only needs the packages that actually end up in the pickled task graph. But as soon as a task signature mentions `pd.DataFrame`, or you `.submit(df)`, or a task reads a module-level DataFrame, the head needs `pandas` too. The usual mitigations (matching the head image to your worker image, runtime installs on the head, running one head per flow stack) are all costly. The cleanest fix is usually to move the driver inside the cluster so the Ray Client path is out of the picture entirely.
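
To see why the contents of the pickled object graph matter, it helps to look at what a pickle actually records. The sketch below uses the standard library `pickle` module as an analogy (Ray serializes with cloudpickle, so this is not Ray's exact behavior) and the hypothetical helper `referenced_modules` to list the modules a payload refers to by name. Any process that loads the payload must be able to import those modules.

```python
import pickle
import pickletools
from fractions import Fraction
from typing import Set


def referenced_modules(obj: object) -> Set[str]:
    """Return the modules that the pickled form of `obj` refers to by name."""
    # Protocol 2 is used only because it records each class reference in a
    # single GLOBAL opcode ("module qualname"), which is easy to inspect.
    payload = pickle.dumps(obj, protocol=2)
    modules = set()
    for opcode, arg, _position in pickletools.genops(payload):
        if opcode.name == "GLOBAL":
            module_name, _qualname = arg.split(" ", 1)
            modules.add(module_name)
    return modules


# Built-in containers and numbers carry no module references ...
plain = referenced_modules([1, 2, 3])
# ... but an argument that holds a class instance names that class's module.
with_class = referenced_modules({"ratio": Fraction(1, 2)})
```

Here `plain` is empty while `with_class` contains `fractions`, which mirrors the distinction above between merely importing a package in the flow file and actually passing its objects to a task.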

### Driver inside the cluster (`address="auto"`)

```python  theme={null}
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="auto"))
def my_flow():
    ...
```

With `address="auto"`, `ray.init` attaches to a raylet running on the same machine as the driver instead of dialing the head's Ray Client server. Pickling happens in the driver process and puts objects into Ray's distributed object store; workers on other nodes pull those objects and unpickle them at task-run time. The head stays pure control plane and does not need to import your flow's dependencies — not even `prefect-ray` itself.

There are a few ways to give your Prefect driver process a local raylet to attach to:

* **Prefect pod joins the Ray cluster as a zero-capacity node.** Run `ray start --address=<head-gcs-host>:6379 --num-cpus=0 --num-gpus=0 --block` on your Prefect worker pod at startup, wait for the raylet socket to appear, then exec the normal Prefect entrypoint. The pod joins the Ray cluster as a zero-capacity node: it never runs Ray tasks itself, but it hosts a local raylet that `address="auto"` can attach to. A minimal image entrypoint script:

  ```bash  theme={null}
  #!/usr/bin/env bash
  set -euo pipefail
  : "${RAY_HEAD_GCS:?set RAY_HEAD_GCS to host:port of the Ray head GCS}"
  ray start --address="${RAY_HEAD_GCS}" --num-cpus=0 --num-gpus=0 --block &
  until [[ -S /tmp/ray/session_latest/sockets/raylet ]]; do sleep 1; done
  exec "$@"
  ```

  This is usually the cleanest production pattern: your Prefect worker image is still a Prefect image, the Ray head image stays stock, and the two are only coupled by the GCS address.

* **Sidecar container in an existing Ray pod.** Run the Prefect worker as a second container in the Ray head pod (or a Ray worker-group pod) with a shared `/tmp/ray` `emptyDir` volume. Simpler to set up if you already manage Ray via KubeRay, but couples the Prefect worker lifecycle to that of the Ray pod. If the always-on sidecar is a cost concern, pair it with something like [KEDA](https://keda.sh/) for scale-to-zero.

* **Submit the flow script as a Ray Job.** `ray job submit --runtime-env-json='...' -- python run_flow.py` places the entrypoint on a cluster node, sets `RAY_ADDRESS` in its environment, and — unlike the `ray://` case above — applies `runtime_env` to the entrypoint script itself. Inside `run_flow.py`, `RayTaskRunner(address="auto")` attaches to the same cluster. Good for one-off submissions; harder to reconcile with Prefect deployments that themselves want to kick off runs.

All three avoid the `ray://` sharp edges above. The ephemeral "one Ray cluster per flow run" pattern (e.g. via KubeRay) is a further option when you want full per-run isolation, at the cost of cluster start-up time on every run.
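
The "wait for the raylet socket" step in the entrypoint script above can also be done from Python, for example in a startup hook that runs before the flow. This is a minimal sketch under the assumption that the socket lives at the path used in that script; the helper name `wait_for_unix_socket` is hypothetical.

```python
import os
import stat
import time


def wait_for_unix_socket(path: str, timeout: float = 60.0, poll_interval: float = 1.0) -> bool:
    """Poll until `path` exists and is a Unix socket, or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # os.stat follows symlinks, so a symlinked session directory works.
            if stat.S_ISSOCK(os.stat(path).st_mode):
                return True
        except OSError:
            pass  # Not created yet; keep polling until the deadline.
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
```

For example, `wait_for_unix_socket("/tmp/ray/session_latest/sockets/raylet")` returns `True` once the local raylet is up and `False` if it has not appeared within the timeout.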

## Troubleshooting a remote Ray cluster

When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance.
To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:

1. By default, Prefect does not persist any data to the filesystem of the remote Ray worker. To take advantage of Prefect's caching, configure remote result storage so that results persist across task runs.

We recommend using the [Prefect UI to configure a storage block](https://docs.prefect.io/develop/blocks/) to use for remote results storage.

Here's an example of a flow that uses caching and remote result storage:

```python  theme={null}
from typing import List

from prefect import flow, task
from prefect.logging import get_run_logger
from prefect.tasks import task_input_hash
from prefect_aws import S3Bucket
from prefect_ray.task_runners import RayTaskRunner


# The result of this task will be cached in the configured result storage
@task(cache_key_fn=task_input_hash)
def say_hello(name: str) -> str:
    logger = get_run_logger()
    # This log statement will print only on the first run. Subsequent runs will be cached.
    logger.info(f"hello {name}!")
    return name


@flow(
    task_runner=RayTaskRunner(
        address="ray://<instance_public_ip_address>:10001",
    ),
    # Using an S3 block that has already been created via the Prefect UI
    result_storage="s3/my-result-storage",
)
def greetings(names: List[str]) -> None:
    say_hello.map(names).wait()


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

2. If you get an error stating that the module 'prefect' cannot be found, ensure `prefect` is installed on the remote cluster, with:

```bash  theme={null}
pip install prefect
```

3. If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on **both local and remote machines**. For example, if using S3 for storage:

```bash  theme={null}
pip install s3fs
```

4. If you are seeing timeout or other connection errors, double check the address provided to the `RayTaskRunner`. It should look similar to `ray://<head_node_ip_address>:10001`, for example:

```python  theme={null}
RayTaskRunner(address="ray://1.23.199.255:10001")
```
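
The address check in the last step can be partly automated. The sketch below is a standard-library illustration, not part of `prefect-ray`: the hypothetical helper `ray_client_address_problems` checks the shape of a `ray://host:port` address, and `can_open_tcp_connection` tests whether the port accepts TCP connections at all. A successful connection only shows that the port is open, not that a Ray Client server is answering on it.

```python
import socket
from typing import List
from urllib.parse import urlsplit


def ray_client_address_problems(address: str) -> List[str]:
    """Return human-readable problems found in a `ray://host:port` address."""
    problems = []
    parts = urlsplit(address)
    if parts.scheme != "ray":
        problems.append(f"expected the ray:// scheme, got {parts.scheme or 'no scheme'!r}")
    if not parts.hostname:
        problems.append("missing host")
    try:
        port = parts.port
    except ValueError:
        problems.append("port is not a valid number")
    else:
        if port is None:
            problems.append("missing port")
    return problems


def can_open_tcp_connection(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

An empty list from `ray_client_address_problems(...)` means the address is well formed; if the connection test then fails, the cause is more likely a firewall, security group, or a head node that is not running.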

## Specify remote options

The `remote_options` context manager controls the Ray remote options applied to tasks submitted inside it. For example, we can set the number of CPUs and GPUs to use for the `process` task:

```python  theme={null}
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options


@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42).wait()
```

## Resources

Refer to the `prefect-ray` [SDK documentation](/integrations/prefect-ray/api-ref/prefect_ray-context) to explore all the capabilities of the `prefect-ray` library.

For further assistance using Ray, consult the [Ray documentation](https://docs.ray.io/en/latest/index.html).

