diff options
Diffstat (limited to 'src/backend/taler-merchant-exchange.c')
-rw-r--r-- | src/backend/taler-merchant-exchange.c | 604 |
1 files changed, 465 insertions, 139 deletions
diff --git a/src/backend/taler-merchant-exchange.c b/src/backend/taler-merchant-exchange.c index 66893f03..ff480ca9 100644 --- a/src/backend/taler-merchant-exchange.c +++ b/src/backend/taler-merchant-exchange.c @@ -45,6 +45,13 @@ */ #define EXCHANGE_INQUIRY_LIMIT 16 + +/** + * Information about an inquiry job. + */ +struct Inquiry; + + /** * Information about an exchange. */ @@ -61,6 +68,16 @@ struct Exchange struct Exchange *prev; /** + * Head of active inquiries. + */ + struct Inquiry *w_head; + + /** + * Tail of active inquiries. + */ + struct Inquiry *w_tail; + + /** * Which exchange are we tracking here. */ char *exchange_url; @@ -91,6 +108,12 @@ struct Exchange struct GNUNET_TIME_Relative retry_delay; /** + * How long should we wait between requests + * for transfer details? + */ + struct GNUNET_TIME_Relative transfer_delay; + + /** * False to indicate that there is an ongoing * /keys transfer we are waiting for; * true to indicate that /keys data is up-to-date. @@ -121,6 +144,11 @@ struct Inquiry struct Exchange *exchange; /** + * Task where we retry fetching transfer details from the exchange. + */ + struct GNUNET_SCHEDULER_Task *task; + + /** * For which merchant instance is this tracking request? */ char *instance_id; @@ -136,12 +164,6 @@ struct Inquiry struct TALER_EXCHANGE_TransfersGetHandle *wdh; /** - * Pointer to the detail that we are currently - * checking in #check_transfer(). - */ - const struct TALER_TrackTransferDetails *current_detail; - - /** * When did the transfer happen? */ struct GNUNET_TIME_Absolute execution_time; @@ -161,24 +183,6 @@ struct Inquiry */ uint64_t rowid; - /** - * Which transaction detail are we currently looking at? - */ - unsigned int current_offset; - - /** - * #GNUNET_NO if we did not find a matching coin. - * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. - * #GNUNET_OK if we did find a matching coin. - */ - enum GNUNET_GenericReturnValue check_transfer_result; - - /** - * Are we done with the exchange request for this - * inquiry? - */ - bool exchange_done; - }; @@ -193,16 +197,6 @@ static struct Exchange *e_head; static struct Exchange *e_tail; /** - * Head of active inquiries. - */ -static struct Inquiry *w_head; - -/** - * Tail of active inquiries. - */ -static struct Inquiry *w_tail; - -/** * The merchant's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; @@ -244,14 +238,17 @@ static unsigned int active_inquiries; static int global_ret; /** - * How many transactions should we fetch at most per batch? + * #GNUNET_YES if we are in test mode and should exit when idle. */ -static unsigned int batch_size = 32; +static int test_mode; /** - * #GNUNET_YES if we are in test mode and should exit when idle. + * True if the last DB query was limited by the + * #OPEN_INQUIRY_LIMIT and we thus should check again + * as soon as we are substantially below that limit, + * and not only when we get a DB notification. */ -static int test_mode; +static bool at_limit; /** @@ -267,27 +264,21 @@ exchange_request (void *cls); * The exchange @a e is ready to handle more inquiries, * prepare to launch them. * - * @param e exchange to potentially launch inquiries on + * @param[in,out] e exchange to potentially launch inquiries on */ static void -launch_inquiries_at_exchange (const struct Exchange *e) +launch_inquiries_at_exchange (struct Exchange *e) { - /* Note: this is an O(n) that should be optimized to an O(1) by tracking - inquiries per exchange at the exchange... */ - for (struct Inquiry w = w_head; + for (struct Inquiry *w = e->w_head; NULL != w; w = w->next) { - if (w->exchange_done) - continue; - if (w->exchange != e) - continue; + if (e->exchange_inquiries > EXCHANGE_INQUIRY_LIMIT) + break; if ( (NULL == w->task) && (NULL == w->wdh) ) { - /* Note: we should additionally count the number of exchange - requests launched here to not then run into the per-exchange - limit at exchange_request */ + e->exchange_inquiries++; w->task = GNUNET_SCHEDULER_add_now (&exchange_request, w); } @@ -296,38 +287,93 @@ launch_inquiries_at_exchange (const struct Exchange *e) /** + * Function that initiates a /keys download. + * + * @param cls a `struct Exchange *` + */ +static void +download_keys (void *cls); + + +/** * Function called with information about who is auditing * a particular exchange and what keys the exchange is using. * * @param cls closure with a `struct Exchange *` - * @param hr HTTP response data - * @param keys information about the various keys used - * by the exchange, NULL if /keys failed - * @param compat protocol compatibility information + * @param kr response data */ static void cert_cb ( void *cls, - const struct TALER_EXCHANGE_HttpResponse *hr, - const struct TALER_EXCHANGE_Keys *keys, - enum TALER_EXCHANGE_VersionCompatibility compat) + const struct TALER_EXCHANGE_KeysResponse *kr) { struct Exchange *e = cls; + struct GNUNET_TIME_Timestamp t; + struct GNUNET_TIME_Absolute n; - switch (hr->http_status) + switch (kr->hr.http_status) { case MHD_HTTP_OK: e->ready = true; launch_inquiries_at_exchange (e); - // FIXME: schedule retry? + /* Reset back-off */ + e->retry_delay = GNUNET_TIME_UNIT_ZERO; + /* Success: rate limit at once per minute */ + e->first_retry = GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES); + /* Moreover usually only go after the current + response actually expired */ + t = TALER_EXCHANGE_check_keys_current (e->conn, + TALER_EXCHANGE_CKF_NONE); + n = GNUNET_TIME_absolute_max (t.abs_time, + e->first_retry); + if (NULL != e->retry_task) + GNUNET_SCHEDULER_cancel (e->retry_task); + e->retry_task = GNUNET_SCHEDULER_add_at (n, + &download_keys, + e); break; default: - // FIXME: schedule retry! + e->retry_delay + = GNUNET_TIME_STD_BACKOFF (e->retry_delay); + e->first_retry + = GNUNET_TIME_relative_to_absolute (e->retry_delay); + + if (NULL != e->retry_task) + GNUNET_SCHEDULER_cancel (e->retry_task); + e->retry_task = GNUNET_SCHEDULER_add_delayed (e->retry_delay, + &download_keys, + e); break; } } +static void +download_keys (void *cls) +{ + struct Exchange *e = cls; + struct GNUNET_TIME_Relative n; + + /* If we do not hear back again soon, try again automatically */ + n = GNUNET_TIME_STD_BACKOFF (e->retry_delay); + n = GNUNET_TIME_relative_max (n, + GNUNET_TIME_UNIT_MINUTES); + e->retry_task = GNUNET_SCHEDULER_add_delayed (n, + &download_keys, + e); + if (NULL == e->conn) + e->conn = TALER_EXCHANGE_connect (ctx, + e->exchange_url, + &cert_cb, + e, + TALER_EXCHANGE_OPTION_END); + else + (void) TALER_EXCHANGE_check_keys_current (e->conn, + TALER_EXCHANGE_CKF_NONE); +} + + /** * Updates the transaction status for inquiry @a w to the given values. * @@ -344,7 +390,22 @@ update_transaction_status (const struct Inquiry *w, bool failed, bool verified) { - // FIXME: DB update here. Log result, on failure shutdown. + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->update_transfer_status (db_plugin->cls, + w->exchange->exchange_url, + &w->wtid, + next_attempt, + ec, + failed, + verified); + if (qs < 0) + { + GNUNET_break (0); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } } @@ -370,16 +431,22 @@ find_exchange (const char *exchange_url) GNUNET_CONTAINER_DLL_insert (e_head, e_tail, e); - e->conn = TALER_EXCHANGE_connect (ctx, - exchange_url, - &cert_cb, - e, - TALER_EXCHANGE_OPTION_END); + e->retry_task = GNUNET_SCHEDULER_add_now (&download_keys, + e); return e; } /** + * Finds new transfers that require work in the merchant database. + * + * @param cls NULL + */ +static void +find_work (void *cls); + + +/** * Free resources of @a w. * * @param[in] w inquiry job to terminate @@ -387,6 +454,8 @@ find_exchange (const char *exchange_url) static void end_inquiry (struct Inquiry *w) { + struct Exchange *e = w->exchange; + GNUNET_assert (active_inquiries > 0); active_inquiries--; if (NULL != w->wdh) @@ -396,10 +465,18 @@ end_inquiry (struct Inquiry *w) } GNUNET_free (w->instance_id); GNUNET_free (w->payto_uri); - GNUNET_CONTAINER_DLL_remove (w_head, - w_tail, + GNUNET_CONTAINER_DLL_remove (e->w_head, + e->w_tail, w); GNUNET_free (w); + if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && + (NULL == task) && + (at_limit) ) + { + at_limit = false; + task = GNUNET_SCHEDULER_add_now (&find_work, + NULL); + } } @@ -414,16 +491,16 @@ shutdown_task (void *cls) (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); - while (NULL != w_head) - { - struct Inquiry *w = w_head; - - end_inquiry (w); - } while (NULL != e_head) { struct Exchange *e = e_head; + while (NULL != e->w_head) + { + struct Inquiry *w = e->w_head; + + end_inquiry (w); + } GNUNET_free (e->exchange_url); TALER_EXCHANGE_disconnect (e->conn); GNUNET_CONTAINER_DLL_remove (e_head, @@ -452,26 +529,27 @@ shutdown_task (void *cls) } -// FIXME: where is this used? Where do we check the totals!? /** - * Check that the given @a wire_fee is what the @a exchange_pub should charge + * Check that the given @a wire_fee is what the @a e should charge * at the @a execution_time. If the fee is correct (according to our * database), return #GNUNET_OK. If we do not have the fee structure in our * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee * is bogus, we respond with the proof to the client and return * #GNUNET_SYSERR. * - * @param ptc context of the transfer to respond to + * @param w inquiry to check fees of * @param execution_time time of the wire transfer * @param wire_fee fee claimed by the exchange * @return #GNUNET_SYSERR if we returned hard proof of * missbehavior from the exchange to the client */ static enum GNUNET_GenericReturnValue -check_wire_fee (struct PostTransfersContext *ptc, +check_wire_fee (struct Inquiry *w, struct GNUNET_TIME_Timestamp execution_time, const struct TALER_Amount *wire_fee) { + struct Exchange *e = w->exchange; + const struct TALER_EXCHANGE_Keys *keys; struct TALER_WireFeeSet fees; struct TALER_MasterSignatureP master_sig; struct GNUNET_TIME_Timestamp start_date; @@ -479,35 +557,39 @@ check_wire_fee (struct PostTransfersContext *ptc, enum GNUNET_DB_QueryStatus qs; char *wire_method; - wire_method = TALER_payto_get_method (ptc->payto_uri); - qs = TMH_db->lookup_wire_fee (TMH_db->cls, - &ptc->master_pub, - wire_method, - execution_time, - &fees, - &start_date, - &end_date, - &master_sig); + keys = TALER_EXCHANGE_get_keys (e->conn); + if (NULL == keys) + { + GNUNET_break (0); + return GNUNET_NO; + } + wire_method = TALER_payto_get_method (w->payto_uri); + qs = db_plugin->lookup_wire_fee (db_plugin->cls, + &keys->master_pub, + wire_method, + execution_time, + &fees, + &start_date, + &end_date, + &master_sig); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); - ptc->response_code = MHD_HTTP_INTERNAL_SERVER_ERROR; - ptc->response = TALER_MHD_make_error (TALER_EC_GENERIC_DB_FETCH_FAILED, - "lookup_wire_fee"); + GNUNET_free (wire_method); return GNUNET_SYSERR; case GNUNET_DB_STATUS_SOFT_ERROR: - ptc->soft_retry = true; + GNUNET_free (wire_method); return GNUNET_NO; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", - TALER_B2S (&ptc->master_pub), + TALER_B2S (&keys->master_pub), wire_method, GNUNET_TIME_timestamp2s (execution_time), TALER_amount2s (wire_fee)); GNUNET_free (wire_method); - return GNUNET_NO; + return GNUNET_OK; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: break; } @@ -517,64 +599,179 @@ check_wire_fee (struct PostTransfersContext *ptc, GNUNET_free (wire_method); return GNUNET_OK; /* expected_fee >= wire_fee */ } - /* Wire fee check failed, export proof to auditor! */ - // FIXME: report error! GNUNET_free (wire_method); return GNUNET_SYSERR; } /** + * Closure for #check_transfer() + */ +struct CheckTransferContext +{ + + /** + * Pointer to the detail that we are currently + * checking in #check_transfer(). + */ + const struct TALER_TrackTransferDetails *current_detail; + + /** + * Which transaction detail are we currently looking at? + */ + unsigned int current_offset; + + /** + * #GNUNET_NO if we did not find a matching coin. + * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. + * #GNUNET_OK if we did find a matching coin. + */ + enum GNUNET_GenericReturnValue check_transfer_result; + + /** + * Set to error code, if any. + */ + enum TALER_ErrorCode ec; + + /** + * Set to true if @e ec indicates a permanent failure. + */ + bool failure; +}; + + +/** + * This function checks that the information about the coin which + * was paid back by _this_ wire transfer matches what _we_ (the merchant) + * knew about this coin. + * + * @param cls closure with our `struct CheckTransferContext *` + * @param exchange_url URL of the exchange that issued @a coin_pub + * @param amount_with_fee amount the exchange will transfer for this coin + * @param deposit_fee fee the exchange will charge for this coin + * @param refund_fee fee the exchange will charge for refunding this coin + * @param wire_fee paid wire fee + * @param h_wire hash of merchant's wire details + * @param deposit_timestamp when did the exchange receive the deposit + * @param refund_deadline until when are refunds allowed + * @param exchange_sig signature by the exchange + * @param exchange_pub exchange signing key used for @a exchange_sig + */ +static void +check_transfer (void *cls, + const char *exchange_url, + const struct TALER_Amount *amount_with_fee, + const struct TALER_Amount *deposit_fee, + const struct TALER_Amount *refund_fee, + const struct TALER_Amount *wire_fee, + const struct TALER_MerchantWireHashP *h_wire, + struct GNUNET_TIME_Timestamp deposit_timestamp, + struct GNUNET_TIME_Timestamp refund_deadline, + const struct TALER_ExchangeSignatureP *exchange_sig, + const struct TALER_ExchangePublicKeyP *exchange_pub) +{ + struct CheckTransferContext *ctc = cls; + const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; + + if (GNUNET_SYSERR == ctc->check_transfer_result) + { + GNUNET_break (0); + return; /* already had a serious issue; odd that we're called more than once as well... */ + } + if ( (0 != TALER_amount_cmp (amount_with_fee, + &ttd->coin_value)) || + (0 != TALER_amount_cmp (deposit_fee, + &ttd->coin_fee)) ) + { + /* Disagreement between the exchange and us about how much this + coin is worth! */ + GNUNET_break_op (0); + ctc->check_transfer_result = GNUNET_SYSERR; + /* Build the `TrackTransferConflictDetails` */ + ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; + ctc->failure = true; + /* TODO: this should be reported to the auditor! */ + return; + } + ctc->check_transfer_result = GNUNET_OK; +} + + +/** * Function called with detailed wire transfer data, including all * of the coin transactions that were combined into the wire transfer. * * @param cls closure a `struct Inquiry *` - * @param hr HTTP response details - * @param td transfer data + * @param tgr response details */ static void wire_transfer_cb (void *cls, - const struct TALER_EXCHANGE_HttpResponse *hr, - const struct TALER_EXCHANGE_TransferData *td) + const struct TALER_EXCHANGE_TransfersGetResponse *tgr) { struct Inquiry *w = cls; struct Exchange *e = w->exchange; enum GNUNET_DB_QueryStatus qs; + const struct TALER_EXCHANGE_TransferData *td = NULL; e->exchange_inquiries--; + w->wdh = NULL; if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) launch_inquiries_at_exchange (e); - w->wdh = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got response code %u from exchange for GET /transfers/$WTID\n", - hr->http_status); - switch (hr->http_status) + tgr->hr.http_status); + switch (tgr->hr.http_status) { case MHD_HTTP_OK: + td = &tgr->details.ok.td; + e->transfer_delay = GNUNET_TIME_UNIT_ZERO; break; + case MHD_HTTP_BAD_REQUEST: + case MHD_HTTP_FORBIDDEN: + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_HARD_FAILURE, + true, + false); + return; case MHD_HTTP_NOT_FOUND: update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, - TALER_EC_MERCHANT_XXX, + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_FATAL_NOT_FOUND, true, false); return; + case MHD_HTTP_INTERNAL_SERVER_ERROR: + case MHD_HTTP_BAD_GATEWAY: + case MHD_HTTP_GATEWAY_TIMEOUT: + e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute ( + e->transfer_delay), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, + false, + false); + return; default: + e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unexpected HTTP status %u\n", + tgr->hr.http_status); update_transaction_status (w, - GNUNET_TIME_relative_to_absolute (delay), - TALER_EC_MERCHANT_XXX, + GNUNET_TIME_relative_to_absolute ( + e->transfer_delay), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, false, false); return; } - TMH_db->preflight (TMH_db->cls); - /* Ok, exchange answer is acceptable, store it */ - qs = TMH_db->insert_transfer_details (TMH_db->cls, - w->instance_id, - w->exchange_url, - w->payto_uri, - &w->wtid, - td); + db_plugin->preflight (db_plugin->cls); + qs = db_plugin->insert_transfer_details (db_plugin->cls, + w->instance_id, + w->exchange->exchange_url, + w->payto_uri, + &w->wtid, + td); if (0 > qs) { /* Always report on DB error as well to enable diagnostics */ @@ -585,18 +782,113 @@ wire_transfer_cb (void *cls, } if (0 == qs) { - GNUNET_break (0); + /* FIXME: set as confirmed!!? */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transfer already known. Ignoring duplicate.\n"); + /* FIXME: if not verified & not ultimately failed, + we probably should still try to verify! */ return; } - // FIXME: check wire fee somewhere!??! + + { + struct CheckTransferContext ctc = { + .ec = TALER_EC_NONE, + .failure = false + }; + + for (unsigned int i = 0; i<td->details_length; i++) + { + const struct TALER_TrackTransferDetails *ttd = &td->details[i]; + enum GNUNET_DB_QueryStatus qs; + + if (TALER_EC_NONE != ctc.ec) + break; /* already encountered an error */ + ctc.current_offset = i; + ctc.current_detail = ttd; + /* Set the coin as "never seen" before. */ + ctc.check_transfer_result = GNUNET_NO; + qs = db_plugin->lookup_deposits_by_contract_and_coin ( + db_plugin->cls, + w->instance_id, + &ttd->h_contract_terms, + &ttd->coin_pub, + &check_transfer, + &ctc); + switch (qs) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (0); + ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; + break; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* The exchange says we made this deposit, but WE do not + recall making it (corrupted / unreliable database?)! + Well, let's say thanks and accept the money! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to find payment data in DB\n"); + ctc.check_transfer_result = GNUNET_OK; + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; + } + switch (ctc.check_transfer_result) + { + case GNUNET_NO: + /* Internal error: how can we have called #check_transfer() + but still have no result? */ + GNUNET_break (0); + ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; + return; + case GNUNET_SYSERR: + /* #check_transfer() failed, report conflict! */ + GNUNET_break_op (0); + GNUNET_assert (TALER_EC_NONE != ctc.ec); + return; + case GNUNET_OK: + break; + } + } + if (TALER_EC_NONE != ctc.ec) + { + update_transaction_status ( + w, + ctc.failure + ? GNUNET_TIME_UNIT_FOREVER_ABS + : GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES), + ctc.ec, + ctc.failure, + false); + return; + } + } + + if (GNUNET_SYSERR == + check_wire_fee (w, + td->execution_time, + &td->wire_fee)) + { + GNUNET_break_op (0); + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, + true, + false); + return; + } + if (0 != TALER_amount_cmp (&td->total_amount, &w->total)) { - /* record inconsistency in DB! (TODO: report to auditor!?) */ + GNUNET_break_op (0); update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, - TALER_EC_MERCHANT_XXX, + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_CONFLICTING_TRANSFERS, true, false); return; @@ -622,8 +914,6 @@ exchange_request (void *cls) struct Exchange *e = w->exchange; GNUNET_assert (e->ready); - if (EXCHANGE_INQUIRY_LIMIT <= e->exchange_inquiries) - return; /* blocked by exchange rate limit */ w->wdh = TALER_EXCHANGE_transfers_get (e->conn, &w->wtid, &wire_transfer_cb, @@ -631,17 +921,21 @@ exchange_request (void *cls) if (NULL == w->wdh) { GNUNET_break (0); + e->exchange_inquiries--; + e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); update_transaction_status (w, - GNUNET_TIME_relative_to_absolute (delay), - TALER_EC_MERCHANT_XXX, + GNUNET_TIME_relative_to_absolute ( + e->transfer_delay), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, false, false); return; } - e->exchange_inquiries++; + /* Wait at least 1m for the network transfer */ update_transaction_status (w, - GNUNET_TIME_relative_to_absolute (delay), - TALER_EC_MERCHANT_EXCHANGE_GET_TRANSFER_IN_PROGRESS, + GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, false, false); } @@ -671,9 +965,11 @@ start_inquiry ( const struct TALER_Amount *total, struct GNUNET_TIME_Absolute next_attempt) { + struct Exchange *e; struct Inquiry *w; (void) cls; + e = find_exchange (exchange_url); active_inquiries++; w = GNUNET_new (struct Inquiry); w->payto_uri = GNUNET_strdup (payto_uri); @@ -681,37 +977,42 @@ start_inquiry ( w->rowid = rowid; w->wtid = *wtid; w->total = *total; - GNUNET_CONTAINER_DLL_insert (w_head, - w_tail, + GNUNET_CONTAINER_DLL_insert (e->w_head, + e->w_tail, w); - w->exchange = find_exchange (exchange_url); + w->exchange = e; if (w->exchange->ready) w->task = GNUNET_SCHEDULER_add_now (&exchange_request, w); + /* Wait at least 1 minute for /keys */ update_transaction_status (w, - GNUNET_TIME_relative_to_absolute (delay), - TALER_EC_MERCHANT_EXCHANGE_KEYS_IN_PROGRESS, + GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, false, false); } -/** - * Finds new transfers that require work in the merchant - * database. - * - * @param cls NULL - */ static void find_work (void *cls) { enum GNUNET_DB_QueryStatus qs; + int limit; + (void) cls; task = NULL; - // NOTE: SELECT WHERE confirmed AND NOT verified AND NOT failed?; - // FIXME: use LIMIT clause! => When do we try again if LIMIT applied? + GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); + limit = OPEN_INQUIRY_LIMIT - active_inquiries; + if (0 == limit) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Not looking for work: at limit\n"); + at_limit = true; + return; + } qs = db_plugin->select_open_transfers (db_plugin->cls, - OPEN_INQUIRY_LIMIT - active_inquiries, + limit, &start_inquiry, NULL); if (qs < 0) @@ -721,6 +1022,25 @@ find_work (void *cls) GNUNET_SCHEDULER_shutdown (); return; } + if (qs == limit) + { + /* DB limited response, re-trigger DB interaction + the moment we significantly fall below the + limit */ + at_limit = true; + } + if (0 == active_inquiries) + { + if (test_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more open inquiries and in test mode. Existing.\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No open inquiries found, waiting for notification to resume\n"); + } } @@ -740,6 +1060,12 @@ transfer_added (void *cls, (void) cls; (void) extra; (void) extra_size; + if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) + { + /* Trigger DB only once we are substantially below the limit */ + at_limit = true; + return; + } if (NULL != task) return; task = GNUNET_SCHEDULER_add_now (&find_work, @@ -795,7 +1121,7 @@ run (void *cls, { struct GNUNET_DB_EventHeaderP es = { .size = htons (sizeof (es)), - .type = htons (TALER_DBEVENT_MERCHANT_XXX) + .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED) }; eh = db_plugin->event_listen (db_plugin->cls, |