gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit 36a9952f00f7b9316d41ce2f970b0a46f3f2fb51
parent 6137b58b4a534e3ac6c10a495f766085da1f43d0
Author: t3sserakt <t3ss@posteo.de>
Date:   Wed, 20 Dec 2023 08:51:23 +0100

Transport: Added cleanup task to remove QueueEntry we got no ACK for.

Diffstat:
Msrc/service/transport/gnunet-service-transport.c | 208+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
1 file changed, 140 insertions(+), 68 deletions(-)

diff --git a/src/service/transport/gnunet-service-transport.c b/src/service/transport/gnunet-service-transport.c @@ -318,6 +318,12 @@ */ #define QUEUE_LENGTH_LIMIT 32 +/** + * + */ +#define QUEUE_ENTRY_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) + GNUNET_NETWORK_STRUCT_BEGIN @@ -1820,6 +1826,11 @@ struct QueueEntry * Message ID used for this message with the queue used for transmission. */ uint64_t mid; + + /** + * Timestamp this QueueEntry was created. + */ + struct GNUNET_TIME_Absolute creation_timestamp; }; @@ -2547,6 +2558,11 @@ struct TransportClient unsigned int total_queue_length; /** + * Task to check for timed out QueueEntry. + */ + struct GNUNET_SCHEDULER_Task *free_queue_entry_task; + + /** * Characteristics of this communicator. */ enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; @@ -4025,6 +4041,8 @@ client_disconnect_cb (void *cls, struct Queue *q; struct AddressListEntry *ale; + if (NULL != tc->details.communicator.free_queue_entry_task) + GNUNET_SCHEDULER_cancel (tc->details.communicator.free_queue_entry_task); while (NULL != (q = tc->details.communicator.queue_head)) free_queue (q); while (NULL != (ale = tc->details.communicator.addr_head)) @@ -4482,6 +4500,38 @@ sign_ephemeral (struct DistanceVector *dv) } +static void +free_queue_entry (struct QueueEntry *qe, + struct TransportClient *tc); + + +static void +free_timedout_queue_entry (void *cls) +{ + struct TransportClient *tc = cls; + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); + + for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue; + queue = queue->next_client) + { + for (struct QueueEntry *qep = queue->queue_head; NULL != qep; + qep = qep->next) + { + struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference (qep->creation_timestamp, now); + if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, < , diff)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Freeing timed out QueueEntry with MID %" PRIu64 + " and QID %u\n", + qep->mid, + queue->qid); + free_queue_entry(qep, tc); + } + } + } +} + + /** * Send the message @a payload on @a queue. * @@ -4522,6 +4572,7 @@ queue_send_msg (struct Queue *queue, struct QueueEntry *qe; qe = GNUNET_new (struct QueueEntry); + qe->creation_timestamp = GNUNET_TIME_absolute_get (); qe->mid = queue->mid_gen; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Create QueueEntry with MID %" PRIu64 @@ -4552,11 +4603,14 @@ queue_send_msg (struct Queue *queue, { // Messages without FC or fragments can get here. if (NULL != pm) + { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message %" PRIu64 " (pm type %u) was not send because queue has no capacity.\n", pm->logging_uuid, pm->pmt); + pm->qe = NULL; + } GNUNET_free (env); GNUNET_free (qe); return; @@ -4579,6 +4633,15 @@ queue_send_msg (struct Queue *queue, if (0 == queue->q_capacity) queue->idle = GNUNET_NO; + if (GNUNET_NO == queue->idle) + { + struct TransportClient *tc = queue->tc; + + if (NULL == tc->details.communicator.free_queue_entry_task) + tc->details.communicator.free_queue_entry_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &free_timedout_queue_entry, + tc); + } if (NULL != pm && NULL != (pa = pm->pa_head)) { while (pm != pa->pm) @@ -10368,77 +10431,12 @@ handle_del_queue_message (void *cls, } -/** - * Message was transmitted. Process the request. - * - * @param cls the client - * @param sma the send message that was sent - */ static void -handle_send_message_ack (void *cls, - const struct GNUNET_TRANSPORT_SendMessageToAck *sma) +free_queue_entry (struct QueueEntry *qe, + struct TransportClient *tc) { - struct TransportClient *tc = cls; - struct QueueEntry *qe; struct PendingMessage *pm; - if (CT_COMMUNICATOR != tc->type) - { - GNUNET_break (0); - GNUNET_SERVICE_client_drop (tc->client); - return; - } - - /* find our queue entry matching the ACK */ - qe = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Looking for queue for PID %s\n", - GNUNET_i2s (&sma->receiver)); - for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue; - queue = queue->next_client) - { - if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver)) - continue; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found PID %s\n", - GNUNET_i2s (&queue->neighbour->pid)); - - - for (struct QueueEntry *qep = queue->queue_head; NULL != qep; - qep = qep->next) - { - if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl ( - sma->qid)) - continue; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %" - PRIu64 " Ack QID %u\n", - qep->mid, - queue->qid, - GNUNET_ntohll (sma->mid), - ntohl (sma->qid)); - qe = qep; - if ((NULL != qe->pm) && (qe->pm->qe != qe)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "For pending message %" PRIu64 " we had retransmissions.\n", - qe->pm->logging_uuid); - break; - } - } - if (NULL == qe) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n", - GNUNET_ntohll (sma->mid), - ntohl (sma->qid)); - // TODO I guess this can happen, if the Ack from the peer comes before the Ack from the queue. - // Update: Maybe QueueEntry was accidentally freed during freeing PendingMessage. - /* this should never happen */ - // GNUNET_break (0); - // GNUNET_SERVICE_client_drop (tc->client); - GNUNET_SERVICE_client_continue (tc->client); - return; - } GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head, qe->queue->queue_tail, qe); @@ -10451,7 +10449,6 @@ handle_send_message_ack (void *cls, GNUNET_i2s (&qe->queue->neighbour->pid), qe->queue->queue_length, tc->details.communicator.total_queue_length); - GNUNET_SERVICE_client_continue (tc->client); /* if applicable, resume transmissions that waited on ACK */ if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == @@ -10526,6 +10523,81 @@ handle_send_message_ack (void *cls, /** + * Message was transmitted. Process the request. + * + * @param cls the client + * @param sma the send message that was sent + */ +static void +handle_send_message_ack (void *cls, + const struct GNUNET_TRANSPORT_SendMessageToAck *sma) +{ + struct TransportClient *tc = cls; + struct QueueEntry *qe; + + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + + /* find our queue entry matching the ACK */ + qe = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Looking for queue for PID %s\n", + GNUNET_i2s (&sma->receiver)); + for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue; + queue = queue->next_client) + { + if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver)) + continue; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found PID %s\n", + GNUNET_i2s (&queue->neighbour->pid)); + + + for (struct QueueEntry *qep = queue->queue_head; NULL != qep; + qep = qep->next) + { + if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl ( + sma->qid)) + continue; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %" + PRIu64 " Ack QID %u\n", + qep->mid, + queue->qid, + GNUNET_ntohll (sma->mid), + ntohl (sma->qid)); + qe = qep; + if ((NULL != qe->pm) && (qe->pm->qe != qe)) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "For pending message %" PRIu64 " we had retransmissions.\n", + qe->pm->logging_uuid); + break; + } + } + if (NULL == qe) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n", + GNUNET_ntohll (sma->mid), + ntohl (sma->qid)); + // TODO I guess this can happen, if the Ack from the peer comes before the Ack from the queue. + // Update: Maybe QueueEntry was accidentally freed during freeing PendingMessage. + /* this should never happen */ + // GNUNET_break (0); + // GNUNET_SERVICE_client_drop (tc->client); + GNUNET_SERVICE_client_continue (tc->client); + return; + } + free_queue_entry (qe, tc); + GNUNET_SERVICE_client_continue (tc->client); +} + + +/** * Iterator telling new MONITOR client about all existing * queues to peers. *