響應(yīng)式編程是以異步和數(shù)據(jù)流來構(gòu)建事務(wù)關(guān)系的編程模型,異步和數(shù)據(jù)流是以構(gòu)建事務(wù)關(guān)系而存在,異步是為了區(qū)分無關(guān)的事務(wù),數(shù)據(jù)流是為了聯(lián)系有關(guān)的事務(wù)
版權(quán)聲明:本文來自門心叼龍的博客,屬于原創(chuàng)內(nèi)容,轉(zhuǎn)載請注明出處
https://blog.csdn.net/geduo_83/article/details/89736624
1.時間段選擇器,把一天24小時,按照每半個小時作為一個時間段進行分割,共生成48個時間段字符串集合進行返回
時間格式如下:
[00:00-00:30,
00:30-01:00,
01:00-01:30,
...
23:00-23:30,
23:30-00:00]
思路如下:
- 首先我們需要得到48個時間點如:
2019-01-01 00:00
2019-01-01 00:30
2019-01-01 01:00
2019-01-01 01:30
...
...
2019-01-01 23:00
2019-01-01 23:30
2019-01-01 24:00
- 首先我們需要得到48個時間點如:
- 計算完畢,兩兩組合
實現(xiàn)步驟:
- 1.創(chuàng)建了一個日期發(fā)射器
- 2.創(chuàng)建一個0-49范圍的有序整數(shù)序列的合集發(fā)射器
- 3.通過combineLatest將兩個兩個發(fā)射器的事件進行兩兩結(jié)合
- 4.計算得到48個時間點的事件
- 5.通過buffer緩存48個時間點事件
- 6.將得到的事件(00:00,00:30) 轉(zhuǎn)換為另外一個事件(00:00-00:30)
- 7.通過blockingFirst將計算結(jié)果返回
知識點:
- 1.Observable.create創(chuàng)建一個發(fā)射器
- 2.Observable.range創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的發(fā)射器
- 3.Observable.combineLatest組合兩個發(fā)射器
- 4.Observable.buffer緩存事件
- 5.Observable.blockingFirst阻塞返回第一個事件
代碼實現(xiàn):
public static List<String> getTimeData() {
//創(chuàng)建了一個日期發(fā)射器
Observable<Calendar> calendarObservable = Observable.create(new ObservableOnSubscribe<Calendar>() {
@Override
public void subscribe(ObservableEmitter<Calendar> emitter) throws Exception {
Calendar calendar = Calendar.getInstance();
calendar.set(2019, 0, 1, 0, 0);
emitter.onNext(calendar);
}
});
//創(chuàng)建了一個發(fā)射0到48的一個數(shù)字發(fā)射器,目的是給日期做累加計算
Observable<Integer> integerObservable = Observable.range(0, 49);
//對日期發(fā)射器和數(shù)字發(fā)射器進行兩兩組合
List<String> strings = Observable.combineLatest(calendarObservable, integerObservable, new BiFunction<Calendar, Integer, String>() {
@Override
public String apply(Calendar calendar, Integer integer) throws Exception {
calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) + (integer == 0 ? 0 : 30));
return DateUtil.formatDate(calendar.getTime(), DateUtil.FormatType.HHmm);
}
}).buffer(49).map(new Function<List<String>, List<String>>() {
@Override
public List<String> apply(List<String> list) throws Exception {
List<String> timeLine = new ArrayList<>();
for (int i = 0; i < list.size() - 1; i++) {
timeLine.add(list.get(i) + "-" + list.get(i + 1));
}
return timeLine;
}
}).blockingFirst();
return strings;
}
2.地圖加載完畢且定位成功之后,顯示當前定位點
在RxJava出現(xiàn)之前解決這類問題的確是一件很麻煩的事情,地圖加載和定位成功兩件事都完成之后才能進行顯示當前點的操作,否則就有可能出現(xiàn),當定位完成了,但是地圖還沒有加載完畢,而是導致定位點的縮放級別不能顯示的問題,有了RxJava這些問題已經(jīng)變得迎刃而解了。
實現(xiàn)步驟:
1.創(chuàng)建一個地圖加載的發(fā)射器
ObservableEmitter<Boolean> mMapLoadEmitter;
Observable<Boolean> mapLoadObservable = Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
mMapLoadEmitter = emitter;
KLog.v(TAG,"mapLoadObservable succ");
}
});
2.創(chuàng)建一個地圖定位的發(fā)射器
ObservableEmitter<PoiInfo> mLocationEmitter;
Observable<PoiInfo> mapLocationObservable = Observable.create(new ObservableOnSubscribe<PoiInfo>() {
@Override
public void subscribe(ObservableEmitter<PoiInfo> emitter) throws Exception {
KLog.v(TAG,"mapLocationObservable succ");
mLocationEmitter = emitter;
}
});
3.開啟地圖加載監(jiān)聽,加載成功發(fā)射事件
當?shù)貓D加載成功的時候,發(fā)射地圖加載成功的事件
mMap.setOnMapLoadedListener(new AMap.OnMapLoadedListener() {
@Override
public void onMapLoaded() {
KLog.v(TAG,"onMapLoaded succ");
KLog.v(TAG,"mMapLoadEmitter:"+(mMapLoadEmitter == null ? "null":"not null"));
if(mMapLoadEmitter != null){
mMapLoadEmitter.onNext(true);
}
}
});
4.開啟地圖定位監(jiān)聽,定位成功發(fā)射事件
當定位成功的時候,發(fā)射定位成功的事件
mMap.setOnMyLocationChangeListener(new AMap.OnMyLocationChangeListener() {
@Override
public void onMyLocationChange(Location location) {
KLog.json(TAG, location.toString());
KLog.v(TAG,"onMyLocationChange succ");
KLog.v(TAG,"mLocationEmitter:"+(mLocationEmitter == null ? "null":"not null"));
mPoiInfo = new PoiInfo(location.getLatitude(), location.getLongitude());
if(mLocationEmitter != null){
mLocationEmitter.onNext(mPoiInfo);
}
}
});
5. 對地圖加載發(fā)射器和地圖定位發(fā)射器的事件進行打包合并處理
當?shù)貓D加載成功,且地圖定位成功,則顯示當前位置點信息
Observable.zip(mapLoadObservable, mapLocationObservable, new BiFunction<Boolean, PoiInfo, PoiInfo>() {
@Override
public PoiInfo apply(Boolean aBoolean, PoiInfo poiInfo) throws Exception {
return poiInfo;
}
}).subscribe(new Consumer<PoiInfo>() {
@Override
public void accept(PoiInfo poiInfo) throws Exception {
KLog.v(TAG,"zip succ");
mMap.animateCamera(CameraUpdateFactory.newLatLngZoom(new LatLng(poiInfo.getLat(), poiInfo.getLon()),14));
}
});
}
知識點:
- 1.Observable.create創(chuàng)建發(fā)射器
- 2.Observable.zip打包發(fā)射器
3. 網(wǎng)絡(luò)請求完畢數(shù)據(jù)返回之后,根據(jù)頭像的url對頭像bitmap數(shù)據(jù)進行下載,最后最終將結(jié)果數(shù)據(jù)返回
實現(xiàn)步驟:
- 1.將GetAppyDetailResponse事件轉(zhuǎn)化為List<ApprovalProgress>事件
- 2.將List<ApprovalProgress>事件轉(zhuǎn)為一個能發(fā)射多個事件的發(fā)射器
- 3.將每個ApprovalProgress事件都轉(zhuǎn)換為另外一個能下載圖片的發(fā)射器
- 4.圖片下載完畢,將事件進行發(fā)射
- 5.將圖片下載的事件進行緩存
- 6.所有的圖片都下載完畢之后最終將所有數(shù)據(jù)返回
知識點:
- 1.Observable.map 將一個事件轉(zhuǎn)換為另外一個事件
- 2.Observable.flatMap 將一個事件轉(zhuǎn)換為另外一個發(fā)射器
- 3.Observable.toList 將所有的發(fā)射事件進行打包
代碼實現(xiàn):
public Single getApplyDetail(GetAppyDetailRequest request) {
cacheAppyDetailResponse = null;
return mUseCarService
.getApplyDeatail(request)
.compose(RxAdapter.bindUntilEvent(getLifecycle()))
.compose(RxAdapter.schedulersTransformer())
.compose(RxAdapter.exceptionTransformer())
// 把一個事件轉(zhuǎn)化為另外一個事件
.map(new Function<GetAppyDetailResponse, List<ApprovalProgress>>() {
@Override
public List<ApprovalProgress> apply(GetAppyDetailResponse getAppyDetailResponse) throws Exception {
cacheAppyDetailResponse = getAppyDetailResponse;
List<ApprovalProgress> handleprogress = getAppyDetailResponse.handleprogress;
// count = handleprogress.size();
// KLog.v(TAG,"count:"+count);
for(int i = 0; i < handleprogress.size(); i++){
handleprogress.get(i).orderid = i;
}
return handleprogress;
}
})
// 把一個事件轉(zhuǎn)化為另外一個發(fā)射器
.flatMap(new Function<List<ApprovalProgress>, Observable<ApprovalProgress>>() {
@Override
public Observable<ApprovalProgress> apply(List<ApprovalProgress> approvalProgresses)
throws Exception {
return Observable.fromIterable(approvalProgresses);
}
// 繼續(xù)把一個事件轉(zhuǎn)化為另外一個發(fā)射器
})
.flatMap(new Function<ApprovalProgress, ObservableSource<ApprovalProgress>>() {
@Override
public ObservableSource<ApprovalProgress> apply(final ApprovalProgress approvalProgress)
throws Exception {
return Observable.create(new ObservableOnSubscribe<ApprovalProgress>() {
@Override
public void subscribe(final ObservableEmitter<ApprovalProgress> emitter) throws Exception {
String headphoto = approvalProgress.headphoto;
int resid = R.drawable.usercar_pic_user_bg;
KLog.v(TAG, "headurl:" + headphoto);
GlideApp.with(getContext()).asBitmap().load(headphoto)
.into(new SimpleTarget<Bitmap>() {
@Override
public void onResourceReady(Bitmap resource,
Transition<? super Bitmap> transition) {
KLog.v(TAG, "header bitmap download succ");
approvalProgress.header = BitmapUtil.scaleTo(resource,
DisplayUtil.dip2px(40), DisplayUtil.dip2px(40));
emitter.onNext(approvalProgress);
emitter.onComplete();
}
@Override
public void onLoadFailed(@Nullable Drawable errorDrawable) {
//super.onLoadFailed(errorDrawable);
approvalProgress.header = BitmapFactory.decodeResource(getContext().getResources(),R.drawable.usercar_pic_user_bg);
emitter.onNext(approvalProgress);
emitter.onComplete();
}
});
}
});
}
}).toList().map(new Function<List<ApprovalProgress>, GetAppyDetailResponse>() {
@Override
public GetAppyDetailResponse apply(List<ApprovalProgress> approvalProgresses) throws Exception {
KLog.v(TAG, "emitter succ");
Collections.sort(approvalProgresses);
cacheAppyDetailResponse.handleprogress = approvalProgresses;
return cacheAppyDetailResponse;
}
}).compose(RxAdapter.singleSchedulersTransformer()).compose(RxAdapter.singleExceptionTransformer())
.compose(RxAdapter.<GetAppyDetailResponse> bindUntilEvent(getLifecycle())); // 最后返回到主線程;
}
4.進入首頁要進行權(quán)限檢查,檢查完畢再進行升級檢查
實現(xiàn)步驟:
- 1.創(chuàng)建一個權(quán)限檢查的發(fā)射器
- 2.對檢查的結(jié)果事件轉(zhuǎn)換為另外一個升級的發(fā)射器
- 3.訂閱該發(fā)射器處理事件結(jié)果
知識點:
- 1.Observable.flatMap將一個事件轉(zhuǎn)換為另外一個發(fā)射器
代碼實現(xiàn):
@Override
public void checkPermisionAndUpgrade(FragmentActivity activity) {
//先進行權(quán)限檢查,在進行升級檢查,避免同時出現(xiàn)權(quán)限授權(quán)對話框和升級對話框
new RxPermissions(activity).request(Manifest.permission.ACCESS_FINE_LOCATION, Manifest.permission.READ_EXTERNAL_STORAGE, Manifest.permission.WRITE_EXTERNAL_STORAGE).flatMap(new Function<Boolean, ObservableSource<ClientVersionResponse>>() {
@Override
public ObservableSource<ClientVersionResponse> apply(Boolean aBoolean) throws Exception {
if (!aBoolean) {
ToastUtil.showToast("缺少定位權(quán)限、存儲權(quán)限,這會導致地圖、導航、拍照等部分功能無法使用");
}
int versionCode = EnvironmentUtil.getAppVersionCode(mContext);
return mModel.getAppVersion(new ClientVersionRequest(String.valueOf(versionCode)));
}
}).subscribe(new Observer<ClientVersionResponse>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ClientVersionResponse clientVersionResponse) {
KLog.v(TAG,"upgrade check succ...");
if (UpgradeType.MUST.type.equalsIgnoreCase(clientVersionResponse.getVersion().upgrade) || UpgradeType.SUGGEST.type.equalsIgnoreCase(clientVersionResponse.getVersion().upgrade)) {
//存儲版本升級信息
VersionManagerUtils.setLastVersion(clientVersionResponse.getVersion());
mView.showUpdateDialog(clientVersionResponse.getVersion());
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
5.多圖片上傳,先進行token請求,請求完畢再進行圖片上傳
圖片上傳可以說是一個在常見不過的功能了,在app的個人設(shè)置中上傳一個個人頭像,用微信發(fā)一個帶幾張圖的朋友圈,都會用到圖片上傳的功能,一般情況下圖片上傳的時候,先要獲取上傳的權(quán)限,也就是先要獲取Token才能進行進行下一步的圖片上傳操作
實現(xiàn)步驟:
- 1.創(chuàng)建一個獲取token的Observable
- 2.將token事件轉(zhuǎn)換為另外一個發(fā)送多個事件的Observable
- 3.將每一個MediaRequest事件再轉(zhuǎn)換為圖片上傳的Observable
- 4.圖片上傳完畢發(fā)射該事件
- 5.將圖片下載成功的事件機型緩存
- 6.全部下載完畢再返回數(shù)據(jù)
知識點:
- 1.Observable.flatMap
- 2.Obserable.create
/**
* 批量上傳文件,帶進度條的
*
* @param context
* @param type : 上傳的業(yè)務(wù)類型 詳見UploadImageUtil.FILE_TYPE_BLOG
* @param callback
* @param urls
* @param listener
*/
public static void uploadFile(final Context context, final int type, final String callback, final List<String> urls, final ResponseProgressListener listener) {
Log.v(TAG, "uploadImages start...");
final List<MediaResponse> list = new ArrayList<>();
// 1.通過服務(wù)創(chuàng)建了一個請求Token的命令
Observable<UserTokenResponse> observable = RetrofitManager.getInstance().getCommonService().getToken(new TokenRequest(type));
// 2.將執(zhí)行的結(jié)果和要上傳的多條數(shù)據(jù)轉(zhuǎn)化為一個新的Observable
observable.flatMap(new Function<UserTokenResponse, ObservableSource<MediaRequest>>() {
@Override
public ObservableSource<MediaRequest> apply(UserTokenResponse userTokenResponse) throws Exception {
Log.v(TAG, "get token succ...");
Log.v(TAG, "thread id" + Thread.currentThread().getName());
List<MediaRequest> mList = new ArrayList<>();
STSV2 stsinfo = userTokenResponse.getStsinfo();
for (int i = 0; i < urls.size(); i++) {
Log.v(TAG, urls.get(i));
mList.add(new MediaRequest(stsinfo, urls.get(i), i));
}
return Observable.fromIterable(mList);
}
}).flatMap(new Function<MediaRequest, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(final MediaRequest mediaRequest) throws Exception {
// 3.對發(fā)射過來的每一條數(shù)據(jù)再次進行轉(zhuǎn)換成一個可以執(zhí)行上傳任務(wù)的發(fā)射器,并將執(zhí)行結(jié)果進行發(fā)送
return Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
Log.v(TAG, "start upload...");
Log.v(TAG, "thread id" + Thread.currentThread().getName());
try {
final STSV2 stsv2 = mediaRequest.getSTSV2();
OSSCredentialProvider credentialProvider = new OSSStsTokenCredentialProvider(stsv2.keyid, stsv2.keysecret, stsv2.securitytoken);
OSSLog.enableLog();
final OSS oss = new OSSClient(context, stsv2.endpoint, credentialProvider);
String currurl = mediaRequest.getUrl();
final String fileName = MD5Util.MD5(UUID.randomUUID().toString()) + "." + currurl.substring(currurl.lastIndexOf(".") + 1);
String fileOssName = stsv2.filepath + fileName;
byte[] imageCompressByte;
if (FileUtil.isImageFile(currurl)) {
imageCompressByte = BitMapUtils.getImageCompressByte(currurl);
} else {
imageCompressByte = FileUtil.getFileByte(currurl);
}
final PutObjectRequest put = new PutObjectRequest(stsv2.bucket, fileOssName, imageCompressByte);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType("application/octet-stream");
put.setMetadata(metadata);
final StringBuilder backUrlSB = new StringBuilder();
backUrlSB.append("bucket=").append(stsv2.bucket);
backUrlSB.append("&filename=").append(fileOssName);
backUrlSB.append("&filetype=").append(type);
backUrlSB.append("&").append(callback);
put.setCallbackParam(new HashMap<String, String>() {
{
put("callbackUrl", stsv2.callbackurl);
put("callbackBody", backUrlSB.toString());
}
});
put.setProgressCallback(new OSSProgressCallback<PutObjectRequest>() {
@Override
public void onProgress(PutObjectRequest putObjectRequest, long l, long l1) {
if (l > 0 && l1 > 0) {
emitter.onNext(l + "|" + l1 + "|" + urls.size() + "|" + mediaRequest.getId());
}
}
});
oss.asyncPutObject(put, new OSSCompletedCallback<PutObjectRequest, PutObjectResult>() {
@Override
public void onSuccess(PutObjectRequest putObjectRequest, PutObjectResult putObjectResult) {
Log.v(TAG, "video upload succ...");
Log.v(TAG, "thread id" + Thread.currentThread().getName());
int[] imgSize = BitMapUtils.getImageSize(mediaRequest.getUrl());
emitter.onNext(new MediaResponse(TextUtils.concat(stsv2.fileurl, fileName).toString(), imgSize[0], imgSize[1]));
}
@Override
public void onFailure(PutObjectRequest putObjectRequest, ClientException e, ServiceException e1) {
Log.v(TAG, "video upload fail...");
emitter.onError(e1);
}
});
} catch (Exception e1) {
e1.printStackTrace();
}
}
});
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG, "onSubscribe start...");
if (listener != null) {
listener.onStart();
}
}
@Override
public void onNext(Object object) {
Log.v(TAG, "onNext start...");
Log.v(TAG, "thread id" + Thread.currentThread().getName());
if (listener != null) {
if (object instanceof String) {
String original = (String) object;
if (original.contains("|")) {
String[] split = original.split("\\|");
int progress = Integer.parseInt(split[0]);
int total = Integer.parseInt(split[1]);
int size = Integer.parseInt(split[2]);
int position = Integer.parseInt(split[3]);
//當前的圖片所占自己的百分比
float v = (float) progress / total;
//當前圖片所占所有圖片的百分比
float v1 = (float) 1 / size * v;
//之前上傳過得百分比
float v2 = (float) 1 / size * position;
//所占總的百分比
int v3 = (int) ((v1 + v2) * 100);
Log.v(TAG, "progress:" + progress + ";total:" + total + ";present:" + v3);
listener.onProgress(v3, 100);
}
} else if (object instanceof MediaResponse) {
list.add((MediaResponse) object);
if (list.size() == urls.size()) {
listener.onSuccess(list);
}
Log.v(TAG, "observer receive succ");
}
}
}
@Override
public void onError(Throwable e) {
Log.v(TAG, "onError start...");
if (listener != null) {
listener.onError(e);
}
}
@Override
public void onComplete() {
Log.v(TAG, "onComplete start...");
if (listener != null) {
listener.onComplete();
}
}
});
}
6.對于請求的事件數(shù)據(jù)進行轉(zhuǎn)換
實現(xiàn)步驟:
- 將GetUseStatusListResponse事件轉(zhuǎn)化為List<VehicleUseStatus>事件
- 2.List<VehicleUseStatus>事件轉(zhuǎn)換為另外一個Observable
- 3.對VehicleUseStatus事件進行操作
- 4.緩存 VehicleUseStatus事件
- 5.訂閱Observable并返回處理的最終結(jié)果
知識點:
- Observable.map
- Observable.flatMap
- Observable.toList
mModel.getCarStateList(new GetUseStatusListRequest(currDate, enterpriseid, stardid)).map(new Function<GetUseStatusListResponse, List<VehicleUseStatus>>() {
@Override
public List<VehicleUseStatus> apply(GetUseStatusListResponse getUseStatusListResponse) throws Exception {
stardid = getUseStatusListResponse.nextid;
return getUseStatusListResponse.statuslist;
}
}).flatMap(new Function<List<VehicleUseStatus>, ObservableSource<VehicleUseStatus>>() {
@Override
public ObservableSource<VehicleUseStatus> apply(List<VehicleUseStatus> vehicleUseStatuses) throws Exception {
return Observable.fromIterable(vehicleUseStatuses);
}
}).map(new Function<VehicleUseStatus, VehicleUseStatus>() {
@Override
public VehicleUseStatus apply(VehicleUseStatus o) throws Exception {
String usetime = o.usetime;
if (!TextUtils.isEmpty(usetime)) {
String[] split = usetime.split("\\|");
if (split != null && split.length > 0) {
o.useposition = getTimePositon(Arrays.asList(split));
}
}
return o;
}
}).toList().subscribe(new SingleObserver<List<VehicleUseStatus>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(List<VehicleUseStatus> vehicleUseStatuses) {
isfirst = false;
if (isrefresh) {
String nowDate = DateUtil.formatDate(new Date(), DateUtil.FormatType.MMdd);
String tempDate = DateUtil.formatDate(currDate, DateUtil.FormatType.yyyyMMdd,DateUtil.FormatType.MMdd);
mView.showTopBarCurrDate(nowDate.equals(tempDate) ? tempDate + "今天" : tempDate);
mView.enableTopBarLeftBtn(nowDate.equals(tempDate) ? false : true);
if(vehicleUseStatuses != null && vehicleUseStatuses.size() > 0){
mView.refreshData(vehicleUseStatuses,currDate);
}else{
mView.showNoDataView();
}
mView.stopRefresh();
} else {
mView.loadMoreData(vehicleUseStatuses);
mView.stopLoadMore();
}
}
@Override
public void onError(Throwable e) {
if (isrefresh) {
mView.stopRefresh();
if(isfirst){
if(e instanceof ResponseThrowable){
ResponseThrowable throwable = (ResponseThrowable) e;
if(throwable.code == ExceptionHandler.SYSTEM_ERROR.TIMEOUT_ERROR){
mView.showNetWorkErrView();
}
}else{
mView.showNoDataView();
}
}
} else {
mView.stopLoadMore();
}
}
});
7.車輛設(shè)備id過濾
實現(xiàn)步驟:
- 1.對車輛集合中的車輛進行過濾,過濾有設(shè)備的車輛
- 2.對過濾后的車輛事件進行轉(zhuǎn)換為設(shè)備事件
- 3.將設(shè)備集合事件轉(zhuǎn)換為一個Observable
- 4.對設(shè)備事件進行過濾,過濾為0的設(shè)備
- 5.對設(shè)備事件轉(zhuǎn)換為設(shè)備的id事件
- 6.緩存設(shè)備id事件
- 7.訂閱這個事件并返回結(jié)果
知識點:
- 1.Observable.fromIterable
- 2.Observable.filter
- 3.Observable.map
- 4.Obserable.flatMap
- 5.Obserable.buffer
@Override
public void filterVehicleDevice(List<VehicleInfo> vehicleInfoList, final VehicleInfo vehicleInfo) {
Observable.fromIterable(vehicleInfoList).filter(new Predicate<VehicleInfo>() {
@Override
public boolean test(VehicleInfo vehicleInfo) throws Exception {
// 將有設(shè)備的車輛過濾下來
return vehicleInfo.getDevices() != null
&& vehicleInfo.getDevices().size() > 0;
}
}).map(new Function<VehicleInfo, List<DeviceInfo>>() {
@Override
public List<DeviceInfo> apply(VehicleInfo vehicleInfo) throws Exception {
// 對事件進行轉(zhuǎn)換,將VehicleInfo轉(zhuǎn)化為DeviceInfo
return vehicleInfo.getDevices();
}
}).flatMap(new Function<List<DeviceInfo>, ObservableSource<DeviceInfo>>() {
@Override
public ObservableSource<DeviceInfo> apply(List<DeviceInfo> deviceInfos) throws Exception {
// 再次將device事件轉(zhuǎn)化為Observable
return Observable.fromIterable(deviceInfos);
}
}).filter(new Predicate<DeviceInfo>() {
@Override
public boolean test(DeviceInfo deviceInfo) throws Exception {
// 對傳遞過來的DeviceInfo進行過濾
return "0".equals(deviceInfo.devicetype);
}
}).map(new Function<DeviceInfo, String>() {
@Override
public String apply(DeviceInfo deviceInfo) throws Exception {
// 再次對事件進行轉(zhuǎn)換
return deviceInfo.deviceid;
}
}).buffer(Integer.MAX_VALUE).subscribe(new Observer<List<String>>() {
ArrayList<String> listBox;
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
// 對過濾的數(shù)據(jù)把它緩存起來,然后進行打包輸出
if (mView == null) {
return;
}
listBox = new ArrayList<>(strings);
}
@Override
public void onError(Throwable e) {
mView.filterVehicleDeviceComplete(listBox, vehicleInfo);
}
@Override
public void onComplete() {limView.filterVehicleDeviceComplete(listBox, vehicleInfo);
}
});
}