Học RxJS Trực Quan

Từ người mới bắt đầu đến chuyên gia lập trình phản ứng

Khóa học trực quan nhất về Reactive Programming

1. Chào mừng đến với RxJS

RxJS là gì?

RxJS (Reactive Extensions for JavaScript) là một thư viện giúp lập trình với các luồng dữ liệu bất đồng bộ. Hãy tưởng tượng bạn đang xử lý mọi thứ, từ sự kiện click chuột, nhập liệu, đến dữ liệu từ API, như những dòng sông (streams) chảy liên tục. RxJS cung cấp cho bạn các công cụ mạnh mẽ để điều khiển, biến đổi và kết hợp những dòng sông này.

💡 Phép so sánh: Dây chuyền sản xuất

Hãy xem một Observable như một dây chuyền sản xuất. Nó đưa ra các "sản phẩm thô" (dữ liệu). Các Operators (như map, filter) là các cỗ máy trên dây chuyền, mỗi máy thực hiện một công đoạn xử lý. Cuối cùng, Observer (người đăng ký) là người nhận "thành phẩm" (dữ liệu đã qua xử lý).

Observable đầu tiên của bạn

Đây là một Observable đơn giản nhất, sử dụng hàm of để tạo ra một luồng dữ liệu phát ra một vài giá trị rồi kết thúc.

Luồng dữ liệu (Source):
Log:

2. Các Operators Cơ Bản

map(fn) - Biến đổi

Biến đổi mỗi giá trị được phát ra bởi luồng nguồn. Ví dụ: nhân mỗi số với 10.

Input:
x * 10
Output:
Log:

filter(fn) - Lọc

Chỉ cho phép các giá trị thỏa mãn một điều kiện đi qua. Ví dụ: chỉ lấy các số chẵn.

Input:
x % 2 === 0
Output:
Log:

tap(fn) - Hành động phụ

Thực hiện một hành động phụ (ví dụ: logging) với mỗi giá trị mà không làm thay đổi luồng.

Input:
LOG
Output:
Log:

scan(fn, seed) - Tích lũy

Tích lũy giá trị theo thời gian, tương tự reduce của Array nhưng phát ra mỗi giá trị trung gian.

Input:
TÍNH TỔNG
Output:
Log:

3. Operators Chuyển Đổi (Flattening)

Các operators này xử lý "Observable của Observable" (luồng lồng trong luồng). Tình huống phổ biến là khi một sự kiện (như click) kích hoạt một tác vụ bất đồng bộ khác (như gọi API).

mergeMap - Hợp nhất luồng

Tạo và đăng ký vào một luồng con cho mỗi giá trị từ luồng cha. Tất cả các luồng con chạy song song và kết quả được hợp nhất.

Input (Clicks):
Tạo & Hợp nhất luồng con
Output:
Log:
Trường hợp sử dụng: Khi bạn cần thực hiện nhiều tác vụ bất đồng bộ song song và không quan tâm đến thứ tự hoàn thành của chúng (ví dụ: tải nhiều ảnh cùng lúc).

switchMap - Chuyển luồng Khi có giá trị mới từ nguồn, `switchMap` sẽ hủy (unsubscribe) luồng con (inner observable) trước đó và chuyển sang luồng con mới. Rất hữu ích để tránh các tác vụ lỗi thời.

Giống mergeMap, nhưng khi luồng cha phát giá trị mới, nó sẽ hủy luồng con trước đó và chuyển sang luồng con mới.

Input (Clicks):
Hủy luồng cũ & Chuyển luồng mới
Output:
Log:
Trường hợp sử dụng: Cực kỳ hữu ích cho các tính năng như gợi ý tìm kiếm (typeahead), khi bạn chỉ quan tâm đến kết quả của lần gõ cuối cùng và muốn hủy các yêu cầu API cũ.

concatMap - Nối luồng

Chạy các luồng con một cách tuần tự. Luồng con tiếp theo chỉ bắt đầu khi luồng con trước đó đã hoàn thành. Đảm bảo thứ tự.

Input (Clicks):
Xếp hàng & Chạy tuần tự
Output:
Log:
Trường hợp sử dụng: Khi thứ tự thực hiện là quan trọng. Ví dụ: một chuỗi các yêu cầu API đọc-ghi mà yêu cầu sau phụ thuộc vào kết quả của yêu cầu trước.

exhaustMap - Bỏ qua luồng

Bỏ qua các giá trị mới từ luồng cha trong khi một luồng con đang hoạt động. Hữu ích để chống click-spam.

Input (Clicks):
Bỏ qua click khi đang bận
Output:
Log:
Trường hợp sử dụng: Hoàn hảo cho các nút bấm chỉ nên được xử lý một lần, như nút "Login" hoặc "Submit", để ngăn người dùng gửi nhiều yêu cầu bằng cách click liên tục.

4. Operators Kết Hợp (Combination)

Các operators này cho phép bạn kết hợp nhiều luồng dữ liệu lại với nhau theo những cách khác nhau.

combineLatest - Kết hợp giá trị mới nhất

Khi bất kỳ luồng nào phát, nó sẽ phát ra một mảng chứa giá trị mới nhất từ TẤT CẢ các luồng. Sẽ không phát gì cho đến khi mọi luồng đã phát ít nhất một lần.

Stream A:
Stream B:
Kết hợp
Output:
Log:

forkJoin - Chờ tất cả hoàn thành

Tương tự Promise.all. Chờ cho tất cả các luồng nguồn hoàn thành, sau đó phát ra một mảng chứa giá trị CUỐI CÙNG từ mỗi luồng.

API 1 (1.5s):
API 2 (2.5s):
Chờ
Output:
Log:

5. Utility Operators

Đây là tập hợp các toán tử tiện ích mạnh mẽ để điều khiển luồng dữ liệu, đặc biệt hữu ích trong việc xử lý tương tác người dùng hoặc các nguồn phát dữ liệu dày đặc.

So sánh debounceTimethrottleTime

Click vào nút bên dưới thật nhanh và nhiều lần để xem sự khác biệt. Debounce chỉ phát giá trị sau khi có một khoảng lặng. Throttle phát giá trị đầu tiên, sau đó chặn trong một khoảng thời gian.

Input (Clicks):
Debounce Output
Throttle Output
Log:

bufferTime(2000)

Gom các giá trị từ luồng nguồn vào một bộ đệm và phát ra bộ đệm này sau mỗi 2 giây.

Input (Clicks):
Gom mỗi 2s
Output (mảng giá trị):
Log:

pairwise()

Nhóm các giá trị liên tiếp thành từng cặp `[trước, sau]` và phát ra.

Input:
Ghép cặp
Output ([trước, sau]):
Log:

withLatestFrom(other$)

Khi luồng chính (Clicks) phát, nó sẽ kết hợp giá trị của mình với giá trị mới nhất từ luồng phụ (Timer). Luồng phụ chạy ngầm.

Luồng chính (Clicks):
Luồng phụ (Timer mỗi 1.5s):
Kết hợp
Output ([click, timer]):
Log:

6. Operators Hữu Ích Khác

take(count) - Giới Hạn Số Lượng

Chỉ lấy một số lượng giá trị xác định từ luồng dữ liệu.

Input:
TAKE(3)
Output:
Log:

zip(obs1,...) - Ghép Cặp

Ghép cặp các giá trị từ nhiều luồng theo thứ tự xuất hiện.

Stream A:
Stream B:
ZIP
Output (Ghép cặp):
Log:

catchError(fn) - Xử Lý Lỗi

Bắt và xử lý lỗi trong luồng dữ liệu, trả về một giá trị thay thế hoặc luồng mới.

Input:
CATCHERROR (trả về 'Đã Fix')
Output:
Log:

startWith(...values) - Bắt Đầu Với Giá Trị

Thêm các giá trị vào đầu luồng dữ liệu trước khi luồng chính bắt đầu phát giá trị.

Stream gốc:
STARTWITH(-1,0)
Output:
Log:

7. Subjects - Đa Hướng Dữ Liệu

Subject là một dạng đặc biệt của Observable cho phép đa hướng dữ liệu - nhiều Observer có thể subscribe vào cùng một Subject. Subject cũng là một Observer, có thể nhận dữ liệu thông qua các phương thức next(), error() và complete().

Subject

Subject cơ bản - Observer chỉ nhận các giá trị phát ra sau khi subscribe.

SUBJECT
Log Subject:
Subscriber 1 nhận:
Subscriber 2 nhận:

BehaviorSubject

Lưu trữ giá trị hiện tại và phát nó cho các subscriber mới ngay khi họ subscribe.

BEHAVIOR_SUBJECT (khởi tạo với 'Khánh')
Log BehaviorSubject:
Subscriber 1 nhận:
Subscriber 2 nhận:

ReplaySubject

Lưu trữ một số lượng giá trị đã phát trước đó và phát lại cho các subscriber mới.

REPLAY_SUBJECT (nhớ 2 giá trị)
Log ReplaySubject:
Subscriber 1 nhận:
Subscriber 2 nhận:

8. Ví Dụ Thực Tế

8.1. Gợi ý tìm kiếm (Typeahead)

Một ứng dụng kinh điển của RxJS: đợi người dùng ngừng gõ, sau đó mới thực hiện tìm kiếm để tránh các request không cần thiết.

Stream gõ phím:
Xử lý tìm kiếm
Kết quả từ API (giả):
Log:

8.2. Thanh tiến trình (Progress Bar)

Sử dụng RxJS để điều khiển một thanh tiến trình, rất hữu ích cho việc hiển thị trạng thái tải file hoặc các tác vụ kéo dài.

Log:

8.3. Xác thực biểu mẫu (Real-time Form Validation)

Sử dụng combineLatest để tổng hợp trạng thái hợp lệ của nhiều trường và chỉ cho phép gửi khi tất cả đều OK. Tên người dùng "admin" hoặc "root" sẽ bị báo là đã tồn tại (kiểm tra bất đồng bộ).

Vui lòng điền đúng thông tin.
Log:

9. Ví Dụ Khó Nhằn

9.1. Tìm Kiếm Nâng Cao: Loading & Xử Lý Lỗi

Tìm kiếm thông minh với hiệu ứng loading, xử lý lỗi và gợi ý tự động.

Stream Gõ Phím:
debounceTime → distinctUntilChanged → switchMap (API call + startWith + catchError)
Kết quả tìm kiếm:
Log:

9.2. Kéo Thả Bằng RxJS

Triển khai tính năng kéo thả sử dụng các Observable để xử lý sự kiện.

KÉO TÔI
Thả vào đây!
Log:

9.3. Lấy Dữ Liệu Định Kỳ (Polling)

Tự động lấy dữ liệu mới từ server theo chu kỳ với khả năng dừng lại.

Trạng thái: Chưa chạy
Dữ liệu nhận được:
Log:

10. Ví Dụ Siêu Khó Nhằn

10.1. Gợi Ý Từ Nhiều Nguồn API

Gõ vào ô tìm kiếm, hệ thống sẽ gọi đồng thời nhiều API khác nhau và kết hợp kết quả.

Stream Gõ Phím:
debounceTime → distinctUntilChanged → filter → switchMap (forkJoin [API1, API2])
API 1 Response:
API 2 Response:
Kết quả gộp (API1 + API2):
Log:

10.2. Chuỗi Tác Vụ Tuần Tự & Thử Lại

Thực hiện một chuỗi tác vụ tuần tự, mỗi tác vụ có thể thử lại nếu gặp lỗi.

Tiến Trình Tác Vụ:
Log:

10.3. Cuộn Vô Tận (Infinite Scroll)

Tự động tải thêm dữ liệu khi người dùng cuộn đến cuối trang.

Log:

11. Ứng Dụng Nâng Cao

11.1. Tự Động Lưu Form

Tự động lưu nội dung khi người dùng ngừng gõ, hiển thị trạng thái lưu.

Chưa có gì để lưu.
Log:

11.2. WebSocket với Tự Động Kết Nối Lại

Kết nối đến một server WebSocket, gửi tin nhắn, và tự động kết nối lại khi mất kết nối.

Trạng thái: Đã ngắt kết nối.
Log Tin Nhắn: