commit c1ef824f95612bea94a28563151efa99e18f47f6
parent 16b5da18df2d725bbcaa2e6efa05c793492ab445
Author: t3serakt <t3ss@posteo.de>
Date: Thu, 18 Apr 2024 16:57:53 +0200
Added sync protocol.
Diffstat:
2 files changed, 234 insertions(+), 33 deletions(-)
diff --git a/src/service/core/test_core_just_run_topo.conf b/src/service/core/test_core_just_run_topo.conf
@@ -0,0 +1,9 @@
+M:1
+N:2
+X:1
+T:libgnunet_test_core_plugin_cmd_just_run
+K:1|{connect:{P:1:1:tcp}}
+R:1|{tcp_port:0}|{udp_port:0}
+R:2|{tcp_port:0}|{udp_port:0}
+P:1:1|{connect:{K:1:tcp}}
+P:2:1|{connect:{K:1:tcp}}
diff --git a/src/service/transport/gnunet-service-transport.c b/src/service/transport/gnunet-service-transport.c
@@ -325,6 +325,12 @@
#define QUEUE_ENTRY_TIMEOUT \
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+/**
+ * Difference of the avarage RTT for the DistanceVector calculate by us and the target
+ * we are willing to accept for starting the burst.
+ */
+#define RTT_DIFF \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
GNUNET_NETWORK_STRUCT_BEGIN
@@ -984,6 +990,15 @@ struct TransportFlowControlMessage
*/
struct GNUNET_TIME_AbsoluteNBO sender_time;
+ /**
+ * Avarage RTT for the DistanceVector of the VirtualLink we tell the target.
+ */
+ struct GNUNET_TIME_RelativeNBO rtt;
+
+ /**
+ * We tell the target, if we are ready to start the burst.
+ */
+ unsigned int sync_ready;
/**
* Number of TransportGlobalNattedAddress following the struct.
@@ -1470,6 +1485,17 @@ struct VirtualLink
struct GNUNET_TIME_Relative last_fc_rtt;
/**
+ * Avarage RTT for over all paths of the DistanceVector of this VirtualLink
+ * calculated by the target.
+ */
+ struct GNUNET_TIME_Relative other_rtt;
+
+ /**
+ * The task to start the burst.
+ */
+ struct GNUNET_SCHEDULER_Task *burst_task;
+
+ /**
* Used to generate unique UUIDs for messages that are being
* fragmented.
*/
@@ -1570,6 +1596,27 @@ struct VirtualLink
* would decrement the window by one per CORE client).
*/
int core_recv_window;
+
+ /**
+ * Are we ready to start the burst?
+ */
+ enum GNUNET_GenericReturnValue sync_ready;
+
+ /**
+ * Did the target tell us it is ready to start the burst?
+ */
+ enum GNUNET_GenericReturnValue other_sync_ready;
+
+ /**
+ * Did we start the burst?
+ */
+ enum GNUNET_GenericReturnValue burst_sync;
+
+ /**
+ * Factor we multiply the avarage RTT with for calculating the the delay to start the burst.
+ * The factor depends on which peer first got ready to sync.
+ */
+ unsigned long long rtt_factor;
};
@@ -4570,7 +4617,8 @@ handle_communicator_available (
tc->details.communicator.cc =
(enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Communicator with prefix `%s' connected\n",
+ "Communicator for peer %s with prefix '%s' connected\n",
+ GNUNET_i2s (&GST_my_identity),
tc->details.communicator.address_prefix);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -5368,6 +5416,10 @@ add_global_addresses (void *cls,
}
+static struct GNUNET_TIME_Relative
+calculate_rtt (struct DistanceVector *dv);
+
+
/**
* Something changed on the virtual link with respect to flow
* control. Consider retransmitting the FC window size.
@@ -5382,9 +5434,10 @@ consider_sending_fc (void *cls)
struct TransportFlowControlMessage *fc;
struct GNUNET_TIME_Relative duration;
struct GNUNET_TIME_Relative rtt;
+ struct GNUNET_TIME_Relative rtt_avarage;
struct Neighbour *n = vl->n;
- if (0 < n->number_of_addresses)
+ if (NULL != n && 0 < n->number_of_addresses)
{
char *tgnas = GNUNET_malloc (n->number_of_addresses * sizeof (struct TransportGlobalNattedAddress) + n->size_of_global_addresses);
size_t addresses_size;
@@ -5416,13 +5469,21 @@ consider_sending_fc (void *cls)
VL, as that determines "significantly". We have the delay, but
the bandwidth statistics need to be added for the VL!*/(void) duration;
+ if (NULL != vl->dv)
+ rtt_avarage = calculate_rtt (vl->dv);
+ else
+ rtt_avarage = GNUNET_TIME_UNIT_FOREVER_REL;
+ fc->rtt = GNUNET_TIME_relative_hton (rtt_avarage);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending FC seq %u to %s with new window %llu\n",
+ "Sending FC seq %u to %s with new window %llu %llu %u\n",
(unsigned int) vl->fc_seq_gen,
GNUNET_i2s (&vl->target),
- (unsigned long long) vl->incoming_fc_window_size);
+ (unsigned long long) vl->incoming_fc_window_size,
+ rtt_avarage.rel_value_us,
+ vl->sync_ready);
monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
vl->last_fc_transmission = monotime;
+ fc->sync_ready = vl->sync_ready;
fc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL);
fc->seq = htonl (vl->fc_seq_gen++);
fc->inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size
@@ -7362,22 +7423,33 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
return GNUNET_SYSERR;
}
for (unsigned int i = 2; i < path_len; i++)
- if (NULL != lookup_neighbour (&path[i]))
+ {
+ struct Neighbour *n = lookup_neighbour (&path[i]);
+ struct GNUNET_TIME_Absolute q_timeout;
+
+ if (NULL != n)
{
- /* Useless path: we have a direct connection to some hop
- in the middle of the path, so this one is not even
- terribly useful for redundancy */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Path of %u hops useless: directly link to hop %u (%s)\n",
- path_len,
- i,
- GNUNET_i2s (&path[i]));
- GNUNET_STATISTICS_update (GST_stats,
- "# Useless DV path ignored: hop is neighbour",
- 1,
- GNUNET_NO);
- return GNUNET_SYSERR;
+ q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+ q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+ if (0 != GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+ {
+ /* Useless path: we have a direct active connection to some hop
+ in the middle of the path, so this one is not even
+ terribly useful for redundancy */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Path of %u hops useless: directly link to hop %u (%s)\n",
+ path_len,
+ i,
+ GNUNET_i2s (&path[i]));
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Useless DV path ignored: hop is neighbour",
+ 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
}
+ }
dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
if (NULL == dv)
{
@@ -7875,6 +7947,33 @@ neighbour_store_dvmono_cb (void *cls, int success)
}
+static struct GNUNET_TIME_Relative
+get_network_latency (const struct TransportDVLearnMessage *dvl)
+{
+ struct GNUNET_TIME_Relative host_latency_sum;
+ struct GNUNET_TIME_Relative latency;
+ struct GNUNET_TIME_Relative network_latency;
+ uint16_t nhops = ntohs (dvl->num_hops);;
+
+ /* We initiated this, learn the forward path! */
+ host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
+
+ // Need also something to lookup initiation time
+ // to compute RTT! -> add RTT argument here?
+ latency = GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (
+ dvl->monotonic_time));
+ GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
+ // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
+ // (based on dvl->challenge, we can identify time of origin!)
+
+ network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
+ /* assumption: latency on all links is the same */
+ network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
+
+ return network_latency;
+}
+
+
/**
* Communicator gave us a DV learn message. Process the request.
*
@@ -8047,26 +8146,13 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
{
struct GNUNET_PeerIdentity path[nhops + 1];
- struct GNUNET_TIME_Relative host_latency_sum;
- struct GNUNET_TIME_Relative latency;
struct GNUNET_TIME_Relative network_latency;
/* We initiated this, learn the forward path! */
path[0] = GST_my_identity;
path[1] = hops[0].hop;
- host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
-
- // Need also something to lookup initiation time
- // to compute RTT! -> add RTT argument here?
- latency = GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (
- dvl->monotonic_time));
- GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
- // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
- // (based on dvl->challenge, we can identify time of origin!)
- network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
- /* assumption: latency on all links is the same */
- network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
+ network_latency = get_network_latency (dvl);
for (unsigned int i = 2; i <= nhops; i++)
{
@@ -8094,6 +8180,8 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
{
/* last hop was bi-directional, we could learn something here! */
struct GNUNET_PeerIdentity path[nhops + 2];
+ struct GNUNET_TIME_Relative ilat;
+ struct GNUNET_TIME_Relative network_latency;
path[0] = GST_my_identity;
path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
@@ -8116,9 +8204,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
"Learned inverse path with %u hops to %s\n",
i + 2,
GNUNET_i2s (&path[i + 2]));
+ network_latency = get_network_latency (dvl);
+ ilat = GNUNET_TIME_relative_multiply (network_latency, i + 2);
iret = learn_dv_path (path,
i + 3,
- GNUNET_TIME_UNIT_FOREVER_REL,
+ ilat,
GNUNET_TIME_relative_to_absolute (
ADDRESS_VALIDATION_LIFETIME));
if (GNUNET_SYSERR == iret)
@@ -9630,6 +9720,36 @@ check_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
}
}
+static struct GNUNET_TIME_Relative
+calculate_rtt (struct DistanceVector *dv)
+{
+ struct GNUNET_TIME_Relative ret = GNUNET_TIME_UNIT_ZERO;
+ unsigned int n_hops = 0;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "calculate_rtt\n");
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "calculate_rtt %llu\n",
+ pos->pd.aged_rtt.rel_value_us);
+ n_hops++;
+ ret = GNUNET_TIME_relative_add (GNUNET_TIME_relative_multiply (pos->pd.aged_rtt, pos->distance + 2), ret);
+ }
+
+ GNUNET_assert (0 != n_hops);
+
+ return ret ;
+}
+
+static void
+start_burst (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Burst started \n");
+}
+
/**
* Communicator gave us a transport address validation response. Process the
* request.
@@ -9643,6 +9763,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
{
struct CommunicatorMessageContext *cmc = cls;
struct VirtualLink *vl;
+ struct GNUNET_TIME_Absolute q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
uint32_t seq;
struct GNUNET_TIME_Absolute st;
uint64_t os;
@@ -9673,6 +9794,77 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
vl,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
}
+ if (NULL != vl->n && GNUNET_YES != vl->burst_sync)
+ {
+ for (struct Queue *q = vl->n->queue_head; NULL != q; q = q->next_neighbour)
+ q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+ }
+
+ if ((NULL == vl->n ||
+ 0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+ && GNUNET_YES != vl->burst_sync)
+ {
+ struct GNUNET_TIME_Relative rtt;
+ struct GNUNET_TIME_Relative rel1;
+ struct GNUNET_TIME_Relative rel2;
+
+ if (NULL != vl->dv)
+ rtt = calculate_rtt (vl->dv);
+ else
+ rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+ vl->other_rtt = GNUNET_TIME_relative_ntoh (fc->rtt);
+ vl->other_sync_ready = fc->sync_ready;
+ rel1 = GNUNET_TIME_relative_subtract (vl->other_rtt, rtt);
+ rel2 = GNUNET_TIME_relative_subtract (rtt, vl->other_rtt);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready %u, other rtt %llu and rtt %llu rel1 %llu rel2 %llu\n",
+ vl->other_sync_ready,
+ vl->other_rtt.rel_value_us,
+ rtt.rel_value_us,
+ rel1.rel_value_us,
+ rel2.rel_value_us);
+ if ((vl->other_rtt.rel_value_us != GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us &&
+ rtt.rel_value_us != GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us) &&
+ rel1.rel_value_us < RTT_DIFF.rel_value_us &&
+ rel2.rel_value_us < RTT_DIFF.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready 1\n");
+ if (GNUNET_YES == vl->other_sync_ready && GNUNET_YES == vl->sync_ready)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready 2\n");
+ vl->burst_sync = GNUNET_YES;
+ vl->burst_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (rtt, vl->rtt_factor), &start_burst, vl);
+ }
+ else if (GNUNET_NO == vl->other_sync_ready)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready 3\n");
+ if (NULL != vl->burst_task)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready 4\n");
+ GNUNET_SCHEDULER_cancel (vl->burst_task);
+ vl->burst_task = NULL;
+ }
+ vl->rtt_factor = 4;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready 5\n");
+ vl->rtt_factor = 2;
+ }
+ vl->sync_ready = GNUNET_YES;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "other sync ready 6\n");
+ vl->sync_ready = GNUNET_NO;
+ }
+ }
if (0 != ntohl (fc->number_of_addresses))
{
unsigned int number_of_addresses = ntohl (fc->number_of_addresses);