/* This file is part of TALER Copyright (C) 2016 GNUnet e.V. TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, If not, see */ /** * @file taler-mint-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff * * TODO: * - simplify global_ret: make it a global! * - handle shutdown more nicely (call 'cancel' method on wire transfers) */ #include "platform.h" #include #include #include #include "taler_mintdb_lib.h" #include "taler_mintdb_plugin.h" #include "taler_wire_lib.h" /** * Which currency is used by this mint? */ static char *mint_currency_string; /** * Which wireformat should be supported by this aggregator? */ static char *mint_wireformat; /** * Base directory of the mint (global) */ static char *mint_directory; /** * The mint's configuration (global) */ static struct GNUNET_CONFIGURATION_Handle *cfg; /** * Our DB plugin. */ static struct TALER_MINTDB_Plugin *db_plugin; /** * Our wire plugin. */ static struct TALER_WIRE_Plugin *wire_plugin; /** * Task for the main #run() function. */ static struct GNUNET_SCHEDULER_Task *task; /** * Limit on the number of transactions we aggregate at once. Note * that the limit must be big enough to ensure that when transactions * of the smallest possible unit are aggregated, they do surpass the * "tiny" threshold beyond which we never trigger a wire transaction! * * TODO: make configurable (via config file or command line option) */ static unsigned int aggregation_limit = 10000; /** * Load configuration parameters for the mint * server into the corresponding global variables. * * @param mint_directory the mint's directory * @return #GNUNET_OK on success */ static int mint_serve_process_config (const char *mint_directory) { char *type; cfg = TALER_config_load (mint_directory); if (NULL == cfg) { fprintf (stderr, "Failed to load mint configuration\n"); return GNUNET_SYSERR; } if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "mint", "currency", &mint_currency_string)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "mint", "currency"); return GNUNET_SYSERR; } if (strlen (mint_currency_string) >= TALER_CURRENCY_LEN) { fprintf (stderr, "Currency `%s' longer than the allowed limit of %u characters.", mint_currency_string, (unsigned int) TALER_CURRENCY_LEN); return GNUNET_SYSERR; } if (NULL != mint_wireformat) GNUNET_CONFIGURATION_set_value_string (cfg, "mint", "wireformat", mint_wireformat); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "mint", "wireformat", &type)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "mint", "wireformat"); return GNUNET_SYSERR; } if (NULL == (db_plugin = TALER_MINTDB_plugin_load (cfg))) { fprintf (stderr, "Failed to initialize DB subsystem\n"); GNUNET_free (type); return GNUNET_SYSERR; } if (NULL == (wire_plugin = TALER_WIRE_plugin_load (cfg, type))) { fprintf (stderr, "Failed to load wire plugin for `%s'\n", type); GNUNET_free (type); return GNUNET_SYSERR; } GNUNET_free (type); return GNUNET_OK; } /** * Information about one aggregation process to * be executed. */ struct AggregationUnit { /** * Public key of the merchant. */ struct TALER_MerchantPublicKeyP merchant_pub; /** * Total amount to be transferred. */ struct TALER_Amount total_amount; /** * Hash of @e wire. */ struct GNUNET_HashCode h_wire; /** * Wire transfer identifier we use. */ struct TALER_WireTransferIdentifierRawP wtid; /** * Row ID of the transaction that started it all. */ unsigned long long row_id; /** * The current time. */ struct GNUNET_TIME_Absolute execution_time; /** * Wire details of the merchant. */ json_t *wire; /** * Database session for all of our transactions. */ struct TALER_MINTDB_Session *session; /** * Wire preparation handle. */ struct TALER_WIRE_PrepareHandle *ph; /** * Array of #aggregation_limit row_ids from the * aggregation. */ unsigned long long *additional_rows; /** * Pointer to global return value. Closure for #run(). */ int *global_ret; /** * Offset specifying how many #additional_rows are in use. */ unsigned int rows_offset; /** * Set to #GNUNET_YES if we have to abort due to failure. */ int failed; }; /** * Function called with details about deposits that have been made, * with the goal of executing the corresponding wire transaction. * * @param cls closure with the `struct AggregationUnit` * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin * @param amount_with_fee amount that was deposited including fee * @param deposit_fee amount the mint gets to keep as transaction fees * @param transaction_id unique transaction ID chosen by the merchant * @param h_contract hash of the contract between merchant and customer * @param wire_deadline by which the merchant adviced that he would like the * wire transfer to be executed * @param wire wire details for the merchant * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop */ static int deposit_cb (void *cls, unsigned long long row_id, const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_CoinSpendPublicKeyP *coin_pub, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, uint64_t transaction_id, const struct GNUNET_HashCode *h_contract, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { struct AggregationUnit *au = cls; au->merchant_pub = *merchant_pub; if (GNUNET_OK != TALER_amount_subtract (&au->total_amount, amount_with_fee, deposit_fee)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Fatally malformed record at %llu\n", row_id); return GNUNET_SYSERR; } au->row_id = row_id; au->wire = (json_t *) wire; au->execution_time = GNUNET_TIME_absolute_get (); TALER_hash_json (au->wire, &au->h_wire); json_incref (au->wire); GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); if (GNUNET_OK != db_plugin->insert_aggregation_tracking (db_plugin->cls, au->session, &au->wtid, merchant_pub, &au->h_wire, h_contract, transaction_id, au->execution_time, coin_pub, amount_with_fee, deposit_fee)) { GNUNET_break (0); return GNUNET_SYSERR; } if (GNUNET_OK != db_plugin->mark_deposit_done (db_plugin->cls, au->session, row_id)) { GNUNET_break (0); au->failed = GNUNET_YES; return GNUNET_SYSERR; } return GNUNET_OK; } /** * Function called with details about another deposit we * can aggregate into an existing aggregation unit. * * @param cls closure with the `struct AggregationUnit` * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin * @param amount_with_fee amount that was deposited including fee * @param deposit_fee amount the mint gets to keep as transaction fees * @param transaction_id unique transaction ID chosen by the merchant * @param h_contract hash of the contract between merchant and customer * @param wire_deadline by which the merchant adviced that he would like the * wire transfer to be executed * @param wire wire details for the merchant * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop */ static int aggregate_cb (void *cls, unsigned long long row_id, const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_CoinSpendPublicKeyP *coin_pub, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, uint64_t transaction_id, const struct GNUNET_HashCode *h_contract, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { struct AggregationUnit *au = cls; struct TALER_Amount delta; GNUNET_break (0 == memcmp (&au->merchant_pub, merchant_pub, sizeof (struct TALER_MerchantPublicKeyP))); /* compute contribution of this coin after fees */ if (GNUNET_OK != TALER_amount_subtract (&delta, amount_with_fee, deposit_fee)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Fatally malformed record at %llu\n", row_id); return GNUNET_SYSERR; } /* add to total */ if (GNUNET_OK != TALER_amount_add (&au->total_amount, &au->total_amount, &delta)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Overflow or currency incompatibility during aggregation at %llu\n", row_id); /* Skip this one, but keep going! */ return GNUNET_OK; } if (au->rows_offset >= aggregation_limit) { /* Bug: we asked for at most #aggregation_limit results! */ GNUNET_break (0); /* Skip this one, but keep going. */ return GNUNET_OK; } if (NULL == au->additional_rows) au->additional_rows = GNUNET_new_array (aggregation_limit, unsigned long long); /* "append" to our list of rows */ au->additional_rows[au->rows_offset++] = row_id; /* insert into aggregation tracking table */ if (GNUNET_OK != db_plugin->insert_aggregation_tracking (db_plugin->cls, au->session, &au->wtid, merchant_pub, &au->h_wire, h_contract, transaction_id, au->execution_time, coin_pub, amount_with_fee, deposit_fee)) { GNUNET_break (0); return GNUNET_SYSERR; } if (GNUNET_OK != db_plugin->mark_deposit_done (db_plugin->cls, au->session, row_id)) { GNUNET_break (0); au->failed = GNUNET_YES; return GNUNET_SYSERR; } return GNUNET_OK; } /** * Function to be called with the prepared transfer data. * * @param cls closure with the `struct AggregationUnit` * @param buf transaction data to persist, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ static void prepare_cb (void *cls, const char *buf, size_t buf_size); /** * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * * @param cls pointer to an `int` which we will return from main() * @param tc scheduler context */ static void run_aggregation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { int *global_ret = cls; struct TALER_MINTDB_Session *session; struct AggregationUnit *au; unsigned int i; int ret; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; if (NULL == (session = db_plugin->get_session (db_plugin->cls, GNUNET_NO))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); *global_ret = GNUNET_SYSERR; return; } if (GNUNET_OK != db_plugin->start (db_plugin->cls, session)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); *global_ret = GNUNET_SYSERR; return; } au = GNUNET_new (struct AggregationUnit); au->session = session; ret = db_plugin->get_ready_deposit (db_plugin->cls, session, &deposit_cb, au); if (GNUNET_OK != ret) { GNUNET_free (au); db_plugin->rollback (db_plugin->cls, session); if (0 != ret) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); *global_ret = GNUNET_SYSERR; return; } /* nothing to do, sleep for a minute and try again */ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &run_aggregation, global_ret); return; } /* Now try to find other deposits to aggregate */ ret = db_plugin->iterate_matching_deposits (db_plugin->cls, session, &au->h_wire, &au->merchant_pub, &aggregate_cb, au, aggregation_limit); if ( (GNUNET_SYSERR == ret) || (GNUNET_YES == au->failed) ) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); GNUNET_free_non_null (au->additional_rows); GNUNET_free (au); db_plugin->rollback (db_plugin->cls, session); *global_ret = GNUNET_SYSERR; return; } /* Round to the unit supported by the wire transfer method */ GNUNET_assert (GNUNET_SYSERR != wire_plugin->amount_round (wire_plugin->cls, &au->total_amount)); /* Check if after rounding down, we still have an amount to transfer */ if ( (0 == au->total_amount.value) && (0 == au->total_amount.fraction) ) { /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ db_plugin->rollback (db_plugin->cls, session); /* Start another transaction to mark all* of the selected deposits *as minor! */ if (GNUNET_OK != db_plugin->start (db_plugin->cls, session)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); *global_ret = GNUNET_SYSERR; GNUNET_free_non_null (au->additional_rows); GNUNET_free (au); return; } /* Mark transactions by row_id as minor */ ret = GNUNET_OK; if (GNUNET_OK != db_plugin->mark_deposit_tiny (db_plugin->cls, session, au->row_id)) ret = GNUNET_SYSERR; else for (i=0;irows_offset;i++) if (GNUNET_OK != db_plugin->mark_deposit_tiny (db_plugin->cls, session, au->additional_rows[i])) ret = GNUNET_SYSERR; /* commit */ if (GNUNET_OK != db_plugin->commit (db_plugin->cls, session)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to commit database transaction!\n"); } GNUNET_free_non_null (au->additional_rows); GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); return; } au->global_ret = global_ret; au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, au->wire, &au->total_amount, &au->wtid, &prepare_cb, au); /* FIXME: currently we have no clean-up plan on shutdown to call prepare_wire_transfer_cancel! Maybe make 'au' global? */ if (NULL == au->ph) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); GNUNET_free_non_null (au->additional_rows); GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); return; } /* otherwise we continue with #prepare_cb(), see below */ } /** * Execute the wire transfers that we have committed to * do. * * @param cls pointer to an `int` which we will return from main() * @param tc scheduler context */ static void run_transfers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); /** * Function to be called with the prepared transfer data. * * @param cls closure with the `struct AggregationUnit` * @param buf transaction data to persist, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ static void prepare_cb (void *cls, const char *buf, size_t buf_size) { struct AggregationUnit *au = cls; int *global_ret = au->global_ret; struct TALER_MINTDB_Session *session = au->session; GNUNET_free_non_null (au->additional_rows); GNUNET_free (au); if (NULL == buf) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); return; } /* Commit our intention to execute the wire transfer! */ if (GNUNET_OK != db_plugin->wire_prepare_data_insert (db_plugin->cls, session, mint_wireformat, buf, buf_size)) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); return; } /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ if (GNUNET_OK != db_plugin->commit (db_plugin->cls, session)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to commit database transaction!\n"); /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); return; } /* run alternative task: actually do wire transfer! */ task = GNUNET_SCHEDULER_add_now (&run_transfers, &global_ret); } /** * Data we keep to #run_transfers(). */ struct WirePrepareData { /** * Database session for all of our transactions. */ struct TALER_MINTDB_Session *session; /** * Wire execution handle. */ struct TALER_WIRE_ExecuteHandle *eh; /** * Pointer to global return value. Closure for #run(). */ int *global_ret; /** * Row ID of the transfer. */ unsigned long long row_id; }; /** * Function called with the result from the execute step. * * @param cls closure with the `struct WirePrepareData` * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure * @param emsg NULL on success, otherwise an error message */ static void wire_confirm_cb (void *cls, int success, const char *emsg) { struct WirePrepareData *wpd = cls; int *global_ret = wpd->global_ret; struct TALER_MINTDB_Session *session = wpd->session; wpd->eh = NULL; if (GNUNET_SYSERR == success) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Wire transaction failed: %s\n", emsg); db_plugin->rollback (db_plugin->cls, session); *global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } if (GNUNET_OK != db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, session, wpd->row_id)) { GNUNET_break (0); /* why!? */ db_plugin->rollback (db_plugin->cls, session); *global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } GNUNET_free (wpd); if (GNUNET_OK != db_plugin->commit (db_plugin->cls, session)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to commit database transaction!\n"); /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); return; } /* continue with #run_transfers(), just to guard against the unlikely case that there are more. */ task = GNUNET_SCHEDULER_add_now (&run_transfers, &global_ret); } /** * Callback with data about a prepared transaction. * * @param cls closure with the `struct WirePrepareData` * @param rowid row identifier used to mark prepared transaction as done * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ static void wire_prepare_cb (void *cls, unsigned long long rowid, const char *buf, size_t buf_size) { struct WirePrepareData *wpd = cls; int *global_ret = wpd->global_ret; wpd->row_id = rowid; wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, buf, buf_size, &wire_confirm_cb, wpd); /* FIXME: currently we have no clean-up plan on shutdown to call execute_wire_transfer_cancel! Maybe make 'wpd' global? */ if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, wpd->session); *global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } } /** * Execute the wire transfers that we have committed to * do. * * @param cls pointer to an `int` which we will return from main() * @param tc scheduler context */ static void run_transfers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { int *global_ret = cls; int ret; struct WirePrepareData *wpd; struct TALER_MINTDB_Session *session; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; if (NULL == (session = db_plugin->get_session (db_plugin->cls, GNUNET_NO))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); *global_ret = GNUNET_SYSERR; return; } if (GNUNET_OK != db_plugin->start (db_plugin->cls, session)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); *global_ret = GNUNET_SYSERR; return; } wpd = GNUNET_new (struct WirePrepareData); wpd->session = session; wpd->global_ret = global_ret; ret = db_plugin->wire_prepare_data_get (db_plugin->cls, session, mint_wireformat, &wire_prepare_cb, wpd); if (GNUNET_SYSERR == ret) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); *global_ret = GNUNET_SYSERR; GNUNET_free (wpd); return; } if (GNUNET_NO == ret) { /* no more prepared wire transfers, go back to aggregation! */ db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, global_ret); GNUNET_free (wpd); return; } /* otherwise, continues in #wire_prepare_cb() */ } /** * The main function of the taler-mint-httpd server ("the mint"). * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, 1 on error */ int main (int argc, char *const *argv) { static const struct GNUNET_GETOPT_CommandLineOption options[] = { {'d', "mint-dir", "DIR", "mint directory with configuration and keys for operating the mint", 1, &GNUNET_GETOPT_set_filename, &mint_directory}, {'f', "format", "WIREFORMAT", "wireformat to use, overrides WIREFORMAT option in [mint] section", 1, &GNUNET_GETOPT_set_filename, &mint_wireformat}, TALER_GETOPT_OPTION_HELP ("background process that aggregates and executes wire transfers to merchants"), GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; int ret = GNUNET_OK; GNUNET_assert (GNUNET_OK == GNUNET_log_setup ("taler-mint-aggregator", "INFO", NULL)); if (0 >= GNUNET_GETOPT_run ("taler-mint-aggregator", options, argc, argv)) return 1; if (NULL == mint_directory) { fprintf (stderr, "Mint directory not specified\n"); return 1; } if (GNUNET_OK != mint_serve_process_config (mint_directory)) { return 1; } GNUNET_SCHEDULER_run (&run_transfers, &ret); TALER_MINTDB_plugin_unload (db_plugin); TALER_WIRE_plugin_unload (wire_plugin); return (GNUNET_SYSERR == ret) ? 1 : 0; } /* end of taler-mint-aggregator.c */