summaryrefslogtreecommitdiff
path: root/sandbox/src/main/kotlin/tech/libeufin
diff options
context:
space:
mode:
authorMS <ms@taler.net>2023-05-22 12:12:19 +0200
committerMS <ms@taler.net>2023-05-22 12:12:19 +0200
commit14718fdb0635f67663670f64999acdebb3793482 (patch)
treedb1b3efaa1df41bba8a082fe6f98a1c16f0bf1b2 /sandbox/src/main/kotlin/tech/libeufin
parent68a60b689ef636c46498755be39e8046b4f7fd6e (diff)
downloadlibeufin-14718fdb0635f67663670f64999acdebb3793482.tar.gz
libeufin-14718fdb0635f67663670f64999acdebb3793482.tar.bz2
libeufin-14718fdb0635f67663670f64999acdebb3793482.zip
Conversion service.
Removing error accounting in the database, since only successful requests are supposed to be accounted. Failed request would either stop the service (when the problem is on the client), or be retried (when the server responded 5xx).
Diffstat (limited to 'sandbox/src/main/kotlin/tech/libeufin')
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt51
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt14
2 files changed, 37 insertions, 28 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
index 31ad5b88..46a9edd5 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -1,15 +1,12 @@
package tech.libeufin.sandbox
import CamtBankAccountEntry
-import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.jsonMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
-import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
@@ -230,7 +227,9 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou
* This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
* on the 'watchedBankAccount' and submits the related cash-out payment
* to Nexus. The fiat payment will then take place ENTIRELY on Nexus'
- * responsibility.
+ * responsibility. NOTE: This function is NOT supposed to be stopped,
+ * and if it returns, is to signal one fatal error and the caller has to
+ * handle it.
*/
suspend fun cashoutMonitor(
httpClient: HttpClient,
@@ -266,15 +265,15 @@ suspend fun cashoutMonitor(
val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
val paymentInitEndpoint = nexusBaseUrl.run {
- var ret = this
- if (!ret.endsWith('/'))
- ret += '/'
+ var nexusBaseUrlFromConfig = this
+ if (!nexusBaseUrlFromConfig.endsWith('/'))
+ nexusBaseUrlFromConfig += '/'
/**
* WARNING: Nexus gives the possibility to have bank account names
* DIFFERENT from their owner's username. Sandbox however MUST have
* its Nexus bank account named THE SAME as its username.
*/
- ret + "bank-accounts/$usernameAtNexus/payment-initiations"
+ nexusBaseUrlFromConfig + "bank-accounts/$usernameAtNexus/payment-initiations"
}
while (true) {
val listenHandle = PostgresListenHandle(eventChannel)
@@ -284,17 +283,24 @@ suspend fun cashoutMonitor(
// arrived _before_ the LISTEN.
var newTxs = getUnsubmittedTransactions(watchedBankAccount)
// Data found, UNLISTEN.
- if (newTxs.isNotEmpty())
+ if (newTxs.isNotEmpty()) {
+ logger.debug("Found cash-out's without waiting any DB event.")
listenHandle.postgresUnlisten()
+ }
// Data not found, wait.
else {
+ logger.debug("Need to wait a DB event for new cash-out's")
val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout)
if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
newTxs = getUnsubmittedTransactions(watchedBankAccount)
}
- if (newTxs.isEmpty())
+ if (newTxs.isEmpty()) {
+ logger.debug("DB event timeout expired")
continue
+ }
+ logger.debug("POSTing new cash-out's")
newTxs.forEach {
+ logger.debug("POSTing cash-out '${it.subject}' to $paymentInitEndpoint")
val body = object {
/**
* This field is UID of the request _as assigned by the
@@ -306,7 +312,7 @@ suspend fun cashoutMonitor(
val uid = it.accountServicerReference
val iban = it.creditorIban
val bic = it.creditorBic
- val amount = "${config.cashoutCurrency}:${it.amount}" // FIXME: need fiat currency here.
+ val amount = "${config.cashoutCurrency}:${it.amount}"
val subject = it.subject
val name = it.creditorName
}
@@ -322,25 +328,36 @@ suspend fun cashoutMonitor(
catch (e: Exception) {
logger.error("Cash-out monitor could not reach Nexus. Pause and retry")
logger.error(e.message)
+ /**
+ * Explicit delaying because the monitor normally
+ * waits on DB events, and this retry likely won't
+ * wait on a DB event.
+ */
delay(2000)
return@forEach
}
// Server fault. Pause and retry.
if (resp.status.value.toString().startsWith('5')) {
logger.error("Cash-out monitor POSTed to a failing Nexus. Pause and retry")
- logger.error(resp.bodyAsText())
+ logger.error("Server responded: ${resp.bodyAsText()}")
+ /**
+ * Explicit delaying because the monitor normally
+ * waits on DB events, and this retry likely won't
+ * wait on a DB event.
+ */
delay(2000L)
+ return@forEach
}
// Client fault, fail Sandbox.
if (resp.status.value.toString().startsWith('4')) {
- logger.error("Cash-out monitor failed at POSTing to Nexus. Fail Sandbox")
+ logger.error("Cash-out monitor failed at POSTing to Nexus. Returning the cash-out monitor")
logger.error("Nexus responded: ${resp.bodyAsText()}")
- exitProcess(1)
+ return // fatal error, the caller handles it.
}
// Expecting 200 OK. What if 3xx?
if (resp.status.value != HttpStatusCode.OK.value) {
- logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Fail Sandbox")
- exitProcess(1)
+ logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Returning the cash-out monitor")
+ return // fatal error, the caller handles it.
}
// Successful case, mark the wire transfer as submitted,
// and advance the pointer to the last submitted payment.
@@ -348,9 +365,7 @@ suspend fun cashoutMonitor(
transaction {
CashoutSubmissionEntity.new {
localTransaction = it.id
- hasErrors = false
submissionTime = resp.responseTime.timestamp
- isSubmitted = true
/**
* The following block associates the submitted payment
* to the UID that Nexus assigned to it. It is currently not
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index 57e00f77..d9fb393b 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -648,17 +648,13 @@ object BankAccountReportsTable : IntIdTable() {
}
/**
- * This table tracks the submissions of fiat payment instructions
- * that Sandbox sends to Nexus. Every fiat payment instruction is
- * related to a confirmed cash-out operation. The cash-out confirmation
- * is effective once the customer sends a local wire transfer to the
- * "admin" bank account. Such wire transfer is tracked by the 'localTransaction'
- * column.
+ * This table tracks the cash-out requests that Sandbox sends to Nexus.
+ * Only successful requests make it to this table. Failed request would
+ * either _stop_ the conversion service (for client-side errors) or get retried
+ * at a later time (for server-side errors.)
*/
object CashoutSubmissionsTable: LongIdTable() {
val localTransaction = reference("localTransaction", BankAccountTransactionsTable).uniqueIndex()
- val isSubmitted = bool("isSubmitted").default(false)
- val hasErrors = bool("hasErrors")
val maybeNexusResponse = text("maybeNexusResponse").nullable()
val submissionTime = long("submissionTime").nullable() // failed don't have it.
}
@@ -666,8 +662,6 @@ object CashoutSubmissionsTable: LongIdTable() {
class CashoutSubmissionEntity(id: EntityID<Long>) : LongEntity(id) {
companion object : LongEntityClass<CashoutSubmissionEntity>(CashoutSubmissionsTable)
var localTransaction by CashoutSubmissionsTable.localTransaction
- var isSubmitted by CashoutSubmissionsTable.isSubmitted
- var hasErrors by CashoutSubmissionsTable.hasErrors
var maybeNexusResposnse by CashoutSubmissionsTable.maybeNexusResponse
var submissionTime by CashoutSubmissionsTable.submissionTime
}