summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt51
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt14
2 files changed, 37 insertions, 28 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
index 31ad5b88..46a9edd5 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -1,15 +1,12 @@
package tech.libeufin.sandbox
import CamtBankAccountEntry
-import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.jsonMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
-import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
@@ -230,7 +227,9 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou
* This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
* on the 'watchedBankAccount' and submits the related cash-out payment
* to Nexus. The fiat payment will then take place ENTIRELY on Nexus'
- * responsibility.
+ * responsibility. NOTE: This function is NOT supposed to be stopped,
+ * and if it returns, is to signal one fatal error and the caller has to
+ * handle it.
*/
suspend fun cashoutMonitor(
httpClient: HttpClient,
@@ -266,15 +265,15 @@ suspend fun cashoutMonitor(
val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
val paymentInitEndpoint = nexusBaseUrl.run {
- var ret = this
- if (!ret.endsWith('/'))
- ret += '/'
+ var nexusBaseUrlFromConfig = this
+ if (!nexusBaseUrlFromConfig.endsWith('/'))
+ nexusBaseUrlFromConfig += '/'
/**
* WARNING: Nexus gives the possibility to have bank account names
* DIFFERENT from their owner's username. Sandbox however MUST have
* its Nexus bank account named THE SAME as its username.
*/
- ret + "bank-accounts/$usernameAtNexus/payment-initiations"
+ nexusBaseUrlFromConfig + "bank-accounts/$usernameAtNexus/payment-initiations"
}
while (true) {
val listenHandle = PostgresListenHandle(eventChannel)
@@ -284,17 +283,24 @@ suspend fun cashoutMonitor(
// arrived _before_ the LISTEN.
var newTxs = getUnsubmittedTransactions(watchedBankAccount)
// Data found, UNLISTEN.
- if (newTxs.isNotEmpty())
+ if (newTxs.isNotEmpty()) {
+ logger.debug("Found cash-out's without waiting any DB event.")
listenHandle.postgresUnlisten()
+ }
// Data not found, wait.
else {
+ logger.debug("Need to wait a DB event for new cash-out's")
val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout)
if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
newTxs = getUnsubmittedTransactions(watchedBankAccount)
}
- if (newTxs.isEmpty())
+ if (newTxs.isEmpty()) {
+ logger.debug("DB event timeout expired")
continue
+ }
+ logger.debug("POSTing new cash-out's")
newTxs.forEach {
+ logger.debug("POSTing cash-out '${it.subject}' to $paymentInitEndpoint")
val body = object {
/**
* This field is UID of the request _as assigned by the
@@ -306,7 +312,7 @@ suspend fun cashoutMonitor(
val uid = it.accountServicerReference
val iban = it.creditorIban
val bic = it.creditorBic
- val amount = "${config.cashoutCurrency}:${it.amount}" // FIXME: need fiat currency here.
+ val amount = "${config.cashoutCurrency}:${it.amount}"
val subject = it.subject
val name = it.creditorName
}
@@ -322,25 +328,36 @@ suspend fun cashoutMonitor(
catch (e: Exception) {
logger.error("Cash-out monitor could not reach Nexus. Pause and retry")
logger.error(e.message)
+ /**
+ * Explicit delaying because the monitor normally
+ * waits on DB events, and this retry likely won't
+ * wait on a DB event.
+ */
delay(2000)
return@forEach
}
// Server fault. Pause and retry.
if (resp.status.value.toString().startsWith('5')) {
logger.error("Cash-out monitor POSTed to a failing Nexus. Pause and retry")
- logger.error(resp.bodyAsText())
+ logger.error("Server responded: ${resp.bodyAsText()}")
+ /**
+ * Explicit delaying because the monitor normally
+ * waits on DB events, and this retry likely won't
+ * wait on a DB event.
+ */
delay(2000L)
+ return@forEach
}
// Client fault, fail Sandbox.
if (resp.status.value.toString().startsWith('4')) {
- logger.error("Cash-out monitor failed at POSTing to Nexus. Fail Sandbox")
+ logger.error("Cash-out monitor failed at POSTing to Nexus. Returning the cash-out monitor")
logger.error("Nexus responded: ${resp.bodyAsText()}")
- exitProcess(1)
+ return // fatal error, the caller handles it.
}
// Expecting 200 OK. What if 3xx?
if (resp.status.value != HttpStatusCode.OK.value) {
- logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Fail Sandbox")
- exitProcess(1)
+ logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Returning the cash-out monitor")
+ return // fatal error, the caller handles it.
}
// Successful case, mark the wire transfer as submitted,
// and advance the pointer to the last submitted payment.
@@ -348,9 +365,7 @@ suspend fun cashoutMonitor(
transaction {
CashoutSubmissionEntity.new {
localTransaction = it.id
- hasErrors = false
submissionTime = resp.responseTime.timestamp
- isSubmitted = true
/**
* The following block associates the submitted payment
* to the UID that Nexus assigned to it. It is currently not
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index 57e00f77..d9fb393b 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -648,17 +648,13 @@ object BankAccountReportsTable : IntIdTable() {
}
/**
- * This table tracks the submissions of fiat payment instructions
- * that Sandbox sends to Nexus. Every fiat payment instruction is
- * related to a confirmed cash-out operation. The cash-out confirmation
- * is effective once the customer sends a local wire transfer to the
- * "admin" bank account. Such wire transfer is tracked by the 'localTransaction'
- * column.
+ * This table tracks the cash-out requests that Sandbox sends to Nexus.
+ * Only successful requests make it to this table. Failed request would
+ * either _stop_ the conversion service (for client-side errors) or get retried
+ * at a later time (for server-side errors.)
*/
object CashoutSubmissionsTable: LongIdTable() {
val localTransaction = reference("localTransaction", BankAccountTransactionsTable).uniqueIndex()
- val isSubmitted = bool("isSubmitted").default(false)
- val hasErrors = bool("hasErrors")
val maybeNexusResponse = text("maybeNexusResponse").nullable()
val submissionTime = long("submissionTime").nullable() // failed don't have it.
}
@@ -666,8 +662,6 @@ object CashoutSubmissionsTable: LongIdTable() {
class CashoutSubmissionEntity(id: EntityID<Long>) : LongEntity(id) {
companion object : LongEntityClass<CashoutSubmissionEntity>(CashoutSubmissionsTable)
var localTransaction by CashoutSubmissionsTable.localTransaction
- var isSubmitted by CashoutSubmissionsTable.isSubmitted
- var hasErrors by CashoutSubmissionsTable.hasErrors
var maybeNexusResposnse by CashoutSubmissionsTable.maybeNexusResponse
var submissionTime by CashoutSubmissionsTable.submissionTime
}