Skip to content

prefect.packaging special

base

PackageManifest pydantic-model

Describes a package.

Source code in prefect/packaging/base.py
class PackageManifest(BaseModel, abc.ABC):
    """
    Describes a package.

    A manifest records everything needed to locate and load a packaged flow
    later: the manifest's registered type key, the flow's name, and the
    flow's parameter schema.
    """

    # Registered type key used to look up the concrete manifest class.
    type: str
    # Name of the packaged flow.
    flow_name: str
    # Schema of the flow's parameters, captured at packaging time.
    flow_parameter_schema: ParameterSchema

    @abc.abstractmethod
    async def unpackage(self) -> Flow:
        """
        Retrieve a flow from the package.
        """

PackageManifest.unpackage async

Retrieve a flow from the package.

Source code in prefect/packaging/base.py
# Abstract hook: concrete manifests implement retrieval of the packaged flow.
@abc.abstractmethod
async def unpackage(self) -> Flow:
    """
    Retrieve a flow from the package.
    """

Packager pydantic-model

Creates a package for a flow.

A package contains the flow and is typically stored outside of Prefect. To facilitate interaction with the package, a manifest is returned that describes how to access and use the package.

Source code in prefect/packaging/base.py
class Packager(BaseModel, abc.ABC):
    """
    Creates a package for a flow.

    A package contains the flow and is typically stored outside of Prefect. To
    facilitate interaction with the package, a manifest is returned that describes how
    to access and use the package.
    """

    # Registered type key; also used to find the matching manifest class.
    type: str

    def base_manifest(self, flow: Flow) -> PartialModel[PackageManifest]:
        """
        Build a partially-populated manifest of the type matching this
        packager; subclasses complete it with their own fields via `finalize`.
        """
        manifest_cls = lookup_type(PackageManifest, self.type)
        return PartialModel(
            manifest_cls,
            type=self.type,
            flow_name=flow.name,
            flow_parameter_schema=parameter_schema(flow.fn),
        )

    @abc.abstractmethod
    async def package(self, flow: Flow) -> "PackageManifest":
        """
        Package a flow and return a manifest describing the created package.
        """

Packager.package async

Package a flow and return a manifest describing the created package.

Source code in prefect/packaging/base.py
# Abstract hook: concrete packagers implement the packaging strategy and
# return a manifest describing the result.
@abc.abstractmethod
async def package(self, flow: Flow) -> "PackageManifest":
    """
    Package a flow and return a manifest describing the created package.
    """

Serializer pydantic-model

A serializer that can encode objects of type 'D' into bytes.

Source code in prefect/packaging/base.py
class Serializer(BaseModel, Generic[D], abc.ABC):
    """
    A serializer that can encode objects of type 'D' into bytes.
    """

    # Registered type key used to look up the concrete serializer class.
    type: str

    # Fix: these methods had empty bodies but were not abstract, so a
    # subclass that forgot to implement them would silently return None.
    # Marking them abstract makes instantiation of an incomplete subclass
    # fail loudly instead.
    @abc.abstractmethod
    def dumps(self, obj: D) -> bytes:
        """Encode the object into a blob of bytes."""

    @abc.abstractmethod
    def loads(self, blob: bytes) -> D:
        """Decode the blob of bytes into an object."""

Serializer.dumps

Encode the object into a blob of bytes.

Source code in prefect/packaging/base.py
# Base-class stub (body is only a docstring, so it returns None); concrete
# serializers override this with a real encoding.
def dumps(self, obj: D) -> bytes:
    """Encode the object into a blob of bytes."""

Serializer.loads

Decode the blob of bytes into an object.

Source code in prefect/packaging/base.py
# Base-class stub (body is only a docstring, so it returns None); concrete
# serializers override this with a real decoding.
def loads(self, blob: bytes) -> D:
    """Decode the blob of bytes into an object."""

docker

DockerPackageManifest pydantic-model

Represents a flow packaged in a Docker image

Source code in prefect/packaging/docker.py
class DockerPackageManifest(PackageManifest):
    """
    Represents a flow packaged in a Docker image
    """

    type: Literal["docker"] = "docker"

    # Reference to the built image (includes the registry when pushed).
    image: str
    # Path to the flow's source file inside the image.
    image_flow_location: str

    async def unpackage(self) -> Flow:
        # Loads the flow script from `image_flow_location` on the local
        # filesystem — NOTE(review): presumably this runs inside a container
        # started from `image`; confirm against the deployment code path.
        return load_flow_from_script(self.image_flow_location, self.flow_name)

DockerPackageManifest.unpackage async

Retrieve a flow from the package.

Source code in prefect/packaging/docker.py
# Loads the flow script from `image_flow_location` on the local filesystem —
# NOTE(review): presumably runs inside a container started from the image.
async def unpackage(self) -> Flow:
    return load_flow_from_script(self.image_flow_location, self.flow_name)

DockerPackager pydantic-model

This packager builds a Docker image containing the flow and the runtime environment necessary to run the flow. The resulting image is optionally pushed to a container registry, given by registry_url.

Source code in prefect/packaging/docker.py
class DockerPackager(Packager):
    """
    This packager builds a Docker image containing the flow and the runtime environment
    necessary to run the flow.  The resulting image is optionally pushed to a container
    registry, given by `registry_url`.
    """

    type: Literal["docker"] = "docker"

    # Base image to build from; defaulted by `set_default_base_image` when
    # neither `base_image` nor `dockerfile` is given.
    base_image: Optional[str] = None
    python_environment: Optional[Union[PythonEnvironment, CondaEnvironment]] = None
    # Path to a user-provided Dockerfile; mutually exclusive with `base_image`.
    dockerfile: Optional[Path] = None
    # Target build platform, e.g. "linux/amd64".
    # Fix: the default was `(None,)` — a one-element tuple — which is not a
    # valid value for `Optional[str]`; it should be `None`.
    platform: Optional[str] = None
    # Where the flow source is written inside the image.
    image_flow_location: str = "/flow.py"
    registry_url: Optional[AnyHttpUrl] = None

    @root_validator
    def set_default_base_image(cls, values):
        """Default `base_image` to a Prefect image when no Dockerfile is given."""
        if not values.get("base_image") and not values.get("dockerfile"):
            values["base_image"] = get_prefect_image_name(
                flavor="conda"
                if isinstance(values.get("python_environment"), CondaEnvironment)
                else None
            )
        return values

    @root_validator
    def base_image_and_dockerfile_exclusive(cls, values: Mapping[str, Any]):
        """Reject configurations that provide both `base_image` and `dockerfile`."""
        if values.get("base_image") and values.get("dockerfile"):
            raise ValueError(
                "Either `base_image` or `dockerfile` should be provided, but not both"
            )
        return values

    @root_validator
    def default_python_environment(cls, values: Mapping[str, Any]):
        """Infer a Python environment from the current process if none is set."""
        if values.get("base_image") and not values.get("python_environment"):
            values["python_environment"] = PythonEnvironment.from_environment()
        return values

    @validator("registry_url", pre=True)
    def ensure_registry_url_is_prefixed(cls, value):
        """Prepend 'https://' to bare registry hosts so URL validation passes."""
        if isinstance(value, str):
            if "://" not in value:
                return "https://" + value
        return value

    async def package(self, flow: Flow) -> DockerPackageManifest:
        """
        Package a flow as a Docker image and, optionally, push it to a registry
        """
        image_reference = await self._build_image(flow)

        if self.registry_url:
            # Push in a worker thread so the blocking docker client does not
            # stall the event loop. (Dropped a redundant f-string wrapper.)
            image_name = slugify(flow.name)
            image_reference = await run_sync_in_worker_thread(
                push_image, image_reference, self.registry_url, image_name
            )

        return self.base_manifest(flow).finalize(
            image=image_reference, image_flow_location=self.image_flow_location
        )

    async def _build_image(self, flow: Flow) -> str:
        # Dispatch: a user Dockerfile takes precedence over base-image builds.
        if self.dockerfile:
            return await self._build_from_dockerfile()
        return await self._build_from_base_image(flow)

    async def _build_from_dockerfile(self) -> str:
        """Build an image from the user-supplied Dockerfile."""
        context = self.dockerfile.resolve().parent
        dockerfile = self.dockerfile.relative_to(context)
        return await run_sync_in_worker_thread(
            build_image,
            platform=self.platform,
            context=context,
            dockerfile=str(dockerfile),
        )

    async def _build_from_base_image(self, flow: Flow) -> str:
        """Build from `base_image`, installing the environment and the flow."""
        with ImageBuilder(
            base_image=self.base_image, platform=self.platform
        ) as builder:
            for command in self.python_environment.install_commands():
                builder.add_line(to_run_command(command))

            # Embed the flow's module source directly in the image at
            # `image_flow_location`.
            source_info = json.loads(SourceSerializer().dumps(flow))

            builder.write_text(source_info["source"], self.image_flow_location)

            return await run_sync_in_worker_thread(
                builder.build, stream_progress_to=sys.stdout
            )

DockerPackager.package async

Package a flow as a Docker image and, optionally, push it to a registry

Source code in prefect/packaging/docker.py
async def package(self, flow: Flow) -> DockerPackageManifest:
    """
    Package a flow as a Docker image and, optionally, push it to a registry
    """
    # Build first (from a Dockerfile or a base image), then optionally push.
    image_reference = await self._build_image(flow)

    if self.registry_url:
        # Push in a worker thread so the blocking push call does not stall
        # the event loop; use the updated reference from the push.
        image_name = f"{slugify(flow.name)}"
        image_reference = await run_sync_in_worker_thread(
            push_image, image_reference, self.registry_url, image_name
        )

    return self.base_manifest(flow).finalize(
        image=image_reference, image_flow_location=self.image_flow_location
    )

file

FilePackager pydantic-model

This packager stores the flow as a single file.

By default, the file is the source code of the module the flow is defined in. Alternative serialization modes are available in prefect.packaging.serializers.

Source code in prefect/packaging/file.py
class FilePackager(Packager):
    """
    This packager stores the flow as a single file.

    By default, the file is the source code of the module the flow is defined in.
    Alternative serialization modes are available in `prefect.packaging.serializers`.
    """

    type: Literal["file"] = "file"
    # Serializer that converts the flow into bytes; defaults to module source.
    serializer: Serializer = Field(default_factory=SourceSerializer)
    # Filesystem the package is written to; defaults to local Prefect storage.
    filesystem: WritableFileSystem = Field(
        default_factory=lambda: LocalFileSystem(
            basepath=PREFECT_HOME.value() / "storage"
        )
    )

    @inject_client
    async def package(self, flow: Flow, client: "OrionClient") -> FilePackageManifest:
        """
        Package a flow as a single serialized file on the configured filesystem.
        """
        content = self.serializer.dumps(flow)
        # Content-addressed key: identical content maps to the same path.
        key = stable_hash(content)

        await self.filesystem.write_path(key, content)

        # Reuse the filesystem's existing block document id, or save it
        # anonymously so the manifest can reference the filesystem by id.
        filesystem_id = (
            self.filesystem._block_document_id
            or await self.filesystem._save(is_anonymous=True)
        )

        return self.base_manifest(flow).finalize(
            serializer=self.serializer,
            filesystem_id=filesystem_id,
            key=key,
        )

orion

OrionPackager pydantic-model

This packager stores the flow as an anonymous JSON block in the Orion database. The contents of the block are encrypted at rest.

By default, the content is the source code of the module the flow is defined in. Alternative serialization modes are available in prefect.packaging.serializers.

Source code in prefect/packaging/orion.py
class OrionPackager(Packager):
    """
    This packager stores the flow as an anonymous JSON block in the Orion database.
    The contents of the block are encrypted at rest.

    By default, the content is the source code of the module the flow is defined in.
    Alternative serialization modes are available in `prefect.packaging.serializers`.
    """

    type: Literal["orion"] = "orion"
    # Serializer that converts the flow into bytes; defaults to module source.
    serializer: Serializer = Field(default_factory=SourceSerializer)

    async def package(self, flow: Flow) -> OrionPackageManifest:
        """
        Package a flow in the Orion database as an anonymous block.
        """
        # Save the serialized flow as an anonymous JSON block document and
        # keep its id so the manifest can retrieve the flow later.
        block_document_id = await JSON(
            value={"flow": self.serializer.dumps(flow)}
        )._save(is_anonymous=True)

        return self.base_manifest(flow).finalize(
            serializer=self.serializer,
            block_document_id=block_document_id,
        )

OrionPackager.package async

Package a flow in the Orion database as an anonymous block.

Source code in prefect/packaging/orion.py
async def package(self, flow: Flow) -> OrionPackageManifest:
    """
    Package a flow in the Orion database as an anonymous block.
    """
    # Save the serialized flow as an anonymous JSON block document and keep
    # its id so the manifest can retrieve the flow later.
    block_document_id = await JSON(
        value={"flow": self.serializer.dumps(flow)}
    )._save(is_anonymous=True)

    return self.base_manifest(flow).finalize(
        serializer=self.serializer,
        block_document_id=block_document_id,
    )

serializers

ImportSerializer pydantic-model

Serializes objects by storing their importable path.

Source code in prefect/packaging/serializers.py
class ImportSerializer(Serializer):
    """
    Serializes objects by reference: only the fully qualified import path of
    the object is stored, so the object must be importable at load time.
    """

    type: Literal["import"] = "import"

    def dumps(self, obj: Any) -> bytes:
        # The payload is simply the object's dotted import path.
        qualified_name = to_qualified_name(obj)
        return qualified_name.encode()

    def loads(self, blob: bytes) -> Any:
        # Re-import the object from its stored dotted path.
        qualified_name = blob.decode()
        return from_qualified_name(qualified_name)

ImportSerializer.dumps

Encode the object into a blob of bytes.

Source code in prefect/packaging/serializers.py
# Store the object's fully qualified import path as the payload.
def dumps(self, obj: Any) -> bytes:
    return to_qualified_name(obj).encode()

ImportSerializer.loads

Decode the blob of bytes into an object.

Source code in prefect/packaging/serializers.py
# Re-import the object from its stored fully qualified path.
def loads(self, blob: bytes) -> Any:
    return from_qualified_name(blob.decode())

PickleSerializer pydantic-model

Serializes objects using the pickle protocol.

If using cloudpickle, you may specify a list of 'pickle_modules'. These modules will be serialized by value instead of by reference, which means they do not have to be installed in the runtime location. This is especially useful for serializing objects that rely on local packages.

Wraps pickles in base64 for safe transmission.

Source code in prefect/packaging/serializers.py
class PickleSerializer(Serializer):
    """
    Serializes objects using the pickle protocol.

    If using cloudpickle, you may specify a list of 'pickle_modules'. These modules will
    be serialized by value instead of by reference, which means they do not have to be
    installed in the runtime location. This is especially useful for serializing objects
    that rely on local packages.

    Wraps pickles in base64 for safe transmission.
    """

    type: Literal["pickle"] = "pickle"

    # Dotted import path of the pickle implementation to use.
    picklelib: str = "cloudpickle"
    # Effectively Optional[str]: None means "infer the installed version"
    # (see `check_picklelib_version`); annotation left as-is since `Optional`
    # may not be imported in this module — TODO confirm and tighten.
    picklelib_version: str = None
    # Modules to pickle by value (cloudpickle only; see validator below).
    pickle_modules: List[str] = pydantic.Field(default_factory=list)

    @pydantic.validator("picklelib")
    def check_picklelib(cls, value):
        """
        Check that the given pickle library is importable and has dumps/loads methods.
        """
        try:
            pickler = from_qualified_name(value)
        except (ImportError, AttributeError) as exc:
            raise ValueError(
                f"Failed to import requested pickle library: {value!r}."
            ) from exc

        # The library must expose a pickle-compatible dumps/loads interface.
        if not callable(getattr(pickler, "dumps", None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a 'dumps' method."
            )

        if not callable(getattr(pickler, "loads", None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a 'loads' method."
            )

        return value

    @pydantic.root_validator
    def check_picklelib_version(cls, values):
        """
        Infers a default value for `picklelib_version` if null or ensures it matches
        the version retrieved from the `picklelib`.
        """
        picklelib = values.get("picklelib")
        picklelib_version = values.get("picklelib_version")

        if not picklelib:
            raise ValueError("Unable to check version of unrecognized picklelib module")

        pickler = from_qualified_name(picklelib)
        pickler_version = getattr(pickler, "__version__", None)

        if not picklelib_version:
            # No version requested: record the installed version (may be None).
            values["picklelib_version"] = pickler_version
        elif picklelib_version != pickler_version:
            # Warn rather than fail: a mismatched version may still work.
            warnings.warn(
                f"Mismatched {picklelib!r} versions. Found {pickler_version} in the "
                f"environment but {picklelib_version} was requested. This may cause "
                "the serializer to fail.",
                RuntimeWarning,
                stacklevel=3,
            )

        return values

    @pydantic.root_validator
    def check_picklelib_and_modules(cls, values):
        """
        Prevents modules from being specified if picklelib is not cloudpickle
        """
        # Only cloudpickle supports registering modules for by-value pickling.
        if values.get("picklelib") != "cloudpickle" and values.get("pickle_modules"):
            raise ValueError(
                f"`pickle_modules` cannot be used without 'cloudpickle'. Got {values.get('picklelib')!r}."
            )
        return values

    def dumps(self, obj: Any) -> bytes:
        """Pickle `obj` with the configured library and wrap it in base64."""
        pickler = from_qualified_name(self.picklelib)

        # Temporarily register modules for by-value pickling (cloudpickle API).
        for module in self.pickle_modules:
            pickler.register_pickle_by_value(from_qualified_name(module))

        blob = pickler.dumps(obj)

        for module in self.pickle_modules:
            # Restore the pickler settings
            pickler.unregister_pickle_by_value(from_qualified_name(module))

        return base64.encodebytes(blob)

    def loads(self, blob: bytes) -> Any:
        """Decode base64 and unpickle with the configured library."""
        pickler = from_qualified_name(self.picklelib)
        return pickler.loads(base64.decodebytes(blob))

PickleSerializer.check_picklelib classmethod

Check that the given pickle library is importable and has dumps/loads methods.

Source code in prefect/packaging/serializers.py
@pydantic.validator("picklelib")
def check_picklelib(cls, value):
    """
    Check that the given pickle library is importable and has dumps/loads methods.
    """
    try:
        # Resolve the dotted import path to the pickle implementation.
        pickler = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested pickle library: {value!r}."
        ) from exc

    # The library must expose a pickle-compatible dumps/loads interface.
    if not callable(getattr(pickler, "dumps", None)):
        raise ValueError(
            f"Pickle library at {value!r} does not have a 'dumps' method."
        )

    if not callable(getattr(pickler, "loads", None)):
        raise ValueError(
            f"Pickle library at {value!r} does not have a 'loads' method."
        )

    return value

PickleSerializer.check_picklelib_and_modules classmethod

Prevents modules from being specified if picklelib is not cloudpickle

Source code in prefect/packaging/serializers.py
@pydantic.root_validator
def check_picklelib_and_modules(cls, values):
    """
    Prevents modules from being specified if picklelib is not cloudpickle
    """
    # Only cloudpickle supports registering modules for by-value pickling.
    if values.get("picklelib") != "cloudpickle" and values.get("pickle_modules"):
        raise ValueError(
            f"`pickle_modules` cannot be used without 'cloudpickle'. Got {values.get('picklelib')!r}."
        )
    return values

PickleSerializer.check_picklelib_version classmethod

Infers a default value for picklelib_version if null, or ensures it matches the version retrieved from the picklelib.

Source code in prefect/packaging/serializers.py
@pydantic.root_validator
def check_picklelib_version(cls, values):
    """
    Infers a default value for `picklelib_version` if null or ensures it matches
    the version retrieved from the `picklelib`.
    """
    picklelib = values.get("picklelib")
    picklelib_version = values.get("picklelib_version")

    if not picklelib:
        raise ValueError("Unable to check version of unrecognized picklelib module")

    pickler = from_qualified_name(picklelib)
    pickler_version = getattr(pickler, "__version__", None)

    if not picklelib_version:
        # No version requested: record the installed version (may be None).
        values["picklelib_version"] = pickler_version
    elif picklelib_version != pickler_version:
        # Warn rather than fail: a mismatched version may still work.
        warnings.warn(
            f"Mismatched {picklelib!r} versions. Found {pickler_version} in the "
            f"environment but {picklelib_version} was requested. This may cause "
            "the serializer to fail.",
            RuntimeWarning,
            stacklevel=3,
        )

    return values

PickleSerializer.dumps

Encode the object into a blob of bytes.

Source code in prefect/packaging/serializers.py
def dumps(self, obj: Any) -> bytes:
    """Pickle `obj` with the configured library and wrap it in base64."""
    pickler = from_qualified_name(self.picklelib)

    # Temporarily register modules for by-value pickling (cloudpickle API).
    for module in self.pickle_modules:
        pickler.register_pickle_by_value(from_qualified_name(module))

    blob = pickler.dumps(obj)

    for module in self.pickle_modules:
        # Restore the pickler settings
        pickler.unregister_pickle_by_value(from_qualified_name(module))

    return base64.encodebytes(blob)

PickleSerializer.loads

Decode the blob of bytes into an object.

Source code in prefect/packaging/serializers.py
def loads(self, blob: bytes) -> Any:
    """Decode base64 and unpickle with the configured library."""
    pickler = from_qualified_name(self.picklelib)
    return pickler.loads(base64.decodebytes(blob))

SourceSerializer pydantic-model

Serializes objects by retrieving the source code of the module they are defined in.

Creates a JSON blob with three keys: `source` (the source code of the module), `file_name` (the name of the file the source was in), and `symbol_name` (the name of the object to extract from the source code).

Deserialization requires the code to run with exec.

Source code in prefect/packaging/serializers.py
class SourceSerializer(Serializer):
    """
    Serializes objects by capturing the full source code of the module in
    which they are defined.

    Produces a JSON document with exactly three keys:
        source: the module's source code
        file_name: the basename of the module's file
        symbol_name: the attribute to extract after re-executing the source

    Deserialization requires the code to run with `exec`.
    """

    type: Literal["source"] = "source"

    def dumps(self, obj: Any) -> bytes:
        obj_module = inspect.getmodule(obj)

        # Objects with no resolvable module (or a module without a backing
        # file, e.g. builtins) cannot be serialized by source.
        if obj_module is None:
            raise ValueError(f"Cannot determine source module for object: {obj!r}.")

        if not getattr(obj_module, "__file__", None):
            raise ValueError(
                f"Found module {obj_module!r} without source code file while serializing "
                f"object: {obj!r}."
            )

        payload = {
            "source": inspect.getsource(obj_module),
            "file_name": os.path.basename(obj_module.__file__),
            "symbol_name": obj.__name__,
        }
        return json.dumps(payload).encode()

    def loads(self, blob: bytes) -> Any:
        document = json.loads(blob)

        # Require exactly the three keys written by `dumps` — nothing more,
        # nothing less.
        expected_keys = {"source", "file_name", "symbol_name"}
        if not isinstance(document, dict) or set(document.keys()) != expected_keys:
            raise ValueError(
                "Invalid serialized data. "
                "Expected dictionary with keys 'source', 'file_name', and "
                "'symbol_name'. "
                f"Got: {document}"
            )

        # Write the source to a temporary script and execute it as a module;
        # the directory is cleaned up once the module has been loaded.
        with TemporaryDirectory() as tmpdir:
            script_path = Path(tmpdir) / document["file_name"]
            script_path.write_text(document["source"])
            module = load_script_as_module(str(script_path))

        return getattr(module, document["symbol_name"])

SourceSerializer.dumps

Encode the object into a blob of bytes.

Source code in prefect/packaging/serializers.py
def dumps(self, obj: Any) -> bytes:
    # Resolve the module the object was defined in; None when no module
    # can be determined (e.g. dynamically created objects).
    module = inspect.getmodule(obj)

    if module is None:
        raise ValueError(f"Cannot determine source module for object: {obj!r}.")

    # Modules without a backing file (e.g. builtins) have no source to read.
    if not getattr(module, "__file__", None):
        raise ValueError(
            f"Found module {module!r} without source code file while serializing "
            f"object: {obj!r}."
        )

    source = inspect.getsource(module)

    # Emit a JSON document with the source, the file's basename, and the
    # attribute name to extract on load.
    return json.dumps(
        {
            "source": source,
            "file_name": os.path.basename(module.__file__),
            "symbol_name": obj.__name__,
        }
    ).encode()

SourceSerializer.loads

Decode the blob of bytes into an object.

Source code in prefect/packaging/serializers.py
def loads(self, blob: bytes) -> Any:
    document = json.loads(blob)
    # Require exactly the three keys written by `dumps`.
    if not isinstance(document, dict) or set(document.keys()) != {
        "source",
        "file_name",
        "symbol_name",
    }:
        raise ValueError(
            "Invalid serialized data. "
            "Expected dictionary with keys 'source', 'file_name', and "
            "'symbol_name'. "
            f"Got: {document}"
        )

    # Write the source to a temp script and execute it as a module; the
    # directory is cleaned up once the module has been loaded.
    with TemporaryDirectory() as tmpdir:
        temp_script = Path(tmpdir) / document["file_name"]
        temp_script.write_text(document["source"])
        module = load_script_as_module(str(temp_script))

    return getattr(module, document["symbol_name"])