Observables in Angular
Observables are pretty similar to .NET’s Events. An Observable<T> is a stream of data that we can subscribe() to in order to get the latest values. This functionality comes with the RxJS package.
An Observable<T> might deliver 3 kinds of messages:
- data of type T
- error
- completion - signifies that this Observable will not deliver any more data
Observables can be finite, meaning the stream of data will eventually finish and the observer(s) will be notified about that (an HTTP request is an example of such an Observable). Observables might also be infinite, in which case the completion will never occur.
Built-in Observables
RxJS comes with a few built-in functions that create observables:
- interval - works similarly to JS’s setInterval. We set the interval in milliseconds, incoming values are increasing, like in a counter (starting from 0).
- create - we’re given a function where values emitted by the new observable need to be “manually” pushed out (example is below). Newer RxJS versions deprecate Observable.create in favour of new Observable(...).
- and a few more…
Custom Observables
Here’s how we can create our own Observables. This is a custom interval implementation:
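A minimal sketch of such a custom interval, assuming RxJS 7 and the `new Observable(...)` constructor. The `customInterval` name and the 1000 ms period are made up for illustration.

```typescript
import { Observable } from 'rxjs';

// Hypothetical helper: emits 0, 1, 2, ... every `periodMs` milliseconds.
function customInterval(periodMs: number): Observable<number> {
  return new Observable<number>((subscriber) => {
    let count = 0;
    const timerId = setInterval(() => {
      // Values have to be pushed out "manually".
      subscriber.next(count);
      count++;
    }, periodMs);

    // Teardown logic: runs when the consumer unsubscribes.
    return () => clearInterval(timerId);
  });
}

const received: number[] = [];
const subscription = customInterval(1000).subscribe((value) => received.push(value));

// Later, when we are no longer interested in the values:
subscription.unsubscribe();
```

The function returned from the constructor callback is what stops the timer; without it, the timer would keep running after the subscriber is gone.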
Operations
Removing Subscription
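A subscription does not go away on its own: unless the Observable completes or errors, it stays active for as long as the app lives. If we are no longer interested in receiving values, we should explicitly remove the subscription by calling unsubscribe() on it. Otherwise, we’d have a memory leak.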
In Angular, most often we will want to remove the subscription when some component is no longer displayed. In such a case, the component should implement OnDestroy, which comes with the ngOnDestroy lifecycle hook. We should remove the subscription there.
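A rough sketch of that pattern. To keep it runnable without Angular, the class below is a plain TypeScript class; in a real app it would carry the @Component decorator and declare `implements OnInit, OnDestroy` from `@angular/core`. The `TickerComponent` name and its members are hypothetical.

```typescript
import { Subscription, interval } from 'rxjs';

// Hypothetical component; Angular-specific decorators are left out on purpose.
class TickerComponent {
  latest = -1;
  private subscription?: Subscription;

  // Angular would call this once the component is initialised.
  ngOnInit(): void {
    this.subscription = interval(1000).subscribe((value) => {
      this.latest = value;
    });
  }

  // Angular would call this right before it destroys the component.
  ngOnDestroy(): void {
    this.subscription?.unsubscribe();
  }

  // Helper for illustration only: tells whether the subscription is still open.
  get isSubscribed(): boolean {
    return this.subscription !== undefined && !this.subscription.closed;
  }
}
```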
Error handling
A subscription can end with an error, for example when an HTTP request fails. In such a case the subscription “dies” and we do not have to call unsubscribe() on it.
We can also handle errors by providing an error handler as a second argument to subscribe().
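A small sketch of an error handler, assuming RxJS 7. It uses the observer-object form, `subscribe({ next, error })`, because RxJS 7 marks the positional form `subscribe(onNext, onError)` as deprecated; both deliver the error to the handler. The failing observable and its message are stand-ins for a real HTTP call.

```typescript
import { throwError } from 'rxjs';

const log: string[] = [];

// Stand-in for a failing HTTP request; the message is made up.
const failing$ = throwError(() => new Error('Request failed'));

const subscription = failing$.subscribe({
  next: (value) => log.push(`value: ${value}`),
  error: (err: Error) => log.push(`handled: ${err.message}`),
});

// log is now ['handled: Request failed'] and subscription.closed is true,
// so there is nothing left to unsubscribe from.
```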
Without handling errors explicitly, the Observable will throw the error and it will end up in the console.
catchError
We can also use the catchError operator. We can attach it once to some observable, to include some common error handling logic. We can rethrow an error from within that operator with throwError(...). Then, each subscriber may handle the rethrown error individually.
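A minimal sketch of a shared catchError handler that rethrows, assuming RxJS 7 (where throwError accepts a factory function). The failing `request$` observable and the log messages are invented for illustration.

```typescript
import { throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

const log: string[] = [];

// Stand-in for a failing request, with the common handler attached once.
const request$ = throwError(() => new Error('Request failed')).pipe(
  catchError((err: Error) => {
    log.push(`common: ${err.message}`);
    // Rethrow so that each subscriber can still react in its own way.
    return throwError(() => err);
  })
);

request$.subscribe({ error: (err: Error) => log.push(`subscriber A: ${err.message}`) });
request$.subscribe({ error: (err: Error) => log.push(`subscriber B: ${err.message}`) });

// log is now:
// ['common: Request failed', 'subscriber A: Request failed',
//  'common: Request failed', 'subscriber B: Request failed']
```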
Like the example above shows, the individual subscribers can still attach their own error handling logic. The “common” handler will always be fired first, and then the error will be propagated to the subscribers.
Completion
Observables can be finite. Here’s how we can complete custom observables:
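A short sketch of completing a custom observable, assuming the `new Observable(...)` constructor; the `countToThree$` name is made up.

```typescript
import { Observable } from 'rxjs';

const countToThree$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  // Signal that no more data will be delivered.
  subscriber.complete();
  // Ignored: nothing is delivered after completion.
  subscriber.next(4);
});

const received: number[] = [];
countToThree$.subscribe((value) => received.push(value));

// received is now [1, 2, 3]
```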
Subscribers can be notified about completion via a third argument to the subscribe() method:
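A sketch of a completion handler, assuming RxJS 7. As with errors, it uses the observer-object form with a `complete` callback, which is the non-deprecated equivalent of passing a third positional argument.

```typescript
import { of } from 'rxjs';

const log: string[] = [];

// of(...) emits its arguments and then completes.
const subscription = of('a', 'b').subscribe({
  next: (value) => log.push(value),
  complete: () => log.push('done'),
});

// log is now ['a', 'b', 'done'] and subscription.closed is true
```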
We don’t need to unsubscribe() from completed subscriptions.
Operators
We might want to somehow change the behaviour of some subscription. For example, we might want to filter out some values, or maybe transform them into some other data. This is where the Operators come in. They allow us to create new Observables based on existing ones.
Here’s an example:
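A minimal sketch of applying an operator, assuming the operators are imported from `rxjs/operators`; the doubling function is just an illustration.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const source$ = of(1, 2, 3);

// pipe() returns a new Observable; source$ itself is left unchanged.
const doubled$ = source$.pipe(map((value) => value * 2));

const received: number[] = [];
doubled$.subscribe((value) => received.push(value));

// received is now [2, 4, 6]
```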
We apply operators with the pipe() function. We used the map operator. There are lots more in the rxjs/operators import. For example:
- filter - rejects values that do not conform to the provided predicate;
- catchError - allows us to handle an error “globally” somehow and throwError (possibly a new one) down the chain;
- take - automatically completes the resulting Observable after receiving n items from it. We do not need to unsubscribe() from it, it will be handled automatically. It is useful when we want to get just one latest value of some Observable (like the BehaviorSubject described down below);
- exhaustMap - similar to map, but accepts a function that crosses boundaries (more on that in the tip below).
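Two of the operators listed above, filter and take, can be chained in one pipe() call. A small sketch with made-up input values:

```typescript
import { of } from 'rxjs';
import { filter, take } from 'rxjs/operators';

const received: number[] = [];

const subscription = of(1, 2, 3, 4, 5, 6)
  .pipe(
    filter((value) => value % 2 === 0), // keep even numbers only
    take(2) // complete after the first two values that got through
  )
  .subscribe((value) => received.push(value));

// received is now [2, 4]; take(2) completed the stream,
// so subscription.closed is true without calling unsubscribe()
```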
Maps
There’s a nice overview of different kinds of maps, like map, mergeMap, switchMap at Medium.
Subject
RxJS comes with a special type of Observable - Subject. It is functionally very similar to Angular’s EventEmitter<T>. Reportedly, it’s more performant though, and it’s recommended to always use Subject.
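A short sketch of a Subject in use. Unlike a plain Observable, values are pushed into it from the outside with next(); the `messages$` name and the values are made up.

```typescript
import { Subject } from 'rxjs';

const messages$ = new Subject<string>();
const first: string[] = [];
const second: string[] = [];

messages$.subscribe((message) => first.push(message));
messages$.next('hello');

// A late subscriber only sees values emitted after it subscribed.
messages$.subscribe((message) => second.push(message));
messages$.next('world');
messages$.complete();

// first is ['hello', 'world'], second is ['world']
```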
Combining Observables
In some cases we might need to use values from multiple observables to create some new value. It is often used in NgRx effects.
RxJS offers a few ways to combine observables. Some of these are operators, and some are functions that allow us to create new Observables, based on the ones we provide.
Operators
withLatestFrom
It emits only at the moments when the first (source) observable emits. The first output will come out only after all of the provided observables have emitted at least one value.
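A sketch of that timing, using two Subjects so the order of emissions is explicit. The `clicks$` and `counter$` names are hypothetical.

```typescript
import { Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const clicks$ = new Subject<string>();
const counter$ = new Subject<number>();
const output: string[] = [];

clicks$
  .pipe(withLatestFrom(counter$))
  .subscribe(([click, count]) => output.push(`${click}:${count}`));

clicks$.next('a'); // dropped: counter$ has not emitted anything yet
counter$.next(1); // no output: only the source triggers emissions
clicks$.next('b'); // emits 'b:1'
counter$.next(2); // no output
clicks$.next('c'); // emits 'c:2'

// output is now ['b:1', 'c:2']
```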
combineLatestWith
It’s pretty similar to withLatestFrom. The difference is that the output is emitted when any of the input observables emits something; the first observable is not prioritized in any way.
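A comparable sketch for combineLatestWith, assuming RxJS 7 (the version that introduced this operator). Note that every emission from either input produces an output once both have emitted at least once.

```typescript
import { Subject } from 'rxjs';
import { combineLatestWith } from 'rxjs/operators';

const first$ = new Subject<string>();
const second$ = new Subject<number>();
const output: string[] = [];

first$
  .pipe(combineLatestWith(second$))
  .subscribe(([a, b]) => output.push(`${a}:${b}`));

first$.next('a'); // no output yet: second$ has not emitted
second$.next(1); // emits 'a:1'
first$.next('b'); // emits 'b:1'
second$.next(2); // emits 'b:2'

// output is now ['a:1', 'b:1', 'b:2']
```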
There is also a combineLatest function (not the same as the deprecated operator of the same name!) that behaves much like the combineLatestWith operator, and it may be used to create observables from other observables. The difference lies in how they are applied: the operator is attached to an existing source observable through pipe(), while the combineLatest function (not an operator) takes all of the input observables as arguments and creates a new observable. That observable will produce values only once something subscribes to it.
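A sketch of the combineLatest creation function, assuming the array form `combineLatest([a$, b$])`. The `width$` and `height$` inputs are made up.

```typescript
import { Subject, combineLatest } from 'rxjs';

const width$ = new Subject<number>();
const height$ = new Subject<number>();

// No pipe() involved: the function itself builds the combined Observable.
const size$ = combineLatest([width$, height$]);

const areas: number[] = [];
size$.subscribe(([width, height]) => areas.push(width * height));

width$.next(2); // no output yet: height$ has not emitted
height$.next(3); // emits [2, 3]
width$.next(4); // emits [4, 3]

// areas is now [6, 12]
```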