Distributed job runner/task scheduler with Postgres and Spring
With code samples in Kotlin
Imagine a scenario where another system occasionally inserts new rows into a database table. Your application periodically queries the table for new rows and, if it finds any, picks them up, processes them, and updates them.
In a non-distributed system, querying for new rows and then processing and updating them works fine. In a distributed system, however, multiple instances can pick up and update the same row, leading to data races and deadlocks.
One solution to this problem is a distributed job runner/task scheduler. Frameworks like ShedLock (github.com/lukas-krecan/ShedLock) solve it with a distributed lock: ShedLock guarantees that, even with multiple instances/nodes/threads, a task's lock can be held by at most one node at a time. However, this is neither the most efficient nor the most scalable solution in a distributed environment. With multiple nodes, one node does all the processing while the others sit idle, effectively emulating a non-distributed environment.
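To make the idle-node effect concrete, here is a toy in-memory model in Kotlin. It is not ShedLock's API: a single ReentrantLock stands in for the shared lock record, and the function runScheduledJobOnAllNodes and its parameters are hypothetical. Every simulated node fires the job at the same time, and whichever node wins tryLock() drains the whole backlog.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Toy model, NOT ShedLock's API: one JVM lock stands in for the shared lock record
// that a ShedLock-style setup keeps in the database. Returns rows processed per node.
fun runScheduledJobOnAllNodes(nodeCount: Int, rows: Int): Map<Int, Int> {
    val jobLock = ReentrantLock()
    // Backlog of unprocessed row ids; only ever touched while holding jobLock.
    val backlog = ArrayDeque((1..rows).map { it.toLong() })
    val processedBy = ConcurrentHashMap<Int, Int>()
    val pool = Executors.newFixedThreadPool(nodeCount)
    repeat(nodeCount) { node ->
        pool.execute {
            var count = 0
            // Every node fires the scheduled job, but at most one holds the lock;
            // the others give up immediately instead of waiting.
            if (jobLock.tryLock()) {
                try {
                    while (backlog.isNotEmpty()) {
                        backlog.removeFirst() // "process" the row
                        count++
                    }
                } finally {
                    jobLock.unlock()
                }
            }
            processedBy[node] = count
        }
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    return processedBy.toMap()
}
```

For example, runScheduledJobOnAllNodes(4, 100) returns a map in which one node processed all 100 rows and the other three processed none: a node that acquires the lock later finds the backlog already empty.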
A better solution is to leverage Postgres pessimistic locking (SELECT ... FOR UPDATE) in combination with SKIP LOCKED, so that each node picks up rows that no other node is currently working on. This way, the work is spread across all nodes, and we can scale the number of nodes up or down to handle the load. Let me show how with an example.
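The idea can be illustrated with a toy in-memory model in Kotlin before bringing in Postgres. This is a sketch under simplifying assumptions, not how Postgres implements locking: ToyTable, claimNext(), commit() and runNodes() are hypothetical names. claimNext() plays the role of SELECT ... WHERE processed = false LIMIT 1 FOR UPDATE SKIP LOCKED, and commit() plays the role of the UPDATE plus COMMIT that releases the row lock.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Toy in-memory stand-in for the test table, NOT Postgres.
class ToyTable(size: Int) {
    private val processed = BooleanArray(size)
    private val locked = BooleanArray(size)

    // Mimics SELECT ... WHERE processed = false LIMIT 1 FOR UPDATE SKIP LOCKED:
    // rows locked by another node are skipped instead of waited on.
    @Synchronized
    fun claimNext(): Int? {
        val id = processed.indices.firstOrNull { !processed[it] && !locked[it] } ?: return null
        locked[id] = true
        return id
    }

    // Mimics the UPDATE + COMMIT: mark the row processed and release its lock.
    @Synchronized
    fun commit(id: Int) {
        processed[id] = true
        locked[id] = false
    }
}

// Runs nodeCount concurrent "nodes" that keep claiming rows until none are free.
// Returns the row ids each node processed.
fun runNodes(nodeCount: Int, rows: Int): Map<Int, List<Int>> {
    val table = ToyTable(rows)
    val claimed = ConcurrentHashMap<Int, List<Int>>()
    val pool = Executors.newFixedThreadPool(nodeCount)
    repeat(nodeCount) { node ->
        pool.execute {
            val mine = mutableListOf<Int>()
            while (true) {
                val id = table.claimNext() ?: break // no unlocked, unprocessed row left
                Thread.sleep(1)                     // pretend to process the row
                mine.add(id)
                table.commit(id)
            }
            claimed[node] = mine
        }
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    return claimed.toMap()
}
```

Every row ends up claimed by exactly one node, and no node ever waits on a row another node holds; how the rows are split between the nodes depends on thread scheduling.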
Example with Spring Boot
Setup
Let's assume a very simple table with three columns: the primary key id, the data, and a processed flag:
CREATE TABLE test(
    id BIGINT PRIMARY KEY,
    data TEXT NOT NULL,
    processed BOOLEAN NOT NULL DEFAULT FALSE
);
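The equivalent JPA entity and repository would be:
import javax.persistence.Entity // jakarta.persistence on Spring Boot 3
import javax.persistence.Id
import org.springframework.data.jpa.repository.JpaRepository

// JPA needs a no-arg constructor: enable the kotlin-jpa compiler plugin
@Entity
class Test(
    @Id val id: Long,
    val data: String,
    var processed: Boolean // var, so the service can mark the row as processed
)
interface TestRepository: JpaRepository<Test, Long>
Now imagine a service that needs to check the table for new rows and update them:
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

// The kotlin-spring plugin opens this class so Spring can proxy @Transactional
@Service
class TestService(private val testRepo: TestRepository) {
    @Transactional
    fun doSomething() {
        val t = testRepo.findFirstByProcessedIsFalse() // ----- (1)
        if (t != null) {
            process(t)
            t.processed = true // mark the row so it is not picked up again
            testRepo.save(t)
        }
    }

    private fun process(t: Test) {
        TODO("do processing")
    }
}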
- (1): this method does not exist yet; we'll add it to the repository next:
interface TestRepository: JpaRepository<Test, Long> {
    fun findFirstByProcessedIsFalse(): Test?
}
Problem
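A single instance of this code runs just fine. In a distributed system, however, several nodes can try to grab the same row at the same time, and the @Transactional annotation alone does not prevent that. A plain SELECT takes no row lock in Postgres, so two nodes can read the same unprocessed row and both process it. Their UPDATEs then serialize on the row lock: the later node's thread blocks until the first transaction commits or the lock timeout (if one is configured) expires. Adding a pessimistic lock on its own removes the duplicate processing but not the blocking, because every node queues up behind whichever node locked the first unprocessed row.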
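The duplicate-processing half of the problem can be reproduced with a toy Kotlin model. This is a hypothetical sketch, not the Spring code above: a BooleanArray stands in for the processed column, two threads stand in for two nodes, and a CyclicBarrier forces the unlucky interleaving in which both nodes read the first unprocessed row before either one marks it as processed.

```kotlin
import java.util.Collections
import java.util.concurrent.CyclicBarrier
import kotlin.concurrent.thread

// Toy replay of two nodes running "find first unprocessed row, process, mark processed"
// WITHOUT any row lock. Returns the row ids that were processed, sorted.
fun raceWithoutLocks(): List<Long> {
    val processed = BooleanArray(3)                 // toy table with rows 0, 1, 2
    val handled: MutableList<Long> = Collections.synchronizedList(mutableListOf<Long>())
    val bothHaveRead = CyclicBarrier(2)             // forces the unlucky interleaving
    val nodes = List(2) { _ ->
        thread {
            // Plain read, lock released immediately: "SELECT ... WHERE processed = false LIMIT 1"
            val id = synchronized(processed) { processed.indexOfFirst { !it } }
            bothHaveRead.await()                    // wait until the other node has read too
            handled.add(id.toLong())                // "process" the row
            synchronized(processed) { processed[id] = true } // "UPDATE ... SET processed = true"
        }
    }
    nodes.forEach { it.join() }
    return handled.sorted()
}
```

raceWithoutLocks() returns [0, 0]: row 0 is processed twice, while rows 1 and 2 are not touched by this run.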
Solution
Let's solve this using pessimistic locking in combination with SKIP LOCKED:
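import javax.persistence.LockModeType // jakarta.persistence on Spring Boot 3
import javax.persistence.QueryHint
import org.hibernate.cfg.AvailableSettings
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Lock
import org.springframework.data.jpa.repository.QueryHints
const val SKIP_LOCK = "-2" // same value as Hibernate's LockOptions.SKIP_LOCKED
interface TestRepository: JpaRepository<Test, Long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE) // ------ (1)
    @QueryHints(QueryHint(name = AvailableSettings.JPA_LOCK_TIMEOUT, value = SKIP_LOCK)) // ------ (2)
    fun findFirstByProcessedIsFalse(): Test?
}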
The method employs two tricks:
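- (1): @Lock(LockModeType.PESSIMISTIC_WRITE) makes the query take a pessimistic write lock on the row it returns (SELECT ... FOR UPDATE in Postgres). The lock is held until the transaction commits or rolls back, and while it is held, no other transaction can update, delete, or lock that row.
- (2): @QueryHints(QueryHint(name = AvailableSettings.JPA_LOCK_TIMEOUT, value = SKIP_LOCK)) tells the query not to wait for rows that are already locked by another transaction, but to skip them and move on to the next free row. This is achieved by setting the JPA_LOCK_TIMEOUT hint (javax.persistence.lock.timeout) to -2. Hibernate treats -2 not as a timeout in milliseconds but as the special value LockOptions.SKIP_LOCKED, which it translates into SKIP LOCKED on Postgres.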
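The special values behind that hint can be summarised in a small Kotlin model. It is a simplified, hypothetical sketch, not Hibernate's code: LockWaitBehavior and behaviorFor() are made-up names. The numeric values match Hibernate's LockOptions constants (WAIT_FOREVER = -1, NO_WAIT = 0, SKIP_LOCKED = -2), and each clause is the Postgres row-locking clause for that behaviour; the exact SQL Hibernate emits can differ between versions and dialects.

```kotlin
// Hypothetical model, NOT Hibernate code: what the lock-timeout hint values mean.
// The numbers mirror Hibernate's LockOptions constants; the clauses are the
// corresponding Postgres row-locking clauses.
enum class LockWaitBehavior(val postgresClause: String) {
    WAIT("FOR UPDATE"),              // block until the lock holder commits or rolls back
    FAIL_FAST("FOR UPDATE NOWAIT"),  // raise an error at once if a row is locked
    SKIP("FOR UPDATE SKIP LOCKED")   // leave locked rows out of the result
}

fun behaviorFor(lockTimeoutHint: String): LockWaitBehavior = when (lockTimeoutHint.toInt()) {
    -2 -> LockWaitBehavior.SKIP      // LockOptions.SKIP_LOCKED, the value used in SKIP_LOCK
    0 -> LockWaitBehavior.FAIL_FAST  // LockOptions.NO_WAIT
    -1 -> LockWaitBehavior.WAIT      // LockOptions.WAIT_FOREVER
    else -> LockWaitBehavior.WAIT    // positive millisecond timeouts: not modelled here
}
```

With the hint value "-2" used in the repository, behaviorFor("-2") yields SKIP, whose clause is FOR UPDATE SKIP LOCKED.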
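And that's it. No need to change the rest of the code. Because doSomething() is @Transactional, each node keeps its row lock while it processes the row and releases it when the transaction commits; meanwhile, the other nodes skip that row and pick up the next free one. This ensures that the nodes in a distributed system each get hold of unique rows to process, enabling a more scalable system.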