summaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/reserves.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/reserves.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/reserves.ts334
1 files changed, 202 insertions, 132 deletions
diff --git a/packages/taler-wallet-core/src/operations/reserves.ts b/packages/taler-wallet-core/src/operations/reserves.ts
index a2482db70..73975fb03 100644
--- a/packages/taler-wallet-core/src/operations/reserves.ts
+++ b/packages/taler-wallet-core/src/operations/reserves.ts
@@ -34,11 +34,11 @@ import {
} from "@gnu-taler/taler-util";
import { randomBytes } from "../crypto/primitives/nacl-fast.js";
import {
- Stores,
ReserveRecordStatus,
ReserveBankInfo,
ReserveRecord,
WithdrawalGroupRecord,
+ WalletStoresV1,
} from "../db.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { canonicalizeBaseUrl } from "@gnu-taler/taler-util";
@@ -65,9 +65,13 @@ import {
import { getExchangeTrust } from "./currencies.js";
import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto.js";
import { Logger } from "@gnu-taler/taler-util";
-import { readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, throwUnexpectedRequestError } from "../util/http.js";
+import {
+ readSuccessResponseJsonOrErrorCode,
+ readSuccessResponseJsonOrThrow,
+ throwUnexpectedRequestError,
+} from "../util/http.js";
import { URL } from "../util/url.js";
-import { TransactionHandle } from "../util/query.js";
+import { GetReadOnlyAccess } from "../util/query.js";
const logger = new Logger("reserves.ts");
@@ -75,12 +79,17 @@ async function resetReserveRetry(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- await ws.db.mutate(Stores.reserves, reservePub, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.reserves.get(reservePub);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.reserves.put(x);
+ }
+ });
}
/**
@@ -157,17 +166,20 @@ export async function createReserve(
exchangeInfo.exchange,
);
- const resp = await ws.db.runWithWriteTransaction(
- [Stores.exchangeTrustStore, Stores.reserves, Stores.bankWithdrawUris],
- async (tx) => {
+ const resp = await ws.db
+ .mktx((x) => ({
+ exchangeTrust: x.exchangeTrust,
+ reserves: x.reserves,
+ bankWithdrawUris: x.bankWithdrawUris,
+ }))
+ .runReadWrite(async (tx) => {
// Check if we have already created a reserve for that bankWithdrawStatusUrl
if (reserveRecord.bankInfo?.statusUrl) {
- const bwi = await tx.get(
- Stores.bankWithdrawUris,
+ const bwi = await tx.bankWithdrawUris.get(
reserveRecord.bankInfo.statusUrl,
);
if (bwi) {
- const otherReserve = await tx.get(Stores.reserves, bwi.reservePub);
+ const otherReserve = await tx.reserves.get(bwi.reservePub);
if (otherReserve) {
logger.trace(
"returning existing reserve for bankWithdrawStatusUri",
@@ -178,27 +190,26 @@ export async function createReserve(
};
}
}
- await tx.put(Stores.bankWithdrawUris, {
+ await tx.bankWithdrawUris.put({
reservePub: reserveRecord.reservePub,
talerWithdrawUri: reserveRecord.bankInfo.statusUrl,
});
}
if (!isAudited && !isTrusted) {
- await tx.put(Stores.exchangeTrustStore, {
+ await tx.exchangeTrust.put({
currency: reserveRecord.currency,
exchangeBaseUrl: reserveRecord.exchangeBaseUrl,
exchangeMasterPub: exchangeDetails.masterPublicKey,
uids: [encodeCrock(getRandomBytes(32))],
});
}
- await tx.put(Stores.reserves, reserveRecord);
+ await tx.reserves.put(reserveRecord);
const r: CreateReserveResponse = {
exchange: canonExchange,
reservePub: keypair.pub,
};
return r;
- },
- );
+ });
if (reserveRecord.reservePub === resp.reservePub) {
// Only emit notification when a new reserve was created.
@@ -224,23 +235,27 @@ export async function forceQueryReserve(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.reserves], async (tx) => {
- const reserve = await tx.get(Stores.reserves, reservePub);
- if (!reserve) {
- return;
- }
- // Only force status query where it makes sense
- switch (reserve.reserveStatus) {
- case ReserveRecordStatus.DORMANT:
- reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
- break;
- default:
- reserve.requestedQuery = true;
- break;
- }
- reserve.retryInfo = initRetryInfo();
- await tx.put(Stores.reserves, reserve);
- });
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const reserve = await tx.reserves.get(reservePub);
+ if (!reserve) {
+ return;
+ }
+ // Only force status query where it makes sense
+ switch (reserve.reserveStatus) {
+ case ReserveRecordStatus.DORMANT:
+ reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
+ break;
+ default:
+ reserve.requestedQuery = true;
+ break;
+ }
+ reserve.retryInfo = initRetryInfo();
+ await tx.reserves.put(reserve);
+ });
await processReserve(ws, reservePub, true);
}
@@ -270,7 +285,13 @@ async function registerReserveWithBank(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return await tx.reserves.get(reservePub);
+ });
switch (reserve?.reserveStatus) {
case ReserveRecordStatus.WAIT_CONFIRM_BANK:
case ReserveRecordStatus.REGISTERING_BANK:
@@ -297,22 +318,30 @@ async function registerReserveWithBank(
httpResp,
codecForBankWithdrawalOperationPostResponse(),
);
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.REGISTERING_BANK:
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
return;
- }
- r.timestampReserveInfoPosted = getTimestampNow();
- r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK;
- if (!r.bankInfo) {
- throw Error("invariant failed");
- }
- r.retryInfo = initRetryInfo();
- return r;
- });
+ }
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.REGISTERING_BANK:
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ r.timestampReserveInfoPosted = getTimestampNow();
+ r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK;
+ if (!r.bankInfo) {
+ throw Error("invariant failed");
+ }
+ r.retryInfo = initRetryInfo();
+ await tx.reserves.put(r);
+ });
ws.notify({ type: NotificationType.ReserveRegisteredWithBank });
return processReserveBankStatus(ws, reservePub);
}
@@ -340,7 +369,13 @@ async function processReserveBankStatusImpl(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
switch (reserve?.reserveStatus) {
case ReserveRecordStatus.WAIT_CONFIRM_BANK:
case ReserveRecordStatus.REGISTERING_BANK:
@@ -363,20 +398,28 @@ async function processReserveBankStatusImpl(
if (status.aborted) {
logger.trace("bank aborted the withdrawal");
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.REGISTERING_BANK:
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
return;
- }
- const now = getTimestampNow();
- r.timestampBankConfirmed = now;
- r.reserveStatus = ReserveRecordStatus.BANK_ABORTED;
- r.retryInfo = initRetryInfo();
- return r;
- });
+ }
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.REGISTERING_BANK:
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ const now = getTimestampNow();
+ r.timestampBankConfirmed = now;
+ r.reserveStatus = ReserveRecordStatus.BANK_ABORTED;
+ r.retryInfo = initRetryInfo();
+ await tx.reserves.put(r);
+ });
return;
}
@@ -390,37 +433,40 @@ async function processReserveBankStatusImpl(
return await processReserveBankStatus(ws, reservePub);
}
- if (status.transfer_done) {
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.REGISTERING_BANK:
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
- return;
- }
- const now = getTimestampNow();
- r.timestampBankConfirmed = now;
- r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
- r.retryInfo = initRetryInfo();
- return r;
- });
- await processReserveImpl(ws, reservePub, true);
- } else {
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
- return;
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
+ return;
}
- if (r.bankInfo) {
- r.bankInfo.confirmUrl = status.confirm_transfer_url;
+ if (status.transfer_done) {
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.REGISTERING_BANK:
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ const now = getTimestampNow();
+ r.timestampBankConfirmed = now;
+ r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
+ r.retryInfo = initRetryInfo();
+ } else {
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ if (r.bankInfo) {
+ r.bankInfo.confirmUrl = status.confirm_transfer_url;
+ }
}
- return r;
+ await tx.reserves.put(r);
});
- await incrementReserveRetry(ws, reservePub, undefined);
- }
}
async function incrementReserveRetry(
@@ -428,19 +474,23 @@ async function incrementReserveRetry(
reservePub: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.reserves], async (tx) => {
- const r = await tx.get(Stores.reserves, reservePub);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- return;
- }
- r.retryInfo.retryCounter++;
- updateRetryInfoTimeout(r.retryInfo);
- r.lastError = err;
- await tx.put(Stores.reserves, r);
- });
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.reserves.put(r);
+ });
if (err) {
ws.notify({
type: NotificationType.ReserveOperationError,
@@ -461,7 +511,13 @@ async function updateReserve(
ws: InternalWalletState,
reservePub: string,
): Promise<{ ready: boolean }> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
if (!reserve) {
throw Error("reserve not in db");
}
@@ -508,10 +564,15 @@ async function updateReserve(
reserve.exchangeBaseUrl,
);
- const newWithdrawalGroup = await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.planchets, Stores.withdrawalGroups, Stores.reserves],
- async (tx) => {
- const newReserve = await tx.get(Stores.reserves, reserve.reservePub);
+ const newWithdrawalGroup = await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ planchets: x.planchets,
+ withdrawalGroups: x.withdrawalGroups,
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const newReserve = await tx.reserves.get(reserve.reservePub);
if (!newReserve) {
return;
}
@@ -519,8 +580,8 @@ async function updateReserve(
let amountReserveMinus = Amounts.getZero(currency);
// Subtract withdrawal groups for this reserve from the available amount.
- await tx
- .iterIndexed(Stores.withdrawalGroups.byReservePub, reservePub)
+ await tx.withdrawalGroups.indexes.byReservePub
+ .iter(reservePub)
.forEach((wg) => {
const cost = wg.denomsSel.totalWithdrawCost;
amountReserveMinus = Amounts.add(amountReserveMinus, cost).amount;
@@ -549,16 +610,14 @@ async function updateReserve(
case ReserveTransactionType.Withdraw: {
// Now we check if the withdrawal transaction
// is part of any withdrawal known to this wallet.
- const planchet = await tx.getIndexed(
- Stores.planchets.coinEvHashIndex,
+ const planchet = await tx.planchets.indexes.byCoinEvHash.get(
entry.h_coin_envelope,
);
if (planchet) {
// Amount is already accounted in some withdrawal session
break;
}
- const coin = await tx.getIndexed(
- Stores.coins.coinEvHashIndex,
+ const coin = await tx.coins.indexes.byCoinEvHash.get(
entry.h_coin_envelope,
);
if (coin) {
@@ -594,7 +653,7 @@ async function updateReserve(
newReserve.reserveStatus = ReserveRecordStatus.DORMANT;
newReserve.lastError = undefined;
newReserve.retryInfo = initRetryInfo(false);
- await tx.put(Stores.reserves, newReserve);
+ await tx.reserves.put(newReserve);
return;
}
@@ -624,11 +683,10 @@ async function updateReserve(
newReserve.retryInfo = initRetryInfo(false);
newReserve.reserveStatus = ReserveRecordStatus.DORMANT;
- await tx.put(Stores.reserves, newReserve);
- await tx.put(Stores.withdrawalGroups, withdrawalRecord);
+ await tx.reserves.put(newReserve);
+ await tx.withdrawalGroups.put(withdrawalRecord);
return withdrawalRecord;
- },
- );
+ });
if (newWithdrawalGroup) {
logger.trace("processing new withdraw group");
@@ -647,7 +705,13 @@ async function processReserveImpl(
reservePub: string,
forceNow = false,
): Promise<void> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
if (!reserve) {
logger.trace("not processing reserve: reserve does not exist");
return;
@@ -712,7 +776,13 @@ export async function createTalerWithdrawReserve(
// We do this here, as the reserve should be registered before we return,
// so that we can redirect the user to the bank's status page.
await processReserveBankStatus(ws, reserve.reservePub);
- const processedReserve = await ws.db.get(Stores.reserves, reserve.reservePub);
+ const processedReserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reserve.reservePub);
+ });
if (processedReserve?.reserveStatus === ReserveRecordStatus.BANK_ABORTED) {
throw OperationFailedError.fromCode(
TalerErrorCode.WALLET_WITHDRAWAL_OPERATION_ABORTED_BY_BANK,
@@ -730,14 +800,14 @@ export async function createTalerWithdrawReserve(
* Get payto URIs needed to fund a reserve.
*/
export async function getFundingPaytoUris(
- tx: TransactionHandle<
- | typeof Stores.reserves
- | typeof Stores.exchanges
- | typeof Stores.exchangeDetails
- >,
+ tx: GetReadOnlyAccess<{
+ reserves: typeof WalletStoresV1.reserves;
+ exchanges: typeof WalletStoresV1.exchanges;
+ exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ }>,
reservePub: string,
): Promise<string[]> {
- const r = await tx.get(Stores.reserves, reservePub);
+ const r = await tx.reserves.get(reservePub);
if (!r) {
logger.error(`reserve ${reservePub} not found (DB corrupted?)`);
return [];