gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit ca391c48238c36462ca11be9299cc7c9a09e6bbe
parent bc524a2441e4e3ebf697b22e32ca9f1ed6d742b2
Author: ng0 <ng0@n0.is>
Date:   Sun, 31 Dec 2017 12:17:15 +0000

Merge branch 'master' of gnunet.org:gnunet

Diffstat:
Msrc/cadet/gnunet-service-cadet_tunnels.c | 4+++-
Msrc/conversation/gnunet-helper-audio-playback-gst.c | 5+++++
Msrc/conversation/gnunet-helper-audio-playback.c | 5+++++
Msrc/conversation/gnunet_gst.c | 5+++++
Msrc/conversation/microphone.c | 4+++-
Msrc/core/gnunet-service-core.c | 3+++
Msrc/core/gnunet-service-core_kx.c | 3+++
Msrc/fs/fs_dirmetascan.c | 3+++
Msrc/fs/test_fs_download_persistence.c | 1-
Msrc/fs/test_fs_publish_persistence.c | 1-
Msrc/include/gnunet_common.h | 15++++++++++++---
Msrc/include/gnunet_mst_lib.h | 4+++-
Msrc/include/gnunet_network_lib.h | 6+++---
Msrc/include/gnunet_protocols.h | 30+++++++++++++++++++-----------
Msrc/include/gnunet_scheduler_lib.h | 44++++++++++++++++++++++++++------------------
Msrc/include/gnunet_service_lib.h | 7++++++-
Msrc/multicast/gnunet-service-multicast.c | 248++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
Msrc/multicast/multicast_api.c | 51++++++++++++++++++++++++---------------------------
Msrc/multicast/test_multicast_multipeer.c | 312++++++++++++++++++++++++++++++++++++++++---------------------------------------
Msrc/psyc/gnunet-service-psyc.c | 131++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
Msrc/psyc/psyc_api.c | 103++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Msrc/psyc/test_psyc.c | 28+++++++++++++++++++++-------
Msrc/psyc/test_psyc.conf | 16++++++++++++++++
Msrc/psycstore/psycstore_api.c | 10+++++-----
Msrc/social/gnunet-service-social.c | 774++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/social/gnunet-social.c | 26+++++++++++++-------------
Msrc/social/social_api.c | 292++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
Msrc/social/test_social.c | 268++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Msrc/social/test_social.conf | 19+++++++++++++++++++
Msrc/statistics/gnunet-service-statistics.c | 4+++-
Msrc/testbed/gnunet-helper-testbed.c | 7++++---
Msrc/transport/gnunet-helper-transport-wlan-dummy.c | 6++++++
Msrc/transport/plugin_transport_http_server.c | 2+-
Msrc/util/client.c | 41+++++++++++++++++++++++++++++++++++------
Msrc/util/mq.c | 22++++++++++++++++++++++
Msrc/util/mst.c | 27++++++++++++++++++++++-----
Msrc/util/network.c | 6+++---
Msrc/util/program.c | 12+++++++++++-
Msrc/util/scheduler.c | 1547++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Msrc/util/service.c | 31++++++++++++++++++++++++++++++-
Msrc/vpn/gnunet-service-vpn.c | 3+++
41 files changed, 2451 insertions(+), 1675 deletions(-)

diff --git a/src/cadet/gnunet-service-cadet_tunnels.c b/src/cadet/gnunet-service-cadet_tunnels.c @@ -2856,7 +2856,9 @@ handle_plaintext_channel_destroy (void *cls, * * @param cls the `struct CadetTunnel` that got the message * @param msg the message - * @return #GNUNET_OK (continue to process) + * @return #GNUNET_OK on success (always) + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int handle_decrypted (void *cls, diff --git a/src/conversation/gnunet-helper-audio-playback-gst.c b/src/conversation/gnunet-helper-audio-playback-gst.c @@ -221,6 +221,11 @@ feed_buffer_to_gst (const char *audio, size_t b_len) /** * Message callback + * + * @param msg message we received. + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing due to disconnect (no error) + * #GNUNET_SYSERR to stop further processing due to error */ static int stdin_receiver (void *cls, diff --git a/src/conversation/gnunet-helper-audio-playback.c b/src/conversation/gnunet-helper-audio-playback.c @@ -546,6 +546,11 @@ ogg_demux_and_decode () /** * Message callback + * + * @param msg message we received. + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing due to disconnect (no error) + * #GNUNET_SYSERR to stop further processing due to error */ static int stdin_receiver (void *cls, diff --git a/src/conversation/gnunet_gst.c b/src/conversation/gnunet_gst.c @@ -649,6 +649,11 @@ gnunet_read (GNUNET_gstData * d) /** * Message callback + * + * @param msg message we received. + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing due to disconnect (no error) + * #GNUNET_SYSERR to stop further processing due to error */ static int stdin_receiver (void *cls, diff --git a/src/conversation/microphone.c b/src/conversation/microphone.c @@ -65,7 +65,9 @@ struct Microphone * * @param cls clsoure with our `struct Microphone` * @param msg the message from the helper - * @return #GNUNET_OK on success, #GNUNET_SYSERR on error + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int process_record_messages (void *cls, diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c @@ -426,6 +426,9 @@ struct TokenizerContext * * @param cls reservation request (`struct TokenizerContext`) * @param message the actual message + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int tokenized_cb (void *cls, diff --git a/src/core/gnunet-service-core_kx.c b/src/core/gnunet-service-core_kx.c @@ -708,6 +708,9 @@ setup_fresh_ping (struct GSC_KeyExchangeInfo *kx) * * @param cls the `struct GSC_KeyExchangeInfo` * @param m the message + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int deliver_message (void *cls, diff --git a/src/fs/fs_dirmetascan.c b/src/fs/fs_dirmetascan.c @@ -246,6 +246,9 @@ finish_scan (void *cls) * * @param cls the closure (directory scanner object) * @param msg message from the helper process + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int process_helper_msgs (void *cls, diff --git a/src/fs/test_fs_download_persistence.c b/src/fs/test_fs_download_persistence.c @@ -179,7 +179,6 @@ progress_cb (void *cls, const struct GNUNET_FS_ProgressInfo *event) GNUNET_FS_DOWNLOAD_OPTION_NONE, "download", NULL); break; case GNUNET_FS_STATUS_DOWNLOAD_COMPLETED: - consider_restart (event->status); printf ("Download complete, %llu kbps.\n", (unsigned long long) (FILESIZE * 1000000LL / (1 + diff --git a/src/fs/test_fs_publish_persistence.c b/src/fs/test_fs_publish_persistence.c @@ -134,7 +134,6 @@ progress_cb (void *cls, switch (event->status) { case GNUNET_FS_STATUS_PUBLISH_COMPLETED: - consider_restart (event->status); ret = event->value.publish.cctx; printf ("Publish complete, %llu kbps.\n", (unsigned long long) (FILESIZE * 1000000LL / diff --git a/src/include/gnunet_common.h b/src/include/gnunet_common.h @@ -988,7 +988,8 @@ GNUNET_ntoh_double (double d); * arr is important since size is the number of elements and * not the size in bytes * @param size the number of elements in the existing vector (number - * of elements to copy over) + * of elements to copy over), will be updated with the new + * array size * @param tsize the target size for the resulting vector, use 0 to * free the vector (then, arr will be NULL afterwards). */ @@ -996,8 +997,16 @@ GNUNET_ntoh_double (double d); /** * @ingroup memory - * Append an element to a list (growing the - * list by one). + * Append an element to a list (growing the list by one). + * + * @param arr base-pointer of the vector, may be NULL if size is 0; + * will be updated to reflect the new address. The TYPE of + * arr is important since size is the number of elements and + * not the size in bytes + * @param size the number of elements in the existing vector (number + * of elements to copy over), will be updated with the new + * array size + * @param element the element that will be appended to the array */ #define GNUNET_array_append(arr,size,element) do { GNUNET_array_grow(arr,size,size+1); arr[size-1] = element; } while(0) diff --git a/src/include/gnunet_mst_lib.h b/src/include/gnunet_mst_lib.h @@ -61,7 +61,9 @@ struct GNUNET_MessageStreamTokenizer; * * @param cls closure * @param message the actual message - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing due to disconnect (no error) + * #GNUNET_SYSERR to stop further processing due to error */ typedef int (*GNUNET_MessageTokenizerCallback) (void *cls, diff --git a/src/include/gnunet_network_lib.h b/src/include/gnunet_network_lib.h @@ -464,7 +464,7 @@ GNUNET_NETWORK_fdset_copy (struct GNUNET_NETWORK_FDSet *to, * @return POSIX file descriptor */ int -GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc); +GNUNET_NETWORK_get_fd (const struct GNUNET_NETWORK_Handle *desc); /** @@ -474,7 +474,7 @@ GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc); * @return POSIX file descriptor */ struct sockaddr* -GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc); +GNUNET_NETWORK_get_addr (const struct GNUNET_NETWORK_Handle *desc); /** @@ -484,7 +484,7 @@ GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc); * @return socklen_t for sockaddr */ socklen_t -GNUNET_NETWORK_get_addrlen (struct GNUNET_NETWORK_Handle *desc); +GNUNET_NETWORK_get_addrlen (const struct GNUNET_NETWORK_Handle *desc); /** diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h @@ -2067,7 +2067,11 @@ extern "C" /** S->C: slave join acknowledgement */ #define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK 684 -/* 685-686 */ +/** C->S: request to part from a channel */ +#define GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST 685 + +/** S->C: acknowledgement that a slave of master parted from a channel */ +#define GNUNET_MESSAGE_TYPE_PSYC_PART_ACK 686 /** M->S->C: incoming join request from multicast */ #define GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST 687 @@ -2258,6 +2262,7 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK 755 +// FIXME: this is never used! /** * Group terminated. */ @@ -2398,35 +2403,38 @@ extern "C" /** C->S: request to leave a place */ #define GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE 848 +/** S->C: place leave acknowledgement */ +#define GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE_ACK 849 + /** C->S: add place to GNS zone */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_PLACE 849 +#define GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_PLACE 850 /** C->S: add nym to GNS zone */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_NYM 850 +#define GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_NYM 851 /** C->S: connect application */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_CONNECT 851 +#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_CONNECT 852 /** C->S: detach a place from application */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_DETACH 852 +#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_DETACH 853 /** S->C: notify about an existing ego */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO 853 +#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO 854 /** S->C: end of ego list */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END 854 +#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END 855 /** S->C: notify about an existing place */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE 855 +#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE 856 /** S->C: end of place list */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END 856 +#define GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END 857 /** C->S: set message processing flags */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_SET 860 +#define GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_SET 858 /** C->S: clear message processing flags */ -#define GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_CLEAR 861 +#define GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_CLEAR 859 /******************************************************************************* * X-VINE DHT messages diff --git a/src/include/gnunet_scheduler_lib.h b/src/include/gnunet_scheduler_lib.h @@ -152,14 +152,14 @@ struct GNUNET_SCHEDULER_FdInfo * NULL if this is about a file handle or if no network * handle was given to the scheduler originally. */ - struct GNUNET_NETWORK_Handle *fd; + const struct GNUNET_NETWORK_Handle *fd; /** * GNUnet file handle the event is about, matches @a sock, * NULL if this is about a network socket or if no network * handle was given to the scheduler originally. */ - struct GNUNET_DISK_FileHandle *fh; + const struct GNUNET_DISK_FileHandle *fh; /** * Type of the event that was generated related to @e sock. @@ -216,17 +216,18 @@ struct GNUNET_SCHEDULER_TaskContext /** * Function used by event-loop implementations to signal the scheduler - * that a particular @a task is ready due to an event of type @a et. + * that a particular @a task is ready due to an event specified in the + * et field of @a fdi. * * This function will then queue the task to notify the application * that the task is ready (with the respective priority). * * @param task the task that is ready - * @param et information about why the task is ready + * @param fdi information about the related FD */ void GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, - enum GNUNET_SCHEDULER_EventType et); + struct GNUNET_SCHEDULER_FdInfo *fdi); /** @@ -241,15 +242,16 @@ struct GNUNET_SCHEDULER_Handle; * there are tasks left to run just to give other tasks a chance as * well. If we return #GNUNET_YES, the driver should call this * function again as soon as possible, while if we return #GNUNET_NO - * it must block until the operating system has more work as the - * scheduler has no more work to do right now. + * it must block until either the operating system has more work (the + * scheduler has no more work to do right now) or the timeout set by + * the scheduler (using the set_wakeup callback) is reached. * * @param sh scheduler handle that was given to the `loop` * @return #GNUNET_OK if there are more tasks that are ready, * and thus we would like to run more (yield to avoid * blocking other activities for too long) * #GNUNET_NO if we are done running tasks (yield to block) - * #GNUNET_SYSERR on error + * #GNUNET_SYSERR on error, e.g. no tasks were ready */ int GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh); @@ -268,8 +270,11 @@ struct GNUNET_SCHEDULER_Driver void *cls; /** - * Add a @a task to be run if the conditions given - * in @a fdi are satisfied. + * Add a @a task to be run if the conditions specified in the + * et field of the given @a fdi are satisfied. The et field will + * be cleared after this call and the driver is expected to set + * the type of the actual event before passing @a fdi to + * #GNUNET_SCHEDULER_task_ready. * * @param cls closure * @param task task to add @@ -280,21 +285,21 @@ struct GNUNET_SCHEDULER_Driver int (*add)(void *cls, struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_SCHEDULER_FdInfo *fdi); + struct GNUNET_SCHEDULER_FdInfo *fdi); /** - * Delete a @a task from the set of tasks to be run. + * Delete a @a task from the set of tasks to be run. A task may + * comprise multiple FdInfo entries previously added with the add + * function. The driver is expected to delete them all. * * @param cls closure * @param task task to delete - * @param fdi conditions to watch for (must match @e add call) * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure - * (i.e. @a task or @a fdi do not match prior @e add call) + * (i.e. @a task does not match prior @e add call) */ int (*del)(void *cls, - struct GNUNET_SCHEDULER_Task *task, - const struct GNUNET_SCHEDULER_FdInfo *fdi); + struct GNUNET_SCHEDULER_Task *task); /** * Set time at which we definitively want to get a wakeup call. @@ -309,7 +314,10 @@ struct GNUNET_SCHEDULER_Driver /** * Event loop's "main" function, to be called from * #GNUNET_SCHEDULER_run_with_driver() to actually - * launch the loop. + * launch the loop. The loop should run as long as + * tasks (added by the add callback) are available + * OR the wakeup time (added by the set_wakeup + * callback) is not FOREVER. * * @param cls closure * @param sh scheduler handle to pass to @@ -359,7 +367,7 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, * * @return NULL on error */ -const struct GNUNET_SCHEDULER_Driver * +struct GNUNET_SCHEDULER_Driver * GNUNET_SCHEDULER_driver_select (void); diff --git a/src/include/gnunet_service_lib.h b/src/include/gnunet_service_lib.h @@ -366,11 +366,16 @@ GNUNET_SERVICE_client_disable_continue_warning (struct GNUNET_SERVICE_Client *c) /** * Ask the server to disconnect from the given client. This is the * same as returning #GNUNET_SYSERR within the check procedure when - * handling a message, wexcept that it allows dropping of a client even + * handling a message, except that it allows dropping of a client even * when not handling a message from that client. The `disconnect_cb` * will be called on @a c even if the application closes the connection * using this function. * + * This function should be called (outside of util's internal logic) + * if (and usually only if) the client has violated the + * protocol. Otherwise, we should leave it to the client to disconnect + * from the service. + * * @param c client to disconnect now */ void diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c @@ -137,6 +137,7 @@ struct Channel */ struct GNUNET_CADET_Channel *channel; + // FIXME: not used /** * CADET transmission handle. */ @@ -228,7 +229,7 @@ struct Group /** * Is the client disconnected? #GNUNET_YES or #GNUNET_NO */ - uint8_t disconnected; + uint8_t is_disconnected; /** * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)? @@ -365,6 +366,8 @@ client_send_join_decision (struct Member *mem, static void shutdown_task (void *cls) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "shutting down\n"); if (NULL != cadet) { GNUNET_CADET_disconnect (cadet); @@ -420,6 +423,11 @@ cleanup_member (struct Member *mem) GNUNET_free (mem->join_dcsn); mem->join_dcsn = NULL; } + if (NULL != mem->origin_channel) + { + GNUNET_CADET_channel_destroy (mem->origin_channel->channel); + mem->origin_channel = NULL; + } GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem); GNUNET_free (mem); } @@ -553,36 +561,47 @@ client_send (struct GNUNET_SERVICE_Client *client, * Send message to all clients connected to the group. */ static void -client_send_group (const struct Group *grp, - const struct GNUNET_MessageHeader *msg) +client_send_group_keep_envelope (const struct Group *grp, + struct GNUNET_MQ_Envelope *env) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "%p Sending message to all clients of the group.\n", grp); + struct ClientList *cli = grp->clients_head; - struct ClientList *cl = grp->clients_head; - while (NULL != cl) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Sending message to all clients of the group.\n", + grp); + while (NULL != cli) { - struct GNUNET_MQ_Envelope * - env = GNUNET_MQ_msg_copy (msg); - - GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client), - env); - cl = cl->next; + GNUNET_MQ_send_copy (GNUNET_SERVICE_client_get_mq (cli->client), + env); + cli = cli->next; } } /** + * Send message to all clients connected to the group and + * takes care of freeing @env. + */ +static void +client_send_group (const struct Group *grp, + struct GNUNET_MQ_Envelope *env) +{ + client_send_group_keep_envelope (grp, env); + GNUNET_MQ_discard (env); +} + + +/** * Iterator callback for sending a message to origin clients. */ static int client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) { - const struct GNUNET_MessageHeader *msg = cls; + struct GNUNET_MQ_Envelope *env = cls; struct Member *orig = origin; - client_send_group (&orig->group, msg); + client_send_group_keep_envelope (&orig->group, env); return GNUNET_YES; } @@ -594,12 +613,12 @@ static int client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *member) { - const struct GNUNET_MessageHeader *msg = cls; + struct GNUNET_MQ_Envelope *env = cls; struct Member *mem = member; if (NULL != mem->join_dcsn) { /* Only send message to admitted members */ - client_send_group (&mem->group, msg); + client_send_group_keep_envelope (&mem->group, env); } return GNUNET_YES; } @@ -615,15 +634,16 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, */ static int client_send_all (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { int n = 0; n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, client_send_origin_cb, - (void *) msg); + (void *) env); n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash, client_send_member_cb, - (void *) msg); + (void *) env); + GNUNET_MQ_discard (env); return n; } @@ -636,14 +656,14 @@ client_send_all (struct GNUNET_HashCode *pub_key_hash, */ static int client_send_random (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { int n = 0; n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb, - (void *) msg); + (void *) env); if (n <= 0) n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb, - (void *) msg); + (void *) env); return n; } @@ -658,12 +678,12 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash, */ static int client_send_origin (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { int n = 0; n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, client_send_origin_cb, - (void *) msg); + (void *) env); return n; } @@ -677,17 +697,12 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash, static void client_send_ack (struct GNUNET_HashCode *pub_key_hash) { + struct GNUNET_MQ_Envelope *env; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending message ACK to client.\n"); - - static struct GNUNET_MessageHeader *msg = NULL; - if (NULL == msg) - { - msg = GNUNET_malloc (sizeof (*msg)); - msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); - msg->size = htons (sizeof (*msg)); - } - client_send_all (pub_key_hash, msg); + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); + client_send_all (pub_key_hash, env); } @@ -983,7 +998,8 @@ handle_cadet_join_request (void *cls, chn->peer = req->peer; chn->join_status = JOIN_WAITING; - client_send_all (&group_pub_hash, &req->header); + client_send_all (&group_pub_hash, + GNUNET_MQ_msg_copy (&req->header)); } @@ -1102,7 +1118,8 @@ handle_cadet_message (void *cls, { struct Channel *chn = cls; GNUNET_CADET_receive_done (chn->channel); - client_send_all (&chn->group_pub_hash, &msg->header); + client_send_all (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&msg->header)); } @@ -1153,30 +1170,32 @@ handle_cadet_request (void *cls, { struct Channel *chn = cls; GNUNET_CADET_receive_done (chn->channel); - client_send_origin (&chn->group_pub_hash, &req->header); + client_send_origin (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&req->header)); } -static int -check_cadet_replay_request (void *cls, - const struct MulticastReplayRequestMessage *req) -{ - uint16_t size = ntohs (req->header.size); - if (size < sizeof (*req)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - struct Channel *chn = cls; - if (NULL == chn) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - return GNUNET_OK; -} +// FIXME: do checks in handle_cadet_replay_request +//static int +//check_cadet_replay_request (void *cls, +// const struct MulticastReplayRequestMessage *req) +//{ +// uint16_t size = ntohs (req->header.size); +// if (size < sizeof (*req)) +// { +// GNUNET_break_op (0); +// return GNUNET_SYSERR; +// } +// +// struct Channel *chn = cls; +// if (NULL == chn) +// { +// GNUNET_break_op (0); +// return GNUNET_SYSERR; +// } +// +// return GNUNET_OK; +//} /** @@ -1187,6 +1206,7 @@ handle_cadet_replay_request (void *cls, const struct MulticastReplayRequestMessage *req) { struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); struct MulticastReplayRequestMessage rep = *req; @@ -1203,12 +1223,16 @@ handle_cadet_replay_request (void *cls, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } struct GNUNET_HashCode key_hash; - replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset, - rep.flags, &key_hash); + replay_key_hash (rep.fragment_id, + rep.message_id, + rep.fragment_offset, + rep.flags, + &key_hash); GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - client_send_random (&chn->group_pub_hash, &rep.header); + client_send_random (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&rep.header)); } @@ -1290,10 +1314,10 @@ cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer) struct MulticastJoinDecisionMessageHeader, chn), - GNUNET_MQ_hd_var_size (cadet_replay_request, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, - struct MulticastReplayRequestMessage, - chn), + GNUNET_MQ_hd_fixed_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + chn), GNUNET_MQ_hd_var_size (cadet_replay_response, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, @@ -1357,6 +1381,7 @@ handle_client_origin_start (void *cls, grp->is_origin = GNUNET_YES; grp->pub_key = pub_key; grp->pub_key_hash = pub_key_hash; + grp->is_disconnected = GNUNET_NO; GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); @@ -1379,10 +1404,10 @@ handle_client_origin_start (void *cls, struct MulticastJoinRequestMessage, grp), - GNUNET_MQ_hd_var_size (cadet_replay_request, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, - struct MulticastReplayRequestMessage, - grp), + GNUNET_MQ_hd_fixed_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), GNUNET_MQ_hd_var_size (cadet_replay_response, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, @@ -1484,6 +1509,7 @@ handle_client_member_join (void *cls, grp->is_origin = GNUNET_NO; grp->pub_key = msg->group_pub_key; grp->pub_key_hash = pub_key_hash; + grp->is_disconnected = GNUNET_NO; group_set_cadet_port_hash (grp); if (NULL == grp_mem) @@ -1494,7 +1520,8 @@ handle_client_member_join (void *cls, } GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - + + // FIXME: should the members hash map have option UNIQUE_FAST? GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } @@ -1509,10 +1536,11 @@ handle_client_member_join (void *cls, char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client connected to group %s as member %s (%s).\n", + "Client connected to group %s as member %s (%s). size = %d\n", GNUNET_h2s (&grp->pub_key_hash), GNUNET_h2s2 (&mem->pub_key_hash), - str); + str, + GNUNET_CONTAINER_multihashmap_size (members)); GNUNET_free (str); if (NULL != mem->join_dcsn) @@ -1567,7 +1595,9 @@ handle_client_member_join (void *cls, GNUNET_free (mem->join_req); mem->join_req = req; - if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header)) + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&mem->join_req->header))) { /* No local origins, send to remote origin */ cadet_send_join_request (mem); } @@ -1580,7 +1610,7 @@ static void client_send_join_decision (struct Member *mem, const struct MulticastJoinDecisionMessageHeader *hdcsn) { - client_send_group (&mem->group, &hdcsn->header); + client_send_group (&mem->group, GNUNET_MQ_msg_copy (&hdcsn->header)); const struct MulticastJoinDecisionMessage * dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; @@ -1621,8 +1651,9 @@ handle_client_join_decision (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Got join decision from client for group %s..\n", + "%p got join decision from client for group %s..\n", grp, GNUNET_h2s (&grp->pub_key_hash)); struct GNUNET_CONTAINER_MultiHashMap * @@ -1652,6 +1683,32 @@ handle_client_join_decision (void *cls, } +static void +handle_client_part_request (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + struct GNUNET_MQ_Envelope *env; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p got part request from client for group %s.\n", + grp, GNUNET_h2s (&grp->pub_key_hash)); + grp->is_disconnected = GNUNET_YES; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK); + client_send_group (grp, env); + GNUNET_SERVICE_client_continue (client); +} + + static int check_client_multicast_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *msg) @@ -1667,6 +1724,7 @@ static void handle_client_multicast_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *msg) { + // FIXME: what if GNUNET_YES == grp->is_disconnected? Do we allow sending messages? struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Group *grp = c->group; @@ -1680,6 +1738,7 @@ handle_client_multicast_message (void *cls, GNUNET_assert (GNUNET_YES == grp->is_origin); struct Origin *orig = grp->origin; + // FIXME: use GNUNET_MQ_msg_copy /* FIXME: yucky, should use separate message structs for P2P and CS! */ struct GNUNET_MULTICAST_MessageHeader * out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header); @@ -1696,7 +1755,7 @@ handle_client_multicast_message (void *cls, GNUNET_assert (0); } - client_send_all (&grp->pub_key_hash, &out->header); + client_send_all (&grp->pub_key_hash, GNUNET_MQ_msg_copy (&out->header)); cadet_send_children (&grp->pub_key_hash, &out->header); client_send_ack (&grp->pub_key_hash); GNUNET_free (out); @@ -1730,6 +1789,7 @@ handle_client_multicast_request (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); GNUNET_assert (GNUNET_NO == grp->is_origin); struct Member *mem = grp->member; @@ -1751,7 +1811,9 @@ handle_client_multicast_request (void *cls, } uint8_t send_ack = GNUNET_YES; - if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&out->header))) { /* No local origins, send to remote origin */ if (NULL != mem->origin_channel) { @@ -1792,6 +1854,7 @@ handle_client_replay_request (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); GNUNET_assert (GNUNET_NO == grp->is_origin); struct Member *mem = grp->member; @@ -1812,7 +1875,9 @@ handle_client_replay_request (void *cls, GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - if (0 == client_send_origin (&grp->pub_key_hash, &rep->header)) + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&rep->header))) { /* No local origin, replay from remote members / origin. */ if (NULL != mem->origin_channel) { @@ -1821,6 +1886,7 @@ handle_client_replay_request (void *cls, else { /* FIXME: not yet connected to origin */ + GNUNET_assert (0); GNUNET_SERVICE_client_drop (client); return; } @@ -1880,6 +1946,7 @@ handle_client_replay_response_end (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); struct GNUNET_HashCode key_hash; replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, @@ -1939,6 +2006,7 @@ handle_client_replay_response (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); const struct GNUNET_MessageHeader *msg = &res->header; if (GNUNET_MULTICAST_REC_OK == res->error_code) @@ -2033,9 +2101,14 @@ client_notify_disconnect (void *cls, grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member", GNUNET_h2s (&grp->pub_key_hash)); + // FIXME (due to protocol change): here we must not remove all clients, + // only the one we were notified about! struct ClientList *cl = grp->clients_head; while (NULL != cl) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating clients for group %p\n", + grp); if (cl->client == client) { GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl); @@ -2049,16 +2122,7 @@ client_notify_disconnect (void *cls, if (NULL == grp->clients_head) { /* Last client disconnected. */ -#if FIXME - if (NULL != grp->tmit_head) - { /* Send pending messages via CADET before cleanup. */ - transmit_message (grp); - } - else -#endif - { - cleanup_group (grp); - } + cleanup_group (grp); } } @@ -2103,9 +2167,9 @@ run (void *cls, GNUNET_SERVICE_MAIN ("multicast", GNUNET_SERVICE_OPTION_NONE, - run, - client_notify_connect, - client_notify_disconnect, + &run, + &client_notify_connect, + &client_notify_disconnect, NULL, GNUNET_MQ_hd_fixed_size (client_origin_start, GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, @@ -2119,6 +2183,10 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, struct MulticastJoinDecisionMessageHeader, NULL), + GNUNET_MQ_hd_fixed_size (client_part_request, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST, + struct GNUNET_MessageHeader, + NULL), GNUNET_MQ_hd_var_size (client_multicast_message, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, struct GNUNET_MULTICAST_MessageHeader, diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c @@ -542,31 +542,12 @@ group_cleanup (struct GNUNET_MULTICAST_Group *grp) static void -group_disconnect (struct GNUNET_MULTICAST_Group *grp, - GNUNET_ContinuationCallback cb, - void *cls) +handle_group_part_ack (void *cls, + const struct GNUNET_MessageHeader *msg) { - grp->is_disconnecting = GNUNET_YES; - grp->disconnect_cb = cb; - grp->disconnect_cls = cls; + struct GNUNET_MULTICAST_Group *grp = cls; - if (NULL != grp->mq) - { - struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq); - if (NULL != last) - { - GNUNET_MQ_notify_sent (last, - (GNUNET_SCHEDULER_TaskCallback) group_cleanup, grp); - } - else - { - group_cleanup (grp); - } - } - else - { - group_cleanup (grp); - } + group_cleanup (grp); } @@ -779,6 +760,10 @@ origin_connect (struct GNUNET_MULTICAST_Origin *orig) GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, struct MulticastJoinRequestMessage, grp), + GNUNET_MQ_hd_fixed_size (group_part_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK, + struct GNUNET_MessageHeader, + grp), GNUNET_MQ_hd_fixed_size (group_replay_request, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, struct MulticastReplayRequestMessage, @@ -879,8 +864,13 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, void *stop_cls) { struct GNUNET_MULTICAST_Group *grp = &orig->grp; + struct GNUNET_MQ_Envelope *env; - group_disconnect (grp, stop_cb, stop_cls); + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = stop_cb; + grp->disconnect_cls = stop_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST); + GNUNET_MQ_send (grp->mq, env); } @@ -1065,6 +1055,10 @@ member_connect (struct GNUNET_MULTICAST_Member *mem) GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, struct MulticastJoinDecisionMessageHeader, mem), + GNUNET_MQ_hd_fixed_size (group_part_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK, + struct GNUNET_MessageHeader, + grp), GNUNET_MQ_hd_fixed_size (group_replay_request, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, struct MulticastReplayRequestMessage, @@ -1198,16 +1192,19 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, GNUNET_ContinuationCallback part_cb, void *part_cls) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem); struct GNUNET_MULTICAST_Group *grp = &mem->grp; + struct GNUNET_MQ_Envelope *env; mem->join_dcsn_cb = NULL; grp->join_req_cb = NULL; grp->message_cb = NULL; grp->replay_msg_cb = NULL; grp->replay_frag_cb = NULL; - - group_disconnect (grp, part_cb, part_cls); + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = part_cb; + grp->disconnect_cls = part_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST); + GNUNET_MQ_send (grp->mq, env); } diff --git a/src/multicast/test_multicast_multipeer.c b/src/multicast/test_multicast_multipeer.c @@ -35,9 +35,10 @@ #define PEERS_REQUESTED 12 -struct multicast_peer +struct MulticastPeerContext { int peer; /* peer number */ + struct GNUNET_CRYPTO_EcdsaPrivateKey *key; const struct GNUNET_PeerIdentity *id; struct GNUNET_TESTBED_Operation *op; /* not yet in use */ struct GNUNET_TESTBED_Operation *pi_op; /* not yet in use */ @@ -61,7 +62,7 @@ static void service_connect (void *cls, void *ca_result, const char *emsg); -static struct multicast_peer **mc_peers; +static struct MulticastPeerContext **multicast_peers; static struct GNUNET_TESTBED_Peer **peers; // FIXME: refactor @@ -69,18 +70,14 @@ static struct GNUNET_TESTBED_Operation *op[PEERS_REQUESTED]; static struct GNUNET_TESTBED_Operation *pi_op[PEERS_REQUESTED]; static struct GNUNET_MULTICAST_Origin *origin; -static struct GNUNET_MULTICAST_Member *member[PEERS_REQUESTED]; /* first element always empty */ +static struct GNUNET_MULTICAST_Member *members[PEERS_REQUESTED]; /* first element always empty */ static struct GNUNET_SCHEDULER_Task *timeout_tid; -static struct GNUNET_CRYPTO_EddsaPrivateKey group_key; +//static struct GNUNET_CRYPTO_EddsaPrivateKey *group_key; static struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; static struct GNUNET_HashCode group_pub_key_hash; -static struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key[PEERS_REQUESTED]; -static struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key[PEERS_REQUESTED]; - - /** * Global result for testcase. */ @@ -93,6 +90,8 @@ static int result; static void shutdown_task (void *cls) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "shutdown_task!\n"); for (int i=0;i<PEERS_REQUESTED;i++) { if (NULL != op[i]) @@ -107,14 +106,16 @@ shutdown_task (void *cls) } } - if (NULL != mc_peers) + if (NULL != multicast_peers) { for (int i=0; i < PEERS_REQUESTED; i++) { - GNUNET_free (mc_peers[i]); - mc_peers[i] = NULL; + GNUNET_free (multicast_peers[i]->key); + GNUNET_free (multicast_peers[i]); + multicast_peers[i] = NULL; } - GNUNET_free (mc_peers); + GNUNET_free (multicast_peers); + multicast_peers = NULL; } if (NULL != timeout_tid) @@ -141,11 +142,11 @@ member_join_request (void *cls, const struct GNUNET_MessageHeader *join_msg, struct GNUNET_MULTICAST_JoinHandle *jh) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u (%s) sent a join request.\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); } @@ -154,7 +155,7 @@ notify (void *cls, size_t *data_size, void *data) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; struct pingpong_msg *pp_msg = GNUNET_new (struct pingpong_msg); pp_msg->peer = mc_peer->peer; @@ -178,18 +179,18 @@ member_join_decision (void *cls, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_msg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; struct GNUNET_MULTICAST_MemberTransmitHandle *req; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u (%s) received a decision from origin: %s\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id), + GNUNET_i2s (multicast_peers[mc_peer->peer]->id), (GNUNET_YES == is_admitted)?"accepted":"rejected"); if (GNUNET_YES == is_admitted) { - req = GNUNET_MULTICAST_member_to_origin (member[mc_peer->peer], + req = GNUNET_MULTICAST_member_to_origin (members[mc_peer->peer], 0, notify, cls); @@ -215,10 +216,32 @@ member_replay_msg () static void +origin_disconnected_cb (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Origin disconnected. Shutting down.\n"); + result = GNUNET_YES; + GNUNET_SCHEDULER_shutdown (); +} + + +static void +member_disconnected_cb (void *cls) +{ + for (int i = 1; i < PEERS_REQUESTED; ++i) + if (GNUNET_NO == multicast_peers[i]->test_ok) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "All member disconnected. Stopping origin.\n"); + GNUNET_MULTICAST_origin_stop (origin, origin_disconnected_cb, cls); +} + + +static void member_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *msg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; struct pingpong_msg *pp_msg = (struct pingpong_msg*) &(msg[1]); if (PONG == pp_msg->msg && mc_peer->peer == pp_msg->peer) @@ -226,18 +249,15 @@ member_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%i (%s) receives a pong\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); - + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); mc_peer->test_ok = GNUNET_OK; - } - - // Test for completeness of received PONGs - for (int i=1; i<PEERS_REQUESTED; i++) - if (GNUNET_NO == mc_peers[i]->test_ok) - return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "peer #%u (%s) parting from multicast group\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); - result = GNUNET_YES; - GNUNET_SCHEDULER_shutdown(); + GNUNET_MULTICAST_member_part (members[mc_peer->peer], member_disconnected_cb, cls); + } } @@ -349,81 +369,53 @@ origin_message (void *cls, static void -multicast_da (void *cls, - void *op_result) +multicast_disconnect (void *cls, + void *op_result) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; - if (0 == mc_peer->peer) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Origin closes multicast group\n"); - - GNUNET_MULTICAST_origin_stop (origin, NULL, cls); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "peer #%u (%s) parting from multicast group\n", - mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); - - GNUNET_MULTICAST_member_part (member[mc_peer->peer], NULL, cls); - } } static void * -multicast_ca (void *cls, - const struct GNUNET_CONFIGURATION_Handle *cfg) +multicast_connect (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *multicast_peer = cls; struct GNUNET_MessageHeader *join_msg; char data[64]; - if (0 == mc_peer->peer) + multicast_peer->key = GNUNET_CRYPTO_ecdsa_key_create (); + if (0 == multicast_peer->peer) { - struct GNUNET_CRYPTO_EddsaPrivateKey *key = GNUNET_CRYPTO_eddsa_key_create (); - GNUNET_CRYPTO_eddsa_key_get_public (key, &group_pub_key); + GNUNET_CRYPTO_eddsa_key_get_public (multicast_peer->key, &group_pub_key); GNUNET_CRYPTO_hash (&group_pub_key, sizeof (group_pub_key), &group_pub_key_hash); - - group_key = *key; - origin = GNUNET_MULTICAST_origin_start (cfg, - &group_key, - 0, - origin_join_request, - origin_replay_frag, - origin_replay_msg, - origin_request, - origin_message, - cls); - - if (NULL == origin) { + multicast_peer->key, + 0, + origin_join_request, + origin_replay_frag, + origin_replay_msg, + origin_request, + origin_message, + cls); + if (NULL == origin) + { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u could not create a multicast group", - mc_peer->peer); + multicast_peer->peer); return NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u connected as origin to group %s\n", - mc_peer->peer, + multicast_peer->peer, GNUNET_h2s (&group_pub_key_hash)); - return origin; } else { - // Get members keys - member_pub_key[mc_peer->peer] = GNUNET_new (struct GNUNET_CRYPTO_EcdsaPublicKey); - member_key[mc_peer->peer] = GNUNET_CRYPTO_ecdsa_key_create (); - GNUNET_CRYPTO_ecdsa_key_get_public (member_key[mc_peer->peer], - member_pub_key[mc_peer->peer]); - sprintf(data, "Hi, I am peer #%u (%s). Can I enter?", - mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + multicast_peer->peer, + GNUNET_i2s (multicast_peers[multicast_peer->peer]->id)); uint8_t data_size = strlen (data) + 1; join_msg = GNUNET_malloc (sizeof (join_msg) + data_size); join_msg->size = htons (sizeof (join_msg) + data_size); @@ -432,24 +424,25 @@ multicast_ca (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u (%s) tries to join multicast group %s\n", - mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id), + multicast_peer->peer, + GNUNET_i2s (multicast_peers[multicast_peer->peer]->id), GNUNET_h2s (&group_pub_key_hash)); - member[mc_peer->peer] = GNUNET_MULTICAST_member_join (cfg, - &group_pub_key, - member_key[mc_peer->peer], - mc_peers[0]->id, - 0, - NULL, - join_msg, /* join message */ - member_join_request, - member_join_decision, - member_replay_frag, - member_replay_msg, - member_message, - cls); - return member[mc_peer->peer]; + members[multicast_peer->peer] = + GNUNET_MULTICAST_member_join (cfg, + &group_pub_key, + multicast_peer->key, + multicast_peers[0]->id, + 0, + NULL, + join_msg, /* join message */ + member_join_request, + member_join_decision, + member_replay_frag, + member_replay_msg, + member_message, + cls); + return members[multicast_peer->peer]; } } @@ -460,7 +453,7 @@ peer_information_cb (void *cls, const struct GNUNET_TESTBED_PeerInformation *pinfo, const char *emsg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; if (NULL == pinfo) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got no peer information\n"); @@ -468,7 +461,7 @@ peer_information_cb (void *cls, GNUNET_SCHEDULER_shutdown (); } - mc_peers[mc_peer->peer]->id = pinfo->result.id; + multicast_peers[mc_peer->peer]->id = pinfo->result.id; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got peer information of %s (%s)\n", @@ -478,22 +471,28 @@ peer_information_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Create peer #%u (%s)\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); if (0 != mc_peer->peer) { /* connect to multicast service of members */ - op[mc_peer->peer] = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */ - peers[mc_peer->peer], /* The peer whose service to connect to */ - "multicast", /* The name of the service */ - service_connect, /* callback to call after a handle to service - is opened */ - cls, /* closure for the above callback */ - multicast_ca, /* callback to call with peer's configuration; - this should open the needed service connection */ - multicast_da, /* callback to be called when closing the - opened service connection */ - cls); /* closure for the above two callbacks */ + op[mc_peer->peer] = + GNUNET_TESTBED_service_connect (/* Closure for operation */ + NULL, + /* The peer whose service to connect to */ + peers[mc_peer->peer], + /* The name of the service */ + "multicast", + /* called after a handle to service is opened */ + service_connect, + /* closure for the above callback */ + cls, + /* called when opening the service connection */ + multicast_connect, + /* called when closing the service connection */ + multicast_disconnect, + /* closure for the above two callbacks */ + cls); } } @@ -504,14 +503,14 @@ service_connect (void *cls, void *ca_result, const char *emsg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; if (NULL == ca_result) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connection adapter not created for peer #%u (%s)\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); result = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown(); @@ -525,7 +524,7 @@ service_connect (void *cls, pi_op[i] = GNUNET_TESTBED_peer_get_information (peers[i], GNUNET_TESTBED_PIT_IDENTITY, peer_information_cb, - mc_peers[i]); + multicast_peers[i]); } } } @@ -549,50 +548,51 @@ service_connect (void *cls, * @param links_failed number of links testbed was unable to establish */ static void testbed_master (void *cls, - struct GNUNET_TESTBED_RunHandle *h, - unsigned int num_peers, - struct GNUNET_TESTBED_Peer **p, - unsigned int links_succeeded, - unsigned int links_failed) + struct GNUNET_TESTBED_RunHandle *h, + unsigned int num_peers, + struct GNUNET_TESTBED_Peer **p, + unsigned int links_succeeded, + unsigned int links_failed) { /* Testbed is ready with peers running and connected in a pre-defined overlay topology (FIXME) */ - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Connected to testbed_master()\n"); - peers = p; - - mc_peers = GNUNET_new_array (PEERS_REQUESTED, struct multicast_peer*); + multicast_peers = GNUNET_new_array (PEERS_REQUESTED, struct MulticastPeerContext*); // Create test contexts for members for (int i = 0; i<PEERS_REQUESTED; i++) { - mc_peers[i] = GNUNET_new (struct multicast_peer); - mc_peers[i]->peer = i; - mc_peers[i]->test_ok = GNUNET_NO; + multicast_peers[i] = GNUNET_new (struct MulticastPeerContext); + multicast_peers[i]->peer = i; + multicast_peers[i]->test_ok = GNUNET_NO; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Create origin peer\n"); - - op[0] = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */ - peers[0], /* The peer whose service to connect to */ - "multicast", /* The name of the service */ - service_connect, /* callback to call after a handle to service - is opened */ - mc_peers[0], /* closure for the above callback */ - multicast_ca, /* callback to call with peer's configuration; - this should open the needed service connection */ - multicast_da, /* callback to be called when closing the - opened service connection */ - mc_peers[0]); /* closure for the above two callbacks */ - - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule a new task on shutdown */ - + op[0] = + GNUNET_TESTBED_service_connect (/* Closure for operation */ + NULL, + /* The peer whose service to connect to */ + peers[0], + /* The name of the service */ + "multicast", + /* called after a handle to service is opened */ + service_connect, + /* closure for the above callback */ + multicast_peers[0], + /* called when opening the service connection */ + multicast_connect, + /* called when closing the service connection */ + multicast_disconnect, + /* closure for the above two callbacks */ + multicast_peers[0]); + /* Schedule a new task on shutdown */ + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule the shutdown task with a delay of a few Seconds */ - timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 400), - &timeout_task, NULL); + timeout_tid = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 400), + &timeout_task, + NULL); } @@ -616,15 +616,21 @@ main (int argc, char *argv[]) } result = GNUNET_SYSERR; - ret = GNUNET_TESTBED_test_run - ("test-multicast-multipeer", /* test case name */ - config_file, /* template configuration */ - PEERS_REQUESTED, /* number of peers to start */ - 0LL, /* Event mask - set to 0 for no event notifications */ - NULL, /* Controller event callback */ - NULL, /* Closure for controller event callback */ - testbed_master, /* continuation callback to be called when testbed setup is complete */ - NULL); /* Closure for the test_master callback */ + ret = + GNUNET_TESTBED_test_run ("test-multicast-multipeer", + config_file, + /* number of peers to start */ + PEERS_REQUESTED, + /* Event mask - set to 0 for no event notifications */ + 0LL, + /* Controller event callback */ + NULL, + /* Closure for controller event callback */ + NULL, + /* called when testbed setup is complete */ + testbed_master, + /* Closure for the test_master callback */ + NULL); if ( (GNUNET_OK != ret) || (GNUNET_OK != result) ) return 1; return 0; diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c @@ -279,7 +279,7 @@ struct Channel * Is the client disconnected? * #GNUNET_YES or #GNUNET_NO */ - uint8_t is_disconnected; + uint8_t is_disconnecting; /** * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? @@ -508,8 +508,6 @@ cleanup_master (struct Master *mst) { struct Channel *chn = &mst->channel; - if (NULL != mst->origin) - GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); } @@ -546,11 +544,6 @@ cleanup_slave (struct Slave *slv) GNUNET_free (slv->relays); slv->relays = NULL; } - if (NULL != slv->member) - { - GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME - slv->member = NULL; - } GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); } @@ -603,15 +596,16 @@ client_notify_disconnect (void *cls, if (NULL == chn) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p User context is NULL in client_disconnect()\n", + "%p User context is NULL in client_notify_disconnect ()\n", chn); GNUNET_break (0); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Client (%s) disconnected from channel %s\n", + "%p Client %p (%s) disconnected from channel %s\n", chn, + client, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); @@ -645,15 +639,8 @@ client_notify_disconnect (void *cls, chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); - chn->is_disconnected = GNUNET_YES; - if (NULL != chn->tmit_head) - { /* Send pending messages to multicast before cleanup. */ - transmit_message (chn); - } - else - { - cleanup_channel (chn); - } + chn->is_disconnecting = GNUNET_YES; + cleanup_channel (chn); } } @@ -688,7 +675,7 @@ client_send_msg (const struct Channel *chn, const struct GNUNET_MessageHeader *msg) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending message to clients.\n", + "Sending message to clients of channel %p.\n", chn); struct ClientList *cli = chn->clients_head; @@ -699,7 +686,6 @@ client_send_msg (const struct Channel *chn, GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client), env); - cli = cli->next; } } @@ -734,7 +720,7 @@ client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id, GNUNET_memcpy (&res[1], data, data_size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n", + "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n", client, GNUNET_ntohll (op_id), result_code, @@ -1202,12 +1188,12 @@ fragment_queue_insert (struct Channel *chn, else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype || frag_offset == fragq->header_size) { /* header is now complete */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Header of message %" PRIu64 " is complete.\n", chn, GNUNET_ntohll (mmsg->message_id)); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Adding message %" PRIu64 " to queue.\n", chn, GNUNET_ntohll (mmsg->message_id)); @@ -1215,7 +1201,7 @@ fragment_queue_insert (struct Channel *chn, } else { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->message_id), @@ -1230,7 +1216,7 @@ fragment_queue_insert (struct Channel *chn, if (frag_offset == fragq->size) fragq->state = MSG_FRAG_STATE_END; else - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->message_id), @@ -1285,7 +1271,7 @@ static void fragment_queue_run (struct Channel *chn, uint64_t msg_id, struct FragmentQueue *fragq, uint8_t drop) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n", chn, msg_id, @@ -1413,7 +1399,7 @@ store_recv_state_modify_result (void *cls, int64_t result, static uint64_t message_queue_run (struct Channel *chn) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Running message queue.\n", chn); uint64_t n = 0; uint64_t msg_id; @@ -1421,7 +1407,7 @@ message_queue_run (struct Channel *chn) while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, &msg_id)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); struct GNUNET_HashCode msg_id_hash; hash_key_from_hll (&msg_id_hash, msg_id); @@ -1431,7 +1417,7 @@ message_queue_run (struct Channel *chn) if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p No fragq (%p) or header not complete.\n", chn, fragq); break; @@ -1453,7 +1439,7 @@ message_queue_run (struct Channel *chn) && (chn->max_message_id != msg_id - 1 && chn->max_message_id != msg_id)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Out of order message. " "(%" PRIu64 " != %" PRIu64 " - 1)\n", chn, chn->max_message_id, msg_id); @@ -1469,7 +1455,7 @@ message_queue_run (struct Channel *chn) { if (msg_id - fragq->state_delta != chn->max_state_message_id) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Out of order stateful message. " "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", chn, msg_id, fragq->state_delta, chn->max_state_message_id); @@ -1515,8 +1501,6 @@ message_queue_run (struct Channel *chn) static uint64_t message_queue_drop (struct Channel *chn) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping message queue.\n", chn); uint64_t n = 0; uint64_t msg_id; while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, @@ -1703,7 +1687,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, res.result_code = htonl (result); res.max_message_id = GNUNET_htonll (max_message_id); - if (GNUNET_OK == result || GNUNET_NO == result) + if (GNUNET_YES == result || GNUNET_NO == result) { chn->max_message_id = max_message_id; chn->max_state_message_id = max_state_message_id; @@ -1831,6 +1815,9 @@ handle_client_slave_join (void *cls, struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; struct GNUNET_HashCode pub_key_hash, slv_pub_hash; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "got join request from client %p\n", + client); GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key); GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash); GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash); @@ -1905,7 +1892,7 @@ handle_client_slave_join (void *cls, GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, - &store_recv_slave_counters, slv); + &store_recv_slave_counters, slv); } else { @@ -1952,8 +1939,9 @@ handle_client_slave_join (void *cls, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Client connected as slave to channel %s.\n", - slv, GNUNET_h2s (&chn->pub_key_hash)); + "Client %p connected as slave to channel %s.\n", + client, + GNUNET_h2s (&chn->pub_key_hash)); struct ClientList *cli = GNUNET_malloc (sizeof (*cli)); cli->client = client; @@ -2037,6 +2025,49 @@ handle_client_join_decision (void *cls, } +static void +channel_part_cb (void *cls) +{ + struct GNUNET_SERVICE_Client *client = cls; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); +} + + +static void +handle_client_part_request (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct Client *c = cls; + + c->channel->is_disconnecting = GNUNET_YES; + if (GNUNET_YES == c->channel->is_master) + { + struct Master *mst = (struct Master *) c->channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got part request from master %p\n", + mst); + GNUNET_assert (NULL != mst->origin); + GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client); + } + else + { + struct Slave *slv = (struct Slave *) c->channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got part request from slave %p\n", + slv); + GNUNET_assert (NULL != slv->member); + GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client); + } + GNUNET_SERVICE_client_continue (c->client); +} + + /** * Send acknowledgement to a client. * @@ -2096,7 +2127,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) { GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn); } - else if (GNUNET_YES == chn->is_disconnected + else if (GNUNET_YES == chn->is_disconnecting && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) { /* FIXME: handle partial message (when still in_transmit) */ @@ -2208,12 +2239,10 @@ transmit_message (struct Channel *chn) static void master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { tmit_msg->id = ++mst->max_message_id; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message: message_id=%" PRIu64 "\n", mst, tmit_msg->id); struct GNUNET_PSYC_MessageMethod *pmeth @@ -2225,7 +2254,7 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) } else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message: state_delta=%" PRIu64 "\n", mst, tmit_msg->id - mst->max_state_message_id); pmeth->state_delta = GNUNET_htonll (tmit_msg->id @@ -2234,7 +2263,7 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) } else { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message: state not modified\n", mst); pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); } @@ -2359,7 +2388,9 @@ handle_client_psyc_message (void *cls, if (GNUNET_YES != chn->is_ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Channel is not ready yet, disconnecting client.\n", chn); + "%p Channel is not ready yet, disconnecting client %p.\n", + chn, + client); GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; @@ -2789,9 +2820,9 @@ run (void *cls, GNUNET_SERVICE_MAIN ("psyc", GNUNET_SERVICE_OPTION_NONE, - run, - client_notify_connect, - client_notify_disconnect, + &run, + &client_notify_connect, + &client_notify_disconnect, NULL, GNUNET_MQ_hd_fixed_size (client_master_start, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, @@ -2805,6 +2836,10 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, struct GNUNET_PSYC_JoinDecisionMessage, NULL), + GNUNET_MQ_hd_fixed_size (client_part_request, + GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST, + struct GNUNET_MessageHeader, + NULL), GNUNET_MQ_hd_var_size (client_psyc_message, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, struct GNUNET_MessageHeader, diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c @@ -260,6 +260,10 @@ handle_channel_result (void *cls, GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), GNUNET_ntohll (res->result_code), data, data_size, NULL); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "handle_channel_result: Received result message with OP ID %" PRIu64 "\n", + GNUNET_ntohll (res->op_id)); } @@ -555,6 +559,9 @@ handle_slave_join_decision (void *cls, static void channel_cleanup (struct GNUNET_PSYC_Channel *chn) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "cleaning up channel %p\n", + chn); if (NULL != chn->tmit) { GNUNET_PSYC_transmit_destroy (chn->tmit); @@ -562,6 +569,7 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn) } if (NULL != chn->recv) { + GNUNET_PSYC_receive_destroy (chn->recv); chn->recv = NULL; } @@ -585,30 +593,12 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn) static void -channel_disconnect (struct GNUNET_PSYC_Channel *chn, - GNUNET_ContinuationCallback cb, - void *cls) +handle_channel_part_ack (void *cls, + const struct GNUNET_MessageHeader *msg) { - chn->is_disconnecting = GNUNET_YES; - chn->disconnect_cb = cb; - chn->disconnect_cls = cls; + struct GNUNET_PSYC_Channel *chn = cls; - if (NULL != chn->mq) - { - struct GNUNET_MQ_Envelope *env = GNUNET_MQ_get_last_envelope (chn->mq); - if (NULL != env) - { - GNUNET_MQ_notify_sent (env, (GNUNET_SCHEDULER_TaskCallback) channel_cleanup, chn); - } - else - { - channel_cleanup (chn); - } - } - else - { - channel_cleanup (chn); - } + channel_cleanup (chn); } @@ -671,6 +661,10 @@ master_connect (struct GNUNET_PSYC_Master *mst) GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, struct GNUNET_PSYC_JoinRequestMessage, mst), + GNUNET_MQ_hd_fixed_size (channel_part_ack, + GNUNET_MESSAGE_TYPE_PSYC_PART_ACK, + struct GNUNET_MessageHeader, + chn), GNUNET_MQ_hd_var_size (channel_message, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, struct GNUNET_PSYC_MessageHeader, @@ -694,8 +688,11 @@ master_connect (struct GNUNET_PSYC_Master *mst) GNUNET_MQ_handler_end () }; - chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", - handlers, master_disconnected, mst); + chn->mq = GNUNET_CLIENT_connect (chn->cfg, + "psyc", + handlers, + &master_disconnected, + mst); GNUNET_assert (NULL != chn->mq); chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); @@ -780,10 +777,13 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst, void *stop_cls) { struct GNUNET_PSYC_Channel *chn = &mst->chn; + struct GNUNET_MQ_Envelope *env; - /* FIXME: send msg to service */ - - channel_disconnect (chn, stop_cb, stop_cls); + chn->is_disconnecting = GNUNET_YES; + chn->disconnect_cb = stop_cb; + chn->disconnect_cls = stop_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST); + GNUNET_MQ_send (chn->mq, env); } @@ -931,7 +931,8 @@ slave_reconnect (void *cls) * Reconnect after backoff period. */ static void -slave_disconnected (void *cls, enum GNUNET_MQ_Error error) +slave_disconnected (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_PSYC_Slave *slv = cls; struct GNUNET_PSYC_Channel *chn = &slv->chn; @@ -950,7 +951,7 @@ slave_disconnected (void *cls, enum GNUNET_MQ_Error error) chn->mq = NULL; } chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, - slave_reconnect, + &slave_reconnect, slv); chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); } @@ -970,6 +971,10 @@ slave_connect (struct GNUNET_PSYC_Slave *slv) GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, struct GNUNET_PSYC_JoinDecisionMessage, slv), + GNUNET_MQ_hd_fixed_size (channel_part_ack, + GNUNET_MESSAGE_TYPE_PSYC_PART_ACK, + struct GNUNET_MessageHeader, + chn), GNUNET_MQ_hd_var_size (channel_message, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, struct GNUNET_PSYC_MessageHeader, @@ -993,9 +998,19 @@ slave_connect (struct GNUNET_PSYC_Slave *slv) GNUNET_MQ_handler_end () }; - chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", - handlers, slave_disconnected, slv); - GNUNET_assert (NULL != chn->mq); + chn->mq = GNUNET_CLIENT_connect (chn->cfg, + "psyc", + handlers, + &slave_disconnected, + slv); + if (NULL == chn->mq) + { + chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, + &slave_reconnect, + slv); + chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); + return; + } chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); GNUNET_MQ_send_copy (chn->mq, chn->connect_env); @@ -1107,10 +1122,13 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv, void *part_cls) { struct GNUNET_PSYC_Channel *chn = &slv->chn; + struct GNUNET_MQ_Envelope *env; - /* FIXME: send msg to service */ - - channel_disconnect (chn, part_cb, part_cls); + chn->is_disconnecting = GNUNET_YES; + chn->disconnect_cb = part_cb; + chn->disconnect_cls = part_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST); + GNUNET_MQ_send (chn->mq, env); } @@ -1233,6 +1251,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, req->did_join = GNUNET_YES; req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "GNUNET_PSYC_channel_slave_add, OP ID: %" PRIu64 "\n", + GNUNET_ntohll (req->op_id)); GNUNET_MQ_send (chn->mq, env); } @@ -1283,6 +1304,9 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, req->did_join = GNUNET_NO; req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "GNUNET_PSYC_channel_slave_remove, OP ID: %" PRIu64 "\n", + GNUNET_ntohll (req->op_id)); GNUNET_MQ_send (chn->mq, env); } @@ -1321,6 +1345,10 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn, req->message_limit = GNUNET_htonll (message_limit); req->flags = htonl (flags); req->op_id = GNUNET_htonll (hist->op_id); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_history_replay, OP ID: %" PRIu64 "\n", + GNUNET_ntohll (req->op_id)); GNUNET_memcpy (&req[1], method_prefix, method_size); GNUNET_MQ_send (chn->mq, env); @@ -1459,6 +1487,11 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn, struct GNUNET_MQ_Envelope * env = GNUNET_MQ_msg_extra (req, name_size, type); req->op_id = GNUNET_htonll (sr->op_id); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_state_get, OP ID: %" PRIu64 "\n", + GNUNET_ntohll (req->op_id)); + GNUNET_memcpy (&req[1], name, name_size); GNUNET_MQ_send (chn->mq, env); diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c @@ -755,15 +755,22 @@ slave_add () static void +schedule_second_slave_join (void *cls) +{ + slave_join (TEST_SLAVE_JOIN_ACCEPT); +} + + +static void first_slave_parted (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n"); - slave_join (TEST_SLAVE_JOIN_ACCEPT); + GNUNET_SCHEDULER_add_now (&schedule_second_slave_join, NULL); } static void -schedule_slave_part (void *cls) +schedule_first_slave_part (void *cls) { GNUNET_PSYC_slave_part (slv, GNUNET_NO, &first_slave_parted, NULL); } @@ -783,7 +790,7 @@ join_decision_cb (void *cls, case TEST_SLAVE_JOIN_REJECT: GNUNET_assert (0 == is_admitted); GNUNET_assert (1 == join_req_count); - GNUNET_SCHEDULER_add_now (&schedule_slave_part, NULL); + GNUNET_SCHEDULER_add_now (&schedule_first_slave_part, NULL); break; case TEST_SLAVE_JOIN_ACCEPT: @@ -844,11 +851,18 @@ slave_join (int t) struct GNUNET_PSYC_Message * join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9); - slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, + slv = GNUNET_PSYC_slave_join (cfg, + &channel_pub_key, + slave_key, GNUNET_PSYC_SLAVE_JOIN_NONE, - &origin, 0, NULL, - &slave_message_cb, &slave_message_part_cb, - &slave_connect_cb, &join_decision_cb, NULL, + &origin, + 0, + NULL, + &slave_message_cb, + &slave_message_part_cb, + &slave_connect_cb, + &join_decision_cb, + NULL, join_msg); GNUNET_free (join_msg); slv_chn = GNUNET_PSYC_slave_get_channel (slv); diff --git a/src/psyc/test_psyc.conf b/src/psyc/test_psyc.conf @@ -0,0 +1,16 @@ +@INLINE@ ../../contrib/no_forcestart.conf + +[PATHS] +GNUNET_TEST_HOME = /tmp/gnunet-test-psyc/ + +[transport] +PLUGINS = tcp + +[nat] +DISABLEV6 = YES +ENABLE_UPNP = NO +BEHIND_NAT = NO +ALLOW_NAT = NO +INTERNAL_ADDRESS = 127.0.0.1 +EXTERNAL_ADDRESS = 127.0.0.1 + diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c @@ -148,14 +148,14 @@ handle_result_code (void *cls, const struct OperationResult *opres) str, size - sizeof (*opres), (void **) &op)) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "handle_result_code: Received result message with operation ID: %" PRIu64 "\n", + "handle_result_code: Received result message with OP ID: %" PRIu64 "\n", GNUNET_ntohll (opres->op_id)); GNUNET_free (op); } else { LOG (GNUNET_ERROR_TYPE_DEBUG, - "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n", + "handle_result_code: No callback registered for OP ID %" PRIu64 ".\n", GNUNET_ntohll (opres->op_id)); } h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; @@ -187,7 +187,7 @@ handle_result_counters (void *cls, const struct CountersResult *cres) else { LOG (GNUNET_ERROR_TYPE_DEBUG, - "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n", + "handle_result_counters: No callback registered for OP ID %" PRIu64 ".\n", GNUNET_ntohll (cres->op_id)); } h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; @@ -233,7 +233,7 @@ handle_result_fragment (void *cls, const struct FragmentResult *fres) else { LOG (GNUNET_ERROR_TYPE_DEBUG, - "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n", + "handle_result_fragment: No callback registered for OP ID %" PRIu64 ".\n", GNUNET_ntohll (fres->op_id)); } h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; @@ -282,7 +282,7 @@ handle_result_state (void *cls, const struct StateResult *sres) else { LOG (GNUNET_ERROR_TYPE_DEBUG, - "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n", + "handle_result_state: No callback registered for OP ID %" PRIu64 ".\n", GNUNET_ntohll (sres->op_id)); } h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; diff --git a/src/social/gnunet-service-social.c b/src/social/gnunet-service-social.c @@ -96,7 +96,7 @@ static struct GNUNET_CONTAINER_MultiHashMap *apps_places; * Application subscriptions per place. * H(place_pub_key) -> H(app_id) */ -static struct GNUNET_CONTAINER_MultiHashMap *places_apps; +//static struct GNUNET_CONTAINER_MultiHashMap *places_apps; /** * Connected applications. @@ -255,10 +255,10 @@ struct Place uint8_t is_ready; /** - * Is the client disconnected? + * Is the client disconnecting? * #GNUNET_YES or #GNUNET_NO */ - uint8_t is_disconnected; + uint8_t is_disconnecting; /** * Is this a host (#GNUNET_YES), or guest (#GNUNET_NO)? @@ -348,7 +348,7 @@ struct Guest /** * Join request to be transmitted to the master on join. */ - struct GNUNET_MessageHeader *join_req; + struct GNUNET_MessageHeader *join_req; // FIXME: not used! /** * Join decision received from PSYC. @@ -487,8 +487,6 @@ cleanup_host (struct Host *hst) { struct Place *plc = &hst->place; - if (NULL != hst->master) - GNUNET_PSYC_master_stop (hst->master, GNUNET_NO, NULL, NULL); // FIXME GNUNET_CONTAINER_multihashmap_destroy (hst->join_reqs); GNUNET_CONTAINER_multihashmap_destroy (hst->relay_msgs); GNUNET_CONTAINER_multihashmap_remove (hosts, &plc->pub_key_hash, plc); @@ -505,7 +503,7 @@ cleanup_guest (struct Guest *gst) struct GNUNET_CONTAINER_MultiHashMap * plc_gst = GNUNET_CONTAINER_multihashmap_get (place_guests, &plc->pub_key_hash); - GNUNET_assert (NULL != plc_gst); // FIXME + GNUNET_assert (NULL != plc_gst); GNUNET_CONTAINER_multihashmap_remove (plc_gst, &plc->ego_pub_hash, gst); if (0 == GNUNET_CONTAINER_multihashmap_size (plc_gst)) @@ -520,8 +518,6 @@ cleanup_guest (struct Guest *gst) GNUNET_free (gst->join_req); if (NULL != gst->relays) GNUNET_free (gst->relays); - if (NULL != gst->slave) - GNUNET_PSYC_slave_part (gst->slave, GNUNET_NO, NULL, NULL); // FIXME GNUNET_CONTAINER_multihashmap_remove (guests, &plc->pub_key_hash, plc); } @@ -537,8 +533,8 @@ cleanup_place (void *cls) struct Place *plc = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Cleaning up place %s\n", - plc, GNUNET_h2s (&plc->pub_key_hash)); + "cleaning up place %s\n", + GNUNET_h2s (&plc->pub_key_hash)); (GNUNET_YES == plc->is_host) ? cleanup_host ((struct Host *) plc) @@ -583,12 +579,19 @@ client_notify_disconnect (void *cls, { if (cli->client == client) { - GNUNET_CONTAINER_DLL_remove (plc->clients_head, plc->clients_tail, cli); + GNUNET_CONTAINER_DLL_remove (plc->clients_head, + plc->clients_tail, + cli); GNUNET_free (cli); break; } cli = cli->next; } + if (GNUNET_YES == plc->is_disconnecting) + { + GNUNET_PSYC_slicer_destroy (plc->slicer); + GNUNET_free (plc); + } } @@ -605,46 +608,55 @@ client_notify_connect (void *cls, struct GNUNET_SERVICE_Client *client, struct GNUNET_MQ_Handle *mq) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); + struct Client *c = GNUNET_new (struct Client); - struct Client *c = GNUNET_malloc (sizeof (*c)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client %p connected with queue %p\n", + client, + mq); c->client = client; - return c; } /** - * Send message to a client. - */ -static inline void -client_send_msg (struct GNUNET_SERVICE_Client *client, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_MQ_Envelope * - env = GNUNET_MQ_msg_copy (msg); - - GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), - env); -} - - -/** - * Send message to all clients connected to a place. + * Send message to all clients connected to a place and + * takes care of freeing @env. */ static void place_send_msg (const struct Place *plc, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { + struct ClientListItem *cli = plc->clients_head; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending message to clients of place.\n", plc); - - struct ClientListItem *cli = plc->clients_head; while (NULL != cli) { - client_send_msg (cli->client, msg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message to client %p\n", + cli); + GNUNET_MQ_send_copy (GNUNET_SERVICE_client_get_mq (cli->client), + env); cli = cli->next; } + GNUNET_MQ_discard (env); +} + + +static void +place_send_leave_ack (struct Place *plc) +{ + struct GNUNET_MQ_Envelope *env; + + for (struct ClientListItem *cli = plc->clients_head; + NULL != cli; + cli = cli->next) + { + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE_ACK); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client), + env); + } } @@ -666,23 +678,21 @@ static void client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id, int64_t result_code, const void *data, uint16_t data_size) { + struct GNUNET_MQ_Envelope *env; struct GNUNET_OperationResultMessage *res; - res = GNUNET_malloc (sizeof (*res) + data_size); - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); - res->header.size = htons (sizeof (*res) + data_size); + env = GNUNET_MQ_msg_extra (res, + data_size, + GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); res->result_code = GNUNET_htonll (result_code); res->op_id = op_id; if (0 < data_size) GNUNET_memcpy (&res[1], data, data_size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending result to client for operation #%" PRIu64 ": " "%" PRId64 " (size: %u)\n", client, GNUNET_ntohll (op_id), result_code, data_size); - - client_send_msg (client, &res->header); - GNUNET_free (res); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); } @@ -690,19 +700,21 @@ static void client_send_host_enter_ack (struct GNUNET_SERVICE_Client *client, struct Host *hst, uint32_t result) { + struct GNUNET_MQ_Envelope *env; + struct HostEnterAck *hack; struct Place *plc = &hst->place; - struct HostEnterAck hack; - hack.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK); - hack.header.size = htons (sizeof (hack)); - hack.result_code = htonl (result); - hack.max_message_id = GNUNET_htonll (plc->max_message_id); - hack.place_pub_key = plc->pub_key; + env = GNUNET_MQ_msg (hack, + GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK); + hack->result_code = htonl (result); + hack->max_message_id = GNUNET_htonll (plc->max_message_id); + hack->place_pub_key = plc->pub_key; if (NULL != client) - client_send_msg (client, &hack.header); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); else - place_send_msg (plc, &hack.header); + place_send_msg (plc, env); } @@ -736,7 +748,8 @@ psyc_recv_join_request (void *cls, GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); GNUNET_CONTAINER_multihashmap_put (hst->join_reqs, &slave_key_hash, jh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - place_send_msg (&hst->place, &req->header); + place_send_msg (&hst->place, + GNUNET_MQ_msg_copy (&req->header)); } @@ -746,18 +759,29 @@ psyc_recv_join_request (void *cls, static void psyc_slave_connected (void *cls, int result, uint64_t max_message_id) { + struct GNUNET_PSYC_CountersResultMessage *res; + struct GNUNET_MQ_Envelope *env; struct Guest *gst = cls; struct Place *plc = &gst->place; + plc->max_message_id = max_message_id; plc->is_ready = GNUNET_YES; + env = GNUNET_MQ_msg (res, + GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK); + res->result_code = + (result != GNUNET_SYSERR) ? htonl (GNUNET_OK) : htonl (GNUNET_SYSERR); + res->max_message_id = GNUNET_htonll (plc->max_message_id); + place_send_msg (plc, env); +} - struct GNUNET_PSYC_CountersResultMessage res; - res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK); - res.header.size = htons (sizeof (res)); - res.result_code = htonl (result); - res.max_message_id = GNUNET_htonll (plc->max_message_id); - place_send_msg (plc, &res.header); +static void +slave_parted_after_join_decision (void *cls) +{ + struct Guest *gst = cls; + + GNUNET_assert (NULL != gst->join_dcsn); + place_send_msg (&gst->place, GNUNET_MQ_msg_copy (&gst->join_dcsn->header)); } @@ -771,7 +795,21 @@ psyc_recv_join_dcsn (void *cls, const struct GNUNET_PSYC_Message *join_msg) { struct Guest *gst = cls; - place_send_msg (&gst->place, &dcsn->header); + + gst->join_dcsn = GNUNET_malloc (dcsn->header.size); + GNUNET_memcpy (gst->join_dcsn, + dcsn, + dcsn->header.size); + if (GNUNET_NO == is_admitted) + { + GNUNET_PSYC_slave_part (gst->slave, + GNUNET_NO, + &slave_parted_after_join_decision, + gst); + gst->slave = NULL; + return; + } + place_send_msg (&gst->place, GNUNET_MQ_msg_copy (&gst->join_dcsn->header)); } @@ -792,7 +830,7 @@ psyc_recv_message (void *cls, GNUNET_PSYC_slicer_message (plc->slicer, msg); - place_send_msg (plc, &msg->header); + place_send_msg (plc, GNUNET_MQ_msg_copy (&msg->header)); } @@ -1096,9 +1134,6 @@ place_init (struct Place *plc) static int place_add (const struct PlaceEnterRequest *ereq) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding place to hashmap:\n"); - struct EgoPlacePublicKey ego_place_pub_key = { .ego_pub_key = ereq->ego_pub_key, .place_pub_key = ereq->place_pub_key, @@ -1173,7 +1208,9 @@ app_place_add (const char *app_id, return GNUNET_NO; if (GNUNET_SYSERR == place_add (ereq)) + { return GNUNET_SYSERR; + } if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_put (app_places, &ego_place_pub_hash, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)) @@ -1181,32 +1218,6 @@ app_place_add (const char *app_id, GNUNET_break (0); return GNUNET_SYSERR; } - - struct GNUNET_HashCode place_pub_hash; - GNUNET_CRYPTO_hash (&ereq->place_pub_key, sizeof (ereq->place_pub_key), &place_pub_hash); - - struct GNUNET_CONTAINER_MultiHashMap * - place_apps = GNUNET_CONTAINER_multihashmap_get (places_apps, &place_pub_hash); - if (NULL == place_apps) - { - place_apps = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_put (places_apps, &place_pub_hash, place_apps, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)) - { - GNUNET_break (0); - } - } - - size_t app_id_size = strlen (app_id) + 1; - void *app_id_value = GNUNET_malloc (app_id_size); - GNUNET_memcpy (app_id_value, app_id, app_id_size); - - if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_put (place_apps, &app_id_hash, app_id_value, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) - { - GNUNET_break (0); - } - return GNUNET_OK; } @@ -1223,7 +1234,10 @@ static int app_place_save (const char *app_id, const struct PlaceEnterRequest *ereq) { - app_place_add (app_id, ereq); + if (GNUNET_SYSERR == app_place_add (app_id, ereq)) + { + GNUNET_assert (0); + } if (NULL == dir_places) return GNUNET_SYSERR; @@ -1304,18 +1318,6 @@ app_place_remove (const char *app_id, if (NULL != app_places) GNUNET_CONTAINER_multihashmap_remove (app_places, &place_pub_hash, NULL); - struct GNUNET_CONTAINER_MultiHashMap * - place_apps = GNUNET_CONTAINER_multihashmap_get (places_apps, &place_pub_hash); - if (NULL != place_apps) - { - void *app_id_value = GNUNET_CONTAINER_multihashmap_get (place_apps, &app_id_hash); - if (NULL != app_id_value) - { - GNUNET_CONTAINER_multihashmap_remove (place_apps, &app_id_hash, app_id_value); - GNUNET_free (app_id_value); - } - } - int ret = GNUNET_OK; if (0 != unlink (app_place_filename)) @@ -1407,6 +1409,124 @@ msg_proc_parse (const struct MsgProcRequest *mpreq, } +void +app_notify_place (const struct GNUNET_MessageHeader *msg, + struct GNUNET_SERVICE_Client *client) +{ + struct AppPlaceMessage *amsg; + struct GNUNET_MQ_Envelope *env; + uint16_t msg_size = ntohs (msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending place notification of type %u to client.\n", + client, ntohs (msg->type)); + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER: + { + struct HostEnterRequest *hreq = (struct HostEnterRequest *) msg; + if (msg_size < sizeof (struct HostEnterRequest)) + return; + env = GNUNET_MQ_msg (amsg, + GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE); + // FIXME: also notify about not entered places + amsg->place_state = GNUNET_SOCIAL_PLACE_STATE_ENTERED; + amsg->is_host = GNUNET_YES; + amsg->ego_pub_key = hreq->ego_pub_key; + amsg->place_pub_key = hreq->place_pub_key; + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); + break; + } + case GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER: + { + if (msg_size < sizeof (struct GuestEnterRequest)) + return; + struct GuestEnterRequest *greq = (struct GuestEnterRequest *) msg; + env = GNUNET_MQ_msg (amsg, + GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE); + // FIXME: also notify about not entered places + amsg->place_state = GNUNET_SOCIAL_PLACE_STATE_ENTERED; + amsg->is_host = GNUNET_NO; + amsg->ego_pub_key = greq->ego_pub_key; + amsg->place_pub_key = greq->place_pub_key; + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); + break; + } + default: + return; + } +} + + +void +app_notify_place_end (struct GNUNET_SERVICE_Client *client) +{ + struct GNUNET_MQ_Envelope *env; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending end of place list notification to client\n", + client); + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); +} + + +void +app_notify_ego (struct Ego *ego, struct GNUNET_SERVICE_Client *client) +{ + struct AppEgoMessage *emsg; + struct GNUNET_MQ_Envelope *env; + size_t name_size = strlen (ego->name) + 1; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending ego notification to client: %s\n", + client, ego->name); + env = GNUNET_MQ_msg_extra (emsg, + name_size, + GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO); + GNUNET_CRYPTO_ecdsa_key_get_public (&ego->key, &emsg->ego_pub_key); + GNUNET_memcpy (&emsg[1], ego->name, name_size); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); +} + + +void +app_notify_ego_end (struct GNUNET_SERVICE_Client *client) +{ + struct GNUNET_MQ_Envelope *env; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending end of ego list notification to client\n", + client); + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); +} + + +int +app_place_entry_notify (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + struct GNUNET_MessageHeader * + msg = GNUNET_CONTAINER_multihashmap_get (places, key); + if (NULL != msg) + app_notify_place (msg, cls); + return GNUNET_YES; +} + + +int +ego_entry (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + app_notify_ego (value, cls); + return GNUNET_YES; +} + + static int check_client_msg_proc_set (void *cls, const struct MsgProcRequest *mpreq) @@ -1518,9 +1638,8 @@ static void handle_client_host_enter (void *cls, const struct HostEnterRequest *hr) { - struct Client *c = cls; + struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; - struct HostEnterRequest * hreq = (struct HostEnterRequest *) GNUNET_copy_message (&hr->header); @@ -1578,7 +1697,7 @@ handle_client_host_enter (void *cls, if (ret != GNUNET_SYSERR) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected as host to place %s.\n", hst, GNUNET_h2s (&plc->pub_key_hash)); @@ -1586,6 +1705,7 @@ handle_client_host_enter (void *cls, cli->client = client; GNUNET_CONTAINER_DLL_insert (plc->clients_head, plc->clients_tail, cli); c->place = plc; + app_notify_place (&hreq->header, client); } GNUNET_CRYPTO_eddsa_key_clear (&hreq->place_key); @@ -1622,8 +1742,12 @@ guest_enter (const struct GuestEnterRequest *greq, struct Guest **ret_gst) struct Ego *ego = GNUNET_CONTAINER_multihashmap_get (egos, &ego_pub_hash); if (NULL == ego) + { return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "entering as guest\n"); struct GNUNET_HashCode place_pub_hash; GNUNET_CRYPTO_hash (&greq->place_pub_key, sizeof (greq->place_pub_key), &place_pub_hash); @@ -1635,9 +1759,16 @@ guest_enter (const struct GuestEnterRequest *greq, struct Guest **ret_gst) if (NULL != plc_gst) gst = GNUNET_CONTAINER_multihashmap_get (plc_gst, &ego_pub_hash); - if (NULL == gst || NULL == gst->slave) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "plc_gst = %p, gst = %p\n", + plc_gst, + gst); + if (NULL == gst) { gst = GNUNET_new (struct Guest); + } + if (NULL == gst->slave) + { gst->origin = greq->origin; gst->relay_count = ntohl (greq->relay_count); @@ -1710,11 +1841,12 @@ guest_enter (const struct GuestEnterRequest *greq, struct Guest **ret_gst) plc_gst = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); (void) GNUNET_CONTAINER_multihashmap_put (place_guests, &plc->pub_key_hash, plc_gst, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - } - (void) GNUNET_CONTAINER_multihashmap_put (plc_gst, &plc->ego_pub_hash, gst, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - (void) GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, gst, + (void) GNUNET_CONTAINER_multihashmap_put (plc_gst, &plc->ego_pub_hash, gst, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + (void) GNUNET_CONTAINER_multihashmap_put (guests, &plc->pub_key_hash, gst, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + } gst->slave = GNUNET_PSYC_slave_join (cfg, &plc->pub_key, &plc->ego_key, gst->join_flags, &gst->origin, @@ -1724,6 +1856,9 @@ guest_enter (const struct GuestEnterRequest *greq, struct Guest **ret_gst) &psyc_recv_join_dcsn, gst, join_msg); plc->channel = GNUNET_PSYC_slave_get_channel (gst->slave); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "slave entered channel %p\n", + plc->channel); ret = GNUNET_YES; } @@ -1734,78 +1869,96 @@ guest_enter (const struct GuestEnterRequest *greq, struct Guest **ret_gst) static int -check_client_guest_enter (void *cls, - const struct GuestEnterRequest *greq) -{ - return GNUNET_OK; -} - - -/** - * Handle a connecting client entering a place as guest. - */ -static void -handle_client_guest_enter (void *cls, - const struct GuestEnterRequest *greq) +client_guest_enter (struct Client *c, + const struct GuestEnterRequest *greq) { - struct Client *c = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client_guest_enter\n"); + struct GNUNET_PSYC_CountersResultMessage *result_msg; + struct GNUNET_MQ_Envelope *env; struct GNUNET_SERVICE_Client *client = c->client; - uint16_t remaining = ntohs (greq->header.size) - sizeof (*greq); const char *app_id = NULL; uint16_t offset = GNUNET_STRINGS_buffer_tokenize ((const char *) &greq[1], remaining, 1, &app_id); - if (0 == offset) - { - GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); - return; - } - struct Guest *gst = NULL; struct Place *plc = NULL; + if (0 == offset) + { + return GNUNET_SYSERR; + } switch (guest_enter (greq, &gst)) { case GNUNET_YES: + { plc = c->place = &gst->place; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "guest entered successfully to local place %s\n", + GNUNET_h2s (&plc->pub_key_hash)); plc->guest = gst; app_place_save (app_id, (const struct PlaceEnterRequest *) greq); + app_notify_place (&greq->header, client); break; - + } case GNUNET_NO: { plc = c->place = &gst->place; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "guest re-entered successfully to local place %s\n", + GNUNET_h2s (&plc->pub_key_hash)); plc->guest = gst; - - struct GNUNET_PSYC_CountersResultMessage res; - res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK); - res.header.size = htons (sizeof (res)); - res.result_code = htonl (GNUNET_OK); - res.max_message_id = GNUNET_htonll (plc->max_message_id); - - client_send_msg (client, &res.header); + env = GNUNET_MQ_msg (result_msg, + GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK); + result_msg->result_code = htonl (GNUNET_OK); + result_msg->max_message_id = GNUNET_htonll (plc->max_message_id); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); if (NULL != gst->join_dcsn) - client_send_msg (client, &gst->join_dcsn->header); - + { + env = GNUNET_MQ_msg_copy (&gst->join_dcsn->header); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); + } break; } case GNUNET_SYSERR: - GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); - return; + { + return GNUNET_SYSERR; + } } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Client connected as guest to place %s.\n", - gst, GNUNET_h2s (&plc->pub_key_hash)); struct ClientListItem *cli = GNUNET_new (struct ClientListItem); cli->client = client; GNUNET_CONTAINER_DLL_insert (plc->clients_head, plc->clients_tail, cli); + return GNUNET_OK; +} - c->place = plc; - GNUNET_SERVICE_client_continue (client); + +static int +check_client_guest_enter (void *cls, + const struct GuestEnterRequest *greq) +{ + return GNUNET_OK; +} + + +/** + * Handle a connecting client entering a place as guest. + */ +static void +handle_client_guest_enter (void *cls, + const struct GuestEnterRequest *greq) +{ + struct Client *c = cls; + + if (GNUNET_SYSERR == client_guest_enter (c, greq)) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (c->client); + return; + } + GNUNET_SERVICE_client_continue (c->client); } @@ -1830,7 +1983,7 @@ gns_result_guest_enter (void *cls, uint32_t rd_count, { struct GuestEnterByNameClosure *gcls = cls; struct Client *c = gcls->client; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNS result: %u records.\n", c, rd_count); @@ -1882,7 +2035,7 @@ gns_result_guest_enter (void *cls, uint32_t rd_count, p += relay_size; GNUNET_memcpy (p, gcls->join_msg, join_msg_size); - handle_client_guest_enter (c, greq); + client_guest_enter (c, greq); GNUNET_free (gcls->app_id); if (NULL != gcls->password) @@ -1960,118 +2113,7 @@ handle_client_guest_enter_by_name (void *cls, GNUNET_GNSRECORD_TYPE_PLACE, GNUNET_GNS_LO_DEFAULT, &gns_result_guest_enter, gcls); -} - - -void -app_notify_place (struct GNUNET_MessageHeader *msg, - struct GNUNET_SERVICE_Client *client) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending place notification of type %u to client.\n", - client, ntohs (msg->type)); - - uint16_t msg_size = ntohs (msg->size); - struct AppPlaceMessage amsg; - amsg.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE); - amsg.header.size = htons (sizeof (amsg)); - // FIXME: also notify about not entered places - amsg.place_state = GNUNET_SOCIAL_PLACE_STATE_ENTERED; - - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER: - if (msg_size < sizeof (struct HostEnterRequest)) - return; - struct HostEnterRequest *hreq = (struct HostEnterRequest *) msg; - amsg.is_host = GNUNET_YES; - amsg.ego_pub_key = hreq->ego_pub_key; - amsg.place_pub_key = hreq->place_pub_key; - break; - - case GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER: - if (msg_size < sizeof (struct GuestEnterRequest)) - return; - struct GuestEnterRequest *greq = (struct GuestEnterRequest *) msg; - amsg.is_host = GNUNET_NO; - amsg.ego_pub_key = greq->ego_pub_key; - amsg.place_pub_key = greq->place_pub_key; - break; - - default: - return; - } - - client_send_msg (client, &amsg.header); -} - - -void -app_notify_place_end (struct GNUNET_SERVICE_Client *client) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending end of place list notification to client\n", - client); - - struct GNUNET_MessageHeader msg; - msg.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END); - msg.size = htons (sizeof (msg)); - - client_send_msg (client, &msg); -} - - -void -app_notify_ego (struct Ego *ego, struct GNUNET_SERVICE_Client *client) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending ego notification to client: %s\n", - client, ego->name); - - size_t name_size = strlen (ego->name) + 1; - struct AppEgoMessage *emsg = GNUNET_malloc (sizeof (*emsg) + name_size); - emsg->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO); - emsg->header.size = htons (sizeof (*emsg) + name_size); - - GNUNET_CRYPTO_ecdsa_key_get_public (&ego->key, &emsg->ego_pub_key); - GNUNET_memcpy (&emsg[1], ego->name, name_size); - - client_send_msg (client, &emsg->header); - GNUNET_free (emsg); -} - - -void -app_notify_ego_end (struct GNUNET_SERVICE_Client *client) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending end of ego list notification to client\n", - client); - - struct GNUNET_MessageHeader msg; - msg.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END); - msg.size = htons (sizeof (msg)); - - client_send_msg (client, &msg); -} - - -int -app_place_entry_notify (void *cls, const struct GNUNET_HashCode *key, void *value) -{ - struct GNUNET_MessageHeader * - msg = GNUNET_CONTAINER_multihashmap_get (places, key); - if (NULL != msg) - app_notify_place (msg, cls); - return GNUNET_YES; -} - - -int -ego_entry (void *cls, const struct GNUNET_HashCode *key, void *value) -{ - app_notify_ego (value, cls); - return GNUNET_YES; + GNUNET_SERVICE_client_continue (client); } @@ -2154,13 +2196,15 @@ handle_client_app_detach (void *cls, } -int -app_places_entry_remove (void *cls, const struct GNUNET_HashCode *key, void *value) +static void +place_leave_cb (void *cls) { struct Place *plc = cls; - const char *app_id = value; - app_place_remove (app_id, &plc->ego_pub_key, &plc->pub_key); - return GNUNET_YES; + + place_send_leave_ack (plc); + (GNUNET_YES == plc->is_host) + ? cleanup_host ((struct Host *) plc) + : cleanup_guest ((struct Guest *) plc); } @@ -2174,6 +2218,11 @@ handle_client_place_leave (void *cls, struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Place *plc = c->place; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "got leave request from %s for place %s", + plc->is_host? "host" : "slave", + GNUNET_h2s (&plc->pub_key_hash)); if (NULL == plc) { GNUNET_break (0); @@ -2181,40 +2230,28 @@ handle_client_place_leave (void *cls, return; } - /* FIXME: remove all app subscriptions and leave this place */ - - struct GNUNET_CONTAINER_MultiHashMap * - place_apps = GNUNET_CONTAINER_multihashmap_get (places_apps, &plc->pub_key_hash); - if (NULL != place_apps) + if (GNUNET_YES != plc->is_disconnecting) { - GNUNET_CONTAINER_multihashmap_iterate (place_apps, app_places_entry_remove, plc); - } - - /* FIXME: disconnect from the network, but keep local connection for history access */ - - /* Disconnect all clients connected to the place */ - struct ClientListItem *cli = plc->clients_head, *next; - while (NULL != cli) - { - GNUNET_CONTAINER_DLL_remove (plc->clients_head, plc->clients_tail, cli); - GNUNET_SERVICE_client_drop (cli->client); - next = cli->next; - GNUNET_free (cli); - cli = next; - } - - if (GNUNET_YES != plc->is_disconnected) - { - plc->is_disconnected = GNUNET_YES; - if (NULL != plc->tmit_msgs_head) - { /* Send pending messages to PSYC before cleanup. */ - psyc_transmit_message (plc); + plc->is_disconnecting = GNUNET_YES; + if (plc->is_host) + { + struct Host *host = plc->host; + GNUNET_assert (NULL != host); + GNUNET_PSYC_master_stop (host->master, GNUNET_NO, &place_leave_cb, plc); } else { - cleanup_place (plc); + struct Guest *guest = plc->guest; + GNUNET_assert (NULL != guest); + GNUNET_PSYC_slave_part (guest->slave, GNUNET_NO, &place_leave_cb, plc); } } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "got leave request but place is already leaving\n"); + } + GNUNET_SERVICE_client_continue (client); } @@ -2273,6 +2310,9 @@ handle_client_join_decision (void *cls, ? (struct GNUNET_PSYC_Message *) &dcsn[1] : NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "jcls.msg = %p\n", + jcls.msg); struct GNUNET_HashCode slave_pub_hash; GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key), &slave_pub_hash); @@ -2302,10 +2342,11 @@ handle_client_join_decision (void *cls, static void send_message_ack (struct Place *plc, struct GNUNET_SERVICE_Client *client) { - struct GNUNET_MessageHeader res; - res.size = htons (sizeof (res)); - res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); - client_send_msg (client, &res); + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), + env); } @@ -2437,7 +2478,6 @@ psyc_transmit_notify_data (void *cls, uint16_t *data_size, void *data) { *data_size = 0; tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg); - plc->is_disconnected = GNUNET_YES; GNUNET_SERVICE_client_drop (tmit_frag->client); GNUNET_SCHEDULER_add_now (&cleanup_place, plc); return ret; @@ -2479,11 +2519,7 @@ psyc_transmit_notify_data (void *cls, uint16_t *data_size, void *data) { psyc_transmit_message (plc); } - else if (GNUNET_YES == plc->is_disconnected) - { - /* FIXME: handle partial message (when still in_transmit) */ - cleanup_place (plc); - } + /* FIXME: handle partial message (when still in_transmit) */ } return ret; } @@ -2597,7 +2633,6 @@ psyc_transmit_notify_mod (void *cls, uint16_t *data_size, void *data, *data_size = 0; ret = GNUNET_SYSERR; tmit_msg = psyc_transmit_queue_next_msg (plc, tmit_msg); - plc->is_disconnected = GNUNET_YES; GNUNET_SERVICE_client_drop (tmit_frag->client); GNUNET_SCHEDULER_add_now (&cleanup_place, plc); } @@ -2862,26 +2897,26 @@ psyc_transmit_queue_message (struct Place *plc, } -/** - * Cancel transmission of current message to PSYC. - * - * @param plc Place to send to. - * @param client Client the message originates from. - */ -static void -psyc_transmit_cancel (struct Place *plc, struct GNUNET_SERVICE_Client *client) -{ - uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; - - struct GNUNET_MessageHeader msg; - msg.size = htons (sizeof (msg)); - msg.type = htons (type); - - psyc_transmit_queue_message (plc, client, sizeof (msg), &msg, type, type, NULL); - psyc_transmit_message (plc); - - /* FIXME: cleanup */ -} +///** +// * Cancel transmission of current message to PSYC. +// * +// * @param plc Place to send to. +// * @param client Client the message originates from. +// */ +//static void +//psyc_transmit_cancel (struct Place *plc, struct GNUNET_SERVICE_Client *client) +//{ +// uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; +// +// struct GNUNET_MessageHeader msg; +// msg.size = htons (sizeof (msg)); +// msg.type = htons (type); +// +// psyc_transmit_queue_message (plc, client, sizeof (msg), &msg, type, type, NULL); +// psyc_transmit_message (plc); +// +// /* FIXME: cleanup */ +//} static int @@ -2902,17 +2937,19 @@ handle_client_psyc_message (void *cls, struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Place *plc = c->place; + int ret; + if (NULL == plc) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "received PSYC message for non-existing client %p\n", + client); GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } - - int ret = GNUNET_SYSERR; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message from client.\n", plc); + "%p Received message of type %d from client.\n", plc, ntohs (msg->type)); GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); if (GNUNET_YES != plc->is_ready) @@ -2933,20 +2970,19 @@ handle_client_psyc_message (void *cls, "%p Received message with invalid payload size (%u) from client.\n", plc, psize); GNUNET_break (0); - psyc_transmit_cancel (plc, client); GNUNET_SERVICE_client_drop (client); return; } - uint16_t first_ptype = 0, last_ptype = 0; - if (GNUNET_SYSERR - == GNUNET_PSYC_receive_check_parts (psize, (const char *) &msg[1], - &first_ptype, &last_ptype)) + uint16_t first_ptype = 0; + uint16_t last_ptype = 0; + if (GNUNET_SYSERR == + GNUNET_PSYC_receive_check_parts (psize, (const char *) &msg[1], + &first_ptype, &last_ptype)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Received invalid message part from client.\n", plc); GNUNET_break (0); - psyc_transmit_cancel (plc, client); GNUNET_SERVICE_client_drop (client); return; } @@ -2963,20 +2999,19 @@ handle_client_psyc_message (void *cls, c->tmit_msg = NULL; ret = psyc_transmit_message (plc); } - + else + { + ret = GNUNET_SYSERR; + } if (GNUNET_OK != ret) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Received invalid message part from client.\n", plc); GNUNET_break (0); - psyc_transmit_cancel (plc, client); - ret = GNUNET_SYSERR; - } - - if (GNUNET_OK == ret) - GNUNET_SERVICE_client_continue (client); - else GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_SERVICE_client_continue (client); } @@ -3006,7 +3041,7 @@ psyc_recv_history_message (void *cls, const struct GNUNET_PSYC_MessageHeader *ms GNUNET_memcpy (&res[1], msg, size); /** @todo FIXME: send only to requesting client */ - place_send_msg (plc, &res->header); + place_send_msg (plc, GNUNET_MQ_msg_copy (&res->header)); GNUNET_free (res); } @@ -3108,29 +3143,24 @@ psyc_recv_state_var (void *cls, uint32_t value_size, uint32_t full_value_size) { + struct GNUNET_OperationResultMessage *result_msg; + struct GNUNET_MQ_Envelope *env; struct OperationClosure *opcls = cls; struct Client *c = opcls->client; struct Place *plc = c->place; + uint16_t size = ntohs (mod->size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Received state variable %s from PSYC\n", plc, name); - - uint16_t size = ntohs (mod->size); - - struct GNUNET_OperationResultMessage * - res = GNUNET_malloc (sizeof (*res) + size); - res->header.size = htons (sizeof (*res) + size); - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); - res->op_id = opcls->op_id; - res->result_code = GNUNET_htonll (GNUNET_OK); - - GNUNET_memcpy (&res[1], mod, size); - + env = GNUNET_MQ_msg_extra (result_msg, + size, + GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + result_msg->op_id = opcls->op_id; + result_msg->result_code = GNUNET_htonll (GNUNET_OK); + GNUNET_memcpy (&result_msg[1], mod, size); /** @todo FIXME: send only to requesting client */ - place_send_msg (plc, &res->header); - - GNUNET_free (res); + place_send_msg (plc, env); } @@ -3184,7 +3214,7 @@ handle_client_state_get (void *cls, uint16_t size = ntohs (req->header.size); const char *name = (const char *) &req[1]; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p State get #%" PRIu64 ": %s\n", plc, GNUNET_ntohll (req->op_id), name); @@ -3382,7 +3412,7 @@ path_basename (const char *path) if (NULL != basename) basename++; - if (NULL == basename || '\0' == basename) + if (NULL == basename || '\0' == *basename) return NULL; return basename; @@ -3468,7 +3498,10 @@ file_place_load (void *cls, const char *place_filename) return GNUNET_OK; } - app_place_add (plcls->app_id, ereq); + if (GNUNET_SYSERR == app_place_add (plcls->app_id, ereq)) + { + GNUNET_assert (0); + } GNUNET_free (ereq); return GNUNET_OK; } @@ -3523,6 +3556,10 @@ identity_recv_ego (void *cls, struct GNUNET_IDENTITY_Ego *id_ego, if (NULL == id_ego) // end of initial list of egos return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "social service received ego %s\n", + name); + struct GNUNET_CRYPTO_EcdsaPublicKey ego_pub_key; GNUNET_IDENTITY_ego_get_public_key (id_ego, &ego_pub_key); @@ -3571,6 +3608,9 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *svc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "starting social service\n"); + cfg = c; service = svc; GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer); @@ -3583,7 +3623,7 @@ run (void *cls, apps = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); places = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_NO); apps_places = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_NO); - places_apps = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_NO); + //places_apps = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_NO); id = GNUNET_IDENTITY_connect (cfg, &identity_recv_ego, NULL); gns = GNUNET_GNS_connect (cfg); diff --git a/src/social/gnunet-social.c b/src/social/gnunet-social.c @@ -283,7 +283,7 @@ exit_fail () static void host_left (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "The host has left the place.\n"); exit_success (); } @@ -309,7 +309,7 @@ host_leave () static void guest_left (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Guest has left the place.\n"); } @@ -518,7 +518,7 @@ look_var (void *cls, uint32_t value_size, uint32_t full_value_size) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Received var: %s\n%.*s\n", name, value_size, (const char *) value); } @@ -558,7 +558,7 @@ slicer_recv_method (void *cls, const char *method_name) { method_received = method_name; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Received method for message ID %" PRIu64 ":\n" "%s (flags: %x)\n", message_id, method_name, ntohl (meth->flags)); @@ -584,7 +584,7 @@ slicer_recv_modifier (void *cls, uint16_t full_value_size) { #if 0 - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Received modifier for message ID %" PRIu64 ":\n" "%c%s: %.*s (size: %u)\n", message_id, oper, name, value_size, (const char *) value, value_size); @@ -608,7 +608,7 @@ slicer_recv_data (void *cls, uint16_t data_size) { #if 0 - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Received data for message ID %" PRIu64 ":\n" "%.*s\n", message_id, data_size, (const char *) data); @@ -631,7 +631,7 @@ slicer_recv_eom (void *cls, uint8_t is_cancelled) { printf(".\n"); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Received end of message ID %" PRIu64 ", cancelled: %u\n", message_id, is_cancelled); @@ -668,7 +668,7 @@ guest_recv_entry_decision (void *cls, int is_admitted, const struct GNUNET_PSYC_Message *entry_msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Guest received entry decision %d\n", is_admitted); @@ -683,7 +683,7 @@ guest_recv_entry_decision (void *cls, GNUNET_PSYC_message_parse (pmsg, &method_name, env, &data, &data_size); GNUNET_free (pmsg); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "%s\n%.*s\n", method_name, data_size, (const char *) data); } @@ -704,7 +704,7 @@ guest_recv_local_enter (void *cls, int result, uint64_t max_message_id) { char *pub_str = GNUNET_CRYPTO_eddsa_public_key_to_string (pub_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Guest entered local place: %s, max_message_id: %" PRIu64 "\n", pub_str, max_message_id); GNUNET_free (pub_str); @@ -802,7 +802,7 @@ host_answer_door (void *cls, char * nym_str = GNUNET_CRYPTO_ecdsa_public_key_to_string (nym_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Entry request: %s\n", nym_str); GNUNET_free (nym_str); @@ -840,7 +840,7 @@ host_farewell (void *cls, char * nym_str = GNUNET_CRYPTO_ecdsa_public_key_to_string (nym_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Farewell: %s\n", nym_str); GNUNET_free (nym_str); } @@ -856,7 +856,7 @@ host_entered (void *cls, int result, { place_pub_key = *pub_key; char *pub_str = GNUNET_CRYPTO_eddsa_public_key_to_string (pub_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Host entered: %s, max_message_id: %" PRIu64 "\n", pub_str, max_message_id); GNUNET_free (pub_str); diff --git a/src/social/social_api.c b/src/social/social_api.c @@ -183,6 +183,7 @@ struct GNUNET_SOCIAL_Place */ struct GNUNET_PSYC_Slicer *slicer; + // FIXME: do we need is_disconnecing like on the psyc and multicast APIs? /** * Function called after disconnected from the service. */ @@ -371,6 +372,68 @@ struct ZoneAddNymHandle }; +/*** CLEANUP / DISCONNECT ***/ + + +static void +host_cleanup (struct GNUNET_SOCIAL_Host *hst) +{ + if (NULL != hst->slicer) + { + GNUNET_PSYC_slicer_destroy (hst->slicer); + hst->slicer = NULL; + } + GNUNET_free (hst); +} + + +static void +guest_cleanup (struct GNUNET_SOCIAL_Guest *gst) +{ + GNUNET_free (gst); +} + + +static void +place_cleanup (struct GNUNET_SOCIAL_Place *plc) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "cleaning up place %p\n", + plc); + if (NULL != plc->tmit) + { + GNUNET_PSYC_transmit_destroy (plc->tmit); + plc->tmit = NULL; + } + if (NULL != plc->connect_env) + { + GNUNET_MQ_discard (plc->connect_env); + plc->connect_env = NULL; + } + if (NULL != plc->mq) + { + GNUNET_MQ_destroy (plc->mq); + plc->mq = NULL; + } + if (NULL != plc->disconnect_cb) + { + plc->disconnect_cb (plc->disconnect_cls); + plc->disconnect_cb = NULL; + } + + (GNUNET_YES == plc->is_host) + ? host_cleanup ((struct GNUNET_SOCIAL_Host *) plc) + : guest_cleanup ((struct GNUNET_SOCIAL_Guest *) plc); +} + + +static void +place_disconnect (struct GNUNET_SOCIAL_Place *plc) +{ + place_cleanup (plc); +} + + /*** NYM ***/ static struct GNUNET_SOCIAL_Nym * @@ -428,7 +491,7 @@ host_recv_notice_place_leave_method (void *cls, struct GNUNET_SOCIAL_Nym *nym = nym_get_or_create (&msg->slave_pub_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Host received method for message ID %" PRIu64 " from nym %s: %s\n", message_id, GNUNET_h2s (&nym->pub_key_hash), method_name); @@ -436,7 +499,7 @@ host_recv_notice_place_leave_method (void *cls, hst->notice_place_leave_env = GNUNET_PSYC_env_create (); char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&hst->notice_place_leave_nym->pub_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "_notice_place_leave: got method from nym %s (%s).\n", GNUNET_h2s (&hst->notice_place_leave_nym->pub_key_hash), str); GNUNET_free (str); @@ -458,7 +521,7 @@ host_recv_notice_place_leave_modifier (void *cls, if (NULL == hst->notice_place_leave_env) return; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Host received modifier for _notice_place_leave message with ID %" PRIu64 ":\n" "%c%s: %.*s\n", message_id, oper, name, value_size, (const char *) value); @@ -485,7 +548,7 @@ host_recv_notice_place_leave_eom (void *cls, return; char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&hst->notice_place_leave_nym->pub_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "_notice_place_leave: got EOM from nym %s (%s).\n", GNUNET_h2s (&hst->notice_place_leave_nym->pub_key_hash), str); GNUNET_free (str); @@ -1015,100 +1078,24 @@ handle_app_place_end (void *cls, } -/*** CLEANUP / DISCONNECT ***/ - - -static void -host_cleanup (struct GNUNET_SOCIAL_Host *hst) -{ - if (NULL != hst->slicer) - { - GNUNET_PSYC_slicer_destroy (hst->slicer); - hst->slicer = NULL; - } - GNUNET_free (hst); -} - - +/** + * Handler for a #GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE_ACK message received + * from the social service. + * + * @param cls the place of type `struct GNUNET_SOCIAL_Place` + * @param msg the message received from the service + */ static void -guest_cleanup (struct GNUNET_SOCIAL_Guest *gst) +handle_place_leave_ack (void *cls, + const struct GNUNET_MessageHeader *msg) { - GNUNET_free (gst); -} - + struct GNUNET_SOCIAL_Place *plc = cls; -static void -place_cleanup (struct GNUNET_SOCIAL_Place *plc) -{ - struct GNUNET_HashCode place_pub_hash; - GNUNET_CRYPTO_hash (&plc->pub_key, sizeof (plc->pub_key), &place_pub_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s place cleanup: %s\n", - GNUNET_YES == plc->is_host ? "host" : "guest", - GNUNET_h2s (&place_pub_hash)); - - if (NULL != plc->tmit) - { - GNUNET_PSYC_transmit_destroy (plc->tmit); - plc->tmit = NULL; - } - if (NULL != plc->connect_env) - { - GNUNET_MQ_discard (plc->connect_env); - plc->connect_env = NULL; - } - if (NULL != plc->mq) - { - GNUNET_MQ_destroy (plc->mq); - plc->mq = NULL; - } - if (NULL != plc->disconnect_cb) - { - plc->disconnect_cb (plc->disconnect_cls); - plc->disconnect_cb = NULL; - } - - (GNUNET_YES == plc->is_host) - ? host_cleanup ((struct GNUNET_SOCIAL_Host *) plc) - : guest_cleanup ((struct GNUNET_SOCIAL_Guest *) plc); -} - - -void -place_disconnect (struct GNUNET_SOCIAL_Place *plc, - GNUNET_ContinuationCallback cb, - void *cls) -{ - plc->disconnect_cb = cb; - plc->disconnect_cls = cls; - - if (NULL != plc->mq) - { - struct GNUNET_MQ_Envelope *env = GNUNET_MQ_get_last_envelope (plc->mq); - if (NULL != env) - { - GNUNET_MQ_notify_sent (env, (GNUNET_SCHEDULER_TaskCallback) place_cleanup, plc); - } - else - { - place_cleanup (plc); - } - } - else - { - place_cleanup (plc); - } -} - - -void -place_leave (struct GNUNET_SOCIAL_Place *plc) -{ - struct GNUNET_MessageHeader *msg; - struct GNUNET_MQ_Envelope * - env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE); - - GNUNET_MQ_send (plc->mq, env); + "%s left place %p\n", + plc->is_host ? "host" : "guest", + plc); + place_disconnect (plc); } @@ -1168,6 +1155,10 @@ host_connect (struct GNUNET_SOCIAL_Host *hst) GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK, struct HostEnterAck, hst), + GNUNET_MQ_hd_fixed_size (place_leave_ack, + GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE_ACK, + struct GNUNET_MessageHeader, + plc), GNUNET_MQ_hd_var_size (host_enter_request, GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, struct GNUNET_PSYC_JoinRequestMessage, @@ -1516,6 +1507,9 @@ GNUNET_SOCIAL_host_announce (struct GNUNET_SOCIAL_Host *hst, void *notify_data_cls, enum GNUNET_SOCIAL_AnnounceFlags flags) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "PSYC_transmit_message for host, method: %s\n", + method_name); if (GNUNET_OK == GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env, NULL, notify_data, notify_data_cls, flags)) @@ -1580,7 +1574,11 @@ GNUNET_SOCIAL_host_disconnect (struct GNUNET_SOCIAL_Host *hst, GNUNET_ContinuationCallback disconnect_cb, void *cls) { - place_disconnect (&hst->plc, disconnect_cb, cls); + struct GNUNET_SOCIAL_Place *plc = &hst->plc; + + plc->disconnect_cb = disconnect_cb; + plc->disconnect_cls = cls; + place_disconnect (plc); } @@ -1607,10 +1605,15 @@ GNUNET_SOCIAL_host_leave (struct GNUNET_SOCIAL_Host *hst, GNUNET_ContinuationCallback disconnect_cb, void *cls) { + struct GNUNET_MQ_Envelope *envelope; + GNUNET_SOCIAL_host_announce (hst, "_notice_place_closing", env, NULL, NULL, GNUNET_SOCIAL_ANNOUNCE_NONE); - place_leave (&hst->plc); - GNUNET_SOCIAL_host_disconnect (hst, disconnect_cb, cls); + hst->plc.disconnect_cb = disconnect_cb; + hst->plc.disconnect_cls = cls; + envelope = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE); + GNUNET_MQ_send (hst->plc.mq, + envelope); } @@ -1670,6 +1673,10 @@ guest_connect (struct GNUNET_SOCIAL_Guest *gst) GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK, struct GNUNET_PSYC_CountersResultMessage, gst), + GNUNET_MQ_hd_fixed_size (place_leave_ack, + GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE_ACK, + struct GNUNET_MessageHeader, + plc), GNUNET_MQ_hd_var_size (guest_enter_decision, GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, struct GNUNET_PSYC_JoinDecisionMessage, @@ -1896,6 +1903,64 @@ GNUNET_SOCIAL_guest_enter_by_name (const struct GNUNET_SOCIAL_App *app, } +struct ReconnectContext +{ + struct GNUNET_SOCIAL_Guest *guest; + int *result; + int64_t *max_message_id; + GNUNET_SOCIAL_GuestEnterCallback enter_cb; + void *enter_cls; +}; + + +static void +guest_enter_reconnect_cb (void *cls, + int result, + const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key, + uint64_t max_message_id) +{ + struct ReconnectContext *reconnect_ctx = cls; + + GNUNET_assert (NULL != reconnect_ctx); + reconnect_ctx->result = GNUNET_new (int); + *(reconnect_ctx->result) = result; + reconnect_ctx->max_message_id = GNUNET_new (int64_t); + *(reconnect_ctx->max_message_id) = max_message_id; +} + + +static void +guest_entry_dcsn_reconnect_cb (void *cls, + int is_admitted, + const struct GNUNET_PSYC_Message *entry_resp) +{ + struct ReconnectContext *reconnect_ctx = cls; + struct GNUNET_SOCIAL_Guest *gst = reconnect_ctx->guest; + + GNUNET_assert (NULL != reconnect_ctx); + GNUNET_assert (NULL != reconnect_ctx->result); + GNUNET_assert (NULL != reconnect_ctx->max_message_id); + if (GNUNET_YES != is_admitted) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Guest was rejected after calling " + "GNUNET_SOCIAL_guest_enter_reconnect ()\n"); + } + else if (NULL != reconnect_ctx->enter_cb) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "guest reconnected!\n"); + reconnect_ctx->enter_cb (reconnect_ctx->enter_cls, + *(reconnect_ctx->result), + &gst->plc.pub_key, + *(reconnect_ctx->max_message_id)); + } + GNUNET_free (reconnect_ctx->result); + GNUNET_free (reconnect_ctx->max_message_id); + GNUNET_free (reconnect_ctx); +} + + /** * Reconnect to an already entered place as guest. * @@ -1906,8 +1971,8 @@ GNUNET_SOCIAL_guest_enter_by_name (const struct GNUNET_SOCIAL_App *app, * Flags for the entry. * @param slicer * Slicer to use for processing incoming requests from guests. - * @param local_enter_cb - * Called upon connection established to the social service. + * @param enter_cb + * Called upon re-entering is complete. * @param entry_decision_cb * Called upon receiving entry decision. * @@ -1917,11 +1982,12 @@ struct GNUNET_SOCIAL_Guest * GNUNET_SOCIAL_guest_enter_reconnect (struct GNUNET_SOCIAL_GuestConnection *gconn, enum GNUNET_PSYC_SlaveJoinFlags flags, struct GNUNET_PSYC_Slicer *slicer, - GNUNET_SOCIAL_GuestEnterCallback local_enter_cb, + GNUNET_SOCIAL_GuestEnterCallback enter_cb, void *cls) { struct GNUNET_SOCIAL_Guest *gst = GNUNET_malloc (sizeof (*gst)); struct GNUNET_SOCIAL_Place *plc = &gst->plc; + struct ReconnectContext *reconnect_ctx; uint16_t app_id_size = strlen (gconn->app->id) + 1; struct GuestEnterRequest *greq; @@ -1940,10 +2006,15 @@ GNUNET_SOCIAL_guest_enter_reconnect (struct GNUNET_SOCIAL_GuestConnection *gconn plc->pub_key = gconn->plc_msg.place_pub_key; plc->ego_pub_key = gconn->plc_msg.ego_pub_key; - plc->op = GNUNET_OP_create (); + reconnect_ctx = GNUNET_new (struct ReconnectContext); + reconnect_ctx->guest = gst; + reconnect_ctx->enter_cb = enter_cb; + reconnect_ctx->enter_cls = cls; - gst->enter_cb = local_enter_cb; - gst->cb_cls = cls; + plc->op = GNUNET_OP_create (); + gst->enter_cb = &guest_enter_reconnect_cb; + gst->entry_dcsn_cb = &guest_entry_dcsn_reconnect_cb; + gst->cb_cls = reconnect_ctx; guest_connect (gst); return gst; @@ -2028,7 +2099,11 @@ GNUNET_SOCIAL_guest_disconnect (struct GNUNET_SOCIAL_Guest *gst, GNUNET_ContinuationCallback disconnect_cb, void *cls) { - place_disconnect (&gst->plc, disconnect_cb, cls); + struct GNUNET_SOCIAL_Place *plc = &gst->plc; + + plc->disconnect_cb = disconnect_cb; + plc->disconnect_cls = cls; + place_disconnect (plc); } @@ -2054,10 +2129,15 @@ GNUNET_SOCIAL_guest_leave (struct GNUNET_SOCIAL_Guest *gst, GNUNET_ContinuationCallback disconnect_cb, void *cls) { + struct GNUNET_MQ_Envelope *envelope; + GNUNET_SOCIAL_guest_talk (gst, "_notice_place_leave", env, NULL, NULL, GNUNET_SOCIAL_TALK_NONE); - place_leave (&gst->plc); - GNUNET_SOCIAL_guest_disconnect (gst, disconnect_cb, cls); + gst->plc.disconnect_cb = disconnect_cb; + gst->plc.disconnect_cls = cls; + envelope = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE); + GNUNET_MQ_send (gst->plc.mq, + envelope); } diff --git a/src/social/test_social.c b/src/social/test_social.c @@ -129,22 +129,22 @@ enum TEST_HOST_ANSWER_DOOR_REFUSE = 4, TEST_GUEST_RECV_ENTRY_DCSN_REFUSE = 5, TEST_HOST_ANSWER_DOOR_ADMIT = 6, - TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 9, - TEST_HOST_ANNOUNCE = 10, - TEST_HOST_ANNOUNCE_END = 11, - TEST_GUEST_TALK = 12, - TEST_HOST_ANNOUNCE2 = 13, - TEST_HOST_ANNOUNCE2_END = 14, - TEST_GUEST_HISTORY_REPLAY = 15, - TEST_GUEST_HISTORY_REPLAY_LATEST = 16, - TEST_GUEST_LOOK_AT = 17, - TEST_GUEST_LOOK_FOR = 18, - TEST_GUEST_LEAVE = 18, - TEST_ZONE_ADD_PLACE = 20, - TEST_GUEST_ENTER_BY_NAME = 21, - TEST_RECONNECT = 22, - TEST_GUEST_LEAVE2 = 23, - TEST_HOST_LEAVE = 24, + TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 7, + TEST_HOST_ANNOUNCE = 8, + TEST_HOST_ANNOUNCE_END = 9, + TEST_GUEST_TALK = 10, + TEST_HOST_ANNOUNCE2 = 11, + TEST_HOST_ANNOUNCE2_END = 12, + TEST_GUEST_HISTORY_REPLAY = 13, + TEST_GUEST_HISTORY_REPLAY_LATEST = 14, + TEST_GUEST_LOOK_AT = 15, + TEST_GUEST_LOOK_FOR = 16, + TEST_GUEST_LEAVE = 17, + TEST_ZONE_ADD_PLACE = 18, + TEST_GUEST_ENTER_BY_NAME = 19, + TEST_RECONNECT = 20, + TEST_GUEST_LEAVE2 = 21, + TEST_HOST_LEAVE = 22, } test; @@ -180,10 +180,28 @@ host_announce2 (); /** - * Clean up all resources used. + * Terminate the test case (failure). + * + * @param cls NULL + */ +static void +end_badly (void *cls) +{ + end_badly_task = NULL; + GNUNET_SCHEDULER_shutdown (); + res = 2; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Test FAILED.\n"); +} + + +/** + * Terminate the test case (failure). + * + * @param cls NULL */ static void -cleanup () +end_shutdown (void *cls) { if (NULL != id) { @@ -202,7 +220,11 @@ cleanup () GNUNET_PSYC_slicer_destroy (host_slicer); host_slicer = NULL; } - + if (NULL != end_badly_task) + { + GNUNET_SCHEDULER_cancel (end_badly_task); + end_badly_task = NULL; + } if (NULL != gst) { GNUNET_SOCIAL_guest_leave (gst, NULL, NULL, NULL); @@ -216,21 +238,6 @@ cleanup () hst_plc = NULL; } GNUNET_SOCIAL_app_disconnect (app, NULL, NULL); - GNUNET_SCHEDULER_shutdown (); -} - - -/** - * Terminate the test case (failure). - * - * @param cls NULL - */ -static void -end_badly (void *cls) -{ - res = 1; - cleanup (); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n"); } @@ -242,9 +249,9 @@ end_badly (void *cls) static void end_normally (void *cls) { + GNUNET_SCHEDULER_shutdown (); res = 0; - cleanup (); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test PASSED.\n"); } @@ -254,7 +261,7 @@ end_normally (void *cls) static void end () { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Ending tests.\n", test); if (end_badly_task != NULL) @@ -271,7 +278,7 @@ transmit_resume (void *cls) { struct TransmitClosure *tmit = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Transmission resumed.\n", test); if (NULL != tmit->host_ann) GNUNET_SOCIAL_host_announce_resume (tmit->host_ann); @@ -296,7 +303,7 @@ notify_data (void *cls, uint16_t *data_size, void *data) } uint16_t size = strlen (tmit->data[tmit->n]); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Transmit notify data: %u bytes available, " "processing fragment %u/%u (size %u).\n", test, *data_size, tmit->n + 1, tmit->data_count, size); @@ -309,7 +316,7 @@ notify_data (void *cls, uint16_t *data_size, void *data) if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Transmission paused.\n", test); tmit->paused = GNUNET_YES; GNUNET_SCHEDULER_add_delayed ( @@ -331,7 +338,7 @@ notify_data (void *cls, uint16_t *data_size, void *data) static void host_left () { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: The host has left the place.\n", test); end (); } @@ -352,7 +359,7 @@ host_farewell2 (void *cls, const struct GNUNET_SOCIAL_Nym *nym, struct GNUNET_PSYC_Environment *env) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Nym left the place again.\n"); GNUNET_SCHEDULER_add_now (&schedule_host_leave, NULL); } @@ -365,13 +372,14 @@ host_reconnected (void *cls, int result, { place_pub_key = *home_pub_key; GNUNET_CRYPTO_hash (&place_pub_key, sizeof (place_pub_key), &place_pub_hash); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host reconnected to place %s\n", test, GNUNET_h2s (&place_pub_hash)); is_host_reconnected = GNUNET_YES; if (GNUNET_YES == is_guest_reconnected) { + GNUNET_assert (NULL != gst); GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL); } } @@ -382,7 +390,7 @@ guest_reconnected (void *cls, int result, const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key, uint64_t max_message_id) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest reconnected to place: %d\n", test, result); GNUNET_assert (0 <= result); @@ -390,6 +398,7 @@ guest_reconnected (void *cls, int result, is_guest_reconnected = GNUNET_YES; if (GNUNET_YES == is_host_reconnected) { + GNUNET_assert (NULL != gst); GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL); } } @@ -398,7 +407,7 @@ guest_reconnected (void *cls, int result, static void app_connected (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: App connected: %p\n", test, cls); } @@ -411,21 +420,28 @@ app_recv_host (void *cls, enum GNUNET_SOCIAL_AppPlaceState place_state) { struct GNUNET_HashCode host_pub_hash; - GNUNET_CRYPTO_hash (host_pub_key, sizeof (*host_pub_key), &host_pub_hash); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_CRYPTO_hash (host_pub_key, + sizeof (*host_pub_key), + &host_pub_hash); + + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Got app host place notification: %s\n", - test, GNUNET_h2s (&host_pub_hash)); + test, + GNUNET_h2s (&host_pub_hash)); if (test == TEST_RECONNECT) { if (0 == memcmp (&place_pub_key, host_pub_key, sizeof (*host_pub_key))) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Reconnecting to host place: %s\n", test, GNUNET_h2s (&host_pub_hash)); - hst = GNUNET_SOCIAL_host_enter_reconnect (hconn, host_slicer, host_reconnected, - host_answer_door, host_farewell2, NULL); + hst = GNUNET_SOCIAL_host_enter_reconnect (hconn, host_slicer, + &host_reconnected, + &host_answer_door, + &host_farewell2, + NULL); } } } @@ -439,21 +455,30 @@ app_recv_guest (void *cls, enum GNUNET_SOCIAL_AppPlaceState place_state) { struct GNUNET_HashCode guest_pub_hash; - GNUNET_CRYPTO_hash (guest_pub_key, sizeof (*guest_pub_key), &guest_pub_hash); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_CRYPTO_hash (guest_pub_key, + sizeof (*guest_pub_key), + &guest_pub_hash); + + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Got app guest place notification: %s\n", test, GNUNET_h2s (&guest_pub_hash)); if (test == TEST_RECONNECT) { - if (0 == memcmp (&place_pub_key, guest_pub_key, sizeof (*guest_pub_key))) + if (0 == memcmp (&place_pub_key, + guest_pub_key, + sizeof (*guest_pub_key))) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Reconnecting to guest place: %s\n", test, GNUNET_h2s (&guest_pub_hash)); - gst = GNUNET_SOCIAL_guest_enter_reconnect (gconn, GNUNET_PSYC_SLAVE_JOIN_NONE, - guest_slicer, guest_reconnected, NULL); + gst = GNUNET_SOCIAL_guest_enter_reconnect (gconn, + GNUNET_PSYC_SLAVE_JOIN_NONE, + guest_slicer, + &guest_reconnected, + NULL); + GNUNET_assert (NULL != gst); } } } @@ -478,7 +503,7 @@ app_recv_ego (void *cls, const char *name) { char *ego_pub_str = GNUNET_CRYPTO_ecdsa_public_key_to_string (ego_pub_key); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Got app ego notification: %p %s %s\n", test, ego, name, ego_pub_str); GNUNET_free (ego_pub_str); @@ -487,15 +512,30 @@ app_recv_ego (void *cls, { host_ego = ego; host_pub_key = ego_pub_key; - GNUNET_assert (TEST_IDENTITIES_CREATE == test); - enter_if_ready (); + if (TEST_IDENTITIES_CREATE == test) + { + enter_if_ready (); + } + else + { + GNUNET_assert (TEST_RECONNECT == test); + } } else if (NULL != strstr (name, guest_name)) { guest_ego = ego; guest_pub_key = ego_pub_key; - GNUNET_assert (TEST_IDENTITIES_CREATE == test); - enter_if_ready (); + if (TEST_IDENTITIES_CREATE == test) + { + enter_if_ready (); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "test = %d\n", + test); + GNUNET_assert (TEST_RECONNECT == test); + } } } @@ -504,7 +544,6 @@ static void schedule_reconnect (void *cls) { test = TEST_RECONNECT; - GNUNET_SOCIAL_host_disconnect (hst, NULL, NULL); GNUNET_SOCIAL_guest_disconnect (gst, NULL, NULL); hst = NULL; @@ -512,10 +551,10 @@ schedule_reconnect (void *cls) GNUNET_SOCIAL_app_disconnect (app, NULL, NULL); app = GNUNET_SOCIAL_app_connect (cfg, app_id, - app_recv_ego, - app_recv_host, - app_recv_guest, - app_connected, + &app_recv_ego, + &app_recv_host, + &app_recv_guest, + &app_connected, NULL); } @@ -524,7 +563,7 @@ static void host_recv_zone_add_place_result (void *cls, int64_t result, const void *data, uint16_t data_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Zone add place result: %" PRId64 " (%.*s).\n", test, result, data_size, (const char *) data); GNUNET_assert (GNUNET_YES == result); @@ -538,7 +577,7 @@ static void zone_add_place () { test = TEST_ZONE_ADD_PLACE; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Adding place to zone.\n", test); GNUNET_SOCIAL_zone_add_place (app, host_ego, "home", "let.me*in!", @@ -557,7 +596,7 @@ host_farewell (void *cls, nym_key = GNUNET_SOCIAL_nym_get_pub_key (nym); char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (nym_key); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Farewell: nym %s (%s) has left the place.\n", test, GNUNET_h2s (GNUNET_SOCIAL_nym_get_pub_key_hash (nym)), str); GNUNET_free (str); @@ -578,13 +617,13 @@ host_farewell (void *cls, static void guest_left (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: The guest has left the place.\n", test); } static void -guest_leave() +guest_leave () { if (test < TEST_RECONNECT) test = TEST_GUEST_LEAVE; @@ -615,11 +654,11 @@ guest_look_for_result (void *cls, uint16_t data_size) { struct ResultClosure *rcls = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: guest_look_for_result: %" PRId64 "\n", test, result_code); GNUNET_assert (GNUNET_OK == result_code); - GNUNET_assert (3 == rcls->n); + GNUNET_assert (6 == rcls->n); GNUNET_free (rcls); GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL); } @@ -635,7 +674,7 @@ guest_look_for_var (void *cls, { struct ResultClosure *rcls = cls; rcls->n++; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: guest_look_for_var: %s\n%.*s\n", test, name, value_size, (const char *) value); } @@ -656,7 +695,7 @@ guest_look_at_result (void *cls, int64_t result_code, { struct ResultClosure *rcls = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: guest_look_at_result: %" PRId64 "\n", test, result_code); GNUNET_assert (GNUNET_OK == result_code); @@ -677,7 +716,7 @@ guest_look_at_var (void *cls, struct ResultClosure *rcls = cls; rcls->n++; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: guest_look_at_var: %s\n%.*s\n", test ,name, value_size, (const char *) value); } @@ -696,7 +735,7 @@ static void guest_recv_history_replay_latest_result (void *cls, int64_t result, const void *data, uint16_t data_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received latest history replay result " "(%" PRIu32 " messages, %" PRId64 " fragments):\n" "%.*s\n", @@ -725,7 +764,7 @@ static void guest_recv_history_replay_result (void *cls, int64_t result, const void *data, uint16_t data_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received history replay result: %" PRId64 "\n" "%.*s\n", test, result, data_size, (const char *) data); @@ -756,7 +795,7 @@ guest_recv_method (void *cls, uint64_t message_id, const char *method_name) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received method for message ID %" PRIu64 ":\n" "%s (flags: %x)\n", test, message_id, method_name, ntohl (meth->flags)); @@ -775,7 +814,7 @@ guest_recv_modifier (void *cls, uint16_t value_size, uint16_t full_value_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received modifier for message ID %" PRIu64 ":\n" "%c%s: %.*s (size: %u)\n", test, message_id, oper, name, value_size, (const char *) value, value_size); @@ -793,7 +832,7 @@ guest_recv_mod_foo_bar (void *cls, uint16_t value_size, uint16_t full_value_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received modifier matching _foo_bar for message ID %" PRIu64 ":\n" "%c%s: %.*s (size: %u)\n", test, message_id, oper, name, value_size, (const char *) value, value_size); @@ -811,7 +850,7 @@ guest_recv_data (void *cls, const void *data, uint16_t data_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received data for message ID %" PRIu64 ":\n" "%.*s\n", test, message_id, data_size, (const char *) data); @@ -826,7 +865,7 @@ guest_recv_eom (void *cls, uint64_t message_id, uint8_t is_cancelled) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received end of message ID %" PRIu64 ", cancelled: %u\n", test, message_id, is_cancelled); @@ -868,7 +907,7 @@ host_recv_method (void *cls, uint64_t message_id, const char *method_name) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host received method for message ID %" PRIu64 ":\n" "%s\n", test, message_id, method_name); @@ -887,7 +926,7 @@ host_recv_modifier (void *cls, uint16_t value_size, uint16_t full_value_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host received modifier for message ID %" PRIu64 ":\n" "%c%s: %.*s\n", test, message_id, oper, name, value_size, (const char *) value); @@ -902,7 +941,7 @@ host_recv_data (void *cls, const void *data, uint16_t data_size) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host received data for message ID %" PRIu64 ":\n" "%.*s\n", test, message_id, data_size, (const char *) data); @@ -916,7 +955,7 @@ host_recv_eom (void *cls, uint64_t message_id, uint8_t is_cancelled) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host received end of message ID %" PRIu64 ", cancelled: %u\n", test, message_id, is_cancelled); @@ -981,7 +1020,7 @@ host_announce () { test = TEST_HOST_ANNOUNCE; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host announcement.\n", test); tmit = (struct TransmitClosure) {}; @@ -1015,7 +1054,7 @@ host_announce2 () test = TEST_HOST_ANNOUNCE2; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host announcement 2.\n", test); tmit = (struct TransmitClosure) {}; @@ -1025,7 +1064,7 @@ host_announce2 () GNUNET_PSYC_env_add (tmit.env, GNUNET_PSYC_OP_ASSIGN, "_foo2_bar", DATA2ARG ("FOO BAR")); GNUNET_PSYC_env_add (tmit.env, GNUNET_PSYC_OP_ASSIGN, - "_foo2_bar", DATA2ARG ("FOO BAR BAZ")); + "_foo2_bar_baz", DATA2ARG ("FOO BAR BAZ")); tmit.data[0] = "AAA BBB CCC "; tmit.data[1] = "ABC DEF GHI JKL.\n"; tmit.data[2] = "TESTING ONE TWO THREE.\n"; @@ -1043,7 +1082,7 @@ guest_recv_entry_decision (void *cls, int is_admitted, const struct GNUNET_PSYC_Message *entry_msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Guest received entry decision (try %u): %d.\n", test, join_req_count, is_admitted); @@ -1068,7 +1107,8 @@ guest_recv_entry_decision (void *cls, { case TEST_GUEST_RECV_ENTRY_DCSN_REFUSE: GNUNET_assert (GNUNET_NO == is_admitted); - guest_enter (); + test = TEST_HOST_ANSWER_DOOR_ADMIT; + GNUNET_SOCIAL_guest_disconnect (gst, &guest_enter, NULL); break; case TEST_GUEST_RECV_ENTRY_DCSN_ADMIT: @@ -1097,7 +1137,7 @@ host_answer_door (void *cls, { join_req_count++; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Test #%u: Host received entry request from guest (try %u).\n", (uint8_t) test, join_req_count); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1118,7 +1158,7 @@ host_answer_door (void *cls, // fall through case TEST_GUEST_ENTER_BY_NAME: - join_resp = GNUNET_PSYC_message_create ("_notice_place_admit", env, + join_resp = GNUNET_PSYC_message_create ("_notice_place_admit", env, DATA2ARG ("Welcome, nym!")); GNUNET_SOCIAL_host_entry_decision (hst, nym, GNUNET_YES, join_resp); break; @@ -1135,18 +1175,18 @@ guest_recv_local_enter (void *cls, int result, const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key, uint64_t max_message_id) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%u: Guest entered to local place: %d\n", + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Test #%u: Guest entered local place: %d\n", test, result); - GNUNET_assert (0 <= result); + GNUNET_assert (GNUNET_OK == result); } static void guest_enter () { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%u: Entering to place as guest.\n", test); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Test #%u: Entering place as guest.\n", test); struct GuestEnterMessage *emsg = &guest_enter_msg; @@ -1177,8 +1217,8 @@ static void guest_enter_by_name () { test = TEST_GUEST_ENTER_BY_NAME; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Test #%u: Entering to place by name as guest.\n", test); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Test #%u: Entering place by name as guest.\n", test); struct GuestEnterMessage *emsg = &guest_enter_msg; @@ -1222,7 +1262,7 @@ guest_init () guest_recv_data, guest_recv_eom, NULL); GNUNET_PSYC_slicer_modifier_add (guest_slicer, "_foo_bar", guest_recv_mod_foo_bar, &mod_foo_bar_rcls); - test = TEST_HOST_ANSWER_DOOR_ADMIT; + test = TEST_HOST_ANSWER_DOOR_REFUSE; GNUNET_SOCIAL_zone_add_nym (app, guest_ego, "host", host_pub_key, GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES), @@ -1270,8 +1310,8 @@ host_entered (void *cls, int result, { place_pub_key = *home_pub_key; GNUNET_CRYPTO_hash (&place_pub_key, sizeof (place_pub_key), &place_pub_hash); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%u: Host entered to place %s\n", + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Test #%u: Host entered place %s\n", test, GNUNET_h2s (&place_pub_hash)); guest_enter (); } @@ -1285,8 +1325,8 @@ host_enter () host_recv_method, host_recv_modifier, host_recv_data, host_recv_eom, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%u: Entering to place as host.\n", test); + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Test #%u: Entering place as host.\n", test); test = TEST_HOST_ENTER; hst = GNUNET_SOCIAL_host_enter (app, host_ego, GNUNET_PSYC_CHANNEL_PRIVATE, @@ -1306,6 +1346,8 @@ start_app_if_ready () { return; } + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "starting app...\n"); app = GNUNET_SOCIAL_app_connect (cfg, app_id, app_recv_ego, @@ -1324,17 +1366,17 @@ identity_ego_cb (void *cls, struct GNUNET_IDENTITY_Ego *ego, { if (ego == identity_host_ego) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Host ego deleted\n"); } else if (ego == identity_guest_ego) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Guest ego deleted\n"); } else if (0 == strcmp (name, host_name)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Created ego %s\n", name); identity_host_ego = ego; @@ -1342,7 +1384,7 @@ identity_ego_cb (void *cls, struct GNUNET_IDENTITY_Ego *ego, } else if (0 == strcmp (name, guest_name)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Created guest ego %s\n", name); identity_guest_ego = ego; @@ -1370,9 +1412,11 @@ run (void *cls, #endif { cfg = c; + res = 1; end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL); - + GNUNET_SCHEDULER_add_shutdown (&end_shutdown, + NULL); GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer); id = GNUNET_IDENTITY_connect (cfg, &identity_ego_cb, NULL); diff --git a/src/social/test_social.conf b/src/social/test_social.conf @@ -0,0 +1,19 @@ +@INLINE@ ../../contrib/no_forcestart.conf + +[PATHS] +GNUNET_TEST_HOME = /tmp/gnunet-test-social/ + +[social] +FORCESTART = YES + +[transport] +PLUGINS = tcp + +[nat] +DISABLEV6 = YES +ENABLE_UPNP = NO +BEHIND_NAT = NO +ALLOW_NAT = NO +INTERNAL_ADDRESS = 127.0.0.1 +EXTERNAL_ADDRESS = 127.0.0.1 + diff --git a/src/statistics/gnunet-service-statistics.c b/src/statistics/gnunet-service-statistics.c @@ -998,7 +998,9 @@ client_disconnect_cb (void *cls, * * @param cls NULL * @param message the message found on disk - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int inject_message (void *cls, diff --git a/src/testbed/gnunet-helper-testbed.c b/src/testbed/gnunet-helper-testbed.c @@ -292,8 +292,9 @@ child_death_task (void *cls) * * @param cls identification of the client * @param message the actual message - * - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int tokenizer_cb (void *cls, @@ -359,7 +360,7 @@ tokenizer_cb (void *cls, cfg = GNUNET_CONFIGURATION_create (); if (GNUNET_OK != GNUNET_CONFIGURATION_deserialize (cfg, - config, + config, ul_config_size, NULL)) { diff --git a/src/transport/gnunet-helper-transport-wlan-dummy.c b/src/transport/gnunet-helper-transport-wlan-dummy.c @@ -121,6 +121,9 @@ send_mac_to_plugin (char *buffer, struct GNUNET_TRANSPORT_WLAN_MacAddress *mac) * * @param cls the 'struct SendBuffer' to copy the converted message to * @param hdr inbound message from the FIFO + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int stdin_send (void *cls, @@ -167,6 +170,9 @@ stdin_send (void *cls, * * @param cls the 'struct SendBuffer' to copy to * @param hdr the message we received to copy to the buffer + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int file_in_send (void *cls, diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c @@ -1651,7 +1651,7 @@ server_send_callback (void *cls, * * @param cls current session as closure * @param message the message to be forwarded to transport service - * @return #GNUNET_OK + * @return #GNUNET_OK (all OK) */ static int server_receive_mst_cb (void *cls, diff --git a/src/util/client.c b/src/util/client.c @@ -261,14 +261,27 @@ transmit_ready (void *cls) pos = (const char *) cstate->msg; len = ntohs (cstate->msg->size); GNUNET_assert (cstate->msg_off < len); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client: message of type %u trying to send with socket %p (MQ: %p\n", + ntohs(cstate->msg->type), + cstate->sock, + cstate->mq); + RETRY: ret = GNUNET_NETWORK_socket_send (cstate->sock, &pos[cstate->msg_off], len - cstate->msg_off); if (-1 == ret) { - if (EINTR == errno) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "client: error during sending message of type %u\n", + ntohs(cstate->msg->type)); + if (EINTR == errno){ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client: retrying message of type %u\n", + ntohs(cstate->msg->type)); goto RETRY; + } GNUNET_MQ_inject_error (cstate->mq, GNUNET_MQ_ERROR_WRITE); return; @@ -277,6 +290,9 @@ transmit_ready (void *cls) cstate->msg_off += ret; if (cstate->msg_off < len) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client: rescheduling message of type %u\n", + ntohs(cstate->msg->type)); cstate->send_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, cstate->sock, @@ -286,6 +302,9 @@ transmit_ready (void *cls) GNUNET_MQ_impl_send_in_flight (cstate->mq); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client: sending message of type %u successful\n", + ntohs(cstate->msg->type)); cstate->msg = NULL; GNUNET_MQ_impl_send_continue (cstate->mq); } @@ -297,7 +316,9 @@ transmit_ready (void *cls) * * @param cls the `struct ClientState` * @param msg message we received. - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing due to disconnect (no error) + * #GNUNET_SYSERR to stop further processing due to error */ static int recv_message (void *cls, @@ -306,7 +327,7 @@ recv_message (void *cls, struct ClientState *cstate = cls; if (GNUNET_YES == cstate->in_destroy) - return GNUNET_SYSERR; + return GNUNET_NO; LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %u and size %u from %s\n", ntohs (msg->type), @@ -315,7 +336,7 @@ recv_message (void *cls, GNUNET_MQ_inject_message (cstate->mq, msg); if (GNUNET_YES == cstate->in_destroy) - return GNUNET_SYSERR; + return GNUNET_NO; return GNUNET_OK; } @@ -371,8 +392,12 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, GNUNET_SCHEDULER_cancel (cstate->recv_task); if (NULL != cstate->retry_task) GNUNET_SCHEDULER_cancel (cstate->retry_task); - if (NULL != cstate->sock) + if (NULL != cstate->sock){ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client: destroying socket: %p\n", + cstate->sock); GNUNET_NETWORK_socket_close (cstate->sock); + } cancel_aps (cstate); GNUNET_free (cstate->service_name); GNUNET_free_non_null (cstate->hostname); @@ -794,8 +819,12 @@ connection_client_send_impl (struct GNUNET_MQ_Handle *mq, GNUNET_assert (NULL == cstate->send_task); cstate->msg = msg; cstate->msg_off = 0; - if (NULL == cstate->sock) + if (NULL == cstate->sock){ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client: message of type %u waiting for socket\n", + ntohs(msg->type)); return; /* still waiting for connection */ + } cstate->send_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, cstate->sock, diff --git a/src/util/mq.c b/src/util/mq.c @@ -357,6 +357,12 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, } GNUNET_assert (NULL == mq->envelope_head); mq->current_envelope = ev; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "mq: sending message of type %u, queue empty (MQ: %p)\n", + ntohs(ev->mh->type), + mq); + mq->send_impl (mq, ev->mh, mq->impl_state); @@ -452,6 +458,11 @@ impl_send_continue (void *cls) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "mq: sending message of type %u from queue\n", + ntohs(mq->current_envelope->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); @@ -840,6 +851,9 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) ev); GNUNET_assert (0 < mq->queue_length); mq->queue_length--; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "MQ destroy drops message of type %u\n", + ntohs (ev->mh->type)); GNUNET_MQ_discard (ev); } if (NULL != mq->current_envelope) @@ -847,6 +861,9 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) /* we can only discard envelopes that * are not queued! */ mq->current_envelope->parent_queue = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "MQ destroy drops current message of type %u\n", + ntohs (mq->current_envelope->mh->type)); GNUNET_MQ_discard (mq->current_envelope); mq->current_envelope = NULL; GNUNET_assert (0 < mq->queue_length); @@ -928,6 +945,11 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, mq->current_envelope); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "mq: sending canceled message of type %u queue\n", + ntohs(ev->mh->type)); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); diff --git a/src/util/mst.c b/src/util/mst.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2010, 2016 GNUnet e.V. + Copyright (C) 2010, 2016, 2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -126,6 +126,7 @@ GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst, int need_align; unsigned long offset; int ret; + int cbret; GNUNET_assert (mst->off <= mst->pos); GNUNET_assert (mst->pos <= mst->curr_buf); @@ -229,9 +230,17 @@ do_align: if (one_shot == GNUNET_YES) one_shot = GNUNET_SYSERR; mst->off += want; - if (GNUNET_SYSERR == mst->cb (mst->cb_cls, - hdr)) + if (GNUNET_OK != + (cbret = mst->cb (mst->cb_cls, + hdr))) + { + if (GNUNET_SYSERR == cbret) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failure processing message of type %u and size %u\n", + ntohs (hdr->type), + ntohs (hdr->size)); return GNUNET_SYSERR; + } if (mst->off == mst->pos) { /* reset to beginning of buffer, it's free right now! */ @@ -271,9 +280,17 @@ do_align: } if (one_shot == GNUNET_YES) one_shot = GNUNET_SYSERR; - if (GNUNET_SYSERR == mst->cb (mst->cb_cls, - hdr)) + if (GNUNET_OK != + (cbret = mst->cb (mst->cb_cls, + hdr))) + { + if (GNUNET_SYSERR == cbret) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failure processing message of type %u and size %u\n", + ntohs (hdr->type), + ntohs (hdr->size)); return GNUNET_SYSERR; + } buf += want; size -= want; } diff --git a/src/util/network.c b/src/util/network.c @@ -1223,7 +1223,7 @@ GNUNET_NETWORK_fdset_copy (struct GNUNET_NETWORK_FDSet *to, * @return POSIX file descriptor */ int -GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc) +GNUNET_NETWORK_get_fd (const struct GNUNET_NETWORK_Handle *desc) { return desc->fd; } @@ -1236,7 +1236,7 @@ GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc) * @return sockaddr */ struct sockaddr* -GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc) +GNUNET_NETWORK_get_addr (const struct GNUNET_NETWORK_Handle *desc) { return desc->addr; } @@ -1249,7 +1249,7 @@ GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc) * @return socklen_t for sockaddr */ socklen_t -GNUNET_NETWORK_get_addrlen (struct GNUNET_NETWORK_Handle *desc) +GNUNET_NETWORK_get_addrlen (const struct GNUNET_NETWORK_Handle *desc) { return desc->addrlen; } diff --git a/src/util/program.c b/src/util/program.c @@ -69,6 +69,16 @@ struct CommandContext /** + * task run when the scheduler shuts down + */ +static void +shutdown_task (void *cls) +{ + GNUNET_SPEEDUP_stop_ (); +} + + +/** * Initial task called by the scheduler for each * program. Runs the program-specific main task. */ @@ -78,6 +88,7 @@ program_main (void *cls) struct CommandContext *cc = cls; GNUNET_SPEEDUP_start_(cc->cfg); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); GNUNET_RESOLVER_connect (cc->cfg); cc->task (cc->task_cls, cc->args, cc->cfgfile, cc->cfg); } @@ -306,7 +317,6 @@ GNUNET_PROGRAM_run2 (int argc, char *const *argv, const char *binaryName, } ret = GNUNET_OK; cleanup: - GNUNET_SPEEDUP_stop_ (); GNUNET_CONFIGURATION_destroy (cfg); GNUNET_free_non_null (cc.cfgfile); GNUNET_free (cfg_fn); diff --git a/src/util/scheduler.c b/src/util/scheduler.c @@ -89,12 +89,6 @@ struct GNUNET_SCHEDULER_Handle * @deprecated */ struct GNUNET_NETWORK_FDSet *ws; - - /** - * Driver we used for the event loop. - */ - const struct GNUNET_SCHEDULER_Driver *driver; - }; @@ -124,36 +118,40 @@ struct GNUNET_SCHEDULER_Task void *callback_cls; /** - * Handle to the scheduler's state. + * Information about which FDs are ready for this task (and why). */ - const struct GNUNET_SCHEDULER_Handle *sh; + struct GNUNET_SCHEDULER_FdInfo *fds; /** - * Set of file descriptors this task is waiting - * for for reading. Once ready, this is updated - * to reflect the set of file descriptors ready - * for operation. + * Storage location used for @e fds if we want to avoid + * a separate malloc() call in the common case that this + * task is only about a single FD. */ - struct GNUNET_NETWORK_FDSet *read_set; + struct GNUNET_SCHEDULER_FdInfo fdx; /** - * Set of file descriptors this task is waiting for for writing. - * Once ready, this is updated to reflect the set of file - * descriptors ready for operation. + * Size of the @e fds array. */ - struct GNUNET_NETWORK_FDSet *write_set; + unsigned int fds_len; /** - * Information about which FDs are ready for this task (and why). + * if this task is related to multiple FDs this array contains + * all FdInfo structs that were marked as ready by calling + * #GNUNET_SCHEDULER_task_ready */ - const struct GNUNET_SCHEDULER_FdInfo *fds; + struct GNUNET_SCHEDULER_FdInfo *ready_fds; /** - * Storage location used for @e fds if we want to avoid - * a separate malloc() call in the common case that this - * task is only about a single FD. + * Size of the @e ready_fds array */ - struct GNUNET_SCHEDULER_FdInfo fdx; + unsigned int ready_fds_len; + + /** + * Do we own the network and file handles referenced by the FdInfo + * structs in the fds array. This will only be GNUNET_YES if the + * task was created by the #GNUNET_SCHEDULER_add_select function. + */ + int own_handles; /** * Absolute timeout value for the task, or @@ -169,11 +167,6 @@ struct GNUNET_SCHEDULER_Task #endif /** - * Size of the @e fds array. - */ - unsigned int fds_len; - - /** * Why is the task ready? Set after task is added to ready queue. * Initially set to zero. All reasons that have already been * satisfied (i.e. read or write ready) will be set over time. @@ -224,11 +217,72 @@ struct GNUNET_SCHEDULER_Task int num_backtrace_strings; #endif +}; + + +/** + * A struct representing an event the select driver is waiting for + */ +struct Scheduled +{ + struct Scheduled *prev; + + struct Scheduled *next; + + /** + * the task, the event is related to + */ + struct GNUNET_SCHEDULER_Task *task; + + /** + * information about the network socket / file descriptor where + * the event is expected to occur + */ + struct GNUNET_SCHEDULER_FdInfo *fdi; + + /** + * the event types (multiple event types can be ORed) the select + * driver is expected to wait for + */ + enum GNUNET_SCHEDULER_EventType et; +}; + + +/** + * Driver context used by GNUNET_SCHEDULER_run + */ +struct DriverContext +{ + /** + * the head of a DLL containing information about the events the + * select driver is waiting for + */ + struct Scheduled *scheduled_head; + + /** + * the tail of a DLL containing information about the events the + * select driver is waiting for + */ + struct Scheduled *scheduled_tail; + /** + * the time until the select driver will wake up again (after + * calling select) + */ + struct GNUNET_TIME_Relative timeout; }; /** + * The driver used for the event loop. Will be handed over to + * the scheduler in #GNUNET_SCHEDULER_run_from_driver(), peristed + * there in this variable for later use in functions like + * #GNUNET_SCHEDULER_add_select(), #add_without_sets() and + * #GNUNET_SCHEDULER_cancel(). + */ +static const struct GNUNET_SCHEDULER_Driver *scheduler_driver; + +/** * Head of list of tasks waiting for an event. */ static struct GNUNET_SCHEDULER_Task *pending_head; @@ -330,6 +384,11 @@ static struct GNUNET_SCHEDULER_TaskContext tc; */ static void *scheduler_select_cls; +/** + * Scheduler handle used for the driver functions + */ +static struct GNUNET_SCHEDULER_Handle sh; + /** * Sets the select function to use in the scheduler (scheduler_select). @@ -364,115 +423,44 @@ check_priority (enum GNUNET_SCHEDULER_Priority p) /** - * Update all sets and timeout for select. - * - * @param rs read-set, set to all FDs we would like to read (updated) - * @param ws write-set, set to all FDs we would like to write (updated) - * @param timeout next timeout (updated) + * chooses the nearest timeout from all pending tasks, to be used + * to tell the driver the next wakeup time (using its set_wakeup + * callback) */ -static void -update_sets (struct GNUNET_NETWORK_FDSet *rs, - struct GNUNET_NETWORK_FDSet *ws, - struct GNUNET_TIME_Relative *timeout) +struct GNUNET_TIME_Absolute +get_timeout () { struct GNUNET_SCHEDULER_Task *pos; struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative to; + struct GNUNET_TIME_Absolute timeout; - now = GNUNET_TIME_absolute_get (); pos = pending_timeout_head; + now = GNUNET_TIME_absolute_get (); + timeout = GNUNET_TIME_UNIT_FOREVER_ABS; if (NULL != pos) { - to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); - if (timeout->rel_value_us > to.rel_value_us) - *timeout = to; if (0 != pos->reason) - *timeout = GNUNET_TIME_UNIT_ZERO; + { + timeout = now; + } + else + { + timeout = pos->timeout; + } } for (pos = pending_head; NULL != pos; pos = pos->next) { - if (pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) + if (0 != pos->reason) { - to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); - if (timeout->rel_value_us > to.rel_value_us) - *timeout = to; + timeout = now; + } + else if ((pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) && + (timeout.abs_value_us > pos->timeout.abs_value_us)) + { + timeout = pos->timeout; } - if (-1 != pos->read_fd) - GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd); - if (-1 != pos->write_fd) - GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd); - if (NULL != pos->read_set) - GNUNET_NETWORK_fdset_add (rs, pos->read_set); - if (NULL != pos->write_set) - GNUNET_NETWORK_fdset_add (ws, pos->write_set); - if (0 != pos->reason) - *timeout = GNUNET_TIME_UNIT_ZERO; - } -} - - -/** - * Check if the ready set overlaps with the set we want to have ready. - * If so, update the want set (set all FDs that are ready). If not, - * return #GNUNET_NO. - * - * @param ready set that is ready - * @param want set that we want to be ready - * @return #GNUNET_YES if there was some overlap - */ -static int -set_overlaps (const struct GNUNET_NETWORK_FDSet *ready, - struct GNUNET_NETWORK_FDSet *want) -{ - if ((NULL == want) || (NULL == ready)) - return GNUNET_NO; - if (GNUNET_NETWORK_fdset_overlap (ready, want)) - { - /* copy all over (yes, there maybe unrelated bits, - * but this should not hurt well-written clients) */ - GNUNET_NETWORK_fdset_copy (want, ready); - return GNUNET_YES; } - return GNUNET_NO; -} - - -/** - * Check if the given task is eligible to run now. - * Also set the reason why it is eligible. - * - * @param task task to check if it is ready - * @param now the current time - * @param rs set of FDs ready for reading - * @param ws set of FDs ready for writing - * @return #GNUNET_YES if we can run it, #GNUNET_NO if not. - */ -static int -is_ready (struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_TIME_Absolute now, - const struct GNUNET_NETWORK_FDSet *rs, - const struct GNUNET_NETWORK_FDSet *ws) -{ - enum GNUNET_SCHEDULER_Reason reason; - - reason = task->reason; - if (now.abs_value_us >= task->timeout.abs_value_us) - reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if ((0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) && - (((task->read_fd != -1) && - (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd))) || - (set_overlaps (rs, task->read_set)))) - reason |= GNUNET_SCHEDULER_REASON_READ_READY; - if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (((task->write_fd != -1) && - (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd))) - || (set_overlaps (ws, task->write_set)))) - reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; - if (0 == reason) - return GNUNET_NO; /* not ready */ - reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; - task->reason = reason; - return GNUNET_YES; + return timeout; } @@ -495,51 +483,6 @@ queue_ready_task (struct GNUNET_SCHEDULER_Task *task) /** - * Check which tasks are ready and move them - * to the respective ready queue. - * - * @param rs FDs ready for reading - * @param ws FDs ready for writing - */ -static void -check_ready (const struct GNUNET_NETWORK_FDSet *rs, - const struct GNUNET_NETWORK_FDSet *ws) -{ - struct GNUNET_SCHEDULER_Task *pos; - struct GNUNET_SCHEDULER_Task *next; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - while (NULL != (pos = pending_timeout_head)) - { - if (now.abs_value_us >= pos->timeout.abs_value_us) - pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if (0 == pos->reason) - break; - GNUNET_CONTAINER_DLL_remove (pending_timeout_head, - pending_timeout_tail, - pos); - if (pending_timeout_last == pos) - pending_timeout_last = NULL; - queue_ready_task (pos); - } - pos = pending_head; - while (NULL != pos) - { - next = pos->next; - if (GNUNET_YES == is_ready (pos, now, rs, ws)) - { - GNUNET_CONTAINER_DLL_remove (pending_head, - pending_tail, - pos); - queue_ready_task (pos); - } - pos = next; - } -} - - -/** * Request the shutdown of a scheduler. Marks all tasks * awaiting shutdown as ready. Note that tasks * scheduled with #GNUNET_SCHEDULER_add_shutdown() AFTER this call @@ -562,25 +505,6 @@ GNUNET_SCHEDULER_shutdown () /** - * Destroy a task (release associated resources) - * - * @param t task to destroy - */ -static void -destroy_task (struct GNUNET_SCHEDULER_Task *t) -{ - if (NULL != t->read_set) - GNUNET_NETWORK_fdset_destroy (t->read_set); - if (NULL != t->write_set) - GNUNET_NETWORK_fdset_destroy (t->write_set); -#if EXECINFO - GNUNET_free (t->backtrace_strings); -#endif - GNUNET_free (t); -} - - -/** * Output stack trace of task @a t. * * @param t task to dump stack trace of @@ -592,88 +516,59 @@ dump_backtrace (struct GNUNET_SCHEDULER_Task *t) unsigned int i; for (i = 0; i < t->num_backtrace_strings; i++) - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p trace %u: %s\n", - t, - i, - t->backtrace_strings[i]); + LOG (GNUNET_ERROR_TYPE_WARNING, + "Task %p trace %u: %s\n", + t, + i, + t->backtrace_strings[i]); #endif } /** - * Run at least one task in the highest-priority queue that is not - * empty. Keep running tasks until we are either no longer running - * "URGENT" tasks or until we have at least one "pending" task (which - * may become ready, hence we should select on it). Naturally, if - * there are no more ready tasks, we also return. + * Destroy a task (release associated resources) * - * @param rs FDs ready for reading - * @param ws FDs ready for writing + * @param t task to destroy */ static void -run_ready (struct GNUNET_NETWORK_FDSet *rs, - struct GNUNET_NETWORK_FDSet *ws) +destroy_task (struct GNUNET_SCHEDULER_Task *t) { - enum GNUNET_SCHEDULER_Priority p; - struct GNUNET_SCHEDULER_Task *pos; + unsigned int i; - max_priority_added = GNUNET_SCHEDULER_PRIORITY_KEEP; - do + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying task %p\n", + t); + + if (GNUNET_YES == t->own_handles) { - if (0 == ready_count) - return; - GNUNET_assert (NULL == ready_head[GNUNET_SCHEDULER_PRIORITY_KEEP]); - /* yes, p>0 is correct, 0 is "KEEP" which should - * always be an empty queue (see assertion)! */ - for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--) - { - pos = ready_head[p]; - if (NULL != pos) - break; - } - GNUNET_assert (NULL != pos); /* ready_count wrong? */ - GNUNET_CONTAINER_DLL_remove (ready_head[p], - ready_tail[p], - pos); - ready_count--; - current_priority = pos->priority; - current_lifeness = pos->lifeness; - active_task = pos; -#if PROFILE_DELAYS - if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us > - DELAY_THRESHOLD.rel_value_us) + for (i = 0; i != t->fds_len; ++i) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p took %s to be scheduled\n", - pos, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), - GNUNET_YES)); + const struct GNUNET_NETWORK_Handle *fd = t->fds[i].fd; + const struct GNUNET_DISK_FileHandle *fh = t->fds[i].fh; + if (fd) + { + GNUNET_NETWORK_socket_free_memory_only_ ((struct GNUNET_NETWORK_Handle *) fd); + } + if (fh) + { + // FIXME: on WIN32 this is not enough! A function + // GNUNET_DISK_file_free_memory_only would be nice + GNUNET_free ((void *) fh); + } } -#endif - tc.reason = pos->reason; - tc.read_ready = (NULL == pos->read_set) ? rs : pos->read_set; - if ((-1 != pos->read_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY))) - GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd); - tc.write_ready = (NULL == pos->write_set) ? ws : pos->write_set; - if ((-1 != pos->write_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))) - GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd); - if ((0 != (tc.reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (-1 != pos->write_fd) && - (!GNUNET_NETWORK_fdset_test_native (ws, pos->write_fd))) - GNUNET_assert (0); // added to ready in previous select loop! - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Running task: %p\n", - pos); - pos->callback (pos->callback_cls); - dump_backtrace (pos); - active_task = NULL; - destroy_task (pos); - tasks_run++; } - while ((NULL == pending_head) || (p >= max_priority_added)); + if (t->fds_len > 1) + { + GNUNET_array_grow (t->fds, t->fds_len, 0); + } + if (t->ready_fds_len > 0) + { + GNUNET_array_grow (t->ready_fds, t->ready_fds_len, 0); + } +#if EXECINFO + GNUNET_free (t->backtrace_strings); +#endif + GNUNET_free (t); } @@ -700,22 +595,22 @@ sighandler_pipe () #endif -/** - * Wait for a short time. - * Sleeps for @a ms ms (as that should be long enough for virtually all - * modern systems to context switch and allow another process to do - * some 'real' work). - * - * @param ms how many ms to wait - */ -static void -short_wait (unsigned int ms) -{ - struct GNUNET_TIME_Relative timeout; - - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ms); - (void) GNUNET_NETWORK_socket_select (NULL, NULL, NULL, timeout); -} +///** +// * Wait for a short time. +// * Sleeps for @a ms ms (as that should be long enough for virtually all +// * modern systems to context switch and allow another process to do +// * some 'real' work). +// * +// * @param ms how many ms to wait +// */ +//static void +//short_wait (unsigned int ms) +//{ +// struct GNUNET_TIME_Relative timeout; +// +// timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ms); +// (void) GNUNET_NETWORK_socket_select (NULL, NULL, NULL, timeout); +//} /** @@ -737,35 +632,31 @@ sighandler_shutdown () } -/** - * Check if the system is still alive. Trigger shutdown if we - * have tasks, but none of them give us lifeness. - * - * @return #GNUNET_OK to continue the main loop, - * #GNUNET_NO to exit - */ -static int -check_lifeness () +void +shutdown_if_no_lifeness () { struct GNUNET_SCHEDULER_Task *t; if (ready_count > 0) - return GNUNET_OK; + return; for (t = pending_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; + if (GNUNET_YES == t->lifeness) + return; for (t = shutdown_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; + if (GNUNET_YES == t->lifeness) + return; for (t = pending_timeout_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; - if (NULL != shutdown_head) + if (GNUNET_YES == t->lifeness) + return; + /* No lifeness! Cancel all pending tasks the driver knows about and shutdown */ + t = pending_head; + while (NULL != t) { - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; + struct GNUNET_SCHEDULER_Task *next = t->next; + GNUNET_SCHEDULER_cancel (t); + t = next; } - return GNUNET_NO; + GNUNET_SCHEDULER_shutdown (); } @@ -787,251 +678,219 @@ void GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { - GNUNET_SCHEDULER_run_with_optional_signals(GNUNET_YES, task, task_cls); + struct GNUNET_SCHEDULER_Driver *driver; + struct DriverContext context = {.scheduled_head = NULL, + .scheduled_tail = NULL, + .timeout = GNUNET_TIME_UNIT_FOREVER_REL}; + + driver = GNUNET_SCHEDULER_driver_select (); + driver->cls = &context; + + GNUNET_SCHEDULER_run_with_driver (driver, task, task_cls); + + GNUNET_free (driver); } -void -GNUNET_SCHEDULER_run_with_optional_signals (int install_signals, - GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + +/** + * Obtain the task context, giving the reason why the current task was + * started. + * + * @return current tasks' scheduler context + */ +const struct GNUNET_SCHEDULER_TaskContext * +GNUNET_SCHEDULER_get_task_context () { - struct GNUNET_NETWORK_FDSet *rs; - struct GNUNET_NETWORK_FDSet *ws; - struct GNUNET_TIME_Relative timeout; - int ret; - struct GNUNET_SIGNAL_Context *shc_int; - struct GNUNET_SIGNAL_Context *shc_term; -#if (SIGTERM != GNUNET_TERM_SIG) - struct GNUNET_SIGNAL_Context *shc_gterm; -#endif + GNUNET_assert (NULL != active_task); + return &tc; +} -#ifndef MINGW - struct GNUNET_SIGNAL_Context *shc_quit; - struct GNUNET_SIGNAL_Context *shc_hup; - struct GNUNET_SIGNAL_Context *shc_pipe; -#endif - unsigned long long last_tr; - unsigned int busy_wait_warning; - const struct GNUNET_DISK_FileHandle *pr; - char c; - GNUNET_assert (NULL == active_task); - rs = GNUNET_NETWORK_fdset_create (); - ws = GNUNET_NETWORK_fdset_create (); - GNUNET_assert (NULL == shutdown_pipe_handle); - shutdown_pipe_handle = GNUNET_DISK_pipe (GNUNET_NO, - GNUNET_NO, - GNUNET_NO, - GNUNET_NO); - GNUNET_assert (NULL != shutdown_pipe_handle); - pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, - GNUNET_DISK_PIPE_END_READ); - GNUNET_assert (NULL != pr); - my_pid = getpid (); +/** + * Get information about the current load of this scheduler. Use this + * function to determine if an elective task should be added or simply + * dropped (if the decision should be made based on the number of + * tasks ready to run). + * + * @param p priority level to look at + * @return number of tasks pending right now + */ +unsigned int +GNUNET_SCHEDULER_get_load (enum GNUNET_SCHEDULER_Priority p) +{ + struct GNUNET_SCHEDULER_Task *pos; + unsigned int ret; - if (GNUNET_YES == install_signals) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Registering signal handlers\n"); - shc_int = GNUNET_SIGNAL_handler_install (SIGINT, - &sighandler_shutdown); - shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, - &sighandler_shutdown); -#if (SIGTERM != GNUNET_TERM_SIG) - shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG, - &sighandler_shutdown); -#endif -#ifndef MINGW - shc_pipe = GNUNET_SIGNAL_handler_install (SIGPIPE, - &sighandler_pipe); - shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, - &sighandler_shutdown); - shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, - &sighandler_shutdown); -#endif - } + GNUNET_assert (NULL != active_task); + if (p == GNUNET_SCHEDULER_PRIORITY_COUNT) + return ready_count; + if (p == GNUNET_SCHEDULER_PRIORITY_KEEP) + p = current_priority; + ret = 0; + for (pos = ready_head[check_priority (p)]; NULL != pos; pos = pos->next) + ret++; + return ret; +} - current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT; - current_lifeness = GNUNET_YES; - GNUNET_SCHEDULER_add_with_reason_and_priority (task, - task_cls, - GNUNET_SCHEDULER_REASON_STARTUP, - GNUNET_SCHEDULER_PRIORITY_DEFAULT); - active_task = (void *) (long) -1; /* force passing of sanity check */ - GNUNET_SCHEDULER_add_now_with_lifeness (GNUNET_NO, - &GNUNET_OS_install_parent_control_handler, - NULL); - active_task = NULL; - last_tr = 0; - busy_wait_warning = 0; - while (GNUNET_OK == check_lifeness ()) + +void +init_fd_info (struct GNUNET_SCHEDULER_Task *t, + const struct GNUNET_NETWORK_Handle *const *read_nh, + unsigned int read_nh_len, + const struct GNUNET_NETWORK_Handle *const *write_nh, + unsigned int write_nh_len, + const struct GNUNET_DISK_FileHandle *const *read_fh, + unsigned int read_fh_len, + const struct GNUNET_DISK_FileHandle *const *write_fh, + unsigned int write_fh_len) +{ + // FIXME: if we have exactly two network handles / exactly two file handles + // and they are equal, we can make one FdInfo with both + // GNUNET_SCHEDULER_ET_IN and GNUNET_SCHEDULER_ET_OUT set. + struct GNUNET_SCHEDULER_FdInfo *fdi; + + t->fds_len = read_nh_len + write_nh_len + read_fh_len + write_fh_len; + if (1 == t->fds_len) { - GNUNET_NETWORK_fdset_zero (rs); - GNUNET_NETWORK_fdset_zero (ws); - timeout = GNUNET_TIME_UNIT_FOREVER_REL; - update_sets (rs, ws, &timeout); - GNUNET_NETWORK_fdset_handle_set (rs, pr); - if (ready_count > 0) + fdi = &t->fdx; + t->fds = fdi; + if (1 == read_nh_len) { - /* no blocking, more work already ready! */ - timeout = GNUNET_TIME_UNIT_ZERO; + fdi->fd = *read_nh; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = GNUNET_NETWORK_get_fd (*read_nh); + t->read_fd = fdi->sock; + t->write_fd = -1; + } + else if (1 == write_nh_len) + { + fdi->fd = *write_nh; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = GNUNET_NETWORK_get_fd (*write_nh); + t->read_fd = -1; + t->write_fd = fdi->sock; + } + else if (1 == read_fh_len) + { + fdi->fh = *read_fh; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = (*read_fh)->fd; // FIXME: does not work under WIN32 + t->read_fd = fdi->sock; + t->write_fd = -1; } - if (NULL == scheduler_select) - ret = GNUNET_NETWORK_socket_select (rs, - ws, - NULL, - timeout); else - ret = scheduler_select (scheduler_select_cls, - rs, - ws, - NULL, - timeout); - if (ret == GNUNET_SYSERR) { - if (errno == EINTR) - continue; - - LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select"); -#ifndef MINGW -#if USE_LSOF - char lsof[512]; - - snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ()); - (void) close (1); - (void) dup2 (2, 1); - if (0 != system (lsof)) - LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, - "system"); -#endif -#endif -#if DEBUG_FDS - struct GNUNET_SCHEDULER_Task *t; - - for (t = pending_head; NULL != t; t = t->next) - { - if (-1 != t->read_fd) - { - int flags = fcntl (t->read_fd, F_GETFD); - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - t->read_fd); - dump_backtrace (t); - } - } - if (-1 != t->write_fd) - { - int flags = fcntl (t->write_fd, F_GETFD); - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - t->write_fd); - dump_backtrace (t); - } - } - } -#endif - GNUNET_assert (0); - break; + fdi->fh = *write_fh; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = (*write_fh)->fd; // FIXME: does not work under WIN32 + t->read_fd = -1; + t->write_fd = fdi->sock; } - - if ( (0 == ret) && - (0 == timeout.rel_value_us) && - (busy_wait_warning > 16) ) + } + else + { + fdi = GNUNET_new_array (t->fds_len, struct GNUNET_SCHEDULER_FdInfo); + t->fds = fdi; + t->read_fd = -1; + t->write_fd = -1; + unsigned int i; + for (i = 0; i != read_nh_len; ++i) { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Looks like we're busy waiting...\n"); - short_wait (100); /* mitigate */ + fdi->fd = read_nh[i]; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = GNUNET_NETWORK_get_fd (read_nh[i]); + ++fdi; } - check_ready (rs, ws); - run_ready (rs, ws); - if (GNUNET_NETWORK_fdset_handle_isset (rs, pr)) + for (i = 0; i != write_nh_len; ++i) { - /* consume the signal */ - GNUNET_DISK_file_read (pr, &c, sizeof (c)); - /* mark all active tasks as ready due to shutdown */ - GNUNET_SCHEDULER_shutdown (); + fdi->fd = write_nh[i]; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = GNUNET_NETWORK_get_fd (write_nh[i]); + ++fdi; } - if (last_tr == tasks_run) + for (i = 0; i != read_fh_len; ++i) { - short_wait (1); - busy_wait_warning++; + fdi->fh = read_fh[i]; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = (read_fh[i])->fd; // FIXME: does not work under WIN32 + ++fdi; } - else + for (i = 0; i != write_fh_len; ++i) { - last_tr = tasks_run; - busy_wait_warning = 0; + fdi->fh = write_fh[i]; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = (write_fh[i])->fd; // FIXME: does not work under WIN32 + ++fdi; } } - - if (GNUNET_YES == install_signals) - { - GNUNET_SIGNAL_handler_uninstall (shc_int); - GNUNET_SIGNAL_handler_uninstall (shc_term); -#if (SIGTERM != GNUNET_TERM_SIG) - GNUNET_SIGNAL_handler_uninstall (shc_gterm); -#endif -#ifndef MINGW - GNUNET_SIGNAL_handler_uninstall (shc_pipe); - GNUNET_SIGNAL_handler_uninstall (shc_quit); - GNUNET_SIGNAL_handler_uninstall (shc_hup); -#endif - } - - GNUNET_DISK_pipe_close (shutdown_pipe_handle); - shutdown_pipe_handle = NULL; - GNUNET_NETWORK_fdset_destroy (rs); - GNUNET_NETWORK_fdset_destroy (ws); } /** - * Obtain the task context, giving the reason why the current task was - * started. + * calls the given function @a func on each FdInfo related to @a t. + * Optionally updates the event type field in each FdInfo after calling + * @a func. * - * @return current tasks' scheduler context - */ -const struct GNUNET_SCHEDULER_TaskContext * -GNUNET_SCHEDULER_get_task_context () + * @param t the task + * @param driver_func the function to call with each FdInfo contained in + * in @a t + * @param if_not_ready only call @a driver_func on FdInfos that are not + * ready + * @param et the event type to be set in each FdInfo after calling + * @a driver_func on it, or -1 if no updating not desired. + */ +void driver_add_multiple (struct GNUNET_SCHEDULER_Task *t, + enum GNUNET_SCHEDULER_EventType et) { - GNUNET_assert (NULL != active_task); - return &tc; + struct GNUNET_SCHEDULER_FdInfo *fdi; + int success = GNUNET_YES; + + for (int i = 0; i != t->fds_len; ++i) + { + fdi = &t->fds[i]; + success = scheduler_driver->add (scheduler_driver->cls, t, fdi) && success; + if (et != -1) + { + fdi->et = et; + } + } + if (GNUNET_YES != success) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "driver could not add task\n"); + } } -/** - * Get information about the current load of this scheduler. Use this - * function to determine if an elective task should be added or simply - * dropped (if the decision should be made based on the number of - * tasks ready to run). - * - * @param p priority level to look at - * @return number of tasks pending right now - */ -unsigned int -GNUNET_SCHEDULER_get_load (enum GNUNET_SCHEDULER_Priority p) +void +shutdown_cb (void *cls) { - struct GNUNET_SCHEDULER_Task *pos; - unsigned int ret; + char c; + const struct GNUNET_DISK_FileHandle *pr; - GNUNET_assert (NULL != active_task); - if (p == GNUNET_SCHEDULER_PRIORITY_COUNT) - return ready_count; - if (p == GNUNET_SCHEDULER_PRIORITY_KEEP) - p = current_priority; - ret = 0; - for (pos = ready_head[check_priority (p)]; NULL != pos; pos = pos->next) - ret++; - return ret; + pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, + GNUNET_DISK_PIPE_END_READ); + GNUNET_assert (! GNUNET_DISK_handle_invalid (pr)); + /* consume the signal */ + GNUNET_DISK_file_read (pr, &c, sizeof (c)); + /* mark all active tasks as ready due to shutdown */ + GNUNET_SCHEDULER_shutdown (); } /** * Cancel the task with the specified identifier. - * The task must not yet have run. + * The task must not yet have run. Only allowed to be called as long as the + * scheduler is running (#GNUNET_SCHEDULER_run or + * #GNUNET_SCHEDULER_run_with_driver has been called and has not returned yet). * * @param task id of the task to cancel * @return original closure of the task @@ -1040,34 +899,50 @@ void * GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task) { enum GNUNET_SCHEDULER_Priority p; + int is_fd_task; void *ret; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "canceling task %p\n", + task); + + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); GNUNET_assert ( (NULL != active_task) || - (GNUNET_NO == task->lifeness) ); - if (! task->in_ready_list) + (GNUNET_NO == task->lifeness) ); + is_fd_task = (NULL != task->fds); + if (is_fd_task) { - if ( (-1 == task->read_fd) && - (-1 == task->write_fd) && - (NULL == task->read_set) && - (NULL == task->write_set) ) + int del_result = scheduler_driver->del (scheduler_driver->cls, task); + if (GNUNET_OK != del_result) { - if (GNUNET_YES == task->on_shutdown) - GNUNET_CONTAINER_DLL_remove (shutdown_head, - shutdown_tail, - task); - else - GNUNET_CONTAINER_DLL_remove (pending_timeout_head, - pending_timeout_tail, - task); - if (task == pending_timeout_last) - pending_timeout_last = NULL; + LOG (GNUNET_ERROR_TYPE_ERROR, + "driver could not delete task\n"); + GNUNET_assert (0); } - else + } + if (! task->in_ready_list) + { + if (is_fd_task) { GNUNET_CONTAINER_DLL_remove (pending_head, pending_tail, task); } + else if (GNUNET_YES == task->on_shutdown) + { + GNUNET_CONTAINER_DLL_remove (shutdown_head, + shutdown_tail, + task); + } + else + { + GNUNET_CONTAINER_DLL_remove (pending_timeout_head, + pending_timeout_tail, + task); + if (pending_timeout_last == task) + pending_timeout_last = NULL; + } } else { @@ -1078,9 +953,6 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task) ready_count--; } ret = task->callback_cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Canceling task %p\n", - task); destroy_task (task); return ret; } @@ -1101,7 +973,7 @@ init_backtrace (struct GNUNET_SCHEDULER_Task *t) = backtrace (backtrace_array, MAX_TRACE_DEPTH); t->backtrace_strings = backtrace_symbols (backtrace_array, - t->num_backtrace_strings); + t->num_backtrace_strings); dump_backtrace (t); #endif } @@ -1218,7 +1090,7 @@ GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at, pending_timeout_last = t; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task: %p\n", + "Adding task %p\n", t); init_backtrace (t); return t; @@ -1238,8 +1110,8 @@ GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed_with_priority (struct GNUNET_TIME_Relative delay, - enum GNUNET_SCHEDULER_Priority priority, - GNUNET_SCHEDULER_TaskCallback task, + enum GNUNET_SCHEDULER_Priority priority, + GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { return GNUNET_SCHEDULER_add_at_with_priority (GNUNET_TIME_relative_to_absolute (delay), @@ -1307,12 +1179,12 @@ GNUNET_SCHEDULER_add_at (struct GNUNET_TIME_Absolute at, struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { return GNUNET_SCHEDULER_add_delayed_with_priority (delay, - GNUNET_SCHEDULER_PRIORITY_DEFAULT, - task, - task_cls); + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + task, + task_cls); } @@ -1333,11 +1205,11 @@ GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { return GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_ZERO, - task, - task_cls); + task, + task_cls); } @@ -1353,7 +1225,7 @@ GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { struct GNUNET_SCHEDULER_Task *t; @@ -1370,12 +1242,12 @@ GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task, t->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; t->priority = GNUNET_SCHEDULER_PRIORITY_SHUTDOWN; t->on_shutdown = GNUNET_YES; - t->lifeness = GNUNET_YES; + t->lifeness = GNUNET_NO; GNUNET_CONTAINER_DLL_insert (shutdown_head, - shutdown_tail, - t); + shutdown_tail, + t); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task: %p\n", + "Adding shutdown task %p\n", t); init_backtrace (t); return t; @@ -1409,6 +1281,33 @@ GNUNET_SCHEDULER_add_now_with_lifeness (int lifeness, } +#if DEBUG_FDS +/** + * check a raw file descriptor and abort if it is bad (for debugging purposes) + * + * @param t the task related to the file descriptor + * @param raw_fd the raw file descriptor to check + */ +void +check_fd (struct GNUNET_SCHEDULER_Task *t, int raw_fd) +{ + if (-1 != raw_fd) + { + int flags = fcntl (raw_fd, F_GETFD); + + if ((flags == -1) && (errno == EBADF)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Got invalid file descriptor %d!\n", + raw_fd); + init_backtrace (t); + GNUNET_assert (0); + } + } +} +#endif + + /** * Schedule a new task to be run with a specified delay or when any of * the specified file descriptor sets is ready. The delay can be used @@ -1437,9 +1336,11 @@ GNUNET_SCHEDULER_add_now_with_lifeness (int lifeness, #ifndef MINGW static struct GNUNET_SCHEDULER_Task * add_without_sets (struct GNUNET_TIME_Relative delay, - enum GNUNET_SCHEDULER_Priority priority, - int rfd, - int wfd, + enum GNUNET_SCHEDULER_Priority priority, + const struct GNUNET_NETWORK_Handle *read_nh, + const struct GNUNET_NETWORK_Handle *write_nh, + const struct GNUNET_DISK_FileHandle *read_fh, + const struct GNUNET_DISK_FileHandle *write_fh, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { @@ -1448,39 +1349,23 @@ add_without_sets (struct GNUNET_TIME_Relative delay, GNUNET_assert (NULL != active_task); GNUNET_assert (NULL != task); t = GNUNET_new (struct GNUNET_SCHEDULER_Task); + init_fd_info (t, + &read_nh, + read_nh ? 1 : 0, + &write_nh, + write_nh ? 1 : 0, + &read_fh, + read_fh ? 1 : 0, + &write_fh, + write_fh ? 1 : 0); t->callback = task; t->callback_cls = task_cls; #if DEBUG_FDS - if (-1 != rfd) - { - int flags = fcntl (rfd, F_GETFD); - - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - rfd); - init_backtrace (t); - GNUNET_assert (0); - } - } - if (-1 != wfd) - { - int flags = fcntl (wfd, F_GETFD); - - if (flags == -1 && errno == EBADF) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - wfd); - init_backtrace (t); - GNUNET_assert (0); - } - } + check_fd (t, NULL != read_nh ? GNUNET_NETWORK_get_fd (read_nh) : -1); + check_fd (t, NULL != write_nh ? GNUNET_NETWORK_get_fd (write_nh) : -1); + check_fd (t, NULL != read_fh ? read_fh->fd : -1); + check_fd (t, NULL != write_fh ? write_fh->fd : -1); #endif - t->read_fd = rfd; - GNUNET_assert (wfd >= -1); - t->write_fd = wfd; #if PROFILE_DELAYS t->start_time = GNUNET_TIME_absolute_get (); #endif @@ -1490,11 +1375,9 @@ add_without_sets (struct GNUNET_TIME_Relative delay, GNUNET_CONTAINER_DLL_insert (pending_head, pending_tail, t); + driver_add_multiple (t, GNUNET_SCHEDULER_ET_NONE); max_priority_added = GNUNET_MAX (max_priority_added, t->priority); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task %p\n", - t); init_backtrace (t); return t; } @@ -1507,6 +1390,9 @@ add_without_sets (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param rfd read file-descriptor @@ -1522,8 +1408,8 @@ GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay, void *task_cls) { return GNUNET_SCHEDULER_add_read_net_with_priority (delay, - GNUNET_SCHEDULER_PRIORITY_DEFAULT, - rfd, task, task_cls); + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + rfd, task, task_cls); } @@ -1534,6 +1420,9 @@ GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay, * socket being ready. The task will be scheduled for execution once * either the delay has expired or the socket operation is ready. It * will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param priority priority to use for the task @@ -1545,9 +1434,9 @@ GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net_with_priority (struct GNUNET_TIME_Relative delay, - enum GNUNET_SCHEDULER_Priority priority, - struct GNUNET_NETWORK_Handle *rfd, - GNUNET_SCHEDULER_TaskCallback task, + enum GNUNET_SCHEDULER_Priority priority, + struct GNUNET_NETWORK_Handle *rfd, + GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { return GNUNET_SCHEDULER_add_net_with_priority (delay, priority, @@ -1565,6 +1454,9 @@ GNUNET_SCHEDULER_add_read_net_with_priority (struct GNUNET_TIME_Relative delay, * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the priority of * the calling task. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param wfd write file-descriptor @@ -1592,6 +1484,9 @@ GNUNET_SCHEDULER_add_write_net (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param priority priority of the task @@ -1612,6 +1507,9 @@ GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); + #if MINGW struct GNUNET_NETWORK_FDSet *s; struct GNUNET_SCHEDULER_Task * ret; @@ -1627,10 +1525,13 @@ GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay, GNUNET_NETWORK_fdset_destroy (s); return ret; #else + GNUNET_assert (on_read || on_write); GNUNET_assert (GNUNET_NETWORK_get_fd (fd) >= 0); return add_without_sets (delay, priority, - on_read ? GNUNET_NETWORK_get_fd (fd) : -1, - on_write ? GNUNET_NETWORK_get_fd (fd) : -1, + on_read ? fd : NULL, + on_write ? fd : NULL, + NULL, + NULL, task, task_cls); #endif } @@ -1642,6 +1543,9 @@ GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param rfd read file-descriptor @@ -1668,6 +1572,9 @@ GNUNET_SCHEDULER_add_read_file (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param wfd write file-descriptor @@ -1694,6 +1601,9 @@ GNUNET_SCHEDULER_add_write_file (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param priority priority of the task @@ -1712,6 +1622,9 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, int on_read, int on_write, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); + #if MINGW struct GNUNET_NETWORK_FDSet *s; struct GNUNET_SCHEDULER_Task * ret; @@ -1727,19 +1640,70 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, GNUNET_NETWORK_fdset_destroy (s); return ret; #else - int real_fd; - - GNUNET_DISK_internal_file_handle_ (fd, &real_fd, sizeof (int)); - GNUNET_assert (real_fd >= 0); - return add_without_sets ( - delay, priority, - on_read ? real_fd : -1, - on_write ? real_fd : -1, - task, task_cls); + GNUNET_assert (on_read || on_write); + GNUNET_assert (fd->fd >= 0); + return add_without_sets (delay, priority, + NULL, + NULL, + on_read ? fd : NULL, + on_write ? fd : NULL, + task, task_cls); #endif } +void +extract_handles (struct GNUNET_SCHEDULER_Task *t, + const struct GNUNET_NETWORK_FDSet *fdset, + const struct GNUNET_NETWORK_Handle ***ntarget, + unsigned int *extracted_nhandles, + const struct GNUNET_DISK_FileHandle ***ftarget, + unsigned int *extracted_fhandles) +{ + // FIXME: this implementation only works for unix, for WIN32 the file handles + // in fdset must be handled separately + const struct GNUNET_NETWORK_Handle **nhandles; + const struct GNUNET_DISK_FileHandle **fhandles; + unsigned int nhandles_len, fhandles_len; + int sock; + + nhandles = NULL; + fhandles = NULL; + nhandles_len = 0; + fhandles_len = 0; + for (sock = 0; sock != fdset->nsds; ++sock) + { + if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (fdset, sock)) + { + struct GNUNET_NETWORK_Handle *nhandle; + struct GNUNET_DISK_FileHandle *fhandle; + + nhandle = GNUNET_NETWORK_socket_box_native (sock); + if (NULL != nhandle) + { + GNUNET_array_append (nhandles, nhandles_len, nhandle); + } + else + { + fhandle = GNUNET_DISK_get_handle_from_int_fd (sock); + if (NULL != fhandle) + { + GNUNET_array_append (fhandles, fhandles_len, fhandle); + } + else + { + GNUNET_assert (0); + } + } + } + } + *ntarget = nhandles_len > 0 ? nhandles : NULL; + *ftarget = fhandles_len > 0 ? fhandles : NULL; + *extracted_nhandles = nhandles_len; + *extracted_fhandles = fhandles_len; +} + + /** * Schedule a new task to be run with a specified delay or when any of * the specified file descriptor sets is ready. The delay can be used @@ -1755,6 +1719,9 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, * || any-rs-ready * || any-ws-ready) ) * </code> + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param prio how important is this task? * @param delay how long should we wait? @@ -1774,13 +1741,20 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, void *task_cls) { struct GNUNET_SCHEDULER_Task *t; - - if ( (NULL == rs) && - (NULL == ws) ) + const struct GNUNET_NETWORK_Handle **read_nhandles; + const struct GNUNET_NETWORK_Handle **write_nhandles; + const struct GNUNET_DISK_FileHandle **read_fhandles; + const struct GNUNET_DISK_FileHandle **write_fhandles; + unsigned int read_nhandles_len, write_nhandles_len, + read_fhandles_len, write_fhandles_len; + + if (((NULL == rs) && (NULL == ws)) || ((0 == rs->nsds) && (0 == ws->nsds))) return GNUNET_SCHEDULER_add_delayed_with_priority (delay, prio, task, task_cls); + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); GNUNET_assert (NULL != active_task); GNUNET_assert (NULL != task); t = GNUNET_new (struct GNUNET_SCHEDULER_Task); @@ -1788,16 +1762,48 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, t->callback_cls = task_cls; t->read_fd = -1; t->write_fd = -1; + t->own_handles = GNUNET_YES; + read_nhandles = NULL; + write_nhandles = NULL; + read_fhandles = NULL; + write_fhandles = NULL; + read_nhandles_len = 0; + write_nhandles_len = 0; + read_fhandles_len = 0; + write_fhandles_len = 0; if (NULL != rs) { - t->read_set = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_copy (t->read_set, rs); + extract_handles (t, + rs, + &read_nhandles, + &read_nhandles_len, + &read_fhandles, + &read_fhandles_len); } if (NULL != ws) { - t->write_set = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_copy (t->write_set, ws); + extract_handles (t, + ws, + &write_nhandles, + &write_nhandles_len, + &write_fhandles, + &write_fhandles_len); } + init_fd_info (t, + read_nhandles, + read_nhandles_len, + write_nhandles, + write_nhandles_len, + read_fhandles, + read_fhandles_len, + write_fhandles, + write_fhandles_len); + /* free the arrays of pointers to network / file handles, the actual + * handles will be freed in destroy_task */ + GNUNET_array_grow (read_nhandles, read_nhandles_len, 0); + GNUNET_array_grow (write_nhandles, write_nhandles_len, 0); + GNUNET_array_grow (read_fhandles, read_fhandles_len, 0); + GNUNET_array_grow (write_fhandles, write_fhandles_len, 0); #if PROFILE_DELAYS t->start_time = GNUNET_TIME_absolute_get (); #endif @@ -1810,8 +1816,9 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, GNUNET_CONTAINER_DLL_insert (pending_head, pending_tail, t); + driver_add_multiple (t, GNUNET_SCHEDULER_ET_NONE); max_priority_added = GNUNET_MAX (max_priority_added, - t->priority); + t->priority); LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding task %p\n", t); @@ -1822,17 +1829,18 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, /** * Function used by event-loop implementations to signal the scheduler - * that a particular @a task is ready due to an event of type @a et. + * that a particular @a task is ready due to an event specified in the + * et field of @a fdi. * * This function will then queue the task to notify the application * that the task is ready (with the respective priority). * - * @param task the task that is ready, NULL for wake up calls - * @param et information about why the task is ready + * @param task the task that is ready + * @param fdi information about the related FD */ void GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, - enum GNUNET_SCHEDULER_EventType et) + struct GNUNET_SCHEDULER_FdInfo *fdi) { enum GNUNET_SCHEDULER_Reason reason; struct GNUNET_TIME_Absolute now; @@ -1842,17 +1850,20 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, if (now.abs_value_us >= task->timeout.abs_value_us) reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; if ( (0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) && - (0 != (GNUNET_SCHEDULER_ET_IN & et)) ) + (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et)) ) reason |= GNUNET_SCHEDULER_REASON_READ_READY; if ( (0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (0 != (GNUNET_SCHEDULER_ET_OUT & et)) ) + (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)) ) reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; task->reason = reason; - task->fds = &task->fdx; - task->fdx.et = et; - task->fds_len = 1; - queue_ready_task (task); + if (GNUNET_NO == task->in_ready_list) + { + GNUNET_CONTAINER_DLL_remove (pending_head, + pending_tail, + task); + queue_ready_task (task); + } } @@ -1862,15 +1873,16 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, * there are tasks left to run just to give other tasks a chance as * well. If we return #GNUNET_YES, the driver should call this * function again as soon as possible, while if we return #GNUNET_NO - * it must block until the operating system has more work as the - * scheduler has no more work to do right now. + * it must block until either the operating system has more work (the + * scheduler has no more work to do right now) or the timeout set by + * the scheduler (using the set_wakeup callback) is reached. * * @param sh scheduler handle that was given to the `loop` * @return #GNUNET_OK if there are more tasks that are ready, * and thus we would like to run more (yield to avoid * blocking other activities for too long) * #GNUNET_NO if we are done running tasks (yield to block) - * #GNUNET_SYSERR on error + * #GNUNET_SYSERR on error, e.g. no tasks were ready */ int GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) @@ -1894,9 +1906,27 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) pending_timeout_last = NULL; queue_ready_task (pos); } + pos = pending_head; + while (NULL != pos) + { + struct GNUNET_SCHEDULER_Task *next = pos->next; + if (now.abs_value_us >= pos->timeout.abs_value_us) + { + pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; + GNUNET_CONTAINER_DLL_remove (pending_head, + pending_tail, + pos); + queue_ready_task (pos); + } + pos = next; + } if (0 == ready_count) - return GNUNET_NO; + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "GNUNET_SCHEDULER_run_from_driver was called, but no tasks are ready!\n"); + return GNUNET_SYSERR; + } /* find out which task priority level we are going to process this time */ @@ -1916,49 +1946,74 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) while (NULL != (pos = ready_head[p])) { GNUNET_CONTAINER_DLL_remove (ready_head[p], - ready_tail[p], - pos); + ready_tail[p], + pos); ready_count--; current_priority = pos->priority; current_lifeness = pos->lifeness; active_task = pos; #if PROFILE_DELAYS if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us > - DELAY_THRESHOLD.rel_value_us) + DELAY_THRESHOLD.rel_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p took %s to be scheduled\n", - pos, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), - GNUNET_YES)); + "Task %p took %s to be scheduled\n", + pos, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), + GNUNET_YES)); } #endif tc.reason = pos->reason; GNUNET_NETWORK_fdset_zero (sh->rs); GNUNET_NETWORK_fdset_zero (sh->ws); + // FIXME: do we have to remove FdInfos from fds if they are not ready? tc.fds_len = pos->fds_len; tc.fds = pos->fds; - tc.read_ready = (NULL == pos->read_set) ? sh->rs : pos->read_set; - if ( (-1 != pos->read_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY)) ) - GNUNET_NETWORK_fdset_set_native (sh->rs, - pos->read_fd); - tc.write_ready = (NULL == pos->write_set) ? sh->ws : pos->write_set; - if ((-1 != pos->write_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))) - GNUNET_NETWORK_fdset_set_native (sh->ws, - pos->write_fd); + for (int i = 0; i != pos->fds_len; ++i) + { + struct GNUNET_SCHEDULER_FdInfo *fdi = &pos->fds[i]; + if (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et)) + { + GNUNET_NETWORK_fdset_set_native (sh->rs, + fdi->sock); + } + if (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)) + { + GNUNET_NETWORK_fdset_set_native (sh->ws, + fdi->sock); + } + } + tc.read_ready = sh->rs; + tc.write_ready = sh->ws; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Running task: %p\n", - pos); + "Running task %p\n", + pos); + GNUNET_assert (NULL != pos->callback); pos->callback (pos->callback_cls); + if (NULL != pos->fds) + { + int del_result = scheduler_driver->del (scheduler_driver->cls, pos); + if (GNUNET_OK != del_result) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "driver could not delete task\n"); + GNUNET_assert (0); + } + } active_task = NULL; dump_backtrace (pos); destroy_task (pos); tasks_run++; } + shutdown_if_no_lifeness (); if (0 == ready_count) + { + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); return GNUNET_NO; + } + scheduler_driver->set_wakeup (scheduler_driver->cls, + GNUNET_TIME_absolute_get ()); return GNUNET_OK; } @@ -1981,8 +2036,8 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) */ int GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, - GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + GNUNET_SCHEDULER_TaskCallback task, + void *task_cls) { int ret; struct GNUNET_SIGNAL_Context *shc_int; @@ -1997,7 +2052,6 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, #endif struct GNUNET_SCHEDULER_Task tsk; const struct GNUNET_DISK_FileHandle *pr; - struct GNUNET_SCHEDULER_Handle sh; /* general set-up */ GNUNET_assert (NULL == active_task); @@ -2009,54 +2063,56 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, GNUNET_assert (NULL != shutdown_pipe_handle); pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, GNUNET_DISK_PIPE_END_READ); - GNUNET_assert (NULL != pr); my_pid = getpid (); + scheduler_driver = driver; /* install signal handlers */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Registering signal handlers\n"); shc_int = GNUNET_SIGNAL_handler_install (SIGINT, - &sighandler_shutdown); + &sighandler_shutdown); shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, - &sighandler_shutdown); + &sighandler_shutdown); #if (SIGTERM != GNUNET_TERM_SIG) shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG, - &sighandler_shutdown); + &sighandler_shutdown); #endif #ifndef MINGW shc_pipe = GNUNET_SIGNAL_handler_install (SIGPIPE, - &sighandler_pipe); + &sighandler_pipe); shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, - &sighandler_shutdown); + &sighandler_shutdown); shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, - &sighandler_shutdown); + &sighandler_shutdown); #endif /* Setup initial tasks */ current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT; - current_lifeness = GNUNET_YES; + current_lifeness = GNUNET_NO; memset (&tsk, - 0, - sizeof (tsk)); + 0, + sizeof (tsk)); active_task = &tsk; - tsk.sh = &sh; + GNUNET_SCHEDULER_add_now (&GNUNET_OS_install_parent_control_handler, + NULL); + GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, + pr, + &shutdown_cb, + NULL); + current_lifeness = GNUNET_YES; GNUNET_SCHEDULER_add_with_reason_and_priority (task, task_cls, GNUNET_SCHEDULER_REASON_STARTUP, GNUNET_SCHEDULER_PRIORITY_DEFAULT); - GNUNET_SCHEDULER_add_now_with_lifeness (GNUNET_NO, - &GNUNET_OS_install_parent_control_handler, - NULL); active_task = NULL; - driver->set_wakeup (driver->cls, - GNUNET_TIME_absolute_get ()); - + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); /* begin main event loop */ sh.rs = GNUNET_NETWORK_fdset_create (); sh.ws = GNUNET_NETWORK_fdset_create (); - sh.driver = driver; + GNUNET_NETWORK_fdset_handle_set (sh.rs, pr); ret = driver->loop (driver->cls, - &sh); + &sh); GNUNET_NETWORK_fdset_destroy (sh.rs); GNUNET_NETWORK_fdset_destroy (sh.ws); @@ -2073,20 +2129,211 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, #endif GNUNET_DISK_pipe_close (shutdown_pipe_handle); shutdown_pipe_handle = NULL; + scheduler_driver = NULL; + return ret; +} + + +int +select_add (void *cls, + struct GNUNET_SCHEDULER_Task *task, + struct GNUNET_SCHEDULER_FdInfo *fdi) +{ + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + GNUNET_assert (NULL != task); + GNUNET_assert (NULL != fdi); + GNUNET_assert (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et) || + 0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)); + + if (!((NULL != fdi->fd) ^ (NULL != fdi->fh)) || (fdi->sock < 0)) + { + /* exactly one out of {fd, hf} must be != NULL and the OS handle must be valid */ + return GNUNET_SYSERR; + } + + struct Scheduled *scheduled = GNUNET_new (struct Scheduled); + scheduled->task = task; + scheduled->fdi = fdi; + scheduled->et = fdi->et; + + GNUNET_CONTAINER_DLL_insert (context->scheduled_head, + context->scheduled_tail, + scheduled); + return GNUNET_OK; +} + + +int +select_del (void *cls, + struct GNUNET_SCHEDULER_Task *task) +{ + struct DriverContext *context; + struct Scheduled *pos; + int ret; + + GNUNET_assert (NULL != cls); + + context = cls; + ret = GNUNET_SYSERR; + pos = context->scheduled_head; + while (NULL != pos) + { + struct Scheduled *next = pos->next; + if (pos->task == task) + { + GNUNET_CONTAINER_DLL_remove (context->scheduled_head, + context->scheduled_tail, + pos); + GNUNET_free (pos); + ret = GNUNET_OK; + } + pos = next; + } return ret; } +int +select_loop (void *cls, + struct GNUNET_SCHEDULER_Handle *sh) +{ + struct GNUNET_NETWORK_FDSet *rs; + struct GNUNET_NETWORK_FDSet *ws; + struct DriverContext *context; + int select_result; + int tasks_ready; + + context = cls; + GNUNET_assert (NULL != context); + rs = GNUNET_NETWORK_fdset_create (); + ws = GNUNET_NETWORK_fdset_create (); + tasks_ready = GNUNET_NO; + while (NULL != context->scheduled_head || + GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != context->timeout.rel_value_us) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "select timeout = %s\n", + GNUNET_STRINGS_relative_time_to_string (context->timeout, GNUNET_NO)); + + GNUNET_NETWORK_fdset_zero (rs); + GNUNET_NETWORK_fdset_zero (ws); + struct Scheduled *pos; + for (pos = context->scheduled_head; NULL != pos; pos = pos->next) + { + if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et)) + { + GNUNET_NETWORK_fdset_set_native (rs, pos->fdi->sock); + } + if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et)) + { + GNUNET_NETWORK_fdset_set_native (ws, pos->fdi->sock); + } + } + if (NULL == scheduler_select) + { + select_result = GNUNET_NETWORK_socket_select (rs, + ws, + NULL, + context->timeout); + } + else + { + select_result = scheduler_select (scheduler_select_cls, + rs, + ws, + NULL, + context->timeout); + } + if (select_result == GNUNET_SYSERR) + { + if (errno == EINTR) + continue; + + LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select"); +#ifndef MINGW +#if USE_LSOF + char lsof[512]; + + snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ()); + (void) close (1); + (void) dup2 (2, 1); + if (0 != system (lsof)) + LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, + "system"); +#endif +#endif +#if DEBUG_FDS + struct Scheduled *s; + for (s = context->scheduled_head; NULL != s; s = s->next) + { + int flags = fcntl (s->fdi->sock, F_GETFD); + if ((flags == -1) && (errno == EBADF)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Got invalid file descriptor %d!\n", + s->fdi->sock); + } + } +#endif + GNUNET_assert (0); + return GNUNET_SYSERR; + } + for (pos = context->scheduled_head; NULL != pos; pos = pos->next) + { + int is_ready = GNUNET_NO; + if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et) && + GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, pos->fdi->sock)) + { + pos->fdi->et |= GNUNET_SCHEDULER_ET_IN; + is_ready = GNUNET_YES; + } + if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et) && + GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, pos->fdi->sock)) + { + pos->fdi->et |= GNUNET_SCHEDULER_ET_OUT; + is_ready = GNUNET_YES; + } + if (GNUNET_YES == is_ready) + { + GNUNET_SCHEDULER_task_ready (pos->task, pos->fdi); + } + } + tasks_ready = GNUNET_SCHEDULER_run_from_driver (sh); + GNUNET_assert (GNUNET_SYSERR != tasks_ready); + } + return GNUNET_OK; +} + + +void +select_set_wakeup(void *cls, + struct GNUNET_TIME_Absolute dt) +{ + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + + context->timeout = GNUNET_TIME_absolute_get_remaining (dt); +} + + /** * Obtain the driver for using select() as the event loop. * * @return NULL on error */ -const struct GNUNET_SCHEDULER_Driver * +struct GNUNET_SCHEDULER_Driver * GNUNET_SCHEDULER_driver_select () { - GNUNET_break (0); // not implemented - return NULL; + struct GNUNET_SCHEDULER_Driver *select_driver; + select_driver = GNUNET_new (struct GNUNET_SCHEDULER_Driver); + + select_driver->loop = &select_loop; + select_driver->add = &select_add; + select_driver->del = &select_del; + select_driver->set_wakeup = &select_set_wakeup; + + return select_driver; } diff --git a/src/util/service.c b/src/util/service.c @@ -1932,6 +1932,11 @@ do_send (void *cls) size_t left; const char *buf; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "service: sending message with type %u", + ntohs(client->msg->type)); + + client->send_task = NULL; buf = (const char *) client->msg; left = ntohs (client->msg->size) - client->msg_pos; @@ -1941,6 +1946,8 @@ do_send (void *cls) GNUNET_assert (ret <= (ssize_t) left); if (0 == ret) { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "no data send"); GNUNET_MQ_inject_error (client->mq, GNUNET_MQ_ERROR_WRITE); return; @@ -1958,6 +1965,9 @@ do_send (void *cls) if (EPIPE != errno) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "socket send returned with error code %i", + errno); GNUNET_MQ_inject_error (client->mq, GNUNET_MQ_ERROR_WRITE); return; @@ -2402,7 +2412,7 @@ resume_client_receive (void *cls) GNUNET_YES); if (GNUNET_SYSERR == ret) { - if (NULL != c->drop_task) + if (NULL == c->drop_task) GNUNET_SERVICE_client_drop (c); return; } @@ -2431,6 +2441,7 @@ resume_client_receive (void *cls) void GNUNET_SERVICE_client_continue (struct GNUNET_SERVICE_Client *c) { + GNUNET_assert (NULL == c->drop_task); GNUNET_assert (GNUNET_YES == c->needs_continue); GNUNET_assert (NULL == c->recv_task); c->needs_continue = GNUNET_NO; @@ -2513,6 +2524,24 @@ GNUNET_SERVICE_client_drop (struct GNUNET_SERVICE_Client *c) { struct GNUNET_SERVICE_Handle *sh = c->sh; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client dropped: %p (MQ: %p)\n", + c, + c->mq); + +#if EXECINFO + void *backtrace_array[MAX_TRACE_DEPTH]; + int num_backtrace_strings = backtrace (backtrace_array, MAX_TRACE_DEPTH); + char **backtrace_strings = + backtrace_symbols (backtrace_array, + t->num_backtrace_strings); + for (unsigned int i = 0; i < num_backtrace_strings; i++) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "client drop trace %u: %s\n", + i, + backtrace_strings[i]); +#endif + if (NULL != c->drop_task) { /* asked to drop twice! */ diff --git a/src/vpn/gnunet-service-vpn.c b/src/vpn/gnunet-service-vpn.c @@ -2218,6 +2218,9 @@ route_packet (struct DestinationEntry *destination, * * @param cls closure, NULL * @param message message we got from the client (VPN channel interface) + * @return #GNUNET_OK on success, + * #GNUNET_NO to stop further processing (no error) + * #GNUNET_SYSERR to stop further processing with error */ static int message_token (void *cls,