gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit 172ab07eeb1215cc9d22dabc589f7529ac2d59ea
parent d10808d7f17c5f6f1356c22ef0992965cbaf5ce1
Author: Gabor X Toth <*@tg-x.net>
Date:   Sat,  9 Nov 2013 23:12:27 +0000

psyc: handling messages from multicast and passing them to clients; pause/resume fixes

Diffstat:
Msrc/multicast/multicast_api.c | 11++++++-----
Msrc/psyc/gnunet-service-psyc.c | 320++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Msrc/psyc/psyc_api.c | 94++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
Msrc/psyc/test_psyc.c | 50+++++++++++++++++++++++++++++---------------------
4 files changed, 361 insertions(+), 114 deletions(-)

diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c @@ -363,11 +363,12 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; struct GNUNET_MULTICAST_MessageHeader *msg - = GNUNET_malloc (sizeof (*msg) + buf_size); + = GNUNET_malloc (buf_size); + buf_size -= sizeof (*msg); int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE) + || sizeof (*msg) + buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE) { LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify() returned error or invalid message size.\n"); @@ -379,15 +380,15 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc return; /* Transmission paused. */ msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); - msg->header.size = htons (buf_size); + msg->header.size = htons (sizeof (*msg) + buf_size); msg->message_id = mh->message_id; msg->group_generation = mh->group_generation; /* FIXME: add fragment ID and signature in the service instead of here */ msg->fragment_id = orig->next_fragment_id++; msg->fragment_offset = mh->fragment_offset; - mh->fragment_offset += buf_size; - msg->purpose.size = htonl (buf_size + mh->fragment_offset += sizeof (*msg) + buf_size; + msg->purpose.size = htonl (sizeof (*msg) + buf_size - sizeof (msg->header) - sizeof (msg->hop_counter) - sizeof (msg->signature)); diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c @@ -56,7 +56,7 @@ static struct GNUNET_SERVER_NotificationContext *nc; static struct GNUNET_PSYCSTORE_Handle *store; /** - * channel's pub_key_hash -> struct Channel + * Channel's pub_key_hash -> struct Channel */ static struct GNUNET_CONTAINER_MultiHashMap *clients; @@ -70,6 +70,9 @@ struct TransmitMessage char *buf; uint16_t size; + /** + * enum GNUNET_PSYC_DataStatus + */ uint8_t status; }; @@ -83,15 +86,17 @@ struct Channel struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; - char *tmit_buf; GNUNET_SCHEDULER_TaskIdentifier tmit_task; uint32_t tmit_mod_count; uint32_t tmit_mod_recvd; - uint16_t tmit_size; + /** + * enum GNUNET_PSYC_DataStatus + */ uint8_t tmit_status; uint8_t in_transmit; uint8_t is_master; + uint8_t disconnected; }; /** @@ -142,6 +147,10 @@ struct Slave }; +static void +transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay); + + /** * Task run during shutdown. * @@ -163,6 +172,30 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } } + +static void +client_cleanup (struct Channel *ch) +{ + if (ch->is_master) + { + struct Master *mst = (struct Master *) ch; + if (NULL != mst->origin) + GNUNET_MULTICAST_origin_stop (mst->origin); + } + else + { + struct Slave *slv = (struct Slave *) ch; + if (NULL != slv->join_req) + GNUNET_free (slv->join_req); + if (NULL != slv->relays) + GNUNET_free (slv->relays); + if (NULL != slv->member) + GNUNET_MULTICAST_member_part (slv->member); + } + + GNUNET_free (ch); +} + /** * Called whenever a client is disconnected. * Frees our resources associated with that client. @@ -188,30 +221,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) return; } - if (NULL != ch->tmit_buf) - { - GNUNET_free (ch->tmit_buf); - ch->tmit_buf = NULL; - } + ch->disconnected = GNUNET_YES; - if (ch->is_master) + /* Send pending messages to multicast before cleanup. */ + if (NULL != ch->tmit_head) { - struct Master *mst = (struct Master *) ch; - if (NULL != mst->origin) - GNUNET_MULTICAST_origin_stop (mst->origin); + transmit_message (ch, GNUNET_TIME_UNIT_ZERO); } else { - struct Slave *slv = (struct Slave *) ch; - if (NULL != slv->join_req) - GNUNET_free (slv->join_req); - if (NULL != slv->relays) - GNUNET_free (slv->relays); - if (NULL != slv->member) - GNUNET_MULTICAST_member_part (slv->member); + client_cleanup (ch); } - - GNUNET_free (ch); } void @@ -259,14 +279,98 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, } + +void +fragment_store_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "fragment_store() returned %l (%s)\n", result, err_msg); +} + +/** + * Send PSYC messages in an incoming multicast message to a client. + */ +int +send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan) +{ + const struct GNUNET_MULTICAST_MessageHeader *msg = cls; + struct Channel *ch = chan; + + uint16_t size = ntohs (msg->header.size); + uint16_t pos = 0; + + while (sizeof (*msg) + pos < size) + { + const struct GNUNET_MessageHeader *pmsg + = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); + uint16_t psize = ntohs (pmsg->size); + if (sizeof (*msg) + pos + psize > size) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Ignoring message of type %u with invalid size. " + "(%u + %u + %u > %u)\n", ntohs (pmsg->type), + sizeof (*msg), pos, psize, size); + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message of type %u and size %u to client.\n", + ntohs (pmsg->type), psize); + + GNUNET_SERVER_notification_context_add (nc, ch->client); + GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg, + GNUNET_NO); + pos += psize; + } + return GNUNET_YES; +} + + +/** + * Incoming message fragment from multicast. + * + * Store it using PSYCstore and send it to all clients of the channel. + */ void message_cb (void *cls, const struct GNUNET_MessageHeader *msg) { + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from multicast.\n", - ntohs (msg->type)); + "Received message of type %u and size %u from multicast.\n", + type, size); + + struct Channel *ch = cls; + struct Master *mst = cls; + struct Slave *slv = cls; + + struct GNUNET_CRYPTO_EddsaPublicKey *ch_key + = ch->is_master ? &mst->pub_key : &slv->chan_key; + struct GNUNET_HashCode *ch_key_hash + = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; + + switch (type) + { + case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: + GNUNET_PSYCSTORE_fragment_store (store, ch_key, + (const struct + GNUNET_MULTICAST_MessageHeader *) msg, + 0, NULL, NULL); + GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash, + send_to_client, (void *) msg); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Ignoring unknown message of type %u and size %u.\n", + type, size); + } } + +/** + * Response from PSYCstore with the current counter values for a channel master. + */ void master_counters_cb (void *cls, int result, uint64_t max_fragment_id, uint64_t max_message_id, uint64_t max_group_generation, @@ -299,6 +403,9 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, } +/** + * Response from PSYCstore with the current counter values for a channel slave. + */ void slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, uint64_t max_message_id, uint64_t max_group_generation, @@ -332,6 +439,9 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, } +/** + * Handle a connecting client starting a channel master. + */ static void handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) @@ -357,6 +467,9 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, } +/** + * Handle a connecting client joining as a channel slave. + */ static void handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) @@ -389,13 +502,26 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, } +/** + * Send transmission acknowledgement to a client. + * + * Sent after the last GNUNET_PSYC_MessageModifier and after each + * GNUNET_PSYC_MessageData. + * + * @param ch The channel struct for the client. + */ static void send_transmit_ack (struct Channel *ch) { struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); res->header.size = htons (sizeof (*res)); res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); - res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size); + + res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct TransmitMessage *tmit_msg = ch->tmit_tail; + if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status) + res->buf_avail -= tmit_msg->size; + res->buf_avail = htons (res->buf_avail); GNUNET_SERVER_notification_context_add (nc, ch->client); GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, @@ -404,30 +530,53 @@ send_transmit_ack (struct Channel *ch) } +/** + * Callback for the transmit functions of multicast. + */ static int transmit_notify (void *cls, size_t *data_size, void *data) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n"); struct Channel *ch = cls; struct TransmitMessage *msg = ch->tmit_head; - if (NULL == msg || *data_size < ntohs (msg->size)) + if (NULL == msg || *data_size < msg->size) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); *data_size = 0; return GNUNET_NO; } - *data_size = ntohs (msg->size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "transmit_notify: sending %u bytes.\n", msg->size); + + *data_size = msg->size; memcpy (data, msg->buf, *data_size); - GNUNET_free (ch->tmit_buf); - ch->tmit_buf = NULL; GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); + GNUNET_free (msg); + + int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; + + if (0 == ch->tmit_task) + { + if (NULL != ch->tmit_head) + { + transmit_message (ch, GNUNET_TIME_UNIT_ZERO); + } + else if (ch->disconnected) + { + /* FIXME: handle partial message (when still in_transmit) */ + client_cleanup (ch); + } + } - return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; + return ret; } +/** + * Transmit a message from a channel master to the multicast group. + */ static void master_transmit_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) @@ -449,6 +598,9 @@ master_transmit_message (void *cls, } +/** + * Transmit a message from a channel slave to the multicast group. + */ static void slave_transmit_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) @@ -468,50 +620,90 @@ slave_transmit_message (void *cls, } +/** + * Schedule message transmission from a channel to the multicast group. + * + * @param ch The channel. + * @param delay Transmission delay. + */ +static void +transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay) +{ + if (0 != ch->tmit_task) + GNUNET_SCHEDULER_cancel (ch->tmit_task); + + ch->tmit_task + = ch->is_master + ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch) + : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); +} + +/** + * Queue incoming message parts from a client for transmission, and send them to + * the multicast group when the buffer is full or reached the end of message. + * + * @param ch Channel struct for the client. + * @param msg Message from the client. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR. + */ static int -buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) +queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) { uint16_t size = ntohs (msg->size); struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; + struct TransmitMessage *tmit_msg = ch->tmit_tail; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queueing message of type %u and size %u " + "for transmission to multicast.\n", + ntohs (msg->type), size); if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) return GNUNET_SYSERR; - if (0 == ch->tmit_size) - { - ch->tmit_buf = GNUNET_malloc (size); - memcpy (ch->tmit_buf, msg, size); - ch->tmit_size = size; - } - else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size) - { - ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size); - memcpy (ch->tmit_buf + ch->tmit_size, msg, size); - ch->tmit_size += size; - } - - if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData)) + if (NULL == tmit_msg + || tmit_msg->status != GNUNET_PSYC_DATA_CONT + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size) { - struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage); - tmit_msg->buf = (char *) msg; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Appending message qto new buffer.\n"); + /* Start filling up new buffer */ + tmit_msg = GNUNET_new (struct TransmitMessage); + tmit_msg->buf = GNUNET_malloc (size); + memcpy (tmit_msg->buf, msg, size); tmit_msg->size = size; tmit_msg->status = ch->tmit_status; GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - tmit_delay = GNUNET_TIME_UNIT_ZERO; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Appending message to existing buffer.\n"); + /* Append to existing buffer */ + tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); + memcpy (tmit_msg->buf + tmit_msg->size, msg, size); + tmit_msg->size += size; + tmit_msg->status = ch->tmit_status; } - if (0 != ch->tmit_task) - GNUNET_SCHEDULER_cancel (ch->tmit_task); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); - ch->tmit_task - = ch->is_master - ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch) - : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch); + /* Wait a bit for the remaining message parts from the client + if there's still some space left in the buffer. */ + if (GNUNET_PSYC_DATA_CONT == tmit_msg->status + && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData) + < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)) + tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); + + transmit_message (ch, tmit_delay); return GNUNET_OK; } +/** + * Incoming method from a client. + */ static void handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) @@ -524,18 +716,16 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, if (GNUNET_NO != ch->in_transmit) { - // FIXME: already transmitting a message, send back error message. + /* FIXME: already transmitting a message, send back error message. */ return; } ch->in_transmit = GNUNET_YES; - ch->tmit_buf = NULL; - ch->tmit_size = 0; ch->tmit_mod_recvd = 0; ch->tmit_mod_count = ntohl (meth->mod_count); ch->tmit_status = GNUNET_PSYC_DATA_CONT; - buffer_message (ch, msg); + queue_message (ch, msg); if (0 == ch->tmit_mod_count) send_transmit_ack (ch); @@ -544,18 +734,23 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, }; +/** + * Incoming modifier from a client. + */ static void handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { + /* const struct GNUNET_PSYC_MessageModifier *mod = (const struct GNUNET_PSYC_MessageModifier *) msg; + */ struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != ch); ch->tmit_mod_recvd++; - buffer_message (ch, msg); + queue_message (ch, msg); if (ch->tmit_mod_recvd == ch->tmit_mod_count) send_transmit_ack (ch); @@ -564,6 +759,9 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, }; +/** + * Incoming data from a client. + */ static void handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) @@ -575,7 +773,7 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_assert (NULL != ch); ch->tmit_status = ntohs (data->status); - buffer_message (ch, msg); + queue_message (ch, msg); send_transmit_ack (ch); if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c @@ -69,12 +69,12 @@ struct GNUNET_PSYC_Channel /** * Head of operations to transmit. */ - struct OperationHandle *transmit_head; + struct OperationHandle *tmit_head; /** * Tail of operations to transmit. */ - struct OperationHandle *transmit_tail; + struct OperationHandle *tmit_tail; /** * Message to send on reconnect. @@ -116,6 +116,16 @@ struct GNUNET_PSYC_Channel * Buffer space available for transmitting the next data fragment. */ uint16_t tmit_buf_avail; + + /** + * Is transmission paused? + */ + uint8_t tmit_paused; + + /** + * Are we still waiting for a PSYC_TRANSMIT_ACK? + */ + uint8_t tmit_ack_pending; }; @@ -243,6 +253,11 @@ static void transmit_next (struct GNUNET_PSYC_Channel *ch); +/** + * Request data from client to transmit. + * + * @param mst Master handle. + */ static void master_transmit_data (struct GNUNET_PSYC_Master *mst) { @@ -268,12 +283,13 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) default: mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; data_size = 0; - LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); + LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); } if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) { /* Transmission paused, nothing to send. */ + ch->tmit_paused = GNUNET_YES; GNUNET_free (op); } else @@ -281,7 +297,8 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) GNUNET_assert (data_size <= ch->tmit_buf_avail); pdata->header.size = htons (sizeof (*pdata) + data_size); pdata->status = htons (mst->tmit->status); - GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); + ch->tmit_ack_pending = GNUNET_YES; transmit_next (ch); } } @@ -305,7 +322,6 @@ message_handler (void *cls, struct CountersResult *cres; struct TransmitAck *tack; - if (NULL == msg) { reschedule_connect (ch); @@ -317,7 +333,8 @@ message_handler (void *cls, uint16_t type = ntohs (msg->type); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d from PSYC service\n", type); + "Received message of type %d and size %u from PSYC service\n", + type, size); switch (type) { @@ -328,10 +345,16 @@ message_handler (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: size_eq = sizeof (struct TransmitAck); break; + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: + size_min = sizeof (struct GNUNET_PSYC_MessageMethod); + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + size_min = sizeof (struct GNUNET_PSYC_MessageModifier); + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: + size_min = sizeof (struct GNUNET_PSYC_MessageData); } if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size >= size_min))) + || (0 < size_min && size_min <= size))) { GNUNET_break (0); reschedule_connect (ch); @@ -370,7 +393,9 @@ message_handler (void *cls, else { ch->tmit_buf_avail = ntohs (tack->buf_avail); - master_transmit_data (mst); + ch->tmit_ack_pending = GNUNET_NO; + if (GNUNET_NO == ch->tmit_paused) + master_transmit_data (mst); } } else @@ -378,6 +403,18 @@ message_handler (void *cls, /* TODO: slave */ } break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: + + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: + + break; } GNUNET_CLIENT_receive (ch->client, &message_handler, ch, @@ -397,9 +434,9 @@ static size_t send_next_message (void *cls, size_t size, void *buf) { struct GNUNET_PSYC_Channel *ch = cls; - struct OperationHandle *op = ch->transmit_head; + struct OperationHandle *op = ch->tmit_head; size_t ret; - + LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); ch->th = NULL; if (NULL == op->msg) return 0; @@ -409,15 +446,12 @@ send_next_message (void *cls, size_t size, void *buf) reschedule_connect (ch); return 0; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYC service\n", - ntohs (op->msg->type)); memcpy (buf, op->msg, ret); - GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op); GNUNET_free (op); - if (NULL != ch->transmit_head) + if (NULL != ch->tmit_head) transmit_next (ch); if (GNUNET_NO == ch->in_receive) @@ -438,10 +472,11 @@ send_next_message (void *cls, size_t size, void *buf) static void transmit_next (struct GNUNET_PSYC_Channel *ch) { + LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); if (NULL != ch->th || NULL == ch->client) return; - struct OperationHandle *op = ch->transmit_head; + struct OperationHandle *op = ch->tmit_head; if (NULL == op) return; @@ -472,14 +507,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); GNUNET_assert (NULL != ch->client); - if (NULL == ch->transmit_head || - ch->transmit_head->msg->type != ch->reconnect_msg->type) + if (NULL == ch->tmit_head || + ch->tmit_head->msg->type != ch->reconnect_msg->type) { uint16_t reconn_size = ntohs (ch->reconnect_msg->size); struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); memcpy (&op[1], ch->reconnect_msg, reconn_size); op->msg = (struct GNUNET_MessageHeader *) &op[1]; - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op); } transmit_next (ch); } @@ -496,7 +531,7 @@ disconnect (void *c) struct GNUNET_PSYC_Channel *ch = c; GNUNET_assert (NULL != ch); - if (ch->transmit_head != ch->transmit_tail) + if (ch->tmit_head != ch->tmit_tail) { LOG (GNUNET_ERROR_TYPE_ERROR, "Disconnecting while there are still outstanding messages!\n"); @@ -654,7 +689,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) memcpy (&pmod[1], mod->name, name_size); memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); - GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); return GNUNET_YES; } @@ -699,7 +734,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); memcpy (&pmeth[1], method_name, size); - GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); GNUNET_ENV_environment_iterate (env, send_modifier, master); transmit_next (ch); @@ -720,7 +755,12 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, void GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) { - master_transmit_data (th->master); + struct GNUNET_PSYC_Channel *ch = &th->master->ch; + if (GNUNET_NO == ch->tmit_ack_pending) + { + ch->tmit_paused = GNUNET_NO; + master_transmit_data (th->master); + } } @@ -938,8 +978,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, slvadd->header.size = htons (sizeof (*slvadd)); slvadd->announced_at = GNUNET_htonll (announced_at); slvadd->effective_since = GNUNET_htonll (effective_since); - GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, - channel->transmit_tail, + GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, + channel->tmit_tail, op); transmit_next (channel); } @@ -979,8 +1019,8 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; slvrm->header.size = htons (sizeof (*slvrm)); slvrm->announced_at = GNUNET_htonll (announced_at); - GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, - channel->transmit_tail, + GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, + channel->tmit_tail, op); transmit_next (channel); } diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c @@ -144,10 +144,12 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, return GNUNET_OK; } + struct TransmitClosure { struct GNUNET_PSYC_MasterTransmitHandle *handle; uint8_t n; + uint8_t paused; uint8_t fragment_count; char *fragments[16]; uint16_t fragment_sizes[16]; @@ -157,8 +159,9 @@ struct TransmitClosure static void transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); struct TransmitClosure *tmit = cls; + tmit->paused = GNUNET_NO; GNUNET_PSYC_master_transmit_resume (tmit->handle); } @@ -167,33 +170,36 @@ static int transmit_notify (void *cls, size_t *data_size, void *data) { struct TransmitClosure *tmit = cls; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Transmit notify: %lu bytes\n", *data_size); - - if (tmit->fragment_count <= tmit->n) - return GNUNET_YES; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmit notify: %lu bytes available, " + "processing fragment %u/%u.\n", + *data_size, tmit->n + 1, tmit->fragment_count); GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); - *data_size = tmit->fragment_sizes[tmit->n]; - memcpy (data, tmit->fragments[tmit->n], *data_size); - tmit->n++; - - if (tmit->n == tmit->fragment_count - 1) + if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1) { /* Send last fragment later. */ - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume, - tmit); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); + tmit->paused = GNUNET_YES; + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 3), + &transmit_resume, tmit); *data_size = 0; return GNUNET_NO; } - return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES; + + GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); + *data_size = tmit->fragment_sizes[tmit->n]; + memcpy (data, tmit->fragments[tmit->n], *data_size); + + return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES; } void master_started (void *cls, uint64_t max_message_id) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Master started: %lu\n", max_message_id); struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, @@ -202,11 +208,13 @@ master_started (void *cls, uint64_t max_message_id) "_foo_bar", "foo bar baz", 11); struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); - tmit->fragment_count = 2; - tmit->fragments[0] = "foo bar"; - tmit->fragment_sizes[0] = 7; - tmit->fragments[1] = "baz!"; - tmit->fragment_sizes[1] = 4; + tmit->fragment_count = 3; + tmit->fragments[0] = "foo"; + tmit->fragment_sizes[0] = 4; + tmit->fragments[1] = "foo bar"; + tmit->fragment_sizes[1] = 7; + tmit->fragments[2] = "foo bar baz"; + tmit->fragment_sizes[2] = 11; tmit->handle = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);