Observable

Direct Subclass
  • ConnectableObservable

  • GroupedObservable

  • Subject

Indirect Subclass
  • AnonymousSubject

  • AsyncSubject

  • BehaviorSubject

  • ReplaySubject

Static Public Methods

  • bindCallback

public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Convert a callback API to a function that returns an Observable

Give it a function f of type f(x, callback) and it will return a function g that, when called as g(x), will output an Observable.

bindCallback is not an operator because its input and output are not Observables.

The input is a function with some parameters, the last of which must be a callback function.

The output of bindCallback is a function that takes the same parameters as the original function, except the last one (the callback). When the output function is called with arguments, it returns an Observable.

If the original function’s callback takes one argument, the Observable will emit that value. If, on the other hand, the callback is called with multiple values, the resulting Observable will emit an array with these arguments.
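
To make the conversion concrete, here is a minimal plain-JavaScript sketch of the idea. It is not RxJS's implementation: bindCallbackSketch, double and divide are hypothetical names, and the returned object only mimics the subscribe part of an Observable.

```javascript
// Hypothetical stand-in for bindCallback: wraps a callback-style function and
// returns a function that produces a minimal Observable-like object.
function bindCallbackSketch(func) {
  return (...params) => ({
    subscribe(next, complete = () => {}) {
      // Call the original function with the callback appended as last argument.
      func(...params, (...results) => {
        // One callback argument: emit it as is. Several: emit them as an array.
        next(results.length <= 1 ? results[0] : results);
        complete();
      });
    },
  });
}

// Hypothetical callback-style APIs of type f(x, callback).
function double(x, callback) {
  callback(x * 2);
}
function divide(x, callback) {
  callback(Math.floor(x / 3), x % 3); // callback called with two values
}

const emitted = [];
bindCallbackSketch(double)(21).subscribe(value => emitted.push(value));
bindCallbackSketch(divide)(10).subscribe(value => emitted.push(value));
console.log(emitted); // 42, then the array [3, 1]
```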

More in the original article.
  • bindNodeCallback

public static bindNodeCallback(func: function, scheduler: Scheduler): function(...param: *): Observable

  • combineLatest

public static combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of the formula.

// combine two timer Observables
const firstTimer = Rx.Observable.timer(0, 1000)
const secondTimer = Rx.Observable.timer(500, 1000)
const combineTimers = Rx.Observable.combineLatest(firstTimer, secondTimer)
combineTimers.subscribe(console.log)
// logs
// [0, 0] after 0.5s, triggered by secondTimer
// [1, 0] after 1s, triggered by firstTimer
// [1, 1] after 1.5s, triggered by secondTimer

// combine an array of Observables
const observables = [1, 2, 4].map(n => Rx.Observable.of(n).delay(n * 1000).startWith(0))
const combined = Rx.Observable.combineLatest(observables)
combined.subscribe(console.log)
// logs
// [0, 0, 0] immediately
// [1, 0, 0] after 1s, triggered by observables[0]
// [1, 2, 0] after 2s, triggered by observables[1]
// [1, 2, 4] after 4s, triggered by observables[2]

// use a project function to dynamically calculate the body-mass index
const weight = Rx.Observable.of(70, 72, 76, 79, 75)
const height = Rx.Observable.of(1.76, 1.77, 1.78)
const bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h))
bmi.subscribe(x => console.log('BMI is ' + x))
// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222
  • concat

public static concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable

Creates an output Observable which sequentially emits all values from each given Observable in turn, moving on to the next only when the current one completes.

// Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10
var timer = Rx.Observable.interval(1000).take(4)
var sequence = Rx.Observable.range(1, 10)
var result = Rx.Observable.concat(timer, sequence)

// Concatenate an array of 3 Observables, different from the ones above
var timer1 = Rx.Observable.interval(1000).take(10)
var timer2 = Rx.Observable.interval(1000).take(6)
var timer3 = Rx.Observable.interval(1000).take(10)
var result = Rx.Observable.concat([timer1, timer2, timer3])
  • create

public static create(onSubscription: function(observer: Observer): TearDownLogic): Observable

Creates a new Observable, that will execute the specified function when an Observer subscribes to it.

var observable = Rx.Observable.create(function(observer) {
  observer.next(1)
  observer.next(2)
  observer.next(3)
})
  • defer

public static defer(observableFactory: function(): SubscribableOrPromise): Observable

Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer

Creates the Observable lazily, that is, only when it is subscribed.

// when defer is called, it returns an Observable
var clicksOrInterval = Rx.Observable.defer(function() {
  if (Math.random() > 0.5) {
    return Rx.Observable.fromEvent(document, 'click')
  } else {
    return Rx.Observable.interval(1000)
  }
})
  • empty

public static empty(scheduler: Scheduler): Observable

Creates an Observable that emits no items to the Observer and immediately emits a completion notification.

Just emits ‘complete’ and nothing else

var result = Rx.Observable.empty().startWith(8)
  • from

public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>

Creates an Observable from an Array, an array-like Object, a Promise, an iterable object, or an Observable-like Object

Convert almost anything to an Observable

  • fromEvent

public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>

Creates an Observable from DOM events, or Node EventEmitter events or others

var clicks = Rx.Observable.fromEvent(document, 'click')
  • fromEventPattern

public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T>

Converts any addHandler/removeHandler API to an Observable

function addClickHandler(handler) {
  document.addEventListener('click', handler)
}

// the name must match the one passed to fromEventPattern below
function removeClickHandler(handler) {
  document.removeEventListener('click', handler)
}

var clicks = Rx.Observable.fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
  • fromPromise

public static fromPromise(promise: Promise<T>, scheduler: Scheduler): Observable<T>

Returns an Observable that just emits the Promise’s resolved value, then completes.

var result = Rx.Observable.fromPromise(fetch('https://reqres.in/api/users'))
  • interval

public static interval(period: number, scheduler: Scheduler): Observable

Emits incremental numbers periodically in time

  • merge

public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable

Creates an output Observable which concurrently emits all values from every given input Observable

Flatten multiple Observables together by blending their values into one Observable

var clicks = Rx.Observable.fromEvent(document, 'click')
var timer = Rx.Observable.interval(1000)
var clicksOrTimer = Rx.Observable.merge(clicks, timer)
  • never

public static never(): Observable

Creates an Observable that emits no items to the Observer

An Observable that never emits anything

Rx.Observable.never().startWith(7).subscribe(console.log)
// logs 7 (from startWith), then nothing else, not even complete
  • of

public static of(values: ...T, scheduler: Scheduler): Observable<T>

Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.

Emits the arguments you provide, then completes.

  • range

public static range(start: number, count: number, scheduler: Scheduler): Observable

Creates an Observable that emits a sequence of numbers within a specified range.

Emits a sequence of numbers in a range

  • throw

public static throw(error: any, scheduler: Scheduler): Observable

Just emits ‘error’ and nothing else.

  • timer

public static timer(initialDelay: number|Date, period: number, scheduler: Scheduler): Observable

Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.

It’s like interval, but you can specify when the emissions should start.

  • webSocket

public static webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): Observable

let subject = Rx.Observable.webSocket('ws://localhost:8001')
  • zip

public static zip(observables: *): Observable<R>

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

let age$ = Rx.Observable.of(27, 25, 29)
let name$ = Rx.Observable.of('Foo', 'Bar', 'Beer')
let isDev$ = Rx.Observable.of(true, true, false)

Rx.Observable.zip(age$, name$, isDev$, (age, name, isDev) => ({age, name, isDev}))
// outputs
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }
  • constructor

public constructor(subscribe: Function)

  • audit

public audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

It’s like auditTime, but the silencing duration is determined by a second Observable.

audit is similar to throttle, but emits the last value from the silenced time window.

var clicks = Rx.Observable.fromEvent(document, 'click')
var result = clicks.audit(ev => Rx.Observable.interval(1000))
  • auditTime

public auditTime(duration: number, scheduler: Scheduler): Observable<T>

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

Similar to throttleTime, but emits the last value from the silenced time window instead of the first.
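
The difference from throttleTime is easier to see on a timeline. The following is a rough plain-JavaScript model over timestamped events, not RxJS code: throttleTimeSketch, auditTimeSketch and the sample events are hypothetical, and the audit model assumes the source stays open until the last window closes.

```javascript
// Each event is { t, v }: value v emitted by the source at time t (ms), sorted by t.
const events = [
  { t: 0, v: 'a' }, { t: 200, v: 'b' }, { t: 400, v: 'c' },
  { t: 1500, v: 'd' }, { t: 1700, v: 'e' },
];

// throttleTime model: let a value pass, then ignore the source for `duration` ms.
function throttleTimeSketch(events, duration) {
  const out = [];
  let silencedUntil = -Infinity;
  for (const { t, v } of events) {
    if (t >= silencedUntil) {
      out.push(`${v}@${t}`);
      silencedUntil = t + duration;
    }
  }
  return out;
}

// auditTime model: the first value opens a window of `duration` ms; when the
// window closes, the most recent value seen so far is emitted.
function auditTimeSketch(events, duration) {
  const out = [];
  let windowEnd = null;
  let latest;
  for (const { t, v } of events) {
    if (windowEnd !== null && t >= windowEnd) {
      out.push(`${latest}@${windowEnd}`);
      windowEnd = null;
    }
    if (windowEnd === null) windowEnd = t + duration;
    latest = v;
  }
  // Assumption: the source stays open until the last window closes.
  if (windowEnd !== null) out.push(`${latest}@${windowEnd}`);
  return out;
}

const throttled = throttleTimeSketch(events, 1000);
const audited = auditTimeSketch(events, 1000);
console.log(throttled); // a@0, d@1500
console.log(audited);   // c@1000, e@2500
```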

  • buffer

public buffer(closingNotifier: Observable<any>): Observable<T[]>

Buffers the source Observable values until closingNotifier emits.

Collects values from the past as an array, and emits that array only when another Observable emits.

  • bufferCount

public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]>

var interval = Rx.Observable.interval(1000)
interval.bufferCount(3, 2)
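
What bufferCount(3, 2) produces can be modelled on a plain array. This is only a sketch of the semantics: bufferCountSketch is a hypothetical helper, the finite array stands in for a source that completes (unlike interval), and flushing the trailing partial buffer on completion is an assumption stated in the code.

```javascript
// Array-based model of bufferCount: `values` stands in for the source emissions.
function bufferCountSketch(values, bufferSize, startBufferEvery = bufferSize) {
  const emitted = [];
  let open = []; // buffers currently collecting values
  values.forEach((value, index) => {
    // Open a new buffer every `startBufferEvery` values.
    if (index % startBufferEvery === 0) open.push([]);
    open.forEach(buffer => buffer.push(value));
    // Emit and close every buffer that has reached `bufferSize`.
    emitted.push(...open.filter(buffer => buffer.length === bufferSize));
    open = open.filter(buffer => buffer.length < bufferSize);
  });
  // Assumption: when the source completes, non-empty partial buffers are flushed.
  return emitted.concat(open.filter(buffer => buffer.length > 0));
}

const buffers = bufferCountSketch([0, 1, 2, 3, 4, 5, 6], 3, 2);
console.log(JSON.stringify(buffers)); // [[0,1,2],[2,3,4],[4,5,6],[6]]
```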
  • bufferTime

public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]>

Buffers the source Observable values for a specific time period.

Collects values from the past as an array, and emits those arrays periodically in time.

  • bufferToggle

public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]>

  • bufferWhen

public bufferWhen(closingSelector: function(): Observable): Observable<T[]>

Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit and reset the buffer.

  • catch

public catch(selector: function): Observable

Catches errors on the Observable to be handled by returning a new Observable or throwing an error.

Rx.Observable.of(1, 2, 3, 4, 5)
  .map(n => {
    if (n === 4) throw 'four'
    return n
  })
  .catch(err => Rx.Observable.of('I', 'II', 'III', 'IV', 'V'))
  • combineAll

public combineAll(project: function): Observable

Convert a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest

Similar to CombineLatest

Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.

var clicks = Rx.Observable.fromEvent(document, 'click')
var higherOrder = clicks.map(ev =>
  Rx.Observable.interval(Math.random() * 2000).take(13)
).take(3)
var result = higherOrder.combineAll()
  • combineLatest

public combineLatest(other: ObservableInput, project: function): Observable

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

  • concat

public concat(other: ObservableInput, scheduler: Scheduler): Observable

  • concatAll

public concatAll(): Observable

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

Flattens an Observable-of-Observables by putting one inner Observable after the other.

  • concatMap

public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

Map => Concat

Map each value to an Observable, then flatten all of these inner Observables using concatAll

var clicks = Rx.Observable.fromEvent(document, 'click')
var result = clicks.concatMap(ev => {
  return Rx.Observable.interval(1000).take(4)
})
  • concatMapTo

public concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

It’s like concatMap, but maps each value always to the same inner Observable

  • count

public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable

Counts the number of emissions on the source and emits the number when the source completes.

// count how many seconds have passed before the first click
var seconds = Rx.Observable.interval(1000)
var clicks = Rx.Observable.fromEvent(document, 'click')
var secondsBeforeClick = seconds.takeUntil(clicks)
secondsBeforeClick.count()

// count how many odd numbers are there between 1 and 7
var numbers = Rx.Observable.range(1, 7)
numbers.count(i => i % 2)
  • debounce

public debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable

Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

It’s like debounceTime, but the time span of emission silence is determined by a second Observable.

var clicks = Rx.Observable.fromEvent(document, 'click')
var result = clicks.debounce(() => Rx.Observable.interval(1000))
  • debounceTime

public debounceTime(dueTime: number, scheduler: Scheduler): Observable

Emits a value from the source Observable only after a particular time span has passed without another source emission.

It’s like delay, but passes only the most recent value from each burst of emissions.
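
The "most recent value from each burst" behaviour can be modelled on timestamped events. This is an illustrative plain-JavaScript sketch rather than RxJS code: debounceTimeSketch and the sample events are hypothetical, and the model assumes the source stays open after its last value.

```javascript
// Each event is { t, v }: value v emitted by the source at time t (ms), sorted by t.
function debounceTimeSketch(events, dueTime) {
  const out = [];
  events.forEach(({ t, v }, i) => {
    const next = events[i + 1];
    // A value survives only if the source stays silent for dueTime after it.
    // Assumption: the source stays open after its last value.
    if (next === undefined || next.t - t > dueTime) {
      out.push(`${v}@${t + dueTime}`);
    }
  });
  return out;
}

const debounced = debounceTimeSketch(
  [{ t: 0, v: 'a' }, { t: 100, v: 'b' }, { t: 200, v: 'c' }, { t: 1500, v: 'd' }],
  500
);
console.log(debounced); // c@700, d@2000
```

Only c, the last value of the first burst, and d are emitted, each delayed by dueTime.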

  • defaultIfEmpty

public defaultIfEmpty(defaultValue: any): Observable

Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

If the source Observable turns out to be empty, then this operator will emit a default value.

var clicks = Rx.Observable.fromEvent(document, 'click')
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000))
clicksBeforeFive.defaultIfEmpty('no click')
  • delay

public delay(delay: number|Date, scheduler: Scheduler): Observable

Delays the emission of items from the source Observable by a given timeout or until a given Date.

  • delayWhen

public delayWhen(delayDurationSelector: function(value: T): Observable): Observable

It’s like delay, but the time span of the delay duration is determined by a second Observable.

  • dematerialize

public dematerialize(): Observable

Convert an Observable of Notification objects into the emissions that they represent.

Unwraps Notification objects as actual next, error, and complete emissions. The opposite of materialize.

dematerialize is assumed to operate on an Observable that only emits Notification objects as next emissions, and does not emit any error. Such an Observable is the output of a materialize operation. Those notifications are then unwrapped using the metadata they contain, and emitted as next, error and complete on the output Observable.

var notiA = new Rx.Notification('N', 'A')
var notiB = new Rx.Notification('N', 'B')
var notiE = new Rx.Notification('E', void 0, new TypeError('x.toUpperCase is not a function'))
var materialized = Rx.Observable.of(notiA, notiB, notiE)
var upperCase = materialized.dematerialize()
  • distinct

public distinct(keySelector: function, flushes: Observable): Observable

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

Rx.Observable.of(1, 1, 2, 2, 1, 1, 3, 4, 5, 67, 6, 6).distinct()
// 1, 2, 3, 4, 5, 67, 6
// repeated values are omitted

// keySelector
interface Person {
  age: number
  name: string
}

Rx.Observable.of<Person>(
  { age: 4, name: 'Foo' },
  { age: 7, name: 'Bar' },
  { age: 5, name: 'Foo' },
).distinct((p: Person) => p.name)

Note: ‘1’ is different from 1

  • distinctUntilChanged

public distinctUntilChanged(compare: function): Observable

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item (only the single preceding item, not all earlier items as with distinct).
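
The contrast between distinct and distinctUntilChanged can be sketched on plain arrays. These helpers are hypothetical models of the semantics, not RxJS code; the Set-based lookup in distinctSketch is an assumption chosen to match the note that ‘1’ is different from 1.

```javascript
// distinct model: drop any value whose key has been seen before.
function distinctSketch(values, keySelector = x => x) {
  const seen = new Set();
  return values.filter(value => {
    const key = keySelector(value);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}

// distinctUntilChanged model: drop a value only if it equals the last emitted one.
function distinctUntilChangedSketch(values, compare = (a, b) => a === b) {
  const out = [];
  for (const value of values) {
    if (out.length === 0 || !compare(out[out.length - 1], value)) out.push(value);
  }
  return out;
}

const input = [1, 1, 2, 2, 1, 1, 3];
const distinctValues = distinctSketch(input);
const untilChanged = distinctUntilChangedSketch(input);
console.log(distinctValues); // [1, 2, 3]
console.log(untilChanged);   // [1, 2, 1, 3]
console.log(distinctSketch(['1', 1])); // ['1', 1], the string and the number differ
```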

  • distinctUntilKeyChanged

public distinctUntilKeyChanged(key: string, compare: function): Observable

interface Person {
  age: number
  name: string
}

Rx.Observable.of<Person>(
  { age: 4, name: 'Foo' },
  { age: 5, name: 'Bar' },
  { age: 3, name: 'For' },
).distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 2) === y.substring(0, 2))
  • do

public do(nextOrObserver: Observer|function, error: function, complete: function): Observable

Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.

Intercepts each emission on the source and runs a function, but returns an output which is identical to the source.

  • elementAt

public elementAt(index: number, defaultValue: T): Observable

Emits the single value at the specified index in a sequence of emissions from the source Observable.

Emits only the i-th value, then completes.

var clicks = Rx.Observable.fromEvent(document, 'click')
clicks.elementAt(2)
  • every

public every(predicate: function, thisArg: any): Observable

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

// emits true if all elements are less than 5, otherwise false
Rx.Observable.of(1, 2, 3, 4, 5, 6)
  .every(x => x < 5)
  .subscribe(x => console.log(x)) // -> false
  • exhaust

public exhaust(): Observable

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

Flattens an Observable-of-Observables by dropping the next inner Observable while the current inner Observable is still executing.

exhaust ignores every new inner Observable if the previous Observable has not yet completed.

var clicks = Rx.Observable.fromEvent(document, 'click')
var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(3))
higherOrder.exhaust()
  • exhaustMap

public exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

  • expand

public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable

Recursively projects each source value to an Observable which is merged in the output Observable.

It’s similar to mergeMap, but applies the projection function to every source value as well as every output value. It’s recursive.

var clicks = Rx.Observable.fromEvent(document, 'click')
clicks.mapTo(1).expand(x => Rx.Observable.of(2*x).delay(1000)).take(10)
  • filter

public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable

Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.

var clicks = Rx.Observable.fromEvent(document, 'click')
clicks.filter(ev => Math.random() > 0.4)
  • find

public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>

Emits only the first value emitted by the source Observable that meets some condition.

  • findIndex

public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<number>

It’s like find, but emits the index of the found value, not the value itself.

  • first

public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T|R>

Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

  • forEach

public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise

  • groupBy

public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>>

  • ignoreElements

public ignoreElements(): Observable

Ignores all items emitted by the source Observable and only passes calls of complete or error.

  • isEmpty

public isEmpty(): Observable

  • last

public last(predicate: function): Observable

  • letProto

public letProto(func: *): Observable<T>

  • lift

public lift(operator: Operator): Observable

Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable’s operator.

Return a new Observable with the Operator applied.

  • map

public map(project: function(value: T, index: number): R, thisArg: any): Observable

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

var clicks = Rx.Observable.fromEvent(document, 'click')
clicks.map(ev => ev.clientX)
  • mapTo

public mapTo(value: any): Observable

Emits the given constant value on the output Observable every time the source Observable emits a value.

  • materialize

public materialize(): Observable<Notification<T>>

Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

  • max

public max(comparer: Function): Observable

Rx.Observable.of(5, 3, 9, 8).max()

interface Person {
  name: string
  age: number
}

Rx.Observable.of<Person>(
  { age: 7, name: 'Foo' },
  { age: 8, name: 'Bar' },
  { age: 0, name: 'For' },
)
  .max<Person>((a, b) => a.age - b.age)
  • merge

public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable

Creates an output Observable which concurrently emits all values from every given input Observable.

Flattens multiple Observables together by blending their values into one Observable.

  • mergeAll

public mergeAll(concurrent: number): Observable

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

  • mergeMap

public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

Projects each source value to an Observable which is merged in the output Observable.

Maps each value to an Observable, then flattens all of these inner Observables using mergeAll.

var letters = Rx.Observable.of('a', 'b', 'c')
letters.mergeMap(letter => Rx.Observable.interval(1000).map(i => i+letter))
  • mergeMapTo

public mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

Projects each source value to the same Observable which is merged multiple times in the output Observable.

  • mergeScan

public mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R>

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable is merged into the output Observable.

const click$ = Rx.Observable.fromEvent(document, 'click')
const one$ = click$.mapTo(1)
const seed = 0
const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed)
  • min

public min(comparer: Function): Observable<R>

  • multicast

public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable

Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

  • observeOn

public observeOn(scheduler: *, delay: *): Observable<R>

  • pairwise

public pairwise(): Observable<Array<T>>

Groups pairs of consecutive emissions together and emits them as an array of two values.

Puts the current value and previous value together as an array, and emits that.
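
On a plain array the pairing looks like this. pairwiseSketch is a hypothetical helper that models the behaviour and is not part of RxJS.

```javascript
// Array-based model of pairwise: `values` stands in for the source emissions.
function pairwiseSketch(values) {
  // The first value has no predecessor, so it produces no pair on its own.
  return values.slice(1).map((value, i) => [values[i], value]);
}

const pairs = pairwiseSketch([1, 2, 3, 4]);
console.log(JSON.stringify(pairs)); // [[1,2],[2,3],[3,4]]
```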

  • partition

public partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable, Observable]

Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don’t satisfy the predicate.

It’s like filter, but returns two Observables, one like the output of filter, and the other with values that did not pass the condition.
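
As a rough array analogy (a sketch only, with the hypothetical helper partitionSketch standing in for the operator), the two outputs are the values that pass the predicate and the values that fail it:

```javascript
// Array-based model of partition: returns [passing, failing].
function partitionSketch(values, predicate) {
  return [
    values.filter((value, index) => predicate(value, index)),
    values.filter((value, index) => !predicate(value, index)),
  ];
}

const [evens, odds] = partitionSketch([1, 2, 3, 4, 5, 6], n => n % 2 === 0);
console.log(evens); // [2, 4, 6]
console.log(odds);  // [1, 3, 5]
```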

  • pluck

public pluck(properties: ...string): Observable

Maps each source value (an object) to its specified nested property

Like map, but meant only for picking one of the nested properties of every emitted object.

const clicks$ = Rx.Observable.fromEvent(document, 'click')
clicks$.pluck('target', 'tagName')
  • publish

public publish(selector: Function): *

Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.

  • publishBehavior

  • publishLast

  • publishReplay

  • race

public race(...observables): Observable

Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.

  • reduce

public reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R>

  • repeat

public repeat(count: number): Observable

  • repeatWhen

public repeatWhen(notifier: function(notification: Observable): Observable): Observable

  • retry

public retry(count: number): Observable

Returns an Observable that mirrors the source Observable, except on error: if the source Observable calls error, this method resubscribes to the source Observable for a maximum of count resubscriptions rather than propagating the error call.

  • retryWhen

public retryWhen(notifier: function(errors: Observable): Observable): Observable

  • sample

public sample(notifier: Observable<any>): Observable<T>

Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

It’s like sampleTime, but samples whenever the notifier Observable emits.

  • sampleTime

public sampleTime(period: number, scheduler: Scheduler): Observable<T>

  • scan

public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T|R): Observable<R>

Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.

It’s like reduce, but emits the current accumulation whenever the source emits a value
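
The difference from reduce can be shown on a plain array. scanSketch and reduceSketch are hypothetical helpers that model the emissions, not RxJS code, and the sketch assumes a seed is always supplied.

```javascript
// scan model: emit the accumulation after every source value.
function scanSketch(values, accumulator, seed) {
  const out = [];
  let acc = seed;
  values.forEach((value, index) => {
    acc = accumulator(acc, value, index);
    out.push(acc);
  });
  return out;
}

// reduce model: emit only the final accumulation, once the source completes.
function reduceSketch(values, accumulator, seed) {
  return [values.reduce(accumulator, seed)];
}

const sum = (acc, value) => acc + value;
const scanned = scanSketch([1, 2, 3, 4], sum, 0);
const reduced = reduceSketch([1, 2, 3, 4], sum, 0);
console.log(scanned); // [1, 3, 6, 10]
console.log(reduced); // [10]
```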

  • sequenceEqual

public sequenceEqual(compareTo: Observable, comparor: function): Observable

Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.

Checks to see if all values emitted by both observables are equal, in order.

  • share

public share(): Observable<T>

Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot. This is an alias for .publish().refCount().

  • single

public single(predicate: Function): Observable<T>

  • skip

public skip(count: number): Observable

  • skipUntil

public skipUntil(notifier: Observable): Observable<T>

Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

  • skipWhile

public skipWhile(predicate: Function): Observable

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

  • startWith

public startWith(values: ...T, scheduler: Scheduler): Observable

Returns an Observable that emits the items you specify before it begins to emit the items emitted by the source Observable.

  • subscribeOn

public subscribeOn(scheduler: Scheduler): Observable<T>

Asynchronously subscribes Observers to this Observable on the specified Scheduler.

  • switch

public switch(): Observable<T>

Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

Flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears.

  • switchMap

public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recent projected Observable.

Maps each value to an Observable, then flattens all of these inner Observables using switch.

  • take

public take(count: number): Observable<T>

Emits only the first count values emitted by the source Observable.

Takes the first count values from the source, then completes.

  • takeLast

public takeLast(count: number): Observable<T>

Emits only the last count values emitted by the source Observable.

Remembers the latest count values, then emits those only when the source completes.

  • takeUntil

public takeUntil(notifier: Observable): Observable<T>

Emits the values emitted by the source Observable until a notifier Observable emits a value.

Let values pass until a second Observable, notifier, emits something. Then, it completes.

  • takeWhile

public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T>

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

Takes values from the source only while they pass the given condition. When the first value that does not satisfy the condition arrives, it completes.
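
The contrast with filter is that takeWhile stops for good at the first failing value. A small array-based sketch, where takeWhileSketch is a hypothetical helper and not RxJS code:

```javascript
// Array-based model of takeWhile: `values` stands in for the source emissions.
function takeWhileSketch(values, predicate) {
  const out = [];
  for (let i = 0; i < values.length; i++) {
    if (!predicate(values[i], i)) break; // the first failure completes the output
    out.push(values[i]);
  }
  return out;
}

const source = [1, 2, 5, 1, 2];
const taken = takeWhileSketch(source, x => x < 3);
const filtered = source.filter(x => x < 3);
console.log(taken);    // [1, 2]
console.log(filtered); // [1, 2, 1, 2]
```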

  • throttle

public throttle(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

It’s like throttleTime, but the silencing duration is determined by a second Observable.

Cannot emit more than one value until the second Observable emits a value.

  • throttleTime

public throttleTime(duration: number, scheduler: Scheduler): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

Lets a value pass, then ignores source values for the next duration milliseconds.

  • timeInterval

public timeInterval(scheduler: *): Observable<TimeInterval<any>>

  • timeout

public timeout(due: number, scheduler: Scheduler): Observable<T>

  • timeoutWith

public timeoutWith(due: *, withObservable: *, scheduler: Scheduler): Observable<R>

  • timestamp

public timestamp(scheduler: *): Observable<Timestamp<any>>

  • toArray

public toArray(): Observable<any[]>

  • toPromise

public toPromise(PromiseCtor: *): Promise<T>

Rx.Observable.of(42).toPromise().then(console.log)
  • window

public window(windowBoundaries: Observable<any>): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.

It’s like buffer, but emits a nested Observable instead of an array.

let clicks = Rx.Observable.fromEvent(document, 'click')
let interval = Rx.Observable.interval(1000)
clicks.window(interval)
  • windowCount

public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.

It’s like bufferCount, but emits a nested Observable instead of an array.

  • windowToggle

public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.

  • windowWhen

public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.

It’s like bufferWhen, but emits a nested Observable instead of an array.

  • withLatestFrom

public withLatestFrom(other: ObservableInput, project: Function): Observable

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.
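
This can be modelled on two timelines of timestamped events. The sketch below is plain JavaScript, not RxJS code: withLatestFromSketch and the sample events are hypothetical, and treating a value of the other stream emitted at exactly the same time as already available is an assumption.

```javascript
// Each event is { t, v }: value v emitted at time t (ms), sorted by t.
function withLatestFromSketch(sourceEvents, otherEvents, project = (a, b) => [a, b]) {
  const out = [];
  for (const { t, v } of sourceEvents) {
    // Values of the other stream emitted at or before time t (assumption for ties).
    const earlier = otherEvents.filter(e => e.t <= t);
    if (earlier.length === 0) continue; // the other stream has not emitted yet
    out.push(project(v, earlier[earlier.length - 1].v));
  }
  return out;
}

const clicks = [{ t: 100, v: 'click1' }, { t: 1200, v: 'click2' }, { t: 2600, v: 'click3' }];
const ticks = [{ t: 1000, v: 0 }, { t: 2000, v: 1 }, { t: 3000, v: 2 }];
const combined = withLatestFromSketch(clicks, ticks);
console.log(JSON.stringify(combined)); // [["click2",0],["click3",1]]
```

The first click produces nothing because the other stream has no value yet, and the tick at 3000 ms produces nothing because only the source triggers emissions.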

  • zipAll

public zipAll(project: *): Observable<R>

  • zipProto

public zipProto(observables: *): Observable<R>