diff options
author | MS <ms@taler.net> | 2020-06-06 00:36:12 +0200 |
---|---|---|
committer | MS <ms@taler.net> | 2020-06-06 00:36:12 +0200 |
commit | 74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07 (patch) | |
tree | 46fb8beba729f65e731a4f23f30839274efce702 /nexus/src | |
parent | 708661483b61f2d420a6a51c68220c3f5e141d38 (diff) | |
download | libeufin-74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07.tar.gz libeufin-74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07.tar.bz2 libeufin-74c2a6c0e9b3ed17fdb8bd3abd1325edcd662d07.zip |
deduplicating transactions and addressing #6266.
Diffstat (limited to 'nexus/src')
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 31 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt | 16 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 36 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt | 85 | ||||
-rw-r--r-- | nexus/src/test/kotlin/DBTest.kt | 4 |
5 files changed, 96 insertions, 76 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt index 2a6c61a8..193c1ef4 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -80,7 +80,6 @@ class TalerIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) { return newRow } } - var payment by RawBankTransactionEntity referencedOn TalerIncomingPayments.payment var valid by TalerIncomingPayments.valid var refunded by TalerIncomingPayments.refunded @@ -120,12 +119,12 @@ object RawBankTransactionsTable : LongIdTable() { val counterpartName = text("counterpartName") val bookingDate = long("bookingDate") val status = text("status") // BOOK or other. + val uid = text("uid") // AcctSvcrRef code, given by the bank. val bankAccount = reference("bankAccount", NexusBankAccountsTable) } class RawBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) { companion object : LongEntityClass<RawBankTransactionEntity>(RawBankTransactionsTable) - var unstructuredRemittanceInformation by RawBankTransactionsTable.unstructuredRemittanceInformation var transactionType by RawBankTransactionsTable.transactionType var currency by RawBankTransactionsTable.currency @@ -135,6 +134,7 @@ class RawBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) { var counterpartName by RawBankTransactionsTable.counterpartName var bookingDate by RawBankTransactionsTable.bookingDate var status by RawBankTransactionsTable.status + var uid by RawBankTransactionsTable.uid var bankAccount by NexusBankAccountEntity referencedOn RawBankTransactionsTable.bankAccount } @@ -157,10 +157,8 @@ object PreparedPaymentsTable : IdTable<String>() { val debitorIban = text("debitorIban") val debitorBic = text("debitorBic") val debitorName = text("debitorName").nullable() - /* Indicates whether the PAIN message was sent to the bank. */ val submitted = bool("submitted").default(false) - /* Indicates whether the bank didn't perform the payment: note that * this state can be reached when the payment gets listed in a CRZ * response OR when the payment doesn't show up in a C52/C53 response */ @@ -169,7 +167,6 @@ object PreparedPaymentsTable : IdTable<String>() { class PreparedPaymentEntity(id: EntityID<String>) : Entity<String>(id) { companion object : EntityClass<String, PreparedPaymentEntity>(PreparedPaymentsTable) - var paymentId by PreparedPaymentsTable.paymentId var preparationDate by PreparedPaymentsTable.preparationDate var submissionDate by PreparedPaymentsTable.submissionDate @@ -227,7 +224,6 @@ object EbicsSubscribersTable : IntIdTable() { class EbicsSubscriberEntity(id: EntityID<Int>) : IntEntity(id) { companion object : IntEntityClass<EbicsSubscriberEntity>(EbicsSubscribersTable) - var ebicsURL by EbicsSubscribersTable.ebicsURL var hostID by EbicsSubscribersTable.hostID var partnerID by EbicsSubscribersTable.partnerID @@ -273,7 +269,6 @@ object FacadesTable : IdTable<String>() { override val primaryKey = PrimaryKey(id, name = "id") val type = text("type") val creator = reference("creator", NexusUsersTable) - val config = reference("config", TalerFacadeConfigsTable) // see #6266 val highestSeenMsgID = long("highestSeenMessageID").default(0) } @@ -281,25 +276,27 @@ class FacadeEntity(id: EntityID<String>) : Entity<String>(id) { companion object : EntityClass<String, FacadeEntity>(FacadesTable) var type by FacadesTable.type var creator by NexusUserEntity referencedOn FacadesTable.creator - var config by TalerFacadeConfigEntity referencedOn FacadesTable.config - var highestSeenMsgID by FacadesTable.highestSeenMsgID } -object TalerFacadeConfigsTable : IntIdTable() { +object TalerFacadeStatesTable : IntIdTable() { val bankAccount = text("bankAccount") val bankConnection = text("bankConnection") /* "statement", "report", "notification" */ val reserveTransferLevel = text("reserveTransferLevel") val intervalIncrement = text("intervalIncrement") + val facade = reference("facade", FacadesTable) + val highestSeenMsgID = long("highestSeenMsgID").default(0) } -class TalerFacadeConfigEntity(id: EntityID<Int>) : IntEntity(id) { - companion object : IntEntityClass<TalerFacadeConfigEntity>(TalerFacadeConfigsTable) - var bankAccount by TalerFacadeConfigsTable.bankAccount - var bankConnection by TalerFacadeConfigsTable.bankConnection +class TalerFacadeStateEntity(id: EntityID<Int>) : IntEntity(id) { + companion object : IntEntityClass<TalerFacadeStateEntity>(TalerFacadeStatesTable) + var bankAccount by TalerFacadeStatesTable.bankAccount + var bankConnection by TalerFacadeStatesTable.bankConnection /* "statement", "report", "notification" */ - var reserveTransferLevel by TalerFacadeConfigsTable.reserveTransferLevel - var intervalIncrement by TalerFacadeConfigsTable.intervalIncrement + var reserveTransferLevel by TalerFacadeStatesTable.reserveTransferLevel + var intervalIncrement by TalerFacadeStatesTable.intervalIncrement + var facade by FacadeEntity referencedOn TalerFacadeStatesTable.facade + var highestSeenMsgID by TalerFacadeStatesTable.highestSeenMsgID } fun dbCreateTables(dbName: String) { @@ -318,7 +315,7 @@ fun dbCreateTables(dbName: String) { NexusBankConnectionsTable, NexusBankMessagesTable, FacadesTable, - TalerFacadeConfigsTable + TalerFacadeStatesTable ) } }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt index 1462ab97..450c1eec 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Helpers.kt @@ -104,6 +104,16 @@ fun getEbicsSubscriberDetails(userId: String, transportId: String): EbicsClientS return getEbicsSubscriberDetailsInternal(subscriber) } +// returns true if the payment is found in the database. +fun isDuplicate(camt: Document, acctSvcrRef: String): Boolean { + val found = transaction { + RawBankTransactionEntity.find { + RawBankTransactionsTable.uid eq acctSvcrRef + }.firstOrNull() + } + return found != null +} + fun processCamtMessage( bankAccountId: String, camt53doc: Document @@ -116,8 +126,14 @@ fun processCamtMessage( val bookingDate = parseDashedDate( camt53doc.pickString("//*[local-name()='BookgDt']//*[local-name()='Dt']") ) + val acctSvcrRef = camt53doc.pickString("//*[local-name()='AcctSvcrRef']") + if (isDuplicate(camt53doc, acctSvcrRef)) { + logger.info("Processing a duplicate, not storing it.") + return@transaction + } RawBankTransactionEntity.new { bankAccount = acct + uid = acctSvcrRef unstructuredRemittanceInformation = camt53doc.pickString("//*[local-name()='Ntry']//*[local-name()='Ustrd']") transactionType = camt53doc.pickString("//*[local-name()='Ntry']//*[local-name()='CdtDbtInd']") diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt index 372f0e83..901db99c 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -74,7 +74,6 @@ import java.util.* import java.util.zip.InflaterInputStream import javax.crypto.EncryptedPrivateKeyInfo import java.time.LocalDateTime -import kotlin.coroutines.CoroutineContext data class NexusError(val statusCode: HttpStatusCode, val reason: String) : Exception("${reason} (HTTP status $statusCode)") @@ -265,7 +264,7 @@ fun schedulePeriodicWork() { logger.debug("Outer background job") try { delay(Duration.ofSeconds(1)) - downloadFacadesTransactions(this) + downloadTalerFacadesTransactions(this) ingestTalerTransactions() } catch (e: Exception) { logger.info("==== Background job exception ====\n${e.message}======") @@ -275,13 +274,13 @@ fun schedulePeriodicWork() { } /** Crawls all the facades, and requests history for each of its creators. */ -suspend fun downloadFacadesTransactions(myScope: CoroutineScope) { +suspend fun downloadTalerFacadesTransactions(myScope: CoroutineScope) { val httpClient = HttpClient() val work = mutableListOf<Pair<String, String>>() transaction { - FacadeEntity.all().forEach { - logger.debug("Fetching history for facade: ${it.id.value}, bank account: ${it.config.bankAccount}") - work.add(Pair(it.creator.id.value, it.config.bankAccount)) + TalerFacadeStateEntity.all().forEach { + logger.debug("Fetching history for facade: ${it.id.value}, bank account: ${it.bankAccount}") + work.add(Pair(it.facade.creator.id.value, it.bankAccount)) } } work.forEach { @@ -489,7 +488,6 @@ fun serverMain(dbName: String) { call.respond(HttpStatusCode.OK, qr) return@post } - /** * Shows the bank accounts belonging to the requesting user. */ @@ -522,7 +520,6 @@ fun serverMain(dbName: String) { } call.respond(res) } - /** * Submit one particular payment to the bank. */ @@ -1095,25 +1092,20 @@ fun serverMain(dbName: String) { } post("/facades") { val body = call.receive<FacadeInfo>() - val (user, talerConfig) = transaction { + val newFacade = transaction { val user = authenticateRequest(call.request) - val talerConfig = TalerFacadeConfigEntity.new { + FacadeEntity.new(body.name) { + type = body.type + creator = user + } + } + transaction { + TalerFacadeStateEntity.new { bankAccount = body.config.bankAccount bankConnection = body.config.bankConnection intervalIncrement = body.config.intervalIncremental reserveTransferLevel = body.config.reserveTransferLevel - } - Pair(user, talerConfig) - } - // Kotlin+Exposed did NOT like the referenced and referencing - // tables to be created inside the same transfer block. This - // problem must be further investigated. - transaction { - FacadeEntity.new(body.name) { - type = body.type - creator = user - config = talerConfig - highestSeenMsgID = 0 + facade = newFacade } } call.respondText("Facade created") diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt index 499743e3..7a74d09b 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt @@ -177,9 +177,13 @@ fun expectLong(param: String?): Long { } } - /** Helper handling 'start' being optional and its dependence on 'delta'. */ fun handleStartArgument(start: String?, delta: Int): Long { + if (start == null) { + if (delta >= 0) + return -1 + return Long.MAX_VALUE + } return expectLong(start) } @@ -218,13 +222,36 @@ fun paymentFailed(entry: RawBankTransactionEntity): Boolean { return false } -fun getFacadeBankAccount(nexusUser: NexusUserEntity): NexusBankAccountEntity { - val facade = FacadeEntity.find { FacadesTable.creator eq nexusUser.id.value }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, "Could not find any facade from ${nexusUser.id.value}" +fun getTalerFacadeState(fcid: String): TalerFacadeStateEntity { + val facade = FacadeEntity.find { FacadesTable.id eq fcid }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find facade '${fcid}'" + ) + val facadeState = TalerFacadeStateEntity.find { + TalerFacadeStatesTable.facade eq facade.id.value + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find any state for facade: ${fcid}" ) - val bankAccount = NexusBankAccountEntity.findById(facade.config.bankAccount) ?: throw NexusError( - HttpStatusCode.NotFound, "Could not find any bank account named ${facade.config.bankAccount}" + return facadeState +} + +fun getTalerFacadeBankAccount(fcid: String): NexusBankAccountEntity { + val facade = FacadeEntity.find { FacadesTable.id eq fcid }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find facade '${fcid}'" ) + val facadeState = TalerFacadeStateEntity.find { + TalerFacadeStatesTable.facade eq facade.id.value + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find any state for facade: ${fcid}" + ) + val bankAccount = NexusBankAccountEntity.findById(facadeState.bankAccount) ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find any bank account named ${facadeState.bankAccount}" + ) + return bankAccount } @@ -251,7 +278,7 @@ suspend fun talerTransfer(call: ApplicationCall) { ) } } - val exchangeBankAccount = getFacadeBankAccount(exchangeUser) + val exchangeBankAccount = getTalerFacadeBankAccount(expectNonNull(call.parameters["fcid"])) val pain001 = addPreparedPayment( Pain001Data( creditorIban = creditorData.iban, @@ -263,6 +290,7 @@ suspend fun talerTransfer(call: ApplicationCall) { ), exchangeBankAccount ) + logger.debug("Taler requests payment: ${transferRequest.wtid}") val row = TalerRequestedPaymentEntity.new { preparedPayment = pain001 // not really used/needed, just here to silence warnings exchangeBaseUrl = transferRequest.exchange_base_url @@ -296,16 +324,12 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit { val addIncomingData = call.receive<TalerAdminAddIncoming>() val debtor = parsePayto(addIncomingData.debit_account) val res = transaction { + val user = authenticateRequest(call.request) val facadeID = expectNonNull(call.parameters["fcid"]) - val facade = FacadeEntity.findById(facadeID) ?: throw NexusError( - HttpStatusCode.NotFound, "Could not find facade '$facadeID'" - ) - val facadeBankAccount = NexusBankAccountEntity.findById(facade.config.bankAccount) ?: throw NexusError( - HttpStatusCode.NotFound, - "Such bank account '${facade.config.bankAccount}' wasn't found for facade '$facadeID'" - ) + val facadeState = getTalerFacadeState(facadeID) + val facadeBankAccount = getTalerFacadeBankAccount(facadeID) return@transaction object { - val facadeLastSeen = facade.highestSeenMsgID + val facadeLastSeen = facadeState.highestSeenMsgID val facadeIban = facadeBankAccount.iban val facadeBic = facadeBankAccount.bankCode val facadeHolderName = facadeBankAccount.accountHolder @@ -317,6 +341,7 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit { urlString = "http://localhost:5000/admin/payments", block = { /** FIXME: ideally Jackson should define such request body. */ + val parsedAmount = parseAmount(addIncomingData.amount) this.body = """{ "creditorIban": "${res.facadeIban}", "creditorBic": "${res.facadeBic}", @@ -324,7 +349,8 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit { "debitorIban": "${debtor.iban}", "debitorBic": "${debtor.bic}", "debitorName": "${debtor.name}", - "amount": "${addIncomingData.amount}", + "amount": "${parsedAmount.amount}", + "currency": "${parsedAmount.currency}", "subject": "${addIncomingData.reserve_pub}" }""".trimIndent() contentType(ContentType.Application.Json) @@ -356,7 +382,8 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit { fun ingestTalerTransactions() { fun ingest(subscriberAccount: NexusBankAccountEntity, facade: FacadeEntity) { logger.debug("Ingesting transactions for Taler facade: ${facade.id.value}") - var lastId = facade.highestSeenMsgID + val facadeState = getTalerFacadeState(facade.id.value) + var lastId = facadeState.highestSeenMsgID RawBankTransactionEntity.find { /** Those with exchange bank account involved */ RawBankTransactionsTable.bankAccount eq subscriberAccount.id.value and @@ -365,13 +392,6 @@ fun ingestTalerTransactions() { /** Those that came later than the latest processed payment */ (RawBankTransactionsTable.id.greater(lastId)) }.orderBy(Pair(RawBankTransactionsTable.id, SortOrder.ASC)).forEach { - if (duplicatePayment(it)) { - logger.warn("Incoming payment already seen") - throw NexusError( - HttpStatusCode.InternalServerError, - "Incoming payment already seen" - ) - } // Incoming payment. if (it.transactionType == "CRDT") { if (CryptoUtil.checkValidEddsaPublicKey(it.unstructuredRemittanceInformation)) { @@ -388,29 +408,25 @@ fun ingestTalerTransactions() { } // Outgoing payment if (it.transactionType == "DBIT") { + logger.debug("Ingesting outgoing payment: ${it.unstructuredRemittanceInformation}") var talerRequested = TalerRequestedPaymentEntity.find { TalerRequestedPayments.wtid eq it.unstructuredRemittanceInformation }.firstOrNull() ?: throw NexusError( HttpStatusCode.InternalServerError, "Payment '${it.unstructuredRemittanceInformation}' shows in history, but was never requested!" ) - if (talerRequested != null) { - talerRequested.rawConfirmed = it - } + talerRequested.rawConfirmed = it } - /** WARNING: it is not guaranteed that the last processed raw - * payment is ALSO the one with highest ID. A more accurate management - * is needed. */ lastId = it.id.value } - facade.highestSeenMsgID = lastId + facadeState.highestSeenMsgID = lastId } // invoke ingestion for all the facades transaction { FacadeEntity.find { FacadesTable.type eq "taler-wire-gateway" }.forEach { - val subscriberAccount = getFacadeBankAccount(it.creator) + val subscriberAccount = getTalerFacadeBankAccount(it.id.value) ingest(subscriberAccount, it) } } @@ -429,9 +445,8 @@ suspend fun historyOutgoing(call: ApplicationCall): Unit { val history = TalerOutgoingHistory() transaction { val user = authenticateRequest(call.request) - /** Retrieve all the outgoing payments from the _clean Taler outgoing table_ */ - val subscriberBankAccount = getFacadeBankAccount(user) + val subscriberBankAccount = getTalerFacadeBankAccount(expectNonNull(call.parameters["fcid"])) val reqPayments = TalerRequestedPaymentEntity.find { TalerRequestedPayments.rawConfirmed.isNotNull() and startCmpOp }.orderTaler(delta) @@ -523,4 +538,4 @@ fun talerFacadeRoutes(route: Route) { call.respondText("Hello, this is Taler Facade") return@get } -} +}
\ No newline at end of file diff --git a/nexus/src/test/kotlin/DBTest.kt b/nexus/src/test/kotlin/DBTest.kt index 80055b0c..586707c8 100644 --- a/nexus/src/test/kotlin/DBTest.kt +++ b/nexus/src/test/kotlin/DBTest.kt @@ -41,10 +41,10 @@ class DBTest { addLogger(StdOutSqlLogger) SchemaUtils.create( FacadesTable, - TalerFacadeConfigsTable, + TalerFacadeStatesTable, NexusUsersTable ) - val talerConfig = TalerFacadeConfigEntity.new { + val talerConfig = TalerFacadeStateEntity.new { bankAccount = "b" bankConnection = "b" reserveTransferLevel = "any" |