summaryrefslogtreecommitdiff
path: root/nexus/src/main/kotlin/tech
diff options
context:
space:
mode:
authorMS <ms@taler.net>2020-06-02 15:12:50 +0200
committerMS <ms@taler.net>2020-06-02 15:12:50 +0200
commit58978d661afe089ef4632d16c81ffce3059f0f07 (patch)
tree0eea798f7fde96afdd77a16976f89b7e43717b26 /nexus/src/main/kotlin/tech
parentd5c493531fb0f9e1f8f2c23a356e3243e26410d4 (diff)
downloadlibeufin-58978d661afe089ef4632d16c81ffce3059f0f07.tar.gz
libeufin-58978d661afe089ef4632d16c81ffce3059f0f07.tar.bz2
libeufin-58978d661afe089ef4632d16c81ffce3059f0f07.zip
Freezing the last-seen raw payment into one column value.
Diffstat (limited to 'nexus/src/main/kotlin/tech')
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt2
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt4
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt127
3 files changed, 53 insertions, 80 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
index 7dbf7b06..642daa88 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt
@@ -272,6 +272,7 @@ object FacadesTable : IdTable<String>() {
val type = text("type")
val creator = reference("creator", NexusUsersTable)
val config = reference("config", TalerFacadeConfigsTable) // see #6266
+ val highestSeenMsgID = long("highestSeenMessageID").default(0)
}
class FacadeEntity(id: EntityID<String>) : Entity<String>(id) {
@@ -279,6 +280,7 @@ class FacadeEntity(id: EntityID<String>) : Entity<String>(id) {
var type by FacadesTable.type
var creator by NexusUserEntity referencedOn FacadesTable.creator
var config by TalerFacadeConfigEntity referencedOn FacadesTable.config
+ var highestSeenMsgID by FacadesTable.highestSeenMsgID
}
object TalerFacadeConfigsTable : IntIdTable() {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index c1a1e3c8..1c34f71b 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -260,7 +260,7 @@ fun ApplicationRequest.hasBody(): Boolean {
}
suspend fun schedulePeriodicWork(coroutineScope: CoroutineScope) {
while (true) {
- delay(Duration.ofSeconds(1))
+ delay(Duration.ofMillis(100))
downloadFacadesTransactions(coroutineScope)
ingestTalerTransactions()
}
@@ -1097,7 +1097,7 @@ fun serverMain(dbName: String) {
* Hello endpoint.
*/
get("/") {
- call.respondText("Hello by nexus!\n")
+ call.respondText("Hello, this is Nexus.\n")
return@get
}
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
index 349acbbe..2ba1751d 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/taler.kt
@@ -16,9 +16,13 @@ import org.apache.http.client.methods.RequestBuilder.post
import org.jetbrains.exposed.dao.Entity
import org.jetbrains.exposed.dao.IdTable
import org.jetbrains.exposed.sql.*
+import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.transactions.transaction
import org.joda.time.DateTime
import tech.libeufin.util.*
+import java.time.LocalDateTime
+import java.time.ZoneId
+import java.util.concurrent.atomic.LongAdder
import kotlin.math.abs
import kotlin.math.min
@@ -306,34 +310,23 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit {
val addIncomingData = call.receive<TalerAdminAddIncoming>()
val debtor = parsePayto(addIncomingData.debit_account)
val amount = parseAmount(addIncomingData.amount)
- val (bookingDate, opaque_row_id) = transaction {
- val exchangeUser = authenticateRequest(call.request)
- val rawPayment = RawBankTransactionEntity.new {
- unstructuredRemittanceInformation = addIncomingData.reserve_pub
- transactionType = "CRDT"
- currency = amount.currency
- this.amount = amount.amount.toPlainString()
- counterpartBic = debtor.bic
- counterpartName = debtor.name
- counterpartIban = debtor.iban
- bookingDate = DateTime.now().millis
- status = "BOOK"
- bankAccount = getFacadeBankAccount(exchangeUser)
- }
- /** This payment is "valid by default" and will be returned
- * as soon as the exchange will ask for new payments. */
- val row = TalerIncomingPaymentEntity.new {
- payment = rawPayment
- valid = true
- }
- Pair(rawPayment.bookingDate, row.id.value)
+
+ val myLastSeenRawPayment = transaction {
+ val facadeID = expectNonNull(call.parameters["fcid"])
+ val facade = FacadeEntity.findById(facadeID) ?: throw NexusError(
+ HttpStatusCode.NotFound, "Could not find facade"
+ )
+ facade.highestSeenMsgID
}
return call.respond(
TextContent(
customConverter(
TalerAddIncomingResponse(
- timestamp = GnunetTimestamp(bookingDate/ 1000),
- row_id = opaque_row_id
+ timestamp = GnunetTimestamp(
+ // warning: this value might need to come from a real last-seen payment.
+ LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond()
+ ),
+ row_id = myLastSeenRawPayment
)
),
ContentType.Application.Json
@@ -350,21 +343,16 @@ suspend fun talerAddIncoming(call: ApplicationCall): Unit {
* in the local table).
*/
fun ingestTalerTransactions() {
- fun ingestIncoming(subscriberAccount: NexusBankAccountEntity) {
- val latestIncomingPayment = TalerIncomingPaymentEntity.all().maxBy { it.payment.id.value }
+ fun ingest(subscriberAccount: NexusBankAccountEntity, facade: FacadeEntity) {
+ var lastId = facade.highestSeenMsgID
RawBankTransactionEntity.find {
/** Those with exchange bank account involved */
RawBankTransactionsTable.bankAccount eq subscriberAccount.id.value and
- /** Those that are incoming */
- (RawBankTransactionsTable.transactionType eq "CRDT") and
/** Those that are booked */
(RawBankTransactionsTable.status eq "BOOK") and
/** Those that came later than the latest processed payment */
- (RawBankTransactionsTable.id.greater(
- if (latestIncomingPayment == null) 0
- else latestIncomingPayment.payment.id.value
- ))
- }.forEach {
+ (RawBankTransactionsTable.id.greater(lastId))
+ }.orderBy(Pair(RawBankTransactionsTable.id, SortOrder.ASC)).forEach {
if (duplicatePayment(it)) {
logger.warn("Incoming payment already seen")
throw NexusError(
@@ -372,63 +360,46 @@ fun ingestTalerTransactions() {
"Incoming payment already seen"
)
}
- if (CryptoUtil.checkValidEddsaPublicKey(it.unstructuredRemittanceInformation)) {
- TalerIncomingPaymentEntity.new {
- payment = it
- valid = true
- }
- } else {
- TalerIncomingPaymentEntity.new {
- payment = it
- valid = false
+ // Incoming payment.
+ if (it.transactionType == "CRDT") {
+ if (CryptoUtil.checkValidEddsaPublicKey(it.unstructuredRemittanceInformation)) {
+ TalerIncomingPaymentEntity.new {
+ payment = it
+ valid = true
+ }
+ } else {
+ TalerIncomingPaymentEntity.new {
+ payment = it
+ valid = false
+ }
}
}
- }
- }
- fun ingestOutgoing(subscriberAccount: NexusBankAccountEntity) {
- val latestOutgoingPayment = TalerIncomingPaymentEntity.all().maxBy { it.payment.id.value }
- RawBankTransactionEntity.find {
- /** Those that came after the last processed payment */
- RawBankTransactionsTable.id.greater(
- if (latestOutgoingPayment == null) 0
- else latestOutgoingPayment.payment.id.value
- ) and
- /** Those involving the exchange bank account */
- (RawBankTransactionsTable.bankAccount eq subscriberAccount.id.value) and
- /** Those that are outgoing */
- (RawBankTransactionsTable.transactionType eq "DBIT")
- }.forEach {
- if (paymentFailed(it)) {
- logger.error("Bank didn't accept one payment from the exchange")
- throw NexusError(
- HttpStatusCode.InternalServerError,
- "Bank didn't accept one payment from the exchange"
- )
- }
- if (duplicatePayment(it)) {
- logger.warn("Incoming payment already seen")
- throw NexusError(
+ // Outgoing payment
+ if (it.transactionType == "DBIT") {
+ var talerRequested = TalerRequestedPaymentEntity.find {
+ TalerRequestedPayments.wtid eq it.unstructuredRemittanceInformation
+ }.firstOrNull() ?: throw NexusError(
HttpStatusCode.InternalServerError,
- "Outgoing payment already seen"
+ "Payment '${it.unstructuredRemittanceInformation}' shows in history, but was never requested!"
)
+ if (talerRequested != null) {
+ talerRequested.rawConfirmed = it
+ }
}
- var talerRequested = TalerRequestedPaymentEntity.find {
- TalerRequestedPayments.wtid eq it.unstructuredRemittanceInformation
- }.firstOrNull() ?: throw NexusError(
- HttpStatusCode.InternalServerError,
- "Unrecognized fresh outgoing payment met (subject: ${it.unstructuredRemittanceInformation})."
- )
- talerRequested.rawConfirmed = it
+ /** WARNING: it is not guaranteed that the last processed raw
+ * payment is ALSO the one with highest ID. A more accurate management
+ * is needed. */
+ lastId = it.id.value
}
+ facade.highestSeenMsgID = lastId
}
-
+ // invoke ingestion for all the facades
transaction {
FacadeEntity.find {
FacadesTable.type eq "taler-wire-gateway"
}.forEach {
val subscriberAccount = getFacadeBankAccount(it.creator)
- ingestIncoming(subscriberAccount)
- ingestOutgoing(subscriberAccount)
+ ingest(subscriberAccount, it)
}
}
}
@@ -523,7 +494,7 @@ fun talerFacadeRoutes(route: Route) {
return@get
}
route.get("") {
- call.respondText("Hello Taler")
+ call.respondText("Hello, this is Taler Facade")
return@get
}
} \ No newline at end of file