libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 9b93584989eacd6be3d533c4aa40d4a650eefd71
parent 2da505a32407d693492b1a08842c186721c604c7
Author: MS <ms@taler.net>
Date:   Tue, 17 Aug 2021 02:57:46 -1100

Implement Anastasis facade.  Needs testing

Diffstat:
Anexus/src/main/kotlin/tech/libeufin/nexus/Anastasis.kt | 107+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 38+++++++++++++++++++++++++++-----------
Anexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt | 101+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt | 98++++---------------------------------------------------------------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt | 6++++--
Mnexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt | 6+++---
Mnexus/src/test/kotlin/DBTest.kt | 4++--
7 files changed, 249 insertions(+), 111 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Anastasis.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Anastasis.kt @@ -0,0 +1,106 @@ +package tech.libeufin.nexus + +import io.ktor.application.* +import io.ktor.http.* +import io.ktor.response.* +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.nexus.iso20022.TransactionDetails +import tech.libeufin.nexus.server.PermissionQuery +import tech.libeufin.nexus.server.expectNonNull +import tech.libeufin.nexus.server.expectUrlParameter +import tech.libeufin.util.EbicsProtocolError +import kotlin.math.abs +import kotlin.math.min +import io.ktor.content.TextContent + +data class AnastasisIncomingBankTransaction( + val row_id: Long, + val date: GnunetTimestamp, // timestamp + val amount: String, + val credit_account: String, // payto form, + val debit_account: String, + val subject: String +) + +fun anastasisFilter(payment: NexusBankTransactionEntity, txDtls: TransactionDetails) { + val debtorName = txDtls.debtor?.name + if (debtorName == null) { + logger.warn("empty debtor name") + return + } + val debtorAcct = txDtls.debtorAccount + if (debtorAcct == null) { + // FIXME: Report payment, we can't even send it back + logger.warn("empty debtor account") + return + } + val debtorIban = debtorAcct.iban + if (debtorIban == null) { + // FIXME: Report payment, we can't even send it back + logger.warn("non-iban debtor account") + return + } + val debtorAgent = txDtls.debtorAgent + if (debtorAgent == null) { + // FIXME: Report payment, we can't even send it back + logger.warn("missing debtor agent") + return + } + if (debtorAgent.bic == null) { + logger.warn("Not allowing transactions missing the BIC. IBAN and name: ${debtorIban}, $debtorName") + return + } + AnastasisIncomingPaymentEntity.new { + this.payment = payment + subject = txDtls.unstructuredRemittanceInformation + timestampMs = System.currentTimeMillis() + debtorPaytoUri = buildIbanPaytoUri( + debtorIban, debtorAgent.bic, debtorName, "DBIT" + ) + } +} + +/** + * Handle a /taler-wire-gateway/history/incoming request. + */ +private suspend fun historyIncoming(call: ApplicationCall) { + val facadeId = expectNonNull(call.parameters["fcid"]) + call.request.requirePermission(PermissionQuery("facade", facadeId, "facade.talerwiregateway.history")) + val param = call.expectUrlParameter("delta") + val delta: Int = try { + param.toInt() + } catch (e: Exception) { + throw EbicsProtocolError(HttpStatusCode.BadRequest, "'${param}' is not Int") + } + val start: Long = handleStartArgument(call.request.queryParameters["start"], delta) + val history = object { + val incoming_transactions: MutableList<AnastasisIncomingBankTransaction> = mutableListOf() + } + val startCmpOp = getComparisonOperator(delta, start, AnastasisIncomingPaymentsTable) + transaction { + val orderedPayments = AnastasisIncomingPaymentEntity.find { + startCmpOp + }.orderTaler(delta) // Taler and Anastasis have same ordering policy. Fixme: find better function's name? + if (orderedPayments.isNotEmpty()) { + orderedPayments.subList(0, min(abs(delta), orderedPayments.size)).forEach { + history.incoming_transactions.add( + AnastasisIncomingBankTransaction( + // Rounded timestamp + date = GnunetTimestamp((it.timestampMs / 1000) * 1000), + row_id = it.id.value, + amount = "${it.payment.currency}:${it.payment.amount}", + subject = it.subject, + credit_account = buildIbanPaytoUri( + it.payment.bankAccount.iban, + it.payment.bankAccount.bankCode, + it.payment.bankAccount.accountHolder, + "CRDT" + ), + debit_account = it.debtorPaytoUri + ) + ) + } + } + } + return call.respond(TextContent(customConverter(history), ContentType.Application.Json)) +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -75,6 +75,21 @@ class TalerInvalidIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) { var refunded by TalerInvalidIncomingPaymentsTable.refunded } +object AnastasisIncomingPaymentsTable: LongIdTable() { + val payment = reference("payment", NexusBankTransactionsTable) + val subject = text("subject") + val timestampMs = long("timestampMs") + val debtorPaytoUri = text("incomingPaytoUri") +} + +class AnastasisIncomingPaymentEntity(id: EntityID<Long>) : LongEntity(id) { + companion object : LongEntityClass<AnastasisIncomingPaymentEntity>(AnastasisIncomingPaymentsTable) + + var payment by NexusBankTransactionEntity referencedOn AnastasisIncomingPaymentsTable.payment + var subject by AnastasisIncomingPaymentsTable.subject + var timestampMs by AnastasisIncomingPaymentsTable.timestampMs + var debtorPaytoUri by AnastasisIncomingPaymentsTable.debtorPaytoUri +} /** * This is the table of the incoming payments. Entries are merely "pointers" to the @@ -359,7 +374,7 @@ class FacadeEntity(id: EntityID<Long>) : LongEntity(id) { var creator by NexusUserEntity referencedOn FacadesTable.creator } -object TalerFacadeStateTable : LongIdTable() { +object FacadeStateTable : LongIdTable() { val bankAccount = text("bankAccount") val bankConnection = text("bankConnection") val currency = text("currency") @@ -376,19 +391,19 @@ object TalerFacadeStateTable : LongIdTable() { val highestSeenMsgSerialId = long("highestSeenMessageSerialId").default(0) } -class TalerFacadeStateEntity(id: EntityID<Long>) : LongEntity(id) { - companion object : LongEntityClass<TalerFacadeStateEntity>(TalerFacadeStateTable) +class FacadeStateEntity(id: EntityID<Long>) : LongEntity(id) { + companion object : LongEntityClass<FacadeStateEntity>(FacadeStateTable) - var bankAccount by TalerFacadeStateTable.bankAccount - var bankConnection by TalerFacadeStateTable.bankConnection - var currency by TalerFacadeStateTable.currency + var bankAccount by FacadeStateTable.bankAccount + var bankConnection by FacadeStateTable.bankConnection + var currency by FacadeStateTable.currency /** * "statement", "report", "notification" */ - var reserveTransferLevel by TalerFacadeStateTable.reserveTransferLevel - var facade by FacadeEntity referencedOn TalerFacadeStateTable.facade - var highestSeenMessageSerialId by TalerFacadeStateTable.highestSeenMsgSerialId + var reserveTransferLevel by FacadeStateTable.reserveTransferLevel + var facade by FacadeEntity referencedOn FacadeStateTable.facade + var highestSeenMessageSerialId by FacadeStateTable.highestSeenMsgSerialId } object NexusScheduledTasksTable : LongIdTable() { @@ -458,7 +473,7 @@ fun dbDropTables(dbConnectionString: String) { NexusBankConnectionsTable, NexusBankMessagesTable, FacadesTable, - TalerFacadeStateTable, + FacadeStateTable, NexusScheduledTasksTable, OfferedBankAccountsTable, NexusPermissionsTable, @@ -477,9 +492,10 @@ fun dbCreateTables(dbConnectionString: String) { NexusEbicsSubscribersTable, NexusBankAccountsTable, NexusBankTransactionsTable, + AnastasisIncomingPaymentsTable, TalerIncomingPaymentsTable, TalerRequestedPaymentsTable, - TalerFacadeStateTable, + FacadeStateTable, TalerInvalidIncomingPaymentsTable, NexusBankConnectionsTable, NexusBankMessagesTable, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt @@ -0,0 +1,100 @@ +package tech.libeufin.nexus + +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.ktor.http.* +import org.jetbrains.exposed.sql.SortOrder +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.nexus.iso20022.CamtBankAccountEntry +import tech.libeufin.nexus.iso20022.CreditDebitIndicator +import tech.libeufin.nexus.iso20022.EntryStatus +import tech.libeufin.nexus.iso20022.TransactionDetails + + +/** + * Mainly used to resort the last processed transaction ID. + */ +fun getFacadeState(fcid: String): FacadeStateEntity { + val facade = FacadeEntity.find { FacadesTable.facadeName eq fcid }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find facade '${fcid}'" + ) + return FacadeStateEntity.find { + FacadeStateTable.facade eq facade.id.value + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find any state for facade: $fcid" + ) +} + +fun getFacadeBankAccount(fcid: String): NexusBankAccountEntity { + val facadeState = getFacadeState(fcid) + return NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError( + HttpStatusCode.NotFound, + "The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}" + ) +} + +/** + * Ingests transactions for those facades accounting for bankAccountId. + */ +fun ingestFacadeTransactions( + bankAccountId: String, + facadeType: String, + incomingFilterCb: ((NexusBankTransactionEntity, TransactionDetails) -> Unit)?, + refundCb: ((NexusBankAccountEntity, Long) -> Unit)? +) { + fun ingest(bankAccount: NexusBankAccountEntity, facade: FacadeEntity) { + logger.debug( + "Ingesting transactions for Taler facade ${facade.id.value}," + + " and bank account: ${bankAccount.bankAccountName}" + ) + val facadeState = getFacadeState(facade.facadeName) + var lastId = facadeState.highestSeenMessageSerialId + NexusBankTransactionEntity.find { + /** Those with "our" bank account involved */ + NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and + /** Those that are booked */ + (NexusBankTransactionsTable.status eq EntryStatus.BOOK) and + /** Those that came later than the latest processed payment */ + (NexusBankTransactionsTable.id.greater(lastId)) + }.orderBy(Pair(NexusBankTransactionsTable.id, SortOrder.ASC)).forEach { + // Incoming payment. + logger.debug("Facade checks payment: ${it.transactionJson}") + val tx = jacksonObjectMapper().readValue( + it.transactionJson, CamtBankAccountEntry::class.java + ) + val details = tx.batches?.get(0)?.batchTransactions?.get(0)?.details + if (details == null) { + logger.warn("A void money movement made it through the ingestion: VERY strange") + return@forEach + } + when (tx.creditDebitIndicator) { + CreditDebitIndicator.CRDT -> { + if (incomingFilterCb != null) { + incomingFilterCb(it, details) + } + } + else -> Unit + } + lastId = it.id.value + } + try { + // FIXME: This currently does not do proper error handing. + if (refundCb != null) { + refundCb(bankAccount, facadeState.highestSeenMessageSerialId) + } + } catch (e: Exception) { + logger.warn("sending refund payment failed", e); + } + facadeState.highestSeenMessageSerialId = lastId + } + // invoke ingestion for all the facades + transaction { + FacadeEntity.find { FacadesTable.type eq facadeType }.forEach { + val facadeBankAccount = getFacadeBankAccount(it.facadeName) + if (facadeBankAccount.bankAccountName == bankAccountId) + ingest(facadeBankAccount, it) + } + } +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Taler.kt @@ -172,27 +172,6 @@ fun extractReservePubFromSubject(rawSubject: String): String? { return result.value.uppercase() } -private fun getTalerFacadeState(fcid: String): TalerFacadeStateEntity { - val facade = FacadeEntity.find { FacadesTable.facadeName eq fcid }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, - "Could not find facade '${fcid}'" - ) - return TalerFacadeStateEntity.find { - TalerFacadeStateTable.facade eq facade.id.value - }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, - "Could not find any state for facade: $fcid" - ) -} - -private fun getTalerFacadeBankAccount(fcid: String): NexusBankAccountEntity { - val facadeState = getTalerFacadeState(fcid) - return NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError( - HttpStatusCode.NotFound, - "The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}" - ) -} - /** * Handle a Taler Wire Gateway /transfer request. */ @@ -226,7 +205,7 @@ private suspend fun talerTransfer(call: ApplicationCall) { ) } } - val exchangeBankAccount = getTalerFacadeBankAccount(facadeId) + val exchangeBankAccount = getFacadeBankAccount(facadeId) val pain001 = addPaymentInitiation( Pain001Data( creditorIban = creditorData.iban, @@ -273,7 +252,7 @@ fun roundTimestamp(t: GnunetTimestamp): GnunetTimestamp { return GnunetTimestamp(t.t_ms - (t.t_ms % 1000)) } -private fun ingestOneIncomingTransaction(payment: NexusBankTransactionEntity, txDtls: TransactionDetails) { +fun talerFilter(payment: NexusBankTransactionEntity, txDtls: TransactionDetails) { val subject = txDtls.unstructuredRemittanceInformation val debtorName = txDtls.debtor?.name if (debtorName == null) { @@ -332,10 +311,9 @@ private fun ingestOneIncomingTransaction(payment: NexusBankTransactionEntity, tx debtorIban, debtorAgent.bic, debtorName, "DBIT" ) } - return } -fun maybePrepareRefunds(bankAccount: NexusBankAccountEntity, lastSeenId: Long) { +fun maybeTalerRefunds(bankAccount: NexusBankAccountEntity, lastSeenId: Long) { logger.debug( "Searching refundable payments of account: ${bankAccount}," + " after last seen transaction id: ${lastSeenId}" @@ -400,71 +378,6 @@ fun maybePrepareRefunds(bankAccount: NexusBankAccountEntity, lastSeenId: Long) { } /** - * Crawls the database to find ALL the users that have a Taler - * facade and process their histories respecting the TWG policy. - * The two main tasks it does are: (1) marking as invalid those - * payments with bad subject line, and (2) see if previously requested - * payments got booked as outgoing payments (and mark them accordingly - * in the local table). - */ - -/** - * - */ -fun ingestTalerTransactions(bankAccountId: String) { - fun ingest(bankAccount: NexusBankAccountEntity, facade: FacadeEntity) { - logger.debug( - "Ingesting transactions for Taler facade ${facade.id.value}," + - " and bank account: ${bankAccount.bankAccountName}" - ) - val facadeState = getTalerFacadeState(facade.facadeName) - var lastId = facadeState.highestSeenMessageSerialId - NexusBankTransactionEntity.find { - /** Those with "our" bank account involved */ - NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and - /** Those that are booked */ - (NexusBankTransactionsTable.status eq EntryStatus.BOOK) and - /** Those that came later than the latest processed payment */ - (NexusBankTransactionsTable.id.greater(lastId)) - }.orderBy(Pair(NexusBankTransactionsTable.id, SortOrder.ASC)).forEach { - // Incoming payment. - logger.debug("Taler checks payment: ${it.transactionJson}") - val tx = jacksonObjectMapper().readValue( - it.transactionJson, CamtBankAccountEntry::class.java - ) - val details = tx.batches?.get(0)?.batchTransactions?.get(0)?.details - if (details == null) { - logger.warn("A void money movement made it through the ingestion: VERY strange") - return@forEach - } - when (tx.creditDebitIndicator) { - CreditDebitIndicator.CRDT -> { - ingestOneIncomingTransaction(it, txDtls = details) - } - else -> Unit - } - lastId = it.id.value - } - try { - // FIXME: This currently does not do proper error handing. - maybePrepareRefunds(bankAccount, facadeState.highestSeenMessageSerialId) - } catch (e: Exception) { - logger.warn("sending refund payment failed", e); - } - facadeState.highestSeenMessageSerialId = lastId - - } - // invoke ingestion for all the facades - transaction { - FacadeEntity.find { FacadesTable.type eq "taler-wire-gateway" }.forEach { - val facadeBankAccount = getTalerFacadeBankAccount(it.facadeName) - if (facadeBankAccount.bankAccountName == bankAccountId) - ingest(facadeBankAccount, it) - } - } -} - -/** * Handle a /taler/history/outgoing request. */ private suspend fun historyOutgoing(call: ApplicationCall) { @@ -482,7 +395,7 @@ private suspend fun historyOutgoing(call: ApplicationCall) { val history = TalerOutgoingHistory() transaction { /** Retrieve all the outgoing payments from the _clean Taler outgoing table_ */ - val subscriberBankAccount = getTalerFacadeBankAccount(facadeId) + val subscriberBankAccount = getFacadeBankAccount(facadeId) val reqPayments = mutableListOf<TalerRequestedPaymentEntity>() val reqPaymentsWithUnconfirmed = TalerRequestedPaymentEntity.find { startCmpOp @@ -561,12 +474,11 @@ private suspend fun historyIncoming(call: ApplicationCall) { private fun getCurrency(facadeName: String): String { return transaction { - getTalerFacadeState(facadeName).currency + getFacadeState(facadeName).currency } } fun talerFacadeRoutes(route: Route, httpClient: HttpClient) { - route.get("/config") { val facadeId = ensureNonNull(call.parameters["fcid"]) call.request.requirePermission( diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt @@ -306,7 +306,7 @@ suspend fun fetchBankAccountTransactions(client: HttpClient, fetchSpec: FetchSpe val connectionName = conn.connectionId } } - + // abstracts over the connection type: ebics or others. getConnectionPlugin(res.connectionType).fetchTransactions( fetchSpec, client, @@ -315,7 +315,9 @@ suspend fun fetchBankAccountTransactions(client: HttpClient, fetchSpec: FetchSpe ) val newTransactions = ingestBankMessagesIntoAccount(res.connectionName, accountId) - ingestTalerTransactions(accountId) + ingestFacadeTransactions(accountId, "taler-wire-gateway", ::talerFilter, ::maybeTalerRefunds) + ingestFacadeTransactions(accountId, "anastasis", ::anastasisFilter, null) + return newTransactions } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt @@ -62,8 +62,8 @@ fun getFacadeState(type: String, facade: FacadeEntity): JsonNode { return transaction { when (type) { "taler-wire-gateway" -> { - val state = TalerFacadeStateEntity.find { - TalerFacadeStateTable.facade eq facade.id + val state = FacadeStateEntity.find { + FacadeStateTable.facade eq facade.id }.firstOrNull() if (state == null) throw NexusError(HttpStatusCode.NotFound, "State of facade ${facade.id} not found") val node = jacksonObjectMapper().createObjectNode() @@ -980,7 +980,7 @@ fun serverMain(host: String, port: Int) { ) } transaction { - TalerFacadeStateEntity.new { + FacadeStateEntity.new { bankAccount = body.config.bankAccount bankConnection = body.config.bankConnection reserveTransferLevel = body.config.reserveTransferLevel diff --git a/nexus/src/test/kotlin/DBTest.kt b/nexus/src/test/kotlin/DBTest.kt @@ -65,7 +65,7 @@ class DBTest { addLogger(StdOutSqlLogger) SchemaUtils.create( FacadesTable, - TalerFacadeStateTable, + FacadeStateTable, NexusUsersTable ) val user = NexusUserEntity.new { @@ -78,7 +78,7 @@ class DBTest { type = "any" creator = user } - TalerFacadeStateEntity.new { + FacadeStateEntity.new { bankAccount = "b" bankConnection = "b" reserveTransferLevel = "any"