summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/pay-peer-push-debit.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-27 17:39:58 +0100
committerFlorian Dold <florian@dold.me>2024-02-27 17:40:03 +0100
commit523280b3862b528512ff93c651bc0d9ed632fbf6 (patch)
treeb99f866db59b572685c8c7215136270e22210ca2 /packages/taler-wallet-core/src/pay-peer-push-debit.ts
parent3a889c177dd35a114d2c95efd296274cd185ce52 (diff)
downloadwallet-core-523280b3862b528512ff93c651bc0d9ed632fbf6.tar.gz
wallet-core-523280b3862b528512ff93c651bc0d9ed632fbf6.tar.bz2
wallet-core-523280b3862b528512ff93c651bc0d9ed632fbf6.zip
wallet-core: thread through wallet execution context
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-push-debit.ts')
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts177
1 files changed, 83 insertions, 94 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index 40a5d97a4..5ee4d642b 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -16,7 +16,6 @@
import {
Amounts,
- CancellationToken,
CheckPeerPushDebitRequest,
CheckPeerPushDebitResponse,
CoinRefreshRequest,
@@ -78,7 +77,7 @@ import {
constructTransactionIdentifier,
notifyTransition,
} from "./transactions.js";
-import { InternalWalletState } from "./wallet.js";
+import { WalletExecutionContext } from "./wallet.js";
const logger = new Logger("pay-peer-push-debit.ts");
@@ -87,7 +86,7 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
readonly taskId: TaskIdStr;
constructor(
- public ws: InternalWalletState,
+ public wex: WalletExecutionContext,
public pursePub: string,
) {
this.transactionId = constructTransactionIdentifier({
@@ -101,8 +100,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async deleteTransaction(): Promise<void> {
- const { ws, pursePub, transactionId } = this;
- await ws.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => {
+ const { wex, pursePub, transactionId } = this;
+ await wex.db.runReadWriteTx(["peerPushDebit", "tombstones"], async (tx) => {
const debit = await tx.peerPushDebit.get(pursePub);
if (debit) {
await tx.peerPushDebit.delete(pursePub);
@@ -112,8 +111,8 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
@@ -165,13 +164,13 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
@@ -218,14 +217,14 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.stopShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(retryTag);
+ wex.taskScheduler.stopShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
}
async resumeTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const { wex, pursePub, transactionId, taskId: retryTag } = this;
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
const pushDebitRec = await tx.peerPushDebit.get(pursePub);
@@ -277,12 +276,12 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
return undefined;
},
);
- ws.taskScheduler.startShepherdTask(retryTag);
- notifyTransition(ws, transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(retryTag);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async failTransaction(): Promise<void> {
- const { ws, pursePub, transactionId, taskId: retryTag } = this;
+ const { wex: ws, pursePub, transactionId, taskId: retryTag } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
@@ -337,14 +336,14 @@ export class PeerPushDebitTransactionContext implements TransactionContext {
}
export async function checkPeerPushDebit(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: CheckPeerPushDebitRequest,
): Promise<CheckPeerPushDebitResponse> {
const instructedAmount = Amounts.parseOrThrow(req.amount);
logger.trace(
`checking peer push debit for ${Amounts.stringify(instructedAmount)}`,
);
- const coinSelRes = await selectPeerCoins(ws, { instructedAmount });
+ const coinSelRes = await selectPeerCoins(wex, { instructedAmount });
if (coinSelRes.type === "failure") {
throw TalerError.fromDetail(
TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
@@ -355,7 +354,7 @@ export async function checkPeerPushDebit(
}
logger.trace(`selected peer coins (len=${coinSelRes.result.coins.length})`);
const totalAmount = await getTotalPeerPaymentCost(
- ws,
+ wex,
coinSelRes.result.coins,
);
logger.trace("computed total peer payment cost");
@@ -368,13 +367,13 @@ export async function checkPeerPushDebit(
}
async function handlePurseCreationConflict(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
resp: HttpResponse,
): Promise<TaskRunResult> {
const pursePub = peerPushInitiation.pursePub;
const errResp = await readTalerErrorResponse(resp);
- const ctx = new PeerPushDebitTransactionContext(ws, pursePub);
+ const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) {
await ctx.failTransaction();
return TaskRunResult.finished();
@@ -405,7 +404,7 @@ async function handlePurseCreationConflict(
}
}
- const coinSelRes = await selectPeerCoins(ws, { instructedAmount, repair });
+ const coinSelRes = await selectPeerCoins(wex, { instructedAmount, repair });
if (coinSelRes.type == "failure") {
// FIXME: Details!
@@ -414,7 +413,7 @@ async function handlePurseCreationConflict(
);
}
- await ws.db.runReadWriteTx(["peerPushDebit"], async (tx) => {
+ await wex.db.runReadWriteTx(["peerPushDebit"], async (tx) => {
const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub);
if (!myPpi) {
return;
@@ -438,19 +437,18 @@ async function handlePurseCreationConflict(
}
async function processPeerPushDebitCreateReserve(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const pursePub = peerPushInitiation.pursePub;
const purseExpiration = peerPushInitiation.purseExpiration;
const hContractTerms = peerPushInitiation.contractTermsHash;
- const ctx = new PeerPushDebitTransactionContext(ws, pursePub);
+ const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
const transactionId = ctx.transactionId;
logger.trace(`processing ${transactionId} pending(create-reserve)`);
- const contractTermsRecord = await ws.db.runReadOnlyTx(
+ const contractTermsRecord = await wex.db.runReadOnlyTx(
["contractTerms"],
async (tx) => {
return tx.contractTerms.get(hContractTerms);
@@ -463,7 +461,7 @@ async function processPeerPushDebitCreateReserve(
);
}
- const purseSigResp = await ws.cryptoApi.signPurseCreation({
+ const purseSigResp = await wex.cryptoApi.signPurseCreation({
hContractTerms,
mergePub: peerPushInitiation.mergePub,
minAge: 0,
@@ -473,11 +471,11 @@ async function processPeerPushDebitCreateReserve(
});
const coins = await queryCoinInfosForSelection(
- ws,
+ wex,
peerPushInitiation.coinSel,
);
- const depositSigsResp = await ws.cryptoApi.signPurseDeposits({
+ const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
pursePub: peerPushInitiation.pursePub,
coins,
@@ -495,7 +493,7 @@ async function processPeerPushDebitCreateReserve(
logger.trace(`encrypt contract request: ${j2s(encryptContractRequest)}`);
- const econtractResp = await ws.cryptoApi.encryptContractForMerge(
+ const econtractResp = await wex.cryptoApi.encryptContractForMerge(
encryptContractRequest,
);
@@ -517,10 +515,10 @@ async function processPeerPushDebitCreateReserve(
logger.trace(`request body: ${j2s(reqBody)}`);
- const httpResp = await ws.http.fetch(createPurseUrl.href, {
+ const httpResp = await wex.http.fetch(createPurseUrl.href, {
method: "POST",
body: reqBody,
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
{
@@ -538,7 +536,7 @@ async function processPeerPushDebitCreateReserve(
}
case HttpStatusCode.Conflict: {
// Handle double-spending
- return handlePurseCreationConflict(ws, peerPushInitiation, httpResp);
+ return handlePurseCreationConflict(wex, peerPushInitiation, httpResp);
}
default: {
const errResp = await readTalerErrorResponse(httpResp);
@@ -554,7 +552,7 @@ async function processPeerPushDebitCreateReserve(
throw Error("got error response from exchange");
}
- await transitionPeerPushDebitTransaction(ws, pursePub, {
+ await transitionPeerPushDebitTransaction(wex, pursePub, {
stFrom: PeerPushDebitStatus.PendingCreatePurse,
stTo: PeerPushDebitStatus.PendingReady,
});
@@ -563,9 +561,8 @@ async function processPeerPushDebitCreateReserve(
}
async function processPeerPushDebitAbortingDeletePurse(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { pursePub, pursePriv } = peerPushInitiation;
const transactionId = constructTransactionIdentifier({
@@ -573,23 +570,23 @@ async function processPeerPushDebitAbortingDeletePurse(
pursePub,
});
- const sigResp = await ws.cryptoApi.signDeletePurse({
+ const sigResp = await wex.cryptoApi.signDeletePurse({
pursePriv,
});
const purseUrl = new URL(
`purses/${pursePub}`,
peerPushInitiation.exchangeBaseUrl,
);
- const resp = await ws.http.fetch(purseUrl.href, {
+ const resp = await wex.http.fetch(purseUrl.href, {
method: "DELETE",
headers: {
"taler-purse-signature": sigResp.sig,
},
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
logger.info(`deleted purse with response status ${resp.status}`);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"peerPushDebit",
"refreshGroups",
@@ -617,7 +614,7 @@ async function processPeerPushDebitAbortingDeletePurse(
}
const refresh = await createRefreshGroup(
- ws,
+ wex,
tx,
currency,
coinPubs,
@@ -634,7 +631,7 @@ async function processPeerPushDebitAbortingDeletePurse(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
@@ -645,7 +642,7 @@ interface SimpleTransition {
}
async function transitionPeerPushDebitTransaction(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pursePub: string,
transitionSpec: SimpleTransition,
): Promise<void> {
@@ -653,7 +650,7 @@ async function transitionPeerPushDebitTransaction(
tag: TransactionType.PeerPushDebit,
pursePub,
});
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit"],
async (tx) => {
const ppiRec = await tx.peerPushDebit.get(pursePub);
@@ -673,11 +670,11 @@ async function transitionPeerPushDebitTransaction(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
}
async function processPeerPushDebitAbortingRefreshDeleted(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
const pursePub = peerPushInitiation.pursePub;
@@ -687,7 +684,7 @@ async function processPeerPushDebitAbortingRefreshDeleted(
tag: TransactionType.PeerPushDebit,
pursePub: peerPushInitiation.pursePub,
});
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["refreshGroups", "peerPushDebit"],
async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
@@ -720,13 +717,13 @@ async function processPeerPushDebitAbortingRefreshDeleted(
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
return TaskRunResult.backoff();
}
async function processPeerPushDebitAbortingRefreshExpired(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
const pursePub = peerPushInitiation.pursePub;
@@ -736,7 +733,7 @@ async function processPeerPushDebitAbortingRefreshExpired(
tag: TransactionType.PeerPushDebit,
pursePub: peerPushInitiation.pursePub,
});
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPushDebit", "refreshGroups"],
async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
@@ -769,7 +766,7 @@ async function processPeerPushDebitAbortingRefreshExpired(
return undefined;
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
// FIXME: Shouldn't this be finished in some cases?!
return TaskRunResult.backoff();
}
@@ -778,9 +775,8 @@ async function processPeerPushDebitAbortingRefreshExpired(
* Process the "pending(ready)" state of a peer-push-debit transaction.
*/
async function processPeerPushDebitReady(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPushInitiation: PeerPushDebitRecord,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
logger.trace("processing peer-push-debit pending(ready)");
const pursePub = peerPushInitiation.pursePub;
@@ -794,9 +790,9 @@ async function processPeerPushDebitReady(
);
mergeUrl.searchParams.set("timeout_ms", "30000");
logger.info(`long-polling on purse status at ${mergeUrl.href}`);
- const resp = await ws.http.fetch(mergeUrl.href, {
+ const resp = await wex.http.fetch(mergeUrl.href, {
// timeout: getReserveRequestTimeout(withdrawalGroup),
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
if (resp.status === HttpStatusCode.Ok) {
const purseStatus = await readSuccessResponseJsonOrThrow(
@@ -809,7 +805,7 @@ async function processPeerPushDebitReady(
return TaskRunResult.backoff();
} else {
await transitionPeerPushDebitTransaction(
- ws,
+ wex,
peerPushInitiation.pursePub,
{
stFrom: PeerPushDebitStatus.PendingReady,
@@ -820,7 +816,7 @@ async function processPeerPushDebitReady(
}
} else if (resp.status === HttpStatusCode.Gone) {
logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"peerPushDebit",
"refreshGroups",
@@ -848,7 +844,7 @@ async function processPeerPushDebitReady(
}
const refresh = await createRefreshGroup(
- ws,
+ wex,
tx,
currency,
coinPubs,
@@ -865,7 +861,7 @@ async function processPeerPushDebitReady(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
} else {
logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
@@ -874,11 +870,10 @@ async function processPeerPushDebitReady(
}
export async function processPeerPushDebit(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pursePub: string,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
- const peerPushInitiation = await ws.db.runReadOnlyTx(
+ const peerPushInitiation = await wex.db.runReadOnlyTx(
["peerPushDebit"],
async (tx) => {
return tx.peerPushDebit.get(pursePub);
@@ -890,27 +885,21 @@ export async function processPeerPushDebit(
switch (peerPushInitiation.status) {
case PeerPushDebitStatus.PendingCreatePurse:
- return processPeerPushDebitCreateReserve(
- ws,
- peerPushInitiation,
- cancellationToken,
- );
+ return processPeerPushDebitCreateReserve(wex, peerPushInitiation);
case PeerPushDebitStatus.PendingReady:
- return processPeerPushDebitReady(
- ws,
- peerPushInitiation,
- cancellationToken,
- );
+ return processPeerPushDebitReady(wex, peerPushInitiation);
case PeerPushDebitStatus.AbortingDeletePurse:
- return processPeerPushDebitAbortingDeletePurse(
- ws,
+ return processPeerPushDebitAbortingDeletePurse(wex, peerPushInitiation);
+ case PeerPushDebitStatus.AbortingRefreshDeleted:
+ return processPeerPushDebitAbortingRefreshDeleted(
+ wex,
peerPushInitiation,
- cancellationToken,
);
- case PeerPushDebitStatus.AbortingRefreshDeleted:
- return processPeerPushDebitAbortingRefreshDeleted(ws, peerPushInitiation);
case PeerPushDebitStatus.AbortingRefreshExpired:
- return processPeerPushDebitAbortingRefreshExpired(ws, peerPushInitiation);
+ return processPeerPushDebitAbortingRefreshExpired(
+ wex,
+ peerPushInitiation,
+ );
default: {
const txState = computePeerPushDebitTransactionState(peerPushInitiation);
logger.warn(
@@ -926,7 +915,7 @@ export async function processPeerPushDebit(
* Initiate sending a peer-to-peer push payment.
*/
export async function initiatePeerPushDebit(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: InitiatePeerPushDebitRequest,
): Promise<InitiatePeerPushDebitResponse> {
const instructedAmount = Amounts.parseOrThrow(
@@ -935,14 +924,14 @@ export async function initiatePeerPushDebit(
const purseExpiration = req.partialContractTerms.purse_expiration;
const contractTerms = req.partialContractTerms;
- const pursePair = await ws.cryptoApi.createEddsaKeypair({});
- const mergePair = await ws.cryptoApi.createEddsaKeypair({});
+ const pursePair = await wex.cryptoApi.createEddsaKeypair({});
+ const mergePair = await wex.cryptoApi.createEddsaKeypair({});
const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms);
- const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({});
+ const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({});
- const coinSelRes = await selectPeerCoins(ws, { instructedAmount });
+ const coinSelRes = await selectPeerCoins(wex, { instructedAmount });
if (coinSelRes.type !== "success") {
throw TalerError.fromDetail(
@@ -959,7 +948,7 @@ export async function initiatePeerPushDebit(
logger.trace(`${j2s(coinSelRes)}`);
const totalAmount = await getTotalPeerPaymentCost(
- ws,
+ wex,
coinSelRes.result.coins,
);
@@ -967,13 +956,13 @@ export async function initiatePeerPushDebit(
const pursePub = pursePair.pub;
- const ctx = new PeerPushDebitTransactionContext(ws, pursePub);
+ const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
const transactionId = ctx.transactionId;
const contractEncNonce = encodeCrock(getRandomBytes(24));
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"exchanges",
"contractTerms",
@@ -987,7 +976,7 @@ export async function initiatePeerPushDebit(
// FIXME: Instead of directly doing a spendCoin here,
// we might want to mark the coins as used and spend them
// after we've been able to create the purse.
- await spendCoins(ws, tx, {
+ await spendCoins(wex, tx, {
allocationId: constructTransactionIdentifier({
tag: TransactionType.PeerPushDebit,
pursePub: pursePair.pub,
@@ -1034,13 +1023,13 @@ export async function initiatePeerPushDebit(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
- ws.notify({
+ notifyTransition(wex, transactionId, transitionInfo);
+ wex.ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: transactionId,
});
- ws.taskScheduler.startShepherdTask(ctx.taskId);
+ wex.taskScheduler.startShepherdTask(ctx.taskId);
return {
contractPriv: contractKeyPair.priv,