summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-06-24 00:41:41 +0200
committerChristian Grothoff <christian@grothoff.org>2017-06-24 00:41:41 +0200
commit9aa2e94812278f1bd392483a6a9b36f138c20aef (patch)
tree04c136d81ace5cf3ca265789886168d761b36501
parent6acb0271d6413b250cf78b42aed52a9cb0d0869c (diff)
downloadexchange-9aa2e94812278f1bd392483a6a9b36f138c20aef.tar.gz
exchange-9aa2e94812278f1bd392483a6a9b36f138c20aef.tar.bz2
exchange-9aa2e94812278f1bd392483a6a9b36f138c20aef.zip
working on #5010 for aggregator
-rw-r--r--src/exchange/taler-exchange-aggregator.c150
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c188
-rw-r--r--src/exchangedb/test_exchangedb.c4
-rw-r--r--src/include/taler_exchangedb_plugin.h19
4 files changed, 210 insertions, 151 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 10240190b..8dd46f7f1 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -812,6 +812,8 @@ prepare_close_cb (void *cls,
const char *buf,
size_t buf_size)
{
+ enum GNUNET_DB_QueryStatus qs;
+
GNUNET_assert (cls == ctc);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -832,14 +834,25 @@ prepare_close_cb (void *cls,
}
/* Commit our intention to execute the wire transfer! */
- if (GNUNET_OK !=
- db_plugin->wire_prepare_data_insert (db_plugin->cls,
- ctc->session,
- ctc->type,
- buf,
- buf_size))
+ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ ctc->session,
+ ctc->type,
+ buf,
+ buf_size);
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ ctc->session);
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ GNUNET_free (ctc->type);
+ GNUNET_free (ctc);
+ ctc = NULL;
+ return;
+ }
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
- GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
ctc->session);
/* start again */
@@ -864,32 +877,52 @@ prepare_close_cb (void *cls,
/**
+ * Closure for #expired_reserve_cb().
+ */
+struct ExpiredReserveContext
+{
+
+ /**
+ * Database session we are using.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Set to #GNUNET_YES if the transaction continues
+ * asynchronously.
+ */
+ int async_cont;
+};
+
+
+/**
* Function called with details about expired reserves.
* We trigger the reserve closure by inserting the respective
* closing record and prewire instructions into the respective
* tables.
*
- * @param cls a `struct TALER_EXCHANGEDB_Session *`
+ * @param cls a `struct ExpiredReserveContext *`
* @param reserve_pub public key of the reserve
* @param left amount left in the reserve
* @param account_details information about the reserve's bank account
* @param expiration_date when did the reserve expire
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
expired_reserve_cb (void *cls,
const struct TALER_ReservePublicKeyP *reserve_pub,
const struct TALER_Amount *left,
const json_t *account_details,
struct GNUNET_TIME_Absolute expiration_date)
{
- struct TALER_EXCHANGEDB_Session *session = cls;
+ struct ExpiredReserveContext *erc = cls;
+ struct TALER_EXCHANGEDB_Session *session = erc->session;
struct GNUNET_TIME_Absolute now;
struct TALER_WireTransferIdentifierRawP wtid;
struct TALER_Amount amount_without_fee;
const struct TALER_Amount *closing_fee;
int ret;
- int iret;
+ enum GNUNET_DB_QueryStatus qs;
const char *type;
struct WirePlugin *wp;
@@ -901,21 +934,17 @@ expired_reserve_cb (void *cls,
if (NULL == type)
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
wp = find_plugin (type);
if (NULL == wp)
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
/* lookup `closing_fee` */
@@ -925,11 +954,9 @@ expired_reserve_cb (void *cls,
session))
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
closing_fee = &wp->af->closing_fee;
@@ -956,22 +983,22 @@ expired_reserve_cb (void *cls,
reserve_pub,
GNUNET_MIN (sizeof (wtid),
sizeof (*reserve_pub)));
- iret = db_plugin->insert_reserve_closed (db_plugin->cls,
- session,
- reserve_pub,
- now,
- account_details,
- &wtid,
- left,
- closing_fee);
+ qs = db_plugin->insert_reserve_closed (db_plugin->cls,
+ session,
+ reserve_pub,
+ now,
+ account_details,
+ &wtid,
+ left,
+ closing_fee);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Closing reserve %s over %s (%d, %d)\n",
TALER_B2S (reserve_pub),
TALER_amount2s (left),
ret,
- iret);
+ qs);
if ( (GNUNET_OK == ret) &&
- (GNUNET_OK == iret) )
+ (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) )
{
/* success, perform wire transfer */
if (GNUNET_SYSERR ==
@@ -979,11 +1006,9 @@ expired_reserve_cb (void *cls,
&amount_without_fee))
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
ctc = GNUNET_new (struct CloseTransferContext);
ctc->wp = wp;
@@ -1000,33 +1025,29 @@ expired_reserve_cb (void *cls,
if (NULL == ctc->ph)
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
GNUNET_free (ctc->type);
GNUNET_free (ctc);
ctc = NULL;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
- return GNUNET_SYSERR;
+ erc->async_cont = GNUNET_YES;
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
/* Check for hard failure */
- if (GNUNET_SYSERR == iret)
+ if ( (GNUNET_SYSERR == ret) ||
+ (GNUNET_DB_STATUS_HARD_ERROR == qs) )
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
- /* Reserve balance was almost zero; just commit */
+ /* Reserve balance was almost zero OR soft error */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reserve was virtually empty, moving on\n");
- (void) commit_or_warn (session);
- task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
- NULL);
- return GNUNET_SYSERR;
+ return qs;
}
@@ -1040,9 +1061,10 @@ static void
run_reserve_closures (void *cls)
{
struct TALER_EXCHANGEDB_Session *session;
- int ret;
+ enum GNUNET_DB_QueryStatus qs;
const struct GNUNET_SCHEDULER_TaskContext *tc;
-
+ struct ExpiredReserveContext erc;
+
task = NULL;
reserves_idle = GNUNET_NO;
tc = GNUNET_SCHEDULER_get_task_context ();
@@ -1068,22 +1090,29 @@ run_reserve_closures (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- ret = db_plugin->get_expired_reserves (db_plugin->cls,
- session,
- GNUNET_TIME_absolute_get (),
- &expired_reserve_cb,
- session);
- if (GNUNET_SYSERR == ret)
+ erc.session = session;
+ erc.async_cont = GNUNET_NO;
+ qs = db_plugin->get_expired_reserves (db_plugin->cls,
+ session,
+ GNUNET_TIME_absolute_get (),
+ &expired_reserve_cb,
+ &erc);
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
return;
- }
- if (GNUNET_NO == ret)
- {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more idle reserves, going back to aggregation\n");
reserves_idle = GNUNET_YES;
@@ -1092,6 +1121,13 @@ run_reserve_closures (void *cls)
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ if (GNUNET_YES == erc.async_cont)
+ break;
+ (void) commit_or_warn (session);
+ task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
+ NULL);
+ return;
}
}
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index a41b19662..98ac11773 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1454,7 +1454,9 @@ postgres_prepare (PGconn *db_conn)
" FROM reserves"
" WHERE expiration_date<=$1"
" AND (current_balance_val != 0 "
- " OR current_balance_frac != 0);",
+ " OR current_balance_frac != 0)"
+ " ORDER BY expiration_date ASC"
+ " LIMIT 1;",
1),
/* Used in #postgres_get_coin_transactions() to obtain payback transactions
for a coin */
@@ -2884,8 +2886,8 @@ postgres_get_ready_deposit (void *cls,
* @param deposit_cb function to call for each deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @param limit maximum number of matching deposits to return
- * @return number of rows processed, 0 if none exist,
- * #GNUNET_SYSERR on error
+ * @return transaction status code, if positive:
+ * number of rows processed, 0 if none exist
*/
static int
postgres_iterate_matching_deposits (void *cls,
@@ -4633,57 +4635,50 @@ postgres_insert_wire_fee (void *cls,
/**
- * Obtain information about expired reserves and their
- * remaining balances.
- *
- * @param cls closure of the plugin
- * @param session database connection
- * @param now timestamp based on which we decide expiration
- * @param rec function to call on expired reserves
- * @param rec_cls closure for @a rec
- * @return #GNUNET_SYSERR on database error
- * #GNUNET_NO if there are no expired non-empty reserves
- * #GNUNET_OK on success
+ * Closure for #reserve_expired_cb().
*/
-static int
-postgres_get_expired_reserves (void *cls,
- struct TALER_EXCHANGEDB_Session *session,
- struct GNUNET_TIME_Absolute now,
- TALER_EXCHANGEDB_ReserveExpiredCallback rec,
- void *rec_cls)
+struct ExpiredReserveContext
{
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_absolute_time (&now),
- GNUNET_PQ_query_param_end
- };
- PGresult *result;
- int nrows;
+ /**
+ * Function to call for each expired reserve.
+ */
+ TALER_EXCHANGEDB_ReserveExpiredCallback rec;
- result = GNUNET_PQ_exec_prepared (session->conn,
- "get_expired_reserves",
- params);
- if (PGRES_TUPLES_OK !=
- PQresultStatus (result))
- {
- BREAK_DB_ERR (result, session->conn);
- PQclear (result);
- return GNUNET_SYSERR;
- }
- nrows = PQntuples (result);
- if (0 == nrows)
- {
- /* no matches found */
- PQclear (result);
- return GNUNET_NO;
- }
+ /**
+ * Closure to give to @e rec.
+ */
+ void *rec_cls;
- for (int i=0;i<nrows;i++)
+ /**
+ * Set to #GNUNET_SYSERR on error.
+ */
+ int status;
+};
+
+
+/**
+ * Function to be called with the results of a SELECT statement
+ * that has returned @a num_results results.
+ *
+ * @param cls closure
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+reserve_expired_cb (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct ExpiredReserveContext *erc = cls;
+ int ret;
+
+ ret = GNUNET_OK;
+ for (unsigned int i=0;i<num_results;i++)
{
struct GNUNET_TIME_Absolute exp_date;
json_t *account_details;
struct TALER_ReservePublicKeyP reserve_pub;
struct TALER_Amount remaining_balance;
- int ret;
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_absolute_time ("expiration_date",
&exp_date),
@@ -4701,21 +4696,59 @@ postgres_get_expired_reserves (void *cls,
rs,
i))
{
- PQclear (result);
GNUNET_break (0);
- return GNUNET_SYSERR;
+ ret = GNUNET_SYSERR;
+ break;
}
- ret = rec (rec_cls,
- &reserve_pub,
- &remaining_balance,
- account_details,
- exp_date);
+ ret = erc->rec (erc->rec_cls,
+ &reserve_pub,
+ &remaining_balance,
+ account_details,
+ exp_date);
GNUNET_PQ_cleanup_result (rs);
if (GNUNET_OK != ret)
break;
}
- PQclear (result);
- return GNUNET_OK;
+ erc->status = ret;
+}
+
+
+/**
+ * Obtain information about expired reserves and their
+ * remaining balances.
+ *
+ * @param cls closure of the plugin
+ * @param session database connection
+ * @param now timestamp based on which we decide expiration
+ * @param rec function to call on expired reserves
+ * @param rec_cls closure for @a rec
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_get_expired_reserves (void *cls,
+ struct TALER_EXCHANGEDB_Session *session,
+ struct GNUNET_TIME_Absolute now,
+ TALER_EXCHANGEDB_ReserveExpiredCallback rec,
+ void *rec_cls)
+{
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_end
+ };
+ struct ExpiredReserveContext ectx;
+ enum GNUNET_DB_QueryStatus qs;
+
+ ectx.rec = rec;
+ ectx.rec_cls = rec_cls;
+ ectx.status = GNUNET_OK;
+ qs = GNUNET_PQ_eval_prepared_multi_select (session->conn,
+ "get_expired_reserves",
+ params,
+ &reserve_expired_cb,
+ &ectx);
+ if (GNUNET_OK != ectx.status)
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ return qs;
}
@@ -4730,11 +4763,9 @@ postgres_get_expired_reserves (void *cls,
* @param wtid wire transfer details
* @param amount_with_fee amount we charged to the reserve
* @param closing_fee how high is the closing fee
- * @return #GNUNET_OK on success,
- * #GNUNET_NO if the record exists or on transient errors
- * #GNUNET_SYSERR on failure
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_insert_reserve_closed (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct TALER_ReservePublicKeyP *reserve_pub,
@@ -4757,11 +4788,11 @@ postgres_insert_reserve_closed (void *cls,
int ret;
enum GNUNET_DB_QueryStatus qs;
- ret = execute_prepared_non_select (session,
- "reserves_close_insert",
- params);
- if (GNUNET_OK != ret)
- return ret;
+ qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "reserves_close_insert",
+ params);
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
+ return qs;
/* update reserve balance */
reserve.pub = *reserve_pub;
@@ -4770,10 +4801,11 @@ postgres_insert_reserve_closed (void *cls,
session,
&reserve)))
{
- /* FIXME: #5010 */
/* Existence should have been checked before we got here... */
- GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs);
- return (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) ? GNUNET_NO : GNUNET_SYSERR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ return qs;
}
ret = TALER_amount_subtract (&reserve.balance,
&reserve.balance,
@@ -4786,18 +4818,12 @@ postgres_insert_reserve_closed (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Closing of reserve `%s' refused due to balance missmatch. Retrying.\n",
TALER_B2S (reserve_pub));
- return GNUNET_NO;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
GNUNET_break (GNUNET_NO == ret);
- qs = reserves_update (cls,
- session,
- &reserve);
- if (0 >= qs)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
+ return reserves_update (cls,
+ session,
+ &reserve);
}
@@ -4809,9 +4835,9 @@ postgres_insert_reserve_closed (void *cls,
* @param type type of the wire transfer (i.e. "sepa")
* @param buf buffer with wire transfer preparation data
* @param buf_size number of bytes in @a buf
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ * @return query status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_insert (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *type,
@@ -4824,9 +4850,9 @@ postgres_wire_prepare_data_insert (void *cls,
GNUNET_PQ_query_param_end
};
- return execute_prepared_non_select (session,
- "wire_prepare_data_insert",
- params);
+ return GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "wire_prepare_data_insert",
+ params);
}
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index ae0c82130..4ca7c39c9 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -119,7 +119,7 @@ test_wire_prepare (struct TALER_EXCHANGEDB_Session *session)
session,
&dead_prepare_cb,
NULL));
- FAILIF (GNUNET_OK !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->wire_prepare_data_insert (plugin->cls,
session,
"testcase",
@@ -1666,7 +1666,7 @@ run (void *cls)
GNUNET_assert (GNUNET_OK ==
TALER_string_to_amount (CURRENCY ":0.000010",
&fee_closing));
- FAILIF (GNUNET_OK !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->insert_reserve_closed (plugin->cls,
session,
&reserve_pub,
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 9a97b5f37..9f80fda42 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -1009,9 +1009,9 @@ typedef int
* @param left amount left in the reserve
* @param account_details information about the reserve's bank account
* @param expiration_date when did the reserve expire
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code to pass on
*/
-typedef int
+typedef enum GNUNET_DB_QueryStatus
(*TALER_EXCHANGEDB_ReserveExpiredCallback)(void *cls,
const struct TALER_ReservePublicKeyP *reserve_pub,
const struct TALER_Amount *left,
@@ -1821,11 +1821,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param now timestamp based on which we decide expiration
* @param rec function to call on expired reserves
* @param rec_cls closure for @a rec
- * @return #GNUNET_SYSERR on database error
- * #GNUNET_NO if there are no expired non-empty reserves
- * #GNUNET_OK on success
+ * @return transaction status
*/
- int
+ enum GNUNET_DB_QueryStatus
(*get_expired_reserves)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute now,
@@ -1844,10 +1842,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param wtid identifier for the wire transfer
* @param amount_with_fee amount we charged to the reserve
* @param closing_fee how high is the closing fee
- * @return #GNUNET_OK on success, #GNUNET_NO if the record exists,
- * #GNUNET_SYSERR on failure
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*insert_reserve_closed)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct TALER_ReservePublicKeyP *reserve_pub,
@@ -1866,9 +1863,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param type type of the wire transfer (i.e. "sepa")
* @param buf buffer with wire transfer preparation data
* @param buf_size number of bytes in @a buf
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ * @return query status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*wire_prepare_data_insert)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *type,