/* * This file is part of LibEuFin. * Copyright (C) 2024 Taler Systems S.A. * LibEuFin is free software; you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation; either version 3, or * (at your option) any later version. * LibEuFin is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General * Public License for more details. * You should have received a copy of the GNU Affero General Public * License along with LibEuFin; see the file COPYING. If not, see * */ package tech.libeufin.nexus import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.* import com.github.ajalt.clikt.parameters.groups.* import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* import io.ktor.client.* import io.ktor.client.plugins.* import kotlinx.coroutines.* import tech.libeufin.common.* import tech.libeufin.nexus.ebics.* import java.io.IOException import java.io.InputStream import java.time.Instant import java.time.LocalDate import java.time.ZoneId import kotlin.io.* import kotlin.io.path.* /** * Necessary data to perform a download. */ data class FetchContext( /** * Config handle. */ val cfg: EbicsSetupConfig, /** * HTTP client handle to reach the bank */ val httpClient: HttpClient, /** * EBICS subscriber private keys. */ val clientKeys: ClientPrivateKeysFile, /** * Bank public keys. */ val bankKeys: BankPublicKeysFile, /** * Start date of the returned documents. Only * used in --transient mode. */ var pinnedStart: Instant?, val fileLogger: FileLogger ) /** * Downloads content via EBICS, according to the order params passed * by the caller. * * @param T [Ebics2Request] for EBICS 2 or [Ebics3Request.OrderDetails.BTOrderParams] for EBICS 3 * @param ctx [FetchContext] * @param req contains the instructions for the download, namely * which document is going to be downloaded from the bank. * @return the [ByteArray] payload. On an empty response, the array * length is zero. It returns null, if the bank assigned an * error to the EBICS transaction. */ private suspend fun downloadHelper( ctx: FetchContext, lastExecutionTime: Instant? = null, doc: SupportedDocument, processing: (InputStream) -> Unit ) { val initXml = Ebics3BTS( ctx.cfg, ctx.bankKeys, ctx.clientKeys ).downloadInitializationDoc(doc, lastExecutionTime) return ebicsDownload( ctx.httpClient, ctx.cfg, ctx.clientKeys, ctx.bankKeys, initXml, processing ) } /** * Converts the 2-digits fraction value as given by the bank * (postfinance dialect), to the Taler 8-digit value (db representation). * * @param bankFrac fractional value * @return the Taler fractional value with at most 8 digits. */ private fun makeTalerFrac(bankFrac: String): Int { if (bankFrac.length > 2) throw Exception("Fractional value has more than 2 digits") var buf = bankFrac.toIntOrNull() ?: throw Exception("Fractional value not an Int: $bankFrac") repeat(8 - bankFrac.length) { buf *= 10 } return buf } /** * Ingests an outgoing payment. It links it to the initiated * outgoing transaction that originated it. * * @param db database handle. * @param payment payment to (maybe) ingest. */ suspend fun ingestOutgoingPayment( db: Database, payment: OutgoingPayment ) { val result = db.registerOutgoing(payment) if (result.new) { if (result.initiated) logger.info("$payment") else logger.warn("$payment recovered") } else { logger.debug("$payment already seen") } } private val PATTERN = Regex("[a-z0-9A-Z]{52}") /** * Ingests an incoming payment. Stores the payment into valid talerable ones * or bounces it, according to the subject. * * @param db database handle. * @param payment payment to (maybe) ingest. */ suspend fun ingestIncomingPayment( db: Database, payment: IncomingPayment ) { runCatching { parseIncomingTxMetadata(payment.wireTransferSubject) }.fold( onSuccess = { reservePub -> val result = db.registerTalerableIncoming(payment, reservePub) if (result.new) { logger.info("$payment") } else { logger.debug("$payment already seen") } }, onFailure = { e -> val result = db.registerMalformedIncoming( payment, payment.amount, Instant.now() ) if (result.new) { logger.info("$payment bounced in '${result.bounceId}': ${e.message}") } else { logger.debug("$payment already seen and bounced in '${result.bounceId}': ${e.message}") } } ) } /** * Ingests an outgoing payment bounce. * * @param db database handle. * @param reversal reversal ingest. */ suspend fun ingestReversal( db: Database, reversal: OutgoingReversal ) { logger.warn("BOUNCE '${reversal.bankId}': ${reversal.reason}") // TODO store in db= } private fun ingestDocument( db: Database, currency: String, xml: InputStream, whichDocument: SupportedDocument ) { when (whichDocument) { SupportedDocument.CAMT_054 -> { try { val notifications = mutableListOf() parseTxNotif(xml, currency, notifications) runBlocking { notifications.forEach { when (it) { is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment) is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment) is TxNotification.Reversal -> ingestReversal(db, it.reversal) } } } } catch (e: Exception) { throw Exception("Ingesting notifications failed", e) } } SupportedDocument.PAIN_002_LOGS -> { val acks = parseCustomerAck(xml) for (ack in acks) { val msg = if (ack.orderId != null) { if (ack.code != null) { val msg = ack.msg() db.mem[ack.orderId] = msg msg } else { db.mem[ack.orderId] } } else { null } when (ack.actionType) { HacAction.FILE_DOWNLOAD -> logger.debug("$ack") HacAction.ORDER_HAC_FINAL_POS -> { // TODO update pending transaction status logger.debug("$ack") logger.info("Order '${ack.orderId}' was accepted at ${ack.timestamp.fmtDateTime()}") } HacAction.ORDER_HAC_FINAL_NEG -> { // TODO update pending transaction status logger.debug("$ack") logger.warn("Order '${ack.orderId}' was refused at ${ack.timestamp.fmtDateTime()}: $msg") } else -> { // TODO update pending transaction status logger.debug("$ack") } } } } SupportedDocument.PAIN_002 -> { val status = parseCustomerPaymentStatusReport(xml) if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) logger.warn("Transaction '${status.id()}' was rejected") // TODO update pending transaction status logger.debug("$status") } SupportedDocument.CAMT_053, SupportedDocument.CAMT_052 -> { // TODO parsing // TODO ingesting } } } private fun ingestDocuments( db: Database, currency: String, content: InputStream, whichDocument: SupportedDocument ) { when (whichDocument) { SupportedDocument.CAMT_054, SupportedDocument.PAIN_002, SupportedDocument.CAMT_053, SupportedDocument.CAMT_052 -> { try { content.unzipEach { fileName, xmlContent -> logger.trace("parse $fileName") ingestDocument(db, currency, xmlContent, whichDocument) } } catch (e: IOException) { throw Exception("Could not open any ZIP archive", e) } } SupportedDocument.PAIN_002_LOGS -> ingestDocument(db, currency, content, whichDocument) } } /** * Fetches the banking records via EBICS notifications requests. * * It first checks the last execution_time (db column) among the * incoming transactions. If that's not found, it asks the bank * about 'unseen notifications' (= does not specify any date range * in the request). If that's found, it crafts a notification * request with such execution_time as the start date and now as * the end date. * * What this function does NOT do (now): linking documents between * different camt.05x formats and/or pain.002 acknowledgements. * * @param db database connection * @param ctx [FetchContext] * @param pinnedStart explicit start date for the downloaded documents. * This parameter makes the last incoming transaction timestamp in * the database IGNORED. Only useful when running in --transient * mode to download past documents / debug. */ private suspend fun fetchDocuments( db: Database, ctx: FetchContext, docs: List ): Boolean { val lastExecutionTime: Instant? = ctx.pinnedStart return docs.all { doc -> try { if (lastExecutionTime == null) { logger.info("Fetching new '${doc.fullDescription()}'") } else { logger.info("Fetching '${doc.fullDescription()}' from timestamp: $lastExecutionTime") } val doc = doc.doc() // downloading the content downloadHelper(ctx, lastExecutionTime, doc) { stream -> val loggedStream = ctx.fileLogger.logFetch( stream, doc == SupportedDocument.PAIN_002_LOGS ) ingestDocuments(db, ctx.cfg.currency, loggedStream, doc) } true } catch (e: Exception) { e.fmtLog(logger) false } } } enum class EbicsDocument { /// EBICS acknowledgement - CustomerAcknowledgement HAC pain.002 acknowledgement, /// Payment status - CustomerPaymentStatusReport pain.002 status, /// Account intraday reports - BankToCustomerAccountReport camt.052 // report, TODO add support /// Debit & credit notifications - BankToCustomerDebitCreditNotification camt.054 notification, /// Account statements - BankToCustomerStatement camt.053 // statement, TODO add support ; fun shortDescription(): String = when (this) { acknowledgement -> "EBICS acknowledgement" status -> "Payment status" //Document.report -> "Account intraday reports" notification -> "Debit & credit notifications" //Document.statement -> "Account statements" } fun fullDescription(): String = when (this) { acknowledgement -> "EBICS acknowledgement - CustomerAcknowledgement HAC pain.002" status -> "Payment status - CustomerPaymentStatusReport pain.002" //report -> "Account intraday reports - BankToCustomerAccountReport camt.052" notification -> "Debit & credit notifications - BankToCustomerDebitCreditNotification camt.054" //statement -> "Account statements - BankToCustomerStatement camt.053" } fun doc(): SupportedDocument = when (this) { acknowledgement -> SupportedDocument.PAIN_002_LOGS status -> SupportedDocument.PAIN_002 //Document.report -> SupportedDocument.CAMT_052 notification -> SupportedDocument.CAMT_054 //Document.statement -> SupportedDocument.CAMT_053 } } class EbicsFetch: CliktCommand("Fetches EBICS files") { private val common by CommonOption() private val transient by option( "--transient", help = "This flag fetches only once from the bank and returns, " + "ignoring the 'frequency' configuration value" ).flag(default = false) private val documents: Set by argument( help = "Which documents should be fetched? If none are specified, all supported documents will be fetched", helpTags = EbicsDocument.entries.map { Pair(it.name, it.shortDescription()) }.toMap() ).enum().multiple().unique() private val pinnedStart by option( help = "Constant YYYY-MM-DD date for the earliest document" + " to download (only consumed in --transient mode). The" + " latest document is always until the current time." ) private val ebicsLog by option( "--debug-ebics", help = "Log EBICS content at SAVEDIR", ) /** * This function collects the main steps of fetching banking records. * In this current version, it does not implement long polling, instead * it runs transient if FREQUENCY is zero. Transient is also the default * mode when no flags are passed to the invocation. * FIXME: reduce code duplication with the submit subcommand. */ override fun run() = cliCmd(logger, common.log) { val cfg: EbicsSetupConfig = extractEbicsConfig(common.config) val dbCfg = cfg.config.dbConfig() Database(dbCfg.dbConnStr).use { db -> val (clientKeys, bankKeys) = expectFullKeys(cfg) val ctx = FetchContext( cfg, HttpClient { install(HttpTimeout) { // It can take a lot of time for the bank to generate documents socketTimeoutMillis = 5 * 60 * 1000 } }, clientKeys, bankKeys, null, FileLogger(ebicsLog) ) val docs = if (documents.isEmpty()) EbicsDocument.entries else documents.toList() if (transient) { logger.info("Transient mode: fetching once and returning.") val pinnedStartVal = pinnedStart val pinnedStartArg = if (pinnedStartVal != null) { logger.debug("Pinning start date to: $pinnedStartVal") // Converting YYYY-MM-DD to Instant. LocalDate.parse(pinnedStartVal).atStartOfDay(ZoneId.of("UTC")).toInstant() } else null ctx.pinnedStart = pinnedStartArg if (!fetchDocuments(db, ctx, docs)) { throw Exception("Failed to fetch documents") } } else { val configValue = cfg.config.requireString("nexus-fetch", "frequency") val frequencySeconds = checkFrequency(configValue) val cfgFrequency: NexusFrequency = NexusFrequency(frequencySeconds, configValue) logger.debug("Running with a frequency of ${cfgFrequency.fromConfig}") val frequency: NexusFrequency? = if (cfgFrequency.inSeconds == 0) { logger.warn("Long-polling not implemented, running therefore in transient mode") null } else { cfgFrequency } do { fetchDocuments(db, ctx, docs) delay(((frequency?.inSeconds ?: 0) * 1000).toLong()) } while (frequency != null) } } } }