diff options
author | Christian Grothoff <christian@grothoff.org> | 2021-08-22 00:12:18 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2021-08-22 00:12:18 +0200 |
commit | 10f9272e45ea97d1b8f8059c9d285049ff4b606d (patch) | |
tree | dbb087c7ce3c4681d75252d79b6170460a2e2ec4 | |
parent | 9ad3469f07cfd944c2012a44851cdabf46703e22 (diff) | |
download | exchange-10f9272e45ea97d1b8f8059c9d285049ff4b606d.tar.gz exchange-10f9272e45ea97d1b8f8059c9d285049ff4b606d.zip |
-implement long polling support on reserve status (but not yet in C client library)
m--------- | contrib/gana | 0 | ||||
-rw-r--r-- | src/exchange/taler-exchange-httpd.c | 26 | ||||
-rw-r--r-- | src/exchange/taler-exchange-httpd_reserves_get.c | 176 | ||||
-rw-r--r-- | src/exchange/taler-exchange-httpd_reserves_get.h | 9 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 102 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 31 |
6 files changed, 312 insertions, 32 deletions
diff --git a/contrib/gana b/contrib/gana | |||
Subproject 2e967c48b395a3edb85982e2e349cb82e76dcb2 | Subproject efb2a1fd64e17159c56ff3674083837b5a657a6 | ||
diff --git a/src/exchange/taler-exchange-httpd.c b/src/exchange/taler-exchange-httpd.c index 80649c0bc..c06695e4d 100644 --- a/src/exchange/taler-exchange-httpd.c +++ b/src/exchange/taler-exchange-httpd.c | |||
@@ -1430,8 +1430,14 @@ run_single_request (void) | |||
1430 | } | 1430 | } |
1431 | MHD_run (mhd); | 1431 | MHD_run (mhd); |
1432 | } | 1432 | } |
1433 | TEH_resume_keys_requests (true); | 1433 | { |
1434 | MHD_stop_daemon (mhd); | 1434 | MHD_socket sock = MHD_quiesce_daemon (mhd); |
1435 | |||
1436 | TEH_resume_keys_requests (true); | ||
1437 | TEH_reserves_get_cleanup (); | ||
1438 | MHD_stop_daemon (mhd); | ||
1439 | GNUNET_break (0 == close (sock)); | ||
1440 | } | ||
1435 | mhd = NULL; | 1441 | mhd = NULL; |
1436 | if (cld != waitpid (cld, | 1442 | if (cld != waitpid (cld, |
1437 | &status, | 1443 | &status, |
@@ -1494,8 +1500,15 @@ run_main_loop (int fh, | |||
1494 | { | 1500 | { |
1495 | case GNUNET_OK: | 1501 | case GNUNET_OK: |
1496 | case GNUNET_SYSERR: | 1502 | case GNUNET_SYSERR: |
1497 | TEH_resume_keys_requests (true); | 1503 | { |
1498 | MHD_stop_daemon (mhd); | 1504 | MHD_socket sock = MHD_quiesce_daemon (mhd); |
1505 | |||
1506 | TEH_resume_keys_requests (true); | ||
1507 | TEH_reserves_get_cleanup (); | ||
1508 | MHD_stop_daemon (mhd); | ||
1509 | GNUNET_break (0 == close (sock)); | ||
1510 | } | ||
1511 | mhd = NULL; | ||
1499 | break; | 1512 | break; |
1500 | case GNUNET_NO: | 1513 | case GNUNET_NO: |
1501 | { | 1514 | { |
@@ -1507,7 +1520,9 @@ run_main_loop (int fh, | |||
1507 | flags = fcntl (sock, F_GETFD); | 1520 | flags = fcntl (sock, F_GETFD); |
1508 | GNUNET_assert (-1 != flags); | 1521 | GNUNET_assert (-1 != flags); |
1509 | flags &= ~FD_CLOEXEC; | 1522 | flags &= ~FD_CLOEXEC; |
1510 | GNUNET_assert (-1 != fcntl (sock, F_SETFD, flags)); | 1523 | GNUNET_assert (-1 != fcntl (sock, |
1524 | F_SETFD, | ||
1525 | flags)); | ||
1511 | chld = fork (); | 1526 | chld = fork (); |
1512 | if (-1 == chld) | 1527 | if (-1 == chld) |
1513 | { | 1528 | { |
@@ -1551,6 +1566,7 @@ run_main_loop (int fh, | |||
1551 | sleep (1); | 1566 | sleep (1); |
1552 | /* Now we're really done, practice clean shutdown */ | 1567 | /* Now we're really done, practice clean shutdown */ |
1553 | TEH_resume_keys_requests (true); | 1568 | TEH_resume_keys_requests (true); |
1569 | TEH_reserves_get_cleanup (); | ||
1554 | MHD_stop_daemon (mhd); | 1570 | MHD_stop_daemon (mhd); |
1555 | } | 1571 | } |
1556 | break; | 1572 | break; |
diff --git a/src/exchange/taler-exchange-httpd_reserves_get.c b/src/exchange/taler-exchange-httpd_reserves_get.c index d08543a4e..6501f600a 100644 --- a/src/exchange/taler-exchange-httpd_reserves_get.c +++ b/src/exchange/taler-exchange-httpd_reserves_get.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of TALER | 2 | This file is part of TALER |
3 | Copyright (C) 2014-2020 Taler Systems SA | 3 | Copyright (C) 2014-2021 Taler Systems SA |
4 | 4 | ||
5 | TALER is free software; you can redistribute it and/or modify it under the | 5 | TALER is free software; you can redistribute it and/or modify it under the |
6 | terms of the GNU Affero General Public License as published by the Free Software | 6 | terms of the GNU Affero General Public License as published by the Free Software |
@@ -25,6 +25,7 @@ | |||
25 | #include <jansson.h> | 25 | #include <jansson.h> |
26 | #include "taler_mhd_lib.h" | 26 | #include "taler_mhd_lib.h" |
27 | #include "taler_json_lib.h" | 27 | #include "taler_json_lib.h" |
28 | #include "taler_dbevents.h" | ||
28 | #include "taler-exchange-httpd_reserves_get.h" | 29 | #include "taler-exchange-httpd_reserves_get.h" |
29 | #include "taler-exchange-httpd_responses.h" | 30 | #include "taler-exchange-httpd_responses.h" |
30 | 31 | ||
@@ -50,25 +51,113 @@ struct ReservePoller | |||
50 | struct MHD_Connection *connection; | 51 | struct MHD_Connection *connection; |
51 | 52 | ||
52 | /** | 53 | /** |
53 | * Entry in the timeout heap. | ||
54 | */ | ||
55 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
56 | |||
57 | /** | ||
58 | * Subscription for the database event we are | 54 | * Subscription for the database event we are |
59 | * waiting for. | 55 | * waiting for. |
60 | */ | 56 | */ |
61 | struct GNUNET_DB_EventHandler *eh; | 57 | struct TALER_EXCHANGEDB_EventHandler *eh; |
62 | 58 | ||
63 | /** | 59 | /** |
64 | * When will this request time out? | 60 | * When will this request time out? |
65 | */ | 61 | */ |
66 | struct GNUNET_TIME_Absolute timeout; | 62 | struct GNUNET_TIME_Absolute timeout; |
67 | 63 | ||
64 | /** | ||
65 | * True if we are still suspended. | ||
66 | */ | ||
67 | bool suspended; | ||
68 | |||
68 | }; | 69 | }; |
69 | 70 | ||
70 | 71 | ||
71 | /** | 72 | /** |
73 | * Head of list of requests in long polling. | ||
74 | */ | ||
75 | static struct ReservePoller *rp_head; | ||
76 | |||
77 | /** | ||
78 | * Tail of list of requests in long polling. | ||
79 | */ | ||
80 | static struct ReservePoller *rp_tail; | ||
81 | |||
82 | |||
83 | void | ||
84 | TEH_reserves_get_cleanup () | ||
85 | { | ||
86 | struct ReservePoller *rp; | ||
87 | |||
88 | while (NULL != (rp = rp_head)) | ||
89 | { | ||
90 | GNUNET_CONTAINER_DLL_remove (rp_head, | ||
91 | rp_tail, | ||
92 | rp); | ||
93 | if (rp->suspended) | ||
94 | { | ||
95 | rp->suspended = false; | ||
96 | MHD_resume_connection (rp->connection); | ||
97 | } | ||
98 | } | ||
99 | } | ||
100 | |||
101 | |||
102 | /** | ||
103 | * Function called once a connection is done to | ||
104 | * clean up the `struct ReservePoller` state. | ||
105 | * | ||
106 | * @param rc context to clean up for | ||
107 | */ | ||
108 | static void | ||
109 | rp_cleanup (struct TEH_RequestContext *rc) | ||
110 | { | ||
111 | struct ReservePoller *rp = rc->rh_ctx; | ||
112 | |||
113 | if (NULL != rp->eh) | ||
114 | { | ||
115 | TEH_plugin->event_listen_cancel (TEH_plugin->cls, | ||
116 | rp->eh); | ||
117 | rp->eh = NULL; | ||
118 | } | ||
119 | GNUNET_free (rp); | ||
120 | } | ||
121 | |||
122 | |||
123 | /** | ||
124 | * Function called on events received from Postgres. | ||
125 | * Wakes up long pollers. | ||
126 | * | ||
127 | * @param cls the `struct TEH_RequestContext *` | ||
128 | * @param extra additional event data provided | ||
129 | * @param extra_size number of bytes in @a extra | ||
130 | */ | ||
131 | static void | ||
132 | db_event_cb (void *cls, | ||
133 | const void *extra, | ||
134 | size_t extra_size) | ||
135 | { | ||
136 | struct TEH_RequestContext *rc = cls; | ||
137 | struct ReservePoller *rp = rc->rh_ctx; | ||
138 | struct GNUNET_AsyncScopeSave old_scope; | ||
139 | |||
140 | (void) extra; | ||
141 | (void) extra_size; | ||
142 | if (NULL == rp) | ||
143 | return; /* event triggered while main transaction | ||
144 | was still running */ | ||
145 | if (! rp->suspended) | ||
146 | return; /* might get multiple wake-up events */ | ||
147 | rp->suspended = false; | ||
148 | GNUNET_async_scope_enter (&rc->async_scope_id, | ||
149 | &old_scope); | ||
150 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
151 | "Resuming from long-polling on reserve\n"); | ||
152 | GNUNET_CONTAINER_DLL_remove (rp_head, | ||
153 | rp_tail, | ||
154 | rp); | ||
155 | MHD_resume_connection (rp->connection); | ||
156 | GNUNET_async_scope_restore (&old_scope); | ||
157 | } | ||
158 | |||
159 | |||
160 | /** | ||
72 | * Send reserve history to client. | 161 | * Send reserve history to client. |
73 | * | 162 | * |
74 | * @param connection connection to the client | 163 | * @param connection connection to the client |
@@ -157,6 +246,8 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, | |||
157 | { | 246 | { |
158 | struct ReserveHistoryContext rsc; | 247 | struct ReserveHistoryContext rsc; |
159 | MHD_RESULT mhd_ret; | 248 | MHD_RESULT mhd_ret; |
249 | struct GNUNET_TIME_Relative timeout; | ||
250 | struct TALER_EXCHANGEDB_EventHandler *eh = NULL; | ||
160 | 251 | ||
161 | if (GNUNET_OK != | 252 | if (GNUNET_OK != |
162 | GNUNET_STRINGS_string_to_data (args[0], | 253 | GNUNET_STRINGS_string_to_data (args[0], |
@@ -170,6 +261,47 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, | |||
170 | TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED, | 261 | TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED, |
171 | args[0]); | 262 | args[0]); |
172 | } | 263 | } |
264 | { | ||
265 | const char *long_poll_timeout_ms; | ||
266 | |||
267 | long_poll_timeout_ms | ||
268 | = MHD_lookup_connection_value (rc->connection, | ||
269 | MHD_GET_ARGUMENT_KIND, | ||
270 | "timeout_ms"); | ||
271 | if (NULL != long_poll_timeout_ms) | ||
272 | { | ||
273 | unsigned int timeout_ms; | ||
274 | char dummy; | ||
275 | |||
276 | if (1 != sscanf (long_poll_timeout_ms, | ||
277 | "%u%c", | ||
278 | &timeout_ms, | ||
279 | &dummy)) | ||
280 | { | ||
281 | GNUNET_break_op (0); | ||
282 | return TALER_MHD_reply_with_error (rc->connection, | ||
283 | MHD_HTTP_BAD_REQUEST, | ||
284 | TALER_EC_GENERIC_PARAMETER_MALFORMED, | ||
285 | "timeout_ms (must be non-negative number)"); | ||
286 | } | ||
287 | timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | ||
288 | timeout_ms); | ||
289 | } | ||
290 | } | ||
291 | if (! GNUNET_TIME_relative_is_zero (timeout)) | ||
292 | { | ||
293 | struct TALER_ReserveEventP rep = { | ||
294 | .header.size = htons (sizeof (rep)), | ||
295 | .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), | ||
296 | .reserve_pub = rsc.reserve_pub | ||
297 | }; | ||
298 | |||
299 | eh = TEH_plugin->event_listen (TEH_plugin->cls, | ||
300 | timeout, | ||
301 | &rep.header, | ||
302 | &db_event_cb, | ||
303 | rc); | ||
304 | } | ||
173 | rsc.rh = NULL; | 305 | rsc.rh = NULL; |
174 | if (GNUNET_OK != | 306 | if (GNUNET_OK != |
175 | TEH_DB_run_transaction (rc->connection, | 307 | TEH_DB_run_transaction (rc->connection, |
@@ -178,13 +310,33 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, | |||
178 | &reserve_history_transaction, | 310 | &reserve_history_transaction, |
179 | &rsc)) | 311 | &rsc)) |
180 | return mhd_ret; | 312 | return mhd_ret; |
181 | |||
182 | /* generate proper response */ | 313 | /* generate proper response */ |
183 | if (NULL == rsc.rh) | 314 | if (NULL == rsc.rh) |
184 | return TALER_MHD_reply_with_error (rc->connection, | 315 | { |
185 | MHD_HTTP_NOT_FOUND, | 316 | struct ReservePoller *rp = rc->rh_ctx; |
186 | TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN, | 317 | |
187 | args[0]); | 318 | if ( (NULL != rp) || |
319 | (GNUNET_TIME_relative_is_zero (timeout)) ) | ||
320 | { | ||
321 | return TALER_MHD_reply_with_error (rc->connection, | ||
322 | MHD_HTTP_NOT_FOUND, | ||
323 | TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN, | ||
324 | args[0]); | ||
325 | } | ||
326 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
327 | "Long-polling on reserve for %s\n", | ||
328 | GNUNET_STRINGS_relative_time_to_string (timeout, | ||
329 | GNUNET_YES)); | ||
330 | rp = GNUNET_new (struct ReservePoller); | ||
331 | rp->connection = rc->connection; | ||
332 | rp->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
333 | rp->eh = eh; | ||
334 | rc->rh_ctx = rp; | ||
335 | rc->rh_cleaner = &rp_cleanup; | ||
336 | rp->suspended = true; | ||
337 | MHD_suspend_connection (rc->connection); | ||
338 | return MHD_YES; | ||
339 | } | ||
188 | mhd_ret = reply_reserve_history_success (rc->connection, | 340 | mhd_ret = reply_reserve_history_success (rc->connection, |
189 | rsc.rh); | 341 | rsc.rh); |
190 | TEH_plugin->free_reserve_history (TEH_plugin->cls, | 342 | TEH_plugin->free_reserve_history (TEH_plugin->cls, |
diff --git a/src/exchange/taler-exchange-httpd_reserves_get.h b/src/exchange/taler-exchange-httpd_reserves_get.h index 1eb9ab60e..30c6559f6 100644 --- a/src/exchange/taler-exchange-httpd_reserves_get.h +++ b/src/exchange/taler-exchange-httpd_reserves_get.h | |||
@@ -28,6 +28,15 @@ | |||
28 | 28 | ||
29 | 29 | ||
30 | /** | 30 | /** |
31 | * Shutdown reserves-get subsystem. Resumes all | ||
32 | * suspended long-polling clients and cleans up | ||
33 | * data structures. | ||
34 | */ | ||
35 | void | ||
36 | TEH_reserves_get_cleanup (void); | ||
37 | |||
38 | |||
39 | /** | ||
31 | * Handle a GET "/reserves/" request. Parses the | 40 | * Handle a GET "/reserves/" request. Parses the |
32 | * given "reserve_pub" in @a args (which should contain the | 41 | * given "reserve_pub" in @a args (which should contain the |
33 | * EdDSA public key of a reserve) and then respond with the | 42 | * EdDSA public key of a reserve) and then respond with the |
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 1d05fb499..2d7ca0573 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c | |||
@@ -110,6 +110,39 @@ struct TALER_EXCHANGEDB_Session | |||
110 | 110 | ||
111 | 111 | ||
112 | /** | 112 | /** |
113 | * Event registration record. | ||
114 | */ | ||
115 | struct TALER_EXCHANGEDB_EventHandler | ||
116 | { | ||
117 | /** | ||
118 | * Underlying GNUnet event handler. | ||
119 | */ | ||
120 | struct GNUNET_DB_EventHandler *geh; | ||
121 | |||
122 | /** | ||
123 | * Entry in the heap. | ||
124 | */ | ||
125 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
126 | |||
127 | /** | ||
128 | * Our timeout. | ||
129 | */ | ||
130 | struct GNUNET_TIME_Absolute timeout; | ||
131 | |||
132 | /** | ||
133 | * Callback to invoke (on @e timeout). | ||
134 | */ | ||
135 | GNUNET_DB_EventCallback cb; | ||
136 | |||
137 | /** | ||
138 | * Closure for @e cb. | ||
139 | */ | ||
140 | void *cb_cls; | ||
141 | |||
142 | }; | ||
143 | |||
144 | |||
145 | /** | ||
113 | * Type of the "cls" argument given to each of the functions in | 146 | * Type of the "cls" argument given to each of the functions in |
114 | * our API. | 147 | * our API. |
115 | */ | 148 | */ |
@@ -133,6 +166,12 @@ struct PostgresClosure | |||
133 | char *sql_dir; | 166 | char *sql_dir; |
134 | 167 | ||
135 | /** | 168 | /** |
169 | * Heap of `struct TALER_EXCHANGEDB_EventHandler` | ||
170 | * by timeout. | ||
171 | */ | ||
172 | struct GNUNET_CONTAINER_Heap *event_heap; | ||
173 | |||
174 | /** | ||
136 | * After how long should idle reserves be closed? | 175 | * After how long should idle reserves be closed? |
137 | */ | 176 | */ |
138 | struct GNUNET_TIME_Relative idle_reserve_expiration_time; | 177 | struct GNUNET_TIME_Relative idle_reserve_expiration_time; |
@@ -2832,18 +2871,41 @@ handle_events (void *cls) | |||
2832 | } | 2871 | } |
2833 | }; | 2872 | }; |
2834 | nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; | 2873 | nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; |
2874 | struct TALER_EXCHANGEDB_EventHandler *r; | ||
2835 | 2875 | ||
2836 | GNUNET_assert (0 == | 2876 | GNUNET_assert (0 == |
2837 | pthread_mutex_lock (&pg->event_lock)); | 2877 | pthread_mutex_lock (&pg->event_lock)); |
2838 | while (0 != pg->listener_count) | 2878 | while (0 != pg->listener_count) |
2839 | { | 2879 | { |
2840 | int ret; | 2880 | int ret; |
2881 | int timeout = -1; /* no timeout */ | ||
2841 | 2882 | ||
2842 | GNUNET_assert (0 == | 2883 | GNUNET_assert (0 == |
2843 | pthread_mutex_unlock (&pg->event_lock)); | 2884 | pthread_mutex_unlock (&pg->event_lock)); |
2885 | while (1) | ||
2886 | { | ||
2887 | r = GNUNET_CONTAINER_heap_peek (pg->event_heap); | ||
2888 | if (NULL == r) | ||
2889 | break; | ||
2890 | if (GNUNET_TIME_absolute_is_future (r->timeout)) | ||
2891 | break; | ||
2892 | GNUNET_assert (r == | ||
2893 | GNUNET_CONTAINER_heap_remove_root (pg->event_heap)); | ||
2894 | r->hn = NULL; | ||
2895 | r->cb (r->cb_cls, | ||
2896 | NULL, | ||
2897 | 0); | ||
2898 | } | ||
2899 | if (NULL != r) | ||
2900 | { | ||
2901 | struct GNUNET_TIME_Relative rem; | ||
2902 | |||
2903 | rem = GNUNET_TIME_absolute_get_remaining (r->timeout); | ||
2904 | timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us; | ||
2905 | } | ||
2844 | ret = poll (pfds, | 2906 | ret = poll (pfds, |
2845 | nfds, | 2907 | nfds, |
2846 | -1 /* no timeout */); | 2908 | timeout); |
2847 | if (-1 == ret) | 2909 | if (-1 == ret) |
2848 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, | 2910 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, |
2849 | "poll"); | 2911 | "poll"); |
@@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls, | |||
2909 | * @param cb_cls closure for @a cb | 2971 | * @param cb_cls closure for @a cb |
2910 | * @return handle useful to cancel the listener | 2972 | * @return handle useful to cancel the listener |
2911 | */ | 2973 | */ |
2912 | static struct GNUNET_DB_EventHandler * | 2974 | static struct TALER_EXCHANGEDB_EventHandler * |
2913 | postgres_event_listen (void *cls, | 2975 | postgres_event_listen (void *cls, |
2914 | struct TALER_EXCHANGEDB_Session *session, | 2976 | struct GNUNET_TIME_Relative timeout, |
2915 | const struct GNUNET_DB_EventHeaderP *es, | 2977 | const struct GNUNET_DB_EventHeaderP *es, |
2916 | GNUNET_DB_EventCallback cb, | 2978 | GNUNET_DB_EventCallback cb, |
2917 | void *cb_cls) | 2979 | void *cb_cls) |
2918 | { | 2980 | { |
2919 | struct PostgresClosure *pg = cls; | 2981 | struct PostgresClosure *pg = cls; |
2920 | struct GNUNET_DB_EventHandler *eh; | 2982 | struct TALER_EXCHANGEDB_EventHandler *eh; |
2983 | struct TALER_EXCHANGEDB_Session *session; | ||
2921 | 2984 | ||
2985 | session = postgres_get_session (pg); | ||
2986 | eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler); | ||
2987 | eh->cb = cb; | ||
2988 | eh->cb_cls = cb_cls; | ||
2989 | eh->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
2990 | eh->geh = GNUNET_PQ_event_listen (session->conn, | ||
2991 | es, | ||
2992 | cb, | ||
2993 | cb_cls); | ||
2994 | GNUNET_assert (NULL != eh->geh); | ||
2995 | eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap, | ||
2996 | eh, | ||
2997 | eh->timeout.abs_value_us); | ||
2922 | GNUNET_assert (0 == | 2998 | GNUNET_assert (0 == |
2923 | pthread_mutex_lock (&pg->event_lock)); | 2999 | pthread_mutex_lock (&pg->event_lock)); |
2924 | pg->listener_count++; | 3000 | pg->listener_count++; |
@@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls, | |||
2932 | } | 3008 | } |
2933 | GNUNET_assert (0 == | 3009 | GNUNET_assert (0 == |
2934 | pthread_mutex_unlock (&pg->event_lock)); | 3010 | pthread_mutex_unlock (&pg->event_lock)); |
2935 | eh = GNUNET_PQ_event_listen (session->conn, | ||
2936 | es, | ||
2937 | cb, | ||
2938 | cb_cls); | ||
2939 | GNUNET_assert (NULL != eh); | ||
2940 | return eh; | 3011 | return eh; |
2941 | } | 3012 | } |
2942 | 3013 | ||
@@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls, | |||
2949 | */ | 3020 | */ |
2950 | static void | 3021 | static void |
2951 | postgres_event_listen_cancel (void *cls, | 3022 | postgres_event_listen_cancel (void *cls, |
2952 | struct GNUNET_DB_EventHandler *eh) | 3023 | struct TALER_EXCHANGEDB_EventHandler *eh) |
2953 | { | 3024 | { |
2954 | struct PostgresClosure *pg = cls; | 3025 | struct PostgresClosure *pg = cls; |
2955 | 3026 | ||
@@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls, | |||
2971 | } | 3042 | } |
2972 | GNUNET_assert (0 == | 3043 | GNUNET_assert (0 == |
2973 | pthread_mutex_unlock (&pg->event_lock)); | 3044 | pthread_mutex_unlock (&pg->event_lock)); |
2974 | GNUNET_PQ_event_listen_cancel (eh); | 3045 | if (NULL != eh->hn) |
3046 | { | ||
3047 | GNUNET_CONTAINER_heap_remove_node (eh->hn); | ||
3048 | eh->hn = NULL; | ||
3049 | } | ||
3050 | GNUNET_PQ_event_listen_cancel (eh->geh); | ||
3051 | GNUNET_free (eh); | ||
2975 | } | 3052 | } |
2976 | 3053 | ||
2977 | 3054 | ||
@@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) | |||
10917 | 10994 | ||
10918 | pg = GNUNET_new (struct PostgresClosure); | 10995 | pg = GNUNET_new (struct PostgresClosure); |
10919 | pg->cfg = cfg; | 10996 | pg->cfg = cfg; |
10997 | pg->event_heap = GNUNET_CONTAINER_heap_create ( | ||
10998 | GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
10920 | pg->main_self = pthread_self (); /* loaded while single-threaded! */ | 10999 | pg->main_self = pthread_self (); /* loaded while single-threaded! */ |
10921 | if (GNUNET_OK != | 11000 | if (GNUNET_OK != |
10922 | GNUNET_CONFIGURATION_get_value_filename (cfg, | 11001 | GNUNET_CONFIGURATION_get_value_filename (cfg, |
@@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls) | |||
11166 | GNUNET_break (0 == | 11245 | GNUNET_break (0 == |
11167 | close (pg->event_fd)); | 11246 | close (pg->event_fd)); |
11168 | pthread_mutex_destroy (&pg->event_lock); | 11247 | pthread_mutex_destroy (&pg->event_lock); |
11248 | GNUNET_CONTAINER_heap_destroy (pg->event_heap); | ||
11169 | GNUNET_free (pg->sql_dir); | 11249 | GNUNET_free (pg->sql_dir); |
11170 | GNUNET_free (pg->currency); | 11250 | GNUNET_free (pg->currency); |
11171 | GNUNET_free (pg); | 11251 | GNUNET_free (pg); |
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 61c764a53..4cf6514f3 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h | |||
@@ -73,9 +73,32 @@ struct TALER_EXCHANGEDB_DenominationKeyInformationP | |||
73 | }; | 73 | }; |
74 | 74 | ||
75 | 75 | ||
76 | /** | ||
77 | * Signature of events signalling a reseve got funding. | ||
78 | */ | ||
79 | struct TALER_ReserveEventP | ||
80 | { | ||
81 | /** | ||
82 | * Of type #TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING. | ||
83 | */ | ||
84 | struct GNUNET_DB_EventHeaderP header; | ||
85 | |||
86 | /** | ||
87 | * Public key of the reserve the event is about. | ||
88 | */ | ||
89 | struct TALER_ReservePublicKeyP reserve_pub; | ||
90 | }; | ||
91 | |||
92 | |||
76 | GNUNET_NETWORK_STRUCT_END | 93 | GNUNET_NETWORK_STRUCT_END |
77 | 94 | ||
78 | /** | 95 | /** |
96 | * Event registration record. | ||
97 | */ | ||
98 | struct TALER_EXCHANGEDB_EventHandler; | ||
99 | |||
100 | |||
101 | /** | ||
79 | * Meta data about an exchange online signing key. | 102 | * Meta data about an exchange online signing key. |
80 | */ | 103 | */ |
81 | struct TALER_EXCHANGEDB_SignkeyMetaData | 104 | struct TALER_EXCHANGEDB_SignkeyMetaData |
@@ -2149,16 +2172,16 @@ struct TALER_EXCHANGEDB_Plugin | |||
2149 | * Register callback to be invoked on events of type @a es. | 2172 | * Register callback to be invoked on events of type @a es. |
2150 | * | 2173 | * |
2151 | * @param cls database context to use | 2174 | * @param cls database context to use |
2152 | * @param session connection to use | 2175 | * @param timeout how long to wait at most |
2153 | * @param es specification of the event to listen for | 2176 | * @param es specification of the event to listen for |
2154 | * @param cb function to call when the event happens, possibly | 2177 | * @param cb function to call when the event happens, possibly |
2155 | * multiple times (until cancel is invoked) | 2178 | * multiple times (until cancel is invoked) |
2156 | * @param cb_cls closure for @a cb | 2179 | * @param cb_cls closure for @a cb |
2157 | * @return handle useful to cancel the listener | 2180 | * @return handle useful to cancel the listener |
2158 | */ | 2181 | */ |
2159 | struct GNUNET_DB_EventHandler * | 2182 | struct TALER_EXCHANGEDB_EventHandler * |
2160 | (*event_listen)(void *cls, | 2183 | (*event_listen)(void *cls, |
2161 | struct TALER_EXCHANGEDB_Session *session, | 2184 | struct GNUNET_TIME_Relative timeout, |
2162 | const struct GNUNET_DB_EventHeaderP *es, | 2185 | const struct GNUNET_DB_EventHeaderP *es, |
2163 | GNUNET_DB_EventCallback cb, | 2186 | GNUNET_DB_EventCallback cb, |
2164 | void *cb_cls); | 2187 | void *cb_cls); |
@@ -2171,7 +2194,7 @@ struct TALER_EXCHANGEDB_Plugin | |||
2171 | */ | 2194 | */ |
2172 | void | 2195 | void |
2173 | (*event_listen_cancel)(void *cls, | 2196 | (*event_listen_cancel)(void *cls, |
2174 | struct GNUNET_DB_EventHandler *eh); | 2197 | struct TALER_EXCHANGEDB_EventHandler *eh); |
2175 | 2198 | ||
2176 | 2199 | ||
2177 | /** | 2200 | /** |