libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 5f0c56d06e7cc1832eb019f046921b1acecf3752
parent 108124671366f031fa8c8c5f1028a83e0ecda130
Author: MS <ms@taler.net>
Date:   Mon, 20 Nov 2023 15:18:53 +0100

nexus fetch

Given t1 and t2 being the last execution times of outgoing and
incoming payments, now Nexus asks the bank account history starting
from min(t1, t2).

That prevents data loss, in case ingesting outgoing payments fails
before inserting all the entries in the database.  Before this change,
the start date to request the history was defined only after the
last incoming payment.

Diffstat:
Mdatabase-versioning/libeufin-nexus-0001.sql | 7++++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/Database.kt | 27+++++++++++++++++++++------
Mnexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt | 16++++++++++++++--
Mutil/src/main/kotlin/time.kt | 27+++++++++++++++++++++++++++
Autil/src/test/kotlin/TimeTest.kt | 30++++++++++++++++++++++++++++++
5 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/database-versioning/libeufin-nexus-0001.sql b/database-versioning/libeufin-nexus-0001.sql @@ -98,9 +98,10 @@ CREATE TABLE IF NOT EXISTS bounced_transactions ,initiated_outgoing_transaction_id INT8 NOT NULL UNIQUE REFERENCES initiated_outgoing_transactions(initiated_outgoing_transaction_id) ON DELETE CASCADE ); --- Helps to detect the last known incoming transaction. --- According to this value, camt.05x date ranges should be adjusted. CREATE INDEX IF NOT EXISTS incoming_transaction_timestamp - ON incoming_transactions (execution_time); + ON incoming_transactions (execution_time); + +CREATE INDEX IF NOT EXISTS outgoing_transaction_timestamp + ON outgoing_transactions (execution_time); COMMIT; diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt @@ -321,8 +321,25 @@ class Database(dbConfig: String): java.io.Closeable { } /** - * Get the last execution time of an incoming transaction. This - * serves as the start date for new requests to the bank. + * Get the last execution time of outgoing transactions. + * + * @return [Instant] or null if no results were found + */ + suspend fun outgoingPaymentLastExecTime(): Instant? = runConn { conn -> + val stmt = conn.prepareStatement( + "SELECT MAX(execution_time) as latest_execution_time FROM outgoing_transactions" + ) + stmt.executeQuery().use { + if (!it.next()) return@runConn null + val timestamp = it.getLong("latest_execution_time") + if (timestamp == 0L) return@runConn null + return@runConn timestamp.microsToJavaInstant() + ?: throw Exception("Could not convert latest_execution_time to Instant") + } + } + + /** + * Get the last execution time of an incoming transaction. * * @return [Instant] or null if no results were found */ @@ -334,10 +351,8 @@ class Database(dbConfig: String): java.io.Closeable { if (!it.next()) return@runConn null val timestamp = it.getLong("latest_execution_time") if (timestamp == 0L) return@runConn null - val asInstant = timestamp.microsToJavaInstant() - if (asInstant == null) - throw Exception("Could not convert latest_execution_time to Instant") - return@runConn asInstant + return@runConn timestamp.microsToJavaInstant() + ?: throw Exception("Could not convert latest_execution_time to Instant") } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -20,6 +20,7 @@ import java.time.ZoneId import java.util.UUID import kotlin.concurrent.fixedRateTimer import kotlin.io.path.createDirectories +import kotlin.math.min import kotlin.system.exitProcess import kotlin.text.StringBuilder @@ -427,8 +428,19 @@ private suspend fun fetchDocuments( db: Database, ctx: FetchContext ) { - // maybe get last execution_date. - val lastExecutionTime: Instant? = ctx.pinnedStart ?: db.incomingPaymentLastExecTime() + /** + * Getting the least execution between the latest incoming + * and outgoing payments. This way, if ingesting outgoing + * (incoming) payments crashed, we make sure we request from + * the last successful outgoing (incoming) payment execution + * time, to obtain again from the bank those payments that did + * not make it to the database due to the crash. + */ + val lastIncomingTime = db.incomingPaymentLastExecTime() + val lastOutgoingTime = db.outgoingPaymentLastExecTime() + val requestFrom: Instant? = minTimestamp(lastIncomingTime, lastOutgoingTime) + + val lastExecutionTime: Instant? = ctx.pinnedStart ?: requestFrom logger.debug("Fetching ${ctx.whichDocument} from timestamp: $lastExecutionTime") // downloading the content val maybeContent = downloadHelper(ctx, lastExecutionTime) ?: exitProcess(1) // client is wrong, failing. diff --git a/util/src/main/kotlin/time.kt b/util/src/main/kotlin/time.kt @@ -104,4 +104,31 @@ fun parseCamtTime(timeFromCamt: String): Instant { fun parseBookDate(bookDate: String): Instant { val l = LocalDate.parse(bookDate) return Instant.from(l.atStartOfDay(ZoneId.of("UTC"))) +} + +/** + * Returns the minimum instant between two. + * + * @param a input [Instant] + * @param b input [Instant] + * @return the minimum [Instant] or null if even one is null. + */ +fun minTimestamp(a: Instant?, b: Instant?): Instant? { + if (a == null || b == null) return null + if (a.isBefore(b)) return a + return b // includes the case where a == b. +} + +/** + * Returns the max instant between two. + * + * @param a input [Instant] + * @param b input [Instant] + * @return the max [Instant] or null if both are null + */ +fun maxTimestamp(a: Instant?, b: Instant?): Instant? { + if (a == null) return b + if (b == null) return a + if (a.isAfter(b)) return a + return b // includes the case where a == b } \ No newline at end of file diff --git a/util/src/test/kotlin/TimeTest.kt b/util/src/test/kotlin/TimeTest.kt @@ -0,0 +1,29 @@ +import org.junit.Test +import tech.libeufin.util.maxTimestamp +import tech.libeufin.util.minTimestamp +import java.time.Instant +import java.time.temporal.ChronoUnit +import kotlin.test.assertEquals +import kotlin.test.assertNull + +class TimeTest { + @Test + fun cmp() { + val now = Instant.now() + val inOneMinute = now.plus(1, ChronoUnit.MINUTES) + + // testing the "min" function + assertNull(minTimestamp(null, null)) + assertEquals(now, minTimestamp(now, inOneMinute)) + assertNull(minTimestamp(now, null)) + assertNull(minTimestamp(null, now)) + assertEquals(inOneMinute, minTimestamp(inOneMinute, inOneMinute)) + + // testing the "max" function + assertNull(maxTimestamp(null, null)) + assertEquals(inOneMinute, maxTimestamp(now, inOneMinute)) + assertEquals(now, maxTimestamp(now, null)) + assertEquals(now, maxTimestamp(null, now)) + assertEquals(now, minTimestamp(now, now)) + } +} +\ No newline at end of file