summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMS <ms@taler.net>2023-03-31 14:15:41 +0200
committerMS <ms@taler.net>2023-03-31 14:15:41 +0200
commit59a20971438391fb22a536080b27b4db50ea5d31 (patch)
tree09de30fe938a17eb5056976c948ba2aef0790dc5
parent9832b269d5fbe14ce6c7421d47f3ee6868c87c3f (diff)
downloadlibeufin-59a20971438391fb22a536080b27b4db50ea5d31.tar.gz
libeufin-59a20971438391fb22a536080b27b4db50ea5d31.tar.bz2
libeufin-59a20971438391fb22a536080b27b4db50ea5d31.zip
Taler facade.
Using new interface for DB notifications.
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt108
1 files changed, 38 insertions, 70 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
index 4b325def..365a4ea5 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt
@@ -19,7 +19,6 @@
package tech.libeufin.nexus
-import UtilError
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.server.application.ApplicationCall
import io.ktor.server.application.call
@@ -34,15 +33,15 @@ import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.util.*
-import io.ktor.util.*
-import kotlinx.coroutines.*
-import net.taler.wallet.crypto.Base32Crockford
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.currentCoroutineContext
import org.jetbrains.exposed.dao.Entity
import org.jetbrains.exposed.dao.id.IdTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
-import org.postgresql.jdbc.PgConnection
import tech.libeufin.nexus.bankaccount.addPaymentInitiation
import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
import tech.libeufin.nexus.bankaccount.getBankAccount
@@ -50,8 +49,6 @@ import tech.libeufin.nexus.iso20022.*
import tech.libeufin.nexus.server.*
import tech.libeufin.util.*
import java.net.URL
-import java.util.concurrent.atomic.AtomicReference
-import javax.xml.crypto.Data
import kotlin.math.abs
import kotlin.math.min
@@ -245,10 +242,12 @@ private suspend fun talerTransfer(call: ApplicationCall) {
)
}
+// Processes new transactions and stores TWG-specific data in
fun talerFilter(
payment: NexusBankTransactionEntity,
txDtls: TransactionDetails
) {
+ val channelsToNotify = mutableListOf<String>()
var isInvalid = false // True when pub is invalid or duplicate.
val subject = txDtls.unstructuredRemittanceInformation
val debtorName = txDtls.debtor?.name
@@ -324,11 +323,6 @@ fun talerFilter(
HttpStatusCode.InternalServerError,
"talerFilter(): unexpected execution out of a DB transaction"
)
- /**
- * Without COMMIT here, the woken up LISTENer won't
- * find the record in the database.
- */
- dbTx.commit()
// Only supporting Postgres' NOTIFY.
if (dbTx.isPostgres()) {
val channelName = buildChannelName(
@@ -339,11 +333,7 @@ fun talerFilter(
" ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
" for IBAN: ${payment.bankAccount.iban}. Resulting channel" +
" name: $channelName.")
- val notifyHandle = PostgresListenNotify(
- dbTx.getPgConnection(),
- channelName
- )
- notifyHandle.postgresNotify()
+ dbTx.postgresNotify(channelName)
}
}
@@ -505,75 +495,53 @@ private suspend fun historyIncoming(call: ApplicationCall) {
val start: Long = handleStartArgument(call.request.queryParameters["start"], delta)
val history = TalerIncomingHistory()
val startCmpOp = getComparisonOperator(delta, start, TalerIncomingPaymentsTable)
+ val listenHandle: PostgresListenHandle? = if (isPostgres() && longPollTimeout != null) {
+ val notificationChannelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
+ getFacadeBankAccount(facadeId).iban
+ )
+ val handle = PostgresListenHandle(channelName = notificationChannelName)
+ handle.postgresListen()
+ handle
+ } else null
+
/**
- * The following block checks first for results, and then LISTEN
- * _only if_ the client gave the long_poll_ms parameter.
+ * NOTE: the LISTEN command MAY also go inside this transaction,
+ * but that uses a connection other than the one provided by the
+ * transaction block. More facts on the consequences are needed.
*/
- var resultOrWait: Pair<
- List<TalerIncomingPaymentEntity>,
- PostgresListenNotify?
- > = transaction {
- val res = TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
- // Register to Postgres notifications, if no results arrived.
- if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) {
- // Getting the IBAN to build the unique channel name.
- val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId }.firstOrNull()
- if (f == null) throw internalServerError(
- "Handling request for facade '$facadeId', but that's not found in the database."
- )
- val fState = FacadeStateEntity.find {
- FacadeStateTable.facade eq f.id.value
- }.firstOrNull()
- if (fState == null) throw internalServerError(
- "Facade '$facadeId' exist but has no state."
- )
- val bankAccount = getBankAccount(fState.bankAccount)
- val channelName = buildChannelName(
- NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING,
- bankAccount.iban
- )
- logger.debug("LISTENing on domain " +
- "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" +
- " for IBAN: ${bankAccount.iban} with timeout: $longPollTimeoutPar." +
- " Resulting channel name: $channelName"
- )
- val listenHandle = PostgresListenNotify(
- this.getPgConnection(),
- channelName
- )
- listenHandle.postrgesListen()
- return@transaction Pair(res, listenHandle)
- }
- Pair(res, null)
+ var result: List<TalerIncomingPaymentEntity> = transaction {
+ TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
}
- /**
- * Wait here by releasing the execution, or proceed to response if didn't sleep.
- * The right condition only silences the compiler, because when the timeout is null
- * the left condition is always false (no listen-notify object.)
- */
- if (resultOrWait.second != null && longPollTimeout != null) {
- logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar ms")
- val listenHandle = resultOrWait.second!!
- val notificationArrived = listenHandle.postgresWaitNotification(longPollTimeout)
+ if (result.isNotEmpty() && listenHandle != null)
+ listenHandle.postgresUnlisten()
+
+ if (result.isEmpty() && listenHandle != null && longPollTimeout != null) {
+ logger.debug("Waiting for NOTIFY on channel ${listenHandle.channelName}," +
+ " with timeout: $longPollTimeoutPar ms")
+ val notificationArrived = coroutineScope {
+ async(Dispatchers.IO) {
+ listenHandle.postgresGetNotifications(longPollTimeout)
+ }.await()
+ }
if (notificationArrived) {
- val likelyNewPayments = transaction {
- // addLogger(StdOutSqlLogger)
- TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
- }
/**
* NOTE: the query can still have zero results despite the
* notification. That happens when the 'start' URI param is
* higher than the ID of the new row in the database. Not
* an error.
*/
- resultOrWait = Pair(likelyNewPayments, null)
+ result = transaction {
+ // addLogger(StdOutSqlLogger)
+ TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta)
+ }
}
}
/**
* Whether because of a timeout or a notification or of never slept, here it
* proceeds to the response (== resultOrWait.first IS EFFECTIVE).
*/
- val maybeNewPayments = resultOrWait.first
+ val maybeNewPayments = result
if (maybeNewPayments.isNotEmpty()) {
transaction {
maybeNewPayments.subList(