summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-02-27 17:39:58 +0100
committerFlorian Dold <florian@dold.me>2024-02-27 17:40:03 +0100
commit523280b3862b528512ff93c651bc0d9ed632fbf6 (patch)
treeb99f866db59b572685c8c7215136270e22210ca2 /packages/taler-wallet-core/src/pay-peer-pull-credit.ts
parent3a889c177dd35a114d2c95efd296274cd185ce52 (diff)
downloadwallet-core-523280b3862b528512ff93c651bc0d9ed632fbf6.tar.gz
wallet-core-523280b3862b528512ff93c651bc0d9ed632fbf6.tar.bz2
wallet-core-523280b3862b528512ff93c651bc0d9ed632fbf6.zip
wallet-core: thread through wallet execution context
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-pull-credit.ts')
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-credit.ts157
1 files changed, 70 insertions, 87 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
index 7774dfd5f..c999a8d1f 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -17,7 +17,6 @@
import {
AbsoluteTime,
Amounts,
- CancellationToken,
CheckPeerPullCreditRequest,
CheckPeerPullCreditResponse,
ContractTermsUtil,
@@ -81,7 +80,7 @@ import {
constructTransactionIdentifier,
notifyTransition,
} from "./transactions.js";
-import { InternalWalletState } from "./wallet.js";
+import { WalletExecutionContext } from "./wallet.js";
import {
getExchangeWithdrawalInfo,
internalCreateWithdrawalGroup,
@@ -94,7 +93,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
readonly taskId: TaskIdStr;
constructor(
- public ws: InternalWalletState,
+ public wex: WalletExecutionContext,
public pursePub: string,
) {
this.taskId = constructTaskIdentifier({
@@ -108,7 +107,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async deleteTransaction(): Promise<void> {
- const { ws, pursePub } = this;
+ const { wex: ws, pursePub } = this;
await ws.db.runReadWriteTx(
["withdrawalGroups", "peerPullCredit", "tombstones"],
async (tx) => {
@@ -138,7 +137,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async suspendTransaction(): Promise<void> {
- const { ws, pursePub, taskId: retryTag, transactionId } = this;
+ const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -198,7 +197,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async failTransaction(): Promise<void> {
- const { ws, pursePub, taskId: retryTag, transactionId } = this;
+ const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -249,7 +248,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async resumeTransaction(): Promise<void> {
- const { ws, pursePub, taskId: retryTag, transactionId } = this;
+ const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -308,7 +307,7 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async abortTransaction(): Promise<void> {
- const { ws, pursePub, taskId: retryTag, transactionId } = this;
+ const { wex: ws, pursePub, taskId: retryTag, transactionId } = this;
const transitionInfo = await ws.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
@@ -364,9 +363,8 @@ export class PeerPullCreditTransactionContext implements TransactionContext {
}
async function queryPurseForPeerPullCredit(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pullIni: PeerPullCreditRecord,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const purseDepositUrl = new URL(
`purses/${pullIni.pursePub}/deposit`,
@@ -374,9 +372,9 @@ async function queryPurseForPeerPullCredit(
);
purseDepositUrl.searchParams.set("timeout_ms", "30000");
logger.info(`querying purse status via ${purseDepositUrl.href}`);
- const resp = await ws.http.fetch(purseDepositUrl.href, {
+ const resp = await wex.http.fetch(purseDepositUrl.href, {
timeout: { d_ms: 60000 },
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
@@ -388,7 +386,7 @@ async function queryPurseForPeerPullCredit(
switch (resp.status) {
case HttpStatusCode.Gone: {
// Exchange says that purse doesn't exist anymore => expired!
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const finPi = await tx.peerPullCredit.get(pullIni.pursePub);
@@ -405,7 +403,7 @@ async function queryPurseForPeerPullCredit(
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
case HttpStatusCode.NotFound:
@@ -427,7 +425,7 @@ async function queryPurseForPeerPullCredit(
return TaskRunResult.backoff();
}
- const reserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => {
+ const reserve = await wex.db.runReadOnlyTx(["reserves"], async (tx) => {
return await tx.reserves.get(pullIni.mergeReserveRowId);
});
@@ -435,7 +433,7 @@ async function queryPurseForPeerPullCredit(
throw Error("reserve for peer pull credit not found in wallet DB");
}
- await internalCreateWithdrawalGroup(ws, {
+ await internalCreateWithdrawalGroup(wex, {
amount: Amounts.parseOrThrow(pullIni.amount),
wgInfo: {
withdrawalType: WithdrawalRecordType.PeerPullCredit,
@@ -449,7 +447,7 @@ async function queryPurseForPeerPullCredit(
pub: reserve.reservePub,
},
});
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const finPi = await tx.peerPullCredit.get(pullIni.pursePub);
@@ -466,17 +464,16 @@ async function queryPurseForPeerPullCredit(
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
async function longpollKycStatus(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pursePub: string,
exchangeUrl: string,
kycInfo: KycPendingInfo,
userType: KycUserType,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
@@ -488,9 +485,9 @@ async function longpollKycStatus(
);
url.searchParams.set("timeout_ms", "10000");
logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
+ const kycStatusRes = await wex.http.fetch(url.href, {
method: "GET",
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
if (
kycStatusRes.status === HttpStatusCode.Ok ||
@@ -498,7 +495,7 @@ async function longpollKycStatus(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const peerIni = await tx.peerPullCredit.get(pursePub);
@@ -517,7 +514,7 @@ async function longpollKycStatus(
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.progress();
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
return TaskRunResult.longpollReturnedPending();
@@ -527,9 +524,8 @@ async function longpollKycStatus(
}
async function processPeerPullCreditAbortingDeletePurse(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerPullIni: PeerPullCreditRecord,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const { pursePub, pursePriv } = peerPullIni;
const transactionId = constructTransactionIdentifier({
@@ -537,20 +533,20 @@ async function processPeerPullCreditAbortingDeletePurse(
pursePub,
});
- const sigResp = await ws.cryptoApi.signDeletePurse({
+ const sigResp = await wex.cryptoApi.signDeletePurse({
pursePriv,
});
const purseUrl = new URL(`purses/${pursePub}`, peerPullIni.exchangeBaseUrl);
- const resp = await ws.http.fetch(purseUrl.href, {
+ const resp = await wex.http.fetch(purseUrl.href, {
method: "DELETE",
headers: {
"taler-purse-signature": sigResp.sig,
},
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
logger.info(`deleted purse with response status ${resp.status}`);
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
[
"peerPullCredit",
"refreshGroups",
@@ -576,13 +572,13 @@ async function processPeerPullCreditAbortingDeletePurse(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
async function handlePeerPullCreditWithdrawing(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pullIni: PeerPullCreditRecord,
): Promise<TaskRunResult> {
if (!pullIni.withdrawalGroupId) {
@@ -594,7 +590,7 @@ async function handlePeerPullCreditWithdrawing(
});
const wgId = pullIni.withdrawalGroupId;
let finished: boolean = false;
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit", "withdrawalGroups"],
async (tx) => {
const ppi = await tx.peerPullCredit.get(pullIni.pursePub);
@@ -627,7 +623,7 @@ async function handlePeerPullCreditWithdrawing(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
if (finished) {
return TaskRunResult.finished();
} else {
@@ -637,13 +633,12 @@ async function handlePeerPullCreditWithdrawing(
}
async function handlePeerPullCreditCreatePurse(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pullIni: PeerPullCreditRecord,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount));
const pursePub = pullIni.pursePub;
- const mergeReserve = await ws.db.runReadOnlyTx(["reserves"], async (tx) => {
+ const mergeReserve = await wex.db.runReadOnlyTx(["reserves"], async (tx) => {
return tx.reserves.get(pullIni.mergeReserveRowId);
});
@@ -651,7 +646,7 @@ async function handlePeerPullCreditCreatePurse(
throw Error("merge reserve for peer pull payment not found in database");
}
- const contractTermsRecord = await ws.db.runReadOnlyTx(
+ const contractTermsRecord = await wex.db.runReadOnlyTx(
["contractTerms"],
async (tx) => {
return tx.contractTerms.get(pullIni.contractTermsHash);
@@ -669,7 +664,7 @@ async function handlePeerPullCreditCreatePurse(
mergeReserve.reservePub,
);
- const econtractResp = await ws.cryptoApi.encryptContractForDeposit({
+ const econtractResp = await wex.cryptoApi.encryptContractForDeposit({
contractPriv: pullIni.contractPriv,
contractPub: pullIni.contractPub,
contractTerms: contractTermsRecord.contractTermsRaw,
@@ -681,7 +676,7 @@ async function handlePeerPullCreditCreatePurse(
const mergeTimestamp = timestampPreciseFromDb(pullIni.mergeTimestamp);
const purseExpiration = contractTerms.purse_expiration;
- const sigRes = await ws.cryptoApi.signReservePurseCreate({
+ const sigRes = await wex.cryptoApi.signReservePurseCreate({
contractTermsHash: pullIni.contractTermsHash,
flags: WalletAccountMergeFlags.CreateWithPurseFee,
mergePriv: pullIni.mergePriv,
@@ -717,22 +712,17 @@ async function handlePeerPullCreditCreatePurse(
pullIni.exchangeBaseUrl,
);
- const httpResp = await ws.http.fetch(reservePurseMergeUrl.href, {
+ const httpResp = await wex.http.fetch(reservePurseMergeUrl.href, {
method: "POST",
body: reservePurseReqBody,
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
if (httpResp.status === HttpStatusCode.UnavailableForLegalReasons) {
const respJson = await httpResp.json();
const kycPending = codecForWalletKycUuid().decode(respJson);
logger.info(`kyc uuid response: ${j2s(kycPending)}`);
- return processPeerPullCreditKycRequired(
- ws,
- pullIni,
- kycPending,
- cancellationToken,
- );
+ return processPeerPullCreditKycRequired(wex, pullIni, kycPending);
}
const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny());
@@ -744,7 +734,7 @@ async function handlePeerPullCreditCreatePurse(
pursePub: pullIni.pursePub,
});
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const pi2 = await tx.peerPullCredit.get(pursePub);
@@ -758,16 +748,15 @@ async function handlePeerPullCreditCreatePurse(
return { oldTxState, newTxState };
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
}
export async function processPeerPullCredit(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
pursePub: string,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
- const pullIni = await ws.db.runReadOnlyTx(["peerPullCredit"], async (tx) => {
+ const pullIni = await wex.db.runReadOnlyTx(["peerPullCredit"], async (tx) => {
return tx.peerPullCredit.get(pursePub);
});
if (!pullIni) {
@@ -786,30 +775,25 @@ export async function processPeerPullCredit(
return TaskRunResult.finished();
}
case PeerPullPaymentCreditStatus.PendingReady:
- return queryPurseForPeerPullCredit(ws, pullIni, cancellationToken);
+ return queryPurseForPeerPullCredit(wex, pullIni);
case PeerPullPaymentCreditStatus.PendingMergeKycRequired: {
if (!pullIni.kycInfo) {
throw Error("invalid state, kycInfo required");
}
return await longpollKycStatus(
- ws,
+ wex,
pursePub,
pullIni.exchangeBaseUrl,
pullIni.kycInfo,
"individual",
- cancellationToken,
);
}
case PeerPullPaymentCreditStatus.PendingCreatePurse:
- return handlePeerPullCreditCreatePurse(ws, pullIni, cancellationToken);
+ return handlePeerPullCreditCreatePurse(wex, pullIni);
case PeerPullPaymentCreditStatus.AbortingDeletePurse:
- return await processPeerPullCreditAbortingDeletePurse(
- ws,
- pullIni,
- cancellationToken,
- );
+ return await processPeerPullCreditAbortingDeletePurse(wex, pullIni);
case PeerPullPaymentCreditStatus.PendingWithdrawing:
- return handlePeerPullCreditWithdrawing(ws, pullIni);
+ return handlePeerPullCreditWithdrawing(wex, pullIni);
case PeerPullPaymentCreditStatus.Aborted:
case PeerPullPaymentCreditStatus.Failed:
case PeerPullPaymentCreditStatus.Expired:
@@ -827,10 +811,9 @@ export async function processPeerPullCredit(
}
async function processPeerPullCreditKycRequired(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
peerIni: PeerPullCreditRecord,
kycPending: WalletKycUuid,
- cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
@@ -845,9 +828,9 @@ async function processPeerPullCreditKycRequired(
);
logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await ws.http.fetch(url.href, {
+ const kycStatusRes = await wex.http.fetch(url.href, {
method: "GET",
- cancellationToken,
+ cancellationToken: wex.cancellationToken,
});
if (
@@ -861,7 +844,7 @@ async function processPeerPullCreditKycRequired(
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusRes.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
- const { transitionInfo, result } = await ws.db.runReadWriteTx(
+ const { transitionInfo, result } = await wex.db.runReadWriteTx(
["peerPullCredit"],
async (tx) => {
const peerInc = await tx.peerPullCredit.get(pursePub);
@@ -897,7 +880,7 @@ async function processPeerPullCreditKycRequired(
};
},
);
- notifyTransition(ws, transactionId, transitionInfo);
+ notifyTransition(wex, transactionId, transitionInfo);
return TaskRunResult.backoff();
} else {
throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`);
@@ -908,7 +891,7 @@ async function processPeerPullCreditKycRequired(
* Check fees and available exchanges for a peer push payment initiation.
*/
export async function checkPeerPullPaymentInitiation(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: CheckPeerPullCreditRequest,
): Promise<CheckPeerPullCreditResponse> {
// FIXME: We don't support exchanges with purse fees yet.
@@ -922,7 +905,7 @@ export async function checkPeerPullPaymentInitiation(
if (req.exchangeBaseUrl) {
exchangeUrl = req.exchangeBaseUrl;
} else {
- exchangeUrl = await getPreferredExchangeForCurrency(ws, currency);
+ exchangeUrl = await getPreferredExchangeForCurrency(wex, currency);
}
if (!exchangeUrl) {
@@ -932,7 +915,7 @@ export async function checkPeerPullPaymentInitiation(
logger.trace(`found ${exchangeUrl} as preferred exchange`);
const wi = await getExchangeWithdrawalInfo(
- ws,
+ wex,
exchangeUrl,
Amounts.parseOrThrow(req.amount),
undefined,
@@ -957,12 +940,12 @@ export async function checkPeerPullPaymentInitiation(
* Find a preferred exchange based on when we withdrew last from this exchange.
*/
async function getPreferredExchangeForCurrency(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
currency: string,
): Promise<string | undefined> {
// Find an exchange with the matching currency.
// Prefer exchanges with the most recent withdrawal.
- const url = await ws.db.runReadOnlyTx(["exchanges"], async (tx) => {
+ const url = await wex.db.runReadOnlyTx(["exchanges"], async (tx) => {
const exchanges = await tx.exchanges.iter().toArray();
let candidate = undefined;
for (const e of exchanges) {
@@ -1005,7 +988,7 @@ async function getPreferredExchangeForCurrency(
* Initiate a peer pull payment.
*/
export async function initiatePeerPullPayment(
- ws: InternalWalletState,
+ wex: WalletExecutionContext,
req: InitiatePeerPullCreditRequest,
): Promise<InitiatePeerPullCreditResponse> {
const currency = Amounts.currencyOf(req.partialContractTerms.amount);
@@ -1013,7 +996,7 @@ export async function initiatePeerPullPayment(
if (req.exchangeBaseUrl) {
maybeExchangeBaseUrl = req.exchangeBaseUrl;
} else {
- maybeExchangeBaseUrl = await getPreferredExchangeForCurrency(ws, currency);
+ maybeExchangeBaseUrl = await getPreferredExchangeForCurrency(wex, currency);
}
if (!maybeExchangeBaseUrl) {
@@ -1022,20 +1005,20 @@ export async function initiatePeerPullPayment(
const exchangeBaseUrl = maybeExchangeBaseUrl;
- await fetchFreshExchange(ws, exchangeBaseUrl);
+ await fetchFreshExchange(wex, exchangeBaseUrl);
- const mergeReserveInfo = await getMergeReserveInfo(ws, {
+ const mergeReserveInfo = await getMergeReserveInfo(wex, {
exchangeBaseUrl: exchangeBaseUrl,
});
- const pursePair = await ws.cryptoApi.createEddsaKeypair({});
- const mergePair = await ws.cryptoApi.createEddsaKeypair({});
+ const pursePair = await wex.cryptoApi.createEddsaKeypair({});
+ const mergePair = await wex.cryptoApi.createEddsaKeypair({});
const contractTerms = req.partialContractTerms;
const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms);
- const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({});
+ const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({});
const withdrawalGroupId = encodeCrock(getRandomBytes(32));
@@ -1045,7 +1028,7 @@ export async function initiatePeerPullPayment(
const contractEncNonce = encodeCrock(getRandomBytes(24));
const wi = await getExchangeWithdrawalInfo(
- ws,
+ wex,
exchangeBaseUrl,
Amounts.parseOrThrow(req.partialContractTerms.amount),
undefined,
@@ -1053,7 +1036,7 @@ export async function initiatePeerPullPayment(
const mergeTimestamp = TalerPreciseTimestamp.now();
- const transitionInfo = await ws.db.runReadWriteTx(
+ const transitionInfo = await wex.db.runReadWriteTx(
["peerPullCredit", "contractTerms"],
async (tx) => {
const ppi: PeerPullCreditRecord = {
@@ -1086,16 +1069,16 @@ export async function initiatePeerPullPayment(
},
);
- const ctx = new PeerPullCreditTransactionContext(ws, pursePair.pub);
+ const ctx = new PeerPullCreditTransactionContext(wex, pursePair.pub);
// The pending-incoming balance has changed.
- ws.notify({
+ wex.ws.notify({
type: NotificationType.BalanceChange,
hintTransactionId: ctx.transactionId,
});
- notifyTransition(ws, ctx.transactionId, transitionInfo);
- ws.taskScheduler.startShepherdTask(ctx.taskId);
+ notifyTransition(wex, ctx.transactionId, transitionInfo);
+ wex.taskScheduler.startShepherdTask(ctx.taskId);
return {
talerUri: stringifyTalerUri({