libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit b2423b33b921c9ac95ed40d95ec9fbe9d7811d2b
parent 569904a351f8f5aa34c4419aed445854d2341746
Author: ms <ms@taler.net>
Date:   Mon, 11 Oct 2021 09:24:35 +0200

Logging and sending HTTP chunks via library's handlers.

Diffstat:
Mutil/src/main/kotlin/UnixDomainSocket.kt | 110++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
1 file changed, 61 insertions(+), 49 deletions(-)

diff --git a/util/src/main/kotlin/UnixDomainSocket.kt b/util/src/main/kotlin/UnixDomainSocket.kt @@ -1,75 +1,75 @@ import io.ktor.application.* import io.ktor.client.statement.* import io.ktor.http.* +import io.ktor.http.HttpHeaders import io.ktor.http.HttpMethod import io.ktor.server.engine.* import io.ktor.server.testing.* +import io.ktor.utils.io.pool.* import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufInputStream import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledDirectByteBuf import io.netty.channel.* import io.netty.channel.epoll.EpollEventLoopGroup import io.netty.channel.epoll.EpollServerDomainSocketChannel import io.netty.channel.unix.DomainSocketAddress +import io.netty.handler.codec.LengthFieldPrepender import io.netty.handler.codec.http.* import io.netty.handler.codec.http.DefaultHttpResponse +import io.netty.handler.codec.http.HttpMessage +import io.netty.handler.logging.LogLevel +import io.netty.handler.logging.LoggingHandler +import io.netty.handler.stream.ChunkedInput +import io.netty.handler.stream.ChunkedStream +import io.netty.handler.stream.ChunkedWriteHandler import io.netty.util.AttributeKey +import io.netty.util.ReferenceCountUtil +import org.slf4j.LoggerFactory +import java.io.ByteArrayInputStream +import java.io.InputStream +import java.nio.charset.Charset fun startServer(unixSocketPath: String, app: Application.() -> Unit) { - val boss = EpollEventLoopGroup() val worker = EpollEventLoopGroup() - val serverBootstrap = ServerBootstrap() - serverBootstrap.group(boss, worker).channel( - EpollServerDomainSocketChannel::class.java - ).childHandler(LibeufinHttpInit(app)) - - val socketPath = DomainSocketAddress(unixSocketPath) - serverBootstrap.bind(socketPath).sync().channel().closeFuture().sync() + try { + val serverBootstrap = ServerBootstrap() + serverBootstrap.group(boss, worker).channel( + EpollServerDomainSocketChannel::class.java + ).childHandler(LibeufinHttpInit(app)) + val socketPath = DomainSocketAddress(unixSocketPath) + logger.debug("Listening on $unixSocketPath ..") + serverBootstrap.bind(socketPath).sync().channel().closeFuture().sync() + } finally { + boss.shutdownGracefully() + worker.shutdownGracefully() + } } -private val ktorApplicationKey = AttributeKey.newInstance<Application.() -> Unit>("KtorApplicationCall") - -class LibeufinHttpInit(private val app: Application.() -> Unit) : ChannelInitializer<Channel>() { +class LibeufinHttpInit( + private val app: Application.() -> Unit +) : ChannelInitializer<Channel>() { override fun initChannel(ch: Channel) { - val libeufinHandler = LibeufinHttpHandler() ch.pipeline( - ).addLast( - HttpServerCodec() - ).addLast( - HttpObjectAggregator(Int.MAX_VALUE) - ).addLast( - libeufinHandler - ) - val libeufinCtx: ChannelHandlerContext = ch.pipeline().context(libeufinHandler) - libeufinCtx.attr(ktorApplicationKey).set(app) + ).addLast(LoggingHandler("tech.libeufin.util") + ).addLast(HttpServerCodec() // in- and out- bound + ).addLast(HttpObjectAggregator(Int.MAX_VALUE) // only in- bound + ).addLast(ChunkedWriteHandler() + ).addLast(LibeufinHttpHandler(app)) // in- bound, and triggers out- bound. } } -class LibeufinHttpHandler : SimpleChannelInboundHandler<FullHttpRequest>() { - +class LibeufinHttpHandler( + private val app: Application.() -> Unit +) : SimpleChannelInboundHandler<FullHttpRequest>() { @OptIn(EngineAPI::class) - override fun channelRead0(ctx: ChannelHandlerContext?, msg: FullHttpRequest) { - val app = ctx?.attr(ktorApplicationKey)?.get() - if (app == null) throw UtilError( - HttpStatusCode.InternalServerError, - "custom libEufin Unix-domain-socket+HTTP handler lost its Web app", - null - ) - /** - * Below is only a echo of what euFin gets from the network. All - * the checks should then occur at the Web app + Ktor level. Hence, - * a HTTP call of GET with a non-empty body is not to be blocked / warned - * at this level. - * - * The only exception is the HTTP version value in the response, as the - * response returned by the Web app does not set it. Therefore, this - * proxy echoes back the HTTP version that was read in the request. - */ + override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { withTestApplication(app) { val httpVersion = msg.protocolVersion() - // Proxying the request with Ktor API. - val call = handleRequest(closeRequest = false) { + // Proxying the request to Ktor API. + val call = handleRequest { msg.headers().forEach { addHeader(it.key, it.value) } method = HttpMethod(msg.method().name()) uri = msg.uri() @@ -81,17 +81,29 @@ class LibeufinHttpHandler : SimpleChannelInboundHandler<FullHttpRequest>() { "app proxied via Unix domain socket did not include a response status code", ec = null // FIXME: to be defined. ) - // Responding with Netty API. - val response = DefaultFullHttpResponse( + // Responding to Netty API. + val response = DefaultHttpResponse( httpVersion, - HttpResponseStatus.valueOf(statusCode), - Unpooled.wrappedBuffer(call.response.byteContent ?: ByteArray(0)) + HttpResponseStatus.valueOf(statusCode) ) + var chunked = false call.response.headers.allValues().forEach { s, list -> - response.headers().set(s, list.joinToString()) // joinToString() separates with ", " by default. + if (s == HttpHeaders.TransferEncoding && list.contains("chunked")) + chunked = true + response.headers().set(s, list.joinToString()) + } + ctx.writeAndFlush(response) + if (chunked) { + ctx.writeAndFlush( + HttpChunkedInput( + ChunkedStream( + ByteArrayInputStream(call.response.byteContent) + ) + ) + ) + } else { + ctx.writeAndFlush(Unpooled.wrappedBuffer(call.response.byteContent)) } - ctx.write(response) - ctx.flush() } } } \ No newline at end of file