summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/exchange/taler-exchange-aggregator.c411
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c575
-rw-r--r--src/exchangedb/test_exchangedb.c36
-rw-r--r--src/include/taler_exchangedb_plugin.h42
4 files changed, 466 insertions, 598 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 8dd46f7f1..7bd437b5d 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -346,18 +346,19 @@ advance_fees (struct WirePlugin *wp,
* @param wp wire transfer fee data structure to update
* @param now timestamp to update fees to
* @param session DB session to use
- * @return #GNUNET_OK on success, #GNUNET_SYSERR if we
- * lack current fee information (and need to exit)
+ * @return transaction status
*/
-static int
+static enum GNUNET_DB_QueryStatus
update_fees (struct WirePlugin *wp,
struct GNUNET_TIME_Absolute now,
struct TALER_EXCHANGEDB_Session *session)
{
+ enum GNUNET_DB_QueryStatus qs;
+
advance_fees (wp,
now);
if (NULL != wp->af)
- return GNUNET_OK;
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
/* Let's try to load it from disk... */
wp->af = TALER_EXCHANGEDB_fees_read (cfg,
wp->type);
@@ -367,26 +368,26 @@ update_fees (struct WirePlugin *wp,
NULL != p;
p = p->next)
{
- if (GNUNET_SYSERR ==
- db_plugin->insert_wire_fee (db_plugin->cls,
- session,
- wp->type,
- p->start_date,
- p->end_date,
- &p->wire_fee,
- &p->master_sig))
+ qs = db_plugin->insert_wire_fee (db_plugin->cls,
+ session,
+ wp->type,
+ p->start_date,
+ p->end_date,
+ &p->wire_fee,
+ &p->master_sig);
+ if (qs < 0)
{
TALER_EXCHANGEDB_fees_free (wp->af);
wp->af = NULL;
- return GNUNET_SYSERR;
+ return qs;
}
}
if (NULL != wp->af)
- return GNUNET_OK;
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to find current wire transfer fees for `%s'\n",
wp->type);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
}
@@ -425,6 +426,26 @@ find_plugin (const char *type)
return wp;
}
+
+/**
+ * Free data stored in #au.
+ */
+static void
+cleanup_au (void)
+{
+ if (NULL == au)
+ return;
+ GNUNET_free_non_null (au->additional_rows);
+ if (NULL != au->wire)
+ {
+ json_decref (au->wire);
+ au->wire = NULL;
+ }
+ GNUNET_free (au);
+ au = NULL;
+}
+
+
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
*
@@ -463,11 +484,7 @@ shutdown_task (void *cls)
}
db_plugin->rollback (db_plugin->cls,
au->session);
- GNUNET_free_non_null (au->additional_rows);
- if (NULL != au->wire)
- json_decref (au->wire);
- au = NULL;
- GNUNET_free (au);
+ cleanup_au ();
}
if (NULL != ctc)
{
@@ -564,9 +581,9 @@ exchange_serve_process_config ()
* @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed
* @param wire wire details for the merchant
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/
-static int
+static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls,
uint64_t row_id,
const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -588,7 +605,7 @@ deposit_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Fatally malformed record at row %llu\n",
(unsigned long long) row_id);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
au->row_id = row_id;
GNUNET_assert (NULL == au->wire);
@@ -604,38 +621,41 @@ deposit_cb (void *cls,
au->wp = find_plugin (extract_type (au->wire));
if (NULL == au->wp)
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
/* make sure we have current fees */
au->execution_time = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&au->execution_time);
- if (GNUNET_OK !=
- update_fees (au->wp,
- au->execution_time,
- au->session))
- return GNUNET_SYSERR;
+ qs = update_fees (au->wp,
+ au->execution_time,
+ au->session);
+ if (qs <= 0)
+ {
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
au->wire_fee = au->wp->af->wire_fee;
- if (GNUNET_OK !=
- db_plugin->insert_aggregation_tracking (db_plugin->cls,
- au->session,
- &au->wtid,
- row_id))
+ qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
+ au->session,
+ &au->wtid,
+ row_id);
+ if (qs <= 0)
{
- GNUNET_break (0);
- return GNUNET_SYSERR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
}
qs = db_plugin->mark_deposit_done (db_plugin->cls,
au->session,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
- /* FIXME #5010 */
- GNUNET_break (0);
- au->failed = GNUNET_YES;
- return GNUNET_SYSERR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
}
- return GNUNET_OK;
+ return qs;
}
@@ -653,9 +673,9 @@ deposit_cb (void *cls,
* @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed
* @param wire wire details for the merchant
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
aggregate_cb (void *cls,
uint64_t row_id,
const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -682,9 +702,12 @@ aggregate_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Fatally malformed record at %llu\n",
(unsigned long long) row_id);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
/* add to total */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding transaction amount %s to aggregation\n",
+ TALER_amount2s (&delta));
if (GNUNET_OK !=
TALER_amount_add (&au->total_amount,
&au->total_amount,
@@ -694,14 +717,14 @@ aggregate_cb (void *cls,
"Overflow or currency incompatibility during aggregation at %llu\n",
(unsigned long long) row_id);
/* Skip this one, but keep going! */
- return GNUNET_OK;
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
if (au->rows_offset >= aggregation_limit)
{
/* Bug: we asked for at most #aggregation_limit results! */
GNUNET_break (0);
/* Skip this one, but keep going. */
- return GNUNET_OK;
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
if (NULL == au->additional_rows)
au->additional_rows = GNUNET_new_array (aggregation_limit,
@@ -709,26 +732,27 @@ aggregate_cb (void *cls,
/* "append" to our list of rows */
au->additional_rows[au->rows_offset++] = row_id;
/* insert into aggregation tracking table */
- if (GNUNET_OK !=
- db_plugin->insert_aggregation_tracking (db_plugin->cls,
- au->session,
- &au->wtid,
- row_id))
+ qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
+ au->session,
+ &au->wtid,
+ row_id);
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
- GNUNET_break (0);
- return GNUNET_SYSERR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
}
qs = db_plugin->mark_deposit_done (db_plugin->cls,
au->session,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
- /* FIXME: #5010 */
- GNUNET_break (0);
- au->failed = GNUNET_YES;
- return GNUNET_SYSERR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
}
- return GNUNET_OK;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Added row %llu to aggregation\n",
+ (unsigned long long) row_id);
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
@@ -948,15 +972,18 @@ expired_reserve_cb (void *cls,
}
/* lookup `closing_fee` */
- if (GNUNET_OK !=
- update_fees (wp,
- now,
- session))
+ qs = update_fees (wp,
+ now,
+ session);
+ if (qs <= 0)
{
- GNUNET_break (0);
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_DB_STATUS_HARD_ERROR;
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ GNUNET_SCHEDULER_shutdown ();
+ return qs;
}
closing_fee = &wp->af->closing_fee;
@@ -1144,7 +1171,6 @@ run_aggregation (void *cls)
static int swap;
struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs;
- int ret;
const struct GNUNET_SCHEDULER_TaskContext *tc;
task = NULL;
@@ -1179,19 +1205,16 @@ run_aggregation (void *cls)
}
au = GNUNET_new (struct AggregationUnit);
au->session = session;
- ret = db_plugin->get_ready_deposit (db_plugin->cls,
- session,
- &deposit_cb,
- au);
- if (0 >= ret)
+ qs = db_plugin->get_ready_deposit (db_plugin->cls,
+ session,
+ &deposit_cb,
+ au);
+ if (0 >= qs)
{
- if (NULL != au->wire)
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
db_plugin->rollback (db_plugin->cls,
session);
- if (GNUNET_SYSERR == ret)
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
@@ -1199,6 +1222,14 @@ run_aggregation (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ /* should re-try immediately */
+ swap--; /* do not count failed attempts */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more ready deposits, going to sleep\n");
if ( (GNUNET_YES == test_mode) &&
@@ -1209,12 +1240,13 @@ run_aggregation (void *cls)
}
else
{
- /* nothing to do, sleep for a minute and try again */
if ( (GNUNET_NO == reserves_idle) ||
- (GNUNET_YES == test_mode) )
+ (GNUNET_YES == test_mode) )
+ /* Possibly more to on reserves, go for it immediately */
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
NULL);
else
+ /* nothing to do, sleep for a minute and try again */
task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
&run_aggregation,
NULL);
@@ -1226,29 +1258,37 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Found ready deposit for %s, aggregating\n",
TALER_B2S (&au->merchant_pub));
- ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
- session,
- &au->h_wire,
- &au->merchant_pub,
- &aggregate_cb,
- au,
- aggregation_limit);
- if ( (GNUNET_SYSERR == ret) ||
+ qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
+ session,
+ &au->h_wire,
+ &au->merchant_pub,
+ &aggregate_cb,
+ au,
+ aggregation_limit);
+ if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
(GNUNET_YES == au->failed) )
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
- GNUNET_free_non_null (au->additional_rows);
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
db_plugin->rollback (db_plugin->cls,
session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
return;
}
-
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ /* serializiability issue, try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+ return;
+ }
+
/* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */
@@ -1263,13 +1303,16 @@ run_aggregation (void *cls)
(0 == au->final_amount.fraction) ) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Aggregate value too low for transfer\n");
+ "Aggregate value too low for transfer (%d/%s)\n",
+ qs,
+ TALER_amount2s (&au->final_amount));
/* Rollback ongoing transaction, as we will not use the respective
WTID and thus need to remove the tracking data */
db_plugin->rollback (db_plugin->cls,
session);
- /* Start another transaction to mark all* of the selected deposits
- *as minor! */
+
+ /* There were results, just the value was too low. Start another
+ transaction to mark all* of the selected deposits as minor! */
if (GNUNET_OK !=
db_plugin->start (db_plugin->cls,
session))
@@ -1277,16 +1320,11 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
global_ret = GNUNET_SYSERR;
+ cleanup_au ();
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free_non_null (au->additional_rows);
- if (NULL != au->wire)
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
return;
}
/* Mark transactions by row_id as minor */
- ret = GNUNET_OK;
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
session,
au->row_id);
@@ -1303,13 +1341,11 @@ run_aggregation (void *cls)
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls,
session);
- GNUNET_free_non_null (au->additional_rows);
- if (NULL != au->wire)
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
@@ -1319,21 +1355,13 @@ run_aggregation (void *cls)
{
db_plugin->rollback (db_plugin->cls,
session);
- GNUNET_free_non_null (au->additional_rows);
- if (NULL != au->wire)
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
GNUNET_SCHEDULER_shutdown ();
return;
}
/* commit */
(void) commit_or_warn (session);
- GNUNET_free_non_null (au->additional_rows);
- if (NULL != au->wire)
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
@@ -1361,11 +1389,7 @@ run_aggregation (void *cls)
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
session);
- GNUNET_free_non_null (au->additional_rows);
- if (NULL != au->wire)
- json_decref (au->wire);
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
@@ -1388,8 +1412,10 @@ prepare_cb (void *cls,
size_t buf_size)
{
struct TALER_EXCHANGEDB_Session *session = au->session;
+ enum GNUNET_DB_QueryStatus qs;
GNUNET_free_non_null (au->additional_rows);
+ au->additional_rows = NULL;
if (NULL == buf)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -1398,74 +1424,53 @@ prepare_cb (void *cls,
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
- if (NULL != au->wire)
- {
- json_decref (au->wire);
- au->wire = NULL;
- }
- GNUNET_free (au);
- au = NULL;
+ cleanup_au ();
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing %u bytes of wire prepare data\n",
+ (unsigned int) buf_size);
/* Commit our intention to execute the wire transfer! */
- if (GNUNET_OK !=
- db_plugin->wire_prepare_data_insert (db_plugin->cls,
- session,
- au->wp->type,
- buf,
- buf_size))
+ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ session,
+ au->wp->type,
+ buf,
+ buf_size);
+ /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
+ table constraints */
+ if (qs >= 0)
+ qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
+ session,
+ au->execution_time,
+ &au->wtid,
+ au->wire,
+ &au->final_amount);
+ cleanup_au ();
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
- GNUNET_break (0); /* why? how to best recover? */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue for prepared wire data; trying again later!\n");
db_plugin->rollback (db_plugin->cls,
session);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
- if (NULL != au->wire)
- {
- json_decref (au->wire);
- au->wire = NULL;
- }
- GNUNET_free (au);
- au = NULL;
return;
}
-
- /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
- table constraints */
- if (GNUNET_OK !=
- db_plugin->store_wire_transfer_out (db_plugin->cls,
- session,
- au->execution_time,
- &au->wtid,
- au->wire,
- &au->final_amount))
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
- GNUNET_break (0); /* why? how to best recover? */
+ GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
session);
- /* start again */
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
- if (NULL != au->wire)
- {
- json_decref (au->wire);
- au->wire = NULL;
- }
- GNUNET_free (au);
- au = NULL;
+ /* die hard */
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
return;
}
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Stored wire transfer out instructions\n");
- if (NULL != au->wire)
- {
- json_decref (au->wire);
- au->wire = NULL;
- }
- GNUNET_free (au);
- au = NULL;
/* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */
@@ -1473,6 +1478,8 @@ prepare_cb (void *cls,
{
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Commit issue for prepared wire data; trying again later!\n");
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
return;
@@ -1512,6 +1519,7 @@ wire_confirm_cb (void *cls,
const char *emsg)
{
struct TALER_EXCHANGEDB_Session *session = wpd->session;
+ enum GNUNET_DB_QueryStatus qs;
wpd->eh = NULL;
if (GNUNET_SYSERR == success)
@@ -1527,16 +1535,25 @@ wire_confirm_cb (void *cls,
wpd = NULL;
return;
}
- if (GNUNET_OK !=
- db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
- session,
- wpd->row_id))
+ qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
+ session,
+ wpd->row_id);
+ if (0 >= qs)
{
- GNUNET_break (0); /* why!? */
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
db_plugin->rollback (db_plugin->cls,
session);
- global_ret = GNUNET_SYSERR;
- GNUNET_SCHEDULER_shutdown ();
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ /* try again */
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ NULL);
+ }
+ else
+ {
+ global_ret = GNUNET_SYSERR;
+ GNUNET_SCHEDULER_shutdown ();
+ }
GNUNET_free (wpd);
wpd = NULL;
return;
@@ -1621,7 +1638,7 @@ wire_prepare_cb (void *cls,
static void
run_transfers (void *cls)
{
- int ret;
+ enum GNUNET_DB_QueryStatus qs;
struct TALER_EXCHANGEDB_Session *session;
const struct GNUNET_SCHEDULER_TaskContext *tc;
@@ -1651,35 +1668,39 @@ run_transfers (void *cls)
}
wpd = GNUNET_new (struct WirePrepareData);
wpd->session = session;
- ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
- session,
- &wire_prepare_cb,
- NULL);
- if (GNUNET_SYSERR == ret)
+ qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
+ session,
+ &wire_prepare_cb,
+ NULL);
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
+ return; /* continues in #wire_prepare_cb() */
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ GNUNET_free (wpd);
+ wpd = NULL;
+ switch (qs)
{
- GNUNET_break (0); /* why? how to best recover? */
- db_plugin->rollback (db_plugin->cls,
- session);
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
return;
- }
- if (GNUNET_NO == ret)
- {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no more prepared wire transfers, go back to aggregation! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more pending wire transfers, starting aggregation\n");
- db_plugin->rollback (db_plugin->cls,
- session);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
- GNUNET_free (wpd);
- wpd = NULL;
return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* should be impossible */
+ GNUNET_assert (0);
}
- /* otherwise, continues in #wire_prepare_cb() */
}
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 9ec998aa9..4c94c2d79 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -91,24 +91,6 @@ struct TALER_EXCHANGEDB_Session
*/
PGconn *conn;
- /**
- * Transaction state. Set to #GNUNET_OK by #postgres_start().
- * Set to #GNUNET_NO if any part of the transaction failed in a
- * transient way (i.e. #PG_DIAG_SQLSTATE_DEADLOCK or
- * #PG_DIAG_SQLSTATE_SERIALIZATION_FAILURE). Set to
- * #GNUNET_SYSERR if any part of the transaction failed in a
- * hard way or if we are not within a transaction scope.
- *
- * If #GNUNET_NO, #postgres_commit() will always just do a
- * rollback and return #GNUNET_NO as well (to retry).
- *
- * If #GNUNET_SYSERR, #postgres_commit() will always just do a
- * rollback and return #GNUNET_SYSERR as well.
- *
- * If #GNUNET_OK, #postgres_commit() will try to commit and
- * return the result from the commit operation.
- */
- int state;
};
@@ -1554,7 +1536,6 @@ postgres_get_session (void *cls)
return NULL;
}
session = GNUNET_new (struct TALER_EXCHANGEDB_Session);
- session->state = GNUNET_SYSERR;
session->conn = db_conn;
if (0 != pthread_setspecific (pc->db_conn_threadlocal,
session))
@@ -1592,11 +1573,9 @@ postgres_start (void *cls,
PQerrorMessage (session->conn));
GNUNET_break (0);
PQclear (result);
- session->state = GNUNET_SYSERR;
return GNUNET_SYSERR;
}
PQclear (result);
- session->state = GNUNET_OK;
return GNUNET_OK;
}
@@ -1619,51 +1598,6 @@ postgres_rollback (void *cls,
GNUNET_break (PGRES_COMMAND_OK ==
PQresultStatus (result));
PQclear (result);
- session->state = GNUNET_SYSERR;
-}
-
-
-/**
- * Check the @a result's error code to see what happened.
- * Also logs errors.
- *
- * @param session session used
- * @param result result to check
- * @return #GNUNET_OK if the request/transaction succeeded
- * #GNUNET_NO if it failed but could succeed if retried
- * #GNUNET_SYSERR on hard errors
- */
-static int
-evaluate_pq_result (struct TALER_EXCHANGEDB_Session *session,
- PGresult *result)
-{
- if (PGRES_COMMAND_OK !=
- PQresultStatus (result))
- {
- const char *sqlstate;
-
- sqlstate = PQresultErrorField (result,
- PG_DIAG_SQLSTATE);
- if (NULL == sqlstate)
- {
- /* very unexpected... */
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- if ( (0 == strcmp (sqlstate,
- PQ_DIAG_SQLSTATE_DEADLOCK)) ||
- (0 == strcmp (sqlstate,
- PQ_DIAG_SQLSTATE_SERIALIZATION_FAILURE)) )
- {
- /* These two can be retried and have a fair chance of working
- the next time */
- QUERY_ERR (result, session->conn);
- return GNUNET_NO;
- }
- BREAK_DB_ERR(result, session->conn);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
}
@@ -1689,87 +1623,6 @@ postgres_commit (void *cls,
/**
- * Update the @a session state based on the latest @a result from
- * the database. Checks the status code of @a result and possibly
- * sets the state to failed (#GNUNET_SYSERR) or transiently failed
- * (#GNUNET_NO).
- *
- * @param session the session in which the transaction is running
- * @param statement name of the statement we were executing (for logging)
- * @param result the result we got from Postgres
- * @return current session state, i.e.
- * #GNUNET_OK on success
- * #GNUNET_NO if the transaction had a transient failure
- * #GNUNET_SYSERR if the transaction had a hard failure
- */
-static int
-update_session_from_result (struct TALER_EXCHANGEDB_Session *session,
- const char *statement,
- PGresult *result)
-{
- int ret;
-
- if (GNUNET_OK != session->state)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR; /* we already failed, why do we keep going? */
- }
- ret = evaluate_pq_result (session,
- result);
- if (GNUNET_OK == ret)
- return ret;
- GNUNET_log ((GNUNET_NO == ret)
- ? GNUNET_ERROR_TYPE_INFO
- : GNUNET_ERROR_TYPE_ERROR,
- "Statement `%s' failed: %s/%s/%s/%s/%s",
- statement,
- PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY),
- PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL),
- PQresultErrorMessage (result),
- PQresStatus (PQresultStatus (result)),
- PQerrorMessage (session->conn));
- session->state = ret;
- return ret;
-}
-
-
-/**
- * Execute a named prepared @a statement that is NOT a SELECT statement
- * in @a session using the given @a params. Returns the resulting session
- * state.
- *
- * @param session session to execute the statement in
- * @param statement name of the statement
- * @param params parameters to give to the statement (#GNUNET_PQ_query_param_end-terminated)
- * @return #GNUNET_OK on success
- * #GNUNET_NO if the transaction had a transient failure
- * #GNUNET_SYSERR if the transaction had a hard failure
- */
-static int
-execute_prepared_non_select (struct TALER_EXCHANGEDB_Session *session,
- const char *statement,
- const struct GNUNET_PQ_QueryParam *params)
-{
- PGresult *result;
- int ret;
-
- if (GNUNET_OK != session->state)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR; /* we already failed, why keep going? */
- }
- result = GNUNET_PQ_exec_prepared (session->conn,
- statement,
- params);
- ret = update_session_from_result (session,
- statement,
- result);
- PQclear (result);
- return ret;
-}
-
-
-/**
* Insert a denomination key's public information into the database for
* reference by auditors and other consistency checks.
*
@@ -2787,10 +2640,9 @@ postgres_mark_deposit_done (void *cls,
* @param session connection to the database
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
- * @return number of rows processed, 0 if none exist,
- * #GNUNET_SYSERR on error
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_get_ready_deposit (void *cls,
struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_DepositIterator deposit_cb,
@@ -2801,77 +2653,161 @@ postgres_get_ready_deposit (void *cls,
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_end
};
- PGresult *result;
- unsigned int n;
- int ret;
+ struct TALER_Amount amount_with_fee;
+ struct TALER_Amount deposit_fee;
+ struct GNUNET_TIME_Absolute wire_deadline;
+ struct GNUNET_HashCode h_contract_terms;
+ struct TALER_MerchantPublicKeyP merchant_pub;
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+ uint64_t serial_id;
+ json_t *wire;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
+ &serial_id),
+ TALER_PQ_result_spec_amount ("amount_with_fee",
+ &amount_with_fee),
+ TALER_PQ_result_spec_amount ("fee_deposit",
+ &deposit_fee),
+ GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
+ &wire_deadline),
+ GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
+ &h_contract_terms),
+ GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
+ &merchant_pub),
+ GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
+ &coin_pub),
+ TALER_PQ_result_spec_json ("wire",
+ &wire),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
- result = GNUNET_PQ_exec_prepared (session->conn,
- "deposits_get_ready",
- params);
- if (PGRES_TUPLES_OK !=
- PQresultStatus (result))
- {
- BREAK_DB_ERR (result, session->conn);
- PQclear (result);
- return GNUNET_SYSERR;
- }
- if (0 == (n = PQntuples (result)))
- {
- PQclear (result);
- return 0;
- }
- GNUNET_break (1 == n);
+ qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+ "deposits_get_ready",
+ params,
+ rs);
+ if (qs <= 0)
+ return qs;
+ qs = deposit_cb (deposit_cb_cls,
+ serial_id,
+ &merchant_pub,
+ &coin_pub,
+ &amount_with_fee,
+ &deposit_fee,
+ &h_contract_terms,
+ wire_deadline,
+ wire);
+ GNUNET_PQ_cleanup_result (rs);
+ return qs;
+}
+
+
+/**
+ * Closure for #match_deposit_cb().
+ */
+struct MatchingDepositContext
+{
+ /**
+ * Function to call for each result
+ */
+ TALER_EXCHANGEDB_DepositIterator deposit_cb;
+
+ /**
+ * Closure for @e deposit_cb.
+ */
+ void *deposit_cb_cls;
+
+ /**
+ * Public key of the merchant against which we are matching.
+ */
+ const struct TALER_MerchantPublicKeyP *merchant_pub;
+
+ /**
+ * Maximum number of results to return.
+ */
+ uint32_t limit;
+
+ /**
+ * Loop counter, actual number of results returned.
+ */
+ unsigned int i;
+
+ /**
+ * Set to #GNUNET_SYSERR on hard errors.
+ */
+ int status;
+};
+
+
+/**
+ * Helper function for #postgres_iterate_matching_deposits().
+ * To be called with the results of a SELECT statement
+ * that has returned @a num_results results.
+ *
+ * @param cls closure of type `struct MatchingDepositContext *`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+match_deposit_cb (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct MatchingDepositContext *mdc = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found %u/%u matching deposits\n",
+ num_results,
+ mdc->limit);
+ num_results = GNUNET_MIN (num_results,
+ mdc->limit);
+ for (mdc->i=0;mdc->i<num_results;mdc->i++)
{
struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract_terms;
- struct TALER_MerchantPublicKeyP merchant_pub;
struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t serial_id;
- json_t *wire;
+ enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
- &serial_id),
+ &serial_id),
TALER_PQ_result_spec_amount ("amount_with_fee",
&amount_with_fee),
TALER_PQ_result_spec_amount ("fee_deposit",
&deposit_fee),
GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
- &wire_deadline),
+ &wire_deadline),
GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
- &h_contract_terms),
- GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
- &merchant_pub),
+ &h_contract_terms),
GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
- &coin_pub),
- TALER_PQ_result_spec_json ("wire",
- &wire),
+ &coin_pub),
GNUNET_PQ_result_spec_end
};
-
+
if (GNUNET_OK !=
GNUNET_PQ_extract_result (result,
rs,
- 0))
+ mdc->i))
{
GNUNET_break (0);
- PQclear (result);
- return GNUNET_SYSERR;
+ mdc->status = GNUNET_SYSERR;
+ return;
}
- ret = deposit_cb (deposit_cb_cls,
- serial_id,
- &merchant_pub,
- &coin_pub,
- &amount_with_fee,
- &deposit_fee,
- &h_contract_terms,
- wire_deadline,
- wire);
+ qs = mdc->deposit_cb (mdc->deposit_cb_cls,
+ serial_id,
+ mdc->merchant_pub,
+ &coin_pub,
+ &amount_with_fee,
+ &deposit_fee,
+ &h_contract_terms,
+ wire_deadline,
+ NULL);
GNUNET_PQ_cleanup_result (rs);
- PQclear (result);
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
+ break;
}
- return (GNUNET_OK == ret) ? 1 : 0;
}
@@ -2889,7 +2825,7 @@ postgres_get_ready_deposit (void *cls,
* @return transaction status code, if positive:
* number of rows processed, 0 if none exist
*/
-static int // FIXME: enum GNUNET_DB_QueryStatus
+static enum GNUNET_DB_QueryStatus
postgres_iterate_matching_deposits (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_HashCode *h_wire,
@@ -2903,75 +2839,27 @@ postgres_iterate_matching_deposits (void *cls,
GNUNET_PQ_query_param_auto_from_type (h_wire),
GNUNET_PQ_query_param_end
};
- PGresult *result;
- unsigned int i;
- unsigned int n;
+ struct MatchingDepositContext mdc;
+ enum GNUNET_DB_QueryStatus qs;
- result = GNUNET_PQ_exec_prepared (session->conn,
- "deposits_iterate_matching",
- params);
- if (PGRES_TUPLES_OK !=
- PQresultStatus (result))
- {
- BREAK_DB_ERR (result, session->conn);
- PQclear (result);
- return GNUNET_SYSERR;
- }
- if (0 == (n = PQntuples (result)))
- {
- PQclear (result);
- return 0;
- }
- if (n > limit)
- n = limit;
- for (i=0;i<n;i++)
+ mdc.deposit_cb = deposit_cb;
+ mdc.deposit_cb_cls = deposit_cb_cls;
+ mdc.merchant_pub = merchant_pub;
+ mdc.limit = limit;
+ mdc.status = GNUNET_OK;
+ qs = GNUNET_PQ_eval_prepared_multi_select (session->conn,
+ "deposits_iterate_matching",
+ params,
+ &match_deposit_cb,
+ &mdc);
+ if (GNUNET_OK != mdc.status)
{
- struct TALER_Amount amount_with_fee;
- struct TALER_Amount deposit_fee;
- struct GNUNET_TIME_Absolute wire_deadline;
- struct GNUNET_HashCode h_contract_terms;
- struct TALER_CoinSpendPublicKeyP coin_pub;
- uint64_t serial_id;
- int ret;
- struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
- &serial_id),
- TALER_PQ_result_spec_amount ("amount_with_fee",
- &amount_with_fee),
- TALER_PQ_result_spec_amount ("fee_deposit",
- &deposit_fee),
- GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
- &wire_deadline),
- GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
- &h_contract_terms),
- GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
- &coin_pub),
- GNUNET_PQ_result_spec_end
- };
- if (GNUNET_OK !=
- GNUNET_PQ_extract_result (result,
- rs,
- i))
- {
- GNUNET_break (0);
- PQclear (result);
- return GNUNET_SYSERR;
- }
- ret = deposit_cb (deposit_cb_cls,
- serial_id,
- merchant_pub,
- &coin_pub,
- &amount_with_fee,
- &deposit_fee,
- &h_contract_terms,
- wire_deadline,
- NULL);
- GNUNET_PQ_cleanup_result (rs);
- if (GNUNET_OK != ret)
- break;
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
- PQclear (result);
- return i;
+ if (qs >= 0)
+ return mdc.i;
+ return qs;
}
@@ -4493,11 +4381,9 @@ postgres_wire_lookup_deposit_wtid (void *cls,
* @param session database connection
* @param wtid the raw wire transfer identifier we used
* @param deposit_serial_id row in the deposits table for which this is aggregation data
- * @return #GNUNET_OK on success,
- * #GNUNET_NO on transient errors
- * #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_insert_aggregation_tracking (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct TALER_WireTransferIdentifierRawP *wtid,
@@ -4510,9 +4396,9 @@ postgres_insert_aggregation_tracking (void *cls,
GNUNET_PQ_query_param_end
};
- return execute_prepared_non_select (session,
- "insert_aggregation_tracking",
- params);
+ return GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "insert_aggregation_tracking",
+ params);
}
@@ -4569,11 +4455,9 @@ postgres_get_wire_fee (void *cls,
* @param end_date when does the fee end being valid
* @param wire_fee how high is the wire transfer fee
* @param master_sig signature over the above by the exchange master key
- * @return #GNUNET_OK on success or if the record exists,
- * #GNUNET_NO on transient errors
- * #GNUNET_SYSERR on failure
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_insert_wire_fee (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *type,
@@ -4594,43 +4478,46 @@ postgres_insert_wire_fee (void *cls,
struct TALER_MasterSignatureP sig;
struct GNUNET_TIME_Absolute sd;
struct GNUNET_TIME_Absolute ed;
+ enum GNUNET_DB_QueryStatus qs;
- if (GNUNET_OK ==
- postgres_get_wire_fee (cls,
- session,
- type,
- start_date,
- &sd,
- &ed,
- &wf,
- &sig))
+ qs = postgres_get_wire_fee (cls,
+ session,
+ type,
+ start_date,
+ &sd,
+ &ed,
+ &wf,
+ &sig);
+ if (qs < 0)
+ return qs;
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
if (0 != memcmp (&sig,
master_sig,
sizeof (sig)))
{
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
if (0 != TALER_amount_cmp (wire_fee,
&wf))
{
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
if ( (sd.abs_value_us != start_date.abs_value_us) ||
(ed.abs_value_us != end_date.abs_value_us) )
{
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
/* equal record already exists */
- return GNUNET_OK;
+ return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
}
- return execute_prepared_non_select (session,
- "insert_wire_fee",
- params);
+ return GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "insert_wire_fee",
+ params);
}
@@ -4862,9 +4749,9 @@ postgres_wire_prepare_data_insert (void *cls,
* @param cls closure
* @param session database connection
* @param rowid which entry to mark as finished
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_mark_finished (void *cls,
struct TALER_EXCHANGEDB_Session *session,
uint64_t rowid)
@@ -4874,9 +4761,9 @@ postgres_wire_prepare_data_mark_finished (void *cls,
GNUNET_PQ_query_param_end
};
- return execute_prepared_non_select (session,
- "wire_prepare_data_mark_done",
- params);
+ return GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "wire_prepare_data_mark_done",
+ params);
}
@@ -4888,76 +4775,46 @@ postgres_wire_prepare_data_mark_finished (void *cls,
* @param session database connection
* @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb
- * @return #GNUNET_OK on success,
- * #GNUNET_NO if there are no entries,
- * #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_get (void *cls,
struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_WirePreparationIterator cb,
void *cb_cls)
{
- PGresult *result;
+ enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_end
};
+ uint64_t prewire_uuid;
+ char *type;
+ void *buf = NULL;
+ size_t buf_size;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
+ &prewire_uuid),
+ GNUNET_PQ_result_spec_string ("type",
+ &type),
+ GNUNET_PQ_result_spec_variable_size ("buf",
+ &buf,
+ &buf_size),
+ GNUNET_PQ_result_spec_end
+ };
- result = GNUNET_PQ_exec_prepared (session->conn,
- "wire_prepare_data_get",
- params);
- if (PGRES_TUPLES_OK != PQresultStatus (result))
- {
- QUERY_ERR (result, session->conn);
- PQclear (result);
- return GNUNET_SYSERR;
- }
- if (0 == PQntuples (result))
- {
- PQclear (result);
- return GNUNET_NO;
- }
- if (1 != PQntuples (result))
- {
- GNUNET_break (0);
- PQclear (result);
- return GNUNET_SYSERR;
- }
-
- {
- uint64_t prewire_uuid;
- char *type;
- void *buf = NULL;
- size_t buf_size;
- struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
- &prewire_uuid),
- GNUNET_PQ_result_spec_string ("type",
- &type),
- GNUNET_PQ_result_spec_variable_size ("buf",
- &buf,
- &buf_size),
- GNUNET_PQ_result_spec_end
- };
-
- if (GNUNET_OK !=
- GNUNET_PQ_extract_result (result,
- rs,
- 0))
- {
- GNUNET_break (0);
- PQclear (result);
- return GNUNET_SYSERR;
- }
- cb (cb_cls,
- prewire_uuid,
- type,
- buf,
- buf_size);
- GNUNET_PQ_cleanup_result (rs);
- }
- PQclear (result);
- return GNUNET_OK;
+ qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+ "wire_prepare_data_get",
+ params,
+ rs);
+ if (0 >= qs)
+ return qs;
+ cb (cb_cls,
+ prewire_uuid,
+ type,
+ buf,
+ buf_size);
+ GNUNET_PQ_cleanup_result (rs);
+ return qs;
}
@@ -5003,7 +4860,6 @@ postgres_start_deferred_wire_out (void *cls,
return GNUNET_SYSERR;
}
PQclear (result);
- session->state = GNUNET_OK;
return GNUNET_OK;
}
@@ -5017,10 +4873,9 @@ postgres_start_deferred_wire_out (void *cls,
* @param wtid subject of the wire transfer
* @param wire_account details about the receiver account of the wire transfer
* @param amount amount that was transmitted
- * @return #GNUNET_OK on success
- * #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
-static int
+static enum GNUNET_DB_QueryStatus
postgres_store_wire_transfer_out (void *cls,
struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute date,
@@ -5036,9 +4891,9 @@ postgres_store_wire_transfer_out (void *cls,
GNUNET_PQ_query_param_end
};
- return execute_prepared_non_select (session,
- "insert_wire_out",
- params);
+ return GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "insert_wire_out",
+ params);
}
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 4ca7c39c9..773d64362 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -914,10 +914,9 @@ static uint64_t deposit_rowid;
* @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed
* @param wire wire details for the merchant, NULL from iterate_matching_deposits()
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR if deposit does
- * not match our expectations
+ * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/
-static int
+static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls,
uint64_t rowid,
const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -953,10 +952,10 @@ deposit_cb (void *cls,
sizeof (struct GNUNET_HashCode))) ) )
{
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
- return GNUNET_OK;
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
@@ -1164,7 +1163,7 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK,
&master_sig,
sizeof (master_sig));
- if (GNUNET_OK !=
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->insert_wire_fee (plugin->cls,
session,
"wire-method",
@@ -1176,7 +1175,7 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
GNUNET_break (0);
return GNUNET_SYSERR;
}
- if (GNUNET_OK !=
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->insert_wire_fee (plugin->cls,
session,
"wire-method",
@@ -1800,7 +1799,7 @@ run (void *cls)
NULL));
FAILIF (1 != auditor_row_cnt);
result = 9;
- FAILIF (1 !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->iterate_matching_deposits (plugin->cls,
session,
&deposit.h_wire,
@@ -1808,7 +1807,7 @@ run (void *cls)
&deposit_cb, &deposit,
2));
- FAILIF (1 !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls,
session,
&deposit_cb,
@@ -1821,7 +1820,7 @@ run (void *cls)
session));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_tiny (plugin->cls,
- session,
+ session,
deposit_rowid));
FAILIF (0 !=
plugin->get_ready_deposit (plugin->cls,
@@ -1838,18 +1837,18 @@ run (void *cls)
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,
session));
- FAILIF (GNUNET_NO !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->test_deposit_done (plugin->cls,
session,
&deposit));
- FAILIF (GNUNET_OK !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_done (plugin->cls,
session,
deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls,
session));
- FAILIF (GNUNET_YES !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->test_deposit_done (plugin->cls,
session,
&deposit));
@@ -1857,17 +1856,18 @@ run (void *cls)
result = 10;
deposit2 = deposit;
RND_BLK (&deposit2.merchant_pub); /* should fail if merchant is different */
- FAILIF (0 !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->have_deposit (plugin->cls,
session,
&deposit2));
deposit2.merchant_pub = deposit.merchant_pub;
RND_BLK (&deposit2.coin.coin_pub); /* should fail if coin is different */
- FAILIF (0 !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->have_deposit (plugin->cls,
session,
&deposit2));
- FAILIF (GNUNET_OK != test_melting (session));
+ FAILIF (GNUNET_OK !=
+ test_melting (session));
/* test insert_refund! */
@@ -1886,7 +1886,7 @@ run (void *cls)
/* test payback / revocation */
RND_BLK (&master_sig);
- FAILIF (GNUNET_OK !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->insert_denomination_revocation (plugin->cls,
session,
&dkp_pub_hash,
@@ -1897,7 +1897,7 @@ run (void *cls)
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,
session));
- FAILIF (GNUNET_NO !=
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->insert_denomination_revocation (plugin->cls,
session,
&dkp_pub_hash,
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 9f80fda42..d3b48e496 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -674,9 +674,9 @@ struct TALER_EXCHANGEDB_Session;
* @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed
* @param receiver_wire_account wire details for the merchant, NULL from iterate_matching_deposits()
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/
-typedef int
+typedef enum GNUNET_DB_QueryStatus
(*TALER_EXCHANGEDB_DepositIterator)(void *cls,
uint64_t rowid,
const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -1383,10 +1383,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param session connection to the database
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
- * @return number of rows processed, 0 if none exist,
- * #GNUNET_SYSERR on error
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*get_ready_deposit) (void *cls,
struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_DepositIterator deposit_cb,
@@ -1418,9 +1417,9 @@ struct TALER_EXCHANGEDB_Plugin
* be #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, larger values
* are not supported, smaller values would be inefficient.
* @return number of rows processed, 0 if none exist,
- * #GNUNET_SYSERR on error
+ * transaction status code on error
*/
- int
+ enum GNUNET_DB_QueryStatus
(*iterate_matching_deposits) (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_HashCode *h_wire,
@@ -1753,11 +1752,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param session database connection
* @param wtid the raw wire transfer identifier we used
* @param deposit_serial_id row in the deposits table for which this is aggregation data
- * @return #GNUNET_OK on success
- * #GNUNET_NO on transient errors
- * #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*insert_aggregation_tracking)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct TALER_WireTransferIdentifierRawP *wtid,
@@ -1774,11 +1771,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param end_date when does the fee end being valid
* @param wire_fee how high is the wire transfer fee
* @param master_sig signature over the above by the exchange master key
- * @return #GNUNET_OK on success or if the record exists,
- * #GNUNET_NO on transient errors,
- * #GNUNET_SYSERR on failure
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*insert_wire_fee)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *wire_method,
@@ -1787,7 +1782,7 @@ struct TALER_EXCHANGEDB_Plugin
const struct TALER_Amount *wire_fee,
const struct TALER_MasterSignatureP *master_sig);
-
+
/**
* Obtain wire fee from database.
*
@@ -1879,9 +1874,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param cls closure
* @param session database connection
* @param rowid which entry to mark as finished
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*wire_prepare_data_mark_finished)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
uint64_t rowid);
@@ -1895,11 +1890,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param session database connection
* @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb
- * @return #GNUNET_OK on success,
- * #GNUNET_NO if there are no entries,
- * #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*wire_prepare_data_get)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_WirePreparationIterator cb,
@@ -1930,10 +1923,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param wtid subject of the wire transfer
* @param wire_account details about the receiver account of the wire transfer
* @param amount amount that was transmitted
- * @return #GNUNET_OK on success
- * #GNUNET_SYSERR on DB errors
+ * @return transaction status code
*/
- int
+ enum GNUNET_DB_QueryStatus
(*store_wire_transfer_out)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute date,