Rxjs【combineLatest, withLatestFrom, zip】

Rxjs學習之路

1、小貼士

這篇文章是我的Angular Rxjs Series中的第六篇文章,在繼續(xù)閱讀本文之前,您至少應(yīng)該熟悉系列中的第一篇基礎(chǔ)文章:

Rxjs6都改變了些什么?

Rxjs【Observable】

// 圖譜
// ----- 代表一個Observable
// -----X 代表一個Observable有錯誤發(fā)生
// -----| 代表一個Observable結(jié)束
// (1234)| 代表一個同步Observable結(jié)束

// 特別提示:以下的操作符介紹均采用rxjs6的寫法!??!

2、combineLatest

combineLatest可以接收多個Observable,但最后一個參數(shù)一定是callback function,這個回調(diào)函數(shù)接收的參數(shù)個數(shù)和前邊傳入Observable一一對應(yīng),最后需要注意:一定至少有2個Observable送出新值的時候才會執(zhí)行回調(diào)函數(shù)。
/**
 * 取得各個observable 最后送出的值,再輸出成一個值
 * combineLatest可以接收多個observable,最后一個參數(shù)是callback function,這個callback function接收的參數(shù)數(shù)量跟合并的observable數(shù)量相同
 * callback 都會依照合并的observable 數(shù)量來傳入?yún)?shù),如果我們合并了三個observable,callback 就會有三個參數(shù),而不管合并幾個observable 都會只會回傳一個值。
 * source:      ----0----1----2|
 * newest:      --0--1--2--3--4--5|
 *          combineLatest(newest, (x, y) => x + y);
 * example:     ----01--23-4--(56)--7|
 */
const source = interval(500).pipe(take(3));
const newest = interval(300).pipe(take(6));
const combineLatestObservable = source.pipe(combineLatest(newest, (x, y) => x + y));
this.combineLatestSubscription = combineLatestObservable.subscribe({
    next: (value) => { console.log('=====combineLatest操作符: ', value); },
    error: (err) => { console.log('=====combineLatest操作符: Error: ', err); },
    complete: () => { console.log('=====combineLatest操作符: complete!'); }
});
  • 從上邊的例子,我們可以看到,newest送出0時,source此時沒有送出值,因此不執(zhí)行回調(diào)函數(shù),當source送出0時,此時newest送出的最新值是之前的0,執(zhí)行回調(diào)0+0=0,以此類推...

3、withLatestFrom

withLatestFrom其實和combineLatest很像,唯一不同的是他多了一個主從關(guān)系,即只有主Observable送出新值的時候,才會執(zhí)行callback function,其他情況下不會觸發(fā)回調(diào)。
/**
 * 和combineLatest類似,但是withLatestFrom只有在主要的observable 送出新的值時,才會執(zhí)行callback,附隨的observable 只是在背景下運作
 * withLatestFrom 會在main 送出值的時候執(zhí)行callback,但請注意如果main 送出值時some 之前沒有送出過任何值callback 仍然不會執(zhí)行!
 * callback 都會依照合并的observable 數(shù)量來傳入?yún)?shù),如果我們合并了三個observable,callback 就會有三個參數(shù),而不管合并幾個observable 都會只會回傳一個值。
 * main:       ----h----e----l----l----o|
 * some:       --0--1--0--0--0--1|
 *         withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x);
 * example:    ----h----e----l----L----O|
 */
const main = from('hello').pipe(zip(interval(500), (x, y) => x));
const some = from([0, 1, 0, 0, 0, 1]).pipe(zip(interval(300), (x, y) => x));
const withLatestFromObservable = main.pipe(withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x));
this.withLatestFromSubscription = withLatestFromObservable.subscribe({
    next: (value) => { console.log('=====withLatestFrom操作符: ', value); },
    error: (err) => { console.log('=====withLatestFrom操作符: Error: ', err); },
    complete: () => { console.log('=====withLatestFrom操作符: complete!'); }
});
  • 從上邊的例子,我們可以觀察到:main送出h時,some的最新值是上一次的0,0不等于1,所以原樣輸出main,即h。當main送出e時,some送出的新值是上一次0,0不等于1,所以原樣輸出main,即e。以此類推...

4、zip

zip會取每個Observable且按順序傳入callback function回調(diào)函數(shù)的參數(shù)中,簡單的理解就是一一對應(yīng),成雙成對。
/**
 * 取每個observable 相同順位的元素并傳入callback,也就是說每個observable 的第n 個元素會一起被傳入callback
 * zip 會把各個observable 相同順位送出的值傳入callback
 * zip 必須cache 住還沒處理的元素,當我們兩個observable 一個很快一個很慢時,就會cache 非常多的元素,等待比較慢的那個observable。這很有可能造成記憶體相關(guān)的問題!
 * callback 都會依照合并的observable 數(shù)量來傳入?yún)?shù),如果我們合并了三個observable,callback 就會有三個參數(shù),而不管合并幾個observable 都會只會回傳一個值。
 * source:      ----0----1----2|
 * newest:      --0--1--2--3--4--5|
 *            zip(newest, (x, y) => x + y)
 * exaple:      ----0----2----4|
 */
const source = interval(500).pipe(take(3));
const newest = interval(300).pipe(take(6));
const zipObservable = source.pipe(zip(newest, (x, y) => x + y));
this.zipSubscription = zipObservable.subscribe({
    next: (value) => { console.log('=====zip操作符: ', value); },
    error: (err) => { console.log('=====zip操作符: Error: ', err); },
    complete: () => { console.log('=====zip操作符: complete!'); }
});
完整例子
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription, interval, from } from 'rxjs';
import { take, combineLatest, zip, withLatestFrom } from 'rxjs/operators';

@Component({
    selector: 'app-rxjs-demo06',
    template: `
        <h3>Rxjs Demo06 To Study! -- Operators操作符(combineLatest, withLatestFrom, zip)</h3>
        <button (click)="combineLatestHandler()">combineLatest</button>
        <button class="mgLeft" (click)="withLatestFromHandler()">withLatestFrom</button>
        <button class="mgLeft" (click)="zipHandler()">zip</button>
        <app-back></app-back>
    `,
    styles: [`
        .mgLeft {
            margin-left: 20px;
        }
    `]
})
export class RxjsDemo06Component implements OnInit, OnDestroy {
    combineLatestSubscription: Subscription;
    withLatestFromSubscription: Subscription;
    zipSubscription: Subscription;

    constructor() { }

    ngOnInit(): void {
        // 圖譜
        // ----- 代表一個Observable
        // -----X 代表一個Observable有錯誤發(fā)生
        // -----| 代表一個Observable結(jié)束
        // (1234)| 代表一個同步Observable結(jié)束
    }

    combineLatestHandler() {
        /**
         * 取得各個observable 最后送出的值,再輸出成一個值
         * combineLatest可以接收多個observable,最后一個參數(shù)是callback function,這個callback function接收的參數(shù)數(shù)量跟合并的observable數(shù)量相同
         * callback 都會依照合并的observable 數(shù)量來傳入?yún)?shù),如果我們合并了三個observable,callback 就會有三個參數(shù),而不管合并幾個observable 都會只會回傳一個值。
         * source:      ----0----1----2|
         * newest:      --0--1--2--3--4--5|
         *          combineLatest(newest, (x, y) => x + y);
         * example:     ----01--23-4--(56)--7|
         */
        const source = interval(500).pipe(take(3));
        const newest = interval(300).pipe(take(6));
        const combineLatestObservable = source.pipe(combineLatest(newest, (x, y) => x + y));
        this.combineLatestSubscription = combineLatestObservable.subscribe({
            next: (value) => { console.log('=====combineLatest操作符: ', value); },
            error: (err) => { console.log('=====combineLatest操作符: Error: ', err); },
            complete: () => { console.log('=====combineLatest操作符: complete!'); }
        });
    }

    withLatestFromHandler() {
        /**
         * 和combineLatest類似,但是withLatestFrom只有在主要的observable 送出新的值時,才會執(zhí)行callback,附隨的observable 只是在背景下運作
         * withLatestFrom 會在main 送出值的時候執(zhí)行callback,但請注意如果main 送出值時some 之前沒有送出過任何值callback 仍然不會執(zhí)行!
         * callback 都會依照合并的observable 數(shù)量來傳入?yún)?shù),如果我們合并了三個observable,callback 就會有三個參數(shù),而不管合并幾個observable 都會只會回傳一個值。
         * main:       ----h----e----l----l----o|
         * some:       --0--1--0--0--0--1|
         *         withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x);
         * example:    ----h----e----l----L----O|
         */
        const main = from('hello').pipe(zip(interval(500), (x, y) => x));
        const some = from([0, 1, 0, 0, 0, 1]).pipe(zip(interval(300), (x, y) => x));
        const withLatestFromObservable = main.pipe(withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x));
        this.withLatestFromSubscription = withLatestFromObservable.subscribe({
            next: (value) => { console.log('=====withLatestFrom操作符: ', value); },
            error: (err) => { console.log('=====withLatestFrom操作符: Error: ', err); },
            complete: () => { console.log('=====withLatestFrom操作符: complete!'); }
        });
    }

    zipHandler() {
        /**
         * 取每個observable 相同順位的元素并傳入callback,也就是說每個observable 的第n 個元素會一起被傳入callback
         * zip 會把各個observable 相同順位送出的值傳入callback
         * zip 必須cache 住還沒處理的元素,當我們兩個observable 一個很快一個很慢時,就會cache 非常多的元素,等待比較慢的那個observable。這很有可能造成記憶體相關(guān)的問題!
         * callback 都會依照合并的observable 數(shù)量來傳入?yún)?shù),如果我們合并了三個observable,callback 就會有三個參數(shù),而不管合并幾個observable 都會只會回傳一個值。
         * source:      ----0----1----2|
         * newest:      --0--1--2--3--4--5|
         *            zip(newest, (x, y) => x + y)
         * exaple:      ----0----2----4|
         */
        const source = interval(500).pipe(take(3));
        const newest = interval(300).pipe(take(6));
        const zipObservable = source.pipe(zip(newest, (x, y) => x + y));
        this.zipSubscription = zipObservable.subscribe({
            next: (value) => { console.log('=====zip操作符: ', value); },
            error: (err) => { console.log('=====zip操作符: Error: ', err); },
            complete: () => { console.log('=====zip操作符: complete!'); }
        });
    }

    ngOnDestroy() {
        if (this.combineLatestSubscription) {
            this.combineLatestSubscription.unsubscribe();
        }
        if (this.withLatestFromSubscription) {
            this.withLatestFromSubscription.unsubscribe();
        }
        if (this.zipSubscription) {
            this.zipSubscription.unsubscribe();
        }
    }
}

Marble Diagrams【寶珠圖】

1. 這個Marble Diagrams【寶珠圖】可以很靈活的表現(xiàn)出每個操作符的使用
2. 下面是超鏈接傳送門

Marble Diagrams【寶珠圖】

Angular Rxjs Series

  1. Rxjs6都改變了些什么?
  2. Rxjs【Observable】
  3. Rxjs【map、mapTo、filter】
  4. Rxjs【take, first, takeUntil, concatAll】
  5. Rxjs【skip, takeLast, last, concat, startWith, merge】
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容