RxJS Complete Guide | Reactive Programming with Observables
Key Points
RxJS is a library for reactive programming using Observables. It makes composing asynchronous or callback-based code easier with a powerful operator-based approach.
Introduction
RxJS stands for Reactive Extensions for JavaScript. It represents asynchronous sources such as DOM events, timers, and HTTP responses as Observables, which can then be combined and transformed with operators.
The Problem
Traditional async handling:
// Callbacks
setTimeout(() => {
getData((data) => {
processData(data, (result) => {
console.log(result); // Callback hell
});
});
}, 1000);
// Promises (better, but limited)
fetch('/api/data')
.then(res => res.json())
.then(data => console.log(data));
// A Promise itself can't be cancelled, and it resolves with a single value only
The Solution
With RxJS:
import { fromEvent, debounceTime, map, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Search as user types
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value),
switchMap(query => ajax.getJSON(`/api/search?q=${query}`))
).subscribe(results => console.log(results));
Benefits:
- Cancellable
- Composable with operators
- Handles multiple values
- Built-in error handling
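To make the "Cancellable" point concrete, here is a minimal sketch, assuming a hypothetical `ticker$` Observable built on `setInterval`. The function returned from the Observable constructor is its teardown logic, and it runs when the subscription is cancelled. The names `ticker$` and `log` are illustrative only.

```typescript
import { Observable } from 'rxjs';

// Collects what happens so the effect of cancelling is visible.
const log: string[] = [];

// Hypothetical ticker: emits an increasing counter every 100ms until cancelled.
const ticker$ = new Observable<number>(subscriber => {
  let count = 0;
  const id = setInterval(() => subscriber.next(count++), 100);

  // Teardown: called when the subscriber unsubscribes.
  return () => {
    clearInterval(id);
    log.push('timer cleared');
  };
});

const subscription = ticker$.subscribe(n => log.push(`tick ${n}`));

// Cancel immediately: the interval is cleared before it ever fires.
subscription.unsubscribe();
```

A Promise offers no equivalent hook: once created, the work it wraps keeps running whether or not anyone still wants the result.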
1. Installation
npm install rxjs
2. Core Concepts
Observable
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
observable.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
complete: () => console.log('Done'),
});
// Output: 1, 2, 3, Done
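One property worth illustrating here: an Observable created with `new Observable` is lazy. The function passed to the constructor runs once per `subscribe()` call, not when the Observable is created. A small sketch, where `producerRuns`, `numbers$`, and `received` are names made up for this example:

```typescript
import { Observable } from 'rxjs';

let producerRuns = 0;

const numbers$ = new Observable<number>(subscriber => {
  producerRuns++; // counts how often the producer function is executed
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// Nothing has run yet: creating the Observable does not start it.
const runsBeforeSubscribe = producerRuns;

const received: number[] = [];
numbers$.subscribe(v => received.push(v));      // first, independent run
numbers$.subscribe(v => received.push(v * 10)); // second, independent run
```

Each subscriber gets its own execution, which is why sharing operators such as shareReplay are needed when the producer is expensive.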
Creating Observables
import { of, from, interval, fromEvent } from 'rxjs';
// Emit values
of(1, 2, 3).subscribe(console.log);
// From array
from([1, 2, 3]).subscribe(console.log);
// From promise
from(fetch('/api/data')).subscribe(console.log);
// Interval
interval(1000).subscribe(count => console.log(count));
// DOM events
fromEvent(button, 'click').subscribe(e => console.log('Clicked'));
3. Operators
map
import { of, map } from 'rxjs';
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log);
// Output: 2, 4, 6
filter
import { of, filter } from 'rxjs';
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log);
// Output: 2, 4
tap (Side Effects)
import { of, tap, map } from 'rxjs';
of(1, 2, 3).pipe(
tap(x => console.log('Before:', x)),
map(x => x * 2),
tap(x => console.log('After:', x))
).subscribe();
4. Combination Operators
merge
import { merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';
const first$ = interval(1000).pipe(map(() => 'First'));
const second$ = interval(1500).pipe(map(() => 'Second'));
merge(first$, second$).subscribe(console.log);
// Output: First, Second, First, First, Second, ...
combineLatest
import { combineLatest, of } from 'rxjs';
const age$ = of(30);
const name$ = of('John');
combineLatest([age$, name$]).subscribe(([age, name]) => {
console.log(`${name} is ${age} years old`);
});
zip
import { zip, of } from 'rxjs';
const age$ = of(30, 31, 32);
const name$ = of('John', 'Jane');
zip(age$, name$).subscribe(([age, name]) => {
console.log(`${name}: ${age}`);
});
// Output: John: 30, Jane: 31
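The difference between `zip` and `combineLatest` is easiest to see by feeding both the same inputs. The sketch below reuses the `age$` and `name$` sources from the zip example; the `zipped` and `combined` arrays exist only to collect output. Because `of` emits all its values synchronously, `combineLatest` has already seen the last age (32) by the time `name$` starts emitting.

```typescript
import { combineLatest, of, zip } from 'rxjs';

const age$ = of(30, 31, 32);
const name$ = of('John', 'Jane');

const zipped: string[] = [];
const combined: string[] = [];

// zip pairs values by index: first with first, second with second.
zip(age$, name$).subscribe(([age, name]) => zipped.push(`${name}: ${age}`));

// combineLatest pairs each new value with the latest value of the other source.
combineLatest([age$, name$]).subscribe(([age, name]) => combined.push(`${name}: ${age}`));

console.log(zipped);   // [ 'John: 30', 'Jane: 31' ]
console.log(combined); // [ 'John: 32', 'Jane: 32' ]
```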
5. Transformation Operators
switchMap
import { fromEvent, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Cancel previous request when new search happens
fromEvent(input, 'input').pipe(
switchMap(e => ajax.getJSON(`/api/search?q=${e.target.value}`))
).subscribe(results => console.log(results));
mergeMap
import { of, mergeMap, delay } from 'rxjs';
of(1, 2, 3).pipe(
mergeMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// All emit after 1s (concurrent)
concatMap
import { of, concatMap, delay } from 'rxjs';
of(1, 2, 3).pipe(
concatMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// Emit one by one (sequential)
exhaustMap
import { fromEvent, exhaustMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Ignore clicks while request is pending
fromEvent(button, 'click').pipe(
exhaustMap(() => ajax.getJSON('/api/data'))
).subscribe(console.log);
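These flattening operators differ in what they do when a new outer value arrives while an earlier inner Observable is still active. The sketch below contrasts `switchMap` and `mergeMap` only, using Subjects as hypothetical stand-ins for in-flight requests so the timing can be controlled by hand. The `query$` and `responses` names are assumptions for illustration.

```typescript
import { Subject, mergeMap, switchMap } from 'rxjs';

const query$ = new Subject<string>();

// One Subject per query plays the role of a pending response.
const responses: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  ab: new Subject<string>(),
};

const switched: string[] = [];
const merged: string[] = [];

query$.pipe(switchMap(q => responses[q])).subscribe(r => switched.push(r));
query$.pipe(mergeMap(q => responses[q])).subscribe(r => merged.push(r));

query$.next('a');  // first request starts
query$.next('ab'); // second request starts; switchMap drops the first

responses.a.next('results for a');   // stale response
responses.ab.next('results for ab'); // current response

console.log(switched); // [ 'results for ab' ]
console.log(merged);   // [ 'results for a', 'results for ab' ]
```

With `switchMap` the stale response never reaches the subscriber, which is why it suits search boxes; `mergeMap` keeps every inner subscription alive and delivers both.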
6. Time-based Operators
debounceTime
import { fromEvent, debounceTime, map } from 'rxjs';
// Wait 300ms after last keystroke
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value)
).subscribe(console.log);
throttleTime
import { fromEvent, throttleTime } from 'rxjs';
// Emit at most once per 1000ms
fromEvent(button, 'click').pipe(
throttleTime(1000)
).subscribe(() => console.log('Clicked'));
delay
import { of, delay } from 'rxjs';
of('Hello').pipe(
delay(2000)
).subscribe(console.log);
// Output after 2 seconds
7. Error Handling
catchError
import { of, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
ajax.getJSON('/api/data').pipe(
catchError(error => {
console.error('Error:', error);
return of({ error: true }); // Fallback value
})
).subscribe(console.log);
retry
import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs/operators';
ajax.getJSON('/api/data').pipe(
retry(3) // Retry 3 times on error
).subscribe({
next: console.log,
error: err => console.error('Failed after 3 retries')
});
retryWhen
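import { ajax } from 'rxjs/ajax';
import { retryWhen, delay, take } from 'rxjs/operators';
// Note: retryWhen is deprecated in RxJS 7 in favor of retry with a config object
ajax.getJSON('/api/data').pipe(
retryWhen(errors => errors.pipe(
delay(1000), // Wait 1s between retries
take(3) // Stop after 3 errors; the stream then completes instead of emitting the error
))
).subscribe(console.log);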
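In recent RxJS 7 releases, `retry` also accepts a configuration object with `count` and `delay`, which covers the "wait, then retry a limited number of times" case without `retryWhen`. A minimal sketch, assuming a hypothetical helper `withRetry` and an always-failing `failing$` source that stands in for a real request:

```typescript
import { Observable, defer, retry, throwError } from 'rxjs';

// Hypothetical helper: resubscribe up to `count` times, waiting `delayMs` before each retry.
function withRetry<T>(source$: Observable<T>, count = 3, delayMs = 1000): Observable<T> {
  return source$.pipe(retry({ count, delay: delayMs }));
}

// Stand-in for a failing request; counts how many times it is subscribed to.
let attempts = 0;
const failing$ = defer(() => {
  attempts++;
  return throwError(() => new Error('network down'));
});

// A short delay keeps this demo quick; the default mirrors the 1s wait used above.
let finalError: unknown;
withRetry(failing$, 3, 10).subscribe({
  next: value => console.log(value),
  error: err => {
    finalError = err; // reached only after the retries are used up
    console.error(`Gave up after ${attempts} attempts`);
  },
});
```

Unlike the `take(3)` approach, the original error reaches the `error` handler once the retries are exhausted, so the failure is not silently turned into a completion.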
8. Subjects
Subject
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(x => console.log('A:', x));
subject.subscribe(x => console.log('B:', x));
subject.next(1);
subject.next(2);
// Output: A: 1, B: 1, A: 2, B: 2
BehaviorSubject
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // Initial value
subject.subscribe(x => console.log('A:', x)); // Gets 0 immediately
subject.next(1);
subject.next(2);
subject.subscribe(x => console.log('B:', x)); // Gets 2 immediately
ReplaySubject
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(2); // Remember last 2 values
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(x => console.log(x));
// Output: 2, 3 (last 2 values)
9. Real-World Examples
Autocomplete Search
import { fromEvent, of, map, debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const input = document.querySelector('#search');
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value),
distinctUntilChanged(),
switchMap(query =>
ajax.getJSON(`/api/search?q=${query}`).pipe(
catchError(() => of([]))
)
)
).subscribe(results => {
displayResults(results);
});
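The placement of `catchError` inside the `switchMap` matters. An error caught on the inner request leaves the outer input stream alive, whereas a `catchError` on the outer pipeline replaces the whole stream with the fallback and stops reacting to later input. The sketch below assumes a hypothetical synchronous `search` function in place of `ajax.getJSON`, and drives it with a Subject instead of DOM events.

```typescript
import { Observable, Subject, catchError, of, switchMap, throwError } from 'rxjs';

// Hypothetical search: fails for the query 'bad', succeeds otherwise.
const search = (query: string): Observable<string[]> =>
  query === 'bad'
    ? throwError(() => new Error('500'))
    : of([`${query}-result`]);

const query$ = new Subject<string>();
const innerHandled: string[][] = [];
const outerHandled: string[][] = [];

// catchError inside switchMap: only the failed request is replaced.
query$.pipe(
  switchMap(q => search(q).pipe(catchError(() => of([] as string[]))))
).subscribe(r => innerHandled.push(r));

// catchError after switchMap: the whole pipeline ends with the fallback.
query$.pipe(
  switchMap(q => search(q)),
  catchError(() => of([] as string[]))
).subscribe(r => outerHandled.push(r));

query$.next('bad');
query$.next('rx');

console.log(innerHandled); // [ [], [ 'rx-result' ] ]
console.log(outerHandled); // [ [] ]
```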
Infinite Scroll
import { fromEvent, map, filter, throttleTime } from 'rxjs';
fromEvent(window, 'scroll').pipe(
throttleTime(200),
map(() => ({
scrollHeight: document.documentElement.scrollHeight,
scrollTop: document.documentElement.scrollTop,
clientHeight: document.documentElement.clientHeight
})),
filter(({ scrollHeight, scrollTop, clientHeight }) =>
scrollHeight - scrollTop - clientHeight < 100
)
).subscribe(() => {
loadMoreData();
});
WebSocket with Auto-Reconnect
import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap } from 'rxjs/operators';
// Keep a reference to the WebSocketSubject: the result of pipe() is a plain Observable without next()
const socket$ = webSocket('ws://localhost:8080');
const messages$ = socket$.pipe(
retryWhen(errors => errors.pipe(
tap(() => console.log('Reconnecting...')),
delay(1000)
))
);
messages$.subscribe({
next: msg => console.log('Message:', msg),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Send message through the subject itself
socket$.next({ type: 'ping' });
Drag and Drop
import { fromEvent, switchMap, takeUntil, map } from 'rxjs';
const target = document.querySelector('#draggable');
const mouseDown$ = fromEvent(target, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');
mouseDown$.pipe(
switchMap(() => mouseMove$.pipe(
map(e => ({ x: e.clientX, y: e.clientY })),
takeUntil(mouseUp$)
))
).subscribe(pos => {
target.style.left = pos.x + 'px';
target.style.top = pos.y + 'px';
});
10. Best Practices
1. Always Unsubscribe
// Good
const subscription = observable.subscribe(console.log);
// Later...
subscription.unsubscribe();
// Better: use takeUntil
const destroy$ = new Subject<void>(); // void lets next() be called without a value
observable.pipe(
takeUntil(destroy$)
).subscribe(console.log);
// On cleanup
destroy$.next();
destroy$.complete();
2. Use shareReplay for Expensive Operations
import { ajax } from 'rxjs/ajax';
import { shareReplay } from 'rxjs/operators';
const data$ = ajax.getJSON('/api/data').pipe(
shareReplay(1) // Cache last value
);
// Multiple subscribers share same request
data$.subscribe(console.log);
data$.subscribe(console.log);
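The effect of `shareReplay(1)` can be checked by counting how often the source is actually subscribed to. This sketch replaces the HTTP call with a hypothetical `fakeRequest$` built from `defer`, so each subscription to the source increments a `requests` counter.

```typescript
import { defer, of, shareReplay } from 'rxjs';

let requests = 0;

// Stand-in for ajax.getJSON('/api/data'): every subscription counts as one request.
const fakeRequest$ = defer(() => {
  requests++;
  return of({ id: 1 });
});

// Without sharing, each subscriber triggers its own request.
fakeRequest$.subscribe();
fakeRequest$.subscribe();
const requestsWithoutSharing = requests;

// With shareReplay(1), the first subscriber triggers the request
// and later subscribers receive the cached value.
requests = 0;
const shared$ = fakeRequest$.pipe(shareReplay(1));
shared$.subscribe();
shared$.subscribe();
const requestsWithSharing = requests;

console.log(requestsWithoutSharing, requestsWithSharing); // 2 1
```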
3. Avoid Nested Subscriptions
// Bad
observable1.subscribe(val1 => {
observable2.subscribe(val2 => {
// Nested!
});
});
// Good
observable1.pipe(
switchMap(val1 => observable2)
).subscribe(val2 => {
// Flat!
});
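A common reason for nesting is that the inner callback needs both values. One way to keep the pipeline flat while still having access to the outer value is to `map` inside the `switchMap`. The sketch below uses hypothetical `User` and `Order` types and an `ordersFor` function as stand-ins for two dependent requests.

```typescript
import { Observable, map, of, switchMap } from 'rxjs';

// Hypothetical data shapes and sources, for illustration only.
interface User { id: number; name: string; }
interface Order { userId: number; total: number; }

const user$: Observable<User> = of({ id: 1, name: 'John' });
const ordersFor = (userId: number): Observable<Order[]> =>
  of([{ userId, total: 42 }]);

const summaries: string[] = [];

user$.pipe(
  // Pair the inner result with the outer value before it leaves switchMap.
  switchMap(user =>
    ordersFor(user.id).pipe(map(orders => ({ user, orders })))
  )
).subscribe(({ user, orders }) => {
  summaries.push(`${user.name}: ${orders.length} order(s)`);
});

console.log(summaries); // [ 'John: 1 order(s)' ]
```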
11. Angular Integration
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
import { HttpClient } from '@angular/common/http';
interface User { name: string; } // Minimal shape used by the template below
@Component({
selector: 'app-users',
template: `<div *ngFor="let user of users">{{ user.name }}</div>`
})
export class UsersComponent implements OnInit, OnDestroy {
users: User[] = [];
private destroy$ = new Subject<void>();
constructor(private http: HttpClient) {}
ngOnInit() {
this.http.get<User[]>('/api/users').pipe(
takeUntil(this.destroy$)
).subscribe(users => {
this.users = users;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Summary
RxJS enables powerful reactive programming:
- Observables for async data streams
- Operators for transforming data
- Error handling with retry and catchError
- Cancellation with unsubscribe
- Composition over nested callbacks
Key Takeaways:
- Use switchMap for HTTP requests (cancels previous)
- Use debounceTime for search inputs
- Always unsubscribe or use takeUntil
- Use shareReplay to avoid duplicate requests
- Avoid nested subscriptions
Next Steps:
- Learn JavaScript Async
- Try TypeScript Generics
- Build with Angular