diff options
Diffstat (limited to 'src/exchange/taler-exchange-transfer.c')
-rw-r--r-- | src/exchange/taler-exchange-transfer.c | 717 |
1 files changed, 531 insertions, 186 deletions
diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c index 1793dc988..9724b41fc 100644 --- a/src/exchange/taler-exchange-transfer.c +++ b/src/exchange/taler-exchange-transfer.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2020 Taler Systems SA + Copyright (C) 2016-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -27,6 +27,51 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" +/** + * What is the default batch size we use for credit history + * requests with the bank. See `batch_size` below. + */ +#define DEFAULT_BATCH_SIZE (4 * 1024) + +/** + * How often will we retry a request (given certain + * HTTP status codes) before giving up? + */ +#define MAX_RETRIES 16 + +/** + * Information about our work shard. + */ +struct Shard +{ + + /** + * Time when we started to work on this shard. + */ + struct GNUNET_TIME_Absolute shard_start_time; + + /** + * Offset the shard begins at. + */ + uint64_t shard_start; + + /** + * Exclusive offset where the shard ends. + */ + uint64_t shard_end; + + /** + * Offset where our current batch begins. + */ + uint64_t batch_start; + + /** + * Highest row processed in the current batch. + */ + uint64_t batch_end; + +}; + /** * Data we keep to #run_transfers(). There is at most @@ -38,9 +83,16 @@ struct WirePrepareData { /** - * Database session for all of our transactions. + * All transfers done in the same transaction + * are kept in a DLL. + */ + struct WirePrepareData *next; + + /** + * All transfers done in the same transaction + * are kept in a DLL. */ - struct TALER_EXCHANGEDB_Session *session; + struct WirePrepareData *prev; /** * Wire execution handle. @@ -50,13 +102,24 @@ struct WirePrepareData /** * Wire account used for this preparation. */ - struct TALER_EXCHANGEDB_WireAccount *wa; + const struct TALER_EXCHANGEDB_AccountInfo *wa; /** * Row ID of the transfer. */ unsigned long long row_id; + /** + * Number of bytes allocated after this struct + * with the prewire data. + */ + size_t buf_size; + + /** + * How often did we retry so far? + */ + unsigned int retries; + }; @@ -76,10 +139,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct GNUNET_SCHEDULER_Task *task; /** - * If we are currently executing a transfer, information about - * the active transfer is here. Otherwise, this variable is NULL. + * If we are currently executing transfers, information about + * the active transfers is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd_head; + +/** + * If we are currently executing transfers, information about + * the active transfers is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd_tail; + +/** + * Information about our work shard. */ -static struct WirePrepareData *wpd; +static struct Shard *shard; /** * Handle to the context for interacting with the bank / wire gateway. @@ -87,39 +161,71 @@ static struct WirePrepareData *wpd; static struct GNUNET_CURL_Context *ctx; /** - * Scheduler context for running the @e ctx. + * Randomized back-off we use on serialization errors. */ -static struct GNUNET_CURL_RescheduleContext *rc; +static struct GNUNET_TIME_Relative serialization_delay; /** - * How long should we sleep when idle before trying to find more work? + * Scheduler context for running the @e ctx. */ -static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; +static struct GNUNET_CURL_RescheduleContext *rc; /** * Value to return from main(). 0 on success, non-zero on errors. */ -static enum -{ - GR_SUCCESS = 0, - GR_WIRE_TRANSFER_FAILED = 1, - GR_DATABASE_COMMIT_HARD_FAIL = 2, - GR_INVARIANT_FAILURE = 3, - GR_WIRE_ACCOUNT_NOT_CONFIGURED = 4, - GR_WIRE_TRANSFER_BEGIN_FAIL = 5, - GR_DATABASE_TRANSACTION_BEGIN_FAIL = 6, - GR_DATABASE_SESSION_START_FAIL = 7, - GR_CONFIGURATION_INVALID = 8, - GR_CMD_LINE_UTF8_ERROR = 9, - GR_CMD_LINE_OPTIONS_WRONG = 10, - GR_DATABASE_FETCH_FAILURE = 11, -} global_ret; +static int global_ret; /** * #GNUNET_YES if we are in test mode and should exit when idle. */ static int test_mode; +/** + * How long should we sleep when idle before trying to find more work? + * Also used for how long we wait to grab a shard before trying it again. + * The value should be set to a bit above the average time it takes to + * process a shard. + */ +static struct GNUNET_TIME_Relative transfer_idle_sleep_interval; + +/** + * How long did we take to finish the last shard? + */ +static struct GNUNET_TIME_Relative shard_delay; + +/** + * Size of the shards. + */ +static unsigned int shard_size = DEFAULT_BATCH_SIZE; + +/** + * How many workers should we plan our scheduling with? + */ +static unsigned int max_workers = 0; + + +/** + * Clean up all active bank interactions. + */ +static void +cleanup_wpd (void) +{ + struct WirePrepareData *wpd; + + while (NULL != (wpd = wpd_head)) + { + GNUNET_CONTAINER_DLL_remove (wpd_head, + wpd_tail, + wpd); + if (NULL != wpd->eh) + { + TALER_BANK_transfer_cancel (wpd->eh); + wpd->eh = NULL; + } + GNUNET_free (wpd); + } +} + /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -130,16 +236,6 @@ static void shutdown_task (void *cls) { (void) cls; - if (NULL != ctx) - { - GNUNET_CURL_fini (ctx); - ctx = NULL; - } - if (NULL != rc) - { - GNUNET_CURL_gnunet_rc_destroy (rc); - rc = NULL; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != task) @@ -147,42 +243,43 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } - if (NULL != wpd) - { - if (NULL != wpd->eh) - { - TALER_BANK_transfer_cancel (wpd->eh); - wpd->eh = NULL; - } - db_plugin->rollback (db_plugin->cls, - wpd->session); - GNUNET_free (wpd); - wpd = NULL; - } + cleanup_wpd (); + GNUNET_free (shard); + db_plugin->rollback (db_plugin->cls); /* just in case */ TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); cfg = NULL; + if (NULL != ctx) + { + GNUNET_CURL_fini (ctx); + ctx = NULL; + } + if (NULL != rc) + { + GNUNET_CURL_gnunet_rc_destroy (rc); + rc = NULL; + } } /** - * Parse the configuration for wirewatch. + * Parse the configuration for taler-exchange-transfer. * * @return #GNUNET_OK on success */ -static int -parse_wirewatch_config (void) +static enum GNUNET_GenericReturnValue +parse_transfer_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "exchange", - "AGGREGATOR_IDLE_SLEEP_INTERVAL", - &aggregator_idle_sleep_interval)) + "TRANSFER_IDLE_SLEEP_INTERVAL", + &transfer_idle_sleep_interval)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchange", - "AGGREGATOR_IDLE_SLEEP_INTERVAL"); + "TRANSFER_IDLE_SLEEP_INTERVAL"); return GNUNET_SYSERR; } if (NULL == @@ -193,7 +290,9 @@ parse_wirewatch_config (void) return GNUNET_SYSERR; } if (GNUNET_OK != - TALER_EXCHANGEDB_load_accounts (cfg)) + TALER_EXCHANGEDB_load_accounts (cfg, + TALER_EXCHANGEDB_ALO_DEBIT + | TALER_EXCHANGEDB_ALO_AUTHDATA)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No wire accounts configured for debit!\n"); @@ -208,18 +307,19 @@ parse_wirewatch_config (void) /** * Perform a database commit. If it fails, print a warning. * - * @param session session to perform the commit for. * @return status of commit */ static enum GNUNET_DB_QueryStatus -commit_or_warn (struct TALER_EXCHANGEDB_Session *session) +commit_or_warn (void) { enum GNUNET_DB_QueryStatus qs; - qs = db_plugin->commit (db_plugin->cls, - session); + qs = db_plugin->commit (db_plugin->cls); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + serialization_delay = GNUNET_TIME_UNIT_ZERO; return qs; + } GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) ? GNUNET_ERROR_TYPE_INFO : GNUNET_ERROR_TYPE_ERROR, @@ -238,100 +338,178 @@ static void run_transfers (void *cls); +static void +run_transfers_delayed (void *cls) +{ + (void) cls; + shard->shard_start_time = GNUNET_TIME_absolute_get (); + run_transfers (NULL); +} + + +/** + * Select shard to process. + * + * @param cls NULL + */ +static void +select_shard (void *cls); + + +/** + * We are done with the current batch. Commit + * and move on. + */ +static void +batch_done (void) +{ + /* batch done */ + GNUNET_assert (NULL == wpd_head); + switch (commit_or_warn ()) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization failure, trying again immediately!\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + shard->batch_start = shard->batch_end + 1; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Batch complete\n"); + /* continue with #run_transfers(), just to guard + against the unlikely case that there are more. */ + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return; + default: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + /** * Function called with the result from the execute step. * On success, we mark the respective wire transfer as finished, * and in general we afterwards continue to #run_transfers(), * except for irrecoverable errors. * - * @param cls NULL - * @param http_status_code #MHD_HTTP_OK on success - * @param ec taler error code - * @param row_id unique ID of the wire transfer in the bank's records - * @param wire_timestamp when did the transfer happen + * @param cls `struct WirePrepareData` we are working on + * @param tr transfer response */ static void wire_confirm_cb (void *cls, - unsigned int http_status_code, - enum TALER_ErrorCode ec, - uint64_t row_id, - struct GNUNET_TIME_Absolute wire_timestamp) + const struct TALER_BANK_TransferResponse *tr) { - struct TALER_EXCHANGEDB_Session *session = wpd->session; + struct WirePrepareData *wpd = cls; enum GNUNET_DB_QueryStatus qs; - (void) cls; - (void) row_id; - (void) wire_timestamp; wpd->eh = NULL; - if (MHD_HTTP_OK != http_status_code) + switch (tr->http_status) { + case MHD_HTTP_OK: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Wire transfer %llu completed successfully\n", + (unsigned long long) wpd->row_id); + qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, + wpd->row_id); + /* continued below */ + break; + case MHD_HTTP_NOT_FOUND: + case MHD_HTTP_CONFLICT: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Wire transaction failed: %u/%d\n", - http_status_code, - ec); - db_plugin->rollback (db_plugin->cls, - session); - global_ret = GR_WIRE_TRANSFER_FAILED; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; - return; - } - qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, - session, + "Wire transaction %llu failed: %u/%d\n", + (unsigned long long) wpd->row_id, + tr->http_status, + tr->ec); + qs = db_plugin->wire_prepare_data_mark_failed (db_plugin->cls, wpd->row_id); - if (0 >= qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - db_plugin->rollback (db_plugin->cls, - session); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - } - else + /* continued below */ + break; + case 0: + case MHD_HTTP_TOO_MANY_REQUESTS: + case MHD_HTTP_INTERNAL_SERVER_ERROR: + case MHD_HTTP_BAD_GATEWAY: + case MHD_HTTP_SERVICE_UNAVAILABLE: + case MHD_HTTP_GATEWAY_TIMEOUT: + wpd->retries++; + if (wpd->retries < MAX_RETRIES) { - global_ret = GR_DATABASE_COMMIT_HARD_FAIL; - GNUNET_SCHEDULER_shutdown (); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Wire transfer %llu failed (%u), trying again\n", + (unsigned long long) wpd->row_id, + tr->http_status); + wpd->eh = TALER_BANK_transfer (ctx, + wpd->wa->auth, + &wpd[1], + wpd->buf_size, + &wire_confirm_cb, + wpd); + return; } - GNUNET_free (wpd); - wpd = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Wire transaction %llu failed: %u/%d\n", + (unsigned long long) wpd->row_id, + tr->http_status, + tr->ec); + cleanup_wpd (); + db_plugin->rollback (db_plugin->cls); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Wire transfer %llu failed: %u/%d\n", + (unsigned long long) wpd->row_id, + tr->http_status, + tr->ec); + db_plugin->rollback (db_plugin->cls); + cleanup_wpd (); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); return; } - GNUNET_free (wpd); - wpd = NULL; - switch (commit_or_warn (session)) + shard->batch_end = GNUNET_MAX (wpd->row_id, + shard->batch_end); + switch (qs) { case GNUNET_DB_STATUS_SOFT_ERROR: - /* try again */ + db_plugin->rollback (db_plugin->cls); + cleanup_wpd (); GNUNET_assert (NULL == task); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization failure, trying again immediately!\n"); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - global_ret = GR_DATABASE_COMMIT_HARD_FAIL; + db_plugin->rollback (db_plugin->cls); + cleanup_wpd (); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Wire transfer complete\n"); - /* continue with #run_transfers(), just to guard - against the unlikely case that there are more. */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return; - default: - GNUNET_break (0); - global_ret = GR_INVARIANT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_CONTAINER_DLL_remove (wpd_head, + wpd_tail, + wpd); + GNUNET_free (wpd); + break; } + if (NULL != wpd_head) + return; /* wait for other queries to complete */ + batch_done (); } @@ -352,19 +530,40 @@ wire_prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct TALER_EXCHANGEDB_WireAccount *wa; + struct WirePrepareData *wpd; (void) cls; + if ( (NULL != task) || + (EXIT_SUCCESS != global_ret) ) + return; /* current transaction was aborted */ + if (rowid >= shard->shard_end) + { + /* skip */ + shard->batch_end = shard->shard_end - 1; + if (NULL != wpd_head) + return; + batch_done (); + return; + } if ( (NULL == wire_method) || (NULL == buf) ) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GR_DATABASE_FETCH_FAILURE; - goto cleanup; + db_plugin->rollback (db_plugin->cls); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; } + wpd = GNUNET_malloc (sizeof (struct WirePrepareData) + + buf_size); + GNUNET_memcpy (&wpd[1], + buf, + buf_size); + wpd->buf_size = buf_size; wpd->row_id = rowid; + GNUNET_CONTAINER_DLL_insert (wpd_head, + wpd_tail, + wpd); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting wire transfer %llu\n", (unsigned long long) rowid); @@ -374,31 +573,27 @@ wire_prepare_cb (void *cls, /* Should really never happen here, as when we get here the wire account should be in the cache. */ GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GR_WIRE_ACCOUNT_NOT_CONFIGURED; - goto cleanup; + cleanup_wpd (); + db_plugin->rollback (db_plugin->cls); + global_ret = EXIT_NO_RESTART; + GNUNET_SCHEDULER_shutdown (); + return; } - wa = wpd->wa; wpd->eh = TALER_BANK_transfer (ctx, - &wa->auth, + wpd->wa->auth, buf, buf_size, &wire_confirm_cb, - NULL); + wpd); if (NULL == wpd->eh) { GNUNET_break (0); /* Irrecoverable */ - db_plugin->rollback (db_plugin->cls, - wpd->session); - global_ret = GR_WIRE_TRANSFER_BEGIN_FAIL; - goto cleanup; + cleanup_wpd (); + db_plugin->rollback (db_plugin->cls); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; } - return; -cleanup: - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; } @@ -412,58 +607,98 @@ static void run_transfers (void *cls) { enum GNUNET_DB_QueryStatus qs; - struct TALER_EXCHANGEDB_Session *session; + int64_t limit; (void) cls; task = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for pending wire transfers\n"); - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) + limit = shard->shard_end - shard->batch_start; + if (0 >= limit) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database session!\n"); - global_ret = GR_DATABASE_SESSION_START_FAIL; - GNUNET_SCHEDULER_shutdown (); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shard [%llu,%llu) completed\n", + (unsigned long long) shard->shard_start, + (unsigned long long) shard->batch_end); + qs = db_plugin->complete_shard (db_plugin->cls, + "transfer", + shard->shard_start, + shard->batch_end + 1); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_free (shard); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got DB soft error for complete_shard. Rolling back.\n"); + GNUNET_free (shard); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&select_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* already existed, ok, let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; + } + shard_delay = GNUNET_TIME_absolute_get_duration ( + shard->shard_start_time); + GNUNET_free (shard); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&select_shard, + NULL); return; } + /* cap number of parallel connections to a reasonable + limit for concurrent requests to the bank */ + limit = GNUNET_MIN (limit, + 256); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for %lld pending wire transfers [%llu-...)\n", + (long long) limit, + (unsigned long long) shard->batch_start); if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "aggregator run transfer")) + db_plugin->start_read_committed (db_plugin->cls, + "aggregator run transfer")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - wpd = GNUNET_new (struct WirePrepareData); - wpd->session = session; + GNUNET_assert (NULL == task); qs = db_plugin->wire_prepare_data_get (db_plugin->cls, - session, + shard->batch_start, + limit, &wire_prepare_cb, NULL); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) - return; /* continued via continuation set in #wire_prepare_cb() */ - db_plugin->rollback (db_plugin->cls, - session); - GNUNET_free (wpd); - wpd = NULL; switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: + cleanup_wpd (); + db_plugin->rollback (db_plugin->cls); GNUNET_break (0); - global_ret = GR_DATABASE_COMMIT_HARD_FAIL; + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization failure, trying again immediately!\n"); + cleanup_wpd (); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* no more prepared wire transfers, go sleep a bit! */ + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == wpd_head); GNUNET_assert (NULL == task); if (GNUNET_YES == test_mode) { @@ -475,15 +710,105 @@ run_transfers (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more pending wire transfers, going idle\n"); - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_transfers, + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, + &run_transfers_delayed, + NULL); + } + return; + default: + /* continued in wire_prepare_cb() */ + return; + } +} + + +/** + * Select shard to process. + * + * @param cls NULL + */ +static void +select_shard (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Relative delay; + uint64_t start; + uint64_t end; + + (void) cls; + task = NULL; + GNUNET_assert (NULL == wpd_head); + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (0 == max_workers) + delay = GNUNET_TIME_UNIT_ZERO; + else + delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( + GNUNET_CRYPTO_QUALITY_WEAK, + 4 * GNUNET_TIME_relative_max ( + transfer_idle_sleep_interval, + GNUNET_TIME_relative_multiply (shard_delay, + max_workers)).rel_value_us); + qs = db_plugin->begin_shard (db_plugin->cls, + "transfer", + delay, + shard_size, + &start, + &end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain starting point for montoring from database!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + { + serialization_delay = GNUNET_TIME_randomized_backoff (serialization_delay, + GNUNET_TIME_UNIT_SECONDS); + GNUNET_assert (NULL == task); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization failure, trying again in %s!\n", + GNUNET_TIME_relative2s (serialization_delay, + true)); + task = GNUNET_SCHEDULER_add_delayed (serialization_delay, + &select_shard, NULL); } return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, + &select_shard, + NULL); + return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* should be impossible */ - GNUNET_assert (0); + /* continued below */ + break; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting with shard [%llu,%llu)\n", + (unsigned long long) start, + (unsigned long long) end); + shard = GNUNET_new (struct Shard); + shard->shard_start_time = GNUNET_TIME_absolute_get (); + shard->shard_start = start; + shard->shard_end = end; + shard->batch_start = start; + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); } @@ -506,10 +831,10 @@ run (void *cls, (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_wirewatch_config ()) + if (GNUNET_OK != parse_transfer_config ()) { cfg = NULL; - global_ret = GR_CONFIGURATION_INVALID; + global_ret = EXIT_NOTCONFIGURED; return; } ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, @@ -520,9 +845,17 @@ run (void *cls, GNUNET_break (0); return; } - + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, + task = GNUNET_SCHEDULER_add_now (&select_shard, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); @@ -541,32 +874,44 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_uint ('S', + "size", + "SIZE", + "Size to process per shard (default: 1024)", + &shard_size), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), + GNUNET_GETOPT_option_uint ('w', + "workers", + "COUNT", + "Plan work load with up to COUNT worker processes (default: 16)", + &max_workers), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; + enum GNUNET_GenericReturnValue ret; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) - return GR_CMD_LINE_UTF8_ERROR; - if (GNUNET_OK != - GNUNET_PROGRAM_run (argc, argv, - "taler-exchange-transfer", - gettext_noop ( - "background process that executes outgoing wire transfers"), - options, - &run, NULL)) - { - GNUNET_free ((void *) argv); - return GR_CMD_LINE_OPTIONS_WRONG; - } - GNUNET_free ((void *) argv); + return EXIT_INVALIDARGUMENT; + TALER_OS_init (); + ret = GNUNET_PROGRAM_run ( + argc, argv, + "taler-exchange-transfer", + gettext_noop ( + "background process that executes outgoing wire transfers"), + options, + &run, NULL); + GNUNET_free_nz ((void *) argv); + if (GNUNET_SYSERR == ret) + return EXIT_INVALIDARGUMENT; + if (GNUNET_NO == ret) + return EXIT_SUCCESS; return global_ret; } |