diff options
author | Antoine A <> | 2024-02-13 17:20:33 +0100 |
---|---|---|
committer | Antoine A <> | 2024-02-13 17:20:33 +0100 |
commit | 3046601e239d774716527d4a0a34f585ac5ade79 (patch) | |
tree | 093ef827ed28be1c32ab25d3342b1a0e324a6ab4 /nexus | |
parent | 5837035a2e5679f529196ae85bc041d695a69176 (diff) | |
download | libeufin-3046601e239d774716527d4a0a34f585ac5ade79.tar.gz libeufin-3046601e239d774716527d4a0a34f585ac5ade79.tar.bz2 libeufin-3046601e239d774716527d4a0a34f585ac5ade79.zip |
Reduce memory usage by using InputStream as much as possiblev0.9.4-dev.19
Diffstat (limited to 'nexus')
-rw-r--r-- | nexus/build.gradle | 3 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt | 27 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt | 2 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt | 9 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt | 16 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt | 8 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt | 85 | ||||
-rw-r--r-- | nexus/src/test/kotlin/Ebics.kt | 6 |
8 files changed, 66 insertions, 90 deletions
diff --git a/nexus/build.gradle b/nexus/build.gradle index 3d6c6c7c..b510dba6 100644 --- a/nexus/build.gradle +++ b/nexus/build.gradle @@ -27,9 +27,6 @@ dependencies { // XML parsing/binding and encryption implementation("jakarta.xml.bind:jakarta.xml.bind-api:2.3.3") - // Compression - implementation("org.apache.commons:commons-compress:1.25.0") - // Command line parsing implementation("com.github.ajalt.clikt:clikt:$clikt_version") implementation("org.postgresql:postgresql:$postgres_version") diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt index 03d37e76..d4234cd1 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -31,6 +31,7 @@ import tech.libeufin.common.* import tech.libeufin.ebics.* import tech.libeufin.ebics.ebics_h005.Ebics3Request import java.io.IOException +import java.io.InputStream import java.time.Instant import java.time.LocalDate import java.time.ZoneId @@ -78,12 +79,12 @@ data class FetchContext( * length is zero. It returns null, if the bank assigned an * error to the EBICS transaction. */ -private suspend fun <T> downloadHelper( +private suspend fun downloadHelper( ctx: FetchContext, lastExecutionTime: Instant? = null, doc: SupportedDocument, - processing: (ByteArray) -> T -): T? { + processing: (InputStream) -> Unit +) { val isEbics3 = doc != SupportedDocument.PAIN_002_LOGS val initXml = if (isEbics3) { createEbics3DownloadInitialization( @@ -246,7 +247,7 @@ suspend fun ingestIncomingPayment( private fun ingestDocument( db: Database, currency: String, - xml: ByteArray, + xml: InputStream, whichDocument: SupportedDocument ) { when (whichDocument) { @@ -308,7 +309,7 @@ private fun ingestDocument( private fun ingestDocuments( db: Database, currency: String, - content: ByteArray, + content: InputStream, whichDocument: SupportedDocument ) { when (whichDocument) { @@ -317,7 +318,7 @@ private fun ingestDocuments( SupportedDocument.CAMT_053, SupportedDocument.CAMT_052 -> { try { - content.unzipForEach { fileName, xmlContent -> + content.unzipEach { fileName, xmlContent -> logger.trace("parse $fileName") ingestDocument(db, currency, xmlContent, whichDocument) } @@ -364,14 +365,12 @@ private suspend fun fetchDocuments( } val doc = doc.doc() // downloading the content - downloadHelper(ctx, lastExecutionTime, doc) { content -> - if (!content.isEmpty()) { - ctx.fileLogger.logFetch( - content, - doc == SupportedDocument.PAIN_002_LOGS - ) - ingestDocuments(db, ctx.cfg.currency, content, doc) - } + downloadHelper(ctx, lastExecutionTime, doc) { stream -> + val loggedStream = ctx.fileLogger.logFetch( + stream, + doc == SupportedDocument.PAIN_002_LOGS + ) + ingestDocuments(db, ctx.cfg.currency, loggedStream, doc) } true } catch (e: Exception) { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt index bd95543b..a5c40f35 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt @@ -110,7 +110,7 @@ private fun handleHpbResponse( throw Exception("HPB content not found in a EBICS response with successful return codes.") } val hpbObj = try { - parseEbicsHpbOrder(hpbBytes) + parseEbicsHpbOrder(hpbBytes.inputStream()) } catch (e: Exception) { throw Exception("HPB response content seems invalid", e) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt index 97ba52f6..f6562c2e 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt @@ -23,6 +23,7 @@ import tech.libeufin.ebics.* import java.net.URLEncoder import java.time.* import java.time.format.* +import java.io.InputStream /** @@ -145,7 +146,7 @@ data class CustomerAck( * * @param xml pain.002 input document */ -fun parseCustomerAck(xml: ByteArray): List<CustomerAck> { +fun parseCustomerAck(xml: InputStream): List<CustomerAck> { return destructXml(xml, "Document") { one("CstmrPmtStsRpt").map("OrgnlPmtInfAndSts") { val actionType = one("OrgnlPmtInfId").enum<HacAction>() @@ -214,7 +215,7 @@ data class Reason ( * * @param xml pain.002 input document */ -fun parseCustomerPaymentStatusReport(xml: ByteArray): PaymentStatus { +fun parseCustomerPaymentStatusReport(xml: InputStream): PaymentStatus { fun XmlDestructor.reasons(): List<Reason> { return map("StsRsnInf") { val code = one("Rsn").one("Cd").enum<ExternalStatusReasonCode>() @@ -255,7 +256,7 @@ fun parseCustomerPaymentStatusReport(xml: ByteArray): PaymentStatus { * @param outgoing list of outgoing payments */ fun parseTxNotif( - notifXml: ByteArray, + notifXml: InputStream, acceptedCurrency: String, incoming: MutableList<IncomingPayment>, outgoing: MutableList<OutgoingPayment> @@ -324,7 +325,7 @@ fun parseTxNotif( * @param xml the input document. */ private fun notificationForEachTx( - xml: ByteArray, + xml: InputStream, directionLambda: XmlDestructor.(Instant) -> Unit ) { destructXml(xml, "Document") { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt index 3299a16e..88e3481a 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt @@ -19,7 +19,6 @@ package tech.libeufin.nexus -import tech.libeufin.nexus.ebics.unzipForEach import tech.libeufin.common.* import java.io.* import java.nio.file.* @@ -50,12 +49,12 @@ class FileLogger(path: String?) { /** * Logs EBICS fetch content if EBICS debug logging is enabled * - * @param content EBICS fetch content + * @param stream EBICS fetch content * @param hac only true when downloading via HAC (EBICS 2) */ - fun logFetch(content: ByteArray, hac: Boolean = false) { - if (dir == null) return; - + fun logFetch(stream: InputStream, hac: Boolean = false): InputStream { + if (dir == null) return stream; + val content = stream.readBytes() // Subdir based on current day. val now = Instant.now() val asUtcDate = LocalDate.ofInstant(now, ZoneId.of("UTC")) @@ -67,10 +66,13 @@ class FileLogger(path: String?) { subDir.resolve("${nowMs}_HAC_response.pain.002.xml").writeBytes(content) } else { // Write each ZIP entry in the combined dir. - content.unzipForEach { fileName, xmlContent -> - subDir.resolve("${nowMs}_$fileName").writeBytes(xmlContent) + content.inputStream().unzipEach { fileName, xmlContent -> + xmlContent.use { + Files.copy(it, subDir.resolve("${nowMs}_$fileName")) + } } } + return content.inputStream() } /** diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt index 9f8a5d9f..1d96e93d 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt @@ -36,6 +36,7 @@ import java.security.interfaces.RSAPrivateCrtKey import java.time.Instant import java.time.ZoneId import java.util.* +import java.io.InputStream import javax.xml.datatype.DatatypeFactory private val logger: Logger = LoggerFactory.getLogger("libeufin-nexus-ebics2") @@ -151,10 +152,11 @@ fun createEbics25DownloadTransferPhase( */ fun parseKeysMgmtResponse( clientEncryptionKey: RSAPrivateCrtKey, - xml: ByteArray + xml: InputStream ): EbicsKeyManagementResponseContent? { + // TODO throw instead of null val jaxb = try { - XMLUtil.convertBytesToJaxb<EbicsKeyManagementResponse>(xml) + XMLUtil.convertToJaxb<EbicsKeyManagementResponse>(xml) } catch (e: Exception) { tech.libeufin.nexus.logger.error("Could not parse the raw response from bank into JAXB.") return null @@ -172,7 +174,7 @@ fun parseKeysMgmtResponse( clientEncryptionKey, DataEncryptionInfo(this.transactionKey, this.encryptionPubKeyDigest.value), listOf(encOrderData) - ) + ).readBytes() } } val bankReturnCode = EbicsReturnCode.lookup(jaxb.value.body.returnCode.value) // business error diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt index 58718824..66a38b62 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt @@ -42,20 +42,20 @@ import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* -import org.apache.commons.compress.archivers.zip.ZipFile -import org.apache.commons.compress.utils.SeekableInMemoryByteChannel +import io.ktor.utils.io.jvm.javaio.* import org.slf4j.Logger import org.slf4j.LoggerFactory import tech.libeufin.nexus.* import tech.libeufin.common.* import tech.libeufin.ebics.* import tech.libeufin.ebics.ebics_h005.Ebics3Request +import java.io.SequenceInputStream import java.io.ByteArrayOutputStream +import java.io.InputStream import java.security.interfaces.RSAPrivateCrtKey import java.time.LocalDateTime import java.time.format.DateTimeFormatter import java.util.* -import java.util.zip.DeflaterInputStream /** * Available EBICS versions. @@ -73,24 +73,6 @@ enum class SupportedDocument { CAMT_054 } - -/** - * Unzips the ByteArray and runs the lambda over each entry. - * - * @param lambda function that gets the (fileName, fileContent) pair - * for each entry in the ZIP archive as input. - */ -fun ByteArray.unzipForEach(lambda: (String, ByteArray) -> Unit) { - val mem = SeekableInMemoryByteChannel(this) - ZipFile(mem).use { file -> - file.getEntriesInPhysicalOrder().iterator().forEach { - lambda( - it.name, file.getInputStream(it).readAllBytes() - ) - } - } -} - /** * Decrypts and decompresses the business payload that was * transported within an EBICS message from the bank @@ -106,21 +88,16 @@ fun decryptAndDecompressPayload( clientEncryptionKey: RSAPrivateCrtKey, encryptionInfo: DataEncryptionInfo, chunks: List<String> -): ByteArray { - val buf = StringBuilder() - chunks.forEach { buf.append(it) } - val decoded = Base64.getDecoder().decode(buf.toString()) - val er = CryptoUtil.EncryptionResult( - encryptionInfo.transactionKey, - encryptionInfo.bankPubDigest, - decoded - ) - val dataCompr = CryptoUtil.decryptEbicsE002( - er, - clientEncryptionKey - ) - return EbicsOrderUtil.decodeOrderData(dataCompr) -} +): InputStream = + SequenceInputStream(Collections.enumeration(chunks.map { it.toByteArray().inputStream() })) // Aggregate + .decodeBase64() + .run { + CryptoUtil.decryptEbicsE002( + encryptionInfo.transactionKey, + this, + clientEncryptionKey + ) + }.inflate() /** * POSTs the EBICS message to the bank. @@ -129,7 +106,7 @@ fun decryptAndDecompressPayload( * @param msg EBICS message as raw bytes. * @return the raw bank response. */ -suspend fun HttpClient.postToBank(bankUrl: String, msg: ByteArray): ByteArray { +suspend fun HttpClient.postToBank(bankUrl: String, msg: ByteArray): InputStream { logger.debug("POSTing EBICS to '$bankUrl'") val res = post(urlString = bankUrl) { contentType(ContentType.Text.Xml) @@ -138,7 +115,7 @@ suspend fun HttpClient.postToBank(bankUrl: String, msg: ByteArray): ByteArray { if (res.status != HttpStatusCode.OK) { throw Exception("Invalid response status: ${res.status}") } - return res.readBytes() // TODO input stream + return res.bodyAsChannel().toInputStream() } /** @@ -278,19 +255,19 @@ private fun areCodesOk(ebicsResponseContent: EbicsResponseContent) = * @param bankKeys bank EBICS public keys. * @param reqXml raw EBICS XML request of the init phase. * @param isEbics3 true for EBICS 3, false otherwise. - * @param processing processing lambda receiving EBICS files as bytes or empty bytes if nothing to download. - * @return T if the transaction was successful and null if the transaction was empty. If the failure is at the EBICS + * @param processing processing lambda receiving EBICS files as a byte stream if the transaction was not empty. + * @return T if the transaction was successful. If the failure is at the EBICS * level EbicsSideException is thrown else ités the expection of the processing lambda. */ -suspend fun <T> ebicsDownload( +suspend fun ebicsDownload( client: HttpClient, cfg: EbicsSetupConfig, clientKeys: ClientPrivateKeysFile, bankKeys: BankPublicKeysFile, reqXml: ByteArray, isEbics3: Boolean, - processing: (ByteArray) -> T -): T { + processing: (InputStream) -> Unit +) { val initResp = postEbics(client, cfg, bankKeys, reqXml, isEbics3) logger.debug("Download init phase done. EBICS- and bank-technical codes are: ${initResp.technicalReturnCode}, ${initResp.bankReturnCode}") if (initResp.technicalReturnCode != EbicsReturnCode.EBICS_OK) { @@ -298,7 +275,7 @@ suspend fun <T> ebicsDownload( } if (initResp.bankReturnCode == EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE) { logger.debug("Download content is empty") - return processing(ByteArray(0)) + return } if (initResp.bankReturnCode != EbicsReturnCode.EBICS_OK) { throw Exception("Download init phase has bank-technical error: ${initResp.bankReturnCode}") @@ -419,11 +396,11 @@ class EbicsSideException( */ fun parseAndValidateEbicsResponse( bankKeys: BankPublicKeysFile, - resp: ByteArray, + resp: InputStream, withEbics3: Boolean ): EbicsResponseContent { - val responseDocument = try { - XMLUtil.parseBytesIntoDom(resp) + val doc = try { + XMLUtil.parseIntoDom(resp) } catch (e: Exception) { throw EbicsSideException( "Bank response apparently invalid", @@ -431,9 +408,9 @@ fun parseAndValidateEbicsResponse( ) } if (!XMLUtil.verifyEbicsDocument( - responseDocument, - bankKeys.bank_authentication_public_key, - withEbics3 + doc, + bankKeys.bank_authentication_public_key, + withEbics3 )) { throw EbicsSideException( "Bank signature did not verify", @@ -441,8 +418,8 @@ fun parseAndValidateEbicsResponse( ) } if (withEbics3) - return ebics3toInternalRepr(resp) - return ebics25toInternalRepr(resp) + return ebics3toInternalRepr(doc) + return ebics25toInternalRepr(doc) } /** @@ -490,9 +467,7 @@ fun prepareUploadPayload( val plainTransactionKey = encryptionResult.plainTransactionKey ?: throw Exception("Could not generate the transaction key, cannot encrypt the payload!") // Then only E002 symmetric (with ephemeral key) encrypt. - val compressedInnerPayload = DeflaterInputStream( - payload.inputStream() - ).use { it.readAllBytes() } + val compressedInnerPayload = payload.inputStream().deflate().readAllBytes() val encryptedPayload = CryptoUtil.encryptEbicsE002withTransactionKey( compressedInnerPayload, bankKeys.bank_encryption_public_key, diff --git a/nexus/src/test/kotlin/Ebics.kt b/nexus/src/test/kotlin/Ebics.kt index 507820ce..f3fad224 100644 --- a/nexus/src/test/kotlin/Ebics.kt +++ b/nexus/src/test/kotlin/Ebics.kt @@ -35,7 +35,7 @@ class Ebics { @Test fun iniMessage() = conf { config -> val msg = generateIniMessage(config, clientKeys) - val ini = XMLUtil.convertBytesToJaxb<EbicsUnsecuredRequest>(msg) // ensures is valid + val ini = XMLUtil.convertToJaxb<EbicsUnsecuredRequest>(msg.inputStream()) // ensures is valid assertEquals(ini.value.header.static.orderDetails.orderType, "INI") // ensures is INI } @@ -43,7 +43,7 @@ class Ebics { @Test fun hiaMessage() = conf { config -> val msg = generateHiaMessage(config, clientKeys) - val ini = XMLUtil.convertBytesToJaxb<EbicsUnsecuredRequest>(msg) // ensures is valid + val ini = XMLUtil.convertToJaxb<EbicsUnsecuredRequest>(msg.inputStream()) // ensures is valid assertEquals(ini.value.header.static.orderDetails.orderType, "HIA") // ensures is HIA } @@ -51,7 +51,7 @@ class Ebics { @Test fun hpbMessage() = conf { config -> val msg = generateHpbMessage(config, clientKeys) - val ini = XMLUtil.convertBytesToJaxb<EbicsUnsecuredRequest>(msg) // ensures is valid + val ini = XMLUtil.convertToJaxb<EbicsUnsecuredRequest>(msg.inputStream()) // ensures is valid assertEquals(ini.value.header.static.orderDetails.orderType, "HPB") // ensures is HPB } // POSTs an EBICS message to the mock bank. Tests |