diff options
Diffstat (limited to 'src/backend/taler-merchant-webhook.c')
-rw-r--r-- | src/backend/taler-merchant-webhook.c | 276 |
1 files changed, 203 insertions, 73 deletions
diff --git a/src/backend/taler-merchant-webhook.c b/src/backend/taler-merchant-webhook.c index 36e0f1a1..80db78fd 100644 --- a/src/backend/taler-merchant-webhook.c +++ b/src/backend/taler-merchant-webhook.c @@ -16,7 +16,7 @@ /** * @file taler-merchant-webhook.c * @brief Process that runs webhooks triggered by the merchant backend - * @author Christian Grothoff + * @author Priscilla HUANG */ #include "platform.h" #include <gnunet/gnunet_util_lib.h> @@ -24,25 +24,28 @@ #include <pthread.h> #include "taler_merchantdb_lib.h" #include "taler_merchantdb_plugin.h" +#include <taler/taler_dbevents.h> -struct Work_response +struct WorkResponse { - struct Work_response *next; - struct Work_response *prev; + struct WorkResponse *next; + struct WorkResponse *prev; struct GNUNET_CURL_Job *job; - uint64_t webhook_serial; + uint64_t webhook_pending_serial; char *body; struct curl_slist *job_headers; }; -static struct Work_response *w_head; +static struct WorkResponse *w_head; -static struct Work_response *w_tail; +static struct WorkResponse *w_tail; + +static struct GNUNET_DB_EventHandler *eh; /** - * The exchange's configuration. + * The merchant's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; @@ -85,11 +88,16 @@ static int test_mode; static void shutdown_task (void *cls) { - struct Work_response *w; + struct WorkResponse *w; (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); + if (NULL != eh) + { + db_plugin->event_listen_cancel (eh); + eh = NULL; + } if (NULL != task) { GNUNET_SCHEDULER_cancel (task); @@ -121,8 +129,14 @@ shutdown_task (void *cls) } } -/* The fuction is defined after */ -static void select_work (void *cls); + +/** + * Select webhook to process. + * + * @param cls NULL + */ +static void +select_work (void *cls); /** @@ -140,47 +154,104 @@ handle_webhook_response (void *cls, const void *body, size_t body_size) { - struct Work_response *w = cls; - struct GNUNET_TIME_Relative next_attempt; + struct WorkResponse *w = cls; (void) body; (void) body_size; - - if (2 == response_code / 100) /* any 2xx http status code is OK! */ - { - db_plugin->delete_pending_webhook (db_plugin->cls, - w->webhook_serial); - } - else - { - switch (response_code) - { - case MHD_HTTP_BAD_REQUEST: - next_attempt = GNUNET_TIME_UNIT_FOREVER_REL; // never try again - break; - case MHD_HTTP_INTERNAL_SERVER_ERROR: - next_attempt = GNUNET_TIME_UNIT_MINUTES; - break; - case MHD_HTTP_FORBIDDEN: - next_attempt = GNUNET_TIME_UNIT_MINUTES; - break; - default: - next_attempt = GNUNET_TIME_UNIT_HOURS; - break; - } - db_plugin->update_pending_webhook (db_plugin->cls, - w->webhook_serial, - GNUNET_TIME_relative_to_absolute (next_attempt)); - } + w->job = NULL; GNUNET_CONTAINER_DLL_remove (w_head, w_tail, w); GNUNET_free (w->body); curl_slist_free_all (w->job_headers); - GNUNET_free (w); if (NULL == w_head) task = GNUNET_SCHEDULER_add_now (&select_work, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Webhook %llu returned with status %ld\n", + (unsigned long long) w->webhook_pending_serial, + response_code); + if (2 == response_code / 100) /* any 2xx http status code is OK! */ + { + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->delete_pending_webhook (db_plugin->cls, + w->webhook_pending_serial); + GNUNET_free (w); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to delete webhook, delete returned: %d\n", + qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Delete returned: %d\n", + qs); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Delete returned: %d\n", + qs); + return; + } + GNUNET_assert (0); + } + + { + struct GNUNET_TIME_Relative next_attempt; + enum GNUNET_DB_QueryStatus qs; + switch (response_code) + { + case MHD_HTTP_BAD_REQUEST: + next_attempt = GNUNET_TIME_UNIT_FOREVER_REL; // never try again + break; + case MHD_HTTP_INTERNAL_SERVER_ERROR: + next_attempt = GNUNET_TIME_UNIT_MINUTES; + break; + case MHD_HTTP_FORBIDDEN: + next_attempt = GNUNET_TIME_UNIT_MINUTES; + break; + default: + next_attempt = GNUNET_TIME_UNIT_HOURS; + break; + } + qs = db_plugin->update_pending_webhook (db_plugin->cls, + w->webhook_pending_serial, + GNUNET_TIME_relative_to_absolute ( + next_attempt)); + GNUNET_free (w); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to update pending webhook to next in %s Rval: %d\n", + GNUNET_TIME_relative2s (next_attempt, + true), + qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Next in %s Rval: %d\n", + GNUNET_TIME_relative2s (next_attempt, true), + qs); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Next in %s Rval: %d\n", + GNUNET_TIME_relative2s (next_attempt, true), + qs); + return; + } + GNUNET_assert (0); + } } @@ -188,7 +259,7 @@ handle_webhook_response (void *cls, * Typically called by `select_work`. * * @param cls a `json_t *` JSON array to build - * @param webhook_serial reference to the configured webhook template. + * @param webhook_pending_serial reference to the configured webhook template. * @param next_attempt is the time we should make the next request to the webhook. * @param retries how often have we tried this request to the webhook. * @param url to make request to @@ -198,7 +269,7 @@ handle_webhook_response (void *cls, */ static void pending_webhooks_cb (void *cls, - uint64_t webhook_serial, + uint64_t webhook_pending_serial, struct GNUNET_TIME_Absolute next_attempt, uint32_t retries, const char *url, @@ -206,17 +277,17 @@ pending_webhooks_cb (void *cls, const char *header, const char *body) { - struct Work_response *w = GNUNET_new (struct Work_response); + struct WorkResponse *w = GNUNET_new (struct WorkResponse); CURL *eh; - struct curl_slist *job_headers = NULL; (void) retries; (void) next_attempt; (void) cls; + struct curl_slist *job_headers = NULL; GNUNET_CONTAINER_DLL_insert (w_head, w_tail, w); - w->webhook_serial = webhook_serial; + w->webhook_pending_serial = webhook_pending_serial; eh = curl_easy_init (); GNUNET_assert (NULL != eh); GNUNET_assert (CURLE_OK == @@ -227,25 +298,48 @@ pending_webhooks_cb (void *cls, curl_easy_setopt (eh, CURLOPT_URL, url)); - - /* conversion body data */ - w->body = GNUNET_strdup (body); GNUNET_assert (CURLE_OK == curl_easy_setopt (eh, - CURLOPT_POSTFIELDS, - w->body)); + CURLOPT_VERBOSE, + 0L)); + /* conversion body data */ + if (NULL != body) + { + w->body = GNUNET_strdup (body); + GNUNET_assert (CURLE_OK == + curl_easy_setopt (eh, + CURLOPT_POSTFIELDS, + w->body)); + } /* conversion header to job_headers data */ - char *header_copy = GNUNET_strdup (header); + if (NULL != header) + { + char *header_copy = GNUNET_strdup (header); + + for (const char *tok = strtok (header_copy, "\n"); + NULL != tok; + tok = strtok (NULL, "\n")) + { + // extract all Key: value from 'header_copy'! + job_headers = curl_slist_append (job_headers, + tok); + } + GNUNET_free (header_copy); + GNUNET_assert (CURLE_OK == + curl_easy_setopt (eh, + CURLOPT_HTTPHEADER, + job_headers)); + w->job_headers = job_headers; + } GNUNET_assert (CURLE_OK == curl_easy_setopt (eh, - CURLOPT_POSTFIELDS, - header_copy)); + CURLOPT_MAXREDIRS, + 5)); GNUNET_assert (CURLE_OK == curl_easy_setopt (eh, - CURLOPT_POSTFIELDS, - job_headers)); - w->job_headers = job_headers; + CURLOPT_FOLLOWLOCATION, + 1)); w->job = GNUNET_CURL_job_add_raw (ctx, eh, @@ -255,8 +349,8 @@ pending_webhooks_cb (void *cls, if (NULL == w->job) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start the curl job for webhook #%llu\n", - (unsigned long long) webhook_serial); + "Failed to start the curl job for pending webhook #%llu\n", + (unsigned long long) webhook_pending_serial); curl_slist_free_all (w->job_headers); GNUNET_free (w->body); GNUNET_CONTAINER_DLL_remove (w_head, @@ -269,13 +363,34 @@ pending_webhooks_cb (void *cls, } +/** + * Function called on events received from Postgres. + * + * @param cls closure, NULL + * @param extra additional event data provided + * @param extra_size number of bytes in @a extra + */ +static void +db_notify (void *cls, + const void *extra, + size_t extra_size) +{ + (void) cls; + (void) extra; + (void) extra_size; + + GNUNET_assert (NULL != task); + GNUNET_SCHEDULER_cancel (task); + task = GNUNET_SCHEDULER_add_now (&select_work, + NULL); +} /** * Typically called by `select_work`. * * @param cls a `json_t *` JSON array to build - * @param webhook_serial reference to the configured webhook template. + * @param webhook_pending_serial reference to the configured webhook template. * @param next_attempt is the time we should make the next request to the webhook. * @param retries how often have we tried this request to the webhook. * @param url to make request to @@ -285,7 +400,7 @@ pending_webhooks_cb (void *cls, */ static void future_webhook_cb (void *cls, - uint64_t webhook_serial, + uint64_t webhook_pending_serial, struct GNUNET_TIME_Absolute next_attempt, uint32_t retries, const char *url, @@ -293,7 +408,7 @@ future_webhook_cb (void *cls, const char *header, const char *body) { - (void) webhook_serial; + (void) webhook_pending_serial; (void) retries; (void) url; (void) http_method; @@ -306,11 +421,6 @@ future_webhook_cb (void *cls, } -/** - * Select webhook to process. - * - * @param cls NULL - */ static void select_work (void *cls) { @@ -328,7 +438,7 @@ select_work (void *cls) case GNUNET_DB_STATUS_HARD_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed!\n"); + "Failed to lookup pending webhooks!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; @@ -341,11 +451,12 @@ select_work (void *cls) qs = db_plugin->lookup_future_webhook (db_plugin->cls, &future_webhook_cb, NULL); - switch (qs) { + switch (qs) + { case GNUNET_DB_STATUS_HARD_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed!\n"); + "Failed to lookup future webhook!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; @@ -353,6 +464,8 @@ select_work (void *cls) return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* wait 5 min */ + /* Note: this should not even be necessary if all webhooks + use the events properly... */ rel = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); task = GNUNET_SCHEDULER_add_delayed (rel, &select_work, @@ -366,7 +479,7 @@ select_work (void *cls) } -/* +/** * First task. * * @param cls closure, NULL @@ -393,6 +506,7 @@ run (void *cls, { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; return; } if (NULL == @@ -401,6 +515,7 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NOTCONFIGURED; return; } if (GNUNET_OK != @@ -409,8 +524,21 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to connect to database\n"); GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; return; - } + } + { + struct GNUNET_DB_EventHeaderP es = { + .size = htons (sizeof (es)), + .type = htons (TALER_DBEVENT_MERCHANT_WEBHOOK_PENDING) + }; + + eh = db_plugin->event_listen (db_plugin->cls, + &es, + GNUNET_TIME_UNIT_FOREVER_REL, + &db_notify, + NULL); + } GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&select_work, NULL); @@ -432,6 +560,8 @@ main (int argc, "test", "run in test mode and exit when idle", &test_mode), + GNUNET_GETOPT_option_timetravel ('T', + "timetravel"), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; @@ -458,4 +588,4 @@ main (int argc, } -/* end of taler-exchange-transfer.c */ +/* end of taler-merchant-webhook.c */ |