From 777dd74b16064c91068e617a6fd39b2800fc0588 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 12 Aug 2021 19:07:28 +0200 Subject: implement long-polling in fakebank --- src/bank-lib/fakebank.c | 716 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 636 insertions(+), 80 deletions(-) (limited to 'src/bank-lib') diff --git a/src/bank-lib/fakebank.c b/src/bank-lib/fakebank.c index ecb5934e6..580012b02 100644 --- a/src/bank-lib/fakebank.c +++ b/src/bank-lib/fakebank.c @@ -21,11 +21,12 @@ * @brief library that fakes being a Taler bank for testcases * @author Christian Grothoff */ -// TODO: support long polling // TODO: support adding WAD transfers #include "platform.h" #include +#include +#include #include "taler_fakebank_lib.h" #include "taler_bank_service.h" #include "taler_mhd_lib.h" @@ -43,6 +44,73 @@ */ #define MAX_URL_LEN 64 +/** + * Per account information. + */ +struct Account; + + +/** + * Types of long polling activities. + */ +enum LongPollType +{ + /** + * Transfer TO the exchange. + */ + LP_CREDIT, + + /** + * Transfer FROM the exchange. + */ + LP_DEBIT + +}; + +/** + * Client waiting for activity on this account. + */ +struct LongPoller +{ + + /** + * Kept in a DLL. + */ + struct LongPoller *next; + + /** + * Kept in a DLL. + */ + struct LongPoller *prev; + + /** + * Account this long poller is waiting on. + */ + struct Account *account; + + /** + * Entry in the heap for this long poller. + */ + struct GNUNET_CONTAINER_HeapNode *hn; + + /** + * Client that is waiting for transactions. + */ + struct MHD_Connection *conn; + + /** + * When will this long poller time out? + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * What does the @e connection wait for? + */ + enum LongPollType type; + +}; + + /** * Details about a transcation we (as the simulated bank) received. */ @@ -74,6 +142,16 @@ struct Account */ struct Transaction *out_tail; + /** + * Kept in a DLL. + */ + struct LongPoller *lp_head; + + /** + * Kept in a DLL. + */ + struct LongPoller *lp_tail; + /** * Account name (string, not payto!) */ @@ -256,6 +334,23 @@ struct TALER_FAKEBANK_Handle */ struct GNUNET_SCHEDULER_Task *mhd_task; + /** + * Task for expiring long-polling connections, + * unless we are using a thread pool (then NULL). + */ + struct GNUNET_SCHEDULER_Task *lp_task; + + /** + * Task for expiring long-polling connections, unless we are using the + * GNUnet scheduler (then NULL). + */ + pthread_t lp_thread; + + /** + * MIN-heap of long pollers, sorted by timeout. + */ + struct GNUNET_CONTAINER_Heap *lp_heap; + /** * Hashmap of reserve public keys to * `struct Transaction` with that reserve public @@ -319,6 +414,17 @@ struct TALER_FAKEBANK_Handle */ uint16_t port; + /** + * Event FD to signal @a lp_thread a change in + * @a lp_heap. + */ + int lp_event; + + /** + * Set to true once we are shutting down. + */ + bool in_shutdown; + #if EPOLL_SUPPORT /** * Boxed @e mhd_fd. @@ -333,6 +439,145 @@ struct TALER_FAKEBANK_Handle }; +/** + * Special address "con_cls" can point to to indicate that the handler has + * been called more than once already (was previously suspended). + */ +static int special_ptr; + + +/** + * Task run whenever HTTP server operations are pending. + * + * @param cls the `struct TALER_FAKEBANK_Handle` + */ +static void +run_mhd (void *cls); + + +/** + * Trigger the @a lp. Frees associated resources, + * except the entry of @a lp in the timeout heap. + * Must be called while the ``big lock`` is held. + * + * @param[in] lp long poller to trigger + * @param[in,out] h fakebank handle + */ +static void +lp_trigger (struct LongPoller *lp, + struct TALER_FAKEBANK_Handle *h) +{ + struct Account *acc = lp->account; + + GNUNET_CONTAINER_DLL_remove (acc->lp_head, + acc->lp_tail, + lp); + MHD_resume_connection (lp->conn); + GNUNET_free (lp); + if (NULL != h->mhd_task) + GNUNET_SCHEDULER_cancel (h->mhd_task); + h->mhd_task = + GNUNET_SCHEDULER_add_now (&run_mhd, + h); +} + + +/** + * Thread that is run to wake up connections that have hit their timeout. Runs + * until in_shutdown is set to true. Must be send signals via lp_event on + * shutdown and/or whenever the heap changes to an earlier timeout. + * + * @param cls a `struct TALER_FAKEBANK_Handle *` + * @return NULL + */ +static void * +lp_expiration_thread (void *cls) +{ + struct TALER_FAKEBANK_Handle *h = cls; + + GNUNET_assert (0 == + pthread_mutex_lock (&h->big_lock)); + while (! h->in_shutdown) + { + struct LongPoller *lp; + int timeout_ms; + + lp = GNUNET_CONTAINER_heap_peek (h->lp_heap); + while ( (NULL != lp) && + GNUNET_TIME_absolute_is_past (lp->timeout)) + { + GNUNET_assert (lp == + GNUNET_CONTAINER_heap_remove_root (h->lp_heap)); + GNUNET_assert (0 == + pthread_mutex_lock (&h->big_lock)); + lp_trigger (lp, + h); + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + lp = GNUNET_CONTAINER_heap_peek (h->lp_heap); + } + if (NULL != lp) + { + struct GNUNET_TIME_Relative rem; + unsigned long long left_ms; + + rem = GNUNET_TIME_absolute_get_remaining (lp->timeout); + left_ms = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us; + if (left_ms > INT_MAX) + timeout_ms = INT_MAX; + else + timeout_ms = (int) left_ms; + } + else + { + timeout_ms = -1; /* infinity */ + } + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + { + struct pollfd p = { + .fd = h->lp_event, + .events = POLLIN + }; + int ret; + + ret = poll (&p, + 1, + timeout_ms); + if (-1 == ret) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "poll"); + } + else if (1 == ret) + { + /* clear event */ + uint64_t ev; + ssize_t iret; + + iret = read (h->lp_event, + &ev, + sizeof (ev)); + if (-1 == iret) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "read"); + } + else + { + GNUNET_break (sizeof (uint64_t) == iret); + } + } + } + GNUNET_assert (0 == + pthread_mutex_lock (&h->big_lock)); + } + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + return NULL; +} + + /** * Lookup account with @a name, and if it does not exist, create it. * @@ -626,6 +871,36 @@ post_transaction (struct TALER_FAKEBANK_Handle *h, ca->in_tail, old); } + { + struct LongPoller *nxt; + + for (struct LongPoller *lp = debit_acc->lp_head; + NULL != lp; + lp = nxt) + { + nxt = lp->next; + if (LP_DEBIT == lp->type) + { + GNUNET_assert (lp == + GNUNET_CONTAINER_heap_remove_node (lp->hn)); + lp_trigger (lp, + h); + } + } + for (struct LongPoller *lp = credit_acc->lp_head; + NULL != lp; + lp = nxt) + { + nxt = lp->next; + if (LP_CREDIT == lp->type) + { + GNUNET_assert (lp == + GNUNET_CONTAINER_heap_remove_node (lp->hn)); + lp_trigger (lp, + h); + } + } + } GNUNET_assert (0 == pthread_mutex_unlock (&h->big_lock)); if ( (NULL != old) && @@ -884,6 +1159,7 @@ free_account (void *cls, { struct Account *account = val; + GNUNET_assert (NULL == account->lp_head); GNUNET_free (account->account_name); GNUNET_free (account); return GNUNET_OK; @@ -898,6 +1174,11 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h) GNUNET_SCHEDULER_cancel (h->mhd_task); h->mhd_task = NULL; } + if (NULL != h->lp_task) + { + GNUNET_SCHEDULER_cancel (h->lp_task); + h->lp_task = NULL; + } #if EPOLL_SUPPORT if (NULL != h->mhd_rfd) { @@ -910,6 +1191,39 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h) MHD_stop_daemon (h->mhd_bank); h->mhd_bank = NULL; } + if (-1 != h->lp_event) + { + uint64_t val = 1; + void *ret; + struct LongPoller *lp; + + GNUNET_assert (0 == + pthread_mutex_lock (&h->big_lock)); + h->in_shutdown = true; + while (NULL != (lp = GNUNET_CONTAINER_heap_remove_root (h->lp_heap))) + lp_trigger (lp, + h); + GNUNET_break (sizeof (val) == + write (h->lp_event, + &val, + sizeof (val))); + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + GNUNET_break (0 == + pthread_join (h->lp_thread, + &ret)); + GNUNET_break (NULL == ret); + GNUNET_break (0 == close (h->lp_event)); + h->lp_event = -1; + } + else + { + struct LongPoller *lp; + + while (NULL != (lp = GNUNET_CONTAINER_heap_remove_root (h->lp_heap))) + lp_trigger (lp, + h); + } if (NULL != h->accounts) { GNUNET_CONTAINER_multihashmap_iterate (h->accounts, @@ -919,6 +1233,7 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h) } GNUNET_CONTAINER_multihashmap_destroy (h->uuid_map); GNUNET_CONTAINER_multipeermap_destroy (h->rpubs); + GNUNET_CONTAINER_heap_destroy (h->lp_heap); GNUNET_assert (0 == pthread_mutex_destroy (&h->big_lock)); GNUNET_assert (0 == @@ -960,6 +1275,10 @@ handle_mhd_completion_callback (void *cls, (void) cls; (void) connection; (void) toe; + if (NULL == *con_cls) + return; + if (&special_ptr == *con_cls) + return; GNUNET_JSON_post_parser_cleanup (*con_cls); *con_cls = NULL; } @@ -988,7 +1307,6 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h, json_t *json; uint64_t row_id; struct GNUNET_TIME_Absolute timestamp; - enum GNUNET_GenericReturnValue ret; pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX, connection, @@ -1017,6 +1335,7 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h, struct TALER_Amount amount; struct TALER_ReservePublicKeyP reserve_pub; char *debit; + enum GNUNET_GenericReturnValue ret; struct GNUNET_JSON_Specification spec[] = { GNUNET_JSON_spec_fixed_auto ("reserve_pub", &reserve_pub), @@ -1029,14 +1348,13 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h, }; if (GNUNET_OK != - GNUNET_JSON_parse (json, - spec, - NULL, NULL)) + (ret = TALER_MHD_parse_json_data (connection, + json, + spec))) { - GNUNET_break (0); + GNUNET_break_op (0); json_decref (json); - /* We're fakebank, no need for nice error handling */ - return MHD_NO; + return (GNUNET_NO == ret) ? MHD_YES : MHD_NO; } if (0 != strcasecmp (amount.currency, h->currency)) @@ -1141,6 +1459,7 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h, char *credit; const char *base_url; struct TALER_Amount amount; + enum GNUNET_GenericReturnValue ret; struct GNUNET_JSON_Specification spec[] = { GNUNET_JSON_spec_fixed_auto ("request_uid", &uuid), @@ -1157,14 +1476,13 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h, }; if (GNUNET_OK != - GNUNET_JSON_parse (json, - spec, - NULL, NULL)) + (ret = TALER_MHD_parse_json_data (connection, + json, + spec))) { - GNUNET_break (0); + GNUNET_break_op (0); json_decref (json); - /* We are fakebank, no need for nice error handling */ - return MHD_NO; + return (GNUNET_NO == ret) ? MHD_YES : MHD_NO; } { int ret; @@ -1223,20 +1541,17 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h, * * @param h the fakebank handle * @param connection the connection - * @param con_cls place to store state, not used * @return MHD result code */ static MHD_RESULT handle_home_page (struct TALER_FAKEBANK_Handle *h, - struct MHD_Connection *connection, - void **con_cls) + struct MHD_Connection *connection) { MHD_RESULT ret; struct MHD_Response *resp; #define HELLOMSG "Hello, Fakebank!" (void) h; - (void) con_cls; resp = MHD_create_response_from_buffer ( strlen (HELLOMSG), HELLOMSG, @@ -1292,9 +1607,11 @@ struct HistoryArgs * @param h bank handle to work on * @param connection MHD connection. * @param[out] ha will contain the parsed values. - * @return #GNUNET_OK only if the parsing succeeds. + * @return #GNUNET_OK only if the parsing succeeds, + * #GNUNET_SYSERR if it failed, + * #GNUNET_NO if it failed and an error was returned */ -static int +static enum GNUNET_GenericReturnValue parse_history_common_args (const struct TALER_FAKEBANK_Handle *h, struct MHD_Connection *connection, struct HistoryArgs *ha) @@ -1305,6 +1622,7 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h, unsigned long long lp_timeout; unsigned long long sval; long long d; + char dummy; start = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, @@ -1319,23 +1637,60 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h, lp_timeout = 0; if ( (NULL == delta) || (1 != sscanf (delta, - "%lld", - &d)) || - ( (NULL != long_poll_ms) && - (1 != sscanf (long_poll_ms, - "%llu", - &lp_timeout)) ) || - ( (NULL != start) && - (1 != sscanf (start, - "%llu", - &sval)) ) ) + "%lld%c", + &d, + &dummy)) ) { /* Fail if one of the above failed. */ /* Invalid request, given that this is fakebank we impolitely * just kill the connection instead of returning a nice error. */ - GNUNET_break (0); - return GNUNET_NO; + GNUNET_break_op (0); + return (MHD_YES == + TALER_MHD_reply_with_error (connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "delta")) + ? GNUNET_NO + : GNUNET_SYSERR; + } + if ( (NULL != long_poll_ms) && + (1 != sscanf (long_poll_ms, + "%llu%c", + &lp_timeout, + &dummy)) ) + { + /* Fail if one of the above failed. */ + /* Invalid request, given that this is fakebank we impolitely + * just kill the connection instead of returning a nice error. + */ + GNUNET_break_op (0); + return (MHD_YES == + TALER_MHD_reply_with_error (connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "long_poll_ms")) + ? GNUNET_NO + : GNUNET_SYSERR; + } + if ( (NULL != start) && + (1 != sscanf (start, + "%llu%c", + &sval, + &dummy)) ) + { + /* Fail if one of the above failed. */ + /* Invalid request, given that this is fakebank we impolitely + * just kill the connection instead of returning a nice error. + */ + GNUNET_break_op (0); + return (MHD_YES == + TALER_MHD_reply_with_error (connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "start")) + ? GNUNET_NO + : GNUNET_SYSERR; } if (NULL == start) ha->start_idx = (d > 0) ? 0 : h->serial_counter; @@ -1344,8 +1699,14 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h, ha->delta = (int64_t) d; if (0 == ha->delta) { - GNUNET_break (0); - return GNUNET_NO; + GNUNET_break_op (0); + return (MHD_YES == + TALER_MHD_reply_with_error (connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "delta")) + ? GNUNET_NO + : GNUNET_SYSERR; } ha->lp_timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, @@ -1358,34 +1719,147 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h, } +/** + * Task run when a long poller is about to time out. + * Only used in single-threaded mode. + * + * @param cls a `struct TALER_FAKEBANK_Handle *` + */ +static void +lp_timeout (void *cls) +{ + struct TALER_FAKEBANK_Handle *h = cls; + struct LongPoller *lp; + + h->lp_task = NULL; + while (NULL != (lp = GNUNET_CONTAINER_heap_peek (h->lp_heap))) + { + if (GNUNET_TIME_absolute_is_future (lp->timeout)) + break; + GNUNET_assert (lp == + GNUNET_CONTAINER_heap_remove_root (h->lp_heap)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Timeout reached for long poller %p\n", + lp->conn); + lp_trigger (lp, + h); + } + if (NULL == lp) + return; + h->lp_task = GNUNET_SCHEDULER_add_at (lp->timeout, + &lp_timeout, + h); +} + + +/** + * Reschedule the timeout task of @a h for time @a t. + * + * @param h fakebank handle + * @param t when will the next connection timeout expire + */ +static void +reschedule_lp_timeout (struct TALER_FAKEBANK_Handle *h, + struct GNUNET_TIME_Absolute t) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Scheduling timeout task for %s\n", + GNUNET_STRINGS_absolute_time_to_string (t)); + if (-1 != h->lp_event) + { + uint64_t num = 1; + + GNUNET_break (sizeof (num) == + write (h->lp_event, + &num, + sizeof (num))); + } + else + { + if (NULL != h->lp_task) + GNUNET_SCHEDULER_cancel (h->lp_task); + h->lp_task = GNUNET_SCHEDULER_add_at (t, + &lp_timeout, + h); + } +} + + +/** + * Start long-polling for @a connection and @a acc + * for transfers in @a dir. Must be called with the + * "big lock" held. + * + * @param[in,out] h fakebank handle + * @param[in,out] connection to suspend + * @param[in,out] acc account affected + * @param lp_timeout how long to suspend + * @param dir direction of transfers to watch for + */ +static void +start_lp (struct TALER_FAKEBANK_Handle *h, + struct MHD_Connection *connection, + struct Account *acc, + struct GNUNET_TIME_Relative lp_timeout, + enum LongPollType dir) +{ + struct LongPoller *lp; + bool toc; + + lp = GNUNET_new (struct LongPoller); + lp->account = acc; + lp->conn = connection; + lp->timeout = GNUNET_TIME_relative_to_absolute (lp_timeout); + lp->type = dir; + lp->hn = GNUNET_CONTAINER_heap_insert (h->lp_heap, + lp, + lp->timeout.abs_value_us); + toc = (lp == + GNUNET_CONTAINER_heap_peek (h->lp_heap)); + GNUNET_CONTAINER_DLL_insert (acc->lp_head, + acc->lp_tail, + lp); + MHD_suspend_connection (connection); + if (toc) + reschedule_lp_timeout (h, + lp->timeout); + +} + + /** * Handle incoming HTTP request for /history/outgoing * * @param h the fakebank handle * @param connection the connection * @param account which account the request is about - * @return MHD result code + * @param con_cls closure for request (NULL or &special_ptr) */ static MHD_RESULT handle_debit_history (struct TALER_FAKEBANK_Handle *h, struct MHD_Connection *connection, - const char *account) + const char *account, + void **con_cls) { struct HistoryArgs ha; struct Account *acc; struct Transaction *pos; json_t *history; char *debit_payto; + enum GNUNET_GenericReturnValue ret; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Handling /history/outgoing connection %p\n", + connection); if (GNUNET_OK != - parse_history_common_args (h, - connection, - &ha)) + (ret = parse_history_common_args (h, + connection, + &ha))) { - GNUNET_break (0); - return MHD_NO; + return (GNUNET_SYSERR == ret) ? MHD_NO : MHD_YES; } - + if (&special_ptr == *con_cls) + ha.lp_timeout = GNUNET_TIME_UNIT_ZERO; acc = lookup_account (h, account); GNUNET_asprintf (&debit_payto, @@ -1430,16 +1904,29 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h, if ( (NULL == t) || overflow) { + GNUNET_free (debit_payto); + if (GNUNET_TIME_relative_is_zero (ha.lp_timeout) && + (0 < ha.delta)) + { + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + return TALER_MHD_REPLY_JSON_PACK ( + connection, + MHD_HTTP_OK, + GNUNET_JSON_pack_array_steal ( + "outgoing_transactions", + history)); + } + *con_cls = &special_ptr; + start_lp (h, + connection, + acc, + ha.lp_timeout, + LP_DEBIT); GNUNET_assert (0 == pthread_mutex_unlock (&h->big_lock)); - GNUNET_free (debit_payto); - /* FIXME: suspend for long-polling instead */ - return TALER_MHD_REPLY_JSON_PACK ( - connection, - MHD_HTTP_OK, - GNUNET_JSON_pack_array_steal ( - "outgoing_transactions", - history)); + json_decref (history); + return MHD_YES; } if (t->debit_account != acc) { @@ -1524,6 +2011,21 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h, if (0 < ha.delta) pos = pos->next_out; } + if ( (0 == json_array_size (history)) && + (! GNUNET_TIME_relative_is_zero (ha.lp_timeout)) && + (0 < ha.delta)) + { + *con_cls = &special_ptr; + start_lp (h, + connection, + acc, + ha.lp_timeout, + LP_DEBIT); + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + json_decref (history); + return MHD_YES; + } GNUNET_assert (0 == pthread_mutex_unlock (&h->big_lock)); GNUNET_free (debit_payto); @@ -1546,22 +2048,29 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h, static MHD_RESULT handle_credit_history (struct TALER_FAKEBANK_Handle *h, struct MHD_Connection *connection, - const char *account) + const char *account, + void **con_cls) { struct HistoryArgs ha; struct Account *acc; const struct Transaction *pos; json_t *history; char *credit_payto; + enum GNUNET_GenericReturnValue ret; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Handling /history/incoming connection %p\n", + connection); if (GNUNET_OK != - parse_history_common_args (h, - connection, - &ha)) + (ret = parse_history_common_args (h, + connection, + &ha))) { - GNUNET_break (0); - return MHD_NO; + return (GNUNET_SYSERR == ret) ? MHD_NO : MHD_YES; } + if (&special_ptr == *con_cls) + ha.lp_timeout = GNUNET_TIME_UNIT_ZERO; + *con_cls = &special_ptr; acc = lookup_account (h, account); history = json_array (); @@ -1601,15 +2110,28 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h, if ( (NULL == t) || overflow) { + GNUNET_free (credit_payto); + if (GNUNET_TIME_relative_is_zero (ha.lp_timeout) && + (0 < ha.delta)) + { + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + return TALER_MHD_REPLY_JSON_PACK (connection, + MHD_HTTP_OK, + GNUNET_JSON_pack_array_steal ( + "incoming_transactions", + history)); + } + *con_cls = &special_ptr; + start_lp (h, + connection, + acc, + ha.lp_timeout, + LP_CREDIT); GNUNET_assert (0 == pthread_mutex_unlock (&h->big_lock)); - GNUNET_free (credit_payto); - /* FIXME: suspend for long-polling instead */ - return TALER_MHD_REPLY_JSON_PACK (connection, - MHD_HTTP_OK, - GNUNET_JSON_pack_array_steal ( - "incoming_transactions", - history)); + json_decref (history); + return MHD_YES; } if (skip) { @@ -1681,6 +2203,21 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h, if (0 < ha.delta) pos = pos->next_in; } + if ( (0 == json_array_size (history)) && + (! GNUNET_TIME_relative_is_zero (ha.lp_timeout)) && + (0 < ha.delta)) + { + *con_cls = &special_ptr; + start_lp (h, + connection, + acc, + ha.lp_timeout, + LP_CREDIT); + GNUNET_assert (0 == + pthread_mutex_unlock (&h->big_lock)); + json_decref (history); + return MHD_YES; + } GNUNET_assert (0 == pthread_mutex_unlock (&h->big_lock)); GNUNET_free (credit_payto); @@ -1702,7 +2239,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h, * @param account which account should process the request * @param upload_data request data * @param upload_data_size size of @a upload_data in bytes - * @param con_cls closure for request (a `struct Buffer *`) + * @param con_cls closure * @return MHD result code */ static MHD_RESULT @@ -1727,18 +2264,19 @@ serve (struct TALER_FAKEBANK_Handle *h, (NULL != account) ) return handle_credit_history (h, connection, - account); + account, + con_cls); if ( (0 == strcmp (url, "/history/outgoing")) && (NULL != account) ) return handle_debit_history (h, connection, - account); + account, + con_cls); if (0 == strcmp (url, "/")) return handle_home_page (h, - connection, - con_cls); + connection); } else if (0 == strcasecmp (method, MHD_HTTP_METHOD_POST)) @@ -1762,12 +2300,15 @@ serve (struct TALER_FAKEBANK_Handle *h, con_cls); } /* Unexpected URL path, just close the connection. */ - /* We're rather impolite here, but it's a testcase. */ TALER_LOG_ERROR ("Breaking URL: %s %s\n", method, url); GNUNET_break_op (0); - return MHD_NO; + return TALER_MHD_reply_with_error ( + connection, + MHD_HTTP_NOT_FOUND, + TALER_EC_GENERIC_ENDPOINT_UNKNOWN, + url); } @@ -1781,7 +2322,7 @@ serve (struct TALER_FAKEBANK_Handle *h, * @param version HTTP version (ignored) * @param upload_data request data * @param upload_data_size size of @a upload_data in bytes - * @param con_cls closure for request (a `struct Buffer *`) + * @param con_cls closure for request * @return MHD result code */ static MHD_RESULT @@ -1823,15 +2364,6 @@ handle_mhd_request (void *cls, } -/** - * Task run whenever HTTP server operations are pending. - * - * @param cls the `struct TALER_FAKEBANK_Handle` - */ -static void -run_mhd (void *cls); - - #if EPOLL_SUPPORT /** * Schedule MHD. This function should be called initially when an @@ -1982,6 +2514,7 @@ TALER_FAKEBANK_start2 (uint16_t port, } GNUNET_assert (strlen (currency) < TALER_CURRENCY_LEN); h = GNUNET_new (struct TALER_FAKEBANK_Handle); + h->lp_event = -1; h->port = port; h->ram_limit = ram_limit; h->serial_counter = 0; @@ -2027,6 +2560,7 @@ TALER_FAKEBANK_start2 (uint16_t port, TALER_FAKEBANK_stop (h); return NULL; } + h->lp_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); h->currency = GNUNET_strdup (currency); GNUNET_asprintf (&h->my_baseurl, "http://localhost:%u/", @@ -2061,6 +2595,28 @@ TALER_FAKEBANK_start2 (uint16_t port, } else { + h->lp_event = eventfd (0, + EFD_CLOEXEC); + if (-1 == h->lp_event) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "eventfd"); + TALER_FAKEBANK_stop (h); + return NULL; + } + if (0 != + pthread_create (&h->lp_thread, + NULL, + &lp_expiration_thread, + h)) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "pthread_create"); + GNUNET_break (0 == close (h->lp_event)); + h->lp_event = -1; + TALER_FAKEBANK_stop (h); + return NULL; + } h->mhd_bank = MHD_start_daemon (MHD_USE_DEBUG | MHD_USE_AUTO_INTERNAL_THREAD | MHD_ALLOW_SUSPEND_RESUME -- cgit v1.2.3