libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit c696a3748b72eb5eb83f576885b3ea28d9c814bc
parent 38e336736a10a7fd34be9cea6f30f2f0dc38bcb4
Author: MS <ms@taler.net>
Date:   Wed,  8 Feb 2023 14:15:19 +0100

background jobs

fix invocation of root coroutine.

Diffstat:
Mnexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 10++++++----
Mnexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt | 88+++++++++++++++++++++++++++++++++++++------------------------------------------
2 files changed, 47 insertions(+), 51 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -30,6 +30,10 @@ import com.github.ajalt.clikt.parameters.types.int import execThrowableOrTerminate import com.github.ajalt.clikt.core.* import com.github.ajalt.clikt.parameters.options.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext import startServer import tech.libeufin.nexus.iso20022.parseCamtMessage import tech.libeufin.nexus.server.client @@ -70,10 +74,8 @@ class Serve : CliktCommand("Run nexus HTTP server") { private val logLevel by option() override fun run() { setLogLevel(logLevel) - execThrowableOrTerminate { - dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) - } - startOperationScheduler(client) + execThrowableOrTerminate { dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) } + CoroutineScope(Dispatchers.IO).launch(fallback) { startOperationScheduler(client) } if (withUnixSocket != null) { startServer( withUnixSocket!!, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt @@ -24,9 +24,8 @@ import com.cronutils.model.time.ExecutionTime import com.cronutils.parser.CronParser import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.client.HttpClient -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.* +import kotlinx.coroutines.GlobalScope.coroutineContext import kotlinx.coroutines.time.delay import org.jetbrains.exposed.sql.transactions.transaction import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions @@ -36,6 +35,7 @@ import java.lang.IllegalArgumentException import java.time.Duration import java.time.Instant import java.time.ZonedDateTime +import kotlin.coroutines.coroutineContext import kotlin.system.exitProcess private data class TaskSchedule( @@ -96,59 +96,53 @@ object NexusCron { CronParser(cronDefinition) } } -/** - * Fails whenever a unmanaged Throwable reaches the root coroutine. - */ + +// Fails whenever a unmanaged Throwable reaches the root coroutine. val fallback = CoroutineExceptionHandler { _, err -> logger.error(err.stackTraceToString()) exitProcess(1) } -fun startOperationScheduler(httpClient: HttpClient) { - GlobalScope.launch(fallback) { - while (true) { - // First, assign next execution time stamps to all tasks that need them - transaction { - NexusScheduledTaskEntity.find { - NexusScheduledTasksTable.nextScheduledExecutionSec.isNull() - }.forEach { - val cron = try { - NexusCron.parser.parse(it.taskCronspec) - } catch (e: IllegalArgumentException) { - logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}") - return@forEach - } - val zonedNow = ZonedDateTime.now() - val et = ExecutionTime.forCron(cron) - val next = et.nextExecution(zonedNow) - logger.info("scheduling task ${it.taskName} at $next (now is $zonedNow)") - it.nextScheduledExecutionSec = next.get().toEpochSecond() +suspend fun startOperationScheduler(httpClient: HttpClient) { + while (true) { + // First, assign next execution time stamps to all tasks that need them + transaction { + NexusScheduledTaskEntity.find { + NexusScheduledTasksTable.nextScheduledExecutionSec.isNull() + }.forEach { + val cron = try { + NexusCron.parser.parse(it.taskCronspec) + } catch (e: IllegalArgumentException) { + logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}") + return@forEach } + val zonedNow = ZonedDateTime.now() + val et = ExecutionTime.forCron(cron) + val next = et.nextExecution(zonedNow) + logger.info("scheduling task ${it.taskName} at $next (now is $zonedNow)") + it.nextScheduledExecutionSec = next.get().toEpochSecond() } - - val nowSec = Instant.now().epochSecond - // Second, find tasks that are due - val dueTasks = transaction { - NexusScheduledTaskEntity.find { - NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec - }.map { - TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams) - } + } + val nowSec = Instant.now().epochSecond + // Second, find tasks that are due + val dueTasks = transaction { + NexusScheduledTaskEntity.find { + NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec + }.map { + TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams) } - // Execute those due tasks - dueTasks.forEach { - runTask(httpClient, it) - transaction { - val t = NexusScheduledTaskEntity.findById(it.taskId) - if (t != null) { - // Reset next scheduled execution - t.nextScheduledExecutionSec = null - t.prevScheduledExecutionSec = nowSec - } + } // Execute those due tasks + dueTasks.forEach { + runTask(httpClient, it) + transaction { + val t = NexusScheduledTaskEntity.findById(it.taskId) + if (t != null) { + // Reset next scheduled execution + t.nextScheduledExecutionSec = null + t.prevScheduledExecutionSec = nowSec } } - - // Wait a bit - delay(Duration.ofSeconds(1)) } + // Wait a bit + delay(Duration.ofSeconds(1)) } } \ No newline at end of file