diff options
author | MS <ms@taler.net> | 2020-06-02 15:12:50 +0200 |
---|---|---|
committer | MS <ms@taler.net> | 2020-06-02 15:12:50 +0200 |
commit | 58978d661afe089ef4632d16c81ffce3059f0f07 (patch) | |
tree | 0eea798f7fde96afdd77a16976f89b7e43717b26 /nexus/src/main/kotlin/tech | |
parent | d5c493531fb0f9e1f8f2c23a356e3243e26410d4 (diff) | |
download | libeufin-58978d661afe089ef4632d16c81ffce3059f0f07.tar.gz libeufin-58978d661afe089ef4632d16c81ffce3059f0f07.tar.bz2 libeufin-58978d661afe089ef4632d16c81ffce3059f0f07.zip |
Freezing the last-seen raw payment into one column value.
Diffstat (limited to 'nexus/src/main/kotlin/tech')
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 2 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 4 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt | 127 |
3 files changed, 53 insertions, 80 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt index 7dbf7b06..642daa88 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -272,6 +272,7 @@ object FacadesTable : IdTable<String>() { val type = text("type") val creator = reference("creator", NexusUsersTable) val config = reference("config", TalerFacadeConfigsTable) // see #6266 + val highestSeenMsgID = long("highestSeenMessageID").default(0) } class FacadeEntity(id: EntityID<String>) : Entity<String>(id) { @@ -279,6 +280,7 @@ class FacadeEntity(id: EntityID<String>) : Entity<String>(id) { var type by FacadesTable.type var creator by NexusUserEntity referencedOn FacadesTable.creator var config by TalerFacadeConfigEntity referencedOn FacadesTable.config + var highestSeenMsgID by FacadesTable.highestSeenMsgID } object TalerFacadeConfigsTable : IntIdTable() { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt index c1a1e3c8..1c34f71b 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -260,7 +260,7 @@ fun ApplicationRequest.hasBody(): Boolean { } suspend fun schedulePeriodicWork(coroutineScope: CoroutineScope) { while (true) { - delay(Duration.ofSeconds(1)) + delay(Duration.ofMillis(100)) downloadFacadesTransactions(coroutineScope) ingestTalerTransactions() } @@ -1097,7 +1097,7 @@ fun serverMain(dbName: String) { * Hello endpoint. */ get("/") { - call.respondText("Hello by nexus!\n") + call.respondText("Hello, this is Nexus.\n") return@get } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt index 349acbbe..2ba1751d 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt @@ -16,9 +16,13 @@ import org.apache.http.client.methods.RequestBuilder.post import org.jetbrains.exposed.dao.Entity import org.jetbrains.exposed.dao.IdTable import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.transactions.transaction import org.joda.time.DateTime import tech.libeufin.util.* +import java.time.LocalDateTime +import java.time.ZoneId +import java.util.concurrent.atomic.LongAdder import kotlin.math.abs import kotlin.math.min @@ -306,34 +310,23 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit { val addIncomingData = call.receive<TalerAdminAddIncoming>() val debtor = parsePayto(addIncomingData.debit_account) val amount = parseAmount(addIncomingData.amount) - val (bookingDate, opaque_row_id) = transaction { - val exchangeUser = authenticateRequest(call.request) - val rawPayment = RawBankTransactionEntity.new { - unstructuredRemittanceInformation = addIncomingData.reserve_pub - transactionType = "CRDT" - currency = amount.currency - this.amount = amount.amount.toPlainString() - counterpartBic = debtor.bic - counterpartName = debtor.name - counterpartIban = debtor.iban - bookingDate = DateTime.now().millis - status = "BOOK" - bankAccount = getFacadeBankAccount(exchangeUser) - } - /** This payment is "valid by default" and will be returned - * as soon as the exchange will ask for new payments. */ - val row = TalerIncomingPaymentEntity.new { - payment = rawPayment - valid = true - } - Pair(rawPayment.bookingDate, row.id.value) + + val myLastSeenRawPayment = transaction { + val facadeID = expectNonNull(call.parameters["fcid"]) + val facade = FacadeEntity.findById(facadeID) ?: throw NexusError( + HttpStatusCode.NotFound, "Could not find facade" + ) + facade.highestSeenMsgID } return call.respond( TextContent( customConverter( TalerAddIncomingResponse( - timestamp = GnunetTimestamp(bookingDate/ 1000), - row_id = opaque_row_id + timestamp = GnunetTimestamp( + // warning: this value might need to come from a real last-seen payment. + LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond() + ), + row_id = myLastSeenRawPayment ) ), ContentType.Application.Json @@ -350,21 +343,16 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit { * in the local table). */ fun ingestTalerTransactions() { - fun ingestIncoming(subscriberAccount: NexusBankAccountEntity) { - val latestIncomingPayment = TalerIncomingPaymentEntity.all().maxBy { it.payment.id.value } + fun ingest(subscriberAccount: NexusBankAccountEntity, facade: FacadeEntity) { + var lastId = facade.highestSeenMsgID RawBankTransactionEntity.find { /** Those with exchange bank account involved */ RawBankTransactionsTable.bankAccount eq subscriberAccount.id.value and - /** Those that are incoming */ - (RawBankTransactionsTable.transactionType eq "CRDT") and /** Those that are booked */ (RawBankTransactionsTable.status eq "BOOK") and /** Those that came later than the latest processed payment */ - (RawBankTransactionsTable.id.greater( - if (latestIncomingPayment == null) 0 - else latestIncomingPayment.payment.id.value - )) - }.forEach { + (RawBankTransactionsTable.id.greater(lastId)) + }.orderBy(Pair(RawBankTransactionsTable.id, SortOrder.ASC)).forEach { if (duplicatePayment(it)) { logger.warn("Incoming payment already seen") throw NexusError( @@ -372,63 +360,46 @@ fun ingestTalerTransactions() { "Incoming payment already seen" ) } - if (CryptoUtil.checkValidEddsaPublicKey(it.unstructuredRemittanceInformation)) { - TalerIncomingPaymentEntity.new { - payment = it - valid = true - } - } else { - TalerIncomingPaymentEntity.new { - payment = it - valid = false + // Incoming payment. + if (it.transactionType == "CRDT") { + if (CryptoUtil.checkValidEddsaPublicKey(it.unstructuredRemittanceInformation)) { + TalerIncomingPaymentEntity.new { + payment = it + valid = true + } + } else { + TalerIncomingPaymentEntity.new { + payment = it + valid = false + } } } - } - } - fun ingestOutgoing(subscriberAccount: NexusBankAccountEntity) { - val latestOutgoingPayment = TalerIncomingPaymentEntity.all().maxBy { it.payment.id.value } - RawBankTransactionEntity.find { - /** Those that came after the last processed payment */ - RawBankTransactionsTable.id.greater( - if (latestOutgoingPayment == null) 0 - else latestOutgoingPayment.payment.id.value - ) and - /** Those involving the exchange bank account */ - (RawBankTransactionsTable.bankAccount eq subscriberAccount.id.value) and - /** Those that are outgoing */ - (RawBankTransactionsTable.transactionType eq "DBIT") - }.forEach { - if (paymentFailed(it)) { - logger.error("Bank didn't accept one payment from the exchange") - throw NexusError( - HttpStatusCode.InternalServerError, - "Bank didn't accept one payment from the exchange" - ) - } - if (duplicatePayment(it)) { - logger.warn("Incoming payment already seen") - throw NexusError( + // Outgoing payment + if (it.transactionType == "DBIT") { + var talerRequested = TalerRequestedPaymentEntity.find { + TalerRequestedPayments.wtid eq it.unstructuredRemittanceInformation + }.firstOrNull() ?: throw NexusError( HttpStatusCode.InternalServerError, - "Outgoing payment already seen" + "Payment '${it.unstructuredRemittanceInformation}' shows in history, but was never requested!" ) + if (talerRequested != null) { + talerRequested.rawConfirmed = it + } } - var talerRequested = TalerRequestedPaymentEntity.find { - TalerRequestedPayments.wtid eq it.unstructuredRemittanceInformation - }.firstOrNull() ?: throw NexusError( - HttpStatusCode.InternalServerError, - "Unrecognized fresh outgoing payment met (subject: ${it.unstructuredRemittanceInformation})." - ) - talerRequested.rawConfirmed = it + /** WARNING: it is not guaranteed that the last processed raw + * payment is ALSO the one with highest ID. A more accurate management + * is needed. */ + lastId = it.id.value } + facade.highestSeenMsgID = lastId } - + // invoke ingestion for all the facades transaction { FacadeEntity.find { FacadesTable.type eq "taler-wire-gateway" }.forEach { val subscriberAccount = getFacadeBankAccount(it.creator) - ingestIncoming(subscriberAccount) - ingestOutgoing(subscriberAccount) + ingest(subscriberAccount, it) } } } @@ -523,7 +494,7 @@ fun talerFacadeRoutes(route: Route) { return@get } route.get("") { - call.respondText("Hello Taler") + call.respondText("Hello, this is Taler Facade") return@get } }
\ No newline at end of file |