summaryrefslogtreecommitdiff
path: root/src/exchange
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-04-20 21:38:02 +0200
committerChristian Grothoff <christian@grothoff.org>2017-04-20 21:38:02 +0200
commit27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9 (patch)
tree16da56b06fb589d3be77fc48c2116b7cb54647dc /src/exchange
parent92d9ec69e6d8e9f7eb0be0d6a7f67444189b319e (diff)
downloadexchange-27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9.tar.gz
exchange-27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9.tar.bz2
exchange-27c921c7c45f8ea8fed5c945a9e0ae0cfcc1c8e9.zip
finished implementing #4956 in principle, but not yet tested
Diffstat (limited to 'src/exchange')
-rw-r--r--src/exchange/taler-exchange-aggregator.c368
-rw-r--r--src/exchange/taler-exchange-httpd_responses.c7
-rw-r--r--src/exchange/test-taler-exchange-aggregator-postgres.conf9
-rw-r--r--src/exchange/test_taler_exchange_httpd.conf10
4 files changed, 385 insertions, 9 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index e9587930a..54757d860 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -48,7 +48,7 @@ struct WirePlugin
* Handle to the plugin.
*/
struct TALER_WIRE_Plugin *wire_plugin;
-
+
/**
* Name of the plugin.
*/
@@ -178,6 +178,34 @@ struct AggregationUnit
/**
+ * Context we use while closing a reserve.
+ */
+struct CloseTransferContext
+{
+ /**
+ * Handle for preparing the wire transfer.
+ */
+ struct TALER_WIRE_PrepareHandle *ph;
+
+ /**
+ * Our database session.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Wire transfer method.
+ */
+ char *type;
+};
+
+
+/**
+ * Active context while processing reserve closing,
+ * or NULL.
+ */
+static struct CloseTransferContext *ctc;
+
+/**
* Which currency is used by this exchange?
*/
static char *exchange_currency_string;
@@ -236,6 +264,11 @@ static int global_ret;
static int test_mode;
/**
+ * Did #run_reserve_closures() have any work during its last run?
+ */
+static int reserves_idle;
+
+/**
* Limit on the number of transactions we aggregate at once. Note
* that the limit must be big enough to ensure that when transactions
* of the smallest possible unit are aggregated, they do surpass the
@@ -666,7 +699,8 @@ aggregate_cb (void *cls,
/**
- * Function to be called with the prepared transfer data.
+ * Function to be called with the prepared transfer data
+ * when running an aggregation on a merchant.
*
* @param cls closure with the `struct AggregationUnit`
* @param buf transaction data to persist, NULL on error
@@ -679,6 +713,319 @@ prepare_cb (void *cls,
/**
+ * Main work function that finds and triggers transfers for reserves
+ * closures.
+ *
+ * @param cls closure
+ */
+static void
+run_reserve_closures (void *cls);
+
+
+/**
+ * Main work function that queries the DB and aggregates transactions
+ * into larger wire transfers.
+ *
+ * @param cls NULL
+ */
+static void
+run_aggregation (void *cls);
+
+
+/**
+ * Function to be called with the prepared transfer data
+ * when closing a reserve.
+ *
+ * @param cls closure with a `struct CloseTransferContext`
+ * @param buf transaction data to persist, NULL on error
+ * @param buf_size number of bytes in @a buf, 0 on error
+ */
+static void
+prepare_close_cb (void *cls,
+ const char *buf,
+ size_t buf_size)
+{
+ GNUNET_assert (cls == ctc);
+ ctc->ph = NULL;
+ if (NULL == buf)
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ ctc->session);
+ /* start again */
+ GNUNET_free (ctc->type);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+ return;
+ }
+
+ /* Commit our intention to execute the wire transfer! */
+ if (GNUNET_OK !=
+ db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ ctc->session,
+ ctc->type,
+ buf,
+ buf_size))
+ {
+ GNUNET_break (0); /* why? how to best recover? */
+ db_plugin->rollback (db_plugin->cls,
+ ctc->session);
+ /* start again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+ GNUNET_free (ctc->type);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ return;
+ }
+
+ /* finally commit */
+ if (GNUNET_OK !=
+ db_plugin->commit (db_plugin->cls,
+ ctc->session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit database transaction!\n");
+ }
+ GNUNET_free (ctc->type);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+}
+
+
+/**
+ * Function called with details about expired reserves.
+ * We trigger the reserve closure by inserting the respective
+ * closing record and prewire instructions into the respective
+ * tables.
+ *
+ * @param cls a `struct TALER_EXCHANGEDB_Session *`
+ * @param reserve_pub public key of the reserve
+ * @param left amount left in the reserve
+ * @param account_details information about the reserve's bank account
+ * @param expiration_date when did the reserve expire
+ * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ */
+static int
+expired_reserve_cb (void *cls,
+ const struct TALER_ReservePublicKeyP *reserve_pub,
+ const struct TALER_Amount *left,
+ const json_t *account_details,
+ struct GNUNET_TIME_Absolute expiration_date)
+{
+ struct TALER_EXCHANGEDB_Session *session = cls;
+ struct GNUNET_TIME_Absolute now;
+ struct TALER_WireTransferIdentifierRawP wtid;
+ struct TALER_Amount amount_without_fee;
+ const struct TALER_Amount *closing_fee;
+ int ret;
+ int iret;
+ const char *type;
+ struct WirePlugin *wp;
+
+ GNUNET_assert (NULL == ctc);
+ now = GNUNET_TIME_absolute_get ();
+
+ /* lookup wire plugin */
+ type = extract_type (account_details);
+ if (NULL == type)
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
+ }
+ wp = find_plugin (type);
+ if (NULL == wp)
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
+ }
+
+ /* lookup `closing_fee` */
+ if (GNUNET_OK !=
+ update_fees (wp,
+ now,
+ session))
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
+ }
+ closing_fee = &wp->af->closing_fee;
+
+ /* calculate transfer amount */
+ ret = TALER_amount_subtract (&amount_without_fee,
+ left,
+ closing_fee);
+ if ( (GNUNET_SYSERR == ret) ||
+ (GNUNET_NO == ret) )
+ {
+ /* Closing fee higher than remaining balance, close
+ without wire transfer. */
+ closing_fee = left;
+ TALER_amount_get_zero (left->currency,
+ &amount_without_fee);
+ }
+
+ /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to
+ be future-compatible, we use the memset + min construction */
+ memset (&wtid,
+ 0,
+ sizeof (wtid));
+ memcpy (&wtid,
+ reserve_pub,
+ GNUNET_MIN (sizeof (wtid),
+ sizeof (*reserve_pub)));
+ iret = db_plugin->insert_reserve_closed (db_plugin->cls,
+ session,
+ reserve_pub,
+ now,
+ account_details,
+ &wtid,
+ left,
+ closing_fee);
+ if ( (GNUNET_OK == ret) &&
+ (GNUNET_OK == iret) )
+ {
+ /* success, perform wire transfer */
+ if (GNUNET_SYSERR ==
+ wp->wire_plugin->amount_round (wp->wire_plugin->cls,
+ &amount_without_fee))
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
+ }
+ ctc = GNUNET_new (struct CloseTransferContext);
+ ctc->session = session;
+ ctc->type = GNUNET_strdup (type);
+ ctc->ph
+ = wp->wire_plugin->prepare_wire_transfer (wp->wire_plugin->cls,
+ au->wire,
+ &amount_without_fee,
+ exchange_base_url,
+ &wtid,
+ &prepare_close_cb,
+ ctc);
+ if (NULL == ctc->ph)
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ GNUNET_free (ctc->type);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ }
+ return GNUNET_SYSERR;
+ }
+ /* Check for hard failure */
+ if (GNUNET_SYSERR == iret)
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
+ }
+ /* Reserve balance was almost zero; just commit */
+ if (GNUNET_OK !=
+ db_plugin->commit (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit database transaction!\n");
+ }
+ task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
+ NULL);
+ return GNUNET_SYSERR;
+}
+
+
+/**
+ * Main work function that finds and triggers transfers for reserves
+ * closures.
+ *
+ * @param cls closure
+ */
+static void
+run_reserve_closures (void *cls)
+{
+ struct TALER_EXCHANGEDB_Session *session;
+ int ret;
+ const struct GNUNET_SCHEDULER_TaskContext *tc;
+
+ task = NULL;
+ reserves_idle = GNUNET_NO;
+ tc = GNUNET_SCHEDULER_get_task_context ();
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Checking for reserves to close\n");
+ if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database session!\n");
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ ret = db_plugin->get_expired_reserves (db_plugin->cls,
+ session,
+ GNUNET_TIME_absolute_get (),
+ &expired_reserve_cb,
+ session);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ reserves_idle = GNUNET_YES;
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+ return;
+ }
+}
+
+
+/**
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers.
*
@@ -687,6 +1034,7 @@ prepare_cb (void *cls,
static void
run_aggregation (void *cls)
{
+ static int swap;
struct TALER_EXCHANGEDB_Session *session;
unsigned int i;
int ret;
@@ -696,6 +1044,12 @@ run_aggregation (void *cls)
tc = GNUNET_SCHEDULER_get_task_context ();
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ if (0 == (++swap % 2))
+ {
+ task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
+ NULL);
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Checking for ready deposits to aggregate\n");
if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
@@ -748,9 +1102,13 @@ run_aggregation (void *cls)
else
{
/* nothing to do, sleep for a minute and try again */
- task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
- &run_aggregation,
- NULL);
+ if (GNUNET_NO == reserves_idle)
+ task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
+ NULL);
+ else
+ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+ &run_aggregation,
+ NULL);
}
return;
}
diff --git a/src/exchange/taler-exchange-httpd_responses.c b/src/exchange/taler-exchange-httpd_responses.c
index 1de383e37..0d7406451 100644
--- a/src/exchange/taler-exchange-httpd_responses.c
+++ b/src/exchange/taler-exchange-httpd_responses.c
@@ -886,17 +886,16 @@ compile_reserve_history (const struct TALER_EXCHANGEDB_ReserveHistory *rh,
rcc.reserve_pub = pos->details.closing->reserve_pub;
TALER_JSON_hash (pos->details.closing->receiver_account_details,
&rcc.h_wire);
- TALER_JSON_hash (pos->details.closing->transfer_details,
- &rcc.h_transfer);
+ rcc.wtid = pos->details.closing->transfer_details;
TEH_KS_sign (&rcc.purpose,
&pub,
&sig);
GNUNET_assert (0 ==
json_array_append_new (json_history,
- json_pack ("{s:s, s:O, s:O, s:o, s:o, s:o, s:o, s:o}",
+ json_pack ("{s:s, s:O, s:o, s:o, s:o, s:o, s:o, s:o}",
"type", "CLOSING",
"receiver_account_details", pos->details.closing->receiver_account_details,
- "transfer_details", pos->details.closing->transfer_details,
+ "transfer_details", GNUNET_JSON_from_data_auto (&pos->details.closing->transfer_details),
"exchange_pub", GNUNET_JSON_from_data_auto (&pub),
"exchange_sig", GNUNET_JSON_from_data_auto (&sig),
"timestamp", GNUNET_JSON_from_time_abs (pos->details.closing->execution_date),
diff --git a/src/exchange/test-taler-exchange-aggregator-postgres.conf b/src/exchange/test-taler-exchange-aggregator-postgres.conf
index a5ee91aa9..00736e44d 100644
--- a/src/exchange/test-taler-exchange-aggregator-postgres.conf
+++ b/src/exchange/test-taler-exchange-aggregator-postgres.conf
@@ -19,6 +19,15 @@ MASTER_PUBLIC_KEY = 98NJW3CQHZQGQXTY3K85K531XKPAPAVV4Q5V8PYYRR00NJGZWNVG
# Expected base URL of the exchange.
BASE_URL = "https://exchange.taler.net/"
+[exchangedb]
+# After how long do we close idle reserves? The exchange
+# and the auditor must agree on this value. We currently
+# expect it to be globally defined for the whole system,
+# as there is no way for wallets to query this value. Thus,
+# it is only configurable for testing, and should be treated
+# as constant in production.
+IDLE_RESERVE_EXPIRATION_TIME = 4 weeks
+
[exchangedb-postgres]
#The connection string the plugin has to use for connecting to the database
diff --git a/src/exchange/test_taler_exchange_httpd.conf b/src/exchange/test_taler_exchange_httpd.conf
index 945031dd1..5f282713b 100644
--- a/src/exchange/test_taler_exchange_httpd.conf
+++ b/src/exchange/test_taler_exchange_httpd.conf
@@ -19,6 +19,16 @@ MASTER_PUBLIC_KEY = 98NJW3CQHZQGQXTY3K85K531XKPAPAVV4Q5V8PYYRR00NJGZWNVG
DB = postgres
+[exchangedb]
+# After how long do we close idle reserves? The exchange
+# and the auditor must agree on this value. We currently
+# expect it to be globally defined for the whole system,
+# as there is no way for wallets to query this value. Thus,
+# it is only configurable for testing, and should be treated
+# as constant in production.
+IDLE_RESERVE_EXPIRATION_TIME = 4 weeks
+
+
[exchangedb-postgres]
DB_CONN_STR = "postgres:///talercheck"