變換操作符
-
作用
對(duì)事件序列中的事件 / 整個(gè)事件序列 進(jìn)行加工處理(即變換),使得其轉(zhuǎn)變成不同的事件 / 整個(gè)事件序列。
-
常見(jiàn)類(lèi)型

-
應(yīng)用場(chǎng)景 & 對(duì)應(yīng)操作符介紹
1.Map()
作用
對(duì)被觀(guān)察者發(fā)送的每一個(gè)事件都通過(guò)指定的函數(shù)處理,從而變換成另外一種事件。數(shù)據(jù)類(lèi)型轉(zhuǎn)換,即將被觀(guān)察者發(fā)送的事件轉(zhuǎn)換為任意的類(lèi)型事件。
事例
public void MapOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
// 使用Map變換操作符中的Function函數(shù)對(duì)被觀(guān)察者發(fā)送的事件進(jìn)行統(tǒng)一變換:整型變換成字符串類(lèi)型
@Override
public String apply(Integer integer) throws Exception {
return "使用Map變換操作符,將事件"+"的參數(shù)從 整型"+integer + " 變換成 字符串類(lèi)型" + integer*6;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss",s);
}
});
}
2.FlatMap()
作用
將被觀(guān)察者發(fā)送的事件序列進(jìn)行 拆分 & 單獨(dú)轉(zhuǎn)換,再合并成一個(gè)新的事件序列,最后再進(jìn)行發(fā)送。
為事件序列中每個(gè)事件都創(chuàng)建一個(gè) Observable 對(duì)象,將對(duì)每個(gè)原始事件轉(zhuǎn)換后的新事件 都放入到對(duì)應(yīng) Observable對(duì)象,將新建的每個(gè)Observable 都合并到一個(gè)新建的、總的Observable 對(duì)象,新建的、總的Observable 對(duì)象將新合并的事件序列發(fā)送給觀(guān)察者(Observer)。
注:新合并生成的事件序列順序是無(wú)序的,即與舊序列發(fā)送事件的順序無(wú)關(guān)
事例
public void FlatMapOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list=new ArrayList<>();
for (int i = 0; i <3 ; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通過(guò)flatMap中將被觀(guān)察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件
// 最終合并,再發(fā)送給被觀(guān)察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss", s);
}
});
}
3.ConcatMap()
作用
類(lèi)似FlatMap()操作符,區(qū)別在于拆分 & 重新合并生成的事件序列的順序 = 被觀(guān)察者舊序列生產(chǎn)的順序。
事例
public void ConcatMapOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list=new ArrayList<>();
for (int i = 0; i <3 ; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通過(guò)concatMap中將被觀(guān)察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件
// 最終合并,再發(fā)送給被觀(guān)察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss", s);
}
});
}
4.Buffer()
作用
定期從被觀(guān)察者(Obervable)需要發(fā)送的事件中 獲取一定數(shù)量的事件 & 放到緩存區(qū)中,最終發(fā)送,即緩存被觀(guān)察者發(fā)送的事件。
事例
public void BufferOperators(View view){
Observable.just("1,","2","3","3")
.buffer(3,1)// 設(shè)置緩存區(qū)大小 & 步長(zhǎng)
// 緩存區(qū)大小 = 每次從被觀(guān)察者中獲取的事件數(shù)量
// 步長(zhǎng) = 每次獲取新事件的數(shù)量
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
Log.e("sss", " 緩存區(qū)里的事件數(shù)量 = " + strings.size());
for (String value : strings) {
Log.e("sss", " 事件 = " + value);
}
}
@Override
public void onError(Throwable e) {
Log.e("sss", "對(duì)Error事件作出響應(yīng)" );
}
@Override
public void onComplete() {
Log.e("sss", "對(duì)Complete事件作出響應(yīng)");
}
});
}
-
實(shí)際開(kāi)發(fā)中的應(yīng)用
1.網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
模擬先請(qǐng)求注冊(cè)然后登陸
public void RxJavaNest(View view){
Retrofit retrofit=new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRequest_Interface request=retrofit.create(GetRequest_Interface.class);
Observable<Translation> observable1=request.getCall_1();
final Observable<Translation> observable2=request.getCall_2();
observable1.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {
Log.e("sss","第1次網(wǎng)絡(luò)請(qǐng)求成功");
Log.e("sss",translation.getContent().getOut());
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<Translation, ObservableSource<Translation>>() {
@Override
public ObservableSource<Translation> apply(Translation translation) throws Exception {
return observable2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {
Log.e("sss", "第2次網(wǎng)絡(luò)請(qǐng)求成功");
Log.e("sss",translation.getContent().getOut());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("sss", "登錄失敗");
}
});
}