summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/deposits.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/deposits.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/deposits.ts343
1 files changed, 177 insertions, 166 deletions
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index decef7375..415f3cd72 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -133,25 +133,23 @@ export class DepositTransactionContext implements TransactionContext {
const ws = this.ws;
// FIXME: We should check first if we are in a final state
// where deletion is allowed.
- await ws.db
- .mktx((x) => [x.depositGroups, x.tombstones])
- .runReadWrite(async (tx) => {
- const tipRecord = await tx.depositGroups.get(depositGroupId);
- if (tipRecord) {
- await tx.depositGroups.delete(depositGroupId);
- await tx.tombstones.put({
- id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
- });
- }
- });
+ await ws.db.runReadWriteTx(["depositGroups", "tombstones"], async (tx) => {
+ const tipRecord = await tx.depositGroups.get(depositGroupId);
+ if (tipRecord) {
+ await tx.depositGroups.delete(depositGroupId);
+ await tx.tombstones.put({
+ id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
+ });
+ }
+ });
return;
}
async suspendTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -184,16 +182,17 @@ export class DepositTransactionContext implements TransactionContext {
oldTxState: oldState,
newTxState: computeDepositTransactionStatus(dg),
};
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
}
async abortTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -218,7 +217,8 @@ export class DepositTransactionContext implements TransactionContext {
return undefined;
}
return undefined;
- });
+ },
+ );
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
@@ -226,9 +226,9 @@ export class DepositTransactionContext implements TransactionContext {
async resumeTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -261,16 +261,17 @@ export class DepositTransactionContext implements TransactionContext {
oldTxState: oldState,
newTxState: computeDepositTransactionStatus(dg),
};
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
ws.taskScheduler.startShepherdTask(retryTag);
}
async failTransaction(): Promise<void> {
const { ws, depositGroupId, transactionId, taskId: retryTag } = this;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
logger.warn(
@@ -291,7 +292,8 @@ export class DepositTransactionContext implements TransactionContext {
}
}
return undefined;
- });
+ },
+ );
// FIXME: Also cancel ongoing work (via cancellation token, once implemented)
ws.taskScheduler.stopShepherdTask(retryTag);
notifyTransition(ws, transactionId, transitionInfo);
@@ -418,9 +420,9 @@ async function waitForRefreshOnDepositGroup(
tag: TransactionType.Deposit,
depositGroupId: depositGroup.depositGroupId,
});
- const transitionInfo = await ws.db
- .mktx((x) => [x.refreshGroups, x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups", "refreshGroups"],
+ async (tx) => {
const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
let newOpState: DepositOperationStatus | undefined;
if (!refreshGroup) {
@@ -449,7 +451,8 @@ async function waitForRefreshOnDepositGroup(
return { oldTxState, newTxState };
}
return undefined;
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.backoff();
@@ -469,13 +472,14 @@ async function refundDepositGroup(
break;
default: {
const coinPub = depositGroup.payCoinSelection.coinPubs[i];
- const coinExchange = await ws.db
- .mktx((x) => [x.coins])
- .runReadOnly(async (tx) => {
+ const coinExchange = await ws.db.runReadOnlyTx(
+ ["coins"],
+ async (tx) => {
const coinRecord = await tx.coins.get(coinPub);
checkDbInvariant(!!coinRecord);
return coinRecord.exchangeBaseUrl;
- });
+ },
+ );
const refundAmount = depositGroup.payCoinSelection.coinContributions[i];
// We use a constant refund transaction ID, since there can
// only be one refund.
@@ -529,15 +533,15 @@ async function refundDepositGroup(
const currency = Amounts.currencyOf(depositGroup.totalPayCost);
- await ws.db
- .mktx((x) => [
- x.depositGroups,
- x.refreshGroups,
- x.coins,
- x.denominations,
- x.coinAvailability,
- ])
- .runReadWrite(async (tx) => {
+ await ws.db.runReadWriteTx(
+ [
+ "depositGroups",
+ "refreshGroups",
+ "coins",
+ "denominations",
+ "coinAvailability",
+ ],
+ async (tx) => {
const newDg = await tx.depositGroups.get(depositGroup.depositGroupId);
if (!newDg) {
return;
@@ -565,7 +569,8 @@ async function refundDepositGroup(
newDg.abortRefreshGroupId = rgid.refreshGroupId;
}
await tx.depositGroups.put(newDg);
- });
+ },
+ );
return TaskRunResult.backoff();
}
@@ -622,9 +627,9 @@ async function processDepositGroupPendingKyc(
// remove after the exchange is fixed or clarified
kycStatusRes.status === HttpStatusCode.NoContent
) {
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const newDg = await tx.depositGroups.get(depositGroupId);
if (!newDg) {
return;
@@ -637,7 +642,8 @@ async function processDepositGroupPendingKyc(
const newTxState = computeDepositTransactionStatus(newDg);
await tx.depositGroups.put(newDg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
} else if (kycStatusRes.status === HttpStatusCode.Accepted) {
// FIXME: Do we have to update the URL here?
@@ -680,9 +686,9 @@ async function transitionToKycRequired(
} else if (kycStatusReq.status === HttpStatusCode.Accepted) {
const kycStatus = await kycStatusReq.json();
logger.info(`kyc status: ${j2s(kycStatus)}`);
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return undefined;
@@ -700,7 +706,8 @@ async function transitionToKycRequired(
await tx.depositGroups.put(dg);
const newTxState = computeDepositTransactionStatus(dg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.finished();
} else {
@@ -717,13 +724,14 @@ async function processDepositGroupPendingTrack(
for (let i = 0; i < depositGroup.statusPerCoin.length; i++) {
const coinPub = depositGroup.payCoinSelection.coinPubs[i];
// FIXME: Make the URL part of the coin selection?
- const exchangeBaseUrl = await ws.db
- .mktx((x) => [x.coins])
- .runReadWrite(async (tx) => {
+ const exchangeBaseUrl = await ws.db.runReadWriteTx(
+ ["coins"],
+ async (tx) => {
const coinRecord = await tx.coins.get(coinPub);
checkDbInvariant(!!coinRecord);
return coinRecord.exchangeBaseUrl;
- });
+ },
+ );
let updatedTxStatus: DepositElementStatus | undefined = undefined;
let newWiredCoin:
@@ -793,41 +801,39 @@ async function processDepositGroupPendingTrack(
}
if (updatedTxStatus !== undefined) {
- await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
- const dg = await tx.depositGroups.get(depositGroupId);
- if (!dg) {
- return;
- }
- if (updatedTxStatus !== undefined) {
- dg.statusPerCoin[i] = updatedTxStatus;
- }
- if (newWiredCoin) {
- /**
- * FIXME: if there is a new wire information from the exchange
- * it should add up to the previous tracking states.
- *
- * This may loose information by overriding prev state.
- *
- * And: add checks to integration tests
- */
- if (!dg.trackingState) {
- dg.trackingState = {};
- }
-
- dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
+ await ws.db.runReadWriteTx(["depositGroups"], async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return;
+ }
+ if (updatedTxStatus !== undefined) {
+ dg.statusPerCoin[i] = updatedTxStatus;
+ }
+ if (newWiredCoin) {
+ /**
+ * FIXME: if there is a new wire information from the exchange
+ * it should add up to the previous tracking states.
+ *
+ * This may loose information by overriding prev state.
+ *
+ * And: add checks to integration tests
+ */
+ if (!dg.trackingState) {
+ dg.trackingState = {};
}
- await tx.depositGroups.put(dg);
- });
+
+ dg.trackingState[newWiredCoin.id] = newWiredCoin.value;
+ }
+ await tx.depositGroups.put(dg);
+ });
}
}
let allWired = true;
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return undefined;
@@ -848,7 +854,8 @@ async function processDepositGroupPendingTrack(
}
const newTxState = computeDepositTransactionStatus(dg);
return { oldTxState, newTxState };
- });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Deposit,
depositGroupId,
@@ -869,11 +876,12 @@ async function processDepositGroupPendingDeposit(
): Promise<TaskRunResult> {
logger.info("processing deposit group in pending(deposit)");
const depositGroupId = depositGroup.depositGroupId;
- const contractTermsRec = await ws.db
- .mktx((x) => [x.contractTerms])
- .runReadOnly(async (tx) => {
+ const contractTermsRec = await ws.db.runReadOnlyTx(
+ ["contractTerms"],
+ async (tx) => {
return tx.contractTerms.get(depositGroup.contractTermsHash);
- });
+ },
+ );
if (!contractTermsRec) {
throw Error("contract terms for deposit not found in database");
}
@@ -954,27 +962,25 @@ async function processDepositGroupPendingDeposit(
codecForBatchDepositSuccess(),
);
- await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
- const dg = await tx.depositGroups.get(depositGroupId);
- if (!dg) {
- return;
- }
- for (const batchIndex of batchIndexes) {
- const coinStatus = dg.statusPerCoin[batchIndex];
- switch (coinStatus) {
- case DepositElementStatus.DepositPending:
- dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
- await tx.depositGroups.put(dg);
- }
+ await ws.db.runReadWriteTx(["depositGroups"], async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return;
+ }
+ for (const batchIndex of batchIndexes) {
+ const coinStatus = dg.statusPerCoin[batchIndex];
+ switch (coinStatus) {
+ case DepositElementStatus.DepositPending:
+ dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking;
+ await tx.depositGroups.put(dg);
}
- });
+ }
+ });
}
- const transitionInfo = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadWrite(async (tx) => {
+ const transitionInfo = await ws.db.runReadWriteTx(
+ ["depositGroups"],
+ async (tx) => {
const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return undefined;
@@ -984,7 +990,8 @@ async function processDepositGroupPendingDeposit(
await tx.depositGroups.put(dg);
const newTxState = computeDepositTransactionStatus(dg);
return { oldTxState, newTxState };
- });
+ },
+ );
notifyTransition(ws, transactionId, transitionInfo);
return TaskRunResult.progress();
@@ -998,11 +1005,12 @@ export async function processDepositGroup(
depositGroupId: string,
cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
- const depositGroup = await ws.db
- .mktx((x) => [x.depositGroups])
- .runReadOnly(async (tx) => {
+ const depositGroup = await ws.db.runReadOnlyTx(
+ ["depositGroups"],
+ async (tx) => {
return tx.depositGroups.get(depositGroupId);
- });
+ },
+ );
if (!depositGroup) {
logger.warn(`deposit group ${depositGroupId} not found`);
return TaskRunResult.finished();
@@ -1030,15 +1038,18 @@ export async function processDepositGroup(
return TaskRunResult.finished();
}
+/**
+ * FIXME: Consider moving this to exchanges.ts.
+ */
async function getExchangeWireFee(
ws: InternalWalletState,
wireType: string,
baseUrl: string,
time: TalerProtocolTimestamp,
): Promise<WireFee> {
- const exchangeDetails = await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ const exchangeDetails = await ws.db.runReadOnlyTx(
+ ["exchangeDetails", "exchanges"],
+ async (tx) => {
const ex = await tx.exchanges.get(baseUrl);
if (!ex || !ex.detailsPointer) return undefined;
return await tx.exchangeDetails.indexes.byPointer.get([
@@ -1046,7 +1057,8 @@ async function getExchangeWireFee(
ex.detailsPointer.currency,
ex.detailsPointer.masterPublicKey,
]);
- });
+ },
+ );
if (!exchangeDetails) {
throw Error(`exchange missing: ${baseUrl}`);
@@ -1141,21 +1153,19 @@ export async function prepareDepositGroup(
const exchangeInfos: { url: string; master_pub: string }[] = [];
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
- const allExchanges = await tx.exchanges.iter().toArray();
- for (const e of allExchanges) {
- const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
- if (!details || amount.currency !== details.currency) {
- continue;
- }
- exchangeInfos.push({
- master_pub: details.masterPublicKey,
- url: e.baseUrl,
- });
+ await ws.db.runReadOnlyTx(["exchangeDetails", "exchanges"], async (tx) => {
+ const allExchanges = await tx.exchanges.iter().toArray();
+ for (const e of allExchanges) {
+ const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
+ if (!details || amount.currency !== details.currency) {
+ continue;
}
- });
+ exchangeInfos.push({
+ master_pub: details.masterPublicKey,
+ url: e.baseUrl,
+ });
+ }
+ });
const now = AbsoluteTime.now();
const nowRounded = AbsoluteTime.toProtocolTimestamp(now);
@@ -1255,21 +1265,19 @@ export async function createDepositGroup(
const exchangeInfos: { url: string; master_pub: string }[] = [];
- await ws.db
- .mktx((x) => [x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
- const allExchanges = await tx.exchanges.iter().toArray();
- for (const e of allExchanges) {
- const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
- if (!details || amount.currency !== details.currency) {
- continue;
- }
- exchangeInfos.push({
- master_pub: details.masterPublicKey,
- url: e.baseUrl,
- });
+ await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => {
+ const allExchanges = await tx.exchanges.iter().toArray();
+ for (const e of allExchanges) {
+ const details = await getExchangeWireDetailsInTx(tx, e.baseUrl);
+ if (!details || amount.currency !== details.currency) {
+ continue;
}
- });
+ exchangeInfos.push({
+ master_pub: details.masterPublicKey,
+ url: e.baseUrl,
+ });
+ }
+ });
const now = AbsoluteTime.now();
const wireDeadline = AbsoluteTime.toProtocolTimestamp(
@@ -1388,17 +1396,17 @@ export async function createDepositGroup(
const ctx = new DepositTransactionContext(ws, depositGroupId);
const transactionId = ctx.transactionId;
- const newTxState = await ws.db
- .mktx((x) => [
- x.depositGroups,
- x.coins,
- x.recoupGroups,
- x.denominations,
- x.refreshGroups,
- x.coinAvailability,
- x.contractTerms,
- ])
- .runReadWrite(async (tx) => {
+ const newTxState = await ws.db.runReadWriteTx(
+ [
+ "depositGroups",
+ "coins",
+ "recoupGroups",
+ "denominations",
+ "refreshGroups",
+ "coinAvailability",
+ "contractTerms",
+ ],
+ async (tx) => {
await spendCoins(ws, tx, {
allocationId: transactionId,
coinPubs: payCoinSel.coinSel.coinPubs,
@@ -1413,7 +1421,8 @@ export async function createDepositGroup(
h: contractTermsHash,
});
return computeDepositTransactionStatus(depositGroup);
- });
+ },
+ );
ws.notify({
type: NotificationType.TransactionStateTransition,
@@ -1450,9 +1459,9 @@ export async function getCounterpartyEffectiveDepositAmount(
const fees: AmountJson[] = [];
const exchangeSet: Set<string> = new Set();
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ await ws.db.runReadOnlyTx(
+ ["coins", "denominations", "exchangeDetails", "exchanges"],
+ async (tx) => {
for (let i = 0; i < pcs.coinPubs.length; i++) {
const coin = await tx.coins.get(pcs.coinPubs[i]);
if (!coin) {
@@ -1495,7 +1504,8 @@ export async function getCounterpartyEffectiveDepositAmount(
fees.push(Amounts.parseOrThrow(fee));
}
}
- });
+ },
+ );
return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
}
@@ -1515,9 +1525,9 @@ async function getTotalFeesForDepositAmount(
const exchangeSet: Set<string> = new Set();
const currency = Amounts.currencyOf(total);
- await ws.db
- .mktx((x) => [x.coins, x.denominations, x.exchanges, x.exchangeDetails])
- .runReadOnly(async (tx) => {
+ await ws.db.runReadOnlyTx(
+ ["coins", "denominations", "exchanges", "exchangeDetails"],
+ async (tx) => {
for (let i = 0; i < pcs.coinPubs.length; i++) {
const coin = await tx.coins.get(pcs.coinPubs[i]);
if (!coin) {
@@ -1575,7 +1585,8 @@ async function getTotalFeesForDepositAmount(
wireFee.push(Amounts.parseOrThrow(fee));
}
}
- });
+ },
+ );
return {
coin: Amounts.stringify(Amounts.sumOrZero(total.currency, coinFee).amount),