Fork me on GitHub

RxJs系列(一):概述

异步编程作为代码世界中一个绕不开的话题,其与人类直观的同步思维相违背的代码执行顺序再加上复杂的状态管理,异常处理,控制权交接,数据传递,条件判断,事件循环等等,经常让初入其中的程序员头昏脑涨,拐不过弯,屡屡翻车。

Js来说,因为处在浏览器这个特殊语言环境和其单用户线程的特性,异步编程更是随处可见,在Js中具有举足轻重的地位。更别说在以异步无阻塞著称的nodeJS中,更是大部分的API都是异步的。异步编程的场景越来越复杂,异步处理的机制也越来越抽象。

从最早的回调函数,到类似jQueryevent-emmiter事件广播,再到ES6中的迭代器和Promise,再到Es7中的async/await函数,我们对js中异步的处理工具也日益丰富。今天要介绍的rxjs,也是用来处理js的异步编程问题的。不过,相比上面的那些异步处理机制,rxjs的适用场景更广,也更抽象,对使用者对异步问题的归纳和梳理能力,要求也更高,当然,学习曲线也是相当陡峭。

不过话又说回来,只要认真努力坚持不懈,我还没见过有什么技术是啃不下来的。今天看不懂就明天再看一遍,明天看不懂就大后天再看一遍(中间要休息一天,劳逸结合撒….),总是会大彻大悟之类的。

好啦,不闲扯了,进入正题。

在最早的js中,我们使用回调函数来处理异步编程,但在存在条件判断和逻辑嵌套的异步场景中,回调函数可读性差,难以维护的问题(回调地狱)随之而来。

随后,又出现了事件机制,如jQuery中的on方法,js中的triggeraddEventListener等,但紧接着,我们发现,使用事件机制需要将程序改造成事件驱动,代码的运行流程会变得模糊和难以控制。

再接着,Es6GeneratorPromise出现了,通过语言特性层面的扩张,使我们对异步问题的处理更上了一个台阶,再加上async/await函数将GeneratorPromise整合在一起,大家都以为js中的异步问题终于有了终极的解决方案。
但问题总比方法多,我们很快的发现,对于复杂的异步场景,例如多个异步请求的并行,串行,合并,竞态条件的处理,防止内存泄漏等等限制下,我们仍然无法显著降低生产项目中异步处理的复杂度,仍然需要用大量的代码来兜住这些场景。

同时,我们也可以察觉到,异步的场景越来越多样,从传统的DOM Event,定时器,Ajax请求到新出现的fetchWebScoket,Service Worker等等,这些异步动作的构造代码写法也不尽相同,为实现统一的异步处理模型带来了更多的难点。

这正是Rxjs所要解决的问题。

Rxjs,具体的来说,就是ReactiveX APIjs实现。

ReactiveX,即Reactive Extensions的简写,一般我们称之为Rx,是由微软领导开发的一个异步编程模型,通过提供统一一致的编程接口,来帮助开发者更方便的处理异步数据流。它具有多种编程语言实现,例如RxJavaRx.NET等等。

当然,js的实现,就是Rxjs

在前端Angular框架的官方标配大礼包中,就包括了Rxjs,想用好Angular的同学,很有必要对它进行一些了解。当然,它的应用场景非常广泛,只要有复杂异步问题的地方,都是它大显身手的地方。同时因为在各种语言中的实现都比较类似,可以说是学会了一个,就会了所有,真正的学不了吃亏,学不了上当。。。。

在进入正题之前,首先来解决一下历史遗留问题,防止大家在对照其他教程或者文档学习时产生困惑。先说明一下,RxJs经历了多个版本的迭代,引入和API调用方式都产生了比较大的变化。目前市面上很多教程和文章都是基于旧版本,大部分是RxJs v5写就的。

本系列基于RxJs V6最新版本,所以大家看到和其他地方不一样的实例代码时,不必疑惑,两边都没错。(话说我怎么又想起了学习python时关于python 2python 3的痛苦回忆。。。)

RxJs v5 和 RxJs v6

RxJs经历了多个版本迭代,库的代码组织形式和结构也几经改变,尤其以V6版本在类和方法的结构改动上变化最大。

首先,是以下这些操作符,为了兼容Es标准,避免与语言自身关键字冲突,而进行了替换:

  1. do -> tap
  2. catch -> catchError
  3. switch -> switchAll
  4. throw -> throwError
  5. finally -> finalize

还删除了create操作符,统一使用Observable构造函数代替。

另外比较明显的升级之后的不同之处的是操作符在V6以前的版本,它们看起来都更像是Observable类的静态方法和实例方法,而在V6中,为了更具有适应性,同时也更易于测试和使用,这些操作符都陆续被独立了出来,作为单纯的函数进行了重新的模块划分。

因此我们的导入方式需要跟着有一些变化。

还有比较重要的一点是,在RxJs v5.5之后,又引入了pipe管道操作符,改变了我们链式调用操作符的方式,不过这也只是操作符调用形式上的一点变化,不影响其本质。

来看一个例子,在v5.0中,我们在模块化使用RxJs时,通常用下面这种形式导入和使用Observable类和相关操作符:

JavaScript
1
2
3
4
5
6
7
8
9
10
// RxJs v5.0

import { Observable } from 'rxjs/Observable'
import 'rxjs/add/observable/interval'
import 'rxjs/add/operator/map'
import 'rxjs/add/operator/delay'

cosnt stream$ = Observable.interval(1000).map(x => console.log(x * 10)).delay(5000)

stream$.subscribe(console.log)

可以看到,在V5版本中,我们是通过这种链式调用的方式来进行操作符的使用。

V6版本中,我们只需要直接引入创建操作符和其它的操作符,并通过pipe操作符来管道式的使用操作符:

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// RxJs v6.0

// 原来作为Observable的静态方法的操作符从rxjs中直接引入
import { interval } from 'rxjs'

// 原来作为Observable的实例方法的操作符从rxjs/oberators中引入
import { map, delay } from 'rxjs/oberators'

const stream$ = interval().pipe(
map(x => console.log(x * 10)),
delay(5000)
)

stream$.subscribe(console.log)

中间另外还有几个版本,例如v5.5v5.6,引入方式还会各自有一些不同,但通常我们都不会遇到。

在这一系列文章中,我会直接使用RxJs V6版本来进行讲解。因此很多例子可能看起来与其他的很多V5教程不太一样,不过也不需要疑惑,大部分地方,还是很容易就明白V5V6的不同之处的,另外,毕竟买新不买旧嘛,在没有历史包袱的情况下,当然要直接上新的啦。

安装和引入

在文章开始,首先来说一下Rxjs的安装和引入,像其他的npm package一样,Rxjs的安装和引入也十分简单。
执行下面命令即可安装。

Shell
1
npm install rxjs

下面是引入RxJs中的基础构造类及操作符的代码。在实际使用中,我们只需要按需引入即可。

JavaScript
1
2
3
4
5
6
7
8
9
10
11
// 创建操作符和基础类
import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';

// 实例操作符
import { map, filter, scan } from 'rxjs/operators';

// `webSocket
import { webSocket } from 'rxjs/webSocket';

// ajax
import { ajax } from 'rxjs/ajax';

基础应用

Rxjs中,所有的数据都被视为stream,也就是数据流。我们通过对数据流的订阅,来对其进行查询,过滤,归并等操作。
本质来说,数据流,对应的就是迭代器模式,订阅,显而易见对应的观察者模式,再通过函数式编程型的过滤归并查找删除。根据上面这段话,我们就可以对Rxjs下定义了:

一个结合了观察者模式,迭代器模式和函数式编程的异步事件管理库。

Rxjs以流的形式来处理数据,这个数据是抽象意义的,可以是ajax请求,可以是DOM Event,也可以是个普通数组。
数据流是通过Observable可观察对象来体现的,我们通过对Observable进行subscribe来获取它其中的数据。
我们以一个简单的输入框事件来举例Rxjs的应用:

JavaScript
1
2
3
4
5
6
7
8
import { fromEvent } from 'rxjs'
import { map } from 'rxjs/operators'

var button = document.querySelector('input')

fromEvent(input, 'input').pipe(
map(event => event.target.value)
).subscribe(value => console.log(value))

在上面这个例子中,我们通过fromEvent这个创建操作符方法生成了一个Observable,并通过map操作符获取到了输入的值,并随之subscribe了这个Observable,在每次输入事件发生时打印出输入的值。
你可能会觉得,这也没什么神奇的嘛,不就是我们平常普通的事件回调换了一个写法嘛。
的确,在普通的单个异步事件处理中,Rxjs并没有什么特别的地方。别忘了我们前面提到的,Rxjs以流的形式处理数据,对于单次的事件来说,并不能形成时间线维度上流的概念,所以Rxjs并没有可以大展身手的地方

现在我们来更进一步,我们想实现一个比较常见的需求,在输入停止两秒后才进行响应,防止用户在输入过程中不停触发响应造成的性能浪费。
如果是普通的回调函数形式,我们可能需要自己来写一个debounce函数或者引入提供此功能的相关类库,来对用于响应的回调函数再封装一层从而实现延时响应。
但在Rxjs中,我们非常容易的通过一个操作符就可以实现这一需求。

JavaScript
1
2
3
4
fromEvent(input, 'input').pipe(
debounceTime(200),
map(event => event.target.value)
).subscribe(value => console.log(value))

通过debounceTime操作符,我们顺利的实现了这个功能。
你可能觉得,这也只是一个小小的功能嘛,有什么神奇的。
那现在我们又来了一个需求,我们需要对输入的字符进行去重操作。此时如果你还是普通的回调形式来处理事件的话,可能回调函数已经嵌套了三层了。
但是使用Rxjs,非常简单的再加一个操作符:

JavaScript
1
2
3
4
5
fromEvent(input, 'input').pipe(
debounceTime(200),
distinct(),
map(event => event.target.value)
).subscribe(value => console.log(value))

是不是觉得Rxjs有点厉害了,但其实处理输入事件这种东西,还只是Rxjs功能的冰山一角而已。

我们来一个更复杂的需求,举例来说,我们有api/aapi/b,api/c三个AJAX请求,现在我们需要它们串行的发出,也就是前一个结束再发出下一个。
如果使用普通的回调函数形式,我们要这样写:

JavaScript
1
2
3
4
5
6
7
ajax('api/a', () => {
ajax('api/b', () => {
ajax('api/c', () => {
console.log('complete')
}
}
})

显而易见,在实际项目中处理请求结果的逻辑会相当复杂,这样一层套一层太恐怖了。比如现在我们需要在api/bapi/c请求之间再加一个api/o请求,你看着一堆叠罗汉一样的代码,肯定心里想的是,甘霖娘***这可咋整啊(小孩子不可以讲脏话哦)。

有的同学可能会说,用Promise不就得了。那我们就用Promise形式再重构一下上面这个回调地狱。

JavaScript
1
2
3
4
5
6
7
ajax('api/a').then(_ => {
return ajax('api/b')
}).then(_ => {
return ajax('api/c')
}).then(_ => {
console.log('complete')
})

可以看出,使用Promise大大提升了我们代码的可维护性,但是连续的then链也不是那么美观和易读。而且,它也并不是完美的。举例来说,现在我们突然接到需求,需要将三个串行的请求改成并行的,你会想,改成Promise.all不就行了。

但要明白的是,在实际项目中,我们每个请求的处理逻辑可不单单是这几行的代码量。你需要将每个请求的代码组合拼装成一个数组传递给Promise.all。面对着一两百行的代码,你心里又开始想了。好嘛!这可咋整。
那如果使用Rxjs是怎么解决的呢?看代码:

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { concat, merge } from 'rxjs'
import { ajax } from 'rxjs/ajax'

const ob1$ = ajax('api/a')
const ob2$ = ajax('api/b')
const ob3$ = ajax('api/c')

const obs = [ob1$, ob2$, ob3$]

//串行
concat(...obs).subscribe(() => {})

//轻松的改成并行
merge(...obs).subscribe(() => {})

是不是非常优雅,非常清晰,非常NB?这就是Rxjs的强大之处,它通过引入Js中比较少见的流(stream)这个概念,将异步,队列,回调等等完美的归纳融合在一起形成了一个统一的异步事件处理模型。

当然,极高的抽象程度也意味者陡峭的学习曲线。不过不用担心,只要理解了Rxjs的思想,具体的细节语法,还是十分的统一和易懂的。

PS: 一般的,为了易于区分Observable对象,我们都会在表示Observable的变量结尾加上一个$符号。这也是使用RxJs中大家约定俗成的一个惯例,在这里提一下,防止有的同学看到一堆以$结束的变量一脸懵逼。

OK,通过上面两个RxjsDOM事件和HTTP请求中的两个小场景介绍,大家应该在心里对Rxjs有了一个基本的概念和印象。其实这还不是RxJs真正大展身手的地方,当你需要在点击事件之后触发一个请求,取消之前发送但未返回的旧请求,当你想将两个请求的结果组合再作为参数发起另一个请求,或者当你想将某个状态广播给所有消费者但又想保证后续新加入的消费者不会错过以前的消息等等极其复杂和难以操控的场景,才是RxJs作为一个模型真正要解决和简化的问题。

当然,作为Rxjs系列博客的第一篇,主要想让大家对Rxjs的相关背景和一些基础的适用场景有一个大致了解,不会去直接探究和深入它的具体细节。

关于Rxjs是如何在代码层面实现这些看起来非常酷炫的展现出极高的抽象色彩的功能的?带着好奇心的学习总是会带来极高的效率。

我将会在下一篇博客中,重点介绍Rxjs的核心概念————Observable, 就是通过它,我们将所有的数据都转换为数据流的形式,从而优雅高效的以完全同步编程的方式来处理异步,极大简化了在时间线上错综复杂纠缠不清令人头疼的异步编程问题。

ok,谢谢阅读,我们下篇博客再见^-^。

----本文结束感谢阅读----