Observable
Direct Subclass
ConnectableObservable
GroupedObservable
Subject
Indirect Subclass
AnonymousSubject
AsyncSubject
BehaviorSubject
ReplaySubject
Static Public Methods
- bindCallback
public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable
Converts a callback API to a function that returns an Observable.
Give it a function f of type f(x, callback) and it will return a function g that, when called as g(x), will output an Observable.
bindCallback is not an operator because its input and output are not Observables.
The input is a function with some parameters, of which the last must be a callback function.
The output of bindCallback is a function that takes the same parameters as the original function, except the last one (the callback). When the output function is called with arguments, it returns an Observable.
If the original function’s callback takes one argument, the Observable emits that value. If, on the other hand, the callback is called with multiple values, the resulting Observable emits an array with these arguments.
More in the original article.
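To make the conversion concrete, here is a minimal plain-JavaScript sketch of the idea. It is not the RxJS implementation: `bindCallbackSketch`, `readValue`, and `readPair` are hypothetical names, the returned object only mimics an Observable's `subscribe` method, and the `selector` and `scheduler` parameters are ignored.

```javascript
// Hypothetical sketch: wraps f(...args, callback) into g(...args), which
// returns a minimal Observable-like object. Not the RxJS source.
function bindCallbackSketch(func) {
  return function (...args) {
    return {
      subscribe(observer) {
        // The wrapped function runs only when someone subscribes.
        func(...args, (...results) => {
          // One callback argument is emitted as-is, several as an array.
          observer.next(results.length <= 1 ? results[0] : results);
          observer.complete();
        });
      },
    };
  };
}

// Hypothetical callback-style functions used for illustration.
const readValue = (x, callback) => callback(x * 2);
const readPair = (x, callback) => callback(x, x + 1);

const emitted = [];
const collect = {
  next: (value) => emitted.push(value),
  complete: () => emitted.push('done'),
};

bindCallbackSketch(readValue)(21).subscribe(collect);
bindCallbackSketch(readPair)(1).subscribe(collect);
console.log(emitted); // [42, 'done', [1, 2], 'done']
```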
- bindNodeCallback
public static bindNodeCallback(func: function, scheduler: Scheduler): function(...param: *): Observable
- combineLatest
public static combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable
Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.
Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of the formula.
// combine two timer Observables
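The "latest values" rule can be modeled without the library. The sketch below replays a time-ordered event log in plain JavaScript; `combineLatestSketch` and the event shape `{ source, value }` are assumptions made for illustration, not part of RxJS.

```javascript
// Plain-JS model of combineLatest over a time-ordered event log.
// Each event says which input (by index) emitted and what value.
function combineLatestSketch(events, inputCount, project) {
  const latest = new Array(inputCount);
  const seen = new Set();
  const output = [];
  for (const { source, value } of events) {
    latest[source] = value;
    seen.add(source);
    // Nothing is emitted until every input has produced a value.
    if (seen.size === inputCount) {
      output.push(project(...latest));
    }
  }
  return output;
}

const combined = combineLatestSketch(
  [
    { source: 0, value: 'a' },
    { source: 1, value: 1 },
    { source: 0, value: 'b' },
    { source: 1, value: 2 },
  ],
  2,
  (letter, number) => letter + number
);
console.log(combined); // ['a1', 'b1', 'b2']
```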
- concat
public static concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable
Creates an output Observable which sequentially emits all values from a given Observable and then moves on to the next.
// Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10
- create
public static create(onSubscription: function(observer: Observer): TearDownLogic): Observable
Creates a new Observable, that will execute the specified function when an Observer subscribes to it.
var observable = Rx.Observable.create(function(observer) {
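The key point is that the function passed to create runs at subscribe time, once per Observer. A minimal stand-in in plain JavaScript can show this; `createSketch` is a hypothetical helper and does not reproduce RxJS internals such as error handling or safe Subscribers.

```javascript
// Hypothetical minimal stand-in for Observable.create; not the RxJS source.
function createSketch(onSubscription) {
  return {
    subscribe(observer) {
      // onSubscription runs once per subscriber, at subscribe time.
      const teardown = onSubscription(observer);
      return {
        unsubscribe() {
          if (typeof teardown === 'function') teardown();
        },
      };
    },
  };
}

const log = [];
const source = createSketch((observer) => {
  log.push('subscribed');
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => log.push('torn down'); // teardown logic
});

log.push('created'); // nothing has run yet at this point
const subscription = source.subscribe({
  next: (value) => log.push('next ' + value),
  complete: () => log.push('complete'),
});
subscription.unsubscribe();
console.log(log);
// ['created', 'subscribed', 'next 1', 'next 2', 'complete', 'torn down']
```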
- defer
public static defer(observableFactory: function(): SubscribableOrPromise): Observable
Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.
Creates the Observable lazily, that is, only when it is subscribed.
// when defer is called, it returns an Observable
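A small sketch of the lazy, per-Observer factory call, written in plain JavaScript. `deferSketch` and the object with a `subscribe` method are hypothetical stand-ins for the real Observable type.

```javascript
// Hypothetical model of defer: the factory runs on every subscribe call.
function deferSketch(observableFactory) {
  return {
    subscribe(observer) {
      return observableFactory().subscribe(observer);
    },
  };
}

let factoryCalls = 0;
const deferred = deferSketch(() => {
  factoryCalls += 1;
  const id = factoryCalls;
  // A tiny Observable-like object that emits one value and completes.
  return {
    subscribe(observer) {
      observer.next('from factory call ' + id);
      observer.complete();
    },
  };
});

const callsBeforeSubscribe = factoryCalls; // still 0: nothing was created yet
const received = [];
const observer = { next: (value) => received.push(value), complete() {} };
deferred.subscribe(observer);
deferred.subscribe(observer);
console.log(callsBeforeSubscribe, received);
// 0 ['from factory call 1', 'from factory call 2']
```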
- empty
public static empty(scheduler: Scheduler): Observable
Creates an Observable that emits no items to the Observer and immediately emits a completion notification.
Just emits ‘complete’ and nothing else.
var result = Rx.Observable.empty().startWith(8)
- from
public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>
Creates an Observable from an Array, an array-like Object, a Promise, an iterable object, or an Observable-like Object
Convert almost anything to an Observable
- fromEvent
public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>
Creates an Observable from DOM events, or Node EventEmitter events or others
var clicks = Rx.Observable.fromEvent(document, 'click')
- fromEventPattern
public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T>
Converts any addHandler/removeHandler API to an Observable
function addClickHandler(handler) {
- fromPromise
public static fromPromise(promise: Promise<T>, scheduler: Scheduler): Observable<T>
Returns an Observable that just emits the Promise’s resolved value, then completes.
var result = Rx.Observable.fromPromise(fetch('reqres.in/api/users'))
- interval
public static interval(period: number, scheduler: Scheduler): Observable
Emits incremental numbers periodically in time
- merge
public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable
Creates an output Observable which concurrently emits all values from every given input Observable
Flatten multiple Observables together by blending their values into one Observable
var clicks = Rx.Observable.fromEvent(document, 'click')
- never
public static never(): Observable
Creates an Observable that emits no items to the Observer
An Observable that never emits anything
Rx.Observable.never().startWith(7).subscribe(console.log)
- of
public static of(values: ...T, scheduler: Scheduler): Observable<T>
Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.
Emits the arguments you provide, then completes.
- range
public static range(start: number, count: number, scheduler: Scheduler): Observable
Creates an Observable that emits a sequence of numbers within a specified range.
Emits a sequence of numbers in a range
- throw
public static throw(error: any, scheduler: Scheduler): Observable
Just emits ‘error’ and nothing else.
- timer
public static timer(initialDelay: number|Date, period: number, scheduler: Scheduler): Observable
Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.
It’s like interval, but you can specify when the emissions should start.
- webSocket
public static webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): Observable
let subject = Rx.Observable.webSocket('ws://localhost:8001')
- zip
public static zip(observables: *): Observable<R>
Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.
let age$ = Rx.Observable.of(27, 25, 29)
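The "in order" pairing can be modeled with plain arrays standing in for completed Observables. `zipSketch` and the sample values below are hypothetical; the sketch ignores timing and the optional project function.

```javascript
// Plain-JS model of zip: the n-th output combines the n-th value of each input.
function zipSketch(...inputs) {
  if (inputs.length === 0) return [];
  // Output stops as soon as the shortest input runs out of values.
  const length = Math.min(...inputs.map((input) => input.length));
  const output = [];
  for (let i = 0; i < length; i += 1) {
    output.push(inputs.map((input) => input[i]));
  }
  return output;
}

const zipped = zipSketch([27, 25, 29], ['Ann', 'Bob', 'Cat']);
const uneven = zipSketch([1, 2, 3], ['a', 'b']);
console.log(zipped); // [[27, 'Ann'], [25, 'Bob'], [29, 'Cat']]
console.log(uneven); // [[1, 'a'], [2, 'b']]
```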
- constructor
public constructor(subscribe: Function)
- audit
public audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>
Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.
It’s like auditTime, but the silencing duration is determined by a second Observable.
audit is similar to throttle, but emits the last value from the silenced time window.
var clicks = Rx.Observable.fromEvent(document, 'click')
- auditTime
public auditTime(duration: number, scheduler: Scheduler): Observable<T>
Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.
Similar to throttleTime
- buffer
public buffer(closingNotifier: Observable<any>): Observable<T>
Buffers the source Observable values until closingNotifier emits.
Collects values from the past as an array, and emits that array when another Observable emits.
- bufferCount
public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]>
var interval = Rx.Observable.interval(1000)
- bufferTime
public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]>
Buffers the source Observable values for a specific time period.
Collects values from the past as an array, and emits those arrays periodically in time.
- bufferToggle
public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]>
- bufferWhen
public bufferWhen(closingSelector: function(): Observable): Observable<T[]>
Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit and reset the buffer.
- catch
public catch(selector: function): Observable
Catches errors on the Observable to be handled by returning a new Observable or throwing an error.
Rx.Observable.of(1, 2, 3, 4, 5)
- combineAll
public combineAll(project: function): Observable
Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.
Similar to combineLatest.
Flatten an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.
var clicks = Rx.Observable.fromEvent(document, 'click');
- combineLatest
public combineLatest(other: ObservableInput, project: function): Observable
Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.
- concat
public concat(other: ObservableInput, scheduler: Scheduler): Observable
- concatAll
public concatAll(): Observable
Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
Flatten an Observable-of-Observables by putting one inner Observable after the other.
- concatMap
public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
Map => Concat
Map each value to an Observable, then flatten all of these inner Observables using concatAll
var clicks = Rx.Observable.fromEvent(document, 'click')
- concatMapTo
public concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
It’s like concatMap, but maps each value always to the same inner Observable
- count
public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable
Counts the number of emissions on the source and emits the number when the source completes.
// count how many seconds have passed before the first click
- debounce
public debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable
Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.
It’s like debounceTime, but the time span of emission silence is determined by a second Observable.
var clicks = Rx.Observable.fromEvent(document, 'click')
- debounceTime
public debounceTime(dueTime: number, scheduler: Scheduler): Observable
Emits a value from the source Observable only after a particular time span has passed without another source emission.
It’s like delay, but passes only the most recent value from each burst of emissions.
- defaultIfEmpty
public defaultIfEmpty(defaultValue: any): Observable
Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.
If the source Observable turns out to be empty, then this operator will emit a default value.
var clicks = Rx.Observable.fromEvent(document, 'click')
- delay
public delay(delay: number|Date, scheduler: Scheduler): Observable
Delays the emission of items from the source Observable by a given timeout or until a given Date.
- delayWhen
public delayWhen(delayDurationSelector: function(value: T): Observable): Observable
It’s like delay, but the time span of the delay duration is determined by a second Observable.
- dematerialize
public dematerialize(): Observable
Converts an Observable of Notification objects into the emissions that they represent.
Unwraps Notification objects as actual next, error, and complete emissions. The opposite of materialize.
dematerialize is assumed to operate on an Observable that only emits Notification objects as next emissions, and does not emit any error. Such an Observable is the output of a materialize operation. Those notifications are then unwrapped using the metadata they contain, and emitted as next, error, and complete on the output Observable.
var notiA = new Rx.Notification('N', 'A')
- distinct
public distinct(keySelector: function, flushes: Observable): Observable
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
Rx.Observable.of(1, 1, 2, 2, 1, 1, 3, 4, 5, 67, 6, 6).distinct()
Note: ‘1’ is different from 1
- distinctUntilChanged
public distinctUntilChanged(compare: function): Observable
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item. (not items)
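The difference between comparing against all previous items and comparing against only the previous item is easy to see on plain arrays. The two helper functions below are hypothetical models written for this note; they ignore the keySelector, flushes, and compare parameters.

```javascript
// Model of distinct: drops any value that has been seen before.
function distinctSketch(values) {
  const seen = new Set();
  return values.filter((value) => {
    if (seen.has(value)) return false;
    seen.add(value);
    return true;
  });
}

// Model of distinctUntilChanged: drops a value only if it equals
// the value immediately before it.
function distinctUntilChangedSketch(values) {
  return values.filter((value, i) => i === 0 || value !== values[i - 1]);
}

const input = [1, 1, 2, 2, 1, 1, 3, '1'];
const allDistinct = distinctSketch(input);
const untilChanged = distinctUntilChangedSketch(input);
console.log(allDistinct); // [1, 2, 3, '1']
console.log(untilChanged); // [1, 2, 1, 3, '1']
```

Note that the string '1' survives in both outputs, since it is a different value from the number 1.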
- distinctUntilKeyChanged
public distinctUntilKeyChanged(key: string, compare: function): Observable
interface Person {
- do
public do(nextOrObserver: Observer|function, error: function, complete: function): Observable
Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.
Intercepts each emission on the source and runs a function, but returns an output which is identical to the source.
- elementAt
public elementAt(index: number, defaultValue: T): Observable
Emits the single value at the specified index in a sequence of emissions from the source Observable.
Emits only the i-th value, then completes.
var clicks = Rx.Observable.fromEvent(document, 'click')
- every
public every(predicate: function, thisArg: any): Observable
Returns an Observable that emits whether or not every item of the source satisfies the condition specified.
// all elements are less than 5, otherwise false
- exhaust
public exhaust(): Observable
Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.
Flatten an Observable-of-Observables by dropping the next inner Observable while the current inner is still executing.
exhaust ignores every new inner Observable if the previous Observable has not yet completed.
var clicks = Rx.Observable.fromEvent(document, 'click')
- exhaustMap
public exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.
- expand
public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable
Recursively projects each source value to an Observable which is merged in the output Observable.
It’s similar to mergeMap, but applies the projection function to every source value as well as every output value. It’s recursive.
var clicks = Rx.Observable.fromEvent(document, 'click')
- filter
public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable
Filters items emitted by the source Observable by only emitting those that satisfy a specified predicate.
var clicks = Rx.Observable.fromEvent(document, 'click')
- find
public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>
Emits only the first value emitted by the source Observable that meets some condition.
- findIndex
public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<number>
It’s like find, but emits the index of the found value, not the value itself.
- first
public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T|R>
Emits only the first value (or the first value that meets some condition) emitted by the source Observable.
- forEach
public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise
- groupBy
public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>>
- ignoreElements
public ignoreElements(): Observable
Ignores all items emitted by the source Observable and only passes calls of complete or error.
- isEmpty
public isEmpty(): Observable
- last
public last(predicate: function): Observable
- letProto
public letProto(func: *): Observable<T>
- lift
public lift(operator: Operator): Observable
Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable’s operator.
Return a new Observable with the Operator applied.
- map
public map(project: function(value: T, index: number): R, thisArg: any): Observable
Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.
var clicks = Rx.Observable.fromEvent(document, 'click')
- mapTo
public mapTo(value: any): Observable
Emits the given constant value on the output Observable every time the source Observable emits a value.
- materialize
public materialize(): Observable<Notification<T>>
Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.
- max
public max(comparer: Function): Observable
Rx.Observable.of(5, 3, 9, 8).max()
- merge
public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable
Creates an output Observable which concurrently emits all values from every given input Observable.
Flatten multiple Observables together by blending their values into one Observable.
- mergeAll
public mergeAll(concurrent: number): Observable
Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.
- mergeMap
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
Projects each source value to an Observable which is merged in the output Observable.
Map each value to an Observable, then flatten all of these inner Observables using mergeAll.
var letters = Rx.Observable.of('a', 'b', 'c')
- mergeMapTo
public mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
Projects each source value to the same Observable which is merged multiple times in the output Observable.
- mergeScan
public mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R>
Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable is merged into the output Observable.
const click$ = Rx.Observable.fromEvent(document, 'click')
- min
public min(comparer: Function): Observable<R>
- multicast
public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable
Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.
- observeOn
public observeOn(scheduler: *, delay: *): Observable<R>
- pairwise
public pairwise(): Observable<Array<T>>
Groups pairs of consecutive emissions together and emits them as an array of two values.
Puts the current value and previous value together as an array, and emits that.
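A plain-array model makes the pairing explicit. `pairwiseSketch` is a hypothetical helper, not the library operator; note that the first value alone produces no output, because it has no predecessor.

```javascript
// Model of pairwise: each output is [previous, current].
function pairwiseSketch(values) {
  const pairs = [];
  for (let i = 1; i < values.length; i += 1) {
    pairs.push([values[i - 1], values[i]]);
  }
  return pairs;
}

const pairs = pairwiseSketch([1, 2, 3, 4]);
const single = pairwiseSketch([1]);
console.log(pairs); // [[1, 2], [2, 3], [3, 4]]
console.log(single); // []
```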
- partition
public partition(predicate: function(value: T, index: number): boolean, thisArg: this): [Observable<T>, Observable<T>]
Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don’t satisfy the predicate.
It’s like filter, but returns two Observables, one like the output of filter, and the other with values that did not pass the condition.
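The two-way split can be modeled on a plain array. `partitionSketch` is a hypothetical helper that returns a pair of arrays where the operator would return a pair of Observables.

```javascript
// Model of partition: [values that pass, values that fail].
function partitionSketch(values, predicate) {
  const passed = [];
  const failed = [];
  values.forEach((value, index) => {
    (predicate(value, index) ? passed : failed).push(value);
  });
  return [passed, failed];
}

const [evens, odds] = partitionSketch([1, 2, 3, 4, 5, 6], (v) => v % 2 === 0);
console.log(evens); // [2, 4, 6]
console.log(odds); // [1, 3, 5]
```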
- pluck
public pluck(properties: ...string): Observable
Maps each source value (an object) to its specified nested property
Like map, but meant only for picking one of the nested properties of every emitted object.
const clicks$ = Rx.Observable.fromEvent(document, 'click')
- publish
public publish(selector: Function): *
Returns a ConnectableObservable, which is a variety of Observable that waits until connect method is called before it begins emitting items to those Observers that have subscribed to it.
- publishBehavior
- publishLast
- publishReplay
- race
public race(...observables): Observable
Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.
- reduce
public reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R>
- repeat
public repeat(count: number): Observable
- repeatWhen
public repeatWhen(notifier: function(notification: Observable): Observable): Observable
- retry
public retry(count: number): Observable
Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will resubscribe to the source Observable for a maximum of count resubscriptions rather than propagating the error call.
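The resubscription logic can be sketched with a source modeled as a plain function that takes an observer. `retrySketch` and `flaky` are hypothetical names; the sketch is synchronous and leaves out unsubscription.

```javascript
// Model of retry: on error, subscribe again up to `count` extra times,
// then pass the error through.
function retrySketch(subscribe, count) {
  return function (observer) {
    let retriesLeft = count;
    const attempt = () => {
      subscribe({
        next: (value) => observer.next(value),
        complete: () => observer.complete(),
        error: (err) => {
          if (retriesLeft > 0) {
            retriesLeft -= 1;
            attempt(); // resubscribe instead of propagating the error
          } else {
            observer.error(err);
          }
        },
      });
    };
    attempt();
  };
}

// Hypothetical source that fails on its first two subscriptions.
let attempts = 0;
const flaky = (observer) => {
  attempts += 1;
  observer.next('attempt ' + attempts);
  if (attempts < 3) observer.error(new Error('failed'));
  else observer.complete();
};

const events = [];
retrySketch(flaky, 2)({
  next: (value) => events.push(value),
  error: (err) => events.push('error: ' + err.message),
  complete: () => events.push('complete'),
});
console.log(events); // ['attempt 1', 'attempt 2', 'attempt 3', 'complete']
```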
- retryWhen
public retryWhen(notifier: function(errors: Observable): Observable): Observable
- sample
public sample(notifier: Observable<any>): Observable<T>
Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.
It’s like sampleTime, but samples whenever the notifier Observable emits.
- sampleTime
public sampleTime(period: number, scheduler: Scheduler): Observable<T>
- scan
public scan(accumulate: function(acc: R, value: T, index: number): R, seed: T|R): Observable<R>
Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.
It’s like reduce, but emits the current accumulation whenever the source emits a value
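The contrast with reduce shows up clearly on a plain array. `scanSketch` is a hypothetical model that always requires a seed, which is a simplification of the optional seed in the signature above.

```javascript
// Model of scan: collect every intermediate accumulation.
function scanSketch(values, accumulator, seed) {
  const output = [];
  let acc = seed;
  values.forEach((value, index) => {
    acc = accumulator(acc, value, index);
    output.push(acc);
  });
  return output;
}

const add = (acc, value) => acc + value;
const running = scanSketch([1, 2, 3, 4], add, 0);
const total = [1, 2, 3, 4].reduce(add, 0); // reduce yields only the final value
console.log(running); // [1, 3, 6, 10]
console.log(total); // 10
```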
- sequenceEqual
public sequenceEqual(compareTo: Observable, comparor: function): Observable
Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.
Checks to see if all values emitted by both observables are equal in order.
- share
public share(): Observable<T>
Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot. This is an alias for .publish().refCount().
- single
public single(predicate: Function): Observable<T>
- skip
public skip(count: number): Observable
- skipUntil
public skipUntil(notifier: Observable): Observable<T>
Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
- skipWhile
public skipWhile(predicate: Function): Observable
Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.
- startWith
public startWith(values: ...T, scheduler: Scheduler): Observable
Returns an Observable that emits the items you specified before it begins to emit items by the source Observable.
- subscribeOn
public subscribeOn(scheduler: Scheduler): Observable<T>
Asynchronously subscribes Observers to this Observable on the specified Scheduler.
- switch
public switch(): Observable<T>
Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.
Flatten an Observable-of-Observables by dropping the previous inner Observable once a new one appears.
- switchMap
public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recent projected Observable.
Map each value to an Observable, then flatten all of these inner Observables using switch.
- take
public take(count: number): Observable<T>
Emits only the first count values emitted by the source Observable.
Takes the first count values from the source, then completes.
- takeLast
public takeLast(count: number): Observable<T>
Emits only the last count values emitted by the source Observable.
Remembers the latest count values, then emits those only when the source completes.
- takeUntil
public takeUntil(notifier: Observable): Observable<T>
Emits the values emitted by the source Observable until a notifier Observable emits a value.
Lets values pass until a second Observable, notifier, emits something. Then, it completes.
- takeWhile
public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T>
Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.
Takes values from the source only while they pass the given condition. When the first value fails the condition, it completes.
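Unlike filter, takeWhile stops for good at the first failing value. A plain-array model, with `takeWhileSketch` as a hypothetical helper, shows the difference.

```javascript
// Model of takeWhile: stop at the first value that fails the predicate.
function takeWhileSketch(values, predicate) {
  const output = [];
  for (let i = 0; i < values.length; i += 1) {
    if (!predicate(values[i], i)) break; // the output completes here
    output.push(values[i]);
  }
  return output;
}

const input = [1, 2, 5, 1, 2];
const taken = takeWhileSketch(input, (v) => v < 3);
const filtered = input.filter((v) => v < 3); // filter keeps scanning
console.log(taken); // [1, 2]
console.log(filtered); // [1, 2, 1, 2]
```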
- throttle
public throttle(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>
Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.
It’s like throttleTime, but the silencing duration is determined by a second Observable.
It cannot emit more than one value until the second Observable emits a value.
- throttleTime
public throttleTime(duration: number, scheduler: Scheduler): Observable<T>
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
Lets a value pass, then ignores source values for the next duration milliseconds.
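The "let one pass, then stay silent" cycle can be modeled on a list of timestamped events. `throttleTimeSketch` and the `{ time, value }` event shape are assumptions made for this illustration; real emissions depend on the Scheduler.

```javascript
// Model of throttleTime over events sorted by time (milliseconds).
function throttleTimeSketch(events, duration) {
  const output = [];
  let silentUntil = -Infinity;
  for (const { time, value } of events) {
    if (time >= silentUntil) {
      output.push(value); // let this value pass
      silentUntil = time + duration; // then ignore values for `duration` ms
    }
  }
  return output;
}

const throttled = throttleTimeSketch(
  [
    { time: 0, value: 'a' },
    { time: 100, value: 'b' },
    { time: 900, value: 'c' },
    { time: 1200, value: 'd' },
    { time: 1300, value: 'e' },
  ],
  1000
);
console.log(throttled); // ['a', 'd']
```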
- timeInterval
public timeInterval(scheduler: *): Observable<TimeInterval<any>>
- timeout
public timeout(due: number, scheduler: Scheduler): Observable<T>
- timeoutWith
public timeoutWith(due: *, withObservable: *, scheduler: Scheduler): Observable<R>
- timestamp
public timestamp(scheduler: *): Observable<Timestamp<any>>
- toArray
public toArray(): Observable<any[]>
- toPromise
public toPromise(PromiseCtor: *): Promise<T>
Rx.Observable.of(42).toPromise().then(console.log)
- window
public window(windowBoundaries: Observable<any>): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.
It’s like buffer, but emits a nested Observable instead of an array.
let clicks = Rx.Observable.fromEvent(document, 'click')
- windowCount
public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.
It’s like bufferCount, but emits a nested Observable instead of an array.
- windowToggle
public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.
- windowWhen
public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.
It’s like bufferWhen, but emits a nested Observable instead of an array.
- withLatestFrom
public withLatestFrom(other: ObservableInput, project: Function): Observable
Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.
- zipAll
public zipAll(project: *): Observable<R>
- zipProto
public zipProto(observables: *): Observable<R>