prefect.deployments

Objects for specifying deployments and utilities for loading flows from deployments.

Deployment (pydantic model)

Source code in prefect/deployments.py
class Deployment(BaseModel):
    @property
    def editable_fields(self) -> List[str]:
        editable_fields = [
            "name",
            "description",
            "version",
            "tags",
            "parameters",
            "schedule",
            "infra_overrides",
        ]
        if self.infrastructure._block_document_id:
            return editable_fields
        else:
            return editable_fields + ["infrastructure"]

    @property
    def header(self) -> str:
        return f"###\n### A complete description of a Prefect Deployment for flow {self.flow_name!r}\n###\n"

    def yaml_dict(self) -> dict:
        # avoids issues with UUIDs showing up in YAML
        all_fields = json.loads(
            self.json(
                exclude={
                    "storage": {"_filesystem", "filesystem", "_remote_file_system"}
                }
            )
        )
        if all_fields["storage"]:
            all_fields["storage"][
                "_block_type_slug"
            ] = self.storage.get_block_type_slug()
        return all_fields

    def editable_fields_dict(self):
        "Returns YAML compatible dictionary of editable fields, in the correct order"
        all_fields = self.yaml_dict()
        return {field: all_fields[field] for field in self.editable_fields}

    def immutable_fields_dict(self):
        "Returns YAML compatible dictionary of immutable fields, in the correct order"
        all_fields = self.yaml_dict()
        return {k: v for k, v in all_fields.items() if k not in self.editable_fields}

    # top level metadata
    name: str = Field(..., description="The name of the deployment.")
    description: str = Field(
        None, description="An optional description of the deployment."
    )
    version: str = Field(None, description="An optional version for the deployment.")
    tags: List[str] = Field(default_factory=list)
    schedule: schemas.schedules.SCHEDULE_TYPES = None
    flow_name: str = Field(..., description="The name of the flow.")

    # flow data
    parameters: Dict[str, Any] = Field(default_factory=dict)
    manifest_path: str = Field(
        None,
        description="The path to the flow's manifest file, relative to the chosen storage.",
    )
    infrastructure: Union[DockerContainer, KubernetesJob, Process] = Field(
        default_factory=Process
    )
    infra_overrides: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to the base infrastructure block at runtime.",
    )
    storage: Optional[Block] = Field(
        None,
        help="The remote storage to use for this workflow.",
    )
    path: str = Field(
        None,
        description="The path to the working directory for the workflow, relative to remote storage or an absolute path.",
    )
    entrypoint: str = Field(
        None,
        description="The path to the entrypoint for the workflow, relative to the `path`.",
    )
    parameter_openapi_schema: ParameterSchema = Field(
        ..., description="The parameter schema of the flow, including defaults."
    )

    @validator("storage", pre=True)
    def cast_storage_to_block_type(cls, value):
        if isinstance(value, dict):
            block = lookup_type(Block, value.pop("_block_type_slug"))
            return block(**value)
        return value
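
The `cast_storage_to_block_type` validator above lets a deployment parsed from plain data rebuild its storage block from the `_block_type_slug` key that `yaml_dict()` writes out. Below is a rough, illustrative sketch of that round trip; the field values are assumptions for this example, and `local-file-system` is assumed to be the registered slug for `LocalFileSystem`.

# Illustrative sketch; values and the storage slug are assumptions.
from pydantic import parse_obj_as
from prefect.deployments import Deployment
from prefect.utilities.callables import ParameterSchema

data = {
    "name": "daily-etl",
    "flow_name": "etl",
    "parameter_openapi_schema": ParameterSchema().dict(),
    # A dict carrying "_block_type_slug" is cast back into the matching Block subclass
    "storage": {"_block_type_slug": "local-file-system", "basepath": "/tmp/flows"},
}

deployment = parse_obj_as(Deployment, data)
print(type(deployment.storage))  # expected: LocalFileSystem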
description (pydantic field)
Type: str
An optional description of the deployment.

entrypoint (pydantic field)
Type: str
The path to the entrypoint for the workflow, relative to the `path`.

flow_name (pydantic field, required)
Type: str
The name of the flow.

infra_overrides (pydantic field)
Type: Dict[str, Any]
Overrides to apply to the base infrastructure block at runtime.

manifest_path (pydantic field)
Type: str
The path to the flow's manifest file, relative to the chosen storage.

name (pydantic field, required)
Type: str
The name of the deployment.

parameter_openapi_schema (pydantic field, required)
Type: ParameterSchema
The parameter schema of the flow, including defaults.

path (pydantic field)
Type: str
The path to the working directory for the workflow, relative to remote storage or an absolute path.

version (pydantic field)
Type: str
An optional version for the deployment.
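
The required fields listed above are `name`, `flow_name`, and `parameter_openapi_schema`; everything else has a default. As a minimal sketch of constructing the model directly (the names, tags, and empty parameter schema here are illustrative assumptions, not values from the source above):

# Minimal, illustrative sketch; field values are assumptions.
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from prefect.utilities.callables import ParameterSchema

deployment = Deployment(
    name="daily-etl",                            # hypothetical deployment name
    flow_name="etl",                             # hypothetical flow name
    parameter_openapi_schema=ParameterSchema(),  # empty schema, for illustration only
    infrastructure=Process(),                    # the default infrastructure block
    tags=["example"],
)

# UUID-free, YAML-compatible representation of every field
print(deployment.yaml_dict())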
Deployment.editable_fields_dict

Returns a YAML-compatible dictionary of the editable fields, in the correct order.

Source code in prefect/deployments.py

def editable_fields_dict(self):
    "Returns YAML compatible dictionary of editable fields, in the correct order"
    all_fields = self.yaml_dict()
    return {field: all_fields[field] for field in self.editable_fields}
Deployment.immutable_fields_dict

Returns a YAML-compatible dictionary of the immutable fields, in the correct order.

Source code in prefect/deployments.py

def immutable_fields_dict(self):
    "Returns YAML compatible dictionary of immutable fields, in the correct order"
    all_fields = self.yaml_dict()
    return {k: v for k, v in all_fields.items() if k not in self.editable_fields}
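
Together with the `header` property, these two helpers split a deployment into the portion a user may edit and the portion they should leave alone. A hedged sketch of writing both to a file follows; the `deployment` object is assumed to be a Deployment instance (for example, the one built in the earlier sketch), and the separator comment is this example's own text, not library output.

# Illustrative sketch; `deployment` is an assumed Deployment instance.
import yaml

with open("deployment.yaml", "w") as f:
    f.write(deployment.header)
    yaml.safe_dump(deployment.editable_fields_dict(), f, sort_keys=False)
    f.write("\n###\n### Fields below are managed by Prefect; edit with care.\n###\n")
    yaml.safe_dump(deployment.immutable_fields_dict(), f, sort_keys=False)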
load_deployments_from_yaml

Load deployments from a YAML file.

Source code in prefect/deployments.py

def load_deployments_from_yaml(
    path: str,
) -> PrefectObjectRegistry:
    """
    Load deployments from a yaml file.
    """
    with open(path, "r") as f:
        contents = f.read()

    # Parse into a yaml tree to retrieve separate documents
    nodes = yaml.compose_all(contents)

    with PrefectObjectRegistry(capture_failures=True) as registry:
        for node in nodes:
            with tmpchdir(path):
                deployment_dict = yaml.safe_load(yaml.serialize(node))
                # The return value is not necessary, just instantiating the Deployment
                # is enough to get it recorded on the registry
                parse_obj_as(Deployment, deployment_dict)

    return registry
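
Because each YAML document is parsed into a Deployment inside the registry context, the returned registry holds every deployment found in the file. A rough usage sketch, assuming a multi-document "deployments.yaml" exists and that `get_instances` is the registry's accessor for recorded objects:

# Illustrative sketch; "deployments.yaml" is an assumed file of one or more
# YAML documents, each describing a Deployment.
from prefect.deployments import Deployment, load_deployments_from_yaml

registry = load_deployments_from_yaml("deployments.yaml")

# get_instances is assumed here as the way to read back recorded Deployments
for deployment in registry.get_instances(Deployment):
    print(deployment.name, deployment.flow_name)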
load_flow_from_flow_run (async)

Load a flow from the location/script provided in a deployment's storage document.

Source code in prefect/deployments.py

@inject_client
async def load_flow_from_flow_run(
    flow_run: schemas.core.FlowRun, client: OrionClient
) -> Flow:
    """
    Load a flow from the location/script provided in a deployment's storage document.
    """
    deployment = await client.read_deployment(flow_run.deployment_id)

    if deployment.storage_document_id:
        storage_document = await client.read_block_document(
            deployment.storage_document_id
        )
        storage_block = Block._from_block_document(storage_document)
    else:
        basepath = deployment.path or Path(deployment.manifest_path).parent
        storage_block = LocalFileSystem(basepath=basepath)

    sys.path.insert(0, ".")

    # TODO: append deployment.path
    await storage_block.get_directory(from_path=None, local_path=".")

    flow_run_logger(flow_run).debug(
        f"Loading flow for deployment {deployment.name!r}..."
    )

    # for backwards compat
    import_path = deployment.entrypoint
    if deployment.manifest_path:
        with open(deployment.manifest_path, "r") as f:
            import_path = json.load(f)["import_path"]

    flow = import_object(import_path)
    return flow
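
A rough sketch of calling this from agent-like code follows; the flow run ID is a placeholder, the surrounding orchestration logic is omitted, and note that the function downloads the deployment's files into the current working directory before importing the flow.

# Illustrative sketch; the flow run ID is a placeholder value.
import anyio
from prefect.client import get_client
from prefect.deployments import load_flow_from_flow_run

async def run_deployed_flow(flow_run_id):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        # Pulls the deployment's storage (or local path) into "." and imports the flow
        flow = await load_flow_from_flow_run(flow_run, client=client)
        return flow

# anyio.run(run_deployed_flow, "<flow-run-id>")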