gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit 9602e23163e6d562197b40c361a46eead3276489
parent 016f9c0b2e61c6dab0057a3b4618db5624badf51
Author: Bart Polot <bart@net.in.tum.de>
Date:   Sat, 24 Aug 2013 00:47:43 +0000

- more refactoring client flow control


Diffstat:
Msrc/mesh/gnunet-service-mesh-enc.c | 234++++++++++++++++++++++++++++++++++++++++---------------------------------------
1 file changed, 118 insertions(+), 116 deletions(-)

diff --git a/src/mesh/gnunet-service-mesh-enc.c b/src/mesh/gnunet-service-mesh-enc.c @@ -401,6 +401,9 @@ struct MeshReliableMessage }; +/** + * Info about the traffic state for a client in a channel. + */ struct MeshChannelReliability { /** @@ -431,6 +434,16 @@ struct MeshChannelReliability unsigned int n_recv; /** + * Next MID to use for outgoing traffic. + */ + uint32_t mid_send; + + /** + * Next MID expected for incoming traffic. + */ + uint32_t mid_recv; + + /** * Can we send data to the client? */ int client_ready; @@ -496,26 +509,6 @@ struct MeshChannel enum MeshChannelState state; /** - * Next MID to use for fwd traffic. - */ - uint32_t mid_send_fwd; - - /** - * Next MID expected for fwd traffic. - */ - uint32_t mid_recv_fwd; - - /** - * Next MID to use for bck traffic. - */ - uint32_t mid_send_bck; - - /** - * Next MID expected for bck traffic. - */ - uint32_t mid_recv_bck; - - /** * Is the tunnel bufferless (minimum latency)? */ int nobuffer; @@ -556,13 +549,13 @@ struct MeshChannel * Reliability data. * Only present (non-NULL) at the owner of a tunnel. */ - struct MeshChannelReliability *fwd_rel; + struct MeshChannelReliability *root_rel; /** * Reliability data. * Only present (non-NULL) at the destination of a tunnel. */ - struct MeshChannelReliability *bck_rel; + struct MeshChannelReliability *dest_rel; }; @@ -1492,8 +1485,7 @@ send_local_channel_destroy (struct MeshChannel *ch, int fwd) * @param fwd Set to GNUNET_YES for FWD ACK (dest->owner) */ static void -send_local_ack (struct MeshChannel *ch, - int fwd) +send_local_ack (struct MeshChannel *ch, int fwd) { struct GNUNET_MESH_LocalAck msg; struct MeshChannelReliability *rel; @@ -1518,7 +1510,7 @@ send_local_ack (struct MeshChannel *ch, c->handle, &msg.header, GNUNET_NO); - rel = fwd ? ch->fwd_rel : ch->bck_rel; + rel = fwd ? ch->root_rel : ch->dest_rel; rel->client_ready = GNUNET_YES; } @@ -1861,31 +1853,6 @@ send_connection_ack (struct MeshConnection *connection, int fwd) /** - * Build a hop-by-hop ACK message and queue it to send for the given connection. - * - * @param c Which connection to send the hop-by-hop ACK. - * @param ack Value of the ACK. - * @param fwd Is this a fwd ACK? (will go dest->root) - */ -static void -send_ack (struct MeshConnection *c, uint32_t ack, int fwd) -{ - struct GNUNET_MESH_ACK msg; - - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); - msg.ack = htonl (ack); - msg.cid = c->id; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "send %s ack %u on %s\n", - fwd ? "FWD" : "BCK", ack, GNUNET_h2s (&c->id)); - - send_prebuilt_message_connection (&msg.header, c, NULL, !fwd); -} - - -/** * Core callback to write a pre-constructed data packet to core buffer * * @param cls Closure (MeshTransmissionDescriptor with data in "data" member). @@ -2446,13 +2413,13 @@ connection_unlock_queue (struct MeshConnection *c, int fwd) /* FIXME randomize channel selection, not always first channel */ for (ch = t->channel_head; NULL != ch; ch = ch->next) { - rel = fwd ? ch->fwd_rel : ch->bck_rel; + rel = fwd ? ch->root_rel : ch->dest_rel; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " channel %X - %s\n", ch->gid, rel->client_ready ? "ready " : "not ready"); if (GNUNET_NO == rel->client_ready) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not ready!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending local ack!\n"); send_local_ack (ch, fwd); return; /* FIXME authorize all channels? */ } @@ -3021,10 +2988,10 @@ channel_add_client (struct MeshChannel *ch, struct MeshClient *c) return; } - GNUNET_break (NULL == ch->bck_rel); - ch->bck_rel = GNUNET_new (struct MeshChannelReliability); - ch->bck_rel->ch = ch; - ch->bck_rel->expected_delay = MESH_RETRANSMIT_TIME; + GNUNET_break (NULL == ch->dest_rel); + ch->dest_rel = GNUNET_new (struct MeshChannelReliability); + ch->dest_rel->ch = ch; + ch->dest_rel->expected_delay = MESH_RETRANSMIT_TIME; ch->dest = c; } @@ -3132,30 +3099,28 @@ channel_send_data_ack (struct MeshChannel *ch, int fwd) struct MeshReliableMessage *copy; unsigned int delta; uint64_t mask; - uint32_t *mid; uint16_t type; if (GNUNET_NO == ch->reliable) { return; } - rel = fwd ? ch->bck_rel : ch->fwd_rel; - mid = fwd ? &ch->mid_recv_fwd : &ch->mid_recv_bck; + rel = fwd ? ch->dest_rel : ch->root_rel; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack for %u\n", - *mid - 1); + rel->mid_recv - 1); type = GNUNET_MESSAGE_TYPE_MESH_DATA_ACK; msg.header.type = htons (type); msg.header.size = htons (sizeof (msg)); msg.chid = htonl (ch->gid); - msg.mid = htonl (*mid - 1); + msg.mid = htonl (rel->mid_recv - 1); msg.futures = 0; for (copy = rel->head_recv; NULL != copy; copy = copy->next) { if (copy->type != type) continue; - delta = copy->mid - *mid; + delta = copy->mid - rel->mid_recv; if (63 < delta) break; mask = 0x1LL << delta; @@ -3185,6 +3150,7 @@ connection_send_ack (struct MeshConnection *c, int fwd) { struct MeshFlowControl *next_fc; struct MeshFlowControl *prev_fc; + struct GNUNET_MESH_ACK msg; uint32_t ack; int delta; @@ -3220,7 +3186,14 @@ connection_send_ack (struct MeshConnection *c, int fwd) } prev_fc->last_ack_sent = ack; - send_ack (c, ack, fwd); + + /* Build ACK message and send on connection */ + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); + msg.ack = htonl (ack); + msg.cid = c->id; + + send_prebuilt_message_connection (&msg.header, c, NULL, !fwd); } @@ -3291,34 +3264,33 @@ channel_send_client_data (struct MeshChannel *ch, static void channel_send_client_buffered_data (struct MeshChannel *ch, struct MeshClient *c, - struct MeshChannelReliability *rel) + int fwd) { struct MeshReliableMessage *copy; - uint32_t *mid; + struct MeshChannelReliability *rel; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n"); + rel = fwd ? ch->dest_rel : ch->root_rel; if (GNUNET_NO == rel->client_ready) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n"); return; } - mid = rel == ch->bck_rel ? &ch->mid_recv_fwd : &ch->mid_recv_bck; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n"); copy = rel->head_recv; /* We never buffer channel management messages */ if (NULL != copy) { - if (copy->mid == *mid || GNUNET_NO == ch->reliable) + if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable) { struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) &copy[1]; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " have %u! now expecting %u\n", - copy->mid, *mid + 1); - channel_send_client_data (ch, msg, (rel == ch->bck_rel)); + copy->mid, rel->mid_recv + 1); + channel_send_client_data (ch, msg, fwd); rel->n_recv--; - *mid = *mid + 1; + rel->mid_recv++; GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy); GNUNET_free (copy); } @@ -3326,7 +3298,7 @@ channel_send_client_buffered_data (struct MeshChannel *ch, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " reliable && don't have %u, next is %u\n", - *mid, + rel->mid_recv, copy->mid); return; } @@ -3562,7 +3534,7 @@ channel_retransmit_message (void *cls, * is stalled. */ payload = (struct GNUNET_MESH_Data *) &copy[1]; - fwd = (rel == ch->fwd_rel); + fwd = (rel == ch->root_rel); c = tunnel_get_connection (ch->t, fwd); hop = connection_get_hop (c, fwd); for (q = hop->queue_head; NULL != q; q = q->next) @@ -3581,7 +3553,7 @@ channel_retransmit_message (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid); - send_prebuilt_message_channel (&payload->header, ch, ch->fwd_rel == rel); + send_prebuilt_message_channel (&payload->header, ch, fwd); GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); } else @@ -3619,35 +3591,47 @@ channel_send_client_ack (struct MeshChannel *ch, int fwd) } /* Send ACK (fwd indicates traffic to be ACK'd) to client */ - rel = fwd ? ch->fwd_rel : ch->bck_rel; + rel = fwd ? ch->root_rel : ch->dest_rel; if (GNUNET_NO == rel->client_ready) send_local_ack (ch, fwd); else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client ready\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client was ready\n"); } /** * Send ACK on one or more connections due to buffer space to the client. * + * Iterates all connections of the tunnel and sends ACKs appropriately. + * * @param ch Channel which has some free buffer space. - * @param buffer Buffer space. - * @param fwd Is this in the FWD direction? + * @param fwd Is this in for FWD traffic? (ACK goes dest->root) */ static void -channel_send_connection_ack (struct MeshChannel *ch, uint32_t buffer, int fwd) +channel_send_connection_ack (struct MeshChannel *ch, int fwd) { struct MeshTunnel2 *t = ch->t; struct MeshConnection *c; struct MeshFlowControl *fc; + struct MeshChannelReliability *rel; uint32_t allowed; uint32_t to_allow; unsigned int cs; + uint32_t buffer; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Channel send connection %s ack on %s:%X\n", fwd ? "FWD" : "BCK", peer2s (ch->t->peer), ch->gid); + /* Check */ + rel = fwd ? ch->root_rel : ch->dest_rel; + if (NULL == rel) + { + GNUNET_break (0); + return; + } + buffer = 64 - rel->n_sent; + /* Count connections, how many messages are already allowed */ for (cs = 0, allowed = 0, c = t->connection_head; NULL != c; c = c->next) { @@ -3677,7 +3661,7 @@ channel_send_connection_ack (struct MeshChannel *ch, uint32_t buffer, int fwd) { continue; } - send_ack (c, fc->last_ack_sent + 1, fwd); + connection_send_ack (c, fwd); to_allow--; } @@ -3686,6 +3670,26 @@ channel_send_connection_ack (struct MeshChannel *ch, uint32_t buffer, int fwd) /** + * Send an ACK on the appropriate connection/channel, depending on + * the direction and the position of the peer. + * + * @param c Which connection to send the hop-by-hop ACK. + * @param ch Channel, if any. + * @param fwd Is this a fwd ACK? (will go dest->root) + */ +static void +send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) +{ + if (NULL == ch) + { + connection_send_ack (c, fwd); + return; + } + channel_send_connection_ack (ch, fwd); +} + + +/** * Channel was ACK'd by remote peer, mark as ready and cancel retransmission. * * @param ch Channel to mark as ready. @@ -3698,9 +3702,12 @@ channel_confirm (struct MeshChannel *ch, int fwd) struct MeshReliableMessage *copy; struct MeshReliableMessage *next; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " channel confirm %s:%X\n", + peer2s (ch->t->peer), ch->gid); ch->state = MESH_CHANNEL_READY; - rel = fwd ? ch->fwd_rel : ch->bck_rel; + rel = fwd ? ch->root_rel : ch->dest_rel; for (copy = rel->head_sent; NULL != copy; copy = next) { struct GNUNET_MessageHeader *msg; @@ -3736,8 +3743,8 @@ channel_save_copy (struct MeshChannel *ch, uint16_t type; uint16_t size; - rel = fwd ? ch->fwd_rel : ch->bck_rel; - mid = fwd ? ch->mid_send_fwd : ch->mid_send_bck; + rel = fwd ? ch->root_rel : ch->dest_rel; + mid = rel->mid_send; type = ntohs (msg->type); size = ntohs (msg->size); @@ -4213,8 +4220,8 @@ channel_destroy (struct MeshChannel *ch) } } - channel_rel_free_all (ch->fwd_rel); - channel_rel_free_all (ch->bck_rel); + channel_rel_free_all (ch->root_rel); + channel_rel_free_all (ch->dest_rel); GNUNET_CONTAINER_DLL_remove (ch->t->channel_head, ch->t->channel_tail, ch); GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO); @@ -4878,7 +4885,6 @@ handle_data (struct MeshTunnel2 *t, const struct GNUNET_MESH_Data *msg, int fwd) struct MeshChannel *ch; struct MeshClient *c; uint32_t mid; - uint32_t *mid_recv; uint16_t type; size_t size; @@ -4907,9 +4913,8 @@ handle_data (struct MeshTunnel2 *t, const struct GNUNET_MESH_Data *msg, int fwd) } /* Initialize FWD/BCK data */ - c = fwd ? ch->dest : ch->root; - rel = fwd ? ch->bck_rel : ch->fwd_rel; - mid_recv = fwd ? &ch->mid_recv_fwd : &ch->mid_recv_bck; + c = fwd ? ch->dest : ch->root; + rel = fwd ? ch->dest_rel : ch->root_rel; if (NULL == c) { @@ -4925,19 +4930,18 @@ handle_data (struct MeshTunnel2 *t, const struct GNUNET_MESH_Data *msg, int fwd) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " mid %u\n", mid); if (GNUNET_NO == ch->reliable || - ( !GMC_is_pid_bigger (*mid_recv, mid) && - GMC_is_pid_bigger (*mid_recv + 64, mid) ) ) + ( !GMC_is_pid_bigger (rel->mid_recv, mid) && + GMC_is_pid_bigger (rel->mid_recv + 64, mid) ) ) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! RECV %u\n", mid); if (GNUNET_YES == ch->reliable) { /* Is this the exact next expected messasge? */ - if (mid == *mid_recv) + if (mid == rel->mid_recv) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "as expected\n"); - *mid_recv = *mid_recv + 1; + rel->mid_recv++; channel_send_client_data (ch, msg, fwd); - channel_send_client_buffered_data (ch, c, rel); } else { @@ -4945,8 +4949,11 @@ handle_data (struct MeshTunnel2 *t, const struct GNUNET_MESH_Data *msg, int fwd) channel_rel_add_buffered_data (msg, rel); } } - else /* Tunnel unreliable, send to clients directly */ + else { + /* Tunnel is unreliable: send to clients directly */ + /* FIXME: accept Out Of Order traffic */ + rel->mid_recv = mid + 1; channel_send_client_data (ch, msg, fwd); } } @@ -4955,7 +4962,7 @@ handle_data (struct MeshTunnel2 *t, const struct GNUNET_MESH_Data *msg, int fwd) GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " MID %u not expected (%u - %u), dropping!\n", - mid, *mid_recv, *mid_recv + 64); + mid, rel->mid_recv, rel->mid_recv + 64); } channel_send_data_ack (ch, fwd); @@ -4999,11 +5006,11 @@ handle_data_ack (struct MeshTunnel2 *t, if (GNUNET_YES == fwd) { - rel = ch->fwd_rel; + rel = ch->root_rel; } else { - rel = ch->bck_rel; + rel = ch->dest_rel; } if (NULL == rel) { @@ -5411,12 +5418,7 @@ handle_channel_create (struct MeshTunnel2 *t, channel_add_client (ch, c); if (GNUNET_YES == ch->reliable) - { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n"); - ch->bck_rel = GNUNET_malloc (sizeof (struct MeshChannelReliability)); - ch->bck_rel->ch = ch; - ch->bck_rel->expected_delay = MESH_RETRANSMIT_TIME; - } send_local_channel_create (ch); channel_send_ack (ch, !fwd); @@ -6197,9 +6199,9 @@ handle_local_channel_create (void *cls, struct GNUNET_SERVER_Client *client, channel_set_options (ch, ntohl (msg->opt)); /* In unreliable channels, we'll use the DLL to buffer data for the root */ - ch->fwd_rel = GNUNET_new (struct MeshChannelReliability); - ch->fwd_rel->ch = ch; - ch->fwd_rel->expected_delay = MESH_RETRANSMIT_TIME; + ch->root_rel = GNUNET_new (struct MeshChannelReliability); + ch->root_rel->ch = ch; + ch->root_rel->expected_delay = MESH_RETRANSMIT_TIME; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s[%x]:%u (%x)\n", peer2s (t->peer), ch->gid, ch->port, ch->lid_root); @@ -6367,7 +6369,7 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, return; } - rel = fwd ? ch->fwd_rel : ch->bck_rel; + rel = fwd ? ch->root_rel : ch->dest_rel; rel->client_ready = GNUNET_NO; /* Ok, everything is correct, send the message. */ @@ -6375,12 +6377,10 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_MESH_Data *payload; uint16_t p2p_size = sizeof(struct GNUNET_MESH_Data) + size; unsigned char cbuf[p2p_size]; - uint32_t *mid; - mid = fwd ? &ch->mid_send_fwd : &ch->mid_send_bck; payload = (struct GNUNET_MESH_Data *) cbuf; - payload->mid = htonl (*mid); - *mid = *mid + 1; + payload->mid = htonl (rel->mid_send); + rel->mid_send++; memcpy (&payload[1], &msg[1], size); payload->header.size = htons (p2p_size); payload->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA); @@ -6445,12 +6445,14 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, return; } - fwd = chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV; - rel = fwd ? ch->fwd_rel : ch->bck_rel; + /* If client is root, the ACK is going FWD, therefore this is "BCK". */ + /* If client is dest, the ACK is going BCK, therefore this is "FWD" */ + fwd = chid >= GNUNET_MESH_LOCAL_CHANNEL_ID_SERV; + rel = fwd ? ch->dest_rel : ch->root_rel; rel->client_ready = GNUNET_YES; - channel_send_client_buffered_data (ch, c, rel); - channel_send_connection_ack (ch, 64 - rel->n_recv, fwd); + channel_send_client_buffered_data (ch, c, fwd); + send_ack (NULL, ch, fwd); GNUNET_SERVER_receive_done (client, GNUNET_OK);