libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 41628115dcac19c5f1532fdafb06e361a409fa8a
parent b2b49493967b71b58910274f8d3fe87427891c7a
Author: MS <ms@taler.net>
Date:   Thu,  9 Nov 2023 18:25:24 +0100

nexus fetch

- fixing the file logger
- implementing --only-statements flag
- fixing the SELECTion of non-existent execution_time

Diffstat:
Mnexus/src/main/kotlin/tech/libeufin/nexus/Database.kt | 8+++++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt | 96++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt | 22++++++++++++----------
3 files changed, 88 insertions(+), 38 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt @@ -319,10 +319,12 @@ class Database(dbConfig: String): java.io.Closeable { ) stmt.executeQuery().use { if (!it.next()) return@runConn null - val timestamp = it.getLong("latest_execution_time").microsToJavaInstant() - if (timestamp == null) + val timestamp = it.getLong("latest_execution_time") + if (timestamp == 0L) return@runConn null + val asInstant = timestamp.microsToJavaInstant() + if (asInstant == null) throw Exception("Could not convert latest_execution_time to Instant") - return@runConn timestamp + return@runConn asInstant } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -7,13 +7,13 @@ import io.ktor.client.* import kotlinx.coroutines.runBlocking import org.apache.commons.compress.archivers.zip.ZipFile import org.apache.commons.compress.utils.SeekableInMemoryByteChannel -import tech.libeufin.nexus.ebics.EbicsSideError import tech.libeufin.nexus.ebics.EbicsSideException import tech.libeufin.nexus.ebics.createEbics3DownloadInitialization import tech.libeufin.nexus.ebics.doEbicsDownload import tech.libeufin.util.ebics_h005.Ebics3Request import tech.libeufin.util.getXmlDate import tech.libeufin.util.toDbMicros +import java.io.File import java.nio.file.Path import java.time.Instant import java.time.LocalDate @@ -217,7 +217,7 @@ fun prepReportRequest( * length is zero. It returns null, if the bank assigned an * error to the EBICS transaction. */ -suspend fun downloadRecords( +private suspend fun downloadHelper( cfg: EbicsSetupConfig, bankKeys: BankPublicKeysFile, clientKeys: ClientPrivateKeysFile, @@ -260,29 +260,32 @@ suspend fun downloadRecords( * @param content ZIP bytes from the server. */ fun maybeLogFile(cfg: EbicsSetupConfig, content: ByteArray) { + // Main dir. val maybeLogDir = cfg.config.lookupString( - "[neuxs-fetch]", + "nexus-fetch", "STATEMENT_LOG_DIRECTORY" ) ?: return - try { Path.of(maybeLogDir).createDirectories() } + logger.debug("Logging to $maybeLogDir") + // Subdir based on current day. + val now = Instant.now() + val asUtcDate = LocalDate.ofInstant(now, ZoneId.of("UTC")) + val subDir = "${asUtcDate.year}-${asUtcDate.monthValue}-${asUtcDate.dayOfMonth}" + // Creating the combined dir. + val dirs = Path.of(maybeLogDir, subDir) + try { dirs.createDirectories() } catch (e: Exception) { - logger.error("Could not create log directory of path: $maybeLogDir") + logger.error("Could not create log directory of path: $dirs") // check how dirs stringifies. exitProcess(1) } - val now = Instant.now() - val asUtcDate = LocalDate.ofInstant(now, ZoneId.of("UTC")) + // Write each ZIP entry in the combined dir. content.unzipForEach { fileName, xmlContent -> - val f = Path.of( - "${asUtcDate.year}-${asUtcDate.monthValue}-${asUtcDate.dayOfMonth}", - "${now.toDbMicros()}_$fileName" - ).toFile() - val completePath = Path.of(maybeLogDir, f.path) + val f = File(dirs.toString(), "${now.toDbMicros()}_$fileName") // Rare: cannot download the same file twice in the same microsecond. if (f.exists()) { - logger.error("Log file exists already at: $completePath") + logger.error("Log file exists already at: ${f.path}") exitProcess(1) } - completePath.toFile().writeText(xmlContent) + f.writeText(xmlContent) } } @@ -305,19 +308,24 @@ fun maybeLogFile(cfg: EbicsSetupConfig, content: ByteArray) { * @param clientKeys EBICS subscriber private keys. * @param bankKeys bank public keys. */ -suspend fun fetchHistory( +private suspend fun fetchDocuments( cfg: EbicsSetupConfig, db: Database, httpClient: HttpClient, clientKeys: ClientPrivateKeysFile, - bankKeys: BankPublicKeysFile + bankKeys: BankPublicKeysFile, + whichDocument: SupportedDocument = SupportedDocument.CAMT_054 ) { // maybe get last execution_date. - val lastExecutionTime = db.incomingPaymentLastExecTime() - // Asking unseen records. - val req = if (lastExecutionTime == null) prepNotificationRequest(isAppendix = false) - else prepNotificationRequest(lastExecutionTime, isAppendix = false) - val maybeContent = downloadRecords( + val lastExecutionTime: Instant? = db.incomingPaymentLastExecTime() + logger.debug("Fetching documents from timestamp: $lastExecutionTime") + val req = when(whichDocument) { + SupportedDocument.PAIN_002 -> prepAckRequest(startDate = lastExecutionTime) + SupportedDocument.CAMT_052 -> prepReportRequest(startDate = lastExecutionTime) + SupportedDocument.CAMT_053 -> prepStatementRequest(startDate = lastExecutionTime) + SupportedDocument.CAMT_054 -> prepNotificationRequest(startDate = lastExecutionTime, isAppendix = false) + } + val maybeContent = downloadHelper( cfg, bankKeys, clientKeys, @@ -329,7 +337,13 @@ suspend fun fetchHistory( maybeLogFile(cfg, maybeContent) } -class EbicsFetch: CliktCommand("Fetches bank records") { +enum class SupportedDocument { + PAIN_002, + CAMT_053, + CAMT_052, + CAMT_054 +} +class EbicsFetch: CliktCommand("Fetches bank records. Defaults to camt.054 notifications") { private val configFile by option( "--config", "-c", help = "set the configuration file" @@ -340,6 +354,10 @@ class EbicsFetch: CliktCommand("Fetches bank records") { "ignoring the 'frequency' configuration value" ).flag(default = false) + private val onlyStatements by option( + help = "Downloads only camt.053 statements" + ).flag(default = false) + /** * This function collects the main steps of fetching banking records. * In this current version, it does not implement long polling, instead @@ -366,9 +384,19 @@ class EbicsFetch: CliktCommand("Fetches bank records") { exitProcess(1) } val httpClient = HttpClient() + val whichDoc = if (onlyStatements) SupportedDocument.CAMT_053 else SupportedDocument.CAMT_054 if (transient) { logger.info("Transient mode: fetching once and returning.") - runBlocking { fetchHistory(cfg, db, httpClient, clientKeys, bankKeys) } + runBlocking { + fetchDocuments( + cfg, + db, + httpClient, + clientKeys, + bankKeys, + whichDoc + ) + } return } val frequency: NexusFrequency = doOrFail { @@ -379,14 +407,32 @@ class EbicsFetch: CliktCommand("Fetches bank records") { logger.debug("Running with a frequency of ${frequency.fromConfig}") if (frequency.inSeconds == 0) { logger.warn("Long-polling not implemented, running therefore in transient mode") - runBlocking { fetchHistory(cfg, db, httpClient, clientKeys, bankKeys) } + runBlocking { + fetchDocuments( + cfg, + db, + httpClient, + clientKeys, + bankKeys, + whichDoc + ) + } return } fixedRateTimer( name = "ebics submit period", period = (frequency.inSeconds * 1000).toLong(), action = { - runBlocking { fetchHistory(cfg, db, httpClient, clientKeys, bankKeys) } + runBlocking { + fetchDocuments( + cfg, + db, + httpClient, + clientKeys, + bankKeys, + whichDoc + ) + } } ) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt @@ -30,6 +30,7 @@ import tech.libeufin.nexus.ebics.EbicsUploadException import tech.libeufin.nexus.ebics.submitPain001 import tech.libeufin.util.parsePayto import tech.libeufin.util.toDbMicros +import java.io.File import java.nio.file.Path import java.time.Instant import java.time.LocalDate @@ -137,24 +138,25 @@ private suspend fun submitInitiatedPayment( "SUBMISSIONS_LOG_DIRECTORY" ) if (logDir != null) { - try { Path.of(logDir).createDirectories() } + val now = Instant.now() + val asUtcDate = LocalDate.ofInstant(now, ZoneId.of("UTC")) + val subDir = "${asUtcDate.year}-${asUtcDate.monthValue}-${asUtcDate.dayOfMonth}" + val dirs = Path.of(logDir, subDir) + try { dirs.createDirectories() } catch (e: Exception) { - logger.error("Could not create log directory of path: $logDir") + logger.error("Could not create log directory of path: $dirs") exitProcess(1) } - val now = Instant.now() - val asUtcDate = LocalDate.ofInstant(now, ZoneId.of("UTC")) - val f = Path.of( - "${asUtcDate.year}-${asUtcDate.monthValue}-${asUtcDate.dayOfMonth}", + val f = File( + dirs.toString(), "${now.toDbMicros()}_requestUid_${initiatedPayment.requestUid}_pain.001.xml" - ).toFile() - val completePath = Path.of(logDir, f.path) + ) // Very rare: same pain.001 should not be submitted twice in the same microsecond. if (f.exists()) { - logger.error("pain.001 log file exists already at: $completePath") + logger.error("pain.001 log file exists already at: $f") exitProcess(1) } - completePath.toFile().writeText(xml) + f.writeText(xml) } }