commit 14aa16b1040b81ee6c552af8ef0cb258b6abefe0
parent 51c83a249ae92cb8cb53af224b56005d60d5e56c
Author: MS <ms@taler.net>
Date: Mon, 1 Jun 2020 14:22:39 +0200
Scheduler.
Allowing the scheduling of a coroutine from within
a transaction block.
Diffstat:
5 files changed, 15 insertions(+), 22 deletions(-)
diff --git a/integration-tests/test-ebics-backup.py b/integration-tests/test-ebics-backup.py
@@ -160,7 +160,6 @@ assertResponse(
)
# 1.a, make a new nexus user.
-
assertResponse(
post(
"http://localhost:5001/users",
diff --git a/integration-tests/test-ebics-highlevel.py b/integration-tests/test-ebics-highlevel.py
@@ -171,7 +171,6 @@ assertResponse(
)
# 1.a, make a new nexus user.
-
assertResponse(
post(
"http://localhost:5001/users",
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/JSON.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/JSON.kt
@@ -186,9 +186,9 @@ data class Transactions(
/** Request type of "POST /collected-transactions" */
data class CollectedTransaction(
- val transport: String?,
- val start: String?,
- val end: String?
+ val transport: String? = null,
+ val start: String? = null,
+ val end: String? = null
)
data class BankProtocolsResponse(
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -56,6 +56,7 @@ import io.ktor.server.netty.Netty
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.jvm.javaio.toByteReadChannel
import io.ktor.utils.io.jvm.javaio.toInputStream
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.delay
import org.jetbrains.exposed.sql.and
@@ -242,28 +243,23 @@ fun ApplicationRequest.hasBody(): Boolean {
}
return false
}
-suspend fun schedulePeriodicWork() {
+suspend fun schedulePeriodicWork(coroutineScope: CoroutineScope) {
while (true) {
delay(Duration.ofSeconds(1))
- // download TWG C52
- // ingest TWG new histories
- logger.debug("I am scheduled")
- downloadFacadesTransactions()
+ downloadFacadesTransactions(coroutineScope)
ingestTalerTransactions()
}
}
/** Crawls all the facades, and requests history for each of its creators. */
-suspend fun downloadFacadesTransactions() {
+suspend fun downloadFacadesTransactions(coroutineScope: CoroutineScope) {
+ val httpClient = HttpClient()
transaction {
- FacadeEntity.all()
- }.forEach {
- fetchTransactionsInternal(
- HttpClient(),
- it.creator,
- it.config.bankAccount,
- CollectedTransaction(null, null, null)
- )
+ FacadeEntity.all().forEach {
+ coroutineScope.launch {
+ fetchTransactionsInternal(httpClient, it.creator, it.config.bankAccount, CollectedTransaction())
+ }
+ }
}
}
@@ -322,7 +318,7 @@ fun serverMain(dbName: String) {
}
val server = embeddedServer(Netty, port = 5001) {
launch {
- schedulePeriodicWork()
+ schedulePeriodicWork(this)
}
install(CallLogging) {
this.level = Level.DEBUG
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
@@ -466,7 +466,6 @@ suspend fun historyOutgoing(call: ApplicationCall): Unit {
// /taler/history/incoming
suspend fun historyIncoming(call: ApplicationCall): Unit {
- val exchangeUser = authenticateRequest(call.request)
val delta: Int = expectInt(call.expectUrlParameter("delta"))
val start: Long = handleStartArgument(call.request.queryParameters["start"], delta)
val history = TalerIncomingHistory()
@@ -518,7 +517,7 @@ fun talerFacadeRoutes(route: Route) {
historyIncoming(call)
return@get
}
- route.get("/taler") {
+ route.get("") {
call.respondText("Hello Taler")
return@get
}