commit 6974246a9944845fbed3fbaabdf8ce92cacf8294
parent de667818328e178f7932e34294cc3be80388a0e1
Author: MS <ms@taler.net>
Date: Thu, 6 Apr 2023 18:32:46 +0200
scheduler helper
Diffstat:
3 files changed, 87 insertions(+), 38 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -33,7 +33,6 @@ import com.github.ajalt.clikt.parameters.options.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
-import kotlinx.coroutines.newSingleThreadContext
import startServer
import tech.libeufin.nexus.iso20022.parseCamtMessage
import tech.libeufin.nexus.server.client
@@ -75,7 +74,7 @@ class Serve : CliktCommand("Run nexus HTTP server") {
override fun run() {
setLogLevel(logLevel)
execThrowableOrTerminate { dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
- CoroutineScope(Dispatchers.IO).launch(fallback) { startOperationScheduler(client) }
+ CoroutineScope(Dispatchers.IO).launch(fallback) { whileTrueOperationScheduler(client) }
if (withUnixSocket != null) {
startServer(
withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -34,6 +34,8 @@ import java.lang.IllegalArgumentException
import java.time.Duration
import java.time.Instant
import java.time.ZonedDateTime
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
private data class TaskSchedule(
@@ -53,7 +55,6 @@ private suspend fun runTask(client: HttpClient, sched: TaskSchedule) {
when (sched.type) {
// Downloads and ingests the payment records from the bank.
"fetch" -> {
- @Suppress("BlockingMethodInNonBlockingContext")
val fetchSpec = jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java)
fetchBankAccountTransactions(client, fetchSpec, sched.resourceId)
/**
@@ -108,45 +109,51 @@ val fallback = CoroutineExceptionHandler { _, err ->
logger.error(err.stackTraceToString())
exitProcess(1)
}
-suspend fun startOperationScheduler(httpClient: HttpClient) {
- while (true) {
- // First, assign next execution time stamps to all tasks that need them
- transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
- }.forEach {
- val cron = try { NexusCron.parser.parse(it.taskCronspec) }
- catch (e: IllegalArgumentException) {
- logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}")
- return@forEach
- }
- val zonedNow = ZonedDateTime.now()
- val parsedCron = ExecutionTime.forCron(cron)
- val next = parsedCron.nextExecution(zonedNow)
- logger.info("scheduling task ${it.taskName} at $next (now is $zonedNow)")
- it.nextScheduledExecutionSec = next.get().toEpochSecond()
+
+// Internal routine ultimately scheduling the tasks.
+private suspend fun operationScheduler(httpClient: HttpClient) {
+ // First, assign next execution time stamps to all tasks that need them
+ transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+ }.forEach {
+ val cron = try { NexusCron.parser.parse(it.taskCronspec) }
+ catch (e: IllegalArgumentException) {
+ logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}")
+ return@forEach
}
+ val zonedNow = ZonedDateTime.now()
+ val parsedCron = ExecutionTime.forCron(cron)
+ val next = parsedCron.nextExecution(zonedNow)
+ logger.info("Scheduling task ${it.taskName} at $next (now is $zonedNow).")
+ it.nextScheduledExecutionSec = next.get().toEpochSecond()
}
- val nowSec = Instant.now().epochSecond
- // Second, find tasks that are due
- val dueTasks = transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
- }.map {
- TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams)
- }
- } // Execute those due tasks and reset to null the next execution time.
- dueTasks.forEach {
- runTask(httpClient, it)
- transaction {
- val t = NexusScheduledTaskEntity.findById(it.taskId)
- if (t != null) {
- // Reset next scheduled execution
- t.nextScheduledExecutionSec = null
- t.prevScheduledExecutionSec = nowSec
- }
+ }
+ val nowSec = Instant.now().epochSecond
+ // Second, find tasks that are due
+ val dueTasks = transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec
+ }.map {
+ TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams)
+ }
+ } // Execute those due tasks and reset to null the next execution time.
+ dueTasks.forEach {
+ runTask(httpClient, it)
+ transaction {
+ val t = NexusScheduledTaskEntity.findById(it.taskId)
+ if (t != null) {
+ // Reset next scheduled execution
+ t.nextScheduledExecutionSec = null
+ t.prevScheduledExecutionSec = nowSec
}
}
+ }
+
+}
+suspend fun whileTrueOperationScheduler(httpClient: HttpClient) {
+ while (true) {
+ operationScheduler(httpClient)
// Wait a bit
delay(Duration.ofSeconds(1))
}
diff --git a/nexus/src/test/kotlin/SchedulingTesting.kt b/nexus/src/test/kotlin/SchedulingTesting.kt
@@ -0,0 +1,42 @@
+import io.ktor.client.*
+import io.ktor.server.testing.*
+import kotlinx.coroutines.*
+import org.junit.Ignore
+import org.junit.Test
+import tech.libeufin.nexus.whileTrueOperationScheduler
+import tech.libeufin.sandbox.sandboxApp
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+class SchedulingTesting {
+ // Testing the 'sleep' technique of the scheduler, to watch with TOP(1)
+ @Ignore // Just an experimental piece. No assertion takes place, nor its logic is used anywhere.
+ @Test
+ fun sleep1SecWithDelay() {
+ val sched = Executors.newScheduledThreadPool(1)
+ sched.scheduleAtFixedRate(
+ { println(".") },
+ 1,
+ 1,
+ TimeUnit.SECONDS
+ )
+ runBlocking {
+ launch { awaitCancellation() }
+ }
+ }
+ // Launching the scheduler to measure its perf with TOP(1)
+ @Test
+ fun normalOperation() {
+ withTestDatabase {
+ prepNexusDb()
+ prepSandboxDb()
+ testApplication {
+ application(sandboxApp)
+ whileTrueOperationScheduler(client)
+ }
+ }
+ runBlocking {
+ launch { awaitCancellation() }
+ }
+ }
+}
+\ No newline at end of file