summaryrefslogtreecommitdiff
path: root/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
diff options
context:
space:
mode:
Diffstat (limited to 'tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts')
-rw-r--r--tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts183
1 files changed, 0 insertions, 183 deletions
diff --git a/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts b/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
deleted file mode 100644
index c12825f765..0000000000
--- a/tools/node_modules/eslint/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
+++ /dev/null
@@ -1,183 +0,0 @@
-import { Subject, SubjectSubscriber } from '../Subject';
-import { Operator } from '../Operator';
-import { Observable } from '../Observable';
-import { Subscriber } from '../Subscriber';
-import { Subscription } from '../Subscription';
-import { TeardownLogic } from '../types';
-import { refCount as higherOrderRefCount } from '../operators/refCount';
-
-/**
- * @class ConnectableObservable<T>
- */
-export class ConnectableObservable<T> extends Observable<T> {
-
- protected _subject: Subject<T>;
- protected _refCount: number = 0;
- protected _connection: Subscription;
- /** @internal */
- _isComplete = false;
-
- constructor(public source: Observable<T>,
- protected subjectFactory: () => Subject<T>) {
- super();
- }
-
- /** @deprecated This is an internal implementation detail, do not use. */
- _subscribe(subscriber: Subscriber<T>) {
- return this.getSubject().subscribe(subscriber);
- }
-
- protected getSubject(): Subject<T> {
- const subject = this._subject;
- if (!subject || subject.isStopped) {
- this._subject = this.subjectFactory();
- }
- return this._subject;
- }
-
- connect(): Subscription {
- let connection = this._connection;
- if (!connection) {
- this._isComplete = false;
- connection = this._connection = new Subscription();
- connection.add(this.source
- .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
- if (connection.closed) {
- this._connection = null;
- connection = Subscription.EMPTY;
- } else {
- this._connection = connection;
- }
- }
- return connection;
- }
-
- refCount(): Observable<T> {
- return higherOrderRefCount()(this) as Observable<T>;
- }
-}
-
-const connectableProto = <any>ConnectableObservable.prototype;
-
-export const connectableObservableDescriptor: PropertyDescriptorMap = {
- operator: { value: null },
- _refCount: { value: 0, writable: true },
- _subject: { value: null, writable: true },
- _connection: { value: null, writable: true },
- _subscribe: { value: connectableProto._subscribe },
- _isComplete: { value: connectableProto._isComplete, writable: true },
- getSubject: { value: connectableProto.getSubject },
- connect: { value: connectableProto.connect },
- refCount: { value: connectableProto.refCount }
-};
-
-class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
- constructor(destination: Subject<T>,
- private connectable: ConnectableObservable<T>) {
- super(destination);
- }
- protected _error(err: any): void {
- this._unsubscribe();
- super._error(err);
- }
- protected _complete(): void {
- this.connectable._isComplete = true;
- this._unsubscribe();
- super._complete();
- }
- protected _unsubscribe() {
- const connectable = <any>this.connectable;
- if (connectable) {
- this.connectable = null;
- const connection = connectable._connection;
- connectable._refCount = 0;
- connectable._subject = null;
- connectable._connection = null;
- if (connection) {
- connection.unsubscribe();
- }
- }
- }
-}
-
-class RefCountOperator<T> implements Operator<T, T> {
- constructor(private connectable: ConnectableObservable<T>) {
- }
- call(subscriber: Subscriber<T>, source: any): TeardownLogic {
-
- const { connectable } = this;
- (<any> connectable)._refCount++;
-
- const refCounter = new RefCountSubscriber(subscriber, connectable);
- const subscription = source.subscribe(refCounter);
-
- if (!refCounter.closed) {
- (<any> refCounter).connection = connectable.connect();
- }
-
- return subscription;
- }
-}
-
-class RefCountSubscriber<T> extends Subscriber<T> {
-
- private connection: Subscription;
-
- constructor(destination: Subscriber<T>,
- private connectable: ConnectableObservable<T>) {
- super(destination);
- }
-
- protected _unsubscribe() {
-
- const { connectable } = this;
- if (!connectable) {
- this.connection = null;
- return;
- }
-
- this.connectable = null;
- const refCount = (<any> connectable)._refCount;
- if (refCount <= 0) {
- this.connection = null;
- return;
- }
-
- (<any> connectable)._refCount = refCount - 1;
- if (refCount > 1) {
- this.connection = null;
- return;
- }
-
- ///
- // Compare the local RefCountSubscriber's connection Subscription to the
- // connection Subscription on the shared ConnectableObservable. In cases
- // where the ConnectableObservable source synchronously emits values, and
- // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
- // execution continues to here before the RefCountOperator has a chance to
- // supply the RefCountSubscriber with the shared connection Subscription.
- // For example:
- // ```
- // range(0, 10).pipe(
- // publish(),
- // refCount(),
- // take(5),
- // ).subscribe();
- // ```
- // In order to account for this case, RefCountSubscriber should only dispose
- // the ConnectableObservable's shared connection Subscription if the
- // connection Subscription exists, *and* either:
- // a. RefCountSubscriber doesn't have a reference to the shared connection
- // Subscription yet, or,
- // b. RefCountSubscriber's connection Subscription reference is identical
- // to the shared connection Subscription
- ///
- const { connection } = this;
- const sharedConnection = (<any> connectable)._connection;
- this.connection = null;
-
- if (sharedConnection && (!connection || sharedConnection === connection)) {
- sharedConnection.unsubscribe();
- }
- }
-}