summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c448
1 files changed, 199 insertions, 249 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 71a3efd7d..2704f5910 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016-2018 Taler Systems SA
+ Copyright (C) 2016-2020 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -26,6 +26,7 @@
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
+#include "taler_bank_service.h"
#include "taler_wire_lib.h"
@@ -45,9 +46,14 @@ struct WireAccount
struct WireAccount *prev;
/**
- * Handle to the plugin.
+ * Account information.
*/
- struct TALER_WIRE_Plugin *wire_plugin;
+ struct TALER_Account account;
+
+ /**
+ * Authentication data.
+ */
+ struct TALER_BANK_AuthenticationData auth;
/**
* Wire transfer fee structure.
@@ -59,6 +65,11 @@ struct WireAccount
*/
char *section_name;
+ /**
+ * Name of the wire method underlying the account.
+ */
+ char *method;
+
};
@@ -77,7 +88,7 @@ struct WirePrepareData
/**
* Wire execution handle.
*/
- struct TALER_WIRE_ExecuteHandle *eh;
+ struct TALER_BANK_WireExecuteHandle *eh;
/**
* Wire plugin used for this preparation.
@@ -187,10 +198,6 @@ struct AggregationUnit
*/
struct CloseTransferContext
{
- /**
- * Handle for preparing the wire transfer.
- */
- struct TALER_WIRE_PrepareHandle *ph;
/**
* Our database session.
@@ -263,6 +270,16 @@ static struct WirePrepareData *wpd;
static struct AggregationUnit *au;
/**
+ * Handle to the context for interacting with the bank.
+ */
+static struct GNUNET_CURL_Context *ctx;
+
+/**
+ * Scheduler context for running the @e ctx.
+ */
+static struct GNUNET_CURL_RescheduleContext *rc;
+
+/**
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
* on serious errors.
*/
@@ -339,7 +356,7 @@ update_fees (struct WireAccount *wa,
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
/* Let's try to load it from disk... */
wa->af = TALER_EXCHANGEDB_fees_read (cfg,
- wa->wire_plugin->method);
+ wa->method);
advance_fees (wa,
now);
for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af;
@@ -348,7 +365,7 @@ update_fees (struct WireAccount *wa,
{
qs = db_plugin->insert_wire_fee (db_plugin->cls,
session,
- wa->wire_plugin->method,
+ wa->method,
p->start_date,
p->end_date,
&p->wire_fee,
@@ -365,7 +382,7 @@ update_fees (struct WireAccount *wa,
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to find current wire transfer fees for `%s'\n",
- wa->wire_plugin->method);
+ wa->method);
return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
}
@@ -381,7 +398,7 @@ find_account_by_method (const char *method)
{
for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next)
if (0 == strcmp (method,
- wa->wire_plugin->method))
+ wa->method))
return wa;
return NULL;
}
@@ -431,13 +448,40 @@ add_account_cb (void *cls,
if (GNUNET_YES != ai->debit_enabled)
return; /* not enabled for us, skip */
wa = GNUNET_new (struct WireAccount);
- wa->wire_plugin = TALER_WIRE_plugin_load (cfg,
- ai->plugin_name);
- if (NULL == wa->wire_plugin)
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_string (cfg,
+ ai->section_name,
+ "METHOD",
+ &wa->method))
{
- fprintf (stderr,
- "Failed to load wire plugin for `%s'\n",
- ai->plugin_name);
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+ ai->section_name,
+ "METHOD");
+ GNUNET_free (wa);
+ return;
+ }
+ if (GNUNET_OK !=
+ TALER_BANK_auth_parse_cfg (cfg,
+ ai->section_name,
+ &wa->auth))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+ "Failed to load account `%s'\n",
+ ai->section_name);
+ GNUNET_free (wa->method);
+ GNUNET_free (wa);
+ return;
+ }
+ if (GNUNET_OK !=
+ TALER_BANK_account_parse_cfg (cfg,
+ ai->section_name,
+ &wa->account))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+ "Failed to load account `%s'\n",
+ ai->section_name);
+ TALER_BANK_auth_free (&wa->auth);
+ GNUNET_free (wa->method);
GNUNET_free (wa);
return;
}
@@ -476,6 +520,16 @@ static void
shutdown_task (void *cls)
{
(void) cls;
+ if (NULL != ctx)
+ {
+ GNUNET_CURL_fini (ctx);
+ ctx = NULL;
+ }
+ if (NULL != rc)
+ {
+ GNUNET_CURL_gnunet_rc_destroy (rc);
+ rc = NULL;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Running shutdown\n");
if (NULL != task)
@@ -487,9 +541,7 @@ shutdown_task (void *cls)
{
if (NULL != wpd->eh)
{
- wpd->wa->wire_plugin->execute_wire_transfer_cancel (
- wpd->wa->wire_plugin->cls,
- wpd->eh);
+ TALER_BANK_execute_wire_transfer_cancel (wpd->eh);
wpd->eh = NULL;
}
db_plugin->rollback (db_plugin->cls,
@@ -499,23 +551,12 @@ shutdown_task (void *cls)
}
if (NULL != au)
{
- if (NULL != au->ph)
- {
- au->wa->wire_plugin->prepare_wire_transfer_cancel (
- au->wa->wire_plugin->cls,
- au->ph);
- au->ph = NULL;
- }
db_plugin->rollback (db_plugin->cls,
au->session);
cleanup_au ();
}
if (NULL != ctc)
{
- ctc->wa->wire_plugin->prepare_wire_transfer_cancel (
- ctc->wa->wire_plugin->cls,
- ctc->ph);
- ctc->ph = NULL;
db_plugin->rollback (db_plugin->cls,
ctc->session);
GNUNET_free (ctc->method);
@@ -532,9 +573,11 @@ shutdown_task (void *cls)
GNUNET_CONTAINER_DLL_remove (wa_head,
wa_tail,
wa);
- TALER_WIRE_plugin_unload (wa->wire_plugin);
+ TALER_WIRE_account_free (&wa->account);
+ TALER_BANK_auth_free (&wa->auth);
TALER_EXCHANGEDB_fees_free (wa->af);
GNUNET_free (wa->section_name);
+ GNUNET_free (wa->method);
GNUNET_free (wa);
}
}
@@ -922,20 +965,6 @@ aggregate_cb (void *cls,
/**
- * Function to be called with the prepared transfer data
- * when running an aggregation on a merchant.
- *
- * @param cls closure with the `struct AggregationUnit`
- * @param buf transaction data to persist, NULL on error
- * @param buf_size number of bytes in @a buf, 0 on error
- */
-static void
-prepare_cb (void *cls,
- const char *buf,
- size_t buf_size);
-
-
-/**
* Main work function that finds and triggers transfers for reserves
* closures.
*
@@ -989,83 +1018,6 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
/**
- * Function to be called with the prepared transfer data
- * when closing a reserve.
- *
- * @param cls closure with a `struct CloseTransferContext`
- * @param buf transaction data to persist, NULL on error
- * @param buf_size number of bytes in @a buf, 0 on error
- */
-static void
-prepare_close_cb (void *cls,
- const char *buf,
- size_t buf_size)
-{
- enum GNUNET_DB_QueryStatus qs;
-
- GNUNET_assert (cls == ctc);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Prepared for reserve closing\n");
- ctc->ph = NULL;
- if (NULL == buf)
- {
- GNUNET_break (0); /* why? how to best recover? */
- db_plugin->rollback (db_plugin->cls,
- ctc->session);
- /* start again */
- GNUNET_free (ctc->method);
- GNUNET_free (ctc);
- ctc = NULL;
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
- return;
- }
-
- /* Commit our intention to execute the wire transfer! */
- qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
- ctc->session,
- ctc->method,
- buf,
- buf_size);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- ctc->session);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (ctc->method);
- GNUNET_free (ctc);
- ctc = NULL;
- return;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- db_plugin->rollback (db_plugin->cls,
- ctc->session);
- /* start again */
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
- GNUNET_free (ctc->method);
- GNUNET_free (ctc);
- ctc = NULL;
- return;
- }
-
- /* finally commit */
- (void) commit_or_warn (ctc->session);
- GNUNET_free (ctc->method);
- GNUNET_free (ctc);
- ctc = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Reserve closure committed, running transfer\n");
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
-}
-
-
-/**
* Closure for #expired_reserve_cb().
*/
struct ExpiredReserveContext
@@ -1113,6 +1065,8 @@ expired_reserve_cb (void *cls,
int ret;
enum GNUNET_DB_QueryStatus qs;
struct WireAccount *wa;
+ void *buf;
+ size_t buf_size;
/* NOTE: potential optimization: use custom SQL API to not
fetch this: */
@@ -1121,7 +1075,7 @@ expired_reserve_cb (void *cls,
now = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&now);
- /* lookup wire plugin */
+ /* lookup account we should use */
wa = find_account_by_url (account_details);
if (NULL == wa)
{
@@ -1161,6 +1115,18 @@ expired_reserve_cb (void *cls,
TALER_amount_get_zero (left->currency,
&amount_without_fee));
}
+ /* round down to enable transfer */
+ if (GNUNET_SYSERR ==
+ TALER_WIRE_amount_round (&amount_without_fee))
+ {
+ GNUNET_break (0);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ if ( (0 == amount_without_fee.value) &&
+ (0 == amount_without_fee.fraction) )
+ ret = GNUNET_NO;
/* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to
be future-compatible, we use the memset + min construction */
@@ -1171,61 +1137,23 @@ expired_reserve_cb (void *cls,
reserve_pub,
GNUNET_MIN (sizeof (wtid),
sizeof (*reserve_pub)));
-
- qs = db_plugin->insert_reserve_closed (db_plugin->cls,
- session,
- reserve_pub,
- now,
- account_details,
- &wtid,
- left,
- closing_fee);
-
+ if (GNUNET_SYSERR != ret)
+ qs = db_plugin->insert_reserve_closed (db_plugin->cls,
+ session,
+ reserve_pub,
+ now,
+ account_details,
+ &wtid,
+ left,
+ closing_fee);
+ else
+ ret = GNUNET_DB_STATUS_HARD_ERROR;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Closing reserve %s over %s (%d, %d)\n",
TALER_B2S (reserve_pub),
TALER_amount2s (left),
ret,
qs);
- if ( (GNUNET_OK == ret) &&
- (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) )
- {
- /* success, perform wire transfer */
- if (GNUNET_SYSERR ==
- wa->wire_plugin->amount_round (wa->wire_plugin->cls,
- &amount_without_fee))
- {
- GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
- ctc = GNUNET_new (struct CloseTransferContext);
- ctc->wa = wa;
- ctc->session = session;
- ctc->method = TALER_WIRE_payto_get_method (account_details);
- ctc->ph
- = wa->wire_plugin->prepare_wire_transfer (wa->wire_plugin->cls,
- wa->section_name,
- account_details,
- &amount_without_fee,
- exchange_base_url,
- &wtid,
- &prepare_close_cb,
- ctc);
- if (NULL == ctc->ph)
- {
- GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (ctc->method);
- GNUNET_free (ctc);
- ctc = NULL;
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
- erc->async_cont = GNUNET_YES;
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
/* Check for hard failure */
if ( (GNUNET_SYSERR == ret) ||
(GNUNET_DB_STATUS_HARD_ERROR == qs) )
@@ -1235,10 +1163,59 @@ expired_reserve_cb (void *cls,
GNUNET_SCHEDULER_shutdown ();
return GNUNET_DB_STATUS_HARD_ERROR;
}
- /* Reserve balance was almost zero OR soft error */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Reserve was virtually empty, moving on\n");
- return qs;
+ if ( (GNUNET_OK != ret) ||
+ (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) )
+ {
+ /* Reserve balance was almost zero OR soft error */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Reserve was virtually empty, moving on\n");
+ (void) commit_or_warn (ctc->session);
+ GNUNET_free (ctc->method);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
+ return qs;
+ }
+
+ /* success, perform wire transfer */
+ ctc = GNUNET_new (struct CloseTransferContext);
+ ctc->wa = wa;
+ ctc->session = session;
+ ctc->method = TALER_WIRE_payto_get_method (account_details);
+ TALER_BANK_prepare_wire_transfer (account_details,
+ &amount_without_fee,
+ exchange_base_url,
+ &wtid,
+ &buf,
+ &buf_size);
+ /* Commit our intention to execute the wire transfer! */
+ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ ctc->session,
+ ctc->method,
+ buf,
+ buf_size);
+ GNUNET_free (buf);
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ GNUNET_break (0);
+ GNUNET_free (ctc->method);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ /* start again */
+ GNUNET_free (ctc->method);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
+ }
+ erc->async_cont = GNUNET_YES;
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
@@ -1344,6 +1321,8 @@ run_aggregation (void *cls)
struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs;
const struct GNUNET_SCHEDULER_TaskContext *tc;
+ void *buf;
+ size_t buf_size;
(void) cls;
task = NULL;
@@ -1470,8 +1449,7 @@ run_aggregation (void *cls)
&au->total_amount,
&au->wire_fee)) ||
(GNUNET_SYSERR ==
- au->wa->wire_plugin->amount_round (au->wa->wire_plugin->cls,
- &au->final_amount)) ||
+ TALER_WIRE_amount_round (&au->final_amount)) ||
( (0 == au->final_amount.value) &&
(0 == au->final_amount.fraction) ) )
{
@@ -1555,70 +1533,26 @@ run_aggregation (void *cls)
char *url;
url = TALER_JSON_wire_to_payto (au->wire);
- au->ph = au->wa->wire_plugin->prepare_wire_transfer (
- au->wa->wire_plugin->cls,
- au->wa->section_name,
- url,
- &au->final_amount,
- exchange_base_url,
- &au->wtid,
- &prepare_cb,
- au);
+ TALER_BANK_prepare_wire_transfer (url,
+ &au->final_amount,
+ exchange_base_url,
+ &au->wtid,
+ &buf,
+ &buf_size);
GNUNET_free (url);
}
- if (NULL == au->ph)
- {
- /* something went very wrong, likely bad configuration,
- abort */
- db_plugin->rollback (db_plugin->cls,
- session);
- cleanup_au ();
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- /* otherwise we continue with #prepare_cb(), see below */
-}
-
-
-/**
- * Function to be called with the prepared transfer data.
- *
- * @param cls NULL
- * @param buf transaction data to persist, NULL on error
- * @param buf_size number of bytes in @a buf, 0 on error
- */
-static void
-prepare_cb (void *cls,
- const char *buf,
- size_t buf_size)
-{
- struct TALER_EXCHANGEDB_Session *session = au->session;
- enum GNUNET_DB_QueryStatus qs;
-
- (void) cls;
GNUNET_free_non_null (au->additional_rows);
au->additional_rows = NULL;
- if (NULL == buf)
- {
- GNUNET_break (0); /* why? how to best recover? */
- db_plugin->rollback (db_plugin->cls,
- session);
- /* start again */
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
- cleanup_au ();
- return;
- }
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Storing %u bytes of wire prepare data\n",
(unsigned int) buf_size);
/* Commit our intention to execute the wire transfer! */
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
session,
- au->wa->wire_plugin->method,
+ au->wa->method,
buf,
buf_size);
+ GNUNET_free (buf);
/* Commit the WTID data to 'wire_out' to finally satisfy aggregation
table constraints */
if (qs >= 0)
@@ -1691,29 +1625,30 @@ prepare_cb (void *cls,
* Function called with the result from the execute step.
*
* @param cls NULL
- * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
- * @param serial_id unique ID of the wire transfer in the bank's records; UINT64_MAX on error
- * @param emsg NULL on success, otherwise an error message
+ * @param http_status_code #MHD_HTTP_OK on success
+ * @param ec taler error code
+ * @param row_id unique ID of the wire transfer in the bank's records
+ * @param wire_timestamp when did the transfer happen
*/
static void
wire_confirm_cb (void *cls,
- int success,
- const void *row_id,
- size_t row_id_size,
- const char *emsg)
+ unsigned int http_status_code,
+ enum TALER_ErrorCode ec,
+ uint64_t row_id,
+ struct GNUNET_TIME_Absolute wire_timestamp)
{
struct TALER_EXCHANGEDB_Session *session = wpd->session;
enum GNUNET_DB_QueryStatus qs;
(void) cls;
(void) row_id;
- (void) row_id_size;
wpd->eh = NULL;
- if (GNUNET_SYSERR == success)
+ if (MHD_HTTP_OK != http_status_code)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Wire transaction failed: %s\n",
- emsg);
+ "Wire transaction failed: %u/%d\n",
+ http_status_code,
+ ec);
db_plugin->rollback (db_plugin->cls,
session);
global_ret = GNUNET_SYSERR;
@@ -1792,6 +1727,8 @@ wire_prepare_cb (void *cls,
const char *buf,
size_t buf_size)
{
+ struct WireAccount *wa;
+
(void) cls;
wpd->row_id = rowid;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1811,12 +1748,15 @@ wire_prepare_cb (void *cls,
wpd = NULL;
return;
}
- wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer (
- wpd->wa->wire_plugin->cls,
- buf,
- buf_size,
- &wire_confirm_cb,
- NULL);
+ wa = wpd->wa;
+ wpd->eh = TALER_BANK_execute_wire_transfer (ctx,
+ wa->account.details.x_taler_bank.
+ account_base_url,
+ &wa->auth,
+ buf,
+ buf_size,
+ &wire_confirm_cb,
+ NULL);
if (NULL == wpd->eh)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -1927,6 +1867,7 @@ run (void *cls,
(void) cls;
(void) args;
(void) cfgfile;
+
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (c,
"exchange",
@@ -1947,6 +1888,15 @@ run (void *cls,
global_ret = 1;
return;
}
+ ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
+ &rc);
+ rc = GNUNET_CURL_gnunet_rc_create (ctx);
+ if (NULL == ctx)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,