RxJS (Reactive Extensions for JavaScript)

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables. It enables you to work with asynchronous data streams and events. Observables are the core building blocks, representing sequences of values over time.

More information can be found at https://rxjs.dev

Basic Concepts

Observable: Represents a stream of values or events that can be observed.

Observer: Listens to the values emitted by an Observable.

Operators: Transform, filter, or combine Observables to create new streams.

Error Handling

RxJS allows you to handle errors using the catchError operator:

import { of } from 'rxjs';
import { catchError } from 'rxjs/operators';

const observableWithError = of('Hello', 'World').pipe(
  map(() => {
    throw new Error('This is an error!');
  }),
  catchError(error => of(`Caught an error: ${error.message}`))
);

observableWithError.subscribe(value => console.log(value));

Common RxJS operators:

1. Map Operator

The map operator transforms each item emitted by an Observable:

import { from } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers = from([1, 2, 3, 4, 5]);

const doubled = numbers.pipe(
  map(x => x * 2)
);

doubled.subscribe(result => console.log(result));

2. Filter Operator

The filter operator allows you to selectively emit items based on a condition:

import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

const numbers = from([1, 2, 3, 4, 5]);

const evens = numbers.pipe(
  filter(x => x % 2 === 0)
);

evens.subscribe(result => console.log(result));

3. Merge Operator

The merge operator combines multiple Observables into a single Observable:

import { interval, merge } from 'rxjs';
import { mapTo } from 'rxjs/operators';

const source1 = interval(1000).pipe(mapTo('Source 1'));
const source2 = interval(1500).pipe(mapTo('Source 2'));

const merged = merge(source1, source2);

merged.subscribe(result => console.log(result));

4. Concat Operator

The concat operator concatenates multiple Observables, emitting values in sequence:

import { interval, concat } from 'rxjs';
import { take } from 'rxjs/operators';

const source1 = interval(1000).pipe(take(3));
const source2 = interval(1500).pipe(take(3));

const concatenated = concat(source1, source2);

concatenated.subscribe(result => console.log(result));

5. Tap Operator

The tap operator allows you to perform side effects without affecting the emitted values:

import { from } from 'rxjs';
import { tap, map } from 'rxjs/operators';

const numbers = from([1, 2, 3, 4, 5]);

const squaredAndLogged = numbers.pipe(
  tap(x => console.log(`Before map: ${x}`)),
  map(x => x * x),
  tap(x => console.log(`After map: ${x}`))
);

squaredAndLogged.subscribe(result => console.log(result));

1. Creating an Observable

You can create an Observable using the Observable class constructor:

import { Observable } from 'rxjs';

const customObservable = new Observable(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

customObservable.subscribe(value => console.log(value));

2. Observing Values

Observables emit values over time. You can subscribe to an Observable to receive these values:

import { interval } from 'rxjs';

const numbers = interval(1000);

const subscription = numbers.subscribe(value => console.log(value));

// Unsubscribe after 5 seconds
setTimeout(() => {
  subscription.unsubscribe();
}, 5000);

3. Error Handling in Observables

Handling errors in Observables is crucial. You can use the error callback in the subscribe method:

import { Observable } from 'rxjs';

const errorObservable = new Observable(observer => {
  try {
    throw new Error('This is an error!');
  } catch (error) {
    observer.error(error);
  }
});

errorObservable.subscribe(
  value => console.log(value),
  error => console.error(`Caught an error: ${error.message}`)
);

4. Completing Observables

Observables can complete, signaling that no more values will be emitted:

import { Observable } from 'rxjs';

const completeObservable = new Observable(observer => {
  observer.next('First value');
  observer.next('Second value');
  observer.complete(); // No more values will be emitted
});

completeObservable.subscribe(
  value => console.log(value),
  undefined,
  () => console.log('Observable completed')
);

5. Combining Observables

You can combine multiple Observables using operators like merge:

import { interval, merge } from 'rxjs';
import { mapTo } from 'rxjs/operators';

const source1 = interval(1000).pipe(mapTo('Source 1'));
const source2 = interval(1500).pipe(mapTo('Source 2'));

const merged = merge(source1, source2);

merged.subscribe(result => console.log(result));