概述

mergeMap、concatMap、exhaustMap和switchMap是 rxjs 中的扁平化操作符。

它们被视为转换运算符,因为它们在应用函数后将Observable对象转换为新的Observable对象。

它们帮助我们避免了必须嵌套订阅等复杂情况。例如,当用户点击提交按钮(source observable)时,会向服务器发送一个 http 请求(inner observable),然后,我们会监听响应。

一个不使用扁平化操作符的例子如下:

import { of } from "rxjs";
import { map } from "rxjs/operators";

const sourceObs = of(10);

sourceObs
  .pipe(map((value) => of(value + 10)))
  .subscribe((transObs) =>
    transObs.subscribe((transValue) => console.log(transValue))
  );

// 最终控制台会打印出20

可以看到我们需要嵌套的subscribe才能获取到最终的值。此时, 扁平化操作符便派上了用场, 上面提到的这4个扁平化操作符的主要区别在于当源 observable 持续发出新值而内部之前的订阅仍在进行中时, 这个几个操作符的应对方式各有不同。

想象一下这个用户点击了提交按钮,一个http请求被发送到服务器,此时我们还在等待服务器返回响应, 但是这个用户再一次或多次地点击了按钮。那这种情况下内部的obervable应该如何处理呢?

  1. 取消当前的订阅然后为用户新的点击开启一个新的订阅?
  2. 保持当前的订阅然后忽略用户的新点击?
  3. 保持当前的订阅然后为用户的新点击再开启一个新的订阅?

为了更好地理解, 我们来想象一个场景: 顾客在餐厅点餐, 源observable就像是顾客在持续点餐, 而内部的observable就像是厨师在接受订阅顾客的订单。

import { of, from } from "rxjs";
import { delay } from "rxjs/operators";

// 厨师做菜(内部的observable)
function prepareOrder(order) {
  // 模拟厨师做菜需要花费的时间
  const delayTime = Math.floor(Math.random() * 1000) + 1;
  return of(`我是订单: ${order}, 我在 ${delayTime}ms 后完成.`).pipe(
    delay(delayTime)
  );
}

// 顾客的订单 (外部的observable)
const orders = from(["order 1", "order 2", "order 3", "order 4", "order 5"]);

接下来让我看一看4种不同的扁平化操作符的处理方式有什么区别.

mergeMap

import { mergeMap } from "rxjs/operators";

orders
  .pipe(mergeMap((order) => prepareOrder(order)))
  .subscribe((value) => console.log(value));

结果:

我是订单: order 5, 我在 437ms 后完成.
我是订单: order 4, 我在 472ms 后完成.
我是订单: order 1, 我在 876ms 后完成.
我是订单: order 2, 我在 877ms 后完成.
我是订单: order 3, 我在 884ms 后完成.

可以看到, mergeMap并不注重订单的顺序, 让我们揪着这个厨师的鼻子问一问他是怎么想的:

我在准备订单的同时,我也在监听新的订单,当我收到一个订单时,即使当前有尚未完成的订单,我也会立即开始同时处理新订单,然后将最先完成的订单交给顾客。

可以看到, 这位用mergeMap的厨师是在同时处理多份订单,哪个先完成就先交付哪个。

注: 有的其他地方会用到flatMap, flatMap其实是mergeMap的别名, 他们是一个东西只是名字不一样, 在rxjs的源码中是这样导出的:

export { mergeMap } from '../internal/operators/mergeMap';
export { mergeMap as flatMap } from '../internal/operators/mergeMap';

concatMap

import { concatMap } from "rxjs/operators";

orders
  .pipe(concatMap((order) => prepareOrder(order)))
  .subscribe((value) => console.log(value));

结果:

我是订单: order 1, 我在 845ms 后完成.
我是订单: order 2, 我在 964ms 后完成.
我是订单: order 3, 我在 777ms 后完成.
我是订单: order 4, 我在 136ms 后完成.
我是订单: order 5, 我在 530ms 后完成.

可以看到, 结果是按照顾客点单的顺序执行的, 即使先来的订单要花费更多时间准备。 看起来这位厨师很尊重先来后到, 既然这样让我们客气的问一问这位厨师是怎么工作的:

我会按顺序接收订单。当我正在准备一个订单并且新的订单来了时,我会记下这个新订单(在缓冲区中),以便在完成当前订单后再来处理它。

exhaustMap

import { exhaustMap } from "rxjs/operators";

orders
  .pipe(exhaustMap((order) => prepareOrder(order)))
  .subscribe((value) => console.log(value));

结果:

我是订单: order 1, 我在 193ms 后完成.

这个厨师竟然只完成第一个订单, 其他4个都不管了, 可真够懒的。让我们揪着他鼻子问问怎么回事:

当我在准备一个订单时,我会在此期间忽略任何其他新订单,直到我完成当前的订单。

switchMap

import { switchMap } from "rxjs/operators";

orders
  .pipe(switchMap((order) => prepareOrder(order)))
  .subscribe((value) => console.log(value));

结果:

我是订单: order 5, 我在 497ms 后完成.

呵呵, 他只完成了最后一个订单, 看看他是怎么想的:

当我在准备订单并获得了新订单时,我会放弃当前正在处理的订单并立即开始准备新订单。

总结

如果这4个扁平化操作符是厨师的话:

  • mergeMap: 我很努力,可以同时准备多个订单!但我不注重订单顺序,哪个先完成先上哪个。
  • concatMap: 我注重订单顺序!一旦我完成我目前正在做的订单, 我就会给此订单的顾客上菜。
  • exhaustMap: 我快累死了!当我准备一个订单时,我不会接收任何其他订单。
  • switchMap: 我很喜新厌旧! 当我接收到新订单时, 我会把当前正在处理的订单扔进垃圾箱。

以上就是这4种扁平化操作符的区别, 希望你通过阅读这篇文章后, 能够在你的项目中根据场景需求不同选择使用合适的那个。