/*
* This file is part of LibEuFin.
* Copyright (C) 2024 Taler Systems S.A.
* LibEuFin is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation; either version 3, or
* (at your option) any later version.
* LibEuFin is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
* Public License for more details.
* You should have received a copy of the GNU Affero General Public
* License along with LibEuFin; see the file COPYING. If not, see
* <http://www.gnu.org/licenses/>
*/
package tech.libeufin.nexus.db
import tech.libeufin.nexus.*
import tech.libeufin.common.*
import java.time.Instant
import java.sql.ResultSet
/**
 * Data access logic for initiated outgoing payments.
 *
 * Every operation borrows a connection through `db.conn` and runs a single
 * prepared statement (two for [submittable]) against the
 * `initiated_outgoing_transactions` table.
 */
class InitiatedDAO(private val db: Database) {

    /** Outgoing payments initiation result */
    enum class PaymentInitiationResult {
        /**
         * The insert was rejected by `executeUpdateViolation()`.
         * NOTE(review): presumably a uniqueness violation on `request_uid` — confirm
         * against the helper and the table constraints.
         */
        REQUEST_UID_REUSE,
        /** The payment was stored. */
        SUCCESS
    }

    /**
     * Register a new pending payment in the database.
     *
     * @param paymentData the payment to store; its amount, subject, creditor payto URI,
     *   initiation time and request UID are written.
     * @return [PaymentInitiationResult.SUCCESS] when `executeUpdateViolation()` reports
     *   success, [PaymentInitiationResult.REQUEST_UID_REUSE] otherwise.
     * @throws Exception if the initiation time cannot be converted to database microseconds.
     */
    suspend fun create(paymentData: InitiatedPayment): PaymentInitiationResult = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            INSERT INTO initiated_outgoing_transactions (
                amount
                ,wire_transfer_subject
                ,credit_payto_uri
                ,initiation_time
                ,request_uid
            ) VALUES ((?,?)::taler_amount,?,?,?,?)
        """)
        // Parameters 1 and 2 together form the composite taler_amount value.
        stmt.setLong(1, paymentData.amount.value)
        stmt.setInt(2, paymentData.amount.frac)
        stmt.setString(3, paymentData.wireTransferSubject)
        stmt.setString(4, paymentData.creditPaytoUri.toString())
        val initiationTime = paymentData.initiationTime.toDbMicros()
            ?: throw Exception("Initiation time could not be converted to microseconds for the database.")
        stmt.setLong(5, initiationTime)
        stmt.setString(6, paymentData.requestUid)
        if (stmt.executeUpdateViolation()) {
            PaymentInitiationResult.SUCCESS
        } else {
            PaymentInitiationResult.REQUEST_UID_REUSE
        }
    }

    /**
     * Register EBICS submission success.
     *
     * Marks the transaction as `success`, stores the submission time and the EBICS
     * order ID, clears any previous failure message and increments the submission counter.
     *
     * @param id row ID of the initiated transaction.
     * @param now submission time to record.
     * @param orderId EBICS order ID to associate with the transaction.
     * @throws Exception if [now] cannot be converted to database microseconds.
     */
    suspend fun submissionSuccess(
        id: Long,
        now: Instant,
        orderId: String
    ) = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions SET
                submitted = 'success'::submission_state
                ,last_submission_time = ?
                ,failure_message = NULL
                ,order_id = ?
                ,submission_counter = submission_counter + 1
            WHERE initiated_outgoing_transaction_id = ?
        """)
        val submissionTime = now.toDbMicros()
            ?: throw Exception("Submission time could not be converted to microseconds for the database.")
        stmt.setLong(1, submissionTime)
        stmt.setString(2, orderId)
        stmt.setLong(3, id)
        stmt.execute()
    }

    /**
     * Register EBICS submission failure.
     *
     * Marks the transaction as `transient_failure`, stores the submission time and
     * the failure message, and increments the submission counter.
     *
     * @param id row ID of the initiated transaction.
     * @param now submission time to record.
     * @param msg failure message to store; may be null.
     * @throws Exception if [now] cannot be converted to database microseconds.
     */
    suspend fun submissionFailure(
        id: Long,
        now: Instant,
        msg: String?
    ) = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions SET
                submitted = 'transient_failure'::submission_state
                ,last_submission_time = ?
                ,failure_message = ?
                ,submission_counter = submission_counter + 1
            WHERE initiated_outgoing_transaction_id = ?
        """)
        val submissionTime = now.toDbMicros()
            ?: throw Exception("Submission time could not be converted to microseconds for the database.")
        stmt.setLong(1, submissionTime)
        stmt.setString(2, msg)
        stmt.setLong(3, id)
        stmt.execute()
    }

    /**
     * Register EBICS log status message.
     *
     * Stores [msg] as the failure message of the transaction(s) with the given
     * EBICS [orderId]; the submission state is left untouched.
     */
    suspend fun logMessage(orderId: String, msg: String) = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions SET failure_message = ?
            WHERE order_id = ?
        """)
        stmt.setString(1, msg)
        stmt.setString(2, orderId)
        stmt.execute()
    }

    /**
     * Handle an EBICS log success: look up the request_uid of the transaction with
     * the given EBICS [orderId].
     *
     * This only reads from the database; no column is updated here.
     *
     * @return the request UID if a matching transaction is found, null otherwise.
     */
    suspend fun logSuccess(orderId: String): String? = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            SELECT request_uid FROM initiated_outgoing_transactions
            WHERE order_id = ?
        """)
        stmt.setString(1, orderId)
        stmt.oneOrNull { it.getString(1) }
    }

    /**
     * Register EBICS log failure and return request_uid and previous message if found.
     *
     * Marks the transaction with the given EBICS [orderId] as `permanent_failure`.
     *
     * @return a pair of (request UID, stored failure message — possibly null) if a
     *   matching transaction is found, null otherwise.
     *   NOTE(review): the first component is typed non-null on the assumption that
     *   `request_uid` is never NULL in the table — confirm against the schema.
     */
    suspend fun logFailure(orderId: String): Pair<String, String?>? = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions
            SET submitted = 'permanent_failure'::submission_state
            WHERE order_id = ?
            RETURNING request_uid, failure_message
        """)
        stmt.setString(1, orderId)
        stmt.oneOrNull { Pair(it.getString(1), it.getString(2)) }
    }

    /**
     * Register bank status message.
     *
     * Stores [msg] as the failure message of the transaction with the given
     * [requestUID]; the submission state is left untouched.
     */
    suspend fun bankMessage(requestUID: String, msg: String) = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions
            SET failure_message = ?
            WHERE request_uid = ?
        """)
        stmt.setString(1, msg)
        stmt.setString(2, requestUID)
        stmt.execute()
    }

    /**
     * Register bank failure.
     *
     * Marks the transaction with the given [requestUID] as `permanent_failure`
     * and stores [msg] as its failure message.
     */
    suspend fun bankFailure(requestUID: String, msg: String) = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions SET
                submitted = 'permanent_failure'::submission_state
                ,failure_message = ?
            WHERE request_uid = ?
        """)
        stmt.setString(1, msg)
        stmt.setString(2, requestUID)
        stmt.execute()
    }

    /**
     * Register reversal.
     *
     * Marks the transaction with the given [requestUID] as `permanent_failure`
     * and stores [msg] as its failure message (same database effect as [bankFailure]).
     */
    suspend fun reversal(requestUID: String, msg: String) = db.conn { conn ->
        val stmt = conn.prepareStatement("""
            UPDATE initiated_outgoing_transactions SET
                submitted = 'permanent_failure'::submission_state
                ,failure_message = ?
            WHERE request_uid = ?
        """)
        stmt.setString(1, msg)
        stmt.setString(2, requestUID)
        stmt.execute()
    }

    /**
     * List every initiated payment pending submission in the order they should be submitted.
     *
     * @param currency currency passed to `getAmount` when decoding the stored amounts.
     * @return unsubmitted payments ordered by initiation time, followed by transiently
     *   failed payments ordered by last submission time.
     * @throws Exception if a stored initiation time cannot be converted to an [Instant].
     */
    suspend fun submittable(currency: String): List<InitiatedPayment> = db.conn { conn ->
        /** Decode one result row into an [InitiatedPayment]. */
        fun extract(it: ResultSet): InitiatedPayment {
            val rowId = it.getLong("initiated_outgoing_transaction_id")
            // A stored timestamp that cannot be decoded is a nexus fault.
            val initiationTime = it.getLong("initiation_time").microsToJavaInstant()
                ?: throw Exception("Found invalid timestamp at initiated payment with ID: $rowId")
            return InitiatedPayment(
                id = rowId,
                amount = it.getAmount("amount", currency),
                creditPaytoUri = it.getString("credit_payto_uri"),
                wireTransferSubject = it.getString("wire_transfer_subject"),
                initiationTime = initiationTime,
                requestUid = it.getString("request_uid")
            )
        }
        val selectPart = """
            SELECT
                initiated_outgoing_transaction_id
                ,(amount).val as amount_val
                ,(amount).frac as amount_frac
                ,wire_transfer_subject
                ,credit_payto_uri
                ,initiation_time
                ,request_uid
            FROM initiated_outgoing_transactions
        """
        // We want to maximize the number of successfully submitted transactions in the event
        // of a malformed transaction or a persistent error classified as transient. We send
        // the unsubmitted transactions first, starting with the oldest by creation time.
        // This is the happy path, giving every transaction a chance while being fair on the
        // basis of creation date.
        // Then we retry the failed transactions, starting with the oldest by submission time.
        // This is the bad path, retrying each failed transaction and applying a rotation based
        // on resubmission time.
        val unsubmitted = conn.prepareStatement(
            "$selectPart WHERE submitted='unsubmitted' ORDER BY initiation_time"
        ).all(::extract)
        val failed = conn.prepareStatement(
            "$selectPart WHERE submitted='transient_failure' ORDER BY last_submission_time"
        ).all(::extract)
        unsubmitted + failed
    }
}