notifications.kt (3305B)
1 /* 2 * This file is part of LibEuFin. 3 * Copyright (C) 2024 Taler Systems S.A. 4 * 5 * LibEuFin is free software; you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation; either version 3, or 8 * (at your option) any later version. 9 * 10 * LibEuFin is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General 13 * Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public 16 * License along with LibEuFin; see the file COPYING. If not, see 17 * <http://www.gnu.org/licenses/> 18 */ 19 20 package tech.libeufin.common.db 21 22 import kotlinx.coroutines.delay 23 import kotlinx.coroutines.flow.Flow 24 import kotlinx.coroutines.flow.MutableSharedFlow 25 import kotlinx.coroutines.runBlocking 26 import org.postgresql.ds.PGSimpleDataSource 27 import org.slf4j.Logger 28 import tech.libeufin.common.ExpoBackoffDecorr 29 import tech.libeufin.common.fmtLog 30 import java.util.concurrent.ConcurrentHashMap 31 32 // SharedFlow that are manually counted for manual garbage collection 33 class CountedSharedFlow<T> { 34 val flow: MutableSharedFlow<T> = MutableSharedFlow() 35 var count: Int = 0 36 } 37 38 fun watchNotifications( 39 pgSource: PGSimpleDataSource, 40 schema: String, 41 logger: Logger, 42 listeners: Map<String, (suspend (String) -> Unit)> 43 ) { 44 val backoff = ExpoBackoffDecorr() 45 // Run notification logic in a separated thread 46 kotlin.concurrent.thread(isDaemon = true) { 47 runBlocking { 48 while (true) { 49 try { 50 val conn = pgSource.pgConnection(schema) 51 52 // Listen to all notifications channels 53 for (channel in listeners.keys) { 54 conn.execSQLUpdate("LISTEN $channel") 55 } 56 57 backoff.reset() 58 59 while (true) { 60 conn.getNotifications(0) // Block until we receive at least one notification 61 .forEach { 62 // Dispatch 63 try { 64 listeners[it.name]!!(it.parameter) 65 } catch (e: Exception) { 66 throw Exception("channel ${it.name} with input '${it.parameter}'", e) 67 } 68 } 69 } 70 } catch (e: Exception) { 71 e.fmtLog(logger) 72 delay(backoff.next()) 73 } 74 } 75 } 76 } 77 } 78 79 /** Listen to flow from [map] for [key] using [lambda]*/ 80 suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R { 81 // Register listener, create a new flow if missing 82 val flow = map.compute(key) { _, v -> 83 val tmp = v ?: CountedSharedFlow() 84 tmp.count++ 85 tmp 86 }!!.flow 87 88 try { 89 return lambda(flow) 90 } finally { 91 // Unregister listener, removing unused flow 92 map.compute(key) { _, v -> 93 v!! 94 v.count-- 95 if (v.count > 0) v else null 96 } 97 } 98 }