diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/deposits.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/deposits.ts | 343 |
1 files changed, 177 insertions, 166 deletions
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index decef7375..415f3cd72 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -133,25 +133,23 @@ export class DepositTransactionContext implements TransactionContext { const ws = this.ws; // FIXME: We should check first if we are in a final state // where deletion is allowed. - await ws.db - .mktx((x) => [x.depositGroups, x.tombstones]) - .runReadWrite(async (tx) => { - const tipRecord = await tx.depositGroups.get(depositGroupId); - if (tipRecord) { - await tx.depositGroups.delete(depositGroupId); - await tx.tombstones.put({ - id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId, - }); - } - }); + await ws.db.runReadWriteTx(["depositGroups", "tombstones"], async (tx) => { + const tipRecord = await tx.depositGroups.get(depositGroupId); + if (tipRecord) { + await tx.depositGroups.delete(depositGroupId); + await tx.tombstones.put({ + id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId, + }); + } + }); return; } async suspendTransaction(): Promise<void> { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -184,16 +182,17 @@ export class DepositTransactionContext implements TransactionContext { oldTxState: oldState, newTxState: computeDepositTransactionStatus(dg), }; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); } async abortTransaction(): Promise<void> { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -218,7 +217,8 @@ export class DepositTransactionContext implements TransactionContext { return undefined; } return undefined; - }); + }, + ); ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); @@ -226,9 +226,9 @@ export class DepositTransactionContext implements TransactionContext { async resumeTransaction(): Promise<void> { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -261,16 +261,17 @@ export class DepositTransactionContext implements TransactionContext { oldTxState: oldState, newTxState: computeDepositTransactionStatus(dg), }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(retryTag); } async failTransaction(): Promise<void> { const { ws, depositGroupId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { logger.warn( @@ -291,7 +292,8 @@ export class DepositTransactionContext implements TransactionContext { } } return undefined; - }); + }, + ); // FIXME: Also cancel ongoing work (via cancellation token, once implemented) ws.taskScheduler.stopShepherdTask(retryTag); notifyTransition(ws, transactionId, transitionInfo); @@ -418,9 +420,9 @@ async function waitForRefreshOnDepositGroup( tag: TransactionType.Deposit, depositGroupId: depositGroup.depositGroupId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups, x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups", "refreshGroups"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); let newOpState: DepositOperationStatus | undefined; if (!refreshGroup) { @@ -449,7 +451,8 @@ async function waitForRefreshOnDepositGroup( return { oldTxState, newTxState }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.backoff(); @@ -469,13 +472,14 @@ async function refundDepositGroup( break; default: { const coinPub = depositGroup.payCoinSelection.coinPubs[i]; - const coinExchange = await ws.db - .mktx((x) => [x.coins]) - .runReadOnly(async (tx) => { + const coinExchange = await ws.db.runReadOnlyTx( + ["coins"], + async (tx) => { const coinRecord = await tx.coins.get(coinPub); checkDbInvariant(!!coinRecord); return coinRecord.exchangeBaseUrl; - }); + }, + ); const refundAmount = depositGroup.payCoinSelection.coinContributions[i]; // We use a constant refund transaction ID, since there can // only be one refund. @@ -529,15 +533,15 @@ async function refundDepositGroup( const currency = Amounts.currencyOf(depositGroup.totalPayCost); - await ws.db - .mktx((x) => [ - x.depositGroups, - x.refreshGroups, - x.coins, - x.denominations, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "depositGroups", + "refreshGroups", + "coins", + "denominations", + "coinAvailability", + ], + async (tx) => { const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); if (!newDg) { return; @@ -565,7 +569,8 @@ async function refundDepositGroup( newDg.abortRefreshGroupId = rgid.refreshGroupId; } await tx.depositGroups.put(newDg); - }); + }, + ); return TaskRunResult.backoff(); } @@ -622,9 +627,9 @@ async function processDepositGroupPendingKyc( // remove after the exchange is fixed or clarified kycStatusRes.status === HttpStatusCode.NoContent ) { - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const newDg = await tx.depositGroups.get(depositGroupId); if (!newDg) { return; @@ -637,7 +642,8 @@ async function processDepositGroupPendingKyc( const newTxState = computeDepositTransactionStatus(newDg); await tx.depositGroups.put(newDg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } else if (kycStatusRes.status === HttpStatusCode.Accepted) { // FIXME: Do we have to update the URL here? @@ -680,9 +686,9 @@ async function transitionToKycRequired( } else if (kycStatusReq.status === HttpStatusCode.Accepted) { const kycStatus = await kycStatusReq.json(); logger.info(`kyc status: ${j2s(kycStatus)}`); - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { return undefined; @@ -700,7 +706,8 @@ async function transitionToKycRequired( await tx.depositGroups.put(dg); const newTxState = computeDepositTransactionStatus(dg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } else { @@ -717,13 +724,14 @@ async function processDepositGroupPendingTrack( for (let i = 0; i < depositGroup.statusPerCoin.length; i++) { const coinPub = depositGroup.payCoinSelection.coinPubs[i]; // FIXME: Make the URL part of the coin selection? - const exchangeBaseUrl = await ws.db - .mktx((x) => [x.coins]) - .runReadWrite(async (tx) => { + const exchangeBaseUrl = await ws.db.runReadWriteTx( + ["coins"], + async (tx) => { const coinRecord = await tx.coins.get(coinPub); checkDbInvariant(!!coinRecord); return coinRecord.exchangeBaseUrl; - }); + }, + ); let updatedTxStatus: DepositElementStatus | undefined = undefined; let newWiredCoin: @@ -793,41 +801,39 @@ async function processDepositGroupPendingTrack( } if (updatedTxStatus !== undefined) { - await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { - const dg = await tx.depositGroups.get(depositGroupId); - if (!dg) { - return; - } - if (updatedTxStatus !== undefined) { - dg.statusPerCoin[i] = updatedTxStatus; - } - if (newWiredCoin) { - /** - * FIXME: if there is a new wire information from the exchange - * it should add up to the previous tracking states. - * - * This may loose information by overriding prev state. - * - * And: add checks to integration tests - */ - if (!dg.trackingState) { - dg.trackingState = {}; - } - - dg.trackingState[newWiredCoin.id] = newWiredCoin.value; + await ws.db.runReadWriteTx(["depositGroups"], async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return; + } + if (updatedTxStatus !== undefined) { + dg.statusPerCoin[i] = updatedTxStatus; + } + if (newWiredCoin) { + /** + * FIXME: if there is a new wire information from the exchange + * it should add up to the previous tracking states. + * + * This may loose information by overriding prev state. + * + * And: add checks to integration tests + */ + if (!dg.trackingState) { + dg.trackingState = {}; } - await tx.depositGroups.put(dg); - }); + + dg.trackingState[newWiredCoin.id] = newWiredCoin.value; + } + await tx.depositGroups.put(dg); + }); } } let allWired = true; - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { return undefined; @@ -848,7 +854,8 @@ async function processDepositGroupPendingTrack( } const newTxState = computeDepositTransactionStatus(dg); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Deposit, depositGroupId, @@ -869,11 +876,12 @@ async function processDepositGroupPendingDeposit( ): Promise<TaskRunResult> { logger.info("processing deposit group in pending(deposit)"); const depositGroupId = depositGroup.depositGroupId; - const contractTermsRec = await ws.db - .mktx((x) => [x.contractTerms]) - .runReadOnly(async (tx) => { + const contractTermsRec = await ws.db.runReadOnlyTx( + ["contractTerms"], + async (tx) => { return tx.contractTerms.get(depositGroup.contractTermsHash); - }); + }, + ); if (!contractTermsRec) { throw Error("contract terms for deposit not found in database"); } @@ -954,27 +962,25 @@ async function processDepositGroupPendingDeposit( codecForBatchDepositSuccess(), ); - await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { - const dg = await tx.depositGroups.get(depositGroupId); - if (!dg) { - return; - } - for (const batchIndex of batchIndexes) { - const coinStatus = dg.statusPerCoin[batchIndex]; - switch (coinStatus) { - case DepositElementStatus.DepositPending: - dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking; - await tx.depositGroups.put(dg); - } + await ws.db.runReadWriteTx(["depositGroups"], async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return; + } + for (const batchIndex of batchIndexes) { + const coinStatus = dg.statusPerCoin[batchIndex]; + switch (coinStatus) { + case DepositElementStatus.DepositPending: + dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking; + await tx.depositGroups.put(dg); } - }); + } + }); } - const transitionInfo = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { const dg = await tx.depositGroups.get(depositGroupId); if (!dg) { return undefined; @@ -984,7 +990,8 @@ async function processDepositGroupPendingDeposit( await tx.depositGroups.put(dg); const newTxState = computeDepositTransactionStatus(dg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.progress(); @@ -998,11 +1005,12 @@ export async function processDepositGroup( depositGroupId: string, cancellationToken: CancellationToken, ): Promise<TaskRunResult> { - const depositGroup = await ws.db - .mktx((x) => [x.depositGroups]) - .runReadOnly(async (tx) => { + const depositGroup = await ws.db.runReadOnlyTx( + ["depositGroups"], + async (tx) => { return tx.depositGroups.get(depositGroupId); - }); + }, + ); if (!depositGroup) { logger.warn(`deposit group ${depositGroupId} not found`); return TaskRunResult.finished(); @@ -1030,15 +1038,18 @@ export async function processDepositGroup( return TaskRunResult.finished(); } +/** + * FIXME: Consider moving this to exchanges.ts. + */ async function getExchangeWireFee( ws: InternalWalletState, wireType: string, baseUrl: string, time: TalerProtocolTimestamp, ): Promise<WireFee> { - const exchangeDetails = await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + const exchangeDetails = await ws.db.runReadOnlyTx( + ["exchangeDetails", "exchanges"], + async (tx) => { const ex = await tx.exchanges.get(baseUrl); if (!ex || !ex.detailsPointer) return undefined; return await tx.exchangeDetails.indexes.byPointer.get([ @@ -1046,7 +1057,8 @@ async function getExchangeWireFee( ex.detailsPointer.currency, ex.detailsPointer.masterPublicKey, ]); - }); + }, + ); if (!exchangeDetails) { throw Error(`exchange missing: ${baseUrl}`); @@ -1141,21 +1153,19 @@ export async function prepareDepositGroup( const exchangeInfos: { url: string; master_pub: string }[] = []; - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { - const allExchanges = await tx.exchanges.iter().toArray(); - for (const e of allExchanges) { - const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); - if (!details || amount.currency !== details.currency) { - continue; - } - exchangeInfos.push({ - master_pub: details.masterPublicKey, - url: e.baseUrl, - }); + await ws.db.runReadOnlyTx(["exchangeDetails", "exchanges"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || amount.currency !== details.currency) { + continue; } - }); + exchangeInfos.push({ + master_pub: details.masterPublicKey, + url: e.baseUrl, + }); + } + }); const now = AbsoluteTime.now(); const nowRounded = AbsoluteTime.toProtocolTimestamp(now); @@ -1255,21 +1265,19 @@ export async function createDepositGroup( const exchangeInfos: { url: string; master_pub: string }[] = []; - await ws.db - .mktx((x) => [x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { - const allExchanges = await tx.exchanges.iter().toArray(); - for (const e of allExchanges) { - const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); - if (!details || amount.currency !== details.currency) { - continue; - } - exchangeInfos.push({ - master_pub: details.masterPublicKey, - url: e.baseUrl, - }); + await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || amount.currency !== details.currency) { + continue; } - }); + exchangeInfos.push({ + master_pub: details.masterPublicKey, + url: e.baseUrl, + }); + } + }); const now = AbsoluteTime.now(); const wireDeadline = AbsoluteTime.toProtocolTimestamp( @@ -1388,17 +1396,17 @@ export async function createDepositGroup( const ctx = new DepositTransactionContext(ws, depositGroupId); const transactionId = ctx.transactionId; - const newTxState = await ws.db - .mktx((x) => [ - x.depositGroups, - x.coins, - x.recoupGroups, - x.denominations, - x.refreshGroups, - x.coinAvailability, - x.contractTerms, - ]) - .runReadWrite(async (tx) => { + const newTxState = await ws.db.runReadWriteTx( + [ + "depositGroups", + "coins", + "recoupGroups", + "denominations", + "refreshGroups", + "coinAvailability", + "contractTerms", + ], + async (tx) => { await spendCoins(ws, tx, { allocationId: transactionId, coinPubs: payCoinSel.coinSel.coinPubs, @@ -1413,7 +1421,8 @@ export async function createDepositGroup( h: contractTermsHash, }); return computeDepositTransactionStatus(depositGroup); - }); + }, + ); ws.notify({ type: NotificationType.TransactionStateTransition, @@ -1450,9 +1459,9 @@ export async function getCounterpartyEffectiveDepositAmount( const fees: AmountJson[] = []; const exchangeSet: Set<string> = new Set(); - await ws.db - .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + await ws.db.runReadOnlyTx( + ["coins", "denominations", "exchangeDetails", "exchanges"], + async (tx) => { for (let i = 0; i < pcs.coinPubs.length; i++) { const coin = await tx.coins.get(pcs.coinPubs[i]); if (!coin) { @@ -1495,7 +1504,8 @@ export async function getCounterpartyEffectiveDepositAmount( fees.push(Amounts.parseOrThrow(fee)); } } - }); + }, + ); return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount; } @@ -1515,9 +1525,9 @@ async function getTotalFeesForDepositAmount( const exchangeSet: Set<string> = new Set(); const currency = Amounts.currencyOf(total); - await ws.db - .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails]) - .runReadOnly(async (tx) => { + await ws.db.runReadOnlyTx( + ["coins", "denominations", "exchanges", "exchangeDetails"], + async (tx) => { for (let i = 0; i < pcs.coinPubs.length; i++) { const coin = await tx.coins.get(pcs.coinPubs[i]); if (!coin) { @@ -1575,7 +1585,8 @@ async function getTotalFeesForDepositAmount( wireFee.push(Amounts.parseOrThrow(fee)); } } - }); + }, + ); return { coin: Amounts.stringify(Amounts.sumOrZero(total.currency, coinFee).amount), |