在上一篇关于
RxJs
的博客中有提到除了实例化Observable
来创建一个Observable
外,还有一些创建操作符,也能够帮助我们简单快速的创建Observable
。我们在日常的开发使用中,也是更多的使用这些操作符来进行
Rxjs
的创建,只有很少情况下,我们才会去自己定义和处理Observable
的内部构造和实现。这篇博客,就来介绍一下与
Rxjs
的创建有关的几个操作符。
在第二篇关于Observable
的介绍中,我们是这样创建一个Observable
实例的。
1 | // 单纯推送值 |
同时我们也提到这种构造形式和Promise
非常像。但我们仔细思考一下,就会发现问题。对于Promise
来说,它推送一次值之后就会进入resolved
状态,它的这种形态是与单次异步操作完美契合的,因此它的内部构造逻辑不会十分的复杂。但Observable
就不是这样了,它可以任意次的推送值,可以包裹任意多个异步操作(甚至是异步同步混杂的操作),如果我们再通过这种方式来进行实例构造的话,将会是一个十分麻烦的事情。
也正是因为这样,Rxjs
提供了很多的创建操作符(也可以称它们为Observalbe
的工厂函数),来让我们能够从各种形式的数据,以各种形式创建Rxjs
。
比较常用的操作符按创建来源可以分为以下几类:
- 值
of
range
from
- 事件
fromEvent
fromEventPattern
- webSocket
webSocket
- 定时器
interval
timer
- 特殊值
empty
never
throw
另外,本篇博客我们主要来讲解一下上面提到的这些从其他形式的数据中创建Observable
的操作符,另外还有一些操作符,它们可以从存在的Observable
中组合或者合并出新的Observable
来,例如concat
,merge
等。本篇博客就不多说它们了,在后面博客中我们再有选择性的详细介绍。
下面我们就来逐一介绍上面提到的这些Observable
创建操作符。
of
of
创建操作符是一个相当简单的操作符,它接受多个参数,然后再按照接收的参数顺序依次发出这些参数值。当它接收的所有参数都发送完毕后,它会再发出一个完成通知。
1 | const ob$ = of(1, 2, 3) |
当然,of
可以发送所有形式的Js
值,例如数组,对象,函数,undefined
值,null
值等等。
1 | const ob$ = of(undefined, ()=>{}, null, {name: 'age'}, [1, 2, 3]) |
of
操作符默认是同步发出它接收的所有值,后面我们会知道可以通过调度器Scheduler
来设置它是同步还是异步,但现在我们可以暂时认为它总是同步的。
range
range
操作符也相当简单易懂,它接受一个开始值数字和个数数字作为参数,依次发出开始值后的个数个数字。(包含开始值)
1 | const ob$ = range(1, 10) |
from
from
操作符就不简单了,它是一个相当强大的操作符,换句话说,它几乎可以把任何东西转化为Observable
。
具体的,from
操作符可以将字符串,数组,类数组对象,Promise
,迭代器对象转化为Observable
。
字符串
首先我们来看字符串,当from
接受一个字符串时,它会把其当成一个字符串数组。如下:
1 | const ob$ = from('hello afei') |
数组
对于数组,我们应该很容易想到from
的行为,它创建一个依次返回数组元素的Observable
。
1 | const ob$ = from('hello afei') |
类数组对象
类数组对象,是Js
中一种特殊形式的对象,它跟数组一样具有length
属性,但没有其他的数组方法,例如push
,map
等等。
常见的类数组对象有函数的arguments
对象,HTMLCollection
,NodeList
等。
1 | var arrLike = { '0': 'a', '1': 'b', '2': 'c', 'length': 3 } |
Promise
from
操作符可以将Promise
转换为一个Observable
,这个Observable
返回Promise
的结果。
1 | const promise = new Promise(resolve => { resolve('hello, world') }) |
可迭代对象
可迭代对象是一个复杂的概念,笼统的来说,在Js
中实现了[Symbol.iterator]
的对象都是可迭代对象,表示一组可迭代的值。具体的,例如Array
,String
, 函数的arguments
对象,NodeList
对象,Es6中的Set
,Map
对象等,都在内部实现了[Symbol.iterator]
方法,所以都是可迭代对象。
from
操作符通过内部处理,依次返回可迭代对象的迭代值。
我们看一个from
将可迭代的Set
集合转换为Observable
的例子。
1 | const aset = new Set() |
从这里也可以看出,我们上面提到的
from
能将字符串转换为Observable
,也是将字符串作为可迭代对象进行的。
fromEvent
通过fromEvent
操作符,我们可以将一个事件转换为Observable
。此处的事件,既可以是客户端的DOM
事件,也可以是服务端Node
的EventEmitter
事件。
来看一个例子:将DOM document
的点击事件转换为Observable
。
1 | const documentClicked$ = fromEvent(document, 'click') |
具体的,fromEvent
操作符可以接受两个参数,如下:
1 | fromEvent(target, eventName) |
当我们订阅(subscribe)
Observable
时,RxJs
会自动的将我们的事件监听挂载。当我们取消订阅(unsubscribe)时,它同样会为我们清理事件监听。这种机制可以有效的防止我们忘记取消事件监听导致的内存泄漏问题,在对事件进行复杂处理时,可以更专注于事件流程本身。
fromEventHandler
fromEventHandler
操作符,可以接受更纯粹更具有自定义性的事件处理器方法,将事件处理器转换为Observable
,相比fromEvent
,它更为底层。
通常的,一个事件处理器具有一个addHandler
方法用来添加事件的观察者,一个removeHandler
用来移除事件的观察者。如下所示是一个最简单的事件处理器类(为了少写一点代码,此处我们使用Es6语法)
1 | class EventProducer { |
然后我们实例化一个事件处理器来使用这个事件处理器类。
1 | const myEventProducer = new EventProducer() |
常用的DOM
元素的addEventListener
和removeEventListener
就是一个比较典型的事件处理器。
接着回来说fromEvent
操作符,它可以接收两个参数:
addHandler
事件处理器的添加观察者方法removeHnadler
事件处理器的移除观察者方法
对于上面的事件处理器的例子,我们可以这样将其转换为Observable
1 | const ob$ = fromEventPattern( |
可以看作fromEventPattern
将我们的订阅转换成了事件处理器的观察者,从而使得我们的代码更清晰。
webSocket
这个操作符可以将一个webSocket
封装为了一个Subject
(关于Subject
,是RxJs
中除了Observable
之外的又一个核心概念,在后续的博客中我们会详细介绍,现在可以先理解为它是一个可以被订阅的Observable
,同时也是一个可以接收推送的Observer
)。
通过这层封装,我们使用起webSocket
来将会非常的方便和简洁。
先来看一下我们平常使用原生webSocket
的常见流程写法。
1 | const ws = new WebSocket("wss://echo.websocket.org") |
看起来流程并不复杂,但需要考虑到在现在前端普遍的模块和组件化开发,我们通常需要将webSocket
的连接封装在一个组件文件或抽离到一个模块中去,然后在其他组件中通过webSocket
与服务端进行通信。
由此带来的问题就是在封装webSocket
的组件或模块中我们要进行webSocket
的连接,消息接收处理,在其他的组件里我们要进行消息的消费和发送。webSocket
的生命周期被我们分隔到了不同的文件中,为此我们可能需要自己去进行跨组件的事件通知和消息传递,这大大增加了我们使用webSocket
的复杂度。
思考一下,我们要在ws.open
和ws.onmessage
时在另一个文件里进行对应响应,要怎么做?只需要引入ws
对象这么简单吗?如何在其他文件中即刻的判断ws
的状态呢?如何保证文件的打包和加载顺序呢?如果我们有多个子模块中都需要用到这个webSocket
呢?
下面来看一下使用RxJs
对webSocket
进行包装之后的处理。
1 | const ws$ = webSocket('wss://echo.websocket.org') |
可以看到webSocket
接收一个连接URL作为参数,通过将webSocket
连接转换为Subject
来抽象它,代码十分的简洁优雅,我们可以直接通过ws$
来完美的承接webSocket
所需的操作,在需要使用webSocket
时直接订阅,不需要时直接取消订阅即可,招之即来,挥之即去,不必在各个使用到的地方都进行零碎的生命周期处理和事件传递接收。
interval
interval
操作符主要用来创建一个持续的间隔一定时间发送一次值的Observable
。只要知道JS中的setInterval
,就不难理解interval
操作符是做什么的。
首先我们自己写一个间隔一定时间发出一个值的Observable
。
1 | const ob$ = new Observable(observer => { |
使用interval
操作符我们可以这样写:
1 | const ob$ = interval(1000) |
interval
操作符接收一个表示间隔时间(以ms计)的数字值作为参数,间隔此时间,持续发送一个从0开始无限递增的整数。
interval
在被订阅后,并不会立即发出第一个值,而是在第一个间隔时间段过去后才开始发送。也就是说,上面例子中的 0 是在第 1s 时发出的。
这和
setInterval
的表现是一致的。
timer
timer
操作符类似于上面的interval
操作符,可以根据一个固定的间隔来发送递增的整数序列。但timer
操作符还可以定义第一次发送值之前的延时。也就是说,你可以指定什么时候开始发送值。
它接受两个参数,第一个参数用来指定初始延时,第二个参数指定开始发送值之后每个值的间隔时间。
来看一个例子,我们在5秒后开始,每隔一秒发送一个自增的数字。
1 | const ob$ = timer(5000, 1000) |
另外,当timer
操作符只被传递了一个参数时,则只会在此参数对应时间后返回一个0,类似 JS 中的setTimeout
。
1 | const ob$ = timer(5000) |
timer
操作符第一个参数除了是一个数字之外,还可以是一个Date
对象,表示开始推送值的具体时间。这在某些时候会很有用。
empty
empty
用于创建一个空的Observable
,什么是空的Observable
,就是当你订阅这个Observable
时,它不会返回任何数据,而是直接发出complete
表示Observable
已完成。
1 | const emptyOb$ = empty() |
通俗的来讲,empty
创建一个什么也不做的Observable
,但它会告诉你它什么都没做(怎么这么不要脸这个操作符)
这个操作符创建的Observable
一般会跟其他Observable
通过各种操作符组合来发挥作用,例如switchMap
,mergeMap
等,这个我们留到以后再细说。
另外,我们还可以通过 RxJs v6版本新加入的EMPTY
常量来直接得到一个空的Observable
。
1 | const emptyOb$ = EMPTY |
throwError
empty
操作符创建的Observable
是在订阅后立即发出一个complete
消息,而throwError
操作符创建的Observable
则是在被订阅后立即发出一个错误通知。
它可以接受一个错误对象作为参数,并在其创建的Observable
被订阅后立即发出这个错误。
与empty
类似,它创建的Observable
通常也是用来测试或与其它Observable
组合使用。
1 | const errorOb$ = throwError(new Error('attention, error')) |
never
never
操作符,用来创建一个永远不结束的操作符,也就是never end
。不过,虽然它永远不结束,但它也永远不会发出值。换句话说,观察者的next
,error
,complet
三个函数,它永远一个也不会去触发。我们订阅它后,什么事也不会发生。
1 | const neverOb$ = never() |
另外有一点需要注意,因为这个Observable
永远也不会结束,也意味着永远都不会被清理。所以我们在必要的时候,要记得去手动清理掉它,防止它真的无限的持续下去。
还记得我们上篇博客说的吗?当订阅一个Observable
时,会返回一个订阅关系Subscription
,调用Subscription
的unsubscribe
即可清理Observalbe
1 | const neverOb$ = Rx.Observable.never() |
另外,同样的我们可以使用 RxJs v6 版本新加入的NEVER
常量来直接得到一个无限持续的Observable
。
Ok,常用的RxJs
创建操作符,大概就这些了,让我们就在这里结束吧。
感谢阅读,我们下篇博客见。