創(chuàng)建一個(gè)Observable的代碼
const hello = Rx.Observable.create(function(observer) {
observer.next('Hello');
observer.next('World');
});
- create傳入的參數(shù)為一個(gè)subscriber函數(shù),該函數(shù)有一個(gè)參數(shù)為observer,observer是一個(gè)觀察者對象,該對象有三個(gè)屬性,屬性值為函數(shù)。如下:
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
- 當(dāng)observable被訂閱的時(shí)候,即observable.subscribe(function next(x), function error(x), function complete(x)), 創(chuàng)建該observable時(shí)傳入的回調(diào)函數(shù)即被執(zhí)行,observer調(diào)用next方法,并傳入?yún)?shù)(即發(fā)射值),發(fā)射出去的值則作為subscribe中回調(diào)函數(shù)的參數(shù),即傳入function next(x)中的x,并執(zhí)行該回調(diào)函數(shù)。 自此只要被觀察者發(fā)生了變化,就會(huì)調(diào)用create的回調(diào)函數(shù)將變化的值通知觀察者,并執(zhí)行對應(yīng)observer的next()方法。
Observable - 被觀察者
Observable.subscribe(observer) - 對被觀察者進(jìn)行訂閱
Observer - 觀察者
創(chuàng)建被觀察者時(shí),傳入一個(gè)訂閱函數(shù),當(dāng)觀察者對被觀察者進(jìn)行訂閱時(shí),就調(diào)用這個(gè)訂閱函數(shù),訂閱函數(shù)的作用是向觀察者發(fā)射數(shù)據(jù),通知數(shù)據(jù)(流)的變化。觀察者根據(jù)發(fā)射過來的不同數(shù)據(jù),自行處理對應(yīng)處理函數(shù)里的邏輯。
源碼
看看源碼做了什么:
創(chuàng)建Observable的過程:
(1) 將create()的參數(shù)(一個(gè)匿名函數(shù)function(observer){})作為參數(shù)傳入構(gòu)造函數(shù),并保存該匿名函數(shù)到類變量this._subscribe。
Observable.create = function (subscribe) {
return new Observable(subscribe);
};
return Observable;
function Observable(subscribe) {
this._isScalar = false;
if (subscribe) {
this._subscribe = subscribe;
}
調(diào)用observable.subscribe的過程:
(1) 將subscribe()里的回調(diào)函數(shù)傳入。
(2) 通過toSubscriber_1.toSubscriber包裝成一個(gè)Subsciber對象即下文中的sink。該對象有一些屬性,其中包括next, error, complete屬性分別指向subscribe中對應(yīng)的回調(diào)函數(shù)。并通過this._subscribe(sink),將sink傳值給Rx.Observable.create(function(observer) {..})中的observer參數(shù)。
(3) 通過this._subscribe(sink)調(diào)用了create()里的回調(diào)函數(shù)即function(observer) { observer.next('Hello');},并執(zhí)行對應(yīng)的next()函數(shù),并傳入對應(yīng)的參數(shù)。
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
var operator = this.operator;
var sink = toSubscriber_1.toSubscriber(observerOrNext, error, complete);
if (operator) {
operator.call(sink, this.source);
}
else {
sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
}
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
return sink;
};
Observable.prototype._trySubscribe = function (sink) {
try {
return this._subscribe(sink);
}
catch (err) {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
sink.error(err);
}
};