diff options
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 102 |
1 files changed, 91 insertions, 11 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 1d05fb499..2d7ca0573 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c | |||
@@ -110,6 +110,39 @@ struct TALER_EXCHANGEDB_Session | |||
110 | 110 | ||
111 | 111 | ||
112 | /** | 112 | /** |
113 | * Event registration record. | ||
114 | */ | ||
115 | struct TALER_EXCHANGEDB_EventHandler | ||
116 | { | ||
117 | /** | ||
118 | * Underlying GNUnet event handler. | ||
119 | */ | ||
120 | struct GNUNET_DB_EventHandler *geh; | ||
121 | |||
122 | /** | ||
123 | * Entry in the heap. | ||
124 | */ | ||
125 | struct GNUNET_CONTAINER_HeapNode *hn; | ||
126 | |||
127 | /** | ||
128 | * Our timeout. | ||
129 | */ | ||
130 | struct GNUNET_TIME_Absolute timeout; | ||
131 | |||
132 | /** | ||
133 | * Callback to invoke (on @e timeout). | ||
134 | */ | ||
135 | GNUNET_DB_EventCallback cb; | ||
136 | |||
137 | /** | ||
138 | * Closure for @e cb. | ||
139 | */ | ||
140 | void *cb_cls; | ||
141 | |||
142 | }; | ||
143 | |||
144 | |||
145 | /** | ||
113 | * Type of the "cls" argument given to each of the functions in | 146 | * Type of the "cls" argument given to each of the functions in |
114 | * our API. | 147 | * our API. |
115 | */ | 148 | */ |
@@ -133,6 +166,12 @@ struct PostgresClosure | |||
133 | char *sql_dir; | 166 | char *sql_dir; |
134 | 167 | ||
135 | /** | 168 | /** |
169 | * Heap of `struct TALER_EXCHANGEDB_EventHandler` | ||
170 | * by timeout. | ||
171 | */ | ||
172 | struct GNUNET_CONTAINER_Heap *event_heap; | ||
173 | |||
174 | /** | ||
136 | * After how long should idle reserves be closed? | 175 | * After how long should idle reserves be closed? |
137 | */ | 176 | */ |
138 | struct GNUNET_TIME_Relative idle_reserve_expiration_time; | 177 | struct GNUNET_TIME_Relative idle_reserve_expiration_time; |
@@ -2832,18 +2871,41 @@ handle_events (void *cls) | |||
2832 | } | 2871 | } |
2833 | }; | 2872 | }; |
2834 | nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; | 2873 | nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; |
2874 | struct TALER_EXCHANGEDB_EventHandler *r; | ||
2835 | 2875 | ||
2836 | GNUNET_assert (0 == | 2876 | GNUNET_assert (0 == |
2837 | pthread_mutex_lock (&pg->event_lock)); | 2877 | pthread_mutex_lock (&pg->event_lock)); |
2838 | while (0 != pg->listener_count) | 2878 | while (0 != pg->listener_count) |
2839 | { | 2879 | { |
2840 | int ret; | 2880 | int ret; |
2881 | int timeout = -1; /* no timeout */ | ||
2841 | 2882 | ||
2842 | GNUNET_assert (0 == | 2883 | GNUNET_assert (0 == |
2843 | pthread_mutex_unlock (&pg->event_lock)); | 2884 | pthread_mutex_unlock (&pg->event_lock)); |
2885 | while (1) | ||
2886 | { | ||
2887 | r = GNUNET_CONTAINER_heap_peek (pg->event_heap); | ||
2888 | if (NULL == r) | ||
2889 | break; | ||
2890 | if (GNUNET_TIME_absolute_is_future (r->timeout)) | ||
2891 | break; | ||
2892 | GNUNET_assert (r == | ||
2893 | GNUNET_CONTAINER_heap_remove_root (pg->event_heap)); | ||
2894 | r->hn = NULL; | ||
2895 | r->cb (r->cb_cls, | ||
2896 | NULL, | ||
2897 | 0); | ||
2898 | } | ||
2899 | if (NULL != r) | ||
2900 | { | ||
2901 | struct GNUNET_TIME_Relative rem; | ||
2902 | |||
2903 | rem = GNUNET_TIME_absolute_get_remaining (r->timeout); | ||
2904 | timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us; | ||
2905 | } | ||
2844 | ret = poll (pfds, | 2906 | ret = poll (pfds, |
2845 | nfds, | 2907 | nfds, |
2846 | -1 /* no timeout */); | 2908 | timeout); |
2847 | if (-1 == ret) | 2909 | if (-1 == ret) |
2848 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, | 2910 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, |
2849 | "poll"); | 2911 | "poll"); |
@@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls, | |||
2909 | * @param cb_cls closure for @a cb | 2971 | * @param cb_cls closure for @a cb |
2910 | * @return handle useful to cancel the listener | 2972 | * @return handle useful to cancel the listener |
2911 | */ | 2973 | */ |
2912 | static struct GNUNET_DB_EventHandler * | 2974 | static struct TALER_EXCHANGEDB_EventHandler * |
2913 | postgres_event_listen (void *cls, | 2975 | postgres_event_listen (void *cls, |
2914 | struct TALER_EXCHANGEDB_Session *session, | 2976 | struct GNUNET_TIME_Relative timeout, |
2915 | const struct GNUNET_DB_EventHeaderP *es, | 2977 | const struct GNUNET_DB_EventHeaderP *es, |
2916 | GNUNET_DB_EventCallback cb, | 2978 | GNUNET_DB_EventCallback cb, |
2917 | void *cb_cls) | 2979 | void *cb_cls) |
2918 | { | 2980 | { |
2919 | struct PostgresClosure *pg = cls; | 2981 | struct PostgresClosure *pg = cls; |
2920 | struct GNUNET_DB_EventHandler *eh; | 2982 | struct TALER_EXCHANGEDB_EventHandler *eh; |
2983 | struct TALER_EXCHANGEDB_Session *session; | ||
2921 | 2984 | ||
2985 | session = postgres_get_session (pg); | ||
2986 | eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler); | ||
2987 | eh->cb = cb; | ||
2988 | eh->cb_cls = cb_cls; | ||
2989 | eh->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
2990 | eh->geh = GNUNET_PQ_event_listen (session->conn, | ||
2991 | es, | ||
2992 | cb, | ||
2993 | cb_cls); | ||
2994 | GNUNET_assert (NULL != eh->geh); | ||
2995 | eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap, | ||
2996 | eh, | ||
2997 | eh->timeout.abs_value_us); | ||
2922 | GNUNET_assert (0 == | 2998 | GNUNET_assert (0 == |
2923 | pthread_mutex_lock (&pg->event_lock)); | 2999 | pthread_mutex_lock (&pg->event_lock)); |
2924 | pg->listener_count++; | 3000 | pg->listener_count++; |
@@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls, | |||
2932 | } | 3008 | } |
2933 | GNUNET_assert (0 == | 3009 | GNUNET_assert (0 == |
2934 | pthread_mutex_unlock (&pg->event_lock)); | 3010 | pthread_mutex_unlock (&pg->event_lock)); |
2935 | eh = GNUNET_PQ_event_listen (session->conn, | ||
2936 | es, | ||
2937 | cb, | ||
2938 | cb_cls); | ||
2939 | GNUNET_assert (NULL != eh); | ||
2940 | return eh; | 3011 | return eh; |
2941 | } | 3012 | } |
2942 | 3013 | ||
@@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls, | |||
2949 | */ | 3020 | */ |
2950 | static void | 3021 | static void |
2951 | postgres_event_listen_cancel (void *cls, | 3022 | postgres_event_listen_cancel (void *cls, |
2952 | struct GNUNET_DB_EventHandler *eh) | 3023 | struct TALER_EXCHANGEDB_EventHandler *eh) |
2953 | { | 3024 | { |
2954 | struct PostgresClosure *pg = cls; | 3025 | struct PostgresClosure *pg = cls; |
2955 | 3026 | ||
@@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls, | |||
2971 | } | 3042 | } |
2972 | GNUNET_assert (0 == | 3043 | GNUNET_assert (0 == |
2973 | pthread_mutex_unlock (&pg->event_lock)); | 3044 | pthread_mutex_unlock (&pg->event_lock)); |
2974 | GNUNET_PQ_event_listen_cancel (eh); | 3045 | if (NULL != eh->hn) |
3046 | { | ||
3047 | GNUNET_CONTAINER_heap_remove_node (eh->hn); | ||
3048 | eh->hn = NULL; | ||
3049 | } | ||
3050 | GNUNET_PQ_event_listen_cancel (eh->geh); | ||
3051 | GNUNET_free (eh); | ||
2975 | } | 3052 | } |
2976 | 3053 | ||
2977 | 3054 | ||
@@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) | |||
10917 | 10994 | ||
10918 | pg = GNUNET_new (struct PostgresClosure); | 10995 | pg = GNUNET_new (struct PostgresClosure); |
10919 | pg->cfg = cfg; | 10996 | pg->cfg = cfg; |
10997 | pg->event_heap = GNUNET_CONTAINER_heap_create ( | ||
10998 | GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
10920 | pg->main_self = pthread_self (); /* loaded while single-threaded! */ | 10999 | pg->main_self = pthread_self (); /* loaded while single-threaded! */ |
10921 | if (GNUNET_OK != | 11000 | if (GNUNET_OK != |
10922 | GNUNET_CONFIGURATION_get_value_filename (cfg, | 11001 | GNUNET_CONFIGURATION_get_value_filename (cfg, |
@@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls) | |||
11166 | GNUNET_break (0 == | 11245 | GNUNET_break (0 == |
11167 | close (pg->event_fd)); | 11246 | close (pg->event_fd)); |
11168 | pthread_mutex_destroy (&pg->event_lock); | 11247 | pthread_mutex_destroy (&pg->event_lock); |
11248 | GNUNET_CONTAINER_heap_destroy (pg->event_heap); | ||
11169 | GNUNET_free (pg->sql_dir); | 11249 | GNUNET_free (pg->sql_dir); |
11170 | GNUNET_free (pg->currency); | 11250 | GNUNET_free (pg->currency); |
11171 | GNUNET_free (pg); | 11251 | GNUNET_free (pg); |