summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2024-04-08 11:39:53 +0200
committerAntoine A <>2024-04-09 15:46:02 +0200
commited18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d (patch)
treed8feeb4cd4a3dc60d7411370a01a6b9e33d5fc61
parentfc72eb5f8b3e82bca93b90c292848e5b731cee9d (diff)
downloadlibeufin-ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d.tar.gz
libeufin-ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d.tar.bz2
libeufin-ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d.zip
Move more API and DB logic in common in preparation for libeufin-nexus REST API
-rw-r--r--bank/build.gradle5
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Constants.kt3
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Error.kt111
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Main.kt193
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt2
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt2
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt7
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt4
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt1
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt3
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt5
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt8
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt7
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt150
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt12
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt151
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt5
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt4
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt10
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/helpers.kt2
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/params.kt69
-rw-r--r--common/build.gradle11
-rw-r--r--common/src/main/kotlin/ApiError.kt128
-rw-r--r--common/src/main/kotlin/Constants.kt5
-rw-r--r--common/src/main/kotlin/TalerCommon.kt1
-rw-r--r--common/src/main/kotlin/TalerMessage.kt0
-rw-r--r--common/src/main/kotlin/api/server.kt225
-rw-r--r--common/src/main/kotlin/db/helpers.kt113
-rw-r--r--common/src/main/kotlin/db/notifications.kt91
-rw-r--r--common/src/main/kotlin/db/transaction.kt (renamed from common/src/main/kotlin/db/utils.kt)0
-rw-r--r--common/src/main/kotlin/params.kt95
31 files changed, 776 insertions, 647 deletions
diff --git a/bank/build.gradle b/bank/build.gradle
index 9070480a..01b884ab 100644
--- a/bank/build.gradle
+++ b/bank/build.gradle
@@ -27,13 +27,8 @@ dependencies {
implementation("com.github.ajalt.clikt:clikt:$clikt_version")
implementation("io.ktor:ktor-server-core:$ktor_version")
- implementation("io.ktor:ktor-server-call-logging:$ktor_version")
- implementation("io.ktor:ktor-server-cors:$ktor_version")
- implementation("io.ktor:ktor-server-content-negotiation:$ktor_version")
- implementation("io.ktor:ktor-server-status-pages:$ktor_version")
implementation("io.ktor:ktor-server-netty:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
- implementation("io.ktor:ktor-server-forwarded-header:$ktor_version")
// UNIX domain sockets support (used to connect to PostgreSQL)
implementation("com.kohlschutter.junixsocket:junixsocket-core:$junixsocket_version")
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
index 4bcd2ee1..766aa5f6 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
@@ -36,9 +36,6 @@ val TOKEN_DEFAULT_DURATION: Duration = Duration.ofDays(1L)
val RESERVED_ACCOUNTS = setOf("admin", "bank")
const val IBAN_ALLOCATION_RETRY_COUNTER: Int = 5
-// Security
-const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB
-
// API version
const val COREBANK_API_VERSION: String = "4:7:0"
const val CONVERSION_API_VERSION: String = "0:0:0"
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Error.kt b/bank/src/main/kotlin/tech/libeufin/bank/Error.kt
index 18735633..9c5a115d 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Error.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Error.kt
@@ -23,110 +23,7 @@ import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.util.*
import kotlinx.serialization.Serializable
-import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.TalerErrorCode
-
-/**
- * Convenience type to throw errors along the bank activity
- * and that is meant to be caught by Ktor and responded to the
- * client.
- */
-class LibeufinException(
- // Status code that Ktor will set for the response.
- val httpStatus: HttpStatusCode,
- // Error detail object, after Taler API.
- val talerError: TalerError
-) : Exception(talerError.hint)
-
-/**
- * Error object to respond to the client. The
- * 'code' field takes values from the GANA gnu-taler-error-code
- * specification. 'hint' is a human-readable description
- * of the error.
- */
-@Serializable
-data class TalerError(
- @kotlinx.serialization.Transient val err: TalerErrorCode = TalerErrorCode.END,
- val code: Int,
- val hint: String? = null,
- val detail: String? = null
-)
-
-private val LOG_MSG = AttributeKey<String>("log_msg")
-
-fun ApplicationCall.logMsg(): String? = attributes.getOrNull(LOG_MSG)
-
-suspend fun ApplicationCall.err(
- status: HttpStatusCode,
- hint: String?,
- error: TalerErrorCode
-) {
- err(
- LibeufinException(
- httpStatus = status, talerError = TalerError(
- code = error.code, err = error, hint = hint
- )
- )
- )
-}
-
-suspend fun ApplicationCall.err(
- err: LibeufinException
-) {
- attributes.put(LOG_MSG, "${err.talerError.err.name} ${err.talerError.hint}")
- respond(
- status = err.httpStatus,
- message = err.talerError
- )
-}
-
-
-fun libeufinError(
- status: HttpStatusCode,
- hint: String?,
- error: TalerErrorCode,
- detail: String? = null
-): LibeufinException = LibeufinException(
- httpStatus = status, talerError = TalerError(
- code = error.code, err = error, hint = hint, detail = detail
- )
-)
-
-/* ----- HTTP error ----- */
-
-fun forbidden(
- hint: String = "No rights on the resource",
- error: TalerErrorCode = TalerErrorCode.END
-): LibeufinException = libeufinError(HttpStatusCode.Forbidden, hint, error)
-
-fun unauthorized(
- hint: String,
- error: TalerErrorCode = TalerErrorCode.GENERIC_UNAUTHORIZED
-): LibeufinException = libeufinError(HttpStatusCode.Unauthorized, hint, error)
-
-fun internalServerError(hint: String?): LibeufinException
- = libeufinError(HttpStatusCode.InternalServerError, hint, TalerErrorCode.GENERIC_INTERNAL_INVARIANT_FAILURE)
-
-fun notFound(
- hint: String,
- error: TalerErrorCode
-): LibeufinException = libeufinError(HttpStatusCode.NotFound, hint, error)
-
-fun conflict(
- hint: String, error: TalerErrorCode
-): LibeufinException = libeufinError(HttpStatusCode.Conflict, hint, error)
-
-fun badRequest(
- hint: String? = null,
- error: TalerErrorCode = TalerErrorCode.GENERIC_JSON_INVALID,
- detail: String? = null
-): LibeufinException = libeufinError(HttpStatusCode.BadRequest, hint, error, detail)
-
-fun unsupportedMediaType(
- hint: String,
- error: TalerErrorCode = TalerErrorCode.END,
-): LibeufinException = libeufinError(HttpStatusCode.UnsupportedMediaType, hint, error)
-
+import tech.libeufin.common.*
/* ----- Currency checks ----- */
@@ -146,21 +43,21 @@ fun BankConfig.checkFiatCurrency(amount: TalerAmount) {
/* ----- Common errors ----- */
-fun unknownAccount(id: String): LibeufinException {
+fun unknownAccount(id: String): ApiException {
return notFound(
"Account '$id' not found",
TalerErrorCode.BANK_UNKNOWN_ACCOUNT
)
}
-fun unknownCreditorAccount(id: String): LibeufinException {
+fun unknownCreditorAccount(id: String): ApiException {
return conflict(
"Creditor account '$id' not found",
TalerErrorCode.BANK_UNKNOWN_CREDITOR
)
}
-fun unsupportedTanChannel(channel: TanChannel): LibeufinException {
+fun unsupportedTanChannel(channel: TanChannel): ApiException {
return conflict(
"Unsupported tan channel $channel",
TalerErrorCode.BANK_TAN_CHANNEL_NOT_SUPPORTED
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt
index 1e28af34..8733b907 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt
@@ -35,12 +35,6 @@ import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.http.content.*
import io.ktor.server.netty.*
-import io.ktor.server.plugins.*
-import io.ktor.server.plugins.callloging.*
-import io.ktor.server.plugins.contentnegotiation.*
-import io.ktor.server.plugins.cors.routing.*
-import io.ktor.server.plugins.forwardedheaders.*
-import io.ktor.server.plugins.statuspages.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
@@ -55,6 +49,7 @@ import tech.libeufin.bank.api.*
import tech.libeufin.bank.db.AccountDAO.*
import tech.libeufin.bank.db.Database
import tech.libeufin.common.*
+import tech.libeufin.common.api.*
import tech.libeufin.common.db.dbInit
import tech.libeufin.common.db.pgDataSource
import java.net.InetAddress
@@ -68,187 +63,23 @@ import kotlin.io.path.readText
private val logger: Logger = LoggerFactory.getLogger("libeufin-bank")
// Dirty local variable to stop the server in test TODO remove this ugly hack
-var engine: ApplicationEngine? = null
+var engine: ApplicationEngine? = null
-/**
- * This plugin checks for body length limit and inflates the requests that have "Content-Encoding: deflate"
- */
-val bodyPlugin = createApplicationPlugin("BodyLimitAndDecompression") {
- onCallReceive { call ->
- // TODO check content length as an optimisation
- transformBody { body ->
- val bytes = ByteArray(MAX_BODY_LENGTH.toInt() + 1)
- var read = 0
- when (val encoding = call.request.headers[HttpHeaders.ContentEncoding]) {
- "deflate" -> {
- // Decompress and check decompressed length
- val inflater = Inflater()
- while (!body.isClosedForRead) {
- body.read { buf ->
- inflater.setInput(buf)
- try {
- read += inflater.inflate(bytes, read, bytes.size - read)
- } catch (e: DataFormatException) {
- logger.error("Deflated request failed to inflate: ${e.message}")
- throw badRequest(
- "Could not inflate request",
- TalerErrorCode.GENERIC_COMPRESSION_INVALID
- )
- }
- }
- if (read > MAX_BODY_LENGTH)
- throw badRequest("Decompressed body is suspiciously big > $MAX_BODY_LENGTH B")
- }
- }
- null -> {
- // Check body length
- while (true) {
- val new = body.readAvailable(bytes, read, bytes.size - read)
- if (new == -1) break // Channel is closed
- read += new
- if (read > MAX_BODY_LENGTH)
- throw badRequest("Body is suspiciously big > $MAX_BODY_LENGTH B")
- }
- }
- else -> throw unsupportedMediaType(
- "Content encoding '$encoding' not supported, expected plain or deflate",
- TalerErrorCode.GENERIC_COMPRESSION_INVALID
- )
- }
- ByteReadChannel(bytes, 0, read)
- }
- }
-}
/**
* Set up web server handlers for the Taler corebank API.
*/
-fun Application.corebankWebApp(db: Database, ctx: BankConfig) {
- install(CallLogging) {
- this.level = Level.INFO
- this.logger = tech.libeufin.bank.logger
- this.format { call ->
- val status = call.response.status()
- val httpMethod = call.request.httpMethod.value
- val path = call.request.path()
- val msg = call.logMsg()
- if (msg != null) {
- "$status, $httpMethod $path, $msg"
- } else {
- "$status, $httpMethod $path"
- }
- }
- }
- install(XForwardedHeaders)
- install(CORS) {
- anyHost()
- allowHeader(HttpHeaders.Authorization)
- allowHeader(HttpHeaders.ContentType)
- allowMethod(HttpMethod.Options)
- allowMethod(HttpMethod.Patch)
- allowMethod(HttpMethod.Delete)
- allowCredentials = true
- }
- install(bodyPlugin)
- install(IgnoreTrailingSlash)
- install(ContentNegotiation) {
- json(Json {
- @OptIn(ExperimentalSerializationApi::class)
- explicitNulls = false
- encodeDefaults = true
- ignoreUnknownKeys = true
- })
- }
- install(StatusPages) {
- status(HttpStatusCode.NotFound) { call, status ->
- call.err(
- status,
- "There is no endpoint defined for the URL provided by the client. Check if you used the correct URL and/or file a report with the developers of the client software.",
- TalerErrorCode.GENERIC_ENDPOINT_UNKNOWN
- )
- }
- status(HttpStatusCode.MethodNotAllowed) { call, status ->
- call.err(
- status,
- "The HTTP method used is invalid for this endpoint. This is likely a bug in the client implementation. Check if you are using the latest available version and/or file a report with the developers.",
- TalerErrorCode.GENERIC_METHOD_INVALID
- )
- }
- exception<Exception> { call, cause ->
- logger.debug("request failed", cause)
- when (cause) {
- is LibeufinException -> call.err(cause)
- is SQLException -> {
- when (cause.sqlState) {
- PSQLState.SERIALIZATION_FAILURE.state -> call.err(
- HttpStatusCode.InternalServerError,
- "Transaction serialization failure",
- TalerErrorCode.BANK_SOFT_EXCEPTION
- )
- else -> call.err(
- HttpStatusCode.InternalServerError,
- "Unexpected sql error with state ${cause.sqlState}",
- TalerErrorCode.BANK_UNMANAGED_EXCEPTION
- )
- }
- }
- is BadRequestException -> {
- /**
- * NOTE: extracting the root cause helps with JSON error messages,
- * because they mention the particular way they are invalid, but OTOH
- * it loses (by getting null) other error messages, like for example
- * the one from MissingRequestParameterException. Therefore, in order
- * to get the most detailed message, we must consider BOTH sides:
- * the 'cause' AND its root cause!
- */
- var rootCause: Throwable? = cause.cause
- while (rootCause?.cause != null)
- rootCause = rootCause.cause
- // Telling apart invalid JSON vs missing parameter vs invalid parameter.
- val talerErrorCode = when {
- cause is MissingRequestParameterException ->
- TalerErrorCode.GENERIC_PARAMETER_MISSING
- cause is ParameterConversionException ->
- TalerErrorCode.GENERIC_PARAMETER_MALFORMED
- rootCause is CommonError -> when (rootCause) {
- is CommonError.AmountFormat -> TalerErrorCode.BANK_BAD_FORMAT_AMOUNT
- is CommonError.AmountNumberTooBig -> TalerErrorCode.BANK_NUMBER_TOO_BIG
- is CommonError.Payto -> TalerErrorCode.GENERIC_JSON_INVALID
- }
- else -> TalerErrorCode.GENERIC_JSON_INVALID
- }
- call.err(
- badRequest(
- cause.message,
- talerErrorCode,
- /* Here getting _some_ error message, by giving precedence
- * to the root cause, as otherwise JSON details would be lost. */
- rootCause?.message
- )
- )
- }
- else -> {
- call.err(
- HttpStatusCode.InternalServerError,
- cause.message,
- TalerErrorCode.BANK_UNMANAGED_EXCEPTION
- )
- }
- }
- }
- }
- routing {
- coreBankApi(db, ctx)
- conversionApi(db, ctx)
- bankIntegrationApi(db, ctx)
- wireGatewayApi(db, ctx)
- revenueApi(db, ctx)
- ctx.spaPath?.let {
- get("/") {
- call.respondRedirect("/webui/")
- }
- staticFiles("/webui/", it.toFile())
+fun Application.corebankWebApp(db: Database, ctx: BankConfig) = talerApi(logger) {
+ coreBankApi(db, ctx)
+ conversionApi(db, ctx)
+ bankIntegrationApi(db, ctx)
+ wireGatewayApi(db, ctx)
+ revenueApi(db, ctx)
+ ctx.spaPath?.let {
+ get("/") {
+ call.respondRedirect("/webui/")
}
+ staticFiles("/webui/", it.toFile())
}
}
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt b/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt
index bb148166..41b57d1d 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt
@@ -30,7 +30,7 @@ import kotlinx.serialization.json.JsonDecoder
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.longOrNull
-import tech.libeufin.common.TalerAmount
+import tech.libeufin.common.*
import java.net.URL
import java.time.Duration
import java.time.Instant
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt
index 05505214..b88d8fa1 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt
@@ -30,7 +30,7 @@ import tech.libeufin.bank.*
import tech.libeufin.bank.db.AbortResult
import tech.libeufin.bank.db.Database
import tech.libeufin.bank.db.WithdrawalDAO.WithdrawalSelectionResult
-import tech.libeufin.common.TalerErrorCode
+import tech.libeufin.common.*
fun Routing.bankIntegrationApi(db: Database, ctx: BankConfig) {
get("/taler-integration/config") {
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt
index 8dcc1f60..2a466db6 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt
@@ -28,14 +28,13 @@ import tech.libeufin.bank.auth.authAdmin
import tech.libeufin.bank.db.ConversionDAO
import tech.libeufin.bank.db.ConversionDAO.ConversionResult
import tech.libeufin.bank.db.Database
-import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.TalerErrorCode
+import tech.libeufin.common.*
fun Routing.conversionApi(db: Database, ctx: BankConfig) = conditional(ctx.allowConversion) {
get("/conversion-info/config") {
val config = db.conversion.getConfig(ctx.regionalCurrency, ctx.fiatCurrency!!)
if (config == null) {
- throw libeufinError(
+ throw apiError(
HttpStatusCode.NotImplemented,
"conversion rate not configured yet",
TalerErrorCode.END
@@ -62,7 +61,7 @@ fun Routing.conversionApi(db: Database, ctx: BankConfig) = conditional(ctx.allow
"$input is too small to be converted",
TalerErrorCode.BANK_BAD_CONVERSION
)
- is ConversionResult.MissingConfig -> throw libeufinError(
+ is ConversionResult.MissingConfig -> throw apiError(
HttpStatusCode.NotImplemented,
"conversion rate not configured yet",
TalerErrorCode.END
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt
index 1ef1ace5..c659b0f0 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt
@@ -699,7 +699,7 @@ private fun Routing.coreBankTanApi(db: Database, ctx: BankConfig) {
exitValue
}
if (exitValue != 0) {
- throw libeufinError(
+ throw apiError(
HttpStatusCode.BadGateway,
"Tan channel script failure with exit value $exitValue",
TalerErrorCode.BANK_TAN_CHANNEL_SCRIPT_FAILED
@@ -733,7 +733,7 @@ private fun Routing.coreBankTanApi(db: Database, ctx: BankConfig) {
"Incorrect TAN code",
TalerErrorCode.BANK_TAN_CHALLENGE_FAILED
)
- TanSolveResult.NoRetry -> throw libeufinError(
+ TanSolveResult.NoRetry -> throw apiError(
HttpStatusCode.TooManyRequests,
"Too many failed confirmation attempt",
TalerErrorCode.BANK_TAN_RATE_LIMITED
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt
index f91054ce..b46cf875 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt
@@ -25,6 +25,7 @@ import io.ktor.server.routing.*
import tech.libeufin.bank.*
import tech.libeufin.bank.auth.auth
import tech.libeufin.bank.db.Database
+import tech.libeufin.common.*
fun Routing.revenueApi(db: Database, ctx: BankConfig) {
auth(db, TokenScope.readonly) {
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt
index 5b10f7d1..8a35de4c 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt
@@ -35,8 +35,7 @@ import tech.libeufin.bank.db.Database
import tech.libeufin.bank.db.ExchangeDAO
import tech.libeufin.bank.db.ExchangeDAO.AddIncomingResult
import tech.libeufin.bank.db.ExchangeDAO.TransferResult
-import tech.libeufin.common.BankPaytoCtx
-import tech.libeufin.common.TalerErrorCode
+import tech.libeufin.common.*
import java.time.Instant
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt b/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt
index 83bde34f..c0ed1100 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt
@@ -27,11 +27,8 @@ import io.ktor.util.*
import io.ktor.util.pipeline.*
import tech.libeufin.bank.*
import tech.libeufin.bank.db.Database
-import tech.libeufin.common.Base32Crockford
-import tech.libeufin.common.TalerErrorCode
+import tech.libeufin.common.*
import tech.libeufin.common.crypto.PwCrypto
-import tech.libeufin.common.decodeBase64
-import tech.libeufin.common.splitOnce
import java.time.Instant
/** Used to store if the currently authenticated user is admin */
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt
index c4f25657..799d466a 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt
@@ -20,12 +20,8 @@
package tech.libeufin.bank.db
import tech.libeufin.bank.*
-import tech.libeufin.common.ShortHashCode
-import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.asInstant
-import tech.libeufin.common.db.getAmount
-import tech.libeufin.common.db.oneOrNull
-import tech.libeufin.common.micros
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
import java.time.Instant
/** Data access logic for cashout operations */
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt
index 5849881d..04317b68 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt
@@ -22,11 +22,8 @@ package tech.libeufin.bank.db
import tech.libeufin.bank.ConversionRate
import tech.libeufin.bank.DecimalNumber
import tech.libeufin.bank.RoundingMode
-import tech.libeufin.bank.internalServerError
-import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.db.getAmount
-import tech.libeufin.common.db.oneOrNull
-import tech.libeufin.common.db.transaction
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
/** Data access logic for conversion */
class ConversionDAO(private val db: Database) {
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
index b6e6fd7e..9efb821a 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
@@ -27,17 +27,17 @@ import kotlinx.coroutines.withTimeoutOrNull
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import tech.libeufin.bank.*
-import tech.libeufin.common.asInstant
+import tech.libeufin.common.*
import tech.libeufin.common.db.*
import java.sql.PreparedStatement
import java.sql.ResultSet
-import kotlin.math.abs
+import java.util.concurrent.ConcurrentHashMap
+import java.util.*
private val logger: Logger = LoggerFactory.getLogger("libeufin-bank-db")
class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, internal val fiatCurrency: String?): DbPool(dbConfig, "libeufin-bank") {
- internal val notifWatcher: NotificationWatcher = NotificationWatcher(pgSource)
-
+ // DAOs
val cashout = CashoutDAO(this)
val withdrawal = WithdrawalDAO(this)
val exchange = ExchangeDAO(this)
@@ -48,6 +48,67 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte
val tan = TanDAO(this)
val gc = GcDAO(this)
+ // Transaction flows, the keys are the bank account id
+ private val bankTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
+ private val outgoingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
+ private val incomingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
+ private val revenueTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
+ // Withdrawal confirmation flow, the key is the public withdrawal UUID
+ private val withdrawalFlow = ConcurrentHashMap<UUID, CountedSharedFlow<WithdrawalStatus>>()
+
+ init {
+ watchNotifications(pgSource, "libeufin_bank", LoggerFactory.getLogger("libeufin-bank-db-watcher"), mapOf(
+ "bank_tx" to {
+ val (debtor, creditor, debitRow, creditRow) = it.split(' ', limit = 4).map { it.toLong() }
+ bankTxFlows[debtor]?.run {
+ flow.emit(debitRow)
+ }
+ bankTxFlows[creditor]?.run {
+ flow.emit(creditRow)
+ }
+ revenueTxFlows[creditor]?.run {
+ flow.emit(creditRow)
+ }
+ },
+ "outgoing_tx" to {
+ val (account, merchant, debitRow, creditRow) = it.split(' ', limit = 4).map { it.toLong() }
+ outgoingTxFlows[account]?.run {
+ flow.emit(debitRow)
+ }
+ },
+ "incoming_tx" to {
+ val (account, row) = it.split(' ', limit = 2).map { it.toLong() }
+ incomingTxFlows[account]?.run {
+ flow.emit(row)
+ }
+ },
+ "withdrawal_status" to {
+ val raw = it.split(' ', limit = 2)
+ val uuid = UUID.fromString(raw[0])
+ val status = WithdrawalStatus.valueOf(raw[1])
+ withdrawalFlow[uuid]?.run {
+ flow.emit(status)
+ }
+ }
+ ))
+ }
+
+ /** Listen for new bank transactions for [account] */
+ suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R
+ = listen(bankTxFlows, account, lambda)
+ /** Listen for new taler outgoing transactions from [account] */
+ suspend fun <R> listenOutgoing(exchange: Long, lambda: suspend (Flow<Long>) -> R): R
+ = listen(outgoingTxFlows, exchange, lambda)
+ /** Listen for new taler incoming transactions to [account] */
+ suspend fun <R> listenIncoming(exchange: Long, lambda: suspend (Flow<Long>) -> R): R
+ = listen(incomingTxFlows, exchange, lambda)
+ /** Listen for new taler outgoing transactions to [account] */
+ suspend fun <R> listenRevenue(merchant: Long, lambda: suspend (Flow<Long>) -> R): R
+ = listen(revenueTxFlows, merchant, lambda)
+ /** Listen for new withdrawal confirmations */
+ suspend fun <R> listenWithdrawals(withdrawal: UUID, lambda: suspend (Flow<WithdrawalStatus>) -> R): R
+ = listen(withdrawalFlow, withdrawal, lambda)
+
suspend fun monitor(
params: MonitorParams
): MonitorResponse = conn { conn ->
@@ -87,7 +148,7 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte
talerOutCount = it.getLong("taler_out_count"),
talerOutVolume = it.getAmount("taler_out_volume", bankCurrency),
)
- } ?: MonitorNoConversion(
+ } ?: MonitorNoConversion(
talerInCount = it.getLong("taler_in_count"),
talerInVolume = it.getAmount("taler_in_volume", bankCurrency),
talerOutCount = it.getLong("taler_out_count"),
@@ -95,85 +156,6 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte
)
} ?: throw internalServerError("No result from DB procedure stats_get_frame")
}
-
- /** Apply paging logic to a sql query */
- internal suspend fun <T> page(
- params: PageParams,
- idName: String,
- query: String,
- bind: PreparedStatement.() -> Int = { 0 },
- map: (ResultSet) -> T
- ): List<T> = conn { conn ->
- val backward = params.delta < 0
- val pageQuery = """
- $query
- $idName ${if (backward) '<' else '>'} ?
- ORDER BY $idName ${if (backward) "DESC" else "ASC"}
- LIMIT ?
- """
- conn.prepareStatement(pageQuery).run {
- val pad = bind()
- setLong(pad + 1, params.start)
- setInt(pad + 2, abs(params.delta))
- all { map(it) }
- }
- }
-
- /**
- * The following function returns the list of transactions, according
- * to the history parameters and perform long polling when necessary
- */
- internal suspend fun <T> poolHistory(
- params: HistoryParams,
- bankAccountId: Long,
- listen: suspend NotificationWatcher.(Long, suspend (Flow<Long>) -> List<T>) -> List<T>,
- query: String,
- accountColumn: String = "bank_account_id",
- map: (ResultSet) -> T
- ): List<T> {
-
- suspend fun load(): List<T> = page(
- params.page,
- "bank_transaction_id",
- "$query $accountColumn=? AND",
- {
- setLong(1, bankAccountId)
- 1
- },
- map
- )
-
-
- // TODO do we want to handle polling when going backward and there is no transactions yet ?
- // When going backward there is always at least one transaction or none
- return if (params.page.delta >= 0 && params.polling.poll_ms > 0) {
- notifWatcher.(listen)(bankAccountId) { flow ->
- coroutineScope {
- // Start buffering notification before loading transactions to not miss any
- val polling = launch {
- withTimeoutOrNull(params.polling.poll_ms) {
- flow.first { it > params.page.start } // Always forward so >
- }
- }
- // Initial loading
- val init = load()
- // Long polling if we found no transactions
- if (init.isEmpty()) {
- if (polling.join() != null) {
- load()
- } else {
- init
- }
- } else {
- polling.cancel()
- init
- }
- }
- }
- } else {
- load()
- }
- }
}
/** Result status of withdrawal or cashout operation abortion */
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt
index 3ae15c7e..644c17d3 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt
@@ -20,12 +20,8 @@
package tech.libeufin.bank.db
import tech.libeufin.bank.*
-import tech.libeufin.common.BankPaytoCtx
-import tech.libeufin.common.EddsaPublicKey
-import tech.libeufin.common.ShortHashCode
-import tech.libeufin.common.db.getAmount
-import tech.libeufin.common.db.getBankPayto
-import tech.libeufin.common.micros
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
import java.time.Instant
/** Data access logic for exchange specific logic */
@@ -36,7 +32,7 @@ class ExchangeDAO(private val db: Database) {
exchangeId: Long,
ctx: BankPaytoCtx
): List<IncomingReserveTransaction>
- = db.poolHistory(params, exchangeId, NotificationWatcher::listenIncoming, """
+ = db.poolHistory(params, exchangeId, db::listenIncoming, """
SELECT
bank_transaction_id
,transaction_date
@@ -65,7 +61,7 @@ class ExchangeDAO(private val db: Database) {
exchangeId: Long,
ctx: BankPaytoCtx
): List<OutgoingTransaction>
- = db.poolHistory(params, exchangeId, NotificationWatcher::listenOutgoing, """
+ = db.poolHistory(params, exchangeId, db::listenOutgoing, """
SELECT
bank_transaction_id
,transaction_date
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt
deleted file mode 100644
index 077cb771..00000000
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * This file is part of LibEuFin.
- * Copyright (C) 2023 Taler Systems S.A.
-
- * LibEuFin is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation; either version 3, or
- * (at your option) any later version.
-
- * LibEuFin is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
- * Public License for more details.
-
- * You should have received a copy of the GNU Affero General Public
- * License along with LibEuFin; see the file COPYING. If not, see
- * <http://www.gnu.org/licenses/>
- */
-
-package tech.libeufin.bank.db
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import org.postgresql.ds.PGSimpleDataSource
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import tech.libeufin.bank.*
-import tech.libeufin.common.*
-import tech.libeufin.common.db.*
-import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-
-private val logger: Logger = LoggerFactory.getLogger("libeufin-bank-db-watcher")
-
-/** Postgres notification collector and distributor */
-internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) {
- // ShareFlow that are manually counted for manual garbage collection
- private class CountedSharedFlow<T>(val flow: MutableSharedFlow<T>, var count: Int)
-
- // Transaction flows, the keys are the bank account id
- private val bankTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
- private val outgoingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
- private val incomingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
- private val revenueTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>()
- // Withdrawal confirmation flow, the key is the public withdrawal UUID
- private val withdrawalFlow = ConcurrentHashMap<UUID, CountedSharedFlow<WithdrawalStatus>>()
-
- private val backoff = ExpoBackoffDecorr()
-
- init {
- // Run notification logic in a separated thread
- kotlin.concurrent.thread(isDaemon = true) {
- runBlocking {
- while (true) {
- try {
- val conn = pgSource.pgConnection("libeufin_bank")
-
- // Listen to all notifications channels
- conn.execSQLUpdate("LISTEN bank_tx")
- conn.execSQLUpdate("LISTEN outgoing_tx")
- conn.execSQLUpdate("LISTEN incoming_tx")
- conn.execSQLUpdate("LISTEN withdrawal_status")
-
- backoff.reset()
-
- while (true) {
- conn.getNotifications(0) // Block until we receive at least one notification
- .forEach {
- // Extract information and dispatch
- when (it.name) {
- "bank_tx" -> {
- val (debtor, creditor, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() }
- bankTxFlows[debtor]?.run {
- flow.emit(debitRow)
- }
- bankTxFlows[creditor]?.run {
- flow.emit(creditRow)
- }
- revenueTxFlows[creditor]?.run {
- flow.emit(creditRow)
- }
- }
- "outgoing_tx" -> {
- val (account, merchant, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() }
- outgoingTxFlows[account]?.run {
- flow.emit(debitRow)
- }
- }
- "incoming_tx" -> {
- val (account, row) = it.parameter.split(' ', limit = 2).map { it.toLong() }
- incomingTxFlows[account]?.run {
- flow.emit(row)
- }
- }
- "withdrawal_status" -> {
- val raw = it.parameter.split(' ', limit = 2)
- val uuid = UUID.fromString(raw[0])
- val status = WithdrawalStatus.valueOf(raw[1])
- withdrawalFlow[uuid]?.run {
- flow.emit(status)
- }
- }
- }
- }
- }
- } catch (e: Exception) {
- e.fmtLog(logger)
- delay(backoff.next())
- }
- }
- }
- }
- }
-
- /** Listen to flow from [map] for [key] using [lambda]*/
- private suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R {
- // Register listener, create a new flow if missing
- val flow = map.compute(key) { _, v ->
- val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0)
- tmp.count++
- tmp
- }!!.flow
-
- try {
- return lambda(flow)
- } finally {
- // Unregister listener, removing unused flow
- map.compute(key) { _, v ->
- v!!
- v.count--
- if (v.count > 0) v else null
- }
- }
- }
-
- /** Listen for new bank transactions for [account] */
- suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R
- = listen(bankTxFlows, account, lambda)
- /** Listen for new taler outgoing transactions from [account] */
- suspend fun <R> listenOutgoing(exchange: Long, lambda: suspend (Flow<Long>) -> R): R
- = listen(outgoingTxFlows, exchange, lambda)
- /** Listen for new taler incoming transactions to [account] */
- suspend fun <R> listenIncoming(exchange: Long, lambda: suspend (Flow<Long>) -> R): R
- = listen(incomingTxFlows, exchange, lambda)
- /** Listen for new taler outgoing transactions to [account] */
- suspend fun <R> listenRevenue(merchant: Long, lambda: suspend (Flow<Long>) -> R): R
- = listen(revenueTxFlows, merchant, lambda)
- /** Listen for new withdrawal confirmations */
- suspend fun <R> listenWithdrawals(withdrawal: UUID, lambda: suspend (Flow<WithdrawalStatus>) -> R): R
- = listen(withdrawalFlow, withdrawal, lambda)
-} \ No newline at end of file
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt
index 388d530a..524d5578 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt
@@ -21,9 +21,8 @@ package tech.libeufin.bank.db
import tech.libeufin.bank.Operation
import tech.libeufin.bank.TanChannel
-import tech.libeufin.bank.internalServerError
-import tech.libeufin.common.db.oneOrNull
-import tech.libeufin.common.micros
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt
index 7fb8823d..794eb04d 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt
@@ -189,7 +189,7 @@ class TransactionDAO(private val db: Database) {
accountId: Long,
ctx: BankPaytoCtx
): List<BankAccountTransactionInfo> {
- return db.poolHistory(params, accountId, NotificationWatcher::listenBank, """
+ return db.poolHistory(params, accountId, db::listenBank, """
SELECT
bank_transaction_id
,transaction_date
@@ -221,7 +221,7 @@ class TransactionDAO(private val db: Database) {
accountId: Long,
ctx: BankPaytoCtx
): List<RevenueIncomingBankTransaction>
- = db.poolHistory(params, accountId, NotificationWatcher::listenRevenue, """
+ = db.poolHistory(params, accountId, db::listenRevenue, """
SELECT
bank_transaction_id
,transaction_date
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt
index 6a02205c..efaa4b74 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt
@@ -24,12 +24,8 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import tech.libeufin.bank.*
-import tech.libeufin.common.EddsaPublicKey
-import tech.libeufin.common.Payto
-import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.db.getAmount
-import tech.libeufin.common.db.oneOrNull
-import tech.libeufin.common.micros
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
import java.time.Instant
import java.util.*
@@ -207,7 +203,7 @@ class WithdrawalDAO(private val db: Database) {
load: suspend () -> T?
): T? {
return if (params.polling.poll_ms > 0) {
- db.notifWatcher.listenWithdrawals(uuid) { flow ->
+ db.listenWithdrawals(uuid) { flow ->
coroutineScope {
// Start buffering notification before loading transactions to not miss any
val polling = launch {
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
index e502f62c..094b7996 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
@@ -149,7 +149,7 @@ fun Route.intercept(callback: Route.() -> Unit, interceptor: suspend PipelineCon
fun Route.conditional(implemented: Boolean, callback: Route.() -> Unit): Route =
intercept(callback) {
if (!implemented) {
- throw libeufinError(HttpStatusCode.NotImplemented, "API not implemented", TalerErrorCode.END)
+ throw apiError(HttpStatusCode.NotImplemented, "API not implemented", TalerErrorCode.END)
}
}
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/params.kt b/bank/src/main/kotlin/tech/libeufin/bank/params.kt
index 72e6cabd..3f34fb36 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/params.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/params.kt
@@ -20,44 +20,13 @@
package tech.libeufin.bank
import io.ktor.http.*
-import tech.libeufin.common.TalerAmount
-import tech.libeufin.common.TalerErrorCode
+import tech.libeufin.common.*
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.temporal.TemporalAdjusters
import java.util.*
-fun Parameters.expect(name: String): String
- = get(name) ?: throw badRequest("Missing '$name' parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
-fun Parameters.int(name: String): Int?
- = get(name)?.run { toIntOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) }
-fun Parameters.expectInt(name: String): Int
- = int(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
-fun Parameters.long(name: String): Long?
- = get(name)?.run { toLongOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) }
-fun Parameters.expectLong(name: String): Long
- = long(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
-fun Parameters.uuid(name: String): UUID? {
- return get(name)?.run {
- try {
- UUID.fromString(this)
- } catch (e: Exception) {
- throw badRequest("Param '$name' not an UUID", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
- }
- }
-}
-fun Parameters.expectUuid(name: String): UUID
- = uuid(name) ?: throw badRequest("Missing '$name' UUID parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
-fun Parameters.amount(name: String): TalerAmount?
- = get(name)?.run {
- try {
- TalerAmount(this)
- } catch (e: Exception) {
- throw badRequest("Param '$name' not a taler amount", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
- }
- }
-
data class MonitorParams(
val timeframe: Timeframe,
val date: LocalDateTime
@@ -125,42 +94,6 @@ data class AccountParams(
}
}
-data class PageParams(
- val delta: Int, val start: Long
-) {
- companion object {
- fun extract(params: Parameters): PageParams {
- val delta: Int = params.int("delta") ?: -20
- val start: Long = params.long("start") ?: if (delta >= 0) 0L else Long.MAX_VALUE
- if (start < 0) throw badRequest("Param 'start' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
- // TODO enforce delta limit
- return PageParams(delta, start)
- }
- }
-}
-
-data class PollingParams(
- val poll_ms: Long
-) {
- companion object {
- fun extract(params: Parameters): PollingParams {
- val poll_ms: Long = params.long("long_poll_ms") ?: 0
- if (poll_ms < 0) throw badRequest("Param 'long_poll_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
- return PollingParams(poll_ms)
- }
- }
-}
-
-data class HistoryParams(
- val page: PageParams, val polling: PollingParams
-) {
- companion object {
- fun extract(params: Parameters): HistoryParams {
- return HistoryParams(PageParams.extract(params), PollingParams.extract(params))
- }
- }
-}
-
data class RateParams(
val debit: TalerAmount?, val credit: TalerAmount?
) {
diff --git a/common/build.gradle b/common/build.gradle
index cc9649af..cdc9c3e0 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -24,10 +24,19 @@ dependencies {
implementation("org.postgresql:postgresql:$postgres_version")
implementation("com.zaxxer:HikariCP:5.1.0")
+ implementation("io.ktor:ktor-server-core:$ktor_version")
+ implementation("io.ktor:ktor-server-call-logging:$ktor_version")
+ implementation("io.ktor:ktor-server-cors:$ktor_version")
+ implementation("io.ktor:ktor-server-content-negotiation:$ktor_version")
+ implementation("io.ktor:ktor-server-status-pages:$ktor_version")
+ implementation("io.ktor:ktor-server-netty:$ktor_version")
+ implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
+ implementation("io.ktor:ktor-server-forwarded-header:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
implementation("io.ktor:ktor-server-test-host:$ktor_version")
- implementation("org.jetbrains.kotlin:kotlin-test:$kotlin_version")
+
implementation("com.github.ajalt.clikt:clikt:$clikt_version")
+ implementation("org.jetbrains.kotlin:kotlin-test:$kotlin_version")
testImplementation("uk.org.webcompere:system-stubs-core:2.1.6")
} \ No newline at end of file
diff --git a/common/src/main/kotlin/ApiError.kt b/common/src/main/kotlin/ApiError.kt
new file mode 100644
index 00000000..15ae871c
--- /dev/null
+++ b/common/src/main/kotlin/ApiError.kt
@@ -0,0 +1,128 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+package tech.libeufin.common
+
+import io.ktor.http.*
+import io.ktor.server.application.*
+import io.ktor.server.response.*
+import io.ktor.util.*
+import kotlinx.serialization.Serializable
+import tech.libeufin.common.TalerAmount
+import tech.libeufin.common.TalerErrorCode
+
+/**
+ * Convenience type to throw errors along the API activity
+ * and that is meant to be caught by Ktor and responded to the
+ * client.
+ */
+class ApiException(
+ // Status code that Ktor will set for the response.
+ val httpStatus: HttpStatusCode,
+ // Error detail object, after Taler API.
+ val talerError: TalerError
+) : Exception(talerError.hint)
+
+/**
+ * Error object to respond to the client. The
+ * 'code' field takes values from the GANA gnu-taler-error-code
+ * specification. 'hint' is a human-readable description
+ * of the error.
+ */
+@Serializable
+data class TalerError(
+ @kotlinx.serialization.Transient val err: TalerErrorCode = TalerErrorCode.END,
+ val code: Int,
+ val hint: String? = null,
+ val detail: String? = null
+)
+
+private val LOG_MSG = AttributeKey<String>("log_msg")
+
+fun ApplicationCall.logMsg(): String? = attributes.getOrNull(LOG_MSG)
+
+suspend fun ApplicationCall.err(
+ status: HttpStatusCode,
+ hint: String?,
+ error: TalerErrorCode
+) {
+ err(
+ ApiException(
+ httpStatus = status, talerError = TalerError(
+ code = error.code, err = error, hint = hint
+ )
+ )
+ )
+}
+
+suspend fun ApplicationCall.err(
+ err: ApiException
+) {
+ attributes.put(LOG_MSG, "${err.talerError.err.name} ${err.talerError.hint}")
+ respond(
+ status = err.httpStatus,
+ message = err.talerError
+ )
+}
+
+
+fun apiError(
+ status: HttpStatusCode,
+ hint: String?,
+ error: TalerErrorCode,
+ detail: String? = null
+): ApiException = ApiException(
+ httpStatus = status, talerError = TalerError(
+ code = error.code, err = error, hint = hint, detail = detail
+ )
+)
+
+/* ----- HTTP error ----- */
+
+fun forbidden(
+ hint: String = "No rights on the resource",
+ error: TalerErrorCode = TalerErrorCode.END
+): ApiException = apiError(HttpStatusCode.Forbidden, hint, error)
+
+fun unauthorized(
+ hint: String,
+ error: TalerErrorCode = TalerErrorCode.GENERIC_UNAUTHORIZED
+): ApiException = apiError(HttpStatusCode.Unauthorized, hint, error)
+
+fun internalServerError(hint: String?): ApiException
+ = apiError(HttpStatusCode.InternalServerError, hint, TalerErrorCode.GENERIC_INTERNAL_INVARIANT_FAILURE)
+
+fun notFound(
+ hint: String,
+ error: TalerErrorCode
+): ApiException = apiError(HttpStatusCode.NotFound, hint, error)
+
+fun conflict(
+ hint: String, error: TalerErrorCode
+): ApiException = apiError(HttpStatusCode.Conflict, hint, error)
+
+fun badRequest(
+ hint: String? = null,
+ error: TalerErrorCode = TalerErrorCode.GENERIC_JSON_INVALID,
+ detail: String? = null
+): ApiException = apiError(HttpStatusCode.BadRequest, hint, error, detail)
+
+fun unsupportedMediaType(
+ hint: String,
+ error: TalerErrorCode = TalerErrorCode.END,
+): ApiException = apiError(HttpStatusCode.UnsupportedMediaType, hint, error) \ No newline at end of file
diff --git a/common/src/main/kotlin/Constants.kt b/common/src/main/kotlin/Constants.kt
index 7cc6ab0b..57f378c0 100644
--- a/common/src/main/kotlin/Constants.kt
+++ b/common/src/main/kotlin/Constants.kt
@@ -20,4 +20,7 @@ package tech.libeufin.common
// DB
const val MIN_VERSION: Int = 14
-const val SERIALIZATION_RETRY: Int = 10 \ No newline at end of file
+const val SERIALIZATION_RETRY: Int = 10
+
+// Security
+const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB \ No newline at end of file
diff --git a/common/src/main/kotlin/TalerCommon.kt b/common/src/main/kotlin/TalerCommon.kt
index 7d561c5c..cec5d9dc 100644
--- a/common/src/main/kotlin/TalerCommon.kt
+++ b/common/src/main/kotlin/TalerCommon.kt
@@ -27,6 +27,7 @@ import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
+import tech.libeufin.common.*
import java.net.URI
sealed class CommonError(msg: String): Exception(msg) {
diff --git a/common/src/main/kotlin/TalerMessage.kt b/common/src/main/kotlin/TalerMessage.kt
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/common/src/main/kotlin/TalerMessage.kt
diff --git a/common/src/main/kotlin/api/server.kt b/common/src/main/kotlin/api/server.kt
new file mode 100644
index 00000000..ba6f2f61
--- /dev/null
+++ b/common/src/main/kotlin/api/server.kt
@@ -0,0 +1,225 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.common.api
+
+import io.ktor.http.*
+import io.ktor.serialization.kotlinx.json.*
+import io.ktor.server.application.*
+import io.ktor.server.engine.*
+import io.ktor.server.http.content.*
+import io.ktor.server.netty.*
+import io.ktor.server.plugins.*
+import io.ktor.server.plugins.callloging.*
+import io.ktor.server.plugins.contentnegotiation.*
+import io.ktor.server.plugins.cors.routing.*
+import io.ktor.server.plugins.forwardedheaders.*
+import io.ktor.server.plugins.statuspages.*
+import io.ktor.server.request.*
+import io.ktor.server.response.*
+import io.ktor.server.routing.*
+import io.ktor.utils.io.*
+import kotlinx.serialization.ExperimentalSerializationApi
+import kotlinx.serialization.json.Json
+import org.postgresql.util.PSQLState
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.slf4j.event.Level
+import tech.libeufin.common.*
+import tech.libeufin.common.db.dbInit
+import tech.libeufin.common.db.pgDataSource
+import java.net.InetAddress
+import java.sql.SQLException
+import java.time.Instant
+import java.util.zip.DataFormatException
+import java.util.zip.Inflater
+import kotlin.io.path.Path
+import kotlin.io.path.exists
+import kotlin.io.path.readText
+
+/**
+ * This plugin checks for body length limit and inflates the requests that have "Content-Encoding: deflate"
+ */
+fun bodyLimitPlugin(logger: Logger): ApplicationPlugin<Unit> {
+ return createApplicationPlugin("BodyLimitAndDecompression") {
+ onCallReceive { call ->
+ // TODO check content length as an optimisation
+ transformBody { body ->
+ val bytes = ByteArray(MAX_BODY_LENGTH.toInt() + 1)
+ var read = 0
+ when (val encoding = call.request.headers[HttpHeaders.ContentEncoding]) {
+ "deflate" -> {
+ // Decompress and check decompressed length
+ val inflater = Inflater()
+ while (!body.isClosedForRead) {
+ body.read { buf ->
+ inflater.setInput(buf)
+ try {
+ read += inflater.inflate(bytes, read, bytes.size - read)
+ } catch (e: DataFormatException) {
+ logger.error("Deflated request failed to inflate: ${e.message}")
+ throw badRequest(
+ "Could not inflate request",
+ TalerErrorCode.GENERIC_COMPRESSION_INVALID
+ )
+ }
+ }
+ if (read > MAX_BODY_LENGTH)
+ throw badRequest("Decompressed body is suspiciously big > $MAX_BODY_LENGTH B")
+ }
+ }
+ null -> {
+ // Check body length
+ while (true) {
+ val new = body.readAvailable(bytes, read, bytes.size - read)
+ if (new == -1) break // Channel is closed
+ read += new
+ if (read > MAX_BODY_LENGTH)
+ throw badRequest("Body is suspiciously big > $MAX_BODY_LENGTH B")
+ }
+ }
+ else -> throw unsupportedMediaType(
+ "Content encoding '$encoding' not supported, expected plain or deflate",
+ TalerErrorCode.GENERIC_COMPRESSION_INVALID
+ )
+ }
+ ByteReadChannel(bytes, 0, read)
+ }
+ }
+ }
+}
+
+/** Set up web server handlers for a Taler API */
+fun Application.talerApi(logger: Logger, routes: Routing.() -> Unit) {
+ install(CallLogging) {
+ level = Level.INFO
+ this.logger = logger
+ format { call ->
+ val status = call.response.status()
+ val httpMethod = call.request.httpMethod.value
+ val path = call.request.path()
+ val msg = call.logMsg()
+ if (msg != null) {
+ "$status, $httpMethod $path, $msg"
+ } else {
+ "$status, $httpMethod $path"
+ }
+ }
+ }
+ install(XForwardedHeaders)
+ install(CORS) {
+ anyHost()
+ allowHeader(HttpHeaders.Authorization)
+ allowHeader(HttpHeaders.ContentType)
+ allowMethod(HttpMethod.Options)
+ allowMethod(HttpMethod.Patch)
+ allowMethod(HttpMethod.Delete)
+ allowCredentials = true
+ }
+ install(bodyLimitPlugin(logger))
+ install(IgnoreTrailingSlash)
+ install(ContentNegotiation) {
+ json(Json {
+ @OptIn(ExperimentalSerializationApi::class)
+ explicitNulls = false
+ encodeDefaults = true
+ ignoreUnknownKeys = true
+ })
+ }
+ install(StatusPages) {
+ status(HttpStatusCode.NotFound) { call, status ->
+ call.err(
+ status,
+ "There is no endpoint defined for the URL provided by the client. Check if you used the correct URL and/or file a report with the developers of the client software.",
+ TalerErrorCode.GENERIC_ENDPOINT_UNKNOWN
+ )
+ }
+ status(HttpStatusCode.MethodNotAllowed) { call, status ->
+ call.err(
+ status,
+ "The HTTP method used is invalid for this endpoint. This is likely a bug in the client implementation. Check if you are using the latest available version and/or file a report with the developers.",
+ TalerErrorCode.GENERIC_METHOD_INVALID
+ )
+ }
+ exception<Exception> { call, cause ->
+ logger.debug("request failed", cause)
+ // TODO nexus specific error code ?!
+ when (cause) {
+ is ApiException -> call.err(cause)
+ is SQLException -> {
+ when (cause.sqlState) {
+ PSQLState.SERIALIZATION_FAILURE.state -> call.err(
+ HttpStatusCode.InternalServerError,
+ "Transaction serialization failure",
+ TalerErrorCode.BANK_SOFT_EXCEPTION
+ )
+ else -> call.err(
+ HttpStatusCode.InternalServerError,
+ "Unexpected sql error with state ${cause.sqlState}",
+ TalerErrorCode.BANK_UNMANAGED_EXCEPTION
+ )
+ }
+ }
+ is BadRequestException -> {
+ /**
+ * NOTE: extracting the root cause helps with JSON error messages,
+ * because they mention the particular way they are invalid, but OTOH
+ * it loses (by getting null) other error messages, like for example
+ * the one from MissingRequestParameterException. Therefore, in order
+ * to get the most detailed message, we must consider BOTH sides:
+ * the 'cause' AND its root cause!
+ */
+ var rootCause: Throwable? = cause.cause
+ while (rootCause?.cause != null)
+ rootCause = rootCause.cause
+ // Telling apart invalid JSON vs missing parameter vs invalid parameter.
+ val talerErrorCode = when {
+ cause is MissingRequestParameterException ->
+ TalerErrorCode.GENERIC_PARAMETER_MISSING
+ cause is ParameterConversionException ->
+ TalerErrorCode.GENERIC_PARAMETER_MALFORMED
+ rootCause is CommonError -> when (rootCause) {
+ is CommonError.AmountFormat -> TalerErrorCode.BANK_BAD_FORMAT_AMOUNT
+ is CommonError.AmountNumberTooBig -> TalerErrorCode.BANK_NUMBER_TOO_BIG
+ is CommonError.Payto -> TalerErrorCode.GENERIC_JSON_INVALID
+ }
+ else -> TalerErrorCode.GENERIC_JSON_INVALID
+ }
+ call.err(
+ badRequest(
+ cause.message,
+ talerErrorCode,
+ /* Here getting _some_ error message, by giving precedence
+ * to the root cause, as otherwise JSON details would be lost. */
+ rootCause?.message
+ )
+ )
+ }
+ else -> {
+ call.err(
+ HttpStatusCode.InternalServerError,
+ cause.message,
+ TalerErrorCode.BANK_UNMANAGED_EXCEPTION
+ )
+ }
+ }
+ }
+ }
+ routing { routes() }
+} \ No newline at end of file
diff --git a/common/src/main/kotlin/db/helpers.kt b/common/src/main/kotlin/db/helpers.kt
new file mode 100644
index 00000000..13e0ace0
--- /dev/null
+++ b/common/src/main/kotlin/db/helpers.kt
@@ -0,0 +1,113 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+ *
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+ *
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.common.db
+
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.*
+import tech.libeufin.common.db.DbPool
+import tech.libeufin.common.PageParams
+import tech.libeufin.common.HistoryParams
+import org.postgresql.jdbc.PgConnection
+import org.postgresql.util.PSQLState
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import java.sql.PreparedStatement
+import java.sql.ResultSet
+import java.sql.SQLException
+import kotlin.math.abs
+
+/** Apply paging logic to a sql query */
+suspend fun <T> DbPool.page(
+ params: PageParams,
+ idName: String,
+ query: String,
+ bind: PreparedStatement.() -> Int = { 0 },
+ map: (ResultSet) -> T
+): List<T> = conn { conn ->
+ val backward = params.delta < 0
+ val pageQuery = """
+ $query
+ $idName ${if (backward) '<' else '>'} ?
+ ORDER BY $idName ${if (backward) "DESC" else "ASC"}
+ LIMIT ?
+ """
+ conn.prepareStatement(pageQuery).run {
+ val pad = bind()
+ setLong(pad + 1, params.start)
+ setInt(pad + 2, abs(params.delta))
+ all { map(it) }
+ }
+}
+
+/**
+* The following function returns the list of transactions, according
+* to the history parameters and perform long polling when necessary
+*/
+suspend fun <T> DbPool.poolHistory(
+ params: HistoryParams,
+ bankAccountId: Long,
+ listen: suspend (Long, suspend (Flow<Long>) -> List<T>) -> List<T>,
+ query: String,
+ accountColumn: String = "bank_account_id",
+ map: (ResultSet) -> T
+): List<T> {
+
+ suspend fun load(): List<T> = page(
+ params.page,
+ "bank_transaction_id",
+ "$query $accountColumn=? AND",
+ {
+ setLong(1, bankAccountId)
+ 1
+ },
+ map
+ )
+
+
+ // TODO do we want to handle polling when going backward and there is no transactions yet ?
+ // When going backward there is always at least one transaction or none
+ return if (params.page.delta >= 0 && params.polling.poll_ms > 0) {
+ listen(bankAccountId) { flow ->
+ coroutineScope {
+ // Start buffering notification before loading transactions to not miss any
+ val polling = launch {
+ withTimeoutOrNull(params.polling.poll_ms) {
+ flow.first { it > params.page.start } // Always forward so >
+ }
+ }
+ // Initial loading
+ val init = load()
+ // Long polling if we found no transactions
+ if (init.isEmpty()) {
+ if (polling.join() != null) {
+ load()
+ } else {
+ init
+ }
+ } else {
+ polling.cancel()
+ init
+ }
+ }
+ }
+ } else {
+ load()
+ }
+} \ No newline at end of file
diff --git a/common/src/main/kotlin/db/notifications.kt b/common/src/main/kotlin/db/notifications.kt
new file mode 100644
index 00000000..3f2fb753
--- /dev/null
+++ b/common/src/main/kotlin/db/notifications.kt
@@ -0,0 +1,91 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+ *
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+ *
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.common.db
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.postgresql.ds.PGSimpleDataSource
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+
+// SharedFlow that are manually counted for manual garbage collection
+class CountedSharedFlow<T>(val flow: MutableSharedFlow<T>, var count: Int)
+
+fun watchNotifications(
+ pgSource: PGSimpleDataSource,
+ schema: String,
+ logger: Logger,
+ listeners: Map<String, (suspend (String) -> Unit)>
+) {
+ val backoff = ExpoBackoffDecorr()
+ // Run notification logic in a separated thread
+ kotlin.concurrent.thread(isDaemon = true) {
+ runBlocking {
+ while (true) {
+ try {
+ val conn = pgSource.pgConnection(schema)
+
+ // Listen to all notifications channels
+ for (channel in listeners.keys) {
+ conn.execSQLUpdate("LISTEN $channel")
+ }
+
+ backoff.reset()
+
+ while (true) {
+ conn.getNotifications(0) // Block until we receive at least one notification
+ .forEach {
+ // Dispatch
+ listeners[it.name]!!(it.parameter)
+ }
+ }
+ } catch (e: Exception) {
+ e.fmtLog(logger)
+ delay(backoff.next())
+ }
+ }
+ }
+ }
+}
+
+/** Listen to flow from [map] for [key] using [lambda]*/
+suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R {
+ // Register listener, create a new flow if missing
+ val flow = map.compute(key) { _, v ->
+ val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0)
+ tmp.count++
+ tmp
+ }!!.flow
+
+ try {
+ return lambda(flow)
+ } finally {
+ // Unregister listener, removing unused flow
+ map.compute(key) { _, v ->
+ v!!
+ v.count--
+ if (v.count > 0) v else null
+ }
+ }
+} \ No newline at end of file
diff --git a/common/src/main/kotlin/db/utils.kt b/common/src/main/kotlin/db/transaction.kt
index f06c4e4e..f06c4e4e 100644
--- a/common/src/main/kotlin/db/utils.kt
+++ b/common/src/main/kotlin/db/transaction.kt
diff --git a/common/src/main/kotlin/params.kt b/common/src/main/kotlin/params.kt
new file mode 100644
index 00000000..a24bd379
--- /dev/null
+++ b/common/src/main/kotlin/params.kt
@@ -0,0 +1,95 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.common
+
+import io.ktor.http.*
+import tech.libeufin.common.TalerAmount
+import tech.libeufin.common.TalerErrorCode
+import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.temporal.TemporalAdjusters
+import java.util.*
+
+fun Parameters.expect(name: String): String
+ = get(name) ?: throw badRequest("Missing '$name' parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
+fun Parameters.int(name: String): Int?
+ = get(name)?.run { toIntOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) }
+fun Parameters.expectInt(name: String): Int
+ = int(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
+fun Parameters.long(name: String): Long?
+ = get(name)?.run { toLongOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) }
+fun Parameters.expectLong(name: String): Long
+ = long(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
+fun Parameters.uuid(name: String): UUID? {
+ return get(name)?.run {
+ try {
+ UUID.fromString(this)
+ } catch (e: Exception) {
+ throw badRequest("Param '$name' not an UUID", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ }
+ }
+}
+fun Parameters.expectUuid(name: String): UUID
+ = uuid(name) ?: throw badRequest("Missing '$name' UUID parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING)
+fun Parameters.amount(name: String): TalerAmount?
+ = get(name)?.run {
+ try {
+ TalerAmount(this)
+ } catch (e: Exception) {
+ throw badRequest("Param '$name' not a taler amount", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ }
+ }
+
+data class PageParams(
+ val delta: Int, val start: Long
+) {
+ companion object {
+ fun extract(params: Parameters): PageParams {
+ val delta: Int = params.int("delta") ?: -20
+ val start: Long = params.long("start") ?: if (delta >= 0) 0L else Long.MAX_VALUE
+ if (start < 0) throw badRequest("Param 'start' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ // TODO enforce delta limit
+ return PageParams(delta, start)
+ }
+ }
+}
+
+data class PollingParams(
+ val poll_ms: Long
+) {
+ companion object {
+ fun extract(params: Parameters): PollingParams {
+ val poll_ms: Long = params.long("long_poll_ms") ?: 0
+ if (poll_ms < 0) throw badRequest("Param 'long_poll_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ return PollingParams(poll_ms)
+ }
+ }
+}
+
+data class HistoryParams(
+ val page: PageParams, val polling: PollingParams
+) {
+ companion object {
+ fun extract(params: Parameters): HistoryParams {
+ return HistoryParams(PageParams.extract(params), PollingParams.extract(params))
+ }
+ }
+} \ No newline at end of file