aboutsummaryrefslogtreecommitdiff
path: root/nexus
diff options
context:
space:
mode:
authorAntoine A <>2024-02-13 17:20:33 +0100
committerAntoine A <>2024-02-13 17:20:33 +0100
commit3046601e239d774716527d4a0a34f585ac5ade79 (patch)
tree093ef827ed28be1c32ab25d3342b1a0e324a6ab4 /nexus
parent5837035a2e5679f529196ae85bc041d695a69176 (diff)
downloadlibeufin-3046601e239d774716527d4a0a34f585ac5ade79.tar.gz
libeufin-3046601e239d774716527d4a0a34f585ac5ade79.tar.bz2
libeufin-3046601e239d774716527d4a0a34f585ac5ade79.zip
Reduce memory usage by using InputStream as much as possiblev0.9.4-dev.19
Diffstat (limited to 'nexus')
-rw-r--r--nexus/build.gradle3
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt27
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt2
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt9
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt16
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt8
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt85
-rw-r--r--nexus/src/test/kotlin/Ebics.kt6
8 files changed, 66 insertions, 90 deletions
diff --git a/nexus/build.gradle b/nexus/build.gradle
index 3d6c6c7c..b510dba6 100644
--- a/nexus/build.gradle
+++ b/nexus/build.gradle
@@ -27,9 +27,6 @@ dependencies {
// XML parsing/binding and encryption
implementation("jakarta.xml.bind:jakarta.xml.bind-api:2.3.3")
- // Compression
- implementation("org.apache.commons:commons-compress:1.25.0")
-
// Command line parsing
implementation("com.github.ajalt.clikt:clikt:$clikt_version")
implementation("org.postgresql:postgresql:$postgres_version")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
index 03d37e76..d4234cd1 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
@@ -31,6 +31,7 @@ import tech.libeufin.common.*
import tech.libeufin.ebics.*
import tech.libeufin.ebics.ebics_h005.Ebics3Request
import java.io.IOException
+import java.io.InputStream
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId
@@ -78,12 +79,12 @@ data class FetchContext(
* length is zero. It returns null, if the bank assigned an
* error to the EBICS transaction.
*/
-private suspend fun <T> downloadHelper(
+private suspend fun downloadHelper(
ctx: FetchContext,
lastExecutionTime: Instant? = null,
doc: SupportedDocument,
- processing: (ByteArray) -> T
-): T? {
+ processing: (InputStream) -> Unit
+) {
val isEbics3 = doc != SupportedDocument.PAIN_002_LOGS
val initXml = if (isEbics3) {
createEbics3DownloadInitialization(
@@ -246,7 +247,7 @@ suspend fun ingestIncomingPayment(
private fun ingestDocument(
db: Database,
currency: String,
- xml: ByteArray,
+ xml: InputStream,
whichDocument: SupportedDocument
) {
when (whichDocument) {
@@ -308,7 +309,7 @@ private fun ingestDocument(
private fun ingestDocuments(
db: Database,
currency: String,
- content: ByteArray,
+ content: InputStream,
whichDocument: SupportedDocument
) {
when (whichDocument) {
@@ -317,7 +318,7 @@ private fun ingestDocuments(
SupportedDocument.CAMT_053,
SupportedDocument.CAMT_052 -> {
try {
- content.unzipForEach { fileName, xmlContent ->
+ content.unzipEach { fileName, xmlContent ->
logger.trace("parse $fileName")
ingestDocument(db, currency, xmlContent, whichDocument)
}
@@ -364,14 +365,12 @@ private suspend fun fetchDocuments(
}
val doc = doc.doc()
// downloading the content
- downloadHelper(ctx, lastExecutionTime, doc) { content ->
- if (!content.isEmpty()) {
- ctx.fileLogger.logFetch(
- content,
- doc == SupportedDocument.PAIN_002_LOGS
- )
- ingestDocuments(db, ctx.cfg.currency, content, doc)
- }
+ downloadHelper(ctx, lastExecutionTime, doc) { stream ->
+ val loggedStream = ctx.fileLogger.logFetch(
+ stream,
+ doc == SupportedDocument.PAIN_002_LOGS
+ )
+ ingestDocuments(db, ctx.cfg.currency, loggedStream, doc)
}
true
} catch (e: Exception) {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt
index bd95543b..a5c40f35 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt
@@ -110,7 +110,7 @@ private fun handleHpbResponse(
throw Exception("HPB content not found in a EBICS response with successful return codes.")
}
val hpbObj = try {
- parseEbicsHpbOrder(hpbBytes)
+ parseEbicsHpbOrder(hpbBytes.inputStream())
} catch (e: Exception) {
throw Exception("HPB response content seems invalid", e)
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
index 97ba52f6..f6562c2e 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
@@ -23,6 +23,7 @@ import tech.libeufin.ebics.*
import java.net.URLEncoder
import java.time.*
import java.time.format.*
+import java.io.InputStream
/**
@@ -145,7 +146,7 @@ data class CustomerAck(
*
* @param xml pain.002 input document
*/
-fun parseCustomerAck(xml: ByteArray): List<CustomerAck> {
+fun parseCustomerAck(xml: InputStream): List<CustomerAck> {
return destructXml(xml, "Document") {
one("CstmrPmtStsRpt").map("OrgnlPmtInfAndSts") {
val actionType = one("OrgnlPmtInfId").enum<HacAction>()
@@ -214,7 +215,7 @@ data class Reason (
*
* @param xml pain.002 input document
*/
-fun parseCustomerPaymentStatusReport(xml: ByteArray): PaymentStatus {
+fun parseCustomerPaymentStatusReport(xml: InputStream): PaymentStatus {
fun XmlDestructor.reasons(): List<Reason> {
return map("StsRsnInf") {
val code = one("Rsn").one("Cd").enum<ExternalStatusReasonCode>()
@@ -255,7 +256,7 @@ fun parseCustomerPaymentStatusReport(xml: ByteArray): PaymentStatus {
* @param outgoing list of outgoing payments
*/
fun parseTxNotif(
- notifXml: ByteArray,
+ notifXml: InputStream,
acceptedCurrency: String,
incoming: MutableList<IncomingPayment>,
outgoing: MutableList<OutgoingPayment>
@@ -324,7 +325,7 @@ fun parseTxNotif(
* @param xml the input document.
*/
private fun notificationForEachTx(
- xml: ByteArray,
+ xml: InputStream,
directionLambda: XmlDestructor.(Instant) -> Unit
) {
destructXml(xml, "Document") {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt
index 3299a16e..88e3481a 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Log.kt
@@ -19,7 +19,6 @@
package tech.libeufin.nexus
-import tech.libeufin.nexus.ebics.unzipForEach
import tech.libeufin.common.*
import java.io.*
import java.nio.file.*
@@ -50,12 +49,12 @@ class FileLogger(path: String?) {
/**
* Logs EBICS fetch content if EBICS debug logging is enabled
*
- * @param content EBICS fetch content
+ * @param stream EBICS fetch content
* @param hac only true when downloading via HAC (EBICS 2)
*/
- fun logFetch(content: ByteArray, hac: Boolean = false) {
- if (dir == null) return;
-
+ fun logFetch(stream: InputStream, hac: Boolean = false): InputStream {
+ if (dir == null) return stream;
+ val content = stream.readBytes()
// Subdir based on current day.
val now = Instant.now()
val asUtcDate = LocalDate.ofInstant(now, ZoneId.of("UTC"))
@@ -67,10 +66,13 @@ class FileLogger(path: String?) {
subDir.resolve("${nowMs}_HAC_response.pain.002.xml").writeBytes(content)
} else {
// Write each ZIP entry in the combined dir.
- content.unzipForEach { fileName, xmlContent ->
- subDir.resolve("${nowMs}_$fileName").writeBytes(xmlContent)
+ content.inputStream().unzipEach { fileName, xmlContent ->
+ xmlContent.use {
+ Files.copy(it, subDir.resolve("${nowMs}_$fileName"))
+ }
}
}
+ return content.inputStream()
}
/**
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt
index 9f8a5d9f..1d96e93d 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/Ebics2.kt
@@ -36,6 +36,7 @@ import java.security.interfaces.RSAPrivateCrtKey
import java.time.Instant
import java.time.ZoneId
import java.util.*
+import java.io.InputStream
import javax.xml.datatype.DatatypeFactory
private val logger: Logger = LoggerFactory.getLogger("libeufin-nexus-ebics2")
@@ -151,10 +152,11 @@ fun createEbics25DownloadTransferPhase(
*/
fun parseKeysMgmtResponse(
clientEncryptionKey: RSAPrivateCrtKey,
- xml: ByteArray
+ xml: InputStream
): EbicsKeyManagementResponseContent? {
+ // TODO throw instead of null
val jaxb = try {
- XMLUtil.convertBytesToJaxb<EbicsKeyManagementResponse>(xml)
+ XMLUtil.convertToJaxb<EbicsKeyManagementResponse>(xml)
} catch (e: Exception) {
tech.libeufin.nexus.logger.error("Could not parse the raw response from bank into JAXB.")
return null
@@ -172,7 +174,7 @@ fun parseKeysMgmtResponse(
clientEncryptionKey,
DataEncryptionInfo(this.transactionKey, this.encryptionPubKeyDigest.value),
listOf(encOrderData)
- )
+ ).readBytes()
}
}
val bankReturnCode = EbicsReturnCode.lookup(jaxb.value.body.returnCode.value) // business error
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
index 58718824..66a38b62 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
@@ -42,20 +42,20 @@ import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
-import org.apache.commons.compress.archivers.zip.ZipFile
-import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
+import io.ktor.utils.io.jvm.javaio.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import tech.libeufin.nexus.*
import tech.libeufin.common.*
import tech.libeufin.ebics.*
import tech.libeufin.ebics.ebics_h005.Ebics3Request
+import java.io.SequenceInputStream
import java.io.ByteArrayOutputStream
+import java.io.InputStream
import java.security.interfaces.RSAPrivateCrtKey
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.*
-import java.util.zip.DeflaterInputStream
/**
* Available EBICS versions.
@@ -73,24 +73,6 @@ enum class SupportedDocument {
CAMT_054
}
-
-/**
- * Unzips the ByteArray and runs the lambda over each entry.
- *
- * @param lambda function that gets the (fileName, fileContent) pair
- * for each entry in the ZIP archive as input.
- */
-fun ByteArray.unzipForEach(lambda: (String, ByteArray) -> Unit) {
- val mem = SeekableInMemoryByteChannel(this)
- ZipFile(mem).use { file ->
- file.getEntriesInPhysicalOrder().iterator().forEach {
- lambda(
- it.name, file.getInputStream(it).readAllBytes()
- )
- }
- }
-}
-
/**
* Decrypts and decompresses the business payload that was
* transported within an EBICS message from the bank
@@ -106,21 +88,16 @@ fun decryptAndDecompressPayload(
clientEncryptionKey: RSAPrivateCrtKey,
encryptionInfo: DataEncryptionInfo,
chunks: List<String>
-): ByteArray {
- val buf = StringBuilder()
- chunks.forEach { buf.append(it) }
- val decoded = Base64.getDecoder().decode(buf.toString())
- val er = CryptoUtil.EncryptionResult(
- encryptionInfo.transactionKey,
- encryptionInfo.bankPubDigest,
- decoded
- )
- val dataCompr = CryptoUtil.decryptEbicsE002(
- er,
- clientEncryptionKey
- )
- return EbicsOrderUtil.decodeOrderData(dataCompr)
-}
+): InputStream =
+ SequenceInputStream(Collections.enumeration(chunks.map { it.toByteArray().inputStream() })) // Aggregate
+ .decodeBase64()
+ .run {
+ CryptoUtil.decryptEbicsE002(
+ encryptionInfo.transactionKey,
+ this,
+ clientEncryptionKey
+ )
+ }.inflate()
/**
* POSTs the EBICS message to the bank.
@@ -129,7 +106,7 @@ fun decryptAndDecompressPayload(
* @param msg EBICS message as raw bytes.
* @return the raw bank response.
*/
-suspend fun HttpClient.postToBank(bankUrl: String, msg: ByteArray): ByteArray {
+suspend fun HttpClient.postToBank(bankUrl: String, msg: ByteArray): InputStream {
logger.debug("POSTing EBICS to '$bankUrl'")
val res = post(urlString = bankUrl) {
contentType(ContentType.Text.Xml)
@@ -138,7 +115,7 @@ suspend fun HttpClient.postToBank(bankUrl: String, msg: ByteArray): ByteArray {
if (res.status != HttpStatusCode.OK) {
throw Exception("Invalid response status: ${res.status}")
}
- return res.readBytes() // TODO input stream
+ return res.bodyAsChannel().toInputStream()
}
/**
@@ -278,19 +255,19 @@ private fun areCodesOk(ebicsResponseContent: EbicsResponseContent) =
* @param bankKeys bank EBICS public keys.
* @param reqXml raw EBICS XML request of the init phase.
* @param isEbics3 true for EBICS 3, false otherwise.
- * @param processing processing lambda receiving EBICS files as bytes or empty bytes if nothing to download.
- * @return T if the transaction was successful and null if the transaction was empty. If the failure is at the EBICS
+ * @param processing processing lambda receiving EBICS files as a byte stream if the transaction was not empty.
+ * @return T if the transaction was successful. If the failure is at the EBICS
* level EbicsSideException is thrown else ités the expection of the processing lambda.
*/
-suspend fun <T> ebicsDownload(
+suspend fun ebicsDownload(
client: HttpClient,
cfg: EbicsSetupConfig,
clientKeys: ClientPrivateKeysFile,
bankKeys: BankPublicKeysFile,
reqXml: ByteArray,
isEbics3: Boolean,
- processing: (ByteArray) -> T
-): T {
+ processing: (InputStream) -> Unit
+) {
val initResp = postEbics(client, cfg, bankKeys, reqXml, isEbics3)
logger.debug("Download init phase done. EBICS- and bank-technical codes are: ${initResp.technicalReturnCode}, ${initResp.bankReturnCode}")
if (initResp.technicalReturnCode != EbicsReturnCode.EBICS_OK) {
@@ -298,7 +275,7 @@ suspend fun <T> ebicsDownload(
}
if (initResp.bankReturnCode == EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE) {
logger.debug("Download content is empty")
- return processing(ByteArray(0))
+ return
}
if (initResp.bankReturnCode != EbicsReturnCode.EBICS_OK) {
throw Exception("Download init phase has bank-technical error: ${initResp.bankReturnCode}")
@@ -419,11 +396,11 @@ class EbicsSideException(
*/
fun parseAndValidateEbicsResponse(
bankKeys: BankPublicKeysFile,
- resp: ByteArray,
+ resp: InputStream,
withEbics3: Boolean
): EbicsResponseContent {
- val responseDocument = try {
- XMLUtil.parseBytesIntoDom(resp)
+ val doc = try {
+ XMLUtil.parseIntoDom(resp)
} catch (e: Exception) {
throw EbicsSideException(
"Bank response apparently invalid",
@@ -431,9 +408,9 @@ fun parseAndValidateEbicsResponse(
)
}
if (!XMLUtil.verifyEbicsDocument(
- responseDocument,
- bankKeys.bank_authentication_public_key,
- withEbics3
+ doc,
+ bankKeys.bank_authentication_public_key,
+ withEbics3
)) {
throw EbicsSideException(
"Bank signature did not verify",
@@ -441,8 +418,8 @@ fun parseAndValidateEbicsResponse(
)
}
if (withEbics3)
- return ebics3toInternalRepr(resp)
- return ebics25toInternalRepr(resp)
+ return ebics3toInternalRepr(doc)
+ return ebics25toInternalRepr(doc)
}
/**
@@ -490,9 +467,7 @@ fun prepareUploadPayload(
val plainTransactionKey = encryptionResult.plainTransactionKey
?: throw Exception("Could not generate the transaction key, cannot encrypt the payload!")
// Then only E002 symmetric (with ephemeral key) encrypt.
- val compressedInnerPayload = DeflaterInputStream(
- payload.inputStream()
- ).use { it.readAllBytes() }
+ val compressedInnerPayload = payload.inputStream().deflate().readAllBytes()
val encryptedPayload = CryptoUtil.encryptEbicsE002withTransactionKey(
compressedInnerPayload,
bankKeys.bank_encryption_public_key,
diff --git a/nexus/src/test/kotlin/Ebics.kt b/nexus/src/test/kotlin/Ebics.kt
index 507820ce..f3fad224 100644
--- a/nexus/src/test/kotlin/Ebics.kt
+++ b/nexus/src/test/kotlin/Ebics.kt
@@ -35,7 +35,7 @@ class Ebics {
@Test
fun iniMessage() = conf { config ->
val msg = generateIniMessage(config, clientKeys)
- val ini = XMLUtil.convertBytesToJaxb<EbicsUnsecuredRequest>(msg) // ensures is valid
+ val ini = XMLUtil.convertToJaxb<EbicsUnsecuredRequest>(msg.inputStream()) // ensures is valid
assertEquals(ini.value.header.static.orderDetails.orderType, "INI") // ensures is INI
}
@@ -43,7 +43,7 @@ class Ebics {
@Test
fun hiaMessage() = conf { config ->
val msg = generateHiaMessage(config, clientKeys)
- val ini = XMLUtil.convertBytesToJaxb<EbicsUnsecuredRequest>(msg) // ensures is valid
+ val ini = XMLUtil.convertToJaxb<EbicsUnsecuredRequest>(msg.inputStream()) // ensures is valid
assertEquals(ini.value.header.static.orderDetails.orderType, "HIA") // ensures is HIA
}
@@ -51,7 +51,7 @@ class Ebics {
@Test
fun hpbMessage() = conf { config ->
val msg = generateHpbMessage(config, clientKeys)
- val ini = XMLUtil.convertBytesToJaxb<EbicsUnsecuredRequest>(msg) // ensures is valid
+ val ini = XMLUtil.convertToJaxb<EbicsUnsecuredRequest>(msg.inputStream()) // ensures is valid
assertEquals(ini.value.header.static.orderDetails.orderType, "HPB") // ensures is HPB
}
// POSTs an EBICS message to the mock bank. Tests