commit 29c30ba2dea6b94aa768c0f094085de3669a0be5
parent 8faafabb765d052fec69ada363dc132264ac7668
Author: t3sserakt <t3ss@posteo.de>
Date: Wed, 22 Apr 2026 16:10:02 +0200
WIP: measure the latency of text message of one account in a Group
Diffstat:
2 files changed, 363 insertions(+), 0 deletions(-)
diff --git a/GNUnetMessenger/app/src/androidTest/java/org/gnunet/gnunetmessenger/perf/LatencyRecorder.kt b/GNUnetMessenger/app/src/androidTest/java/org/gnunet/gnunetmessenger/perf/LatencyRecorder.kt
@@ -0,0 +1,133 @@
+package org.gnunet.gnunetmessenger.perf
+
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Records per-message send→receive latencies keyed by sequence id.
+ *
+ * Intended usage inside an AndroidJUnit4 instrumented test:
+ * val rec = LatencyRecorder()
+ * rec.markSend(seq) // right before sendText
+ * // ...somewhere in the message callback:
+ * rec.markReceive(seq) // first time seq is seen
+ * val stats = rec.summarize()
+ */
+class LatencyRecorder {
+
+ private val sendTimes = ConcurrentHashMap<Long, Long>()
+ private val latenciesNs = java.util.Collections.synchronizedList(mutableListOf<Long>())
+
+ fun markSend(seq: Long) {
+ sendTimes[seq] = System.nanoTime()
+ }
+
+ /** Idempotent: extra callbacks for the same seq (duplicates, echoes) are ignored. */
+ fun markReceive(seq: Long): Boolean {
+ val sent = sendTimes.remove(seq) ?: return false
+ val dtNs = System.nanoTime() - sent
+ latenciesNs.add(dtNs)
+ return true
+ }
+
+ fun receivedCount(): Int = latenciesNs.size
+ fun outstanding(): Int = sendTimes.size
+
+ fun summarize(totalWallNs: Long): PerfSummary {
+ val snapshot: LongArray = synchronized(latenciesNs) {
+ latenciesNs.toLongArray()
+ }
+ if (snapshot.isEmpty()) {
+ return PerfSummary.empty(totalWallNs)
+ }
+ snapshot.sort()
+
+ val n = snapshot.size
+ var sum = 0L
+ var lo = Long.MAX_VALUE
+ var hi = Long.MIN_VALUE
+ for (v in snapshot) {
+ sum += v
+ lo = min(lo, v)
+ hi = max(hi, v)
+ }
+
+ val mean = sum / n
+ fun percentile(p: Double): Long {
+ val idx = ((n - 1) * p).toInt().coerceIn(0, n - 1)
+ return snapshot[idx]
+ }
+
+ val p50 = percentile(0.50)
+ val p95 = percentile(0.95)
+ val p99 = percentile(0.99)
+
+ val throughputMsgPerSec =
+ if (totalWallNs > 0) n.toDouble() * 1_000_000_000.0 / totalWallNs.toDouble()
+ else 0.0
+
+ return PerfSummary(
+ received = n,
+ outstanding = sendTimes.size,
+ minNs = lo,
+ meanNs = mean,
+ p50Ns = p50,
+ p95Ns = p95,
+ p99Ns = p99,
+ maxNs = hi,
+ totalWallNs = totalWallNs,
+ throughputMsgPerSec = throughputMsgPerSec,
+ )
+ }
+}
+
+data class PerfSummary(
+ val received: Int,
+ val outstanding: Int,
+ val minNs: Long,
+ val meanNs: Long,
+ val p50Ns: Long,
+ val p95Ns: Long,
+ val p99Ns: Long,
+ val maxNs: Long,
+ val totalWallNs: Long,
+ val throughputMsgPerSec: Double,
+) {
+ fun render(label: String): String {
+ fun ms(ns: Long) = "%.2f ms".format(ns / 1_000_000.0)
+ return buildString {
+ appendLine("=== $label ===")
+ appendLine(" received : $received")
+ appendLine(" outstanding : $outstanding")
+ appendLine(" wall clock : ${ms(totalWallNs)}")
+ appendLine(" throughput : %.1f msg/s".format(throughputMsgPerSec))
+ appendLine(" latency min : ${ms(minNs)}")
+ appendLine(" latency mean : ${ms(meanNs)}")
+ appendLine(" latency p50 : ${ms(p50Ns)}")
+ appendLine(" latency p95 : ${ms(p95Ns)}")
+ appendLine(" latency p99 : ${ms(p99Ns)}")
+ appendLine(" latency max : ${ms(maxNs)}")
+ }
+ }
+
+ companion object {
+ fun empty(totalWallNs: Long) = PerfSummary(
+ received = 0, outstanding = 0,
+ minNs = 0, meanNs = 0, p50Ns = 0, p95Ns = 0, p99Ns = 0, maxNs = 0,
+ totalWallNs = totalWallNs, throughputMsgPerSec = 0.0,
+ )
+ }
+}
+
+private val PERF_SEQ_REGEX = Regex("""PERF-(\d+)""")
+
+/** Returns the seq number embedded in [text] as `PERF-<digits>`, or null. */
+fun extractPerfSeq(text: String?): Long? {
+ if (text.isNullOrEmpty()) return null
+ val m = PERF_SEQ_REGEX.find(text) ?: return null
+ return m.groupValues[1].toLongOrNull()
+}
+
+/** Renders a sequence number as a fixed-width tag for embedding in message text. */
+fun perfTag(seq: Long): String = "PERF-%010d".format(seq)
diff --git a/GNUnetMessenger/app/src/androidTest/java/org/gnunet/gnunetmessenger/perf/MessagePerformanceTest.kt b/GNUnetMessenger/app/src/androidTest/java/org/gnunet/gnunetmessenger/perf/MessagePerformanceTest.kt
@@ -0,0 +1,230 @@
+package org.gnunet.gnunetmessenger.perf
+
+import android.util.Log
+import androidx.test.core.app.ApplicationProvider
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
+import org.gnunet.gnunetmessenger.model.ChatAccount
+import org.gnunet.gnunetmessenger.model.ChatContext
+import org.gnunet.gnunetmessenger.model.ChatHandle
+import org.gnunet.gnunetmessenger.model.GnunetReturnValue
+import org.gnunet.gnunetmessenger.model.MessageKind
+import org.gnunet.gnunetmessenger.model.MessengerApp
+import org.gnunet.gnunetmessenger.service.boundimpl.GnunetChatBoundService
+import org.junit.After
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Assume.assumeTrue
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+
+/**
+ * Throughput / latency microbenchmark for sending TEXT messages through the
+ * full client → IPC → libgnunetchat → daemon → callback pipeline.
+ *
+ * (this file): single-account group loopback. One connected account
+ * publishes into a group it created, and receives each message back via the
+ * chat callback. Exercises every layer except the cross-peer hop.
+ */
+@RunWith(AndroidJUnit4::class)
+class MessagePerformanceTest {
+
+ private val appContext = ApplicationProvider.getApplicationContext<android.content.Context>()
+ private val gnunetChat = GnunetChatBoundService(appContext)
+
+ private val recorder = LatencyRecorder()
+
+ // Tuneable.
+ private val warmupMessages = 10
+ private val measuredMessages = 200
+
+ /**
+ * The GNUnet monolithic scheduler is started by the server's MainActivity,
+ * not by the IPC service's onCreate. If the server app has never been
+ * launched in this process-lifetime, GNUNET_CHAT_start aborts because the
+ * scheduler is missing. Force-launch the server's MainActivity so the
+ * daemon boots before we bind.
+ */
+ @Before
+ fun bootServerDaemon() {
+ val launch = appContext.packageManager.getLaunchIntentForPackage(SERVER_PACKAGE)
+ if (launch != null) {
+ launch.addFlags(android.content.Intent.FLAG_ACTIVITY_NEW_TASK)
+ appContext.startActivity(launch)
+ // Give the server process and its native daemon some time to come up.
+ Thread.sleep(5_000)
+ }
+ }
+
+ @After
+ fun tearDown() = runTest {
+ gnunetChat.unbind()
+ delay(500)
+ }
+
+ @Test
+ fun singleAccountGroupLoopback_measuresLatencyAndThroughput() = runTest {
+ val allDone = CompletableDeferred<Unit>()
+ val expectedTotal = warmupMessages + measuredMessages
+ var seenCount = 0
+ var firstRecvAtNs: Long = 0L
+ var lastRecvAtNs: Long = 0L
+
+ // Fail fast if the IPC server package is missing; otherwise the test
+ // floods binder retries. See README / test setup for install steps.
+ val pm = appContext.packageManager
+ val serverInstalled = runCatching {
+ pm.getPackageInfo(SERVER_PACKAGE, 0); true
+ }.getOrDefault(false)
+ assumeTrue(
+ "GNUnet IPC server package '$SERVER_PACKAGE' is not installed on the device. " +
+ "Install gnunet-android before running the perf test.",
+ serverInstalled,
+ )
+
+ val handle: ChatHandle = gnunetChat.startChat(MessengerApp()) { _, msg ->
+ // Diagnostic: log every non-TEXT kind so we see LOGIN/CREATED_ACCOUNT/etc.
+ if (msg.kind != MessageKind.TEXT) {
+ Log.d(TAG, "callback kind=${msg.kind} text='${msg.text?.take(40)}'")
+ return@startChat
+ }
+ val seq = extractPerfSeq(msg.text) ?: return@startChat
+
+ val now = System.nanoTime()
+ val wasFirst = (firstRecvAtNs == 0L)
+ if (wasFirst) firstRecvAtNs = now
+ lastRecvAtNs = now
+
+ if (seq >= warmupMessages) {
+ recorder.markReceive(seq)
+ }
+ seenCount++
+ if (seenCount >= expectedTotal && !allDone.isCompleted) {
+ allDone.complete(Unit)
+ }
+ }
+ try {
+ // runTest uses a *virtual* scheduler: withTimeout/delay here would
+ // advance virtual time and never wait for the real binder to come up.
+ // Hop to a real dispatcher so we measure wall clock.
+ withContext(Dispatchers.Default.limitedParallelism(1)) {
+ withTimeout(20_000) { gnunetChat.awaitReady(handle) }
+ }
+ } catch (t: Throwable) {
+ Log.e(TAG, "awaitReady failed — server bind or startChat crashed", t)
+ throw AssertionError(
+ "startChat did not produce a live handle within 20s. " +
+ "Check logcat for 'GnunetChatBoundService' (bind state) and " +
+ "'GnunetChatIpc' / 'NativeBridge' (server-side startup). " +
+ "Underlying cause: ${t.javaClass.simpleName}: ${t.message}",
+ t,
+ )
+ }
+
+ // Create + connect the lone account.
+ val account = createAndConnectAccount(handle, "PerfAccount")
+
+ // Create a group the account is already a member of.
+ val group = gnunetChat.createGroup(handle, "perf-group-${System.currentTimeMillis()}")
+ val groupCtx: ChatContext = gnunetChat.getGroupContext(group)
+ // Give the group a moment to be fully set up on the server side.
+ withContext(Dispatchers.Default.limitedParallelism(1)) { delay(500) }
+
+ Log.i(TAG, "Starting perf run: warmup=$warmupMessages measured=$measuredMessages")
+
+ val wallStart = System.nanoTime()
+ // Pipelined fire. LatencyRecorder handles out-of-order receive.
+ for (seq in 0L until expectedTotal.toLong()) {
+ if (seq >= warmupMessages) {
+ recorder.markSend(seq)
+ }
+ val text = "${perfTag(seq)} hello"
+ // sendText is runBlocking internally; this is a suspend-safe hot loop
+ // because the loop body yields on each call.
+ withContext(Dispatchers.IO) { gnunetChat.sendText(groupCtx, text) }
+ }
+
+ // Wait for everything to come back. Use real dispatcher (see note above).
+ try {
+ withContext(Dispatchers.Default.limitedParallelism(1)) {
+ withTimeout(60_000) { allDone.await() }
+ }
+ } catch (t: Throwable) {
+ Log.w(
+ TAG,
+ "Timed out waiting for all messages: received=${recorder.receivedCount()} " +
+ "outstanding=${recorder.outstanding()} seenCount=$seenCount",
+ )
+ }
+ val wallEnd = lastRecvAtNs.takeIf { it != 0L } ?: System.nanoTime()
+
+ val summary = recorder.summarize(totalWallNs = wallEnd - wallStart)
+ val rendered = summary.render("MessagePerformanceTest / single-account group loopback")
+ // Emit twice: once for logcat, once for the instrumentation stdout.
+ Log.i(TAG, "\n$rendered")
+ println(rendered)
+
+ assertTrue(
+ "Should have received >= 95% of measured messages (got ${summary.received}/$measuredMessages)",
+ summary.received >= (measuredMessages * 0.95).toInt(),
+ )
+ }
+
+ // ---- helpers ----
+
+ private suspend fun createAndConnectAccount(
+ handle: ChatHandle,
+ name: String,
+ ): ChatAccount {
+ // Check if the account is already there from a previous run. Re-creating
+ // the same name returns SYSERR and fails the assertion.
+ val existing = runCatching { gnunetChat.listAccounts(handle) }
+ .getOrDefault(emptyList())
+ Log.i(TAG, "listAccounts (pre-create): ${existing.size} accounts " +
+ existing.joinToString(",") { "'${it.name}'" })
+ val pre = existing.firstOrNull { it.name.equals(name, ignoreCase = true) }
+ if (pre != null) {
+ Log.i(TAG, "Account '$name' already exists; using it.")
+ gnunetChat.connect(handle, pre)
+ withContext(Dispatchers.Default.limitedParallelism(1)) { delay(1_500) }
+ return pre
+ }
+
+ val res = gnunetChat.createAccount(handle, name)
+ Log.i(TAG, "createAccount('$name') -> $res")
+ assertEquals("createAccount('$name')", GnunetReturnValue.OK, res)
+
+ val account = withContext(Dispatchers.Default.limitedParallelism(1)) {
+ withTimeout(15_000) {
+ var found: ChatAccount? = null
+ var attempt = 0
+ while (found == null) {
+ val accounts = runCatching { gnunetChat.listAccounts(handle) }
+ .getOrDefault(emptyList())
+ if (attempt % 5 == 0) {
+ Log.i(TAG, "listAccounts attempt=$attempt size=${accounts.size} " +
+ accounts.joinToString(",") { "'${it.name}'" })
+ }
+ found = accounts.firstOrNull { it.name.equals(name, ignoreCase = true) }
+ if (found == null) delay(200)
+ attempt++
+ }
+ found
+ }
+ }
+ gnunetChat.connect(handle, account)
+ withContext(Dispatchers.Default.limitedParallelism(1)) { delay(1_500) }
+ return account
+ }
+
+ companion object {
+ private const val TAG = "MessagePerfTest"
+ private const val SERVER_PACKAGE = "org.gnu.gnunet"
+ }
+}