prefect.infrastructure special

base

Infrastructure pydantic-model

Source code in prefect/infrastructure/base.py
class Infrastructure(Block, abc.ABC):
    _block_schema_capabilities = ["run-infrastructure"]

    type: str

    env: Dict[str, Optional[str]] = pydantic.Field(
        default_factory=dict,
        title="Environment",
        description="Environment variables to set in the configured infrastructure.",
    )
    labels: Dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Labels applied to the infrastructure for metadata purposes.",
    )
    name: Optional[str] = pydantic.Field(
        default=None,
        description="Name applied to the infrastructure for identification.",
    )
    command: Optional[List[str]] = pydantic.Field(
        default=None,
        description="The command to run in the infrastructure.",
    )

    @abc.abstractmethod
    async def run(
        self,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> InfrastructureResult:
        """
        Run the infrastructure.

        If provided a `task_status`, the status will be reported as started when the
        infrastructure is successfully created. The status return value will be an
        identifier for the infrastructure.

        The call will then monitor the created infrastructure, returning a result at
        the end containing a status code indicating if the infrastructure exited cleanly
        or encountered an error.
        """
        # Note: implementations should include `sync_compatible`

    @abc.abstractmethod
    def preview(self) -> str:
        """
        View a preview of the infrastructure that would be run.
        """

    @property
    def logger(self):
        return get_logger(f"prefect.infrastructure.{self.type}")

    @classmethod
    def _base_environment(cls) -> Dict[str, str]:
        """
        Environment variables that should be passed to all created infrastructure.

        These values should be overridable with the `env` field.
        """
        return get_current_settings().to_environment_variables(exclude_unset=True)

    def prepare_for_flow_run(
        self: Self,
        flow_run: "FlowRun",
        deployment: Optional["Deployment"] = None,
        flow: Optional["Flow"] = None,
    ) -> Self:
        """
        Return an infrastructure block that is prepared to execute a flow run.
        """
        if deployment is not None:
            deployment_labels = self._base_deployment_labels(deployment)
        else:
            deployment_labels = {}

        if flow is not None:
            flow_labels = self._base_flow_labels(flow)
        else:
            flow_labels = {}

        return self.copy(
            update={
                "env": {**self._base_flow_run_environment(flow_run), **self.env},
                "labels": {
                    **self._base_flow_run_labels(flow_run),
                    **deployment_labels,
                    **flow_labels,
                    **self.labels,
                },
                "name": self.name or flow_run.name,
                "command": self.command or self._base_flow_run_command(),
            }
        )

    @staticmethod
    def _base_flow_run_command() -> List[str]:
        """
        Generate a command for a flow run job.
        """
        return ["python", "-m", "prefect.engine"]

    @staticmethod
    def _base_flow_run_labels(flow_run: "FlowRun") -> Dict[str, str]:
        """
        Generate a dictionary of labels for a flow run job.
        """
        return {
            "prefect.io/flow-run-id": str(flow_run.id),
            "prefect.io/flow-run-name": flow_run.name,
            "prefect.io/version": prefect.__version__,
        }

    @staticmethod
    def _base_flow_run_environment(flow_run: "FlowRun") -> Dict[str, str]:
        """
        Generate a dictionary of environment variables for a flow run job.
        """
        environment = {}
        environment["PREFECT__FLOW_RUN_ID"] = flow_run.id.hex
        return environment

    @staticmethod
    def _base_deployment_labels(deployment: "Deployment") -> Dict[str, str]:
        labels = {
            "prefect.io/deployment-name": deployment.name,
        }
        if deployment.updated is not None:
            labels["prefect.io/deployment-updated"] = deployment.updated.in_timezone(
                "utc"
            ).to_iso8601_string()
        return labels

    @staticmethod
    def _base_flow_labels(flow: "Flow") -> Dict[str, str]:
        return {
            "prefect.io/flow-name": flow.name,
        }

command pydantic-field

Type: List[str]

The command to run in the infrastructure.

env pydantic-field

Type: Dict[str, Optional[str]]

Environment variables to set in the configured infrastructure.

labels pydantic-field

Type: Dict[str, str]

Labels applied to the infrastructure for metadata purposes.

name pydantic-field

Type: str

Name applied to the infrastructure for identification.

Infrastructure.prepare_for_flow_run

Return an infrastructure block that is prepared to execute a flow run.

Source code in prefect/infrastructure/base.py
def prepare_for_flow_run(
    self: Self,
    flow_run: "FlowRun",
    deployment: Optional["Deployment"] = None,
    flow: Optional["Flow"] = None,
) -> Self:
    """
    Return an infrastructure block that is prepared to execute a flow run.
    """
    if deployment is not None:
        deployment_labels = self._base_deployment_labels(deployment)
    else:
        deployment_labels = {}

    if flow is not None:
        flow_labels = self._base_flow_labels(flow)
    else:
        flow_labels = {}

    return self.copy(
        update={
            "env": {**self._base_flow_run_environment(flow_run), **self.env},
            "labels": {
                **self._base_flow_run_labels(flow_run),
                **deployment_labels,
                **flow_labels,
                **self.labels,
            },
            "name": self.name or flow_run.name,
            "command": self.command or self._base_flow_run_command(),
        }
    )
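
Note the merge order: generated flow run values are applied first, so user-supplied env and labels take precedence, while name and command fall back to generated defaults only when unset. Below is a minimal sketch of that precedence, assuming a Prefect 2.x installation, a reachable API, and Process as the concrete implementation; the flow run ID is a placeholder.

import asyncio
from uuid import UUID

from prefect.client import get_client
from prefect.infrastructure import Process


async def main():
    async with get_client() as client:
        # Placeholder ID; substitute the ID of a real flow run
        flow_run = await client.read_flow_run(
            UUID("00000000-0000-0000-0000-000000000000")
        )

    block = Process(env={"MY_SETTING": "custom"}, labels={"team": "data"})
    prepared = block.prepare_for_flow_run(flow_run)

    # User-supplied values survive the merge; generated values fill the gaps
    assert prepared.env["MY_SETTING"] == "custom"
    assert "prefect.io/flow-run-id" in prepared.labels
    assert prepared.command == ["python", "-m", "prefect.engine"]


asyncio.run(main())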

Infrastructure.preview

View a preview of the infrastructure that would be run.

Source code in prefect/infrastructure/base.py
@abc.abstractmethod
def preview(self) -> str:
    """
    View a preview of the infrastructure that would be run.
    """

Infrastructure.run async

Run the infrastructure.

If provided a task_status, the status will be reported as started when the infrastructure is successfully created. The status return value will be an identifier for the infrastructure.

The call will then monitor the created infrastructure, returning a result at the end containing a status code indicating if the infrastructure exited cleanly or encountered an error.

Source code in prefect/infrastructure/base.py
@abc.abstractmethod
async def run(
    self,
    task_status: Optional[anyio.abc.TaskStatus] = None,
) -> InfrastructureResult:
    """
    Run the infrastructure.

    If provided a `task_status`, the status will be reported as started when the
    infrastructure is successfully created. The status return value will be an
    identifier for the infrastructure.

    The call will then monitor the created infrastructure, returning a result at
    the end containing a status code indicating if the infrastructure exited cleanly
    or encountered an error.
    """
    # Note: implementations should include `sync_compatible`
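
The contract is easiest to see in a toy implementation: create the infrastructure, report its identifier via task_status.started, monitor until exit, and return a result. The sketch below is hypothetical and not part of Prefect; it runs the command as a local subprocess and blocks the event loop for brevity, where a real implementation would use a worker thread.

import subprocess
from typing import Optional

import anyio.abc
from typing_extensions import Literal

from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import sync_compatible


class LocalProcessResult(InfrastructureResult):
    """Result of a hypothetical local process run."""


class LocalProcess(Infrastructure):
    """Hypothetical infrastructure that runs the command as a local subprocess."""

    type: Literal["local-process"] = "local-process"

    @sync_compatible
    async def run(
        self,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> LocalProcessResult:
        # Merge the base environment with user overrides, dropping unset values
        env = {
            key: value
            for key, value in {**self._base_environment(), **self.env}.items()
            if value is not None
        }
        process = subprocess.Popen(self.command, env=env)

        # Report "started" with an identifier once the infrastructure exists
        if task_status:
            task_status.started(str(process.pid))

        # Monitor until exit; blocking here only for brevity
        status_code = process.wait()
        return LocalProcessResult(identifier=str(process.pid), status_code=status_code)

    def preview(self) -> str:
        return f"Would run locally: {self.command}"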

docker

BaseDockerLogin pydantic-model

Source code in prefect/infrastructure/docker.py
class BaseDockerLogin(Block, ABC):
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/2IfXXfMq66mrzJBDFFCHTp/6d8f320d9e4fc4393f045673d61ab612/Moby-logo.png?h=250"
    _block_schema_capabilities = ["docker-login"]

    @abstractmethod
    async def login(self) -> "DockerClient":
        """
        Log in and return an authenticated `DockerClient`.
        (DEPRECATED) Use `get_docker_client` instead of `login`.
        """

    @abstractmethod
    async def get_docker_client(self) -> "DockerClient":
        """
        Log in and return an authenticated `DockerClient`.
        """

    def _login(self, username, password, registry_url, reauth) -> "DockerClient":
        client = self._get_docker_client()

        client.login(
            username=username,
            password=password,
            registry=registry_url,
            # See https://github.com/docker/docker-py/issues/2256 for information on
            # the default value for reauth.
            reauth=reauth,
        )

        return client

    @staticmethod
    def _get_docker_client():
        try:
            with warnings.catch_warnings():
                # Silence warnings due to use of deprecated methods within dockerpy
                # See https://github.com/docker/docker-py/pull/2931
                warnings.filterwarnings(
                    "ignore",
                    message="distutils Version classes are deprecated.*",
                    category=DeprecationWarning,
                )

                docker_client = docker.from_env()

        except docker.errors.DockerException as exc:
            raise RuntimeError("Could not connect to Docker.") from exc

        return docker_client

BaseDockerLogin.get_docker_client async

Log in and return an authenticated DockerClient.

Source code in prefect/infrastructure/docker.py
@abstractmethod
async def get_docker_client(self) -> "DockerClient":
    """
    Log in and return an authenticated `DockerClient`.
    """

BaseDockerLogin.login async

Log in and return an authenticated DockerClient. (DEPRECATED) Use get_docker_client instead of login.

Source code in prefect/infrastructure/docker.py
@abstractmethod
async def login(self) -> "DockerClient":
    """
    Log in and return an authenticated `DockerClient`.
    (DEPRECATED) Use `get_docker_client` instead of `login`.
    """

DockerContainer pydantic-model

Runs a command in a container.

Requires a Docker Engine to be connectable. Docker settings will be retrieved from the environment.

Attributes:

auto_remove (bool): If set, the container will be removed on completion. Otherwise, the container will remain after exit for inspection.
command (Optional[List[str]]): A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
env (Dict[str, Optional[str]]): Environment variables to set for the container.
image (str): An optional string specifying the tag of a Docker image to use. Defaults to the Prefect image.
image_pull_policy (Optional[prefect.infrastructure.docker.ImagePullPolicy]): Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'.
image_registry (Optional[prefect.infrastructure.docker.DockerRegistry]): A DockerRegistry block containing credentials to use if image is stored in a private image registry.
labels (Dict[str, str]): An optional dictionary of labels, mapping name to value.
name (Optional[str]): An optional name for the container.
network_mode (Optional[str]): Set the network mode for the created container. Defaults to 'host' if a local API url is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set.
networks (List[str]): An optional list of strings specifying Docker networks to connect the container to.
stream_output (bool): If set, stream output from the container to local standard output.
volumes (List[str]): An optional list of volume mount strings in the format of "local_path:container_path".

Connecting to a locally hosted Prefect API

If using a local API URL on Linux, we will update the network mode default to 'host' to enable connectivity. On other operating systems, or when an alternative network mode is used, we will replace 'localhost' in the API URL with 'host.docker.internal'. Generally, this will enable connectivity, but the API URL can be provided as an environment variable to override inference in more complex use cases.

Note, if using 'host.docker.internal' in the API URL on Linux, the API must be bound to 0.0.0.0 or the Docker IP address to allow connectivity. On macOS, this is not necessary and the API is connectable while bound to localhost.
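
When this inference does not fit, supplying PREFECT_API_URL in env sidesteps it entirely, since a user-provided value is never rewritten. A brief sketch with a placeholder address:

from prefect.infrastructure import DockerContainer

# A user-supplied API URL is never rewritten, so inference is bypassed;
# the address below is a placeholder for your own endpoint.
container = DockerContainer(
    env={"PREFECT_API_URL": "http://192.0.2.10:4200/api"},
    network_mode="bridge",  # optionally pin the mode instead of letting it be inferred
)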

Source code in prefect/infrastructure/docker.py
class DockerContainer(Infrastructure):
    """
    Runs a command in a container.

    Requires a Docker Engine to be connectable. Docker settings will be retrieved from
    the environment.

    Attributes:
        auto_remove: If set, the container will be removed on completion. Otherwise,
            the container will remain after exit for inspection.
        command: A list of strings specifying the command to run in the container to
            start the flow run. In most cases you should not override this.
        env: Environment variables to set for the container.
        image: An optional string specifying the tag of a Docker image to use.
            Defaults to the Prefect image.
        image_pull_policy: Specifies if the image should be pulled. One of 'ALWAYS',
            'NEVER', 'IF_NOT_PRESENT'.
        image_registry: A `DockerRegistry` block containing credentials to use if `image` is stored in a private
            image registry.
        labels: An optional dictionary of labels, mapping name to value.
        name: An optional name for the container.
        network_mode: Set the network mode for the created container. Defaults to 'host'
            if a local API url is detected, otherwise the Docker default of 'bridge' is
            used. If 'networks' is set, this cannot be set.
        networks: An optional list of strings specifying Docker networks to connect the
            container to.
        stream_output: If set, stream output from the container to local standard output.
        volumes: An optional list of volume mount strings in the format of
            "local_path:container_path".

    ## Connecting to a locally hosted Prefect API

    If using a local API URL on Linux, we will update the network mode default to 'host'
    to enable connectivity. On other operating systems, or when an alternative network
    mode is used, we will replace 'localhost' in the API URL with 'host.docker.internal'.
    Generally, this will enable connectivity, but the API URL can be provided as an
    environment variable to override inference in more complex use cases.

    Note, if using 'host.docker.internal' in the API URL on Linux, the API must be bound
    to 0.0.0.0 or the Docker IP address to allow connectivity. On macOS, this is not
    necessary and the API is connectable while bound to localhost.
    """

    type: Literal["docker-container"] = Field(
        default="docker-container", description="The type of infrastructure."
    )
    image: str = Field(
        default_factory=get_prefect_image_name,
        description="Tag of a Docker image to use. Defaults to the Prefect image.",
    )
    image_pull_policy: Optional[ImagePullPolicy] = Field(
        default=None, description="Specifies if the image should be pulled."
    )
    image_registry: Optional[DockerRegistry] = None
    networks: List[str] = Field(
        default_factory=list,
        description="A list of strings specifying Docker networks to connect the container to.",
    )
    network_mode: Optional[str] = Field(
        default=None,
        description="The network mode for the created container (e.g. host, bridge). If 'networks' is set, this cannot be set.",
    )
    auto_remove: bool = Field(
        default=False,
        description="If set, the container will be removed on completion.",
    )
    volumes: List[str] = Field(
        default_factory=list,
        description='A list of volume mount strings in the format of "local_path:container_path".',
    )
    stream_output: bool = Field(
        default=True,
        description="If set, the output will be streamed from the container to local standard output.",
    )

    _block_type_name = "Docker Container"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/2IfXXfMq66mrzJBDFFCHTp/6d8f320d9e4fc4393f045673d61ab612/Moby-logo.png?h=250"

    @validator("labels")
    def convert_labels_to_docker_format(cls, labels: Dict[str, str]):
        labels = labels or {}
        new_labels = {}
        for name, value in labels.items():
            if "/" in name:
                namespace, key = name.split("/", maxsplit=1)
                new_namespace = ".".join(reversed(namespace.split(".")))
                new_labels[f"{new_namespace}.{key}"] = value
            else:
                new_labels[name] = value
        return new_labels

    @validator("volumes")
    def check_volume_format(cls, volumes):
        for volume in volumes:
            if not ":" in volume:
                raise ValueError(
                    "Invalid volume specification. "
                    f"Expected format 'path:container_path', but got {volume!r}"
                )

        return volumes

    @sync_compatible
    async def run(
        self,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> "DockerContainerResult":
        if not self.command:
            raise ValueError("Docker container cannot be run with empty command.")

        # The `docker` library uses requests instead of an async http library so it must
        # be run in a thread to avoid blocking the event loop.
        container = await run_sync_in_worker_thread(self._create_and_start_container)
        container_pid = self._get_infrastructure_pid(container_id=container.id)

        # Mark as started and return the infrastructure id
        if task_status:
            task_status.started(container_pid)

        # Monitor the container
        container = await run_sync_in_worker_thread(
            self._watch_container_safe, container
        )

        exit_code = container.attrs["State"].get("ExitCode")
        return DockerContainerResult(
            status_code=exit_code if exit_code is not None else -1,
            identifier=container_pid,
        )

    async def kill(self, infrastructure_pid: str, grace_seconds: int = 30):
        docker_client = self._get_client()
        base_url, container_id = self._parse_infrastructure_pid(infrastructure_pid)

        if docker_client.api.base_url != base_url:
            raise InfrastructureNotAvailable(
                "".join(
                    [
                        f"Unable to stop container {container_id!r}: the current Docker API ",
                        f"URL {docker_client.api.base_url!r} does not match the expected ",
                        f"API base URL {base_url}.",
                    ]
                )
            )
        try:
            container = docker_client.containers.get(container_id=container_id)
        except docker.errors.NotFound:
            raise InfrastructureNotFound(
                f"Unable to stop container {container_id!r}: The container was not found."
            )

        container.stop(timeout=grace_seconds)

    def preview(self):
        # TODO: build and document a more sophisticated preview
        docker_client = self._get_client()
        try:
            return json.dumps(self._build_container_settings(docker_client))
        finally:
            docker_client.close()

    def _get_infrastructure_pid(self, container_id: str) -> str:
        """Generates a Docker infrastructure_pid string in the form of
        `<docker_host_base_url>:<container_id>`.
        """
        docker_client = self._get_client()
        base_url = docker_client.api.base_url
        docker_client.close()
        return f"{base_url}:{container_id}"

    def _parse_infrastructure_pid(self, infrastructure_pid: str) -> Tuple[str, str]:
        """Splits a Docker infrastructure_pid into its component parts"""

        # base_url can contain `:` so we only want the last item of the split
        base_url, container_id = infrastructure_pid.rsplit(":", 1)
        return base_url, str(container_id)

    def _build_container_settings(
        self,
        docker_client: "DockerClient",
    ) -> Dict:
        network_mode = self._get_network_mode()
        return dict(
            image=self.image,
            network=self.networks[0] if self.networks else None,
            network_mode=network_mode,
            command=self.command,
            environment=self._get_environment_variables(network_mode),
            auto_remove=self.auto_remove,
            labels={**CONTAINER_LABELS, **self.labels},
            extra_hosts=self._get_extra_hosts(docker_client),
            name=self._get_container_name(),
            volumes=self.volumes,
        )

    def _create_and_start_container(self) -> "Container":
        if self.image_registry:
            # If an image registry block was supplied, load an authenticated Docker
            # client from the block. Otherwise, use an unauthenticated client to
            # pull images from public registries.
            docker_client = self.image_registry.get_docker_client()
        else:
            docker_client = self._get_client()
        container_settings = self._build_container_settings(docker_client)

        if self._should_pull_image(docker_client):
            self.logger.info(f"Pulling image {self.image!r}...")
            self._pull_image(docker_client)

        container = self._create_container(docker_client, **container_settings)

        # Add additional networks after the container is created; only one network can
        # be attached at creation time
        if len(self.networks) > 1:
            for network_name in self.networks[1:]:
                network = docker_client.networks.get(network_name)
                network.connect(container)

        # Start the container
        container.start()

        docker_client.close()

        return container

    def _get_image_and_tag(self) -> Tuple[str, Optional[str]]:
        return parse_image_tag(self.image)

    def _determine_image_pull_policy(self) -> ImagePullPolicy:
        """
        Determine the appropriate image pull policy.

        1. If they specified an image pull policy, use that.

        2. If they did not specify an image pull policy and gave us
           the "latest" tag, use ImagePullPolicy.always.

        3. If they did not specify an image pull policy and did not
           specify a tag, use ImagePullPolicy.always.

        4. If they did not specify an image pull policy and gave us
           a tag other than "latest", use ImagePullPolicy.if_not_present.

        This logic matches the behavior of Kubernetes.
        See: https://kubernetes.io/docs/concepts/containers/images/#imagepullpolicy-defaulting
        """
        if not self.image_pull_policy:
            _, tag = self._get_image_and_tag()
            if tag == "latest" or not tag:
                return ImagePullPolicy.ALWAYS
            return ImagePullPolicy.IF_NOT_PRESENT
        return self.image_pull_policy

    def _get_network_mode(self) -> Optional[str]:
        # User's value takes precedence; this may collide with the incompatible options
        # mentioned below.
        if self.network_mode:
            if sys.platform != "linux" and self.network_mode == "host":
                warnings.warn(
                    f"{self.network_mode!r} network mode is not supported on platform "
                    f"{sys.platform!r} and may not work as intended."
                )
            return self.network_mode

        # Network mode is not compatible with networks or ports (we do not support ports
        # yet though)
        if self.networks:
            return None

        # Check for a local API connection
        api_url = self.env.get("PREFECT_API_URL", PREFECT_API_URL.value())

        if api_url:
            try:
                _, netloc, _, _, _, _ = urllib.parse.urlparse(api_url)
            except Exception as exc:
                warnings.warn(
                    f"Failed to parse host from API URL {api_url!r} with exception: "
                    f"{exc}\nThe network mode will not be inferred."
                )
                return None

            host = netloc.split(":")[0]

            # If using a locally hosted API, use a host network on linux
            if sys.platform == "linux" and (host == "127.0.0.1" or host == "localhost"):
                return "host"

        # Default to unset
        return None

    def _should_pull_image(self, docker_client: "DockerClient") -> bool:
        """
        Decide whether we need to pull the Docker image.
        """
        image_pull_policy = self._determine_image_pull_policy()

        if image_pull_policy is ImagePullPolicy.ALWAYS:
            return True
        elif image_pull_policy is ImagePullPolicy.NEVER:
            return False
        elif image_pull_policy is ImagePullPolicy.IF_NOT_PRESENT:
            try:
                # NOTE: images.get() wants the tag included with the image
                # name, while images.pull() wants them split.
                docker_client.images.get(self.image)
            except docker.errors.ImageNotFound:
                self.logger.debug(f"Could not find Docker image locally: {self.image}")
                return True
        return False

    def _pull_image(self, docker_client: "DockerClient"):
        """
        Pull the image we're going to use to create the container.
        """
        image, tag = self._get_image_and_tag()

        return docker_client.images.pull(image, tag)

    def _create_container(self, docker_client: "DockerClient", **kwargs) -> "Container":
        """
        Create a docker container with retries on name conflicts.

        If the container already exists with the given name, an incremented index is
        added.
        """
        # Create the container with retries on name conflicts (with an incremented idx)
        index = 0
        container = None
        name = original_name = kwargs.pop("name")

        while not container:
            from docker.errors import APIError

            try:
                display_name = repr(name) if name else "with auto-generated name"
                self.logger.info(f"Creating Docker container {display_name}...")
                container = docker_client.containers.create(name=name, **kwargs)
            except APIError as exc:
                if "Conflict" in str(exc) and "container name" in str(exc):
                    self.logger.info(
                        f"Docker container name {display_name} already exists; "
                        "retrying..."
                    )
                    index += 1
                    name = f"{original_name}-{index}"
                else:
                    raise

        self.logger.info(
            f"Docker container {container.name!r} has status {container.status!r}"
        )
        return container

    def _watch_container_safe(self, container: "Container") -> "Container":
        # Monitor the container, capturing the latest snapshot while handling
        # not-found errors
        docker_client = self._get_client()

        try:
            for latest_container in self._watch_container(docker_client, container.id):
                container = latest_container
        except docker.errors.NotFound:
            # The container was removed during watching
            self.logger.warning(
                f"Docker container {container.name} was removed before we could wait "
                "for its completion."
            )
        finally:
            docker_client.close()

        return container

    def _watch_container(
        self, docker_client: "DockerClient", container_id: str
    ) -> Generator[None, None, "Container"]:
        container: "Container" = docker_client.containers.get(container_id)

        status = container.status
        self.logger.info(
            f"Docker container {container.name!r} has status {container.status!r}"
        )
        yield container

        if self.stream_output:
            try:
                for log in container.logs(stream=True):
                    log: bytes
                    print(log.decode().rstrip())
            except docker.errors.APIError as exc:
                if "marked for removal" in str(exc):
                    self.logger.warning(
                        f"Docker container {container.name} was marked for removal before "
                        "logs could be retrieved. Output will not be streamed. "
                    )
                else:
                    self.logger.exception(
                        "An unexpected Docker API error occured while streaming output "
                        f"from container {container.name}."
                    )

            container.reload()
            if container.status != status:
                self.logger.info(
                    f"Docker container {container.name!r} has status {container.status!r}"
                )
            yield container

        container.wait()
        self.logger.info(
            f"Docker container {container.name!r} has status {container.status!r}"
        )
        yield container

    def _get_client(self):
        try:
            with warnings.catch_warnings():
                # Silence warnings due to use of deprecated methods within dockerpy
                # See https://github.com/docker/docker-py/pull/2931
                warnings.filterwarnings(
                    "ignore",
                    message="distutils Version classes are deprecated.*",
                    category=DeprecationWarning,
                )

                docker_client = docker.from_env()

        except docker.errors.DockerException as exc:
            raise RuntimeError("Could not connect to Docker.") from exc

        return docker_client

    def _get_container_name(self) -> Optional[str]:
        """
        Generates a container name to match the configured name, ensuring it is Docker
        compatible.
        """
        # Must match `/?[a-zA-Z0-9][a-zA-Z0-9_.-]+` in the end
        if not self.name:
            return None

        return (
            slugify(
                self.name,
                lowercase=False,
                # Docker does not limit length but URL limits apply eventually so
                # limit the length for safety
                max_length=250,
                # Docker allows these characters for container names
                regex_pattern=r"[^a-zA-Z0-9_.-]+",
            ).lstrip(
                # Docker does not allow leading underscore, dash, or period
                "_-."
            )
            # Docker does not allow 0 character names so cast to null if the name is
            # empty after slufification
            or None
        )

    def _get_extra_hosts(self, docker_client) -> Dict[str, str]:
        """
        A host.docker.internal -> host-gateway mapping is necessary for communicating
        with the API on Linux machines. Docker Desktop on macOS will automatically
        already have this mapping.
        """
        if sys.platform == "linux" and (
            # Do not warn if the user has specified a host manually that does not use
            # a local address
            "PREFECT_API_URL" not in self.env
            or re.search(
                ".*(localhost)|(127.0.0.1)|(host.docker.internal).*",
                self.env["PREFECT_API_URL"],
            )
        ):
            user_version = packaging.version.parse(docker_client.version()["Version"])
            required_version = packaging.version.parse("20.10.0")

            if user_version < required_version:
                warnings.warn(
                    "`host.docker.internal` could not be automatically resolved to your "
                    "local ip address. This feature is not supported on Docker Engine "
                    f"v{user_version}, upgrade to v{required_version}+ if you "
                    "encounter issues."
                )
                return {}
            else:
                # Compatibility for linux -- https://github.com/docker/cli/issues/2290
                # Only supported by Docker v20.10.0+ which is our minimum recommended version
                return {"host.docker.internal": "host-gateway"}

    def _get_environment_variables(self, network_mode):
        # If the API URL has been set by the base environment rather than by the
        # user, update the value to ensure connectivity when using a bridge network by
        # updating local connections to use the docker internal host unless the
        # network mode is "host" where localhost is available already.
        env = {**self._base_environment(), **self.env}

        if (
            "PREFECT_API_URL" in env
            and "PREFECT_API_URL" not in self.env
            and network_mode != "host"
        ):
            env["PREFECT_API_URL"] = (
                env["PREFECT_API_URL"]
                .replace("localhost", "host.docker.internal")
                .replace("127.0.0.1", "host.docker.internal")
            )

        # Drop null values allowing users to "unset" variables
        return {key: value for key, value in env.items() if value is not None}

auto_remove pydantic-field

Type: bool

If set, the container will be removed on completion.

image pydantic-field

Type: str

Tag of a Docker image to use. Defaults to the Prefect image.

image_pull_policy pydantic-field

Type: ImagePullPolicy

Specifies if the image should be pulled.

network_mode pydantic-field

Type: str

The network mode for the created container (e.g. host, bridge). If 'networks' is set, this cannot be set.

networks pydantic-field

Type: List[str]

A list of strings specifying Docker networks to connect the container to.

stream_output pydantic-field

Type: bool

If set, the output will be streamed from the container to local standard output.

volumes pydantic-field

Type: List[str]

A list of volume mount strings in the format of "local_path:container_path".

DockerContainer.preview

View a preview of the infrastructure that would be run.

Source code in prefect/infrastructure/docker.py
def preview(self):
    # TODO: build and document a more sophisticated preview
    docker_client = self._get_client()
    try:
        return json.dumps(self._build_container_settings(docker_client))
    finally:
        docker_client.close()
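
Because run is decorated with sync_compatible, the block can be exercised directly from synchronous code, and preview offers a cheap sanity check first. A short usage sketch, assuming a local Docker Engine is running; the image tag and paths are illustrative:

from prefect.infrastructure import DockerContainer

container = DockerContainer(
    image="prefecthq/prefect:2-latest",
    command=["echo", "hello from the container"],
    volumes=["/tmp/data:/data"],  # each entry must contain ':' or validation fails
)

print(container.preview())  # JSON of the settings that would be passed to Docker

result = container.run()  # sync_compatible: callable without an event loop
print(result.status_code, result.identifier)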

DockerContainerResult pydantic-model

Contains information about a completed Docker container

Source code in prefect/infrastructure/docker.py
class DockerContainerResult(InfrastructureResult):
    """Contains information about a completed Docker container"""

DockerRegistry pydantic-model

Connects to a Docker registry.

Requires a Docker Engine to be connectable.

Attributes:

username (str): The username to log into the registry with.
password (SecretStr): The password to log into the registry with.
registry_url (str): The URL to the registry such as registry.hub.docker.com. Generally, "http" or "https" can be omitted.
reauth (bool): If already logged into the registry, should login be performed again? This setting defaults to True to support common token authentication patterns such as ECR.

Source code in prefect/infrastructure/docker.py
class DockerRegistry(BaseDockerLogin):
    """
    Connects to a Docker registry.

    Requires a Docker Engine to be connectable.

    Attributes:
        username: The username to log into the registry with.
        password: The password to log into the registry with.
        registry_url: The URL to the registry such as registry.hub.docker.com. Generally, "http" or "https" can be
            omitted.
        reauth: If already logged into the registry, should login be performed again?
            This setting defaults to `True` to support common token authentication
            patterns such as ECR.
    """

    _block_type_name = "Docker Registry"
    username: str = Field(
        default=..., description="The username to log into the registry with."
    )
    password: SecretStr = Field(
        default=..., description="The password to log into the registry with."
    )
    registry_url: str = Field(
        default=...,
        description='The URL to the registry. Generally, "http" or "https" can be omitted.',
    )
    reauth: bool = Field(
        default=True,
        description="Whether or not to reauthenticate on each interaction.",
    )

    @sync_compatible
    async def login(self) -> "DockerClient":
        warnings.warn(
            "`login` is deprecated. Instead, use `get_docker_client` to obtain an authenticated `DockerClient`.",
            category=DeprecationWarning,
            stacklevel=3,
        )
        return await self.get_docker_client()

    @sync_compatible
    async def get_docker_client(self) -> "DockerClient":
        client = await run_sync_in_worker_thread(
            self._login,
            self.username,
            self.password.get_secret_value(),
            self.registry_url,
            self.reauth,
        )

        return client

password pydantic-field required

Type: SecretStr

The password to log into the registry with.

reauth pydantic-field

Type: bool

Whether or not to reauthenticate on each interaction.

registry_url pydantic-field required

Type: str

The URL to the registry. Generally, "http" or "https" can be omitted.

username pydantic-field required

Type: str

The username to log into the registry with.
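
In practice the block is attached to a DockerContainer through its image_registry field, so the authenticated client returned by get_docker_client is used when pulling the image. A sketch with placeholder credentials and image name:

from prefect.infrastructure import DockerContainer, DockerRegistry

registry = DockerRegistry(
    username="my-user",                   # placeholder credentials
    password="my-token",
    registry_url="registry.example.com",
)

container = DockerContainer(
    image="registry.example.com/team/app:1.0",  # placeholder private image
    image_registry=registry,
)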

ImagePullPolicy

An enumeration.

Source code in prefect/infrastructure/docker.py
class ImagePullPolicy(AutoEnum):
    IF_NOT_PRESENT = AutoEnum.auto()
    ALWAYS = AutoEnum.auto()
    NEVER = AutoEnum.auto()
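
When the field is unset, DockerContainer._determine_image_pull_policy (above) resolves 'latest' or untagged images to ALWAYS and pinned tags to IF_NOT_PRESENT, mirroring Kubernetes defaulting. The enum is mainly useful for overriding that inference, as in this sketch (the tag is a placeholder):

from prefect.infrastructure import DockerContainer
from prefect.infrastructure.docker import ImagePullPolicy

container = DockerContainer(
    image="prefecthq/prefect:2-latest",       # placeholder tag
    image_pull_policy=ImagePullPolicy.NEVER,  # never pull; use the local image only
)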

kubernetes

KubernetesImagePullPolicy

An enumeration.

Source code in prefect/infrastructure/kubernetes.py
class KubernetesImagePullPolicy(enum.Enum):
    IF_NOT_PRESENT = "IfNotPresent"
    ALWAYS = "Always"
    NEVER = "Never"

KubernetesJob pydantic-model

Runs a command as a Kubernetes Job.

Attributes:

cluster_config (Optional[prefect.blocks.kubernetes.KubernetesClusterConfig]): An optional Kubernetes cluster config to use for this job.
command (Optional[List[str]]): A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
customizations (JsonPatch): A list of JSON 6902 patches to apply to the base Job manifest.
env (Dict[str, Optional[str]]): Environment variables to set for the container.
image (Optional[str]): An optional string specifying the tag of a Docker image to use for the job. Defaults to the Prefect image.
image_pull_policy (Optional[prefect.infrastructure.kubernetes.KubernetesImagePullPolicy]): The Kubernetes image pull policy to use for job containers.
job (Dict[str, Any]): The base manifest for the Kubernetes Job.
job_watch_timeout_seconds (int): Number of seconds to watch for job creation before timing out (default 5).
labels (Dict[str, str]): An optional dictionary of labels to add to the job.
name (Optional[str]): An optional name for the job.
namespace (Optional[str]): An optional string signifying the Kubernetes namespace to use.
pod_watch_timeout_seconds (int): Number of seconds to watch for pod creation before timing out (default 60).
service_account_name (Optional[str]): An optional string specifying which Kubernetes service account to use.
stream_output (bool): If set, stream output from the job to local standard output.
finished_job_ttl (Optional[int]): The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.
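
Fields without a top-level shortcut can still be set through customizations: the validator casts a plain list of RFC 6902 operations to a JsonPatch, which is applied on top of the base manifest (see the source below). A sketch with illustrative values; preview renders the resulting manifest locally, without contacting a cluster:

from prefect.infrastructure import KubernetesJob

job = KubernetesJob(
    image="prefecthq/prefect:2-latest",  # placeholder tag
    namespace="prefect",
    finished_job_ttl=300,  # let Kubernetes delete the job five minutes after it finishes
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {"requests": {"memory": "512Mi", "cpu": "250m"}},
        }
    ],
)

print(job.preview())  # YAML of the manifest after shortcuts and patches are applied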
Source code in prefect/infrastructure/kubernetes.py
class KubernetesJob(Infrastructure):
    """
    Runs a command as a Kubernetes Job.

    Attributes:
        cluster_config: An optional Kubernetes cluster config to use for this job.
        command: A list of strings specifying the command to run in the container to
            start the flow run. In most cases you should not override this.
        customizations: A list of JSON 6902 patches to apply to the base Job manifest.
        env: Environment variables to set for the container.
        image: An optional string specifying the tag of a Docker image to use for the job.
            Defaults to the Prefect image.
        image_pull_policy: The Kubernetes image pull policy to use for job containers.
        job: The base manifest for the Kubernetes Job.
        job_watch_timeout_seconds: Number of seconds to watch for job creation before timing out (default 5).
        labels: An optional dictionary of labels to add to the job.
        name: An optional name for the job.
        namespace: An optional string signifying the Kubernetes namespace to use.
        pod_watch_timeout_seconds: Number of seconds to watch for pod creation before timing out (default 60).
        service_account_name: An optional string specifying which Kubernetes service account to use.
        stream_output: If set, stream output from the job to local standard output.
        finished_job_ttl: The number of seconds to retain jobs after completion. If set, finished jobs will
            be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be
            manually removed.
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1zrSeY8DZ1MJZs2BAyyyGk/20445025358491b8b72600b8f996125b/Kubernetes_logo_without_workmark.svg.png?h=250"

    type: Literal["kubernetes-job"] = Field(
        default="kubernetes-job", description="The type of infrastructure."
    )
    # shortcuts for the most common user-serviceable settings
    image: Optional[str] = Field(
        default=None,
        description=(
            "The tag of a Docker image to use for the job. Defaults to the Prefect "
            "image unless an image is already present in a provided job manifest."
        ),
    )
    namespace: Optional[str] = Field(
        default=None,
        description=(
            "The Kubernetes namespace to use for this job. Defaults to 'default' "
            "unless a namespace is already present in a provided job manifest."
        ),
    )
    service_account_name: Optional[str] = Field(
        default=None, description="The Kubernetes service account to use for this job."
    )
    image_pull_policy: Optional[KubernetesImagePullPolicy] = Field(
        default=None,
        description="The Kubernetes image pull policy to use for job containers.",
    )

    # connection to a cluster
    cluster_config: Optional[KubernetesClusterConfig] = Field(
        default=None, description="The Kubernetes cluster config to use for this job."
    )

    # settings allowing full customization of the Job
    job: KubernetesManifest = Field(
        default_factory=lambda: KubernetesJob.base_job_manifest(),
        description="The base manifest for the Kubernetes Job.",
        title="Base Job Manifest",
    )
    customizations: JsonPatch = Field(
        default_factory=lambda: JsonPatch([]),
        description="A list of JSON 6902 patches to apply to the base Job manifest.",
    )

    # controls the behavior of execution
    job_watch_timeout_seconds: int = Field(
        default=5,
        description="Number of seconds to watch for job creation before timing out.",
    )
    pod_watch_timeout_seconds: int = Field(
        default=60,
        description="Number of seconds to watch for pod creation before timing out.",
    )
    stream_output: bool = Field(
        default=True,
        description="If set, output will be streamed from the job to local standard output.",
    )
    finished_job_ttl: Optional[int] = Field(
        default=None,
        description="The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.",
    )

    # internal-use only right now
    _api_dns_name: Optional[str] = None  # Replaces 'localhost' in API URL

    _block_type_name = "Kubernetes Job"

    @validator("job")
    def ensure_job_includes_all_required_components(cls, value: KubernetesManifest):
        patch = JsonPatch.from_diff(value, cls.base_job_manifest())
        missing_paths = sorted([op["path"] for op in patch if op["op"] == "add"])
        if missing_paths:
            raise ValueError(
                "Job is missing required attributes at the following paths: "
                f"{', '.join(missing_paths)}"
            )
        return value

    @validator("job")
    def ensure_job_has_compatible_values(cls, value: KubernetesManifest):
        patch = JsonPatch.from_diff(value, cls.base_job_manifest())
        incompatible = sorted(
            [
                f"{op['path']} must have value {op['value']!r}"
                for op in patch
                if op["op"] == "replace"
            ]
        )
        if incompatible:
            raise ValueError(
                "Job has incompatble values for the following attributes: "
                f"{', '.join(incompatible)}"
            )
        return value

    @validator("customizations", pre=True)
    def cast_customizations_to_a_json_patch(
        cls, value: Union[List[Dict], JsonPatch]
    ) -> JsonPatch:
        if isinstance(value, list):
            return JsonPatch(value)
        return value

    @root_validator
    def default_namespace(cls, values):
        job = values.get("job")

        namespace = values.get("namespace")
        job_namespace = job["metadata"].get("namespace") if job else None

        if not namespace and not job_namespace:
            values["namespace"] = "default"

        return values

    @root_validator
    def default_image(cls, values):
        job = values.get("job")
        image = values.get("image")
        job_image = (
            job["spec"]["template"]["spec"]["containers"][0].get("image")
            if job
            else None
        )

        if not image and not job_image:
            values["image"] = get_prefect_image_name()

        return values

    # Support serialization of the 'JsonPatch' type
    class Config:
        arbitrary_types_allowed = True
        json_encoders = {JsonPatch: lambda p: p.patch}

    def dict(self, *args, **kwargs) -> Dict:
        d = super().dict(*args, **kwargs)
        d["customizations"] = self.customizations.patch
        return d

    @classmethod
    def base_job_manifest(cls) -> KubernetesManifest:
        """Produces the bare minimum allowed Job manifest"""
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"labels": {}},
            "spec": {
                "template": {
                    "spec": {
                        "parallelism": 1,
                        "completions": 1,
                        "restartPolicy": "Never",
                        "containers": [
                            {
                                "name": "prefect-job",
                                "env": [],
                            }
                        ],
                    }
                }
            },
        }

    # Note that we're using the yaml package to load both YAML and JSON files below.
    # This works because YAML is a strict superset of JSON:
    #
    #   > The YAML 1.2 specification was published in 2009. Its primary focus was
    #   > making YAML a strict superset of JSON. It also removed many of the problematic
    #   > implicit typing recommendations.
    #
    #   https://yaml.org/spec/1.2.2/#12-yaml-history

    @classmethod
    def job_from_file(cls, filename: str) -> KubernetesManifest:
        """Load a Kubernetes Job manifest from a YAML or JSON file."""
        with open(filename, "r", encoding="utf-8") as f:
            return yaml.load(f, yaml.SafeLoader)

    @classmethod
    def customize_from_file(cls, filename: str) -> JsonPatch:
        """Load an RFC 6902 JSON patch from a YAML or JSON file."""
        with open(filename, "r", encoding="utf-8") as f:
            return JsonPatch(yaml.load(f, yaml.SafeLoader))

    @sync_compatible
    async def run(
        self,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> "KubernetesJobResult":
        if not self.command:
            raise ValueError("Kubernetes job cannot be run with empty command.")

        self._configure_kubernetes_library_client()
        manifest = self.build_job()
        job_name = await run_sync_in_worker_thread(self._create_job, manifest)

        job_pid = self._get_infrastructure_pid(job_name)
        # Indicate that the job has started
        if task_status is not None:
            task_status.started(job_pid)

        # Monitor the job
        return await run_sync_in_worker_thread(self._watch_job, job_name)

    async def kill(self, infrastructure_pid: str, grace_seconds: int = 30):
        self._configure_kubernetes_library_client()
        job_cluster, job_name = self._parse_infrastructure_pid(infrastructure_pid)
        current_cluster = self._get_active_cluster_name()
        if job_cluster != current_cluster:
            raise InfrastructureNotAvailable(
                f"Unable to stop job {job_name!r}: the job is running on cluster ",
                f"{job_cluster!r}, but your current context is attached to cluster ",
                f"{current_cluster!r}.",
            )

        with self.get_batch_client() as batch_client:
            try:
                batch_client.delete_namespaced_job(
                    name=job_name,
                    namespace=self.namespace,
                    grace_period_seconds=grace_seconds,
                    propagation_policy="Foreground",
                )
            except kubernetes.client.exceptions.ApiException as exc:
                if exc.status == 404:
                    raise InfrastructureNotFound(
                        f"Unable to stop job {job_name!r}: The job was not found."
                    ) from exc
                else:
                    raise

    def preview(self):
        return yaml.dump(self.build_job())

    def build_job(self) -> KubernetesManifest:
        """Builds the Kubernetes Job Manifest"""
        job_manifest = copy.copy(self.job)
        job_manifest = self._shortcut_customizations().apply(job_manifest)
        job_manifest = self.customizations.apply(job_manifest)
        return job_manifest

    @contextmanager
    def get_batch_client(self) -> Generator["BatchV1Api", None, None]:
        with kubernetes.client.ApiClient() as client:
            try:
                yield kubernetes.client.BatchV1Api(api_client=client)
            finally:
                client.rest_client.pool_manager.clear()

    @contextmanager
    def get_client(self) -> Generator["CoreV1Api", None, None]:
        with kubernetes.client.ApiClient() as client:
            try:
                yield kubernetes.client.CoreV1Api(api_client=client)
            finally:
                client.rest_client.pool_manager.clear()

    def _get_infrastructure_pid(self, job_name: str) -> str:
        """Generates a kubernetes pid string in the form of `<cluster_name>:<job_name>`."""
        try:
            cluster_name = self._get_active_cluster_name()
        except kubernetes.config.config_exception.ConfigException:
            cluster_name = "in-cluster-config"
        job_pid = f"{cluster_name}:{job_name}"
        return job_pid

    def _parse_infrastructure_pid(self, infrastructure_pid: str) -> Tuple[str, str]:
        """Splits a kubernetes infrastructure pid into its component parts."""
        cluster_name, job_name = infrastructure_pid.rsplit(":", 1)
        return cluster_name, job_name

    def _get_active_cluster_name(self) -> str:
        _, active_context = kubernetes.config.list_kube_config_contexts()
        cluster_name = active_context["context"]["cluster"]
        return cluster_name

    def _configure_kubernetes_library_client(self) -> None:
        """Set the correct kubernetes client configuration.

        Important: this is NOT threadsafe.

        TODO: investigate returning a client
        """
        # if a k8s cluster block is provided to the flow runner, use that
        if self.cluster_config:
            self.cluster_config.configure_client()
        else:
            # If no block specified, try to load Kubernetes configuration within a cluster. If that doesn't
            # work, try to load the configuration from the local environment, allowing
            # any further ConfigExceptions to bubble up.
            try:
                kubernetes.config.load_incluster_config()
            except kubernetes.config.ConfigException:
                kubernetes.config.load_kube_config()

    def _shortcut_customizations(self) -> JsonPatch:
        """Produces the JSON 6902 patch for the most commonly used customizations, like
        image and namespace, which we offer as top-level parameters (with sensible
        default values)"""
        shortcuts = []

        if self.namespace:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/metadata/namespace",
                    "value": self.namespace,
                }
            )

        if self.image:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/spec/template/spec/containers/0/image",
                    "value": self.image,
                }
            )

        shortcuts += [
            {
                "op": "add",
                "path": f"/metadata/labels/{self._slugify_label_key(key).replace('/', '~1', 1)}",
                "value": self._slugify_label_value(value),
            }
            for key, value in self.labels.items()
        ]

        shortcuts += [
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/env/-",
                "value": {"name": key, "value": value},
            }
            for key, value in self._get_environment_variables().items()
        ]

        if self.image_pull_policy:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/spec/template/spec/containers/0/imagePullPolicy",
                    "value": self.image_pull_policy.value,
                }
            )

        if self.service_account_name:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/spec/template/spec/serviceAccountName",
                    "value": self.service_account_name,
                }
            )

        if self.finished_job_ttl is not None:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/spec/ttlSecondsAfterFinished",
                    "value": self.finished_job_ttl,
                }
            )

        if self.command:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/spec/template/spec/containers/0/args",
                    "value": self.command,
                }
            )

        if self.name:
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/metadata/generateName",
                    "value": self._slugify_name(self.name) + "-",
                }
            )
        else:
            # Generate name is required
            shortcuts.append(
                {
                    "op": "add",
                    "path": "/metadata/generateName",
                    "value": "prefect-job-"
                    # We generate a name using a hash of the primary job settings
                    + stable_hash(
                        *self.command,
                        *self.env.keys(),
                        *[v for v in self.env.values() if v is not None],
                    )
                    + "-",
                }
            )

        return JsonPatch(shortcuts)

    def _get_job(self, job_id: str) -> Optional["V1Job"]:
        with self.get_batch_client() as batch_client:
            try:
                job = batch_client.read_namespaced_job(job_id, self.namespace)
            except kubernetes.client.exceptions.ApiException:
                self.logger.error(f"Job {job_id!r} was removed.", exc_info=True)
                return None
            return job

    def _get_job_pod(self, job_name: str) -> "V1Pod":
        """Get the first running pod for a job."""

        # Wait until we find a running pod for the job
        watch = kubernetes.watch.Watch()
        self.logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
        last_phase = None
        with self.get_client() as client:
            for event in watch.stream(
                func=client.list_namespaced_pod,
                namespace=self.namespace,
                label_selector=f"job-name={job_name}",
                timeout_seconds=self.pod_watch_timeout_seconds,
            ):
                phase = event["object"].status.phase
                if phase != last_phase:
                    self.logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")

                if phase != "Pending":
                    watch.stop()
                    return event["object"]

                last_phase = phase

        self.logger.error(f"Job {job_name!r}: Pod never started.")

    def _watch_job(self, job_name: str) -> "KubernetesJobResult":
        job = self._get_job(job_name)
        if not job:
            return KubernetesJobResult(status_code=-1, identifier=job_name)

        pod = self._get_job_pod(job_name)
        if not pod:
            return KubernetesJobResult(status_code=-1, identifier=job.metadata.name)

        if self.stream_output:
            with self.get_client() as client:
                logs = client.read_namespaced_pod_log(
                    pod.metadata.name,
                    self.namespace,
                    follow=True,
                    _preload_content=False,
                )
                for log in logs.stream():
                    print(log.decode().rstrip())

        # Wait for job to complete
        self.logger.debug(f"Job {job_name!r}: Starting watch for job completion")
        watch = kubernetes.watch.Watch()
        with self.get_batch_client() as batch_client:
            for event in watch.stream(
                func=batch_client.list_namespaced_job,
                field_selector=f"metadata.name={job_name}",
                namespace=self.namespace,
                timeout_seconds=self.job_watch_timeout_seconds,
            ):
                if event["object"].status.completion_time:
                    watch.stop()
                    break
            else:
                self.logger.error(f"Job {job_name!r}: Job did not complete.")
                return KubernetesJobResult(status_code=-1, identifier=job.metadata.name)

        with self.get_client() as client:
            pod_status = client.read_namespaced_pod_status(
                namespace=self.namespace, name=pod.metadata.name
            )
            first_container_status = pod_status.status.container_statuses[0]

        return KubernetesJobResult(
            status_code=first_container_status.state.terminated.exit_code,
            identifier=job.metadata.name,
        )

    def _create_job(self, job_manifest: KubernetesManifest) -> str:
        """
        Given a Kubernetes Job Manifest, create the Job on the configured Kubernetes
        cluster and return its name.
        """
        with self.get_batch_client() as batch_client:
            job = batch_client.create_namespaced_job(self.namespace, job_manifest)
        return job.metadata.name

    def _slugify_name(self, name: str) -> str:
        """
        Slugify text for use as a name.

        Keeps only alphanumeric characters and dashes, and caps the length
        of the slug at 45 chars.

        The 45 character length allows room for the k8s utility
        "generateName" to generate a unique name from the slug while
        keeping the total length of a name below 63 characters, which is
        the limit for e.g. label names that follow RFC 1123 (hostnames) and
        RFC 1035 (domain names).

        Args:
            name: The name of the job

        Returns:
            the slugified job name
        """
        slug = slugify(
            name,
            max_length=45,  # Leave enough space for generateName
            regex_pattern=r"[^a-zA-Z0-9-]+",
        )

        # TODO: Handle the case that the name is an empty string after being
        # slugified.

        return slug

    def _slugify_label_key(self, key: str) -> str:
        """
        Slugify text for use as a label key.

        Keys are composed of an optional prefix and name, separated by a slash (/).

        Keeps only alphanumeric characters, dashes, underscores, and periods.
        Limits the length of the label prefix to 253 characters.
        Limits the length of the label name to 63 characters.

        See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

        Args:
            key: The label key

        Returns:
            The slugified label key
        """
        if "/" in key:
            prefix, name = key.split("/", maxsplit=1)
        else:
            prefix = None
            name = key

        name_slug = (
            slugify(name, max_length=63, regex_pattern=r"[^a-zA-Z0-9-_.]+",).strip(
                "_-."  # Must start or end with alphanumeric characters
            )
            or name
        )
        # Fallback to the original if we end up with an empty slug, this will allow
        # Kubernetes to throw the validation error

        if prefix:
            prefix_slug = (
                slugify(
                    prefix,
                    max_length=253,
                    regex_pattern=r"[^a-zA-Z0-9-\.]+",
                ).strip(
                    "_-."
                )  # Must start and end with alphanumeric characters
                or prefix
            )

            return f"{prefix_slug}/{name_slug}"

        return name_slug

    def _slugify_label_value(self, value: str) -> str:
        """
        Slugify text for use as a label value.

        Keeps only alphanumeric characters, dashes, underscores, and periods.
        Limits the length of the label value to 63 characters.

        See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

        Args:
            value: The text for the label

        Returns:
            The slugified value
        """
        slug = (
            slugify(value, max_length=63, regex_pattern=r"[^a-zA-Z0-9-_\.]+",).strip(
                "_-."  # Must start or end with alphanumeric characters
            )
            or value
        )
        # Fallback to the original if we end up with an empty slug, this will allow
        # Kubernetes to throw the validation error

        return slug

    def _get_environment_variables(self):
        # If the API URL has been set by the base environment rather than by the
        # user, update the value to ensure connectivity when using a bridge network
        # by rewriting local connections to use the internal host
        env = {**self._base_environment(), **self.env}

        if (
            "PREFECT_API_URL" in env
            and "PREFECT_API_URL" not in self.env
            and self._api_dns_name
        ):
            env["PREFECT_API_URL"] = (
                env["PREFECT_API_URL"]
                .replace("localhost", self._api_dns_name)
                .replace("127.0.0.1", self._api_dns_name)
            )

        # Drop null values allowing users to "unset" variables
        return {key: value for key, value in env.items() if value is not None}

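The label-key shortcut above escapes "/" because JSON Patch paths are RFC 6901 JSON Pointers. A minimal sketch of the escaping round trip, using the standalone jsonpatch library (the manifest and label key here are illustrative):

import jsonpatch

# "/" inside a JSON Pointer token must be escaped as "~1" (and "~" as "~0"),
# so this path addresses a single entry in the labels map
manifest = {"metadata": {"labels": {}}}
patch = jsonpatch.JsonPatch(
    [
        {
            "op": "add",
            "path": "/metadata/labels/prefect.io~1flow-run-name",
            "value": "example",
        }
    ]
)
print(patch.apply(manifest))
# {'metadata': {'labels': {'prefect.io/flow-run-name': 'example'}}}
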
cluster_config pydantic-field

Type: KubernetesClusterConfig

The Kubernetes cluster config to use for this job.

customizations pydantic-field

Type: JsonPatch

A list of RFC 6902 JSON patches to apply to the base Job manifest.

finished_job_ttl pydantic-field

Type: int

The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.

image pydantic-field

Type: str

The tag of a Docker image to use for the job. Defaults to the Prefect image unless an image is already present in a provided job manifest.

image_pull_policy pydantic-field

Type: KubernetesImagePullPolicy

The Kubernetes image pull policy to use for job containers.

job pydantic-field

Type: Dict[str, Any]

The base manifest for the Kubernetes Job.

job_watch_timeout_seconds pydantic-field

Type: int

Number of seconds to watch for job completion before timing out.

namespace pydantic-field

Type: str

The Kubernetes namespace to use for this job. Defaults to 'default' unless a namespace is already present in a provided job manifest.

pod_watch_timeout_seconds pydantic-field

Type: int

Number of seconds to watch for pod creation before timing out.

service_account_name pydantic-field

Type: str

The Kubernetes service account to use for this job.

stream_output pydantic-field

Type: bool

If set, output will be streamed from the job to local standard output.

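Taken together, these fields configure the Job without any manual patching. A hedged configuration sketch (all values are illustrative):

from prefect.infrastructure import KubernetesJob

job = KubernetesJob(
    image="prefecthq/prefect:2-latest",  # illustrative image tag
    namespace="flows",
    service_account_name="prefect-runner",
    finished_job_ttl=300,  # let Kubernetes clean up 5 minutes after completion
    stream_output=True,
)
print(job.preview())
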
KubernetesJob.__json_encoder__ special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

KubernetesJob.base_job_manifest classmethod

Produces the bare minimum allowed Job manifest

Source code in prefect/infrastructure/kubernetes.py
@classmethod
def base_job_manifest(cls) -> KubernetesManifest:
    """Produces the bare minimum allowed Job manifest"""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"labels": {}},
        "spec": {
            # parallelism and completions are Job-level settings and belong on
            # the Job spec, not on the pod template spec
            "parallelism": 1,
            "completions": 1,
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "prefect-job",
                            "env": [],
                        }
                    ],
                }
            },
        },
    }

KubernetesJob.build_job

Builds the Kubernetes Job Manifest

Source code in prefect/infrastructure/kubernetes.py
def build_job(self) -> KubernetesManifest:
    """Builds the Kubernetes Job Manifest"""
    job_manifest = copy.copy(self.job)
    job_manifest = self._shortcut_customizations().apply(job_manifest)
    job_manifest = self.customizations.apply(job_manifest)
    return job_manifest

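Because shortcut patches are applied before user-supplied customizations, an explicit customization can override any shortcut value. A sketch, assuming the block coerces a plain list of operations into a JsonPatch (values are illustrative):

from prefect.infrastructure import KubernetesJob

job = KubernetesJob(
    namespace="data-eng",
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {"requests": {"memory": "512Mi"}},
        }
    ],
)
manifest = job.build_job()
print(manifest["metadata"]["namespace"])  # data-eng
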
KubernetesJob.customize_from_file classmethod

Load an RFC 6902 JSON patch from a YAML or JSON file.

Source code in prefect/infrastructure/kubernetes.py
@classmethod
def customize_from_file(cls, filename: str) -> JsonPatch:
    """Load an RFC 6902 JSON patch from a YAML or JSON file."""
    with open(filename, "r", encoding="utf-8") as f:
        return JsonPatch(yaml.load(f, yaml.SafeLoader))

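A usage sketch, assuming a hypothetical patches.yaml that contains an RFC 6902 patch list:

from prefect.infrastructure import KubernetesJob

# patches.yaml (hypothetical):
#   - op: add
#     path: /spec/template/spec/containers/0/resources
#     value:
#       limits:
#         memory: 2Gi
job = KubernetesJob(
    customizations=KubernetesJob.customize_from_file("patches.yaml"),
)
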
KubernetesJob.dict

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Source code in prefect/infrastructure/kubernetes.py
def dict(self, *args, **kwargs) -> Dict:
    d = super().dict(*args, **kwargs)
    d["customizations"] = self.customizations.patch
    return d

KubernetesJob.job_from_file classmethod

Load a Kubernetes Job manifest from a YAML or JSON file.

Source code in prefect/infrastructure/kubernetes.py
@classmethod
def job_from_file(cls, filename: str) -> KubernetesManifest:
    """Load a Kubernetes Job manifest from a YAML or JSON file."""
    with open(filename, "r", encoding="utf-8") as f:
        return yaml.load(f, yaml.SafeLoader)

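A usage sketch, assuming a hypothetical job.yaml that holds a complete Job manifest to use in place of the default base manifest:

from prefect.infrastructure import KubernetesJob

job = KubernetesJob(job=KubernetesJob.job_from_file("job.yaml"))
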
KubernetesJob.preview

View a preview of the infrastructure that would be run.

Source code in prefect/infrastructure/kubernetes.py
def preview(self):
    return yaml.dump(self.build_job())

KubernetesJobResult pydantic-model

Contains information about the final state of a completed Kubernetes Job

Source code in prefect/infrastructure/kubernetes.py
class KubernetesJobResult(InfrastructureResult):
    """Contains information about the final state of a completed Kubernetes Job"""

KubernetesRestartPolicy

An enumeration.

Source code in prefect/infrastructure/kubernetes.py
class KubernetesRestartPolicy(enum.Enum):
    ON_FAILURE = "OnFailure"
    NEVER = "Never"

process

Process pydantic-model

Run a command in a new process.

Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables.

Attributes:

command (Optional[List[str]])
    A list of strings specifying the command to start the flow run. In most cases you should not override this.

env (Dict[str, Optional[str]])
    Environment variables to set for the new process.

labels (Dict[str, str])
    Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself.

name (Optional[str])
    A name for the process. For display purposes only.

Source code in prefect/infrastructure/process.py
class Process(Infrastructure):
    """
    Run a command in a new process.

    Current environment variables and Prefect settings will be included in the created
    process. Configured environment variables will override any current environment
    variables.

    Attributes:
        command: A list of strings specifying the command to start the flow
            run. In most cases you should not override this.
        env: Environment variables to set for the new process.
        labels: Labels for the process. Labels are for metadata purposes only and
            cannot be attached to the process itself.
        name: A name for the process. For display purposes only.
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/39WQhVu4JK40rZWltGqhuC/d15be6189a0cb95949a6b43df00dcb9b/image5.png?h=250"

    type: Literal["process"] = Field(
        default="process", description="The type of infrastructure."
    )
    stream_output: bool = Field(
        default=True,
        description="If set, output will be streamed from the process to local standard output.",
    )
    working_dir: Union[str, Path, None] = Field(
        default=None,
        description="If set, the process will open within the specified path as the working directory."
        " Otherwise, a temporary directory will be created.",
    )  # Underlying accepted types are str, bytes, PathLike[str], None

    @sync_compatible
    async def run(
        self,
        task_status: anyio.abc.TaskStatus = None,
    ) -> "ProcessResult":
        if not self.command:
            raise ValueError("Process cannot be run with empty command.")

        _use_threaded_child_watcher()
        display_name = f" {self.name!r}" if self.name else ""

        # Open a subprocess to execute the flow run
        self.logger.info(f"Opening process{display_name}...")
        working_dir_ctx = (
            tempfile.TemporaryDirectory(suffix="prefect")
            if not self.working_dir
            else contextlib.nullcontext(self.working_dir)
        )
        with working_dir_ctx as working_dir:
            self.logger.debug(
                f"Process{display_name} running command: {' '.join(self.command)} in {working_dir}"
            )

            process = await run_process(
                self.command,
                stream_output=self.stream_output,
                task_status=task_status,
                task_status_handler=_infrastructure_pid_from_process,
                env=self._get_environment_variables(),
                cwd=working_dir,
            )

        # Use the pid for display if no name was given
        display_name = display_name or f" {process.pid}"

        if process.returncode:
            help_message = None
            if process.returncode == -9:
                help_message = (
                    "This indicates that the process exited due to a SIGKILL signal. "
                    "Typically, this is either caused by manual cancellation or "
                    "high memory usage causing the operating system to "
                    "terminate the process."
                )
            elif process.returncode == -15:
                help_message = (
                    "This indicates that the process exited due to a SIGTERM signal. "
                    "Typically, this is caused by manual cancellation."
                )
            elif process.returncode == 247:
                help_message = (
                    "This indicates that the process was terminated due to high "
                    "memory usage."
                )

            self.logger.error(
                f"Process{display_name} exited with status code: "
                f"{process.returncode}" + (f"; {help_message}" if help_message else "")
            )
        else:
            self.logger.info(f"Process{display_name} exited cleanly.")

        return ProcessResult(
            status_code=process.returncode, identifier=str(process.pid)
        )

    async def kill(self, infrastructure_pid: str, grace_seconds: int = 30):
        hostname, pid = _parse_infrastructure_pid(infrastructure_pid)

        if hostname != socket.gethostname():
            raise InfrastructureNotAvailable(
                f"Unable to kill process {pid!r}: The process is running on a different host {hostname!r}."
            )

        # In a non-Windows environment, first send a SIGTERM, then, after
        # `grace_seconds` have passed, send a SIGKILL. On Windows we use
        # CTRL_BREAK_EVENT since SIGTERM is useless:
        # https://bugs.python.org/issue26350
        if sys.platform == "win32":
            try:
                os.kill(pid, signal.CTRL_BREAK_EVENT)
            except ProcessLookupError:
                # The process exited before we were able to kill it.
                return
        else:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                raise InfrastructureNotFound(
                    f"Unable to kill process {pid!r}: The process was not found."
                )

            await anyio.sleep(grace_seconds)

            try:
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                # The process likely exited due to the SIGTERM above.
                return

    def preview(self):
        environment = self._get_environment_variables(include_os_environ=False)
        return " \\\n".join(
            [f"{key}={value}" for key, value in environment.items()]
            + [" ".join(self.command)]
        )

    def _get_environment_variables(self, include_os_environ: bool = True):
        os_environ = os.environ if include_os_environ else {}
        # The base environment must override the current environment or
        # the Prefect settings context may not be respected
        env = {**os_environ, **self._base_environment(), **self.env}

        # Drop null values allowing users to "unset" variables
        return {key: value for key, value in env.items() if value is not None}

    def _base_flow_run_command(self):
        return [sys.executable, "-m", "prefect.engine"]

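A minimal usage sketch; run is sync-compatible, so it can be called from synchronous code (the command is illustrative):

from prefect.infrastructure import Process

process = Process(command=["python", "-c", "print('hello')"])
result = process.run()  # blocks until the subprocess exits
assert result.status_code == 0
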
stream_output pydantic-field

Type: bool

If set, output will be streamed from the process to local standard output.

working_dir pydantic-field

Type: Union[str, pathlib.Path]

If set, the process will open within the specified path as the working directory. Otherwise, a temporary directory will be created.

Process.preview

View a preview of the infrastructure that would be run.

Source code in prefect/infrastructure/process.py
def preview(self):
    environment = self._get_environment_variables(include_os_environ=False)
    return " \\\n".join(
        [f"{key}={value}" for key, value in environment.items()]
        + [" ".join(self.command)]
    )

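For illustration, the preview renders one KEY=value line per configured environment variable followed by the command, joined with shell-style line continuations:

from prefect.infrastructure import Process

print(Process(command=["echo", "hello"], env={"GREETING": "hi"}).preview())
# Output (assuming no Prefect settings are set in the current profile):
# GREETING=hi \
# echo hello
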
ProcessResult pydantic-model

Contains information about the final state of a completed process

Source code in prefect/infrastructure/process.py
class ProcessResult(InfrastructureResult):
    """Contains information about the final state of a completed process"""