transaction
context manager:
pipeline(contents="hello world!")
it will fail.
Importantly, after the flow has exited, there is no "side-effect.txt"
file in
your working directory.
This is because the write_file
task’s on_rollback
hook was executed due to the transaction failing.
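The rollback behavior described above can be modeled in a few lines of plain Python. This is a conceptual sketch only, not Prefect's implementation: ToyTransaction, stage, and run_pipeline are hypothetical names, and the file is written to a temporary directory rather than the working directory.

```python
import os
import tempfile


class ToyTransaction:
    """Conceptual stand-in for a transaction (hypothetical, not Prefect's class)."""

    def __init__(self):
        self.rollback_hooks = []  # hooks of steps that have been staged

    def stage(self, on_rollback):
        self.rollback_hooks.append(on_rollback)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # Any failure inside the block rolls back every staged step,
            # including steps that themselves succeeded.
            for hook in reversed(self.rollback_hooks):
                hook()
        return False  # let the original error propagate


def run_pipeline(path, contents):
    with ToyTransaction() as txn:
        # Step 1: the write succeeds and is staged with its rollback hook.
        with open(path, "w") as f:
            f.write(contents)
        txn.stage(lambda: os.unlink(path))

        # Step 2: a quality check that rejects single-line content.
        with open(path) as f:
            if len(f.readlines()) < 2:
                raise ValueError("Not enough data!")


path = os.path.join(tempfile.mkdtemp(), "side-effect.txt")
try:
    run_pipeline(path, "hello world!")
except ValueError as err:
    print(f"pipeline failed: {err}")
print("file exists after failure:", os.path.exists(path))  # False
```

The write step succeeds, yet its hook still runs because the enclosing block failed, which is why no file is left behind.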
on_rollback hooks are different than on_failure hooks
Note that the on_rollback hook is executed when the quality_test task fails, not the write_file task it is associated with, which succeeded.
This is because rollbacks occur whenever the transaction a task is participating in fails, even if that
failure is outside the task’s local scope.
This behavior makes transactions a valuable pattern for managing pipeline failure.
with transaction() is executed; this transaction remains active throughout the duration of the subtransactions within it.
write_file task. Upon completion of the write_file task, this transaction is now STAGED.
quality_test task. This transaction fails before it can be staged, causing a rollback in its parent transaction, which then rolls back any staged subtransactions. In particular, the staged write_file’s transaction is rolled back.
on_commit lifecycle hooks
In addition to the on_rollback hook, a task can also register on_commit hooks that execute whenever its transaction is committed.
A task run persists its result only at transaction commit time, which could be significantly after the task’s completion time if it is within a long-running transaction.
The signature for an on_commit hook is the same as that of an on_rollback hook:
key for your transaction, you can ensure that your code is executed only once.
For example, here’s a flow that downloads some data from an API and writes it to a file:
key
will cause the transaction to write a record on commit signifying that the transaction has completed.
The call to txn.is_committed()
will return True
only if the persisted record exists.
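As a rough illustration of how a keyed commit record makes a block of work run only once, here is a plain-Python toy model. It is not Prefect's implementation: KeyedTransaction, the JSON record format, and the "download-and-write-data" key are all assumptions made for the sketch.

```python
import json
import os
import tempfile


class KeyedTransaction:
    """Toy model: a record on disk marks the key as committed (hypothetical)."""

    def __init__(self, key, record_dir):
        self.record_path = os.path.join(record_dir, f"{key}.json")

    def is_committed(self):
        # True only if a persisted record for this key exists.
        return os.path.exists(self.record_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Write the record on a clean exit, unless one already exists.
        if exc_type is None and not self.is_committed():
            with open(self.record_path, "w") as f:
                json.dump({"committed": True}, f)
        return False


record_dir = tempfile.mkdtemp()
runs = []


def pipeline():
    with KeyedTransaction("download-and-write-data", record_dir) as txn:
        if txn.is_committed():
            return "skipped"  # the work was already done under this key
        runs.append("did the work")
        return "ran"


first = pipeline()
second = pipeline()
print(first, second, len(runs))  # ran skipped 1
```

The second call finds the record written by the first and exits early, so the work happens once.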
READ_COMMITTED
which means that they can see any previously committed records, but they are not prevented from overwriting
a record that was created by another transaction between the time they started and the time they committed.
To see this behavior in action, consider the following script:
key
argument between runs.
To prevent race conditions, you can set the isolation_level of a transaction to SERIALIZABLE. This causes each transaction to take a lock on the provided key, which prevents other transactions with the same key from starting until the first transaction has completed.
Here’s an updated example that uses SERIALIZABLE
isolation:
SERIALIZABLE
isolation level, you must also provide a lock_manager
to the transaction
context manager. The
lock manager is responsible for acquiring and releasing locks on the transaction key. In the example above, we use a FileSystemLockManager, which manages locks as files on the current instance’s filesystem.
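To make the idea of "locks as files" concrete, the following plain-Python sketch uses exclusive file creation as the lock primitive. It is a toy under stated assumptions and not how Prefect's lock manager is implemented: ToyFileLock and its methods are hypothetical, and for simplicity acquire returns False instead of waiting when the lock is already held.

```python
import os
import tempfile


class ToyFileLock:
    """Toy lock: a key is locked while a file named after it exists (hypothetical)."""

    def __init__(self, lock_dir):
        self.lock_dir = lock_dir

    def _path(self, key):
        return os.path.join(self.lock_dir, f"{key}.lock")

    def acquire(self, key):
        try:
            # O_EXCL makes creation fail if the file already exists,
            # so only one caller can create the lock file.
            fd = os.open(self._path(key), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            return False
        os.close(fd)
        return True

    def release(self, key):
        os.unlink(self._path(key))


locks = ToyFileLock(tempfile.mkdtemp())
got_first = locks.acquire("my-key")
got_second = locks.acquire("my-key")  # refused while the first holder has it
locks.release("my-key")
got_third = locks.acquire("my-key")   # succeeds after release
print(got_first, got_second, got_third)  # True False True
```

Because the lock lives on the filesystem, it is visible to every process on the same machine, but not to processes on other machines.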
Prefect offers several lock managers for different concurrency use cases:
Lock Manager | Storage | Supports | Module/Package |
---|---|---|---|
MemoryLockManager | In-memory | Single-process workflows using threads | prefect.locking.memory |
FileSystemLockManager | Filesystem | Multi-process workflows on a single machine | prefect.locking.filesystem |
RedisLockManager | Redis database | Distributed workflows | prefect-redis |
on_rollback
hook.
The code below shows how to set a key-value pair within a transaction and access it within the on_rollback
hook:
contents
is accessible within the on_rollback
hook.
Use get_transaction()
to access the transaction object and txn.get("key")
to access the value of the key.
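The pattern of stashing a value on the transaction and reading it back in a rollback hook can be sketched without Prefect. Everything below is a toy model: this ToyTransaction class and this get_transaction function are hypothetical stand-ins that only mimic the shape of the calls named above.

```python
import contextvars

# Holds the transaction that is currently active, if any.
_current_txn = contextvars.ContextVar("current_txn", default=None)


class ToyTransaction:
    """Toy transaction with a key-value store (hypothetical, not Prefect's class)."""

    def __init__(self):
        self._store = {}
        self._hooks = []

    def set(self, key, value):
        self._store[key] = value

    def get(self, key, default=None):
        return self._store.get(key, default)

    def stage(self, hook):
        self._hooks.append(hook)

    def __enter__(self):
        self._token = _current_txn.set(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # On failure, each staged hook receives the transaction object.
            for hook in reversed(self._hooks):
                hook(self)
        _current_txn.reset(self._token)
        return False


def get_transaction():
    """Toy lookup of the active transaction."""
    return _current_txn.get()


seen = []


def on_rollback(txn):
    # The value set during the step is still readable here.
    seen.append(txn.get("contents"))


def step(contents):
    txn = get_transaction()
    txn.set("contents", contents)
    txn.stage(on_rollback)


try:
    with ToyTransaction():
        step("hello world!")
        raise RuntimeError("later step failed")
except RuntimeError:
    pass

print(seen)  # ['hello world!']
```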