基于 RxJava 2.1.6 RxJava github 地址
為什么要用 RxJava
簡(jiǎn)潔!簡(jiǎn)潔!簡(jiǎn)潔?。ㄖ匾氖虑檎f(shuō)三遍)
RxJava 最大的優(yōu)點(diǎn)就是簡(jiǎn)潔。簡(jiǎn)潔的代碼能讓人心曠神怡,減少 bug 。
RxJava 是一種新的編程模式 響應(yīng)式編程
響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。
數(shù)據(jù)流就像一條河:它可以被觀(guān)測(cè),被過(guò)濾,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流。
以上是 RxJava Essentials 中文翻譯版 對(duì)響應(yīng)式編程的介紹。使用 RxJava 可以讓我們?cè)?java 語(yǔ)言中體驗(yàn)什么是響應(yīng)式編程。響應(yīng)式編程有兩個(gè)重要概念
1. 基于異步
2. 數(shù)據(jù)流
如何使用 RxJava
RxJava 的設(shè)計(jì)理念基于 觀(guān)察者模式
在 RxJava 中首先明白有兩個(gè)對(duì)象觀(guān)察者和被觀(guān)察者。
觀(guān)察者和被觀(guān)察者可以存在不同的線(xiàn)程之中,所以存在觀(guān)測(cè)線(xiàn)程和被觀(guān)測(cè)線(xiàn)程*
觀(guān)察者和被觀(guān)察者通過(guò) subscribe 發(fā)生『訂閱』關(guān)系。
RxJava 提供一些列的鏈?zhǔn)秸{(diào)用,使用起來(lái)如下:
Observable
.create(...)
.observeOn(...)
.subscribeOn(...)
.subscribe(...)
subscribe() 方法為鏈?zhǔn)秸{(diào)用的最后一層,create() 和 subscribe() 方式之前可以任意設(shè)置其他操作。
上面提到『響應(yīng)式編程』中的數(shù)據(jù)流就像是一條河。
create() 方法可以比喻為河流的『上游發(fā)源地』,subscribe() 則為河流的『入??凇弧?在這兩個(gè)方法之間我們可以添加觀(guān)察、過(guò)濾等操作。
在鏈?zhǔn)秸{(diào)用中增加一些數(shù)據(jù)處理
Observable
.create(...)
.observeOn(...)
.subscribeOn(...)
.map(...)
.filter(...)
.subscribe(...)
在 RxJava2 中提供一系列可觀(guān)測(cè)對(duì)象(也就是上面鏈?zhǔn)秸{(diào)用的 Observable 等同功能)
io.reactivex.Flowableio.reactivex.Observableio.reactivex.Singleio.reactivex.Completableio.reactivex.Maybe
這里我們寫(xiě)一個(gè)例子
Observable.create((ObservableOnSubscribe<String>) e -> {
for (int i = 0; i < 5; i++) {
e.onNext(i + "");
}
})
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.map(s -> {
System.out.println("map:" + s);
return s + "_map";
})
.filter(o -> {
System.out.println("flat:" + o);
if (o.compareTo("3") < 0) {
return true;
}
return false;
})
.subscribe(o -> System.out.println("subscribe:" + o));
輸出結(jié)果如下:
map:0
flat:0_map
subscribe:0_map
map:1
flat:1_map
subscribe:1_map
map:2
flat:2_map
subscribe:2_map
map:3
flat:3_map
map:4
flat:4_map
上面的例子中,我們?cè)?strong>被觀(guān)察者中發(fā)射了 5 個(gè)數(shù)據(jù)源,觀(guān)察者和被觀(guān)察著都在同一個(gè)線(xiàn)程中,通過(guò) map 對(duì)象給每個(gè)發(fā)射的對(duì)象拼接一個(gè) 『_map』字符串,通過(guò) filter 過(guò)濾了比『3』字符串小的對(duì)象。
所以最終在觀(guān)察者中接收到了 3 次消息。
源碼分析 RxJava 中的核心代碼
訂閱關(guān)系的產(chǎn)生
創(chuàng)建被觀(guān)察著
『被觀(guān)測(cè)者』是事件產(chǎn)生的一方,創(chuàng)建方式也有很多種。這里列舉一下 Observable 創(chuàng)建的方式
-
通過(guò) create() 方法創(chuàng)建
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } -
通過(guò) just() 方法創(chuàng)建
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); } -
通過(guò) fromArray() 方法創(chuàng)建
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }
其中 ObservableFromArray、ObservableJust、ObservableCreate 都是 Observable 的子類(lèi),而 Observable 本身是一個(gè)抽象。
這些子類(lèi)主要實(shí)現(xiàn) Observe 的抽象方法
protected abstract void subscribeActual(Observer<? super T> observer);
再看一下 RxJavaPlugins.onAssembly() 方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
這里需要說(shuō)明一下 RxJavaPlugins.onAssembly()是一個(gè) Hock,如果不做任何 hock 處理,RxJavaPlugins.onAssembly()會(huì)直接返回傳入的對(duì)象。onObservableAssembly 靜態(tài)成員變量為 null
我們用 ObservableCreate 舉例
ObservableCreate 的構(gòu)造方法需要傳入一個(gè) ObservableOnSubscribe 對(duì)象
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
并重載 Observable 的 subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual() 方法傳入了一個(gè) Observer 對(duì)象并且包裝到 CreateEmitter 對(duì)象中,然后調(diào)用
observer.onSubscribe(parent); 和 source.subscribe(parent);
創(chuàng)建觀(guān)察者
觀(guān)察者比較簡(jiǎn)單,需要實(shí)現(xiàn) 4 個(gè)方法
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
訂閱
通常情況下,我們不需要重載 Observer 的每一個(gè)方法,RxJava 內(nèi)部提供了另一個(gè) LambdaObserver 把 Observer 的四個(gè)方法拆分為 4 個(gè)部分。
Observable.subscribe() 可以只傳入一個(gè) Consumer 對(duì)象。
內(nèi)部會(huì)把 Consumer 包裹在 LambdaObserver 中,并且返回 LambdaObserver
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
然后調(diào)用接收 Observer 的 subscribe() 方法
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} ……
}
這里調(diào)用了 Observable 的 subscribeActual(observer) 方法。
這里就完成了 觀(guān)察者 和 被觀(guān)察著 之間的訂閱關(guān)系

如下一段代碼
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("subscribe:" + i);
e.onNext(i + "");
}
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept:" + s);
}
});
調(diào)用的時(shí)序圖如下

線(xiàn)程調(diào)度原理分析
上部分分析的訂閱關(guān)系的創(chuàng)建,都是在當(dāng)前線(xiàn)程之中。RxJava 可以指定 觀(guān)察線(xiàn)程 和 觀(guān)察者線(xiàn)程
observeOn 原理分析
Observable 的 observeOn 方法有三個(gè)
- Observable<T> observeOn(Scheduler scheduler)
- Observable<T> observeOn(Scheduler scheduler, boolean delayError)
- Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
這三個(gè)方法中,前兩個(gè)方法都會(huì)再次調(diào)用第三個(gè)方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
我們看到這里創(chuàng)建了一個(gè) ObservableObserveOn 對(duì)象,ObservableObserveOn是 Observable的子類(lèi)。
我們看下 ObservableObserveOn 的 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
這里我們假設(shè)傳入的 Scheduler 是 Schedulers.io() 進(jìn)一步跟蹤分析會(huì)發(fā)現(xiàn) Schedulers.io() 返回的是 IoScheduler
所以會(huì)走上面代碼的 else 分支。
先忽略 scheduler.createWorker() 過(guò)程,先看下 ObserveOnObserver
這里的 source 是 ObservableCreate ,而 source.subscribe() 會(huì)調(diào)用 ObservableCreate.subscribeActual(observer) 然后調(diào)用 ObserveOnObserver.onSubscribe() 方法
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
……
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
這里傳入的 Disposable 是 CreateEmitter 對(duì)象,所以不會(huì)走 if 分支。
然后創(chuàng)建了一個(gè) SpscLinkedArrayQueue 對(duì)象,
緊接著執(zhí)行 actual.onSubscribe() 也就是 LambdaObserver.onSubscribe()
后面應(yīng)該執(zhí)行的是 ObserveOnObserver.onNext() 方法
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
看到會(huì)把 onNext(T t) 傳入?yún)?shù)放入隊(duì)列之中,然后執(zhí)行 schedule
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker.schedule() 接收的是一個(gè) Runnable 對(duì)象,所以我們從這里可以看出 Observer 的 onNext() 、onComplete()、onError() 等方法都是在線(xiàn)程之中執(zhí)行。
接下來(lái)我們看下線(xiàn)程的創(chuàng)建
從 ObservableObserveOn 的 subscribeActual() 方法中的
Scheduler.Worker w = scheduler.createWorker();
Schedulers.io()的跟蹤過(guò)程比較簡(jiǎn)單,最終會(huì)得到一個(gè) IoScheduler
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
直接看 createWorker()
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
再看 EventLoopWorker 的構(gòu)造函數(shù)
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
繼續(xù)看下 CachedWorkerPool 的構(gòu)造方法
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
終于我們找到了線(xiàn)程池相關(guān)的代碼
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
pool.get() 方法
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
這里返回了一個(gè) ThreadWorker
看下 ThreadWorker 的創(chuàng)建過(guò)程
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
看下父類(lèi)的構(gòu)造函數(shù)
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
再跟下去
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
這里又出現(xiàn)了一個(gè)線(xiàn)程池
下面開(kāi)始看 worker.schedule(this)
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
繼續(xù)跟蹤下去
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
終于我們找到了執(zhí)行線(xiàn)程的方法
總結(jié)一下真?zhèn)€流程

subscribeOn 流程分析
subscribeOn 方法只有一個(gè)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
和 observeOn 類(lèi)似,這里把 Observable 包裝到 ObservableSubscribeOn 對(duì)象中。
直接看subscribeActual方法
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
這里把 Observe 包裹在 SubscribeOnObserver 中,并執(zhí)行
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
這只設(shè)置了 Disposable 只能被執(zhí)行一次。
重點(diǎn)看下 scheduler.scheduleDirect(new SubscribeTask(parent))
先看一些 SubscribeTask 類(lèi)
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
這里看出 SubscribeTask 也是一個(gè) Runnable 在 run 方法中執(zhí)行 source.subscribe(parent),
所以 SubscribeTask 是觀(guān)察者線(xiàn)程的關(guān)鍵
繼續(xù)依照 IoScheduler 為例
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
這里和 subscribeOn 原理一樣了,通過(guò) Worker 對(duì)象把觀(guān)察者的行為設(shè)置在線(xiàn)程之中。
操作符原理分析
RxJava 里面有很多操作符,這里找一個(gè) map 操作進(jìn)行分析。其他更復(fù)雜的操作不一一分析。
先看一個(gè)例子
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("subscribe:" + i);
e.onNext(i + "");
}
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("map:" + s);
return s + "_map";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept:" + s);
}
});
輸出結(jié)果如下:
subscribe:0
map:0
accept:0_map
subscribe:1
map:1
accept:1_map
subscribe:2
map:2
accept:2_map
subscribe:3
map:3
accept:3_map
subscribe:4
map:4
accept:4_map
可以看 ObservableOnSubscribe 里面每次發(fā)送的數(shù)據(jù)都會(huì)到 Function.apply() 方法進(jìn)行『過(guò)濾』,把每個(gè)發(fā)送的 String 轉(zhuǎn)換一下后再發(fā)送給 Consumer
看下 map() 方法
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
根據(jù)上面的經(jīng)驗(yàn),我們直接看 ObservableMap 對(duì)象的 subscribeActual()
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
然后繼續(xù)看 MapObserver 對(duì)象的 onNext,其中 function 就是我們?cè)?map() 方法中傳入的 Function 對(duì)象。
先看構(gòu)造函數(shù)
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
在看 onNext()
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
可以看出,在執(zhí)行 actual.onNext(v) 之前,會(huì)執(zhí)行 mapper.apply(t) 從而完成轉(zhuǎn)換。
其他更復(fù)雜的操作符,基本都是在各種特定的 XXXObserver 中的 onNext(T t) 方法中做特殊處理。