前言 這個庫在angular 中已經集成了,所以使用起來有良好的代碼提示,但是在Vue中不行,一點提示都沒有,下面的代碼都在Vue項目中使用,以此分享自己在學習的體會:
一、初始RxJS (1)安裝與導入 命令
按需導入:
import { Observer } from "rxjs";
(2)Observable的工作 說明: ?Observable可以理解成被觀察者 ,Observer就是觀察者,連接兩者的橋梁就是Observable對象的函數subscribe,同時RxJS中的數據流就是Observable對象,它實現了觀察者模式和迭代器模式 ,這里聊聊前者。
=> 觀察者模式 <= 解決問題: ?它需要解決在一個持續產生事件的系統中,如何分割功能,讓不同模塊只需要處理一部分邏輯。
解決方法: ?將邏輯分為發布者和觀察者,發布者只管生產事件,之后將事件上注冊一個觀察者,至于事件如何被觀察者處理它不關心;同樣觀察者只管將接收到的事件處理掉,不關心它是如何產生的。
與RxJS的聯系: ?Observable對象就是一個發布者,通過函數subscribe將其與觀察者Observer聯系起來。
import { of } from "rxjs";
// of操作符會返回一個observable對象,將傳入的內容依次發射出來;
// 此時scoure$就是一個發布者,它產生的事件就是三個整數
const scoure$ = of(1, 2, 3);
// 這里console.log作為觀察者,將傳給它的內容輸出出來,
// 不管數據是怎么產生的
scoure$.subscribe(console.log);
處理步驟: 產生事件:這是發布者的責任,也就是Observable對象的工作。?響應事件 :這是觀察者的責任,也就是由subscribe的參數決定。 發布者如何關聯觀察者:也就是何時調用subscribe。 => 迭代器模式 <= 說明: ?它提供一個通用的接口來遍歷數據集合的對象,并且讓使用者不用關心這個數據集合是如何實現的。從數據消費的角度,迭代器實現分為拉和推兩種,簡單理解就是拉取數據和推送數據,RxJS屬于后者,它作為迭代器的使用者,并不需要主動去從Observable 中拉數據,而是只要subscribe上Observable對象之后,然后就能夠收到消息的推送。
=> 創造Observable <= 執行過程: ?創建一個Observable,也就是創建一個發布者,這個發布者接收一個onSubscribe用于與觀察者產生聯系,當發布者通過subscribe將其注冊給觀察者后,這個函數就會執行,函數的參數就是觀察者對象,對這個對象的唯一要求就是需要存在next屬性,屬性的值是一個函數,用來接收傳過來的數據
// 0.用于定義發布者
import { Observable } from "rxjs";
// 4.觸發后這個函數的參數是觀察者的一個包裝,
// ? 它與觀察者并不等價
const onSubscribe = (observer) => {
?observer.next(1);
?observer.next (2);
?observer.next(3);
};
// 1.這里創建一個發布者,它存在一個onSubscribe函數與
// ? 觀察者產生聯系
const source$ = new Observable(onSubscribe);
// 2.創建一個觀察者,有一個next屬性用于接收傳過來的值
const theObserver = {
?next: (item) => console.log(item),
};
// 3.通過subscribe函數 將發布者和觀察者聯系起來,此時發
// ? 布者中的onSubscribe函數會被觸發
source$.subscribe(theObserver);
=> 延遲的Observable <= 舉例: ?如何讓上面的例子中推送每個正整數之間有一定的時間間隔?
思考: ?這個邏輯放在哪個部分更合適?
解釋: ?按照分工,發布者產生數據,觀察者處理數據,這樣一來發布者控制推送數據的節奏也很合理。
const onSubscribe = (observer) => {
?let number = 1;
?const handle = setInterval(() => {
? ?observer.next(number++);
? ?if (number > 3) {
? ? ?clearInterval(handle);
? ?}
?}, 1000);
};
結論: ?發布者推送數據可以有時間間隔,這樣使得異步操作十分容易,因為對于觀察者,只需要被動接受推送數據來處理,再不用關心數據何時產生。
=> 永無止境的Observable <= 說明: ?其實發布者發射的數據可以是無窮的,每次發布者使用next發射出一個數據,這個數據會被觀察者接收然后消化掉,所以不會存在數據堆積;如果發布者的next方法停止調用,只能表示發布者此時不會發射數據出去,但并不代表之后不會發射數據;如果需要明確發布就不會再有新數據產生了,還需要多個Observable完結的方式 。
const onSubscribe = (observer) => {
?let number = 1;
?const handle = setInterval(() => {
? ?observer.next(number++);
?}, 1000);
};
=> Observable的完結 <= 說明: ?觀察者的next 方法只能表示現在推送的數據是什么,并不能表示后面沒有更多數據了,也就是沒辦法完全停止 它推送數據,但是在RxJS中,可以使用觀察者的complete 方法來完成。
import { Observable } from "rxjs";
const onSubscribe = (observer) => {
?let number = 1;
?const handle = setInterval(() => {
? ?observer.next(number++);
? ?if (number > 3) {
? ? ?clearInterval(handle);
? ? ?// 使用函數完全停止數據的發送
? ? ?observer.complete();
? ?}
?}, 1000);
};
const source$ = new Observable(onSubscribe);
const theObserver = {
?next: (item) => console.log(item),
?// 定義函數來讓發布者完全停止數據的傳輸
?complete: () => console.log("No More Data"),
};
source$.subscribe(theObserver);
=> 錯誤的Observable <= 說明: ?理想情況下,發布者只管生產數據給觀察者來消耗,但是,難免有時候發布者會遇到了異常情況,而且這種異常情況不是生產者所能夠處理并恢復正常的,發布者在這時候沒法再正常工作了,就需要通知對應的觀察者發生了這個異常情況,如果只是簡單地調用 complete,觀察者只會知道沒有更多數據,卻不知道沒有更多數據的原因是因為遭遇了異常,所以,我們還要在發布者和觀察者的交流渠道中增加一個新的函數error。
import { Observable } from "rxjs/Observable";
const onSubscribe = (observer) => {
?observer.next(1);
?// 此時發布者出現不能自己解決的錯誤,調用方法通知觀察者,
?// 此時發布者已經進入完結的狀態,后面所調用的next和complete
?// 都會失效
?observer.error("Someting Wrong");
?observer.complete();
};
const source$ = new Observable(onSubscribe);
const theObserver = {
?next: (item) => console.log(item),
?// 用來處理錯誤信息
?error: (err) => console.log(err),
?complete: () => console.log("No More Data"),
};
source$.subscribe(theObserver);
在RxJS中,一個發布者對象只有一種終結狀態,要么是complete,要么是error,一旦進入出錯狀態,這個發布者對象也就終結了,再不會調用對應觀察者的next函數 ,也不會再調用觀察者的complete函數 ;同樣,如果一個發布者對象進入了完結狀態,也不能再調用觀察者的next和error。 此外,一個觀察者對象,里面可以存在next、error、complete三個方法,用于接受發布者的三種不同事件,如果不關心某種事件,可以不實現對應的方法;比如對于一個永遠不會結束的發布者, 真的沒有必要提供complete方法,因為它永遠不會被調用到;但是對于錯誤,觀察者是無法察覺發布者會出現什么錯情況的,所以error方法還是需要。 (3)退訂Observable 說明: ?有時候需要斷開發布者與觀察者之間的聯系,這個操作就叫做退訂,在發布者的onSubscribe函數執行的時候,它可以返回一個對象,對象上可以有一個unsubscribe函數,執行這個函數來進行退訂操作。
import { Observable } from "rxjs";
const onSubscribe = (observer) => {
?let number = 1;
?const handle = setInterval(() => {
? ?observer.next(number++);
?}, 1000);
?return {
? ?unsubscribe : () => {
? ? ?clearInterval(handle);
? ?},
?};
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe((item) => console.log(item));
setTimeout(() => {
?subscription.unsubscribe();
}, 3500);
注意: ?退訂函數執行后,表示觀察者不再接受發布者推送的數據,但是發布者并沒有停止推送數據,因為發布者并沒有到達終結狀態 ,也就是沒有調用complete 或者是error 方法,此時只是發布者推送的數據觀察者不接收而已,看下面的例子:
import { Observable } from "rxjs";
const onSubscribe = (observer) => {
?let number = 1;
?const handle = setInterval(() => {
? ?// 將發布者發射的數據打印出來
? ?console.log("in onSubscribe ", number);
? ?observer.next(number++);
?}, 1000);
?return {
? ?unsubscribe: () => {
? ? ?// 這里不清除定時器,讓發布者繼續產生數據
? ? ?// clearInterval(handle);
? ?},
?};
};
const source$ = new Observable(onSubscribe);
// 每次觀察者執行的時候打印出收到的數據
const subscription = source$.subscribe((item) => console.log(item));
setTimeout(() => {
?subscription.unsubscribe();
}, 3500);
發布者產生的事件,只有觀察者通過訂閱之后才會收到,在退訂之后就不會收到。 (4)了解兩種Observable 說明: ?這里介紹的是Hot Observable和Cold Observable。
場景: ?假設每個發布者對象有兩個觀察者對象來訂閱, 而且這兩個觀察者對象并不是同時訂閱,第一個觀察者對象訂閱N秒鐘之后,第二個觀察者對象才訂閱同一個發布者對象,而且,在這N秒鐘之內,發布者對象已經吐出了一些數據,此時對這吐出的數據有兩種處理:
Hot Observable:只需要接受從訂閱那一刻開始發布者產生的數據就行;有點類似在電視上面看節目,你所看到的內容是節目當前這一刻開始的,之前的節目你是看不見的,假如你的家人跟你一起看,那么你們看到的節目是一樣的,這就可以理解為獲取數據的數據源是相同的 Cold Observable:不能錯過,需要獲取發布者之前產生的數據,也就是每次都需要獲取發布者完整的數據,可以理解為每次得到的數據與之前的數據之間并不存在聯系,是相互獨立的,也就是每次會得到獨立的數據源,就像你在手機應用市場下載游戲,跟你在同樣地方下載的游戲是一樣的。 理解: ?那么就可以得到這樣的結果,如果Cold Observable沒有訂閱者,數據不會真正的產生,就像你如果不主動下載游戲,你手機上不可能玩到的;而對于Hot Observable在沒有訂閱者的時候,數據依然產生,只不過不傳入數據管道而已,就像電視機節目,節目一直存在與此,只是你沒切換到那個頻道觀看而已。
(5)操作符簡介 說明: ?一個發布者對象就是一個數據流,在RxJS中數據流一般使用$開頭來命名;在一個復雜問題里面,數據流并不會直接交給觀察者來處理,在這途中會使用一系列內置的函數來處理數據,這些函數可以理解為操作符;就像一個管道,數據從管道的一段流入,途徑管道各個環節,當數據到達觀察者的時候,已經被管道操作過,有的數據已經被中途過濾拋棄掉了,有的數據已經被改變了原來的形態,而且最后的數據可能來自多個數據源,最后觀察者只需要處理能夠達到終點的數據。
說明: ?在數據管道中流淌的數據就像是水,從上游流向下游。對一個操作符來說,上游可能是一個數據源,也可能是其他操作符,下游可能是最終的觀察者,也可能是另一個操作符,每一個操作符之間都是獨立的,正因為如此,所以可以對操作符進行任意組合,從而產生各種功能的數據管道。
6)理解彈珠圖 作用: ?RxJS中每一個發布者是一個數據流,簡單的數據流可以由大腦想象出來,但是復雜的可就不好像了,此時就可以使用彈珠圖 來具體的方式來描述數據流,看兩張圖:
說明: ?這個彈珠圖所表示的數據流,每間隔一段時間吐出一個遞增的正整數,吐出到3的時候結束。因為每一個吐出來的數據都像是一個彈珠,所以這種表達方式叫做彈珠圖。在彈珠圖中,每個彈珠之間的間隔,代表的是吐出數據之間的時間間隔,通過這種形式,能夠很形象地看清楚每個發布者對象中數據的分布。 根據彈珠圖的傳統,豎杠符號|代表的是數據流的完結,對應調用complete函數,數據流吐出數據3之后立刻就完結了。 符號×代表數據流中的異常,對應于調用下游的error函數。
注意: ?為了描述操作符的功能,彈珠圖中往往會出現多條時間軸,因為各部分操作符的工作都是把上游的數據轉為傳給下游的數據,在彈珠圖上必須把上下游的數據流都展現出來,此外,編寫彈珠圖可以去此處,后面如果存在彈珠圖的地方所使用的代碼復制到此處就可以看到了。
二、實現操作符 理解: ?一個操作符是返回一個Observable對象的函數,不過,有的操作符是根據其他Observable對象產生返回的Observable對象,有的操作符則是利用其他類型輸出產生返回的Observable對象,還有一些操作符不需要輸出就可以憑空創造一個Observable對象,這里以實現一個操作符來慢慢理解什么是操作符。
(1)實現操作符函數 說明: ?每一個操作符是一個函數,不管函數的功能是怎樣的,它需要包含以下功能點,這里實現map操作符為例
返回?個全新的Observable對象。 需要存在訂閱和退訂的操作。 處理異常情況。 及時釋放資源。 => 返回Observable對象 <= 分析: ?首先map操作符的功能是遍歷得到的數據,通過傳入的參數函數來處理這些數據,看下面的例子:
// 這里的函數參數將數據的每一個值都乘以2,
// 如果source$是?個 1、2、3的序列,
// 那么map返回的序列就是2、4、6,根據函數式編程 的原則,
// map函數是不會修改原始的數據的,同時其返回值是?個全
// 新的Observable對象,這樣可以保持原始Observable對象的狀態
// 避免不可預料的行為
const result$ = source$.map(x => x * 2);
實現: ?根據上面的分析可以得到下面這個函數
// 這里的project就是傳遞給map操作符的函數參數
function map(project) {
?// map中利?new關鍵字創造了?個Observable對象,
?// 函數返回的結果就是這個對象,如此?來,
?// map可以?于鏈式調?,可以在后?調?其他的操作符,
?// 或者調?subscribe增加Observer。
?return new Observable((observer) => {
? ?// 假設此處this表示發布者對象,訂閱后數據就會交給觀察者了
? ?this.subscribe({
? ? ?next: (value) => observer.next(project(value)),
? ? ?error: (err) => observer.error(error),
? ? ?complete: () => observer.complete(),
? ?});
?});
}
=> 退訂處理 <= 說明: ?作為一個通用的操作符,無法預料上游Observable是如何實現的,上游完全可能在被訂閱時分配了特殊資源,如果不明確地告訴上游這些資源再也用不著了的話,它也不會釋放這些資源,此時就會造成資源的泄露,所以下游退訂那些資源,就要告訴上游退訂那些資源。
function map(project) {
?return new Observable((observer) => {
? ?const sub = this.subscribe({
? ? ?next: (value) => observer.next(project(value)),
? ? ?error: (err) => observer.error(error),
? ? ?complete: () => observer.complete(),
? ?});
? ?return {
? ? ?// 根據前面的了解這里需要存在一個unsubscribe
? ? ?// 方法用于退訂
? ? ?unsubscribe: () => {
? ? ? ?sub.unsubscribe();
? ? ?},
? ?};
?});
}
=> 處理異常 <= 說明: ?上面代碼中的參數project可以輸入的情況有很多,可能存在執行的時候不合理的代碼,此時就會出現異常,此時需要 捕獲異常錯誤,把異常錯誤沿著數據流往下游傳遞,最終如何處理交給觀察者來決定。
function map(project) {
?return new Observable((observer) => {
? ?const sub = this.subscribe({
? ? ?next: (value) => {
? ? ? ?try {
? ? ? ? ?observer.next(project(value));
? ? ? ?} catch (err) {
? ? ? ? ?observer.error(err);
? ? ? ?}
? ? ?},
? ? ?error: (err) => observer.error(error),
? ? ?complete: () => observer.complete(),
? ?});
? ?return {
? ? ?unsubscribe: () => {
? ? ? ?sub.unsubscribe();
? ? ?},
? ?};
?});
}
(2)關聯Observable 使用原型: ?這個操作符在使用的時候需要一個Observable對象實例,因此這個操作符是一個實例操作符,此時使用打補丁的方式關聯發布者對象的格式為Observable.prototype.操作符 = 操作符函數,既然有實例操作符,當然也有靜態操作符,它不需要Observable實例就可以使用,它的打補丁的格式為Observable.操作符 = 操作符函數,這個的弊端在于會影響每一個Observable對象。
Observable.prototype.map = map;
使用call和bind: ?解決上面的問題,可以讓我們?定義的操作符只對指定的 Observable對象可?,這時就可以?bind ,當然也可以使用call 。
// 一般使用
const result$ = map.bind(Observable對象)(x => x * 2);
// 鏈式調用
const result$ = map.bind(
? ? ? ? ? ? ? ? ? ?map.bind(Observable對象)((x) => x * 2)
? ? ? ? ? ? ? ?)((x) => x + 1);
// 一般使用
onst result$ = map.call(Observable對象, x => x * 2);
// 鏈式調用
const result$ = map.call(
? ?map.call(Observable對象, (x) => x * 2),
? ?(x) => x * 2
);
3)改進操作符 說明: ?如果遵循函數式編程思想,需要使用純函數,也就是函數執行的結果完全由輸入的參數決定,但是上面定義的函數中存在this,這是一個不確定的因素,也就是這個函數不屬于純函數了,所以在此處需要改進一下。
=> 缺陷 <= 說明: ?在現代網頁開發的過程中,都會經過打包才發布到產品環境,在打包的過程中會使用Tree-Shaking這個工具來去除代碼中沒有使用的代碼,比如那些引入的變量但是并沒有使用這種的;但是這個工具對于RxJS來說沒什么用,這是因為Tree Shaking只能做靜態代碼檢查,并不是在程序運行時去檢測這個函數是否被真的調用,只有這個函數在任何代碼中間都沒有引用過,才認為這個函數不會被引用。然而,RxJS中任何一個操作符都是掛在 Observable類上或者Observable.prototype上的,賦值給Observable或者 Observable.prototype上某個屬性在Tree Shaking看來就是被引用,所以,所有的操作符,不管真實運用時是否被調用,都會被Tree Shaking認為是會用到的代碼,也就不會當做死代碼刪除;其次上面關聯Observable的方式是直接添加到其原型上面,由于全局存在一個Observable對象,就跟window對象一樣,像上面添加屬性和方法是不可取的,是會帶來隱患的。
=> 不"打補丁" <= 說明: ?開發RxJS庫的規則的其中一條就是不能使用打補丁的方式使操作符函數與Observable對象關聯起來。如果是實例操作符,可以使用前面介紹過的bind/call,讓一個操作符函數只對一個具體的Observable對象生效;如果是靜態操作符,直接使用就好。
// 這里使用上面實現的map函數實現一個double操作符
import { Observable, of } from "rxjs";
function map(project) {
?return new Observable((observer) => {
? ?const sub = this.subscribe({
? ? ?next: (value) => {
? ? ? ?try {
? ? ? ? ?observer.next(project(value));
? ? ? ?} catch (err) {
? ? ? ? ?observer.error(err);
? ? ? ?}
? ? ?},
? ? ?error: (err) => observer.error(error),
? ? ?complete: () => observer.complete(),
? ?});
? ?return {
? ? ?unsubscribe: () => {
? ? ? ?sub.unsubscribe();
? ? ?},
? ?};
?});
}
Observable.prototype.double = function () {
? ?// 將當前的Observable對象作為this值傳遞給map函數
? ?return map.call(this, (x) => x * 2);
};
// of操作符用于創建一個Observable對象
const source$ = of(1, 2, 3);
const result$ = source$.double().subscribe((res) => console.log(res));
(4)lettable/pipeable操作符 原因: ?上面使用call/bind方法在函數體內還是會使用this,函數還是不純,其次call的返回類型是無法確定的,在ts中只能使用any表示,因此會讓其失去類型檢查。
說明: ?從RxJS v5.5.0開始,加上了這種更先進的操作符定義和使用方式,稱為pipeable操作符,也曾經被稱為lettable操作符,但是因為字面上太難理解,所以改成pipeable。
=> let操作符 <= 作用: ?實際上就是把上游的Observable對象作為參數傳遞給let操作符里面的參數進行處理,處理完之后將返回的Observable交給下游來訂閱。
// 下面的map函數就是上面寫的那個,這是以前的寫法,現在不支持,
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/let';
const source$ = Observable.of(1, 2, 3);
// 雖然map的使?是通過給Observable打補丁導?的,
// 但是map是直接作?于參數obs$,?不是作?于this,
// 所以,double$是?個純函數。
const double$ = obs$ => obs$.map(x => x * 2);
// let的參數是?個函數,在這個例?中函數參數名為double$,
// 這個函數名也以$為后綴,代表它返回的是?個Observable對象,
// double$同樣接受?個Observable對象作為輸?參數,也就是說,
// double$的功能就是根據?個Observable對象產??個新的
// Observable對象。
const result$ = source$.let(double$);
過程: ?let起到連接上游下游 的作用,真正的工作完全由函數參數map來執行。
處理之前的map函數: ?此時map的實現部分也看不到對this的訪問,而是用一個參數obs$代替了 this,這樣,在數據管道中上游的Observable是以參數形式傳遞,而不是靠 this來獲得,讓map徹底成了一個純函數。map執行返回的結果是一個函數,接受一個Observable對象返回一個 Observable 對象,正好滿足let的參數要求。
const map = (fn) => (obs$) =>
?new Observable((observer) =>
? ?obs$.subscribe({
? ? ?next: (value) => observer.next(fn(value)),
? ? ?error: (err) => observer.error(error),
? ? ?complete: () => observer.complete(),
? ?})
?);
好處: ?由于每一個lettable操作符都是純函數,而且也不會被作為補丁掛在Observable上,Tree Shaking就能夠找到根本不會被使用的操作符。
=> pipe操作符 <= 原因: ?要導入let這個操作符,又不得不用傳統的打補丁或者使用call的方式,使用起來要導入很麻煩;所以創建了pipe操作符,它可以滿足let具備的功能。使用pipe只需像使用let那樣導入模塊,任何Observable對象都保持pipe,此外還有管道功能,可以把多個lettable操作符串接起來,形成數據管道。
import { map, filter, of } from "rxjs";
const source$ = of(1, 2, 3);
// 可以一次使用多個操作符
const result$ = source$.pipe(
?filter((x) => x % 2 === 0),
?map((x) => x * 2)
);
result$.subscribe(console.log);
三、創建數據流 (1)創建類操作符 說明: ?這里所說的創造,是指這些操作符不依賴于其他Observable對象,這些操作符可以憑空或者根據其他數據源創造出?個Observable對象,其次創建類操作符往往不會從其他Observable對象獲取數據,因為在數據管道中它自己就是數據流的源頭,基于這些特性大部分的創建類操作符都是靜態操作符。
(2)創建同步數據流 說明: ?對于同步的Observable對象,需要關心的是存在哪些數據和數據之間的先后順序,由于數據之間的時間間隔不存在因此不需要考慮時間方面的問題。
=> of操作符 <= 作用: ?可以輕松創建指定數據集合的Observable對象;
參數: ?of(數據1,數據2,數據3...);
注意: ?of操作符所產生的Observable對象被訂閱后會將參數依次吐出來,吐完之后會調用complete方法;吐的這個過程是同步的,也就是所有數據之間是不存在間隔的。
const { of } = Rx;
of(1).pipe();
const { of } = Rx;
of(1, 2, 3).pipe();
值: ?of產生的是Cold Observable,對于每一個Observer都會重復吐出同樣的一組數據,所以可以反復使用。
=> range操作符 <= 作用: ?對需要產生多個很長連續數字序列的場景,就是得上range這個操作符了,range的含義就是“范圍”,只需要指定一個范圍的開始值和長度,range 就能夠產生這個范圍內的依次+1的數字序列;同樣數據吐完之后會調用complete方法。
參數: ?range(序列開始的任意數字,序列的長度)
const { range } = Rx;
range(1, 100).pipe();
局限性: ?無法規定每次遞增的大小
=> generate操作符 <= 作用: ?類似一個for循環,設定一個初始值,每次遞增這個值,直到滿足某個條件的時候才中止循環,同時,循環體內可以根據當前值產生數據。
參數: ?generate(初始值, 條件判斷函數, 值如何增加函數, 返回結果處理函數)
// 假設存在這樣的for循環:產??個?10?的所有偶數的平?
const result = [];
for (let i = 2; i < 10; i += 2) {
?result.push(i * i);
}
// 使用generate類似實現
const { generate } = Rx;
generate(
? ?2, // 初始值,相當于for循環中的i=2
? ?value => value < 10, //繼續的條件,相當于for中的條件判斷
? ?value => value + 2, //每次值的遞增
? ?value => value * value // 產?的結果
).pipe();
注意: ?使用時需要保證后面三個函數參數為純函數
=> repeat操作符 <= 作用: ?重復上游Observable中的數據n 次
參數1: ?repeat(重復的次數)
const { of } = Rx;
const { repeat } = RxOperators;
of(1, 2, 3).pipe(repeat(100))
參數2: ?repeat({count: 重復的次數, delay: 數據的時間間隔})
import { of, repeat } from "rxjs";
of(1, 2, 3)
?.pipe(
? ?repeat({
? ? ?count: 10,
? ? ?delay: 1000,
? ?})
?)
?.subscribe((res) => console.log(res));
注意: ?保證上游Observable對象一定會完結。
=> EMPTY常量 <= 作用: ?產生一個直接完結 的Observable對象,沒有參數,不產生任何數據,直接完結。
const { EMPTY ?} = Rx;
EMPTY.pipe()
=> throwError操作符 <= 作用: ?它所產生的Observable對象也是什么都不做,直接出錯 ,拋出的錯誤就是throw的參數
參數: ?throwError(錯誤程序)
const { throwError ?} = Rx;
throwError(new Error('這是一個錯誤')).pipe()
=> NEVER常量 <= 作用: ?產生的Observable對象就真的是什么都不做,既不吐出數據,也不完結 ,也不產生錯誤 ,就這樣待著,一直到永遠
const { NEVER ?} = Rx;
NEVER.pipe()
(3)創建異步數據流 說明: ?就是創建異步的Observable對象,不光要考慮產生什么數據,還需要考慮數據之間的時間間隔了
=> interval操作符 <= 作用: ?定時從Observable對象吐出一個數據,如果不主動結束就一直執行
參數: ?interval(吐數據的間隔毫秒數)
const { interval } = Rx;
interval(1000).pipe()
注意: 它所創建的數據流不會自動完結,也就是不會調用complete方法,要想結束只能夠執行退訂操作了 其次這個異步數據序列總是從0開始遞增的; 最后它與原生的setinterval的地位是等價的 => timer操作符 <= 作用: ?產生的Observable對象在指定毫秒之后會吐出一個數據,執行后立即結束
參數: ?timer(毫秒數 / 一個Date對象, 時間間隔)
// 明確延時產?數據的時間間隔
const { timer } = Rx;
timer(1000).pipe()
// 明確的是?個時間點
const { timer } = Rx;
timer(
?new Date(
? ?new Date().getTime() + 1000
?)
).pipe()
const { timer } = Rx;
timer(1000,2000).pipe()
注意: 如果使用第二個參數,產生的數據流跟interval類似,只不過產生第一個數據的時間間隔由第一個參數決定,后面產生數據的時間間隔由第二個參數決定;如果兩個參數一致,那就是interval了 => from操作符 <= 作用: ?以把任何可迭代對象都轉化為Observable對象
參數: ?from(任何可迭代對象)
const { from } = Rx;
from([1,2,3]).pipe()
const { from } = Rx;
from('abc').pipe()
注意: ?在from的眼中,把輸出參數都當做一個Iterable 來看待,所以上面的字符串abc在from看來就和數組['a','b','c']沒有區別
import { from } from "rxjs";
const promise = Promise.resolve("good");
const source$ = from(promise);
source$.subscribe(
?console.log,
?(error) => console.log("catch", error),
?() => console.log("complete")
);
import { from } from "rxjs";
const promise = Promise.reject("error");
const source$ = from(promise);
source$.subscribe(
?console.log,
?(error) => console.log("catch", error),
?() => console.log("complete")
);
解釋: ?如果from的參數是promise,當promsie成功結束,from產生的Observable對象就會吐出Promise成功的結果,并且立刻結束,如果以失敗而告終的時候,from產生的Observable對象也會立刻產生失敗事件
=> fromEvent操作符 <= 作用1: ?在網頁開發中,可以把DOM中的事件轉化為Observable對象中的數據
參數1: ?fromEvent(事件源, 事件名稱)
// 希望點擊id為clickMe的按鈕時,id為text的div中的數字會增加1,
// 連續點擊那個按鈕,對應數字會持續增加
<template>
?<div>
? ?<button id="clickMe">Click Me</button>
? ?<div id="text">0</div>
?</div>
</template>
<script setup>
import { fromEvent } from "rxjs";
import { onMounted } from "vue";
let clickCount = 0;
onMounted(() => {
?const event$ = fromEvent(
? ? ?document.querySelector("#clickMe"),
? ? ?"click"
?);
?event$.subscribe(() => {
? ?document
? ? ? ?.querySelector("#text")
? ? ? ?.innerText = ++clickCount;
?});
});
</script>
<style></style>
說明: ?網頁開發中事件源一般是DOM節點
// 這里展示從Node.js的events中獲得數據
import { fromEvent } from "rxjs";
// 這個模塊需要使用 npm i events 安裝一下
import EventEmitter from "events";
const emitter = new EventEmitter();
// 只接受數據源中事件為"msg"的數據
const source$ = fromEvent(emitter, "msg");
source$.subscribe(
?console.log,
?(error) => console.log("catch", error),
?() => console.log("complete")
);
// emitter的emit函數發送任何名稱的事件,
// 第?個參數就是事件名稱,第?個參數是數據
emitter.emit("msg", 1);
emitter.emit("msg", 2);
emitter.emit("another-msg", "oops");
emitter.emit("msg", 3);
注意: ?fromEvent產生的是Hot Observable,也就是數據的產生和訂閱是無關的,如果在訂閱之前調用emitter.emit,那有沒有Observer這些數據都會立刻吐出來,等不到訂閱的時候,當添加了Observer的時候,仍然什么數據都獲得不到。
import { fromEvent } from "rxjs";
import EventEmitter from "events";
const emitter = new EventEmitter();
const source$ = fromEvent(emitter, "msg");
// 在訂閱之前發射數據
emitter.emit("msg", 1);
emitter.emit("msg", 2);
emitter.emit("another-msg", "oops");
emitter.emit("msg", 3);
source$.subscribe(
?console.log,
?(error) => console.log("catch", error),
?() => console.log("complete")
);
=> fromEventPattern操作符 <= 作用: ?用于處理的Observable對象被訂閱 和退訂 時的動作
參數: ?fromEventPattern(被訂閱時觸發的函數, 被退訂時觸發的函數)
import { fromEventPattern } from "rxjs";
import EventEmitter from "events";
const emitter = new EventEmitter();
// handler參數可以理解為觀察者對象中的next方法
const addHandler = (handler) => {
?// 監聽事件源中的msg事件,每次觸發事件執行next方法
?emitter.addListener("msg", handler);
};
const removeHandler = (handler) => {
?// 與上面相反,會移除msg事件上面的next方法
?emitter.removeListener("msg", handler);
};
const source$ = fromEventPattern(addHandler, removeHandler);
const subscription = source$.subscribe(
?console.log,
?(error) => console.log("catch", error),
?() => console.log("complete")
);
emitter.emit("msg", "hello");
emitter.emit("msg", "world");
// 取消訂閱后emitter上面監聽的事件被取消掉,
// 所以此處的值并不會出現在Observable對象里面
subscription.unsubscribe();
emitter.emit("msg", "end");
說明: ?它提供的就是一種模式,不管數據源是怎樣的行為,最后的產出都是一個Observable對象
=> ajax操作符 <= 作用: ?用于發送請求并根據結果返回Observable對象
參數: ?ajax('請求的地址')
// 根據github上的api獲取RxJS項?獲得的Start的數量
<template>
?<div>
? ?<button id="getStar">Get RxJS Star Count</button>
? ?<div id="text"></div>
?</div>
</template>
<script setup>
import { onMounted } from "vue";
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
onMounted(() => {
?fromEvent(
? ? ?document.querySelector("#getStar"),
? ? ?"click"
?).subscribe(
? ? ?() => {
? ? ? ? ?ajax("https://api.github.com/repos/ReactiveX/rxjs")
? ? ? ? ?.subscribe(
? ? ? ? ? ? ?(value) => {
? ? ? ? ? ? ? ? ?const starCount =
? ? ? ? ? ? ? ? ? ? ?value.response.stargazers_count;
? ? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ?document.querySelector("#text").innerText =
? ? ? ? ? ? ? ? ? ? ?starCount;
? ? ? ? ? });
?});
});
</script>
=> defer操作符 <= 作用: ?用于延遲 執行某些操作
參數: ?defer(一個函數,這個函數會在被訂閱時調用)
// 延遲發送請求
import { defer } from "rxjs";
import { ajax } from "rxjs/ajax";
defer(
? ?() => ajax("https://api.github.com/repos/ReactiveX/rxjs")
? ? ? ? ? ?.subscribe(
? ? ? ? ? ? ? ?(res) => console.log(res)
? ? ? ? ? ?)
);
四、合并數據流 (1)合并類操作符 說明: ?其作用在于將有多個Observable對象作為數據來源,把不同來源的數據根據不同的規則合并到一個Observable對象中。
=> concat操作符 <= 作用: ?把多個Observable中的數據內容依次合并,合并的時候原有數據不變
參數: ?concat(數據1, 數據2, 數據3...)
import { concat, of } from "rxjs";
const source1$ = of(1, 2, 3);
const source2$ = of(4, 5, 6);
concat(source1$, source2$)
? ?.subscribe(
? ? ? ?(res) => console.log(res)
? ?);
注意: ?它在工作的時候,會先從第一個Observable對象中獲取數據,等它complete之后,再從下一個對象中去數據,直到取完所有的,此時,如果其中有一個對象是不完結的狀態,那么它之后的Observable對象就不會有被取到的機會。
=> merge操作符 <= 作用: ?一定性訂閱上游所有的Observable對象,只要有數據傳遞下來,這個數據就會被傳遞給下游,也就是數據采取先到先出的原則,同時合并的時候原有數據不變
參數: ?merge(數據1, 數據2, 數據3, ... 可選數字參數)
場景一:合并異步數據流 import { merge, of, repeat, pipe } from "rxjs";
// 隔700ms重復一個A,重復的次數為5次
const source1$ = of("A").pipe(
? ? ? ? ? ? ? ? ? ? ? ?repeat(
? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 700 }
? ? ? ? ? ? ? ? ? ? ? ?));
// 隔800ms重復一個B,重復的次數為5次
const source2$ = of("B").pipe(
? ? ? ? ? ? ? ? ? ? ? ?repeat(
? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 800 }
? ? ? ? ? ? ? ? ? ? ? ?));
const merged$ = merge(source1$, source2$);
merged$.subscribe((res) => console.log(res));
場景二: 同步限流 解釋: ?此時需要用到最后的參數 ,參數是一個數字,表示可以同時合并的個數
import { merge, of, repeat, pipe } from "rxjs";
// 隔700ms重復一個A,重復的次數為5次
const source1$ = of("A").pipe(
? ? ? ? ? ? ? ? ? ? ? ?repeat(
? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 700 }
? ? ? ? ? ? ? ? ? ? ? ?));
// 隔800ms重復一個B,重復的次數為5次
const source2$ = of("B").pipe(
? ? ? ? ? ? ? ? ? ? ? ?repeat(
? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 800 }
? ? ? ? ? ? ? ? ? ? ? ?));
? ? ? ? ? ? ? ? ? ? ? ?
// 隔900ms重復一個C,重復的次數為5次
const source3$ = of("C").pipe(
? ? ? ? ? ? ? ? ? ? ? ?repeat(
? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 900 }
? ? ? ? ? ? ? ? ? ? ? ?));
? ? ? ? ? ? ? ? ? ? ? ?
// 限定合并的個數為2
const merged$ = merge(source1$, source2$, source3$, 2);
merged$.subscribe((res) => console.log(res));
場景三:合并多個事件 說明: ?一個元素存在click 事件和touch 事件,對應網頁和移動設備,假如其事件處理程序的邏輯一致,此時就可以分別使用fromEvent 獲取單個事件流,之后用merge 合并成一個數據流,就可以集中管理了
// 可以像這樣處理
const click$ = fromEvent(element, 'click');
const touchend$ = fromEvent(element, 'touchend');
merge(click$, touchend$).subscribe(處理函數);
=> zip操作符 <= 作用: ?將數據流里面的數據一一對應 并使用數組 組合起來
參數: ?zip(數據流1, 數據流2, 數據流3...)
場景一: 一對一合并 import { interval, of, zip } from "rxjs";
// 一個異步數據流,產生的數據是無限的
const source1$ = interval(1000);
// 一個同步數據流,產生的數據流是有限
const source2$ = of("a", "b", "c");
// 將兩個數據流合并
zip(source1$, source2$)
? ?.subscribe(
? ? ? ?(res) => console.log(res),
? ? ? ?null,
? ? ? ?() => console.log('complete')
? ?);
注意: ?這里數據的匹配是一一對應 的,所以數據少的那個Observable決定zip 產生數據的個數;然后在對應的時候需要雙方都有數據 才能夠對應,這也是為什么上面的打印會隔1s才打印。
問題: 數據積壓 說明: ?如果某個上游source1$吐出數據的速度很快,而另一個上游source2$吐出數據的速度很慢,那zip就不得不先存儲source1$吐出的數據,因為RxJS的工作方式是“推”, Observable把數據推給下游之后就沒有責任保存數據了。被source1$推送了數據之后,zip就有責任保存這些數據,等著和source2$未來吐出的數據配對。假如source2$遲遲不吐出數據,那么zip就會一直保存source1$沒有配對的數據,然而這時候source1$可能會持續地產生數據,最后zip積壓的數據就會越來越多,占用的內存也就越來越多。對于數據量比較小的Observable對象,這樣的數據積壓還可以忍受,但是對于超大量的數據流,使用zip就不得不考慮潛在的內存壓力問題。
=> combineLatest操作符 <= 作用: ?合并上游所有Observable一個最新的數據,也就是它返回值是一個數組
參數: ?combineLatest([數據1, 數據2, 數據3 ...], 處理函數)
場景一: 基本使用 import { combineLatest, timer } from "rxjs";
// 隔1s產生一個數字
const firstTimer = timer(1000, 1000);
// 隔2s產生一個數字
const secondTimer = timer(1000, 2000);
// 合并上面的數據流
const combinedTimers = combineLatest([firstTimer, secondTimer]);
combinedTimers.subscribe((value) => console.log(value));
注意: ?首先還是一一對應的關系,也就是如果一個數據源還沒發射值出來,那么會等待它將值發射出來,如果值沒有改變并且操作沒有完結的話,發射的值將一直是這一個,所以只有所有的Observable對象完結,combineLatest才會給下游一個complete信號,表示不會有任何數據更新了
場景二: 合并同步數據流 const firstTimer = of("a", "b", "c");
const secondTimer = of(1, 2, 3);
const combinedTimers = combineLatest([firstTimer, secondTimer]);
combinedTimers.subscribe((value) => console.log(value));
工作方式: ?combineLatest在工作的時候,會按照順序依次訂閱所有上游的Observable對象,只有所有上游Observable對象都已經吐出數據了,才會給下游傳遞所有上游“最新數據”組合的數據
解釋: ?由于of產生的同步數據流,在被訂閱時就會吐出所有數據,最后一個吐出的數據是字符串c,這也就是最新的數據,然后訂閱下一個對象,下一個對象會依次吐出數據,然后跟上一個對象的最新數據c結合,這就得到了上面看到的內容
場景三:定制下游數據 說明: ?這里就需要啊使用處理函數了,這個函數的參數就是每一個數據源的最新值,其返回值就是下游所接受到的數據,如果沒有返回值,則下游收到的數據為undefined
import { combineLatest, timer, of } from "rxjs";
const firstTimer = of("a", "b", "c");
const secondTimer = of(1, 2, 3);
const combinedTimers = combineLatest(
?[firstTimer, secondTimer],
?(res1, res2, res3) => {
? ?// 上面只有兩個數據源,所以參數只會前兩個有值
? ?console.log(res1, res2, res3);
?}
);
combinedTimers.subscribe();
=> withLatestFrom操作符 <= 說明: ?這個的作用于combineLatest是類似的,只不過下游推送數據只能由一個上游Observable對象驅動,也就是調用withLatestFrom的那個Observable對象起到主導數據產生節奏的作用,作為參數的Observable對象只能貢獻數據,不能控制產生數據的時機
參數: ?數據源1.withLatestFrom(數據源2)
import { withLatestFrom, timer, pipe, map } from "rxjs";
// 每隔兩秒產生100、200、300這樣的數字
const source1$ = timer(0, 2000)
? ? ? ? ? ? ? ? ? ?.pipe(
? ? ? ? ? ? ? ? ? ? ? ?map((x) => 100 * x)
? ? ? ? ? ? ? ? ? ?);
// 每隔一秒產生0、1、2這樣的數字
const source2$ = timer(500, 1000);
// 后面的處理函數將它們想加起來
const result$ = source1$
? ? ? ? ? ? ? ? ? ?.pipe(
? ? ? ? ? ? ? ? ? ? ? ?withLatestFrom(
? ? ? ? ? ? ? ? ? ? ? ? ? ?source2$,
? ? ? ? ? ? ? ? ? ? ? ? ? ?(a, b) => a + b
? ? ? ? ? ? ? ? ? ? ? ?)
? ? ? ? ? ? ? ? ? ?);
result$.subscribe(console.log);
=> race操作符 <= 作用: ?以Observable產生第一個數據的時間為準,只留下最快 的那一個,當然,使用的所有數據也是最快的那一個
參數: ?race(數據源1, 數據源2, 數據源3 ...)
import { timer, race, pipe, map } from "rxjs";
// 立即開始產生數據a
const source1$ = timer(0, 2000).pipe(map(() => "a"));
// 500ms后開始產生數據b
const source2$ = timer(500, 1000).pipe(map(() => "b"));
// 比賽
const winner$ = race(source1$, source2$);
winner$.subscribe(console.log);
=> startWith操作符 <= 作用: ?在讓?個Observable對象在被訂閱的時候,總是先同步 吐出指定的若?個數據
參數: ?數據源.startWith(參數1, 參數2, 參數3 ...)
import { of, startWith } from "rxjs";
of(1000)
?.pipe(startWith("timer start", 1, 2))
?.subscribe((x) => console.log(x));
=> forkJoin操作符 <= 作用: ?等待所有參數Observable對象的最后?個 數據,將其合并成一個數組發射出去
參數: ?forkJoin(對象 / 數組)
import { forkJoin, of, timer } from "rxjs";
// 下面會在四秒后返回結果
forkJoin({
?foo: of(1, 2, 3, 4),
?bar: Promise.resolve(8),
?baz: timer(4000),
}).subscribe((res) => console.log(res));
(2)高階Observable 說明: ?簡單理解就是一個Observable中存在Observable,它有一個特點就是高階Observable完結不代表其里面的Observable完結
=> concatAll操作符 <= 說明: ?這個操作符針對高階Observable,也是依次訂閱Observable內部的Observable取值結合,訂閱的過程中如果上一個Observable沒有完結就不會訂閱下一個Observable對象。其他操作可以參照concat
參數: ?沒有參數
import { of, concatAll } from "rxjs";
const source = of(
? ? ? ? ? ? ? ? ? ?of(1, 2, 3),
? ? ? ? ? ? ? ? ? ?of(4, 5, 6),
? ? ? ? ? ? ? ? ? ?of(7, 8, 9)
? ? ? ? ? ? ? ?);
source.pipe(concatAll())
? ? ? ?.subscribe(
? ? ? ? ? ?(val) => console.log(val)
? ? ? ?);
=> mergeAll操作符 <= 說明: ?針對高階Observable,在合并的時候,依次訂閱其內部的Observable對象,那個對象有數據傳下來,這個數據就會傳遞給下游;它可以傳遞一個數字來限定合并的最大流的個數。其他操作可以參照merge
參數: ?mergeAll(數字)
import { of, mergeAll, repeat } from "rxjs";
// 這里A延遲復制的時間比B長,所以第二次打印的時候B在前面
const source = of(
?of("A").pipe(
? ? ? ? ? ? ?repeat({ count: 5, delay: 800 })
? ? ? ? ?),
?of("B").pipe(
? ? ? ? ? ? ?repeat({ count: 5, delay: 700 })
? ? ? ? ?)
);
source.pipe(mergeAll())
? ? ? ? ? ?.subscribe(
? ? ? ? ? ? ? ?(val) => console.log(val)
? ? ? ? ? ?);
=> zipAll操作符 <= 說明: ?對高階Observable使用的時候,將數據流里面的數據一一對應 并使用數組 組合起來。其它操作可以參考zip
參數: ?zipAll(處理函數)
import { of, zipAll } from "rxjs";
const source = of(of("A", "B", "C"), of(1, 2, 3), of("X", "Y", "Z"));
source
?.pipe(
? ?// 可以接收一個處理函數,每個參數對應返回值的每一項
? ?zipAll((a, b, c) => {
? ? ?// 這里將參數打印出來,如果沒有返回值則下游將收不到值
? ? ?console.log(a, b, c);
? ?})
?)
?.subscribe();
=> combineLatestAll操作符 <= 說明: ?在處理高階Observable的時候,將其內部Observable產生的最新數據一一對應并用數組的形式返回出來。其它操作可以參考combineLatest
參數: ?combineLatestAll(處理函數)
import { of, combineLatestAll } from "rxjs";
const source = of(of("A", "B", "C"), of(1, 2, 3), of("X", "Y", "Z"));
source
?.pipe(
? ?// 可以接收一個處理函數,每個參數對應返回值的每一項
? ?combineLatestAll((a, b, c) => {
? ? ?// 這里將參數打印出來,如果沒有返回值則下游將收不到值
? ? ?console.log(a, b, c);
? ?})
?)
?.subscribe();
五、輔助類操作符 (1)數學類操作符 說明: ?這里介紹的操作符會遍歷上游Observable對象中吐出的所有數據才給下游傳遞數據, 也就是說,它們只有在上游完結的時候,才給下游傳遞唯?數據。
=> count操作符 <= 作用: ?用于統計上游Observable對象吐出的所有數據個數,所以上游的Observable需要完結
參數: ?count(過濾函數)
import { of, interval, count } from "rxjs";
// 可以完結
of(1000, 1)
?.pipe(
? ? ?// 此時過濾出數據為1的數量
? ? ?count(
? ? ? ? ?(val) => val === 1
? ? ?)
?)
?.subscribe((res) => console.log(res));
// 無法完結
interval(1000)
?.pipe(count())
?.subscribe((res) => console.log(res));
=> max和min操作符 <= 作用: ?找出上游數據中的最大值 和最小值
參數: ?max(比較函數) / min(比較函數)
import { of, max } from "rxjs";
of(
? ?{ age: 7, name: "Foo" },
? ?{ age: 5, name: "Bar" },
? ?{ age: 9, name: "Beer" }
).pipe(
? ?// 返回值為正 => a > b
? ?// 返回值為0 => a = b
? ?// 返回值為負 => a < b
? ?max((a, b) => a.age > b.age)
?)
.subscribe(
? ? (x) => console.log(x.name)
);
注意: ?如果Observable吐出的數據類型是復雜數據類型,?如?個對象,那必須指定?個?較這種復雜類型??的?法,就像上面使用的那樣
=> reduce操作符 <= 說明: ?對上游的每個數據進行自定義計算,也就是對每一個元素都會調用一次這個函數
參數: ?reduce((累加的值, 當前元素的值) => {}, 初始值)
// 計算 1-100 的和
import { range, reduce } from "rxjs";
range(1, 100)
?.pipe(
? ? ?reduce(
? ? ? ? ?(acc, current) => acc + current,
? ? ? ? ?0
? ? ?)
?)
?.subscribe(
? ? ?(res) => console.log(res)
?);
2)條件操作符 說明: ?根據上游Observable對象的某些條件產生一個新的 Observable對象
=> every操作符 <= 作用: ?它接受一個判定函數作為參數,如果上游所有數據都能夠通過這個函數,那么會返回一個包含true值的Observable,有一個通不過就返回一個包含false值的Observable,在吐出結果后every產生的Observable會立即完結;不要對不會完結的對象使用
參數: ?every(判定函數)
import { every, of } from "rxjs";
of(1, 100)
?.pipe(
? ? ?every(
? ? ? ? ?// 這里判定是否所有值都大于10,顯然1不行
? ? ? ? ?(val) => val > 10
? ? ?)
?)
?.subscribe((res) => console.log(res));
=> find和findIndex操作符 <= 作用: ?通過一個處理函數來在上游數據中查找滿足條件的數據,find會吐出找到的上游的數據,findIndex會吐出滿足判定條件的索引,如果找不到find會吐出undefined后完結,findIndex則會吐出-1后完結;不要對不會完結的對象使用
參數: ?find(處理函數) / findIndex(處理函數)
import { find, findIndex, of } from "rxjs";
of(1, 100)
?.pipe(
? ? ?find(
? ? ? ? ?(val) => (val = 100)
? ? ?)
?)
?.subscribe((res) => console.log(res));
?
of(1, 100)
?.pipe(
? ? ?findIndex(
? ? ? ? ?(val) => (val = 100)
? ? ?)
?)
?.subscribe((res) => console.log(res));
=> isEmpty操作符 <= 作用: ?檢測上游Observable對象是不是空的,如果在完結之前沒有 吐出數據,它就是空的,此時返回一個包含true 值的Observable,否則返回一個包含false 值的Observable
import { EMPTY, isEmpty, of } from "rxjs";
// 不是空的
of(1)
?.pipe(isEmpty())
?.subscribe((res) => console.log(res));
?
// 是空的
EMPTY.pipe(isEmpty())
?.subscribe((res) => console.log(res));
=> defaultIfEmpty操作符 <= 作用: ?接受一個默認值,如果 檢測上游的Observable是空的,則把這個默認值傳遞給下游,如果 不是空的就把上游的東西傳遞給下游;如果 不傳但是上游檢測還是空的,下游就會收到一個undefined 值
import { defaultIfEmpty, EMPTY, of } from "rxjs";
// 不是空值,不傳參數
of(1)
?.pipe(defaultIfEmpty())
?.subscribe((res) => console.log(res));
// 不是空值,傳參數
of(1)
?.pipe(defaultIfEmpty("存在內容"))
?.subscribe((res) => console.log(res));
// 是空值,不傳參數
EMPTY.pipe(defaultIfEmpty())
?.subscribe((res) => console.log(res));
?
// 是空值,傳參數
EMPTY.pipe(defaultIfEmpty("存在內容"))
?.subscribe((res) => console.log(res));
六、過濾數據流 (1)過濾類操作符 說明: ?對上游Observable中所有的數據使用判定函數進行操作,決定是否某些元素不能通過進入下游,如果對某個元素處理結果為true,表示能通過,否則就不能通過
=> filter操作符 <= 作用: ?跟JavaScript中的filter使用起來是類似的,只不過這里針對的是Observable
參數: ?filter(過濾函數)
import { filter, interval } from "rxjs";
source$ = interval(1000)
?.pipe(
? ? ?// 過濾能被2整除的數據
? ? ?filter(
? ? ? ? ?(x) => x % 2 === 0
? ? ?)
?)
?
source$.subscribe((res) => console.log(res));
注意: ?當上游產?數據的時候,只要這個數據滿?判定條件,就會立刻被同步傳給 下游。
=> first操作符 <= 作用: ?過濾出Observable中第一個滿足條件的值,在沒有找到的時候會拋出一個錯誤,如果不想這個錯誤傳遞給下游可以使用第二個默認值,它的作用是在沒找到滿足條件的值的時候將這個值傳遞出去。如果不傳參數則將第一個數據返回出去,
參數: ?filter(過濾函數, 默認值)
import { first, of } from "rxjs";
// 找不到結果拋出錯誤,但是給默認值
of(1, 3)
?.pipe(first((x) => x % 2 === 0, 2))
?.subscribe((res) => console.log(res));
// 找到結果
of(1, 4, 3)
?.pipe(first((x) => x % 2 === 0))
?.subscribe((res) => console.log(res));
// 找不到結果拋出錯誤
of(1, 3)
?.pipe(first((x) => x % 2 === 0))
?.subscribe((res) => console.log(res));
=> last操作符 <= 說明: ?這個作用與first相反,它是找最后一個 滿足條件的值,使用可以參考 first,這里需要注意 ,使用這個操作符的上游必須完結 ,否則操作符不知道哪一個是最后一個數據
參數: ?filter(過濾函數, 默認值)
import { last, interval } from "rxjs";
// 這個Observable不會完結,自然也不會拿到結果
interval(1000)
?.pipe(last((x) => x % 2 === 0, 2))
?.subscribe((res) => console.log(res));
=> take操作符 <= 作用: ?從上游的數據中拿指定個數 的數據,拿完之后就會完結,并將獲取的數據返回
參數: ?take(需要的個數)
import { interval, of, take } from "rxjs";
// 數據不夠拿,那就拿完為止
of("a", "b", "c")
?.pipe(take(4))
?.subscribe((res) => console.log(res));
// 獲取指定個數的數據
interval(1000)
?.pipe(take(4))
?.subscribe((res) => console.log(res));
注意: ?上游每產生一個數據就會立即傳給下游,也就是同步 操作的
=> takeLast操作符 <= 作用: ?從后往前 獲取指定個數 的數據,之后將數據一次性 返回出去之后完結
參數: ?takeLast(需要的個數)
import { interval, of, takeLast } from "rxjs";
// 數據不夠拿,那就拿完為止
of("a", "b", "c")
?.pipe(takeLast(4))
?.subscribe((res) => console.log(res));
// 數據沒有完結,獲取不到數據
interval(1000)
?.pipe(takeLast(4))
?.subscribe((res) => console.log(res));
注意: ?如果上游的Observable對象不會完結的話,那么是拿不到數據的,因為不知道誰是最后一個數據
=> takeWhile操作符 <= 說明: ?takeWhile接受?個判定函數作為參數,這個判定函數有兩個參數,分別代表上游的數據和對應的序號,takeWhile會吐出上游數據,直到判定函數返回false,只要遇到第一個判定函數返回false的情況, takeWhile產生的Observable就完結
參數: ?takeWhile(判定函數, 布爾值)
// 這里關注第二個參數
import { range, takeWhile } from "rxjs";
range(1, 10)
?.pipe(
? ? ?takeWhile(
? ? ? ? ?(val) => val < 3, true
? ? ?)
?)
?.subscribe((res) => console.log(res));
range(1, 10)
?.pipe(
? ? ?takeWhile(
? ? ? ? ?(val) => val < 3, false
? ? ?)
?)
?.subscribe((res) => console.log(res));
注意: ?第二個參數表示是否將第一次導致判定函數結果為false的那個值發射出去,默認是false,表示不發射,true則表示發射。
=> takeUntil操作符 <= 說明: ?它接受一個Observable對象,在這個對象沒有吐出數據之前,上游的數據會直接傳遞給下游,在參數對象吐出第一個數據時,上游的數據就不能傳遞給下游了。其次參數對象出現錯誤的時候,這個錯誤會傳遞給下游,此時上游數據也不能傳遞給下游了
參數: ?takeUntil(Observable對象)
// 假如使用interval創建數據,在第三秒的時候停止
import { interval, takeUntil, timer } from "rxjs";
interval(1000)
?.pipe(
? ? ?takeUntil(timer(3000))
?)
?.subscribe((res) => console.log(res));
=> skip操作符 <= 作用: ?跳過上游的前n個 值,然后從上游的第n+1個 值開始傳遞給下游,這個操作符不關心最后一個值是什么,所以 這個操作符的上游不管會不會完結下游都會有值。
參數: ?skip(跳過的個數)
import { interval, skip } from "rxjs";
// 跳過前兩個值
interval(1000)
?.pipe(skip(2))
?.subscribe((res) => console.log(res));
=> skipLast操作符 <= 作用: ?可以理解成去除上游的最后n個 值,然后將剩下的值傳遞給下游;
參數: ?skipLast(跳過的n個值)
import { interval, skipLast, of } from "rxjs";
// 一個完結的對象
of("a", "b", "c")
?.pipe(skipLast(2))
?.subscribe((res) => console.log(res));
// 不會完結的對象
interval(1000)
?.pipe(skipLast(2))
?.subscribe((res) => console.log(res));
注意: ?上游沒有完結下游依然可以收到數據
=> skipWhile操作符 <= 說明: ?它接收一個函數作為參數,上游的每一個數據都會執行這個函數,只要有一個數據在函數中的返回值是false,那么這個數據之前的數據都會被過濾調用,剩下的數據會傳遞給下游。
參數: ?skipWhile(處理函數)
import { interval, skipWhile } from "rxjs";
interval(1000)
?.pipe(skipWhile((val) => val % 2 === 0))
?.subscribe((res) => console.log(res));
=> skipUntil操作符 <= 作用: ?用于在一個Observable中跳過一些值,直到另一個Observable發出了特定的信號或者達到某種狀態。
參數: ?skipUntil(Observable對象)
import { interval, timer, skipUntil } from "rxjs";
// 創建一個每秒發出一個值的Observable
const source$ = interval(1000);
// 創建一個在5秒后發出第一個值的Observable
const trigger$ = timer(5000);
// 使用skipUntil操作符,跳過source$的值,直到trigger$發出第一個值
const example$ = source$.pipe(skipUntil(trigger$));
const subscription = example$.subscribe((val) => console.log(val));
(2)有損回壓控制 解釋: ?如果數據管道中某一個環節處理數據的速度跟不上數據涌現的速度,上游無法把數據推送給下游,就會在緩沖區中積壓數據,這就相當于對上游施加了壓力,這就是RxJS世界中的回壓。
處理: ?造成這種現象的原因是數據管道中某個環節數據涌?的速度超過了處理速度,那么,既然處理不過來,干脆就舍棄掉某些涌現的數據,這種方式稱為有損回壓控制
可選的調度器: asyncScheduler:這是默認的調度器,它使用setTimeout或setInterval來安排任務的執行。它適用于異步操作。 queueScheduler:這個調度器會按順序執行任務,并且會等待當前任務完成后才執行下一個任務。適用于同步操作。 animationFrameScheduler:這個調度器會根據瀏覽器的刷新率來執行任務,通常用于實現動畫效果或者對性能要求較高的操作。 asapScheduler:這個調度器會盡可能快地在當前執行棧中執行任務,但是會在微任務隊列中等待其他任務完成后執行。適用于需要盡快執行的任務。 TestScheduler:這是用于測試的調度器,可以用來模擬時間的流逝,方便測試 RxJS 代碼。 可選參數對象: leading:布爾值,表示是否在節流周期的開始時立即發出第一個值。默認為true。 trailing:布爾值,表示是否在節流周期結束時發出最后一個值。默認為false。 => throttleTime操作符 <= 說明: ?在一個時間范圍內,上游傳遞給下游的數據只能傳遞一個;這里參數如果只傳一個,其它值都會使用默認值;
參數: ?throttleTime(時間范圍, 調度器, 可選參數對象)
import {
? ? ? ?interval,
? ? ? ?throttleTime,
? ? ? ?asyncScheduler
} from "rxjs";
// 這里每隔1s產生一個數字
interval(1000)
?.pipe(
? ? ?throttleTime(
? ? ? ? ? ? ? ? ? ? ?2000,
? ? ? ? ? ? ? ? ? ? ?asyncScheduler,
? ? ? ? ? ? ? ? ? ? ?// trailing為true時產生的結果是:2、4、6...
? ? ? ? ? ? ? ? ? ? ?// leading為true時產生的結果是:3、6、9...
? ? ? ? ? ? ? ? ? ? ?{ leading: false, trailing: true }
? ? ? ? ? ? ? ? ?)
? ? ?)
?.subscribe((res) => console.log(res));
=> debounceTime操作符 <= 說明: ?在一個時間范圍內,一直有數據產生一直不會將數據傳遞給下游,只有在這個時間外產生的第一個數據才會傳遞給下游;所以產生數據的間隔需要大于這個時間范圍才可以
參數: ?throttleTime(時間范圍, 調度器)
import { interval, debounceTime, asyncScheduler } from "rxjs";
// 這里的值如果比2000還小那么就不會有數據打印出來
interval(4000)
?.pipe(debounceTime(2000, asyncScheduler))
?.subscribe((res) => console.log(res));
=> throttle和debounce操作符 <= 作用: ?這兩個都是使用Observable中的數據來控制流量,區別 在于時機不同而已
參數: ?throttle(處理函數, 可選參數對象)
參數: ?debounce(處理函數)
// 這里以throttle為例
import { interval, timer, throttle } from "rxjs";
const source$ = interval(1000);
// 處理函數的參數只能拿到上游的數據
const durationSelector = (value) => {
?console.log(`# call durationSelector with ${value}`);
?return timer(2000);
};
const result$ = source$.pipe(throttle(durationSelector));
result$.subscribe(console.log);
理解: ?當source$產生第一個數據0的時候,throttle就和throttleTime一樣,毫不 猶豫地把這個數據0傳給了下游,在此之前會將這個數據0作為參數調用 durationSelector,然后訂閱durationSelector返回的Observable對象,在這個 Observable對象產生第一個對象之前,所有上游傳過來的數據都會被丟棄,于是,source$產生的數據1就被丟棄了,因為durationSelector返回的 Observable對象被訂閱之后2000毫秒才會產生數據。 這個過程,相當于throttle每往下游傳遞一個數據,都關上了上下游之間閘門,只有當durationSelector產生數據的時候才打開這個閘門。到了2000毫秒的時刻,durationSelector第二次被調用產生的Observable對象終于產生了多個數據,閘門被打開,source$產生的第三個數據2正好趕上,被 傳遞給了下游,同時關上閘門,這時候throttle會立刻退訂上一次 durationSelector返回的Observable對象,重新將數據2作為參數調用 durationSelector來獲得一個新的Observable對象,這個新的Observable對象產生數據的時候,閘門才會再次打開。可見,durationSelector產生Observable對象只有第一個產生的數據會有作用,而且這個數據的產生時機是關鍵,至于這個數據是個什么值不重要。
=> auditTime和audit操作符 <= 說明: ?這兩個都是在一個時間內,將最后一個產生的值發射出去,其余的值會被忽略掉。它們之間的區別是一個使用時間范圍管理,一個使用函數管理
參數: ?auditTime(時間范圍, 可選參數對象)
參數: ?audit(處理函數)
import { interval, auditTime } from "rxjs";
interval(1000)
? ?.pipe(auditTime(3000))
? ?.subscribe(
? ? ? ?(val) => console.log("auditTime:", val)
? ?); ? ? ? ? ?
// 第一個3s:0、1、2、3 --> 三秒末也是四秒初發出值3
// 第二個3s:4、5、6、7 --> 六秒末也是七秒初發出值7
// ...
理解: ?上面的時間寫3s,所以在第一個3s內產生了值0、1、2,在第3s結束的時候,產生了值3,根據定義,所以第一個3s發出的值是3,在物理上,第n秒結束的時候,也就是第n+1秒開始的時候,所以下一個3s是從第四秒開始,然后這個時間內產生4、5、6,第7s結束的時候,產生值7,將其傳遞給下游...后面的值都是這樣產生的,也就是它發出一個值傳遞到下游之后,它會等待下一個值到達,才會開始其計時
=> sampleTime和sample操作符 <= 說明: ?sampleTime的作用是搜尋一個時間范圍內的最后一個數據,將其傳遞給下游,如果這個時間范圍里面沒有值則不會傳值到下游,然后繼續下一個時間范圍的搜尋; 而sample有點不同,它的參數接收一個Observable對象來控制Observable,這個參數被稱為notifier,當notifier產生一個數據的時候, sample就從上游拿最后一個產生的數據傳給下游。
參數: ?sampleTime(時間范圍, 調度器)
參數: ?sample(observable對象)
interval(1000)
?.pipe(sampleTime(2000))
?.subscribe((res) => console.log("sampleTime:", res));
理解: ?上面數據是每隔1s產生一個,然后我搜尋時間范圍是2s,第一個2s,產生值0、1,將1傳遞出去,繼續第二個2s的搜尋,產生值2、3,將3傳遞出去...以此類推
(3)去重 => distinct操作符 <= 作用: ?上游同樣的數據只有第一次產生時會傳給下游,其余的都被舍棄掉了,判斷是否相等使用的是===
參數: ?distinct(一個函數來定制需要對比什么屬性, 一個Observable對象用于清空數據)
場景一: 基本使用 import { distinct, of } from "rxjs";
of(1, 3, 2, 5, 7, 1, 2)
?.pipe(distinct())
?.subscribe((res) => console.log(res));
場景二: ?對對象使用
import { distinct, of } from "rxjs";
of(
?{ name: "RxJS", version: "v4" },
?{ name: "React", version: "v15" },
?{ name: "React", version: "v16" },
?{ name: "RxJS", version: "v5" }
)
?// 這里規定數據中的name字段相同就算相同數據
?.pipe(distinct((x) => x.name))
?.subscribe((res) => console.log(res));
第二個參數: ?distinct在運作的時候自己會先創建一個集合,里面存放上游的不同數據,每次上游傳遞一個數據出來就對比集合中是否有元素跟它相等,相等就舍棄,如果上游數據無限多切都是不同的,那么這個集合就會有無限的數據在里面,這就存在數據壓力,為了解決這個問題,可以使用第二個可選參數,當這個Observable對象產生數據的時候,這個集合中的數據就會被清空。
=> distinctUntilChanged操作符 <= 作用: ?將上游中的連續數據過濾掉
參數: ?distinctUntilChanged(比較函數)
import { distinctUntilChanged, of } from "rxjs";
of(
?{ name: "RxJS", version: "v4" },
?{ name: "React", version: "v15" },
?{ name: "React", version: "v16" },
?{ name: "RxJS", version: "v5" }
)
?// a表示上一個值,b表示當前值
?.pipe(distinctUntilChanged((a, b) => a.name === b.name))
?.subscribe((res) => console.log(res));
注意: ?比較函數需要返回布爾值來確定由哪些屬性決定數據相等
(4)其它 => ignoreElements操作符 <= 作用: ?忽略上游所有元素,只關心complete和error事件
參數: ?沒有參數
import { ignoreElements, of } from "rxjs";
of(1, 2, 3)
?.pipe(ignoreElements())
?.subscribe((res) => console.log(res));
=> elementAt操作符 <= 說明: ?把上游數據當數組,只獲取指定下標的那?個數據,如果找不到,則拋出一個錯誤事件,如果不想出現錯誤,可以使用第二個參數,在找不到的時候,會將第二個參數做為默認值傳遞給下游
參數: ?elementAt(下標, 默認值)
import { elementAt, of } from "rxjs";
of(1, 2, 3)
?.pipe(elementAt(3, "使用默認值作為數據傳遞給下游"))
?.subscribe((res) => console.log(res));
=> single操作符 <= 作用: ?檢查上游是否只有一個滿足對應條件的數據,如果答案為是,就向下游傳遞這個數據;如果答案為否,就向下游傳遞一個異常
參數: ?single(過濾函數)
import { of, single } from "rxjs";
of(1, 2, 3)
?.pipe(single((x) => x % 2 === 0))
?.subscribe((res) => console.log(res));
七、轉化數據流 (1)映射數據 理解: ?映射數據是最簡單的轉化形式。假如上游的數據是A、B、C、D的序列,那么可以認為經過轉化類操作符之后,就會變成f(A)、f(B)、f(C)、f(D)的序列,其中f是一個函數,作用于上游數據之后,產生的就是傳給下游新的數據
=> map操作符 <= 說明: ?它接受一個函數作為參數,這個函數通常稱為project,指定了數據映射的邏輯 ,每當上游推下來一個數據,map就把這個數據作為參數傳給map的參數函數,然后再把函數執行的返回值 推給下游
參數: ?map(處理函數)
import { of, map } from "rxjs";
of(1, 2, 3)
?.pipe(
? ?map((item, index) => {
? ? ?// 處理函數的item表示當前值,index表示當前值得索引
? ? ?console.log(item, index);
? ?})
?)
?.subscribe();
2)無損回壓控制 說明: ?把上游在一段時間內產生的數據放到一個數據集合中,當時機合適時,把緩存的數據匯聚到一個數組或者Observable對象傳給下游,這就是無損回壓控制
=> windowTime和bufferTime操作符 <= 作用: ?用一個參數來指定產生緩沖窗口的時間間隔,以此緩存上游的數據
參數: ?windowTime(劃分區塊間隔, 內部區塊開始間隔, 最多緩存數據個數)
參數: ?bufferTime(劃分區塊間隔, 內部區塊開始間隔, 最多緩存數據個數)
場景一: 基本使用 import { timer, windowTime } from "rxjs";
const source$ = timer(0, 1000);
const result$ = source$.pipe(windowTime(4000));
理解: ?windowTime的參數是4000,也就會把時間劃分為連續的4000毫秒長度區塊,在每個時間區塊中,上游傳下來的數據不會直接送給下游,而是在該時間區塊的開始就新創建一個Observable對象推送給下游,然后在這個時間區塊內上游產生的數據放到這個新創建的Observable對象中。在每個4000毫秒的時間區間內,上游的每個數據都被傳送給對應時間區間的內部Observable對象中,當4000毫秒時間一到,這個區間的內部Observable對象就會完結,將結果打印出來會發現控制臺每隔1000毫秒打印一個數字出來,因此windowTime把上游數據傳遞出去是不需要延遲的
import { bufferTime, timer } from "rxjs";
const source$ = timer(0, 1000);
const result$ = source$
?.pipe(bufferTime(4000))
?.subscribe((res) => console.log(res));
理解: ?bufferTime產?的是普通的Observable對象,其中的數據是數組形式, bufferTime會把時間區塊內的數據緩存,在時間區塊結束的時候把所有緩存的數據放在一個數組再傳給下游,在控制臺你會看見每隔4秒打印一個數組,因此bufferTime把上游數據傳遞出去是需要延遲的
場景二: 第二個參數 作用: ?指定每個時間區塊開始的時間間隔。
import { timer, windowTime } from "rxjs";
const source$ = timer(0, 1000);
source$.pipe(windowTime(4000, 2000)).subscribe();
理解: ?windowTime使用第二個參數200之后,產生內部Observable的頻率更高了,每200毫秒就會產生一個內部Observable對象, 而且各內部Observable對象中的數據會重復,例如數據2和3就同時出現在第一個和第二個內部Observable對象中
import { bufferTime, timer } from "rxjs";
const source$ = timer(0, 1000);
source$
? ?.pipe(bufferTime(4000, 2000, 2))
? ?.subscribe(console.log);
理解: ?對于bufferTime,因為需要緩存上游數據,不管參數設定的數據區間有多短,都無法預期在這段時間內上游會產生多少數據,如果上游在短時間內爆發出很多數據,那就會給bufferTime很大的內存壓力,為了防止出現這種情況可以使用第三個可選參數來指定每個時間區間內緩存的最多數據個數。
注意: ?如果第一個參數比第二個參數大,那么就有可能出現數據重復,如果第二個參數比第一個參數大,那么就有可能出現上游數據的丟失。之所以說“有可能”,是因為丟失或者重疊的時間區塊中可能上游沒有產生數據,所以也就不會引起上游數據的丟失和重復。從這個意義上說來,windowTime和bufferTime如果用上了第二個參數,也未必是“止損”的回壓控制。
=> windowCount和bufferCount操作符 <= 作用: ?根據數據個數來決定內部的一個Observabe需要保存多少數據。
參數: ?windowCount(時間區間長度, 隔幾個數據重新開一個區間)
import { timer, windowCount } from "rxjs";
const source$ = timer(0, 1000);
source$.pipe(windowCount(4)).subscribe(console.log);
import { timer, windowCount } from "rxjs";
const source$ = timer(0, 1000);
source$.pipe(windowCount(4, 5)).subscribe(console.log);
理解: ?windowCount還支持可選的第二個參數,如果不使用第二個參數,那么所有的時間區間沒有重疊部分;如果使用了第二個參數,那么第二個參數依然是時間區間的長度,但是每間隔第二個參數毫秒數,就會新開一個時間區間
說明: ?對于bufferCount,和windowCount一樣,區別只是傳給下游的是緩存數據組成的數組
=> windowWhen和bufferWhen操作符 <= 說明: ?它們接受一個函數作為參數,這個函數返回一個Observable對象,用于控制上游的數據分割,每當返回的Observable對象產生數據或者完結時,windowWhen就認為是一個緩沖區塊的結束,重新開啟一個緩沖窗口。bufferWhen跟這個是類似的
參數: ?windowWhen(處理函數)
import { timer, windowWhen } from "rxjs";
const source$ = timer(0, 100);
const closingSelector = () => {
?return timer(400);
};
// 被訂閱的時候windowWhen就開始?作,?先開啟?個緩沖
// 窗口,然后?刻調?closingSelector獲得?個Observable對象,
// 在這個Observable對象輸出數據的時候,當前的緩沖窗?就關閉,
// 同時開啟?個新的緩沖窗口,然后再次調?closingSelector
// 獲得?個Observable對象
source$.pipe(windowWhen(closingSelector));
=> windowToggle和bufferToggle操作符 <= 說明: ?利?Observable來控制緩沖窗口的開和關。它需要兩個參數,第一個參數是一個Observable對象,當產生一個數據,代表一個緩沖窗口的開始;同時,第二個參數是一個函數,它也會被調用,用來獲得緩沖窗口結束的通知;其次函數的參數是第一個參數產生的數據,這樣就可以由前一個參數控制緩沖窗口的開始時機,函數控制其關閉時機,從而控制產生高階Observable的節奏;同理bufferToggle也是類似的
import { timer, windowToggle } from "rxjs";
const source$ = timer(0, 100);
const openings$ = timer(0, 400);
const closingSelector = (value) => {
?return value % 2 === 0 ? timer(200) : timer(100);
};
// opening$每400毫秒產??個數據,所以每400毫秒就會有?個
// 緩沖區間開始。每當opening$產??個數據時,closingSelector
// 就會被調?返回控制對應緩沖區間結束的Observable對象,
// 如果參數為偶數,就會延時200毫秒產??個數據,否則就延時100
// 毫秒產??個數據
source$.pipe(windowToggle(openings$, closingSelector));
=> window和buffer操作符 <= 說明: ?保持一個Observable類型的參數,稱為notifier$,每當notifer$產生一個數據,既是前一個緩存窗口的結束,也是后一個緩存窗口的開始;如果這個Observable完結了,那么window產生的一階Observable對象也會完結,buffer也是類似的
參數: ?window(一個Observable對象)
import { timer, window } from "rxjs";
const source$ = timer(0, 100);
// 一個不會完結的Observable
const notifer$ = timer(400, 400);
source$.pipe(window(notifer$));
import { timer, window } from "rxjs";
const source$ = timer(0, 100);
// 一個會完結的Observable
const notifer$ = timer(400);
source$.pipe(window(notifer$));
(3)高階map 說明: ?傳統map與高階map的區別在于其函數參數的返回值,前者是將一個數據映射成另一個數據 ,而后者是將一個數據轉變成一個Observable
import { interval, map } from "rxjs";
const source$ = interval(200);
// 這里每個數據都會轉換成一個包含數字0、1、2、3、4的
// Observable對象
source$.pipe(
? ? ? ? ? ?map(
? ? ? ? ? ? ? ?() => interval(100).take(5)
? ? ? ? ? ?)
? ? ? ?);
=> concatMap操作符 <= 說明: ?可以理解成concatMap = map + concatAll
import { interval, concatMap } from "rxjs";
const source$ = interval(200);
source$.pipe(
? ? ? ? ? ?concatMap(
? ? ? ? ? ? ? ?() => interval(100).take(5)
? ? ? ? ? ?)
? ? ? ?);
理解: ?第一個內部Observable對象中的數據被完整傳遞給了 concatMap的下游,但是,第一個產生的內部Observable對象沒有那么快處理,只有到第一個內部Observable對象完結之后,concatMap才會去訂閱第二個內部Observable,這樣就導致第二個內部Observable對象中的數據排在了后面,絕不會和第一個內部Observable對象中的數據交叉。
=> mergeMap操作符 <= 說明: ?可以理解成mergeMap = map + mergeAll
注意: ?一旦內部Observable發出一個值,它就會立即將該值傳遞給下游觀察者,而不管其他內部Observable是否已經發出或者完成了
import { interval, mergeMap, take } from "rxjs";
const source$ = interval(200).take(2);
source$.pipe(
? ? ? ? ? ?mergeMap(
? ? ? ? ? ? ? ?() => interval(100).take(5)
? ? ? ? ? ?)
? ? ? ?);
=> switchMap操作符 <= 說明: ?可以理解成switchMap = map + switchAll
注意: ?后產生的內部Observable對象優先級總是更高,只要有新的內部Observable對象產生,就立刻退訂之前的內部 Observable對象,改為從最新的內部Observable對象拿數據
import { interval, switchMap, take } from "rxjs";
const source$ = interval(200).take(2);
source$.pipe(
? ? ? ? ? ?switchMap(
? ? ? ? ? ? ? ?() => interval(100).take(5)
? ? ? ? ? ?)
? ? ? ?);
4)分組 => groupBy操作符 <= 參數: ?groupBy(一個處理函數,用于得到數據的key值)
機制: ?對于上游推送下來的任何數據,檢查這個數據的key值,如果這個key值是第一次出現,就產生一個新的內部Observable對象,同時這個數據就是內部Observable對象的第一個數據;如果key值已經出現過,就直接把這個數據塞給對應的內部Observable對象
import { groupBy, interval } from "rxjs";
const source$ = interval(200);
source$.pipe(groupBy((val) => val % 2));
理解: ?groupBy的函數參數取的是參數除以2的余數,所以會產生兩個key值:0和1。從彈珠圖中可以看到,0和2屬于第一個內部 Observable對象,第一個內部Observable對象收納所有key值為0的數據,1 和3屬于第二個內部Observable對象,因為它們對應的key值為1
=> partition操作符 <= 說明: ?partition接受一個判定函數作為參數,對上游的每個數據進行判定,滿足條件的放一個Observable對象,不滿足條件的放到另一個Observable對象,就這樣來分組,它返回的是一個數組,包含兩個元素,第一個元素是容納滿組判定條件的Observable對象,第二個元素當然是不滿足判定條件的Observable對象。
參數: ?partition(數據源, 判定函數)
import { partition, timer } from "rxjs";
const source$ = timer(0, 100);
// 解構賦值
const [even$, odd$] = partition(source$, (x) => x % 2 === 0);
even$.subscribe((value) => console.log("even:", value));
odd$.subscribe((value) => console.log("odd:", value));
注意: ?使用 partition一般也不會在后面直接使用鏈式調用,需要把結果以變量存儲,然后分別處理結果中的兩個Observable對象
(5)累計數據 => scan操作符 <= 說明: ?與reduce操作符類似,它也有一個求和函數參數和一個可選的seed種子參數作為求和初始值。scan和reduce的區別在于scan對上游每一個數據都會產生一個求和結果,reduce是對上游所有數據進行求和,reduce最多只給下游傳遞一個數據,如果上游數據永不完結,那reduce也永遠不會產生數據,scan完全可以處理一個永不完結的上游Observable對象
參數: ?scan(求和函數, 初始值)
import { interval, scan } from "rxjs";
const source$ = interval(1000);
source$
?.pipe(
? ?// sum:上一次求和后的值
? ?// current:當前需要進行求和的值
? ?scan((sum, current) => {
? ? ?console.log(sum, current);
? ? ?return sum + current;
? ?})
?)
?.subscribe();
理解: ?scan的規約函數參數把之前求和的值加上當前數據作為求和結果,每一次上游產生數據的時候,這個求和函數都會被調用,結果會傳給下游,同時結果也會由scan保存,作為下一次調用規約函數時的sum參數
=> mergeScan操作符 <= 說明: ?它在使用的時候跟scan是類似的,不過它的返回值是一個Observable對象
機制: ?每當上游推送一個數據下來,mergeScan就調用一次求和函數,并且訂閱返回的Observable對象,之后,這個Observable對象會使用類似merge的方式與下游合并,此時mergeScan會記住傳給下游的最后一個數據,當上游再次推送數據下來的時候,就把最后一次傳遞給下游的數據作為求和函數的sum參數
注意: ?如果mergeScan返回一個復雜或者不會完結的Observable對象,可能會導致上游數據和返回的Observable對象會交叉傳遞數據給下游,這樣那個值是最后一次傳遞給下游的會很難確定,因此在使用的時候返回的Observable里面包含的值盡量簡單
八、錯誤處理 說明: ?錯誤異常和數據一樣,會沿著數據流管道從上游向下游流動,流過所有的過濾類或者轉化類操作符,最后會觸發Observer的error方法,不過也不是所有錯誤都交給Observer處理,不然它需要處理的東西就太多了,此時就需要在數據管道中處理掉,這里處理異常有兩類方法:恢復和重試。在實際應用中,重試和恢復往往配合使用,因為重試往往是有次數限制的,不能無限重試,如果嘗試了次數上限之后得到的依然是錯誤異常, 還是要用“恢復”的方法獲得默認值繼續運算。
恢復:就是本來雖然產生了錯誤異常,但是依然讓運算繼續下去。最常見的場景就是在獲取某個數據的過程中發生了錯誤,這時候雖然沒有獲得正確數據,但是用一個默認值當做返回的結果,讓運算繼續。 重試:就是當發現錯誤異常的時候,認為這個錯誤只是臨時的,重新嘗試之前發生錯誤的操作,寄希望于重試之后能夠獲得正常的結果,其本質是在訂閱上游的同時,退訂上一次訂閱的內容 => catchError操作符 <= 作用: ?會在管道中捕獲上游傳遞過來的錯誤
參數: ?catchError(異常函數)
import { range, map, catchError, of } from "rxjs";
// 產生數據1、2、3、4、5
const source$ = range(1, 5);
// 遍歷數據發現在4這個位置會拋出一個錯誤
const error$ = source$.pipe(
?map((value) => {
? ?if (value === 4) {
? ? ?throw new Error("unlucky number 4");
? ?}
? ?return value;
?})
);
// 此時錯誤會被catchError的處理函數所接收
const catch$ = error$
?.pipe(
? ?// err:被捕獲的錯誤
? ?// caught$:上游緊鄰的那個Observable對象,此處就是指error$了
? ?catchError((err, caught$) => {
? ?
? ? ?// 函數的返回值是一個Observable對象,用來替代發生錯誤的那個數據,然后傳遞給下游
? ? ?return of(8);
? ?})
?)
?// 錯誤被catchError捕獲處理,所以此處不存在錯誤
?.subscribe(console.log);
注意: ?異常函數的第一個參數caught$比較有意思,因為它代表的是上游的 Observable對象,如果異常函數就返回caught$的話,相當于讓上游Observable 重新試一遍,所以,catch這個操作符其實不光有恢復的功能,也有重試的功能
=> retry操作符 <= 第一種參數: 直接傳一個數字 說明: ?它可以讓上游的Observable重新試一遍,以達到重試的目的,它接受一個數值參數number,number等于指定重試的次數, 如果number為負數或者沒有number參數,那么就是無限次retry,直到上游不再拋出錯誤異常為止
參數: ?retry(重試的次數)
注意: ?retry調用應該有一個正整數的參數,也就是要指定有限次數的重試,否則,很可能陷入無限循環,畢竟被重試的上游Observable只是有可能重試成功,意思就是也有可能重試不成功,如果真的運氣不好就是重試不成功,也真沒有必要一直重試下去,因為retry通常要限定重試次數,所以retry通常也要和catch配合使用,重試只是增加獲得成功結果的概率,當重試依然沒有結果的時候,還是要catch上場做恢復的操作
import { range, map, catchError, of, retry } from "rxjs";
const source$ = range(1, 5);
const error$ = source$.pipe(
?map((value) => {
? ?if (value === 4) {
? ? ?throw new Error("unlucky number 4");
? ?}
? ?return value;
?})
);
const catch$ = error$
?.pipe(
? ?// 重復兩次
? ?retry(2),
? ?catchError((err, caught$) => {
? ? ?return of(8);
? ?})
?)
?.subscribe(console.log);
第二種參數: 傳一個配置對象 配置對象的取值: count: 表示重試的次數限制。如果未指定,將會無限次重試,直到成功或者遇到無法處理的錯誤 delay: 表示每次重試之間的延遲時間。可以是一個數字,表示固定的延遲時間,也可以是一個函數,接受錯誤對象和重試次數作為參數,返回一個 Observable 或 Promise,用于動態計算延遲時間 resetOnSuccess: 表示是否在成功后重置重試計數。如果設置為true,則在每次成功后重置重試計數,否則會保持重試計數直到達到設定的重試次數或者遇到無法處理的錯誤 注意: delay地方如果寫一個函數在這里,這個函數會在發生錯誤時被調用,它有兩個參數,一個是err$表示發生錯誤的對象, 一個是retryCount表示當前重試的次數,它需要一個返回值,不然函數無法正確的獲取錯誤對象,導致重試不會繼續下去。 如果delay函數的返回值是一個Observable對象,那么每次這個對象吐出一個數據,就會重復一次,由此可以結合timer類似的操作符來達到延遲重復的目的 import { range, map, catchError, of, retry } from "rxjs";
const source$ = range(1, 5);
const error$ = source$.pipe(
?map((value) => {
? ?if (value === 4) {
? ? ?throw new Error("unlucky number 4");
? ?}
? ?return value;
?})
);
const catch$ = error$
?.pipe(
? ?// 重復兩次
? ?retry({
? ? ?count: 2,
? ? ?delay: (err$, retryCount) => {
? ? ? ?console.log(err$, retryCount);
? ? ? ?// 如果這里沒有返回值,下面只會出現一次重復
? ? ? ?return of(1000);
? ? ?},
? ?}),
? ?catchError((err, caught$) => {
? ? ?return of(8);
? ?})
?)
?.subscribe();
=> finalize操作符 <= 說明: ?它接受一個回調函數作為參數,上游無論是完結還是出現錯誤這個函數都會執行,只不過在一個數據流中只會作用一次,同時這個函數也無法影響數據流。
九、多播 說明: ?多播就是讓一個數據流的內容被多個Observer訂閱
(1)數據流的關系 說明: ?這里指的是Observable和Observer的關系,可以理解成前者播放內容,后者接受內容,播放的形式有單播、廣播和多播
理解概念: 單播: 就是一個播放者對應朵個收聽者,一對朵的關系,例如,你使用微信給你的朋友發送信息,這就是單播,你發送的信息只有你的朋友才能收到 廣播: 例如,有一個好消息你不想只分享給一個人,而是想告訴所有的同事或者同學,你就在辦公室或者教室里大聲吼出這個好消息,所有人都聽見了,這就是“廣播”,不過發布消息的根本不知道聽眾是什么樣的人,于是篩選消息的責任就完全落在了接收方的人上,以至于難以控制。 多播: 假如有一些八卦消息,你想要分享給一群朋友,但并不想分享給所有人,或者不想在公共場合大聲嚷嚷,于是你在微信上把相關朋友拉進一個群,在群里說出這個消息,只有被選中的朋友才能收到這條消息,這就叫做“多播” (2)Subject 承上啟下: ?根據第一部分對兩種Observable的理解不難得到Cold Observable實現的是單播,Hot Observable實現的多播
問題: ?如何把Cold Observable變成Hot Observable呢
解決: ?在函數式編程的世界里,有一個要求是保持不可變性,所以,要把一個Cold Observable對象轉換成一個Hot Observable對象,并不是去改變這個Cold Observable對象本身,而是產生一個新的Observable對象,包裝之前Cold Observable對象,這樣在數據流管道中,新的Observable對象就成為了下游,想要Hot數據源的Observer要訂閱的是這個作為下游的Observable對象,所以此時需要一個中間人來完成轉化,這個中間人就是Subject
中間人的職責: 要提供subscribe方法,讓其他?能夠訂閱一個的數據源,相當于一個Observable 要能夠有辦法接受推送的數據,包括Cold Observable推送的數據,相當于一個Observer => 雙重身份 <= 說明: ?這里說的是它具有具Observable和Observer的性質,雖然?個Subject對象是一個Observable,但是這兩個之間存在區別,區別在于Subject是存在記憶的,也就是它能夠記住有哪些Observer訂閱了自己,Subject有狀態,這個狀態就是所有Observer的列表,所以,當調用Subject的next函數時,才可以把消息通知給所有的Observer
import { Subject } from "rxjs";
const subject = new Subject();
// 1號Observer訂閱了subject
const subscription1 = subject.subscribe(
?(value) => console.log("on observer 1 data: " + value),
?(err) => console.log("on observer 1 error: " + err.message),
?() => console.log("on observer 1 complete")
);
// 調?subject的next推送了數據1,這個消息只有1號Observer響應,
// 因為當前只有?個Observer。同時因為next(1)在2號Observer
// 加?之前執?,所以2號Observer沒有接收到1
subject.next(1);
// 2號Observer也訂閱了subject
subject.subscribe(
?(value) => console.log("on observer 2 data: " + value),
?(err) => console.log("on observer 2 error: " + err.message),
?() => console.log("on observer 2 complete")
);
// 這時候調?subject的next?法推送數據2,subject現在知道??
// 有兩個Observer,所以會分別推送消息給1號和2號Observer
subject.next(2);
// subject的1號Observer通過unsubscribe?法退訂
subscription1.unsubscribe();
// 這時候subject知道??只有?個2號Observer,
// 所以,當調?complete?法時,只有2號Observer接到通知
subject.complete();
特點: ?后加入的觀察者,并不會獲得加入之前Subject對象上通過next推送的數據
實現多播: ?既然Subject既有Observable又有Observer的特性,那么,可以讓一個Subject對象成為一個Cold Observable對象的下游,其他想要Hot數據源就可以訂閱這個Subject對象來達到轉換的目的,以此完成多播的操作。
=> 不能重復使用 <= 說明: ?Subject對象也是一個Observable對象,但是因為它有完結的狀態,所以不像Cold Observable對象一樣每次被subscribe都是一個新的開始,正因為如此,Subject對象是不能重復使用的,所謂不能重復使用,指的是一個 Subject對象一旦被調用了complete或者error函數,那么,它作為Observable 的生命周期也就結束了,后續還想調用這個Subject對象的next函數傳遞數據給下游,會沒有任何反應。
import { Subject } from "rxjs";
const subject = new Subject();
// ?先1號Observer成為subject的下游
subject.subscribe(
?(value) => console.log("on observer 1 data: " + value),
?(err) => console.log("on observer 1 error: " + err.message),
?() => console.log("on observer 1 complete")
);
// 然后通過subject的next函數傳遞了1和2
subject.next(1);
subject.next(2);
// 緊接著調?了subject的complete函數,結束了subject的?命周期
subject.complete();
// 2號Observer也成為subject的下游,但是,這時候subject已經完結了
subject.subscribe(
?(value) => console.log("on observer 2 data: " + value),
?(err) => console.log("on observer 2 error: " + err.message),
?() => console.log("on observer 2 complete")
);
// 后續通過next傳遞參數3的調?,不會傳遞給2號Observer,
// 也不會傳遞給1號Observer,但是可以獲取subject的complete通知,
// 可以這樣認為,當?個Subject對象的complete函數被調?之后,
// 它暴露給下游的Observable對象就是?個由empty變量產?的直接
// 完結的Observable對象
subject.next(3);
注意: ?在Subject的生命周期結束之后,再次調用next方法沒有任何反應,也不會拋出錯誤,這樣可能會認為上游所有數據都傳遞成功了,這是不合理的,由于Subject是一個Observable,那么它就會存在一個unsubscribe的方法,表示它已經不管事了,再次調用其next方法就會報錯,所以可以像下面這樣達到警示的目的。
import { Subject, interval, take } from "rxjs";
// tick$會間隔?秒鐘吐出數據,調?下游subject的next函數
const tick$ = interval(1000).pipe(take(5));
const subject = new Subject();
tick$.subscribe(subject);
subject.subscribe((value) => console.log("observer: " + value));
// 在1.5秒的時候subject的unsubscribe函數被調?,
// 所以,2秒以后的時間,tick$還要調?subject的
// next就會拋出?個錯誤異常
setTimeout(() => {
?subject.unsubscribe();
}, 1500);
=> 多個上游 <= 說明: ?理論上 可以用一個Subject合并多個Observable的數據流,但是這樣做并不合適,原因 在于任何一個上游數據流的完結或者出錯信息都可以終結Subject對象的生命。
import { Subject, interval, take, map } from "rxjs";
// 這兩個數據流都是通過interval產?的Cold Observable對象,
// 每隔?秒鐘吐出?個整數,然后利?map轉化為間隔?秒鐘吐出
// ?個固定的字符串,利?take只從兩個數據流中分別拿兩個數據
const tick1$ = interval(1000).pipe(
?map(() => "a"),
?take(2)
);
const tick2$ = interval(1000).pipe(
?map(() => "b"),
?take(2)
);
const subject = new Subject();
tick1$.subscribe(subject);
tick2$.subscribe(subject);
subject.subscribe((value) => console.log("observer 1: " + value));
subject.subscribe((value) => console.log("observer 2: " + value));
// tick1$每隔?秒鐘吐出?個a字符串,吐出兩個之后完結,
// tick2$同樣每隔?秒鐘吐出?個字符串,只不過吐出的是b,
// 同樣是吐出兩個之后完結。因為subject訂閱了tick1$和tick2$,
// 所以理論上結果應該是下面這八個值,但其實并不是
// observer 1: a
// observer 2: a
// observer 1: b
// observer 2: b
// observer 1: a
// observer 2: a
// observer 1: b
// observer 2: b
理解: ?為tick1$是由take產生的,也就是說在吐出2個數據之后就會調用下游的complete函數,也就是調用subject的complete函數,此時它已經完結,后續的next的方法是沒有效果的,這也是為什么第二個b不會有效果的原因。
=> 錯誤處理 <= 說明: ?如果Subject有多個Observer,并且Subject的某個下游數據流產生了一個錯誤異常,而且這個錯誤異常沒有被Observer處理,那這個Subject其他的Observer都會失敗,為了避免這種情況的發生,每有一個Observer的時候,就需要給它一個處理錯誤的地放就可以解決這個問題了。
十、調度器Scheduler (1)作用 作用: ?用于控制RxJS數據流中數據消息的推送節奏
舉例: ?這里以帶Scheduler類型的參數的操作符range為例,不過使用調度器的這種寫法已經廢棄,這里只是舉例而已
// 不使用調度器
import { range } from "rxjs";
const source$ = range(1, 3);
console.log("before subscribe");
source$.subscribe(
?(value) => console.log("data: ", value),
?(error) => console.log("error: ", error),
?() => console.log("complete")
);
console.log("after subscribe");
解釋: ?因為range是同步輸出數據,所有當Observer添加之后,會把所有數據全部吐出,所以上面的代碼也是完全同步執行 的。
// 使用調度器,寫法已經廢棄
import { range, asapScheduler } from "rxjs";
const source$ = range(1, 3, asapScheduler);
console.log("before subscribe");
source$.subscribe(
?(value) => console.log("data: ", value),
?(error) => console.log("error: ", error),
?() => console.log("complete")
);
console.log("after subscribe");
思考: ?所以這里的asapScheduler決定了數據推送任務不是同步執行,因為range數據的吐出是在after subscribe字符串之后的,那么什么是Scheduler呢?
RxJS中定義Scheduler: 它是一種數據結構 它是一個執行環境 它有一個虛擬時鐘 解釋: 所謂Scheduer是?種數據結構,指的是Scheduler對象可以根據優先級或者其他某種條件來安排任務執行隊列 Scheduler可以指定一個任務何時何地執行,所以它是一個執行環境在RxJS的數據流世界中,Scheduler說現在是幾點幾分幾秒,那現在就是幾點幾分幾秒,所以Scheduler就像是這個世界中的權威標準時鐘,正因為Scheduler提供的虛擬時鐘可以被操縱,所以可以利用Scheduler來控制數據流中數據的流動節奏。 (2)內置的Scheduler 調度器 說明 null 默認不使用,代表同步執行的情況 queueScheduler 利用隊列實現,用于迭代操作 asapScheduler 在當前工作之后,下個工作之前執行,用于異步轉換 asyncScheduler 用于基于時間的操作 animationFrameScheduler 用于創建流暢的瀏覽器動畫
(3)支持Scheduler的操作符 => observeOn操作符 <= 作用: ?根據上游的Observable對象產生出一個新的Observable對象出來,讓這個新的Observable對象吐出的數據由指定的Scheduler來控制
參數: ?observeOn(調度器)
import { range, observeOn, asapScheduler } from "rxjs";
const source$ = range(1, 3);
const asapSource$ = source$.pipe(observeOn(asapScheduler));
console.log("before subscribe");
// 訂閱新產生的Observable發現受調度器的影響
asapSource$.subscribe(
?(value) => console.log("data: ", value),
?(error) => console.log("error: ", error),
?() => console.log("complete")
);
console.log("after subscribe");
import { range, observeOn, asapScheduler } from "rxjs";
const source$ = range(1, 3);
const asapSource$ = source$.pipe(observeOn(asapScheduler));
console.log("before subscribe");
// 訂閱上游數據發現不受調度器的影響
source$.subscribe(
?(value) => console.log("data: ", value),
?(error) => console.log("error: ", error),
?() => console.log("complete")
);
console.log("after subscribe");
注意: ?observeOn只控制新產生的Observable對象的數據推送節奏,并不能改變上游Observable對象所使用的Scheduler
=> subscribeOn操作符 <= 說明: ?這個跟observeOn的區別在于前者是控制什么時候訂閱Observable對象,而后者是控制Observable對象何時往下游推送數據,使用和參數是類似的。
該文章在 2024/11/12 11:11:19 編輯過