Skip to content

prefect.cli.orion

Command line interface for working with Orion

downgrade async

Downgrade the Orion database

Source code in prefect/cli/orion.py
@database_app.command()
async def downgrade(
    yes: bool = typer.Option(False, "--yes", "-y"),
    revision: str = typer.Option(
        "base",
        "-r",
        help="The revision to pass to `alembic downgrade`. If not provided, runs all migrations.",
    ),
    dry_run: bool = typer.Option(
        False,
        help="Flag to show what migrations would be made without applying them. Will emit sql statements to stdout.",
    ),
):
    """Downgrade the Orion database"""
    # NOTE(review): the docstring is presumably surfaced as the command's help
    # text, so it is deliberately left as a single line.

    # The engine is resolved up front because its URL is shown both in the
    # confirmation prompt and in the final success message.
    interface = provide_database_interface()
    engine = await interface.engine()

    # Only prompt when --yes/-y was not given; a negative answer aborts.
    if not yes and not typer.confirm(
        "Are you sure you want to downgrade the Orion "
        f"database at {engine.url!r}?"
    ):
        exit_with_error("Database downgrade aborted!")

    console = app.console
    console.print("Running downgrade migrations ...")

    # alembic_downgrade is a synchronous callable, so it is run on a worker
    # thread; both CLI options are forwarded to it unchanged.
    migration_options = {"revision": revision, "dry_run": dry_run}
    await run_sync_in_worker_thread(alembic_downgrade, **migration_options)

    console.print("Migrations succeeded!")
    exit_with_success(f"Orion database at {engine.url!r} downgraded!")

reset async

Drop and recreate all Orion database tables

Source code in prefect/cli/orion.py
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
    """Drop and recreate all Orion database tables"""
    # NOTE(review): the docstring is presumably surfaced as the command's help
    # text, so it is deliberately left as a single line.

    # The engine is needed only for its URL, which is echoed in the prompt and
    # in the final success message.
    interface = provide_database_interface()
    engine = await interface.engine()

    # Skip the prompt when --yes/-y was passed; otherwise a "no" aborts.
    if not yes and not typer.confirm(
        "Are you sure you want to reset the Orion database located "
        f'at "{engine.url!r}"? This will drop and recreate all tables.'
    ):
        exit_with_error("Database reset aborted")

    console = app.console

    # A reset is a drop followed by a create, reported to the user as a
    # "downgrade" and an "upgrade" respectively.
    console.print("Downgrading database...")
    await interface.drop_db()

    console.print("Upgrading database...")
    await interface.create_db()

    exit_with_success(f'Orion database "{engine.url!r}" reset!')

revision async

Create a new migration for the Orion database

Source code in prefect/cli/orion.py
@database_app.command()
async def revision(
    message: str = typer.Option(
        None,
        "--message",
        "-m",
        help="A message to describe the migration.",
    ),
    autogenerate: bool = False,
):
    """Create a new migration for the Orion database"""
    # NOTE(review): the docstring is presumably surfaced as the command's help
    # text, so it is deliberately left as a single line.

    console = app.console
    console.print("Running migration file creation ...")

    # alembic_revision is synchronous, so it is handed to a worker thread; the
    # two CLI options are passed straight through as keyword arguments.
    revision_options = {"message": message, "autogenerate": autogenerate}
    await run_sync_in_worker_thread(alembic_revision, **revision_options)

    exit_with_success("Creating new migration file succeeded!")

stamp async

Stamp the revision table with the given revision; don’t run any migrations

Source code in prefect/cli/orion.py
@database_app.command()
async def stamp(revision: str):
    """Stamp the revision table with the given revision; don’t run any migrations"""
    # NOTE(review): the docstring is presumably surfaced as the command's help
    # text, so it is deliberately left untouched.

    console = app.console
    console.print("Stamping database with revision ...")

    # alembic_stamp is synchronous, so it is run on a worker thread with the
    # requested revision as its only argument.
    stamp_options = {"revision": revision}
    await run_sync_in_worker_thread(alembic_stamp, **stamp_options)

    exit_with_success("Stamping database with revision succeeded!")

start async

Start an Orion server

Source code in prefect/cli/orion.py
@orion_app.command()
async def start(
    host: str = SettingsOption(PREFECT_ORION_API_HOST),
    port: int = SettingsOption(PREFECT_ORION_API_PORT),
    log_level: str = SettingsOption(PREFECT_LOGGING_SERVER_LEVEL),
    scheduler: bool = SettingsOption(PREFECT_ORION_SERVICES_SCHEDULER_ENABLED),
    analytics: bool = SettingsOption(
        PREFECT_ORION_ANALYTICS_ENABLED, "--analytics-on/--analytics-off"
    ),
    late_runs: bool = SettingsOption(PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED),
    ui: bool = SettingsOption(PREFECT_ORION_UI_ENABLED),
):
    """Start an Orion server"""
    # NOTE(review): the docstring is presumably surfaced as the command's help
    # text, so it is deliberately left as a single line.

    # The server runs as a child process, so the CLI choices are handed over
    # through a copy of this process's environment.  Each key is the name of
    # the setting that backs the corresponding option in the signature.
    server_env = os.environ.copy()
    server_env["PREFECT_ORION_SERVICES_SCHEDULER_ENABLED"] = str(scheduler)
    server_env["PREFECT_ORION_ANALYTICS_ENABLED"] = str(analytics)
    server_env["PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED"] = str(late_runs)
    # Fixed: this value used to be written under "PREFECT_ORION_SERVICES_UI",
    # which does not match the setting behind the `ui` option
    # (PREFECT_ORION_UI_ENABLED), so the choice presumably never reached the
    # server process.
    server_env["PREFECT_ORION_UI_ENABLED"] = str(ui)
    server_env["PREFECT_LOGGING_SERVER_LEVEL"] = log_level

    base_url = f"http://{host}:{port}"

    async with anyio.create_task_group() as tg:
        app.console.print(generate_welcome_blurb(base_url, ui_enabled=ui))
        app.console.print("\n")

        # Launch uvicorn with the app factory; the value returned by
        # `tg.start` is used below as the child's process id for `os.kill`.
        orion_process_id = await tg.start(
            partial(
                run_process,
                command=[
                    "uvicorn",
                    "--factory",
                    "prefect.orion.api.server:create_app",
                    "--host",
                    str(host),
                    "--port",
                    str(port),
                ],
                env=server_env,
                stream_output=True,
            )
        )

        # Explicitly handle the interrupt signal here, as it will allow us to cleanly
        # stop the Orion uvicorn server with a SIGTERM.  Failing to do that may cause
        # a large amount of anyio error traces on the terminal, because the SIGINT is
        # handled by Typer/Click in this (the parent process) and will start
        # shutting down subprocesses:
        # https://github.com/PrefectHQ/orion/issues/2475
        # The first interrupt will send a SIGTERM to uvicorn, then subsequent ones will
        # send SIGKILL
        def stop_orion(*args):
            app.console.print("\nStopping Orion...")
            os.kill(orion_process_id, signal.SIGTERM)
            # Escalate: any further interrupt goes to `kill_orion` instead.
            signal.signal(signal.SIGINT, kill_orion)

        def kill_orion(*args):
            app.console.print("\nKilling Orion...")
            os.kill(orion_process_id, signal.SIGKILL)

        signal.signal(signal.SIGINT, stop_orion)

    # Reached once the task group has exited.
    app.console.print("Orion stopped!")

upgrade async

Upgrade the Orion database

Source code in prefect/cli/orion.py
@database_app.command()
async def upgrade(
    yes: bool = typer.Option(False, "--yes", "-y"),
    revision: str = typer.Option(
        "head",
        "-r",
        help="The revision to pass to `alembic upgrade`. If not provided, runs all migrations.",
    ),
    dry_run: bool = typer.Option(
        False,
        help="Flag to show what migrations would be made without applying them. Will emit sql statements to stdout.",
    ),
):
    """Upgrade the Orion database"""
    # NOTE(review): the docstring is presumably surfaced as the command's help
    # text, so it is deliberately left as a single line.

    # The engine is resolved up front because its URL is shown both in the
    # confirmation prompt and in the final success message.
    interface = provide_database_interface()
    engine = await interface.engine()

    # Only prompt when --yes/-y was not given; a negative answer aborts.
    if not yes and not typer.confirm(
        "Are you sure you want to upgrade the " f"Orion database at {engine.url!r}?"
    ):
        exit_with_error("Database upgrade aborted!")

    console = app.console
    console.print("Running upgrade migrations ...")

    # alembic_upgrade is a synchronous callable, so it is run on a worker
    # thread; both CLI options are forwarded to it unchanged.
    migration_options = {"revision": revision, "dry_run": dry_run}
    await run_sync_in_worker_thread(alembic_upgrade, **migration_options)

    console.print("Migrations succeeded!")
    exit_with_success(f"Orion database at {engine.url!r} upgraded!")