/*
 This file is part of TALER
 (C) 2016 GNUnet e.V.

 TALER is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 TALER is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

/**
 * Database query abstractions.
 * @module Query
 * @author Florian Dold
 */

import { openPromise } from "./promiseUtils";
// NOTE(review): `join` is not referenced anywhere in this module; confirm
// whether this import can be dropped.
import { join } from "path";

/**
 * Result of an inner join.
 */
export interface JoinResult<L, R> {
  left: L;
  right: R;
}

/**
 * Result of a left outer join.
 */
export interface JoinLeftResult<L, R> {
  left: L;
  right?: R;
}

/**
 * Definition of an object store.
 */
export class Store<T> {
  constructor(
    public name: string,
    public storeParams?: IDBObjectStoreParameters,
    public validator?: (v: T) => T,
  ) {}
}

/**
 * Options for an index.
 */
export interface IndexOptions {
  /**
   * If true and the path resolves to an array, create an index entry for
   * each member of the array (instead of one index entry containing the full array).
   *
   * Defaults to false.
   */
  multiEntry?: boolean;
}

/**
 * Definition of an index.
 */
export class Index<S extends IDBValidKey, T> {
  /**
   * Name of the store that this index is associated with.
   */
  storeName: string;

  /**
   * Options to use for the index.
   */
  options: IndexOptions;

  constructor(
    s: Store<T>,
    public indexName: string,
    public keyPath: string | string[],
    options?: IndexOptions,
  ) {
    const defaultOptions = {
      multiEntry: false,
    };
    // Caller-supplied options override the defaults.
    this.options = { ...defaultOptions, ...(options || {}) };
    this.storeName = s.name;
  }

  /**
   * We want to have the key type parameter in use somewhere,
   * because otherwise the compiler complains.  In iterIndex the
   * key type is pretty useful.
   */
  protected _dummyKey: S | undefined;
}

/**
 * Stream that can be filtered, reduced or joined
 * with indices.
 */
export interface QueryStream<T> {
  /**
   * Join the current query with values from an index.
   * The left side of the join is extracted via a function from the stream's
   * result, the right side of the join is the key of the index.
   */
  indexJoin<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>>;

  /**
   * Join the current query with values from an index, and keep values in the
   * current stream that don't have a match.  The left side of the join is
   * extracted via a function from the stream's result, the right side of the
   * join is the key of the index.
   */
  indexJoinLeft<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinLeftResult<T, S>>;

  /**
   * Join the current query with values from another object store.
   * The left side of the join is extracted via a function over the current query,
   * the right side of the join is the key of the object store.
   */
  keyJoin<S, I extends IDBValidKey>(
    store: Store<S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>>;

  /**
   * Only keep elements in the result stream for which the predicate returns
   * true.
   */
  filter(f: (x: T) => boolean): QueryStream<T>;

  /**
   * Fold the stream, resulting in a single value.
   */
  fold<S>(f: (v: T, acc: S) => S, start: S): Promise<S>;

  /**
   * Execute a function for every value of the stream, for the
   * side-effects of the function.
   */
  forEach(f: (v: T) => void): Promise<void>;

  /**
   * Map each element of the stream using a function, resulting in another
   * stream of a different type.
   */
  map<S>(f: (x: T) => S): QueryStream<S>;

  /**
   * Map each element of the stream to a potentially empty array, and collect
   * the result in a stream of the flattened arrays.
   */
  flatMap<S>(f: (x: T) => S[]): QueryStream<S>;

  /**
   * Collect the stream into an array and return a promise for it.
   */
  toArray(): Promise<T[]>;

  /**
   * Get the first value of the stream.
   */
  first(): QueryValue<T>;

  /**
   * Run the query without returning a result.
   * Useful for queries with side effects.
   */
  run(): Promise<void>;
}

/**
 * Query result that consists of at most one value.
 */
export interface QueryValue<T> {
  /**
   * Apply a function to a query value.
   */
  map<S>(f: (x: T) => S): QueryValue<S>;

  /**
   * Conditionally execute either of two queries based
   * on a property of this query value.
   *
   * Useful to properly implement complex queries within a transaction (as
   * opposed to just computing the conditional and then executing either
   * branch).  This is necessary since IndexedDB does not allow long-lived
   * transactions.
   */
  cond<R>(
    f: (x: T) => boolean,
    onTrue: (r: QueryRoot) => R,
    onFalse: (r: QueryRoot) => R,
  ): Promise<void>;
}

/**
 * Shared implementation of QueryValue on top of a single-value subscription.
 */
abstract class BaseQueryValue<T> implements QueryValue<T> {
  constructor(public root: QueryRoot) {}

  map<S>(f: (x: T) => S): QueryValue<S> {
    return new MapQueryValue<T, S>(this, f);
  }

  cond<R>(
    f: (x: T) => boolean,
    onTrue: (r: QueryRoot) => R,
    onFalse: (r: QueryRoot) => R,
  ): Promise<void> {
    return new Promise<void>(resolve => {
      this.subscribeOne((v, tx) => {
        // Each branch gets a fresh QueryRoot on the same database.
        if (f(v)) {
          onTrue(new QueryRoot(this.root.db));
        } else {
          onFalse(new QueryRoot(this.root.db));
        }
      });
      // NOTE(review): this resolves as soon as the subscription is registered,
      // not once a branch has actually run -- confirm that this is intended.
      resolve();
    });
  }

  /**
   * Register a callback that receives the value together with the
   * transaction it was read in.
   */
  abstract subscribeOne(f: SubscribeOneFn): void;
}

/**
 * Query value holding the first element of a stream, or undefined if the
 * stream finishes without producing an element.
 */
class FirstQueryValue<T> extends BaseQueryValue<T> {
  private gotValue = false;
  private s: QueryStreamBase<T>;

  constructor(stream: QueryStreamBase<T>) {
    super(stream.root);
    this.s = stream;
  }

  subscribeOne(f: SubscribeOneFn): void {
    this.s.subscribe((isDone, value, tx) => {
      // Only the very first event (value or end-of-stream) is forwarded.
      if (this.gotValue) {
        return;
      }
      if (isDone) {
        f(undefined, tx);
      } else {
        f(value, tx);
      }
      this.gotValue = true;
    });
  }
}

/**
 * Query value obtained by applying a function to another query value.
 */
class MapQueryValue<T, S> extends BaseQueryValue<S> {
  constructor(private v: BaseQueryValue<T>, private mapFn: (x: T) => S) {
    super(v.root);
  }

  subscribeOne(f: SubscribeOneFn): void {
    // Forward the mapped value to the subscriber.  Previously the mapped
    // value was computed and then dropped, so `f` was never invoked.
    this.v.subscribeOne((v, tx) => f(this.mapFn(v), tx));
  }
}

/**
 * Exception that should be thrown by client code to abort a transaction.
 */
export const AbortTransaction = Symbol("abort_transaction");

/**
 * Base class for all stream implementations.  Subclasses provide
 * `subscribe`; the combinators and terminal operations are built on it.
 */
abstract class QueryStreamBase<T> implements QueryStream<T> {
  /**
   * Register a callback that is invoked once per element with
   * `isDone === false`, and with `isDone === true` at the end of the stream.
   */
  abstract subscribe(
    f: (isDone: boolean, value: any, tx: IDBTransaction) => void,
  ): void;

  constructor(public root: QueryRoot) {}

  first(): QueryValue<T> {
    return new FirstQueryValue(this);
  }

  flatMap<S>(f: (x: T) => S[]): QueryStream<S> {
    return new QueryStreamFlatMap<T, S>(this, f);
  }

  map<S>(f: (x: T) => S): QueryStream<S> {
    return new QueryStreamMap(this, f);
  }

  indexJoin<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>> {
    // The joined store must be part of the transaction's scope.
    this.root.addStoreAccess(index.storeName, false);
    return new QueryStreamIndexJoin<T, S>(
      this,
      index.storeName,
      index.indexName,
      keyFn,
    );
  }

  indexJoinLeft<S, I extends IDBValidKey>(
    index: Index<I, S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinLeftResult<T, S>> {
    this.root.addStoreAccess(index.storeName, false);
    return new QueryStreamIndexJoinLeft<T, S>(
      this,
      index.storeName,
      index.indexName,
      keyFn,
    );
  }

  keyJoin<S, I extends IDBValidKey>(
    store: Store<S>,
    keyFn: (obj: T) => I,
  ): QueryStream<JoinResult<T, S>> {
    this.root.addStoreAccess(store.name, false);
    return new QueryStreamKeyJoin<T, S>(this, store.name, keyFn);
  }

  filter(f: (x: any) => boolean): QueryStream<T> {
    return new QueryStreamFilter(this, f);
  }

  toArray(): Promise<T[]> {
    const { resolve, promise } = openPromise<T[]>();
    const values: T[] = [];

    this.subscribe((isDone, value) => {
      if (isDone) {
        resolve(values);
        return;
      }
      values.push(value);
    });

    // Kick off the transaction, then hand out the collected result.
    return Promise.resolve()
      .then(() => this.root.finish())
      .then(() => promise);
  }

  fold<A>(f: (x: T, acc: A) => A, init: A): Promise<A> {
    const { resolve, promise } = openPromise<A>();
    let acc = init;

    this.subscribe((isDone, value) => {
      if (isDone) {
        resolve(acc);
        return;
      }
      acc = f(value, acc);
    });

    return Promise.resolve()
      .then(() => this.root.finish())
      .then(() => promise);
  }

  forEach(f: (x: T) => void): Promise<void> {
    const { resolve, promise } = openPromise<void>();

    this.subscribe((isDone, value) => {
      if (isDone) {
        resolve();
        return;
      }
      f(value);
    });

    return Promise.resolve()
      .then(() => this.root.finish())
      .then(() => promise);
  }

  run(): Promise<void> {
    const { resolve, promise } = openPromise<void>();

    // Elements are ignored; only the end of the stream matters.
    this.subscribe(isDone => {
      if (isDone) {
        resolve();
      }
    });

    return Promise.resolve()
      .then(() => this.root.finish())
      .then(() => promise);
  }
}

type FilterFn = (e: any) => boolean;
type SubscribeFn = (done: boolean, value: any, tx: IDBTransaction) => void;
type SubscribeOneFn = (value: any, tx: IDBTransaction) => void;

/**
 * Stream that only passes on the elements accepted by a predicate.
 */
class QueryStreamFilter<T> extends QueryStreamBase<T> {
  constructor(public s: QueryStreamBase<T>, public filterFn: FilterFn) {
    super(s.root);
  }

  subscribe(f: SubscribeFn) {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      if (this.filterFn(value)) {
        f(false, value, tx);
      }
    });
  }
}

/**
 * Stream that maps every element to an array and emits the array members
 * one by one.
 */
class QueryStreamFlatMap<T, S> extends QueryStreamBase<S> {
  constructor(public s: QueryStreamBase<T>, public flatMapFn: (v: T) => S[]) {
    super(s.root);
  }

  subscribe(f: SubscribeFn) {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const values = this.flatMapFn(value);
      // `for..of` yields the elements; `for..in` would yield the array
      // indices as strings.
      for (const v of values) {
        f(false, v, tx);
      }
    });
  }
}

/**
 * Stream that applies a function to every element.
 */
class QueryStreamMap<S, T> extends QueryStreamBase<T> {
  constructor(public s: QueryStreamBase<S>, public mapFn: (v: S) => T) {
    super(s.root);
  }

  subscribe(f: SubscribeFn) {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const mappedValue = this.mapFn(value);
      f(false, mappedValue, tx);
    });
  }
}

/**
 * Inner join of a stream with the entries of an index.
 */
class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
  constructor(
    public s: QueryStreamBase<T>,
    public storeName: string,
    public indexName: string,
    public key: any,
  ) {
    super(s.root);
  }

  subscribe(f: SubscribeFn) {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const joinKey = this.key(value);
      const s = tx.objectStore(this.storeName).index(this.indexName);
      const req = s.openCursor(IDBKeyRange.only(joinKey));
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          // One result per index entry matching the join key.
          f(false, { left: value, right: cursor.value }, tx);
          cursor.continue();
        }
      };
    });
  }
}

/**
 * Left outer join of a stream with the entries of an index.
 */
class QueryStreamIndexJoinLeft<T, S> extends QueryStreamBase<
  JoinLeftResult<T, S>
> {
  constructor(
    public s: QueryStreamBase<T>,
    public storeName: string,
    public indexName: string,
    public key: any,
  ) {
    super(s.root);
  }

  subscribe(f: SubscribeFn) {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const s = tx.objectStore(this.storeName).index(this.indexName);
      const req = s.openCursor(IDBKeyRange.only(this.key(value)));
      let gotMatch = false;
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          gotMatch = true;
          f(false, { left: value, right: cursor.value }, tx);
          cursor.continue();
        } else {
          // Cursor exhausted: emit the unmatched left value exactly once.
          if (!gotMatch) {
            f(false, { left: value }, tx);
          }
        }
      };
    });
  }
}

/**
 * Inner join of a stream with the records of an object store, matched by
 * the store's primary key.
 */
class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
  constructor(
    public s: QueryStreamBase<T>,
    public storeName: string,
    public key: any,
  ) {
    super(s.root);
  }

  subscribe(f: SubscribeFn) {
    this.s.subscribe((isDone, value, tx) => {
      if (isDone) {
        f(true, undefined, tx);
        return;
      }
      const s = tx.objectStore(this.storeName);
      const req = s.openCursor(IDBKeyRange.only(this.key(value)));
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          f(false, { left: value, right: cursor.value }, tx);
          cursor.continue();
        }
        // An exhausted lookup cursor only ends the lookup for this one
        // element.  The end of the stream is signalled solely by the
        // upstream `isDone` event above, as in the index joins.
      };
    });
  }
}

/**
 * Source stream that iterates over an object store or one of its indices.
 */
class IterQueryStream<T> extends QueryStreamBase<T> {
  private storeName: string;
  private options: any;
  private subscribers: SubscribeFn[];

  constructor(qr: QueryRoot, storeName: string, options: any) {
    super(qr);
    this.options = options;
    this.storeName = storeName;
    this.subscribers = [];

    const doIt = (tx: IDBTransaction) => {
      const { indexName = void 0, only = void 0 } = this.options;
      // Iterate the index if one was requested, otherwise the store itself.
      const source: IDBObjectStore | IDBIndex =
        indexName !== void 0
          ? tx.objectStore(this.storeName).index(indexName)
          : tx.objectStore(this.storeName);
      let kr: IDBKeyRange | undefined;
      if (only !== undefined) {
        kr = IDBKeyRange.only(only);
      }
      const req = source.openCursor(kr);
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          for (const f of this.subscribers) {
            f(false, cursor.value, tx);
          }
          cursor.continue();
        } else {
          for (const f of this.subscribers) {
            f(true, undefined, tx);
          }
        }
      };
    };

    this.root.addWork(doIt);
  }

  subscribe(f: SubscribeFn) {
    this.subscribers.push(f);
  }
}

/**
 * Root wrapper around an IndexedDB for queries with a fluent interface.
 */
export class QueryRoot {
  /** Tasks to run once the transaction has been opened. */
  private work: Array<(t: IDBTransaction) => void> = [];

  /** Names of the object stores the transaction has to span. */
  private stores: Set<string> = new Set();

  /** Promise for the transaction; set by the first call to finish(). */
  private kickoffPromise: Promise<void> | undefined = undefined;

  /**
   * Some operation is a write operation,
   * and we need to do a "readwrite" transaction.
   */
  private hasWrite: boolean = false;

  private finishScheduled: boolean = false;

  private finished: boolean = false;

  private keys: { [keyName: string]: IDBValidKey } = {};

  constructor(public db: IDBDatabase) {}

  /**
   * Get a named key that was created during the query.
   */
  key(keyName: string): IDBValidKey | undefined {
    return this.keys[keyName];
  }

  private checkFinished() {
    if (this.finished) {
      throw Error("Can't add work to query after it was started");
    }
  }

  /**
   * Get a stream of all objects in the store.
   */
  iter<T>(store: Store<T>): QueryStream<T> {
    this.checkFinished();
    this.stores.add(store.name);
    this.scheduleFinish();
    return new IterQueryStream<T>(this, store.name, {});
  }

  /**
   * Count the number of objects in a store.
   */
  count<T>(store: Store<T>): Promise<number> {
    this.checkFinished();
    const { resolve, promise } = openPromise<number>();

    const doCount = (tx: IDBTransaction) => {
      const s = tx.objectStore(store.name);
      const req = s.count();
      req.onsuccess = () => {
        resolve(req.result);
      };
    };

    this.addWork(doCount, store.name, false);
    return Promise.resolve()
      .then(() => this.finish())
      .then(() => promise);
  }

  /**
   * Delete all objects in a store that match a predicate.
   * The predicate also receives the zero-based position of the object
   * in iteration order.
   *
   * NOTE(review): unlike put/add/delete/mutate this does not schedule a
   * finish, so the caller has to trigger the query some other way (e.g. by
   * calling finish()) -- confirm that this is intended.
   */
  deleteIf<T>(
    store: Store<T>,
    predicate: (x: T, n: number) => boolean,
  ): QueryRoot {
    this.checkFinished();
    const doDeleteIf = (tx: IDBTransaction) => {
      const s = tx.objectStore(store.name);
      const req = s.openCursor();
      let n = 0;
      req.onsuccess = () => {
        const cursor: IDBCursorWithValue | null = req.result;
        if (cursor) {
          if (predicate(cursor.value, n++)) {
            cursor.delete();
          }
          cursor.continue();
        }
      };
    };
    this.addWork(doDeleteIf, store.name, true);
    return this;
  }

  /**
   * Get a stream of the objects reachable through an index, optionally
   * restricted to the entries whose index key equals `only`.
   */
  iterIndex<S extends IDBValidKey, T>(
    index: Index<S, T>,
    only?: S,
  ): QueryStream<T> {
    this.checkFinished();
    this.stores.add(index.storeName);
    this.scheduleFinish();
    return new IterQueryStream<T>(this, index.storeName, {
      indexName: index.indexName,
      only,
    });
  }

  /**
   * Put an object into the given object store.
   * Overrides if an existing object with the same key exists
   * in the store.
   *
   * If `keyName` is given, the key of the stored object can be retrieved
   * via key(keyName) once the put has succeeded.
   */
  put<T>(store: Store<T>, val: T, keyName?: string): QueryRoot {
    this.checkFinished();
    const doPut = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).put(val);
      if (keyName) {
        req.onsuccess = () => {
          this.keys[keyName] = req.result;
        };
      }
    };
    this.scheduleFinish();
    this.addWork(doPut, store.name, true);
    return this;
  }

  /**
   * Put an object into a store or return an existing record.
   *
   * NOTE(review): the returned promise is not chained to finish(), so it
   * stays pending if the transaction aborts -- confirm whether callers
   * rely on it resolving before the transaction completes.
   */
  putOrGetExisting<T>(store: Store<T>, val: T, key: IDBValidKey): Promise<T> {
    this.checkFinished();
    const { resolve, promise } = openPromise<T>();
    const doPutOrGet = (tx: IDBTransaction) => {
      const objstore = tx.objectStore(store.name);
      const req = objstore.get(key);
      req.onsuccess = () => {
        if (req.result !== undefined) {
          resolve(req.result);
        } else {
          const req2 = objstore.add(val);
          req2.onsuccess = () => {
            resolve(val);
          };
        }
      };
    };
    this.scheduleFinish();
    this.addWork(doPutOrGet, store.name, true);
    return promise;
  }

  /**
   * Put an object into the given object store and return a promise for
   * the key under which it was stored.
   */
  putWithResult<T>(store: Store<T>, val: T): Promise<IDBValidKey> {
    this.checkFinished();
    const { resolve, promise } = openPromise<IDBValidKey>();
    const doPutWithResult = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).put(val);
      req.onsuccess = () => {
        resolve(req.result);
      };
    };
    this.addWork(doPutWithResult, store.name, true);
    return Promise.resolve()
      .then(() => this.finish())
      .then(() => promise);
  }

  /**
   * Update objects inside a transaction.
   *
   * If the mutation function throws AbortTransaction, the whole transaction will be aborted.
   * If the mutation function returns undefined or null, no modification will be made.
   */
  mutate<T>(store: Store<T>, key: any, f: (v: T) => T | undefined): QueryRoot {
    this.checkFinished();
    const doMutate = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).openCursor(IDBKeyRange.only(key));
      req.onsuccess = () => {
        const cursor = req.result;
        if (cursor) {
          const value = cursor.value;
          let modifiedValue: T | undefined;
          try {
            modifiedValue = f(value);
          } catch (e) {
            if (e === AbortTransaction) {
              tx.abort();
              return;
            }
            throw e;
          }
          if (modifiedValue !== undefined && modifiedValue !== null) {
            cursor.update(modifiedValue);
          }
          cursor.continue();
        }
      };
    };
    this.scheduleFinish();
    this.addWork(doMutate, store.name, true);
    return this;
  }

  /**
   * Add all objects from an iterable to the given object store.
   */
  putAll<T>(store: Store<T>, iterable: T[]): QueryRoot {
    this.checkFinished();
    const doPutAll = (tx: IDBTransaction) => {
      for (const obj of iterable) {
        tx.objectStore(store.name).put(obj);
      }
    };
    this.scheduleFinish();
    this.addWork(doPutAll, store.name, true);
    return this;
  }

  /**
   * Add an object to the given object store.
   * Fails if the object's key is already present
   * in the object store.
   */
  add<T>(store: Store<T>, val: T): QueryRoot {
    this.checkFinished();
    const doAdd = (tx: IDBTransaction) => {
      tx.objectStore(store.name).add(val);
    };
    this.scheduleFinish();
    this.addWork(doAdd, store.name, true);
    return this;
  }

  /**
   * Get one object from a store by its key.
   *
   * Throws if the key is undefined.
   */
  get<T>(store: Store<T>, key: any): Promise<T | undefined> {
    this.checkFinished();
    if (key === void 0) {
      throw Error("key must not be undefined");
    }

    const { resolve, promise } = openPromise<T | undefined>();

    const doGet = (tx: IDBTransaction) => {
      const req = tx.objectStore(store.name).get(key);
      req.onsuccess = () => {
        resolve(req.result);
      };
    };

    this.addWork(doGet, store.name, false);
    return Promise.resolve()
      .then(() => this.finish())
      .then(() => promise);
  }

  /**
   * Get objects from a store by their keys.
   * If no object for a key exists, the resulting position in the array
   * contains 'undefined'.  An empty key list yields an empty array.
   */
  getMany<T>(store: Store<T>, keys: any[]): Promise<T[]> {
    this.checkFinished();

    const { resolve, promise } = openPromise<T[]>();
    const results: T[] = [];

    const doGetMany = (tx: IDBTransaction) => {
      // Without any request no success handler would ever fire, so an
      // empty key list has to be resolved right away.
      if (keys.length === 0) {
        resolve(results);
        return;
      }
      let pending = keys.length;
      for (let i = 0; i < keys.length; i++) {
        const key = keys[i];
        if (key === void 0) {
          throw Error("key must not be undefined");
        }
        const req = tx.objectStore(store.name).get(key);
        req.onsuccess = () => {
          // Store by position so results[i] always belongs to keys[i].
          results[i] = req.result;
          pending--;
          if (pending === 0) {
            resolve(results);
          }
        };
      }
    };

    this.addWork(doGetMany, store.name, false);
    return Promise.resolve()
      .then(() => this.finish())
      .then(() => promise);
  }

  /**
   * Get one object from a store by the key of one of its indices.
   *
   * Throws if the key is undefined.
   */
  getIndexed<I extends IDBValidKey, T>(
    index: Index<I, T>,
    key: I,
  ): Promise<T | undefined> {
    this.checkFinished();
    if (key === void 0) {
      throw Error("key must not be undefined");
    }

    const { resolve, promise } = openPromise<T | undefined>();

    const doGetIndexed = (tx: IDBTransaction) => {
      const req = tx
        .objectStore(index.storeName)
        .index(index.indexName)
        .get(key);
      req.onsuccess = () => {
        resolve(req.result);
      };
    };

    this.addWork(doGetIndexed, index.storeName, false);
    return Promise.resolve()
      .then(() => this.finish())
      .then(() => promise);
  }

  /**
   * Make sure finish() gets called in a later microtask, so that fluent
   * chains run even if the caller never calls finish() explicitly.
   */
  private scheduleFinish() {
    if (!this.finishScheduled) {
      Promise.resolve().then(() => this.finish());
      this.finishScheduled = true;
    }
  }

  /**
   * Finish the query, and start the query in the first place if necessary.
   *
   * Repeated calls return the same promise.  It resolves when the
   * transaction completes and rejects when the transaction is aborted.
   */
  finish(): Promise<void> {
    if (this.kickoffPromise) {
      return this.kickoffPromise;
    }
    this.kickoffPromise = new Promise<void>((resolve, reject) => {
      // At this point, we can't add any more work
      this.finished = true;
      if (this.work.length === 0) {
        resolve();
        return;
      }
      const mode = this.hasWrite ? "readwrite" : "readonly";
      const tx = this.db.transaction(Array.from(this.stores), mode);
      tx.oncomplete = () => {
        resolve();
      };
      tx.onabort = () => {
        console.warn(
          `aborted ${mode} transaction on stores [${[...this.stores]}]`,
        );
        reject(Error("transaction aborted"));
      };
      tx.onerror = e => {
        console.warn(`error in transaction`, (e.target as any).error);
      };
      for (const w of this.work) {
        w(tx);
      }
    });
    return this.kickoffPromise;
  }

  /**
   * Delete an object by key from the given object store.
   */
  delete<T>(store: Store<T>, key: any): QueryRoot {
    this.checkFinished();
    const doDelete = (tx: IDBTransaction) => {
      tx.objectStore(store.name).delete(key);
    };
    this.scheduleFinish();
    this.addWork(doDelete, store.name, true);
    return this;
  }

  /**
   * Low-level function to add a task to the internal work queue.
   */
  addWork(
    workFn: (t: IDBTransaction) => void,
    storeName?: string,
    isWrite?: boolean,
  ) {
    this.work.push(workFn);
    if (storeName) {
      this.addStoreAccess(storeName, isWrite);
    }
  }

  /**
   * Record that the transaction has to include the given store, and
   * whether it needs write access.
   */
  addStoreAccess(storeName: string, isWrite?: boolean) {
    if (storeName) {
      this.stores.add(storeName);
    }

    if (isWrite) {
      this.hasWrite = true;
    }
  }
}