Skip to content

prefect.deployments.steps.pull

Core set of steps for specifying a Prefect project pull step.

git_clone async

Clones a git repository into the current working directory.

Parameters:

Name Type Description Default
repository str

the URL of the repository to clone

required
branch Optional[str]

the branch to clone; if not provided, the default branch will be used

None
include_submodules bool

whether to include git submodules when cloning the repository

False
access_token Optional[str]

an access token to use for cloning the repository; if not provided the repository will be cloned using the default git credentials

None
credentials Optional[Block]

a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be used to specify the credentials to use for cloning the repository.

None

Returns:

Name Type Description
dict dict

a dictionary containing a directory key of the new directory that was created

Raises:

Type Description
CalledProcessError

if the git clone command fails for any reason

Examples:

Clone a public repository:

pull:
    - prefect.deployments.steps.git_clone:
        repository: https://github.com/PrefectHQ/prefect.git

Clone a branch of a public repository:

pull:
    - prefect.deployments.steps.git_clone:
        repository: https://github.com/PrefectHQ/prefect.git
        branch: my-branch

Clone a private repository using a GitHubCredentials block:

pull:
    - prefect.deployments.steps.git_clone:
        repository: https://github.com/org/repo.git
        credentials: "{{ prefect.blocks.github-credentials.my-github-credentials-block }}"

Clone a private repository using an access token:

pull:
    - prefect.deployments.steps.git_clone:
        repository: https://github.com/org/repo.git
        access_token: "{{ prefect.blocks.secret.github-access-token }}" # Requires creation of a Secret block
Note that you will need to create a Secret block to store the value of your git credentials. You can also store a username/password combo or token prefix (e.g. x-token-auth) in your secret block. Refer to your git providers documentation for the correct authentication schema.

Clone a repository with submodules:

pull:
    - prefect.deployments.steps.git_clone:
        repository: https://github.com/org/repo.git
        include_submodules: true

Clone a repository with an SSH key (note that the SSH key must be added to the worker before executing flows):

pull:
    - prefect.deployments.steps.git_clone:
        repository: git@github.com:org/repo.git

Source code in prefect/deployments/steps/pull.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@sync_compatible
async def git_clone(
    repository: str,
    branch: Optional[str] = None,
    include_submodules: bool = False,
    access_token: Optional[str] = None,
    credentials: Optional[Block] = None,
) -> dict:
    """
    Clones a git repository into the current working directory.

    Args:
        repository: the URL of the repository to clone
        branch: the branch to clone; if not provided, the default branch will be used
        include_submodules (bool): whether to include git submodules when cloning the repository
        access_token: an access token to use for cloning the repository; if not provided
            the repository will be cloned using the default git credentials
        credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be used to specify the
            credentials to use for cloning the repository.

    Returns:
        dict: a dictionary containing a `directory` key of the new directory that was created

    Raises:
        subprocess.CalledProcessError: if the git clone command fails for any reason

    Examples:
        Clone a public repository:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/PrefectHQ/prefect.git
        ```

        Clone a branch of a public repository:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/PrefectHQ/prefect.git
                branch: my-branch
        ```

        Clone a private repository using a GitHubCredentials block:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                credentials: "{{ prefect.blocks.github-credentials.my-github-credentials-block }}"
        ```

        Clone a private repository using an access token:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                access_token: "{{ prefect.blocks.secret.github-access-token }}" # Requires creation of a Secret block
        ```
        Note that you will need to [create a Secret block](/concepts/blocks/#using-existing-block-types) to store the
        value of your git credentials. You can also store a username/password combo or token prefix (e.g. `x-token-auth`)
        in your secret block. Refer to your git providers documentation for the correct authentication schema.

        Clone a repository with submodules:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                include_submodules: true
        ```

        Clone a repository with an SSH key (note that the SSH key must be added to the worker
        before executing flows):
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: git@github.com:org/repo.git
        ```
    """
    if access_token and credentials:
        raise ValueError(
            "Please provide either an access token or credentials but not both."
        )

    credentials = {"access_token": access_token} if access_token else credentials

    storage = GitRepository(
        url=repository,
        credentials=credentials,
        branch=branch,
        include_submodules=include_submodules,
    )

    await storage.pull_code()

    directory = str(storage.destination.relative_to(Path.cwd()))
    deployment_logger.info(f"Cloned repository {repository!r} into {directory!r}")
    return {"directory": directory}

git_clone_project async

Deprecated. Use git_clone instead.

Source code in prefect/deployments/steps/pull.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@deprecated_callable(start_date="Jun 2023", help="Use 'git clone' instead.")
@sync_compatible
async def git_clone_project(
    repository: str,
    branch: Optional[str] = None,
    include_submodules: bool = False,
    access_token: Optional[str] = None,
) -> dict:
    """Deprecated. Use `git_clone` instead."""
    return await git_clone(
        repository=repository,
        branch=branch,
        include_submodules=include_submodules,
        access_token=access_token,
    )

pull_from_remote_storage async

Pulls code from a remote storage location into the current working directory.

Works with protocols supported by fsspec.

Parameters:

Name Type Description Default
url str

the URL of the remote storage location. Should be a valid fsspec URL. Some protocols may require an additional fsspec dependency to be installed. Refer to the fsspec docs for more details.

required
**settings Any

any additional settings to pass the fsspec filesystem class.

{}

Returns:

Name Type Description
dict

a dictionary containing a directory key of the new directory that was created

Examples:

Pull code from a remote storage location:

pull:
    - prefect.deployments.steps.pull_from_remote_storage:
        url: s3://my-bucket/my-folder

Pull code from a remote storage location with additional settings:

pull:
    - prefect.deployments.steps.pull_from_remote_storage:
        url: s3://my-bucket/my-folder
        key: {{ prefect.blocks.secret.my-aws-access-key }}}
        secret: {{ prefect.blocks.secret.my-aws-secret-key }}}

Source code in prefect/deployments/steps/pull.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def pull_from_remote_storage(url: str, **settings: Any):
    """
    Pulls code from a remote storage location into the current working directory.

    Works with protocols supported by `fsspec`.

    Args:
        url (str): the URL of the remote storage location. Should be a valid `fsspec` URL.
            Some protocols may require an additional `fsspec` dependency to be installed.
            Refer to the [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
            for more details.
        **settings (Any): any additional settings to pass the `fsspec` filesystem class.

    Returns:
        dict: a dictionary containing a `directory` key of the new directory that was created

    Examples:
        Pull code from a remote storage location:
        ```yaml
        pull:
            - prefect.deployments.steps.pull_from_remote_storage:
                url: s3://my-bucket/my-folder
        ```

        Pull code from a remote storage location with additional settings:
        ```yaml
        pull:
            - prefect.deployments.steps.pull_from_remote_storage:
                url: s3://my-bucket/my-folder
                key: {{ prefect.blocks.secret.my-aws-access-key }}}
                secret: {{ prefect.blocks.secret.my-aws-secret-key }}}
        ```
    """
    storage = RemoteStorage(url, **settings)

    await storage.pull_code()

    directory = str(storage.destination.relative_to(Path.cwd()))
    deployment_logger.info(f"Pulled code from {url!r} into {directory!r}")
    return {"directory": directory}

pull_with_block async

Pulls code using a block.

Parameters:

Name Type Description Default
block_document_name str

The name of the block document to use

required
block_type_slug str

The slug of the type of block to use

required
Source code in prefect/deployments/steps/pull.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
async def pull_with_block(block_document_name: str, block_type_slug: str):
    """
    Pulls code using a block.

    Args:
        block_document_name: The name of the block document to use
        block_type_slug: The slug of the type of block to use
    """
    full_slug = f"{block_type_slug}/{block_document_name}"
    try:
        block = await Block.load(full_slug)
    except Exception:
        deployment_logger.exception("Unable to load block '%s'", full_slug)
        raise

    try:
        storage = BlockStorageAdapter(block)
    except Exception:
        deployment_logger.exception(
            "Unable to create storage adapter for block '%s'", full_slug
        )
        raise

    await storage.pull_code()

    directory = str(storage.destination.relative_to(Path.cwd()))
    deployment_logger.info(
        "Pulled code using block '%s' into '%s'", full_slug, directory
    )
    return {"directory": directory}

set_working_directory

Sets the working directory; works with both absolute and relative paths.

Parameters:

Name Type Description Default
directory str

the directory to set as the working directory

required

Returns:

Name Type Description
dict dict

a dictionary containing a directory key of the directory that was set

Source code in prefect/deployments/steps/pull.py
17
18
19
20
21
22
23
24
25
26
27
28
29
def set_working_directory(directory: str) -> dict:
    """
    Sets the working directory; works with both absolute and relative paths.

    Args:
        directory (str): the directory to set as the working directory

    Returns:
        dict: a dictionary containing a `directory` key of the
            directory that was set
    """
    os.chdir(directory)
    return dict(directory=directory)