Khám Phá RxSwift: Observables, Subjects và Schedulers cho Lập trình viên iOS

Chào mừng bạn quay trở lại với serie “Lộ trình học Lập trình viên iOS 2025“. Cho đến nay, chúng ta đã cùng nhau khám phá những kiến thức nền tảng quan trọng, từ cơ bản về Swift, quản lý bộ nhớ (ARC), xử lý lỗi (Error Handling), đến làm việc với UI trên UIKit hay SwiftUI, và các phương pháp xử lý bất đồng bộ (GCD hay async/await). Hôm nay, chúng ta sẽ bước chân vào một thế giới mới, một paradigma lập trình mạnh mẽ có thể thay đổi cách bạn viết ứng dụng iOS: Lập trình phản ứng (Reactive Programming), cụ thể là với thư viện RxSwift.

Trong các ứng dụng hiện đại, chúng ta liên tục phải đối mặt với các luồng dữ liệu không đồng bộ: phản hồi từ API, thay đổi trong UI, thông báo từ hệ thống, thao tác của người dùng, v.v. Việc quản lý các sự kiện này một cách mạch lạc, dễ hiểu và ít lỗi có thể trở nên phức tạp với các phương pháp truyền thống (như Callbacks hay Delegates lồng nhau). RxSwift mang đến một giải pháp thanh lịch hơn, cho phép bạn biểu diễn mọi thứ dưới dạng các luồng dữ liệu theo thời gian.

Bài viết này sẽ tập trung vào ba khái niệm cốt lõi của RxSwift: Observables (Luồng quan sát được), Subjects (Đối tượng quan sát được) và Schedulers (Bộ lập lịch). Hiểu rõ ba thành phần này là bước đệm vững chắc để bạn bắt đầu làm việc hiệu quả với RxSwift.

Lập trình Phản ứng (Reactive Programming) là gì?

Nói một cách đơn giản, Lập trình Phản ứng là lập trình với các luồng dữ liệu không đồng bộ. Thay vì tư duy theo hướng “khi sự kiện X xảy ra, làm Y”, bạn sẽ tư duy theo hướng “có một luồng dữ liệu Z, khi có dữ liệu mới trong Z, xử lý nó như thế này”.

Hãy tưởng tượng một bảng tính Excel. Khi bạn thay đổi giá trị của một ô, các ô khác phụ thuộc vào nó sẽ tự động cập nhật. Đó chính là tư duy phản ứng! Các ô phụ thuộc “phản ứng” lại với sự thay đổi của ô gốc.

Trong RxSwift, các luồng dữ liệu này được biểu diễn bởi Observables. Bạn “quan sát” (subscribe) các Observables này và “phản ứng” (react) lại mỗi khi chúng phát ra (emit) dữ liệu hoặc sự kiện.

Observables: Nguồn Phát Dữ liệu

Observable là xương sống của RxSwift. Nó đại diện cho một chuỗi các sự kiện có thể phát ra theo thời gian. Các sự kiện này có thể là giá trị (.next), lỗi (.error), hoặc tín hiệu kết thúc chuỗi (.completed).

  • .next(Element): Phát ra một giá trị mới cho tất cả các subscriber.
  • .error(Error): Phát ra một lỗi. Khi một Observable phát ra sự kiện .error, nó sẽ dừng lại và không phát ra bất kỳ sự kiện nào khác.
  • .completed: Phát ra tín hiệu hoàn thành. Giống như .error, khi một Observable phát ra sự kiện .completed, nó sẽ dừng lại.

Một Observable có thể phát ra 0 hoặc nhiều sự kiện .next, và sau đó kết thúc bằng duy nhất một sự kiện .error hoặc duy nhất một sự kiện .completed. Nó không thể phát ra cả hai.

Tạo Observable

Có nhiều cách để tạo ra một Observable:

1. Từ một giá trị hoặc tập hợp giá trị cố định:


import RxSwift

let observable1 = Observable.just(10) // Phát ra duy nhất 10, sau đó completed
let observable2 = Observable.of(1, 2, 3) // Phát ra 1, 2, 3 theo thứ tự, sau đó completed
let observable3 = Observable.from([4, 5, 6]) // Tương tự .of cho mảng

2. Từ một closure phát sự kiện:


let observable4 = Observable.create { observer in
    observer.onNext("Xin chào")
    observer.onNext("RxSwift")
    observer.onCompleted() // hoặc observer.onError(SomeError())
    return Disposables.create() // Dọn dẹp tài nguyên nếu cần
}

Phương thức create rất linh hoạt, cho phép bạn định nghĩa cách Observable phát ra sự kiện một cách tùy chỉnh.

Quan sát (Subscribe) Observable

Để nhận các sự kiện từ một Observable, bạn cần “quan sát” (subscribe) nó. Khi bạn subscribe, bạn cung cấp các closure để xử lý các sự kiện .next, .error, và .completed.


observable4.subscribe(onNext: { element in
    print("Nhận được: \(element)")
}, onError: { error in
    print("Gặp lỗi: \(error)")
}, onCompleted: {
    print("Hoàn thành!")
})

Khi bạn subscribe một Observable được tạo bằng .just, .of, .from, hoặc .create (trong ví dụ trên), nó là một “Cold Observable”. Điều này có nghĩa là quá trình phát sự kiện chỉ bắt đầu khi có subscriber. Mỗi subscriber sẽ nhận được toàn bộ chuỗi sự kiện độc lập với các subscriber khác.

Quản lý bộ nhớ với DisposeBag

Khi bạn subscribe một Observable, nó sẽ tạo ra một “subscription” (sự đăng ký). Nếu Observable không kết thúc bằng .completed hoặc .error (ví dụ: các sự kiện UI như button tap), subscription này sẽ tồn tại mãi mãi và có thể gây rò rỉ bộ nhớ (memory leak) nếu không được giải phóng.

RxSwift cung cấp DisposeBag để giải quyết vấn đề này. Một DisposeBag là một thùng chứa. Khi bạn thêm một subscription vào DisposeBag, subscription đó sẽ tự động bị hủy (disposed) khi DisposeBag được giải phóng (ví dụ: khi object chứa nó bị deinit).


class MyViewController: UIViewController {
    let disposeBag = DisposeBag() // Tạo một DisposeBag

    override func viewDidLoad() {
        super.viewDidLoad()

        let buttonTapObservable = // ... (Observable từ button tap)

        buttonTapObservable
            .subscribe(onNext: { _ in
                print("Button được nhấn!")
            })
            .disposed(by: disposeBag) // Thêm subscription vào disposeBag
    }
} // Khi MyViewController bị giải phóng, disposeBag cũng được giải phóng, hủy subscription

Luôn nhớ thêm các subscriptions vào DisposeBag, trừ khi bạn cố ý muốn chúng tồn tại suốt vòng đời ứng dụng hoặc có cơ chế hủy khác.

Subjects: Nguồn Phát và Người Quan sát

Trong khi Observables chỉ là nguồn phát (người “đọc” dữ liệu), Subjects là cả nguồn phát và người quan sát (có thể vừa “viết” dữ liệu vào vừa “đọc” dữ liệu ra). Điều này làm cho Subjects trở thành “Hot Observable” – chúng phát ra sự kiện bất kể có subscriber nào hay không, và các subscriber mới chỉ nhận được các sự kiện phát ra sau khi họ subscribe (tùy thuộc vào loại Subject).

Subjects rất hữu ích khi bạn cần bridge giữa code không dùng RxSwift (như code UIKit truyền thống) và thế giới RxSwift, hoặc khi bạn cần nhiều nơi cùng nghe một luồng sự kiện duy nhất.

Có bốn loại Subject chính trong RxSwift (mặc dù Variable đã được thay thế bằng BehaviorRelay):

1. PublishSubject

Một PublishSubject bắt đầu trống rỗng. Nó chỉ phát ra các sự kiện .next được gửi đến nó *sau* khi subscriber đăng ký. Subscriber mới sẽ không nhận được bất kỳ sự kiện nào đã được phát ra trước đó.


let publishSubject = PublishSubject<String>()

// Subscriber 1 đăng ký
publishSubject.subscribe(onNext: { print("Sub 1: \($0)") })
    .disposed(by: disposeBag)

publishSubject.onNext("Sự kiện A") // Cả Sub 1 và Sub 2 (nếu có) đều nhận
publishSubject.onNext("Sự kiện B")

// Subscriber 2 đăng ký sau
publishSubject.subscribe(onNext: { print("Sub 2: \($0)") })
    .disposed(by: disposeBag)

publishSubject.onNext("Sự kiện C") // Cả Sub 1 và Sub 2 đều nhận Sự kiện C

publishSubject.onCompleted() // Cả hai subscriber đều nhận completed

publishSubject.onNext("Sự kiện D") // Không ai nhận, vì đã completed

/* Output:
Sub 1: Sự kiện A
Sub 1: Sự kiện B
Sub 2: Sự kiện C
Sub 1: Sự kiện C
Sub 1: Hoàn thành!
Sub 2: Hoàn thành!
*/

PublishSubject hữu ích cho các sự kiện tức thời như button taps, notifications, hoặc kết quả của một hành động.

2. BehaviorSubject

Một BehaviorSubject cần một giá trị khởi tạo. Khi một subscriber đăng ký, nó sẽ nhận được giá trị *cuối cùng* mà Subject đã phát ra (hoặc giá trị khởi tạo nếu chưa có sự kiện nào được phát ra) và sau đó là các sự kiện mới.


let behaviorSubject = BehaviorSubject<Int>(value: 0) // Giá trị khởi tạo là 0

behaviorSubject.onNext(1) // Subject phát ra 1

// Subscriber 1 đăng ký
behaviorSubject.subscribe(onNext: { print("Sub 1: \($0)") })
    .disposed(by: disposeBag) // Sub 1 nhận 1 (giá trị cuối cùng)

behaviorSubject.onNext(2) // Cả Sub 1 và Sub 2 đều nhận 2

// Subscriber 2 đăng ký sau
behaviorSubject.subscribe(onNext: { print("Sub 2: \($0)") })
    .disposed(by: disposeBag) // Sub 2 nhận 2 (giá trị cuối cùng)

behaviorSubject.onNext(3) // Cả Sub 1 và Sub 2 đều nhận 3

/* Output:
Sub 1: 1
Sub 1: 2
Sub 2: 2
Sub 1: 3
Sub 2: 3
*/

BehaviorSubject thường được dùng để biểu diễn trạng thái (state) – ví dụ: giá trị hiện tại của một text field, trạng thái loading, hay dữ liệu user đang hiển thị. Bạn có thể lấy giá trị hiện tại của BehaviorSubject bằng thuộc tính value() (có thể throw error nếu Subject đã bị lỗi/completed).

3. ReplaySubject

Một ReplaySubject có khả năng buffer (lưu trữ) một số lượng sự kiện .next nhất định. Khi một subscriber mới đăng ký, nó sẽ nhận được *tất cả* các sự kiện được lưu trong buffer, sau đó là các sự kiện mới được phát ra.


// ReplaySubject lưu trữ 2 sự kiện cuối cùng
let replaySubject = ReplaySubject<String>.create(bufferSize: 2)

replaySubject.onNext("A")
replaySubject.onNext("B")
replaySubject.onNext("C") // "A" bị đẩy ra khỏi buffer

// Subscriber 1 đăng ký
replaySubject.subscribe(onNext: { print("Sub 1: \($0)") })
    .disposed(by: disposeBag) // Sub 1 nhận B, C

replaySubject.onNext("D") // Cả Sub 1 và Sub 2 nhận D

// Subscriber 2 đăng ký sau
replaySubject.subscribe(onNext: { print("Sub 2: \($0)") })
    .disposed(by: disposeBag) // Sub 2 nhận C, D (buffer hiện tại là C, D)

/* Output:
Sub 1: B
Sub 1: C
Sub 1: D
Sub 2: C
Sub 2: D
*/

ReplaySubject hữu ích khi bạn muốn các subscriber mới luôn nhận được một lịch sử các sự kiện gần đây, ví dụ: các thông báo gần nhất hoặc các giá trị dữ liệu quan trọng.

4. BehaviorRelay

BehaviorRelay là một wrapper tiện lợi xung quanh BehaviorSubject. Điểm khác biệt chính là BehaviorRelay không bao giờ phát ra sự kiện .error hoặc .completed. Điều này làm cho nó rất phù hợp để biểu diễn trạng thái của UI, nơi bạn thường không muốn luồng dữ liệu bị dừng đột ngột bởi lỗi hoặc hoàn thành.

Bạn không gọi onNext cho BehaviorRelay, thay vào đó bạn sử dụng thuộc tính accept() để gửi giá trị mới.


import RxCocoa // BehaviorRelay thuộc RxCocoa

let behaviorRelay = BehaviorRelay<String>(value: "Giá trị ban đầu")

// Subscriber 1 đăng ký
behaviorRelay.asObservable() // Cần chuyển đổi sang Observable để subscribe
    .subscribe(onNext: { print("Relay Sub 1: \($0)") })
    .disposed(by: disposeBag) // Sub 1 nhận "Giá trị ban đầu"

behaviorRelay.accept("Giá trị mới A") // Sub 1 nhận "Giá trị mới A"

// Subscriber 2 đăng ký
behaviorRelay.asObservable()
    .subscribe(onNext: { print("Relay Sub 2: \($0)") })
    .disposed(by: disposeBag) // Sub 2 nhận "Giá trị mới A"

behaviorRelay.accept("Giá trị mới B") // Cả Sub 1 và Sub 2 nhận "Giá trị mới B"

// behaviorRelay không có onCompleted() hay onError()

/* Output:
Relay Sub 1: Giá trị ban đầu
Relay Sub 1: Giá trị mới A
Relay Sub 2: Giá trị mới A
Relay Sub 1: Giá trị mới B
Relay Sub 2: Giá trị mới B
*/

BehaviorRelay là lựa chọn ưu tiên cho các thuộc tính UI hoặc trạng thái đơn giản trong ViewModel khi sử dụng các kiến trúc như MVVM (Lựa Chọn Kiến Trúc iOS: MVC vs MVVM vs VIPER vs TCA) và làm việc với Combine (dù Combine là framework khác, tư duy reactive tương đồng).

Bảng so sánh các loại Subject

Đây là bảng tóm tắt nhanh sự khác biệt giữa các loại Subject chính:

Loại Subject Giá trị khởi tạo? Subscriber mới nhận gì? Có thể Error/Complete? Dùng cho trường hợp nào?
PublishSubject Không Chỉ các sự kiện phát ra SAU khi subscribe Sự kiện tức thời (button tap, notifications)
BehaviorSubject Có (hoặc giá trị cuối cùng) Giá trị cuối cùng (hoặc khởi tạo) VÀ các sự kiện mới Biểu diễn trạng thái có giá trị ban đầu
ReplaySubject Không Các sự kiện trong buffer VÀ các sự kiện mới Cần lịch sử các sự kiện gần đây
BehaviorRelay Có (bắt buộc) Giá trị cuối cùng (hoặc khởi tạo) VÀ các sự kiện mới Không (không bao giờ error/complete) Biểu diễn trạng thái UI đơn giản, không cần xử lý lỗi/hoàn thành

Schedulers: Quản lý Luồng Thực thi

Trong các ứng dụng, chúng ta thường thực hiện các tác vụ tốn thời gian (như network requests, xử lý dữ liệu) ở background để không làm đơ UI. RxSwift cần một cách để xác định các công việc trong luồng Observable sẽ được thực thi trên thread (luồng) nào. Đây là lúc Schedulers phát huy tác dụng.

Scheduler tương đương với khái niệm Dispatch Queue trong GCD (Đa luồng trong Swift: GCD hay async/await?) hoặc Actors/Tasks trong Async/Await. Chúng quyết định xem các công việc sẽ được thực hiện nối tiếp hay song song, trên main thread hay background thread.

Hai phương thức Operator phổ biến nhất liên quan đến Schedulers là subscribeOnobserveOn.

  • subscribeOn(scheduler): Chỉ định Scheduler mà Observable sẽ *bắt đầu* công việc của nó. Điều này ảnh hưởng đến nơi mà subscription được tạo ra và các tác vụ khởi tạo của Observable được thực hiện. Vị trí của subscribeOn trong chuỗi operators không quan trọng, nó chỉ ảnh hưởng đến điểm bắt đầu.
  • observeOn(scheduler): Chỉ định Scheduler mà *các sự kiện* (.next, .error, .completed) sẽ được gửi đến subscriber. Điều này ảnh hưởng đến nơi mà các operators tiếp theo trong chuỗi (và cuối cùng là subscriber) sẽ xử lý dữ liệu. Vị trí của observeOn rất quan trọng, nó thay đổi Scheduler cho tất cả các operators và subscriber sau nó.

Các Scheduler phổ biến:

  • MainScheduler.instance: Scheduler cho main thread. Bắt buộc phải sử dụng khi cập nhật UI.
  • ConcurrentDispatchQueueScheduler(qos: .background): Tạo một Scheduler dựa trên một Grand Central Dispatch (GCD) queue chạy song song ở background. Thường dùng cho các tác vụ tốn thời gian không liên quan đến UI.
  • OperationQueueScheduler(operationQueue: operationQueue): Tạo một Scheduler dựa trên một Operation Queue tùy chỉnh.
  • ConcurrentUtilConcurrentQueueScheduler: Một scheduler dùng internal của RxSwift.
  • SerialDispatchQueueScheduler: Scheduler dựa trên GCD queue chạy nối tiếp.

Ví dụ sử dụng Schedulers:


let dataFetchObservable = Observable<String>.create { observer in
    print("Bắt đầu fetch dữ liệu trên thread: \(Thread.current)")
    // Mô phỏng tác vụ tốn thời gian ở background
    sleep(2)
    observer.onNext("Dữ liệu đã về!")
    observer.onCompleted()
    return Disposables.create()
}

dataFetchObservable
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background)) // Bắt đầu fetch trên background thread
    .observeOn(MainScheduler.instance) // Nhận kết quả (onNext) trên main thread
    .subscribe(onNext: { data in
        print("Nhận được dữ liệu trên thread: \(Thread.current)")
        // Cập nhật UI ở đây
        // myLabel.text = data
    }, onError: { error in
        print("Lỗi: \(error) trên thread: \(Thread.current)")
    }, onCompleted: {
        print("Fetch hoàn thành trên thread: \(Thread.current)")
    })
    .disposed(by: disposeBag)

/* Output (các dòng print sẽ hiển thị luồng thực thi):
Bắt đầu fetch dữ liệu trên thread: <NSThread: 0x...>{number = 3, name = (null)} // Background thread
Nhận được dữ liệu trên thread: <NSThread: 0x...>{number = 1, name = main} // Main thread
Fetch hoàn thành trên thread: <NSThread: 0x...>{number = 1, name = main} // Main thread
*/

Trong ví dụ này, tác vụ mô phỏng fetch dữ liệu chạy trên một background thread nhờ subscribeOn. Tuy nhiên, kết quả .next.completed được đưa về main thread bởi observeOn, đảm bảo rằng việc cập nhật UI (nếu có) là an toàn và hiệu quả.

Tại sao nên học và sử dụng RxSwift?

Học RxSwift đòi hỏi một sự thay đổi trong tư duy lập trình, nhưng những lợi ích mà nó mang lại là đáng kể:

  • Quản lý sự kiện phức tạp dễ dàng hơn: Biểu diễn mọi thứ dưới dạng luồng giúp xâu chuỗi các thao tác không đồng bộ trở nên mạch lạc và dễ đọc.
  • Giảm boilerplate code: Các pattern phổ biến (như Debounce cho search bar) có thể được triển khai chỉ với một vài dòng code sử dụng operators.
  • Kết hợp tốt với MVVM: RxSwift rất phù hợp với kiến trúc MVVM, giúp binding dữ liệu giữa ViewModel và View trở nên đơn giản và reactive. (Tham khảo lại bài MVVM với Combine – nguyên lý tương tự áp dụng cho RxSwift).
  • Xử lý lỗi và multi-threading thanh lịch: Schedulers và cách Observable xử lý lỗi giúp quản lý luồng thực thi và error handling hiệu quả hơn.

Bắt đầu với RxSwift trong dự án của bạn

Để sử dụng RxSwift, bạn cần thêm nó vào project của mình. Các cách phổ biến nhất là:

  • Swift Package Manager (SPM): Đây là cách được khuyến khích hiện nay. Trong Xcode, vào File > Add Packages… và tìm kiếm “RxSwift”.
  • CocoaPods: Thêm pod 'RxSwift'pod 'RxCocoa' vào Podfile, chạy pod install.
  • Carthage: Thêm github "ReactiveX/RxSwift" vào Cartfile, chạy carthage update.

RxCocoa là một thư viện đi kèm, cung cấp các reactive extensions cho các thành phần UI của Apple (như UIButton, UITextField, UITableView). Nó làm cho việc binding UI với RxSwift trở nên cực kỳ tiện lợi.

Kết luận

Observables, Subjects và Schedulers là ba trụ cột fundamental của RxSwift. Observables là nguồn phát dữ liệu, Subjects là cầu nối giữa thế giới reactive và non-reactive (hoặc giữa nhiều subscribers), còn Schedulers giúp bạn kiểm soát luồng thực thi trên các threads khác nhau.

Việc làm chủ ba khái niệm này là bước khởi đầu quan trọng trên con đường khám phá Lập trình Phản ứng với RxSwift. Có thể bạn sẽ thấy hơi bỡ ngỡ lúc đầu, nhưng đừng nản lòng. Hãy thực hành, thử nghiệm với các ví dụ đơn giản, và dần dần bạn sẽ thấy sức mạnh và sự thanh lịch mà RxSwift mang lại.

Đây mới chỉ là bề nổi của tảng băng chìm. Thế giới của RxSwift còn rộng lớn với hàng trăm operators mạnh mẽ cho phép bạn biến đổi, kết hợp, lọc, và xử lý các luồng dữ liệu theo vô vàn cách. Chúng ta sẽ cùng nhau khám phá các operators này và cách tích hợp RxSwift vào các project iOS thực tế trong các bài viết tiếp theo của serie “Lộ trình học Lập trình viên iOS 2025“.

Chúc bạn học tốt và hẹn gặp lại trong các bài viết tiếp theo!

Chỉ mục