RxJava操作符系列三(下)
接上文
Take
Take操作符可以修改Observable的行為,只返回前面的N項數據,然后發射完成通知,忽略剩余的數據。
- Observable.range(1,8)
- .take(4)
- .subscribe(new Subscriber<Integer>() {
- @Override
- public void onNext(Integer item) {
- Log.e(TAG, "Next: " + item);
- }
- @Override
- public void onError(Throwable error) {
- Log.e(TAG, "Error: " + error.getMessage());
- }
- @Override
- public void onCompleted() {
- Log.e(TAG, "complete.");
- }
- });
輸出日志信息
- Next: 1
- Next: 2
- Next: 3
- Next: 4
- complete
take和skip一樣也有其它兩個重載方法take(long time, TimeUnit unit),take(long time, TimeUnit unit, Scheduler scheduler),默認在computation調度器上執行。
take還有變體操作符TakeLast,takeLastBuffer具體執行效果可自行代碼。
Debounce
該操作符指的是過了一段指定的時間還沒發射數據時才發射一個數據,聽著可能有點繞。你可以理解對源Observable間隔期產生的結果進行過濾,如果在這個規定的間隔期內沒有別的結果產生,則將這個結果提交給訂閱者,否則忽略該結果,原理有點像光學防抖
上代碼
- Observable.range(1,8)
- .take(4)
- .subscribe(new Subscriber<Integer>() {
- @Override
- public void onNext(Integer item) {
- Log.e(TAG, "Next: " + item);
- }
- @Override
- public void onError(Throwable error) {
- Log.e(TAG, "Error: " + error.getMessage());
- }
- @Override
- public void onCompleted() {
- Log.e(TAG, "complete.");
- }
- });
輸出信息
- onNext: 4
- onNext: 5
- onNext: 6
- onNext: 7
- onNext: 8
- onNext: 9
- onCompleted:
這個輸出數據不一定一樣,有可能從5開始。
Distinct
這個比較好理解,它就是過濾掉重復的數據,只允許還沒有發射過的數據項通過。
示例代碼
- Observable.just(0, 0, 6, 4, 2, 8, 2, 1, 9, 0)
- .distinct()
- .subscribe(new Subscriber<Integer>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted:Distinct ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError:Distinct ");
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext:Distinct " + integer);
- }
- });
輸出日志信息
- onNext:Distinct 0
- onNext:Distinct 6
- onNext:Distinct 4
- onNext:Distinct 2
- onNext:Distinct 8
- onNext:Distinct 1
- onNext:Distinct 9
- onCompleted:Distinct
ElementAt
該操作符獲取原始Observable發射的數據序列指定索引位置的數據項,然后當做自己的***數據發射。給它傳遞一個基于0的索引值,它會發射原始Observable數據序列對應索引位置的值,如果你傳遞給elementAt的值為4,那么它會發射第5項的數據。如下示例代碼
- Observable.just(0, 0, 6, 4, 2, 8, 2, 1, 9, 0)
- .elementAt(4)
- .subscribe(new Subscriber<Integer>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted:ElementAt ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError:ElementAt ");
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext:ElementAt " + integer);
- }
- });
輸出日志信息
- onNext:ElementAt 2
- onCompleted:ElementAt
IgnoreElements
操作符抑制原始Observable發射的所有數據,只允許它的終止通知(onError或onCompleted)通過,使用該操作符onNext()方法不會執行。
- Observable.just(1, 2, 3).ignoreElements().subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError");
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext");
- }
- });
執行后只會輸出onCompleted。這個操作符效果就如同empty()方法創建一個空的Observable,只會執行onCompleted()方法,不同的是ignoreElements是對數據源的處理,而empty()是創建Observable。


















