引言
前面我們分析了RxJava的線程調(diào)度,今天我們研究下RxJava的另外一塊強(qiáng)大的功能-事件變換操作符。
map操作符
官方定義:transform the items emitted by an Observable by applying a function to each item
翻譯過(guò)來(lái)就是就是轉(zhuǎn)換發(fā)射數(shù)據(jù)的操作符,說(shuō)白了就是起到事件變換的作用,下面是圖示:
map操作符示例:
/**
* 變換操作符
*/
//1.map將事件通過(guò)一個(gè)函數(shù)做變換,一般用作數(shù)據(jù)類(lèi)型轉(zhuǎn)換
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
//Integer轉(zhuǎn)換成String
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map變換操作符 將事件" + integer + "的參數(shù)從 整型" + integer + " 變換成 字符串類(lèi)型" + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
//2.FlatMap/concatMap 將每個(gè)事件進(jìn)行拆分和轉(zhuǎn)換,再合并成一個(gè)新的事件序列,最后再發(fā)送,前者無(wú)序發(fā)送,后者有序發(fā)送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
//事件拆分
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通過(guò)flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件
// 最終合并,再發(fā)送給被觀察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
在這個(gè)例子中map操作符將整形數(shù)據(jù)變換成了String數(shù)據(jù),交給Consumer<String>對(duì)象接收。
我們看一下map函數(shù)的源碼:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空略過(guò)
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
返回ObservableMap對(duì)象:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//調(diào)用層傳入的Function,用于數(shù)據(jù)轉(zhuǎn)換
final Function<? super T, ? extends U> function;
//source為上游,支持類(lèi)型為T(mén),U為下游的支持類(lèi)型
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//上游source訂閱MapObserver,根據(jù)function和傳入的觀察者t構(gòu)造MapObserver對(duì)象
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
//MapObserver通過(guò)function對(duì)象mapper,將T數(shù)據(jù)轉(zhuǎn)換U數(shù)據(jù)類(lèi)型后,再轉(zhuǎn)換后的數(shù)據(jù)交給真正的接受者actual
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//轉(zhuǎn)換后的數(shù)據(jù)
U v;
try {
//調(diào)用mapper.apply(t)執(zhí)行數(shù)據(jù)轉(zhuǎn)換,交給actual
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
它繼承自AbstractObservableWithUpstream,前面我們提到過(guò),它封裝了上游的ObservableSource,是支持?jǐn)?shù)據(jù)類(lèi)型轉(zhuǎn)換的Observable。
subscribeActual訂閱方法是調(diào)用上游(也就是用戶(hù)定義的Observable對(duì)象)的subscribe方法,入?yún)镺bserver對(duì)象(下游)的裝飾類(lèi)MapObserver。
ObservableMap持有Function對(duì)象和真正的觀察者對(duì)象,在被訂閱的onXXX方法中,通過(guò)Function將上游發(fā)送的T類(lèi)型數(shù)據(jù)轉(zhuǎn)換成U類(lèi)型數(shù)據(jù),然后將交給觀察者對(duì)象處理。
FlatMap操作符
還是先看看官方定義:
官方定義:transform the items emitted by an Observable by applying a function to each item
大致意思是將每一個(gè)上游發(fā)射的數(shù)據(jù)從一個(gè)Observable轉(zhuǎn)成為多個(gè)Observable,并將所有要發(fā)射的數(shù)據(jù)平鋪為一個(gè)Observable。
下面是FlatMap的圖解:
到這里我們總結(jié)一下:
- flatmap 轉(zhuǎn)換是一對(duì)多的(一對(duì)一當(dāng)然也支持),原來(lái)發(fā)射了幾個(gè)數(shù)據(jù),轉(zhuǎn)換之后可以是更多個(gè);
- flatMap 轉(zhuǎn)換同樣可以改變發(fā)射的數(shù)據(jù)類(lèi)型;
- flatMap 轉(zhuǎn)換后的數(shù)據(jù),還是會(huì)逐個(gè)發(fā)射給我們的Observer來(lái)接收(就像這些數(shù)據(jù)是由一個(gè)Observable發(fā)射的一樣,其實(shí)是多個(gè)Observable發(fā)射然后合并的);
4.注意:轉(zhuǎn)換后的數(shù)據(jù)發(fā)射順序可能和上游的發(fā)射順序不一致,為什么會(huì)這樣,我們后面看源碼分析。
樣例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
//事件拆分
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通過(guò)flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件
// 最終合并,再發(fā)送給被觀察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
下面我們看下源碼
FlatMap方法
flatMap有多個(gè)重載方法,最終調(diào)用下面的方法:
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
//判空
ObjectHelper.requireNonNull(mapper, "mapper is null");
//參數(shù)為正校驗(yàn)
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
@SuppressWarnings("unchecked")
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
//返回一個(gè)ObservableFlatMap對(duì)象
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
ObservableFlatMap
還是和之前的套路一樣,看下ObservableFlatMap的訂閱方法:
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
//返回包裝類(lèi)MergeObserver
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
最后返回MergeObserver,顯而易見(jiàn)這是實(shí)現(xiàn)flatMap操作的核心類(lèi)。
MergeObserver
看關(guān)鍵方法onNext:
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
//調(diào)用Function方法轉(zhuǎn)換成新的Observable
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
//默認(rèn)情況會(huì)走到這里
subscribeInner(p);
}
由于我們默認(rèn)調(diào)用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 所以最終會(huì)調(diào)用 subscribeInner(p),注意這里我們的mapper方法以及被調(diào)用了,p就是跟我們傳入的Function生成的Observable,我們?cè)倮^續(xù)往下看
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
tryEmitScalar(((Callable<? extends U>)p));
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
break;
}
}
} else {
break;
}
} else {
//一般情況下,返回的Observable 都不是 Callable類(lèi)型的,走這里
//構(gòu)造最終的觀察者InnerObserver,它是真正接收數(shù)據(jù)的
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
addInner(inner);
p.subscribe(inner);
break;
}
}
}
終于找到最后的接受者InnerObserver,接著看它的onNext方法:
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
//MergeObserable的方法
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
paren為上面的MergeObserable的tryEmit方法,看字面意思是嘗試發(fā)送,所以可能會(huì)引入同步機(jī)制,下面是tryEmit方法:
void tryEmit(U value, InnerObserver<T, U> inner) {
//引入同步機(jī)制,保證每個(gè)時(shí)刻只發(fā)射一個(gè)數(shù)據(jù)給最后的接受者
//嘗試獲取cas鎖
if (get() == 0 && compareAndSet(0, 1)) {
//最后的接受者
actual.onNext(value);
if (decrementAndGet() == 0) {
//釋放鎖返回
return;
}
} else {//沒(méi)有拿到鎖,就把數(shù)據(jù)加入隊(duì)列
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
//輪詢(xún)隊(duì)列,取數(shù)據(jù)
drainLoop();
}
這里需要提一下:MergeObserver 繼承了 AtomicInteger,所以這里的tryEmit方法就利用了 AtomicInteger 的同步機(jī)制,同時(shí)只會(huì)有一個(gè) value 被 actual Observer 發(fā)射,而且這里 剛好 可以解答我們上面留下的 問(wèn)題,由于 AtomicInteger CAS鎖只能保證操作的原子性,并不保證鎖的獲取順序,是搶占式的,所以最終數(shù)據(jù)的發(fā)射順序并不是固定的(同一個(gè)Observable發(fā)出的數(shù)據(jù)是有序的)。
如果沒(méi)有獲取到鎖,就會(huì)將要發(fā)射的數(shù)據(jù)放入 隊(duì)列中,drainLoop 方法會(huì)循環(huán)去獲取隊(duì)列中的 數(shù)據(jù),然后發(fā)射。
void drainLoop() {
//取最終的觀察者
final Observer<? super U> child = this.actual;
int missed = 1;
for (;;) {
//錯(cuò)誤檢查
if (checkTerminate()) {
return;
}
SimplePlainQueue<U> svq = queue;
if (svq != null) {
for (;;) {
U o;
for (;;) {
if (checkTerminate()) {
return;
}
//取數(shù)據(jù)
o = svq.poll();
if (o == null) {
break;
}
//交給最后的觀察者
child.onNext(o);
}
if (o == null) {
break;
}
}
}
.....
//隊(duì)列數(shù)據(jù)輪詢(xún)完畢,做完成或者錯(cuò)誤處理
if (d && (svq == null || svq.isEmpty()) && n == 0) {
Throwable ex = errors.get();
if (ex == null) {
child.onComplete();
} else {
child.onError(errors.terminate());
}
return;
}
....
}
這個(gè)方法就是從循環(huán)從數(shù)據(jù)隊(duì)列中取數(shù)據(jù)交給最后的觀察者接收。
FlatMap引入CAS機(jī)制,在onNext方法中嘗試拿到鎖,如果拿到則立即交給最終的觀察者,否則加入等待隊(duì)列。
關(guān)于map操作法我們先講到這里,水平有限,難免有紕漏,還望多多指正!