diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-04-20 21:38:02 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-04-20 21:38:02 +0200 |
commit | 27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9 (patch) | |
tree | 16da56b06fb589d3be77fc48c2116b7cb54647dc /src/exchange | |
parent | 92d9ec69e6d8e9f7eb0be0d6a7f67444189b319e (diff) | |
download | exchange-27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9.tar.gz exchange-27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9.tar.bz2 exchange-27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9.zip |
finished implementing #4956 in principle, but not yet tested
Diffstat (limited to 'src/exchange')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 368 | ||||
-rw-r--r-- | src/exchange/taler-exchange-httpd_responses.c | 7 | ||||
-rw-r--r-- | src/exchange/test-taler-exchange-aggregator-postgres.conf | 9 | ||||
-rw-r--r-- | src/exchange/test_taler_exchange_httpd.conf | 10 |
4 files changed, 385 insertions, 9 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index e9587930a..54757d860 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -48,7 +48,7 @@ struct WirePlugin * Handle to the plugin. */ struct TALER_WIRE_Plugin *wire_plugin; - + /** * Name of the plugin. */ @@ -178,6 +178,34 @@ struct AggregationUnit /** + * Context we use while closing a reserve. + */ +struct CloseTransferContext +{ + /** + * Handle for preparing the wire transfer. + */ + struct TALER_WIRE_PrepareHandle *ph; + + /** + * Our database session. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire transfer method. + */ + char *type; +}; + + +/** + * Active context while processing reserve closing, + * or NULL. + */ +static struct CloseTransferContext *ctc; + +/** * Which currency is used by this exchange? */ static char *exchange_currency_string; @@ -236,6 +264,11 @@ static int global_ret; static int test_mode; /** + * Did #run_reserve_closures() have any work during its last run? + */ +static int reserves_idle; + +/** * Limit on the number of transactions we aggregate at once. Note * that the limit must be big enough to ensure that when transactions * of the smallest possible unit are aggregated, they do surpass the @@ -666,7 +699,8 @@ aggregate_cb (void *cls, /** - * Function to be called with the prepared transfer data. + * Function to be called with the prepared transfer data + * when running an aggregation on a merchant. * * @param cls closure with the `struct AggregationUnit` * @param buf transaction data to persist, NULL on error @@ -679,6 +713,319 @@ prepare_cb (void *cls, /** + * Main work function that finds and triggers transfers for reserves + * closures. + * + * @param cls closure + */ +static void +run_reserve_closures (void *cls); + + +/** + * Main work function that queries the DB and aggregates transactions + * into larger wire transfers. + * + * @param cls NULL + */ +static void +run_aggregation (void *cls); + + +/** + * Function to be called with the prepared transfer data + * when closing a reserve. + * + * @param cls closure with a `struct CloseTransferContext` + * @param buf transaction data to persist, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +prepare_close_cb (void *cls, + const char *buf, + size_t buf_size) +{ + GNUNET_assert (cls == ctc); + ctc->ph = NULL; + if (NULL == buf) + { + GNUNET_break (0); /* why? how to best recover? */ + db_plugin->rollback (db_plugin->cls, + ctc->session); + /* start again */ + GNUNET_free (ctc->type); + GNUNET_free (ctc); + ctc = NULL; + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + return; + } + + /* Commit our intention to execute the wire transfer! */ + if (GNUNET_OK != + db_plugin->wire_prepare_data_insert (db_plugin->cls, + ctc->session, + ctc->type, + buf, + buf_size)) + { + GNUNET_break (0); /* why? how to best recover? */ + db_plugin->rollback (db_plugin->cls, + ctc->session); + /* start again */ + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + GNUNET_free (ctc->type); + GNUNET_free (ctc); + ctc = NULL; + return; + } + + /* finally commit */ + if (GNUNET_OK != + db_plugin->commit (db_plugin->cls, + ctc->session)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit database transaction!\n"); + } + GNUNET_free (ctc->type); + GNUNET_free (ctc); + ctc = NULL; + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); +} + + +/** + * Function called with details about expired reserves. + * We trigger the reserve closure by inserting the respective + * closing record and prewire instructions into the respective + * tables. + * + * @param cls a `struct TALER_EXCHANGEDB_Session *` + * @param reserve_pub public key of the reserve + * @param left amount left in the reserve + * @param account_details information about the reserve's bank account + * @param expiration_date when did the reserve expire + * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + */ +static int +expired_reserve_cb (void *cls, + const struct TALER_ReservePublicKeyP *reserve_pub, + const struct TALER_Amount *left, + const json_t *account_details, + struct GNUNET_TIME_Absolute expiration_date) +{ + struct TALER_EXCHANGEDB_Session *session = cls; + struct GNUNET_TIME_Absolute now; + struct TALER_WireTransferIdentifierRawP wtid; + struct TALER_Amount amount_without_fee; + const struct TALER_Amount *closing_fee; + int ret; + int iret; + const char *type; + struct WirePlugin *wp; + + GNUNET_assert (NULL == ctc); + now = GNUNET_TIME_absolute_get (); + + /* lookup wire plugin */ + type = extract_type (account_details); + if (NULL == type) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + } + wp = find_plugin (type); + if (NULL == wp) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + } + + /* lookup `closing_fee` */ + if (GNUNET_OK != + update_fees (wp, + now, + session)) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + } + closing_fee = &wp->af->closing_fee; + + /* calculate transfer amount */ + ret = TALER_amount_subtract (&amount_without_fee, + left, + closing_fee); + if ( (GNUNET_SYSERR == ret) || + (GNUNET_NO == ret) ) + { + /* Closing fee higher than remaining balance, close + without wire transfer. */ + closing_fee = left; + TALER_amount_get_zero (left->currency, + &amount_without_fee); + } + + /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to + be future-compatible, we use the memset + min construction */ + memset (&wtid, + 0, + sizeof (wtid)); + memcpy (&wtid, + reserve_pub, + GNUNET_MIN (sizeof (wtid), + sizeof (*reserve_pub))); + iret = db_plugin->insert_reserve_closed (db_plugin->cls, + session, + reserve_pub, + now, + account_details, + &wtid, + left, + closing_fee); + if ( (GNUNET_OK == ret) && + (GNUNET_OK == iret) ) + { + /* success, perform wire transfer */ + if (GNUNET_SYSERR == + wp->wire_plugin->amount_round (wp->wire_plugin->cls, + &amount_without_fee)) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + } + ctc = GNUNET_new (struct CloseTransferContext); + ctc->session = session; + ctc->type = GNUNET_strdup (type); + ctc->ph + = wp->wire_plugin->prepare_wire_transfer (wp->wire_plugin->cls, + au->wire, + &amount_without_fee, + exchange_base_url, + &wtid, + &prepare_close_cb, + ctc); + if (NULL == ctc->ph) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (ctc->type); + GNUNET_free (ctc); + ctc = NULL; + } + return GNUNET_SYSERR; + } + /* Check for hard failure */ + if (GNUNET_SYSERR == iret) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + } + /* Reserve balance was almost zero; just commit */ + if (GNUNET_OK != + db_plugin->commit (db_plugin->cls, + session)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit database transaction!\n"); + } + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + return GNUNET_SYSERR; +} + + +/** + * Main work function that finds and triggers transfers for reserves + * closures. + * + * @param cls closure + */ +static void +run_reserve_closures (void *cls) +{ + struct TALER_EXCHANGEDB_Session *session; + int ret; + const struct GNUNET_SCHEDULER_TaskContext *tc; + + task = NULL; + reserves_idle = GNUNET_NO; + tc = GNUNET_SCHEDULER_get_task_context (); + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Checking for reserves to close\n"); + if (NULL == (session = db_plugin->get_session (db_plugin->cls))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database session!\n"); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_OK != + db_plugin->start (db_plugin->cls, + session)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + } + ret = db_plugin->get_expired_reserves (db_plugin->cls, + session, + GNUNET_TIME_absolute_get (), + &expired_reserve_cb, + session); + if (GNUNET_SYSERR == ret) + { + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + session); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_NO == ret) + { + reserves_idle = GNUNET_YES; + db_plugin->rollback (db_plugin->cls, + session); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + return; + } +} + + +/** * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * @@ -687,6 +1034,7 @@ prepare_cb (void *cls, static void run_aggregation (void *cls) { + static int swap; struct TALER_EXCHANGEDB_Session *session; unsigned int i; int ret; @@ -696,6 +1044,12 @@ run_aggregation (void *cls) tc = GNUNET_SCHEDULER_get_task_context (); if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + if (0 == (++swap % 2)) + { + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Checking for ready deposits to aggregate\n"); if (NULL == (session = db_plugin->get_session (db_plugin->cls))) @@ -748,9 +1102,13 @@ run_aggregation (void *cls) else { /* nothing to do, sleep for a minute and try again */ - task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &run_aggregation, - NULL); + if (GNUNET_NO == reserves_idle) + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); + else + task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &run_aggregation, + NULL); } return; } diff --git a/src/exchange/taler-exchange-httpd_responses.c b/src/exchange/taler-exchange-httpd_responses.c index 1de383e37..0d7406451 100644 --- a/src/exchange/taler-exchange-httpd_responses.c +++ b/src/exchange/taler-exchange-httpd_responses.c @@ -886,17 +886,16 @@ compile_reserve_history (const struct TALER_EXCHANGEDB_ReserveHistory *rh, rcc.reserve_pub = pos->details.closing->reserve_pub; TALER_JSON_hash (pos->details.closing->receiver_account_details, &rcc.h_wire); - TALER_JSON_hash (pos->details.closing->transfer_details, - &rcc.h_transfer); + rcc.wtid = pos->details.closing->transfer_details; TEH_KS_sign (&rcc.purpose, &pub, &sig); GNUNET_assert (0 == json_array_append_new (json_history, - json_pack ("{s:s, s:O, s:O, s:o, s:o, s:o, s:o, s:o}", + json_pack ("{s:s, s:O, s:o, s:o, s:o, s:o, s:o, s:o}", "type", "CLOSING", "receiver_account_details", pos->details.closing->receiver_account_details, - "transfer_details", pos->details.closing->transfer_details, + "transfer_details", GNUNET_JSON_from_data_auto (&pos->details.closing->transfer_details), "exchange_pub", GNUNET_JSON_from_data_auto (&pub), "exchange_sig", GNUNET_JSON_from_data_auto (&sig), "timestamp", GNUNET_JSON_from_time_abs (pos->details.closing->execution_date), diff --git a/src/exchange/test-taler-exchange-aggregator-postgres.conf b/src/exchange/test-taler-exchange-aggregator-postgres.conf index a5ee91aa9..00736e44d 100644 --- a/src/exchange/test-taler-exchange-aggregator-postgres.conf +++ b/src/exchange/test-taler-exchange-aggregator-postgres.conf @@ -19,6 +19,15 @@ MASTER_PUBLIC_KEY = 98NJW3CQHZQGQXTY3K85K531XKPAPAVV4Q5V8PYYRR00NJGZWNVG # Expected base URL of the exchange. BASE_URL = "https://exchange.taler.net/" +[exchangedb] +# After how long do we close idle reserves? The exchange +# and the auditor must agree on this value. We currently +# expect it to be globally defined for the whole system, +# as there is no way for wallets to query this value. Thus, +# it is only configurable for testing, and should be treated +# as constant in production. +IDLE_RESERVE_EXPIRATION_TIME = 4 weeks + [exchangedb-postgres] #The connection string the plugin has to use for connecting to the database diff --git a/src/exchange/test_taler_exchange_httpd.conf b/src/exchange/test_taler_exchange_httpd.conf index 945031dd1..5f282713b 100644 --- a/src/exchange/test_taler_exchange_httpd.conf +++ b/src/exchange/test_taler_exchange_httpd.conf @@ -19,6 +19,16 @@ MASTER_PUBLIC_KEY = 98NJW3CQHZQGQXTY3K85K531XKPAPAVV4Q5V8PYYRR00NJGZWNVG DB = postgres +[exchangedb] +# After how long do we close idle reserves? The exchange +# and the auditor must agree on this value. We currently +# expect it to be globally defined for the whole system, +# as there is no way for wallets to query this value. Thus, +# it is only configurable for testing, and should be treated +# as constant in production. +IDLE_RESERVE_EXPIRATION_TIME = 4 weeks + + [exchangedb-postgres] DB_CONN_STR = "postgres:///talercheck" |