prefect.orion.api.run_history
Utilities for querying flow and task run history.
run_history (async)
Produce a history of runs aggregated by interval and state
Source code in prefect/orion/api/run_history.py
@inject_db
async def run_history(
    session: sa.orm.Session,
    db: OrionDBInterface,
    run_type: Literal["flow_run", "task_run"],
    history_start: DateTimeTZ,
    history_end: DateTimeTZ,
    history_interval: datetime.timedelta,
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
) -> List[schemas.responses.HistoryResponse]:
    """
    Produce a history of runs aggregated by interval and state.

    The window from `history_start` to `history_end` is divided into
    intervals of `history_interval`. Each run that passes the filters is
    assigned to the interval for which
    `interval_start <= expected_start_time < interval_end`, and runs are then
    grouped by state type and state name within each interval.

    Args:
        session: database session used to execute the query
        db: database interface providing the ORM models and the
            dialect-specific query helpers used here (presumably supplied by
            the `inject_db` decorator -- confirm against its definition)
        run_type: which kind of run to aggregate; "flow_run" or "task_run"
        history_start: start of the history window
        history_end: end of the history window
        history_interval: size of each interval; must be at least 1 second
        flows: optional flow filter applied to the runs
        flow_runs: optional flow run filter applied to the runs
        task_runs: optional task run filter applied to the runs
        deployments: optional deployment filter applied to the runs

    Returns:
        At most 500 history responses ordered by interval start. Each one
        carries a `states` list with one object per (state type, state name)
        pair seen in the interval, holding `state_type`, `state_name`,
        `count_runs`, `sum_estimated_run_time` and `sum_estimated_lateness`.
        Intervals without runs have an empty `states` list.

    Raises:
        ValueError: if `history_interval` is less than 1 second, or if
            `run_type` is neither "flow_run" nor "task_run".
    """
    # SQLite has issues with very small intervals
    # (by 0.001 seconds it stops incrementing the interval)
    if history_interval < datetime.timedelta(seconds=1):
        raise ValueError("History interval must not be less than 1 second.")

    # prepare run-specific models
    if run_type == "flow_run":
        run_model = db.FlowRun
        state_model = db.FlowRunState
        run_filter_function = models.flow_runs._apply_flow_run_filters
    elif run_type == "task_run":
        run_model = db.TaskRun
        state_model = db.TaskRunState
        run_filter_function = models.task_runs._apply_task_run_filters
    else:
        # fail fast with a clear message; without this branch the names above
        # stay unbound and the query construction below fails with an
        # UnboundLocalError
        raise ValueError(
            f"Invalid run type {run_type!r}; expected 'flow_run' or 'task_run'."
        )

    # create a CTE for timestamp intervals
    intervals = db.make_timestamp_intervals(
        history_start,
        history_end,
        history_interval,
    ).cte("intervals")

    # apply filters to the runs (and related states)
    runs = (
        await run_filter_function(
            sa.select(
                run_model.id,
                run_model.expected_start_time,
                run_model.estimated_run_time,
                run_model.estimated_start_time_delta,
                state_model.type.label("state_type"),
                state_model.name.label("state_name"),
            )
            .select_from(run_model)
            .join(state_model, run_model.state_id == state_model.id),
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
        )
    ).alias("runs")

    # outer join intervals to the filtered runs to create a dataset composed of
    # every interval and the aggregate of all its runs. The runs aggregate is
    # represented by a descriptive JSON object
    counts = (
        sa.select(
            intervals.c.interval_start,
            intervals.c.interval_end,
            # build a JSON object, ignoring the case where the count of runs is 0
            sa.case(
                (sa.func.count(runs.c.id) == 0, None),
                else_=db.build_json_object(
                    "state_type",
                    runs.c.state_type,
                    "state_name",
                    runs.c.state_name,
                    "count_runs",
                    sa.func.count(runs.c.id),
                    # estimated run times only includes positive run times
                    # (to avoid any unexpected corner cases)
                    "sum_estimated_run_time",
                    sa.func.sum(
                        db.greatest(
                            0, sa.extract("epoch", runs.c.estimated_run_time)
                        )
                    ),
                    # estimated lateness is the sum of any positive start
                    # time deltas
                    "sum_estimated_lateness",
                    sa.func.sum(
                        db.greatest(
                            0, sa.extract("epoch", runs.c.estimated_start_time_delta)
                        )
                    ),
                ),
            ).label("state_agg"),
        )
        .select_from(intervals)
        .join(
            runs,
            # intervals are half-open: start inclusive, end exclusive
            sa.and_(
                runs.c.expected_start_time >= intervals.c.interval_start,
                runs.c.expected_start_time < intervals.c.interval_end,
            ),
            isouter=True,
        )
        .group_by(
            intervals.c.interval_start,
            intervals.c.interval_end,
            runs.c.state_type,
            runs.c.state_name,
        )
    ).alias("counts")

    # aggregate all state-aggregate objects into a single array for each
    # interval, ensuring that intervals with no runs have an empty array
    query = (
        sa.select(
            counts.c.interval_start,
            counts.c.interval_end,
            sa.func.coalesce(
                db.json_arr_agg(db.cast_to_json(counts.c.state_agg)).filter(
                    counts.c.state_agg.is_not(None)
                ),
                sa.text("'[]'"),
            ).label("states"),
        )
        .group_by(counts.c.interval_start, counts.c.interval_end)
        .order_by(counts.c.interval_start)
        # return no more than 500 bars
        .limit(500)
    )

    # issue the query
    result = await session.execute(query)
    records = result.all()

    # load and parse the record if the database returns JSON as strings
    if db.uses_json_strings:
        records = [dict(r) for r in records]
        for r in records:
            r["states"] = json.loads(r["states"])

    return pydantic.parse_obj_as(List[schemas.responses.HistoryResponse], records)