diff options
author | MS <ms@taler.net> | 2023-03-31 14:22:46 +0200 |
---|---|---|
committer | MS <ms@taler.net> | 2023-03-31 14:22:46 +0200 |
commit | 4f1525107159cd7f92c18e8060f0333a9c3055b4 (patch) | |
tree | cddc21785fa296a417963802961429c553826f98 | |
parent | 59a20971438391fb22a536080b27b4db50ea5d31 (diff) | |
download | libeufin-4f1525107159cd7f92c18e8060f0333a9c3055b4.tar.gz libeufin-4f1525107159cd7f92c18e8060f0333a9c3055b4.tar.bz2 libeufin-4f1525107159cd7f92c18e8060f0333a9c3055b4.zip |
Nexus x-libeufin-bank connection.
Introducing the operation to download transactions
from the bank. More helpers were added, and some
CaMt specific code got moved from BankAccount.kt to
Iso20022.kt.
12 files changed, 1080 insertions, 320 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt index bdaa0ec3..2048813e 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt @@ -9,6 +9,13 @@ import tech.libeufin.nexus.server.Permission import tech.libeufin.nexus.server.PermissionQuery import tech.libeufin.util.* +fun getNexusUser(username: String): NexusUserEntity = + transaction { + NexusUserEntity.find { + NexusUsersTable.username eq username + }.firstOrNull() ?: throw notFound("User $username not found.") + } + /** * HTTP basic auth. Throws error if password is wrong, * and makes sure that the user exists in the system. diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt index 6a18db21..ac52095c 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt @@ -23,11 +23,12 @@ import com.fasterxml.jackson.databind.JsonNode import io.ktor.client.HttpClient import io.ktor.http.HttpStatusCode import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.server.BankConnectionType import tech.libeufin.nexus.server.FetchSpecJson // 'const' allows only primitive types. -val bankConnectionRegistry: Map<String, BankConnectionProtocol> = mapOf( - "ebics" to EbicsBankConnectionProtocol() +val bankConnectionRegistry: Map<BankConnectionType, BankConnectionProtocol> = mapOf( + BankConnectionType.EBICS to EbicsBankConnectionProtocol() ) interface BankConnectionProtocol { @@ -64,9 +65,13 @@ interface BankConnectionProtocol { * * This function returns a possibly empty list of exceptions. * That helps not to stop fetching if ONE operation fails. Notably, - * C52 and C53 may be asked along one invocation of this function, + * C52 _and_ C53 may be asked along one invocation of this function, * therefore storing the exception on C52 allows the C53 to still * take place. The caller then decides how to handle the exceptions. + * + * More on multi requests: C52 and C53, or more generally 'reports' + * and 'statements' are tried to be downloaded together when the fetch + * level is set to ALL. */ suspend fun fetchTransactions( fetchSpec: FetchSpecJson, @@ -76,9 +81,18 @@ interface BankConnectionProtocol { ): List<Exception>? } -fun getConnectionPlugin(connId: String): BankConnectionProtocol { - return bankConnectionRegistry.get(connId) ?: throw NexusError( +fun getConnectionPlugin(connType: BankConnectionType): BankConnectionProtocol { + return bankConnectionRegistry[connType] ?: throw NexusError( HttpStatusCode.NotFound, - "Connection type '${connId}' not available" + "Connection type '${connType}' not available" ) +} + +/** + * Adaptor helper to keep until all the connection type mentions will + * be passed as BankConnectionType instead of arbitrary easy-to-break + * string. + */ +fun getConnectionPlugin(connType: String): BankConnectionProtocol { + return getConnectionPlugin(BankConnectionType.parseBankConnectionType(connType)) }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt index f0af4499..38fb7bd3 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -19,17 +19,28 @@ package tech.libeufin.nexus +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.jetbrains.exposed.dao.* import org.jetbrains.exposed.dao.id.EntityID import org.jetbrains.exposed.dao.id.LongIdTable import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction -import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.util.EbicsInitState import java.sql.Connection +import kotlin.reflect.typeOf +enum class EntryStatus { + // Booked + BOOK, + // Pending + PDNG, + // Informational + INFO, +} + /** * This table holds the values that exchange gave to issue a payment, * plus a reference to the prepared pain.001 version of. Note that @@ -134,15 +145,36 @@ class NexusBankBalanceEntity(id: EntityID<Long>) : LongEntity(id) { var date by NexusBankBalancesTable.date } +// This table holds the data to talk to Sandbox +// via the x-libeufin-bank protocol supplier. +object XLibeufinBankUsersTable : LongIdTable() { + val username = text("username") + val password = text("password") + val baseUrl = text("baseUrl") + val nexusBankConnection = reference("nexusBankConnection", NexusBankConnectionsTable) +} + +class XLibeufinBankUserEntity(id: EntityID<Long>) : LongEntity(id) { + companion object : LongEntityClass<XLibeufinBankUserEntity>(XLibeufinBankUsersTable) + var username by XLibeufinBankUsersTable.username + var password by XLibeufinBankUsersTable.password + var baseUrl by XLibeufinBankUsersTable.baseUrl + var nexusBankConnection by NexusBankConnectionEntity referencedOn XLibeufinBankUsersTable.nexusBankConnection +} + /** * Table that stores all messages we receive from the bank. + * The nullable fields were introduced along the x-libeufin-bank + * connection, as those messages are plain JSON object unlike + * the more structured CaMt. */ object NexusBankMessagesTable : LongIdTable() { val bankConnection = reference("bankConnection", NexusBankConnectionsTable) - val messageId = text("messageId") - val code = text("code") val message = blob("message") - val errors = bool("errors").default(false) // true when the parser could not ingest one message. + val messageId = text("messageId").nullable() + val code = text("code").nullable() + // true when the parser could not ingest one message: + val errors = bool("errors").default(false) } class NexusBankMessageEntity(id: EntityID<Long>) : LongEntity(id) { @@ -188,6 +220,21 @@ class NexusBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) { var transactionJson by NexusBankTransactionsTable.transactionJson var accountTransactionId by NexusBankTransactionsTable.accountTransactionId val updatedBy by NexusBankTransactionEntity optionalReferencedOn NexusBankTransactionsTable.updatedBy + + /** + * It is responsibility of the caller to insert only valid + * JSON into the database, and therefore provide error management + * when calling the two helpers below. + */ + + inline fun <reified T> parseDetailsIntoObject(): T { + val mapper = jacksonObjectMapper() + return mapper.readValue(this.transactionJson, T::class.java) + } + fun parseDetailsIntoObject(): JsonNode { + val mapper = jacksonObjectMapper() + return mapper.readTree(this.transactionJson) + } } /** @@ -459,9 +506,7 @@ object NexusPermissionsTable : LongIdTable() { val subjectId = text("subjectName") val permissionName = text("permissionName") - init { - uniqueIndex(resourceType, resourceId, subjectType, subjectId, permissionName) - } + init { uniqueIndex(resourceType, resourceId, subjectType, subjectId, permissionName) } } class NexusPermissionEntity(id: EntityID<Long>) : LongEntity(id) { @@ -479,6 +524,7 @@ fun dbDropTables(dbConnectionString: String) { transaction { SchemaUtils.drop( NexusUsersTable, + XLibeufinBankUsersTable, PaymentInitiationsTable, NexusEbicsSubscribersTable, NexusBankAccountsTable, @@ -504,6 +550,7 @@ fun dbCreateTables(dbConnectionString: String) { TransactionManager.manager.defaultIsolationLevel = Connection.TRANSACTION_SERIALIZABLE transaction { SchemaUtils.create( + XLibeufinBankUsersTable, NexusScheduledTasksTable, NexusUsersTable, PaymentInitiationsTable, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt index 7fdd2c26..c908a828 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt @@ -2,47 +2,60 @@ package tech.libeufin.nexus import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.http.* +import org.jetbrains.exposed.dao.flushCache import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction import tech.libeufin.nexus.iso20022.CamtBankAccountEntry import tech.libeufin.nexus.iso20022.CreditDebitIndicator -import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.nexus.iso20022.TransactionDetails +import tech.libeufin.nexus.server.NexusFacadeType - -/** - * Mainly used to resort the last processed transaction ID. - */ +// Mainly used to resort the last processed transaction ID. fun getFacadeState(fcid: String): FacadeStateEntity { - val facade = FacadeEntity.find { FacadesTable.facadeName eq fcid }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, - "Could not find facade '${fcid}'" - ) - return FacadeStateEntity.find { - FacadeStateTable.facade eq facade.id.value - }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, - "Could not find any state for facade: $fcid" - ) + return transaction { + val facade = FacadeEntity.find { + FacadesTable.facadeName eq fcid + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find facade '${fcid}'" + ) + FacadeStateEntity.find { + FacadeStateTable.facade eq facade.id.value + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find any state for facade: $fcid" + ) + } } fun getFacadeBankAccount(fcid: String): NexusBankAccountEntity { - val facadeState = getFacadeState(fcid) - return NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError( - HttpStatusCode.NotFound, - "The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}" - ) + return transaction { + val facadeState = getFacadeState(fcid) + NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError( + HttpStatusCode.NotFound, + "The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}" + ) + } } /** * Ingests transactions for those facades accounting for bankAccountId. + * 'incomingFilterCb' decides whether the facade accepts the payment; + * if not, refundCb prepares a refund. The 'txStatus' parameter decides + * at which state one transaction deserve to fuel Taler transactions. BOOK + * is conservative, and with some banks the delay can be significant. PNDG + * instead reacts faster, but risks that one transaction gets undone by the + * bank and never reach the BOOK state; this would mean a loss and/or admin + * burden. */ fun ingestFacadeTransactions( bankAccountId: String, - facadeType: String, + facadeType: NexusFacadeType, incomingFilterCb: ((NexusBankTransactionEntity, TransactionDetails) -> Unit)?, - refundCb: ((NexusBankAccountEntity, Long) -> Unit)? + refundCb: ((NexusBankAccountEntity, Long) -> Unit)?, + txStatus: EntryStatus = EntryStatus.BOOK ) { fun ingest(bankAccount: NexusBankAccountEntity, facade: FacadeEntity) { logger.debug( @@ -55,18 +68,21 @@ fun ingestFacadeTransactions( /** Those with "our" bank account involved */ NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and /** Those that are booked */ - (NexusBankTransactionsTable.status eq EntryStatus.BOOK) and + (NexusBankTransactionsTable.status eq txStatus) and /** Those that came later than the latest processed payment */ (NexusBankTransactionsTable.id.greater(lastId)) }.orderBy(Pair(NexusBankTransactionsTable.id, SortOrder.ASC)).forEach { // Incoming payment. - logger.debug("Facade checks payment: ${it.transactionJson}") val tx = jacksonObjectMapper().readValue( - it.transactionJson, CamtBankAccountEntry::class.java + it.transactionJson, + CamtBankAccountEntry::class.java ) - val details = tx.batches?.get(0)?.batchTransactions?.get(0)?.details + /** + * Need transformer from "JSON tx" to TransactionDetails?. + */ + val details: TransactionDetails? = tx.batches?.get(0)?.batchTransactions?.get(0)?.details if (details == null) { - logger.warn("A void money movement made it through the ingestion: VERY strange") + logger.warn("A void money movement (${tx.accountServicerRef}) made it through the ingestion: VERY strange") return@forEach } when (tx.creditDebitIndicator) { @@ -90,16 +106,17 @@ fun ingestFacadeTransactions( ) } } catch (e: Exception) { - logger.warn("sending refund payment failed", e) + logger.warn("Sending refund payment failed: ${e.message}") } facadeState.highestSeenMessageSerialId = lastId } // invoke ingestion for all the facades transaction { - FacadeEntity.find { FacadesTable.type eq facadeType }.forEach { + FacadeEntity.find { FacadesTable.type eq facadeType.facadeType }.forEach { val facadeBankAccount = getFacadeBankAccount(it.facadeName) if (facadeBankAccount.bankAccountName == bankAccountId) ingest(facadeBankAccount, it) + flushCache() } } }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt index 86c86a37..8d6062d1 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt @@ -59,7 +59,15 @@ private suspend fun runTask(client: HttpClient, sched: TaskSchedule) { "fetch" -> { @Suppress("BlockingMethodInNonBlockingContext") val fetchSpec = jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java) - fetchBankAccountTransactions(client, fetchSpec, sched.resourceId) + val outcome = fetchBankAccountTransactions(client, fetchSpec, sched.resourceId) + if (outcome.errors != null && outcome.errors!!.isNotEmpty()) { + /** + * Communication with the bank had at least one error. All of + * them get logged when this 'outcome.errors' list was defined, + * so not logged twice here. Failing to bring the problem(s) up. + */ + exitProcess(1) + } } /** * Submits the payment preparations that are found in the database. diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt index c71e5531..7843345f 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt @@ -24,20 +24,37 @@ import io.ktor.server.application.ApplicationCall import io.ktor.client.HttpClient import io.ktor.http.HttpStatusCode import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.transactions.transaction -import org.w3c.dom.Document import tech.libeufin.nexus.* import tech.libeufin.nexus.iso20022.* -import tech.libeufin.nexus.server.FetchSpecJson -import tech.libeufin.nexus.server.Pain001Data -import tech.libeufin.nexus.server.requireBankConnection -import tech.libeufin.nexus.server.toPlainString +import tech.libeufin.nexus.server.* +import tech.libeufin.nexus.xlibeufinbank.processXLibeufinBankMessage import tech.libeufin.util.XMLUtil +import tech.libeufin.util.internalServerError import java.time.Instant +import java.time.ZoneOffset import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter private val keepBankMessages: String? = System.getenv("LIBEUFIN_NEXUS_KEEP_BANK_MESSAGES") + +/** + * Gets a prepared payment starting from its 'payment information id'. + * Note: although the terminology comes from CaMt, a 'payment information id' + * is indeed any UID that identifies the payment. For this reason, also + * the x-libeufin-bank logic uses this helper. + * + * Returns the prepared payment, or null if that's not found. Not throwing + * any exception because the null case is common: not every transaction being + * processed by Neuxs was prepared/initiated here; incoming transactions are + * one example. + */ +fun getPaymentInitiation(pmtInfId: String): PaymentInitiationEntity? = + transaction { + PaymentInitiationEntity.find( + PaymentInitiationsTable.paymentInformationId.eq(pmtInfId) + ).firstOrNull() + } fun requireBankAccount(call: ApplicationCall, parameterKey: String): NexusBankAccountEntity { val name = call.parameters[parameterKey] if (name == null) @@ -63,6 +80,7 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: val submitted = paymentInitiation.submitted } } + // Skips, if the payment was sent once already. if (r.submitted) { return } @@ -75,10 +93,11 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: /** * Submit all pending prepared payments. */ -suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid: String) { - data class Submission( - val id: Long - ) +suspend fun submitAllPaymentInitiations( + httpClient: HttpClient, + accountid: String +) { + data class Submission(val id: Long) val workQueue = mutableListOf<Submission>() transaction { val account = NexusBankAccountEntity.findByName(accountid) ?: throw NexusError( @@ -120,24 +139,10 @@ suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid: Strin } /** - * Check if the transaction is already found in the database. - */ -private fun findDuplicate(bankAccountId: String, acctSvcrRef: String): NexusBankTransactionEntity? { - // FIXME: make this generic depending on transaction identification scheme - val ati = "AcctSvcrRef:$acctSvcrRef" - return transaction { - val account = NexusBankAccountEntity.findByName((bankAccountId)) ?: return@transaction null - NexusBankTransactionEntity.find { - (NexusBankTransactionsTable.accountTransactionId eq ati) and (NexusBankTransactionsTable.bankAccount eq account.id) - }.firstOrNull() - } -} - -/** * NOTE: this type can be used BOTH for one Camt document OR * for a set of those. */ -data class CamtTransactionsCount( +data class IngestedTransactionsCount( /** * Number of transactions that are new to the database. * Note that transaction T can be downloaded multiple times; @@ -155,154 +160,31 @@ data class CamtTransactionsCount( /** * Exceptions occurred while fetching transactions. Fetching * transactions can be done via multiple EBICS messages, therefore - * a failing one should not prevent other messages to be sent. - * This list collects all the exceptions that happened during the - * execution of a batch of messages. + * a failing one should not prevent other messages to be fetched. + * This list collects all the exceptions that happened while fetching + * multiple messages. */ var errors: List<Exception>? = null ) /** - * Get the Camt parsed by a helper function, discards duplicates - * and stores new transactions. - */ -fun processCamtMessage( - bankAccountId: String, camtDoc: Document, code: String -): CamtTransactionsCount { - var newTransactions = 0 - var downloadedTransactions = 0 - transaction { - val acct = NexusBankAccountEntity.findByName(bankAccountId) - if (acct == null) { - throw NexusError(HttpStatusCode.NotFound, "user not found") - } - val res = try { - parseCamtMessage(camtDoc) - } catch (e: CamtParsingError) { - logger.warn("Invalid CAMT received from bank: $e") - newTransactions = -1 - return@transaction - } - res.reports.forEach { - NexusAssert( - it.account.iban == acct.iban, - "Nexus hit a report or statement of a wrong IBAN!" - ) - it.balances.forEach { b -> - if (b.type == "CLBD") { - val lastBalance = NexusBankBalanceEntity.all().lastOrNull() - /** - * Store balances different from the one that came from the bank, - * or the very first balance. This approach has the following inconvenience: - * the 'balance' held at Nexus does not differentiate between one - * coming from a statement and one coming from a report. As a consequence, - * the two types of balances may override each other without notice. - */ - if ((lastBalance == null) || - (b.amount.toPlainString() != lastBalance.balance)) { - NexusBankBalanceEntity.new { - bankAccount = acct - balance = b.amount.toPlainString() - creditDebitIndicator = b.creditDebitIndicator.name - date = b.date - } - } - } - } - } - /** - * Why is the report/statement creation timestamp important, - * rather than each individual payment identification value? - */ - val stamp = - ZonedDateTime.parse(res.creationDateTime, DateTimeFormatter.ISO_DATE_TIME).toInstant().toEpochMilli() - when (code) { - "C52" -> { - val s = acct.lastReportCreationTimestamp - if (s != null && stamp > s) { - acct.lastReportCreationTimestamp = stamp - } - } - "C53" -> { - val s = acct.lastStatementCreationTimestamp - if (s != null && stamp > s) { - acct.lastStatementCreationTimestamp = stamp - } - } - } - val entries: List<CamtBankAccountEntry> = res.reports.map { it.entries }.flatten() - var newPaymentsLog = "" - downloadedTransactions = entries.size - txloop@ for (entry in entries) { - val singletonBatchedTransaction = entry.batches?.get(0)?.batchTransactions?.get(0) - ?: throw NexusError( - HttpStatusCode.InternalServerError, - "Singleton money movements policy wasn't respected" - ) - val acctSvcrRef = entry.accountServicerRef - if (acctSvcrRef == null) { - // FIXME(dold): Report this! - logger.error("missing account servicer reference in transaction") - continue - } - val duplicate = findDuplicate(bankAccountId, acctSvcrRef) - if (duplicate != null) { - logger.info("Found a duplicate (acctSvcrRef): $acctSvcrRef") - // FIXME(dold): See if an old transaction needs to be superseded by this one - // https://bugs.gnunet.org/view.php?id=6381 - continue@txloop - } - val rawEntity = NexusBankTransactionEntity.new { - bankAccount = acct - accountTransactionId = "AcctSvcrRef:$acctSvcrRef" - amount = singletonBatchedTransaction.amount.value - currency = singletonBatchedTransaction.amount.currency - transactionJson = jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(entry) - creditDebitIndicator = singletonBatchedTransaction.creditDebitIndicator.name - status = entry.status - } - rawEntity.flush() - newTransactions++ - newPaymentsLog += "\n- " + entry.batches[0].batchTransactions[0].details.unstructuredRemittanceInformation - // This block tries to acknowledge a former outgoing payment as booked. - if (singletonBatchedTransaction.creditDebitIndicator == CreditDebitIndicator.DBIT) { - val t0 = singletonBatchedTransaction.details - val pmtInfId = t0.paymentInformationId - if (pmtInfId != null) { - val paymentInitiation = PaymentInitiationEntity.find { - PaymentInitiationsTable.bankAccount eq acct.id and ( - PaymentInitiationsTable.paymentInformationId eq pmtInfId) - - }.firstOrNull() - if (paymentInitiation != null) { - logger.info("Could confirm one initiated payment: $pmtInfId") - paymentInitiation.confirmationTransaction = rawEntity - } - } - } - } - if (newTransactions > 0) - logger.debug("Camt $code '${res.messageId}' has new payments:${newPaymentsLog}") - } - return CamtTransactionsCount( - newTransactions = newTransactions, - downloadedTransactions = downloadedTransactions - ) -} - -/** - * Create new transactions for an account based on bank messages it - * did not see before. + * Causes new Nexus transactions to be stored into the database. Note: + * this function does NOT parse itself the banking data but relies on the + * dedicated helpers. This function is mostly responsible for _iterating_ + * over the new downloaded messages and update the local bank account about + * the new data. */ fun ingestBankMessagesIntoAccount( bankConnectionId: String, bankAccountId: String -): CamtTransactionsCount { +): IngestedTransactionsCount { var totalNew = 0 var downloadedTransactions = 0 transaction { val conn = - NexusBankConnectionEntity.find { NexusBankConnectionsTable.connectionId eq bankConnectionId }.firstOrNull() + NexusBankConnectionEntity.find { + NexusBankConnectionsTable.connectionId eq bankConnectionId + }.firstOrNull() if (conn == null) { throw NexusError(HttpStatusCode.InternalServerError, "connection not found") } @@ -311,15 +193,55 @@ fun ingestBankMessagesIntoAccount( throw NexusError(HttpStatusCode.InternalServerError, "account not found") } var lastId = acct.highestSeenBankMessageSerialId + /** + * This block picks all the new messages that were downloaded + * from the bank and passes them to the deeper banking data handlers + * according to the connection type. Such handlers are then responsible + * to extract the interesting values and insert them into the database. + */ NexusBankMessageEntity.find { (NexusBankMessagesTable.bankConnection eq conn.id) and (NexusBankMessagesTable.id greater acct.highestSeenBankMessageSerialId) and - // Wrong messages got already skipped by the - // index check above. Below is a extra check. not(NexusBankMessagesTable.errors) - }.orderBy(Pair(NexusBankMessagesTable.id, SortOrder.ASC)).forEach { - val doc = XMLUtil.parseStringIntoDom(it.message.bytes.toString(Charsets.UTF_8)) - val processingResult = processCamtMessage(bankAccountId, doc, it.code) + }.orderBy( + Pair(NexusBankMessagesTable.id, SortOrder.ASC) + ).forEach { + val processingResult: IngestedTransactionsCount = when(BankConnectionType.parseBankConnectionType(conn.type)) { + BankConnectionType.EBICS -> { + val doc = XMLUtil.parseStringIntoDom(it.message.bytes.toString(Charsets.UTF_8)) + /** + * Calling the CaMt handler. After its return, all the Neuxs-meaningful + * payment data got stored into the database and is ready to being further + * processed by any facade OR simply be communicated to the CLI via JSON. + */ + processCamtMessage( + bankAccountId, + doc, + it.code ?: throw internalServerError( + "Bank message with ID ${it.id.value} in DB table" + + " NexusBankMessagesTable has no code, but one is expected." + ) + ) + } + BankConnectionType.X_LIBEUFIN_BANK -> { + val jMessage = try { jacksonObjectMapper().readTree(it.message.bytes) } + catch (e: Exception) { + logger.error("Bank message ${it.id}/${it.messageId} could not" + + " be parsed into JSON by the x-libeufin-bank ingestion.") + throw internalServerError("Could not ingest x-libeufin-bank messages.") + } + processXLibeufinBankMessage( + bankAccountId, + jMessage + ) + } + } + /** + * Checking for errors. Note: errors do NOT stop this loop as + * they mean that ONE message has errors. Erroneous messages gets + * (1) flagged, (2) skipped when this function will run again, and (3) + * NEVER deleted from the database. + */ if (processingResult.newTransactions == -1) { it.errors = true lastId = it.id.value @@ -336,12 +258,18 @@ fun ingestBankMessagesIntoAccount( it.delete() return@forEach } + /** + * Updating the highest seen message ID with the serial ID of + * the row that's being currently iterated over. Note: this + * number is ever-growing REGARDLESS of the row being kept into + * the database. + */ lastId = it.id.value } + // Causing the lastId to be stored into the database: acct.highestSeenBankMessageSerialId = lastId } - // return totalNew - return CamtTransactionsCount( + return IngestedTransactionsCount( newTransactions = totalNew, downloadedTransactions = downloadedTransactions ) @@ -358,6 +286,31 @@ fun getPaymentInitiation(uuid: Long): PaymentInitiationEntity { "Payment '$uuid' not found" ) } + +data class LastMessagesTimes( + val lastStatement: ZonedDateTime?, + val lastReport: ZonedDateTime? +) +/** + * Get the last timestamps where a report and + * a statement were received for the bank account + * given as argument. + */ +fun getLastMessagesTimes(bankAccountId: String): LastMessagesTimes { + val acct = getBankAccount(bankAccountId) + return getLastMessagesTimes(acct) +} + +fun getLastMessagesTimes(acct: NexusBankAccountEntity): LastMessagesTimes { + return LastMessagesTimes( + lastReport = acct.lastReportCreationTimestamp?.let { + ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) + }, + lastStatement = acct.lastStatementCreationTimestamp?.let { + ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) + } + ) +} fun getBankAccount(label: String): NexusBankAccountEntity { val maybeBankAccount = transaction { NexusBankAccountEntity.findByName(label) @@ -405,9 +358,11 @@ fun addPaymentInitiation(paymentData: Pain001Data, debtorAccount: NexusBankAccou } suspend fun fetchBankAccountTransactions( - client: HttpClient, fetchSpec: FetchSpecJson, accountId: String -): CamtTransactionsCount { - val res = transaction { + client: HttpClient, + fetchSpec: FetchSpecJson, + accountId: String +): IngestedTransactionsCount { + val connectionDetails = transaction { val acct = NexusBankAccountEntity.findByName(accountId) if (acct == null) { throw NexusError( @@ -423,7 +378,12 @@ suspend fun fetchBankAccountTransactions( ) } return@transaction object { - val connectionType = conn.type + /** + * The connection type _as enum_ should eventually come + * directly from the database, instead of being parsed by + * parseBankConnectionType(). + */ + val connectionType = BankConnectionType.parseBankConnectionType(conn.type) val connectionName = conn.connectionId } } @@ -432,16 +392,39 @@ suspend fun fetchBankAccountTransactions( * document into the database. This function tries to download * both reports AND statements even if the first one fails. */ - val errors: List<Exception>? = getConnectionPlugin(res.connectionType).fetchTransactions( + val errors: List<Exception>? = getConnectionPlugin(connectionDetails.connectionType).fetchTransactions( fetchSpec, client, - res.connectionName, + connectionDetails.connectionName, accountId ) - val ingestionResult = ingestBankMessagesIntoAccount(res.connectionName, accountId) - ingestFacadeTransactions(accountId, "taler-wire-gateway", ::talerFilter, ::maybeTalerRefunds) - ingestFacadeTransactions(accountId, "anastasis", ::anastasisFilter, null) + /** + * This block causes new NexusBankAccountTransactions rows to be + * INSERTed into the database, according to the banking data that + * was recently downloaded. + */ + val ingestionResult: IngestedTransactionsCount = ingestBankMessagesIntoAccount( + connectionDetails.connectionName, + accountId + ) + /** + * The following two functions further processe the banking data + * that was recently downloaded, according to the particular facade + * being honored. + */ + ingestFacadeTransactions( + bankAccountId = accountId, + facadeType = NexusFacadeType.TALER, + incomingFilterCb = ::talerFilter, + refundCb = ::maybeTalerRefunds + ) + ingestFacadeTransactions( + bankAccountId = accountId, + facadeType = NexusFacadeType.ANASTASIS, + incomingFilterCb = ::anastasisFilter, + refundCb = null + ) ingestionResult.errors = errors return ingestionResult @@ -498,4 +481,29 @@ fun importBankAccount(call: ApplicationCall, offeredBankAccountId: String, nexus } } } -}
\ No newline at end of file +} + + +/** + * Check if the transaction is already found in the database. + * This function works as long as the caller provides the appropriate + * 'uid' parameter. For CaMt messages this value is carried along + * the AcctSvcrRef node, whereas for x-libeufin-bank connections + * that's the 'uid' field of the XLibeufinBankTransaction type. + * + * Returns the transaction that's already in the database, in case + * the 'uid' is from a duplicate. + */ +fun findDuplicate( + bankAccountId: String, + uid: String +): NexusBankTransactionEntity? { + return transaction { + val account = NexusBankAccountEntity.findByName((bankAccountId)) ?: + return@transaction null + NexusBankTransactionEntity.find { + (NexusBankTransactionsTable.accountTransactionId eq uid) and + (NexusBankTransactionsTable.bankAccount eq account.id) + }.firstOrNull() + } +} diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt index 209c9384..f41a3729 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt @@ -44,9 +44,9 @@ import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.statements.api.ExposedBlob -import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.getLastMessagesTimes import tech.libeufin.nexus.bankaccount.getPaymentInitiation import tech.libeufin.nexus.iso20022.NexusPaymentInitiationData import tech.libeufin.nexus.iso20022.createPain001document @@ -149,6 +149,10 @@ private suspend fun fetchEbicsC5x( } } +/** + * Prepares key material and other EBICS details and + * returns them along a convenient object. + */ private fun getEbicsSubscriberDetailsInternal(subscriber: EbicsSubscriberEntity): EbicsClientSubscriberDetails { var bankAuthPubValue: RSAPublicKey? = null if (subscriber.bankAuthenticationPublicKey != null) { @@ -178,18 +182,19 @@ private fun getEbicsSubscriberDetailsInternal(subscriber: EbicsSubscriberEntity) ebicsHiaState = subscriber.ebicsHiaState ) } - +private fun getSubscriberFromConnection(connectionEntity: NexusBankConnectionEntity): EbicsSubscriberEntity = + transaction { + EbicsSubscriberEntity.find { + NexusEbicsSubscribersTable.nexusBankConnection eq connectionEntity.id + }.firstOrNull() ?: throw internalServerError("ebics bank connection '${connectionEntity.connectionId}' has no subscriber.") + } /** * Retrieve Ebics subscriber details given a bank connection. */ fun getEbicsSubscriberDetails(bankConnectionId: String): EbicsClientSubscriberDetails { - val transport = NexusBankConnectionEntity.findByName(bankConnectionId) - if (transport == null) { - throw NexusError(HttpStatusCode.NotFound, "transport not found") - } - val subscriber = EbicsSubscriberEntity.find { - NexusEbicsSubscribersTable.nexusBankConnection eq transport.id - }.first() + val transport = getBankConnection(bankConnectionId) + val subscriber = getSubscriberFromConnection(transport) + // transport exists and belongs to caller. return getEbicsSubscriberDetailsInternal(subscriber) } @@ -417,23 +422,7 @@ class EbicsBankConnectionProtocol: BankConnectionProtocol { accountId: String ): List<Exception>? { val subscriberDetails = transaction { getEbicsSubscriberDetails(bankConnectionId) } - val lastTimes = transaction { - val acct = NexusBankAccountEntity.findByName(accountId) - if (acct == null) { - throw NexusError( - HttpStatusCode.NotFound, - "Account '$accountId' not found" - ) - } - object { - val lastStatement = acct.lastStatementCreationTimestamp?.let { - ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) - } - val lastReport = acct.lastReportCreationTimestamp?.let { - ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) - } - } - } + val lastTimes = getLastMessagesTimes(accountId) /** * Will be filled with fetch instructions, according * to the parameters received from the client. @@ -533,61 +522,53 @@ class EbicsBankConnectionProtocol: BankConnectionProtocol { return errors return null } - /** - * Submit one Pain.001 for one payment initiations. - */ + // Submit one Pain.001 for one payment initiations. override suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: Long) { - val r = transaction { - val paymentInitiation = PaymentInitiationEntity.findById(paymentInitiationId) - ?: throw NexusError(HttpStatusCode.NotFound, "payment initiation not found") - val conn = paymentInitiation.bankAccount.defaultBankConnection - ?: throw NexusError(HttpStatusCode.NotFound, "no default bank connection available for submission") + val dbData = transaction { + val preparedPayment = getPaymentInitiation(paymentInitiationId) + val conn = preparedPayment.bankAccount.defaultBankConnection ?: throw NexusError(HttpStatusCode.NotFound, "no default bank connection available for submission") val subscriberDetails = getEbicsSubscriberDetails(conn.connectionId) val painMessage = createPain001document( NexusPaymentInitiationData( - debtorIban = paymentInitiation.bankAccount.iban, - debtorBic = paymentInitiation.bankAccount.bankCode, - debtorName = paymentInitiation.bankAccount.accountHolder, - currency = paymentInitiation.currency, - amount = paymentInitiation.sum.toString(), - creditorIban = paymentInitiation.creditorIban, - creditorName = paymentInitiation.creditorName, - creditorBic = paymentInitiation.creditorBic, - paymentInformationId = paymentInitiation.paymentInformationId, - preparationTimestamp = paymentInitiation.preparationDate, - subject = paymentInitiation.subject, - instructionId = paymentInitiation.instructionId, - endToEndId = paymentInitiation.endToEndId, - messageId = paymentInitiation.messageId + debtorIban = preparedPayment.bankAccount.iban, + debtorBic = preparedPayment.bankAccount.bankCode, + debtorName = preparedPayment.bankAccount.accountHolder, + currency = preparedPayment.currency, + amount = preparedPayment.sum, + creditorIban = preparedPayment.creditorIban, + creditorName = preparedPayment.creditorName, + creditorBic = preparedPayment.creditorBic, + paymentInformationId = preparedPayment.paymentInformationId, + preparationTimestamp = preparedPayment.preparationDate, + subject = preparedPayment.subject, + instructionId = preparedPayment.instructionId, + endToEndId = preparedPayment.endToEndId, + messageId = preparedPayment.messageId ) ) - logger.debug("Sending Pain.001: ${paymentInitiation.paymentInformationId}," + - " for payment: '${paymentInitiation.subject}'") - if (!XMLUtil.validateFromString(painMessage)) { - logger.error("Pain.001 ${paymentInitiation.paymentInformationId}" + - " is invalid, not submitting it and flag as invalid.") - val payment = getPaymentInitiation(paymentInitiationId) - payment.invalid = true - // The following commit prevents the thrown error - // to lose the database transaction data. - TransactionManager.current().commit() - throw NexusError( - HttpStatusCode.InternalServerError, - "Attempted Pain.001 (${paymentInitiation.paymentInformationId})" + - " message is invalid. Not sent to the bank.", - LibeufinErrorCode.LIBEUFIN_EC_INVALID_STATE - ) - } object { + val painXml = painMessage val subscriberDetails = subscriberDetails - val painMessage = painMessage } } + if (!XMLUtil.validateFromString(dbData.painXml)) { + val pmtInfId = transaction { + val payment = getPaymentInitiation(paymentInitiationId) + logger.error("Pain.001 ${payment.paymentInformationId}" + + " is invalid, not submitting it and flag as invalid.") + payment.invalid = true + } + throw NexusError( + HttpStatusCode.InternalServerError, + "pain document: $pmtInfId document is invalid. Not sent to the bank.", + LibeufinErrorCode.LIBEUFIN_EC_INVALID_STATE + ) + } doEbicsUploadTransaction( httpClient, - r.subscriberDetails, + dbData.subscriberDetails, "CCT", - r.painMessage.toByteArray(Charsets.UTF_8), + dbData.painXml.toByteArray(Charsets.UTF_8), EbicsStandardOrderParams() ) transaction { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt index 98e92a46..52627b66 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt @@ -22,15 +22,21 @@ */ package tech.libeufin.nexus.iso20022 +import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.http.* +import io.ktor.util.reflect.* +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.transaction import org.w3c.dom.Document -import tech.libeufin.nexus.NexusAssert -import tech.libeufin.nexus.NexusError +import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.IngestedTransactionsCount +import tech.libeufin.nexus.bankaccount.findDuplicate import tech.libeufin.nexus.server.CurrencyAmount +import tech.libeufin.nexus.server.toPlainString import tech.libeufin.util.* -import java.math.BigDecimal import java.time.Instant import java.time.ZoneId import java.time.ZonedDateTime @@ -40,23 +46,6 @@ enum class CreditDebitIndicator { DBIT, CRDT } -enum class EntryStatus { - /** - * Booked - */ - BOOK, - - /** - * Pending - */ - PDNG, - - /** - * Informational - */ - INFO, -} - enum class CashManagementResponseType(@get:JsonValue val jsonName: String) { Report("report"), Statement("statement"), Notification("notification") } @@ -268,7 +257,7 @@ data class ReturnInfo( ) data class BatchTransaction( - val amount: CurrencyAmount, + val amount: CurrencyAmount, // Fuels Taler withdrawal amount. val creditDebitIndicator: CreditDebitIndicator, val details: TransactionDetails ) @@ -329,7 +318,53 @@ data class CamtBankAccountEntry( // list of sub-transactions participating in this money movement. val batches: List<Batch>? -) +) { + /** + * This function returns the subject of the unique transaction + * accounted in this object. If the transaction is not unique, + * it throws an exception. NOTE: the caller has the responsibility + * of not passing an empty report; those usually should be discarded + * and never participate in the application logic. + */ + @JsonIgnore + fun getSingletonSubject(): String { + // Checks that the given list contains only one element and returns it. + fun <T>checkAndGetSingleton(maybeTxs: List<T>?): T { + if (maybeTxs == null || maybeTxs.size > 1) throw internalServerError( + "Only a singleton transaction is " + + "allowed inside ${this.javaClass}." + ) + return maybeTxs[0] + } + /** + * Types breakdown until the last payment information is reached. + * + * CamtBankAccountEntry contains: + * - Batch 0 + * - Batch 1 + * - Batch N + * + * Batch X contains: + * - BatchTransaction 0 + * - BatchTransaction 1 + * - BatchTransaction N + * + * BatchTransaction X contains: + * - TransactionDetails + * + * TransactionDetails contains the involved parties + * and the payment subject but MAY NOT contain the amount. + * In this model, the amount is held in the BatchTransaction + * type, that is also -- so far -- required to be a singleton + * inside Batch. + */ + checkAndGetSingleton<Batch>(this.batches) + val batchTransactions = this.batches?.get(0)?.batchTransactions + val tx = checkAndGetSingleton<BatchTransaction>(batchTransactions) + val details: TransactionDetails = tx.details + return details.unstructuredRemittanceInformation + } +} class CamtParsingError(msg: String) : Exception(msg) @@ -861,7 +896,10 @@ private fun XmlElementDestructor.extractInnerTransactions(): CamtReport { instructedAmount = instructedAmount, creditDebitIndicator = creditDebitIndicator, bankTransactionCode = btc, - batches = extractBatches(amount, creditDebitIndicator, acctSvcrRef ?: "AcctSvcrRef not given/found"), + batches = extractBatches( + amount, + creditDebitIndicator, + acctSvcrRef ?: "AcctSvcrRef not given/found"), bookingDate = maybeUniqueChildNamed("BookgDt") { extractDateOrDateTime() }, valueDate = maybeUniqueChildNamed("ValDt") { extractDateOrDateTime() }, accountServicerRef = acctSvcrRef, @@ -936,3 +974,155 @@ fun parseCamtMessage(doc: Document): CamtParseResult { } } } + +/** + * Given that every CaMt is a collection of reports/statements + * where each of them carries the bank account balance and a list + * of transactions, this function: + * + * - extracts the balance (storing a NexusBankBalanceEntity) + * - updates timestamps in NexusBankAccountEntity to the last seen + * report/statement. + * - finds which transactions were already downloaded. + * - stores a new NexusBankTransactionEntity for each new tx +accounted in the report/statement. + * - tries to link the new transaction with a submitted one, in + * case of DBIT transaction. + * - returns a IngestedTransactionCount object. + */ +fun processCamtMessage( + bankAccountId: String, + camtDoc: Document, + /** + * FIXME: should NOT be C52/C53 but "report" or "statement". + * The reason is that C52/C53 are NOT CaMt, they are EBICS names. + */ + code: String +): IngestedTransactionsCount { + var newTransactions = 0 + var downloadedTransactions = 0 + transaction { + val acct = NexusBankAccountEntity.findByName(bankAccountId) + if (acct == null) { + throw NexusError(HttpStatusCode.NotFound, "user not found") + } + val res = try { parseCamtMessage(camtDoc) } catch (e: CamtParsingError) { + logger.warn("Invalid CAMT received from bank: $e") + newTransactions = -1 + return@transaction + } + res.reports.forEach { + NexusAssert( + it.account.iban == acct.iban, + "Nexus hit a report or statement of a wrong IBAN!" + ) + it.balances.forEach { b -> + if (b.type == "CLBD") { + val lastBalance = NexusBankBalanceEntity.all().lastOrNull() + /** + * Store balances different from the one that came from the bank, + * or the very first balance. This approach has the following inconvenience: + * the 'balance' held at Nexus does not differentiate between one + * coming from a statement and one coming from a report. As a consequence, + * the two types of balances may override each other without notice. + */ + if ((lastBalance == null) || + (b.amount.toPlainString() != lastBalance.balance)) { + NexusBankBalanceEntity.new { + bankAccount = acct + balance = b.amount.toPlainString() + creditDebitIndicator = b.creditDebitIndicator.name + date = b.date + } + } + } + } + } + // Updating the local bank account state timestamps according to the current document. + val stamp = ZonedDateTime.parse( + res.creationDateTime, + DateTimeFormatter.ISO_DATE_TIME + ).toInstant().toEpochMilli() + when (code) { + "C52" -> { + val s = acct.lastReportCreationTimestamp + /** + * FIXME. + * The following check seems broken, as it ONLY sets the value when + * s is non-null BUT s gets never set; not even with a default value. + * That didn't break so far because the timestamp gets only used when + * the fetch specification has "since-last" for the time range. Never + * used. + */ + if (s != null && stamp > s) { + acct.lastReportCreationTimestamp = stamp + } + } + "C53" -> { + val s = acct.lastStatementCreationTimestamp + if (s != null && stamp > s) { + acct.lastStatementCreationTimestamp = stamp + } + } + } + val entries: List<CamtBankAccountEntry> = res.reports.map { it.entries }.flatten() + var newPaymentsLog = "" + downloadedTransactions = entries.size + txloop@ for (entry: CamtBankAccountEntry in entries) { + val singletonBatchedTransaction: BatchTransaction = entry.batches?.get(0)?.batchTransactions?.get(0) + ?: throw NexusError( + HttpStatusCode.InternalServerError, + "Singleton money movements policy wasn't respected" + ) + val acctSvcrRef = entry.accountServicerRef + if (acctSvcrRef == null) { + // FIXME(dold): Report this! + logger.error("missing account servicer reference in transaction") + continue + } + val duplicate = findDuplicate(bankAccountId, acctSvcrRef) + if (duplicate != null) { + logger.info("Found a duplicate (acctSvcrRef): $acctSvcrRef") + // FIXME(dold): See if an old transaction needs to be superseded by this one + // https://bugs.gnunet.org/view.php?id=6381 + continue@txloop + } + val rawEntity = NexusBankTransactionEntity.new { + bankAccount = acct + accountTransactionId = acctSvcrRef + amount = singletonBatchedTransaction.amount.value + currency = singletonBatchedTransaction.amount.currency + transactionJson = jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(entry) + creditDebitIndicator = singletonBatchedTransaction.creditDebitIndicator.name + status = entry.status + } + rawEntity.flush() + newTransactions++ + newPaymentsLog += "\n- " + entry.batches[0].batchTransactions[0].details.unstructuredRemittanceInformation + // This block tries to acknowledge a former outgoing payment as booked. + if (singletonBatchedTransaction.creditDebitIndicator == CreditDebitIndicator.DBIT) { + val t0 = singletonBatchedTransaction.details + val pmtInfId = t0.paymentInformationId + if (pmtInfId != null) { + val paymentInitiation = PaymentInitiationEntity.find { + PaymentInitiationsTable.bankAccount eq acct.id and ( + // pmtInfId is a value that the payment submitter + // asked the bank to associate with the payment to be made. + PaymentInitiationsTable.paymentInformationId eq pmtInfId) + + }.firstOrNull() + if (paymentInitiation != null) { + logger.info("Could confirm one initiated payment: $pmtInfId") + paymentInitiation.confirmationTransaction = rawEntity + } + } + } + } + if (newTransactions > 0) + logger.debug("Camt $code '${res.messageId}' has new payments:${newPaymentsLog}") + } + return IngestedTransactionsCount( + newTransactions = newTransactions, + downloadedTransactions = downloadedTransactions + ) +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt new file mode 100644 index 00000000..8c01705a --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt @@ -0,0 +1,85 @@ +package tech.libeufin.nexus.server + +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.nexus.NexusBankConnectionEntity +import tech.libeufin.nexus.NexusBankConnectionsTable +import tech.libeufin.util.internalServerError +import tech.libeufin.util.notFound + +/** + * FIXME: + * enum type names were introduced after 0.9.2 and need to + * be employed wherever now type names are passed as plain + * strings. + */ + +// Valid connection types. +enum class BankConnectionType(val typeName: String) { + EBICS("ebics"), + X_LIBEUFIN_BANK("x-taler-bank"); + companion object { + /** + * This method takes legacy bank connection type names as input + * and _tries_ to return the correspondent enum type. This + * fixes the cases where bank connection types are passed as + * easy-to-break arbitrary strings; eventually this method should + * be discarded and only enum types be passed as connection type names. + */ + fun parseBankConnectionType(typeName: String): BankConnectionType { + return when(typeName) { + "ebics" -> EBICS + "x-libeufin-bank" -> X_LIBEUFIN_BANK + else -> throw internalServerError( + "Cannot extract ${this::class.java.typeName}' instance from name: $typeName'" + ) + } + } + } +} +// Valid facade types +enum class NexusFacadeType(val facadeType: String) { + TALER("taler-wire-gateway"), + ANASTASIS("anastasis") +} + +/** + * These types point at the _content_ brought by bank connections. + * The following stack depicts the layering of banking communication + * as modeled here in Nexus. On top the most inner layer. + * + * -------------------- + * Banking data type + * -------------------- + * Bank connection type + * -------------------- + * HTTP + * -------------------- + * + * Once the banking data type arrives to the local database, facades + * types MAY apply further processing to it. + * + * For example, a Taler facade WILL look for Taler-meaningful wire + * subjects and act accordingly. Even without a facade, the Nexus + * native HTTP API picks instances of banking data and extracts its + * details to serve to the client. + * + * NOTE: this type MAY help but is NOT essential, as each connection + * is USUALLY tied with the same banking data type. For example, EBICS + * brings CaMt, and x-libeufin-bank bring its own (same-named x-libeufin-bank) + * banking data type. + */ +enum class BankingDataType { + X_LIBEUFIN_BANK, + CAMT +} + +// Gets connection or throws. +fun getBankConnection(connId: String): NexusBankConnectionEntity { + val maybeConn = transaction { + NexusBankConnectionEntity.find { + NexusBankConnectionsTable.connectionId eq connId + }.firstOrNull() + } + if (maybeConn == null) throw notFound("Bank connection $connId not found") + return maybeConn +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt index 38ab4236..7fdfd526 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt @@ -32,10 +32,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.annotation.JsonSerialize import com.fasterxml.jackson.databind.deser.std.StdDeserializer import com.fasterxml.jackson.databind.ser.std.StdSerializer +import tech.libeufin.nexus.EntryStatus import tech.libeufin.nexus.iso20022.CamtBankAccountEntry -import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.util.* -import java.math.BigDecimal import java.time.Instant import java.time.ZoneId import java.time.ZonedDateTime @@ -251,6 +250,17 @@ data class EbicsNewTransport( val systemID: String? ) +/** + * Credentials and URL to access Sandbox and talk JSON to it. + * See https://docs.taler.net/design-documents/038-demobanks-protocol-suppliers.html#static-x-libeufin-bank-with-dynamic-demobank + * for an introduction on x-libeufin-bank. + */ +data class XLibeufinBankTransport( + val username: String, + val password: String, + val baseUrl: String +) + /** Response type of "GET /prepared-payments/{uuid}" */ data class PaymentStatus( val paymentInitiationId: String, @@ -262,10 +272,6 @@ data class PaymentStatus( val subject: String, val submissionDate: String?, val preparationDate: String, - // null when the payment was never acknowledged by - // the bank. For example, it was submitted but never - // seen in any report; or only created and not even - // submitted. val status: EntryStatus? ) @@ -346,8 +352,9 @@ data class BankMessageList( ) data class BankMessageInfo( - val messageId: String, - val code: String, + // x-libeufin-bank messages do not have any ID or code. + val messageId: String?, + val code: String?, val length: Long ) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt index ebe002fc..fb864d3b 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt @@ -53,6 +53,7 @@ import tech.libeufin.nexus.* import tech.libeufin.nexus.bankaccount.* import tech.libeufin.nexus.ebics.* import tech.libeufin.nexus.iso20022.CamtBankAccountEntry +import tech.libeufin.nexus.iso20022.processCamtMessage import tech.libeufin.util.* import java.net.BindException import java.net.URLEncoder @@ -739,7 +740,7 @@ val nexusApp: Application.() -> Unit = { var statusCode = HttpStatusCode.OK /** * Client errors are unlikely here, because authentication - * and JSON validity fail earlier. Hence either Nexus or the + * and JSON validity fail earlier. Hence, either Nexus or the * bank had a problem. NOTE: because this handler triggers multiple * fetches, it is ALSO possible that although one error is reported, * SOME transactions made it to the database! diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt new file mode 100644 index 00000000..b819163a --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt @@ -0,0 +1,395 @@ +package tech.libeufin.nexus.xlibeufinbank + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.server.util.* +import io.ktor.util.* +import org.jetbrains.exposed.sql.statements.api.ExposedBlob +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.* +import tech.libeufin.nexus.iso20022.* +import tech.libeufin.nexus.server.* +import tech.libeufin.util.XLibeufinBankDirection +import tech.libeufin.util.XLibeufinBankTransaction +import tech.libeufin.util.badRequest +import tech.libeufin.util.internalServerError +import java.net.MalformedURLException +import java.net.URL + +// Gets Sandbox URL and credentials, taking the connection name as input. +fun getXLibeufinBankCredentials(conn: NexusBankConnectionEntity): XLibeufinBankTransport { + val maybeCredentials = transaction { + XLibeufinBankUserEntity.find { + XLibeufinBankUsersTable.nexusBankConnection eq conn.id + }.firstOrNull() + } + if (maybeCredentials == null) throw internalServerError( + "Existing connection ${conn.connectionId} has no transport details" + ) + return XLibeufinBankTransport( + username = maybeCredentials.username, + password = maybeCredentials.password, + baseUrl = maybeCredentials.baseUrl + ) +} +fun getXLibeufinBankCredentials(connId: String): XLibeufinBankTransport { + val conn = getBankConnection(connId) + return getXLibeufinBankCredentials(conn) + +} + +class XlibeufinBankConnectionProtocol : BankConnectionProtocol { + override suspend fun connect(client: HttpClient, connId: String) { + TODO("Not yet implemented") + } + + override suspend fun fetchAccounts(client: HttpClient, connId: String) { + throw NotImplementedError("x-libeufin-bank does not need to fetch accounts") + } + + override fun createConnectionFromBackup( + connId: String, + user: NexusUserEntity, + passphrase: String?, + backup: JsonNode + ) { + TODO("Not yet implemented") + } + + override fun createConnection( + connId: String, + user: NexusUserEntity, + data: JsonNode) { + + val bankConn = transaction { + NexusBankConnectionEntity.new { + this.connectionId = connId + owner = user + type = "x-libeufin-bank" + } + } + val newTransportData = jacksonObjectMapper().treeToValue( + data, XLibeufinBankTransport::class.java + ) ?: throw badRequest("x-libeufin-bank details not found in the request") + // Validate the base URL + try { URL(newTransportData.baseUrl).toURI() } + catch (e: MalformedURLException) { + throw badRequest("Base URL (${newTransportData.baseUrl}) is invalid.") + } + transaction { + XLibeufinBankUserEntity.new { + username = newTransportData.username + password = newTransportData.password + // Only addressing mild cases where ONE slash ends the base URL. + baseUrl = newTransportData.baseUrl.dropLastWhile { it == '/' } + nexusBankConnection = bankConn + } + } + } + + override fun getConnectionDetails(conn: NexusBankConnectionEntity): JsonNode { + TODO("Not yet implemented") + } + + override fun exportBackup(bankConnectionId: String, passphrase: String): JsonNode { + TODO("Not yet implemented") + } + + override fun exportAnalogDetails(conn: NexusBankConnectionEntity): ByteArray { + throw NotImplementedError("x-libeufin-bank does not need analog details") + } + + override suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: Long) { + TODO("Not yet implemented") + } + + override suspend fun fetchTransactions( + fetchSpec: FetchSpecJson, + client: HttpClient, + bankConnectionId: String, + accountId: String + ): List<Exception>? { + val conn = getBankConnection(bankConnectionId) + /** + * Note: fetchSpec.level is ignored because Sandbox does not + * differentiate between booked and non-booked transactions. + * Just logging if the unaware client specified non-REPORT for + * the level. FIXME: docs have to mention this. + */ + if (fetchSpec.level == FetchLevel.REPORT || fetchSpec.level == FetchLevel.ALL) + throw badRequest("level '${fetchSpec.level}' on x-libeufin-bank" + + "connection (${conn.connectionId}) is not supported:" + + " bank has only 'booked' state." + ) + // Get credentials + val credentials = getXLibeufinBankCredentials(conn) + /** + * Now builds the URL to ask the transactions, according to the + * FetchSpec gotten in the args. Level 'statement' and time range + * 'previous-dayes' are NOT implemented. + */ + val baseUrl = URL(credentials.baseUrl) + val fetchUrl = url { + protocol = URLProtocol(name = baseUrl.protocol, defaultPort = -1) + appendPathSegments( + baseUrl.path.dropLastWhile { it == '/' }, + "accounts/${credentials.username}/transactions") + when (fetchSpec) { + // Gets the last 5 transactions + is FetchSpecLatestJson -> { + // Do nothing, the bare endpoint gets the last 5 txs by default. + } + /* Defines the from_ms URI param. according to the last transaction + * timestamp that was seen in this connection */ + is FetchSpecSinceLastJson -> { + val localBankAccount = getBankAccount(accountId) + val lastMessagesTimes = getLastMessagesTimes(localBankAccount) + // Sandbox doesn't have report vs. statement, defaulting to report time + // and so does the ingestion routine when storing the last message time. + this.parameters["from_ms"] = "${lastMessagesTimes.lastStatement ?: 0}" + } + // This wants ALL the transactions, hence it sets the from_ms to zero. + is FetchSpecAllJson -> { + this.parameters["from_ms"] = "0" + } + else -> throw NexusError( + HttpStatusCode.NotImplemented, + "FetchSpec ${fetchSpec::class} not supported" + ) + } + } + logger.debug("Requesting x-libeufin-bank transactions to: $fetchUrl") + val resp: HttpResponse = try { + client.get(fetchUrl) { + expectSuccess = true + contentType(ContentType.Application.Json) + basicAuth(credentials.username, credentials.password) + } + } catch (e: Exception) { + e.printStackTrace() + logger.error(e.message) + return listOf(e) + } + val respBlob = resp.bodyAsChannel().toByteArray() + transaction { + NexusBankMessageEntity.new { + bankConnection = conn + message = ExposedBlob(respBlob) + } + } + return null + } +} + +/** + * Parses one x-libeufin-bank message and INSERTs Nexus local + * transaction records into the database. After this function + * returns, the transactions are ready to both being communicated + * to the CLI via the native JSON interface OR being further processed + * by ANY facade. + * + * This function: + * - updates the local timestamps related to the latest report. + * - inserts a new NexusBankTransactionEntity. To achieve that, it extracts the: + * -- amount + * -- credit/debit indicator + * -- currency + * + * Note: in contrast to what the CaMt handler does, here there's NO + * status, since Sandbox has only one (unnamed) transaction state and + * all transactions are asked as reports. + */ +fun processXLibeufinBankMessage( + bankAccountId: String, + data: JsonNode +): IngestedTransactionsCount { + data class XLibeufinBankTransactions( + val transactions: List<XLibeufinBankTransaction> + ) + val txs = try { + jacksonObjectMapper().treeToValue( + data, + XLibeufinBankTransactions::class.java + ) + } catch (e: Exception) { + throw NexusError( + HttpStatusCode.BadGateway, + "The bank sent invalid x-libeufin-bank transactions." + ) + } + val bankAccount = getBankAccount(bankAccountId) + var newTxs = 0 // Counts how many transactions are new. + txs.transactions.forEach { + val maybeTimestamp = try { + it.date.toLong() + } catch (e: Exception) { + throw NexusError( + HttpStatusCode.BadGateway, + "The bank gave an invalid timestamp " + + "for x-libeufin-bank message: ${it.uid}" + ) + } + // Searching for duplicates. + if (findDuplicate(bankAccountId, it.uid) != null) { + logger.debug( + "x-libeufin-bank ingestion: transaction ${it.uid} is a duplicate, skipping." + ) + return@forEach + } + val direction = if (it.debtorIban == bankAccount.iban) + XLibeufinBankDirection.DEBIT else XLibeufinBankDirection.CREDIT + // New tx, storing it. + transaction { + val localTx = NexusBankTransactionEntity.new { + this.bankAccount = bankAccount + this.amount = it.amount + this.currency = it.currency + /** + * Sandbox has only booked state for its transactions: as soon as + * one payment makes it to the database, that is the final (booked) + * state. + */ + this.status = EntryStatus.BOOK + this.accountTransactionId = it.uid + this.transactionJson = jacksonObjectMapper( + ).writeValueAsString(it.exportAsCamtModel()) + this.creditDebitIndicator = direction.direction + newTxs++ + logger.debug("x-libeufin-bank transaction with subject '${it.subject}' ingested.") + } + /** + * The following block tries to reconcile a previous prepared + * (outgoing) payment with the one being iterated over. + */ + if (direction == XLibeufinBankDirection.DEBIT) { + val maybePrepared = getPaymentInitiation(pmtInfId = it.uid) + if (maybePrepared != null) maybePrepared.confirmationTransaction = localTx + } + // x-libeufin-bank transactions are ALWAYS modeled as reports + // in Nexus, because such bank protocol supplier doesn't have + // the report vs. statement distinction. Therefore, we only + // consider the last report timestamp. + if ((bankAccount.lastStatementCreationTimestamp ?: 0L) < maybeTimestamp) + bankAccount.lastStatementCreationTimestamp = maybeTimestamp + } + } + return IngestedTransactionsCount( + newTransactions = newTxs, + downloadedTransactions = txs.transactions.size + ) +} + +fun XLibeufinBankTransaction.exportCamtDirectionIndicator(): CreditDebitIndicator = + if (this.direction == XLibeufinBankDirection.CREDIT) + CreditDebitIndicator.CRDT else CreditDebitIndicator.DBIT + +/** + * This function transforms an x-libeufin-bank transaction + * into the JSON representation of CaMt used by Nexus along + * its processing. Notably, this helps to stick to one unified + * type when facades process transactions. + */ +fun XLibeufinBankTransaction.exportAsCamtModel(): CamtBankAccountEntry = + CamtBankAccountEntry( + /** + * Amount obtained by summing all the transactions accounted + * in this report/statement. Here this field equals the amount of the + * _unique_ transaction accounted. + */ + amount = CurrencyAmount(currency = this.currency, value = this.amount), + accountServicerRef = this.uid, + bankTransactionCode = "Not given", + bookingDate = this.date, + counterValueAmount = null, + creditDebitIndicator = this.exportCamtDirectionIndicator(), + currencyExchange = null, + entryRef = null, + instructedAmount = null, + valueDate = null, + status = EntryStatus.BOOK, // x-libeufin-bank always/only BOOK. + /** + * This field accounts for the _unique_ transaction that this + * object represents. + */ + batches = listOf( + Batch( + messageId = null, + paymentInformationId = this.uid, + batchTransactions = listOf( + BatchTransaction( + amount = CurrencyAmount( + currency = this.currency, + value = this.amount + ), + creditDebitIndicator = this.exportCamtDirectionIndicator(), + details = TransactionDetails( + debtor = PartyIdentification( + name = this.debtorName, + countryOfResidence = null, + organizationId = null, + otherId = null, + postalAddress = null, + privateId = null + ), + debtorAccount = CashAccount( + name = null, + currency = this.currency, + iban = this.debtorIban, + otherId = null + ), + debtorAgent = AgentIdentification( + name = null, + bic = this.debtorBic, + clearingSystemCode = null, + clearingSystemMemberId = null, + lei = null, + otherId = null, + postalAddress = null, + proprietaryClearingSystemCode = null + ), + counterValueAmount = null, + currencyExchange = null, + interBankSettlementAmount = null, + proprietaryPurpose = null, + purpose = null, + returnInfo = null, + ultimateCreditor = null, + ultimateDebtor = null, + unstructuredRemittanceInformation = this.subject, + instructedAmount = null, + creditor = PartyIdentification( + name = this.creditorName, + countryOfResidence = null, + organizationId = null, + otherId = null, + postalAddress = null, + privateId = null + ), + creditorAccount = CashAccount( + name = null, + currency = this.currency, + iban = this.creditorIban, + otherId = null + ), + creditorAgent = AgentIdentification( + name = null, + bic = this.creditorBic, + clearingSystemCode = null, + clearingSystemMemberId = null, + lei = null, + otherId = null, + postalAddress = null, + proprietaryClearingSystemCode = null + ) + ) + ) + ) + ) + ) + )
\ No newline at end of file |