/* * This file is part of LibEuFin. * Copyright (C) 2024 Taler Systems S.A. * LibEuFin is free software; you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation; either version 3, or * (at your option) any later version. * LibEuFin is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General * Public License for more details. * You should have received a copy of the GNU Affero General Public * License along with LibEuFin; see the file COPYING. If not, see * */ package tech.libeufin.nexus import org.postgresql.util.PSQLState import tech.libeufin.common.* import java.sql.PreparedStatement import java.sql.SQLException import java.text.SimpleDateFormat import java.time.Instant import java.util.* fun Instant.fmtDate(): String { val formatter = SimpleDateFormat("yyyy-MM-dd") return formatter.format(Date.from(this)) } fun Instant.fmtDateTime(): String { val formatter = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") return formatter.format(Date.from(this)) } // INCOMING PAYMENTS STRUCTS /** * Represents an incoming payment in the database. */ data class IncomingPayment( val amount: TalerAmount, val wireTransferSubject: String, val debitPaytoUri: String, val executionTime: Instant, /** ISO20022 AccountServicerReference */ val bankId: String ) { override fun toString(): String { return "IN ${executionTime.fmtDate()} $amount '$bankId' debitor=$debitPaytoUri subject=$wireTransferSubject" } } // INITIATED PAYMENTS STRUCTS enum class DatabaseSubmissionState { /** * Submission got both EBICS_OK. */ success, /** * Submission can be retried (network issue, for example) */ transient_failure, /** * Submission got at least one error code which was not * EBICS_OK. */ permanent_failure, /** * The submitted payment was never witnessed by a camt.5x * or pain.002 report. */ never_heard_back } /** * Minimal set of information to initiate a new payment in * the database. */ data class InitiatedPayment( val id: Long, val amount: TalerAmount, val wireTransferSubject: String, val creditPaytoUri: String, val initiationTime: Instant, val requestUid: String ) /** * Possible outcomes for inserting a initiated payment * into the database. */ enum class PaymentInitiationOutcome { /** * The row contains a client_request_uid that exists * already in the database. */ UNIQUE_CONSTRAINT_VIOLATION, /** * Record successfully created. */ SUCCESS } // OUTGOING PAYMENTS STRUCTS /** * Collects data of a booked outgoing payment. */ data class OutgoingPayment( val amount: TalerAmount, val executionTime: Instant, /** ISO20022 MessageIdentification */ val messageId: String, val creditPaytoUri: String? = null, // not showing in camt.054 val wireTransferSubject: String? = null // not showing in camt.054 ) { override fun toString(): String { return "OUT ${executionTime.fmtDate()} $amount '$messageId' creditor=$creditPaytoUri subject=$wireTransferSubject" } } /** Outgoing payments registration result */ data class OutgoingRegistrationResult( val id: Long, val initiated: Boolean, val new: Boolean ) /** Incoming payments registration result */ data class IncomingRegistrationResult( val id: Long, val new: Boolean ) /** Incoming payments bounce registration result */ data class IncomingBounceRegistrationResult( val id: Long, val bounceId: String, val new: Boolean ) /** * Performs a INSERT, UPDATE, or DELETE operation. * * @return true if at least one row was affected by this operation, * false on unique constraint violation or no rows were affected. * */ private fun PreparedStatement.maybeUpdate(): Boolean { try { this.executeUpdate() } catch (e: SQLException) { logger.error(e.message) if (e.sqlState == PSQLState.UNIQUE_VIOLATION.state) return false throw e // rethrowing, not to hide other types of errors. } return updateCount > 0 } /** * Collects database connection steps and any operation on the Nexus tables. */ class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") { // Temporary in memory database to store EBICS order status until we modify the schema to actually store it in the database var mem: MutableMap = mutableMapOf() // OUTGOING PAYMENTS METHODS /** * Register an outgoing payment OPTIONALLY reconciling it with its * initiated payment counterpart. * * @param paymentData information about the outgoing payment. * @return operation outcome enum. */ suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = conn { val stmt = it.prepareStatement(""" SELECT out_tx_id, out_initiated, out_found FROM register_outgoing( (?,?)::taler_amount ,? ,? ,? ,? )""" ) val executionTime = paymentData.executionTime.toDbMicros() ?: throw Exception("Could not convert outgoing payment execution_time to microseconds") stmt.setLong(1, paymentData.amount.value) stmt.setInt(2, paymentData.amount.frac) stmt.setString(3, paymentData.wireTransferSubject) stmt.setLong(4, executionTime) stmt.setString(5, paymentData.creditPaytoUri) stmt.setString(6, paymentData.messageId) stmt.executeQuery().use { when { !it.next() -> throw Exception("Inserting outgoing payment gave no outcome.") else -> OutgoingRegistrationResult( it.getLong("out_tx_id"), it.getBoolean("out_initiated"), !it.getBoolean("out_found") ) } } } // INCOMING PAYMENTS METHODS /** * Register an incoming payment and bounce it * * @param paymentData information about the incoming payment * @param requestUid unique identifier of the bounce outgoing payment to * initiate * @param bounceAmount amount to send back to the original debtor * @param bounceSubject subject of the bounce outhoing payment * @return true if new */ suspend fun registerMalformedIncoming( paymentData: IncomingPayment, bounceAmount: TalerAmount, now: Instant ): IncomingBounceRegistrationResult = conn { val stmt = it.prepareStatement(""" SELECT out_found, out_tx_id, out_bounce_id FROM register_incoming_and_bounce( (?,?)::taler_amount ,? ,? ,? ,? ,(?,?)::taler_amount ,? )""" ) val refundTimestamp = now.toDbMicros() ?: throw Exception("Could not convert refund execution time from Instant.now() to microsends.") val executionTime = paymentData.executionTime.toDbMicros() ?: throw Exception("Could not convert payment execution time from Instant to microseconds.") stmt.setLong(1, paymentData.amount.value) stmt.setInt(2, paymentData.amount.frac) stmt.setString(3, paymentData.wireTransferSubject) stmt.setLong(4, executionTime) stmt.setString(5, paymentData.debitPaytoUri) stmt.setString(6, paymentData.bankId) stmt.setLong(7, bounceAmount.value) stmt.setInt(8, bounceAmount.frac) stmt.setLong(9, refundTimestamp) stmt.executeQuery().use { when { !it.next() -> throw Exception("Inserting malformed incoming payment gave no outcome") else -> IncomingBounceRegistrationResult( it.getLong("out_tx_id"), it.getString("out_bounce_id"), !it.getBoolean("out_found") ) } } } /** * Register an talerable incoming payment * * @param paymentData incoming talerable payment. * @param reservePub reserve public key. The caller is * responsible to check it. */ suspend fun registerTalerableIncoming( paymentData: IncomingPayment, reservePub: EddsaPublicKey ): IncomingRegistrationResult = conn { conn -> val stmt = conn.prepareStatement(""" SELECT out_found, out_tx_id FROM register_incoming_and_talerable( (?,?)::taler_amount ,? ,? ,? ,? ,? )""" ) val executionTime = paymentData.executionTime.toDbMicros() ?: throw Exception("Could not convert payment execution time from Instant to microseconds.") stmt.setLong(1, paymentData.amount.value) stmt.setInt(2, paymentData.amount.frac) stmt.setString(3, paymentData.wireTransferSubject) stmt.setLong(4, executionTime) stmt.setString(5, paymentData.debitPaytoUri) stmt.setString(6, paymentData.bankId) stmt.setBytes(7, reservePub.raw) stmt.executeQuery().use { when { !it.next() -> throw Exception("Inserting talerable incoming payment gave no outcome") else -> IncomingRegistrationResult( it.getLong("out_tx_id"), !it.getBoolean("out_found") ) } } } /** * Get the last execution time of outgoing transactions. * * @return [Instant] or null if no results were found */ suspend fun outgoingPaymentLastExecTime(): Instant? = conn { conn -> val stmt = conn.prepareStatement( "SELECT MAX(execution_time) as latest_execution_time FROM outgoing_transactions" ) stmt.executeQuery().use { if (!it.next()) return@conn null val timestamp = it.getLong("latest_execution_time") if (timestamp == 0L) return@conn null return@conn timestamp.microsToJavaInstant() ?: throw Exception("Could not convert latest_execution_time to Instant") } } /** * Get the last execution time of an incoming transaction. * * @return [Instant] or null if no results were found */ suspend fun incomingPaymentLastExecTime(): Instant? = conn { conn -> val stmt = conn.prepareStatement( "SELECT MAX(execution_time) as latest_execution_time FROM incoming_transactions" ) stmt.executeQuery().use { if (!it.next()) return@conn null val timestamp = it.getLong("latest_execution_time") if (timestamp == 0L) return@conn null return@conn timestamp.microsToJavaInstant() ?: throw Exception("Could not convert latest_execution_time to Instant") } } /** * Checks if the reserve public key already exists. * * @param maybeReservePub reserve public key to look up * @return true if found, false otherwise */ suspend fun isReservePubFound(maybeReservePub: EddsaPublicKey): Boolean = conn { conn -> val stmt = conn.prepareStatement(""" SELECT 1 FROM talerable_incoming_transactions WHERE reserve_public_key = ?; """) stmt.setBytes(1, maybeReservePub.raw) val res = stmt.executeQuery() res.use { return@conn it.next() } } // INITIATED PAYMENTS METHODS /** * Represents all the states but "unsubmitted" related to an * initiated payment. Unsubmitted gets set by default by the * database and there's no case where it has to be reset to an * initiated payment. */ /** * Sets the submission state of an initiated payment. Transparently * sets the last_submission_time column too, as this corresponds to the * time when we set the state. * * @param rowId row ID of the record to set. * @param submissionState which state to set. * @return true on success, false if no payment was affected. */ suspend fun initiatedPaymentSetSubmittedState( rowId: Long, submissionState: DatabaseSubmissionState ): Boolean = conn { conn -> val stmt = conn.prepareStatement(""" UPDATE initiated_outgoing_transactions SET submitted = submission_state(?), last_submission_time = ? WHERE initiated_outgoing_transaction_id = ? """ ) val now = Instant.now() stmt.setString(1, submissionState.name) stmt.setLong(2, now.toDbMicros() ?: run { throw Exception("Submission time could not be converted to microseconds for the database.") }) stmt.setLong(3, rowId) return@conn stmt.maybeUpdate() } /** * Sets the failure reason to an initiated payment. * * @param rowId row ID of the record to set. * @param failureMessage error associated to this initiated payment. * @return true on success, false if no payment was affected. */ suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = conn { conn -> val stmt = conn.prepareStatement(""" UPDATE initiated_outgoing_transactions SET failure_message = ? WHERE initiated_outgoing_transaction_id=? """ ) stmt.setString(1, failureMessage) stmt.setLong(2, rowId) return@conn stmt.maybeUpdate() } /** * Gets any initiated payment that was not submitted to the * bank yet. * * @param currency in which currency should the payment be submitted to the bank. * @return [Map] of the initiated payment row ID and [InitiatedPayment] */ suspend fun initiatedPaymentsSubmittableGet(currency: String): List = conn { conn -> val stmt = conn.prepareStatement(""" SELECT initiated_outgoing_transaction_id ,(amount).val as amount_val ,(amount).frac as amount_frac ,wire_transfer_subject ,credit_payto_uri ,initiation_time ,request_uid FROM initiated_outgoing_transactions WHERE (submitted='unsubmitted' OR submitted='transient_failure') AND ((amount).val != 0 OR (amount).frac != 0); """) stmt.all { val rowId = it.getLong("initiated_outgoing_transaction_id") val initiationTime = it.getLong("initiation_time").microsToJavaInstant() if (initiationTime == null) { // nexus fault throw Exception("Found invalid timestamp at initiated payment with ID: $rowId") } InitiatedPayment( id = it.getLong("initiated_outgoing_transaction_id"), amount = it.getAmount("amount", currency), creditPaytoUri = it.getString("credit_payto_uri"), wireTransferSubject = it.getString("wire_transfer_subject"), initiationTime = initiationTime, requestUid = it.getString("request_uid") ) } } /** * Initiate a payment in the database. The "submit" * command is then responsible to pick it up and submit * it to the bank. * * @param paymentData any data that's used to prepare the payment. * @return true if the insertion went through, false in case of errors. */ suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = conn { conn -> val stmt = conn.prepareStatement(""" INSERT INTO initiated_outgoing_transactions ( amount ,wire_transfer_subject ,credit_payto_uri ,initiation_time ,request_uid ) VALUES ( (?,?)::taler_amount ,? ,? ,? ,? ) """) stmt.setLong(1, paymentData.amount.value) stmt.setInt(2, paymentData.amount.frac) stmt.setString(3, paymentData.wireTransferSubject) stmt.setString(4, paymentData.creditPaytoUri.toString()) val initiationTime = paymentData.initiationTime.toDbMicros() ?: run { throw Exception("Initiation time could not be converted to microseconds for the database.") } stmt.setLong(5, initiationTime) stmt.setString(6, paymentData.requestUid) // can be null. if (stmt.maybeUpdate()) return@conn PaymentInitiationOutcome.SUCCESS /** * _very_ likely, Nexus didn't check the request idempotency, * as the row ID would never fall into the following problem. */ return@conn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION } /** * Gets the ID of an initiated payment. Useful to link it to its * outgoing payment witnessed in a bank record. * * @param uid UID as given by Nexus when it initiated the payment. * This value then gets specified as the MsgId of pain.001, * and it gets associated by the bank to the booked entries * in camt.05x reports. * @return the initiated payment row ID, or null if not found. NOTE: * null gets returned even when the initiated payment exists, * *but* it was NOT flagged as submitted. */ suspend fun initiatedPaymentGetFromUid(uid: String): Long? = conn { conn -> val stmt = conn.prepareStatement(""" SELECT initiated_outgoing_transaction_id FROM initiated_outgoing_transactions WHERE request_uid = ? AND submitted = 'success'; """) stmt.setString(1, uid) val res = stmt.executeQuery() res.use { if (!it.next()) return@conn null return@conn it.getLong("initiated_outgoing_transaction_id") } } }