summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c319
1 files changed, 171 insertions, 148 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 49cbb2b93..fa76cfb03 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016, 2017 GNUnet e.V.
+ Copyright (C) 2016-2018 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -30,19 +30,19 @@
/**
- * Information we keep for each loaded wire plugin.
+ * Information we keep for each supported account.
*/
-struct WirePlugin
+struct WireAccount
{
/**
- * Plugins are kept in a DLL.
+ * Accounts are kept in a DLL.
*/
- struct WirePlugin *next;
+ struct WireAccount *next;
/**
* Plugins are kept in a DLL.
*/
- struct WirePlugin *prev;
+ struct WireAccount *prev;
/**
* Handle to the plugin.
@@ -50,14 +50,14 @@ struct WirePlugin
struct TALER_WIRE_Plugin *wire_plugin;
/**
- * Name of the plugin.
+ * Wire transfer fee structure.
*/
- char *type;
+ struct TALER_EXCHANGEDB_AggregateFees *af;
/**
- * Wire transfer fee structure.
+ * Name of the section that configures this account.
*/
- struct TALER_EXCHANGEDB_AggregateFees *af;
+ char *section_name;
};
@@ -82,7 +82,7 @@ struct WirePrepareData
/**
* Wire plugin used for this preparation.
*/
- struct WirePlugin *wp;
+ struct WireAccount *wa;
/**
* Row ID of the transfer.
@@ -149,9 +149,9 @@ struct AggregationUnit
json_t *wire;
/**
- * Wire plugin to be used for the preparation.
+ * Wire account to be used for the preparation.
*/
- struct WirePlugin *wp;
+ struct WireAccount *wa;
/**
* Database session for all of our transactions.
@@ -200,12 +200,12 @@ struct CloseTransferContext
/**
* Wire transfer method.
*/
- char *type;
+ char *method;
/**
- * Wire plugin used for closing the reserve.
+ * Wire account used for closing the reserve.
*/
- struct WirePlugin *wp;
+ struct WireAccount *wa;
};
@@ -238,12 +238,12 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* Head of list of loaded wire plugins.
*/
-static struct WirePlugin *wp_head;
+static struct WireAccount *wa_head;
/**
* Tail of list of loaded wire plugins.
*/
-static struct WirePlugin *wp_tail;
+static struct WireAccount *wa_tail;
/**
* Next task to run, if any.
@@ -290,49 +290,22 @@ static int reserves_idle;
static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
-/**
- * Extract wire plugin type from @a wire address
- *
- * @param wire a wire address
- * @return NULL if @a wire is ill-formed
- */
-const char *
-extract_type (const json_t *wire)
-{
- const char *type;
- json_t *t;
-
- t = json_object_get (wire, "type");
- if (NULL == t)
- {
- GNUNET_break (0);
- return NULL;
- }
- type = json_string_value (t);
- if (NULL == type)
- {
- GNUNET_break (0);
- return NULL;
- }
- return type;
-}
-
/**
* Advance the "af" pointer in @a wp to point to the
* currently valid record.
*
- * @param wp wire transfer fee data structure to update
+ * @param wa wire transfer fee data structure to update
* @param now timestamp to update fees to
*/
static void
-advance_fees (struct WirePlugin *wp,
+advance_fees (struct WireAccount *wa,
struct GNUNET_TIME_Absolute now)
{
struct TALER_EXCHANGEDB_AggregateFees *af;
/* First, try to see if we have current fee information in memory */
- af = wp->af;
+ af = wa->af;
while ( (NULL != af) &&
(af->end_date.abs_value_us < now.abs_value_us) )
{
@@ -341,7 +314,7 @@ advance_fees (struct WirePlugin *wp,
GNUNET_free (af);
af = n;
}
- wp->af = af;
+ wa->af = af;
}
@@ -354,28 +327,28 @@ advance_fees (struct WirePlugin *wp,
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
-update_fees (struct WirePlugin *wp,
+update_fees (struct WireAccount *wa,
struct GNUNET_TIME_Absolute now,
struct TALER_EXCHANGEDB_Session *session)
{
enum GNUNET_DB_QueryStatus qs;
- advance_fees (wp,
+ advance_fees (wa,
now);
- if (NULL != wp->af)
+ if (NULL != wa->af)
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
/* Let's try to load it from disk... */
- wp->af = TALER_EXCHANGEDB_fees_read (cfg,
- wp->type);
- advance_fees (wp,
+ wa->af = TALER_EXCHANGEDB_fees_read (cfg,
+ wa->wire_plugin->method);
+ advance_fees (wa,
now);
- for (struct TALER_EXCHANGEDB_AggregateFees *p = wp->af;
+ for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af;
NULL != p;
p = p->next)
{
qs = db_plugin->insert_wire_fee (db_plugin->cls,
session,
- wp->type,
+ wa->wire_plugin->method,
p->start_date,
p->end_date,
&p->wire_fee,
@@ -383,53 +356,94 @@ update_fees (struct WirePlugin *wp,
&p->master_sig);
if (qs < 0)
{
- TALER_EXCHANGEDB_fees_free (wp->af);
- wp->af = NULL;
+ TALER_EXCHANGEDB_fees_free (wa->af);
+ wa->af = NULL;
return qs;
}
}
- if (NULL != wp->af)
+ if (NULL != wa->af)
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to find current wire transfer fees for `%s'\n",
- wp->type);
+ wa->wire_plugin->method);
return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
}
/**
- * Find the wire plugin for the given wire address.
+ * Find the wire plugin for the given payto:// URL
*
- * @param type wire plugin type we need a plugin for
+ * @param method wire method we need an account for
* @return NULL on error
*/
-static struct WirePlugin *
-find_plugin (const char *type)
+static struct WireAccount *
+find_account_by_method (const char *method)
{
- struct WirePlugin *wp;
+ for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next)
+ if (0 == strcmp (method,
+ wa->wire_plugin->method))
+ return wa;
+ return NULL;
+}
+
- if (NULL == type)
+/**
+ * Find the wire plugin for the given payto:// URL
+ *
+ * @param url wire address we need an account for
+ * @return NULL on error
+ */
+static struct WireAccount *
+find_account_by_url (const char *url)
+{
+ char *method;
+ struct WireAccount *wa;
+
+ method = TALER_WIRE_payto_get_method (url);
+ if (NULL == method)
+ {
+ fprintf (stderr,
+ "Invalid payto:// URL `%s'\n",
+ url);
return NULL;
- for (wp = wp_head; NULL != wp; wp = wp->next)
- if (0 == strcmp (type,
- wp->type))
- return wp;
- wp = GNUNET_new (struct WirePlugin);
- wp->wire_plugin = TALER_WIRE_plugin_load (cfg,
- type);
- if (NULL == wp->wire_plugin)
+ }
+ wa = find_account_by_method (method);
+ GNUNET_free (method);
+ return wa;
+}
+
+
+/**
+ * Function called with information about a wire account. Adds the
+ * account to our list (if it is enabled and we can load the plugin).
+ *
+ * @param cls closure, NULL
+ * @param ai account information
+ */
+static void
+add_account_cb (void *cls,
+ const struct TALER_EXCHANGEDB_AccountInfo *ai)
+{
+ struct WireAccount *wa;
+
+ (void) cls;
+ if (GNUNET_YES != ai->debit_enabled)
+ return; /* not enabled for us, skip */
+ wa = GNUNET_new (struct WireAccount);
+ wa->wire_plugin = TALER_WIRE_plugin_load (cfg,
+ ai->plugin_name);
+ if (NULL == wa->wire_plugin)
{
fprintf (stderr,
"Failed to load wire plugin for `%s'\n",
- type);
- GNUNET_free (wp);
- return NULL;
+ ai->plugin_name);
+ GNUNET_free (wa);
+ return;
}
- wp->type = GNUNET_strdup (type);
- GNUNET_CONTAINER_DLL_insert (wp_head,
- wp_tail,
- wp);
- return wp;
+ wa->section_name = GNUNET_strdup (ai->section_name);
+ GNUNET_CONTAINER_DLL_insert (wa_head,
+ wa_tail,
+ wa);
}
@@ -471,7 +485,7 @@ shutdown_task (void *cls)
{
if (NULL != wpd->eh)
{
- wpd->wp->wire_plugin->execute_wire_transfer_cancel (wpd->wp->wire_plugin->cls,
+ wpd->wa->wire_plugin->execute_wire_transfer_cancel (wpd->wa->wire_plugin->cls,
wpd->eh);
wpd->eh = NULL;
}
@@ -484,7 +498,7 @@ shutdown_task (void *cls)
{
if (NULL != au->ph)
{
- au->wp->wire_plugin->prepare_wire_transfer_cancel (au->wp->wire_plugin->cls,
+ au->wa->wire_plugin->prepare_wire_transfer_cancel (au->wa->wire_plugin->cls,
au->ph);
au->ph = NULL;
}
@@ -494,29 +508,29 @@ shutdown_task (void *cls)
}
if (NULL != ctc)
{
- ctc->wp->wire_plugin->prepare_wire_transfer_cancel (ctc->wp->wire_plugin->cls,
+ ctc->wa->wire_plugin->prepare_wire_transfer_cancel (ctc->wa->wire_plugin->cls,
ctc->ph);
ctc->ph = NULL;
db_plugin->rollback (db_plugin->cls,
ctc->session);
- GNUNET_free (ctc->type);
+ GNUNET_free (ctc->method);
GNUNET_free (ctc);
ctc = NULL;
}
TALER_EXCHANGEDB_plugin_unload (db_plugin);
{
- struct WirePlugin *wp;
+ struct WireAccount *wa;
- while (NULL != (wp = wp_head))
+ while (NULL != (wa = wa_head))
{
- GNUNET_CONTAINER_DLL_remove (wp_head,
- wp_tail,
- wp);
- TALER_WIRE_plugin_unload (wp->wire_plugin);
- TALER_EXCHANGEDB_fees_free (wp->af);
- GNUNET_free (wp->type);
- GNUNET_free (wp);
+ GNUNET_CONTAINER_DLL_remove (wa_head,
+ wa_tail,
+ wa);
+ TALER_WIRE_plugin_unload (wa->wire_plugin);
+ TALER_EXCHANGEDB_fees_free (wa->af);
+ GNUNET_free (wa->section_name);
+ GNUNET_free (wa);
}
}
GNUNET_CONFIGURATION_destroy (cfg);
@@ -568,7 +582,16 @@ exchange_serve_process_config ()
TALER_EXCHANGEDB_plugin_unload (db_plugin);
return GNUNET_SYSERR;
}
-
+ TALER_EXCHANGEDB_find_accounts (cfg,
+ &add_account_cb,
+ NULL);
+ if (NULL == wa_head)
+ {
+ fprintf (stderr,
+ "No wire accounts configured for debit!\n");
+ TALER_EXCHANGEDB_plugin_unload (db_plugin);
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
@@ -677,16 +700,13 @@ deposit_cb (void *cls,
}
GNUNET_assert (NULL == au->wire);
- au->wire = json_incref ((json_t *) wire);
- if (GNUNET_OK !=
- TALER_JSON_hash (au->wire,
- &au->h_wire))
+ if (NULL == (au->wire = json_incref ((json_t *) wire)))
{
GNUNET_break (0);
- json_decref (au->wire);
- au->wire = NULL;
return GNUNET_DB_STATUS_HARD_ERROR;
}
+ TALER_JSON_wire_signature_hash (wire,
+ &au->h_wire);
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&au->wtid,
sizeof (au->wtid));
@@ -695,15 +715,20 @@ deposit_cb (void *cls,
TALER_B2S (&au->wtid),
TALER_amount2s (amount_with_fee),
(unsigned long long) row_id);
+ {
+ char *url;
- au->wp = find_plugin (extract_type (au->wire));
- if (NULL == au->wp)
+ url = TALER_JSON_wire_to_payto (au->wire);
+ au->wa = find_account_by_url (url);
+ GNUNET_free (url);
+ }
+ if (NULL == au->wa)
return GNUNET_DB_STATUS_HARD_ERROR;
/* make sure we have current fees */
au->execution_time = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&au->execution_time);
- qs = update_fees (au->wp,
+ qs = update_fees (au->wa,
au->execution_time,
au->session);
if (qs <= 0)
@@ -713,7 +738,7 @@ deposit_cb (void *cls,
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
- au->wire_fee = au->wp->af->wire_fee;
+ au->wire_fee = au->wa->af->wire_fee;
qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
au->session,
@@ -942,7 +967,7 @@ prepare_close_cb (void *cls,
db_plugin->rollback (db_plugin->cls,
ctc->session);
/* start again */
- GNUNET_free (ctc->type);
+ GNUNET_free (ctc->method);
GNUNET_free (ctc);
ctc = NULL;
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@@ -953,7 +978,7 @@ prepare_close_cb (void *cls,
/* Commit our intention to execute the wire transfer! */
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
ctc->session,
- ctc->type,
+ ctc->method,
buf,
buf_size);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
@@ -963,7 +988,7 @@ prepare_close_cb (void *cls,
ctc->session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (ctc->type);
+ GNUNET_free (ctc->method);
GNUNET_free (ctc);
ctc = NULL;
return;
@@ -975,7 +1000,7 @@ prepare_close_cb (void *cls,
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
- GNUNET_free (ctc->type);
+ GNUNET_free (ctc->method);
GNUNET_free (ctc);
ctc = NULL;
return;
@@ -983,7 +1008,7 @@ prepare_close_cb (void *cls,
/* finally commit */
(void) commit_or_warn (ctc->session);
- GNUNET_free (ctc->type);
+ GNUNET_free (ctc->method);
GNUNET_free (ctc);
ctc = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1029,7 +1054,7 @@ static enum GNUNET_DB_QueryStatus
expired_reserve_cb (void *cls,
const struct TALER_ReservePublicKeyP *reserve_pub,
const struct TALER_Amount *left,
- const json_t *account_details,
+ const char *account_details,
struct GNUNET_TIME_Absolute expiration_date)
{
struct ExpiredReserveContext *erc = cls;
@@ -1040,24 +1065,15 @@ expired_reserve_cb (void *cls,
const struct TALER_Amount *closing_fee;
int ret;
enum GNUNET_DB_QueryStatus qs;
- const char *type;
- struct WirePlugin *wp;
+ struct WireAccount *wa;
GNUNET_assert (NULL == ctc);
now = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&now);
/* lookup wire plugin */
- type = extract_type (account_details);
- if (NULL == type)
- {
- GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
- wp = find_plugin (type);
- if (NULL == wp)
+ wa = find_account_by_url (account_details);
+ if (NULL == wa)
{
GNUNET_break (0);
global_ret = GNUNET_SYSERR;
@@ -1066,7 +1082,7 @@ expired_reserve_cb (void *cls,
}
/* lookup `closing_fee` */
- qs = update_fees (wp,
+ qs = update_fees (wa,
now,
session);
if (qs <= 0)
@@ -1079,7 +1095,7 @@ expired_reserve_cb (void *cls,
GNUNET_SCHEDULER_shutdown ();
return qs;
}
- closing_fee = &wp->af->closing_fee;
+ closing_fee = &wa->af->closing_fee;
/* calculate transfer amount */
ret = TALER_amount_subtract (&amount_without_fee,
@@ -1124,7 +1140,7 @@ expired_reserve_cb (void *cls,
{
/* success, perform wire transfer */
if (GNUNET_SYSERR ==
- wp->wire_plugin->amount_round (wp->wire_plugin->cls,
+ wa->wire_plugin->amount_round (wa->wire_plugin->cls,
&amount_without_fee))
{
GNUNET_break (0);
@@ -1133,11 +1149,12 @@ expired_reserve_cb (void *cls,
return GNUNET_DB_STATUS_HARD_ERROR;
}
ctc = GNUNET_new (struct CloseTransferContext);
- ctc->wp = wp;
+ ctc->wa = wa;
ctc->session = session;
- ctc->type = GNUNET_strdup (type);
+ ctc->method = TALER_WIRE_payto_get_method (account_details);
ctc->ph
- = wp->wire_plugin->prepare_wire_transfer (wp->wire_plugin->cls,
+ = wa->wire_plugin->prepare_wire_transfer (wa->wire_plugin->cls,
+ wa->section_name,
account_details,
&amount_without_fee,
exchange_base_url,
@@ -1149,7 +1166,7 @@ expired_reserve_cb (void *cls,
GNUNET_break (0);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (ctc->type);
+ GNUNET_free (ctc->method);
GNUNET_free (ctc);
ctc = NULL;
return GNUNET_DB_STATUS_HARD_ERROR;
@@ -1398,7 +1415,7 @@ run_aggregation (void *cls)
&au->total_amount,
&au->wire_fee)) ||
(GNUNET_SYSERR ==
- au->wp->wire_plugin->amount_round (au->wp->wire_plugin->cls,
+ au->wa->wire_plugin->amount_round (au->wa->wire_plugin->cls,
&au->final_amount)) ||
( (0 == au->final_amount.value) &&
(0 == au->final_amount.fraction) ) )
@@ -1479,22 +1496,28 @@ run_aggregation (void *cls)
TALER_B2S (&au->merchant_pub));
GNUNET_free (amount_s);
}
- au->ph = au->wp->wire_plugin->prepare_wire_transfer (au->wp->wire_plugin->cls,
- au->wire,
- &au->final_amount,
- exchange_base_url,
- &au->wtid,
- &prepare_cb,
- au);
+ {
+ char *url;
+
+ url = TALER_JSON_wire_to_payto (au->wire);
+ au->ph = au->wa->wire_plugin->prepare_wire_transfer (au->wa->wire_plugin->cls,
+ au->wa->section_name,
+ url,
+ &au->final_amount,
+ exchange_base_url,
+ &au->wtid,
+ &prepare_cb,
+ au);
+ GNUNET_free (url);
+ }
if (NULL == au->ph)
{
- GNUNET_break (0); /* why? how to best recover? */
+ /* something went very wrong, likely bad configuration,
+ abort */
db_plugin->rollback (db_plugin->cls,
session);
cleanup_au ();
- /* start again */
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ GNUNET_SCHEDULER_shutdown ();
return;
}
/* otherwise we continue with #prepare_cb(), see below */
@@ -1536,7 +1559,7 @@ prepare_cb (void *cls,
/* Commit our intention to execute the wire transfer! */
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
session,
- au->wp->type,
+ au->wa->wire_plugin->method,
buf,
buf_size);
/* Commit the WTID data to 'wire_out' to finally satisfy aggregation
@@ -1711,8 +1734,8 @@ wire_prepare_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting wire transfer %llu\n",
(unsigned long long) rowid);
- wpd->wp = find_plugin (wire_method);
- if (NULL == wpd->wp)
+ wpd->wa = find_account_by_method (wire_method);
+ if (NULL == wpd->wa)
{
/* Should really never happen here, as when we get
here the plugin should be in the cache. */
@@ -1725,7 +1748,7 @@ wire_prepare_cb (void *cls,
wpd = NULL;
return;
}
- wpd->eh = wpd->wp->wire_plugin->execute_wire_transfer (wpd->wp->wire_plugin->cls,
+ wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer (wpd->wa->wire_plugin->cls,
buf,
buf_size,
&wire_confirm_cb,