Creation

Rx.Observable.create is an alias for the Observable constructor, and it takes one argument: the subscribe function.

1
2
3
4
var observable = Rx.Observable.create(function subscribe (observer) {
observer.next(1)
// ...
})

Observable can be created with create, but usually we use the so-called creation operators, like, of, from, interval, etc.

Subscription

1
2
3
4
5
observable.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('done'),
})

Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.

Execution

The code inside Observable.create(function subscribe (observer) { /*...*/ }) represents an Observable execution, a lazy computation that only happens for each Observer that subscribes. The execution produces multiple values over time, either synchronous or asynchronous.

There are three types of values on Observable Execution can deliver:

  • ‘Next’ notification: sends a value such as a Number, a String, an Object, etc.

  • ‘Error’ notification: sends a JavaScript Error or exception.

  • ‘Complete’ notification: does not send a value.

In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.

It’s a good idea to wrap any code in subscribe with try/catch block that will deliver an Error notification if it catches an exception:

1
2
3
4
5
6
7
8
9
var observable = Rx.Observable.create(function subscribe (observer) {
try{
observer.next(1)
observer.next(2)
observer.complete()
} catch (err) {
observer.error(err)
}
})

Disposing Observable Executions

Because Observable Executions may be infinite, and it’s common for an Observer to want abort execution in finite time, we need an API for cancelling an execution. Since each execution is exclusive to one Observer only, once the Observer is done receiving values, it has to have a way to stop the execution, in order to avoid wasting computation power or memory resources.

observable.subscribe will return the Subscription object.

1
var subscription = observable.subscribe(x => console.log(x))

The Subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution.

1
subscription.unsubscribe()

When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.

You can return a custom unsubscribe function within function subscribe()

1
2
3
4
5
6
7
8
9
var observale = Rx.Observable.create(function subscribe (observer) {
var intervalID = setInterval(() => {
observer.next('hi')
}, 1000)

return function unsubscribe() {
clearInterval(intervalID)
}
})

Observer

An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one of each type of notification delivered by the Observable: next, error, complete.

1
2
3
4
5
var observer = {
next: console.log,
error: console.error,
complete: () => console.log('done'),
}

To use the observer, provide it to the subscirbe of an Observable.

When subscribing to an Observable, you may also just provide the callbacks as arguments, without being attached to an Observer object.

1
2
3
4
5
6
7
observable.subscribe(console.log)
// it will be treated as
observable.subscribe({
console.log,
err => console.error('Observer got an error: ' + err),
() => console.log('Observable got a complete notification'),
})

Subscription

A Subscription is an object that represents a disposable resource, usually the execution of an Observable. A subscription has one important method, unsubscribe, that takes no arguments and just disposes the resources held by the subscription.

Subscriptions can also be put together, so that a call to an unsubscribe() of one Subscription may unsubscribe multiple Subscriptions. You can do this by ‘ading’ one subscription into another:

1
2
3
4
5
6
7
8
var observable1 = Rx.Observable.interval(300)
var observable2 = Rx.Observable.interval(500)

var subscription = observable.subscribe(x => console.log('first: ' + x))
var childSubscription = observable2.subscribe(x => console.log('second: ' + x))

subscription.ad(childSubscription)
subscription.unsubscribe()

Subject

A RxJS Subject is a special type of Observable that allows values to multicasted to many Observers. While plain Observable are unicast(each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.

A Subject is like an Observable, but can mutlicast to many Observers. Subjects are like EventEmitter: they maintain a registry of many listener.

Every Subject is an Observable. Given a Subject, you can subscribe to it, providing an Observer, which will start receiving values normally.

Every Subject is an Observer. It is an object with the methods next(), error(e), and complete().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var subject = new Rx.Subject()

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
})
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
})
// as observable
subject.next(1)
subject.next(2)

var observable = Rx.Observable.from([1,2,3])
// as observer
observable.subscribe(subject)

multicasted Observables

A multicasted Observable uses a Subject under the hood to make multiple Observers see the same Observable execution.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var source = Rx.Observable.from([1, 2, 3])
var subject = new Rx.Subject()
var multicasted = source.multicast(subject)

// There are, under the hood, `subject.subscribe({...})`
multicasted.subscribe({
next: v => console.log(`observerA ${v}`)
})
multicasted.subscribe({
next: v => console.log(`observerB ${v}`)
})

// This is, under the hood, `source.subscribe(subject)`
multicasted.connect()

multicast returns an Observable that looks like a normal Observale, but works like a subject when it comes to subscribing. multicast returns a COnnectableObservable, which is simply an Observable with the connect method.

The connect method is important to determine exactly when the shared Observable execution will start. Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, whcih you can unsubscribe from in order to cancel the shared Observable execution.

Reference Counting

Calling connect() manually and handling the Subscription is often cumbersome. Ususally we want to automatically connect when the first Observer arrives, and automatically cancel the shared execution when the last Observer unsubscribe.

Consider the following example where subscriptions occurs as outlined by this list:

  1. First Observer subscribes to the mutlicasted Observable.
  2. The multicasted Observable is connected
  3. The next value 0 is delivered to the first Observable.
  4. Second Observer subscribes to the multicasted Observable.
  5. The next value 1 is delivered to the first Observer.
  6. The next value 1 is delivered to the second Observer.
  7. First Observer unsubscribes from the mutlicasted Observable.
  8. The next value 2 is delivered to the second Observer.
  9. Second Observer unsubscribes from the mutlicasted Observable.
  10. The connection to the multicasted Observable is unsubscribed.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var source = Rx.Observable.interval(500)
var subject = new Rx.Subject()
var multicasted = source.multicast(subject)
var subscription1, subscription2, subscriptionConnect

subscription1 = multicasted.subscribe({
next: v => console.log('observerA: ' + v)
})

subscriptionConnect = multicasted.connect()

setTimeout(() => {
subscription2 = multicasted.subscribe({
next: v => console.log('observerB: ' + v)
})
}, 600)

setTimeout(() => {
subscription1.unsubscribe()
}, 1200)

setTimeout(() => {
subscription2.unsubscribe()
subscriptionConnect.unsubscribe()
}, 2000)

If we wish to avoid explicit calls to connect(), we can use ConnectableObservable’s refCount() method(reference Counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution.

refCount makes the multicasted Observable automatically start executing when the first subscriber arrives and stop executing when the last subscriber leaves.

Below is an example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var source = Rx.Observable.interval(500)
var subject = new Rx.Subject()
var refCounted = source.multicast(subject).refCount()
var subscription1, subscription2, subscriptionConnect

console.log('observerA subscribed')
subscription1 = refCounted.subscribe({
next: v => console.log('observerA: ' + v)
})

setTimeout(() => {
console.log('observerB subscribed')
subscription2 = refCounted.subscribe({
next: v => console.log('observerB: ' + v)
})
}, 600)

setTimeout(() => {
console.log('observerA unsubscribed')
subscription1.unsubscribe()
}, 1200)

setTimeout(() => {
console.log('observerB unsubscribed')
subscription2.unsubscribe()
}, 2000)

The refCount() method only exists on ConnectableObservable, and it returns an Observable, not another ConnectableObservable.

BehaviorSubject

One of the variant of Subjects is the BehaviorSubject, which has a notion of ‘the current value’. It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the ‘current value’ from the BehaviorSubject.

BehaviorSubjects are useful for representing ‘values over time’. For instance, an event stream of birthdays is a Subject, but the stream of a person’s age would be a BehaviorSubject.

ReplaySubject

A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.

A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers.

When creating a ReplaySubject, you can specify how many values to replay:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var subject = new Rx.ReplaySubject(3) // buffer 3 values for new subscribers

subject.subscribe({
next: v => console.log('ObserverA: ' + v)
})

subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)

subject.subscribe({
next: v => console.log('ObserverB: ' + v)
})

subject.next(5)

You can also specify a window time in milliseconds, besides of the buffer size, to determine how old the recorded values can be.

1
2
3
var subject = new Rx.ReplaySubject(3, 500)

// ...

AsyncSubject

The AsyncSubject is a variant where only the last value of the Observale execution is sent to its observers, and only when the execution completes.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var subject = new Rx.AsyncSubject()

subject.subscribe({
next: v => console.log('ObserverA: ' + v)
})

subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)

subject.subscribe({
next: v => console.log('ObserverB: ' + v)
})

subject.next(5)
subject.complete()

Operators

An Operator is a function which creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified.

An Operator is essentially a pure function which takes one Observable as input and generates another Observable as output.

Subscirbing to the output Observable will also subscribe to the input Observable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function multiplyTen(input) {
var output = Rx.Observable.create(function subscribe(observer) {
input.subscribe({
next: v => observer.next(10 * v),
error: err => observer.error(err),
complete: () => observer.compelte()
})
})
return output
}

var input = Rx.Observable.from([1, 2, 3])
var output = multiplyTen(input)
output.subscribe(x => console.log(x))

Notice that a subscribe to output will cause input Observable to be subscirbed. It’s called operator subscription chain.

Instance Operators

Instance operators are methods on Observable instances.

1
2
3
4
5
6
7
8
9
10
11
12
Rx.Observable.prototype.multiplyByTen = function multiplyByTen () {
var input = this
return Rx.Observable.create(function subscirbe (observer) {
input.subscribe({
next: v => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete(),
})
})
}

var observable = Rx.Observable.from([1, 2, 3]).multiplyByTen()

Instance operators are functions that use the this keyword to infer waht is the input Observable.

Static Operators

Static operators are functions attached ot the Observable class directly. A static operator uses no this keyword internally, but instead relies entirely on its arguments.

Static Operators are usually used to create Observables from scratch.

1
var observable = Rx.Observable.interval(1000 /* number of milliseconds */)

Some combination operators may be static, such as merge, combineLatest, concat, etc. These make sense as static operators because they take multiple Observables as input, not just one.