分析目的
- Observable發(fā)出數據和Observer接收數據
- 如何實現線程調度和操作符原理
文章僅分析Observable不分析帶背壓的Flowable
Observable創(chuàng)建和訂閱
一個常見例子
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("test");
emitter.onComplete();
}).map(s -> s + "111")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
LPLogger.e("onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
LPLogger.e("onError:" + e.getMessage());
}
@Override
public void onComplete() {
LPLogger.e("onComplete");
}
});
}
以上常見例子展示了Observable 創(chuàng)建,訂閱和調度以及轉換操作符的整個過程下面分析各個過程
Observable創(chuàng)建
Observable.create()方法參數ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
該接口方法subscribe(ObservableEmitter)參數ObservableEmitter實現了onNext(),onError()和onComplete()即我們使用發(fā)送數據的地方
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
其中RxJavaPlugins.onAssembly()方法是hook方法,默認返回原值即ObservableCreate(source)是一個Observable,實現了subscribeActual()此方法是訂閱真正執(zhí)行的方法,先不用關注
只需要注意Observable.create()方法傳入參數ObservableOnSubscribe返回ObservableCreate即可
Observable訂閱
先不看線程調度和操作符轉換處理僅看最簡單的部分subscribe()
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
//..
subscribeActual(observer);
}
實際是調用的subscribeActual(observer),即前面創(chuàng)建的ObservableCreate里面的subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
//實現自ObservableEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//這里是外界的回調
observer.onSubscribe(parent);
try {
//這里source即create傳入的ObservableOnSubscribe
//parent即前面?zhèn)魅氲腛bservableEmitter,即emitter.onNext("test")中的emitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
看下CreateEmitter源碼
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("xxx"));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("xxx");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
- 以上
onNext(T)調用了observer.onNext(t)即讓訂閱者接收到發(fā)送端數據
我們可以看到subscribeActual(observer)中的方法已經將Observable和Observer聯系起來 - 同時注意到
CreateEmitter是繼承自Disposable即我們可以使用回調中的onSubscribe(Disposable d)中的Disposable去結束Observable發(fā)送,當我們subscribe(Consumer)獲取的返回值同理 - 從上面代碼可以得出我們之前記住的一些結論如
onError()和onComplete()只能調用一次,onSubscribe在onNext()之前執(zhí)行等
線程調度
subcribeOn
subscribeOn(Schedulers.io())返回值類似于create(),返回的是ObservableSubscribeOn<T>(this, scheduler)比ObservableCreate<T>(source)多一個線程處理,典型的裝飾器模式應用
源碼也和ObservableCreate類似
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
//和CreateEmitter一樣也是包裝Observer,最終調用Observer.onNext之類方法
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
//線程切換SubscribeTask是Runnable最終執(zhí)行的還是source.subscribe(parent);
//source.subscribe(parent)執(zhí)行后會執(zhí)行到ObservableCreate的subscribeActual()
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
繼續(xù)看
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//實際是一個Dispose并設置到ObservableSubscribeOn方便管理任務
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
執(zhí)行調用如下不一一展開:
w.schedule(task, delay, unit)->IoScheduler.EventLoopWorker.schedule->NewThreadWorker.scheduleActual()->ScheduledRunnable.call()->ScheduledRunnable.run()->DisposeTask.run()->new SubscribeTask(parent)
即在使用線程池執(zhí)行了source.subscribe(parent)
observeOn
observeOn(AndroidSchedulers.mainThread())中創(chuàng)建的是ObservableObserveOn
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//創(chuàng)建worker
Scheduler.Worker w = scheduler.createWorker();
//還是和source.subscribe(parent)一致
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
查看ObserveOnObserver是繼承Runnable
public void onNext(T t) {
if (done) {
return;
}
//先把值存儲到隊列中,然后切換線程處理
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//run方法運行到這里
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//切到線程后再執(zhí)行onNext
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
-
ObserveOnObserver的onNext()中把發(fā)送值存到隊列,然后調用schedule() - 調用的是
worker.schedule(this);和前面分析subcribeOn一樣直接查看run(),此時已經完成線程切換 -
run()中調用的是drainNormal(),從1中存儲的隊列中取出值發(fā)送
切換線程分析
AndroidSchedulers.mainThread()實現
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
繼續(xù)看HandlerScheduler
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//設置了runnable()后續(xù)發(fā)送到主線程會執(zhí)行run()
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
ObservableObserveOn中的subscribeActual()創(chuàng)建的worker就是HandlerWorker
-
AndroidSchedulers.mainThread()創(chuàng)建了一個帶主線程Handler的HandlerScheduler -
schedule()中通過handler.sendMessageDelayed(msg,delay)發(fā)送消息到主線程,因為message設置了Runnable(),消息發(fā)送到主線程后會調用message.callback.run()從而調用schedule()中的參數run(),即實際完成切換線程回調到ObserveOnObserver的run()
操作符原理
僅分析下map操作符,其它操作符類似
直接看ObservableMap源碼
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
- 可以看到
subscribeActual()直接是source.subscribe(parent)類型,我們直接看MapObserver的onNext() - 非聚合模式sourceMode的值是NONE,相當于
Observer.onNext(mapper.apply(t))而mapper.apply(t)則是我們寫的lambda表達式的返回值即s + "111",由此可以看出map是直接把值返回
總結
-
subscribeActual()方法中實際完成訂閱,subscribe訂閱后各個操作符才完成訂閱,即訂閱是自下而上進行的 - 線程操作是通過線程池和
Handler完成切換