diff options
Diffstat (limited to 'nexus/src')
12 files changed, 1080 insertions, 320 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt index bdaa0ec3..2048813e 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Auth.kt @@ -9,6 +9,13 @@ import tech.libeufin.nexus.server.Permission import tech.libeufin.nexus.server.PermissionQuery import tech.libeufin.util.* +fun getNexusUser(username: String): NexusUserEntity = + transaction { + NexusUserEntity.find { + NexusUsersTable.username eq username + }.firstOrNull() ?: throw notFound("User $username not found.") + } + /** * HTTP basic auth. Throws error if password is wrong, * and makes sure that the user exists in the system. diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt index 6a18db21..ac52095c 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/BankConnectionProtocol.kt @@ -23,11 +23,12 @@ import com.fasterxml.jackson.databind.JsonNode import io.ktor.client.HttpClient import io.ktor.http.HttpStatusCode import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.server.BankConnectionType import tech.libeufin.nexus.server.FetchSpecJson // 'const' allows only primitive types. -val bankConnectionRegistry: Map<String, BankConnectionProtocol> = mapOf( - "ebics" to EbicsBankConnectionProtocol() +val bankConnectionRegistry: Map<BankConnectionType, BankConnectionProtocol> = mapOf( + BankConnectionType.EBICS to EbicsBankConnectionProtocol() ) interface BankConnectionProtocol { @@ -64,9 +65,13 @@ interface BankConnectionProtocol { * * This function returns a possibly empty list of exceptions. * That helps not to stop fetching if ONE operation fails. Notably, - * C52 and C53 may be asked along one invocation of this function, + * C52 _and_ C53 may be asked along one invocation of this function, * therefore storing the exception on C52 allows the C53 to still * take place. The caller then decides how to handle the exceptions. + * + * More on multi requests: C52 and C53, or more generally 'reports' + * and 'statements' are tried to be downloaded together when the fetch + * level is set to ALL. */ suspend fun fetchTransactions( fetchSpec: FetchSpecJson, @@ -76,9 +81,18 @@ interface BankConnectionProtocol { ): List<Exception>? } -fun getConnectionPlugin(connId: String): BankConnectionProtocol { - return bankConnectionRegistry.get(connId) ?: throw NexusError( +fun getConnectionPlugin(connType: BankConnectionType): BankConnectionProtocol { + return bankConnectionRegistry[connType] ?: throw NexusError( HttpStatusCode.NotFound, - "Connection type '${connId}' not available" + "Connection type '${connType}' not available" ) +} + +/** + * Adaptor helper to keep until all the connection type mentions will + * be passed as BankConnectionType instead of arbitrary easy-to-break + * string. + */ +fun getConnectionPlugin(connType: String): BankConnectionProtocol { + return getConnectionPlugin(BankConnectionType.parseBankConnectionType(connType)) }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt index f0af4499..38fb7bd3 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -19,17 +19,28 @@ package tech.libeufin.nexus +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.jetbrains.exposed.dao.* import org.jetbrains.exposed.dao.id.EntityID import org.jetbrains.exposed.dao.id.LongIdTable import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction -import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.util.EbicsInitState import java.sql.Connection +import kotlin.reflect.typeOf +enum class EntryStatus { + // Booked + BOOK, + // Pending + PDNG, + // Informational + INFO, +} + /** * This table holds the values that exchange gave to issue a payment, * plus a reference to the prepared pain.001 version of. Note that @@ -134,15 +145,36 @@ class NexusBankBalanceEntity(id: EntityID<Long>) : LongEntity(id) { var date by NexusBankBalancesTable.date } +// This table holds the data to talk to Sandbox +// via the x-libeufin-bank protocol supplier. +object XLibeufinBankUsersTable : LongIdTable() { + val username = text("username") + val password = text("password") + val baseUrl = text("baseUrl") + val nexusBankConnection = reference("nexusBankConnection", NexusBankConnectionsTable) +} + +class XLibeufinBankUserEntity(id: EntityID<Long>) : LongEntity(id) { + companion object : LongEntityClass<XLibeufinBankUserEntity>(XLibeufinBankUsersTable) + var username by XLibeufinBankUsersTable.username + var password by XLibeufinBankUsersTable.password + var baseUrl by XLibeufinBankUsersTable.baseUrl + var nexusBankConnection by NexusBankConnectionEntity referencedOn XLibeufinBankUsersTable.nexusBankConnection +} + /** * Table that stores all messages we receive from the bank. + * The nullable fields were introduced along the x-libeufin-bank + * connection, as those messages are plain JSON object unlike + * the more structured CaMt. */ object NexusBankMessagesTable : LongIdTable() { val bankConnection = reference("bankConnection", NexusBankConnectionsTable) - val messageId = text("messageId") - val code = text("code") val message = blob("message") - val errors = bool("errors").default(false) // true when the parser could not ingest one message. + val messageId = text("messageId").nullable() + val code = text("code").nullable() + // true when the parser could not ingest one message: + val errors = bool("errors").default(false) } class NexusBankMessageEntity(id: EntityID<Long>) : LongEntity(id) { @@ -188,6 +220,21 @@ class NexusBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) { var transactionJson by NexusBankTransactionsTable.transactionJson var accountTransactionId by NexusBankTransactionsTable.accountTransactionId val updatedBy by NexusBankTransactionEntity optionalReferencedOn NexusBankTransactionsTable.updatedBy + + /** + * It is responsibility of the caller to insert only valid + * JSON into the database, and therefore provide error management + * when calling the two helpers below. + */ + + inline fun <reified T> parseDetailsIntoObject(): T { + val mapper = jacksonObjectMapper() + return mapper.readValue(this.transactionJson, T::class.java) + } + fun parseDetailsIntoObject(): JsonNode { + val mapper = jacksonObjectMapper() + return mapper.readTree(this.transactionJson) + } } /** @@ -459,9 +506,7 @@ object NexusPermissionsTable : LongIdTable() { val subjectId = text("subjectName") val permissionName = text("permissionName") - init { - uniqueIndex(resourceType, resourceId, subjectType, subjectId, permissionName) - } + init { uniqueIndex(resourceType, resourceId, subjectType, subjectId, permissionName) } } class NexusPermissionEntity(id: EntityID<Long>) : LongEntity(id) { @@ -479,6 +524,7 @@ fun dbDropTables(dbConnectionString: String) { transaction { SchemaUtils.drop( NexusUsersTable, + XLibeufinBankUsersTable, PaymentInitiationsTable, NexusEbicsSubscribersTable, NexusBankAccountsTable, @@ -504,6 +550,7 @@ fun dbCreateTables(dbConnectionString: String) { TransactionManager.manager.defaultIsolationLevel = Connection.TRANSACTION_SERIALIZABLE transaction { SchemaUtils.create( + XLibeufinBankUsersTable, NexusScheduledTasksTable, NexusUsersTable, PaymentInitiationsTable, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt index 7fdd2c26..c908a828 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/FacadeUtil.kt @@ -2,47 +2,60 @@ package tech.libeufin.nexus import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.http.* +import org.jetbrains.exposed.dao.flushCache import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction import tech.libeufin.nexus.iso20022.CamtBankAccountEntry import tech.libeufin.nexus.iso20022.CreditDebitIndicator -import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.nexus.iso20022.TransactionDetails +import tech.libeufin.nexus.server.NexusFacadeType - -/** - * Mainly used to resort the last processed transaction ID. - */ +// Mainly used to resort the last processed transaction ID. fun getFacadeState(fcid: String): FacadeStateEntity { - val facade = FacadeEntity.find { FacadesTable.facadeName eq fcid }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, - "Could not find facade '${fcid}'" - ) - return FacadeStateEntity.find { - FacadeStateTable.facade eq facade.id.value - }.firstOrNull() ?: throw NexusError( - HttpStatusCode.NotFound, - "Could not find any state for facade: $fcid" - ) + return transaction { + val facade = FacadeEntity.find { + FacadesTable.facadeName eq fcid + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find facade '${fcid}'" + ) + FacadeStateEntity.find { + FacadeStateTable.facade eq facade.id.value + }.firstOrNull() ?: throw NexusError( + HttpStatusCode.NotFound, + "Could not find any state for facade: $fcid" + ) + } } fun getFacadeBankAccount(fcid: String): NexusBankAccountEntity { - val facadeState = getFacadeState(fcid) - return NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError( - HttpStatusCode.NotFound, - "The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}" - ) + return transaction { + val facadeState = getFacadeState(fcid) + NexusBankAccountEntity.findByName(facadeState.bankAccount) ?: throw NexusError( + HttpStatusCode.NotFound, + "The facade: $fcid doesn't manage bank account: ${facadeState.bankAccount}" + ) + } } /** * Ingests transactions for those facades accounting for bankAccountId. + * 'incomingFilterCb' decides whether the facade accepts the payment; + * if not, refundCb prepares a refund. The 'txStatus' parameter decides + * at which state one transaction deserve to fuel Taler transactions. BOOK + * is conservative, and with some banks the delay can be significant. PNDG + * instead reacts faster, but risks that one transaction gets undone by the + * bank and never reach the BOOK state; this would mean a loss and/or admin + * burden. */ fun ingestFacadeTransactions( bankAccountId: String, - facadeType: String, + facadeType: NexusFacadeType, incomingFilterCb: ((NexusBankTransactionEntity, TransactionDetails) -> Unit)?, - refundCb: ((NexusBankAccountEntity, Long) -> Unit)? + refundCb: ((NexusBankAccountEntity, Long) -> Unit)?, + txStatus: EntryStatus = EntryStatus.BOOK ) { fun ingest(bankAccount: NexusBankAccountEntity, facade: FacadeEntity) { logger.debug( @@ -55,18 +68,21 @@ fun ingestFacadeTransactions( /** Those with "our" bank account involved */ NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and /** Those that are booked */ - (NexusBankTransactionsTable.status eq EntryStatus.BOOK) and + (NexusBankTransactionsTable.status eq txStatus) and /** Those that came later than the latest processed payment */ (NexusBankTransactionsTable.id.greater(lastId)) }.orderBy(Pair(NexusBankTransactionsTable.id, SortOrder.ASC)).forEach { // Incoming payment. - logger.debug("Facade checks payment: ${it.transactionJson}") val tx = jacksonObjectMapper().readValue( - it.transactionJson, CamtBankAccountEntry::class.java + it.transactionJson, + CamtBankAccountEntry::class.java ) - val details = tx.batches?.get(0)?.batchTransactions?.get(0)?.details + /** + * Need transformer from "JSON tx" to TransactionDetails?. + */ + val details: TransactionDetails? = tx.batches?.get(0)?.batchTransactions?.get(0)?.details if (details == null) { - logger.warn("A void money movement made it through the ingestion: VERY strange") + logger.warn("A void money movement (${tx.accountServicerRef}) made it through the ingestion: VERY strange") return@forEach } when (tx.creditDebitIndicator) { @@ -90,16 +106,17 @@ fun ingestFacadeTransactions( ) } } catch (e: Exception) { - logger.warn("sending refund payment failed", e) + logger.warn("Sending refund payment failed: ${e.message}") } facadeState.highestSeenMessageSerialId = lastId } // invoke ingestion for all the facades transaction { - FacadeEntity.find { FacadesTable.type eq facadeType }.forEach { + FacadeEntity.find { FacadesTable.type eq facadeType.facadeType }.forEach { val facadeBankAccount = getFacadeBankAccount(it.facadeName) if (facadeBankAccount.bankAccountName == bankAccountId) ingest(facadeBankAccount, it) + flushCache() } } }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt index 86c86a37..8d6062d1 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt @@ -59,7 +59,15 @@ private suspend fun runTask(client: HttpClient, sched: TaskSchedule) { "fetch" -> { @Suppress("BlockingMethodInNonBlockingContext") val fetchSpec = jacksonObjectMapper().readValue(sched.params, FetchSpecJson::class.java) - fetchBankAccountTransactions(client, fetchSpec, sched.resourceId) + val outcome = fetchBankAccountTransactions(client, fetchSpec, sched.resourceId) + if (outcome.errors != null && outcome.errors!!.isNotEmpty()) { + /** + * Communication with the bank had at least one error. All of + * them get logged when this 'outcome.errors' list was defined, + * so not logged twice here. Failing to bring the problem(s) up. + */ + exitProcess(1) + } } /** * Submits the payment preparations that are found in the database. diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt index c71e5531..7843345f 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/bankaccount/BankAccount.kt @@ -24,20 +24,37 @@ import io.ktor.server.application.ApplicationCall import io.ktor.client.HttpClient import io.ktor.http.HttpStatusCode import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.transactions.transaction -import org.w3c.dom.Document import tech.libeufin.nexus.* import tech.libeufin.nexus.iso20022.* -import tech.libeufin.nexus.server.FetchSpecJson -import tech.libeufin.nexus.server.Pain001Data -import tech.libeufin.nexus.server.requireBankConnection -import tech.libeufin.nexus.server.toPlainString +import tech.libeufin.nexus.server.* +import tech.libeufin.nexus.xlibeufinbank.processXLibeufinBankMessage import tech.libeufin.util.XMLUtil +import tech.libeufin.util.internalServerError import java.time.Instant +import java.time.ZoneOffset import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter private val keepBankMessages: String? = System.getenv("LIBEUFIN_NEXUS_KEEP_BANK_MESSAGES") + +/** + * Gets a prepared payment starting from its 'payment information id'. + * Note: although the terminology comes from CaMt, a 'payment information id' + * is indeed any UID that identifies the payment. For this reason, also + * the x-libeufin-bank logic uses this helper. + * + * Returns the prepared payment, or null if that's not found. Not throwing + * any exception because the null case is common: not every transaction being + * processed by Neuxs was prepared/initiated here; incoming transactions are + * one example. + */ +fun getPaymentInitiation(pmtInfId: String): PaymentInitiationEntity? = + transaction { + PaymentInitiationEntity.find( + PaymentInitiationsTable.paymentInformationId.eq(pmtInfId) + ).firstOrNull() + } fun requireBankAccount(call: ApplicationCall, parameterKey: String): NexusBankAccountEntity { val name = call.parameters[parameterKey] if (name == null) @@ -63,6 +80,7 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: val submitted = paymentInitiation.submitted } } + // Skips, if the payment was sent once already. if (r.submitted) { return } @@ -75,10 +93,11 @@ suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: /** * Submit all pending prepared payments. */ -suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid: String) { - data class Submission( - val id: Long - ) +suspend fun submitAllPaymentInitiations( + httpClient: HttpClient, + accountid: String +) { + data class Submission(val id: Long) val workQueue = mutableListOf<Submission>() transaction { val account = NexusBankAccountEntity.findByName(accountid) ?: throw NexusError( @@ -120,24 +139,10 @@ suspend fun submitAllPaymentInitiations(httpClient: HttpClient, accountid: Strin } /** - * Check if the transaction is already found in the database. - */ -private fun findDuplicate(bankAccountId: String, acctSvcrRef: String): NexusBankTransactionEntity? { - // FIXME: make this generic depending on transaction identification scheme - val ati = "AcctSvcrRef:$acctSvcrRef" - return transaction { - val account = NexusBankAccountEntity.findByName((bankAccountId)) ?: return@transaction null - NexusBankTransactionEntity.find { - (NexusBankTransactionsTable.accountTransactionId eq ati) and (NexusBankTransactionsTable.bankAccount eq account.id) - }.firstOrNull() - } -} - -/** * NOTE: this type can be used BOTH for one Camt document OR * for a set of those. */ -data class CamtTransactionsCount( +data class IngestedTransactionsCount( /** * Number of transactions that are new to the database. * Note that transaction T can be downloaded multiple times; @@ -155,154 +160,31 @@ data class CamtTransactionsCount( /** * Exceptions occurred while fetching transactions. Fetching * transactions can be done via multiple EBICS messages, therefore - * a failing one should not prevent other messages to be sent. - * This list collects all the exceptions that happened during the - * execution of a batch of messages. + * a failing one should not prevent other messages to be fetched. + * This list collects all the exceptions that happened while fetching + * multiple messages. */ var errors: List<Exception>? = null ) /** - * Get the Camt parsed by a helper function, discards duplicates - * and stores new transactions. - */ -fun processCamtMessage( - bankAccountId: String, camtDoc: Document, code: String -): CamtTransactionsCount { - var newTransactions = 0 - var downloadedTransactions = 0 - transaction { - val acct = NexusBankAccountEntity.findByName(bankAccountId) - if (acct == null) { - throw NexusError(HttpStatusCode.NotFound, "user not found") - } - val res = try { - parseCamtMessage(camtDoc) - } catch (e: CamtParsingError) { - logger.warn("Invalid CAMT received from bank: $e") - newTransactions = -1 - return@transaction - } - res.reports.forEach { - NexusAssert( - it.account.iban == acct.iban, - "Nexus hit a report or statement of a wrong IBAN!" - ) - it.balances.forEach { b -> - if (b.type == "CLBD") { - val lastBalance = NexusBankBalanceEntity.all().lastOrNull() - /** - * Store balances different from the one that came from the bank, - * or the very first balance. This approach has the following inconvenience: - * the 'balance' held at Nexus does not differentiate between one - * coming from a statement and one coming from a report. As a consequence, - * the two types of balances may override each other without notice. - */ - if ((lastBalance == null) || - (b.amount.toPlainString() != lastBalance.balance)) { - NexusBankBalanceEntity.new { - bankAccount = acct - balance = b.amount.toPlainString() - creditDebitIndicator = b.creditDebitIndicator.name - date = b.date - } - } - } - } - } - /** - * Why is the report/statement creation timestamp important, - * rather than each individual payment identification value? - */ - val stamp = - ZonedDateTime.parse(res.creationDateTime, DateTimeFormatter.ISO_DATE_TIME).toInstant().toEpochMilli() - when (code) { - "C52" -> { - val s = acct.lastReportCreationTimestamp - if (s != null && stamp > s) { - acct.lastReportCreationTimestamp = stamp - } - } - "C53" -> { - val s = acct.lastStatementCreationTimestamp - if (s != null && stamp > s) { - acct.lastStatementCreationTimestamp = stamp - } - } - } - val entries: List<CamtBankAccountEntry> = res.reports.map { it.entries }.flatten() - var newPaymentsLog = "" - downloadedTransactions = entries.size - txloop@ for (entry in entries) { - val singletonBatchedTransaction = entry.batches?.get(0)?.batchTransactions?.get(0) - ?: throw NexusError( - HttpStatusCode.InternalServerError, - "Singleton money movements policy wasn't respected" - ) - val acctSvcrRef = entry.accountServicerRef - if (acctSvcrRef == null) { - // FIXME(dold): Report this! - logger.error("missing account servicer reference in transaction") - continue - } - val duplicate = findDuplicate(bankAccountId, acctSvcrRef) - if (duplicate != null) { - logger.info("Found a duplicate (acctSvcrRef): $acctSvcrRef") - // FIXME(dold): See if an old transaction needs to be superseded by this one - // https://bugs.gnunet.org/view.php?id=6381 - continue@txloop - } - val rawEntity = NexusBankTransactionEntity.new { - bankAccount = acct - accountTransactionId = "AcctSvcrRef:$acctSvcrRef" - amount = singletonBatchedTransaction.amount.value - currency = singletonBatchedTransaction.amount.currency - transactionJson = jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(entry) - creditDebitIndicator = singletonBatchedTransaction.creditDebitIndicator.name - status = entry.status - } - rawEntity.flush() - newTransactions++ - newPaymentsLog += "\n- " + entry.batches[0].batchTransactions[0].details.unstructuredRemittanceInformation - // This block tries to acknowledge a former outgoing payment as booked. - if (singletonBatchedTransaction.creditDebitIndicator == CreditDebitIndicator.DBIT) { - val t0 = singletonBatchedTransaction.details - val pmtInfId = t0.paymentInformationId - if (pmtInfId != null) { - val paymentInitiation = PaymentInitiationEntity.find { - PaymentInitiationsTable.bankAccount eq acct.id and ( - PaymentInitiationsTable.paymentInformationId eq pmtInfId) - - }.firstOrNull() - if (paymentInitiation != null) { - logger.info("Could confirm one initiated payment: $pmtInfId") - paymentInitiation.confirmationTransaction = rawEntity - } - } - } - } - if (newTransactions > 0) - logger.debug("Camt $code '${res.messageId}' has new payments:${newPaymentsLog}") - } - return CamtTransactionsCount( - newTransactions = newTransactions, - downloadedTransactions = downloadedTransactions - ) -} - -/** - * Create new transactions for an account based on bank messages it - * did not see before. + * Causes new Nexus transactions to be stored into the database. Note: + * this function does NOT parse itself the banking data but relies on the + * dedicated helpers. This function is mostly responsible for _iterating_ + * over the new downloaded messages and update the local bank account about + * the new data. */ fun ingestBankMessagesIntoAccount( bankConnectionId: String, bankAccountId: String -): CamtTransactionsCount { +): IngestedTransactionsCount { var totalNew = 0 var downloadedTransactions = 0 transaction { val conn = - NexusBankConnectionEntity.find { NexusBankConnectionsTable.connectionId eq bankConnectionId }.firstOrNull() + NexusBankConnectionEntity.find { + NexusBankConnectionsTable.connectionId eq bankConnectionId + }.firstOrNull() if (conn == null) { throw NexusError(HttpStatusCode.InternalServerError, "connection not found") } @@ -311,15 +193,55 @@ fun ingestBankMessagesIntoAccount( throw NexusError(HttpStatusCode.InternalServerError, "account not found") } var lastId = acct.highestSeenBankMessageSerialId + /** + * This block picks all the new messages that were downloaded + * from the bank and passes them to the deeper banking data handlers + * according to the connection type. Such handlers are then responsible + * to extract the interesting values and insert them into the database. + */ NexusBankMessageEntity.find { (NexusBankMessagesTable.bankConnection eq conn.id) and (NexusBankMessagesTable.id greater acct.highestSeenBankMessageSerialId) and - // Wrong messages got already skipped by the - // index check above. Below is a extra check. not(NexusBankMessagesTable.errors) - }.orderBy(Pair(NexusBankMessagesTable.id, SortOrder.ASC)).forEach { - val doc = XMLUtil.parseStringIntoDom(it.message.bytes.toString(Charsets.UTF_8)) - val processingResult = processCamtMessage(bankAccountId, doc, it.code) + }.orderBy( + Pair(NexusBankMessagesTable.id, SortOrder.ASC) + ).forEach { + val processingResult: IngestedTransactionsCount = when(BankConnectionType.parseBankConnectionType(conn.type)) { + BankConnectionType.EBICS -> { + val doc = XMLUtil.parseStringIntoDom(it.message.bytes.toString(Charsets.UTF_8)) + /** + * Calling the CaMt handler. After its return, all the Neuxs-meaningful + * payment data got stored into the database and is ready to being further + * processed by any facade OR simply be communicated to the CLI via JSON. + */ + processCamtMessage( + bankAccountId, + doc, + it.code ?: throw internalServerError( + "Bank message with ID ${it.id.value} in DB table" + + " NexusBankMessagesTable has no code, but one is expected." + ) + ) + } + BankConnectionType.X_LIBEUFIN_BANK -> { + val jMessage = try { jacksonObjectMapper().readTree(it.message.bytes) } + catch (e: Exception) { + logger.error("Bank message ${it.id}/${it.messageId} could not" + + " be parsed into JSON by the x-libeufin-bank ingestion.") + throw internalServerError("Could not ingest x-libeufin-bank messages.") + } + processXLibeufinBankMessage( + bankAccountId, + jMessage + ) + } + } + /** + * Checking for errors. Note: errors do NOT stop this loop as + * they mean that ONE message has errors. Erroneous messages gets + * (1) flagged, (2) skipped when this function will run again, and (3) + * NEVER deleted from the database. + */ if (processingResult.newTransactions == -1) { it.errors = true lastId = it.id.value @@ -336,12 +258,18 @@ fun ingestBankMessagesIntoAccount( it.delete() return@forEach } + /** + * Updating the highest seen message ID with the serial ID of + * the row that's being currently iterated over. Note: this + * number is ever-growing REGARDLESS of the row being kept into + * the database. + */ lastId = it.id.value } + // Causing the lastId to be stored into the database: acct.highestSeenBankMessageSerialId = lastId } - // return totalNew - return CamtTransactionsCount( + return IngestedTransactionsCount( newTransactions = totalNew, downloadedTransactions = downloadedTransactions ) @@ -358,6 +286,31 @@ fun getPaymentInitiation(uuid: Long): PaymentInitiationEntity { "Payment '$uuid' not found" ) } + +data class LastMessagesTimes( + val lastStatement: ZonedDateTime?, + val lastReport: ZonedDateTime? +) +/** + * Get the last timestamps where a report and + * a statement were received for the bank account + * given as argument. + */ +fun getLastMessagesTimes(bankAccountId: String): LastMessagesTimes { + val acct = getBankAccount(bankAccountId) + return getLastMessagesTimes(acct) +} + +fun getLastMessagesTimes(acct: NexusBankAccountEntity): LastMessagesTimes { + return LastMessagesTimes( + lastReport = acct.lastReportCreationTimestamp?.let { + ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) + }, + lastStatement = acct.lastStatementCreationTimestamp?.let { + ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) + } + ) +} fun getBankAccount(label: String): NexusBankAccountEntity { val maybeBankAccount = transaction { NexusBankAccountEntity.findByName(label) @@ -405,9 +358,11 @@ fun addPaymentInitiation(paymentData: Pain001Data, debtorAccount: NexusBankAccou } suspend fun fetchBankAccountTransactions( - client: HttpClient, fetchSpec: FetchSpecJson, accountId: String -): CamtTransactionsCount { - val res = transaction { + client: HttpClient, + fetchSpec: FetchSpecJson, + accountId: String +): IngestedTransactionsCount { + val connectionDetails = transaction { val acct = NexusBankAccountEntity.findByName(accountId) if (acct == null) { throw NexusError( @@ -423,7 +378,12 @@ suspend fun fetchBankAccountTransactions( ) } return@transaction object { - val connectionType = conn.type + /** + * The connection type _as enum_ should eventually come + * directly from the database, instead of being parsed by + * parseBankConnectionType(). + */ + val connectionType = BankConnectionType.parseBankConnectionType(conn.type) val connectionName = conn.connectionId } } @@ -432,16 +392,39 @@ suspend fun fetchBankAccountTransactions( * document into the database. This function tries to download * both reports AND statements even if the first one fails. */ - val errors: List<Exception>? = getConnectionPlugin(res.connectionType).fetchTransactions( + val errors: List<Exception>? = getConnectionPlugin(connectionDetails.connectionType).fetchTransactions( fetchSpec, client, - res.connectionName, + connectionDetails.connectionName, accountId ) - val ingestionResult = ingestBankMessagesIntoAccount(res.connectionName, accountId) - ingestFacadeTransactions(accountId, "taler-wire-gateway", ::talerFilter, ::maybeTalerRefunds) - ingestFacadeTransactions(accountId, "anastasis", ::anastasisFilter, null) + /** + * This block causes new NexusBankAccountTransactions rows to be + * INSERTed into the database, according to the banking data that + * was recently downloaded. + */ + val ingestionResult: IngestedTransactionsCount = ingestBankMessagesIntoAccount( + connectionDetails.connectionName, + accountId + ) + /** + * The following two functions further processe the banking data + * that was recently downloaded, according to the particular facade + * being honored. + */ + ingestFacadeTransactions( + bankAccountId = accountId, + facadeType = NexusFacadeType.TALER, + incomingFilterCb = ::talerFilter, + refundCb = ::maybeTalerRefunds + ) + ingestFacadeTransactions( + bankAccountId = accountId, + facadeType = NexusFacadeType.ANASTASIS, + incomingFilterCb = ::anastasisFilter, + refundCb = null + ) ingestionResult.errors = errors return ingestionResult @@ -498,4 +481,29 @@ fun importBankAccount(call: ApplicationCall, offeredBankAccountId: String, nexus } } } -}
\ No newline at end of file +} + + +/** + * Check if the transaction is already found in the database. + * This function works as long as the caller provides the appropriate + * 'uid' parameter. For CaMt messages this value is carried along + * the AcctSvcrRef node, whereas for x-libeufin-bank connections + * that's the 'uid' field of the XLibeufinBankTransaction type. + * + * Returns the transaction that's already in the database, in case + * the 'uid' is from a duplicate. + */ +fun findDuplicate( + bankAccountId: String, + uid: String +): NexusBankTransactionEntity? { + return transaction { + val account = NexusBankAccountEntity.findByName((bankAccountId)) ?: + return@transaction null + NexusBankTransactionEntity.find { + (NexusBankTransactionsTable.accountTransactionId eq uid) and + (NexusBankTransactionsTable.bankAccount eq account.id) + }.firstOrNull() + } +} diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt index 209c9384..f41a3729 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsNexus.kt @@ -44,9 +44,9 @@ import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.statements.api.ExposedBlob -import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.getLastMessagesTimes import tech.libeufin.nexus.bankaccount.getPaymentInitiation import tech.libeufin.nexus.iso20022.NexusPaymentInitiationData import tech.libeufin.nexus.iso20022.createPain001document @@ -149,6 +149,10 @@ private suspend fun fetchEbicsC5x( } } +/** + * Prepares key material and other EBICS details and + * returns them along a convenient object. + */ private fun getEbicsSubscriberDetailsInternal(subscriber: EbicsSubscriberEntity): EbicsClientSubscriberDetails { var bankAuthPubValue: RSAPublicKey? = null if (subscriber.bankAuthenticationPublicKey != null) { @@ -178,18 +182,19 @@ private fun getEbicsSubscriberDetailsInternal(subscriber: EbicsSubscriberEntity) ebicsHiaState = subscriber.ebicsHiaState ) } - +private fun getSubscriberFromConnection(connectionEntity: NexusBankConnectionEntity): EbicsSubscriberEntity = + transaction { + EbicsSubscriberEntity.find { + NexusEbicsSubscribersTable.nexusBankConnection eq connectionEntity.id + }.firstOrNull() ?: throw internalServerError("ebics bank connection '${connectionEntity.connectionId}' has no subscriber.") + } /** * Retrieve Ebics subscriber details given a bank connection. */ fun getEbicsSubscriberDetails(bankConnectionId: String): EbicsClientSubscriberDetails { - val transport = NexusBankConnectionEntity.findByName(bankConnectionId) - if (transport == null) { - throw NexusError(HttpStatusCode.NotFound, "transport not found") - } - val subscriber = EbicsSubscriberEntity.find { - NexusEbicsSubscribersTable.nexusBankConnection eq transport.id - }.first() + val transport = getBankConnection(bankConnectionId) + val subscriber = getSubscriberFromConnection(transport) + // transport exists and belongs to caller. return getEbicsSubscriberDetailsInternal(subscriber) } @@ -417,23 +422,7 @@ class EbicsBankConnectionProtocol: BankConnectionProtocol { accountId: String ): List<Exception>? { val subscriberDetails = transaction { getEbicsSubscriberDetails(bankConnectionId) } - val lastTimes = transaction { - val acct = NexusBankAccountEntity.findByName(accountId) - if (acct == null) { - throw NexusError( - HttpStatusCode.NotFound, - "Account '$accountId' not found" - ) - } - object { - val lastStatement = acct.lastStatementCreationTimestamp?.let { - ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) - } - val lastReport = acct.lastReportCreationTimestamp?.let { - ZonedDateTime.ofInstant(Instant.ofEpochMilli(it), ZoneOffset.UTC) - } - } - } + val lastTimes = getLastMessagesTimes(accountId) /** * Will be filled with fetch instructions, according * to the parameters received from the client. @@ -533,61 +522,53 @@ class EbicsBankConnectionProtocol: BankConnectionProtocol { return errors return null } - /** - * Submit one Pain.001 for one payment initiations. - */ + // Submit one Pain.001 for one payment initiations. override suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: Long) { - val r = transaction { - val paymentInitiation = PaymentInitiationEntity.findById(paymentInitiationId) - ?: throw NexusError(HttpStatusCode.NotFound, "payment initiation not found") - val conn = paymentInitiation.bankAccount.defaultBankConnection - ?: throw NexusError(HttpStatusCode.NotFound, "no default bank connection available for submission") + val dbData = transaction { + val preparedPayment = getPaymentInitiation(paymentInitiationId) + val conn = preparedPayment.bankAccount.defaultBankConnection ?: throw NexusError(HttpStatusCode.NotFound, "no default bank connection available for submission") val subscriberDetails = getEbicsSubscriberDetails(conn.connectionId) val painMessage = createPain001document( NexusPaymentInitiationData( - debtorIban = paymentInitiation.bankAccount.iban, - debtorBic = paymentInitiation.bankAccount.bankCode, - debtorName = paymentInitiation.bankAccount.accountHolder, - currency = paymentInitiation.currency, - amount = paymentInitiation.sum.toString(), - creditorIban = paymentInitiation.creditorIban, - creditorName = paymentInitiation.creditorName, - creditorBic = paymentInitiation.creditorBic, - paymentInformationId = paymentInitiation.paymentInformationId, - preparationTimestamp = paymentInitiation.preparationDate, - subject = paymentInitiation.subject, - instructionId = paymentInitiation.instructionId, - endToEndId = paymentInitiation.endToEndId, - messageId = paymentInitiation.messageId + debtorIban = preparedPayment.bankAccount.iban, + debtorBic = preparedPayment.bankAccount.bankCode, + debtorName = preparedPayment.bankAccount.accountHolder, + currency = preparedPayment.currency, + amount = preparedPayment.sum, + creditorIban = preparedPayment.creditorIban, + creditorName = preparedPayment.creditorName, + creditorBic = preparedPayment.creditorBic, + paymentInformationId = preparedPayment.paymentInformationId, + preparationTimestamp = preparedPayment.preparationDate, + subject = preparedPayment.subject, + instructionId = preparedPayment.instructionId, + endToEndId = preparedPayment.endToEndId, + messageId = preparedPayment.messageId ) ) - logger.debug("Sending Pain.001: ${paymentInitiation.paymentInformationId}," + - " for payment: '${paymentInitiation.subject}'") - if (!XMLUtil.validateFromString(painMessage)) { - logger.error("Pain.001 ${paymentInitiation.paymentInformationId}" + - " is invalid, not submitting it and flag as invalid.") - val payment = getPaymentInitiation(paymentInitiationId) - payment.invalid = true - // The following commit prevents the thrown error - // to lose the database transaction data. - TransactionManager.current().commit() - throw NexusError( - HttpStatusCode.InternalServerError, - "Attempted Pain.001 (${paymentInitiation.paymentInformationId})" + - " message is invalid. Not sent to the bank.", - LibeufinErrorCode.LIBEUFIN_EC_INVALID_STATE - ) - } object { + val painXml = painMessage val subscriberDetails = subscriberDetails - val painMessage = painMessage } } + if (!XMLUtil.validateFromString(dbData.painXml)) { + val pmtInfId = transaction { + val payment = getPaymentInitiation(paymentInitiationId) + logger.error("Pain.001 ${payment.paymentInformationId}" + + " is invalid, not submitting it and flag as invalid.") + payment.invalid = true + } + throw NexusError( + HttpStatusCode.InternalServerError, + "pain document: $pmtInfId document is invalid. Not sent to the bank.", + LibeufinErrorCode.LIBEUFIN_EC_INVALID_STATE + ) + } doEbicsUploadTransaction( httpClient, - r.subscriberDetails, + dbData.subscriberDetails, "CCT", - r.painMessage.toByteArray(Charsets.UTF_8), + dbData.painXml.toByteArray(Charsets.UTF_8), EbicsStandardOrderParams() ) transaction { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt index 98e92a46..52627b66 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt @@ -22,15 +22,21 @@ */ package tech.libeufin.nexus.iso20022 +import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.http.* +import io.ktor.util.reflect.* +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.transaction import org.w3c.dom.Document -import tech.libeufin.nexus.NexusAssert -import tech.libeufin.nexus.NexusError +import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.IngestedTransactionsCount +import tech.libeufin.nexus.bankaccount.findDuplicate import tech.libeufin.nexus.server.CurrencyAmount +import tech.libeufin.nexus.server.toPlainString import tech.libeufin.util.* -import java.math.BigDecimal import java.time.Instant import java.time.ZoneId import java.time.ZonedDateTime @@ -40,23 +46,6 @@ enum class CreditDebitIndicator { DBIT, CRDT } -enum class EntryStatus { - /** - * Booked - */ - BOOK, - - /** - * Pending - */ - PDNG, - - /** - * Informational - */ - INFO, -} - enum class CashManagementResponseType(@get:JsonValue val jsonName: String) { Report("report"), Statement("statement"), Notification("notification") } @@ -268,7 +257,7 @@ data class ReturnInfo( ) data class BatchTransaction( - val amount: CurrencyAmount, + val amount: CurrencyAmount, // Fuels Taler withdrawal amount. val creditDebitIndicator: CreditDebitIndicator, val details: TransactionDetails ) @@ -329,7 +318,53 @@ data class CamtBankAccountEntry( // list of sub-transactions participating in this money movement. val batches: List<Batch>? -) +) { + /** + * This function returns the subject of the unique transaction + * accounted in this object. If the transaction is not unique, + * it throws an exception. NOTE: the caller has the responsibility + * of not passing an empty report; those usually should be discarded + * and never participate in the application logic. + */ + @JsonIgnore + fun getSingletonSubject(): String { + // Checks that the given list contains only one element and returns it. + fun <T>checkAndGetSingleton(maybeTxs: List<T>?): T { + if (maybeTxs == null || maybeTxs.size > 1) throw internalServerError( + "Only a singleton transaction is " + + "allowed inside ${this.javaClass}." + ) + return maybeTxs[0] + } + /** + * Types breakdown until the last payment information is reached. + * + * CamtBankAccountEntry contains: + * - Batch 0 + * - Batch 1 + * - Batch N + * + * Batch X contains: + * - BatchTransaction 0 + * - BatchTransaction 1 + * - BatchTransaction N + * + * BatchTransaction X contains: + * - TransactionDetails + * + * TransactionDetails contains the involved parties + * and the payment subject but MAY NOT contain the amount. + * In this model, the amount is held in the BatchTransaction + * type, that is also -- so far -- required to be a singleton + * inside Batch. + */ + checkAndGetSingleton<Batch>(this.batches) + val batchTransactions = this.batches?.get(0)?.batchTransactions + val tx = checkAndGetSingleton<BatchTransaction>(batchTransactions) + val details: TransactionDetails = tx.details + return details.unstructuredRemittanceInformation + } +} class CamtParsingError(msg: String) : Exception(msg) @@ -861,7 +896,10 @@ private fun XmlElementDestructor.extractInnerTransactions(): CamtReport { instructedAmount = instructedAmount, creditDebitIndicator = creditDebitIndicator, bankTransactionCode = btc, - batches = extractBatches(amount, creditDebitIndicator, acctSvcrRef ?: "AcctSvcrRef not given/found"), + batches = extractBatches( + amount, + creditDebitIndicator, + acctSvcrRef ?: "AcctSvcrRef not given/found"), bookingDate = maybeUniqueChildNamed("BookgDt") { extractDateOrDateTime() }, valueDate = maybeUniqueChildNamed("ValDt") { extractDateOrDateTime() }, accountServicerRef = acctSvcrRef, @@ -936,3 +974,155 @@ fun parseCamtMessage(doc: Document): CamtParseResult { } } } + +/** + * Given that every CaMt is a collection of reports/statements + * where each of them carries the bank account balance and a list + * of transactions, this function: + * + * - extracts the balance (storing a NexusBankBalanceEntity) + * - updates timestamps in NexusBankAccountEntity to the last seen + * report/statement. + * - finds which transactions were already downloaded. + * - stores a new NexusBankTransactionEntity for each new tx +accounted in the report/statement. + * - tries to link the new transaction with a submitted one, in + * case of DBIT transaction. + * - returns a IngestedTransactionCount object. + */ +fun processCamtMessage( + bankAccountId: String, + camtDoc: Document, + /** + * FIXME: should NOT be C52/C53 but "report" or "statement". + * The reason is that C52/C53 are NOT CaMt, they are EBICS names. + */ + code: String +): IngestedTransactionsCount { + var newTransactions = 0 + var downloadedTransactions = 0 + transaction { + val acct = NexusBankAccountEntity.findByName(bankAccountId) + if (acct == null) { + throw NexusError(HttpStatusCode.NotFound, "user not found") + } + val res = try { parseCamtMessage(camtDoc) } catch (e: CamtParsingError) { + logger.warn("Invalid CAMT received from bank: $e") + newTransactions = -1 + return@transaction + } + res.reports.forEach { + NexusAssert( + it.account.iban == acct.iban, + "Nexus hit a report or statement of a wrong IBAN!" + ) + it.balances.forEach { b -> + if (b.type == "CLBD") { + val lastBalance = NexusBankBalanceEntity.all().lastOrNull() + /** + * Store balances different from the one that came from the bank, + * or the very first balance. This approach has the following inconvenience: + * the 'balance' held at Nexus does not differentiate between one + * coming from a statement and one coming from a report. As a consequence, + * the two types of balances may override each other without notice. + */ + if ((lastBalance == null) || + (b.amount.toPlainString() != lastBalance.balance)) { + NexusBankBalanceEntity.new { + bankAccount = acct + balance = b.amount.toPlainString() + creditDebitIndicator = b.creditDebitIndicator.name + date = b.date + } + } + } + } + } + // Updating the local bank account state timestamps according to the current document. + val stamp = ZonedDateTime.parse( + res.creationDateTime, + DateTimeFormatter.ISO_DATE_TIME + ).toInstant().toEpochMilli() + when (code) { + "C52" -> { + val s = acct.lastReportCreationTimestamp + /** + * FIXME. + * The following check seems broken, as it ONLY sets the value when + * s is non-null BUT s gets never set; not even with a default value. + * That didn't break so far because the timestamp gets only used when + * the fetch specification has "since-last" for the time range. Never + * used. + */ + if (s != null && stamp > s) { + acct.lastReportCreationTimestamp = stamp + } + } + "C53" -> { + val s = acct.lastStatementCreationTimestamp + if (s != null && stamp > s) { + acct.lastStatementCreationTimestamp = stamp + } + } + } + val entries: List<CamtBankAccountEntry> = res.reports.map { it.entries }.flatten() + var newPaymentsLog = "" + downloadedTransactions = entries.size + txloop@ for (entry: CamtBankAccountEntry in entries) { + val singletonBatchedTransaction: BatchTransaction = entry.batches?.get(0)?.batchTransactions?.get(0) + ?: throw NexusError( + HttpStatusCode.InternalServerError, + "Singleton money movements policy wasn't respected" + ) + val acctSvcrRef = entry.accountServicerRef + if (acctSvcrRef == null) { + // FIXME(dold): Report this! + logger.error("missing account servicer reference in transaction") + continue + } + val duplicate = findDuplicate(bankAccountId, acctSvcrRef) + if (duplicate != null) { + logger.info("Found a duplicate (acctSvcrRef): $acctSvcrRef") + // FIXME(dold): See if an old transaction needs to be superseded by this one + // https://bugs.gnunet.org/view.php?id=6381 + continue@txloop + } + val rawEntity = NexusBankTransactionEntity.new { + bankAccount = acct + accountTransactionId = acctSvcrRef + amount = singletonBatchedTransaction.amount.value + currency = singletonBatchedTransaction.amount.currency + transactionJson = jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(entry) + creditDebitIndicator = singletonBatchedTransaction.creditDebitIndicator.name + status = entry.status + } + rawEntity.flush() + newTransactions++ + newPaymentsLog += "\n- " + entry.batches[0].batchTransactions[0].details.unstructuredRemittanceInformation + // This block tries to acknowledge a former outgoing payment as booked. + if (singletonBatchedTransaction.creditDebitIndicator == CreditDebitIndicator.DBIT) { + val t0 = singletonBatchedTransaction.details + val pmtInfId = t0.paymentInformationId + if (pmtInfId != null) { + val paymentInitiation = PaymentInitiationEntity.find { + PaymentInitiationsTable.bankAccount eq acct.id and ( + // pmtInfId is a value that the payment submitter + // asked the bank to associate with the payment to be made. + PaymentInitiationsTable.paymentInformationId eq pmtInfId) + + }.firstOrNull() + if (paymentInitiation != null) { + logger.info("Could confirm one initiated payment: $pmtInfId") + paymentInitiation.confirmationTransaction = rawEntity + } + } + } + } + if (newTransactions > 0) + logger.debug("Camt $code '${res.messageId}' has new payments:${newPaymentsLog}") + } + return IngestedTransactionsCount( + newTransactions = newTransactions, + downloadedTransactions = downloadedTransactions + ) +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt new file mode 100644 index 00000000..8c01705a --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt @@ -0,0 +1,85 @@ +package tech.libeufin.nexus.server + +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.nexus.NexusBankConnectionEntity +import tech.libeufin.nexus.NexusBankConnectionsTable +import tech.libeufin.util.internalServerError +import tech.libeufin.util.notFound + +/** + * FIXME: + * enum type names were introduced after 0.9.2 and need to + * be employed wherever now type names are passed as plain + * strings. + */ + +// Valid connection types. +enum class BankConnectionType(val typeName: String) { + EBICS("ebics"), + X_LIBEUFIN_BANK("x-taler-bank"); + companion object { + /** + * This method takes legacy bank connection type names as input + * and _tries_ to return the correspondent enum type. This + * fixes the cases where bank connection types are passed as + * easy-to-break arbitrary strings; eventually this method should + * be discarded and only enum types be passed as connection type names. + */ + fun parseBankConnectionType(typeName: String): BankConnectionType { + return when(typeName) { + "ebics" -> EBICS + "x-libeufin-bank" -> X_LIBEUFIN_BANK + else -> throw internalServerError( + "Cannot extract ${this::class.java.typeName}' instance from name: $typeName'" + ) + } + } + } +} +// Valid facade types +enum class NexusFacadeType(val facadeType: String) { + TALER("taler-wire-gateway"), + ANASTASIS("anastasis") +} + +/** + * These types point at the _content_ brought by bank connections. + * The following stack depicts the layering of banking communication + * as modeled here in Nexus. On top the most inner layer. + * + * -------------------- + * Banking data type + * -------------------- + * Bank connection type + * -------------------- + * HTTP + * -------------------- + * + * Once the banking data type arrives to the local database, facades + * types MAY apply further processing to it. + * + * For example, a Taler facade WILL look for Taler-meaningful wire + * subjects and act accordingly. Even without a facade, the Nexus + * native HTTP API picks instances of banking data and extracts its + * details to serve to the client. + * + * NOTE: this type MAY help but is NOT essential, as each connection + * is USUALLY tied with the same banking data type. For example, EBICS + * brings CaMt, and x-libeufin-bank bring its own (same-named x-libeufin-bank) + * banking data type. + */ +enum class BankingDataType { + X_LIBEUFIN_BANK, + CAMT +} + +// Gets connection or throws. +fun getBankConnection(connId: String): NexusBankConnectionEntity { + val maybeConn = transaction { + NexusBankConnectionEntity.find { + NexusBankConnectionsTable.connectionId eq connId + }.firstOrNull() + } + if (maybeConn == null) throw notFound("Bank connection $connId not found") + return maybeConn +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt index 38ab4236..7fdfd526 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/JSON.kt @@ -32,10 +32,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.annotation.JsonSerialize import com.fasterxml.jackson.databind.deser.std.StdDeserializer import com.fasterxml.jackson.databind.ser.std.StdSerializer +import tech.libeufin.nexus.EntryStatus import tech.libeufin.nexus.iso20022.CamtBankAccountEntry -import tech.libeufin.nexus.iso20022.EntryStatus import tech.libeufin.util.* -import java.math.BigDecimal import java.time.Instant import java.time.ZoneId import java.time.ZonedDateTime @@ -251,6 +250,17 @@ data class EbicsNewTransport( val systemID: String? ) +/** + * Credentials and URL to access Sandbox and talk JSON to it. + * See https://docs.taler.net/design-documents/038-demobanks-protocol-suppliers.html#static-x-libeufin-bank-with-dynamic-demobank + * for an introduction on x-libeufin-bank. + */ +data class XLibeufinBankTransport( + val username: String, + val password: String, + val baseUrl: String +) + /** Response type of "GET /prepared-payments/{uuid}" */ data class PaymentStatus( val paymentInitiationId: String, @@ -262,10 +272,6 @@ data class PaymentStatus( val subject: String, val submissionDate: String?, val preparationDate: String, - // null when the payment was never acknowledged by - // the bank. For example, it was submitted but never - // seen in any report; or only created and not even - // submitted. val status: EntryStatus? ) @@ -346,8 +352,9 @@ data class BankMessageList( ) data class BankMessageInfo( - val messageId: String, - val code: String, + // x-libeufin-bank messages do not have any ID or code. + val messageId: String?, + val code: String?, val length: Long ) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt index ebe002fc..fb864d3b 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt @@ -53,6 +53,7 @@ import tech.libeufin.nexus.* import tech.libeufin.nexus.bankaccount.* import tech.libeufin.nexus.ebics.* import tech.libeufin.nexus.iso20022.CamtBankAccountEntry +import tech.libeufin.nexus.iso20022.processCamtMessage import tech.libeufin.util.* import java.net.BindException import java.net.URLEncoder @@ -739,7 +740,7 @@ val nexusApp: Application.() -> Unit = { var statusCode = HttpStatusCode.OK /** * Client errors are unlikely here, because authentication - * and JSON validity fail earlier. Hence either Nexus or the + * and JSON validity fail earlier. Hence, either Nexus or the * bank had a problem. NOTE: because this handler triggers multiple * fetches, it is ALSO possible that although one error is reported, * SOME transactions made it to the database! diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt new file mode 100644 index 00000000..b819163a --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/xlibeufinbank/XLibeufinBankNexus.kt @@ -0,0 +1,395 @@ +package tech.libeufin.nexus.xlibeufinbank + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.server.util.* +import io.ktor.util.* +import org.jetbrains.exposed.sql.statements.api.ExposedBlob +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.* +import tech.libeufin.nexus.iso20022.* +import tech.libeufin.nexus.server.* +import tech.libeufin.util.XLibeufinBankDirection +import tech.libeufin.util.XLibeufinBankTransaction +import tech.libeufin.util.badRequest +import tech.libeufin.util.internalServerError +import java.net.MalformedURLException +import java.net.URL + +// Gets Sandbox URL and credentials, taking the connection name as input. +fun getXLibeufinBankCredentials(conn: NexusBankConnectionEntity): XLibeufinBankTransport { + val maybeCredentials = transaction { + XLibeufinBankUserEntity.find { + XLibeufinBankUsersTable.nexusBankConnection eq conn.id + }.firstOrNull() + } + if (maybeCredentials == null) throw internalServerError( + "Existing connection ${conn.connectionId} has no transport details" + ) + return XLibeufinBankTransport( + username = maybeCredentials.username, + password = maybeCredentials.password, + baseUrl = maybeCredentials.baseUrl + ) +} +fun getXLibeufinBankCredentials(connId: String): XLibeufinBankTransport { + val conn = getBankConnection(connId) + return getXLibeufinBankCredentials(conn) + +} + +class XlibeufinBankConnectionProtocol : BankConnectionProtocol { + override suspend fun connect(client: HttpClient, connId: String) { + TODO("Not yet implemented") + } + + override suspend fun fetchAccounts(client: HttpClient, connId: String) { + throw NotImplementedError("x-libeufin-bank does not need to fetch accounts") + } + + override fun createConnectionFromBackup( + connId: String, + user: NexusUserEntity, + passphrase: String?, + backup: JsonNode + ) { + TODO("Not yet implemented") + } + + override fun createConnection( + connId: String, + user: NexusUserEntity, + data: JsonNode) { + + val bankConn = transaction { + NexusBankConnectionEntity.new { + this.connectionId = connId + owner = user + type = "x-libeufin-bank" + } + } + val newTransportData = jacksonObjectMapper().treeToValue( + data, XLibeufinBankTransport::class.java + ) ?: throw badRequest("x-libeufin-bank details not found in the request") + // Validate the base URL + try { URL(newTransportData.baseUrl).toURI() } + catch (e: MalformedURLException) { + throw badRequest("Base URL (${newTransportData.baseUrl}) is invalid.") + } + transaction { + XLibeufinBankUserEntity.new { + username = newTransportData.username + password = newTransportData.password + // Only addressing mild cases where ONE slash ends the base URL. + baseUrl = newTransportData.baseUrl.dropLastWhile { it == '/' } + nexusBankConnection = bankConn + } + } + } + + override fun getConnectionDetails(conn: NexusBankConnectionEntity): JsonNode { + TODO("Not yet implemented") + } + + override fun exportBackup(bankConnectionId: String, passphrase: String): JsonNode { + TODO("Not yet implemented") + } + + override fun exportAnalogDetails(conn: NexusBankConnectionEntity): ByteArray { + throw NotImplementedError("x-libeufin-bank does not need analog details") + } + + override suspend fun submitPaymentInitiation(httpClient: HttpClient, paymentInitiationId: Long) { + TODO("Not yet implemented") + } + + override suspend fun fetchTransactions( + fetchSpec: FetchSpecJson, + client: HttpClient, + bankConnectionId: String, + accountId: String + ): List<Exception>? { + val conn = getBankConnection(bankConnectionId) + /** + * Note: fetchSpec.level is ignored because Sandbox does not + * differentiate between booked and non-booked transactions. + * Just logging if the unaware client specified non-REPORT for + * the level. FIXME: docs have to mention this. + */ + if (fetchSpec.level == FetchLevel.REPORT || fetchSpec.level == FetchLevel.ALL) + throw badRequest("level '${fetchSpec.level}' on x-libeufin-bank" + + "connection (${conn.connectionId}) is not supported:" + + " bank has only 'booked' state." + ) + // Get credentials + val credentials = getXLibeufinBankCredentials(conn) + /** + * Now builds the URL to ask the transactions, according to the + * FetchSpec gotten in the args. Level 'statement' and time range + * 'previous-dayes' are NOT implemented. + */ + val baseUrl = URL(credentials.baseUrl) + val fetchUrl = url { + protocol = URLProtocol(name = baseUrl.protocol, defaultPort = -1) + appendPathSegments( + baseUrl.path.dropLastWhile { it == '/' }, + "accounts/${credentials.username}/transactions") + when (fetchSpec) { + // Gets the last 5 transactions + is FetchSpecLatestJson -> { + // Do nothing, the bare endpoint gets the last 5 txs by default. + } + /* Defines the from_ms URI param. according to the last transaction + * timestamp that was seen in this connection */ + is FetchSpecSinceLastJson -> { + val localBankAccount = getBankAccount(accountId) + val lastMessagesTimes = getLastMessagesTimes(localBankAccount) + // Sandbox doesn't have report vs. statement, defaulting to report time + // and so does the ingestion routine when storing the last message time. + this.parameters["from_ms"] = "${lastMessagesTimes.lastStatement ?: 0}" + } + // This wants ALL the transactions, hence it sets the from_ms to zero. + is FetchSpecAllJson -> { + this.parameters["from_ms"] = "0" + } + else -> throw NexusError( + HttpStatusCode.NotImplemented, + "FetchSpec ${fetchSpec::class} not supported" + ) + } + } + logger.debug("Requesting x-libeufin-bank transactions to: $fetchUrl") + val resp: HttpResponse = try { + client.get(fetchUrl) { + expectSuccess = true + contentType(ContentType.Application.Json) + basicAuth(credentials.username, credentials.password) + } + } catch (e: Exception) { + e.printStackTrace() + logger.error(e.message) + return listOf(e) + } + val respBlob = resp.bodyAsChannel().toByteArray() + transaction { + NexusBankMessageEntity.new { + bankConnection = conn + message = ExposedBlob(respBlob) + } + } + return null + } +} + +/** + * Parses one x-libeufin-bank message and INSERTs Nexus local + * transaction records into the database. After this function + * returns, the transactions are ready to both being communicated + * to the CLI via the native JSON interface OR being further processed + * by ANY facade. + * + * This function: + * - updates the local timestamps related to the latest report. + * - inserts a new NexusBankTransactionEntity. To achieve that, it extracts the: + * -- amount + * -- credit/debit indicator + * -- currency + * + * Note: in contrast to what the CaMt handler does, here there's NO + * status, since Sandbox has only one (unnamed) transaction state and + * all transactions are asked as reports. + */ +fun processXLibeufinBankMessage( + bankAccountId: String, + data: JsonNode +): IngestedTransactionsCount { + data class XLibeufinBankTransactions( + val transactions: List<XLibeufinBankTransaction> + ) + val txs = try { + jacksonObjectMapper().treeToValue( + data, + XLibeufinBankTransactions::class.java + ) + } catch (e: Exception) { + throw NexusError( + HttpStatusCode.BadGateway, + "The bank sent invalid x-libeufin-bank transactions." + ) + } + val bankAccount = getBankAccount(bankAccountId) + var newTxs = 0 // Counts how many transactions are new. + txs.transactions.forEach { + val maybeTimestamp = try { + it.date.toLong() + } catch (e: Exception) { + throw NexusError( + HttpStatusCode.BadGateway, + "The bank gave an invalid timestamp " + + "for x-libeufin-bank message: ${it.uid}" + ) + } + // Searching for duplicates. + if (findDuplicate(bankAccountId, it.uid) != null) { + logger.debug( + "x-libeufin-bank ingestion: transaction ${it.uid} is a duplicate, skipping." + ) + return@forEach + } + val direction = if (it.debtorIban == bankAccount.iban) + XLibeufinBankDirection.DEBIT else XLibeufinBankDirection.CREDIT + // New tx, storing it. + transaction { + val localTx = NexusBankTransactionEntity.new { + this.bankAccount = bankAccount + this.amount = it.amount + this.currency = it.currency + /** + * Sandbox has only booked state for its transactions: as soon as + * one payment makes it to the database, that is the final (booked) + * state. + */ + this.status = EntryStatus.BOOK + this.accountTransactionId = it.uid + this.transactionJson = jacksonObjectMapper( + ).writeValueAsString(it.exportAsCamtModel()) + this.creditDebitIndicator = direction.direction + newTxs++ + logger.debug("x-libeufin-bank transaction with subject '${it.subject}' ingested.") + } + /** + * The following block tries to reconcile a previous prepared + * (outgoing) payment with the one being iterated over. + */ + if (direction == XLibeufinBankDirection.DEBIT) { + val maybePrepared = getPaymentInitiation(pmtInfId = it.uid) + if (maybePrepared != null) maybePrepared.confirmationTransaction = localTx + } + // x-libeufin-bank transactions are ALWAYS modeled as reports + // in Nexus, because such bank protocol supplier doesn't have + // the report vs. statement distinction. Therefore, we only + // consider the last report timestamp. + if ((bankAccount.lastStatementCreationTimestamp ?: 0L) < maybeTimestamp) + bankAccount.lastStatementCreationTimestamp = maybeTimestamp + } + } + return IngestedTransactionsCount( + newTransactions = newTxs, + downloadedTransactions = txs.transactions.size + ) +} + +fun XLibeufinBankTransaction.exportCamtDirectionIndicator(): CreditDebitIndicator = + if (this.direction == XLibeufinBankDirection.CREDIT) + CreditDebitIndicator.CRDT else CreditDebitIndicator.DBIT + +/** + * This function transforms an x-libeufin-bank transaction + * into the JSON representation of CaMt used by Nexus along + * its processing. Notably, this helps to stick to one unified + * type when facades process transactions. + */ +fun XLibeufinBankTransaction.exportAsCamtModel(): CamtBankAccountEntry = + CamtBankAccountEntry( + /** + * Amount obtained by summing all the transactions accounted + * in this report/statement. Here this field equals the amount of the + * _unique_ transaction accounted. + */ + amount = CurrencyAmount(currency = this.currency, value = this.amount), + accountServicerRef = this.uid, + bankTransactionCode = "Not given", + bookingDate = this.date, + counterValueAmount = null, + creditDebitIndicator = this.exportCamtDirectionIndicator(), + currencyExchange = null, + entryRef = null, + instructedAmount = null, + valueDate = null, + status = EntryStatus.BOOK, // x-libeufin-bank always/only BOOK. + /** + * This field accounts for the _unique_ transaction that this + * object represents. + */ + batches = listOf( + Batch( + messageId = null, + paymentInformationId = this.uid, + batchTransactions = listOf( + BatchTransaction( + amount = CurrencyAmount( + currency = this.currency, + value = this.amount + ), + creditDebitIndicator = this.exportCamtDirectionIndicator(), + details = TransactionDetails( + debtor = PartyIdentification( + name = this.debtorName, + countryOfResidence = null, + organizationId = null, + otherId = null, + postalAddress = null, + privateId = null + ), + debtorAccount = CashAccount( + name = null, + currency = this.currency, + iban = this.debtorIban, + otherId = null + ), + debtorAgent = AgentIdentification( + name = null, + bic = this.debtorBic, + clearingSystemCode = null, + clearingSystemMemberId = null, + lei = null, + otherId = null, + postalAddress = null, + proprietaryClearingSystemCode = null + ), + counterValueAmount = null, + currencyExchange = null, + interBankSettlementAmount = null, + proprietaryPurpose = null, + purpose = null, + returnInfo = null, + ultimateCreditor = null, + ultimateDebtor = null, + unstructuredRemittanceInformation = this.subject, + instructedAmount = null, + creditor = PartyIdentification( + name = this.creditorName, + countryOfResidence = null, + organizationId = null, + otherId = null, + postalAddress = null, + privateId = null + ), + creditorAccount = CashAccount( + name = null, + currency = this.currency, + iban = this.creditorIban, + otherId = null + ), + creditorAgent = AgentIdentification( + name = null, + bic = this.creditorBic, + clearingSystemCode = null, + clearingSystemMemberId = null, + lei = null, + otherId = null, + postalAddress = null, + proprietaryClearingSystemCode = null + ) + ) + ) + ) + ) + ) + )
\ No newline at end of file |