gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit 7bec38c1bf3572bd01ddd064f69d1b744f7725a8
parent 303d6a97bc552a337c992944c3151ea53c1f74dc
Author: Gabor X Toth <*@tg-x.net>
Date:   Wed, 25 Sep 2013 17:46:06 +0000

psyc service: start/stop, join/part, message transmission: lib -> psyc -> mcast; psyc API: stop/resume transmission

Diffstat:
Msrc/include/gnunet_protocols.h | 90++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
Msrc/include/gnunet_psyc_service.h | 183++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
Msrc/psyc/Makefile.am | 10++++++++--
Msrc/psyc/gnunet-service-psyc.c | 374++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Asrc/psyc/psyc.conf | 7+++++++
Msrc/psyc/psyc.h | 177++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/psyc/psyc_api.c | 620++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
Msrc/psyc/test_psyc.c | 74++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
8 files changed, 1413 insertions(+), 122 deletions(-)

diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h @@ -29,7 +29,7 @@ /******************************************************************************* * TODO: we need a way to register message types centrally (via some webpage). * For now: unofficial extensions should start at 48k, internal extensions - * define here should leave some room (4-10 additional messages to the previous + * defined here should leave some room (4-10 additional messages to the previous * extension). ******************************************************************************/ @@ -1985,9 +1985,9 @@ extern "C" #define GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT 655 -#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER 656 +#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET 656 -#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE 657 +/* 657 */ #define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY 658 @@ -2008,14 +2008,88 @@ extern "C" #define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT 665 -#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER 666 +#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS 666 -#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE 667 +#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE 667 + + +/******************************************************************************* + * PSYC message types + ******************************************************************************/ + +#define GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE 680 + + +#define GNUNET_MESSAGE_TYPE_PSYC_MASTER_START 681 + +#define GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK 682 + +#define GNUNET_MESSAGE_TYPE_PSYC_MASTER_STOP 683 + + +#define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN 684 + +#define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK 685 + +#define GNUNET_MESSAGE_TYPE_PSYC_SLAVE_PART 686 + + +#define GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST 687 + +#define GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION 688 + + +#define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD 689 + +#define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690 + + +#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD 691 + +#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER 692 + +#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MOD_CONT 693 + +#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA 694 + +#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 695 + + +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 696 + +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 697 + +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 698 + +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 699 + +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 700 + + +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 + +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_METHOD 702 + +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MODIFIER 703 + +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MOD_CONT 704 + +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_DATA 705 + +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_ACK 706 + + +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 707 + +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 708 + +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 709 + +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 710 -#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE 668 /** - * Next available: 680 + * Next available: 730 */ @@ -2029,7 +2103,7 @@ extern "C" /** * Multicast message from the origin to all members. */ -#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 700 +#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 730 /** * A unicast message from a group member to the origin. diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h @@ -184,6 +184,96 @@ enum GNUNET_PSYC_MessageFlags }; +/** + * M + */ +struct GNUNET_PSYC_MessageMethod +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Number of modifiers in the message. + */ + uint32_t mod_count GNUNET_PACKED; + + /** + * OR'ed GNUNET_PSYC_MasterTransmitFlags + */ + uint32_t flags GNUNET_PACKED; + + /** + * Sending slave's public key. NULL if the message is from the master, or when + * transmitting a message. + */ + struct GNUNET_CRYPTO_EccPublicSignKey slave_key; + + /* Followed by NUL-terminated method name. */ +}; + + +struct GNUNET_PSYC_MessageModifier +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER + */ + struct GNUNET_MessageHeader header; + + /** + * Size of value. + */ + uint32_t value_size GNUNET_PACKED; + + /** + * Size of name, including NUL terminator. + */ + uint16_t name_size GNUNET_PACKED; + + /** + * enum GNUNET_ENV_Operator + */ + uint8_t oper; + + /* Followed by NUL-terminated name, then the value. */ +}; + + +enum GNUNET_PSYC_DataStatus +{ + /** + * To be continued. + */ + GNUNET_PSYC_DATA_CONT = 0, + + /** + * Reached the end of message. + */ + GNUNET_PSYC_DATA_END = 1, + + /** + * Cancelled before the end. + */ + GNUNET_PSYC_DATA_CANCEL = 2 +}; + + +struct GNUNET_PSYC_MessageData +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER + */ + struct GNUNET_MessageHeader header; + + /** + * enum GNUNET_PSYC_DataStatus + */ + uint8_t status; +}; + /** * Handle that identifies a join request. * @@ -194,14 +284,14 @@ struct GNUNET_PSYC_JoinHandle; /** - * Method called from PSYC upon receiving a message indicating a call - * to a @e method. + * Method called from PSYC upon receiving a message indicating a call to a + * @e method. * * @param cls Closure. * @param slave_key Who transmitted the message. * - NULL for multicast messages from the master. - * - The hash of the sending slave's public key for unicast requests from - * one of the slaves to the master. + * - The sending slave's public key for unicast requests from one of the + * slaves to the master. * @param message_id Unique message counter for this message. * Unique only in combination with the given sender for this channel. * @param method_name Method name from PSYC. @@ -241,7 +331,8 @@ typedef int */ typedef int (*GNUNET_PSYC_JoinCallback) (void *cls, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + const struct GNUNET_CRYPTO_EccPublicSignKey + *slave_key, const char *method_name, size_t variable_count, const struct GNUNET_ENV_Modifier *variables, @@ -268,7 +359,8 @@ typedef int * multicast tree. Note that it is unnecessary to specify our own * peer identity in this array. * @param method_name Method name for the message transmitted with the response. - * @param env Environment containing transient variables for the message, or NULL. + * @param env Environment containing transient variables for the message, + * or NULL. * @param data Data of the message. * @param data_size Size of @a data. */ @@ -290,6 +382,16 @@ struct GNUNET_PSYC_Master; /** + * Function called after the channel master started. + * + * @param cls Closure. + * @param last_message_id Last message ID sent to the channel. + */ +typedef void +(*GNUNET_PSYC_MasterStartCallback) (void *cls, uint64_t max_message_id); + + +/** * Start a PSYC master channel. * * Will start a multicast group identified by the given ECC key. Messages @@ -313,6 +415,7 @@ struct GNUNET_PSYC_Master; * Used to automate join decisions. * @param method Function to invoke on messages received from slaves. * @param join_cb Function to invoke when a peer wants to join. + * @param start_cb Function to invoke after the channel master started. * @param cls Closure for @a method and @a join_cb. * @return Handle for the channel master, NULL on error. */ @@ -322,6 +425,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_PSYC_Policy policy, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, + GNUNET_PSYC_MasterStartCallback start_cb, void *cls); @@ -339,8 +443,10 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param[out] data Where to write the body of the message to give to the * method. The function must copy at most @a data_size bytes to @a data. * @return #GNUNET_SYSERR on error (fatal, aborts transmission) - * #GNUNET_NO on success, if more data is to be transmitted later - * (should be used if @a data_size was not big enough to take all the data) + * #GNUNET_NO on success, if more data is to be transmitted later. + * Should be used if @a data_size was not big enough to take all the + * data. If 0 is returned in @a data_size the transmission is paused, + * and can be resumed with GNUNET_PSYC_master_transmit_resume(). * #GNUNET_YES if this completes the transmission (all data supplied) */ typedef int @@ -403,6 +509,15 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, /** + * Resume transmission to the channel. + * + * @param th Handle of the request that is being resumed. + */ +void +GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th); + + +/** * Abort transmission request to channel. * * @param th Handle of the request that is being aborted. @@ -427,10 +542,20 @@ struct GNUNET_PSYC_Slave; /** + * Function called after the slave joined. + * + * @param cls Closure. + * @param max_message_id Last message ID sent to the channel. + */ +typedef void +(*GNUNET_PSYC_SlaveJoinCallback) (void *cls, uint64_t max_message_id); + + +/** * Join a PSYC channel. * * The entity joining is always the local peer. The user must immediately use - * the GNUNET_PSYC_slave_to_master() functions to transmit a @e join_msg to the + * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the * channel; if the join request succeeds, the channel state (and @e recent * method calls) will be replayed to the joining member. There is no explicit * notification on failure (as the channel may simply take days to approve, @@ -464,6 +589,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, + GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, void *cls, const char *method_name, const struct GNUNET_ENV_Environment *env, @@ -475,7 +601,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, * Part a PSYC channel. * * Will terminate the connection to the PSYC service. Polite clients should - * first explicitly send a @e part request (via GNUNET_PSYC_slave_to_master()). + * first explicitly send a @e part request (via GNUNET_PSYC_slave_transmit()). * * @param slave Slave handle. */ @@ -484,15 +610,16 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave); /** - * Function called to provide data for a transmission to the channel - * master (aka the @e host of the channel). + * Function called to provide data for a transmission to the channel master + * (a.k.a. the @e host of the channel). * * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO) * invalidates the respective transmission handle. * * @param cls Closure. - * @param[in,out] data_size Initially set to the number of bytes available in @a data, - * should be set to the number of bytes written to data (IN/OUT). + * @param[in,out] data_size Initially set to the number of bytes available in + * @a data, should be set to the number of bytes written to data + * (IN/OUT). * @param[out] data Where to write the body of the message to give to the method; * function must copy at most @a *data_size bytes to @a data. * @return #GNUNET_SYSERR on error (fatal, aborts transmission). @@ -541,6 +668,15 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, /** + * Resume transmission to the master. + * + * @param th Handle of the request that is being resumed. + */ +void +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th); + + +/** * Abort transmission request to master. * * @param th Handle of the request that is being aborted. @@ -556,7 +692,8 @@ struct GNUNET_PSYC_Channel; /** - * Convert a channel @a master to a @e channel handle to access the @e channel APIs. + * Convert a channel @a master to a @e channel handle to access the @e channel + * APIs. * * @param master Channel master handle. * @return Channel handle, valid for as long as @a master is valid. @@ -598,7 +735,8 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave); */ void GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + const struct GNUNET_CRYPTO_EccPublicSignKey + *slave_key, uint64_t announced_at, uint64_t effective_since); @@ -626,7 +764,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, */ void GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + const struct GNUNET_CRYPTO_EccPublicSignKey + *slave_key, uint64_t announced_at); @@ -702,6 +841,10 @@ GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, void GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story); + +/** + * Handle for a state query operation. + */ struct GNUNET_PSYC_StateQuery; @@ -730,9 +873,9 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, /** * Return all channel state variables whose name matches a given prefix. * - * A name matches if it starts with the given @a name_prefix, thus requesting the - * empty prefix ("") will match all values; requesting "_a_b" will also return - * values stored under "_a_b_c". + * A name matches if it starts with the given @a name_prefix, thus requesting + * the empty prefix ("") will match all values; requesting "_a_b" will also + * return values stored under "_a_b_c". * * The @a state_cb is invoked on all matching state variables asynchronously, as * the state is stored in and retrieved from the PSYCstore, diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am @@ -24,12 +24,14 @@ libgnunetpsyc_la_SOURCES = \ psyc.h libgnunetpsyc_la_LIBADD = \ $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/env/libgnunetenv.la \ $(GN_LIBINTL) $(XLIB) libgnunetpsyc_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) $(WINFLAGS) \ -version-info 0:0:0 libgnunetpsyc_la_DEPENDENCIES = \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/env/libgnunetenv.la bin_PROGRAMS = @@ -41,10 +43,14 @@ gnunet_service_psyc_SOURCES = \ gnunet_service_psyc_LDADD = \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/multicast/libgnunetmulticast.la \ + $(top_builddir)/src/psycstore/libgnunetpsycstore.la \ $(GN_LIBINTL) gnunet_service_psyc_DEPENDENCIES = \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/multicast/libgnunetmulticast.la \ + $(top_builddir)/src/psycstore/libgnunetpsycstore.la if HAVE_TESTING diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c @@ -29,9 +29,13 @@ #include "gnunet_constants.h" #include "gnunet_protocols.h" #include "gnunet_statistics_service.h" +#include "gnunet_multicast_service.h" +#include "gnunet_psycstore_service.h" #include "gnunet_psyc_service.h" #include "psyc.h" +#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) + /** * Handle to our current configuration. @@ -48,6 +52,83 @@ static struct GNUNET_STATISTICS_Handle *stats; */ static struct GNUNET_SERVER_NotificationContext *nc; +/** + * Handle to the PSYCstore. + */ +static struct GNUNET_PSYCSTORE_Handle *store; + +/** + * Message in the transmission queue. + */ +struct TransmitMessage +{ + struct TransmitMessage *prev; + struct TransmitMessage *next; + + char *buf; + uint16_t size; + uint8_t status; +}; + +/** + * Common part of the client context for both a master and slave channel. + */ +struct Channel +{ + struct GNUNET_SERVER_Client *client; + + struct TransmitMessage *tmit_head; + struct TransmitMessage *tmit_tail; + + char *tmit_buf; + uint32_t tmit_mod_count; + uint32_t tmit_mod_recvd; + uint16_t tmit_size; + uint8_t tmit_status; + + uint8_t in_transmit; + uint8_t is_master; +}; + +/** + * Client context for a channel master. + */ +struct Master +{ + struct Channel channel; + struct GNUNET_CRYPTO_EccPrivateKey private_key; + struct GNUNET_CRYPTO_EccPublicSignKey public_key; + + struct GNUNET_MULTICAST_Origin *origin; + struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; + + uint64_t max_message_id; + uint64_t max_state_message_id; + uint64_t max_group_generation; + + /** + * enum GNUNET_PSYC_Policy + */ + uint32_t policy; +}; + + +/** + * Client context for a channel slave. + */ +struct Slave +{ + struct Channel channel; + struct GNUNET_CRYPTO_EccPrivateKey slave_key; + struct GNUNET_CRYPTO_EccPublicSignKey channel_key; + + struct GNUNET_MULTICAST_Member *member; + struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; + + uint64_t max_message_id; + uint64_t max_request_id; +}; + /** * Task run during shutdown. @@ -70,6 +151,279 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } } +/** + * Called whenever a client is disconnected. + * Frees our resources associated with that client. + * + * @param cls Closure. + * @param client Identification of the client. + */ +static void +client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + if (NULL == client) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client); + + struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, + struct Channel); + GNUNET_assert (NULL != ch); + + if (NULL != ch->tmit_buf) + { + GNUNET_free (ch->tmit_buf); + ch->tmit_buf = NULL; + } + GNUNET_free (ch); +} + + +void +counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id, + uint64_t max_group_generation, uint64_t max_state_message_id) +{ + struct Channel *ch = cls; + struct CountersResult *res = GNUNET_malloc (sizeof (*res)); + res->header.size = htons (sizeof (*res)); + res->max_message_id = GNUNET_htonll (max_message_id); + + if (ch->is_master) + { + struct Master *mst = cls; + mst->max_message_id = max_message_id; + mst->max_state_message_id = max_state_message_id; + mst->max_group_generation = max_group_generation; + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); + } + else + { + struct Slave *slv = cls; + slv->max_message_id = max_message_id; + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + } + + GNUNET_SERVER_notification_context_add (nc, ch->client); + GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, + GNUNET_NO); + GNUNET_free (res); +} + + +static void +handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct MasterStartRequest *req + = (const struct MasterStartRequest *) msg; + struct Master *mst = GNUNET_new (struct Master); + mst->channel.client = client; + mst->channel.is_master = GNUNET_YES; + mst->policy = ntohl (req->policy); + mst->private_key = req->channel_key; + GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key, + &mst->public_key); + + GNUNET_PSYCSTORE_counters_get (store, &mst->public_key, + counters_cb, mst); + + GNUNET_SERVER_client_set_user_context (client, mst); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +static void +handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct SlaveJoinRequest *req + = (const struct SlaveJoinRequest *) msg; + struct Slave *slv = GNUNET_new (struct Slave); + slv->channel.client = client; + slv->channel.is_master = GNUNET_NO; + slv->channel_key = req->channel_key; + slv->slave_key = req->slave_key; + + GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key, + counters_cb, slv); + + GNUNET_SERVER_client_set_user_context (client, slv); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +static void +send_transmit_ack (struct Channel *ch) +{ + struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); + res->header.size = htons (sizeof (*res)); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); + res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size); + + GNUNET_SERVER_notification_context_add (nc, ch->client); + GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, + GNUNET_NO); + GNUNET_free (res); +} + + +static int +transmit_notify (void *cls, size_t *data_size, void *data) +{ + struct Channel *ch = cls; + struct TransmitMessage *msg = ch->tmit_head; + + if (NULL == msg || *data_size < msg->size) + { + *data_size = 0; + return GNUNET_NO; + } + + memcpy (data, msg->buf, msg->size); + *data_size = msg->size; + + GNUNET_free (ch->tmit_buf); + GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); + + return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; +} + + +static int +master_transmit_message (struct Master *mst) +{ + if (NULL == mst->tmit_handle) + { + mst->tmit_handle + = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, + mst->max_group_generation, + transmit_notify, mst); + } + else + { + GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); + } + return GNUNET_OK; +} + + +static int +slave_transmit_message (struct Slave *slv) +{ + if (NULL == slv->tmit_handle) + { + slv->tmit_handle + = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id, + transmit_notify, slv); + } + else + { + GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); + } + return GNUNET_OK; +} + + +static int +buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) +{ + uint16_t size = ntohs (msg->size); + + if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) + return GNUNET_SYSERR; + + if (0 == ch->tmit_size) + { + ch->tmit_buf = GNUNET_malloc (size); + memcpy (ch->tmit_buf, msg, size); + ch->tmit_size = size; + } + else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size) + { + ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size); + memcpy (ch->tmit_buf + ch->tmit_size, msg, size); + ch->tmit_size += size; + } + + if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE + < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData)) + { + struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage); + tmit_msg->buf = (char *) msg; + tmit_msg->size = size; + tmit_msg->status = ch->tmit_status; + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); + + ch->is_master + ? master_transmit_message ((struct Master *) ch) + : slave_transmit_message ((struct Slave *) ch); + } + + return GNUNET_OK; +} + +static void +handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct GNUNET_PSYC_MessageMethod *meth + = (const struct GNUNET_PSYC_MessageMethod *) msg; + struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, + struct Channel); + GNUNET_assert (NULL != ch); + + if (GNUNET_NO != ch->in_transmit) + { + // FIXME: already transmitting a message, send back error message. + return; + } + + ch->tmit_buf = NULL; + ch->tmit_size = 0; + ch->tmit_mod_recvd = 0; + ch->tmit_mod_count = ntohl (meth->mod_count); + ch->tmit_status = GNUNET_PSYC_DATA_CONT; + + buffer_message (ch, msg); + + if (0 == ch->tmit_mod_count) + send_transmit_ack (ch); +}; + + +static void +handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct GNUNET_PSYC_MessageModifier *mod + = (const struct GNUNET_PSYC_MessageModifier *) msg; + struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, + struct Channel); + GNUNET_assert (NULL != ch); + + ch->tmit_mod_recvd++; + buffer_message (ch, msg); + + if (ch->tmit_mod_recvd == ch->tmit_mod_count) + send_transmit_ack (ch); +}; + + +static void +handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct GNUNET_PSYC_MessageData *data + = (const struct GNUNET_PSYC_MessageData *) msg; + struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, + struct Channel); + GNUNET_assert (NULL != ch); + + ch->tmit_status = data->status; + buffer_message (ch, msg); + send_transmit_ack (ch); +}; + /** * Initialize the PSYC service. @@ -83,14 +437,30 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { static const struct GNUNET_SERVER_MessageHandler handlers[] = { + { &handle_master_start, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, + + { &handle_slave_join, NULL, + GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, + + { &handle_transmit_method, NULL, + GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 }, + + { &handle_transmit_modifier, NULL, + GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 }, + + { &handle_transmit_data, NULL, + GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 }, + { NULL, NULL, 0, 0 } }; cfg = c; - + store = GNUNET_PSYCSTORE_connect (cfg); stats = GNUNET_STATISTICS_create ("psyc", cfg); - GNUNET_SERVER_add_handlers (server, handlers); nc = GNUNET_SERVER_notification_context_create (server, 1); + GNUNET_SERVER_add_handlers (server, handlers); + GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); } diff --git a/src/psyc/psyc.conf b/src/psyc/psyc.conf @@ -0,0 +1,7 @@ +[psyc] +AUTOSTART = YES +HOME = $SERVICEHOME +BINARY = gnunet-service-psyc +UNIXPATH = /tmp/gnunet-service-psyc.sock +UNIX_MATCH_UID = NO +UNIX_MATCH_GID = YES diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h @@ -24,13 +24,186 @@ * @author Gabor X Toth */ -#ifndef GNUNET_PSYC_H -#define GNUNET_PSYC_H +#ifndef PSYC_H +#define PSYC_H #include "gnunet_common.h" GNUNET_NETWORK_STRUCT_BEGIN +/**** service -> library ****/ + +/** + * Answer from service to client about last operation. + */ +struct OperationResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE + */ + struct GNUNET_MessageHeader header; + + /** + * Operation ID. + */ + uint32_t op_id GNUNET_PACKED; + + /** + * Status code for the operation. + */ + int64_t result_code GNUNET_PACKED; + + /* followed by 0-terminated error message (on error) */ + +}; + + +struct CountersResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS + */ + struct GNUNET_MessageHeader header; + + uint64_t max_message_id; +}; + + +/** + * Transmit acknowledgment. + * + * Sent after the last GNUNET_PSYC_MessageModifier and after each + * GNUNET_PSYC_MessageData. + * + * This message acknowledges previously received messages and asks for the next + * fragment of data. + */ +struct TransmitAck +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK + */ + struct GNUNET_MessageHeader header; + + /** + * Buffer space available for the next data fragment. + */ + uint16_t buf_avail; +}; + + +/**** library -> service ****/ + + +struct MasterStartRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_START + */ + struct GNUNET_MessageHeader header; + + struct GNUNET_CRYPTO_EccPrivateKey channel_key; + + uint32_t policy GNUNET_PACKED; +}; + + +struct SlaveJoinRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN + */ + struct GNUNET_MessageHeader header; + + uint32_t relay_count GNUNET_PACKED; + + struct GNUNET_CRYPTO_EccPublicSignKey channel_key; + + struct GNUNET_CRYPTO_EccPrivateKey slave_key; + + /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */ +}; + + +struct ChannelSlaveAdd +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved; + + struct GNUNET_CRYPTO_EccPublicSignKey *slave_key; + + uint64_t announced_at; + + uint64_t effective_since; +}; + + +struct ChannelSlaveRemove +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved; + + struct GNUNET_CRYPTO_EccPublicSignKey *slave_key; + + uint64_t announced_at; +}; + + +struct StoryRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_REQUEST + */ + struct GNUNET_MessageHeader header; + + uint64_t op_id; + + uint64_t start_message_id; + + uint64_t end_message_id; +}; + + +struct StateQuery +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_QUERY + */ + struct GNUNET_MessageHeader header; + + uint64_t op_id; + + /* Followed by NUL-terminated name. */ +}; + + +struct StateResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Size of name, including NUL terminator. + */ + uint16_t name_size GNUNET_PACKED; + + /** + * OR'd StateOpFlags + */ + uint8_t flags; + + /* Followed by NUL-terminated name, then the value. */ +}; GNUNET_NETWORK_STRUCT_END diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c @@ -36,15 +36,76 @@ #include "gnunet_psyc_service.h" #include "psyc.h" +#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) + + +struct OperationHandle +{ + struct OperationHandle *prev; + struct OperationHandle *next; + const struct GNUNET_MessageHeader *msg; +}; + /** - * Handle that identifies a join request. - * - * Used to match calls to #GNUNET_PSYC_JoinCallback to the - * corresponding calls to GNUNET_PSYC_join_decision(). + * Handle to access PSYC channel operations for both the master and slaves. */ -struct GNUNET_PSYC_JoinHandle +struct GNUNET_PSYC_Channel { - + /** + * Configuration to use. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Socket (if available). + */ + struct GNUNET_CLIENT_Connection *client; + + /** + * Currently pending transmission request, or NULL for none. + */ + struct GNUNET_CLIENT_TransmitHandle *th; + + /** + * Head of operations to transmit. + */ + struct OperationHandle *transmit_head; + + /** + * Tail of operations to transmit. + */ + struct OperationHandle *transmit_tail; + + /** + * Message to send on reconnect. + */ + struct GNUNET_MessageHeader *reconnect_msg; + + /** + * Task doing exponential back-off trying to reconnect. + */ + GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + + /** + * Time for next connect retry. + */ + struct GNUNET_TIME_Relative reconnect_delay; + + GNUNET_PSYC_Method method_cb; + + GNUNET_PSYC_JoinCallback join_cb; + + void *cb_cls; + + /** + * Are we polling for incoming messages right now? + */ + int in_receive; + + /** + * Are we currently transmitting a message? + */ + int in_transmit; }; @@ -53,23 +114,30 @@ struct GNUNET_PSYC_JoinHandle */ struct GNUNET_PSYC_Master { + struct GNUNET_PSYC_Channel ch; + + GNUNET_PSYC_MasterStartCallback start_cb; + uint64_t max_message_id; }; /** - * Handle for a pending PSYC transmission operation. + * Handle for a PSYC channel slave. */ -struct GNUNET_PSYC_MasterTransmitHandle +struct GNUNET_PSYC_Slave { - + struct GNUNET_PSYC_Channel ch; }; /** - * Handle for a PSYC channel slave. + * Handle that identifies a join request. + * + * Used to match calls to #GNUNET_PSYC_JoinCallback to the + * corresponding calls to GNUNET_PSYC_join_decision(). */ -struct GNUNET_PSYC_Slave +struct GNUNET_PSYC_JoinHandle { }; @@ -78,16 +146,20 @@ struct GNUNET_PSYC_Slave /** * Handle for a pending PSYC transmission operation. */ -struct GNUNET_PSYC_SlaveTransmitHandle +struct GNUNET_PSYC_MasterTransmitHandle { - + struct GNUNET_PSYC_Master *master; + const struct GNUNET_ENV_Environment *env; + GNUNET_PSYC_MasterTransmitNotify notify; + void *notify_cls; + enum GNUNET_PSYC_MasterTransmitFlags flags; }; /** - * Handle to access PSYC channel operations for both the master and slaves. + * Handle for a pending PSYC transmission operation. */ -struct GNUNET_PSYC_Channel +struct GNUNET_PSYC_SlaveTransmitHandle { }; @@ -102,45 +174,264 @@ struct GNUNET_PSYC_Story }; +/** + * Handle for a state query operation. + */ struct GNUNET_PSYC_StateQuery { }; -/** - * Function to call with the decision made for a join request. +/** + * Try again to connect to the PSYCstore service. * - * Must be called once and only once in response to an invocation of the - * #GNUNET_PSYC_JoinCallback. + * @param cls handle to the PSYCstore service. + * @param tc scheduler context + */ +static void +reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Reschedule a connect attempt to the service. * - * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved. - * @param relay_count Number of relays given. - * @param relays Array of suggested peers that might be useful relays to use - * when joining the multicast group (essentially a list of peers that - * are already part of the multicast group and might thus be willing - * to help with routing). If empty, only this local peer (which must - * be the multicast origin) is a good candidate for building the - * multicast tree. Note that it is unnecessary to specify our own - * peer identity in this array. - * @param method_name Method name for the message transmitted with the response. - * @param env Environment containing transient variables for the message, or NULL. - * @param data Data of the message. - * @param data_size Size of @a data. + * @param h transport service to reconnect */ -void -GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, - int is_admitted, - unsigned int relay_count, - const struct GNUNET_PeerIdentity *relays, - const char *method_name, - const struct GNUNET_ENV_Environment *env, - const void *data, - size_t data_size) +static void +reschedule_connect (struct GNUNET_PSYC_Channel *c) { + GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + + if (NULL != c->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (c->th); + c->th = NULL; + } + if (NULL != c->client) + { + GNUNET_CLIENT_disconnect (c->client); + c->client = NULL; + } + c->in_receive = GNUNET_NO; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling task to reconnect to PSYCstore service in %s.\n", + GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); + c->reconnect_task = + GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); + c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay); +} + + +/** + * Schedule transmission of the next message from our queue. + * + * @param h PSYCstore handle + */ +static void +transmit_next (struct GNUNET_PSYC_Channel *c); + + +/** + * Type of a function to call when we receive a message + * from the service. + * + * @param cls closure + * @param msg message received, NULL on timeout or fatal error + */ +static void +message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PSYC_Channel *ch = cls; + struct GNUNET_PSYC_Master *mst = cls; + struct GNUNET_PSYC_Slave *slv = cls; + + if (NULL == msg) + { + reschedule_connect (ch); + return; + } + uint16_t size_eq = 0; + uint16_t size_min = 0; + const uint16_t size = ntohs (msg->size); + const uint16_t type = ntohs (msg->type); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %d from PSYC service\n", type); + + switch (type) + { + case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: + case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: + size_eq = sizeof (struct CountersResult); + break; + } + + if (! ((0 < size_eq && size == size_eq) + || (0 < size_min && size >= size_min))) + { + GNUNET_break (0); + reschedule_connect (ch); + return; + } + + struct CountersResult *cres; + + switch (type) + { + case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: + cres = (struct CountersResult *) msg; + mst->max_message_id = GNUNET_ntohll (cres->max_message_id); + if (NULL != mst->start_cb) + mst->start_cb (ch->cb_cls, mst->max_message_id); + break; + + case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: + cres = (struct CountersResult *) msg; +#if TODO + slv->max_message_id = GNUNET_ntohll (cres->max_message_id); + if (NULL != slv->join_ack_cb) + mst->join_ack_cb (ch->cb_cls, mst->max_message_id); +#endif + break; + } + + GNUNET_CLIENT_receive (ch->client, &message_handler, ch, + GNUNET_TIME_UNIT_FOREVER_REL); +} + +/** + * Transmit next message to service. + * + * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. + * @param size Number of bytes available in buf. + * @param buf Where to copy the message. + * @return Number of bytes copied to buf. + */ +static size_t +send_next_message (void *cls, size_t size, void *buf) +{ + struct GNUNET_PSYC_Channel *ch = cls; + struct OperationHandle *op = ch->transmit_head; + size_t ret; + + ch->th = NULL; + if (NULL == op->msg) + return 0; + ret = ntohs (op->msg->size); + if (ret > size) + { + reschedule_connect (ch); + return 0; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending message of type %d to PSYCstore service\n", + ntohs (op->msg->type)); + memcpy (buf, op->msg, ret); + + GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op); + GNUNET_free (op); + + if (NULL != ch->transmit_head) + transmit_next (ch); + + if (GNUNET_NO == ch->in_receive) + { + ch->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (ch->client, &message_handler, ch, + GNUNET_TIME_UNIT_FOREVER_REL); + } + return ret; +} + + +/** + * Schedule transmission of the next message from our queue. + * + * @param h PSYCstore handle. + */ +static void +transmit_next (struct GNUNET_PSYC_Channel *ch) +{ + if (NULL != ch->th || NULL == ch->client) + return; + + struct OperationHandle *op = ch->transmit_head; + if (NULL == op) + return; + + ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, + ntohs (op->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, + &send_next_message, + ch); +} + + +/** + * Try again to connect to the PSYC service. + * + * @param cls Channel handle. + * @param tc Scheduler context. + */ +static void +reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_PSYC_Channel *ch = cls; + + ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to PSYC service.\n"); + GNUNET_assert (NULL == ch->client); + ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); + GNUNET_assert (NULL != ch->client); + + if (NULL == ch->transmit_head || + ch->transmit_head->msg->type != ch->reconnect_msg->type) + { + struct OperationHandle *op + = GNUNET_malloc (sizeof (struct OperationHandle) + + ntohs (ch->reconnect_msg->size)); + memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size)); + op->msg = (struct GNUNET_MessageHeader *) &op[1]; + GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + } + + transmit_next (ch); +} + + +/** + * Disconnect from the PSYC service. + * + * @param cls Channel handle. + * @param tc Scheduler context. + */ +static void +disconnect (void *c) +{ + struct GNUNET_PSYC_Channel *ch = c; + GNUNET_assert (NULL != ch); + GNUNET_assert (ch->transmit_head == ch->transmit_tail); + if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (ch->reconnect_task); + ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != ch->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); + ch->th = NULL; + } + if (NULL != ch->client) + { + GNUNET_CLIENT_disconnect (ch->client); + ch->client = NULL; + } + if (NULL != ch->reconnect_msg) + ch->reconnect_msg = NULL; } @@ -177,57 +468,172 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_PSYC_Policy policy, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, + GNUNET_PSYC_MasterStartCallback master_started_cb, void *cls) { - return NULL; + struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); + struct GNUNET_PSYC_Channel *ch = &mst->ch; + struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); + + req->header.size = htons (sizeof (*req) + sizeof (*channel_key)); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); + req->channel_key = *channel_key; + req->policy = policy; + + ch->cfg = cfg; + ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; + ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); + + ch->method_cb = method; + ch->join_cb = join_cb; + ch->cb_cls = cls; + mst->start_cb = master_started_cb; + + return mst; +} + + +/** + * Stop a PSYC master channel. + * + * @param master PSYC channel master to stop. + */ +void +GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) +{ + disconnect (mst); + GNUNET_free (mst); +} + + +/** + * Function to call with the decision made for a join request. + * + * Must be called once and only once in response to an invocation of the + * #GNUNET_PSYC_JoinCallback. + * + * @param jh Join request handle. + * @param is_admitted #GNUNET_YES if joining is approved, + * #GNUNET_NO if it is disapproved. + * @param relay_count Number of relays given. + * @param relays Array of suggested peers that might be useful relays to use + * when joining the multicast group (essentially a list of peers that + * are already part of the multicast group and might thus be willing + * to help with routing). If empty, only this local peer (which must + * be the multicast origin) is a good candidate for building the + * multicast tree. Note that it is unnecessary to specify our own + * peer identity in this array. + * @param method_name Method name for the message transmitted with the response. + * @param env Environment containing transient variables for the message, or NULL. + * @param data Data of the message. + * @param data_size Size of @a data. + */ +void +GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, + int is_admitted, + unsigned int relay_count, + const struct GNUNET_PeerIdentity *relays, + const char *method_name, + const struct GNUNET_ENV_Environment *env, + const void *data, + size_t data_size) +{ + +} + + +/* FIXME: split up value into <64K chunks and transmit the continuations in + * MOD_CONT msgs */ +int +send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) +{ + struct GNUNET_PSYC_Channel *ch = cls; + size_t name_size = strlen (mod->name) + 1; + struct GNUNET_PSYC_MessageModifier *pmod; + struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod) + + name_size + mod->value_size); + pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) pmod; + + pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER; + pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); + pmod->name_size = htons (name_size); + memcpy (&pmod[1], mod->name, name_size); + memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size); + + GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + return GNUNET_YES; } /** * Send a message to call a method to all members in the PSYC channel. * - * @param master Handle to the PSYC channel. + * @param mst Handle to the PSYC channel. * @param method_name Which method should be invoked. * @param env Environment containing state operations and transient variables * for the message, or NULL. * @param notify Function to call to obtain the arguments. * @param notify_cls Closure for @a notify. * @param flags Flags for the message being transmitted. - * @return Transmission handle, NULL on error (i.e. more than one request queued). + * @return Transmission handle, NULL on error (i.e. more than one request + * queued). */ struct GNUNET_PSYC_MasterTransmitHandle * -GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, +GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, const char *method_name, const struct GNUNET_ENV_Environment *env, GNUNET_PSYC_MasterTransmitNotify notify, void *notify_cls, enum GNUNET_PSYC_MasterTransmitFlags flags) { - return NULL; + GNUNET_assert (NULL != mst); + struct GNUNET_PSYC_Channel *ch = &mst->ch; + if (GNUNET_NO != ch->in_transmit) + return NULL; + ch->in_transmit = GNUNET_YES; + + struct GNUNET_PSYC_MessageMethod *pmeth; + struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth)); + pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) pmeth; + + pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD; + size_t size = strlen (method_name) + 1; + pmeth->header.size = htons (sizeof (*pmeth) + size); + pmeth->flags = htonl (flags); + pmeth->mod_count + = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); + memcpy (&pmeth[1], method_name, size); + + GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + + GNUNET_ENV_environment_iterate (env, send_modifier, mst); + + struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th)); + th->master = mst; + th->env = env; + th->notify = notify; + th->notify_cls = notify_cls; + return th; } /** - * Abort transmission request to channel. + * Abort transmission request to the channel. * * @param th Handle of the request that is being aborted. */ void GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) { + struct GNUNET_PSYC_Master *mst = th->master; + struct GNUNET_PSYC_Channel *ch = &mst->ch; + if (GNUNET_NO != ch->in_transmit) + return; -} - - -/** - * Stop a PSYC master channel. - * - * @param master PSYC channel master to stop. - */ -void -GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) -{ - + } @@ -235,7 +641,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) * Join a PSYC channel. * * The entity joining is always the local peer. The user must immediately use - * the GNUNET_PSYC_slave_to_master() functions to transmit a @e join_msg to the + * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the * channel; if the join request succeeds, the channel state (and @e recent * method calls) will be replayed to the joining member. There is no explicit * notification on failure (as the channel may simply take days to approve, @@ -269,13 +675,32 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, + GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, void *cls, const char *method_name, const struct GNUNET_ENV_Environment *env, const void *data, size_t data_size) { - return NULL; + struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); + struct GNUNET_PSYC_Channel *ch = &slv->ch; + struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req)); + + req->header.size = htons (sizeof (*req) + + sizeof (*channel_key) + sizeof (*slave_key) + + relay_count * sizeof (*relays)); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); + req->channel_key = *channel_key; + req->slave_key = *slave_key; + req->relay_count = relay_count; + memcpy (&req[1], relays, relay_count * sizeof (*relays)); + + ch->cfg = cfg; + ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; + ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); + + return slv; } @@ -283,14 +708,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, * Part a PSYC channel. * * Will terminate the connection to the PSYC service. Polite clients should - * first explicitly send a @e part request (via GNUNET_PSYC_slave_to_master()). + * first explicitly send a @e part request (via GNUNET_PSYC_slave_transmit()). * - * @param slave Slave handle. + * @param slv Slave handle. */ void -GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) +GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) { - + disconnect (slv); + GNUNET_free (slv); } @@ -299,11 +725,13 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) * * @param slave Slave handle. * @param method_name Which (PSYC) method should be invoked (on host). - * @param env Environment containing transient variables for the message, or NULL. + * @param env Environment containing transient variables for the message, or + * NULL. * @param notify Function to call when we are allowed to transmit (to get data). * @param notify_cls Closure for @a notify. * @param flags Flags for the message being transmitted. - * @return Transmission handle, NULL on error (i.e. more than one request queued). + * @return Transmission handle, NULL on error (i.e. more than one request + * queued). */ struct GNUNET_PSYC_SlaveTransmitHandle * GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, @@ -330,7 +758,8 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) /** - * Convert a channel @a master to a @e channel handle to access the @e channel APIs. + * Convert a channel @a master to a @e channel handle to access the @e channel + * APIs. * * @param master Channel master handle. * @return Channel handle, valid for as long as @a master is valid. @@ -338,7 +767,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) struct GNUNET_PSYC_Channel * GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) { - return NULL; + return (struct GNUNET_PSYC_Channel *) master; } @@ -351,7 +780,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) struct GNUNET_PSYC_Channel * GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) { - return NULL; + return (struct GNUNET_PSYC_Channel *) slave; } @@ -371,18 +800,30 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) * correctly; not doing so correctly will result in either denying other slaves * access or offering access to channel data to non-members. * - * @param channel Channel handle. + * @param ch Channel handle. * @param slave_key Identity of channel slave to add. * @param announced_at ID of the message that announced the membership change. * @param effective_since Addition of slave is in effect since this message ID. */ void -GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, +GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch, + const struct GNUNET_CRYPTO_EccPublicSignKey + *slave_key, uint64_t announced_at, uint64_t effective_since) { - + struct ChannelSlaveAdd *slvadd; + struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd)); + slvadd = (struct ChannelSlaveAdd *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) slvadd; + + slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD; + slvadd->header.size = htons (sizeof (*slvadd)); + slvadd->announced_at = GNUNET_htonll (announced_at); + slvadd->effective_since = GNUNET_htonll (effective_since); + + GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + transmit_next (ch); } @@ -403,16 +844,27 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, * denying members access or offering access to channel data to * non-members. * - * @param channel Channel handle. + * @param ch Channel handle. * @param slave_key Identity of channel slave to remove. * @param announced_at ID of the message that announced the membership change. */ void -GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, - const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, +GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch, + const struct GNUNET_CRYPTO_EccPublicSignKey + *slave_key, uint64_t announced_at) { + struct ChannelSlaveRemove *slvrm; + struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm)); + slvrm = (struct ChannelSlaveRemove *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) slvrm; + + slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; + slvrm->header.size = htons (sizeof (*slvrm)); + slvrm->announced_at = GNUNET_htonll (announced_at); + GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + transmit_next (ch); } @@ -424,7 +876,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, * * To get the latest message, use 0 for both the start and end message ID. * - * @param channel Which channel should be replayed? + * @param ch Which channel should be replayed? * @param start_message_id Earliest interesting point in history. * @param end_message_id Last (exclusive) interesting point in history. * @param method Function to invoke on messages received from the story. @@ -441,7 +893,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, * @return Handle to cancel story telling operation. */ struct GNUNET_PSYC_Story * -GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, +GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *ch, uint64_t start_message_id, uint64_t end_message_id, GNUNET_PSYC_Method method, @@ -495,9 +947,9 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, /** * Return all channel state variables whose name matches a given prefix. * - * A name matches if it starts with the given @a name_prefix, thus requesting the - * empty prefix ("") will match all values; requesting "_a_b" will also return - * values stored under "_a_b_c". + * A name matches if it starts with the given @a name_prefix, thus requesting + * the empty prefix ("") will match all values; requesting "_a_b" will also + * return values stored under "_a_b_c". * * The @a state_cb is invoked on all matching state variables asynchronously, as * the state is stored in and retrieved from the PSYCstore, diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c @@ -28,12 +28,12 @@ #include "platform.h" #include "gnunet_common.h" #include "gnunet_util_lib.h" -#include "gnunet_psycstore_service.h" #include "gnunet_testing_lib.h" +#include "gnunet_psyc_service.h" #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) -#define DEBUG_SERVICE 0 +#define DEBUG_SERVICE 1 /** @@ -41,11 +41,22 @@ */ static int res; +static const struct GNUNET_CONFIGURATION_Handle *cfg; + /** * Handle for task for timeout termination. */ static GNUNET_SCHEDULER_TaskIdentifier end_badly_task; +static struct GNUNET_PSYC_Master *mst; +static struct GNUNET_PSYC_Slave *slv; +static struct GNUNET_PSYC_Channel *ch; + +static struct GNUNET_CRYPTO_EccPrivateKey *channel_key; +static struct GNUNET_CRYPTO_EccPrivateKey *slave_key; + +static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key; +static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key; /** * Clean up all resources used. @@ -53,6 +64,11 @@ static GNUNET_SCHEDULER_TaskIdentifier end_badly_task; static void cleanup () { + if (master != NULL) + { + GNUNET_PSYC_master_stop (master); + master = NULL; + } GNUNET_SCHEDULER_shutdown (); } @@ -100,6 +116,42 @@ end () &end_normally, NULL); } + +static int +method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + uint64_t message_id, const char *method_name, + size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, + uint64_t data_offset, const void *data, size_t data_size, + enum GNUNET_PSYC_MessageFlags flags) +{ + return GNUNET_OK; +} + + +static int +join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, + const char *method_name, + size_t variable_count, const struct GNUNET_ENV_Modifier *variables, + const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh) +{ + return GNUNET_OK; +} + + +void +master_started (void *cls, uint64_t max_message_id) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); +} + + +void +slave_joined (void *cls, uint64_t max_message_id) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); +} + + /** * Main function of the test, run from scheduler. * @@ -110,14 +162,28 @@ end () static void #if DEBUG_SERVICE run (void *cls, char *const *args, const char *cfgfile, - const struct GNUNET_CONFIGURATION_Handle *cfg) + const struct GNUNET_CONFIGURATION_Handle *c) #else run (void *cls, - const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_TESTING_Peer *peer) #endif { + cfg = c; end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL); + + channel_key = GNUNET_CRYPTO_ecc_key_create (); + slave_key = GNUNET_CRYPTO_ecc_key_create (); + + GNUNET_CRYPTO_ecc_key_get_public_for_signature (channel_key, &channel_pub_key); + GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key); + + mst = GNUNET_PSYC_master_start (cfg, channel_key, + GNUNET_PSYC_CHANNEL_PRIVATE, + &method, &join, &master_started, NULL); + + slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, + &method, &join, &slave_joined, NULL); }