# Schedules

Looking for the latest Prefect 2 release? Prefect 2 and Prefect Cloud 2 have been released for General Availability. See https://docs.prefect.io/ for details.

# Overview

Prefect assumes that flows can be run at any time, for any reason. However, it is often useful to automate flow runs at certain times. Simple schedules can be attached to Flows via the schedule keyword argument. For more detailed or complex schedules, Prefect provides a versatile Schedule object which allows for subtle date time adjustments and filtering, along with updating Parameter values based on the scheduled time.

# Simple Schedules

Simple schedules can be attached to Flows via the schedule keyword argument:

from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule


@task
def say_hello():
    print("Hello, world!")


schedule = IntervalSchedule(interval=timedelta(minutes=2))

with Flow("Hello", schedule=schedule) as flow:
    say_hello()

flow.run()

You can see a cron schedule in this example.

# Complex Schedules

Prefect Schedules have three components:

  • a clock that emits events. For example, an IntervalClock might emit an event every hour; a CronClock could emit an event according to a cron string. A single schedule may include multiple clocks. Clocks can also be used to specify varying Parameter values for each flow run.
  • filters that decide whether an event should be included or not. For example, a filter might be set that only allows events on weekdays, or only during business hours.
  • adjustments that can be used to modify events that pass the filters. For example, an adjustment could advance an event to the next business day, or the last business day of the month.

These three components allow users to combine simple functions into complex behavior.

Note that many examples here use the Pendulum Python package for easy datetime manipulation. Pendulum isn’t required, but it’s employed in many schedule use cases, such as start and end dates.

# Clocks

# Interval clocks

The most basic Prefect clock is the IntervalClock. It takes an interval argument and emits events on a regular basis. An optional start_date can be provided, in which case the intervals will be relative to that date; an end_date can be provided as well.

Prefect does not support sub-minute schedules.

from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(clocks=[IntervalClock(timedelta(hours=24))])

schedule.next(5)

Time Zones

Want to pin your schedule to a time zone? Specify a start_date corresponding to that time zone for your clock:

import pendulum

schedules.clocks.IntervalClock(
    start_date=pendulum.datetime(2019, 1, 1, tz="America/New_York"),
    interval=timedelta(days=1),
)

Daylight Saving Time

If the IntervalClock start time is provided with a DST-observing timezone, then the schedule will adjust itself appropriately. Intervals greater than 24 hours will follow DST conventions, while intervals of less than 24 hours will follow UTC intervals. For example, an hourly schedule will fire every UTC hour, even across DST boundaries. When clocks are set back, this will result in two runs that appear to both be scheduled for 1am local time, even though they are an hour apart in UTC time. For longer intervals, like a daily schedule, the interval schedule will adjust for DST boundaries so that the clock-hour remains constant. This means that a daily schedule that always fires at 9am will observe DST and continue to fire at 9am in the local time zone.

Note that this behavior is different from the CronClock.

# Cron clocks

Clocks can also be generated from cron strings with the Prefect CronClock.

from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

schedule = Schedule(clocks=[CronClock("0 0 * * *")])

schedule.next(5)

Daylight Saving Time

If the CronClock start time is provided with a DST-observing timezone, then the schedule will adjust itself. Cron's rules for DST are based on clock times, not intervals. This means that an hourly cron schedule will fire on every new clock hour, not every elapsed hour; for example, when clocks are set back this will result in a two-hour pause as the schedule will fire the first time 1am is reached and the first time 2am is reached, 120 minutes later. Longer schedules, such as one that fires at 9am every morning, will automatically adjust for DST.

Note that this behavior is different from the IntervalClock.

# Date clocks

For more ad-hoc schedules, Prefect provides a DatesClock that only fires on specific, user-provided dates.

import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock

schedule = Schedule(
    clocks=[DatesClock([pendulum.now().add(days=1), pendulum.now().add(days=2)])]
)

schedule.next(2)

# Recurrence Rule Clocks

The Prefect RRuleClock supports iCal recurrence rules (RRules), which provide convenient syntax for creating repetitive schedules. Schedules can repeat on a frequency from yearly down to every minute.

RRuleClock uses the dateutil rrule module to specify iCal recurrence rules.

For example, RRuleClock can specify a schedule recurring every day for a week:

from dateutil.rrule import rrule, DAILY
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import RRuleClock

start_date = pendulum.now().add(days=1)
r_rule = rrule(freq=DAILY, count=7)

schedule = Schedule(clocks=[RRuleClock(r_rule, start_date=start_date)])

# Varying Parameter Values

All clocks support an optional parameter_defaults argument that allows users to specify varying Parameter values for each flow run generated from this clock. For example, suppose we have the following flow that logs the value of the Parameter that is passed to it:

import prefect
from prefect import task, Flow, Parameter


@task
def log_param(p):
    logger = prefect.context["logger"]
    logger.info(f"Received parameter value {p}")


p = Parameter("p", default=None, required=False)

with Flow("Varying Parameters") as flow:
    log_param(p)

Each time we run this flow, we can optionally pass a new value for the p parameter; if we were to run this flow on a fixed schedule, we might want to pass different values for p depending on which schedule is being called - we can do this through the use of clocks:

import datetime
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()

clock1 = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 1"},
)
clock2 = clocks.IntervalClock(
    start_date=now + datetime.timedelta(seconds=30),
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 2"},
)

# the full schedule
schedule = Schedule(clocks=[clock1, clock2])

flow.schedule = schedule  # set the schedule on the Flow
flow.run()

When we run this flow on its schedule as above, we will see the parameter value change in the log with each new run:

...
INFO - prefect.Task: log_param | Received parameter value CLOCK 2
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 1
...

# Filters

Prefect provides a variety of event filters, including:

  • on_datetime (allows events on a certain datetime)
  • on_date (allows events on a certain date, for example March 15)
  • at_time (allows events at a certain time, such as 3:30pm)
  • between_datetimes (allows events between two specific datetimes)
  • between_times (allows events between two times, for example 9am and 5pm)
  • between_dates (allows events between two calendar dates, such as January 1 and March 31)
  • is_weekday (allows events on weekdays)
  • is_weekend (allows events on weekends)
  • is_month_end (allows events on month-end)

Filters can be provided to schedules in three different ways:

  • filters: ALL filters must return True for the event to be included
  • or_filters: AT LEAST ONE filter must return True for the even to be included
  • not_filters: NO filters can return True for the event to be included
schedules.Schedule(
    # fire every hour
    clocks=[clocks.IntervalClock(timedelta(hours=1))],
    # but only on weekdays
    filters=[filters.is_weekday],
    # and only at 9am or 3pm
    or_filters=[
        filters.between_times(pendulum.time(9), pendulum.time(9)),
        filters.between_times(pendulum.time(15), pendulum.time(15)),
    ],
    # and not in January
    not_filters=[filters.between_dates(1, 1, 1, 31)],
)

# Adjustments

Adjustments allow schedules to modify dates that are emitted by clocks and pass a collection of filters:

  • add (adds an interval to the date)
  • next_weekday (advances the date to the next weekday)
schedules.Schedule(
    # fire every day
    clocks=[clocks.IntervalClock(timedelta(days=1))],
    # filtered for month ends
    filters=[filters.is_month_end],
    # and run on the next weekday
    adjustments=[adjustments.next_weekday],
)