summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-pull-debit.ts')
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-debit.ts386
1 files changed, 264 insertions, 122 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 6cc552714..0355b58ad 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -33,10 +33,12 @@ import {
HttpStatusCode,
Logger,
NotificationType,
+ ObservabilityEventType,
PeerContractTerms,
PreparePeerPullDebitRequest,
PreparePeerPullDebitResponse,
RefreshReason,
+ SelectedProspectiveCoin,
TalerError,
TalerErrorCode,
TalerPreciseTimestamp,
@@ -124,13 +126,16 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
const transactionId = this.transactionId;
const ws = this.wex;
const peerPullDebitId = this.peerPullDebitId;
- await ws.db.runReadWriteTx(["peerPullDebit", "tombstones"], async (tx) => {
- const debit = await tx.peerPullDebit.get(peerPullDebitId);
- if (debit) {
- await tx.peerPullDebit.delete(peerPullDebitId);
- await tx.tombstones.put({ id: transactionId });
- }
- });
+ await ws.db.runReadWriteTx(
+ { storeNames: ["peerPullDebit", "tombstones"] },
+ async (tx) => {
+ const debit = await tx.peerPullDebit.get(peerPullDebitId);
+ if (debit) {
+ await tx.peerPullDebit.delete(peerPullDebitId);
+ await tx.tombstones.put({ id: transactionId });
+ }
+ },
+ );
}
async suspendTransaction(): Promise<void> {
@@ -139,7 +144,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
const wex = this.wex;
const peerPullDebitId = this.peerPullDebitId;
const transitionInfo = await wex.db.runReadWriteTx(
- ["peerPullDebit"],
+ { storeNames: ["peerPullDebit"] },
async (tx) => {
const pullDebitRec = await tx.peerPullDebit.get(peerPullDebitId);
if (!pullDebitRec) {
@@ -234,6 +239,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
"coinAvailability",
"denominations",
"refreshGroups",
+ "refreshSessions",
"coins",
"coinAvailability",
],
@@ -302,7 +308,7 @@ export class PeerPullDebitTransactionContext implements TransactionContext {
const wex = this.wex;
const extraStores = opts.extraStores ?? [];
const transitionInfo = await wex.db.runReadWriteTx(
- ["peerPullDebit", ...extraStores],
+ { storeNames: ["peerPullDebit", ...extraStores] },
async (tx) => {
const pi = await tx.peerPullDebit.get(this.peerPullDebitId);
if (!pi) {
@@ -369,13 +375,25 @@ async function handlePurseCreationConflict(
}
}
- const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
+ const coinSelRes = await selectPeerCoins(ws, {
+ instructedAmount,
+ repair,
+ });
- if (coinSelRes.type == "failure") {
- // FIXME: Details!
- throw Error(
- "insufficient balance to re-select coins to repair double spending",
- );
+ switch (coinSelRes.type) {
+ case "failure":
+ // FIXME: Details!
+ throw Error(
+ "insufficient balance to re-select coins to repair double spending",
+ );
+ case "prospective":
+ throw Error(
+ "insufficient balance to re-select coins to repair double spending (blocked on refresh)",
+ );
+ case "success":
+ break;
+ default:
+ assertUnreachable(coinSelRes);
}
const totalAmount = await getTotalPeerPaymentCost(
@@ -383,7 +401,7 @@ async function handlePurseCreationConflict(
coinSelRes.result.coins,
);
- await ws.db.runReadWriteTx(["peerPullDebit"], async (tx) => {
+ await ws.db.runReadWriteTx({ storeNames: ["peerPullDebit"] }, async (tx) => {
const myPpi = await tx.peerPullDebit.get(peerPullInc.peerPullDebitId);
if (!myPpi) {
return;
@@ -411,77 +429,176 @@ async function processPeerPullDebitPendingDeposit(
wex: WalletExecutionContext,
peerPullInc: PeerPullPaymentIncomingRecord,
): Promise<TaskRunResult> {
+ const ctx = new PeerPullDebitTransactionContext(
+ wex,
+ peerPullInc.peerPullDebitId,
+ );
+
const pursePub = peerPullInc.pursePub;
const coinSel = peerPullInc.coinSel;
+
if (!coinSel) {
- throw Error("invalid state, no coins selected");
- }
+ const instructedAmount = Amounts.parseOrThrow(peerPullInc.amount);
- const coins = await queryCoinInfosForSelection(wex, coinSel);
+ const coinSelRes = await selectPeerCoins(wex, {
+ instructedAmount,
+ });
+ if (logger.shouldLogTrace()) {
+ logger.trace(`selected p2p coins (pull): ${j2s(coinSelRes)}`);
+ }
- const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
- exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
- pursePub: peerPullInc.pursePub,
- coins,
- });
+ let coins: SelectedProspectiveCoin[] | undefined = undefined;
+
+ switch (coinSelRes.type) {
+ case "failure":
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
+ {
+ insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
+ },
+ );
+ case "prospective":
+ throw Error("insufficient balance (locked behind refresh)");
+ case "success":
+ coins = coinSelRes.result.coins;
+ break;
+ default:
+ assertUnreachable(coinSelRes);
+ }
+
+ const peerPullDebitId = peerPullInc.peerPullDebitId;
+ const totalAmount = await getTotalPeerPaymentCost(wex, coins);
+
+ // FIXME: Missing notification here!
+
+ const transitionDone = await wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "exchanges",
+ "coins",
+ "denominations",
+ "refreshGroups",
+ "refreshSessions",
+ "peerPullDebit",
+ "coinAvailability",
+ ],
+ },
+ async (tx) => {
+ const pi = await tx.peerPullDebit.get(peerPullDebitId);
+ if (!pi) {
+ return false;
+ }
+ if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) {
+ return false;
+ }
+ if (pi.coinSel) {
+ return false;
+ }
+ await spendCoins(wex, tx, {
+ // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`,
+ allocationId: constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullDebitId,
+ }),
+ coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
+ contributions: coinSelRes.result.coins.map((x) =>
+ Amounts.parseOrThrow(x.contribution),
+ ),
+ refreshReason: RefreshReason.PayPeerPull,
+ });
+ pi.coinSel = {
+ coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
+ contributions: coinSelRes.result.coins.map((x) => x.contribution),
+ totalCost: Amounts.stringify(totalAmount),
+ };
+ await tx.peerPullDebit.put(pi);
+ return true;
+ },
+ );
+ if (transitionDone) {
+ return TaskRunResult.progress();
+ } else {
+ return TaskRunResult.backoff();
+ }
+ }
const purseDepositUrl = new URL(
`purses/${pursePub}/deposit`,
peerPullInc.exchangeBaseUrl,
);
- const depositPayload: ExchangePurseDeposits = {
- deposits: depositSigsResp.deposits,
- };
+ // FIXME: We could skip batches that we've already submitted.
- if (logger.shouldLogTrace()) {
- logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
- }
+ const coins = await queryCoinInfosForSelection(wex, coinSel);
- const httpResp = await wex.http.fetch(purseDepositUrl.href, {
- method: "POST",
- body: depositPayload,
- cancellationToken: wex.cancellationToken,
- });
+ const maxBatchSize = 100;
- const ctx = new PeerPullDebitTransactionContext(
- wex,
- peerPullInc.peerPullDebitId,
- );
+ for (let i = 0; i < coins.length; i += maxBatchSize) {
+ const batchSize = Math.min(maxBatchSize, coins.length - i);
- switch (httpResp.status) {
- case HttpStatusCode.Ok: {
- const resp = await readSuccessResponseJsonOrThrow(
- httpResp,
- codecForAny(),
- );
- logger.trace(`purse deposit response: ${j2s(resp)}`);
+ wex.oc.observe({
+ type: ObservabilityEventType.Message,
+ contents: `Depositing batch at ${i}/${coins.length} of size ${batchSize}`,
+ });
- await ctx.transition(async (r) => {
- if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) {
- return TransitionResultType.Stay;
- }
- r.status = PeerPullDebitRecordStatus.Done;
- return TransitionResultType.Transition;
- });
- return TaskRunResult.finished();
- }
- case HttpStatusCode.Gone: {
- await ctx.abortTransaction();
- return TaskRunResult.backoff();
- }
- case HttpStatusCode.Conflict: {
- return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
+ const batchCoins = coins.slice(i, i + batchSize);
+ const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
+ exchangeBaseUrl: peerPullInc.exchangeBaseUrl,
+ pursePub: peerPullInc.pursePub,
+ coins: batchCoins,
+ });
+
+ const depositPayload: ExchangePurseDeposits = {
+ deposits: depositSigsResp.deposits,
+ };
+
+ if (logger.shouldLogTrace()) {
+ logger.trace(`purse deposit payload: ${j2s(depositPayload)}`);
}
- default: {
- const errResp = await readTalerErrorResponse(httpResp);
- return {
- type: TaskRunResultType.Error,
- errorDetail: errResp,
- };
+
+ const httpResp = await wex.http.fetch(purseDepositUrl.href, {
+ method: "POST",
+ body: depositPayload,
+ cancellationToken: wex.cancellationToken,
+ });
+
+ switch (httpResp.status) {
+ case HttpStatusCode.Ok: {
+ const resp = await readSuccessResponseJsonOrThrow(
+ httpResp,
+ codecForAny(),
+ );
+ logger.trace(`purse deposit response: ${j2s(resp)}`);
+ continue;
+ }
+ case HttpStatusCode.Gone: {
+ await ctx.abortTransaction();
+ return TaskRunResult.backoff();
+ }
+ case HttpStatusCode.Conflict: {
+ return handlePurseCreationConflict(ctx, peerPullInc, httpResp);
+ }
+ default: {
+ const errResp = await readTalerErrorResponse(httpResp);
+ return {
+ type: TaskRunResultType.Error,
+ errorDetail: errResp,
+ };
+ }
}
}
+
+ // All batches succeeded, we can transition!
+
+ await ctx.transition(async (r) => {
+ if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) {
+ return TransitionResultType.Stay;
+ }
+ r.status = PeerPullDebitRecordStatus.Done;
+ return TransitionResultType.Transition;
+ });
+ return TaskRunResult.finished();
}
async function processPeerPullDebitAbortingRefresh(
@@ -496,7 +613,7 @@ async function processPeerPullDebitAbortingRefresh(
peerPullDebitId,
});
const transitionInfo = await wex.db.runReadWriteTx(
- ["peerPullDebit", "refreshGroups"],
+ { storeNames: ["peerPullDebit", "refreshGroups"] },
async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: PeerPullDebitRecordStatus | undefined;
@@ -538,7 +655,7 @@ export async function processPeerPullDebit(
peerPullDebitId: string,
): Promise<TaskRunResult> {
const peerPullInc = await wex.db.runReadOnlyTx(
- ["peerPullDebit"],
+ { storeNames: ["peerPullDebit"] },
async (tx) => {
return tx.peerPullDebit.get(peerPullDebitId);
},
@@ -568,7 +685,7 @@ export async function confirmPeerPullDebit(
peerPullDebitId = parsedTx.peerPullDebitId;
const peerPullInc = await wex.db.runReadOnlyTx(
- ["peerPullDebit"],
+ { storeNames: ["peerPullDebit"] },
async (tx) => {
return tx.peerPullDebit.get(peerPullDebitId);
},
@@ -582,62 +699,77 @@ export async function confirmPeerPullDebit(
const instructedAmount = Amounts.parseOrThrow(peerPullInc.amount);
- const coinSelRes = await selectPeerCoins(wex, { instructedAmount });
+ const coinSelRes = await selectPeerCoins(wex, {
+ instructedAmount,
+ });
if (logger.shouldLogTrace()) {
logger.trace(`selected p2p coins (pull): ${j2s(coinSelRes)}`);
}
- if (coinSelRes.type !== "success") {
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
- {
- insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
- },
- );
+ let coins: SelectedProspectiveCoin[] | undefined = undefined;
+
+ switch (coinSelRes.type) {
+ case "failure":
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
+ {
+ insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
+ },
+ );
+ case "prospective":
+ coins = coinSelRes.result.prospectiveCoins;
+ break;
+ case "success":
+ coins = coinSelRes.result.coins;
+ break;
+ default:
+ assertUnreachable(coinSelRes);
}
- const sel = coinSelRes.result;
+ const totalAmount = await getTotalPeerPaymentCost(wex, coins);
- const totalAmount = await getTotalPeerPaymentCost(
- wex,
- coinSelRes.result.coins,
- );
+ // FIXME: Missing notification here!
await wex.db.runReadWriteTx(
- [
- "exchanges",
- "coins",
- "denominations",
- "refreshGroups",
- "peerPullDebit",
- "coinAvailability",
- ],
+ {
+ storeNames: [
+ "exchanges",
+ "coins",
+ "denominations",
+ "refreshGroups",
+ "refreshSessions",
+ "peerPullDebit",
+ "coinAvailability",
+ ],
+ },
async (tx) => {
- await spendCoins(wex, tx, {
- // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`,
- allocationId: constructTransactionIdentifier({
- tag: TransactionType.PeerPullDebit,
- peerPullDebitId,
- }),
- coinPubs: sel.coins.map((x) => x.coinPub),
- contributions: sel.coins.map((x) =>
- Amounts.parseOrThrow(x.contribution),
- ),
- refreshReason: RefreshReason.PayPeerPull,
- });
-
const pi = await tx.peerPullDebit.get(peerPullDebitId);
if (!pi) {
throw Error();
}
- if (pi.status === PeerPullDebitRecordStatus.DialogProposed) {
- pi.status = PeerPullDebitRecordStatus.PendingDeposit;
+ if (pi.status !== PeerPullDebitRecordStatus.DialogProposed) {
+ return;
+ }
+ if (coinSelRes.type == "success") {
+ await spendCoins(wex, tx, {
+ // allocationId: `txn:peer-pull-debit:${req.peerPullDebitId}`,
+ allocationId: constructTransactionIdentifier({
+ tag: TransactionType.PeerPullDebit,
+ peerPullDebitId,
+ }),
+ coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
+ contributions: coinSelRes.result.coins.map((x) =>
+ Amounts.parseOrThrow(x.contribution),
+ ),
+ refreshReason: RefreshReason.PayPeerPull,
+ });
pi.coinSel = {
- coinPubs: sel.coins.map((x) => x.coinPub),
- contributions: sel.coins.map((x) => x.contribution),
+ coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
+ contributions: coinSelRes.result.coins.map((x) => x.contribution),
totalCost: Amounts.stringify(totalAmount),
};
}
+ pi.status = PeerPullDebitRecordStatus.PendingDeposit;
await tx.peerPullDebit.put(pi);
},
);
@@ -673,7 +805,7 @@ export async function preparePeerPullDebit(
}
const existing = await wex.db.runReadOnlyTx(
- ["peerPullDebit", "contractTerms"],
+ { storeNames: ["peerPullDebit", "contractTerms"] },
async (tx) => {
const peerPullDebitRecord =
await tx.peerPullDebit.indexes.byExchangeAndContractPriv.get([
@@ -756,27 +888,37 @@ export async function preparePeerPullDebit(
const instructedAmount = Amounts.parseOrThrow(contractTerms.amount);
- const coinSelRes = await selectPeerCoins(wex, { instructedAmount });
+ const coinSelRes = await selectPeerCoins(wex, {
+ instructedAmount,
+ });
if (logger.shouldLogTrace()) {
logger.trace(`selected p2p coins (pull): ${j2s(coinSelRes)}`);
}
- if (coinSelRes.type !== "success") {
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
- {
- insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
- },
- );
+ let coins: SelectedProspectiveCoin[] | undefined = undefined;
+
+ switch (coinSelRes.type) {
+ case "failure":
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
+ {
+ insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
+ },
+ );
+ case "prospective":
+ coins = coinSelRes.result.prospectiveCoins;
+ break;
+ case "success":
+ coins = coinSelRes.result.coins;
+ break;
+ default:
+ assertUnreachable(coinSelRes);
}
- const totalAmount = await getTotalPeerPaymentCost(
- wex,
- coinSelRes.result.coins,
- );
+ const totalAmount = await getTotalPeerPaymentCost(wex, coins);
await wex.db.runReadWriteTx(
- ["peerPullDebit", "contractTerms"],
+ { storeNames: ["peerPullDebit", "contractTerms"] },
async (tx) => {
await tx.contractTerms.put({
h: contractTermsHash,