libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit cc87817b4adab5172aa7cb3a86580892cfc71c56
parent 889b88faf594891b2eb9fa35be832e74d553e730
Author: MS <ms@taler.net>
Date:   Fri, 10 Mar 2023 17:43:23 +0100

Long-polling.

Using DB notifications for /history/incoming at
the Taler facade.

Diffstat:
Mnexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt | 12+++++++-----
Mnexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 3+--
Mnexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 161++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt | 1-
4 files changed, 144 insertions(+), 33 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt @@ -60,13 +60,13 @@ fun findPermission(p: Permission): NexusPermissionEntity? { * Require that the authenticated user has at least one of the listed permissions. * * Throws a NexusError if the authenticated user for the request doesn't have any of - * listed the permissions. + * listed the permissions. It returns the username of the authorized user. */ -fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery) { - transaction { +fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery): String { + val username = transaction { val user = authenticateRequest(this@requirePermission) if (user.superuser) { - return@transaction + return@transaction user.username } var foundPermission = false for (pr in perms) { @@ -82,8 +82,10 @@ fun ApplicationRequest.requirePermission(vararg perms: PermissionQuery) { perms.joinToString(" | ") { "${it.resourceId} ${it.resourceType} ${it.permissionName}" } throw NexusError( HttpStatusCode.Forbidden, - "User ${user.id.value} has insufficient permissions (needs $possiblePerms." + "User ${user.username} has insufficient permissions (needs $possiblePerms)." ) } + user.username } + return username } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -29,6 +29,7 @@ import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.util.EbicsInitState import java.sql.Connection + /** * This table holds the values that exchange gave to issue a payment, * plus a reference to the prepared pain.001 version of. Note that @@ -101,7 +102,6 @@ object TalerIncomingPaymentsTable : LongIdTable() { val debtorPaytoUri = text("incomingPaytoUri") } - class TalerIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) { companion object : LongEntityClass<TalerIncomingPaymentEntity>(TalerIncomingPaymentsTable) @@ -343,7 +343,6 @@ class EbicsSubscriberEntity(id: EntityID<Long>) : LongEntity(id) { } object NexusUsersTable : LongIdTable() { - val username = text("username") val passwordHash = text("password") val superuser = bool("superuser") diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt @@ -19,6 +19,7 @@ package tech.libeufin.nexus +import UtilError import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.server.application.ApplicationCall import io.ktor.server.application.call @@ -33,16 +34,24 @@ import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.post import io.ktor.server.util.* +import io.ktor.util.* +import kotlinx.coroutines.* +import net.taler.wallet.crypto.Base32Crockford import org.jetbrains.exposed.dao.Entity import org.jetbrains.exposed.dao.id.IdTable import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction +import org.postgresql.jdbc.PgConnection import tech.libeufin.nexus.bankaccount.addPaymentInitiation import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions +import tech.libeufin.nexus.bankaccount.getBankAccount import tech.libeufin.nexus.iso20022.* import tech.libeufin.nexus.server.* import tech.libeufin.util.* import java.net.URL +import java.util.concurrent.atomic.AtomicReference +import javax.xml.crypto.Data import kotlin.math.abs import kotlin.math.min @@ -66,7 +75,8 @@ data class TalerTransferResponse( ) /** - * History accounting data structures + * History accounting data structures, typically + * used to build JSON responses. */ data class TalerIncomingBankTransaction( val row_id: Long, @@ -122,21 +132,16 @@ fun getComparisonOperator(delta: Int, start: Long, table: IdTable<Long>): Op<Boo } fun expectLong(param: String?): Long { - if (param == null) { - throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not Long") - } - return try { - param.toLong() - } catch (e: Exception) { - throw EbicsProtocolError(HttpStatusCode.BadRequest, "'$param' is not Long") + if (param == null) throw badRequest("'$param' is not Long") + return try { param.toLong() } catch (e: Exception) { + throw badRequest("'$param' is not Long") } } -/** Helper handling 'start' being optional and its dependence on 'delta'. */ +// Helper handling 'start' being optional and its dependence on 'delta'. fun handleStartArgument(start: String?, delta: Int): Long { if (start == null) { - if (delta >= 0) - return -1 + if (delta >= 0) return -1 return Long.MAX_VALUE } return expectLong(start) @@ -307,8 +312,35 @@ fun talerFilter( reservePublicKey = reservePub timestampMs = System.currentTimeMillis() debtorPaytoUri = buildIbanPaytoUri( - debtorIban, debtorAgent.bic, debtorName + debtorIban, + debtorAgent.bic, + debtorName + ) + } + val dbTx = TransactionManager.currentOrNull() ?: throw NexusError( + HttpStatusCode.InternalServerError, + "talerFilter(): unexpected execution out of a DB transaction" + ) + /** + * Without COMMIT here, the woken up LISTENer won't + * find the record in the database. + */ + dbTx.commit() + // Only supporting Postgres' NOTIFY. + if (dbTx.isPostgres()) { + val channelName = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING, + payment.bankAccount.iban + ) + logger.debug("NOTIFYing on domain" + + " ${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" + + " for IBAN: ${payment.bankAccount.iban}. Resulting channel" + + " name: $channelName.") + val notifyHandle = PostgresListenNotify( + dbTx.getPgConnection(), + channelName ) + notifyHandle.postgresNotify() } } @@ -445,27 +477,106 @@ private suspend fun historyOutgoing(call: ApplicationCall) { ) } -/** - * Handle a /taler-wire-gateway/history/incoming request. - */ +// Handle a /taler-wire-gateway/history/incoming request. private suspend fun historyIncoming(call: ApplicationCall) { val facadeId = expectNonNull(call.parameters["fcid"]) - call.request.requirePermission(PermissionQuery("facade", facadeId, "facade.talerwiregateway.history")) + val username = call.request.requirePermission( + PermissionQuery( + "facade", + facadeId, + "facade.talerwiregateway.history" + ) + ) + val longPollTimeoutPar = call.parameters["long_poll_ms"] + val longPollTimeout = if (longPollTimeoutPar != null) { + val longPollTimeoutValue = try { longPollTimeoutPar.toLong() } + catch (e: Exception) { + throw badRequest("long_poll_ms value is invalid") + } + longPollTimeoutValue + } else null val param = call.expectUrlParameter("delta") - val delta: Int = try { - param.toInt() - } catch (e: Exception) { + val delta: Int = try { param.toInt() } catch (e: Exception) { throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not Int") } val start: Long = handleStartArgument(call.request.queryParameters["start"], delta) val history = TalerIncomingHistory() val startCmpOp = getComparisonOperator(delta, start, TalerIncomingPaymentsTable) - transaction { - val orderedPayments = TalerIncomingPaymentEntity.find { - startCmpOp - }.orderTaler(delta) - if (orderedPayments.isNotEmpty()) { - orderedPayments.subList(0, min(abs(delta), orderedPayments.size)).forEach { + /** + * The following block checks first for results, and then LISTEN + * _only if_ the client gave the long_poll_ms parameter. + */ + var resultOrWait: Pair< + List<TalerIncomingPaymentEntity>, + PostgresListenNotify? + > = transaction { + val res = TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta) + // Register to Postgres notifications, if no results arrived. + if (res.isEmpty() && this.isPostgres() && longPollTimeout != null) { + // Getting the IBAN to build the unique channel name. + val f = FacadeEntity.find { FacadesTable.facadeName eq facadeId }.firstOrNull() + if (f == null) throw internalServerError( + "Handling request for facade '$facadeId', but that's not found in the database." + ) + val fState = FacadeStateEntity.find { + FacadeStateTable.facade eq f.id.value + }.firstOrNull() + if (fState == null) throw internalServerError( + "Facade '$facadeId' exist but has no state." + ) + val bankAccount = getBankAccount(fState.bankAccount) + val channelName = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING, + bankAccount.iban + ) + logger.debug("LISTENing on domain " + + "${NotificationsChannelDomains.LIBEUFIN_TALER_INCOMING}" + + " for IBAN: ${bankAccount.iban} with timeout: $longPollTimeoutPar." + + " Resulting channel name: $channelName" + ) + val listenHandle = PostgresListenNotify( + this.getPgConnection(), + channelName + ) + listenHandle.postrgesListen() + return@transaction Pair(res, listenHandle) + } + Pair(res, null) + } + /** + * Wait here by releasing the execution, or proceed to response if didn't sleep. + * The right condition only silences the compiler, because when the timeout is null + * the left condition is always false (no listen-notify object.) + */ + if (resultOrWait.second != null && longPollTimeout != null) { + logger.debug("Waiting for NOTIFY, with timeout: $longPollTimeoutPar ms") + val listenHandle = resultOrWait.second!! + val notificationArrived = listenHandle.postgresWaitNotification(longPollTimeout) + if (notificationArrived) { + val likelyNewPayments = transaction { + // addLogger(StdOutSqlLogger) + TalerIncomingPaymentEntity.find { startCmpOp }.orderTaler(delta) + } + /** + * NOTE: the query can still have zero results despite the + * notification. That happens when the 'start' URI param is + * higher than the ID of the new row in the database. Not + * an error. + */ + resultOrWait = Pair(likelyNewPayments, null) + } + } + /** + * Whether because of a timeout or a notification or of never slept, here it + * proceeds to the response (== resultOrWait.first IS EFFECTIVE). + */ + val maybeNewPayments = resultOrWait.first + if (maybeNewPayments.isNotEmpty()) { + transaction { + maybeNewPayments.subList( + 0, + min(abs(delta), maybeNewPayments.size) + ).forEach { history.incoming_transactions.add( TalerIncomingBankTransaction( // Rounded timestamp diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt @@ -57,7 +57,6 @@ import tech.libeufin.util.* import java.net.BindException import java.net.URLEncoder import kotlin.system.exitProcess - /** * Return facade state depending on the type. */