diff options
-rw-r--r-- | sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt | 51 | ||||
-rw-r--r-- | sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt | 14 |
2 files changed, 37 insertions, 28 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt index 31ad5b88..46a9edd5 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt @@ -1,15 +1,12 @@ package tech.libeufin.sandbox import CamtBankAccountEntry -import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.jsonMapper import io.ktor.client.* import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* -import io.ktor.utils.io.jvm.javaio.* import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.and @@ -230,7 +227,9 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX) * on the 'watchedBankAccount' and submits the related cash-out payment * to Nexus. The fiat payment will then take place ENTIRELY on Nexus' - * responsibility. + * responsibility. NOTE: This function is NOT supposed to be stopped, + * and if it returns, is to signal one fatal error and the caller has to + * handle it. */ suspend fun cashoutMonitor( httpClient: HttpClient, @@ -266,15 +265,15 @@ suspend fun cashoutMonitor( val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus) val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus) val paymentInitEndpoint = nexusBaseUrl.run { - var ret = this - if (!ret.endsWith('/')) - ret += '/' + var nexusBaseUrlFromConfig = this + if (!nexusBaseUrlFromConfig.endsWith('/')) + nexusBaseUrlFromConfig += '/' /** * WARNING: Nexus gives the possibility to have bank account names * DIFFERENT from their owner's username. Sandbox however MUST have * its Nexus bank account named THE SAME as its username. */ - ret + "bank-accounts/$usernameAtNexus/payment-initiations" + nexusBaseUrlFromConfig + "bank-accounts/$usernameAtNexus/payment-initiations" } while (true) { val listenHandle = PostgresListenHandle(eventChannel) @@ -284,17 +283,24 @@ suspend fun cashoutMonitor( // arrived _before_ the LISTEN. var newTxs = getUnsubmittedTransactions(watchedBankAccount) // Data found, UNLISTEN. - if (newTxs.isNotEmpty()) + if (newTxs.isNotEmpty()) { + logger.debug("Found cash-out's without waiting any DB event.") listenHandle.postgresUnlisten() + } // Data not found, wait. else { + logger.debug("Need to wait a DB event for new cash-out's") val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout) if (isNotificationArrived && listenHandle.receivedPayload == "CRDT") newTxs = getUnsubmittedTransactions(watchedBankAccount) } - if (newTxs.isEmpty()) + if (newTxs.isEmpty()) { + logger.debug("DB event timeout expired") continue + } + logger.debug("POSTing new cash-out's") newTxs.forEach { + logger.debug("POSTing cash-out '${it.subject}' to $paymentInitEndpoint") val body = object { /** * This field is UID of the request _as assigned by the @@ -306,7 +312,7 @@ suspend fun cashoutMonitor( val uid = it.accountServicerReference val iban = it.creditorIban val bic = it.creditorBic - val amount = "${config.cashoutCurrency}:${it.amount}" // FIXME: need fiat currency here. + val amount = "${config.cashoutCurrency}:${it.amount}" val subject = it.subject val name = it.creditorName } @@ -322,25 +328,36 @@ suspend fun cashoutMonitor( catch (e: Exception) { logger.error("Cash-out monitor could not reach Nexus. Pause and retry") logger.error(e.message) + /** + * Explicit delaying because the monitor normally + * waits on DB events, and this retry likely won't + * wait on a DB event. + */ delay(2000) return@forEach } // Server fault. Pause and retry. if (resp.status.value.toString().startsWith('5')) { logger.error("Cash-out monitor POSTed to a failing Nexus. Pause and retry") - logger.error(resp.bodyAsText()) + logger.error("Server responded: ${resp.bodyAsText()}") + /** + * Explicit delaying because the monitor normally + * waits on DB events, and this retry likely won't + * wait on a DB event. + */ delay(2000L) + return@forEach } // Client fault, fail Sandbox. if (resp.status.value.toString().startsWith('4')) { - logger.error("Cash-out monitor failed at POSTing to Nexus. Fail Sandbox") + logger.error("Cash-out monitor failed at POSTing to Nexus. Returning the cash-out monitor") logger.error("Nexus responded: ${resp.bodyAsText()}") - exitProcess(1) + return // fatal error, the caller handles it. } // Expecting 200 OK. What if 3xx? if (resp.status.value != HttpStatusCode.OK.value) { - logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Fail Sandbox") - exitProcess(1) + logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Returning the cash-out monitor") + return // fatal error, the caller handles it. } // Successful case, mark the wire transfer as submitted, // and advance the pointer to the last submitted payment. @@ -348,9 +365,7 @@ suspend fun cashoutMonitor( transaction { CashoutSubmissionEntity.new { localTransaction = it.id - hasErrors = false submissionTime = resp.responseTime.timestamp - isSubmitted = true /** * The following block associates the submitted payment * to the UID that Nexus assigned to it. It is currently not diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt index 57e00f77..d9fb393b 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt @@ -648,17 +648,13 @@ object BankAccountReportsTable : IntIdTable() { } /** - * This table tracks the submissions of fiat payment instructions - * that Sandbox sends to Nexus. Every fiat payment instruction is - * related to a confirmed cash-out operation. The cash-out confirmation - * is effective once the customer sends a local wire transfer to the - * "admin" bank account. Such wire transfer is tracked by the 'localTransaction' - * column. + * This table tracks the cash-out requests that Sandbox sends to Nexus. + * Only successful requests make it to this table. Failed request would + * either _stop_ the conversion service (for client-side errors) or get retried + * at a later time (for server-side errors.) */ object CashoutSubmissionsTable: LongIdTable() { val localTransaction = reference("localTransaction", BankAccountTransactionsTable).uniqueIndex() - val isSubmitted = bool("isSubmitted").default(false) - val hasErrors = bool("hasErrors") val maybeNexusResponse = text("maybeNexusResponse").nullable() val submissionTime = long("submissionTime").nullable() // failed don't have it. } @@ -666,8 +662,6 @@ object CashoutSubmissionsTable: LongIdTable() { class CashoutSubmissionEntity(id: EntityID<Long>) : LongEntity(id) { companion object : LongEntityClass<CashoutSubmissionEntity>(CashoutSubmissionsTable) var localTransaction by CashoutSubmissionsTable.localTransaction - var isSubmitted by CashoutSubmissionsTable.isSubmitted - var hasErrors by CashoutSubmissionsTable.hasErrors var maybeNexusResposnse by CashoutSubmissionsTable.maybeNexusResponse var submissionTime by CashoutSubmissionsTable.submissionTime } |