Skip to content

prefect.cli.deployment

Command line interface for working with deployments.

Infra

An enumeration.

Source code in prefect/cli/deployment.py
class Infra(str, Enum):
    kubernetes = KubernetesJob.get_block_type_slug()
    process = Process.get_block_type_slug()
    docker = DockerContainer.get_block_type_slug()

apply async

Create or update a deployment from a YAML file.

Source code in prefect/cli/deployment.py
@deployment_app.command()
async def apply(
    paths: List[str] = typer.Argument(
        ...,
        help="One or more paths to deployment YAML files.",
    ),
):
    """
    Create or update a deployment from a YAML file.
    """
    for path in paths:

        # load the file
        with open(str(path), "r") as f:
            data = yaml.safe_load(f)

        # create deployment object
        try:
            deployment = Deployment(**data)
            app.console.print(f"Successfully loaded {deployment.name!r}", style="green")
        except Exception as exc:
            exit_with_error(f"'{path!s}' did not conform to deployment spec: {exc!r}")

        async with get_client() as client:
            # prep IDs
            flow_id = await client.create_flow_from_name(deployment.flow_name)

            if not deployment.infrastructure._block_document_id:
                # if not building off a block, will create an anonymous block
                deployment.infrastructure = deployment.infrastructure.copy()
                infrastructure_document_id = await deployment.infrastructure._save(
                    is_anonymous=True,
                )
            else:
                infrastructure_document_id = (
                    deployment.infrastructure._block_document_id
                )

            # we assume storage was already saved
            storage_document_id = getattr(
                deployment.storage, "_block_document_id", None
            )

            deployment_id = await client.create_deployment(
                flow_id=flow_id,
                name=deployment.name,
                version=deployment.version,
                schedule=deployment.schedule,
                parameters=deployment.parameters,
                description=deployment.description,
                tags=deployment.tags,
                manifest_path=deployment.manifest_path,  # allows for backwards YAML compat
                path=deployment.path,
                entrypoint=deployment.entrypoint,
                infra_overrides=deployment.infra_overrides,
                storage_document_id=storage_document_id,
                infrastructure_document_id=infrastructure_document_id,
                parameter_openapi_schema=deployment.parameter_openapi_schema.dict(),
            )

        app.console.print(
            f"Deployment '{deployment.flow_name}/{deployment.name}' successfully created with id '{deployment_id}'.",
            style="green",
        )

build async

Generate a deployment YAML from /path/to/file.py:flow_function

Source code in prefect/cli/deployment.py
@deployment_app.command()
async def build(
    path: str = typer.Argument(
        ...,
        help="The path to a flow entrypoint, in the form of `./path/to/file.py:flow_func_name`",
    ),
    name: str = typer.Option(
        None, "--name", "-n", help="The name to give the deployment."
    ),
    version: str = typer.Option(
        None, "--version", "-v", help="A version to give the deployment."
    ),
    tags: List[str] = typer.Option(
        None,
        "-t",
        "--tag",
        help="One or more optional tags to apply to the deployment.",
    ),
    infra_type: Infra = typer.Option(
        None,
        "--infra",
        "-i",
        help="The infrastructure type to use, prepopulated with defaults.",
    ),
    infra_block: str = typer.Option(
        None,
        "--infra-block",
        "-ib",
        help="The slug of the infrastructure block to use as a template.",
    ),
    overrides: List[str] = typer.Option(
        None,
        "--override",
        help="One or more optional infrastructure overrides provided as a dot delimited path, e.g., `env.env_key=env_value`",
    ),
    storage_block: str = typer.Option(
        None,
        "--storage-block",
        "-sb",
        help="The slug of the storage block. Use the syntax: 'block_type/block_name', where block_type must be one of 'remote-file-system', 's3', 'gcs', 'azure'",
    ),
    output: str = typer.Option(
        None,
        "--output",
        "-o",
        help="An optional filename to write the deployment file to.",
    ),
):
    """
    Generate a deployment YAML from /path/to/file.py:flow_function
    """

    # validate inputs
    if not name:
        exit_with_error(
            "A name for this deployment must be provided with the '--name' flag."
        )

    output_file = None
    if output:
        output_file = Path(output)
        if output_file.suffix and output_file.suffix != ".yaml":
            exit_with_error("Output file must be a '.yaml' file.")
        else:
            output_file = output_file.with_suffix(".yaml")

    # validate flow
    try:
        fpath, obj_name = path.rsplit(":", 1)
    except ValueError as exc:
        if str(exc) == "not enough values to unpack (expected 2, got 1)":
            missing_flow_name_msg = f"Your flow path must include the name of the function that is the entrypoint to your flow.\nTry {path}:<flow_name> for your flow path."
            exit_with_error(missing_flow_name_msg)
        else:
            raise exc
    try:
        flow = import_object(path)
        if isinstance(flow, Flow):
            app.console.print(f"Found flow {flow.name!r}", style="green")
        else:
            exit_with_error(
                f"Found object of unexpected type {type(flow).__name__!r}. Expected 'Flow'."
            )
    except AttributeError:
        exit_with_error(f"{obj_name!r} not found in {fpath!r}.")
    except FileNotFoundError:
        exit_with_error(f"{fpath!r} not found.")

    ## process storage, move files around and process path logic
    deployment_path = None
    entrypoint = (
        f"{Path(fpath).absolute().relative_to(Path('.').absolute())}:{obj_name}"
    )
    if storage_block:
        template = await Block.load(storage_block)
        storage = template.copy(
            exclude={"_block_document_id", "_block_document_name", "_is_anonymous"}
        )

        # process .prefectignore file
        if set_default_ignore_file(path="."):
            app.console.print(
                f"Default '.prefectignore' file written to {(Path('.') / '.prefectignore').absolute()}",
                style="green",
            )

        # upload current directory to storage location
        file_count = await storage.put_directory(ignore_file=".prefectignore")
        app.console.print(
            f"Successfully uploaded {file_count} files to {storage.basepath}",
            style="green",
        )
    else:
        # default storage, no need to move anything around
        storage = None
        deployment_path = str(Path(".").absolute())

    # persists storage now in case it contains secret values
    if storage and not storage._block_document_id:
        await storage._save(is_anonymous=True)

    if infra_block:
        infrastructure = await Block.load(infra_block)
    elif infra_type:
        if infra_type == Infra.kubernetes:
            infrastructure = KubernetesJob()
        elif infra_type == Infra.docker:
            infrastructure = DockerContainer()
        elif infra_type == Infra.process:
            infrastructure = Process()
    else:
        infrastructure = Process()

    description = getdoc(flow)
    schedule = None
    parameters = None
    flow_parameter_schema = parameter_schema(flow)

    async with get_client() as client:
        try:
            deployment = await client.read_deployment_by_name(f"{flow.name}/{name}")
            description = deployment.description
            schedule = deployment.schedule
            parameters = deployment.parameters

            # if infra was passed, we override the server-side settings
            if not infrastructure and deployment.infrastructure_document_id:
                infrastructure = Block._from_block_document(
                    await client.read_block_document(
                        deployment.infrastructure_document_id
                    )
                )
        except ObjectNotFound:
            pass

    infra_overrides = {}
    for override in overrides or []:
        key, value = override.split("=", 1)
        infra_overrides[key] = value
    deployment = Deployment(
        name=name,
        description=description,
        tags=tags or [],
        parameters=parameters or {},
        version=version or flow.version,
        flow_name=flow.name,
        schedule=schedule,
        parameter_openapi_schema=flow_parameter_schema,
        path=deployment_path,
        entrypoint=entrypoint,
        storage=storage,
        infrastructure=infrastructure,
        infra_overrides=infra_overrides,
    )

    deployment_loc = output_file or f"{obj_name}-deployment.yaml"
    with open(deployment_loc, "w") as f:
        f.write(deployment.header)
        yaml.dump(deployment.editable_fields_dict(), f, sort_keys=False)
        f.write("###\n### DO NOT EDIT BELOW THIS LINE\n###\n")
        yaml.dump(deployment.immutable_fields_dict(), f, sort_keys=False)

    exit_with_success(
        f"Deployment YAML created at '{Path(deployment_loc).absolute()!s}'."
    )

delete async

Delete a deployment.



Examples:

 $ prefect deployment delete test_flow/test_deployment $ prefect deployment delete --id dfd3e220-a130-4149-9af6-8d487e02fea6

Source code in prefect/cli/deployment.py
@deployment_app.command()
async def delete(
    name: Optional[str] = typer.Argument(
        None, help="A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>"
    ),
    deployment_id: Optional[str] = typer.Option(
        None, "--id", help="A deployment id to search for if no name is given"
    ),
):
    """
    Delete a deployment.

    \b
    Examples:
        \b
        $ prefect deployment delete test_flow/test_deployment
        $ prefect deployment delete --id dfd3e220-a130-4149-9af6-8d487e02fea6
    """
    async with get_client() as client:
        if name is None and deployment_id is not None:
            try:
                await client.delete_deployment(deployment_id)
                exit_with_success(f"Deleted deployment '{deployment_id}'.")
            except ObjectNotFound:
                exit_with_error(f"Deployment {deployment_id!r} not found!")
        elif name is not None:
            try:
                deployment = await client.read_deployment_by_name(name)
                await client.delete_deployment(deployment.id)
                exit_with_success(f"Deleted deployment '{name}'.")
            except ObjectNotFound:
                exit_with_error(f"Deployment {name!r} not found!")
        else:
            exit_with_error("Must provide a deployment name or id")

inspect async

View details about a deployment.



Examples:

 $ prefect deployment inspect "hello-world/my-deployment" { 'id': '610df9c3-0fb4-4856-b330-67f588d20201', 'created': '2022-08-01T18:36:25.192102+00:00', 'updated': '2022-08-01T18:36:25.188166+00:00', 'name': 'my-deployment', 'description': None, 'flow_id': 'b57b0aa2-ef3a-479e-be49-381fb0483b4e', 'schedule': None, 'is_schedule_active': True, 'parameters': {'name': 'Marvin'}, 'tags': ['test'], 'parameter_openapi_schema': { 'title': 'Parameters', 'type': 'object', 'properties': { 'name': { 'title': 'name', 'type': 'string' } }, 'required': ['name'] }, 'storage_document_id': '63ef008f-1e5d-4e07-a0d4-4535731adb32', 'infrastructure_document_id': '6702c598-7094-42c8-9785-338d2ec3a028', 'infrastructure': { 'type': 'process', 'env': {}, 'labels': {}, 'name': None, 'command': ['python', '-m', 'prefect.engine'], 'stream_output': True } }

Source code in prefect/cli/deployment.py
@deployment_app.command()
async def inspect(name: str):
    """
    View details about a deployment.

    \b
    Example:
        \b
        $ prefect deployment inspect "hello-world/my-deployment"
        {
            'id': '610df9c3-0fb4-4856-b330-67f588d20201',
            'created': '2022-08-01T18:36:25.192102+00:00',
            'updated': '2022-08-01T18:36:25.188166+00:00',
            'name': 'my-deployment',
            'description': None,
            'flow_id': 'b57b0aa2-ef3a-479e-be49-381fb0483b4e',
            'schedule': None,
            'is_schedule_active': True,
            'parameters': {'name': 'Marvin'},
            'tags': ['test'],
            'parameter_openapi_schema': {
                'title': 'Parameters',
                'type': 'object',
                'properties': {
                    'name': {
                        'title': 'name',
                        'type': 'string'
                    }
                },
                'required': ['name']
            },
            'storage_document_id': '63ef008f-1e5d-4e07-a0d4-4535731adb32',
            'infrastructure_document_id': '6702c598-7094-42c8-9785-338d2ec3a028',
            'infrastructure': {
                'type': 'process',
                'env': {},
                'labels': {},
                'name': None,
                'command': ['python', '-m', 'prefect.engine'],
                'stream_output': True
            }
        }

    """
    assert_deployment_name_format(name)

    async with get_client() as client:
        try:
            deployment = await client.read_deployment_by_name(name)
        except ObjectNotFound:
            exit_with_error(f"Deployment {name!r} not found!")

        deployment_json = deployment.dict(json_compatible=True)

        if deployment.infrastructure_document_id:
            deployment_json["infrastructure"] = Block._from_block_document(
                await client.read_block_document(deployment.infrastructure_document_id)
            ).dict(
                exclude={"_block_document_id", "_block_document_name", "_is_anonymous"}
            )

    app.console.print(Pretty(deployment_json))

ls async

View all deployments or deployments for specific flows.

Source code in prefect/cli/deployment.py
@deployment_app.command()
async def ls(flow_name: List[str] = None, by_created: bool = False):
    """
    View all deployments or deployments for specific flows.
    """
    async with get_client() as client:
        deployments = await client.read_deployments(
            flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None
        )
        flows = {
            flow.id: flow
            for flow in await client.read_flows(
                flow_filter=FlowFilter(id={"any_": [d.flow_id for d in deployments]})
            )
        }

    sort_by_name_keys = lambda d: (flows[d.flow_id].name, d.name)
    sort_by_created_key = lambda d: pendulum.now("utc") - d.created

    table = Table(
        title="Deployments",
    )
    table.add_column("Name", style="blue", no_wrap=True)
    table.add_column("ID", style="cyan", no_wrap=True)

    for deployment in sorted(
        deployments, key=sort_by_created_key if by_created else sort_by_name_keys
    ):
        table.add_row(
            f"{flows[deployment.flow_id].name}/[bold]{deployment.name}[/]",
            str(deployment.id),
        )

    app.console.print(table)

run async

Create a flow run for the given flow and deployment.

The flow run will be scheduled for now and an agent must execute it.

The flow run will not execute until an agent starts.

Source code in prefect/cli/deployment.py
@deployment_app.command()
async def run(
    name: Optional[str] = typer.Argument(
        None, help="A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>"
    ),
    deployment_id: Optional[str] = typer.Option(
        None, "--id", help="A deployment id to search for if no name is given"
    ),
):
    """
    Create a flow run for the given flow and deployment.

    The flow run will be scheduled for now and an agent must execute it.

    The flow run will not execute until an agent starts.
    """
    async with get_client() as client:
        deployment = await get_deployment(client, name, deployment_id)
        flow_run = await client.create_flow_run_from_deployment(deployment.id)
    app.console.print(f"Created flow run {flow_run.name!r} ({flow_run.id})")

str_presenter

configures yaml for dumping multiline strings Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data

Source code in prefect/cli/deployment.py
def str_presenter(dumper, data):
    """
    configures yaml for dumping multiline strings
    Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data
    """
    if len(data.splitlines()) > 1:  # check for multiline string
        return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
    return dumper.represent_scalar("tag:yaml.org,2002:str", data)