
前言
在上一篇文章中給小伙伴們介紹了 Agera 的相關(guān)概念以及基本使用。如果你對 Agera 還不了解,建議先看一下
Android官方響應(yīng)式框架Agera詳解:一、相關(guān)概念和基本使用
后一篇文章已經(jīng)更新啦~
Android官方響應(yīng)式框架Agera詳解:三、Repository的更新規(guī)則及Agera+Retrofit+Okhttp實(shí)戰(zhàn)
這里對把上一篇中幾個重要的類/接口再列出來,加深一下印象:
| 類/接口 | 描述 | 作用 |
|---|---|---|
| Updatable | 觀察者 | 更新事件 |
| Observable | 被觀察者 | 添加/刪除Updatable |
| Supplier | 數(shù)據(jù)提供者 | 提供一個數(shù)據(jù) |
| Receiver | 數(shù)據(jù)接收者 | 接收一個數(shù)據(jù) |
| Repository | 擁有提供數(shù)據(jù)能力的被觀察者 | 添加/刪除Updatable、提供一個數(shù)據(jù) |
| MutableRepository | 擁有 提供/更新 數(shù)據(jù)能力的被觀察者 | 添加/刪除Updatable、提供/更新一個數(shù)據(jù) |
在這篇文章中,我將給小伙伴們介紹 Rpository 的用法。這是 Agera 中最精髓最重要的部分,我盡量把它講清楚,如果有什么疑問的話可以留言交流
注:在上一篇文章中我們說過,Repository 分為 簡單的 Repository 和復(fù)雜的 Repository 。簡單的Repository 創(chuàng)建和使用在上一篇已經(jīng)講過,所以本篇的重點(diǎn)就是復(fù)雜的 Repository 的創(chuàng)建和使用。所以本文中以下所說的 Repository 默認(rèn)指的是復(fù)雜的Repository
目錄
一、如何創(chuàng)建Repository
二、數(shù)據(jù)處理流和操作符
三、編譯表達(dá)式中的相關(guān)配置
四、Attempts && Result
五、總結(jié)
六、相關(guān)代碼
七、預(yù)告
一、如何創(chuàng)建Repository
最開始介紹 Agera 的時候,有這么一句話:
通過加入函數(shù)式響應(yīng)式編程,Agera 可以在 什么時機(jī), 什么線程 和 什么數(shù)據(jù) 層面上更清晰的分離數(shù)據(jù)處理流程,并且使用一個接近自然語言的單個表達(dá)式就能編寫一個復(fù)雜的異步流
這段話就是對 Repository 的精確描述。一個 Repository 可以用單個Java表達(dá)式編譯出來。Java表達(dá)式由下面幾部分構(gòu)成,順序如下:
// 聲明RepositoryCompiler,并初始化,返回REventSource實(shí)例;
1. `Repositories.repositoryWithInitialValue(...)`;
// 指定事件源(Observable),可以多個,返回RFrequency實(shí)例;
2. Event sources - `.observe(...)`;
// 設(shè)置通知頻率(比如click頻率限制),返回RFlow實(shí)例;
3. Frequency of reaction - `.onUpdatesPer(...)` or `.onUpdatesPerLoop()`;
// 設(shè)置數(shù)據(jù)源(Supplier),返回RFlow或RTermination實(shí)例;
4. Data processing flow - `.getFrom(...)`, `.mergeIn(...)`, `.transform(...)`, etc.;
// 其他配置,返回RConfig;
5. Miscellaneous configurations - `.notifyIf(...)`, `.onDeactivation(...)`, etc.;
// 編譯成Repository實(shí)例。
6. `.compile()`.
上面就是創(chuàng)建一個 Repository 的偽代碼。
接下來,我們看一下具體到代碼中 Repository 是如何創(chuàng)建的:
Repository<String> repository =
Repositories.repositoryWithInitialValue("init") //1.聲明RepositoryCompiler,初始值為"init",返回REventSource實(shí)例
.observe() //2. 指定事件源(Observable),可以多個或者不指定,返回RFrequency實(shí)例
.onUpdatesPerLoop() //3. 設(shè)置通知頻率,返回RFlow實(shí)例
.getFrom(new Supplier<Double>() { //4. 設(shè)置數(shù)據(jù)源(Supplier),返回RFlow或RTermination實(shí)例
@NonNull
@Override
public Double get() {
//通過getFrom方法,獲得一個數(shù)據(jù)值。當(dāng)前值可以與聲明時的初始值不同(數(shù)值和類型都可以不同)
//當(dāng)前數(shù)據(jù)流的值為 5000000.00
return 5000000.00d;
}
})
.mergeIn(new Supplier<String>() { //5. 將當(dāng)前數(shù)據(jù)流中的值 和 一個通過 Supplier 提供的新值進(jìn)行合并,并返回一個值
@NonNull
@Override
public String get() {
//這里是新提供的一個值
return "祝大家的銀行卡里余額為:";
}
}, new Merger<Double, String, String>() {
@NonNull
@Override
public String merge(@NonNull Double integer, @NonNull String tAdd) {
//這里將前數(shù)據(jù)流中的值和新值合并,并返回
//新值為 "祝大家的銀行卡里余額為:"
//前數(shù)據(jù)流中的值 為 5000000.00d
//合并后的值為 "祝大家的銀行卡里余額為:5000000.00"
return tAdd + integer;
}
})
.sendTo(new Receiver<String>() { //6. 將當(dāng)前數(shù)據(jù)流中的值發(fā)送到 Receiver 對象中
@Override
public void accept(@NonNull String value) {
Log.d("tag", value);
}
})
.thenTransform(new Function<String, String>() {//7. 將數(shù)據(jù)流中的值進(jìn)行最后轉(zhuǎn)換
@NonNull
@Override
public String apply(@NonNull String input) {
//注意,轉(zhuǎn)換后的數(shù)據(jù)類型必須與初始化時的數(shù)據(jù)類型相同
return input + " 吼吼吼~";
}
})
.compile(); //8. 最后一步,編譯此數(shù)據(jù)流,返回Repiository對象
Updatable updatable = new Updatable() {
@Override
public void update() {
Log.d("tag", repository.get());
}
};
上面的這段代碼比較長,我詳細(xì)講解一下,里面涉及到一到方法和操作符,在接下來會具體講到。這里我們先關(guān)心的是創(chuàng)建 Repository 的流程及整個數(shù)據(jù)流中的數(shù)據(jù)是如何變化的
首先需要明確的是,創(chuàng)建 Repository 表達(dá)式的不同階段都返回 RepositoryCompilerStates 中內(nèi)嵌的接口(compiler state interfaces)對象,這樣可以每個階段只暴露合適的方法, 引導(dǎo)開發(fā)者完成正確的表達(dá)式
這三個階段分別是:
- 事件源和響應(yīng)頻率: RFrequency 和 REventSource
- 數(shù)據(jù)處理流程: RFlow 和 RSyncFlow
- 其它配置: RConfig
好,我們現(xiàn)在開始分析上面的代碼。大家看我標(biāo)注的注釋,我們根據(jù)序號進(jìn)行分析:
1. 初始化聲明
首先,我們使用Repositories.repositoryWithInitialValue("init") 聲明 RepositoryCompiler ,并指定了一個 String 類型的初始值:"init"。這一條語句返回的是一個 REventSource 實(shí)例,對應(yīng)上面提到的第一階段 事件源和響應(yīng)頻率: RFrequency 和 REventSource
注意,由于這一步返回的是 REventSource 實(shí)例,所以接下來我們能調(diào)用的方法只有 observe() 方法。這就體現(xiàn)了上面所說的每個階段只暴露合適的方法, 引導(dǎo)開發(fā)者完成正確的表達(dá)式
后面的步驟也是同樣的道理
2. 設(shè)置觀察源
第2步調(diào)用了方法 observe() ,這個方法可以接收0個到多個參數(shù)(參數(shù)類型是 Observable),返回 RFrequency 實(shí)例
沒有接收參數(shù)的時候,說明該 Repository 不需要監(jiān)聽其他額外的事件源,只有 Repository 第一次變?yōu)榧せ顮顟B(tài)時(即之前沒有注冊過 Updatable,第一次注冊 Updatable 后變?yōu)榧せ顮顟B(tài))整個數(shù)據(jù)流開始執(zhí)行
有接收參數(shù)的時候,除了第一次變?yōu)榧せ顮顟B(tài)時會執(zhí)行數(shù)據(jù)流,當(dāng)接收參數(shù)中的被觀察者狀態(tài)改變的時候,整個數(shù)據(jù)流也會重新開始執(zhí)行
該方法返回 RFrequency 實(shí)例,對應(yīng)第一階段 事件源和響應(yīng)頻率: RFrequency 和 REventSource
3. 設(shè)置通知頻率
RFrequency 對象對應(yīng)的方法有兩個,onUpdatesPer(int millis) 和 onUpdatesPerLoop()。這兩個方法的作用都是設(shè)置通知頻率,返回 RFlow 實(shí)例,對應(yīng)第二階段數(shù)據(jù)處理流程: RFlow 和 RSyncFlow
onUpdatesPerLoop 代表線程中的 looper 循環(huán)一次就更新一次數(shù)據(jù)流
onUpdatesPer(int millis) 則是根據(jù)我們傳入的時間間隔來更新數(shù)據(jù)流
Agera 底層的消息通知是通過 Handler 實(shí)現(xiàn)的,onUpdatesPerLoop 意思就是和 Looper 循環(huán)器保持同樣的更新頻率
4. 獲取一個新的數(shù)據(jù)
上面一步操作返回的是返回 RFlow 實(shí)例,對應(yīng)第二階段數(shù)據(jù)處理流程: RFlow 和 RSyncFlow。也就是說在從這一步,我們開始對數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行操作了
這一步使用 getFrom 方法從一個 Supplier 對象中拿到了一個 Double 類型的數(shù)據(jù):5000000.00d
注意,我們需要重點(diǎn)掌握的就是數(shù)據(jù)流中的數(shù)據(jù)是如何變化和轉(zhuǎn)換的。在最開始的時候我們聲明的是一個 String 類型的數(shù)據(jù) "init",到這一步變成了 Double 類型的數(shù)據(jù) 5000000.00d
getFrom 返回 RFlow 實(shí)例,對應(yīng)第二階段數(shù)據(jù)處理流程: RFlow 和 RSyncFlow。也就是說,接下來,我們還是處于第二階段,仍然可以對數(shù)據(jù)流進(jìn)行操作
在整個數(shù)據(jù)流中,不僅僅可以改變數(shù)據(jù)的值,連類型也是可以改變的。只需要保證最后的結(jié)果和最開始聲明的初始值類型一致即可,中間的數(shù)據(jù)可以根據(jù)需要任意轉(zhuǎn)換和處理
5. 合并一個數(shù)據(jù)
mergeIn 方法作用是將兩個數(shù)據(jù)合并。它有兩個參數(shù),第一個參數(shù)是 Supplier 對象,它提供的值就是要合并的值。第二個參數(shù)是一個 Merger 對象,它有三個泛型,第一個泛型代表 當(dāng)前數(shù)據(jù)流中的數(shù)據(jù)類型,也就是 Double,第二個泛型代表 要合并數(shù)據(jù)類型,即 Supplier 對象中提供的值的類型,也就是 String。第三個泛型代表 前面兩個值合并后返回的值的數(shù)據(jù)類型,這里我們是把兩個值拼接到一起了,所以就是 String
我們來看一下數(shù)據(jù)流中的值的變化,當(dāng)前數(shù)據(jù)流中的值是 Double 類型的 5000000.00d,mergeIn 方法中第一個參數(shù)提供了一個 String 類型的值 "祝大家的銀行卡里余額為:",后面的 Merger 中的操作是將這兩個值拼接起來,所以經(jīng)過這一步,數(shù)據(jù)流中的值就變成了 String 類型的 "祝大家的銀行卡里余額為:5000000.00"
mergeIn 方法返回 RFlow 實(shí)例,對應(yīng)第二階段數(shù)據(jù)處理流程: RFlow 和 RSyncFlow。接下來,我們還是處于第二階段,仍然可以對數(shù)據(jù)流進(jìn)行操作
6. 發(fā)送數(shù)據(jù)
sendTo 方法的作用是將數(shù)據(jù)流中的數(shù)據(jù)傳遞給一個 Receiver 對象。我們在 Receiver 對象中接收到后,打印出了當(dāng)前數(shù)據(jù)流中的值,即"祝大家的銀行卡里余額為:5000000.00"
sendTo 方法返回的是當(dāng)前對象this,也就是說整個數(shù)據(jù)流還處于第二階段,仍然可以對數(shù)據(jù)進(jìn)行操作
7. 轉(zhuǎn)換數(shù)據(jù),結(jié)束數(shù)據(jù)流處理
thenTransform 方法作用是對一個數(shù)據(jù)進(jìn)行轉(zhuǎn)換,并返回 RConfig 實(shí)例,對應(yīng)第三階段其它配置: RConfig
.then** 代表了數(shù)據(jù)流的結(jié)束,也就是說,經(jīng)過這個方法,我們對數(shù)據(jù)流的操作就結(jié)束了。
它的返回值是一個 RConfig 對象,對應(yīng)第三階段
thenTransform 方法參數(shù)是一個 Function 對象。
Function 對象的作用是根據(jù)提供的數(shù)據(jù),返回另一個數(shù)據(jù),也就是對數(shù)據(jù)進(jìn)行了一次轉(zhuǎn)換。它有兩個泛型參數(shù),第一個代表輸入值的類型,第二個代表輸出值的類型
輸入值,也就是當(dāng)前數(shù)據(jù)流中的值,即 String 類型的"祝大家的銀行卡里余額為:5000000.00"
輸出值,因?yàn)檫@是對數(shù)據(jù)處理的最后一步,輸出值的類型必須與最開始提供的初始值類型相同,即 String 類型
在拿到當(dāng)前數(shù)據(jù)流中的值后,我們在其后面追加了" 吼吼吼~",所以數(shù)據(jù)流的最終結(jié)果為 "祝大家的銀行卡里余額為:5000000.00 吼吼吼~"
8. 編譯此數(shù)據(jù)流
最后一步,編譯此數(shù)據(jù)流,返回 Repiository 對象
注意,在 .then** 操作后返回 RConfig 對象,這里還可以進(jìn)行一些特殊配置,下面我們會講。在這個例子中沒有用到,直接編譯成 Repiository 對象了
我們運(yùn)行一下程序,看一下輸出值:
05-21 16:27:58.463 11791-11791/com.cmos.agerademo D/tag: 祝大家的銀行卡里余額為:5000000.0
05-21 16:27:58.566 11791-11791/com.cmos.agerademo D/tag: 祝大家的銀行卡里余額為:5000000.0 吼吼吼~
可以看到,第一條log是我們在第6步的時候輸出的,
第二條log是在 Updatable 中輸出的,也就是最終的結(jié)果
到這里,關(guān)于創(chuàng)建一個 Repiository 對象的流程就已經(jīng)分析完了。我并沒有直接把所有的方法和操作符列舉出來,而是想先通過這個例子,讓大家了解一下整個數(shù)據(jù)流中的數(shù)據(jù)是如何變化的。這才是最最重要的,只有理解了這個,然后再去配合操作符,才能正確地流暢的去使用 Repository
我當(dāng)時在學(xué)習(xí)的時候,把重點(diǎn)放在了各種操作符還有方法上,用起來迷迷糊糊的,用了好久我都不清楚數(shù)據(jù)到底是怎么一步一步變化的!所以,一定要先把數(shù)據(jù)是如何變化的搞清楚,然后再去學(xué)習(xí)各種操作符
二、數(shù)據(jù)處理流和操作符
在上一部分,給大家介紹了創(chuàng)建 Repository 的表達(dá)式分為3個部分:
- 事件源和響應(yīng)頻率: RFrequency 和 REventSource
- 數(shù)據(jù)處理流程: RFlow 和 RSyncFlow
- 其它配置: RConfig
這三個部分所對應(yīng)的分別是 RepositoryCompilerStates 中內(nèi)嵌的接口(compiler state interfaces)對象
每一個接口對象擁有不同的方法,這樣可以每個階段只暴露合適的方法, 引導(dǎo)開發(fā)者完成正確的表達(dá)式
我把所有的接口和相關(guān)方法都列了出來:
接下來給大家介紹常用的操作符和方法
1. Supplier && getFrom
public interface Supplier<T> {
@NonNull
T get();
}
<TCur> RFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier)
Supplier 是一個數(shù)據(jù)提供者,它是一個沒有輸入值,有一個輸出值的操作符
getFrom(Supplier<TCur> supplier) 方法作用是從Supplier中拿到數(shù)據(jù)值作為當(dāng)前數(shù)據(jù)流中的值

除了 getFrom(Supplier) 方法,其變種方法也是同樣的道理。比如:attemptGetFrom(Supplier)、thenGetFrom(Suppiler) 等
2. Function && transform
public interface Function<TFrom, TTo> {
@NonNull
TTo apply(@NonNull TFrom input);
}
/**
* Transform the input value using the given function into the output value.
*/
@NonNull
<TCur> RSyncFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);
Function 的作用是 基于一個輸入值,返回一個輸出值。它有兩個泛型參數(shù),第一個代表輸入值的數(shù)據(jù)類型,第二個代表輸出值的數(shù)據(jù)類型
transform(Function<? super TPre, TCur> function) 方法的作用是 將輸入值根據(jù)傳入的Function對象轉(zhuǎn)換成輸出值

除了 transform(Supplier) 方法,其變種方法也是同樣的道理。比如:attempTransform(Function)、thenTransform(Function) 等
3. Merger && mergeIn
public interface Merger<TFirst, TSecond, TTo> {
/**
* Computes the return value merged from the two given input values.
*/
@NonNull
TTo merge(@NonNull TFirst first, @NonNull TSecond second);
}
@NonNull
@Override
<TAdd, TCur> RFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, TCur> merger);
Merger 是作用是根據(jù)兩個輸入值,返回一個輸出值。它有3個泛型參數(shù),第一個代表第一個輸入值的數(shù)據(jù)類型,第二個代表第二個輸入值的數(shù)據(jù)類型,第三個代表輸出值的數(shù)據(jù)類型
mergeIn(Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, TCur> merger)的作用就是以當(dāng)前數(shù)據(jù)流中的值作為merger對象中的第一個輸入值,以supplier對象中的值作為merger對象中的第二個輸入值,并根據(jù)這兩個值返回一個數(shù)據(jù)

除了 mergeIn 方法,其變種方法也是同樣的道理。比如:attemptMergeIn、thenAttemptMergeIn 等
4. Receiver && sendTo
public interface Receiver<T> {
/**
* Accepts the given {@code value}.
*/
void accept(@NonNull T value);
}
TSelf sendTo(@NonNull Receiver<? super TPre> receiver);
Receiver 的作用是接收一個數(shù)據(jù)。它有一個泛型參數(shù),該泛型的類型就是接收值的數(shù)據(jù)類型
sendTo(@NonNull Receiver<? super TPre> receiver)方法的作用是將當(dāng)前數(shù)據(jù)流中的數(shù)據(jù)發(fā)送給Receiver對象

關(guān)于 sendTo 這里需要說明一下,它是把當(dāng)前的數(shù)據(jù)流中的值發(fā)送給 Receiver 對象。并不是復(fù)制一份給 Receiver 對象
這說明了什么?說明了 sendTo 這個操作是同步的!。當(dāng)在執(zhí)行 Receiver 對象中的方法的時候,整個數(shù)據(jù)流會阻塞,等到 Receiver 對象方法執(zhí)行完畢后數(shù)據(jù)流才繼續(xù)往下執(zhí)行
5. Binder 和 bindWith
public interface Binder<TFirst, TSecond> {
/**
* Accepts the given values {@code first} and {@code second}.
*/
void bind(@NonNull TFirst first, @NonNull TSecond second);
}
<TAdd> TSelf bindWith(@NonNull Supplier<TAdd> secondValueSupplier,
@NonNull Binder<? super TPre, ? super TAdd> binder);
Binder 的作用是 接收兩個值。它的泛型參數(shù)值分別代表第一個輸入值的數(shù)據(jù)類型、第二個輸入值數(shù)據(jù)類型。
它與上面提到的 Suppiler 是類似的,不同的是多接收了一個參數(shù)
bindWith 方法有兩個參數(shù),第一個參數(shù)是一個 Suppiler 對象,代表傳遞給 Binder 對象的第二個輸入值,Binder 對象中的第一個輸入值是當(dāng)前數(shù)據(jù)流中的值

注意,這個方法和 sendTo 方法類似,也是同步的
6. Predicate && check
public interface Predicate<T> {
/**
* Returns whether the predicate applies to the input {@code value}.
*/
boolean apply(@NonNull T value);
}
RTermination<TVal, TPre, TSelf> check(@NonNull Predicate<? super TPre> predicate);
Predicate 的作用是根據(jù)一個輸入值,返回ture或者false
check(@NonNull Predicate<? super TPre> predicate) 的作用就是檢查 Predicate 對象中的返回值,如果為 true ,則繼續(xù)進(jìn)行后續(xù)數(shù)據(jù)流操作。如果為 false ,則跳過后續(xù)數(shù)據(jù)流或走失敗邏輯

7. .then**
這個.then是什么意思呢?它代表的意思就是對當(dāng)前數(shù)據(jù)流中數(shù)據(jù)處理部分(即數(shù)據(jù)處理流程: RFlow 和 RSyncFlow)已經(jīng)處理完畢,接下來要進(jìn)行的是第三部分的操作
我們看一下最開始的代碼
...
... //這里還有很多對數(shù)據(jù)流的操作
.sendTo(new Receiver<String>() {
...
...
})
//注意,這里的thenTransform就代表了這是對數(shù)據(jù)流處理的最后一步,執(zhí)行完這一步就進(jìn)入到下一部分了
.thenTransform(new Function<String, String>()
...
...
)
.compile();
.then** 有很多方法,比如 thenGetFrom、 thenTransform、thenAttemptTransform等
記住,當(dāng)我們對數(shù)據(jù)流處理的最后一步,一定是 .then** 的方法,這樣才能順利的進(jìn)入到下一步,否則的話會永遠(yuǎn)停留在數(shù)據(jù)流處理的階段
一般來說,最后一步使用最多的就是 thenTransform 這個方法。我們在最開始就講過,數(shù)據(jù)流結(jié)束后的數(shù)據(jù)類型必須和初始類型相同,而 .then** 表示的是數(shù)據(jù)處理的最后一步,所以一般情況下我們需要用 thenTransform 在數(shù)據(jù)流的最后一步把數(shù)據(jù)轉(zhuǎn)換成初始值的數(shù)據(jù)類型
8. goTo(Executor executor)
我們在上一篇就講過,Agera 切換線程非常的方便。沒錯,goTo 這個方法就是用來切換線程的
...
.goTo(Executors.newSingleThreadExecutor())
...
當(dāng)調(diào)用 goTo 方法后,此方法下面的數(shù)據(jù)流將會切換到指定的線程中去執(zhí)行
注,Agera 是支持多次線程切換的,所以可以多處調(diào)用 goTo 方法,將不同部分的處理切換到不同的線程中
好了,以上就是對 Repository 中數(shù)據(jù)流部分的方法和操作符的講解。如果你對這些還不是特別熟悉的話,建議你按照本篇最開始的代碼那樣,多寫幾遍,方法和操作符任意組合,數(shù)據(jù)任意轉(zhuǎn)換
注意,上面介紹的一些方法會有一些變種,比如 getFrom 方法就會有 attemptGetFrom 等。他們的作用是類似的,但又有不同的地方。下面我們會講解到
三、編譯表達(dá)式中的相關(guān)配置
在上面的部分,主要給大家介紹了關(guān)于數(shù)據(jù)流操作的方法和操作符,所有的操作都對應(yīng)于編譯表達(dá)式的第二階段:數(shù)據(jù)處理流程: RFlow 和 RSyncFlow
在這一部分,將會給大家介紹編譯表達(dá)式中的第一部分和第三部分
這里特別說明一下,下面介紹的有些內(nèi)容到目前為止可能并沒有接觸到,而且會有很少使用的一些方法,所以大家在看的時候可以先有個大體印象就好,不必深究
事件源和響應(yīng)頻率: RFrequency 和 REventSource
1. REventSource
interface REventSource<TVal, TStart> {
/**
* Specifies the event source of the compiled repository.
*/
@NonNull
RFrequency<TVal, TStart> observe(@NonNull Observable... observables);
}
REventSource 是一個接口,包含一個 observe 方法,它的作用就是設(shè)置監(jiān)聽的事件源
REventSource 實(shí)例是如何產(chǎn)生的呢?
Repositories.repositoryWithInitialValue("init")
就是我們編譯表達(dá)式的第一步,當(dāng)我們聲明一個 Repository 的初始值后,它返回的就是一個 REventSource 實(shí)例
Agera 這樣做的目的就是告訴我們,聲明完初始值下一步應(yīng)該設(shè)置事件源了,用于引導(dǎo)我們正確的完成表達(dá)式的編譯
也就是說,上面的代碼繼續(xù)寫下去,就只能是
Repositories.repositoryWithInitialValue("init")
.observe()
2. RFrequency
interface RFrequency<TVal, TStart> extends REventSource<TVal, TStart> {
@NonNull
RFlow<TVal, TStart, ?> onUpdatesPer(int millis);
@NonNull
RFlow<TVal, TStart, ?> onUpdatesPerLoop();
}
RFrequency 也是一個接口,用于設(shè)置更新頻率
onUpdatesPerLoop 代表線程中的 looper 循環(huán)一次就更新一次數(shù)據(jù)流
onUpdatesPer(int millis) 則是根據(jù)我們傳入的時間間隔來更新數(shù)據(jù)流
Agera 底層的消息通知是通過 Handler 實(shí)現(xiàn)的,onUpdatesPerLoop 意思就是和 Looper 循環(huán)器保持同樣的更新頻率
RFrequency 實(shí)例是如何來的呢?它是通過 REventSource 的 observe 方法返回的
Repositories.repositoryWithInitialValue("init")
.observe() //返回RFrequency 實(shí)例
當(dāng)我們得到 RFrequency 實(shí)例后,就只能去設(shè)置更新頻率
Repositories.repositoryWithInitialValue("init")
.observe()
.onUpdatesPerLoop() //設(shè)置更新頻率
其它配置: RConfig
interface RConfig<TVal> {
RConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker);
RConfig<TVal> onDeactivation(@RepositoryConfig int deactivationConfig);
RConfig<TVal> onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig);
Repository<TVal> compile();
}
RConfig 接口代表的是整個編譯表達(dá)式的最后一部分。從最開始的初始化,然后進(jìn)行一系列的數(shù)據(jù)流操作之后,這個時候,就可以對 Repository 進(jìn)行最后的一些設(shè)置
其實(shí)還有兩個方法,由于不常使用,我沒有寫出來。如果感興趣的去可以自行去查看源碼
1. notifyIf(Merger<? super TVal, ? super TVal, Boolean> checker)
這個方法的作用是根據(jù) Merger 對象的返回值決定是否要通知 Updatable 更新
上面我們講過 Merger 是一個 兩個輸入值,一個輸出值的操作符,在 notifyIf 方法中,第一個輸入值代表的是舊的數(shù)據(jù)(即上一次數(shù)據(jù)處理流執(zhí)行完后得到的數(shù)據(jù)),第二個輸入值代表的是新的數(shù)據(jù)(即本次數(shù)據(jù)處理流執(zhí)行完畢后得到的新數(shù)據(jù))
我們可以看到,Merger 對象的第三個泛型參數(shù),也就是輸出值的類型,已經(jīng)被指定為 Boolean 型了,說明需要我們根據(jù)舊的數(shù)據(jù)和新的數(shù)據(jù),決定是否要通知Updatable 更新(返回值為true代表更新,false為不更新)
2. onDeactivation(@RepositoryConfig int deactivationConfig)
這個方法的作用是當(dāng)數(shù)據(jù)倉庫變?yōu)椴换钴S狀態(tài)時數(shù)據(jù)處理流的行為
它的參數(shù)是 RepositoryConfig 接口中的4個變量
//繼續(xù)數(shù)據(jù)流的執(zhí)行
1. RepositoryConfig.CONTINUE_FLOW
//取消數(shù)據(jù)流的執(zhí)行
2. RepositoryConfig.CANCEL_FLOW
//將倉庫中的數(shù)據(jù)值重新設(shè)置成初始值(針對onDeactivation方法)
//不重置數(shù)據(jù)流中的值,只是取消數(shù)據(jù)流的執(zhí)行(針對并發(fā)配置,即 onConcurrentUpdate 方法。下面會講到)
3. RepositoryConfig.RESET_TO_INITIAL_VALUE = 2 | CANCEL_FLOW
//當(dāng)數(shù)據(jù)流處于正在執(zhí)行中并且是異步狀態(tài)的時候(從第一次 goTo 指令后到 goLazy 指令前),中斷當(dāng)前運(yùn)行流程的線程
4. RepositoryConfig.SEND_INTERRUPT = 4 | CANCEL_FLOW;
上面對4個參數(shù)值都做了對應(yīng)的說明,我們使用onDeactivation 方法的時候,就可以控制數(shù)據(jù)倉庫變?yōu)椴换钴S狀態(tài)時數(shù)據(jù)處理流該如何執(zhí)行
舉個列子,我們設(shè)置成 .onDeactivation(RepositoryConfig.CANCEL_FLOW)。當(dāng)數(shù)據(jù)流正在執(zhí)行的時候,比如說正在加載一張圖片,這時候我們把頁面退出了,也就是調(diào)用了 repository.removeUpdatable(updatable); 這個時候倉庫就由活動狀態(tài)變成了非活動狀態(tài),那么根據(jù)我們的設(shè)置,正在加載圖片的數(shù)據(jù)流就會被取消,不再繼續(xù)往下執(zhí)行了
這個方法默認(rèn)的值是RepositoryConfig#CONTINUE_FLOW,即當(dāng)倉庫變?yōu)椴换钴S狀態(tài)時,仍然執(zhí)行數(shù)據(jù)流直到數(shù)據(jù)流執(zhí)行完畢
3. onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig)
這個方法的作用是聲明當(dāng)數(shù)據(jù)流正在執(zhí)行的同時又收到了一個事件源更新的情形下的數(shù)據(jù)流的行為
說簡單直白一點(diǎn),就是數(shù)據(jù)流正在執(zhí)行呢,收到了一個被觀察者的更新,這時候數(shù)據(jù)流應(yīng)該是繼續(xù)執(zhí)行還是取消執(zhí)行
這個方法的參數(shù)也是上面那4個值,上面已經(jīng)講解過
注,這個方法的默認(rèn)值是 RepositoryConfig#CONTINUE_FLOW,繼續(xù)數(shù)據(jù)流的執(zhí)行
4. compile()
這個方法的作用是編譯上面的數(shù)據(jù)流,返回一個 Repository 對象,這是整個編譯表達(dá)式的最后一步
小結(jié)
到這里,我們就對創(chuàng)建 Repository 的整個編譯表達(dá)式做了講解,其實(shí)還是很有規(guī)律的,總共分為3個部分:
- 先初始化,然后設(shè)置事件源和更新頻率
- 對數(shù)據(jù)進(jìn)行各種操作
- 設(shè)置一些需要的配置,最后調(diào)用compile()
Repository<String> repository =
//1. 先初始化,然后設(shè)置事件源和更新頻率
Repositories.repositoryWithInitialValue("init")
.observe()
.onUpdatesPerLoop()
//2. 對數(shù)據(jù)進(jìn)行各種操作
.getFrom(new Supplier<Float>() {
@NonNull
@Override
public Float get() {
return 50000000.00f;
}
})
...
...
.thenTransform(new Function<String, String>() {
@NonNull
@Override
public String apply(@NonNull String input) {
return input + " 吼吼吼~";
}
})
//3.設(shè)置一些需要的配置,最后調(diào)用compile()
.onDeactivation(RepositoryConfig.CANCEL_FLOW)
.onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW)
.compile();
四、Attempts && Result
這部分會給大家講解一個非常非常重要的知識點(diǎn):Agera 中的異常處理
在前面所有的講解中,我們都默認(rèn)了這一步的操作是可以正確執(zhí)行的。實(shí)際上并不是這樣,異常無處不在。比如進(jìn)行一個網(wǎng)絡(luò)請求,我們就需要處理網(wǎng)絡(luò)請求失敗的情況
我們先看一下沒有使用異常處理是什么樣的:
Repository repository = Repositories.repositoryWithInitialValue(1)
.observe()
.onUpdatesPerLoop()
.thenGetFrom(new Supplier<Integer>() {
@NonNull
@Override
public Integer get() {
return 1 / 0;
}
})
.compile();
Updatable updatable = new Updatable() {
@Override
public void update() {
Log.d("tag", repository.get() + "");
}
};
我們想要去拿到 1/0 的結(jié)果,而且沒有做異常處理。運(yùn)行程序,崩潰了
java.lang.ArithmeticException: divide by zero
為了捕獲異常,Agera 提供了一個封裝類 Result,它可以封裝易失敗操作的結(jié)果(成功或者失敗)或者attempt的結(jié)果值
1. Result
Result 類中對可能失敗的操作做了異常封裝,保證程序能夠正常運(yùn)行下去而不是像上面那樣崩潰
Result 能夠做什么?
- 表示成功并提供一個數(shù)據(jù) -> Result.success(num)
- 表示失敗并提供異常信息 -> Result.failure(new Throwable("網(wǎng)絡(luò)請求失敗")
- 表示默認(rèn)缺省 -> Result.absent()
在 Result 類中,有一個 value 字段。當(dāng) Result 表示成功時候,可以通過 get() 方法獲取到該值。如果 Result 是失敗或者默認(rèn)缺省時,該值為空
Result 類中的常用方法:
//根據(jù)給定值返回一個成功的Result對象
public static <T> Result<T> success(@NonNull final T value) {
return new Result<>(checkNotNull(value), null);
}
//根據(jù)給定值返回一個成功的Result對象,它上面方法的別名
public static <T> Result<T> present(@NonNull final T value) {
return success(value);
}
//根據(jù)傳入的Throwable 返回一個失敗的Result對象
public static <T> Result<T> failure(@NonNull final Throwable failure) {
return failure == ABSENT_THROWABLE
? Result.<T>absent() : new Result<T>(null, checkNotNull(failure));
}
//返回一個失敗的Result對象
public static <T> Result<T> failure() {
return (Result<T>) FAILURE;
}
//返回該對象中封裝的值
public T get() throws FailedResultException {
if (value != null) {
return value;
}
throw new FailedResultException(failure);
}
//如果 Result 對象是成功的(即下面的value != null),將結(jié)果值發(fā)送給一個 Receiver 對象
public Result<T> ifSucceededSendTo(@NonNull final Receiver<? super T> receiver) {
if (value != null) {
receiver.accept(value);
}
return this;
}
//如果 Result 對象是失敗的(即下面的failure != null),將失敗信息發(fā)送給一個 Receiver 對象
public Result<T> ifFailedSendTo(@NonNull final Receiver<? super Throwable> receiver) {
if (failure != null) {
receiver.accept(failure);
}
return this;
}
注:關(guān)于 Result 這個類,是在是不知道該怎么把它講解好。如果你看得不太理解的話,建議多在代碼中去使用,并且去看一下這個類的源碼(這個類其實(shí)挺簡單的)
2. .attempt*
Agera 提供了能感知錯誤的指令,這樣就可以在錯誤的情況下終止流程
.attemptGetFrom(Supplier).or…;
.attemptTransform(Function).or…;
.attemptMergeIn(Supplier, Merger).or…,
我們把感知錯誤的指令與之前講到的普通指令對比一下:
普通指令不能處理異常,并且指令后面返回的是 RFlow 或 RSyncFlow ,能夠直接繼續(xù)進(jìn)行數(shù)據(jù)流操作
感知錯誤的指令能夠感知到異常,并且指令后面返回的是一個 RTerminationOrContinue 或者 RTermination 對象。這兩個對象提供的方法決定了當(dāng)發(fā)生異常后數(shù)據(jù)流該如何處理
我們來看一下發(fā)生異常后的處理方法:
//當(dāng)前一個指令發(fā)生異常時,跳過剩下的數(shù)據(jù)流(即不在繼續(xù)執(zhí)行),并且不通知 Updatable
1. orSkip()
//當(dāng)前一個指令發(fā)生異常時,在 Function 對象中,根據(jù)傳入的錯誤異常,返回一個值(這個值可以是成功也可以是失敗,按具體需要處理)
2. orEnd(@NonNull Function<? super TTerm, ? extends TVal> valueFunction)
//當(dāng)前面的指令發(fā)生異常時,繼續(xù)剩下的數(shù)據(jù)處理流。并使用 Result.getFailure() 中的值作為下一個指令的輸入值
3.orContinue()
好,接下來我們把上面的例子改造一下:
Repository repository2 = Repositories.repositoryWithInitialValue(0)
.observe()
.onUpdatesPerLoop()
//1.我們使用thenAttemptGetFrom方法,去處理一個可能發(fā)生異常的操作
.thenAttemptGetFrom(new Supplier<Result<? extends Integer>>() {
@NonNull
@Override
public Result<? extends Integer> get() {
try {
int num = 1 / 0;
//2. 如果不發(fā)生異常,返回一個表示成功的Result
return Result.success(num);
} catch (Exception e) {
e.printStackTrace();
//3.如果發(fā)生了異常,返回一個表示失敗的Result
return Result.failure(e);
}
}
})
//4.對異常情況進(jìn)行處理
.orEnd(new Function<Throwable, Integer>() {
@NonNull
@Override
public Integer apply(@NonNull Throwable input) {
//5. 當(dāng)發(fā)生異常的時候,返回一個-1
return -1;
}
})
.compile();
我們對于異常進(jìn)行處理后,運(yùn)行看一下效果,發(fā)現(xiàn)程序正常運(yùn)行,并且最后得到的值是-1
當(dāng)我們使用了.attempt* 指令和 Result 封裝后,整個數(shù)據(jù)流中的泛型就變得比之前復(fù)雜很多。這一塊的的知識講解起來比較抽象,不好描述。所以我還是建議大家多去寫寫,才能好更好的理解
五、總結(jié)
這一篇文章主要給大家介紹了如何創(chuàng)建一個 Repository 、以及 Repository 數(shù)據(jù)流中的方法和操作符還有對異常的處理
- 創(chuàng)建一個 Repository 分為3個部分,每一部分都有其特定的指令
Repository<String> repository =
//1. 先初始化,然后設(shè)置事件源和更新頻率
Repositories.repositoryWithInitialValue("init")
.observe()
.onUpdatesPerLoop()
//2. 對數(shù)據(jù)進(jìn)行各種操作
.getFrom(new Supplier<Float>() {
@NonNull
@Override
public Float get() {
return 50000000.00f;
}
})
...
...
.thenTransform(new Function<String, String>() {
@NonNull
@Override
public String apply(@NonNull String input) {
return input + " 吼吼吼~";
}
})
//3.設(shè)置一些需要的配置,最后調(diào)用compile()
.onDeactivation(RepositoryConfig.CANCEL_FLOW)
.onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW)
.compile();
Agera 提供了能感知錯誤的指令
普通指令不能處理異常,并且指令后面返回的是 RFlow 或 RSyncFlow ,能夠直接繼續(xù)進(jìn)行數(shù)據(jù)流操作。
感知錯誤的指令能夠感知到異常,并且指令后面返回的是一個 RTerminationOrContinue 或者 RTermination 對象。這兩個對象提供的方法決定了當(dāng)發(fā)生異常后數(shù)據(jù)流該如何處理關(guān)于異常封裝的 Result 類 結(jié)合.attempt* 指令后,整個數(shù)據(jù)流中的泛型就變得比之前復(fù)雜很多。數(shù)據(jù)流中整體的數(shù)據(jù)是如何變化的,大家親自動手去寫去分析,這樣才能理解透徹
六、相關(guān)代碼
https://github.com/smashinggit/Study
注:此工程包含多個module,本文所用代碼均在AgeraDemo下
七、預(yù)告
本來計(jì)劃是這篇文章講解 Repository 并且配套講解一個實(shí)例,然后寫著寫著發(fā)現(xiàn)內(nèi)容實(shí)在是太多了。所以這一篇整體就比較偏理論
在下一篇文章中,我將會用實(shí)例去帶大家在實(shí)戰(zhàn)中體驗(yàn)一下 Agera 的強(qiáng)大,敬請期待~
后一篇文章已經(jīng)更新啦~
Android官方響應(yīng)式框架Agera詳解:三、Repository的更新規(guī)則及Agera+Retrofit+Okhttp實(shí)戰(zhàn)
注:由于本人水平有限,所以難免會有理解偏差或者使用不正確的問題。如果小伙伴們有更好的理解或者發(fā)現(xiàn)有什么問題,歡迎留言批評指正~