aboutsummaryrefslogtreecommitdiff
path: root/src/exchangedb/plugin_exchangedb_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c102
1 files changed, 91 insertions, 11 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 1d05fb499..2d7ca0573 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -110,6 +110,39 @@ struct TALER_EXCHANGEDB_Session
110 110
111 111
112/** 112/**
113 * Event registration record.
114 */
115struct TALER_EXCHANGEDB_EventHandler
116{
117 /**
118 * Underlying GNUnet event handler.
119 */
120 struct GNUNET_DB_EventHandler *geh;
121
122 /**
123 * Entry in the heap.
124 */
125 struct GNUNET_CONTAINER_HeapNode *hn;
126
127 /**
128 * Our timeout.
129 */
130 struct GNUNET_TIME_Absolute timeout;
131
132 /**
133 * Callback to invoke (on @e timeout).
134 */
135 GNUNET_DB_EventCallback cb;
136
137 /**
138 * Closure for @e cb.
139 */
140 void *cb_cls;
141
142};
143
144
145/**
113 * Type of the "cls" argument given to each of the functions in 146 * Type of the "cls" argument given to each of the functions in
114 * our API. 147 * our API.
115 */ 148 */
@@ -133,6 +166,12 @@ struct PostgresClosure
133 char *sql_dir; 166 char *sql_dir;
134 167
135 /** 168 /**
169 * Heap of `struct TALER_EXCHANGEDB_EventHandler`
170 * by timeout.
171 */
172 struct GNUNET_CONTAINER_Heap *event_heap;
173
174 /**
136 * After how long should idle reserves be closed? 175 * After how long should idle reserves be closed?
137 */ 176 */
138 struct GNUNET_TIME_Relative idle_reserve_expiration_time; 177 struct GNUNET_TIME_Relative idle_reserve_expiration_time;
@@ -2832,18 +2871,41 @@ handle_events (void *cls)
2832 } 2871 }
2833 }; 2872 };
2834 nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; 2873 nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2;
2874 struct TALER_EXCHANGEDB_EventHandler *r;
2835 2875
2836 GNUNET_assert (0 == 2876 GNUNET_assert (0 ==
2837 pthread_mutex_lock (&pg->event_lock)); 2877 pthread_mutex_lock (&pg->event_lock));
2838 while (0 != pg->listener_count) 2878 while (0 != pg->listener_count)
2839 { 2879 {
2840 int ret; 2880 int ret;
2881 int timeout = -1; /* no timeout */
2841 2882
2842 GNUNET_assert (0 == 2883 GNUNET_assert (0 ==
2843 pthread_mutex_unlock (&pg->event_lock)); 2884 pthread_mutex_unlock (&pg->event_lock));
2885 while (1)
2886 {
2887 r = GNUNET_CONTAINER_heap_peek (pg->event_heap);
2888 if (NULL == r)
2889 break;
2890 if (GNUNET_TIME_absolute_is_future (r->timeout))
2891 break;
2892 GNUNET_assert (r ==
2893 GNUNET_CONTAINER_heap_remove_root (pg->event_heap));
2894 r->hn = NULL;
2895 r->cb (r->cb_cls,
2896 NULL,
2897 0);
2898 }
2899 if (NULL != r)
2900 {
2901 struct GNUNET_TIME_Relative rem;
2902
2903 rem = GNUNET_TIME_absolute_get_remaining (r->timeout);
2904 timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
2905 }
2844 ret = poll (pfds, 2906 ret = poll (pfds,
2845 nfds, 2907 nfds,
2846 -1 /* no timeout */); 2908 timeout);
2847 if (-1 == ret) 2909 if (-1 == ret)
2848 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, 2910 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
2849 "poll"); 2911 "poll");
@@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls,
2909 * @param cb_cls closure for @a cb 2971 * @param cb_cls closure for @a cb
2910 * @return handle useful to cancel the listener 2972 * @return handle useful to cancel the listener
2911 */ 2973 */
2912static struct GNUNET_DB_EventHandler * 2974static struct TALER_EXCHANGEDB_EventHandler *
2913postgres_event_listen (void *cls, 2975postgres_event_listen (void *cls,
2914 struct TALER_EXCHANGEDB_Session *session, 2976 struct GNUNET_TIME_Relative timeout,
2915 const struct GNUNET_DB_EventHeaderP *es, 2977 const struct GNUNET_DB_EventHeaderP *es,
2916 GNUNET_DB_EventCallback cb, 2978 GNUNET_DB_EventCallback cb,
2917 void *cb_cls) 2979 void *cb_cls)
2918{ 2980{
2919 struct PostgresClosure *pg = cls; 2981 struct PostgresClosure *pg = cls;
2920 struct GNUNET_DB_EventHandler *eh; 2982 struct TALER_EXCHANGEDB_EventHandler *eh;
2983 struct TALER_EXCHANGEDB_Session *session;
2921 2984
2985 session = postgres_get_session (pg);
2986 eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler);
2987 eh->cb = cb;
2988 eh->cb_cls = cb_cls;
2989 eh->timeout = GNUNET_TIME_relative_to_absolute (timeout);
2990 eh->geh = GNUNET_PQ_event_listen (session->conn,
2991 es,
2992 cb,
2993 cb_cls);
2994 GNUNET_assert (NULL != eh->geh);
2995 eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap,
2996 eh,
2997 eh->timeout.abs_value_us);
2922 GNUNET_assert (0 == 2998 GNUNET_assert (0 ==
2923 pthread_mutex_lock (&pg->event_lock)); 2999 pthread_mutex_lock (&pg->event_lock));
2924 pg->listener_count++; 3000 pg->listener_count++;
@@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls,
2932 } 3008 }
2933 GNUNET_assert (0 == 3009 GNUNET_assert (0 ==
2934 pthread_mutex_unlock (&pg->event_lock)); 3010 pthread_mutex_unlock (&pg->event_lock));
2935 eh = GNUNET_PQ_event_listen (session->conn,
2936 es,
2937 cb,
2938 cb_cls);
2939 GNUNET_assert (NULL != eh);
2940 return eh; 3011 return eh;
2941} 3012}
2942 3013
@@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls,
2949 */ 3020 */
2950static void 3021static void
2951postgres_event_listen_cancel (void *cls, 3022postgres_event_listen_cancel (void *cls,
2952 struct GNUNET_DB_EventHandler *eh) 3023 struct TALER_EXCHANGEDB_EventHandler *eh)
2953{ 3024{
2954 struct PostgresClosure *pg = cls; 3025 struct PostgresClosure *pg = cls;
2955 3026
@@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls,
2971 } 3042 }
2972 GNUNET_assert (0 == 3043 GNUNET_assert (0 ==
2973 pthread_mutex_unlock (&pg->event_lock)); 3044 pthread_mutex_unlock (&pg->event_lock));
2974 GNUNET_PQ_event_listen_cancel (eh); 3045 if (NULL != eh->hn)
3046 {
3047 GNUNET_CONTAINER_heap_remove_node (eh->hn);
3048 eh->hn = NULL;
3049 }
3050 GNUNET_PQ_event_listen_cancel (eh->geh);
3051 GNUNET_free (eh);
2975} 3052}
2976 3053
2977 3054
@@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
10917 10994
10918 pg = GNUNET_new (struct PostgresClosure); 10995 pg = GNUNET_new (struct PostgresClosure);
10919 pg->cfg = cfg; 10996 pg->cfg = cfg;
10997 pg->event_heap = GNUNET_CONTAINER_heap_create (
10998 GNUNET_CONTAINER_HEAP_ORDER_MIN);
10920 pg->main_self = pthread_self (); /* loaded while single-threaded! */ 10999 pg->main_self = pthread_self (); /* loaded while single-threaded! */
10921 if (GNUNET_OK != 11000 if (GNUNET_OK !=
10922 GNUNET_CONFIGURATION_get_value_filename (cfg, 11001 GNUNET_CONFIGURATION_get_value_filename (cfg,
@@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls)
11166 GNUNET_break (0 == 11245 GNUNET_break (0 ==
11167 close (pg->event_fd)); 11246 close (pg->event_fd));
11168 pthread_mutex_destroy (&pg->event_lock); 11247 pthread_mutex_destroy (&pg->event_lock);
11248 GNUNET_CONTAINER_heap_destroy (pg->event_heap);
11169 GNUNET_free (pg->sql_dir); 11249 GNUNET_free (pg->sql_dir);
11170 GNUNET_free (pg->currency); 11250 GNUNET_free (pg->currency);
11171 GNUNET_free (pg); 11251 GNUNET_free (pg);