RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables. It enables you to work with asynchronous data streams and events. Observables are the core building blocks, representing sequences of values over time.
More information can be found at https://rxjs.dev
Basic Concepts
Observable: Represents a stream of values or events that can be observed.
Observer: A set of callbacks (next, error, complete) that receives the notifications delivered by an Observable.
Operators: Transform, filter, or combine Observables to create new streams.
Error Handling
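RxJS lets you recover from errors with the catchError operator, which intercepts an error and returns a replacement Observable. In the example below, map throws on the first value, so the subscriber receives only the fallback message and 'World' is never processed:
import { of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
const observableWithError = of('Hello', 'World').pipe(
  // Throw on the first value to simulate a failure inside the pipeline
  map(() => {
    throw new Error('This is an error!');
  }),
  // Replace the failed stream with a single fallback value
  catchError(error => of(`Caught an error: ${error.message}`))
);
observableWithError.subscribe(value => console.log(value));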
Common RxJS operators:
1. Map Operator
The map operator transforms each item emitted by an Observable:
import { from } from 'rxjs';
import { map } from 'rxjs/operators';
const numbers = from([1, 2, 3, 4, 5]);
const doubled = numbers.pipe(
  map(x => x * 2)
);
doubled.subscribe(result => console.log(result)); // logs 2, 4, 6, 8, 10
2. Filter Operator
The filter operator emits only the items that satisfy a condition and drops the rest:
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
const numbers = from([1, 2, 3, 4, 5]);
const evens = numbers.pipe(
  filter(x => x % 2 === 0)
);
evens.subscribe(result => console.log(result)); // logs 2, 4
3. Merge Operator
The merge operator combines multiple Observables into a single Observable that emits values from every source as they arrive:
import { interval, merge } from 'rxjs';
import { map } from 'rxjs/operators';
// map with a constant function replaces mapTo, which is deprecated in RxJS 7
const source1 = interval(1000).pipe(map(() => 'Source 1'));
const source2 = interval(1500).pipe(map(() => 'Source 2'));
const merged = merge(source1, source2);
merged.subscribe(result => console.log(result));
4. Concat Operator
The concat operator subscribes to multiple Observables one after another, starting the next only once the previous one has completed, so values are emitted in sequence:
import { interval, concat } from 'rxjs';
import { take } from 'rxjs/operators';
const source1 = interval(1000).pipe(take(3));
const source2 = interval(1500).pipe(take(3));
const concatenated = concat(source1, source2);
concatenated.subscribe(result => console.log(result));
5. Tap Operator
The tap operator allows you to perform side effects without affecting the emitted values:
import { from } from 'rxjs';
import { tap, map } from 'rxjs/operators';
const numbers = from([1, 2, 3, 4, 5]);
const squaredAndLogged = numbers.pipe(
  tap(x => console.log(`Before map: ${x}`)),
  map(x => x * x),
  tap(x => console.log(`After map: ${x}`))
);
squaredAndLogged.subscribe(result => console.log(result));
Key concepts related to Observables in RxJS
1. Creating an Observable
You can create an Observable using the Observable class constructor:
import { Observable } from 'rxjs';
const customObservable = new Observable(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});
customObservable.subscribe(value => console.log(value));
2. Observing Values
Observables emit values over time. You can subscribe to an Observable to receive these values:
import { interval } from 'rxjs';
const numbers = interval(1000);
const subscription = numbers.subscribe(value => console.log(value));
// Unsubscribe after 5 seconds
setTimeout(() => {
  subscription.unsubscribe();
}, 5000);
3. Error Handling in Observables
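Handling errors in Observables is crucial. Pass an error callback to subscribe to be notified when the Observable fails; once an error has been delivered, the Observable emits nothing further:
import { Observable } from 'rxjs';
const errorObservable = new Observable(observer => {
  try {
    throw new Error('This is an error!');
  } catch (error) {
    // Forward the failure to the subscriber's error callback
    observer.error(error);
  }
});
// An observer object is used because separate callback arguments are deprecated in RxJS 7
errorObservable.subscribe({
  next: value => console.log(value),
  error: error => console.error(`Caught an error: ${error.message}`)
});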
4. Completing Observables
Observables can complete, signaling that no more values will be emitted:
import { Observable } from 'rxjs';
const completeObservable = new Observable(observer => {
  observer.next('First value');
  observer.next('Second value');
  observer.complete(); // No more values will be emitted
});
// An observer object avoids passing undefined as a placeholder error callback
completeObservable.subscribe({
  next: value => console.log(value),
  complete: () => console.log('Observable completed')
});
5. Combining Observables
You can combine multiple Observables using operators like merge, which interleaves the values from all sources as they arrive:
import { interval, merge } from 'rxjs';
import { map } from 'rxjs/operators';
// map with a constant function replaces mapTo, which is deprecated in RxJS 7
const source1 = interval(1000).pipe(map(() => 'Source 1'));
const source2 = interval(1500).pipe(map(() => 'Source 2'));
const merged = merge(source1, source2);
merged.subscribe(result => console.log(result));