Skip to content

prefect.filesystems

Azure pydantic-model

Store data as a file on Azure Datalake and Azure Blob Storage.

Examples:

Load stored Azure config:

from prefect.filesystems import Azure

az_block = Azure.load("BLOCK_NAME")

Source code in prefect/filesystems.py
class Azure(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on Azure Datalake and Azure Blob Storage.

    Example:
        Load stored Azure config:
        ```python
        from prefect.filesystems import Azure

        az_block = Azure.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Azure"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/6AiQ6HRIft8TspZH7AfyZg/39fd82bdbb186db85560f688746c8cdd/azure.png?h=250"

    bucket_path: str = Field(
        default=...,
        description="An Azure storage bucket path.",
        example="my-bucket/a-directory-within",
    )
    azure_storage_connection_string: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage connection string",
        description="Equivalent to the AZURE_STORAGE_CONNECTION_STRING environment variable.",
    )
    azure_storage_account_name: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage account name",
        description="Equivalent to the AZURE_STORAGE_ACCOUNT_NAME environment variable.",
    )
    azure_storage_account_key: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage account key",
        description="Equivalent to the AZURE_STORAGE_ACCOUNT_KEY environment variable.",
    )
    azure_storage_tenant_id: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage tenant ID",
        description="Equivalent to the AZURE_TENANT_ID environment variable.",
    )
    azure_storage_client_id: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage client ID",
        description="Equivalent to the AZURE_CLIENT_ID environment variable.",
    )
    azure_storage_client_secret: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage client secret",
        description="Equivalent to the AZURE_CLIENT_SECRET environment variable.",
    )
    # Holds the most recently built RemoteFileSystem; it is reassigned on every
    # access of `filesystem` rather than reused.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """The bucket path of this block expressed as an `az://` URL."""
        return f"az://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` rooted at `az://<bucket_path>`.

        Every credential that is set on the block is unwrapped and handed over
        in the `settings` mapping; unset credentials are left out entirely.
        """
        # (settings key, optional secret) pairs, in the order they are applied.
        credential_pairs = (
            ("connection_string", self.azure_storage_connection_string),
            ("account_name", self.azure_storage_account_name),
            ("account_key", self.azure_storage_account_key),
            ("tenant_id", self.azure_storage_tenant_id),
            ("client_id", self.azure_storage_client_id),
            ("client_secret", self.azure_storage_client_secret),
        )
        settings = {
            key: secret.get_secret_value() for key, secret in credential_pairs if secret
        }
        self._remote_file_system = RemoteFileSystem(
            basepath=f"az://{self.bucket_path}", settings=settings
        )
        return self._remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> bytes:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        # NOTE(review): annotated as `bytes`, but the call is forwarded as-is to
        # RemoteFileSystem.get_directory — confirm the intended return type.
        remote = self.filesystem
        return await remote.get_directory(from_path=from_path, local_path=local_path)

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        remote = self.filesystem
        return await remote.put_directory(
            local_path=local_path, to_path=to_path, ignore_file=ignore_file
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read the content stored at `path` through the remote file system."""
        remote = self.filesystem
        return await remote.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` through the remote file system."""
        remote = self.filesystem
        return await remote.write_path(path=path, content=content)

azure_storage_account_key pydantic-field

Type: SecretStr

Equivalent to the AZURE_STORAGE_ACCOUNT_KEY environment variable.

azure_storage_account_name pydantic-field

Type: SecretStr

Equivalent to the AZURE_STORAGE_ACCOUNT_NAME environment variable.

azure_storage_client_id pydantic-field

Type: SecretStr

Equivalent to the AZURE_CLIENT_ID environment variable.

azure_storage_client_secret pydantic-field

Type: SecretStr

Equivalent to the AZURE_CLIENT_SECRET environment variable.

azure_storage_connection_string pydantic-field

Type: SecretStr

Equivalent to the AZURE_STORAGE_CONNECTION_STRING environment variable.

azure_storage_tenant_id pydantic-field

Type: SecretStr

Equivalent to the AZURE_TENANT_ID environment variable.

bucket_path pydantic-field required

Type: str

An Azure storage bucket path.

Azure.get_directory async

Downloads a directory from a given remote path to a local directory.

Defaults to downloading the entire contents of the block's basepath to the current working directory.

Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> bytes:
    """
    Downloads a directory from a given remote path to a local directory.

    Defaults to downloading the entire contents of the block's basepath to the current working directory.
    """
    # The block itself does no transfer work; everything is delegated to the
    # file system object exposed by the `filesystem` property.
    remote = self.filesystem
    return await remote.get_directory(from_path=from_path, local_path=local_path)

Azure.put_directory async

Uploads a directory from a given local path to a remote directory.

Defaults to uploading the entire contents of the current working directory to the block's basepath.

Source code in prefect/filesystems.py
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Uploads a local directory to a remote directory.

    All arguments are forwarded unchanged to the `put_directory` method of the
    block's `filesystem`. With no arguments, the contents of the current
    working directory are uploaded to the block's basepath.
    """
    remote = self.filesystem
    return await remote.put_directory(
        local_path=local_path, to_path=to_path, ignore_file=ignore_file
    )

GCS pydantic-model

Store data as a file on Google Cloud Storage.

Examples:

Load stored GCS config:

from prefect.filesystems import GCS

gcs_block = GCS.load("BLOCK_NAME")

Source code in prefect/filesystems.py
class GCS(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on Google Cloud Storage.

    Example:
        Load stored GCS config:
        ```python
        from prefect.filesystems import GCS

        gcs_block = GCS.load("BLOCK_NAME")
        ```
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/4CD4wwbiIKPkZDt4U3TEuW/c112fe85653da054b6d5334ef662bec4/gcp.png?h=250"

    bucket_path: str = Field(
        default=...,
        description="A GCS bucket path.",
        example="my-bucket/a-directory-within",
    )
    service_account_info: Optional[SecretStr] = Field(
        default=None,
        description="The contents of a service account keyfile as a JSON string.",
    )
    project: Optional[str] = Field(
        default=None,
        description="The project the GCS bucket resides in. If not provided, the project will be inferred from the credentials or environment.",
    )

    @property
    def basepath(self) -> str:
        """The bucket path of this block expressed as a `gcs://` URL."""
        return f"gcs://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` rooted at `gcs://<bucket_path>`.

        The service account key (if any) is parsed from JSON and passed as the
        `token` setting; the configured `project` (if any) is passed as the
        `project` setting.

        Raises:
            ValueError: If `service_account_info` is set but is not valid JSON.
        """
        settings = {}
        if self.service_account_info:
            try:
                settings["token"] = json.loads(
                    self.service_account_info.get_secret_value()
                )
            except json.JSONDecodeError as exc:
                raise ValueError(
                    "Unable to load provided service_account_info. Please make sure that the provided value is a valid JSON string."
                ) from exc
        if self.project:
            # Previously the `project` field was accepted but never forwarded,
            # so it had no effect. When it is unset the key is omitted and the
            # project is left to be inferred, as the field description states.
            settings["project"] = self.project
        remote_file_system = RemoteFileSystem(
            basepath=f"gcs://{self.bucket_path}", settings=settings
        )
        return remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> bytes:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        # NOTE(review): annotated as `bytes`, but the call is forwarded as-is to
        # RemoteFileSystem.get_directory — confirm the intended return type.
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        return await self.filesystem.put_directory(
            local_path=local_path, to_path=to_path, ignore_file=ignore_file
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read the content stored at `path` through the remote file system."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` through the remote file system."""
        return await self.filesystem.write_path(path=path, content=content)

bucket_path pydantic-field required

Type: str

A GCS bucket path.

project pydantic-field

Type: str

The project the GCS bucket resides in. If not provided, the project will be inferred from the credentials or environment.

service_account_info pydantic-field

Type: SecretStr

The contents of a service account keyfile as a JSON string.

GCS.get_directory async

Downloads a directory from a given remote path to a local directory.

Defaults to downloading the entire contents of the block's basepath to the current working directory.

Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> bytes:
    """
    Downloads a remote directory into a local directory.

    The call is forwarded unchanged to the `get_directory` method of the
    block's `filesystem`. With no arguments, the entire contents of the block's
    basepath are downloaded to the current working directory.
    """
    remote = self.filesystem
    return await remote.get_directory(from_path=from_path, local_path=local_path)

GCS.put_directory async

Uploads a directory from a given local path to a remote directory.

Defaults to uploading the entire contents of the current working directory to the block's basepath.

Source code in prefect/filesystems.py
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Uploads a local directory to a remote directory.

    The call is forwarded unchanged to the `put_directory` method of the
    block's `filesystem`. With no arguments, the entire contents of the current
    working directory are uploaded to the block's basepath.
    """
    remote = self.filesystem
    return await remote.put_directory(
        local_path=local_path, to_path=to_path, ignore_file=ignore_file
    )

GitHub pydantic-model

Interact with files stored on public GitHub repositories.

Source code in prefect/filesystems.py
class GitHub(ReadableDeploymentStorage):
    """
    Interact with files stored on public GitHub repositories.
    """

    _block_type_name = "GitHub"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/187oCWsD18m5yooahq1vU0/ace41e99ab6dc40c53e5584365a33821/github.png?h=250"

    repository: str = Field(
        default=...,
        description="The URL of a GitHub repository to read from, in either HTTPS or SSH format.",
    )
    reference: Optional[str] = Field(
        default=None,
        description="An optional reference to pin to; can be a branch name or tag.",
    )
    access_token: Optional[SecretStr] = Field(
        name="Personal Access Token",
        default=None,
        description="A GitHub Personal Access Token (PAT) with repo scope.",
    )

    @validator("access_token")
    def _ensure_credentials_go_with_https(
        cls, v: Optional[SecretStr], values: dict
    ) -> Optional[SecretStr]:
        """Ensure that credentials are not provided with 'SSH' formatted GitHub URLs.

        Note: validates `access_token` specifically so that it only fires when
        private repositories are used.

        Raises:
            InvalidRepositoryURLError: If a token is given but the repository URL
                does not use the `https` scheme.
        """
        if v is None:
            return v

        repository = values.get("repository")
        if repository is None:
            # `repository` is absent from `values` when it failed its own
            # validation. Indexing it would raise a raw KeyError that masks the
            # real problem, so skip this check and let that error be reported.
            return v

        if urllib.parse.urlparse(repository).scheme != "https":
            raise InvalidRepositoryURLError(
                (
                    "Crendentials can only be used with GitHub repositories "
                    "using the 'HTTPS' format. You must either remove the "
                    "credential if you wish to use the 'SSH' format and are not "
                    "using a private repository, or you must change the repository "
                    "URL to the 'HTTPS' format. "
                )
            )

        return v

    def _create_repo_url(self) -> str:
        """Format the URL provided to the `git clone` command.

        For private repos: https://<oauth-key>@github.com/<username>/<repo>.git
        All other repos should be the same as `self.repository`.
        """
        url_components = urllib.parse.urlparse(self.repository)
        if url_components.scheme == "https" and self.access_token is not None:
            # Embed the token as the userinfo part of the network location.
            updated_components = url_components._replace(
                netloc=f"{self.access_token.get_secret_value()}@{url_components.netloc}"
            )
            full_url = urllib.parse.urlunparse(updated_components)
        else:
            full_url = self.repository

        return full_url

    @staticmethod
    def _get_paths(
        dst_dir: Union[str, None], src_dir: str, sub_directory: Optional[str]
    ) -> Tuple[str, str]:
        """Returns the fully formed paths for GitHubRepository contents in the form
        (content_source, content_destination).

        Args:
            dst_dir: Destination directory; the absolute current working
                directory is used when this is None.
            src_dir: Directory that holds the cloned repository.
            sub_directory: Optional subdirectory appended to both the source and
                the destination; ignored when None or empty.
        """
        if dst_dir is None:
            content_destination = Path(".").absolute()
        else:
            content_destination = Path(dst_dir)

        content_source = Path(src_dir)

        if sub_directory:
            content_destination = content_destination.joinpath(sub_directory)
            content_source = content_source.joinpath(sub_directory)

        return str(content_source), str(content_destination)

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Clones a GitHub project specified in `from_path` to the provided `local_path`;
        defaults to cloning the repository reference configured on the Block to the
        present working directory.

        Args:
            from_path: If provided, interpreted as a subdirectory of the underlying
                repository that will be copied to the provided local path.
            local_path: A local path to clone to; defaults to present working directory.

        Raises:
            OSError: If the `git clone` process exits with a non-zero return code.
        """
        # CONSTRUCT COMMAND
        cmd = ["git", "clone", self._create_repo_url()]
        if self.reference:
            cmd += ["-b", self.reference]

        # Limit git history
        cmd += ["--depth", "1"]

        # Clone to a temporary directory and move the subdirectory over
        with TemporaryDirectory(suffix="prefect") as tmp_dir:
            cmd.append(tmp_dir)

            err_stream = io.StringIO()
            out_stream = io.StringIO()
            process = await run_process(cmd, stream_output=(out_stream, err_stream))
            if process.returncode != 0:
                err_stream.seek(0)
                raise OSError(f"Failed to pull from remote:\n {err_stream.read()}")

            content_source, content_destination = self._get_paths(
                dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
            )

            copytree(src=content_source, dst=content_destination, dirs_exist_ok=True)

access_token pydantic-field

Type: SecretStr

A GitHub Personal Access Token (PAT) with repo scope.

reference pydantic-field

Type: str

An optional reference to pin to; can be a branch name or tag.

repository pydantic-field required

Type: str

The URL of a GitHub repository to read from, in either HTTPS or SSH format.

GitHub.get_directory async

Clones a GitHub project specified in from_path to the provided local_path; defaults to cloning the repository reference configured on the Block to the present working directory.

Parameters:

Name Description Default
from_path

If provided, interpreted as a subdirectory of the underlying repository that will be copied to the provided local path.

Optional[str]
None
local_path

A local path to clone to; defaults to present working directory.

Optional[str]
None
Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Shallow-clone the configured repository and copy its contents locally.

    The repository (at `self.reference`, when one is set) is cloned into a
    temporary directory, and the requested part of it is then copied into the
    destination.

    Args:
        from_path: If provided, interpreted as a subdirectory of the underlying
            repository that will be copied to the provided local path.
        local_path: A local path to clone to; defaults to present working directory.

    Raises:
        OSError: If the `git clone` process exits with a non-zero return code.
    """
    # Optional branch/tag selection, followed by a history limit of one commit.
    branch_args = ["-b", self.reference] if self.reference else []
    clone_cmd = ["git", "clone", self._create_repo_url(), *branch_args, "--depth", "1"]

    with TemporaryDirectory(suffix="prefect") as checkout_dir:
        # The temporary directory is the clone target.
        clone_cmd.append(checkout_dir)

        captured_out = io.StringIO()
        captured_err = io.StringIO()
        result = await run_process(
            clone_cmd, stream_output=(captured_out, captured_err)
        )
        if result.returncode != 0:
            raise OSError(f"Failed to pull from remote:\n {captured_err.getvalue()}")

        source, destination = self._get_paths(
            dst_dir=local_path, src_dir=checkout_dir, sub_directory=from_path
        )

        # Existing destination directories are merged into, not rejected.
        copytree(src=source, dst=destination, dirs_exist_ok=True)

LocalFileSystem pydantic-model

Store data as a file on a local file system.

Examples:

Load stored local file system config:

from prefect.filesystems import LocalFileSystem

local_file_system_block = LocalFileSystem.load("BLOCK_NAME")

Source code in prefect/filesystems.py
class LocalFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a local file system.

    Example:
        Load stored local file system config:
        ```python
        from prefect.filesystems import LocalFileSystem

        local_file_system_block = LocalFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Local File System"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/EVKjxM7fNyi4NGUSkeTEE/95c958c5dd5a56c59ea5033e919c1a63/image1.png?h=250"

    basepath: Optional[str] = Field(
        default=None, description="Default local path for this block to write to."
    )

    @validator("basepath", pre=True)
    def cast_pathlib(cls, value):
        """Accept `pathlib.Path` values for `basepath` by converting them to `str`."""
        if isinstance(value, Path):
            return str(value)
        return value

    def _resolve_path(self, path: str) -> Path:
        """
        Turn `path` into a concrete path relative to the block's base path.

        Relative paths are joined onto the base path. Absolute paths are
        resolved and must be the base path itself or lie below it.

        Raises:
            ValueError: If an absolute `path` is outside of the base path.
        """
        # Only resolve the base path at runtime, default to the current directory
        basepath = (
            Path(self.basepath).expanduser().resolve()
            if self.basepath
            else Path(".").resolve()
        )

        # Determine the path to access relative to the base path, ensuring that paths
        # outside of the base path are off limits
        resolved = Path(path).expanduser()
        if not resolved.is_absolute():
            resolved = basepath / resolved
        else:
            resolved = resolved.resolve()
            if basepath not in resolved.parents and (basepath != resolved):
                raise ValueError(
                    f"Provided path {resolved} is outside of the base path {basepath}."
                )

        return resolved

    @sync_compatible
    async def get_directory(
        self, from_path: str = None, local_path: str = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the block's basepath to the current working directory.
        """
        if not from_path:
            from_path = Path(self.basepath).expanduser().resolve()
        else:
            from_path = Path(from_path).resolve()

        if not local_path:
            local_path = Path(".").resolve()
        else:
            local_path = Path(local_path).resolve()

        if from_path == local_path:
            # If the paths are the same there is no need to copy
            # and we avoid shutil.copytree raising an error
            return

        copytree(from_path, local_path, dirs_exist_ok=True)

    async def _get_ignore_func(self, local_path: str, ignore_file: str):
        """
        Build an `ignore` callable for `copytree` from an ignore file.

        The patterns read from `ignore_file` are passed to `filter_files` to
        obtain the set of included paths; the returned callable reports every
        entry whose path relative to `local_path` is not in that set.
        """
        with open(ignore_file, "r") as f:
            ignore_patterns = f.readlines()
        included_files = filter_files(root=local_path, ignore_patterns=ignore_patterns)

        def ignore_func(directory, files):
            relative_path = Path(directory).relative_to(local_path)

            files_to_ignore = [
                f for f in files if str(relative_path / f) not in included_files
            ]
            return files_to_ignore

        return ignore_func

    @sync_compatible
    async def put_directory(
        self, local_path: str = None, to_path: str = None, ignore_file: str = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the current working directory to the block's basepath.

        An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
        """
        if not to_path:
            to_path = Path(self.basepath).expanduser()

        if not local_path:
            local_path = Path(".").absolute()

        if ignore_file:
            ignore_func = await self._get_ignore_func(
                local_path=local_path, ignore_file=ignore_file
            )
        else:
            ignore_func = None

        # Compare fully resolved paths, as get_directory does. Comparing the raw
        # values misses equal locations given as a `str` versus a `Path`, or as a
        # relative versus an absolute path, and the directory would then be
        # copied onto itself.
        if Path(local_path).resolve() == Path(to_path).resolve():
            return

        copytree(src=local_path, dst=to_path, ignore=ignore_func, dirs_exist_ok=True)

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """
        Read and return the bytes of the file at `path`.

        Raises:
            ValueError: If the resolved path does not exist or is not a file.
        """
        resolved = self._resolve_path(path)

        # Check if the path exists
        if not resolved.exists():
            raise ValueError(f"Path {resolved} does not exist.")

        # Validate that its a file
        if not resolved.is_file():
            raise ValueError(f"Path {resolved} is not a file.")

        async with await anyio.open_file(str(resolved), mode="rb") as f:
            content = await f.read()

        return content

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Write `content` to the file at `path`, creating parent directories.

        Returns:
            The resolved path that was written, as a string.

        Raises:
            ValueError: If the resolved path already exists and is not a file.
        """
        resolved = self._resolve_path(path)

        # Construct the path if it does not exist
        resolved.parent.mkdir(exist_ok=True, parents=True)

        # Check if the file already exists
        if resolved.exists() and not resolved.is_file():
            raise ValueError(f"Path {resolved} already exists and is not a file.")

        async with await anyio.open_file(resolved, mode="wb") as f:
            await f.write(content)

        # The signature promises a `str`; previously nothing was returned.
        return str(resolved)

basepath pydantic-field

Type: str

Default local path for this block to write to.

LocalFileSystem.get_directory async

Copies a directory from one place to another on the local filesystem.

Defaults to copying the entire contents of the block's basepath to the current working directory.

Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: str = None, local_path: str = None
) -> None:
    """
    Copy one local directory tree into another.

    The source defaults to the block's basepath (with `~` expanded) and the
    destination to the current working directory. Both are resolved before
    they are compared or used.
    """
    source = (
        Path(from_path).resolve()
        if from_path
        else Path(self.basepath).expanduser().resolve()
    )
    target = Path(local_path).resolve() if local_path else Path(".").resolve()

    # Identical source and target: nothing to copy, and skipping the call keeps
    # shutil.copytree from raising an error.
    if source != target:
        copytree(source, target, dirs_exist_ok=True)

LocalFileSystem.put_directory async

Copies a directory from one place to another on the local filesystem.

Defaults to copying the entire contents of the current working directory to the block's basepath.

An ignore_file path may be provided that can include gitignore style expressions for filepaths to ignore.

Source code in prefect/filesystems.py
@sync_compatible
async def put_directory(
    self, local_path: str = None, to_path: str = None, ignore_file: str = None
) -> None:
    """
    Copies a directory from one place to another on the local filesystem.

    Defaults to copying the entire contents of the current working directory to the block's basepath.

    An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
    """
    if not to_path:
        to_path = Path(self.basepath).expanduser()

    if not local_path:
        local_path = Path(".").absolute()

    if ignore_file:
        ignore_func = await self._get_ignore_func(
            local_path=local_path, ignore_file=ignore_file
        )
    else:
        ignore_func = None

    # Compare fully resolved paths. Comparing the raw values misses equal
    # locations given as a `str` versus a `Path`, or as a relative versus an
    # absolute path, and the directory would then be copied onto itself.
    if Path(local_path).resolve() == Path(to_path).resolve():
        return

    copytree(src=local_path, dst=to_path, ignore=ignore_func, dirs_exist_ok=True)

RemoteFileSystem pydantic-model

Store data as a file on a remote file system.

Supports any remote file system supported by fsspec. The file system is specified using a protocol. For example, "s3://my-bucket/my-folder/" will use S3.

Examples:

Load stored remote file system config:

from prefect.filesystems import RemoteFileSystem

remote_file_system_block = RemoteFileSystem.load("BLOCK_NAME")

Source code in prefect/filesystems.py
class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a remote file system.

    Supports any remote file system supported by `fsspec`. The file system is specified
    using a protocol. For example, "s3://my-bucket/my-folder/" will use S3.

    Example:
        Load stored remote file system config:
        ```python
        from prefect.filesystems import RemoteFileSystem

        remote_file_system_block = RemoteFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Remote File System"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/4CxjycqILlT9S9YchI7o1q/ee62e2089dfceb19072245c62f0c69d2/image12.png?h=250"

    basepath: str = Field(
        default=...,
        description="Default path for this block to write to.",
        example="s3://my-bucket/my-folder/",
    )
    settings: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional settings to pass through to fsspec.",
    )

    # Cache for the configured fsspec file system used for access
    _filesystem: fsspec.AbstractFileSystem = None

    @validator("basepath")
    def check_basepath(cls, value):
        """
        Validate that the base path is a URL with a scheme and a location.

        Raises:
            ValueError: If the scheme or the location is missing, or if the
                scheme is `file`.
        """
        scheme, netloc, _, _, _ = urllib.parse.urlsplit(value)

        if not scheme:
            raise ValueError(f"Base path must start with a scheme. Got {value!r}.")

        if not netloc:
            raise ValueError(
                f"Base path must include a location after the scheme. Got {value!r}."
            )

        if scheme == "file":
            raise ValueError(
                "Base path scheme cannot be 'file'. Use `LocalFileSystem` instead for local file access."
            )

        return value

    def _resolve_path(self, path: str) -> str:
        """
        Combine `path` with the base path into a full URL string.

        Raises:
            ValueError: If `path` carries a scheme different from the base
                path's, or a location/path outside of the base path.
        """
        base_scheme, base_netloc, base_urlpath, _, _ = urllib.parse.urlsplit(
            self.basepath
        )
        scheme, netloc, urlpath, _, _ = urllib.parse.urlsplit(path)

        # Confirm that absolute paths are valid
        if scheme:
            if scheme != base_scheme:
                raise ValueError(
                    f"Path {path!r} with scheme {scheme!r} must use the same scheme as the base path {base_scheme!r}."
                )

        if netloc:
            # NOTE(review): `startswith` is a plain string-prefix test, so a
            # sibling such as "/my-folder-other" also passes for a base path of
            # "/my-folder" — confirm whether that is acceptable.
            if (netloc != base_netloc) or not urlpath.startswith(base_urlpath):
                raise ValueError(
                    f"Path {path!r} is outside of the base path {self.basepath!r}."
                )

        return f"{self.basepath.rstrip('/')}/{urlpath.lstrip('/')}"

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        if from_path is None:
            from_path = str(self.basepath)
        else:
            from_path = self._resolve_path(from_path)

        if local_path is None:
            local_path = Path(".").absolute()

        return self.filesystem.get(from_path, local_path, recursive=True)

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
        overwrite: bool = True,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.

        Args:
            local_path: Local directory to upload; defaults to ".".
            to_path: Remote destination, resolved against the base path.
            ignore_file: Optional file with ignore patterns; only the files that
                `filter_files` reports as included are uploaded.
            overwrite: When true, `overwrite=True` is passed to `put_file`.

        Returns:
            The number of files uploaded.
        """
        if to_path is None:
            to_path = str(self.basepath)
        else:
            to_path = self._resolve_path(to_path)

        if local_path is None:
            local_path = "."

        included_files = None
        if ignore_file:
            with open(ignore_file, "r") as f:
                ignore_patterns = f.readlines()

            included_files = filter_files(
                local_path, ignore_patterns, include_dirs=True
            )

        # Loop-invariant: make sure the remote prefix ends with a single "/".
        remote_prefix = to_path if to_path.endswith("/") else to_path + "/"

        counter = 0
        for entry in Path(local_path).rglob("*"):
            relative_path = entry.relative_to(local_path)
            # Test against None rather than truthiness: an empty include set
            # means every file is ignored, not that no filter was given.
            if included_files is not None and str(relative_path) not in included_files:
                continue

            # Directories are not uploaded themselves; only files are sent.
            if entry.is_dir():
                continue

            fpath = remote_prefix + relative_path.as_posix()
            if overwrite:
                self.filesystem.put_file(entry.as_posix(), fpath, overwrite=True)
            else:
                self.filesystem.put_file(entry.as_posix(), fpath)

            counter += 1

        return counter

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read and return the bytes stored at `path` (resolved against the base path)."""
        path = self._resolve_path(path)

        with self.filesystem.open(path, "rb") as file:
            content = await run_sync_in_worker_thread(file.read)

        return content

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Write `content` to `path` (resolved against the base path).

        The parent directory is created first.

        Returns:
            The resolved path that was written.
        """
        path = self._resolve_path(path)
        dirpath = path[: path.rindex("/")]

        self.filesystem.makedirs(dirpath, exist_ok=True)

        with self.filesystem.open(path, "wb") as file:
            await run_sync_in_worker_thread(file.write, content)

        # The signature promises a `str`; previously nothing was returned.
        return path

    @property
    def filesystem(self) -> fsspec.AbstractFileSystem:
        """
        The fsspec file system for the base path's scheme, created on first use.

        Raises:
            RuntimeError: If creating the file system raises an ImportError.
        """
        if not self._filesystem:
            scheme, _, _, _, _ = urllib.parse.urlsplit(self.basepath)

            try:
                self._filesystem = fsspec.filesystem(scheme, **self.settings)
            except ImportError as exc:
                # The path is a remote file system that uses a lib that is not installed
                raise RuntimeError(
                    f"File system created with scheme {scheme!r} from base path "
                    f"{self.basepath!r} could not be created. "
                    "You are likely missing a Python module required to use the given "
                    "storage protocol."
                ) from exc

        return self._filesystem

basepath pydantic-field required

Type: str

Default path for this block to write to.

settings pydantic-field

Type: Dict[str, Any]

Additional settings to pass through to fsspec.

RemoteFileSystem.get_directory async

Downloads a directory from a given remote path to a local directory.

Defaults to downloading the entire contents of the block's basepath to the current working directory.

Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Downloads a directory from a given remote path to a local directory.

    Defaults to downloading the entire contents of the block's basepath to the current working directory.

    Args:
        from_path: Remote path to download from. When omitted, the block's
            basepath is used; otherwise the value is passed through
            `self._resolve_path`.
        local_path: Local destination. When omitted, the absolute path of the
            current working directory is used.
    """
    from_path = (
        str(self.basepath) if from_path is None else self._resolve_path(from_path)
    )

    destination = Path(".").absolute() if local_path is None else local_path

    # The copy is recursive and is delegated entirely to the underlying
    # filesystem; its result is passed straight through.
    return self.filesystem.get(from_path, destination, recursive=True)

RemoteFileSystem.put_directory async

Uploads a directory from a given local path to a remote directory.

Defaults to uploading the entire contents of the current working directory to the block's basepath.

Source code in prefect/filesystems.py
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
    overwrite: bool = True,
) -> int:
    """
    Uploads a directory from a given local path to a remote directory.

    Defaults to uploading the entire contents of the current working directory to the block's basepath.

    Args:
        local_path: Local directory to upload. Defaults to ".".
        to_path: Remote destination. When omitted, the block's basepath is
            used; otherwise the value is passed through `self._resolve_path`.
        ignore_file: Optional path to a file of ignore patterns, one per line.
            The patterns are handed to `filter_files` to decide which relative
            paths are uploaded.
        overwrite: When true, `put_file` is called with `overwrite=True`;
            when false, `put_file` is called without that keyword.

    Returns:
        The number of files uploaded (directories are not counted).
    """
    if to_path is None:
        to_path = str(self.basepath)
    else:
        to_path = self._resolve_path(to_path)

    if local_path is None:
        local_path = "."

    included_files = None
    if ignore_file:
        with open(ignore_file, "r") as f:
            ignore_patterns = f.readlines()

        included_files = filter_files(
            local_path, ignore_patterns, include_dirs=True
        )

    # The destination prefix does not change per file, so make sure it ends
    # with exactly the separator we need once, outside the loop.
    remote_root = to_path if to_path.endswith("/") else to_path + "/"

    counter = 0
    for local_file in Path(local_path).rglob("*"):
        relative_path = local_file.relative_to(local_path)

        # Compare against None rather than relying on truthiness: when the
        # ignore patterns exclude everything the filter result is empty, and
        # that must skip every file instead of silently disabling the filter.
        if included_files is not None and str(relative_path) not in included_files:
            continue

        # Only files are uploaded; directories are implied by the file paths.
        if local_file.is_dir():
            continue

        source = local_file.as_posix()
        fpath = remote_root + relative_path.as_posix()

        if overwrite:
            self.filesystem.put_file(source, fpath, overwrite=True)
        else:
            # The keyword is omitted entirely rather than passed as False.
            # NOTE(review): presumably because some backends' `put_file` does
            # not accept `overwrite` — confirm before simplifying.
            self.filesystem.put_file(source, fpath)

        counter += 1

    return counter

S3 pydantic-model

Store data as a file on AWS S3.

Examples:

Load stored S3 config:

from prefect.filesystems import S3

s3_block = S3.load("BLOCK_NAME")

Source code in prefect/filesystems.py
class S3(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on AWS S3.

    Example:
        Load stored S3 config:
        ```python
        from prefect.filesystems import S3

        s3_block = S3.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "S3"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1jbV4lceHOjGgunX15lUwT/db88e184d727f721575aeb054a37e277/aws.png?h=250"

    # Bucket name, optionally followed by a directory inside the bucket.
    bucket_path: str = Field(
        default=...,
        description="An S3 bucket path.",
        example="my-bucket/a-directory-within",
    )
    # Optional credentials; each is only forwarded to the underlying
    # filesystem when it is set (see `filesystem`).
    aws_access_key_id: Optional[SecretStr] = Field(
        default=None,
        title="AWS Access Key ID",
        description="Equivalent to the AWS_ACCESS_KEY_ID environment variable.",
        example="AKIAIOSFODNN7EXAMPLE",
    )
    aws_secret_access_key: Optional[SecretStr] = Field(
        default=None,
        title="AWS Secret Access Key",
        description="Equivalent to the AWS_SECRET_ACCESS_KEY environment variable.",
        example="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    )

    # Most recently built delegate; replaced on every `filesystem` access.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """The `s3://` URL formed from `bucket_path`."""
        return f"s3://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build the `RemoteFileSystem` that every operation on this block delegates to.

        A new instance is created on each access, so credentials changed on
        the block are picked up by the next operation.
        """
        settings = {}
        if self.aws_access_key_id:
            settings["key"] = self.aws_access_key_id.get_secret_value()
        if self.aws_secret_access_key:
            settings["secret"] = self.aws_secret_access_key.get_secret_value()
        # Reuse the `basepath` property so the URL is defined in one place.
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath, settings=settings
        )
        return self._remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        # The delegate's result is returned as-is; it is annotated as None,
        # so this wrapper is annotated the same way.
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        return await self.filesystem.put_directory(
            local_path=local_path, to_path=to_path, ignore_file=ignore_file
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read `path` through the delegate filesystem and return its result."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` through the delegate filesystem and return its result."""
        return await self.filesystem.write_path(path=path, content=content)

aws_access_key_id pydantic-field

Type: SecretStr

Equivalent to the AWS_ACCESS_KEY_ID environment variable.

aws_secret_access_key pydantic-field

Type: SecretStr

Equivalent to the AWS_SECRET_ACCESS_KEY environment variable.

bucket_path pydantic-field required

Type: str

An S3 bucket path.

S3.get_directory async

Downloads a directory from a given remote path to a local directory.

Defaults to downloading the entire contents of the block's basepath to the current working directory.

Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Downloads a directory from a given remote path to a local directory.

    Defaults to downloading the entire contents of the block's basepath to the current working directory.

    Args:
        from_path: Remote path to download from, forwarded unchanged.
        local_path: Local destination, forwarded unchanged.
    """
    # Pure delegation: the result of the underlying filesystem's
    # `get_directory` is returned as-is. That method is annotated as
    # returning None, so `bytes` was the wrong annotation here.
    return await self.filesystem.get_directory(
        from_path=from_path, local_path=local_path
    )

S3.put_directory async

Uploads a directory from a given local path to a remote directory.

Defaults to uploading the entire contents of the current working directory to the block's basepath.

Source code in prefect/filesystems.py
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Upload a local directory to a remote directory.

    With no arguments, the delegate's defaults apply: the current working
    directory is uploaded to the block's basepath.

    Returns:
        Whatever the delegate's `put_directory` returns.
    """
    remote = self.filesystem
    uploaded = await remote.put_directory(
        local_path=local_path,
        to_path=to_path,
        ignore_file=ignore_file,
    )
    return uploaded

SMB pydantic-model

Store data as a file on a SMB share.

Examples:

Load stored SMB config:

from prefect.filesystems import SMB
smb_block = SMB.load("BLOCK_NAME")
Source code in prefect/filesystems.py
class SMB(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a SMB share.

    Example:

        Load stored SMB config:

        ```python
        from prefect.filesystems import SMB
        smb_block = SMB.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "SMB"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/6J444m3vW6ukgBOCinSxLk/025f5562d3c165feb7a5df599578a6a8/samba_2010_logo_transparent_151x27.png?h=250"

    # Share name followed by a path inside the share; leading slashes are
    # stripped when the base URL is built.
    share_path: str = Field(
        default=...,
        description="SMB target (requires <SHARE>, followed by <PATH>).",
        example="/SHARE/dir/subdir",
    )
    # Optional credentials; each is only forwarded when set (see `filesystem`).
    smb_username: Optional[SecretStr] = Field(
        default=None,
        title="SMB Username",
        description="Username with access to the target SMB SHARE.",
    )
    smb_password: Optional[SecretStr] = Field(
        default=None, title="SMB Password", description="Password for SMB access."
    )
    # `title` was previously misspelled `tile`, so the title was never applied.
    smb_host: str = Field(
        default=..., title="SMB server/hostname", description="SMB server/hostname."
    )
    smb_port: Optional[int] = Field(
        default=None, title="SMB port", description="SMB port (default: 445)."
    )

    # Most recently built delegate; replaced on every `filesystem` access.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """The `smb://<host>/<share_path>` URL, joined with exactly one slash."""
        return f"smb://{self.smb_host.rstrip('/')}/{self.share_path.lstrip('/')}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build the `RemoteFileSystem` that every operation on this block delegates to.

        A new instance is created on each access. Only options that are set
        (truthy) are forwarded in `settings`.
        """
        settings = {}
        if self.smb_username:
            settings["username"] = self.smb_username.get_secret_value()
        if self.smb_password:
            settings["password"] = self.smb_password.get_secret_value()
        if self.smb_host:
            settings["host"] = self.smb_host
        if self.smb_port:
            settings["port"] = self.smb_port
        # Reuse the `basepath` property so the URL is defined in one place.
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath,
            settings=settings,
        )
        return self._remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.
        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        # The delegate's result is returned as-is; it is annotated as None,
        # so this wrapper is annotated the same way.
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.
        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        # overwrite=False is always passed for SMB.
        # NOTE(review): presumably because the SMB backend's `put_file` does
        # not accept an `overwrite` keyword — confirm.
        return await self.filesystem.put_directory(
            local_path=local_path,
            to_path=to_path,
            ignore_file=ignore_file,
            overwrite=False,
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read `path` through the delegate filesystem and return its result."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` through the delegate filesystem and return its result."""
        return await self.filesystem.write_path(path=path, content=content)

share_path pydantic-field required

Type: str

SMB target (requires `<SHARE>`, followed by `<PATH>`).

smb_host pydantic-field required

Type: str

SMB server/hostname.

smb_password pydantic-field

Type: SecretStr

Password for SMB access.

smb_port pydantic-field

Type: int

SMB port (default: 445).

smb_username pydantic-field

Type: SecretStr

Username with access to the target SMB SHARE.

SMB.get_directory async

Downloads a directory from a given remote path to a local directory. Defaults to downloading the entire contents of the block's basepath to the current working directory.

Source code in prefect/filesystems.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Downloads a directory from a given remote path to a local directory.
    Defaults to downloading the entire contents of the block's basepath to the current working directory.

    Args:
        from_path: Remote path to download from, forwarded unchanged.
        local_path: Local destination, forwarded unchanged.
    """
    # Pure delegation: the result of the underlying filesystem's
    # `get_directory` is returned as-is. That method is annotated as
    # returning None, so `bytes` was the wrong annotation here.
    return await self.filesystem.get_directory(
        from_path=from_path, local_path=local_path
    )

SMB.put_directory async

Uploads a directory from a given local path to a remote directory. Defaults to uploading the entire contents of the current working directory to the block's basepath.

Source code in prefect/filesystems.py
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Upload a local directory to a remote directory.

    With no arguments, the delegate's defaults apply: the current working
    directory is uploaded to the block's basepath. The delegate is always
    called with `overwrite=False`.

    Returns:
        Whatever the delegate's `put_directory` returns.
    """
    upload_options = dict(
        local_path=local_path,
        to_path=to_path,
        ignore_file=ignore_file,
        overwrite=False,
    )
    return await self.filesystem.put_directory(**upload_options)