gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit a9e630d6cbc7f777f157a034b9ebf0c26c93fa68
parent e0af01758cb7eec5aea8e942c3d54a71b2c2100b
Author: Florian Dold <florian.dold@gmail.com>
Date:   Mon, 12 Aug 2013 14:34:16 +0000

- listener re-connects transparently
- bugs


Diffstat:
Msrc/include/gnunet_set_service.h | 9++++++---
Msrc/set/gnunet-service-set.c | 25++++++++++++++++++-------
Msrc/set/gnunet-service-set_union.c | 53++++++++++++++++++++++++++++++++++++-----------------
Msrc/set/set.h | 2+-
Msrc/set/set_api.c | 106+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
5 files changed, 146 insertions(+), 49 deletions(-)

diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h @@ -208,8 +208,7 @@ typedef int (*GNUNET_SET_ElementIterator) (void *cls, * @param other_peer the other peer * @param context_msg message with application specific information from * the other peer - * @param request request from the other peer, use GNUNET_SET_accept - * Will be NULL if the listener failed. + * @param request request from the other peer (never NULL), use GNUNET_SET_accept * to accept it, otherwise the request will be refused * Note that we can't just return value from the listen callback, * as it is also necessary to specify the set we want to do the @@ -315,7 +314,9 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, /** - * Wait for set operation requests for the given application id + * Wait for set operation requests for the given application ID. + * If the connection to the set service is lost, the listener is + * re-created transparently with exponential backoff. * * @param cfg configuration to use for connecting to * the set service @@ -336,6 +337,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Cancel the given listen operation. + * After calling cancel, the listen callback for this listen handle + * will not be called again. * * @param lh handle for the listen operation */ diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c @@ -442,8 +442,8 @@ handle_incoming_msg (struct OperationState *op, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n", - ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); - listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id); + ntohl (msg->operation), GNUNET_h2s (&msg->app_id)); + listener = listener_get_by_target (ntohl (msg->operation), &msg->app_id); if (NULL == listener) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -477,6 +477,7 @@ handle_client_iterate (void *cls, return; } + GNUNET_SERVER_receive_done (client, GNUNET_OK); set->vt->iterate (set); } @@ -557,21 +558,30 @@ handle_client_listen (void *cls, listener->client = client; listener->client_mq = GNUNET_MQ_queue_for_server_client (client); listener->app_id = msg->app_id; - listener->operation = ntohs (msg->operation); + listener->operation = ntohl (msg->operation); GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", listener->operation, GNUNET_h2s (&listener->app_id)); for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) { - if ( (NULL == incoming->spec) || - (0 != incoming->suggest_id) ) + if (NULL == incoming->spec) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n"); + continue; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n", + incoming->spec->operation, GNUNET_h2s (&incoming->spec->app_id), incoming->suggest_id); + + if (0 != incoming->suggest_id) continue; if (listener->operation != incoming->spec->operation) continue; if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) continue; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n"); incoming_suggest (incoming, listener); } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n"); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -942,8 +952,9 @@ dispatch_p2p_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", ntohs (message->type)); - ret = tc->vt->msg_handler (tc->op, message); + /* FIXME: do this before or after the handler? */ GNUNET_MESH_receive_done (tunnel); + ret = tc->vt->msg_handler (tc->op, message); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", ntohs (message->type)); return ret; @@ -1023,7 +1034,7 @@ main (int argc, char *const *argv) int ret; ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); return (GNUNET_OK == ret) ? 0 : 1; } diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c @@ -147,6 +147,8 @@ struct OperationState /** * Maps IBF-Keys (specific to the current salt) to elements. + * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. + * Colliding IBF-Keys are linked. */ struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; @@ -493,7 +495,7 @@ send_operation_request (struct OperationState *eo) GNUNET_SERVER_client_disconnect (eo->spec->set->client); return; } - msg->operation = htons (GNUNET_SET_OPERATION_UNION); + msg->operation = htonl (GNUNET_SET_OPERATION_UNION); msg->app_id = eo->spec->app_id; msg->salt = htonl (eo->spec->salt); GNUNET_MQ_send (eo->mq, ev); @@ -524,7 +526,7 @@ send_operation_request (struct OperationState *eo) * GNUNET_NO if not. */ static int -insert_element_iterator (void *cls, +op_register_element_iterator (void *cls, uint32_t key, void *value) { @@ -549,12 +551,16 @@ insert_element_iterator (void *cls, /** * Insert an element into the union operation's * key-to-element mapping. Takes ownership of 'ee'. + * Note that this does not insert the element in the set, + * only in the operation's key-element mapping. + * This is done to speed up re-tried operations, if some elements + * were transmitted, and then the IBF fails to decode. * * @param eo the union operation * @param ee the element entry */ static void -insert_element (struct OperationState *eo, struct ElementEntry *ee) +op_register_element (struct OperationState *eo, struct ElementEntry *ee) { int ret; struct IBF_Key ibf_key; @@ -566,14 +572,14 @@ insert_element (struct OperationState *eo, struct ElementEntry *ee) k->ibf_key = ibf_key; ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, - insert_element_iterator, k); + op_register_element_iterator, k); /* was the element inserted into a colliding bucket? */ if (GNUNET_SYSERR == ret) return; GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } @@ -623,7 +629,7 @@ init_key_to_element_iterator (void *cls, e->remote = GNUNET_NO; - insert_element (eo, e); + op_register_element (eo, e); return GNUNET_YES; } @@ -861,27 +867,32 @@ decode_and_send (struct OperationState *eo) ibf_destroy (eo->remote_ibf); eo->remote_ibf = NULL; - num_decoded = 0; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); + num_decoded = 0; + last_key.key_val = 0; + while (1) { int res; + int cycle_detected = GNUNET_NO; - if (num_decoded > 0) - last_key = key; + last_key = key; res = ibf_decode (diff_ibf, &side, &key); if (res == GNUNET_OK) + { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", key.key_val); - num_decoded += 1; - if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", - num_decoded, diff_ibf->size); - if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size) || - (num_decoded > 1 && last_key.key_val == key.key_val)) + num_decoded += 1; + if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", + num_decoded, diff_ibf->size); + cycle_detected = GNUNET_YES; + } + } + if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) { int next_order; next_order = 0; @@ -922,6 +933,8 @@ decode_and_send (struct OperationState *eo) /* FIXME: before sending the request, check if we may just have the element */ /* FIXME: merge multiple requests */ + /* FIXME: remember somewhere that we already requested the element, + * so that we don't request it again with the next ibf if decoding fails */ ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); @@ -1089,7 +1102,9 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) ee->remote = GNUNET_YES; GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); - insert_element (eo, ee); + /* FIXME: see if the element has already been inserted! */ + + op_register_element (eo, ee); send_client_element (eo, &ee->element); } @@ -1386,6 +1401,8 @@ int union_handle_p2p_message (struct OperationState *eo, const struct GNUNET_MessageHeader *mh) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", + ntohs (mh->type), ntohs (mh->size)); switch (ntohs (mh->type)) { case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: @@ -1490,6 +1507,8 @@ union_iterate (struct Set *set) { struct GNUNET_MQ_Envelope *ev; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "iterating union set with %u elements\n", + GNUNET_CONTAINER_multihashmap_size (set->state->elements)); GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, send_iter_element_iter, set); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); GNUNET_MQ_send (set->client_mq, ev); diff --git a/src/set/set.h b/src/set/set.h @@ -59,7 +59,7 @@ struct GNUNET_SET_ListenMessage /** * Operation type, values of enum GNUNET_SET_OperationType */ - uint16_t operation GNUNET_PACKED; + uint32_t operation GNUNET_PACKED; /** * application id diff --git a/src/set/set_api.c b/src/set/set_api.c @@ -169,6 +169,13 @@ struct GNUNET_SET_ListenHandle struct GNUNET_MQ_Handle* mq; /** + * Configuration handle for the listener, stored + * here to be able to reconnect transparently on + * connection failure. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** * Function to call on a new incoming request, * or on error. */ @@ -178,9 +185,30 @@ struct GNUNET_SET_ListenHandle * Closure for listen_cb. */ void *listen_cls; + + /** + * Operation we listen for. + */ + enum GNUNET_SET_OperationType operation; + + /** + * Application ID we listen for. + */ + struct GNUNET_HashCode app_id; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_backoff; }; +/* forward declaration */ +static void +listen_connect (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + /** * Handle element for iteration over the set. * @@ -198,7 +226,8 @@ handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh) if (NULL == set->iterator) return; - element.type = htons (mh->type); + element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage); + element.type = htons (msg->element_type); element.data = &msg[1]; set->iterator (set->iterator_cls, &element); } @@ -266,6 +295,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) oh->result_cb (oh->result_cls, &e, result_status); } + /** * Handle request message for a listen operation * @@ -297,9 +327,9 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) amsg->request_id = htonl (0); amsg->accept_reject_id = msg->accept_id; GNUNET_MQ_send (lh->mq, mqm); - GNUNET_free (req); LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n"); } + GNUNET_free (req); LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n"); @@ -313,8 +343,14 @@ handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_SET_ListenHandle *lh = cls; - /* FIXME: why do you do this? */ - lh->listen_cb (lh->listen_cls, NULL, NULL, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listener broke down, re-connecting\n"); + GNUNET_CLIENT_disconnect (lh->client); + lh->client = NULL; + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + + GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, listen_connect, lh); + lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); } @@ -465,6 +501,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) set->client = NULL; GNUNET_MQ_destroy (set->mq); set->mq = NULL; + GNUNET_free (set); } @@ -514,11 +551,49 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, return oh; } + +/** + * Connect to the set service in order to listen + * for request. + * + * @param cls the listen handle to connect + * @param tc task context if invoked as a task, NULL otherwise + */ +static void +listen_connect (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SET_ListenMessage *msg; + struct GNUNET_SET_ListenHandle *lh = cls; + static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, + GNUNET_MQ_HANDLERS_END + }; + + GNUNET_assert (NULL == lh->client); + lh->client = GNUNET_CLIENT_connect ("set", lh->cfg); + if (NULL == lh->client) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "could not connect to set (wrong configuration?), giving up listening\n"); + return; + } + GNUNET_assert (NULL == lh->mq); + lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, + handle_client_listener_error, lh); + mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); + msg->operation = htonl (lh->operation); + msg->app_id = lh->app_id; + GNUNET_MQ_send (lh->mq, mqm); +} + + /** * Wait for set operation requests for the given application id * * @param cfg configuration to use for connecting to - * the set service + * the set service, needs to be valid for the lifetime of the listen handle * @param operation operation we want to listen for * @param app_id id of the application that handles set operation requests * @param listen_cb called for each incoming request matching the operation @@ -534,25 +609,15 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, void *listen_cls) { struct GNUNET_SET_ListenHandle *lh; - struct GNUNET_MQ_Envelope *mqm; - struct GNUNET_SET_ListenMessage *msg; - static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { - {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, - GNUNET_MQ_HANDLERS_END - }; lh = GNUNET_new (struct GNUNET_SET_ListenHandle); - lh->client = GNUNET_CLIENT_connect ("set", cfg); lh->listen_cb = listen_cb; lh->listen_cls = listen_cls; - GNUNET_assert (NULL != lh->client); - lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, - handle_client_listener_error, lh); - mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); - msg->operation = htons (operation); - msg->app_id = *app_id; - GNUNET_MQ_send (lh->mq, mqm); - + lh->cfg = cfg; + lh->operation = operation; + lh->app_id = *app_id; + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + listen_connect (lh, NULL); return lh; } @@ -680,7 +745,6 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, } - /** * Iterate over all elements in the given set. * Note that this operation involves transferring every element of the set