/* This file is part of TALER Copyright (C) 2016-2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ /** * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and prepares their execution * @author Christian Grothoff */ #include "platform.h" #include #include #include #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" #include "taler_kyclogic_lib.h" #include "taler_bank_service.h" #include "taler_dbevents.h" /** * Information about one aggregation process to be executed. There is * at most one of these around at any given point in time. * Note that this limits parallelism, and we might want * to revise this decision at a later point. */ struct AggregationUnit { /** * Public key of the merchant. */ struct TALER_MerchantPublicKeyP merchant_pub; /** * Transient amount already found aggregated, * set only if @e have_transient is true. */ struct TALER_Amount trans; /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; /** * Final amount to be transferred (after fee and rounding down). */ struct TALER_Amount final_amount; /** * Wire fee we charge for @e wp at @e execution_time. */ struct TALER_WireFeeSet fees; /** * Wire transfer identifier we use. */ struct TALER_WireTransferIdentifierRawP wtid; /** * The current time (which triggered the aggregation and * defines the wire fee). */ struct GNUNET_TIME_Timestamp execution_time; /** * Wire details of the merchant. */ char *payto_uri; /** * Selected wire target for the aggregation. */ struct TALER_PaytoHashP h_payto; /** * Exchange wire account to be used for the preparation and * eventual execution of the aggregate wire transfer. */ const struct TALER_EXCHANGEDB_AccountInfo *wa; /** * Row in KYC table for legitimization requirements * that are pending for this aggregation, or 0 if none. */ uint64_t requirement_row; /** * Set to #GNUNET_OK during transient checking * while everything is OK. Otherwise see return * value of #do_aggregate(). */ enum GNUNET_GenericReturnValue ret; /** * Do we have an entry in the transient table for * this aggregation? */ bool have_transient; }; /** * Work shard we are processing. */ struct Shard { /** * When did we start processing the shard? */ struct GNUNET_TIME_Timestamp start_time; /** * Starting row of the shard. */ uint32_t shard_start; /** * Inclusive end row of the shard. */ uint32_t shard_end; /** * Number of starting points found in the shard. */ uint64_t work_counter; }; /** * What is the smallest unit we support for wire transfers? * We will need to round down to a multiple of this amount. */ static struct TALER_Amount currency_round_unit; /** * What is the largest amount we transfer before triggering * an AML check? */ static struct TALER_Amount aml_threshold; /** * What is the base URL of this exchange? Used in the * wire transfer subjects so that merchants and governments * can ask for the list of aggregated deposits. */ static char *exchange_base_url; /** * Set to #GNUNET_YES if this exchange does not support KYC checks * and thus deposits are to be aggregated regardless of the * KYC status of the target account. */ static int kyc_off; /** * The exchange's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Our database plugin. */ static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** * Next task to run, if any. */ static struct GNUNET_SCHEDULER_Task *task; /** * How long should we sleep when idle before trying to find more work? */ static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; /** * How big are the shards we are processing? Is an inclusive offset, so every * shard ranges from [X,X+shard_size) exclusive. So a shard covers * shard_size slots. The maximum value for shard_size is INT32_MAX+1. */ static uint32_t shard_size; /** * Value to return from main(). 0 on success, non-zero on errors. */ static int global_ret; /** * #GNUNET_YES if we are in test mode and should exit when idle. */ static int test_mode; /** * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * * @param cls a `struct Shard *` */ static void run_aggregation (void *cls); /** * Work on transactions unlocked by KYC. * * @param cls NULL */ static void drain_kyc_alerts (void *cls); /** * Free data stored in @a au, but not @a au itself (stack allocated). * * @param au aggregation unit to clean up */ static void cleanup_au (struct AggregationUnit *au) { GNUNET_assert (NULL != au); GNUNET_free (au->payto_uri); memset (au, 0, sizeof (*au)); } /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * * @param cls closure */ static void shutdown_task (void *cls) { (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != task) { GNUNET_SCHEDULER_cancel (task); task = NULL; } TALER_KYCLOGIC_kyc_done (); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); cfg = NULL; } /** * Parse the configuration for aggregator. * * @return #GNUNET_OK on success */ static enum GNUNET_GenericReturnValue parse_aggregator_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "exchange", "BASE_URL", &exchange_base_url)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchange", "BASE_URL"); return GNUNET_SYSERR; } if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "exchange", "AGGREGATOR_IDLE_SLEEP_INTERVAL", &aggregator_idle_sleep_interval)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchange", "AGGREGATOR_IDLE_SLEEP_INTERVAL"); return GNUNET_SYSERR; } if ( (GNUNET_OK != TALER_config_get_amount (cfg, "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || (TALER_amount_is_zero (¤cy_round_unit)) ) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n"); return GNUNET_SYSERR; } if (GNUNET_OK != TALER_config_get_amount (cfg, "exchange", "AML_THRESHOLD", &aml_threshold)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Need amount in section `exchange' under `AML_THRESHOLD'\n"); return GNUNET_SYSERR; } if (NULL == (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); return GNUNET_SYSERR; } if (GNUNET_OK != TALER_EXCHANGEDB_load_accounts (cfg, TALER_EXCHANGEDB_ALO_DEBIT)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No wire accounts configured for debit!\n"); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; return GNUNET_SYSERR; } return GNUNET_OK; } /** * Perform a database commit. If it fails, print a warning. * * @return status of commit */ static enum GNUNET_DB_QueryStatus commit_or_warn (void) { enum GNUNET_DB_QueryStatus qs; qs = db_plugin->commit (db_plugin->cls); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) return qs; GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) ? GNUNET_ERROR_TYPE_INFO : GNUNET_ERROR_TYPE_ERROR, "Failed to commit database transaction!\n"); return qs; } /** * Release lock on shard @a s in the database. * On error, terminates this process. * * @param[in] s shard to free (and memory to release) */ static void release_shard (struct Shard *s) { enum GNUNET_DB_QueryStatus qs; qs = db_plugin->release_revolving_shard ( db_plugin->cls, "aggregator", s->shard_start, s->shard_end); GNUNET_free (s); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* Strange, but let's just continue */ break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* normal case */ break; } } /** * Trigger the wire transfer for the @a au_active * and delete the record of the aggregation. * * @param au_active information about the aggregation */ static enum GNUNET_DB_QueryStatus trigger_wire_transfer (const struct AggregationUnit *au_active) { enum GNUNET_DB_QueryStatus qs; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Preparing wire transfer of %s to %s\n", TALER_amount2s (&au_active->final_amount), TALER_B2S (&au_active->merchant_pub)); { void *buf; size_t buf_size; TALER_BANK_prepare_transfer (au_active->payto_uri, &au_active->final_amount, exchange_base_url, &au_active->wtid, &buf, &buf_size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Storing %u bytes of wire prepare data\n", (unsigned int) buf_size); /* Commit our intention to execute the wire transfer! */ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, au_active->wa->method, buf, buf_size); GNUNET_free (buf); } /* Commit the WTID data to 'wire_out' */ if (qs >= 0) qs = db_plugin->store_wire_transfer_out (db_plugin->cls, au_active->execution_time, &au_active->wtid, &au_active->h_payto, au_active->wa->section_name, &au_active->final_amount); if ( (qs >= 0) && au_active->have_transient) qs = db_plugin->delete_aggregation_transient (db_plugin->cls, &au_active->h_payto, &au_active->wtid); return qs; } /** * Callback to return all applicable amounts for the KYC * decision to @ a cb. * * @param cls a `struct AggregationUnit *` * @param limit time limit for the iteration * @param cb function to call with the amounts * @param cb_cls closure for @a cb */ static void return_relevant_amounts (void *cls, struct GNUNET_TIME_Absolute limit, TALER_EXCHANGEDB_KycAmountCallback cb, void *cb_cls) { const struct AggregationUnit *au_active = cls; enum GNUNET_DB_QueryStatus qs; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Returning amount %s in KYC check\n", TALER_amount2s (&au_active->total_amount)); if (GNUNET_OK != cb (cb_cls, &au_active->total_amount, GNUNET_TIME_absolute_get ())) return; qs = db_plugin->select_aggregation_amounts_for_kyc_check ( db_plugin->cls, &au_active->h_payto, limit, cb, cb_cls); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to select aggregation amounts for KYC limit check!\n"); } } /** * Test if KYC is required for a transfer to @a h_payto. * * @param[in,out] au_active aggregation unit to check for * @return true if KYC checks are satisfied */ static bool kyc_satisfied (struct AggregationUnit *au_active) { char *requirement; enum GNUNET_DB_QueryStatus qs; if (kyc_off) return true; qs = TALER_KYCLOGIC_kyc_test_required ( TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, &au_active->h_payto, db_plugin->select_satisfied_kyc_processes, db_plugin->cls, &return_relevant_amounts, (void *) au_active, &requirement); if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return false; } if (NULL == requirement) return true; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "KYC requirement for %s is %s\n", TALER_amount2s (&au_active->total_amount), requirement); qs = db_plugin->insert_kyc_requirement_for_account ( db_plugin->cls, requirement, &au_active->h_payto, NULL, /* not a reserve */ &au_active->requirement_row); if (qs < 0) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to persist KYC requirement `%s' in DB!\n", requirement); } else { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Legitimization process %llu started\n", (unsigned long long) au_active->requirement_row); } GNUNET_free (requirement); return false; } /** * Function called on each @a amount that was found to * be relevant for an AML check. * * @param cls closure with the `struct TALER_Amount *` where we store the sum * @param amount encountered transaction amount * @param date when was the amount encountered * @return #GNUNET_OK to continue to iterate, * #GNUNET_NO to abort iteration * #GNUNET_SYSERR on internal error (also abort itaration) */ static enum GNUNET_GenericReturnValue sum_for_aml ( void *cls, const struct TALER_Amount *amount, struct GNUNET_TIME_Absolute date) { struct TALER_Amount *sum = cls; (void) date; if (0 > TALER_amount_add (sum, sum, amount)) { GNUNET_break (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Test if AML is required for a transfer to @a h_payto. * * @param[in,out] au_active aggregation unit to check for * @return true if AML checks are satisfied */ static bool aml_satisfied (struct AggregationUnit *au_active) { enum GNUNET_DB_QueryStatus qs; struct TALER_Amount total; struct TALER_Amount threshold; enum TALER_AmlDecisionState decision; struct TALER_EXCHANGEDB_KycStatus kyc; total = au_active->final_amount; qs = db_plugin->select_aggregation_amounts_for_kyc_check ( db_plugin->cls, &au_active->h_payto, GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), GNUNET_TIME_UNIT_MONTHS), &sum_for_aml, &total); if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return false; } qs = db_plugin->select_aml_threshold (db_plugin->cls, &au_active->h_payto, &decision, &kyc, &threshold); if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return false; } if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { threshold = aml_threshold; /* use default */ decision = TALER_AML_NORMAL; } switch (decision) { case TALER_AML_NORMAL: if (0 >= TALER_amount_cmp (&total, &threshold)) { /* total <= threshold, do nothing */ return true; } qs = db_plugin->trigger_aml_process (db_plugin->cls, &au_active->h_payto, &total); if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return false; } return false; case TALER_AML_PENDING: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "AML already pending, doing nothing\n"); return false; case TALER_AML_FROZEN: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Account frozen, doing nothing\n"); return false; } GNUNET_assert (0); return false; } /** * Perform the main aggregation work for @a au. Expects to be in * a working transaction, which the caller must also ultimately commit * (or rollback) depending on our return value. * * @param[in,out] au aggregation unit to work on * @return #GNUNET_OK if aggregation succeeded, * #GNUNET_NO to rollback and try again (serialization issue) * #GNUNET_SYSERR hard error, terminate aggregator process */ static enum GNUNET_GenericReturnValue do_aggregate (struct AggregationUnit *au) { enum GNUNET_DB_QueryStatus qs; au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( au->payto_uri); if (NULL == au->wa) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No exchange account configured for `%s', please fix your setup to continue!\n", au->payto_uri); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; } { struct GNUNET_TIME_Timestamp start_date; struct GNUNET_TIME_Timestamp end_date; struct TALER_MasterSignatureP master_sig; qs = db_plugin->get_wire_fee (db_plugin->cls, au->wa->method, au->execution_time, &start_date, &end_date, &au->fees, &master_sig); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not get wire fees for %s at %s. Aborting run.\n", au->wa->method, GNUNET_TIME_timestamp2s (au->execution_time)); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; } } /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found ready deposit for %s, aggregating by target %s\n", TALER_B2S (&au->merchant_pub), au->payto_uri); qs = db_plugin->select_aggregation_transient (db_plugin->cls, &au->h_payto, &au->merchant_pub, au->wa->section_name, &au->wtid, &au->trans); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to lookup transient aggregates!\n"); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; case GNUNET_DB_STATUS_SOFT_ERROR: /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); return GNUNET_NO; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No transient aggregation found, starting %s\n", TALER_B2S (&au->wtid)); au->have_transient = false; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: au->have_transient = true; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Transient aggregation found, resuming %s\n", TALER_B2S (&au->wtid)); break; } qs = db_plugin->aggregate (db_plugin->cls, &au->h_payto, &au->merchant_pub, &au->wtid, &au->total_amount); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute aggregation!\n"); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregation total is %s.\n", TALER_amount2s (&au->total_amount)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ if (au->have_transient) GNUNET_assert (0 <= TALER_amount_add (&au->total_amount, &au->total_amount, &au->trans)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Rounding aggregate of %s\n", TALER_amount2s (&au->total_amount)); if ( (0 >= TALER_amount_subtract (&au->final_amount, &au->total_amount, &au->fees.wire)) || (GNUNET_SYSERR == TALER_amount_round_down (&au->final_amount, ¤cy_round_unit)) || (TALER_amount_is_zero (&au->final_amount)) || (! kyc_satisfied (au)) || (! aml_satisfied (au)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not ready for wire transfer (%d/%s)\n", qs, TALER_amount2s (&au->final_amount)); if (au->have_transient) qs = db_plugin->update_aggregation_transient (db_plugin->cls, &au->h_payto, &au->wtid, au->requirement_row, &au->total_amount); else qs = db_plugin->create_aggregation_transient (db_plugin->cls, &au->h_payto, au->wa->section_name, &au->merchant_pub, &au->wtid, au->requirement_row, &au->total_amount); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue, trying again later!\n"); return GNUNET_NO; } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_break (0); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; } /* commit */ return GNUNET_OK; } qs = trigger_wire_transfer (au); switch (qs) { case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue during aggregation; trying again later!\n") ; return GNUNET_NO; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; default: break; } { struct TALER_CoinDepositEventP rep = { .header.size = htons (sizeof (rep)), .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED), .merchant_pub = au->merchant_pub }; db_plugin->event_notify (db_plugin->cls, &rep.header, NULL, 0); } return GNUNET_OK; } static void run_aggregation (void *cls) { struct Shard *s = cls; struct AggregationUnit au_active; enum GNUNET_DB_QueryStatus qs; enum GNUNET_GenericReturnValue ret; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); /* make sure we have current fees */ memset (&au_active, 0, sizeof (au_active)); au_active.execution_time = GNUNET_TIME_timestamp_get (); if (GNUNET_OK != db_plugin->start_deferred_wire_out (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); release_shard (s); return; } qs = db_plugin->get_ready_deposit ( db_plugin->cls, s->shard_start, s->shard_end, &au_active.merchant_pub, &au_active.payto_uri); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to begin deposit iteration!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); release_shard (s); return; case GNUNET_DB_STATUS_SOFT_ERROR: cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: { uint64_t counter = s->work_counter; struct GNUNET_TIME_Relative duration = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Completed shard [%u,%u] after %s with %llu deposits\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end, GNUNET_TIME_relative2s (duration, true), (unsigned long long) counter); release_shard (s); if ( (GNUNET_YES == test_mode) && (0 == counter) ) { /* in test mode, shutdown after a shard is done with 0 work */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No work done and in test mode, shutting down\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (NULL == task); /* If we ended up doing zero work, sleep a bit */ if (0 == counter) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Going to sleep for %s before trying again\n", GNUNET_TIME_relative2s (aggregator_idle_sleep_interval, true)); task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, &drain_kyc_alerts, NULL); } else { task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); } return; } case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: s->work_counter++; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Found ready deposit!\n"); /* continued below */ break; } TALER_payto_hash (au_active.payto_uri, &au_active.h_payto); ret = do_aggregate (&au_active); cleanup_au (&au_active); switch (ret) { case GNUNET_SYSERR: global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); release_shard (s); return; case GNUNET_NO: db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; case GNUNET_OK: /* continued below */ break; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Committing aggregation result\n"); /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ switch (commit_or_warn ()) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue on commit; trying again later!\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ release_shard (s); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Commit complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; default: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ release_shard (s); return; } } /** * Select a shard to work on. * * @param cls NULL */ static void run_shard (void *cls) { struct Shard *s; enum GNUNET_DB_QueryStatus qs; (void) cls; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running aggregation shard\n"); if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database connection!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } s = GNUNET_new (struct Shard); s->start_time = GNUNET_TIME_timestamp_get (); qs = db_plugin->begin_revolving_shard (db_plugin->cls, "aggregator", shard_size, 1U + INT32_MAX, &s->shard_start, &s->shard_end); if (0 >= qs) { if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { static struct GNUNET_TIME_Relative delay; GNUNET_free (s); delay = GNUNET_TIME_randomized_backoff (delay, GNUNET_TIME_UNIT_SECONDS); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_delayed (delay, &run_shard, NULL); return; } GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to begin shard (%d)!\n", qs); GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting shard [%u:%u]!\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); } /** * Function called on transient aggregations matching * a particular hash of a payto URI. * * @param cls * @param payto_uri corresponding payto URI * @param wtid wire transfer identifier of transient aggregation * @param merchant_pub public key of the merchant * @param total amount aggregated so far * @return true to continue to iterate */ static bool handle_transient_cb ( void *cls, const char *payto_uri, const struct TALER_WireTransferIdentifierRawP *wtid, const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_Amount *total) { struct AggregationUnit *au = cls; if (GNUNET_OK != au->ret) { GNUNET_break (0); return false; } au->payto_uri = GNUNET_strdup (payto_uri); au->wtid = *wtid; au->merchant_pub = *merchant_pub; au->trans = *total; au->have_transient = true; au->ret = do_aggregate (au); GNUNET_free (au->payto_uri); return (GNUNET_OK == au->ret); } static void drain_kyc_alerts (void *cls) { enum GNUNET_DB_QueryStatus qs; struct AggregationUnit au; (void) cls; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Draining KYC alerts\n"); memset (&au, 0, sizeof (au)); au.execution_time = GNUNET_TIME_timestamp_get (); if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database connection!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } if (GNUNET_OK != db_plugin->start (db_plugin->cls, "handle kyc alerts")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } while (1) { qs = db_plugin->drain_kyc_alert (db_plugin->cls, 1, &au.h_payto); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_DB_STATUS_SOFT_ERROR: db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: qs = db_plugin->commit (db_plugin->cls); if (qs < 0) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to commit KYC drain\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_shard, NULL); return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* handled below */ break; } au.ret = GNUNET_OK; qs = db_plugin->find_aggregation_transient (db_plugin->cls, &au.h_payto, &handle_transient_cb, &au); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to lookup transient aggregates!\n"); db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_DB_STATUS_SOFT_ERROR: /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: continue; /* while (1) */ default: break; } break; } /* while(1) */ { enum GNUNET_GenericReturnValue ret; ret = au.ret; cleanup_au (&au); switch (ret) { case GNUNET_SYSERR: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ return; case GNUNET_NO: db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_OK: /* continued below */ break; } } switch (commit_or_warn ()) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue on commit; trying again later!\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Commit complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; default: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ return; } } /** * First task. * * @param cls closure, NULL * @param args remaining command-line arguments * @param cfgfile name of the configuration file used (for saving, can be NULL!) * @param c configuration */ static void run (void *cls, char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { unsigned long long ass; (void) cls; (void) args; (void) cfgfile; cfg = c; if (GNUNET_OK != parse_aggregator_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; return; } if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "exchange", "AGGREGATOR_SHARD_SIZE", &ass)) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; return; } if ( (0 == ass) || (ass > INT32_MAX) ) shard_size = 1U + INT32_MAX; else shard_size = (uint32_t) ass; if (GNUNET_OK != TALER_KYCLOGIC_kyc_init (cfg)) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; return; } GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); } /** * The main function of the taler-exchange-aggregator. * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, non-zero on error, see #global_ret */ int main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), GNUNET_GETOPT_option_flag ('y', "kyc-off", "perform wire transfers without KYC checks", &kyc_off), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return EXIT_INVALIDARGUMENT; TALER_OS_init (); ret = GNUNET_PROGRAM_run ( argc, argv, "taler-exchange-aggregator", gettext_noop ( "background process that aggregates and executes wire transfers"), options, &run, NULL); GNUNET_free_nz ((void *) argv); if (GNUNET_SYSERR == ret) return EXIT_INVALIDARGUMENT; if (GNUNET_NO == ret) return EXIT_SUCCESS; return global_ret; } /* end of taler-exchange-aggregator.c */