summaryrefslogtreecommitdiff
path: root/src/backend/taler-merchant-exchange.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/taler-merchant-exchange.c')
-rw-r--r--src/backend/taler-merchant-exchange.c1304
1 files changed, 1304 insertions, 0 deletions
diff --git a/src/backend/taler-merchant-exchange.c b/src/backend/taler-merchant-exchange.c
new file mode 100644
index 00000000..7945cb50
--- /dev/null
+++ b/src/backend/taler-merchant-exchange.c
@@ -0,0 +1,1304 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2023 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+/**
+ * @file taler-merchant-exchange.c
+ * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include <gnunet/gnunet_util_lib.h>
+#include <jansson.h>
+#include <pthread.h>
+#include <taler/taler_dbevents.h>
+#include "taler_merchant_bank_lib.h"
+#include "taler_merchantdb_lib.h"
+#include "taler_merchantdb_plugin.h"
+
+/**
+ * Timeout for the exchange interaction. Rather long as we should do
+ * long-polling and do not want to wake up too often.
+ */
+#define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \
+ GNUNET_TIME_UNIT_MINUTES, \
+ 30)
+
+/**
+ * How many inquiries do we process concurrently at most.
+ */
+#define OPEN_INQUIRY_LIMIT 1024
+
+/**
+ * How many inquiries do we process concurrently per exchange at most.
+ */
+#define EXCHANGE_INQUIRY_LIMIT 16
+
+
+/**
+ * Information about an inquiry job.
+ */
+struct Inquiry;
+
+
+/**
+ * Information about an exchange.
+ */
+struct Exchange
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct Exchange *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct Exchange *prev;
+
+ /**
+ * Head of active inquiries.
+ */
+ struct Inquiry *w_head;
+
+ /**
+ * Tail of active inquiries.
+ */
+ struct Inquiry *w_tail;
+
+ /**
+ * Which exchange are we tracking here.
+ */
+ char *exchange_url;
+
+ /**
+ * A connection to this exchange
+ */
+ struct TALER_EXCHANGE_GetKeysHandle *conn;
+
+ /**
+ * The keys of this exchange
+ */
+ struct TALER_EXCHANGE_Keys *keys;
+
+ /**
+ * Task where we retry fetching /keys from the exchange.
+ */
+ struct GNUNET_SCHEDULER_Task *retry_task;
+
+ /**
+ * How many active inquiries do we have right now with this exchange.
+ */
+ unsigned int exchange_inquiries;
+
+ /**
+ * How soon can may we, at the earliest, re-download /keys?
+ */
+ struct GNUNET_TIME_Absolute first_retry;
+
+ /**
+ * How long should we wait between the next retry?
+ */
+ struct GNUNET_TIME_Relative retry_delay;
+
+ /**
+ * How long should we wait between requests
+ * for transfer details?
+ */
+ struct GNUNET_TIME_Relative transfer_delay;
+
+ /**
+ * False to indicate that there is an ongoing
+ * /keys transfer we are waiting for;
+ * true to indicate that /keys data is up-to-date.
+ */
+ bool ready;
+
+};
+
+
+/**
+ * Information about an inquiry job.
+ */
+struct Inquiry
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct Inquiry *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct Inquiry *prev;
+
+ /**
+ * Handle to the exchange that made the transfer.
+ */
+ struct Exchange *exchange;
+
+ /**
+ * Task where we retry fetching transfer details from the exchange.
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * For which merchant instance is this tracking request?
+ */
+ char *instance_id;
+
+ /**
+ * payto:// URI used for the transfer.
+ */
+ char *payto_uri;
+
+ /**
+ * Handle for the /wire/transfers request.
+ */
+ struct TALER_EXCHANGE_TransfersGetHandle *wdh;
+
+ /**
+ * When did the transfer happen?
+ */
+ struct GNUNET_TIME_Timestamp execution_time;
+
+ /**
+ * Argument for the /wire/transfers request.
+ */
+ struct TALER_WireTransferIdentifierRawP wtid;
+
+ /**
+ * Amount of the wire transfer.
+ */
+ struct TALER_Amount total;
+
+ /**
+ * Row of the wire transfer in our database.
+ */
+ uint64_t rowid;
+
+};
+
+
+/**
+ * Head of known exchanges.
+ */
+static struct Exchange *e_head;
+
+/**
+ * Tail of known exchanges.
+ */
+static struct Exchange *e_tail;
+
+/**
+ * The merchant's configuration.
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+/**
+ * Our database plugin.
+ */
+static struct TALER_MERCHANTDB_Plugin *db_plugin;
+
+/**
+ * Handle to the context for interacting with the bank.
+ */
+static struct GNUNET_CURL_Context *ctx;
+
+/**
+ * Scheduler context for running the @e ctx.
+ */
+static struct GNUNET_CURL_RescheduleContext *rc;
+
+/**
+ * Main task for #find_work().
+ */
+static struct GNUNET_SCHEDULER_Task *task;
+
+/**
+ * Event handler to learn that there are new transfers
+ * to check.
+ */
+static struct GNUNET_DB_EventHandler *eh;
+
+/**
+ * How many active inquiries do we have right now.
+ */
+static unsigned int active_inquiries;
+
+/**
+ * Set to true if we ever encountered any problem.
+ */
+static bool found_problem;
+
+/**
+ * Value to return from main(). 0 on success, non-zero on errors.
+ */
+static int global_ret;
+
+/**
+ * #GNUNET_YES if we are in test mode and should exit when idle.
+ */
+static int test_mode;
+
+/**
+ * True if the last DB query was limited by the
+ * #OPEN_INQUIRY_LIMIT and we thus should check again
+ * as soon as we are substantially below that limit,
+ * and not only when we get a DB notification.
+ */
+static bool at_limit;
+
+
+/**
+ * Initiate download from exchange.
+ *
+ * @param cls a `struct Inquiry *`
+ */
+static void
+exchange_request (void *cls);
+
+
+/**
+ * The exchange @a e is ready to handle more inquiries,
+ * prepare to launch them.
+ *
+ * @param[in,out] e exchange to potentially launch inquiries on
+ */
+static void
+launch_inquiries_at_exchange (struct Exchange *e)
+{
+ for (struct Inquiry *w = e->w_head;
+ NULL != w;
+ w = w->next)
+ {
+ if (e->exchange_inquiries > EXCHANGE_INQUIRY_LIMIT)
+ break;
+ if ( (NULL == w->task) &&
+ (NULL == w->wdh) )
+ {
+ e->exchange_inquiries++;
+ w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
+ w);
+ }
+ }
+}
+
+
+/**
+ * Function that initiates a /keys download.
+ *
+ * @param cls a `struct Exchange *`
+ */
+static void
+download_keys (void *cls);
+
+
+/**
+ * Function called with information about who is auditing
+ * a particular exchange and what keys the exchange is using.
+ *
+ * @param cls closure with a `struct Exchange *`
+ * @param kr response data
+ * @param[in] keys the keys of the exchange
+ */
+static void
+cert_cb (
+ void *cls,
+ const struct TALER_EXCHANGE_KeysResponse *kr,
+ struct TALER_EXCHANGE_Keys *keys)
+{
+ struct Exchange *e = cls;
+ struct GNUNET_TIME_Absolute n;
+
+ e->conn = NULL;
+ switch (kr->hr.http_status)
+ {
+ case MHD_HTTP_OK:
+ e->ready = true;
+ TALER_EXCHANGE_keys_decref (e->keys);
+ e->keys = keys;
+ launch_inquiries_at_exchange (e);
+ /* Reset back-off */
+ e->retry_delay = GNUNET_TIME_UNIT_ZERO;
+ /* Success: rate limit at once per minute */
+ e->first_retry = GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_UNIT_MINUTES);
+ n = GNUNET_TIME_absolute_max (e->first_retry,
+ keys->key_data_expiration.abs_time);
+ if (NULL != e->retry_task)
+ GNUNET_SCHEDULER_cancel (e->retry_task);
+ e->retry_task = GNUNET_SCHEDULER_add_at (n,
+ &download_keys,
+ e);
+ break;
+ default:
+ e->retry_delay
+ = GNUNET_TIME_STD_BACKOFF (e->retry_delay);
+ e->first_retry
+ = GNUNET_TIME_relative_to_absolute (e->retry_delay);
+ if (NULL != e->retry_task)
+ GNUNET_SCHEDULER_cancel (e->retry_task);
+ e->retry_task = GNUNET_SCHEDULER_add_delayed (e->retry_delay,
+ &download_keys,
+ e);
+ break;
+ }
+}
+
+
+static void
+download_keys (void *cls)
+{
+ struct Exchange *e = cls;
+ struct GNUNET_TIME_Relative n;
+
+ /* If we do not hear back again soon, try again automatically */
+ n = GNUNET_TIME_STD_BACKOFF (e->retry_delay);
+ n = GNUNET_TIME_relative_max (n,
+ GNUNET_TIME_UNIT_MINUTES);
+ e->retry_task = GNUNET_SCHEDULER_add_delayed (n,
+ &download_keys,
+ e);
+ if ( (NULL == e->keys) ||
+ (GNUNET_TIME_absolute_is_past (e->keys->key_data_expiration.abs_time)) )
+ e->conn = TALER_EXCHANGE_get_keys (ctx,
+ e->exchange_url,
+ e->keys,
+ &cert_cb,
+ e);
+}
+
+
+/**
+ * Updates the transaction status for inquiry @a w to the given values.
+ *
+ * @param w inquiry to update status for
+ * @param next_attempt when should we retry @a w (if ever)
+ * @param ec error code to use (if any)
+ * @param failed failure status (if ultimately failed)
+ * @param verified success status (if ultimately successful)
+ */
+static void
+update_transaction_status (const struct Inquiry *w,
+ struct GNUNET_TIME_Absolute next_attempt,
+ enum TALER_ErrorCode ec,
+ bool failed,
+ bool verified)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (failed)
+ found_problem = true;
+ qs = db_plugin->update_transfer_status (db_plugin->cls,
+ w->exchange->exchange_url,
+ &w->wtid,
+ next_attempt,
+ ec,
+ failed,
+ verified);
+ if (qs < 0)
+ {
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * Lookup our internal data structure for the given
+ * @a exchange_url or create one if we do not yet have
+ * one.
+ *
+ * @param exchange_url base URL of the exchange
+ * @return our state for this exchange
+ */
+static struct Exchange *
+find_exchange (const char *exchange_url)
+{
+ struct Exchange *e;
+
+ for (e = e_head; NULL != e; e = e->next)
+ if (0 == strcmp (exchange_url,
+ e->exchange_url))
+ return e;
+ e = GNUNET_new (struct Exchange);
+ e->exchange_url = GNUNET_strdup (exchange_url);
+ GNUNET_CONTAINER_DLL_insert (e_head,
+ e_tail,
+ e);
+ e->retry_task = GNUNET_SCHEDULER_add_now (&download_keys,
+ e);
+ return e;
+}
+
+
+/**
+ * Finds new transfers that require work in the merchant database.
+ *
+ * @param cls NULL
+ */
+static void
+find_work (void *cls);
+
+
+/**
+ * Free resources of @a w.
+ *
+ * @param[in] w inquiry job to terminate
+ */
+static void
+end_inquiry (struct Inquiry *w)
+{
+ struct Exchange *e = w->exchange;
+
+ GNUNET_assert (active_inquiries > 0);
+ active_inquiries--;
+ if (NULL != w->wdh)
+ {
+ TALER_EXCHANGE_transfers_get_cancel (w->wdh);
+ w->wdh = NULL;
+ }
+ GNUNET_free (w->instance_id);
+ GNUNET_free (w->payto_uri);
+ GNUNET_CONTAINER_DLL_remove (e->w_head,
+ e->w_tail,
+ w);
+ GNUNET_free (w);
+ if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) &&
+ (NULL == task) &&
+ (at_limit) )
+ {
+ at_limit = false;
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&find_work,
+ NULL);
+ }
+ if ( (NULL == task) &&
+ (! at_limit) &&
+ (0 == active_inquiries) &&
+ (test_mode) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No more open inquiries and in test mode. Existing.\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * We're being aborted with CTRL-C (or SIGTERM). Shut down.
+ *
+ * @param cls closure (NULL)
+ */
+static void
+shutdown_task (void *cls)
+{
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Running shutdown\n");
+ while (NULL != e_head)
+ {
+ struct Exchange *e = e_head;
+
+ while (NULL != e->w_head)
+ {
+ struct Inquiry *w = e->w_head;
+
+ end_inquiry (w);
+ }
+ GNUNET_free (e->exchange_url);
+ if (NULL != e->conn)
+ {
+ TALER_EXCHANGE_get_keys_cancel (e->conn);
+ e->conn = NULL;
+ }
+ if (NULL != e->keys)
+ {
+ TALER_EXCHANGE_keys_decref (e->keys);
+ e->keys = NULL;
+ }
+ if (NULL != e->retry_task)
+ {
+ GNUNET_SCHEDULER_cancel (e->retry_task);
+ e->retry_task = NULL;
+ }
+ GNUNET_CONTAINER_DLL_remove (e_head,
+ e_tail,
+ e);
+ GNUNET_free (e);
+ }
+ if (NULL != eh)
+ {
+ db_plugin->event_listen_cancel (eh);
+ eh = NULL;
+ }
+ if (NULL != task)
+ {
+ GNUNET_SCHEDULER_cancel (task);
+ task = NULL;
+ }
+ TALER_MERCHANTDB_plugin_unload (db_plugin);
+ db_plugin = NULL;
+ cfg = NULL;
+ if (NULL != ctx)
+ {
+ GNUNET_CURL_fini (ctx);
+ ctx = NULL;
+ }
+ if (NULL != rc)
+ {
+ GNUNET_CURL_gnunet_rc_destroy (rc);
+ rc = NULL;
+ }
+}
+
+
+/**
+ * Check that the given @a wire_fee is what the @a e should charge
+ * at the @a execution_time. If the fee is correct (according to our
+ * database), return #GNUNET_OK. If we do not have the fee structure in our
+ * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee
+ * is bogus, we respond with the proof to the client and return
+ * #GNUNET_SYSERR.
+ *
+ * @param w inquiry to check fees of
+ * @param execution_time time of the wire transfer
+ * @param wire_fee fee claimed by the exchange
+ * @return #GNUNET_SYSERR if we returned hard proof of
+ * missbehavior from the exchange to the client
+ */
+static enum GNUNET_GenericReturnValue
+check_wire_fee (struct Inquiry *w,
+ struct GNUNET_TIME_Timestamp execution_time,
+ const struct TALER_Amount *wire_fee)
+{
+ struct Exchange *e = w->exchange;
+ const struct TALER_EXCHANGE_Keys *keys = e->keys;
+ struct TALER_WireFeeSet fees;
+ struct TALER_MasterSignatureP master_sig;
+ struct GNUNET_TIME_Timestamp start_date;
+ struct GNUNET_TIME_Timestamp end_date;
+ enum GNUNET_DB_QueryStatus qs;
+ char *wire_method;
+
+ if (NULL == keys)
+ {
+ GNUNET_break (0);
+ return GNUNET_NO;
+ }
+ wire_method = TALER_payto_get_method (w->payto_uri);
+ qs = db_plugin->lookup_wire_fee (db_plugin->cls,
+ &keys->master_pub,
+ wire_method,
+ execution_time,
+ &fees,
+ &start_date,
+ &end_date,
+ &master_sig);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_free (wire_method);
+ return GNUNET_SYSERR;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_free (wire_method);
+ return GNUNET_NO;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n",
+ TALER_B2S (&keys->master_pub),
+ wire_method,
+ GNUNET_TIME_timestamp2s (execution_time),
+ TALER_amount2s (wire_fee));
+ GNUNET_free (wire_method);
+ return GNUNET_OK;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break;
+ }
+ if ( (GNUNET_OK !=
+ TALER_amount_cmp_currency (&fees.wire,
+ wire_fee)) ||
+ (0 > TALER_amount_cmp (&fees.wire,
+ wire_fee)) )
+ {
+ GNUNET_break_op (0);
+ GNUNET_free (wire_method);
+ return GNUNET_SYSERR; /* expected_fee >= wire_fee */
+ }
+ GNUNET_free (wire_method);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Closure for #check_transfer()
+ */
+struct CheckTransferContext
+{
+
+ /**
+ * Pointer to the detail that we are currently
+ * checking in #check_transfer().
+ */
+ const struct TALER_TrackTransferDetails *current_detail;
+
+ /**
+ * Which transaction detail are we currently looking at?
+ */
+ unsigned int current_offset;
+
+ /**
+ * #GNUNET_NO if we did not find a matching coin.
+ * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match.
+ * #GNUNET_OK if we did find a matching coin.
+ */
+ enum GNUNET_GenericReturnValue check_transfer_result;
+
+ /**
+ * Set to error code, if any.
+ */
+ enum TALER_ErrorCode ec;
+
+ /**
+ * Set to true if @e ec indicates a permanent failure.
+ */
+ bool failure;
+};
+
+
+/**
+ * This function checks that the information about the coin which
+ * was paid back by _this_ wire transfer matches what _we_ (the merchant)
+ * knew about this coin.
+ *
+ * @param cls closure with our `struct CheckTransferContext *`
+ * @param exchange_url URL of the exchange that issued @a coin_pub
+ * @param amount_with_fee amount the exchange will transfer for this coin
+ * @param deposit_fee fee the exchange will charge for this coin
+ * @param refund_fee fee the exchange will charge for refunding this coin
+ * @param wire_fee paid wire fee
+ * @param h_wire hash of merchant's wire details
+ * @param deposit_timestamp when did the exchange receive the deposit
+ * @param refund_deadline until when are refunds allowed
+ * @param exchange_sig signature by the exchange
+ * @param exchange_pub exchange signing key used for @a exchange_sig
+ */
+static void
+check_transfer (void *cls,
+ const char *exchange_url,
+ const struct TALER_Amount *amount_with_fee,
+ const struct TALER_Amount *deposit_fee,
+ const struct TALER_Amount *refund_fee,
+ const struct TALER_Amount *wire_fee,
+ const struct TALER_MerchantWireHashP *h_wire,
+ struct GNUNET_TIME_Timestamp deposit_timestamp,
+ struct GNUNET_TIME_Timestamp refund_deadline,
+ const struct TALER_ExchangeSignatureP *exchange_sig,
+ const struct TALER_ExchangePublicKeyP *exchange_pub)
+{
+ struct CheckTransferContext *ctc = cls;
+ const struct TALER_TrackTransferDetails *ttd = ctc->current_detail;
+
+ if (GNUNET_SYSERR == ctc->check_transfer_result)
+ {
+ GNUNET_break (0);
+ return; /* already had a serious issue; odd that we're called more than once as well... */
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking coin with value %s\n",
+ TALER_amount2s (amount_with_fee));
+ if ( (GNUNET_OK !=
+ TALER_amount_cmp_currency (amount_with_fee,
+ &ttd->coin_value)) ||
+ (0 != TALER_amount_cmp (amount_with_fee,
+ &ttd->coin_value)) )
+ {
+ /* Disagreement between the exchange and us about how much this
+ coin is worth! */
+ GNUNET_break_op (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Disagreement about coin value %s\n",
+ TALER_amount2s (amount_with_fee));
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Exchange gave it a value of %s\n",
+ TALER_amount2s (&ttd->coin_value));
+ ctc->check_transfer_result = GNUNET_SYSERR;
+ /* Build the `TrackTransferConflictDetails` */
+ ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS;
+ ctc->failure = true;
+ /* FIXME: this should be reported to the auditor (once the auditor has an API for this) */
+ return;
+ }
+ if ( (GNUNET_OK !=
+ TALER_amount_cmp_currency (deposit_fee,
+ &ttd->coin_fee)) ||
+ (0 != TALER_amount_cmp (deposit_fee,
+ &ttd->coin_fee)) )
+ {
+ /* Disagreement between the exchange and us about how much this
+ coin is worth! */
+ GNUNET_break_op (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Expected fee is %s\n",
+ TALER_amount2s (&ttd->coin_fee));
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Fee claimed by exchange is %s\n",
+ TALER_amount2s (deposit_fee));
+ ctc->check_transfer_result = GNUNET_SYSERR;
+ /* Build the `TrackTransferConflictDetails` */
+ ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS;
+ ctc->failure = true;
+ /* FIXME: this should be reported to the auditor (once the auditor has an API for this) */
+ return;
+ }
+ ctc->check_transfer_result = GNUNET_OK;
+}
+
+
+/**
+ * Function called with detailed wire transfer data, including all
+ * of the coin transactions that were combined into the wire transfer.
+ *
+ * @param cls closure a `struct Inquiry *`
+ * @param tgr response details
+ */
+static void
+wire_transfer_cb (void *cls,
+ const struct TALER_EXCHANGE_TransfersGetResponse *tgr)
+{
+ struct Inquiry *w = cls;
+ struct Exchange *e = w->exchange;
+ enum GNUNET_DB_QueryStatus qs;
+ const struct TALER_EXCHANGE_TransferData *td = NULL;
+
+ e->exchange_inquiries--;
+ w->wdh = NULL;
+ if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries)
+ launch_inquiries_at_exchange (e);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got response code %u from exchange for GET /transfers/$WTID\n",
+ tgr->hr.http_status);
+ switch (tgr->hr.http_status)
+ {
+ case MHD_HTTP_OK:
+ td = &tgr->details.ok.td;
+ w->execution_time = td->execution_time;
+ e->transfer_delay = GNUNET_TIME_UNIT_ZERO;
+ break;
+ case MHD_HTTP_BAD_REQUEST:
+ case MHD_HTTP_FORBIDDEN:
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_HARD_FAILURE,
+ true,
+ false);
+ end_inquiry (w);
+ return;
+ case MHD_HTTP_NOT_FOUND:
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_FATAL_NOT_FOUND,
+ true,
+ false);
+ end_inquiry (w);
+ return;
+ case MHD_HTTP_INTERNAL_SERVER_ERROR:
+ case MHD_HTTP_BAD_GATEWAY:
+ case MHD_HTTP_GATEWAY_TIMEOUT:
+ e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay);
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (
+ e->transfer_delay),
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE,
+ false,
+ false);
+ end_inquiry (w);
+ return;
+ default:
+ e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Unexpected HTTP status %u\n",
+ tgr->hr.http_status);
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (
+ e->transfer_delay),
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE,
+ false,
+ false);
+ end_inquiry (w);
+ return;
+ }
+ db_plugin->preflight (db_plugin->cls);
+ qs = db_plugin->insert_transfer_details (db_plugin->cls,
+ w->instance_id,
+ w->exchange->exchange_url,
+ w->payto_uri,
+ &w->wtid,
+ td);
+ if (0 > qs)
+ {
+ /* Always report on DB error as well to enable diagnostics */
+ GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transfer already known. Ignoring duplicate.\n");
+ return;
+ }
+
+ {
+ struct CheckTransferContext ctc = {
+ .ec = TALER_EC_NONE,
+ .failure = false
+ };
+
+ for (unsigned int i = 0; i<td->details_length; i++)
+ {
+ const struct TALER_TrackTransferDetails *ttd = &td->details[i];
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (TALER_EC_NONE != ctc.ec)
+ break; /* already encountered an error */
+ ctc.current_offset = i;
+ ctc.current_detail = ttd;
+ /* Set the coin as "never seen" before. */
+ ctc.check_transfer_result = GNUNET_NO;
+ qs = db_plugin->lookup_deposits_by_contract_and_coin (
+ db_plugin->cls,
+ w->instance_id,
+ &ttd->h_contract_terms,
+ &ttd->coin_pub,
+ &check_transfer,
+ &ctc);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_break (0);
+ ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED;
+ break;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED;
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* The exchange says we made this deposit, but WE do not
+ recall making it (corrupted / unreliable database?)!
+ Well, let's say thanks and accept the money! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to find payment data in DB\n");
+ ctc.check_transfer_result = GNUNET_OK;
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break;
+ }
+ switch (ctc.check_transfer_result)
+ {
+ case GNUNET_NO:
+ /* Internal error: how can we have called #check_transfer()
+ but still have no result? */
+ GNUNET_break (0);
+ ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE;
+ return;
+ case GNUNET_SYSERR:
+ /* #check_transfer() failed, report conflict! */
+ GNUNET_break_op (0);
+ GNUNET_assert (TALER_EC_NONE != ctc.ec);
+ break;
+ case GNUNET_OK:
+ break;
+ }
+ }
+ if (TALER_EC_NONE != ctc.ec)
+ {
+ update_transaction_status (
+ w,
+ ctc.failure
+ ? GNUNET_TIME_UNIT_FOREVER_ABS
+ : GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_UNIT_MINUTES),
+ ctc.ec,
+ ctc.failure,
+ false);
+ end_inquiry (w);
+ return;
+ }
+ }
+
+ if (GNUNET_SYSERR ==
+ check_wire_fee (w,
+ td->execution_time,
+ &td->wire_fee))
+ {
+ GNUNET_break_op (0);
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE,
+ true,
+ false);
+ end_inquiry (w);
+ return;
+ }
+
+ if ( (GNUNET_OK !=
+ TALER_amount_cmp_currency (&td->total_amount,
+ &w->total)) ||
+ (0 !=
+ TALER_amount_cmp (&td->total_amount,
+ &w->total)) )
+ {
+ GNUNET_break_op (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Wire transfer total value was %s\n",
+ TALER_amount2s (&w->total));
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Exchange claimed total value to be %s\n",
+ TALER_amount2s (&td->total_amount));
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_CONFLICTING_TRANSFERS,
+ true,
+ false);
+ end_inquiry (w);
+ return;
+ }
+ /* set transaction to successful */
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_NONE,
+ false,
+ true);
+ end_inquiry (w);
+}
+
+
+/**
+ * Initiate download from an exchange for a given inquiry.
+ *
+ * @param cls a `struct Inquiry *`
+ */
+static void
+exchange_request (void *cls)
+{
+ struct Inquiry *w = cls;
+ struct Exchange *e = w->exchange;
+
+ w->task = NULL;
+ GNUNET_assert (e->ready);
+ w->wdh = TALER_EXCHANGE_transfers_get (
+ ctx,
+ e->exchange_url,
+ e->keys,
+ &w->wtid,
+ &wire_transfer_cb,
+ w);
+ if (NULL == w->wdh)
+ {
+ GNUNET_break (0);
+ e->exchange_inquiries--;
+ e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay);
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (
+ e->transfer_delay),
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE,
+ false,
+ false);
+ end_inquiry (w);
+ return;
+ }
+ /* Wait at least 1m for the network transfer */
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_UNIT_MINUTES),
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST,
+ false,
+ false);
+}
+
+
+/**
+ * Function called with information about a transfer we
+ * should ask the exchange about.
+ *
+ * @param cls closure (NULL)
+ * @param rowid row of the transfer in the merchant database
+ * @param instance_id instance that received the transfer
+ * @param exchange_url base URL of the exchange that initiated the transfer
+ * @param payto_uri account of the merchant that received the transfer
+ * @param wtid wire transfer subject identifying the aggregation
+ * @param total total amount that was wired
+ * @param next_attempt when should we next try to interact with the exchange
+ */
+static void
+start_inquiry (
+ void *cls,
+ uint64_t rowid,
+ const char *instance_id,
+ const char *exchange_url,
+ const char *payto_uri,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total,
+ struct GNUNET_TIME_Absolute next_attempt)
+{
+ struct Exchange *e;
+ struct Inquiry *w;
+
+ (void) cls;
+ if (GNUNET_TIME_absolute_is_future (next_attempt))
+ {
+ if (NULL == task)
+ task = GNUNET_SCHEDULER_add_at (next_attempt,
+ &find_work,
+ NULL);
+ return;
+ }
+ e = find_exchange (exchange_url);
+ for (w = e->w_head; NULL != w; w = w->next)
+ {
+ if (0 == GNUNET_memcmp (&w->wtid,
+ wtid))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Already processing inquiry. Aborting ongoing inquiry\n");
+ end_inquiry (w);
+ break;
+ }
+ }
+
+ active_inquiries++;
+ w = GNUNET_new (struct Inquiry);
+ w->payto_uri = GNUNET_strdup (payto_uri);
+ w->instance_id = GNUNET_strdup (instance_id);
+ w->rowid = rowid;
+ w->wtid = *wtid;
+ w->total = *total;
+ GNUNET_CONTAINER_DLL_insert (e->w_head,
+ e->w_tail,
+ w);
+ w->exchange = e;
+ if (w->exchange->ready)
+ w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
+ w);
+ /* Wait at least 1 minute for /keys */
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_UNIT_MINUTES),
+ TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS,
+ false,
+ false);
+}
+
+
+static void
+find_work (void *cls)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ int limit;
+
+ (void) cls;
+ task = NULL;
+ GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries);
+ limit = OPEN_INQUIRY_LIMIT - active_inquiries;
+ if (0 == limit)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Not looking for work: at limit\n");
+ at_limit = true;
+ return;
+ }
+ at_limit = false;
+ qs = db_plugin->select_open_transfers (db_plugin->cls,
+ limit,
+ &start_inquiry,
+ NULL);
+ if (qs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain open transfers from database\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (qs == limit)
+ {
+ /* DB limited response, re-trigger DB interaction
+ the moment we significantly fall below the
+ limit */
+ at_limit = true;
+ }
+ if (0 == active_inquiries)
+ {
+ if (test_mode)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No more open inquiries and in test mode. Existing.\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No open inquiries found, waiting for notification to resume\n");
+ }
+}
+
+
+/**
+ * Function called when transfers are added to the merchant database. We look
+ * for more work.
+ *
+ * @param cls closure (NULL)
+ * @param extra additional event data provided
+ * @param extra_size number of bytes in @a extra
+ */
+static void
+transfer_added (void *cls,
+ const void *extra,
+ size_t extra_size)
+{
+ (void) cls;
+ (void) extra;
+ (void) extra_size;
+ if (active_inquiries > OPEN_INQUIRY_LIMIT / 2)
+ {
+ /* Trigger DB only once we are substantially below the limit */
+ at_limit = true;
+ return;
+ }
+ if (NULL != task)
+ return;
+ task = GNUNET_SCHEDULER_add_now (&find_work,
+ NULL);
+}
+
+
+/**
+ * First task.
+ *
+ * @param cls closure, NULL
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be NULL!)
+ * @param c configuration
+ */
+static void
+run (void *cls,
+ char *const *args,
+ const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *c)
+{
+ (void) args;
+ (void) cfgfile;
+
+ cfg = c;
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ NULL);
+ ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
+ &rc);
+ rc = GNUNET_CURL_gnunet_rc_create (ctx);
+ if (NULL == ctx)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
+ return;
+ }
+ if (NULL ==
+ (db_plugin = TALER_MERCHANTDB_plugin_load (cfg)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to initialize DB subsystem\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NOTCONFIGURED;
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->connect (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to connect to database\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
+ return;
+ }
+ {
+ struct GNUNET_DB_EventHeaderP es = {
+ .size = htons (sizeof (es)),
+ .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED)
+ };
+
+ eh = db_plugin->event_listen (db_plugin->cls,
+ &es,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transfer_added,
+ NULL);
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&find_work,
+ NULL);
+}
+
+
+/**
+ * The main function of taler-merchant-exchange
+ *
+ * @param argc number of arguments from the command line
+ * @param argv command line arguments
+ * @return 0 ok, 1 on error
+ */
+int
+main (int argc,
+ char *const *argv)
+{
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_timetravel ('T',
+ "timetravel"),
+ GNUNET_GETOPT_option_flag ('t',
+ "test",
+ "run in test mode and exit when idle",
+ &test_mode),
+ GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
+ GNUNET_GETOPT_OPTION_END
+ };
+ enum GNUNET_GenericReturnValue ret;
+
+ if (GNUNET_OK !=
+ GNUNET_STRINGS_get_utf8_args (argc, argv,
+ &argc, &argv))
+ return EXIT_INVALIDARGUMENT;
+ TALER_OS_init ();
+ ret = GNUNET_PROGRAM_run (
+ argc, argv,
+ "taler-merchant-exchange",
+ gettext_noop (
+ "background process that reconciles bank transfers with orders by asking the exchange"),
+ options,
+ &run, NULL);
+ GNUNET_free_nz ((void *) argv);
+ if (GNUNET_SYSERR == ret)
+ return EXIT_INVALIDARGUMENT;
+ if (GNUNET_NO == ret)
+ return EXIT_SUCCESS;
+ if ( (found_problem) &&
+ (0 == global_ret) )
+ global_ret = 7;
+ return global_ret;
+}
+
+
+/* end of taler-merchant-exchange.c */