diff options
Diffstat (limited to 'tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts')
-rw-r--r-- | tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts | 387 |
1 files changed, 0 insertions, 387 deletions
diff --git a/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts deleted file mode 100644 index 0700382222..0000000000 --- a/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ /dev/null @@ -1,387 +0,0 @@ -import { Subject, AnonymousSubject } from '../../Subject'; -import { Subscriber } from '../../Subscriber'; -import { Observable } from '../../Observable'; -import { Subscription } from '../../Subscription'; -import { Operator } from '../../Operator'; -import { ReplaySubject } from '../../ReplaySubject'; -import { Observer, NextObserver } from '../../types'; - -/** - * WebSocketSubjectConfig is a plain Object that allows us to make our - * webSocket configurable. - * - * <span class="informal">Provides flexibility to {@link webSocket}</span> - * - * It defines a set of properties to provide custom behavior in specific - * moments of the socket's lifecycle. When the connection opens we can - * use `openObserver`, when the connection is closed `closeObserver`, if we - * are interested in listening for data comming from server: `deserializer`, - * which allows us to customize the deserialization strategy of data before passing it - * to the socket client. By default `deserializer` is going to apply `JSON.parse` to each message comming - * from the Server. - * - * ## Example - * **deserializer**, the default for this property is `JSON.parse` but since there are just two options - * for incomming data, either be text or binarydata. We can apply a custom deserialization strategy - * or just simply skip the default behaviour. - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * //Apply any transformation of your choice. - * deserializer: ({data}) => data - * }); - * - * wsSubject.subscribe(console.log); - * - * // Let's suppose we have this on the Server: ws.send("This is a msg from the server") - * //output - * // - * // This is a msg from the server - * ``` - * - * **serializer** allows us tom apply custom serialization strategy but for the outgoing messages - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * //Apply any transformation of your choice. - * serializer: msg => JSON.stringify({channel: "webDevelopment", msg: msg}) - * }); - * - * wsSubject.subscribe(() => subject.next("msg to the server")); - * - * // Let's suppose we have this on the Server: ws.send("This is a msg from the server") - * //output - * // - * // {"channel":"webDevelopment","msg":"msg to the server"} - * ``` - * - * **closeObserver** allows us to set a custom error when an error raise up. - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * closeObserver: { - next(closeEvent) { - const customError = { code: 6666, reason: "Custom evil reason" } - console.log(`code: ${customError.code}, reason: ${customError.reason}`); - } - } - * }); - * - * //output - * // code: 6666, reason: Custom evil reason - * ``` - * - * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the - * webSocket or sending notification that the connection was successful, this is when - * openObserver is usefull for. - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * openObserver: { - * next: () => { - * console.log('connetion ok'); - * } - * }, - * }); - * - * //output - * // connetion ok` - * ``` - * */ - -export interface WebSocketSubjectConfig<T> { - /** The url of the socket server to connect to */ - url: string; - /** The protocol to use to connect */ - protocol?: string | Array<string>; - /** @deprecated use {@link deserializer} */ - resultSelector?: (e: MessageEvent) => T; - /** - * A serializer used to create messages from passed values before the - * messages are sent to the server. Defaults to JSON.stringify. - */ - serializer?: (value: T) => WebSocketMessage; - /** - * A deserializer used for messages arriving on the socket from the - * server. Defaults to JSON.parse. - */ - deserializer?: (e: MessageEvent) => T; - /** - * An Observer that watches when open events occur on the underlying web socket. - */ - openObserver?: NextObserver<Event>; - /** - * An Observer than watches when close events occur on the underlying webSocket - */ - closeObserver?: NextObserver<CloseEvent>; - /** - * An Observer that watches when a close is about to occur due to - * unsubscription. - */ - closingObserver?: NextObserver<void>; - /** - * A WebSocket constructor to use. This is useful for situations like using a - * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket - * for testing purposes - */ - WebSocketCtor?: { new(url: string, protocols?: string|string[]): WebSocket }; - /** Sets the `binaryType` property of the underlying WebSocket. */ - binaryType?: 'blob' | 'arraybuffer'; -} - -const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = { - url: '', - deserializer: (e: MessageEvent) => JSON.parse(e.data), - serializer: (value: any) => JSON.stringify(value), -}; - -const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = - 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }'; - -export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; - -export class WebSocketSubject<T> extends AnonymousSubject<T> { - - private _config: WebSocketSubjectConfig<T>; - - /** @deprecated This is an internal implementation detail, do not use. */ - _output: Subject<T>; - - private _socket: WebSocket; - - constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) { - super(); - if (urlConfigOrSource instanceof Observable) { - this.destination = destination; - this.source = urlConfigOrSource as Observable<T>; - } else { - const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG }; - this._output = new Subject<T>(); - if (typeof urlConfigOrSource === 'string') { - config.url = urlConfigOrSource; - } else { - for (let key in urlConfigOrSource) { - if (urlConfigOrSource.hasOwnProperty(key)) { - config[key] = urlConfigOrSource[key]; - } - } - } - - if (!config.WebSocketCtor && WebSocket) { - config.WebSocketCtor = WebSocket; - } else if (!config.WebSocketCtor) { - throw new Error('no WebSocket constructor can be found'); - } - this.destination = new ReplaySubject(); - } - } - - lift<R>(operator: Operator<T, R>): WebSocketSubject<R> { - const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination); - sock.operator = operator; - sock.source = this; - return sock; - } - - private _resetState() { - this._socket = null; - if (!this.source) { - this.destination = new ReplaySubject(); - } - this._output = new Subject<T>(); - } - - /** - * Creates an {@link Observable}, that when subscribed to, sends a message, - * defined by the `subMsg` function, to the server over the socket to begin a - * subscription to data over that socket. Once data arrives, the - * `messageFilter` argument will be used to select the appropriate data for - * the resulting Observable. When teardown occurs, either due to - * unsubscription, completion or error, a message defined by the `unsubMsg` - * argument will be send to the server over the WebSocketSubject. - * - * @param subMsg A function to generate the subscription message to be sent to - * the server. This will still be processed by the serializer in the - * WebSocketSubject's config. (Which defaults to JSON serialization) - * @param unsubMsg A function to generate the unsubscription message to be - * sent to the server at teardown. This will still be processed by the - * serializer in the WebSocketSubject's config. - * @param messageFilter A predicate for selecting the appropriate messages - * from the server for the output stream. - */ - multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { - const self = this; - return new Observable((observer: Observer<any>) => { - try { - self.next(subMsg()); - } catch (err) { - observer.error(err); - } - - const subscription = self.subscribe(x => { - try { - if (messageFilter(x)) { - observer.next(x); - } - } catch (err) { - observer.error(err); - } - }, - err => observer.error(err), - () => observer.complete()); - - return () => { - try { - self.next(unsubMsg()); - } catch (err) { - observer.error(err); - } - subscription.unsubscribe(); - }; - }); - } - - private _connectSocket() { - const { WebSocketCtor, protocol, url, binaryType } = this._config; - const observer = this._output; - - let socket: WebSocket = null; - try { - socket = protocol ? - new WebSocketCtor(url, protocol) : - new WebSocketCtor(url); - this._socket = socket; - if (binaryType) { - this._socket.binaryType = binaryType; - } - } catch (e) { - observer.error(e); - return; - } - - const subscription = new Subscription(() => { - this._socket = null; - if (socket && socket.readyState === 1) { - socket.close(); - } - }); - - socket.onopen = (e: Event) => { - const { _socket } = this; - if (!_socket) { - socket.close(); - this._resetState(); - return; - } - const { openObserver } = this._config; - if (openObserver) { - openObserver.next(e); - } - - const queue = this.destination; - - this.destination = Subscriber.create<T>( - (x) => { - if (socket.readyState === 1) { - try { - const { serializer } = this._config; - socket.send(serializer(x)); - } catch (e) { - this.destination.error(e); - } - } - }, - (e) => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - if (e && e.code) { - socket.close(e.code, e.reason); - } else { - observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); - } - this._resetState(); - }, - () => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - socket.close(); - this._resetState(); - } - ) as Subscriber<any>; - - if (queue && queue instanceof ReplaySubject) { - subscription.add((<ReplaySubject<T>>queue).subscribe(this.destination)); - } - }; - - socket.onerror = (e: Event) => { - this._resetState(); - observer.error(e); - }; - - socket.onclose = (e: CloseEvent) => { - this._resetState(); - const { closeObserver } = this._config; - if (closeObserver) { - closeObserver.next(e); - } - if (e.wasClean) { - observer.complete(); - } else { - observer.error(e); - } - }; - - socket.onmessage = (e: MessageEvent) => { - try { - const { deserializer } = this._config; - observer.next(deserializer(e)); - } catch (err) { - observer.error(err); - } - }; - } - - /** @deprecated This is an internal implementation detail, do not use. */ - _subscribe(subscriber: Subscriber<T>): Subscription { - const { source } = this; - if (source) { - return source.subscribe(subscriber); - } - if (!this._socket) { - this._connectSocket(); - } - this._output.subscribe(subscriber); - subscriber.add(() => { - const { _socket } = this; - if (this._output.observers.length === 0) { - if (_socket && _socket.readyState === 1) { - _socket.close(); - } - this._resetState(); - } - }); - return subscriber; - } - - unsubscribe() { - const { _socket } = this; - if (_socket && _socket.readyState === 1) { - _socket.close(); - } - this._resetState(); - super.unsubscribe(); - } -} |