diff options
author | Antoine A <> | 2024-04-08 11:39:53 +0200 |
---|---|---|
committer | Antoine A <> | 2024-04-09 15:46:02 +0200 |
commit | ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d (patch) | |
tree | d8feeb4cd4a3dc60d7411370a01a6b9e33d5fc61 | |
parent | fc72eb5f8b3e82bca93b90c292848e5b731cee9d (diff) | |
download | libeufin-ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d.tar.gz libeufin-ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d.tar.bz2 libeufin-ed18c6cb21f98a71c9cec2bd1cc2b84ae1520b7d.zip |
Move more API and DB logic in common in preparation for libeufin-nexus REST API
31 files changed, 776 insertions, 647 deletions
diff --git a/bank/build.gradle b/bank/build.gradle index 9070480a..01b884ab 100644 --- a/bank/build.gradle +++ b/bank/build.gradle @@ -27,13 +27,8 @@ dependencies { implementation("com.github.ajalt.clikt:clikt:$clikt_version") implementation("io.ktor:ktor-server-core:$ktor_version") - implementation("io.ktor:ktor-server-call-logging:$ktor_version") - implementation("io.ktor:ktor-server-cors:$ktor_version") - implementation("io.ktor:ktor-server-content-negotiation:$ktor_version") - implementation("io.ktor:ktor-server-status-pages:$ktor_version") implementation("io.ktor:ktor-server-netty:$ktor_version") implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version") - implementation("io.ktor:ktor-server-forwarded-header:$ktor_version") // UNIX domain sockets support (used to connect to PostgreSQL) implementation("com.kohlschutter.junixsocket:junixsocket-core:$junixsocket_version") diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt index 4bcd2ee1..766aa5f6 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt @@ -36,9 +36,6 @@ val TOKEN_DEFAULT_DURATION: Duration = Duration.ofDays(1L) val RESERVED_ACCOUNTS = setOf("admin", "bank") const val IBAN_ALLOCATION_RETRY_COUNTER: Int = 5 -// Security -const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB - // API version const val COREBANK_API_VERSION: String = "4:7:0" const val CONVERSION_API_VERSION: String = "0:0:0" diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Error.kt b/bank/src/main/kotlin/tech/libeufin/bank/Error.kt index 18735633..9c5a115d 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Error.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Error.kt @@ -23,110 +23,7 @@ import io.ktor.server.application.* import io.ktor.server.response.* import io.ktor.util.* import kotlinx.serialization.Serializable -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.TalerErrorCode - -/** - * Convenience type to throw errors along the bank activity - * and that is meant to be caught by Ktor and responded to the - * client. - */ -class LibeufinException( - // Status code that Ktor will set for the response. - val httpStatus: HttpStatusCode, - // Error detail object, after Taler API. - val talerError: TalerError -) : Exception(talerError.hint) - -/** - * Error object to respond to the client. The - * 'code' field takes values from the GANA gnu-taler-error-code - * specification. 'hint' is a human-readable description - * of the error. - */ -@Serializable -data class TalerError( - @kotlinx.serialization.Transient val err: TalerErrorCode = TalerErrorCode.END, - val code: Int, - val hint: String? = null, - val detail: String? = null -) - -private val LOG_MSG = AttributeKey<String>("log_msg") - -fun ApplicationCall.logMsg(): String? = attributes.getOrNull(LOG_MSG) - -suspend fun ApplicationCall.err( - status: HttpStatusCode, - hint: String?, - error: TalerErrorCode -) { - err( - LibeufinException( - httpStatus = status, talerError = TalerError( - code = error.code, err = error, hint = hint - ) - ) - ) -} - -suspend fun ApplicationCall.err( - err: LibeufinException -) { - attributes.put(LOG_MSG, "${err.talerError.err.name} ${err.talerError.hint}") - respond( - status = err.httpStatus, - message = err.talerError - ) -} - - -fun libeufinError( - status: HttpStatusCode, - hint: String?, - error: TalerErrorCode, - detail: String? = null -): LibeufinException = LibeufinException( - httpStatus = status, talerError = TalerError( - code = error.code, err = error, hint = hint, detail = detail - ) -) - -/* ----- HTTP error ----- */ - -fun forbidden( - hint: String = "No rights on the resource", - error: TalerErrorCode = TalerErrorCode.END -): LibeufinException = libeufinError(HttpStatusCode.Forbidden, hint, error) - -fun unauthorized( - hint: String, - error: TalerErrorCode = TalerErrorCode.GENERIC_UNAUTHORIZED -): LibeufinException = libeufinError(HttpStatusCode.Unauthorized, hint, error) - -fun internalServerError(hint: String?): LibeufinException - = libeufinError(HttpStatusCode.InternalServerError, hint, TalerErrorCode.GENERIC_INTERNAL_INVARIANT_FAILURE) - -fun notFound( - hint: String, - error: TalerErrorCode -): LibeufinException = libeufinError(HttpStatusCode.NotFound, hint, error) - -fun conflict( - hint: String, error: TalerErrorCode -): LibeufinException = libeufinError(HttpStatusCode.Conflict, hint, error) - -fun badRequest( - hint: String? = null, - error: TalerErrorCode = TalerErrorCode.GENERIC_JSON_INVALID, - detail: String? = null -): LibeufinException = libeufinError(HttpStatusCode.BadRequest, hint, error, detail) - -fun unsupportedMediaType( - hint: String, - error: TalerErrorCode = TalerErrorCode.END, -): LibeufinException = libeufinError(HttpStatusCode.UnsupportedMediaType, hint, error) - +import tech.libeufin.common.* /* ----- Currency checks ----- */ @@ -146,21 +43,21 @@ fun BankConfig.checkFiatCurrency(amount: TalerAmount) { /* ----- Common errors ----- */ -fun unknownAccount(id: String): LibeufinException { +fun unknownAccount(id: String): ApiException { return notFound( "Account '$id' not found", TalerErrorCode.BANK_UNKNOWN_ACCOUNT ) } -fun unknownCreditorAccount(id: String): LibeufinException { +fun unknownCreditorAccount(id: String): ApiException { return conflict( "Creditor account '$id' not found", TalerErrorCode.BANK_UNKNOWN_CREDITOR ) } -fun unsupportedTanChannel(channel: TanChannel): LibeufinException { +fun unsupportedTanChannel(channel: TanChannel): ApiException { return conflict( "Unsupported tan channel $channel", TalerErrorCode.BANK_TAN_CHANNEL_NOT_SUPPORTED diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt index 1e28af34..8733b907 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt @@ -35,12 +35,6 @@ import io.ktor.server.application.* import io.ktor.server.engine.* import io.ktor.server.http.content.* import io.ktor.server.netty.* -import io.ktor.server.plugins.* -import io.ktor.server.plugins.callloging.* -import io.ktor.server.plugins.contentnegotiation.* -import io.ktor.server.plugins.cors.routing.* -import io.ktor.server.plugins.forwardedheaders.* -import io.ktor.server.plugins.statuspages.* import io.ktor.server.request.* import io.ktor.server.response.* import io.ktor.server.routing.* @@ -55,6 +49,7 @@ import tech.libeufin.bank.api.* import tech.libeufin.bank.db.AccountDAO.* import tech.libeufin.bank.db.Database import tech.libeufin.common.* +import tech.libeufin.common.api.* import tech.libeufin.common.db.dbInit import tech.libeufin.common.db.pgDataSource import java.net.InetAddress @@ -68,187 +63,23 @@ import kotlin.io.path.readText private val logger: Logger = LoggerFactory.getLogger("libeufin-bank") // Dirty local variable to stop the server in test TODO remove this ugly hack -var engine: ApplicationEngine? = null +var engine: ApplicationEngine? = null -/** - * This plugin checks for body length limit and inflates the requests that have "Content-Encoding: deflate" - */ -val bodyPlugin = createApplicationPlugin("BodyLimitAndDecompression") { - onCallReceive { call -> - // TODO check content length as an optimisation - transformBody { body -> - val bytes = ByteArray(MAX_BODY_LENGTH.toInt() + 1) - var read = 0 - when (val encoding = call.request.headers[HttpHeaders.ContentEncoding]) { - "deflate" -> { - // Decompress and check decompressed length - val inflater = Inflater() - while (!body.isClosedForRead) { - body.read { buf -> - inflater.setInput(buf) - try { - read += inflater.inflate(bytes, read, bytes.size - read) - } catch (e: DataFormatException) { - logger.error("Deflated request failed to inflate: ${e.message}") - throw badRequest( - "Could not inflate request", - TalerErrorCode.GENERIC_COMPRESSION_INVALID - ) - } - } - if (read > MAX_BODY_LENGTH) - throw badRequest("Decompressed body is suspiciously big > $MAX_BODY_LENGTH B") - } - } - null -> { - // Check body length - while (true) { - val new = body.readAvailable(bytes, read, bytes.size - read) - if (new == -1) break // Channel is closed - read += new - if (read > MAX_BODY_LENGTH) - throw badRequest("Body is suspiciously big > $MAX_BODY_LENGTH B") - } - } - else -> throw unsupportedMediaType( - "Content encoding '$encoding' not supported, expected plain or deflate", - TalerErrorCode.GENERIC_COMPRESSION_INVALID - ) - } - ByteReadChannel(bytes, 0, read) - } - } -} /** * Set up web server handlers for the Taler corebank API. */ -fun Application.corebankWebApp(db: Database, ctx: BankConfig) { - install(CallLogging) { - this.level = Level.INFO - this.logger = tech.libeufin.bank.logger - this.format { call -> - val status = call.response.status() - val httpMethod = call.request.httpMethod.value - val path = call.request.path() - val msg = call.logMsg() - if (msg != null) { - "$status, $httpMethod $path, $msg" - } else { - "$status, $httpMethod $path" - } - } - } - install(XForwardedHeaders) - install(CORS) { - anyHost() - allowHeader(HttpHeaders.Authorization) - allowHeader(HttpHeaders.ContentType) - allowMethod(HttpMethod.Options) - allowMethod(HttpMethod.Patch) - allowMethod(HttpMethod.Delete) - allowCredentials = true - } - install(bodyPlugin) - install(IgnoreTrailingSlash) - install(ContentNegotiation) { - json(Json { - @OptIn(ExperimentalSerializationApi::class) - explicitNulls = false - encodeDefaults = true - ignoreUnknownKeys = true - }) - } - install(StatusPages) { - status(HttpStatusCode.NotFound) { call, status -> - call.err( - status, - "There is no endpoint defined for the URL provided by the client. Check if you used the correct URL and/or file a report with the developers of the client software.", - TalerErrorCode.GENERIC_ENDPOINT_UNKNOWN - ) - } - status(HttpStatusCode.MethodNotAllowed) { call, status -> - call.err( - status, - "The HTTP method used is invalid for this endpoint. This is likely a bug in the client implementation. Check if you are using the latest available version and/or file a report with the developers.", - TalerErrorCode.GENERIC_METHOD_INVALID - ) - } - exception<Exception> { call, cause -> - logger.debug("request failed", cause) - when (cause) { - is LibeufinException -> call.err(cause) - is SQLException -> { - when (cause.sqlState) { - PSQLState.SERIALIZATION_FAILURE.state -> call.err( - HttpStatusCode.InternalServerError, - "Transaction serialization failure", - TalerErrorCode.BANK_SOFT_EXCEPTION - ) - else -> call.err( - HttpStatusCode.InternalServerError, - "Unexpected sql error with state ${cause.sqlState}", - TalerErrorCode.BANK_UNMANAGED_EXCEPTION - ) - } - } - is BadRequestException -> { - /** - * NOTE: extracting the root cause helps with JSON error messages, - * because they mention the particular way they are invalid, but OTOH - * it loses (by getting null) other error messages, like for example - * the one from MissingRequestParameterException. Therefore, in order - * to get the most detailed message, we must consider BOTH sides: - * the 'cause' AND its root cause! - */ - var rootCause: Throwable? = cause.cause - while (rootCause?.cause != null) - rootCause = rootCause.cause - // Telling apart invalid JSON vs missing parameter vs invalid parameter. - val talerErrorCode = when { - cause is MissingRequestParameterException -> - TalerErrorCode.GENERIC_PARAMETER_MISSING - cause is ParameterConversionException -> - TalerErrorCode.GENERIC_PARAMETER_MALFORMED - rootCause is CommonError -> when (rootCause) { - is CommonError.AmountFormat -> TalerErrorCode.BANK_BAD_FORMAT_AMOUNT - is CommonError.AmountNumberTooBig -> TalerErrorCode.BANK_NUMBER_TOO_BIG - is CommonError.Payto -> TalerErrorCode.GENERIC_JSON_INVALID - } - else -> TalerErrorCode.GENERIC_JSON_INVALID - } - call.err( - badRequest( - cause.message, - talerErrorCode, - /* Here getting _some_ error message, by giving precedence - * to the root cause, as otherwise JSON details would be lost. */ - rootCause?.message - ) - ) - } - else -> { - call.err( - HttpStatusCode.InternalServerError, - cause.message, - TalerErrorCode.BANK_UNMANAGED_EXCEPTION - ) - } - } - } - } - routing { - coreBankApi(db, ctx) - conversionApi(db, ctx) - bankIntegrationApi(db, ctx) - wireGatewayApi(db, ctx) - revenueApi(db, ctx) - ctx.spaPath?.let { - get("/") { - call.respondRedirect("/webui/") - } - staticFiles("/webui/", it.toFile()) +fun Application.corebankWebApp(db: Database, ctx: BankConfig) = talerApi(logger) { + coreBankApi(db, ctx) + conversionApi(db, ctx) + bankIntegrationApi(db, ctx) + wireGatewayApi(db, ctx) + revenueApi(db, ctx) + ctx.spaPath?.let { + get("/") { + call.respondRedirect("/webui/") } + staticFiles("/webui/", it.toFile()) } } diff --git a/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt b/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt index bb148166..41b57d1d 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/TalerCommon.kt @@ -30,7 +30,7 @@ import kotlinx.serialization.json.JsonDecoder import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.jsonPrimitive import kotlinx.serialization.json.longOrNull -import tech.libeufin.common.TalerAmount +import tech.libeufin.common.* import java.net.URL import java.time.Duration import java.time.Instant diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt index 05505214..b88d8fa1 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/api/BankIntegrationApi.kt @@ -30,7 +30,7 @@ import tech.libeufin.bank.* import tech.libeufin.bank.db.AbortResult import tech.libeufin.bank.db.Database import tech.libeufin.bank.db.WithdrawalDAO.WithdrawalSelectionResult -import tech.libeufin.common.TalerErrorCode +import tech.libeufin.common.* fun Routing.bankIntegrationApi(db: Database, ctx: BankConfig) { get("/taler-integration/config") { diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt index 8dcc1f60..2a466db6 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/api/ConversionApi.kt @@ -28,14 +28,13 @@ import tech.libeufin.bank.auth.authAdmin import tech.libeufin.bank.db.ConversionDAO import tech.libeufin.bank.db.ConversionDAO.ConversionResult import tech.libeufin.bank.db.Database -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.TalerErrorCode +import tech.libeufin.common.* fun Routing.conversionApi(db: Database, ctx: BankConfig) = conditional(ctx.allowConversion) { get("/conversion-info/config") { val config = db.conversion.getConfig(ctx.regionalCurrency, ctx.fiatCurrency!!) if (config == null) { - throw libeufinError( + throw apiError( HttpStatusCode.NotImplemented, "conversion rate not configured yet", TalerErrorCode.END @@ -62,7 +61,7 @@ fun Routing.conversionApi(db: Database, ctx: BankConfig) = conditional(ctx.allow "$input is too small to be converted", TalerErrorCode.BANK_BAD_CONVERSION ) - is ConversionResult.MissingConfig -> throw libeufinError( + is ConversionResult.MissingConfig -> throw apiError( HttpStatusCode.NotImplemented, "conversion rate not configured yet", TalerErrorCode.END diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt index 1ef1ace5..c659b0f0 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt @@ -699,7 +699,7 @@ private fun Routing.coreBankTanApi(db: Database, ctx: BankConfig) { exitValue } if (exitValue != 0) { - throw libeufinError( + throw apiError( HttpStatusCode.BadGateway, "Tan channel script failure with exit value $exitValue", TalerErrorCode.BANK_TAN_CHANNEL_SCRIPT_FAILED @@ -733,7 +733,7 @@ private fun Routing.coreBankTanApi(db: Database, ctx: BankConfig) { "Incorrect TAN code", TalerErrorCode.BANK_TAN_CHALLENGE_FAILED ) - TanSolveResult.NoRetry -> throw libeufinError( + TanSolveResult.NoRetry -> throw apiError( HttpStatusCode.TooManyRequests, "Too many failed confirmation attempt", TalerErrorCode.BANK_TAN_RATE_LIMITED diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt index f91054ce..b46cf875 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/api/RevenueApi.kt @@ -25,6 +25,7 @@ import io.ktor.server.routing.* import tech.libeufin.bank.* import tech.libeufin.bank.auth.auth import tech.libeufin.bank.db.Database +import tech.libeufin.common.* fun Routing.revenueApi(db: Database, ctx: BankConfig) { auth(db, TokenScope.readonly) { diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt index 5b10f7d1..8a35de4c 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/api/WireGatewayApi.kt @@ -35,8 +35,7 @@ import tech.libeufin.bank.db.Database import tech.libeufin.bank.db.ExchangeDAO import tech.libeufin.bank.db.ExchangeDAO.AddIncomingResult import tech.libeufin.bank.db.ExchangeDAO.TransferResult -import tech.libeufin.common.BankPaytoCtx -import tech.libeufin.common.TalerErrorCode +import tech.libeufin.common.* import java.time.Instant diff --git a/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt b/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt index 83bde34f..c0ed1100 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt @@ -27,11 +27,8 @@ import io.ktor.util.* import io.ktor.util.pipeline.* import tech.libeufin.bank.* import tech.libeufin.bank.db.Database -import tech.libeufin.common.Base32Crockford -import tech.libeufin.common.TalerErrorCode +import tech.libeufin.common.* import tech.libeufin.common.crypto.PwCrypto -import tech.libeufin.common.decodeBase64 -import tech.libeufin.common.splitOnce import java.time.Instant /** Used to store if the currently authenticated user is admin */ diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt index c4f25657..799d466a 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt @@ -20,12 +20,8 @@ package tech.libeufin.bank.db import tech.libeufin.bank.* -import tech.libeufin.common.ShortHashCode -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.asInstant -import tech.libeufin.common.db.getAmount -import tech.libeufin.common.db.oneOrNull -import tech.libeufin.common.micros +import tech.libeufin.common.* +import tech.libeufin.common.db.* import java.time.Instant /** Data access logic for cashout operations */ diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt index 5849881d..04317b68 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt @@ -22,11 +22,8 @@ package tech.libeufin.bank.db import tech.libeufin.bank.ConversionRate import tech.libeufin.bank.DecimalNumber import tech.libeufin.bank.RoundingMode -import tech.libeufin.bank.internalServerError -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.getAmount -import tech.libeufin.common.db.oneOrNull -import tech.libeufin.common.db.transaction +import tech.libeufin.common.* +import tech.libeufin.common.db.* /** Data access logic for conversion */ class ConversionDAO(private val db: Database) { diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt index b6e6fd7e..9efb821a 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt @@ -27,17 +27,17 @@ import kotlinx.coroutines.withTimeoutOrNull import org.slf4j.Logger import org.slf4j.LoggerFactory import tech.libeufin.bank.* -import tech.libeufin.common.asInstant +import tech.libeufin.common.* import tech.libeufin.common.db.* import java.sql.PreparedStatement import java.sql.ResultSet -import kotlin.math.abs +import java.util.concurrent.ConcurrentHashMap +import java.util.* private val logger: Logger = LoggerFactory.getLogger("libeufin-bank-db") class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, internal val fiatCurrency: String?): DbPool(dbConfig, "libeufin-bank") { - internal val notifWatcher: NotificationWatcher = NotificationWatcher(pgSource) - + // DAOs val cashout = CashoutDAO(this) val withdrawal = WithdrawalDAO(this) val exchange = ExchangeDAO(this) @@ -48,6 +48,67 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte val tan = TanDAO(this) val gc = GcDAO(this) + // Transaction flows, the keys are the bank account id + private val bankTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() + private val outgoingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() + private val incomingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() + private val revenueTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() + // Withdrawal confirmation flow, the key is the public withdrawal UUID + private val withdrawalFlow = ConcurrentHashMap<UUID, CountedSharedFlow<WithdrawalStatus>>() + + init { + watchNotifications(pgSource, "libeufin_bank", LoggerFactory.getLogger("libeufin-bank-db-watcher"), mapOf( + "bank_tx" to { + val (debtor, creditor, debitRow, creditRow) = it.split(' ', limit = 4).map { it.toLong() } + bankTxFlows[debtor]?.run { + flow.emit(debitRow) + } + bankTxFlows[creditor]?.run { + flow.emit(creditRow) + } + revenueTxFlows[creditor]?.run { + flow.emit(creditRow) + } + }, + "outgoing_tx" to { + val (account, merchant, debitRow, creditRow) = it.split(' ', limit = 4).map { it.toLong() } + outgoingTxFlows[account]?.run { + flow.emit(debitRow) + } + }, + "incoming_tx" to { + val (account, row) = it.split(' ', limit = 2).map { it.toLong() } + incomingTxFlows[account]?.run { + flow.emit(row) + } + }, + "withdrawal_status" to { + val raw = it.split(' ', limit = 2) + val uuid = UUID.fromString(raw[0]) + val status = WithdrawalStatus.valueOf(raw[1]) + withdrawalFlow[uuid]?.run { + flow.emit(status) + } + } + )) + } + + /** Listen for new bank transactions for [account] */ + suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(bankTxFlows, account, lambda) + /** Listen for new taler outgoing transactions from [account] */ + suspend fun <R> listenOutgoing(exchange: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(outgoingTxFlows, exchange, lambda) + /** Listen for new taler incoming transactions to [account] */ + suspend fun <R> listenIncoming(exchange: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(incomingTxFlows, exchange, lambda) + /** Listen for new taler outgoing transactions to [account] */ + suspend fun <R> listenRevenue(merchant: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(revenueTxFlows, merchant, lambda) + /** Listen for new withdrawal confirmations */ + suspend fun <R> listenWithdrawals(withdrawal: UUID, lambda: suspend (Flow<WithdrawalStatus>) -> R): R + = listen(withdrawalFlow, withdrawal, lambda) + suspend fun monitor( params: MonitorParams ): MonitorResponse = conn { conn -> @@ -87,7 +148,7 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte talerOutCount = it.getLong("taler_out_count"), talerOutVolume = it.getAmount("taler_out_volume", bankCurrency), ) - } ?: MonitorNoConversion( + } ?: MonitorNoConversion( talerInCount = it.getLong("taler_in_count"), talerInVolume = it.getAmount("taler_in_volume", bankCurrency), talerOutCount = it.getLong("taler_out_count"), @@ -95,85 +156,6 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte ) } ?: throw internalServerError("No result from DB procedure stats_get_frame") } - - /** Apply paging logic to a sql query */ - internal suspend fun <T> page( - params: PageParams, - idName: String, - query: String, - bind: PreparedStatement.() -> Int = { 0 }, - map: (ResultSet) -> T - ): List<T> = conn { conn -> - val backward = params.delta < 0 - val pageQuery = """ - $query - $idName ${if (backward) '<' else '>'} ? - ORDER BY $idName ${if (backward) "DESC" else "ASC"} - LIMIT ? - """ - conn.prepareStatement(pageQuery).run { - val pad = bind() - setLong(pad + 1, params.start) - setInt(pad + 2, abs(params.delta)) - all { map(it) } - } - } - - /** - * The following function returns the list of transactions, according - * to the history parameters and perform long polling when necessary - */ - internal suspend fun <T> poolHistory( - params: HistoryParams, - bankAccountId: Long, - listen: suspend NotificationWatcher.(Long, suspend (Flow<Long>) -> List<T>) -> List<T>, - query: String, - accountColumn: String = "bank_account_id", - map: (ResultSet) -> T - ): List<T> { - - suspend fun load(): List<T> = page( - params.page, - "bank_transaction_id", - "$query $accountColumn=? AND", - { - setLong(1, bankAccountId) - 1 - }, - map - ) - - - // TODO do we want to handle polling when going backward and there is no transactions yet ? - // When going backward there is always at least one transaction or none - return if (params.page.delta >= 0 && params.polling.poll_ms > 0) { - notifWatcher.(listen)(bankAccountId) { flow -> - coroutineScope { - // Start buffering notification before loading transactions to not miss any - val polling = launch { - withTimeoutOrNull(params.polling.poll_ms) { - flow.first { it > params.page.start } // Always forward so > - } - } - // Initial loading - val init = load() - // Long polling if we found no transactions - if (init.isEmpty()) { - if (polling.join() != null) { - load() - } else { - init - } - } else { - polling.cancel() - init - } - } - } - } else { - load() - } - } } /** Result status of withdrawal or cashout operation abortion */ diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt index 3ae15c7e..644c17d3 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt @@ -20,12 +20,8 @@ package tech.libeufin.bank.db import tech.libeufin.bank.* -import tech.libeufin.common.BankPaytoCtx -import tech.libeufin.common.EddsaPublicKey -import tech.libeufin.common.ShortHashCode -import tech.libeufin.common.db.getAmount -import tech.libeufin.common.db.getBankPayto -import tech.libeufin.common.micros +import tech.libeufin.common.* +import tech.libeufin.common.db.* import java.time.Instant /** Data access logic for exchange specific logic */ @@ -36,7 +32,7 @@ class ExchangeDAO(private val db: Database) { exchangeId: Long, ctx: BankPaytoCtx ): List<IncomingReserveTransaction> - = db.poolHistory(params, exchangeId, NotificationWatcher::listenIncoming, """ + = db.poolHistory(params, exchangeId, db::listenIncoming, """ SELECT bank_transaction_id ,transaction_date @@ -65,7 +61,7 @@ class ExchangeDAO(private val db: Database) { exchangeId: Long, ctx: BankPaytoCtx ): List<OutgoingTransaction> - = db.poolHistory(params, exchangeId, NotificationWatcher::listenOutgoing, """ + = db.poolHistory(params, exchangeId, db::listenOutgoing, """ SELECT bank_transaction_id ,transaction_date diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt deleted file mode 100644 index 077cb771..00000000 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt +++ /dev/null @@ -1,151 +0,0 @@ -/* - * This file is part of LibEuFin. - * Copyright (C) 2023 Taler Systems S.A. - - * LibEuFin is free software; you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation; either version 3, or - * (at your option) any later version. - - * LibEuFin is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General - * Public License for more details. - - * You should have received a copy of the GNU Affero General Public - * License along with LibEuFin; see the file COPYING. If not, see - * <http://www.gnu.org/licenses/> - */ - -package tech.libeufin.bank.db - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import org.postgresql.ds.PGSimpleDataSource -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import tech.libeufin.bank.* -import tech.libeufin.common.* -import tech.libeufin.common.db.* -import java.util.* -import java.util.concurrent.ConcurrentHashMap - -private val logger: Logger = LoggerFactory.getLogger("libeufin-bank-db-watcher") - -/** Postgres notification collector and distributor */ -internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) { - // ShareFlow that are manually counted for manual garbage collection - private class CountedSharedFlow<T>(val flow: MutableSharedFlow<T>, var count: Int) - - // Transaction flows, the keys are the bank account id - private val bankTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() - private val outgoingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() - private val incomingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() - private val revenueTxFlows = ConcurrentHashMap<Long, CountedSharedFlow<Long>>() - // Withdrawal confirmation flow, the key is the public withdrawal UUID - private val withdrawalFlow = ConcurrentHashMap<UUID, CountedSharedFlow<WithdrawalStatus>>() - - private val backoff = ExpoBackoffDecorr() - - init { - // Run notification logic in a separated thread - kotlin.concurrent.thread(isDaemon = true) { - runBlocking { - while (true) { - try { - val conn = pgSource.pgConnection("libeufin_bank") - - // Listen to all notifications channels - conn.execSQLUpdate("LISTEN bank_tx") - conn.execSQLUpdate("LISTEN outgoing_tx") - conn.execSQLUpdate("LISTEN incoming_tx") - conn.execSQLUpdate("LISTEN withdrawal_status") - - backoff.reset() - - while (true) { - conn.getNotifications(0) // Block until we receive at least one notification - .forEach { - // Extract information and dispatch - when (it.name) { - "bank_tx" -> { - val (debtor, creditor, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() } - bankTxFlows[debtor]?.run { - flow.emit(debitRow) - } - bankTxFlows[creditor]?.run { - flow.emit(creditRow) - } - revenueTxFlows[creditor]?.run { - flow.emit(creditRow) - } - } - "outgoing_tx" -> { - val (account, merchant, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() } - outgoingTxFlows[account]?.run { - flow.emit(debitRow) - } - } - "incoming_tx" -> { - val (account, row) = it.parameter.split(' ', limit = 2).map { it.toLong() } - incomingTxFlows[account]?.run { - flow.emit(row) - } - } - "withdrawal_status" -> { - val raw = it.parameter.split(' ', limit = 2) - val uuid = UUID.fromString(raw[0]) - val status = WithdrawalStatus.valueOf(raw[1]) - withdrawalFlow[uuid]?.run { - flow.emit(status) - } - } - } - } - } - } catch (e: Exception) { - e.fmtLog(logger) - delay(backoff.next()) - } - } - } - } - } - - /** Listen to flow from [map] for [key] using [lambda]*/ - private suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R { - // Register listener, create a new flow if missing - val flow = map.compute(key) { _, v -> - val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0) - tmp.count++ - tmp - }!!.flow - - try { - return lambda(flow) - } finally { - // Unregister listener, removing unused flow - map.compute(key) { _, v -> - v!! - v.count-- - if (v.count > 0) v else null - } - } - } - - /** Listen for new bank transactions for [account] */ - suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R - = listen(bankTxFlows, account, lambda) - /** Listen for new taler outgoing transactions from [account] */ - suspend fun <R> listenOutgoing(exchange: Long, lambda: suspend (Flow<Long>) -> R): R - = listen(outgoingTxFlows, exchange, lambda) - /** Listen for new taler incoming transactions to [account] */ - suspend fun <R> listenIncoming(exchange: Long, lambda: suspend (Flow<Long>) -> R): R - = listen(incomingTxFlows, exchange, lambda) - /** Listen for new taler outgoing transactions to [account] */ - suspend fun <R> listenRevenue(merchant: Long, lambda: suspend (Flow<Long>) -> R): R - = listen(revenueTxFlows, merchant, lambda) - /** Listen for new withdrawal confirmations */ - suspend fun <R> listenWithdrawals(withdrawal: UUID, lambda: suspend (Flow<WithdrawalStatus>) -> R): R - = listen(withdrawalFlow, withdrawal, lambda) -}
\ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt index 388d530a..524d5578 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/TanDAO.kt @@ -21,9 +21,8 @@ package tech.libeufin.bank.db import tech.libeufin.bank.Operation import tech.libeufin.bank.TanChannel -import tech.libeufin.bank.internalServerError -import tech.libeufin.common.db.oneOrNull -import tech.libeufin.common.micros +import tech.libeufin.common.* +import tech.libeufin.common.db.* import java.time.Duration import java.time.Instant import java.util.concurrent.TimeUnit diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt index 7fb8823d..794eb04d 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/TransactionDAO.kt @@ -189,7 +189,7 @@ class TransactionDAO(private val db: Database) { accountId: Long, ctx: BankPaytoCtx ): List<BankAccountTransactionInfo> { - return db.poolHistory(params, accountId, NotificationWatcher::listenBank, """ + return db.poolHistory(params, accountId, db::listenBank, """ SELECT bank_transaction_id ,transaction_date @@ -221,7 +221,7 @@ class TransactionDAO(private val db: Database) { accountId: Long, ctx: BankPaytoCtx ): List<RevenueIncomingBankTransaction> - = db.poolHistory(params, accountId, NotificationWatcher::listenRevenue, """ + = db.poolHistory(params, accountId, db::listenRevenue, """ SELECT bank_transaction_id ,transaction_date diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt index 6a02205c..efaa4b74 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt @@ -24,12 +24,8 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull import tech.libeufin.bank.* -import tech.libeufin.common.EddsaPublicKey -import tech.libeufin.common.Payto -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.getAmount -import tech.libeufin.common.db.oneOrNull -import tech.libeufin.common.micros +import tech.libeufin.common.* +import tech.libeufin.common.db.* import java.time.Instant import java.util.* @@ -207,7 +203,7 @@ class WithdrawalDAO(private val db: Database) { load: suspend () -> T? ): T? { return if (params.polling.poll_ms > 0) { - db.notifWatcher.listenWithdrawals(uuid) { flow -> + db.listenWithdrawals(uuid) { flow -> coroutineScope { // Start buffering notification before loading transactions to not miss any val polling = launch { diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt index e502f62c..094b7996 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt @@ -149,7 +149,7 @@ fun Route.intercept(callback: Route.() -> Unit, interceptor: suspend PipelineCon fun Route.conditional(implemented: Boolean, callback: Route.() -> Unit): Route = intercept(callback) { if (!implemented) { - throw libeufinError(HttpStatusCode.NotImplemented, "API not implemented", TalerErrorCode.END) + throw apiError(HttpStatusCode.NotImplemented, "API not implemented", TalerErrorCode.END) } } diff --git a/bank/src/main/kotlin/tech/libeufin/bank/params.kt b/bank/src/main/kotlin/tech/libeufin/bank/params.kt index 72e6cabd..3f34fb36 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/params.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/params.kt @@ -20,44 +20,13 @@ package tech.libeufin.bank import io.ktor.http.* -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.TalerErrorCode +import tech.libeufin.common.* import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset import java.time.temporal.TemporalAdjusters import java.util.* -fun Parameters.expect(name: String): String - = get(name) ?: throw badRequest("Missing '$name' parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) -fun Parameters.int(name: String): Int? - = get(name)?.run { toIntOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) } -fun Parameters.expectInt(name: String): Int - = int(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) -fun Parameters.long(name: String): Long? - = get(name)?.run { toLongOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) } -fun Parameters.expectLong(name: String): Long - = long(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) -fun Parameters.uuid(name: String): UUID? { - return get(name)?.run { - try { - UUID.fromString(this) - } catch (e: Exception) { - throw badRequest("Param '$name' not an UUID", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) - } - } -} -fun Parameters.expectUuid(name: String): UUID - = uuid(name) ?: throw badRequest("Missing '$name' UUID parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) -fun Parameters.amount(name: String): TalerAmount? - = get(name)?.run { - try { - TalerAmount(this) - } catch (e: Exception) { - throw badRequest("Param '$name' not a taler amount", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) - } - } - data class MonitorParams( val timeframe: Timeframe, val date: LocalDateTime @@ -125,42 +94,6 @@ data class AccountParams( } } -data class PageParams( - val delta: Int, val start: Long -) { - companion object { - fun extract(params: Parameters): PageParams { - val delta: Int = params.int("delta") ?: -20 - val start: Long = params.long("start") ?: if (delta >= 0) 0L else Long.MAX_VALUE - if (start < 0) throw badRequest("Param 'start' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) - // TODO enforce delta limit - return PageParams(delta, start) - } - } -} - -data class PollingParams( - val poll_ms: Long -) { - companion object { - fun extract(params: Parameters): PollingParams { - val poll_ms: Long = params.long("long_poll_ms") ?: 0 - if (poll_ms < 0) throw badRequest("Param 'long_poll_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) - return PollingParams(poll_ms) - } - } -} - -data class HistoryParams( - val page: PageParams, val polling: PollingParams -) { - companion object { - fun extract(params: Parameters): HistoryParams { - return HistoryParams(PageParams.extract(params), PollingParams.extract(params)) - } - } -} - data class RateParams( val debit: TalerAmount?, val credit: TalerAmount? ) { diff --git a/common/build.gradle b/common/build.gradle index cc9649af..cdc9c3e0 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -24,10 +24,19 @@ dependencies { implementation("org.postgresql:postgresql:$postgres_version") implementation("com.zaxxer:HikariCP:5.1.0") + implementation("io.ktor:ktor-server-core:$ktor_version") + implementation("io.ktor:ktor-server-call-logging:$ktor_version") + implementation("io.ktor:ktor-server-cors:$ktor_version") + implementation("io.ktor:ktor-server-content-negotiation:$ktor_version") + implementation("io.ktor:ktor-server-status-pages:$ktor_version") + implementation("io.ktor:ktor-server-netty:$ktor_version") + implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version") + implementation("io.ktor:ktor-server-forwarded-header:$ktor_version") implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version") implementation("io.ktor:ktor-server-test-host:$ktor_version") - implementation("org.jetbrains.kotlin:kotlin-test:$kotlin_version") + implementation("com.github.ajalt.clikt:clikt:$clikt_version") + implementation("org.jetbrains.kotlin:kotlin-test:$kotlin_version") testImplementation("uk.org.webcompere:system-stubs-core:2.1.6") }
\ No newline at end of file diff --git a/common/src/main/kotlin/ApiError.kt b/common/src/main/kotlin/ApiError.kt new file mode 100644 index 00000000..15ae871c --- /dev/null +++ b/common/src/main/kotlin/ApiError.kt @@ -0,0 +1,128 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ +package tech.libeufin.common + +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.response.* +import io.ktor.util.* +import kotlinx.serialization.Serializable +import tech.libeufin.common.TalerAmount +import tech.libeufin.common.TalerErrorCode + +/** + * Convenience type to throw errors along the API activity + * and that is meant to be caught by Ktor and responded to the + * client. + */ +class ApiException( + // Status code that Ktor will set for the response. + val httpStatus: HttpStatusCode, + // Error detail object, after Taler API. + val talerError: TalerError +) : Exception(talerError.hint) + +/** + * Error object to respond to the client. The + * 'code' field takes values from the GANA gnu-taler-error-code + * specification. 'hint' is a human-readable description + * of the error. + */ +@Serializable +data class TalerError( + @kotlinx.serialization.Transient val err: TalerErrorCode = TalerErrorCode.END, + val code: Int, + val hint: String? = null, + val detail: String? = null +) + +private val LOG_MSG = AttributeKey<String>("log_msg") + +fun ApplicationCall.logMsg(): String? = attributes.getOrNull(LOG_MSG) + +suspend fun ApplicationCall.err( + status: HttpStatusCode, + hint: String?, + error: TalerErrorCode +) { + err( + ApiException( + httpStatus = status, talerError = TalerError( + code = error.code, err = error, hint = hint + ) + ) + ) +} + +suspend fun ApplicationCall.err( + err: ApiException +) { + attributes.put(LOG_MSG, "${err.talerError.err.name} ${err.talerError.hint}") + respond( + status = err.httpStatus, + message = err.talerError + ) +} + + +fun apiError( + status: HttpStatusCode, + hint: String?, + error: TalerErrorCode, + detail: String? = null +): ApiException = ApiException( + httpStatus = status, talerError = TalerError( + code = error.code, err = error, hint = hint, detail = detail + ) +) + +/* ----- HTTP error ----- */ + +fun forbidden( + hint: String = "No rights on the resource", + error: TalerErrorCode = TalerErrorCode.END +): ApiException = apiError(HttpStatusCode.Forbidden, hint, error) + +fun unauthorized( + hint: String, + error: TalerErrorCode = TalerErrorCode.GENERIC_UNAUTHORIZED +): ApiException = apiError(HttpStatusCode.Unauthorized, hint, error) + +fun internalServerError(hint: String?): ApiException + = apiError(HttpStatusCode.InternalServerError, hint, TalerErrorCode.GENERIC_INTERNAL_INVARIANT_FAILURE) + +fun notFound( + hint: String, + error: TalerErrorCode +): ApiException = apiError(HttpStatusCode.NotFound, hint, error) + +fun conflict( + hint: String, error: TalerErrorCode +): ApiException = apiError(HttpStatusCode.Conflict, hint, error) + +fun badRequest( + hint: String? = null, + error: TalerErrorCode = TalerErrorCode.GENERIC_JSON_INVALID, + detail: String? = null +): ApiException = apiError(HttpStatusCode.BadRequest, hint, error, detail) + +fun unsupportedMediaType( + hint: String, + error: TalerErrorCode = TalerErrorCode.END, +): ApiException = apiError(HttpStatusCode.UnsupportedMediaType, hint, error)
\ No newline at end of file diff --git a/common/src/main/kotlin/Constants.kt b/common/src/main/kotlin/Constants.kt index 7cc6ab0b..57f378c0 100644 --- a/common/src/main/kotlin/Constants.kt +++ b/common/src/main/kotlin/Constants.kt @@ -20,4 +20,7 @@ package tech.libeufin.common // DB const val MIN_VERSION: Int = 14 -const val SERIALIZATION_RETRY: Int = 10
\ No newline at end of file +const val SERIALIZATION_RETRY: Int = 10 + +// Security +const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB
\ No newline at end of file diff --git a/common/src/main/kotlin/TalerCommon.kt b/common/src/main/kotlin/TalerCommon.kt index 7d561c5c..cec5d9dc 100644 --- a/common/src/main/kotlin/TalerCommon.kt +++ b/common/src/main/kotlin/TalerCommon.kt @@ -27,6 +27,7 @@ import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor import kotlinx.serialization.descriptors.SerialDescriptor import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder +import tech.libeufin.common.* import java.net.URI sealed class CommonError(msg: String): Exception(msg) { diff --git a/common/src/main/kotlin/TalerMessage.kt b/common/src/main/kotlin/TalerMessage.kt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/common/src/main/kotlin/TalerMessage.kt diff --git a/common/src/main/kotlin/api/server.kt b/common/src/main/kotlin/api/server.kt new file mode 100644 index 00000000..ba6f2f61 --- /dev/null +++ b/common/src/main/kotlin/api/server.kt @@ -0,0 +1,225 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.api + +import io.ktor.http.* +import io.ktor.serialization.kotlinx.json.* +import io.ktor.server.application.* +import io.ktor.server.engine.* +import io.ktor.server.http.content.* +import io.ktor.server.netty.* +import io.ktor.server.plugins.* +import io.ktor.server.plugins.callloging.* +import io.ktor.server.plugins.contentnegotiation.* +import io.ktor.server.plugins.cors.routing.* +import io.ktor.server.plugins.forwardedheaders.* +import io.ktor.server.plugins.statuspages.* +import io.ktor.server.request.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import io.ktor.utils.io.* +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.json.Json +import org.postgresql.util.PSQLState +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.slf4j.event.Level +import tech.libeufin.common.* +import tech.libeufin.common.db.dbInit +import tech.libeufin.common.db.pgDataSource +import java.net.InetAddress +import java.sql.SQLException +import java.time.Instant +import java.util.zip.DataFormatException +import java.util.zip.Inflater +import kotlin.io.path.Path +import kotlin.io.path.exists +import kotlin.io.path.readText + +/** + * This plugin checks for body length limit and inflates the requests that have "Content-Encoding: deflate" + */ +fun bodyLimitPlugin(logger: Logger): ApplicationPlugin<Unit> { + return createApplicationPlugin("BodyLimitAndDecompression") { + onCallReceive { call -> + // TODO check content length as an optimisation + transformBody { body -> + val bytes = ByteArray(MAX_BODY_LENGTH.toInt() + 1) + var read = 0 + when (val encoding = call.request.headers[HttpHeaders.ContentEncoding]) { + "deflate" -> { + // Decompress and check decompressed length + val inflater = Inflater() + while (!body.isClosedForRead) { + body.read { buf -> + inflater.setInput(buf) + try { + read += inflater.inflate(bytes, read, bytes.size - read) + } catch (e: DataFormatException) { + logger.error("Deflated request failed to inflate: ${e.message}") + throw badRequest( + "Could not inflate request", + TalerErrorCode.GENERIC_COMPRESSION_INVALID + ) + } + } + if (read > MAX_BODY_LENGTH) + throw badRequest("Decompressed body is suspiciously big > $MAX_BODY_LENGTH B") + } + } + null -> { + // Check body length + while (true) { + val new = body.readAvailable(bytes, read, bytes.size - read) + if (new == -1) break // Channel is closed + read += new + if (read > MAX_BODY_LENGTH) + throw badRequest("Body is suspiciously big > $MAX_BODY_LENGTH B") + } + } + else -> throw unsupportedMediaType( + "Content encoding '$encoding' not supported, expected plain or deflate", + TalerErrorCode.GENERIC_COMPRESSION_INVALID + ) + } + ByteReadChannel(bytes, 0, read) + } + } + } +} + +/** Set up web server handlers for a Taler API */ +fun Application.talerApi(logger: Logger, routes: Routing.() -> Unit) { + install(CallLogging) { + level = Level.INFO + this.logger = logger + format { call -> + val status = call.response.status() + val httpMethod = call.request.httpMethod.value + val path = call.request.path() + val msg = call.logMsg() + if (msg != null) { + "$status, $httpMethod $path, $msg" + } else { + "$status, $httpMethod $path" + } + } + } + install(XForwardedHeaders) + install(CORS) { + anyHost() + allowHeader(HttpHeaders.Authorization) + allowHeader(HttpHeaders.ContentType) + allowMethod(HttpMethod.Options) + allowMethod(HttpMethod.Patch) + allowMethod(HttpMethod.Delete) + allowCredentials = true + } + install(bodyLimitPlugin(logger)) + install(IgnoreTrailingSlash) + install(ContentNegotiation) { + json(Json { + @OptIn(ExperimentalSerializationApi::class) + explicitNulls = false + encodeDefaults = true + ignoreUnknownKeys = true + }) + } + install(StatusPages) { + status(HttpStatusCode.NotFound) { call, status -> + call.err( + status, + "There is no endpoint defined for the URL provided by the client. Check if you used the correct URL and/or file a report with the developers of the client software.", + TalerErrorCode.GENERIC_ENDPOINT_UNKNOWN + ) + } + status(HttpStatusCode.MethodNotAllowed) { call, status -> + call.err( + status, + "The HTTP method used is invalid for this endpoint. This is likely a bug in the client implementation. Check if you are using the latest available version and/or file a report with the developers.", + TalerErrorCode.GENERIC_METHOD_INVALID + ) + } + exception<Exception> { call, cause -> + logger.debug("request failed", cause) + // TODO nexus specific error code ?! + when (cause) { + is ApiException -> call.err(cause) + is SQLException -> { + when (cause.sqlState) { + PSQLState.SERIALIZATION_FAILURE.state -> call.err( + HttpStatusCode.InternalServerError, + "Transaction serialization failure", + TalerErrorCode.BANK_SOFT_EXCEPTION + ) + else -> call.err( + HttpStatusCode.InternalServerError, + "Unexpected sql error with state ${cause.sqlState}", + TalerErrorCode.BANK_UNMANAGED_EXCEPTION + ) + } + } + is BadRequestException -> { + /** + * NOTE: extracting the root cause helps with JSON error messages, + * because they mention the particular way they are invalid, but OTOH + * it loses (by getting null) other error messages, like for example + * the one from MissingRequestParameterException. Therefore, in order + * to get the most detailed message, we must consider BOTH sides: + * the 'cause' AND its root cause! + */ + var rootCause: Throwable? = cause.cause + while (rootCause?.cause != null) + rootCause = rootCause.cause + // Telling apart invalid JSON vs missing parameter vs invalid parameter. + val talerErrorCode = when { + cause is MissingRequestParameterException -> + TalerErrorCode.GENERIC_PARAMETER_MISSING + cause is ParameterConversionException -> + TalerErrorCode.GENERIC_PARAMETER_MALFORMED + rootCause is CommonError -> when (rootCause) { + is CommonError.AmountFormat -> TalerErrorCode.BANK_BAD_FORMAT_AMOUNT + is CommonError.AmountNumberTooBig -> TalerErrorCode.BANK_NUMBER_TOO_BIG + is CommonError.Payto -> TalerErrorCode.GENERIC_JSON_INVALID + } + else -> TalerErrorCode.GENERIC_JSON_INVALID + } + call.err( + badRequest( + cause.message, + talerErrorCode, + /* Here getting _some_ error message, by giving precedence + * to the root cause, as otherwise JSON details would be lost. */ + rootCause?.message + ) + ) + } + else -> { + call.err( + HttpStatusCode.InternalServerError, + cause.message, + TalerErrorCode.BANK_UNMANAGED_EXCEPTION + ) + } + } + } + } + routing { routes() } +}
\ No newline at end of file diff --git a/common/src/main/kotlin/db/helpers.kt b/common/src/main/kotlin/db/helpers.kt new file mode 100644 index 00000000..13e0ace0 --- /dev/null +++ b/common/src/main/kotlin/db/helpers.kt @@ -0,0 +1,113 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + * + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + * + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + * + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.db + +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.* +import tech.libeufin.common.db.DbPool +import tech.libeufin.common.PageParams +import tech.libeufin.common.HistoryParams +import org.postgresql.jdbc.PgConnection +import org.postgresql.util.PSQLState +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.sql.SQLException +import kotlin.math.abs + +/** Apply paging logic to a sql query */ +suspend fun <T> DbPool.page( + params: PageParams, + idName: String, + query: String, + bind: PreparedStatement.() -> Int = { 0 }, + map: (ResultSet) -> T +): List<T> = conn { conn -> + val backward = params.delta < 0 + val pageQuery = """ + $query + $idName ${if (backward) '<' else '>'} ? + ORDER BY $idName ${if (backward) "DESC" else "ASC"} + LIMIT ? + """ + conn.prepareStatement(pageQuery).run { + val pad = bind() + setLong(pad + 1, params.start) + setInt(pad + 2, abs(params.delta)) + all { map(it) } + } +} + +/** +* The following function returns the list of transactions, according +* to the history parameters and perform long polling when necessary +*/ +suspend fun <T> DbPool.poolHistory( + params: HistoryParams, + bankAccountId: Long, + listen: suspend (Long, suspend (Flow<Long>) -> List<T>) -> List<T>, + query: String, + accountColumn: String = "bank_account_id", + map: (ResultSet) -> T +): List<T> { + + suspend fun load(): List<T> = page( + params.page, + "bank_transaction_id", + "$query $accountColumn=? AND", + { + setLong(1, bankAccountId) + 1 + }, + map + ) + + + // TODO do we want to handle polling when going backward and there is no transactions yet ? + // When going backward there is always at least one transaction or none + return if (params.page.delta >= 0 && params.polling.poll_ms > 0) { + listen(bankAccountId) { flow -> + coroutineScope { + // Start buffering notification before loading transactions to not miss any + val polling = launch { + withTimeoutOrNull(params.polling.poll_ms) { + flow.first { it > params.page.start } // Always forward so > + } + } + // Initial loading + val init = load() + // Long polling if we found no transactions + if (init.isEmpty()) { + if (polling.join() != null) { + load() + } else { + init + } + } else { + polling.cancel() + init + } + } + } + } else { + load() + } +}
\ No newline at end of file diff --git a/common/src/main/kotlin/db/notifications.kt b/common/src/main/kotlin/db/notifications.kt new file mode 100644 index 00000000..3f2fb753 --- /dev/null +++ b/common/src/main/kotlin/db/notifications.kt @@ -0,0 +1,91 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + * + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + * + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + * + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.db + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.postgresql.ds.PGSimpleDataSource +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import tech.libeufin.common.* +import tech.libeufin.common.db.* +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +// SharedFlow that are manually counted for manual garbage collection +class CountedSharedFlow<T>(val flow: MutableSharedFlow<T>, var count: Int) + +fun watchNotifications( + pgSource: PGSimpleDataSource, + schema: String, + logger: Logger, + listeners: Map<String, (suspend (String) -> Unit)> +) { + val backoff = ExpoBackoffDecorr() + // Run notification logic in a separated thread + kotlin.concurrent.thread(isDaemon = true) { + runBlocking { + while (true) { + try { + val conn = pgSource.pgConnection(schema) + + // Listen to all notifications channels + for (channel in listeners.keys) { + conn.execSQLUpdate("LISTEN $channel") + } + + backoff.reset() + + while (true) { + conn.getNotifications(0) // Block until we receive at least one notification + .forEach { + // Dispatch + listeners[it.name]!!(it.parameter) + } + } + } catch (e: Exception) { + e.fmtLog(logger) + delay(backoff.next()) + } + } + } + } +} + +/** Listen to flow from [map] for [key] using [lambda]*/ +suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R { + // Register listener, create a new flow if missing + val flow = map.compute(key) { _, v -> + val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0) + tmp.count++ + tmp + }!!.flow + + try { + return lambda(flow) + } finally { + // Unregister listener, removing unused flow + map.compute(key) { _, v -> + v!! + v.count-- + if (v.count > 0) v else null + } + } +}
\ No newline at end of file diff --git a/common/src/main/kotlin/db/utils.kt b/common/src/main/kotlin/db/transaction.kt index f06c4e4e..f06c4e4e 100644 --- a/common/src/main/kotlin/db/utils.kt +++ b/common/src/main/kotlin/db/transaction.kt diff --git a/common/src/main/kotlin/params.kt b/common/src/main/kotlin/params.kt new file mode 100644 index 00000000..a24bd379 --- /dev/null +++ b/common/src/main/kotlin/params.kt @@ -0,0 +1,95 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common + +import io.ktor.http.* +import tech.libeufin.common.TalerAmount +import tech.libeufin.common.TalerErrorCode +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.temporal.TemporalAdjusters +import java.util.* + +fun Parameters.expect(name: String): String + = get(name) ?: throw badRequest("Missing '$name' parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) +fun Parameters.int(name: String): Int? + = get(name)?.run { toIntOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) } +fun Parameters.expectInt(name: String): Int + = int(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) +fun Parameters.long(name: String): Long? + = get(name)?.run { toLongOrNull() ?: throw badRequest("Param '$name' not a number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) } +fun Parameters.expectLong(name: String): Long + = long(name) ?: throw badRequest("Missing '$name' number parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) +fun Parameters.uuid(name: String): UUID? { + return get(name)?.run { + try { + UUID.fromString(this) + } catch (e: Exception) { + throw badRequest("Param '$name' not an UUID", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) + } + } +} +fun Parameters.expectUuid(name: String): UUID + = uuid(name) ?: throw badRequest("Missing '$name' UUID parameter", TalerErrorCode.GENERIC_PARAMETER_MISSING) +fun Parameters.amount(name: String): TalerAmount? + = get(name)?.run { + try { + TalerAmount(this) + } catch (e: Exception) { + throw badRequest("Param '$name' not a taler amount", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) + } + } + +data class PageParams( + val delta: Int, val start: Long +) { + companion object { + fun extract(params: Parameters): PageParams { + val delta: Int = params.int("delta") ?: -20 + val start: Long = params.long("start") ?: if (delta >= 0) 0L else Long.MAX_VALUE + if (start < 0) throw badRequest("Param 'start' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) + // TODO enforce delta limit + return PageParams(delta, start) + } + } +} + +data class PollingParams( + val poll_ms: Long +) { + companion object { + fun extract(params: Parameters): PollingParams { + val poll_ms: Long = params.long("long_poll_ms") ?: 0 + if (poll_ms < 0) throw badRequest("Param 'long_poll_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) + return PollingParams(poll_ms) + } + } +} + +data class HistoryParams( + val page: PageParams, val polling: PollingParams +) { + companion object { + fun extract(params: Parameters): HistoryParams { + return HistoryParams(PageParams.extract(params), PollingParams.extract(params)) + } + } +}
\ No newline at end of file |