From 59a20971438391fb22a536080b27b4db50ea5d31 Mon Sep 17 00:00:00 2001 From: MS Date: Fri, 31 Mar 2023 14:15:41 +0200 Subject: Taler facade. Using new interface for DB notifications. --- nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 108 ++++++++------------- 1 file changed, 38 insertions(+), 70 deletions(-) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt index 4b325def..365a4ea5 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt @@ -19,7 +19,6 @@ package tech.libeufin.nexus -import UtilError import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.server.application.ApplicationCall import io.ktor.server.application.call @@ -34,15 +33,15 @@ import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.post import io.ktor.server.util.* -import io.ktor.util.* -import kotlinx.coroutines.* -import net.taler.wallet.crypto.Base32Crockford +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.currentCoroutineContext import org.jetbrains.exposed.dao.Entity import org.jetbrains.exposed.dao.id.IdTable import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction -import org.postgresql.jdbc.PgConnection import tech.libeufin.nexus.bankaccount.addPaymentInitiation import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions import tech.libeufin.nexus.bankaccount.getBankAccount @@ -50,8 +49,6 @@ import tech.libeufin.nexus.iso20022.* import tech.libeufin.nexus.server.* import tech.libeufin.util.* import java.net.URL -import java.util.concurrent.atomic.AtomicReference -import javax.xml.crypto.Data import kotlin.math.abs import kotlin.math.min @@ -245,10 +242,12 @@ private suspend fun talerTransfer(call: ApplicationCall) { ) } +// Processes new transactions and stores TWG-specific data in fun talerFilter( payment: NexusBankTransactionEntity, txDtls: TransactionDetails ) { + val channelsToNotify = mutableListOf() var isInvalid = false // True when pub is invalid or duplicate. val subject = txDtls.unstructuredRemittanceInformation val debtorName = txDtls.debtor?.name @@ -324,11 +323,6 @@ fun talerFilter( HttpStatusCode.InternalServerError, "talerFilter(): unexpected execution out of a DB transaction" ) - /** - * Without COMMIT here, the woken up LISTENer won't - * find the record in the database. - */ - dbTx.commit() // Only supporting Postgres' NOTIFY. if (dbTx.isPostgres()) { val channelName = buildChannelName( @@ -339,11 +333,7 @@ fun talerFilter( " ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" + " for IBAN: ${payment.bankAccount.iban}. Resulting channel" + " name: $channelName.") - val notifyHandle = PostgresListenNotify( - dbTx.getPgConnection(), - channelName - ) - notifyHandle.postgresNotify() + dbTx.postgresNotify(channelName) } } @@ -505,75 +495,53 @@ private suspend fun historyIncoming(call: ApplicationCall) { val start: Long = handleStartArgument(call.request.queryParameters["start"], delta) val history = TalerIncomingHistory() val startCmpOp = getComparisonOperator(delta, start, TalerIncomingPaymentsTable) + val listenHandle: PostgresListenHandle? = if (isPostgres() && longPollTimeout != null) { + val notificationChannelName = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING, + getFacadeBankAccount(facadeId).iban + ) + val handle = PostgresListenHandle(channelName = notificationChannelName) + handle.postgresListen() + handle + } else null + /** - * The following block checks first for results, and then LISTEN - * _only if_ the client gave the long_poll_ms parameter. + * NOTE: the LISTEN command MAY also go inside this transaction, + * but that uses a connection other than the one provided by the + * transaction block. More facts on the consequences are needed. */ - var resultOrWait: Pair< - List, - PostgresListenNotify? - > = transaction { - val res = TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta) - // Register to Postgres notifications, if no results arrived. - if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) { - // Getting the IBAN to build the unique channel name. - val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId }.firstOrNull() - if (f == null) throw internalServerError( - "Handling request for facade '$facadeId', but that's not found in the database." - ) - val fState = FacadeStateEntity.find { - FacadeStateTable.facade eq f.id.value - }.firstOrNull() - if (fState == null) throw internalServerError( - "Facade '$facadeId' exist but has no state." - ) - val bankAccount = getBankAccount(fState.bankAccount) - val channelName = buildChannelName( - NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING, - bankAccount.iban - ) - logger.debug("LISTENing on domain " + - "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" + - " for IBAN: ${bankAccount.iban} with timeout: $longPollTimeoutPar." + - " Resulting channel name: $channelName" - ) - val listenHandle = PostgresListenNotify( - this.getPgConnection(), - channelName - ) - listenHandle.postrgesListen() - return@transaction Pair(res, listenHandle) - } - Pair(res, null) + var result: List = transaction { + TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta) } - /** - * Wait here by releasing the execution, or proceed to response if didn't sleep. - * The right condition only silences the compiler, because when the timeout is null - * the left condition is always false (no listen-notify object.) - */ - if (resultOrWait.second != null && longPollTimeout != null) { - logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar ms") - val listenHandle = resultOrWait.second!! - val notificationArrived = listenHandle.postgresWaitNotification(longPollTimeout) + if (result.isNotEmpty() && listenHandle != null) + listenHandle.postgresUnlisten() + + if (result.isEmpty() && listenHandle != null && longPollTimeout != null) { + logger.debug("Waiting for NOTIFY on channel ${listenHandle.channelName}," + + " with timeout: $longPollTimeoutPar ms") + val notificationArrived = coroutineScope { + async(Dispatchers.IO) { + listenHandle.postgresGetNotifications(longPollTimeout) + }.await() + } if (notificationArrived) { - val likelyNewPayments = transaction { - // addLogger(StdOutSqlLogger) - TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta) - } /** * NOTE: the query can still have zero results despite the * notification. That happens when the 'start' URI param is * higher than the ID of the new row in the database. Not * an error. */ - resultOrWait = Pair(likelyNewPayments, null) + result = transaction { + // addLogger(StdOutSqlLogger) + TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta) + } } } /** * Whether because of a timeout or a notification or of never slept, here it * proceeds to the response (== resultOrWait.first IS EFFECTIVE). */ - val maybeNewPayments = resultOrWait.first + val maybeNewPayments = result if (maybeNewPayments.isNotEmpty()) { transaction { maybeNewPayments.subList( -- cgit v1.2.3