Rxjava2 trong Android

Share1
Tweet
Share
Pin
1 Shares
Post Views: 461

Tìm hiểu về RxAndroid phần 1 :Creating Observables

Bài viết
By Kiên Nguyên Trung
0 Comments

RxJava/Android là một thư viện để soạn các program không đồng bộ ,dựa trên sự kiện bằng cách sử dụng các Observable. Đây là 1 thư viện hỗ trợ mạnh mẽ về các vấn đề như xử lí thread, thread-safety, đồng bộ hoá, non-blocking I/O.

tại sao nên dùng RxAndroid?

Trước tiên để bàn về vấn đề liệu RxAndroid có thực sự tốt hơn ? có nên dùng nó? chúng ta hãy nhìn lại 1 chút về bài toán xử lí 1 tác vụ nào đó ở background và sau đó update kết quả lên UI. và cách xử lí mà được nhiều người lựa chọn sử dụng từ trước đến nay: AsyncTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DownloadImage extends AsyncTask<String, Void, Bitmap> {
protected User doInBackground(String... strings) {
try {
Bitmap bm = downloadImageFromUrl(strings[0]);
return bm;
} catch (Exception e) {
}
}
protected void onPostExecute(Bitmap bm) {
super.onPostExecute(bm);
updateImage(bm);
}
}

Nhìn vào đoạn code trên, bạn thấy có những vấn đề gì?

  • Thứ nhất, ngay tại thời điểm có lỗi xảy ra, nếu chúng ta muốn thông báo cho user biết quá trình xử lí đã xảy ra lỗi và đó là lỗi gì? thì như code hiện tại là không thể. doInBackground() không chạy ở main thread nên không thể tạo dialog thông báo lỗi. Lúc này để thông báo lỗi tới người dùng, chúng ta buộc phải return null ở trong khối catch (Exception e), tuy nhiên, ở onPostExecute tuy đã có thể tương tác với UI nhưng ta lại không thể thông báo cho user biết họ đang gặp phải lỗi gì?.
  • Thứ hai, ở onPostExecute(), nếu muốn update UI, chúng ta cần 1 context để update bitmap lên UI. Như vậy thì ta lại cần phải truyền reference của Activity vào AsyncTask. Đây chính là nguyên nhân gây ra memory leak ở nhiều trường hợp hoạt động trong thực tế khi chúng ta cố gắng sử dụng context này để update UI.Lí do là giả sử quá trình download ảnh (ở doInBackground) chưa kết thúc mà user lại destroy activity(ấn nút back, xoay màn hình..), lúc này nếu sử dụng context mà AsyncTask đang lưu giữ (reference của Activity đã bị huỷ ) sẽ gây ra memory leak

để giải quyết được những vấn đề trên, chúng ta vẫn có thể tiếp tục sử dụng AsyncTask nhưng sẽ phải đối ứng thêm tương đối. Thay vào đó, RxAndroid đã hỗ trợ cho chúng ta giải quyết những vấn đề trên, giúp chúng ta dễ dàng tránh được những lỗi như vậy, kèm theo khiến cho code của bạn trở nên rõ ràng, dễ hiểu. Ngoài ra, ưu điểm của RxAndroid không chỉ có vậy, RxAndroid cũng được đánh giá là một công cụ rất tốt cho bạn trong việc lập trình đa luồng, RxAndroid cũng lên kịch bản cho rất nhiều tình huống xử lí tác vụ trong thực tế để giúp các developer có thể dễ dàng áp dụngQua bài viết này, mình mong rằng sẽ đem cho các bạn 1 cái nhìn tổng quan về RxAndroid, hiểu về các thành phần của nó cũng như là có thể áp dụng được các kịch bản thông dụng thường sử dụng nhất mà RxAndroid đã hỗ trợ

OK! bắt đầu nào!!!

Khái niệm cơ bản

  • Reactive code gồm 2 phần cơ phản là Observables và Subscribers. Observables phát ra các items, Subscriber tiếp nhận và sử dụng các items đó.
  • Một Observable có thể tạo ra 0 hoặc nhiều item, Observable sẽ gọi đến onNext() tương ứng với số item mà nó có .Và sẽ kết thúc sau khi đã hoàn thành (gọi đến onCompleted()) hoặc do xảy ra lỗi (gọi đến onError()()).
  • Observer có thể thực hiện những hành động khi Observable phát ra một giá trị, khi Observable cho biết một lỗi đã xảy ra, hoặc khi Observable thông báo không còn giá trị nào để phát ra. Ba hành động này được đóng gói trong giao diện Observer với các phương thức tương ứng là onNext(), onError() và onCompleted().

Setup

Import RxJava, RxAndroid vào project của bạn

1
2
3
implementation 'io.reactivex.rxjava2:rxjava:2.x.x'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

I, Cách tạo Observable (Creating Observables)

1.1 Observable.fromArray()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable.fromArray(new String[]{"one", "two"}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe at thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext with value: " + s + " at thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError at thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete at thread : " + Thread.currentThread().getName());
}
});
  • output:
    onSubscribe at thread : main
    onNext with value: one at thread : main
    onNext with value: two at thread : main
    onComplete at thread : main
  • Giải thích
    fromArray chuyển đổi một số object hoặc cấu trúc dữ liệu nào đó thành một vật Observable. Sau đó Observable sẽ phát ra lần lượt các item đó, và chúng được xử lí xong onNext.Sau khi hoàn thành sẽ gọi đến onCompleted()

1.2 Observable.just()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(new String[]{"one", "two"}, "three").subscribe(new Observer<Serializable>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe at thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(Serializable serializable) {
Log.i(TAG, "onNext with value: " + serializable.toString() + " at thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError at thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete at thread : " + Thread.currentThread().getName());
}
});
  • output:
    onSubscribe at thread : main
    onNext with value: [Ljava.lang.String;@3511b7c at thread : main
    onNext with value: three at thread : main
    onComplete at thread : main
  • Giải thích
    just() chuyển đổi một object hoặc một tập hợp các object thành Observable và phát ra nó. Với just(), giả sử nếu truyền vào 1 array thì array đó sẽ được chuyển đổi thành Observable và phát ra chính object /tập hợp đó.

1.3 Observable.defer()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Car {
private String brand;
public void setBrand(String brand) {
this.brand = brand;
}
public Observable<String> brandDeferObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() {
return Observable.just(brand);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Car car = new Car();
Observable<String> brandDeferObservable = car.brandDeferObservable();
car.setBrand("BMW");
brandDeferObservable
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe at thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext with value: " + s + " at thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError at thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete at thread : " + Thread.currentThread().getName());
}
});
  • output
    onSubscribe at thread : main
    onNext with value: BMW at thread : main
    onComplete at thread : main
  • giải thích
    defer() không tạo ra Observable cho đến khi có ít nhất 1 subcriber được đăng kí và nó sẽ luôn tạo mới 1 observable tương ứng với mỗi subcriber. Như ví dụ ở trên, thậm chí là chúng ta set giá trị brand sau khi khởi tạo Observable nhưng kết quả ở output chúng ta vẫn biết được giá trị của brand là BMW. Lí do là vì defer() phải chờ đến thời điểm đăng kí 1 subcriber thì mới thực hiện lưu giá trị của chứ không phải khi khởi tạo. Trong trường hợp chúng ta không sử dụng defer(), output của giá trị brand sẽ là null.

1.4 Observable.interval()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.interval(5, 3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe at thread : " + Thread.currentThread().getName());
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext with value: " + aLong + " at thread : " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError at thread : " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete at thread : " + Thread.currentThread().getName());
}
});
  • output
    17:24:56.384 onSubscribe
    17:25:01.393 onNext with value: 0
    17:25:04.393 onNext with value: 1
    17:25:07.393 onNext with value: 2
    17:25:10.393 onNext with value: 3
  • giải thích
    interval() tạo một Observable phát ra một chuỗi các số nguyên cách nhau một khoảng thời gian cụ thể. Trên đây là ví dụ đơn giản sử dụng interval để chạy tác vụ trong khoảng thời gian 3 giây một . Lần đầu thì chờ sau 5s mới phát Observable(để bắt đầy ngay lập tức thì set initialDelay = 0)

1.5 Observable.create()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
try {
for (int i = 1; i < 5; i++) {
emitter.onNext(i);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext with value: " + integer );
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
  • output
    onSubscribe
    onNext with value: 1
    onNext with value: 2
    onNext with value: 3
    onNext with value: 4
    onComplete
  • giải thích
    create() có thể tạo Observable từ con số 0 bằng cách gọi đến hàm observer. Với create(), bạn có thể tự thiết kế hoạt động của Observe bằng cách gọi các phương thức onError và onCompleted một cách thích hợp. Lưu ý là Subscriber.onComplete() hoặc Subscriber.onError() chỉ được gọi duy nhất 1 lần và sau đó không được gọi thêm bất cứ hàm nào khác của Observer.

1.6 Observable..timer()

  • Sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "onNext with value: " + aLong );
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
  • output
    17:25:01.393 onSubscribe
    17:25:03.393 onNext with value: 0
    onComplete
  • giải thích
    timer() tạo 1 Observable sẽ phát ra 1 single item sau 1 khoảng thời gian delay cho trước. Như ví dụ trên thì sau 2s (sử dụng bộ đếm thời gian SECONDS) sẽ phát ra object

1.7 Observable.range()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.range(1, 5)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
  • output
    onSubscribe
    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onComplete
  • giải thích
    Range() tạo 1 Observable từ 1 dải interger và lần lươt phát ra các interger trong dải đó. Trong TH này onNext() được gọi đến 5 lần , lần lượt từ 1 đến 5.

1.8 Observable..repeat()

  • sample code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.range(1, 3).repeat(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
  • output
    onSubscribe
    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 1
    onNext: 2
    onNext: 3
    onComplete
  • giải thích
    Repeat() tạo 1 Observable mà có thể lặp đi lặp lại việc phát ra 1 dải các item. Bạn có thể hạn chế số lần lặp lại bằng cách set repeat(số lần lặp). Ở ví dụ trên, Observable phát ra dải interger từ 1 đến 3 trong 2 lần

Kết luận

Vậy là chúng ta đã cùng đi qua những hàm cơ bản và thông dụng nhất trong RxAndroid để tạo Observable. Hẹn gặp lại các bạn ở lần tới, chúng ta sẽ tiếp tục tìm hiểu về các concept quan trọng khác của RxAndroid.
Cảm ơn các bạn đã theo dõi!

Tài liệu tham khảo

http://reactivex.io/documentation/operators.html#creating
Sample: https://github.com/amitshekhariitbhu/RxJava2-Android-Samples

Share1
Tweet
Share
Pin
1 Shares

Video liên quan

Post a Comment (0)
Previous Post Next Post