libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 88302c6447139a4687c43cc1837bddf3b3eeae06
parent 75771006882617149ef31152462e90b412ca9acb
Author: Florian Dold <florian.dold@gmail.com>
Date:   Fri, 19 Jun 2020 16:22:19 +0530

implement fancy scheduling

Diffstat:
Mnexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 4++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt | 151++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt | 10++++------
Mnexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt | 48++++++++++++++++++++++++++++++++++++++++--------
4 files changed, 143 insertions(+), 70 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -343,6 +343,8 @@ object NexusScheduledTasksTable : IntIdTable() { val taskType = text("taskType") val taskCronspec = text("taskCronspec") val taskParams = text("taskParams") + val nextScheduledExecutionSec = long("nextScheduledExecutionSec").nullable() + val prevScheduledExecutionSec = long("lastScheduledExecutionSec").nullable() } class NexusScheduledTaskEntity(id: EntityID<Int>) : IntEntity(id) { @@ -354,6 +356,8 @@ class NexusScheduledTaskEntity(id: EntityID<Int>) : IntEntity(id) { var taskType by NexusScheduledTasksTable.taskType var taskCronspec by NexusScheduledTasksTable.taskCronspec var taskParams by NexusScheduledTasksTable.taskParams + var nextScheduledExecutionSec by NexusScheduledTasksTable.nextScheduledExecutionSec + var prevScheduledExecutionSec by NexusScheduledTasksTable.prevScheduledExecutionSec } fun dbCreateTables(dbName: String) { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt @@ -19,83 +19,122 @@ package tech.libeufin.nexus +import com.cronutils.model.definition.CronDefinitionBuilder +import com.cronutils.model.time.ExecutionTime +import com.cronutils.parser.CronParser +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.client.HttpClient import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import kotlinx.coroutines.time.delay import org.jetbrains.exposed.sql.transactions.transaction -import tech.libeufin.nexus.bankaccount.fetchTransactionsInternal +import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions import tech.libeufin.nexus.bankaccount.submitAllPaymentInitiations -import tech.libeufin.nexus.server.FetchLevel import tech.libeufin.nexus.server.FetchSpecJson -import tech.libeufin.nexus.server.FetchSpecLatestJson -import java.io.PrintWriter -import java.io.StringWriter +import java.lang.IllegalArgumentException import java.time.Duration +import java.time.Instant +import java.time.ZonedDateTime -/** Crawls all the facades, and requests history for each of its creators. */ -suspend fun downloadTalerFacadesTransactions(httpClient: HttpClient, fetchSpec: FetchSpecJson) { - val work = mutableListOf<Pair<String, String>>() - transaction { - TalerFacadeStateEntity.all().forEach { - logger.debug("Fetching history for facade: ${it.id.value}, bank account: ${it.bankAccount}") - work.add(Pair(it.facade.creator.id.value, it.bankAccount)) +private data class TaskSchedule( + val taskId: Int, + val name: String, + val type: String, + val resourceType: String, + val resourceId: String, + val params: String +) + +private suspend fun runTask(client: HttpClient, sched: TaskSchedule) { + logger.info("running task $sched") + try { + + when (sched.resourceType) { + "bank-account" -> { + when (sched.type) { + "fetch" -> { + @Suppress("BlockingMethodInNonBlockingContext") + val fetchSpec = jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java) + fetchBankAccountTransactions(client, fetchSpec, sched.resourceId) + } + "submit" -> { + submitAllPaymentInitiations(client, sched.resourceId) + } + else -> { + logger.error("task type ${sched.type} not understood") + } + } + } + else -> logger.error("task on resource ${sched.resourceType} not understood") } - } - work.forEach { - fetchTransactionsInternal( - client = httpClient, - fetchSpec = fetchSpec, - userId = it.first, - accountid = it.second - ) + } catch (e: Exception) { + logger.error("Exception during task $sched", e) } } - -private inline fun reportAndIgnoreErrors(f: () -> Unit) { - try { - f() - } catch (e: java.lang.Exception) { - logger.error("ignoring exception", e) +object NexusCron { + val parser = run { + val cronDefinition = + CronDefinitionBuilder.defineCron() + .withSeconds().and() + .withMinutes().optional().and() + .withDayOfMonth().optional().and() + .withMonth().optional().and() + .withDayOfWeek().optional() + .and().instance() + CronParser(cronDefinition) } } -fun moreFrequentBackgroundTasks(httpClient: HttpClient) { +fun startOperationScheduler(httpClient: HttpClient) { GlobalScope.launch { while (true) { - logger.debug("Running more frequent background jobs") - reportAndIgnoreErrors { - downloadTalerFacadesTransactions( - httpClient, - FetchSpecLatestJson( - FetchLevel.ALL, - null - ) - ) + logger.info("running schedule loop") + + // First, assign next execution time stamps to all tasks that need them + transaction { + NexusScheduledTaskEntity.find { + NexusScheduledTasksTable.nextScheduledExecutionSec.isNull() + }.forEach { + val cron = try { + NexusCron.parser.parse(it.taskCronspec) + } catch (e: IllegalArgumentException) { + logger.error("invalid cronspec in schedule ${it.resourceType}/${it.resourceId}/${it.taskName}") + return@forEach + } + val zonedNow = ZonedDateTime.now() + val et = ExecutionTime.forCron(cron) + val next = et.nextExecution(zonedNow) + logger.info("scheduling task ${it.taskName} at $next (now is $zonedNow)") + it.nextScheduledExecutionSec = next.get().toEpochSecond() + } } - // FIXME: should be done automatically after raw ingestion - reportAndIgnoreErrors { ingestTalerTransactions() } - reportAndIgnoreErrors { submitAllPaymentInitiations(httpClient) } - logger.debug("More frequent background jobs done") - delay(Duration.ofSeconds(1)) - } - } -} -fun lessFrequentBackgroundTasks(httpClient: HttpClient) { - GlobalScope.launch { - while (true) { - logger.debug("Less frequent background job") - try { - //downloadTalerFacadesTransactions(httpClient, "C53") - } catch (e: Exception) { - val sw = StringWriter() - val pw = PrintWriter(sw) - e.printStackTrace(pw) - logger.info("==== Less frequent background task exception ====\n${sw}======") + val nowSec = Instant.now().epochSecond + // Second, find tasks that are due + val dueTasks = transaction { + NexusScheduledTaskEntity.find { + NexusScheduledTasksTable.nextScheduledExecutionSec lessEq nowSec + }.map { + TaskSchedule(it.id.value, it.taskName, it.taskType, it.resourceType, it.resourceId, it.taskParams) + } } - delay(Duration.ofSeconds(10)) + + // Execute those due tasks + dueTasks.forEach { + runTask(httpClient, it) + transaction { + val t = NexusScheduledTaskEntity.findById(it.taskId) + if (t != null) { + // Reset next scheduled execution + t.nextScheduledExecutionSec = null + t.prevScheduledExecutionSec = nowSec + } + } + } + + // Wait a bit + delay(Duration.ofSeconds(1)) } } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt @@ -58,7 +58,7 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: /** * Submit all pending prepared payments. */ -suspend fun submitAllPaymentInitiations(httpClient: HttpClient) { +suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid: String) { data class Submission( val id: Long ) @@ -233,10 +233,9 @@ fun addPaymentInitiation(paymentData: Pain001Data, debitorAccount: NexusBankAcco } } -suspend fun fetchTransactionsInternal( +suspend fun fetchBankAccountTransactions( client: HttpClient, fetchSpec: FetchSpecJson, - userId: String, accountid: String ) { val res = transaction { @@ -261,18 +260,17 @@ suspend fun fetchTransactionsInternal( } when (res.connectionType) { "ebics" -> { - // FIXME(dold): Support fetching not only the latest transactions. - // It's not clear what's the nicest way to support this. fetchEbicsBySpec( fetchSpec, client, res.connectionName ) - ingestBankMessagesIntoAccount(res.connectionName, accountid) } else -> throw NexusError( HttpStatusCode.BadRequest, "Connection type '${res.connectionType}' not implemented" ) } + ingestBankMessagesIntoAccount(res.connectionName, accountid) + ingestTalerTransactions() } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt @@ -53,12 +53,13 @@ import org.jetbrains.exposed.sql.transactions.transaction import org.slf4j.event.Level import tech.libeufin.nexus.* import tech.libeufin.nexus.bankaccount.addPaymentInitiation -import tech.libeufin.nexus.bankaccount.fetchTransactionsInternal +import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions import tech.libeufin.nexus.bankaccount.getPaymentInitiation import tech.libeufin.nexus.bankaccount.submitPaymentInitiation import tech.libeufin.nexus.ebics.* import tech.libeufin.util.* import tech.libeufin.util.logger +import java.lang.IllegalArgumentException import java.net.URLEncoder import java.util.zip.InflaterInputStream @@ -260,8 +261,7 @@ fun serverMain(dbName: String) { return@intercept } - lessFrequentBackgroundTasks(client) - moreFrequentBackgroundTasks(client) + startOperationScheduler(client) routing { // Shows information about the requesting user. @@ -379,6 +379,11 @@ fun serverMain(dbName: String) { if (bankAccount == null) { throw NexusError(HttpStatusCode.NotFound, "unknown bank account") } + try { + NexusCron.parser.parse(schedSpec.cronspec) + } catch (e: IllegalArgumentException) { + throw NexusError(HttpStatusCode.BadRequest, "bad cron spec: ${e.message}") + } when (schedSpec.type) { "fetch" -> { val fetchSpec = jacksonObjectMapper().treeToValue(schedSpec.params, FetchSpecJson::class.java) @@ -386,8 +391,18 @@ fun serverMain(dbName: String) { throw NexusError(HttpStatusCode.BadRequest, "bad fetch spec") } } + "submit" -> {} else -> throw NexusError(HttpStatusCode.BadRequest, "unsupported task type") } + val oldSchedTask = NexusScheduledTaskEntity.find { + (NexusScheduledTasksTable.taskName eq schedSpec.name) and + (NexusScheduledTasksTable.resourceType eq "bank-account") and + (NexusScheduledTasksTable.resourceId eq accountId) + + }.firstOrNull() + if (oldSchedTask != null) { + throw NexusError(HttpStatusCode.BadRequest, "schedule task already exists") + } NexusScheduledTaskEntity.new { resourceType = "bank-account" resourceId = accountId @@ -401,12 +416,30 @@ fun serverMain(dbName: String) { call.respond(object { }) } - get("/bank-accounts/{accountid}/schedule/{taskid}") { - + get("/bank-accounts/{accountId}/schedule/{taskId}") { + call.respond(object { }) } - delete("/bank-accounts/{accountid}/schedule/{taskid}") { + delete("/bank-accounts/{accountId}/schedule/{taskId}") { + logger.info("schedule delete requested") + val accountId = ensureNonNull(call.parameters["accountId"]) + val taskId = ensureNonNull(call.parameters["taskId"]) + transaction { + val bankAccount = NexusBankAccountEntity.findById(accountId) + if (bankAccount == null) { + throw NexusError(HttpStatusCode.NotFound, "unknown bank account") + } + val oldSchedTask = NexusScheduledTaskEntity.find { + (NexusScheduledTasksTable.taskName eq taskId) and + (NexusScheduledTasksTable.resourceType eq "bank-account") and + (NexusScheduledTasksTable.resourceId eq accountId) + }.firstOrNull() + if (oldSchedTask != null) { + oldSchedTask.delete() + } + } + call.respond(object { }) } get("/bank-accounts/{accountid}") { @@ -517,10 +550,9 @@ fun serverMain(dbName: String) { null ) } - fetchTransactionsInternal( + fetchBankAccountTransactions( client, fetchSpec, - user.id.value, accountid ) call.respondText("Collection performed")