/* This file is part of GNU Taler (C) 2019 GNUnet e.V. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNU Taler; see the file COPYING. If not, see */ /** * Derive pending tasks from the wallet database. */ /** * Imports. */ import { PurchaseStatus, WalletStoresV1, BackupProviderStateTag, RefreshCoinStatus, OperationStatus, OperationStatusRange, PeerPushPaymentInitiationStatus, PeerPullPaymentIncomingStatus, PeerPushPaymentIncomingStatus, PeerPullPaymentInitiationStatus, } from "../db.js"; import { PendingOperationsResponse, PendingTaskType, TaskId, } from "../pending-types.js"; import { AbsoluteTime } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { GetReadOnlyAccess } from "../util/query.js"; import { TaskIdentifiers } from "../util/retries.js"; import { GlobalIDB } from "@gnu-taler/idb-bridge"; function getPendingCommon( ws: InternalWalletState, opTag: TaskId, timestampDue: AbsoluteTime, ): { id: TaskId; isDue: boolean; timestampDue: AbsoluteTime; isLongpolling: boolean; } { const isDue = AbsoluteTime.isExpired(timestampDue) && !ws.activeLongpoll[opTag]; return { id: opTag, isDue, timestampDue, isLongpolling: !!ws.activeLongpoll[opTag], }; } async function gatherExchangePending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ exchanges: typeof WalletStoresV1.exchanges; exchangeDetails: typeof WalletStoresV1.exchangeDetails; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { // FIXME: We should do a range query here based on the update time. await tx.exchanges.iter().forEachAsync(async (exch) => { const opTag = TaskIdentifiers.forExchangeUpdate(exch); let opr = await tx.operationRetries.get(opTag); const timestampDue = opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate); resp.pendingOperations.push({ type: PendingTaskType.ExchangeUpdate, ...getPendingCommon(ws, opTag, timestampDue), givesLifeness: false, exchangeBaseUrl: exch.baseUrl, lastError: opr?.lastError, }); // We only schedule a check for auto-refresh if the exchange update // was successful. if (!opr?.lastError) { resp.pendingOperations.push({ type: PendingTaskType.ExchangeCheckRefresh, ...getPendingCommon(ws, opTag, timestampDue), timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck), givesLifeness: false, exchangeBaseUrl: exch.baseUrl, }); } }); } async function gatherRefreshPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { const keyRange = GlobalIDB.KeyRange.bound( OperationStatusRange.ACTIVE_START, OperationStatusRange.ACTIVE_END, ); const refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll( keyRange, ); for (const r of refreshGroups) { if (r.timestampFinished) { return; } const opId = TaskIdentifiers.forRefresh(r); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Refresh, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, refreshGroupId: r.refreshGroupId, finishedPerCoin: r.statusPerCoin.map( (x) => x === RefreshCoinStatus.Finished, ), retryInfo: retryRecord?.retryInfo, }); } } async function gatherWithdrawalPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; planchets: typeof WalletStoresV1.planchets; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll( GlobalIDB.KeyRange.bound( OperationStatusRange.ACTIVE_START, OperationStatusRange.ACTIVE_END, ), ); for (const wsr of wsrs) { if (wsr.timestampFinish) { return; } const opTag = TaskIdentifiers.forWithdrawal(wsr); let opr = await tx.operationRetries.get(opTag); const now = AbsoluteTime.now(); if (!opr) { opr = { id: opTag, retryInfo: { firstTry: now, nextRetry: now, retryCounter: 0, }, }; } resp.pendingOperations.push({ type: PendingTaskType.Withdraw, ...getPendingCommon( ws, opTag, opr.retryInfo?.nextRetry ?? AbsoluteTime.now(), ), givesLifeness: true, withdrawalGroupId: wsr.withdrawalGroupId, lastError: opr.lastError, retryInfo: opr.retryInfo, }); } } async function gatherDepositPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { const dgs = await tx.depositGroups.indexes.byStatus.getAll( GlobalIDB.KeyRange.bound( OperationStatusRange.ACTIVE_START, OperationStatusRange.ACTIVE_END, ), ); for (const dg of dgs) { if (dg.timestampFinished) { return; } let deposited = true; for (const d of dg.depositedPerCoin) { if (!d) { deposited = false; } } const opId = TaskIdentifiers.forDeposit(dg); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Deposit, ...getPendingCommon(ws, opId, timestampDue), // Fully deposited operations don't give lifeness, // because there is no reason to wait on the // deposit tracking status. givesLifeness: !deposited, depositGroupId: dg.depositGroupId, lastError: retryRecord?.lastError, retryInfo: retryRecord?.retryInfo, }); } } async function gatherTipPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.tips.iter().forEachAsync(async (tip) => { // FIXME: The tip record needs a proper status field! if (tip.pickedUpTimestamp) { return; } const opId = TaskIdentifiers.forTipPickup(tip); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); if (tip.acceptedTimestamp) { resp.pendingOperations.push({ type: PendingTaskType.TipPickup, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.walletTipId, merchantTipId: tip.merchantTipId, }); } }); } async function gatherPurchasePending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { const keyRange = GlobalIDB.KeyRange.bound( OperationStatusRange.ACTIVE_START, OperationStatusRange.ACTIVE_END, ); await tx.purchases.indexes.byStatus .iter(keyRange) .forEachAsync(async (pr) => { const opId = TaskIdentifiers.forPay(pr); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Purchase, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, statusStr: PurchaseStatus[pr.purchaseStatus], proposalId: pr.proposalId, retryInfo: retryRecord?.retryInfo, lastError: retryRecord?.lastError, }); }); } async function gatherRecoupPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.recoupGroups.iter().forEachAsync(async (rg) => { if (rg.timestampFinished) { return; } const opId = TaskIdentifiers.forRecoup(rg); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Recoup, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, recoupGroupId: rg.recoupGroupId, retryInfo: retryRecord?.retryInfo, lastError: retryRecord?.lastError, }); }); } async function gatherBackupPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ backupProviders: typeof WalletStoresV1.backupProviders; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.backupProviders.iter().forEachAsync(async (bp) => { const opId = TaskIdentifiers.forBackup(bp); const retryRecord = await tx.operationRetries.get(opId); if (bp.state.tag === BackupProviderStateTag.Ready) { const timestampDue = AbsoluteTime.fromTimestamp( bp.state.nextBackupTimestamp, ); resp.pendingOperations.push({ type: PendingTaskType.Backup, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: false, backupProviderBaseUrl: bp.baseUrl, lastError: undefined, }); } else if (bp.state.tag === BackupProviderStateTag.Retrying) { const timestampDue = retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Backup, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: false, backupProviderBaseUrl: bp.baseUrl, retryInfo: retryRecord?.retryInfo, lastError: retryRecord?.lastError, }); } }); } async function gatherPeerPullInitiationPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => { if (pi.status === PeerPullPaymentInitiationStatus.DonePurseDeposited) { return; } const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.PeerPullCredit, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, }); }); } async function gatherPeerPullDebitPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ peerPullPaymentIncoming: typeof WalletStoresV1.peerPullPaymentIncoming; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.peerPullPaymentIncoming.iter().forEachAsync(async (pi) => { switch (pi.status) { case PeerPullPaymentIncomingStatus.Paid: return; case PeerPullPaymentIncomingStatus.Proposed: return; case PeerPullPaymentIncomingStatus.Accepted: break; } const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.PeerPullDebit, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, retryInfo: retryRecord?.retryInfo, peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId, }); }); } async function gatherPeerPushInitiationPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => { if (pi.status === PeerPushPaymentInitiationStatus.Done) { return; } const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.PeerPushDebit, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, }); }); } async function gatherPeerPushCreditPending( ws: InternalWalletState, tx: GetReadOnlyAccess<{ peerPushPaymentIncoming: typeof WalletStoresV1.peerPushPaymentIncoming; operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise { await tx.peerPushPaymentIncoming.iter().forEachAsync(async (pi) => { switch (pi.status) { case PeerPushPaymentIncomingStatus.Proposed: return; case PeerPushPaymentIncomingStatus.Done: return; } const opId = TaskIdentifiers.forPeerPushCredit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.PeerPushCredit, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, retryInfo: retryRecord?.retryInfo, peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId, }); }); } export async function getPendingOperations( ws: InternalWalletState, ): Promise { const now = AbsoluteTime.now(); return await ws.db .mktx((x) => [ x.backupProviders, x.exchanges, x.exchangeDetails, x.refreshGroups, x.coins, x.withdrawalGroups, x.tips, x.purchases, x.planchets, x.depositGroups, x.recoupGroups, x.operationRetries, x.peerPullPaymentInitiations, x.peerPushPaymentInitiations, x.peerPullPaymentIncoming, x.peerPushPaymentIncoming, ]) .runReadWrite(async (tx) => { const resp: PendingOperationsResponse = { pendingOperations: [], }; await gatherExchangePending(ws, tx, now, resp); await gatherRefreshPending(ws, tx, now, resp); await gatherWithdrawalPending(ws, tx, now, resp); await gatherDepositPending(ws, tx, now, resp); await gatherTipPending(ws, tx, now, resp); await gatherPurchasePending(ws, tx, now, resp); await gatherRecoupPending(ws, tx, now, resp); await gatherBackupPending(ws, tx, now, resp); await gatherPeerPushInitiationPending(ws, tx, now, resp); await gatherPeerPullInitiationPending(ws, tx, now, resp); await gatherPeerPullDebitPending(ws, tx, now, resp); await gatherPeerPushCreditPending(ws, tx, now, resp); return resp; }); }