中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

RxJS 緩存高級教程

2018-07-20    來源:編程學(xué)習(xí)網(wǎng)

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

在開發(fā) Web 應(yīng)用程序時,性能一般都是出于最高優(yōu)先級的。對于 Angular 項目,我們有很多途徑去提升程序性能,例如搖樹優(yōu)化(tree-shaking)、AoT(ahead-of-time 編譯)、模塊懶加載(lazy loading)以及緩存。為了能夠更好地概覽全局,以便提高 Angular 應(yīng)用程序的性能,我們強烈推薦使用 Minko Gechev 的 Angular 性能檢查表 。本文主要聚焦于 緩存

事實上,緩存是提升網(wǎng)站性能的最有效的方法之一,尤其是當(dāng)用戶出于受限網(wǎng)絡(luò)帶寬或慢速網(wǎng)絡(luò)的情況。

有很多種緩存數(shù)據(jù)或資源的方法。靜態(tài)資源通常使用標(biāo)準(zhǔn)的瀏覽器緩存或 Service Worker 進行緩存。當(dāng)然,Service Worker 也可以緩存 API 請求,但它們更適合緩存圖片、HTML、JS 或 CSS 等文件。緩存系統(tǒng)數(shù)據(jù),我們則會選用另外的機制。

不管我們選擇怎樣的機制,緩存都會 改善系統(tǒng)的響應(yīng)性 , 降低網(wǎng)絡(luò)消耗在網(wǎng)絡(luò)中斷的情況下依然能夠使用內(nèi)容 。換句話說,當(dāng)內(nèi)容在更接近用戶的地方被緩存,例如就在客戶端,請求就不會引起另外的網(wǎng)絡(luò)活動;緩存的數(shù)據(jù)可以被更快返回,因為我們不需要進行完整的網(wǎng)絡(luò)周期。

這篇文章我們將使用 RxJS 以及 Angular 提供的各種工具實現(xiàn)一種高級的緩存機制。

動機

一直以來,都有一個疑問:如何在一個頻繁使用 Observable 對象的 Angular 程序中緩存數(shù)據(jù)。很多人都知道如何使用 Promise 緩存數(shù)據(jù),但是對如何在函數(shù)式的響應(yīng)式編程中緩存數(shù)據(jù)束手無策。因為后者的復(fù)雜性(龐大的 API)、完全不同的使用方式(從命令式編程到指令式編程)以及許多概念。因此,將基于 Promise 的緩存系統(tǒng)移植到 Observable 是非常困難的,尤其是還需要實現(xiàn)一些高級功能的時候。

Angular 應(yīng)用程序通常使用由 HttpClientModule 提供的 HTTPClient 實現(xiàn) HTTP 請求。它的所有 API 都是基于 Observable 的。這意味著, HTTPClient 的函數(shù),例如 get 、 post 、 put 和 delete 都返回一個 Observable 。 Observable 天生是懶的,只有當(dāng)我們調(diào)用了 subscribe 函數(shù)之后才會發(fā)送請求。但是,對同一個 Observable 多次調(diào)用 subscribe 函數(shù),會一遍一遍地創(chuàng)建源 Observable 對象,也就是為每一次定于執(zhí)行一次請求。我們將這種模式成為冷模式(cold)。

如果你對此完全不了解,可以閱讀我們的另外一篇文章 《Observable 的冷模式和熱模式》 。

這種行為使得實現(xiàn) Observable 緩存機制變得有些棘手。簡單的實現(xiàn)通常需要大量固定模式的代碼,而且最終可能需要繞過 RxJS。這是一種解決思路,但如果我們依然希望使用 Observable 的強大功能,這種實現(xiàn)就不值得推薦。簡單來說,我們并不想給法拉利安裝一個摩托車引擎,對吧?

需求

在我們深入代碼之前,首先定義好我們的高級緩存機制的需求。

我們要開發(fā)一個名為 笑話世界 的應(yīng)用。這是一個簡單的 app,隨機顯示給定分類里面的笑話。為了盡可能簡單,我們只給出一個分類。

這個 app 有三個組件: AppComponent 、 DashboardComponent 和 JokeListComponent 。

AppComponent 是程序入口,顯示一個工具欄和一個安裝當(dāng)前路由狀態(tài)填充的 <router-outlet> 。

DashboardComponent 只用來顯示分類列表。我們可以從這里導(dǎo)航到 JokeListComponent 。 JokeListComponent 負責(zé)將笑話列表顯示到屏幕。

笑話由 Angular 的 HttpClient 服務(wù)從服務(wù)器獲取。為保持組件的響應(yīng)、解耦,我們要創(chuàng)建一個 JokeService ,來幫助我們獲取數(shù)據(jù)。組件只需要注入這個服務(wù),通過其公開的 API 訪問數(shù)據(jù)即可。

以上所有都是我們的系統(tǒng)架構(gòu),并沒有引入緩存。

當(dāng)我們從主頁到列表視圖時,我們可以從緩存請求數(shù)據(jù),而不是每次都從服務(wù)器獲取。緩存中的數(shù)據(jù)每 10 秒自動更新。

當(dāng)然,每 10 秒獲取數(shù)據(jù)并不是每個產(chǎn)品都需要遵守的固定準(zhǔn)則,我們可能需要更復(fù)雜的實現(xiàn)來更新緩存(例如使用 web socket 推送更新)。但是,現(xiàn)在我們可以盡可能保持簡單,集中精力解決緩存的問題。

無論如何,我們都希望收到某種更新的通知。就我們的程序而言,我們不希望自動更新 UI( JokeListComponent )的數(shù)據(jù),而是用戶要求 UI 更新時才去更新緩存。為什么?想象下這樣的場景:用戶正在閱讀一個笑話,因為數(shù)據(jù)的自動更新,所有笑話突然都消失了。這無疑非常令人反感,是一種非常差的用戶體驗。因此,我們的用戶在有新數(shù)據(jù)的時候會收到通知。

為了更有趣一點,我們還希望用戶能夠強制刷新緩存。這與僅僅更新 UI 不同,因為強制刷新意味著要從服務(wù)器請求數(shù)據(jù)、更新緩存、然后更新 UI。

現(xiàn)在我們總結(jié)一下我們想要干什么:

  • 程序有兩個組件,當(dāng)從組件 A 導(dǎo)航到組件 B 時,組件 B 的數(shù)據(jù)最好從緩存獲取,而不是每次都從服務(wù)器獲取
  • 每 10 秒更新緩存
  • UI 中的數(shù)據(jù)并不會自動更新,而是由用戶強制更新
  • 用戶可以強制刷新,從服務(wù)器重新獲取數(shù)據(jù)、更新緩存和 UI

下面是我們即將構(gòu)建的 app 預(yù)覽圖:

實現(xiàn)基本的緩存

我們從一個簡單的實現(xiàn)開始,逐漸過渡到最終的全功能版本。

第一步是創(chuàng)建一個新的服務(wù)。

然后,我們添加兩個接口,一個用來描述 Joke 的屬性,另一個用來描述 HTTP 請求的返回。這樣的接口會讓我們的程序更符合 TypeScript 的要求,同時也更方便開發(fā)。

export interface Joke {
  id: number;
  joke: string;
  categories: Array<string>;
}
 
export interface JokeResponse {
  type: string;
  value: Array<Joke>;
}

下面我們實現(xiàn) JokeService 。我們不想透露數(shù)據(jù)究竟是從緩存獲取的,還是從服務(wù)器獲取的,因此,我們只提供一個返回值類型為 Observable 的 jokes 屬性,用于獲取笑話列表。

為了執(zhí)行 HTTP 請求,我們需要為我們的服務(wù)注入 HttpClient 。

下面是 JokeService 的代碼框架:

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
 
@Injectable()
export class JokeService {
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    ...
  }
}

下面,我們實現(xiàn)一個私有的 requestJokes() 函數(shù),通過 HttpClient 的 GET 請求獲取笑話列表。

import { map } from 'rxjs/operators';
 
@Injectable()
export class JokeService {
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    ...
  }
 
  private requestJokes() {
    return this.http.get<JokeResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

現(xiàn)在,我們有了實現(xiàn)獲取笑話的函數(shù)的一切準(zhǔn)備。

一個顯而易見的實現(xiàn)是,直接返回 this.requestJokes() ,但這無法滿足我們的需要。我們知道,所有 HttpClient 暴露的函數(shù),例如 get() ,都是返回一個冷 Observable 。這意味著每個訂閱者都會重新出發(fā)完整的數(shù)據(jù)流,從而帶來額外的 HTTP 請求。畢竟,緩存的意義就在于提高系統(tǒng)的加載時間,將網(wǎng)絡(luò)請求限制到最低的水平。

所以,我們想讓我們的流變成熱的。不僅僅如此,每一個新的訂閱者應(yīng)該獲取到最近的緩存值。事實上,有一個很方便的操作符可以實現(xiàn)這一點: shareReplay 。這個操作符返回一個 Observable 對象。該對象會在底層共享一個訂閱,也就是 this.requestJokes() 返回的那個 Observable 。

另外, shareReplay 接受一個可選參數(shù) bufferSize ,對于我們的用例非常有用。 bufferSize 決定了重現(xiàn)緩存(replay buffer)的最大元素數(shù),也就是被緩存、能夠重現(xiàn)給每一個訂閱者的元素數(shù)。在我們的場景中,我們只需要最近的一個值,因此將 bufferSize 設(shè)置為 1。

我們看一下實際代碼,看看我們剛剛學(xué)到了什么:

import { Observable } from 'rxjs/Observable';
import { shareReplay, map } from 'rxjs/operators';
 
const API_ENDPOINT = 'https://api.icndb.com/jokes/random/5?limitTo=[nerdy]';
const CACHE_SIZE = 1;
 
@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    if (!this.cache$) {
      this.cache$ = this.requestJokes().pipe(
        shareReplay(CACHE_SIZE)
      );
    }
 
    return this.cache$;
  }
 
  private requestJokes() {
    return this.http.get<JokeResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

好了,我們已經(jīng)討論過上面的大部分代碼。但等等,私有的 cache$ 屬性以及訪問函數(shù)里面的 if 語句是什么意思?答案很簡單。如果我們直接返回 this.requestJokes().pipe(shareReplay(CACHE_SIZE)) ,那么,每一個訂閱者都會創(chuàng)建一個新的緩存實例。但是,我們想要所有訂閱者共享一個實例。因此,我們將這個實例保持在私有的 cache$ 屬性中,在第一次調(diào)用的時候初始化這個屬性。這樣,所有訂閱者都會訪問到這一個共享實例,而不是每次創(chuàng)建一個新的對象。

我們看一下上面代碼實現(xiàn)的更直觀的表示:

上圖是一個 時序圖 ,它描述了場景中涉及的對象,請求一個笑話列表,以及對象之間交換信息的時序,F(xiàn)在我們暫停一下,了解下發(fā)生了什么。

我們從導(dǎo)航到列表組件的儀表盤開始。

組件初始化之后,Angular 會調(diào)用 ngOnInit 生命周期鉤子。在這里,我們調(diào)用 JokeService 暴露的訪問器 jokes 請求笑話列表。由于這是我們第一次請求數(shù)據(jù),所以緩存是空的,并且沒有初始化,這意味著 JokeService.cache$ 是 undefined 。我們在訪問器內(nèi)部調(diào)用了 requestJokes() 。這會返回我們一個 Observable 對象,其數(shù)據(jù)來自服務(wù)器。同時,我們使用 shareReplay 運算符來獲得所期望的行為。

shareReplay 操作符會在原始源于未來所有的訂閱者之間自動創(chuàng)建一個 ReplaySubject 。只要訂閱者數(shù)量從零變?yōu)橐,它就會將這個 Subject 關(guān)聯(lián)到底層數(shù)據(jù)源,然后廣播其所有值。未來所有訂閱者都會關(guān)聯(lián)到這個 Subject ,所以實際上只有一個訂閱關(guān)聯(lián)到底層的那個冷 Observable 。這被稱為 多播 (multicasting),定義了我們的簡單緩存的基礎(chǔ)。

一旦數(shù)據(jù)從服務(wù)器獲取到,就會被緩存。

注意,在時序圖中, Cache 是一個獨立的對象,目的是用來說明那個從消費者(訂閱者)到底層源(HTTP 請求)之間創(chuàng)建的 ReplaySubject 。

下一次我們?yōu)榱斜碇薪M件請求數(shù)據(jù)的時候,我們的緩存就會發(fā)送最近的值給消費者。這時候并不會有額外的 HTTP 調(diào)用。

很簡單,對吧?

為了真正的區(qū)別開來,我們更進一步,從 Observable 的層次看看緩存是如何工作的,F(xiàn)在我們使用 彈子圖 (marble diagram)看看流:

彈子圖非常清晰地顯示出,底層 Observable 只有一個訂閱,所有消費者都訂閱到這個共享的 Observable ,也就是這個 ReplaySubject 對象。我們也能夠看出,只有第一個訂閱者觸發(fā)了 HTTP 調(diào)用,其余的都是直接獲取重現(xiàn)的值。

最后,我們看一下 JokeListComponent 是如何顯示數(shù)據(jù)的。首先,注入 JokeService 對象。之后,在 ngOnInit 中,使用服務(wù)對象暴露的訪問器初始化一個 jokes$ 屬性。這個訪問器會返回一個 Array<Joke> 類型的 Observable 對象,這正是我們所需要的。

@Component({
  ...
})
export class JokeListComponent implements OnInit {
  jokes$: Observable<Array<Joke>>;
 
  constructor(private jokeService: JokeService) { }
 
  ngOnInit() {
    this.jokes$ = this.jokeService.jokes;
  }
 
  ...
}

注意,我們并沒有馬上訂閱 jokes$ ,而是在模板中使用了 async 管道,因為這個管道充滿了奇跡。好奇嗎?請閱讀文章 《了解關(guān)于 AsyncPipe 你不知道的三件事情》 。

<mat-card *ngFor="let joke of jokes$ | async">...</mat-card>

太棒了!這就是我們實際使用的簡單緩存。為了驗證是不是只有一次請求,打開 Chrome 的 DevTools,點擊 Network 選項卡,選擇 XHR。開啟儀表盤,導(dǎo)航到列表視圖,然后再導(dǎo)航回來。

自動更新

現(xiàn)在我們用幾行代碼構(gòu)建了一個簡單的緩存機制。事實上,很多工作都是由 shareReplay 操作符完成的。這個操作符會實現(xiàn)緩存和重現(xiàn)大多數(shù)數(shù)據(jù)值。

當(dāng)數(shù)據(jù)不會在后臺更新時,這就已經(jīng)很好地工作了。那么,如果數(shù)據(jù)每隔幾分鐘就會改變了呢?我們當(dāng)然不應(yīng)該強制用戶為了從服務(wù)器獲取最新數(shù)據(jù)必須要刷新這個頁面。

如果我們的緩存每隔 10 秒鐘就會在后臺更新呢?是不是很酷?肯定的!作為用戶,我們不需要重新加載頁面;數(shù)據(jù)改變后,UI 會隨之更新。再說一遍,在真實的應(yīng)用中,我們一般不會主動拉取數(shù)據(jù),而是要服務(wù)器 推送 通知。對于我們的小示例程序而言,能夠 每隔 10 秒刷新 一次就很好了。

這種實現(xiàn)很簡單。簡而言之,我們需要創(chuàng)建一個 Observable ,根據(jù)給定的時間間隔發(fā)出一系列值。或者簡單來說,就是我們需要每 X 毫秒產(chǎn)生一個值。對達到這一目的,我們有很多種實現(xiàn)。

第一個選擇是使用 interval 操作符。這個操作符要求一個可選參數(shù),定義了每次發(fā)送值得時間?紤]下面的代碼:

import { interval } from 'rxjs/observable/interval';
 
interval(10000).subscribe(console.log);

這里我們創(chuàng)建了一個能夠發(fā)送無限整數(shù)序列的 Observable 對象。該對象每隔 10 秒會發(fā)出一個整數(shù)值。這意味著第一個值也會在延遲給定時間之后才會發(fā)出。為了更好理解這一行為,我們可以看一下 interval 的彈子圖。

對,就像我們想的那樣。這一個值會“延遲”,這不是我們想要的。為什么?因為如果我們從儀表盤導(dǎo)航到列表組件,希望閱讀一些有趣的笑話,我們得等待 10 秒鐘,才會從服務(wù)器請求數(shù)據(jù),然后顯示到屏幕。

我們通過引入另外一個操作符來解決這一問題。這個操作符是 startWith(value) ,可以先發(fā)出一個給定值作為初始值。但我們可以做得更好!

我會告訴你,其實有一個操作符可以在給定的一段是時間之后(初識延遲)按照特定時間(正常間隔)發(fā)出一個值的序列。這就是 timer 。

可視化時間!

酷!但這真的解決我們的問題了嗎?是的。如果我們將初始值設(shè)置為 0,將間隔時間設(shè)置為 10 秒,我們就會有類似 interval(10000).pipe(startWith(0)) 的行為,但只用了一個操作符。

讓我們把這種實現(xiàn)帶到我們的緩存機制中去吧。

我們需要建立一個 定時器 ,每次觸發(fā)都發(fā)送一個 HTTP 請求,去服務(wù)器獲取新的數(shù)據(jù)。也就是說,每一次定時器觸發(fā),我們需要使用 switchMap 轉(zhuǎn)換到一個 Observable 對象,在訂閱時獲取新的笑話列表。使用 switchMap 還有一個額外的好處是,我們可以避免競爭條件。這是這個操作符天生就有的特點,它會取消 Observable 之前的訂閱,僅僅為最新的對象發(fā)出值。

我們的緩存的剩余部分不需要改變,意味著我們的流還是多播的,所有的訂閱者共享一個底層源。

再說一遍, shareReplay 天生就會將新的值廣播給所有訂閱者,并且將最近的值發(fā)送給新的訂閱者。

正如我們在彈子圖看到的那樣, timer 每 10 秒發(fā)出一個值。每一個值都會轉(zhuǎn)換成一個內(nèi)部 Observable 對象,去獲取我們所需要的數(shù)據(jù)。因為我們使用了 switchMap ,我們避免了競爭條件,因此消費者只會接收到值 1 和 3。內(nèi)部 Observable 對象發(fā)出的第二個值會被“跳過”,因為新值到了的時候已經(jīng)取消了。

讓我們利用學(xué)到的知識,更新下 JokeService 。

import { timer } from 'rxjs/observable/timer';
import { switchMap, shareReplay } from 'rxjs/operators';
 
const REFRESH_INTERVAL = 10000;
 
@Injectable()
export class JokeService {
  private cache$: Observable<Array<Joke>>;
 
  constructor(private http: HttpClient) { }
 
  get jokes() {
    if (!this.cache$) {
      // Set up timer that ticks every X milliseconds
      const timer$ = timer(0, REFRESH_INTERVAL);
 
      // For each tick make an http request to fetch new data
      this.cache$ = timer$.pipe(
        switchMap(_ => this.requestJokes()),
        shareReplay(CACHE_SIZE)
      );
    }
 
    return this.cache$;
  }
 
  ...
}

厲害!想自己試試嗎?下面是一個現(xiàn)實的示例。從儀表盤開始,到列表組件,然后看看有什么魔法出現(xiàn)。等幾秒鐘,就可以看到更新的動作。記住,緩存每 10 秒刷新一次,但通過修改 REFRESH_INTERVAL 的值就可以改變這一間隔。

發(fā)送更新通知

讓我們回顧一下目前所構(gòu)建的內(nèi)容。

當(dāng)我們通過 JokeService 請求數(shù)據(jù)時,我們希望數(shù)據(jù)從緩存獲得,而不是每次都去請求服務(wù)器。緩存的底層數(shù)據(jù)每 10 秒刷新一次。刷新之后,數(shù)據(jù)會推送給組件,組件自動更新。

這有點問題。想象一下,如果我們是一個用戶,正在閱讀某個笑話,突然間這個笑話消失了,因為 UI 自動更新了。這無疑非常討厭,是很壞的用戶體驗。

因此,我們的用戶應(yīng)該在有新數(shù)據(jù)時獲得 通知 。換句話說,我們希望用戶自己去更新 UI。

事實證明,我們不需要修改服務(wù)來實現(xiàn)這個功能。這個邏輯很簡單。畢竟,我們的服務(wù)不應(yīng)該關(guān)心發(fā)送通知的問題,視圖應(yīng)該負責(zé)何時怎樣更新屏幕的數(shù)據(jù)。

首先,我們需要給用戶展示一個 初始值 ,否則的話在第一次緩存更新之前,屏幕就是空白的。我們馬上就會看到原因。設(shè)置一個初始化的流與調(diào)用訪問器函數(shù)一樣簡單。另外,既然我們只關(guān)心第一次的值,我們可以使用 take 操作符。

為了邏輯上的可讀性,我們創(chuàng)建一個副主函數(shù) getDataOnce() 。

import { take } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  ...
  ngOnInit() {
    const initialJokes$ = this.getDataOnce();
    ...
  }
 
  getDataOnce() {
    return this.jokeService.jokes.pipe(take(1));
  }
  ...
}

從我們的需求可以知道,我們只想在用戶真正需要更新 UI 的時候出去更新,而不是自動更新。你會問,用戶如何要求更新界面?當(dāng)用戶點擊了“更新”按鈕時,我們就知道用戶想要更新 UI。這個按鈕和通知一起顯示,F(xiàn)在,我們不去關(guān)心通知,把注意力集中在點擊按鈕之后的更新邏輯上面。

為了實現(xiàn)這一目的,我們需要從 DOM 事件(也就是按鈕點擊的事件)創(chuàng)建一個 Observable 對象。有很多種方法可以實現(xiàn),但最常用的是使用 Subject 對象作為模板與組件視圖邏輯之間的 橋梁 。簡單來說, Subject 即是 Observer 又是 Observable 。 Observable 定義了數(shù)據(jù)流,能夠發(fā)出數(shù)據(jù); Observer 則能夠訂閱 Observable 并且接收數(shù)據(jù)。

好消息是,我們可以在模板的事件綁定中直接使用 Subject ,然后在事件發(fā)出時調(diào)用其 next 函數(shù)。這會產(chǎn)生一個特定值,廣播給所有監(jiān)聽這個值的 Observer 對象。注意,如果 Subject 是 void 類型的,我們可以簡單地忽略這個值。事實上,我們的用例就是這樣子的。

讓我們繼續(xù),實例化一個 Subject 對象吧。

import { Subject } from 'rxjs/Subject';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  ...
}

下面繼續(xù),將這個值用到模板中。

<div class="notification">
  <span>There's new data available. Click to reload the data.</span>
  <button mat-raised-button color="accent" (click)="update$.next()">
    <div class="flex-row">
      <mat-icon>cached</mat-icon>
      UPDATE
    </div>
  </button>
</div>

注意我們在 <button> 標(biāo)簽的點擊事件使用了 事件綁定 語法。當(dāng)點擊按鈕時,我們發(fā)出一個 幽靈 值,可以被所有活動的 Observer 注意到。我們將其稱為“幽靈”,是因為我們不會傳遞任何值,或者說只是一種 void 類型的值。

另一種實現(xiàn)是使用 @ViewChild() 裝飾器結(jié)合 RxJS 的 fromEvent 操作符。但是,這會要求我們“混合” DOM 以及從視圖查詢 HTML 元素。使用 Subject ,我們僅僅將兩邊橋接起來,除了向按鈕添加事件綁定之外,并不再觸及 DOM。

好了,視圖設(shè)置完畢,我們可以切換回更新 UI 的邏輯部分。

那么,更新 UI 意味著什么?既然緩存已經(jīng)在后臺自動刷新了,我們想要在點擊按鈕之后從緩存獲取到最新值來渲染界面,對吧?這意味著我們的 源數(shù)據(jù)流 也是一個 Subject 。每當(dāng) update$ 發(fā)出一個值,我們都希望將其 映射 到一個能夠給我們最新值的 Observable 對象。換句話說,我們正在處理的是所謂“ 高階可觀察對象 (Higher Order Observable)”,一個能發(fā)射 Observable 的 Observable 對象

之前我們知道, switchMap 正是為了解決這個問題。這一次我們選擇使用 mergeMap 。這個操作符非常像 switchMap ,區(qū)別在于它不會取消之前訂閱的內(nèi)部 Observable 對象,而是將內(nèi)部發(fā)出值合并到外部的 Observable 。

事實上,當(dāng)從緩存請求最新值時,HTTP 請求已經(jīng)完成,緩存已經(jīng)更新。因此,在這里我們并不會遇到競爭條件。雖然看起來像是異步的,但實際上是 同步 的,因為值在同一時間發(fā)出。

import { Subject } from 'rxjs/Subject';
import { mergeMap } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const updates$ = this.update$.pipe(
      mergeMap(() => this.getDataOnce())
    );
    ...
  }
  ...
}

太好了!每一次“更新”,我們都會使用我們之前實現(xiàn)的輔助函數(shù)從緩存中獲取到最新值。

現(xiàn)在,為屏幕上呈現(xiàn)的笑話提供數(shù)據(jù)流只是一小步。我們還需要將初始笑話列表與 update$ 流整合起來。

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { mergeMap } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  jokes$: Observable<Array<Joke>>;
  update$ = new Subject<void>();
  ...
 
  ngOnInit() {
    const initialJokes$ = this.getDataOnce();
 
    const updates$ = this.update$.pipe(
      mergeMap(() => this.getDataOnce())
    );
 
    this.jokes$ = merge(initialJokes$, updates$);
    ...
  }
  ...
}

注意,我們使用輔助函數(shù) getDataOnce() ,將每一個事件映射為一個最新的緩存值;貞浺幌,這個函數(shù)內(nèi)部會使用 take(1) 來獲取第一個值,然后結(jié)束整個流。這是至關(guān)重要的,否則的話,我們會得到一個正在運行的流,或者是直接連接到緩存。在這種情況下,我們只需點擊“更新”按鈕,就會終止強制更新 UI 的邏輯。

同時,因為底層緩存是多播的,所以重復(fù)訂閱緩存以便獲得最新值也是沒有任何問題的。

在我們繼續(xù)通知流之前,我們先暫停一下,用彈子圖看看現(xiàn)在我們實現(xiàn)了什么。

正如上面的圖中顯示的那樣, initialJokes$ 是至關(guān)重要的,因為沒有它,我們只能在點擊了“更新”之后才能在屏幕上看到內(nèi)容。雖然數(shù)據(jù)在后臺每 10 秒更新一次,但我們沒有辦法點擊按鈕。這是因為按鈕時通知的一部分,我們并沒有將其顯示給用戶。

讓我們把這個坑填完,實現(xiàn)這個謎題的缺失的部分。

為了達到這一目的,我們需要創(chuàng)建一個 Observable 對象,負責(zé)顯示或隱藏通知。本質(zhì)上,我們需要一個能發(fā)出 true 或 false 的流。當(dāng)有更新的時候,這個值應(yīng)該是 true ;當(dāng)用戶點擊“更新”按鈕時,這個值應(yīng)該是 false 。

另外,我們還得 跳過 由我們的緩存發(fā)出的第一個(初始)值,因為這不是一個刷新操作。

我們從流的角度思考這個問題,我們可以把它分解成多個流,然后再 合并 到一起,成為一個單一的可觀察的流。最終的流就是我們所需要的行為,顯示或隱藏通知。

理論已經(jīng)足夠了!現(xiàn)在是編碼:

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { skip, mapTo } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  showNotification$: Observable<boolean>;
  update$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const initialNotifications$ = this.jokeService.jokes.pipe(skip(1));
    const show$ = initialNotifications$.pipe(mapTo(true));
    const hide$ = this.update$.pipe(mapTo(false));
    this.showNotification$ = merge(show$, hide$);
  }
  ...
}

這里,我們監(jiān)聽從緩存發(fā)出的所有值,但是跳過第一個,因為它不是一個 刷新 操作。對 initialNotifications$ 上的每一個新值,我們將其映射為 true ,以便顯示通知。一旦點擊了通知中的“更新”按鈕, update$ 會產(chǎn)生一個值,我們將其簡單地映射為 false ,來使通知消失。

我們在 JokeListComponent 的模板中使用 showNotification$ ,通過切換 class 顯示或隱藏通知。

<div class="notification" [class.visible]="showNotification$ | async">
  ...
</div>

很好!我們已經(jīng)非常接近最終的解決方案了。但在我們繼續(xù)之前,先來嘗試下實例;c時間一步步瀏覽下代碼。

按需獲取新的數(shù)據(jù)

太棒了!經(jīng)過了這么長的道路,我們已經(jīng)為我們的緩存實現(xiàn)了很多非?岬奶匦。在結(jié)束本文之前,我們要將緩存提升到一個新的層次。現(xiàn)在還有一件事留給我們。作為一個用戶,我們想要在任意時間點強制更新。

這并不是非常復(fù)雜,但我們得同時修改組件和服務(wù)。

讓我們從服務(wù)開始。我們需要一個公開的 API,能夠強制重新加載緩存中的數(shù)據(jù)。技術(shù)上來說,我們可以 完成 當(dāng)前緩存,將其設(shè)置為 null 。這意味著下一次我們從服務(wù)器請求數(shù)據(jù)時,我們的服務(wù)會設(shè)置一個新的緩存,獲取數(shù)據(jù)并向未來的訂閱者存儲數(shù)據(jù)。當(dāng)我們強制要求更新時創(chuàng)建新的緩存并不是什么大問題,因為原來的對象會終止,然后被垃圾回收。事實上,這么做還有一個好處是,我們可以重置定時器,這也是我們所需要的。例如,我們已經(jīng)等待了 9 秒,然后點擊“獲取新笑話”。我們希望數(shù)據(jù)刷新,但是不想在 1 秒鐘之后就看到通知跳了出來。相反,我們想重新開始定時器,這樣當(dāng)我們強制更新時,就會有另外的 10 秒之后才會觸發(fā)自動更新。

銷毀緩存的另一個原因是,比起讓緩存一直存在的其它機制,這種實現(xiàn)簡單得多。如果是這樣,緩存就需要知道是否需要強制重新加載。

我們創(chuàng)建一個 Subject 對象,用它來通知緩存結(jié)束。我們將利用 takeUntil 將其提取到我們的 cache$ 流。另外,我們需要實現(xiàn)一個公共 API,其作用是將緩存設(shè)置為 null ,然后向 Subject 對象廣播一個事件。

import { Subject } from 'rxjs/Subject';
import { timer } from 'rxjs/observable/timer';
import { switchMap, shareReplay, map, takeUntil } from 'rxjs/operators';
 
const REFRESH_INTERVAL = 10000;
 
@Injectable()
export class JokeService {
  private reload$ = new Subject<void>();
  ...
 
  get jokes() {
    if (!this.cache$) {
      const timer$ = timer(0, REFRESH_INTERVAL);
 
      this.cache$ = timer$.pipe(
        switchMap(() => this.requestJokes()),
        takeUntil(this.reload$),
        shareReplay(CACHE_SIZE)
      );
    }
 
    return this.cache$;
  }
 
  forceReload() {
    // Calling next will complete the current cache instance
    this.reload$.next();
 
    // Setting the cache to null will create a new cache the
    // next time 'jokes' is called
    this.cache$ = null;
  }
 
  ...
}

這部分沒做太多工作,所以我們繼續(xù),以便將其用到 JokeListComponent 中。我們實現(xiàn)了一個函數(shù) forceReload() ,當(dāng)我們點擊了“獲取新笑話”按鈕之后會被調(diào)用。另外,我們還需要創(chuàng)建一個 Subject 對象,作為更新 UI 并且顯示通知的事件總線。我們馬上就會看到它的作用。

import { Subject } from 'rxjs/Subject';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  forceReload$ = new Subject<void>();
  ...
 
  forceReload() {
    this.jokeService.forceReload();
    this.forceReload$.next();
  }
  ...
}

在適當(dāng)?shù)奈恢蒙希覀儗?JokeListComponent 模板中的按鈕連接起來,以便強制重新加載緩存數(shù)據(jù)。我們需要做的就是使用 Angular 的事件綁定語法監(jiān)聽點擊事件,然后調(diào)用 forceReload() 。

<button class="reload-button" (click)="forceReload()" mat-raised-button color="accent">
  <div class="flex-row">
    <mat-icon>cached</mat-icon>
    FETCH NEW JOKES
  </div>
</button>

這已經(jīng)可以正常工作了,但只有當(dāng)我們回到儀表盤,再重新進入列表視圖時才是正常的。這當(dāng)然不是我們所需要的。我們想要強制更新緩存數(shù)據(jù)的時候,UI 能夠立即更新。

還記得我們實現(xiàn)了 updates$ 流,當(dāng)我們點擊“更新”時,會從緩存獲取最新的數(shù)據(jù)?我們就需要類似這種的行為,所以我們需要擴展一下這個流。這意味著,我們需要將 update$ 和 forceReload$ 合并 起來,因為這兩個流都需要更新 UI。

import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { mergeMap } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  update$ = new Subject<void>();
  forceReload$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const updates$ = merge(this.update$, this.forceReload$).pipe(
      mergeMap(() => this.getDataOnce())
    );
    ...
  }
  ...
}

這是不是很簡單?是的,但我們還沒完成。事實上,我們剛剛“打斷”了我們的通知。這本可以正常工作,直到我們點擊了“獲取新笑話”。屏幕上的數(shù)據(jù)會更新,緩存中的也是,但我們等待 10 秒之后,并沒有彈出通知。問題出在強制更新緩存會完成緩存實例,這意味著組件再也不會收到值。簡單來說,通知流( initialNotifications$ )死了。這很不想,怎么解決這個問題呢?

很簡單!我們可以監(jiān)聽 forceReload$ 的事件,對其每一個值都切換到一個新的通知流。重要的是,我們需要取消之前的流的訂閱。聽起來耳熟嗎?好像我們需要 switchMap ,是不是?

讓我們動手實踐吧!

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { merge } from 'rxjs/observable/merge';
import { take, switchMap, mergeMap, skip, mapTo } from 'rxjs/operators';
 
@Component({
  ...
})
export class JokeListComponent implements OnInit {
  showNotification$: Observable<boolean>;
  update$ = new Subject<void>();
  forceReload$ = new Subject<void>();
  ...
 
  ngOnInit() {
    ...
    const reload$ = this.forceReload$.pipe(switchMap(() => this.getNotifications()));
    const initialNotifications$ = this.getNotifications();
    const show$ = merge(initialNotifications$, reload$).pipe(mapTo(true));
    const hide$ = this.update$.pipe(mapTo(false));
    this.showNotification$ = merge(show$, hide$);
  }
 
  getNotifications() {
    return this.jokeService.jokes.pipe(skip(1));
  }
  ...
}

好了。只要 forceReload$ 一發(fā)出值,我們就取消訂閱之前的 Observable ,切換到一個新的通知流。注意我們有一段代碼需要兩遍,也就是 this.jokeService.jokes.pipe(skip(1)) 。為了避免重復(fù)代碼,我們創(chuàng)建一個函數(shù) getNotifications() ,返回跳過第一個值的笑話的流。最后,我們將 initialNotifications$ 和 reload$ 合并到名為 show$ 的流。這個流負責(zé)在屏幕上顯示通知。這里并不需要取消訂閱 initialNotifications$ ,因為這個流在下一次訂閱重新創(chuàng)建緩存之前就已經(jīng)結(jié)束了。其余部分保持不變。

呼,我們完成了,F(xiàn)在花掉事件看看我們實現(xiàn)了什么。

在彈子圖中可以看到, initialNotifications$ 對顯示通知非常重要。如果我們?nèi)笔Я诉@個流,就只能在強制更新緩存之后才會看到通知。也就是說,我們按需請求新的數(shù)據(jù)時,必須不斷切換到一個新的通知流,因為之前的(舊的) Observable 對象已經(jīng)完成,不會再發(fā)出值。

大功告成!我們使用 RxJS 和 Angular 提供的工具創(chuàng)建并實現(xiàn)了一個復(fù)雜的緩存機制;仡櫼幌拢覀兊姆⻊(wù)暴露了一個笑話列表的流。底層的 HTTP 請求每 10 秒更新緩存。為了改進用戶體驗,我們顯示一個通知,以便用戶強制更新 UI。在這之上,我們還實現(xiàn)了一個允許用戶按需請求新的數(shù)據(jù)的方法。

太棒了!這就是最終的解決方案;◣追昼姍z查一下代碼,嘗試下不同的場景,看看一切是否正常。

前景

如果你想做點作業(yè),或者再多思考下,下面有幾個可以改進的地方:

  • 添加錯誤處理
  • 重構(gòu)組建中的邏輯到一個服務(wù)以便重用

特別感謝

特別感謝 Kwinten Pisman 幫助完成代碼。同樣,感謝 Ben Lesh 和 Brian Troncone 提供的寶貴反饋以及之處一些改進點。另外,感謝 Christoph Burgdorf 幫助復(fù)查文章和代碼。

 

來自:https://www.devbean.net/2018/06/advanced-caching-with-rxjs/

 

標(biāo)簽: 代碼 服務(wù)器 網(wǎng)絡(luò)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:IM系統(tǒng)的MQ消息中間件選型:Kafka還是RabbitMQ?

下一篇:使用 Python 從零開始開發(fā)區(qū)塊鏈應(yīng)用程序