gnunet

Main GNUnet Logic
Log | Files | Refs | Submodules | README | LICENSE

commit b469a43110fd4c08e61530324fc0cec02fa7162b
parent f874eb3423cde2257d57c0f5d651297b303b3cff
Author: Christian Grothoff <christian@grothoff.org>
Date:   Mon, 23 Jun 2014 19:58:56 +0000

-add support for 'update_inbound_delay' to HTTP client, complete plugin monitoring implementation

Diffstat:
Msrc/transport/plugin_transport_http_client.c | 226++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
1 file changed, 148 insertions(+), 78 deletions(-)

diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2002-2013 Christian Grothoff (and other contributing authors) + (C) 2002-2014 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -22,6 +22,7 @@ * @file transport/plugin_transport_http_client.c * @brief HTTP/S client transport plugin * @author Matthias Wachs + * @author Christian Grothoff */ #if BUILD_HTTPS @@ -237,7 +238,7 @@ struct Session uint32_t ats_address_network_type; /** - * Is the client PUT handle currently paused + * Is the client PUT handle currently paused? */ int put_paused; @@ -394,9 +395,7 @@ notify_session_monitor (struct HTTP_Client_Plugin *plugin, info.is_inbound = GNUNET_SYSERR; /* hard to say */ info.num_msg_pending = session->msgs_in_queue; info.num_bytes_pending = session->bytes_in_queue; - /* info.receive_delay remains zero as this is not supported by UDP - (cannot selectively not receive from 'some' peer while continuing - to receive from others) */ + info.receive_delay = session->next_receive; info.session_timeout = session->timeout; info.address = session->address; plugin->sic (plugin->sic_cls, @@ -406,11 +405,16 @@ notify_session_monitor (struct HTTP_Client_Plugin *plugin, /** - * Increment session timeout due to activity for a session + * Increment session timeout due to activity for session @a s. + * * @param s the session */ static void -client_reschedule_session_timeout (struct Session *s); +client_reschedule_session_timeout (struct Session *s) +{ + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); +} /** @@ -564,7 +568,9 @@ http_client_plugin_send (void *cls, GNUNET_STATISTICS_update (plugin->env->stats, stat_txt, msgbuf_size, GNUNET_NO); GNUNET_free (stat_txt); - + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_UP); if (GNUNET_YES == s->put_tmp_disconnecting) { /* PUT connection is currently getting disconnected */ @@ -597,12 +603,10 @@ http_client_plugin_send (void *cls, s->put_tmp_disconnected = GNUNET_NO; GNUNET_break (s->client_put == NULL); if (GNUNET_SYSERR == client_connect_put (s)) - { return GNUNET_SYSERR; - } } - - client_schedule (s->plugin, GNUNET_YES); + client_schedule (s->plugin, + GNUNET_YES); return msgbuf_size; } @@ -655,7 +659,11 @@ client_delete_session (struct Session *s) s->overhead = 0; GNUNET_free (pos); } - + GNUNET_assert (0 == s->msgs_in_queue); + GNUNET_assert (0 == s->bytes_in_queue); + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_DOWN); if (NULL != s->msg_tk) { GNUNET_SERVER_mst_destroy (s->msg_tk); @@ -675,8 +683,8 @@ client_delete_session (struct Session *s) * @return #GNUNET_OK on success, #GNUNET_SYSERR on error */ static int -http_client_session_disconnect (void *cls, - struct Session *s) +http_client_plugin_session_disconnect (void *cls, + struct Session *s) { struct HTTP_Client_Plugin *plugin = cls; struct HTTP_Message *msg; @@ -688,7 +696,9 @@ http_client_session_disconnect (void *cls, { LOG (GNUNET_ERROR_TYPE_DEBUG, "Session %p/connection %p: disconnecting PUT connection to peer `%s'\n", - s, s->client_put, GNUNET_i2s (&s->target)); + s, + s->client_put, + GNUNET_i2s (&s->target)); /* remove curl handle from multi handle */ mret = curl_multi_remove_handle (plugin->curl_multi_handle, s->client_put); @@ -801,7 +811,7 @@ destroy_session_cb (void *cls, struct HTTP_Client_Plugin *plugin = cls; struct Session *session = value; - http_client_session_disconnect (plugin, session); + http_client_plugin_session_disconnect (plugin, session); return GNUNET_OK; } @@ -815,8 +825,8 @@ destroy_session_cb (void *cls, * @param target peer from which to disconnect */ static void -http_client_peer_disconnect (void *cls, - const struct GNUNET_PeerIdentity *target) +http_client_plugin_peer_disconnect (void *cls, + const struct GNUNET_PeerIdentity *target) { struct HTTP_Client_Plugin *plugin = cls; @@ -896,7 +906,12 @@ client_lookup_session (struct HTTP_Client_Plugin *plugin, /** - * FIXME. + * When we have nothing to transmit, we pause the HTTP PUT + * after a while (so that gnurl stops asking). This task + * is the delayed task that actually pauses the PUT. + * + * @param cls the `struct Session *` with the put + * @param tc scheduler context */ static void client_put_disconnect (void *cls, @@ -952,7 +967,9 @@ client_send_cb (void *stream, LOG (GNUNET_ERROR_TYPE_DEBUG, "Session %p/connection %p: nothing to send, suspending\n", s, s->client_put); - s->put_disconnect_task = GNUNET_SCHEDULER_add_delayed (PUT_DISCONNECT_TIMEOUT, &client_put_disconnect, s); + s->put_disconnect_task = GNUNET_SCHEDULER_add_delayed (PUT_DISCONNECT_TIMEOUT, + &client_put_disconnect, + s); s->put_paused = GNUNET_YES; return CURL_READFUNC_PAUSE; } @@ -985,6 +1002,9 @@ client_send_cb (void *stream, s->overhead = 0; GNUNET_free (msg); } + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_UP); GNUNET_asprintf (&stat_txt, "# bytes currently in %s_client buffers", plugin->protocol); @@ -1024,10 +1044,19 @@ client_wake_up (void *cls, "Session %p/connection %p: Waking up GET handle\n", s, s->client_get); - s->put_paused = GNUNET_NO; + if (GNUNET_YES == s->put_paused) + { + /* PUT connection was paused, unpause */ + GNUNET_assert (s->put_disconnect_task != GNUNET_SCHEDULER_NO_TASK); + GNUNET_SCHEDULER_cancel (s->put_disconnect_task); + s->put_disconnect_task = GNUNET_SCHEDULER_NO_TASK; + s->put_paused = GNUNET_NO; + if (NULL != s->client_put) + curl_easy_pause (s->client_put, CURLPAUSE_CONT); + } if (NULL != s->client_get) - curl_easy_pause (s->client_get, CURLPAUSE_CONT); - + curl_easy_pause (s->client_get, + CURLPAUSE_CONT); } @@ -1055,7 +1084,10 @@ client_receive_mst_cb (void *cls, atsi.value = s->ats_address_network_type; GNUNET_break (s->ats_address_network_type != ntohl (GNUNET_ATS_NET_UNSPECIFIED)); - delay = s->plugin->env->receive (plugin->env->cls, s->address, s, message); + delay = s->plugin->env->receive (plugin->env->cls, + s->address, + s, + message); plugin->env->update_address_metrics (plugin->env->cls, s->address, s, &atsi, 1); @@ -1064,12 +1096,12 @@ client_receive_mst_cb (void *cls, "# bytes received via %s_client", plugin->protocol); GNUNET_STATISTICS_update (plugin->env->stats, - stat_txt, ntohs(message->size), GNUNET_NO); + stat_txt, + ntohs (message->size), + GNUNET_NO); GNUNET_free (stat_txt); - s->next_receive = - GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay); - + s->next_receive = GNUNET_TIME_relative_to_absolute (delay); if (GNUNET_TIME_absolute_get ().abs_value_us < s->next_receive.abs_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1117,7 +1149,10 @@ client_receive_put (void *stream, * @return bytes read from stream */ static size_t -client_receive (void *stream, size_t size, size_t nmemb, void *cls) +client_receive (void *stream, + size_t size, + size_t nmemb, + void *cls) { struct Session *s = cls; struct GNUNET_TIME_Absolute now; @@ -1131,11 +1166,13 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) if (now.abs_value_us < s->next_receive.abs_value_us) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); - struct GNUNET_TIME_Relative delta = - GNUNET_TIME_absolute_get_difference (now, s->next_receive); + struct GNUNET_TIME_Relative delta + = GNUNET_TIME_absolute_get_difference (now, s->next_receive); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Session %p / connection %p: No inbound bandwidth available! Next read was delayed for %s\n", - s, s->client_get, + s, + s->client_get, GNUNET_STRINGS_relative_time_to_string (delta, GNUNET_YES)); if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK) @@ -1143,13 +1180,21 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) GNUNET_SCHEDULER_cancel (s->recv_wakeup_task); s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK; } - s->recv_wakeup_task = - GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s); + s->recv_wakeup_task + = GNUNET_SCHEDULER_add_delayed (delta, + &client_wake_up, + s); return CURL_WRITEFUNC_PAUSE; } if (NULL == s->msg_tk) - s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s); - GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO, GNUNET_NO); + s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, + s); + GNUNET_SERVER_mst_receive (s->msg_tk, + s, + stream, + len, + GNUNET_NO, + GNUNET_NO); return len; } @@ -1161,7 +1206,8 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) * @param tc gnunet scheduler task context */ static void -client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +client_run (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); /** @@ -1172,7 +1218,8 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); * @return #GNUNET_SYSERR for hard failure, #GNUNET_OK for ok */ static int -client_schedule (struct HTTP_Client_Plugin *plugin, int now) +client_schedule (struct HTTP_Client_Plugin *plugin, + int now) { fd_set rs; fd_set ws; @@ -1212,7 +1259,8 @@ client_schedule (struct HTTP_Client_Plugin *plugin, int now) if (mret != CURLM_OK) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("%s failed at %s:%d: `%s'\n"), + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("%s failed at %s:%d: `%s'\n"), "curl_multi_timeout", __FILE__, __LINE__, curl_multi_strerror (mret)); return GNUNET_SYSERR; @@ -1237,18 +1285,17 @@ client_schedule (struct HTTP_Client_Plugin *plugin, int now) * Task performing curl operations * * @param cls plugin as closure - * @param tc gnunet scheduler task context + * @param tc scheduler task context */ static void -client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +client_run (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct HTTP_Client_Plugin *plugin = cls; int running; long http_statuscode; CURLMcode mret; - GNUNET_assert (cls != NULL); - plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; @@ -1267,7 +1314,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct Session *s = NULL; char *d = (char *) s; - if (easy_h == NULL) + if (NULL == easy_h) { GNUNET_break (0); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1353,7 +1400,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* Disconnect other transmission direction and tell transport */ s->get.easyhandle = NULL; s->get.s = NULL; - http_client_session_disconnect (plugin, s); + http_client_plugin_session_disconnect (plugin, s); } } } @@ -1568,7 +1615,6 @@ client_connect_put (struct Session *s) static int client_connect (struct Session *s) { - struct HTTP_Client_Plugin *plugin = s->plugin; int res = GNUNET_OK; @@ -1584,8 +1630,9 @@ client_connect (struct Session *s) } GNUNET_asprintf (&s->url, "%s/%s;%u", - http_common_plugin_address_to_url (NULL, s->address->address, - s->address->address_length), + http_common_plugin_address_to_url (NULL, + s->address->address, + s->address->address_length), GNUNET_i2s_full (plugin->env->my_identity), plugin->last_tag); @@ -1613,7 +1660,6 @@ client_connect (struct Session *s) HTTP_STAT_STR_CONNECTIONS, plugin->cur_connections, GNUNET_NO); - /* Re-schedule since handles have changed */ if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) { @@ -1633,8 +1679,8 @@ client_connect (struct Session *s) * @return the network type */ static enum GNUNET_ATS_Network_Type -http_client_get_network (void *cls, - struct Session *session) +http_client_plugin_get_network (void *cls, + struct Session *session) { return ntohl (session->ats_address_network_type); } @@ -1673,7 +1719,7 @@ client_session_timeout (void *cls, GNUNET_STRINGS_relative_time_to_string (HTTP_CLIENT_SESSION_TIMEOUT, GNUNET_YES)); GNUNET_assert (GNUNET_OK == - http_client_session_disconnect (s->plugin, + http_client_plugin_session_disconnect (s->plugin, s)); } @@ -1732,7 +1778,6 @@ http_client_plugin_get_session (void *cls, salen = sizeof (struct sockaddr_in6); } ats = plugin->env->get_address_type (plugin->env->cls, sa, salen); - //fprintf (stderr, "Address %s is in %s\n", GNUNET_a2s (sa,salen), GNUNET_ATS_print_network_type(ntohl(ats.value))); GNUNET_free (sa); } else if (GNUNET_NO == res) @@ -1783,6 +1828,9 @@ http_client_plugin_get_session (void *cls, client_delete_session (s); return NULL; } + notify_session_monitor (plugin, + s, + GNUNET_TRANSPORT_SS_UP); /* or handshake? */ return s; } @@ -1811,19 +1859,6 @@ client_start (struct HTTP_Client_Plugin *plugin) /** - * Increment session timeout due to activity for session @a s. - * - * @param s the session - */ -static void -client_reschedule_session_timeout (struct Session *s) -{ - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); - s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); -} - - -/** * Another peer has suggested an address for this * peer and transport plugin. Check that this could be a valid * address. If so, consider adding it to the list @@ -1907,9 +1942,11 @@ client_configure_plugin (struct HTTP_Client_Plugin *plugin) /* Optional parameters */ - if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, - plugin->name, - "MAX_CONNECTIONS", &max_connections)) + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, + plugin->name, + "MAX_CONNECTIONS", + &max_connections)) max_connections = 128; plugin->max_connections = max_connections; @@ -1990,14 +2027,15 @@ client_configure_plugin (struct HTTP_Client_Plugin *plugin) } /* proxy http tunneling */ - if (GNUNET_SYSERR == (plugin->proxy_use_httpproxytunnel = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg, - plugin->name, "PROXY_HTTP_TUNNELING"))) + plugin->proxy_use_httpproxytunnel + = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg, + plugin->name, + "PROXY_HTTP_TUNNELING"); + if (GNUNET_SYSERR == plugin->proxy_use_httpproxytunnel) plugin->proxy_use_httpproxytunnel = GNUNET_NO; GNUNET_free_non_null (proxy_type); } - - return GNUNET_OK; } @@ -2046,6 +2084,38 @@ http_client_plugin_update_session_timeout (void *cls, /** + * Function that will be called whenever the transport service wants to + * notify the plugin that the inbound quota changed and that the plugin + * should update it's delay for the next receive value + * + * @param cls closure + * @param peer which peer was the session for + * @param session which session is being updated + * @param delay new delay to use for receiving + */ +static void +http_client_plugin_update_inbound_delay (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct Session *s, + struct GNUNET_TIME_Relative delay) +{ + s->next_receive = GNUNET_TIME_relative_to_absolute (delay); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "New inbound delay %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_NO)); + if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (s->recv_wakeup_task); + s->recv_wakeup_task + = GNUNET_SCHEDULER_add_delayed (delay, + &client_wake_up, + s); + } +} + + +/** * Return information about the given session to the * monitor callback. * @@ -2131,16 +2201,17 @@ LIBGNUNET_PLUGIN_TRANSPORT_INIT (void *cls) api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions); api->cls = plugin; api->send = &http_client_plugin_send; - api->disconnect_session = &http_client_session_disconnect; + api->disconnect_session = &http_client_plugin_session_disconnect; api->query_keepalive_factor = &http_client_query_keepalive_factor; - api->disconnect_peer = &http_client_peer_disconnect; + api->disconnect_peer = &http_client_plugin_peer_disconnect; api->check_address = &http_client_plugin_address_suggested; api->get_session = &http_client_plugin_get_session; api->address_to_string = &http_client_plugin_address_to_string; api->string_to_address = &http_common_plugin_string_to_address; api->address_pretty_printer = &http_common_plugin_address_pretty_printer; - api->get_network = &http_client_get_network; + api->get_network = &http_client_plugin_get_network; api->update_session_timeout = &http_client_plugin_update_session_timeout; + api->update_inbound_delay = &http_client_plugin_update_inbound_delay; api->setup_monitor = &http_client_plugin_setup_monitor; #if BUILD_HTTPS plugin->name = "transport-https_client"; @@ -2150,7 +2221,6 @@ LIBGNUNET_PLUGIN_TRANSPORT_INIT (void *cls) plugin->protocol = "http"; #endif plugin->last_tag = 1; - plugin->options = 0; /* Setup options */ if (GNUNET_SYSERR == client_configure_plugin (plugin)) {