gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2
parent 1bf8c98f6d843f30e9abfa6dde31e31e50170c06
Author: Gabor X Toth <*@tg-x.net>
Date:   Wed, 12 Mar 2014 16:39:41 +0000

PSYC: in-order delivery of fragments; tests for large messages

Cache message received fragments from multicast and deliver them
in the correct order to clients.

Test messages with large modifier and data payloads.

Diffstat:
Msrc/include/gnunet_psyc_service.h | 17+++++++++++------
Msrc/multicast/multicast_api.c | 30+++++++++++++++++-------------
Msrc/psyc/gnunet-service-psyc.c | 315+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Msrc/psyc/psyc.h | 4++--
Msrc/psyc/psyc_api.c | 44++++++++++++++++++++++++++++----------------
Msrc/psyc/psyc_common.c | 24+++++++++++++++---------
Msrc/psyc/test_psyc.c | 66+++++++++++++++++++++++++++++++++++++++++++++++++++---------------
7 files changed, 418 insertions(+), 82 deletions(-)

diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h @@ -444,10 +444,14 @@ typedef int * contain: "name\0value". If the whole value does not fit, subsequent * calls to this function should write continuations of the value to * @a data. - * @param oper Where to write the operator of the modifier. Only needed during - * the first call to this callback at the beginning of the modifier. - * In case of subsequent calls asking for value continuations @a oper is - * set to #NULL. + * @param[out] oper Where to write the operator of the modifier. + * Only needed during the first call to this callback at the beginning + * of the modifier. In case of subsequent calls asking for value + * continuations @a oper is set to #NULL. + * @param[out] value_size Where to write the full size of the value. + * Only needed during the first call to this callback at the beginning + * of the modifier. In case of subsequent calls asking for value + * continuations @a value_size is set to #NULL. * @return #GNUNET_SYSERR on error (fatal, aborts transmission) * #GNUNET_NO on success, if more data is to be transmitted later. * Should be used if @a data_size was not big enough to take all the @@ -461,7 +465,8 @@ typedef int (*GNUNET_PSYC_TransmitNotifyModifier) (void *cls, uint16_t *data_size, void *data, - uint8_t *oper); + uint8_t *oper, + uint32_t *value_size); /** * Flags for transmitting messages to a channel by the master. @@ -659,7 +664,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, * @param th Handle of the request that is being resumed. */ void -GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th); +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th); /** diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c @@ -178,17 +178,21 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, const struct GNUNET_MessageHeader *msg = cls; struct GNUNET_MULTICAST_Group *grp = group; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling message callback for a message of type %u and size %u.\n", - ntohs (msg->type), ntohs (msg->size)); - if (GNUNET_YES == grp->is_origin) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling origin's message callback " + "for a message of type %u and size %u.\n", + ntohs (msg->type), ntohs (msg->size)); struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; orig->message_cb (orig->cls, msg); } else { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling slave's message callback " + "for a message of type %u and size %u.\n", + ntohs (msg->type), ntohs (msg->size)); struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; mem->message_cb (mem->cls, msg); } @@ -449,8 +453,8 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc struct GNUNET_MULTICAST_Origin *orig = cls; struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; - size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; - char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = ""; struct GNUNET_MULTICAST_MessageHeader *msg = (struct GNUNET_MULTICAST_MessageHeader *) buf; int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); @@ -495,9 +499,9 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc handle_multicast_message (&orig->grp, msg); if (GNUNET_NO == ret) - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_origin_to_all, orig); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), + schedule_origin_to_all, orig); } @@ -526,10 +530,10 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, mh->notify = notify; mh->notify_cls = notify_cls; - /* FIXME: remove delay, it's there only for testing */ - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_origin_to_all, origin); + /* add some delay for testing */ + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), + schedule_origin_to_all, origin); return &origin->msg_handle; } diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c @@ -24,6 +24,8 @@ * @author Gabor X Toth */ +#include <inttypes.h> + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_constants.h" @@ -77,6 +79,45 @@ struct TransmitMessage uint8_t state; }; + +/** + * Cache for received message fragments. + * Message fragments are only sent to clients after all modifiers arrived. + * + * chan_key -> MultiHashMap chan_msgs + */ +static struct GNUNET_CONTAINER_MultiHashMap *recv_cache; + + +/** + * Entry in the chan_msgs hashmap of @a recv_cache: + * fragment_id -> FragmentEntry + */ +struct FragmentEntry +{ + struct GNUNET_MULTICAST_MessageHeader *mmsg; + uint16_t ref_count; +}; + + +/** + * Entry in the @a recv_msgs hash map of a @a Channel. + * message_id -> FragmentCache + */ +struct FragmentCache +{ + /** + * Total size of header fragments (METHOD & MODIFIERs) + */ + uint64_t header_size; + + /** + * Fragment IDs stored in @a recv_cache. + */ + struct GNUNET_CONTAINER_Heap *fragments; +}; + + /** * Common part of the client context for both a master and slave channel. */ @@ -87,6 +128,12 @@ struct Channel struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; + /** + * Received fragments not yet sent to the client. + * message_id -> FragmentCache + */ + struct GNUNET_CONTAINER_MultiHashMap *recv_msgs; + GNUNET_SCHEDULER_TaskIdentifier tmit_task; /** @@ -213,6 +260,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void client_cleanup (struct Channel *ch) { + /* FIXME: fragment_cache_clear */ + if (ch->is_master) { struct Master *mst = (struct Master *) ch; @@ -323,6 +372,189 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg) } +static void +message_to_client (struct Channel *ch, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + uint16_t size = ntohs (mmsg->header.size); + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending message to client. " + "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", + ch, GNUNET_ntohll (mmsg->fragment_id), + GNUNET_ntohll (mmsg->message_id)); + + pmsg = GNUNET_malloc (psize); + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = mmsg->message_id; + + memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); + + GNUNET_SERVER_notification_context_add (nc, ch->client); + GNUNET_SERVER_notification_context_unicast (nc, ch->client, + (const struct GNUNET_MessageHeader *) pmsg, + GNUNET_NO); + GNUNET_free (pmsg); +} + + +/** + * Convert an uint64_t in network byte order to a HashCode + * that can be used as key in a MultiHashMap + */ +static inline void +hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) +{ + /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ + + n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); + n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); + + *key = (struct GNUNET_HashCode) {{ 0 }}; + *((uint64_t *) key) + = (n << 32) | (n >> 32); +} + + +/** + * Convert an uint64_t in host byte order to a HashCode + * that can be used as key in a MultiHashMap + */ +static inline void +hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) +{ +#if __BYTE_ORDER == __BIG_ENDIAN + hash_key_from_nll (key, n); +#elif __BYTE_ORDER == __LITTLE_ENDIAN + *key = (struct GNUNET_HashCode) {{ 0 }}; + *((uint64_t *) key) = n; +#else + #error byteorder undefined +#endif +} + + +static void +fragment_cache_insert (struct Channel *ch, + const struct GNUNET_HashCode *chan_key_hash, + const struct GNUNET_HashCode *msg_id, + struct FragmentCache *frag_cache, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint16_t last_part_type) +{ + uint16_t size = ntohs (mmsg->header.size); + struct GNUNET_CONTAINER_MultiHashMap + *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash); + + if (NULL == frag_cache) + { + frag_cache = GNUNET_new (struct FragmentCache); + frag_cache->fragments + = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + + if (NULL == ch->recv_msgs) + { + ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + } + GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + if (NULL == chan_msgs) + { + chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (recv_cache, chan_key_hash, chan_msgs, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + } + + struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode); + hash_key_from_nll (frag_id, mmsg->fragment_id); + struct FragmentEntry + *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); + if (NULL == frag_entry) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Adding message fragment to cache. " + "fragment_id: %" PRIu64 ", " + "header_size: %" PRIu64 " + %" PRIu64 ").\n", + ch, GNUNET_ntohll (mmsg->fragment_id), + frag_cache->header_size, size); + frag_entry = GNUNET_new (struct FragmentEntry); + frag_entry->ref_count = 1; + frag_entry->mmsg = GNUNET_malloc (size); + memcpy (frag_entry->mmsg, mmsg, size); + GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + else + { + frag_entry->ref_count++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Message fragment already in cache. " + "fragment_id: %" PRIu64 ", ref_count: %u\n", + ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count); + } + + switch (last_part_type) + { + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: + frag_cache->header_size += size; + } + GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id, + GNUNET_ntohll (mmsg->fragment_id)); +} + + +static void +fragment_cache_clear (struct Channel *ch, + const struct GNUNET_HashCode *chan_key_hash, + const struct GNUNET_HashCode *msg_id, + struct FragmentCache *frag_cache, + uint8_t send_to_client) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Clearing message fragment cache.\n", ch); + + struct GNUNET_CONTAINER_MultiHashMap + *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash); + GNUNET_assert (NULL != chan_msgs); + struct GNUNET_HashCode *frag_id; + + while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments))) + { + struct FragmentEntry + *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); + if (frag_entry != NULL) + { + if (GNUNET_YES == send_to_client) + { + message_to_client (ch, frag_entry->mmsg); + } + if (1 == frag_entry->ref_count) + { + GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry); + GNUNET_free (frag_entry->mmsg); + GNUNET_free (frag_entry); + } + else + { + frag_entry->ref_count--; + } + } + GNUNET_free (frag_id); + } + + GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache); + GNUNET_CONTAINER_heap_destroy (frag_cache->fragments); + GNUNET_free (frag_cache); +} + + /** * Incoming message fragment from multicast. * @@ -358,11 +590,15 @@ message_cb (struct Channel *ch, rcb, rcb_cls); #endif - const struct GNUNET_MULTICAST_MessageHeader *mmsg - = (const struct GNUNET_MULTICAST_MessageHeader *) msg; + const struct GNUNET_MULTICAST_MessageHeader + *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg; - if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), - (const char *) &mmsg[1])) + uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg), + (const char *) &mmsg[1]); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Last message part type %u\n", ptype); + + if (GNUNET_NO == ptype) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Received message with invalid parts from multicast. " @@ -371,20 +607,55 @@ message_cb (struct Channel *ch, break; } - struct GNUNET_PSYC_MessageHeader *pmsg; - uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = mmsg->message_id; + struct GNUNET_HashCode msg_id; + hash_key_from_nll (&msg_id, mmsg->message_id); - memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); + struct FragmentCache *frag_cache = NULL; + if (NULL != ch->recv_msgs) + frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id); - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, - (const struct GNUNET_MessageHeader *) pmsg, - GNUNET_NO); - GNUNET_free (pmsg); + switch (ptype) + { + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: + /* FIXME: check state flag / max_state_message_id */ + if (NULL == frag_cache) + { + message_to_client (ch, mmsg); + break; + } + else + { + if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size) + { /* first data fragment after the header, send cached fragments */ + fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, GNUNET_YES); + message_to_client (ch, mmsg); + break; + } + else + { /* still missing fragments from the header, cache data fragment */ + /* fall thru */ + } + } + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: + /* not all modifiers arrived yet, cache fragment */ + fragment_cache_insert (ch, chan_key_hash, &msg_id, frag_cache, mmsg, ptype); + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: + if (NULL != frag_cache) + { /* fragments not yet sent to client, remove from cache */ + fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, GNUNET_NO); + } + else + { + message_to_client (ch, mmsg); + } + break; + } break; } default: @@ -457,8 +728,9 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, const struct GNUNET_MULTICAST_RequestHeader *req = (const struct GNUNET_MULTICAST_RequestHeader *) msg; - if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req), - (const char *) &req[1])) + /* FIXME: see message_cb() */ + if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req), + (const char *) &req[1])) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Dropping message with invalid parts " @@ -826,7 +1098,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, if (GNUNET_YES != ch->ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Ignoring message from client, channel is not ready yet.\n", + "%p Dropping message from client, channel is not ready yet.\n", ch); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; @@ -912,11 +1184,12 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, store = GNUNET_PSYCSTORE_connect (cfg); stats = GNUNET_STATISTICS_create ("psyc", cfg); clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); nc = GNUNET_SERVER_notification_context_create (server, 1); GNUNET_SERVER_add_handlers (server, handlers); GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, - NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, NULL); } diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h @@ -31,8 +31,8 @@ #include "gnunet_psyc_service.h" -int -GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data); +uint16_t +GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data); void GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c @@ -336,7 +336,7 @@ recv_error (struct GNUNET_PSYC_Channel *ch) /** - * Queue an incoming message part for transmission to the PSYC service. + * Queue a message part for transmission to the PSYC service. * * The message part is added to the current message buffer. * When this buffer is full, it is added to the transmission queue. @@ -390,7 +390,7 @@ queue_message (struct GNUNET_PSYC_Channel *ch, op->msg->size = sizeof (*op->msg) + size; memcpy (&op->msg[1], msg, size); } - + if (NULL != op && (GNUNET_YES == end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD @@ -433,12 +433,12 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); - notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, - &data_size, &mod[1], &mod->oper); + notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1], + &mod->oper, &mod->value_size); mod->name_size = strnlen ((char *) &mod[1], data_size); if (mod->name_size < data_size) { - mod->value_size = htons (data_size - 1 - mod->name_size); + mod->value_size = htonl (mod->value_size); mod->name_size = htons (mod->name_size); } else if (0 < data_size) @@ -451,10 +451,10 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) case MSG_STATE_MOD_CONT: { max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); + msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); msg->size = sizeof (struct GNUNET_MessageHeader); notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, - &data_size, &msg[1], NULL); + &data_size, &msg[1], NULL, NULL); break; } default: @@ -669,6 +669,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, ch->recv_message_id = GNUNET_ntohll (msg->message_id); ch->recv_flags = flags; ch->recv_slave_key = msg->slave_key; + ch->recv_mod_value_size = 0; + ch->recv_mod_value_size_expected = 0; } else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) { @@ -703,7 +705,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Discarding message of type %u with invalid size %u.\n", + "Dropping message of type %u with invalid size %u.\n", ptype, psize); recv_error (ch); return; @@ -753,7 +755,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, if (MSG_STATE_START != ch->recv_state) { LOG (GNUNET_ERROR_TYPE_WARNING, - "Discarding out of order message method.\n"); + "Dropping out of order message method (%u).\n", + ch->recv_state); /* It is normal to receive an incomplete message right after connecting, * but should not happen later. * FIXME: add a check for this condition. @@ -766,7 +769,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, if ('\0' != *((char *) meth + psize - 1)) { LOG (GNUNET_ERROR_TYPE_WARNING, - "Discarding message with malformed method. " + "Dropping message with malformed method. " "Message ID: %" PRIu64 "\n", ch->recv_message_id); GNUNET_break_op (0); recv_error (ch); @@ -782,7 +785,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, || MSG_STATE_MOD_CONT == ch->recv_state)) { LOG (GNUNET_ERROR_TYPE_WARNING, - "Discarding out of order message modifier.\n"); + "Dropping out of order message modifier (%u).\n", + ch->recv_state); GNUNET_break_op (0); recv_error (ch); return; @@ -792,14 +796,14 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, = (struct GNUNET_PSYC_MessageModifier *) pmsg; uint16_t name_size = ntohs (mod->name_size); - ch->recv_mod_value_size_expected = ntohs (mod->value_size); + ch->recv_mod_value_size_expected = ntohl (mod->value_size); ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; if (psize < sizeof (*mod) + name_size + 1 || '\0' != *((char *) &mod[1] + name_size) || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) { - LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); + LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); GNUNET_break_op (0); recv_error (ch); return; @@ -816,7 +820,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, - "Discarding out of order message modifier continuation.\n"); + "Dropping out of order message modifier continuation " + "!(%u == %u || %u == %u) || %lu < %lu.\n", + MSG_STATE_MODIFIER, ch->recv_state, + MSG_STATE_MOD_CONT, ch->recv_state, + ch->recv_mod_value_size_expected, ch->recv_mod_value_size); GNUNET_break_op (0); recv_error (ch); return; @@ -829,7 +837,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) { LOG (GNUNET_ERROR_TYPE_WARNING, - "Discarding out of order message data fragment.\n"); + "Dropping out of order message data fragment " + "(%u < %u || %lu != %lu).\n", + ch->recv_state, MSG_STATE_METHOD, + ch->recv_mod_value_size_expected, ch->recv_mod_value_size); + GNUNET_break_op (0); recv_error (ch); return; @@ -1412,7 +1424,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, * @param th Handle of the request that is being resumed. */ void -GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) { channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); } diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c @@ -33,28 +33,33 @@ * @param data_size Size of @a data. * @param data Data. * - * @return GNUNET_YES or GNUNET_NO + * @return Message type number + * or GNUNET_NO if the message contains invalid or no parts. */ -int -GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data) +uint16_t +GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data) { const struct GNUNET_MessageHeader *pmsg; + uint16_t ptype = GNUNET_NO; uint16_t psize = 0; uint16_t pos = 0; - for (pos = 0; data_size + pos < data_size; pos += psize) + for (pos = 0; pos < data_size; pos += psize) { pmsg = (const struct GNUNET_MessageHeader *) (data + pos); psize = ntohs (pmsg->size); - if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size) + ptype = ntohs (pmsg->type); + if (psize < sizeof (*pmsg) || pos + psize > data_size + || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD + || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Invalid message part of type %u and size %u.", - ntohs (pmsg->type), psize); + "Invalid message part of type %u and size %u.\n", + ptype, psize); return GNUNET_NO; } } - return GNUNET_YES; + return ptype; } @@ -89,7 +94,8 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, uint16_t name_size = ntohs (mod->name_size); char oper = ' ' < mod->oper ? mod->oper : ' '; GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], - ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1); + size - sizeof (*mod) - name_size - 1, + ((char *) &mod[1]) + name_size + 1); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c @@ -35,7 +35,7 @@ #include "gnunet_env_lib.h" #include "gnunet_psyc_service.h" -#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) #define DEBUG_SERVICE 1 @@ -72,6 +72,7 @@ struct TransmitClosure char *data[16]; const char *mod_value; size_t mod_value_size; + uint8_t data_delay[16]; uint8_t data_count; uint8_t paused; uint8_t n; @@ -259,13 +260,16 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); struct TransmitClosure *tmit = cls; - tmit->paused = GNUNET_NO; - GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit); + if (NULL != tmit->mst_tmit) + GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit); + else + GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit); } static int -tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) +tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, + uint32_t *full_value_size) { struct TransmitClosure *tmit = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -288,6 +292,8 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) return GNUNET_YES; } + GNUNET_assert (value_size < UINT32_MAX); + *full_value_size = value_size; *oper = op; name_size = strlen (name); @@ -351,7 +357,7 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data) uint16_t size = strlen (tmit->data[tmit->n]); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmit notify data: %lu bytes available, " + "Transmit notify data: %u bytes available, " "processing fragment %u/%u (size %u).\n", *data_size, tmit->n + 1, tmit->data_count, size); if (*data_size < size) @@ -361,17 +367,18 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data) return GNUNET_SYSERR; } - if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) + if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) { - /* Send last fragment later. */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); tmit->paused = GNUNET_YES; - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 3), - &transmit_resume, tmit); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + tmit->data_delay[tmit->n]), + &transmit_resume, tmit); *data_size = 0; return GNUNET_NO; } + tmit->paused = GNUNET_NO; *data_size = size; memcpy (data, tmit->data[tmit->n], size); @@ -416,8 +423,9 @@ slave_join () GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, "_foo_bar", "foo bar baz", 11); slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, - 16, relays, &slave_message, &join_request, &slave_joined, - NULL, "_request_join", env, "some data", 9); + 16, relays, &slave_message, &join_request, + &slave_joined, NULL, "_request_join", env, + "some data", 9); GNUNET_ENV_environment_destroy (env); } @@ -427,17 +435,45 @@ master_transmit () { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); test = TEST_MASTER_TRANSMIT; + uint32_t i, j; + + char *name_max = "_test_max"; + uint8_t name_max_size = sizeof ("_test_max"); + char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD); + for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++) + val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.'; + + char *name_cont = "_test_cont"; + uint8_t name_cont_size = sizeof ("_test_cont"); + char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD + + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD); + for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++) + val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':'; + for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++) + val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!'; tmit = GNUNET_new (struct TransmitClosure); tmit->env = GNUNET_ENV_environment_create (); GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, "_foo", "bar baz", 7); GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, + name_max, val_max, + GNUNET_PSYC_MODIFIER_MAX_PAYLOAD + - name_max_size); + GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, "_foo_bar", "foo bar baz", 11); + GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, + name_cont, val_cont, + GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size + + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD); tmit->data[0] = "foo"; - tmit->data[1] = "foo bar"; - tmit->data[2] = "foo bar baz"; - tmit->data_count = 3; + tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1); + for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++) + tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_'; + tmit->data[2] = "foo bar"; + tmit->data[3] = "foo bar baz"; + tmit->data_delay[1] = 3; + tmit->data_count = 4; tmit->mst_tmit = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, tmit_notify_data, tmit,