commit c798cf50578bb04104be1e7dbc3f5a367dc4a14b
parent 29c30ba2dea6b94aa768c0f094085de3669a0be5
Author: t3sserakt <t3ss@posteo.de>
Date: Mon, 27 Apr 2026 15:50:03 +0200
WIP: two account perf measure
Diffstat:
1 file changed, 331 insertions(+), 0 deletions(-)
diff --git a/GNUnetMessenger/app/src/androidTest/java/org/gnunet/gnunetmessenger/perf/TwoAccountPerformanceTest.kt b/GNUnetMessenger/app/src/androidTest/java/org/gnunet/gnunetmessenger/perf/TwoAccountPerformanceTest.kt
@@ -0,0 +1,331 @@
+package org.gnunet.gnunetmessenger.perf
+
+import android.util.Log
+import androidx.test.core.app.ApplicationProvider
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
+import org.gnunet.gnunetmessenger.model.ChatAccount
+import org.gnunet.gnunetmessenger.model.ChatContext
+import org.gnunet.gnunetmessenger.model.ChatHandle
+import org.gnunet.gnunetmessenger.model.GnunetReturnValue
+import org.gnunet.gnunetmessenger.model.MessageKind
+import org.gnunet.gnunetmessenger.model.MessengerApp
+import org.gnunet.gnunetmessenger.service.boundimpl.GnunetChatBoundService
+import org.junit.After
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Assume.assumeTrue
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+
+/**
+ * Phase 2: two parallel sessions on the same daemon.
+ *
+ * Spins up two independent `GnunetChatBoundService` clients (so two native
+ * GNUNET_CHAT_Handle instances via `g_sessions`), each with its own account
+ * and its own group. Both fire half of the measured messages concurrently
+ * into their own group and receive their own loopback. Latencies feed a
+ * shared recorder via disjoint seq ranges.
+ *
+ * What this measures vs Phase 1:
+ * - Phase 1 = one client, one session, one account: pure pipeline cost.
+ * - Phase 2 = two clients, two sessions, two accounts under concurrent
+ * load: surfaces daemon-side serialization, lock contention, and IPC
+ * queue behavior.
+ *
+ * Note: this is *not* a cross-account A→B routing test. The existing
+ * GnunetChatLobbyTest documents that lobby-join doesn't actually establish
+ * a cross-account connection at the protocol level, so a true A→B test
+ * would just time out at the JOIN step. Parallel loopback is the most
+ * meaningful two-account experiment that actually completes end-to-end.
+ */
+@RunWith(AndroidJUnit4::class)
+class TwoAccountPerformanceTest {
+
+ private val appContext = ApplicationProvider.getApplicationContext<android.content.Context>()
+ private val clientAlpha = GnunetChatBoundService(appContext)
+ private val clientBeta = GnunetChatBoundService(appContext)
+
+ private val recorder = LatencyRecorder()
+
+ private val warmupPerClient = 10
+ private val measuredPerClient = 500
+
+ /**
+ * Same boot-trick as Phase 1: the GNUnet scheduler is started by the
+ * server's MainActivity, not by the IPC service's onCreate. Force-launch
+ * it so the daemon is alive before either client binds.
+ */
+ @Before
+ fun bootServerDaemon() {
+ val launch = appContext.packageManager.getLaunchIntentForPackage(SERVER_PACKAGE)
+ if (launch != null) {
+ launch.addFlags(android.content.Intent.FLAG_ACTIVITY_NEW_TASK)
+ appContext.startActivity(launch)
+ Thread.sleep(5_000)
+ }
+ }
+
+ @After
+ fun tearDown() = runTest {
+ runCatching { clientAlpha.unbind() }
+ runCatching { clientBeta.unbind() }
+ delay(500)
+ }
+
+ @Test
+ fun twoSessionParallelLoopback_measuresLatencyAndThroughput() = runTest {
+ val pm = appContext.packageManager
+ val serverInstalled = runCatching {
+ pm.getPackageInfo(SERVER_PACKAGE, 0); true
+ }.getOrDefault(false)
+ assumeTrue(
+ "GNUnet IPC server package '$SERVER_PACKAGE' is not installed on the device. " +
+ "Install gnunet-android before running the perf test.",
+ serverInstalled,
+ )
+
+ // --- per-session receive bookkeeping ---
+ val perClientTotal = warmupPerClient + measuredPerClient
+ val alphaSeqStart = 0L
+ val betaSeqStart = perClientTotal.toLong()
+
+ val alphaDone = CompletableDeferred<Unit>()
+ val betaDone = CompletableDeferred<Unit>()
+ var alphaSeen = 0
+ var betaSeen = 0
+ var alphaFirstRecvNs = 0L
+ var alphaLastRecvNs = 0L
+ var betaFirstRecvNs = 0L
+ var betaLastRecvNs = 0L
+
+ val handleAlpha: ChatHandle = clientAlpha.startChat(MessengerApp()) { _, msg ->
+ if (msg.kind != MessageKind.TEXT) {
+ Log.d(TAG, "[alpha] kind=${msg.kind} text='${msg.text?.take(40)}'")
+ return@startChat
+ }
+ val seq = extractPerfSeq(msg.text) ?: return@startChat
+ // Only count messages this session emitted (its own loopback).
+ if (seq < alphaSeqStart || seq >= alphaSeqStart + perClientTotal) return@startChat
+
+ val now = System.nanoTime()
+ if (alphaFirstRecvNs == 0L) alphaFirstRecvNs = now
+ alphaLastRecvNs = now
+
+ val warmupCutoff = alphaSeqStart + warmupPerClient
+ if (seq >= warmupCutoff) recorder.markReceive(seq)
+ alphaSeen++
+ if (alphaSeen >= perClientTotal && !alphaDone.isCompleted) {
+ alphaDone.complete(Unit)
+ }
+ }
+
+ val handleBeta: ChatHandle = clientBeta.startChat(MessengerApp()) { _, msg ->
+ if (msg.kind != MessageKind.TEXT) {
+ Log.d(TAG, "[beta] kind=${msg.kind} text='${msg.text?.take(40)}'")
+ return@startChat
+ }
+ val seq = extractPerfSeq(msg.text) ?: return@startChat
+ if (seq < betaSeqStart || seq >= betaSeqStart + perClientTotal) return@startChat
+
+ val now = System.nanoTime()
+ if (betaFirstRecvNs == 0L) betaFirstRecvNs = now
+ betaLastRecvNs = now
+
+ val warmupCutoff = betaSeqStart + warmupPerClient
+ if (seq >= warmupCutoff) recorder.markReceive(seq)
+ betaSeen++
+ if (betaSeen >= perClientTotal && !betaDone.isCompleted) {
+ betaDone.complete(Unit)
+ }
+ }
+
+ // Wait for both sessions to be live before doing anything else.
+ try {
+ withContext(Dispatchers.Default.limitedParallelism(2)) {
+ withTimeout(20_000) {
+ coroutineScope {
+ val a = async { clientAlpha.awaitReady(handleAlpha) }
+ val b = async { clientBeta.awaitReady(handleBeta) }
+ awaitAll(a, b)
+ }
+ }
+ }
+ } catch (t: Throwable) {
+ Log.e(TAG, "awaitReady failed for one of the two sessions", t)
+ throw AssertionError(
+ "Two-session startChat did not produce live handles within 20s. " +
+ "Underlying cause: ${t.javaClass.simpleName}: ${t.message}",
+ t,
+ )
+ }
+
+ // Create + connect one account per client.
+ val accountAlpha = createAndConnectAccount(clientAlpha, handleAlpha, "PerfAccountAlpha")
+ val accountBeta = createAndConnectAccount(clientBeta, handleBeta, "PerfAccountBeta")
+
+ // Each client gets its own loopback group.
+ val groupAlpha = clientAlpha.createGroup(
+ handleAlpha, "perf-alpha-${System.currentTimeMillis()}",
+ )
+ val groupBeta = clientBeta.createGroup(
+ handleBeta, "perf-beta-${System.currentTimeMillis()}",
+ )
+ val ctxAlpha: ChatContext = clientAlpha.getGroupContext(groupAlpha)
+ val ctxBeta: ChatContext = clientBeta.getGroupContext(groupBeta)
+ withContext(Dispatchers.Default.limitedParallelism(1)) { delay(500) }
+
+ Log.i(
+ TAG,
+ "Starting two-session perf run: warmup=$warmupPerClient measured=$measuredPerClient per session",
+ )
+
+ val wallStart = System.nanoTime()
+ // Fire both sessions in parallel — same daemon, two binders, two groups.
+ withContext(Dispatchers.Default.limitedParallelism(2)) {
+ coroutineScope {
+ val sendAlpha = async {
+ sendBatch(
+ client = clientAlpha,
+ ctx = ctxAlpha,
+ seqStart = alphaSeqStart,
+ total = perClientTotal,
+ warmup = warmupPerClient,
+ )
+ }
+ val sendBeta = async {
+ sendBatch(
+ client = clientBeta,
+ ctx = ctxBeta,
+ seqStart = betaSeqStart,
+ total = perClientTotal,
+ warmup = warmupPerClient,
+ )
+ }
+ awaitAll(sendAlpha, sendBeta)
+ }
+ }
+
+ // Wait for both sides' loopback to drain.
+ try {
+ withContext(Dispatchers.Default.limitedParallelism(2)) {
+ withTimeout(120_000) {
+ coroutineScope {
+ val a = async { alphaDone.await() }
+ val b = async { betaDone.await() }
+ awaitAll(a, b)
+ }
+ }
+ }
+ } catch (t: Throwable) {
+ Log.w(
+ TAG,
+ "Timed out waiting for parallel loopback: " +
+ "alphaSeen=$alphaSeen/$perClientTotal betaSeen=$betaSeen/$perClientTotal " +
+ "received=${recorder.receivedCount()} outstanding=${recorder.outstanding()}",
+ )
+ }
+
+ val wallEnd = maxOf(alphaLastRecvNs, betaLastRecvNs)
+ .takeIf { it != 0L } ?: System.nanoTime()
+ val totalMeasured = measuredPerClient * 2
+
+ val summary = recorder.summarize(totalWallNs = wallEnd - wallStart)
+ val rendered = summary.render(
+ "TwoAccountPerformanceTest / parallel two-session loopback " +
+ "(2 × $perClientTotal msgs, ${measuredPerClient * 2} measured)",
+ )
+ Log.i(TAG, "\n$rendered")
+ println(rendered)
+ Log.i(
+ TAG,
+ "per-session counts: alpha=$alphaSeen/$perClientTotal beta=$betaSeen/$perClientTotal",
+ )
+
+ assertTrue(
+ "Should have received >= 95% of measured messages " +
+ "(got ${summary.received}/$totalMeasured)",
+ summary.received >= (totalMeasured * 0.95).toInt(),
+ )
+ }
+
+ // ---- helpers ----
+
+ private suspend fun sendBatch(
+ client: GnunetChatBoundService,
+ ctx: ChatContext,
+ seqStart: Long,
+ total: Int,
+ warmup: Int,
+ ) {
+ for (i in 0 until total) {
+ val seq = seqStart + i
+ if (i >= warmup) recorder.markSend(seq)
+ val text = "${perfTag(seq)} hello"
+ withContext(Dispatchers.IO) { client.sendText(ctx, text) }
+ }
+ }
+
+ private suspend fun createAndConnectAccount(
+ client: GnunetChatBoundService,
+ handle: ChatHandle,
+ name: String,
+ ): ChatAccount {
+ val existing = runCatching { client.listAccounts(handle) }
+ .getOrDefault(emptyList())
+ Log.i(
+ TAG, "[$name] listAccounts (pre-create): ${existing.size} accounts " +
+ existing.joinToString(",") { "'${it.name}'" },
+ )
+ val pre = existing.firstOrNull { it.name.equals(name, ignoreCase = true) }
+ if (pre != null) {
+ Log.i(TAG, "[$name] account already exists; reusing.")
+ client.connect(handle, pre)
+ withContext(Dispatchers.Default.limitedParallelism(1)) { delay(1_500) }
+ return pre
+ }
+
+ val res = client.createAccount(handle, name)
+ Log.i(TAG, "[$name] createAccount -> $res")
+ assertEquals("createAccount('$name')", GnunetReturnValue.OK, res)
+
+ val account = withContext(Dispatchers.Default.limitedParallelism(1)) {
+ withTimeout(15_000) {
+ var found: ChatAccount? = null
+ var attempt = 0
+ while (found == null) {
+ val accounts = runCatching { client.listAccounts(handle) }
+ .getOrDefault(emptyList())
+ if (attempt % 5 == 0) {
+ Log.i(
+ TAG,
+ "[$name] listAccounts attempt=$attempt size=${accounts.size} " +
+ accounts.joinToString(",") { "'${it.name}'" },
+ )
+ }
+ found = accounts.firstOrNull { it.name.equals(name, ignoreCase = true) }
+ if (found == null) delay(200)
+ attempt++
+ }
+ found
+ }
+ }
+ client.connect(handle, account)
+ withContext(Dispatchers.Default.limitedParallelism(1)) { delay(1_500) }
+ return account
+ }
+
+ companion object {
+ private const val TAG = "TwoAcctPerfTest"
+ private const val SERVER_PACKAGE = "org.gnu.gnunet"
+ }
+}