Fork me on GitHub

RxJs系列(三):创建操作符

在上一篇关于RxJs的博客中有提到除了实例化Observable来创建一个Observable外,还有一些创建操作符,也能够帮助我们简单快速的创建Observable

我们在日常的开发使用中,也是更多的使用这些操作符来进行Rxjs的创建,只有很少情况下,我们才会去自己定义和处理Observable的内部构造和实现。

这篇博客,就来介绍一下与Rxjs的创建有关的几个操作符。

Rxjs还不是非常了解的同学,可以到本系列的第一篇第二篇先大致了解一下。

在第二篇关于Observable的介绍中,我们是这样创建一个Observable实例的。

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 单纯推送值
const observable$ = new Observable(function subscribe(observer) {
observer.next(1)
observer.next(2)
})

// 包含完成标志和错误捕捉
const observable$ = new Observable(function subscribe(observer) {
try{
observer.next(1)
observer.complte()
} catch(err) {
observer.error(err)
}
})

同时我们也提到这种构造形式和Promise非常像。但我们仔细思考一下,就会发现问题。对于Promise来说,它推送一次值之后就会进入resolved状态,它的这种形态是与单次异步操作完美契合的,因此它的内部构造逻辑不会十分的复杂。但Observable就不是这样了,它可以任意次的推送值,可以包裹任意多个异步操作(甚至是异步同步混杂的操作),如果我们再通过这种方式来进行实例构造的话,将会是一个十分麻烦的事情。

也正是因为这样,Rxjs提供了很多的创建操作符(也可以称它们为Observalbe的工厂函数),来让我们能够从各种形式的数据,以各种形式创建Rxjs

比较常用的操作符按创建来源可以分为以下几类:

    • of
    • range
    • from
  • 事件
    • fromEvent
    • fromEventPattern
  • webSocket
    • webSocket
  • 定时器
    • interval
    • timer
  • 特殊值
    • empty
    • never
    • throw

另外,本篇博客我们主要来讲解一下上面提到的这些从其他形式的数据中创建Observable的操作符,另外还有一些操作符,它们可以从存在的Observable中组合或者合并出新的Observable来,例如concat,merge等。本篇博客就不多说它们了,在后面博客中我们再有选择性的详细介绍。

下面我们就来逐一介绍上面提到的这些Observable创建操作符。

of

of创建操作符是一个相当简单的操作符,它接受多个参数,然后再按照接收的参数顺序依次发出这些参数值。当它接收的所有参数都发送完毕后,它会再发出一个完成通知。

JavaScript
1
2
3
4
5
const ob$ = of(1, 2, 3)

ob$.subscribe(x => console.log(x))

// 输出: 1, 2, 3

当然,of可以发送所有形式的Js值,例如数组,对象,函数,undefined值,null值等等。

JavaScript
1
2
3
4
5
const ob$ = of(undefined, ()=>{}, null, {name: 'age'}, [1, 2, 3])

ob$.subscribe(x => console.log(x))

// 输出: undefined, ()=>{}, null, {name: 'age'}, [1, 2, 3]

of操作符默认是同步发出它接收的所有值,后面我们会知道可以通过调度器Scheduler来设置它是同步还是异步,但现在我们可以暂时认为它总是同步的。

range

range操作符也相当简单易懂,它接受一个开始值数字和个数数字作为参数,依次发出开始值后的个数个数字。(包含开始值)

JavaScript
1
2
3
4
5
const ob$ = range(1, 10)

ob$.subscribe(x => console.log(x))

// 输出: 1,2,3,4,5,6,7,8,9,10

from

from操作符就不简单了,它是一个相当强大的操作符,换句话说,它几乎可以把任何东西转化为Observable

具体的,from操作符可以将字符串,数组,类数组对象,Promise,迭代器对象转化为Observable

字符串

首先我们来看字符串,当from接受一个字符串时,它会把其当成一个字符串数组。如下:

JavaScript
1
2
3
4
5
const ob$ = from('hello afei')

ob$.subscribe(x => console.log(x))

// 输出: 'h','e','l','l','o','','a','f','e','i'

数组

对于数组,我们应该很容易想到from的行为,它创建一个依次返回数组元素的Observable

JavaScript
1
2
3
4
5
const ob$ = from('hello afei')

ob$.subscribe(x => console.log(x))

//输出: 10, 20, 'afei'

类数组对象

类数组对象,是Js中一种特殊形式的对象,它跟数组一样具有length属性,但没有其他的数组方法,例如push,map等等。

常见的类数组对象有函数的arguments对象,HTMLCollectionNodeList等。

JavaScript
1
2
3
4
5
6
7
var arrLike = { '0': 'a', '1': 'b', '2': 'c', 'length': 3 }

const ob$ = from(arrLike)

ob$.subscribe(x => console.log(x))

// 输出: a, b, c

Promise

from操作符可以将Promise转换为一个Observable,这个Observable返回Promise的结果。

JavaScript
1
2
3
4
5
6
7
const promise = new Promise(resolve => { resolve('hello, world') })

const ob$ = from(promise)

ob$.subscribe(x => console.log(x))

// 输出: 'hello, world'

可迭代对象

可迭代对象是一个复杂的概念,笼统的来说,在Js中实现了[Symbol.iterator]的对象都是可迭代对象,表示一组可迭代的值。具体的,例如ArrayString, 函数的arguments对象,NodeList对象,Es6中的Set,Map对象等,都在内部实现了[Symbol.iterator]方法,所以都是可迭代对象。

from操作符通过内部处理,依次返回可迭代对象的迭代值。

我们看一个from将可迭代的Set集合转换为Observable的例子。

JavaScript
1
2
3
4
5
6
7
8
9
10
const aset = new Set()
aset.add(1)
aset.add(2)
aset.add(3)

const ob$ = from(aset)

ob$.subscribe(x => console.log(x))

// 输出: 1, 2, 3

从这里也可以看出,我们上面提到的from能将字符串转换为Observable,也是将字符串作为可迭代对象进行的。

fromEvent

通过fromEvent操作符,我们可以将一个事件转换为Observable。此处的事件,既可以是客户端的DOM事件,也可以是服务端NodeEventEmitter事件。

来看一个例子:将DOM document的点击事件转换为Observable

JavaScript
1
2
3
4
5
const documentClicked$ = fromEvent(document, 'click')

documentClick$.subscribe(e => console.log(e));

// 每次点击docuemnt时,都会在控制台上输出点击事件的 Event

具体的,fromEvent操作符可以接受两个参数,如下:

JavaScript
1
2
3
4
5
fromEvent(target, eventName)

// target 是事件目标,也就是触发事件的实体,可以是DOM元素,HTMLCollection,Node.js的 EventEmitter等。

// EventName 是事件目标发出的事件名称

当我们订阅(subscribe)Observable时,RxJs会自动的将我们的事件监听挂载。当我们取消订阅(unsubscribe)时,它同样会为我们清理事件监听。这种机制可以有效的防止我们忘记取消事件监听导致的内存泄漏问题,在对事件进行复杂处理时,可以更专注于事件流程本身。

fromEventHandler

fromEventHandler操作符,可以接受更纯粹更具有自定义性的事件处理器方法,将事件处理器转换为Observable,相比fromEvent,它更为底层。

通常的,一个事件处理器具有一个addHandler方法用来添加事件的观察者,一个removeHandler用来移除事件的观察者。如下所示是一个最简单的事件处理器类(为了少写一点代码,此处我们使用Es6语法)

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class EventProducer {
constructor() {
this.listener = []
}

// 添加观察者
addEventListener(listener) {
if (typeof listener !== 'function') {
throw('listener not a function')
}
this.listener.push(listener)
}

// 移除观察者
removeEventListener(listener) {
const index = this.listener.indexOf(listener)
this.listener.splice(index, 1)
}

// 通知所有观察者
notify(message) {
this.listeners.forEach(listener => {
listener(message)
})
}
}

然后我们实例化一个事件处理器来使用这个事件处理器类。

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
const myEventProducer = new EventProducer()

// 添加一个观察者到事件处理器
const handler = msg => console.log(msg)
myEventProducer.addEventListener(handler)

// 发送通知
myEventProducer.notify('event happen')
// 输出: event happen

// 移除观察者
myEventProducer.removeEventListener(handler)

常用的DOM元素的addEventListenerremoveEventListener就是一个比较典型的事件处理器。

接着回来说fromEvent操作符,它可以接收两个参数:

  1. addHandler 事件处理器的添加观察者方法
  2. removeHnadler 事件处理器的移除观察者方法

对于上面的事件处理器的例子,我们可以这样将其转换为Observable

JavaScript
1
2
3
4
5
6
7
8
9
10
const ob$ = fromEventPattern(
myEventProducer.addListener.bind(myEventProducer),
myEventProducer.removeListener.bind(myEventProducer)
)
// 通过bind绑定事件处理器实例,防止方法被作为函数传入导致的内部this指针错误。

ob$.subscribe(x => console.log(x))

ob$.notify('ob event')
// 输出: ob event

可以看作fromEventPattern将我们的订阅转换成了事件处理器的观察者,从而使得我们的代码更清晰。

webSocket

这个操作符可以将一个webSocket封装为了一个Subject(关于Subject,是RxJs中除了Observable之外的又一个核心概念,在后续的博客中我们会详细介绍,现在可以先理解为它是一个可以被订阅的Observable,同时也是一个可以接收推送的Observer)。

通过这层封装,我们使用起webSocket来将会非常的方便和简洁。

先来看一下我们平常使用原生webSocket的常见流程写法。

JavaScript
1
2
3
4
5
6
7
8
9
const ws = new WebSocket("wss://echo.websocket.org")

ws.onopen = function(msg){...}
ws.onmessage = function(msg){...}
ws.onerror = function(err){...}
ws.onclose = funtion(msg){...}

ws.send('some send msg')
ws.close()

看起来流程并不复杂,但需要考虑到在现在前端普遍的模块和组件化开发,我们通常需要将webSocket的连接封装在一个组件文件或抽离到一个模块中去,然后在其他组件中通过webSocket与服务端进行通信。

由此带来的问题就是在封装webSocket的组件或模块中我们要进行webSocket的连接,消息接收处理,在其他的组件里我们要进行消息的消费和发送。webSocket的生命周期被我们分隔到了不同的文件中,为此我们可能需要自己去进行跨组件的事件通知和消息传递,这大大增加了我们使用webSocket的复杂度。

思考一下,我们要在ws.openws.onmessage时在另一个文件里进行对应响应,要怎么做?只需要引入ws对象这么简单吗?如何在其他文件中即刻的判断ws的状态呢?如何保证文件的打包和加载顺序呢?如果我们有多个子模块中都需要用到这个webSocket呢?

下面来看一下使用RxJswebSocket进行包装之后的处理。

JavaScript
1
2
3
4
5
6
7
8
9
10
11
const ws$ = webSocket('wss://echo.websocket.org')

// 订阅webSocket转换为的Subject,也即相当于开启了webSocket连接
ws$.subscribe(
msg => console.log(msg), //
err => console.log(err),
_ => console.log('complete, ws closed')
)

// 向服务端发送消息
ws$.next(JSON.stringify({msg: 'hello'}))

可以看到webSocket接收一个连接URL作为参数,通过将webSocket连接转换为Subject来抽象它,代码十分的简洁优雅,我们可以直接通过ws$来完美的承接webSocket所需的操作,在需要使用webSocket时直接订阅,不需要时直接取消订阅即可,招之即来,挥之即去,不必在各个使用到的地方都进行零碎的生命周期处理和事件传递接收。

interval

interval操作符主要用来创建一个持续的间隔一定时间发送一次值的Observable。只要知道JS中的setInterval,就不难理解interval操作符是做什么的。

首先我们自己写一个间隔一定时间发出一个值的Observable

JavaScript
1
2
3
4
5
6
7
8
9
10
const ob$ = new Observable(observer => {
let count = 0
// 每秒发出一个递增的数字值
setInterval(_ => observer.next(count++), 1000)
})

ob$.subscribe(x => console.log(x))

// 输出: 0, 1, 2, 3, 4......
// 每个值间隔1s

使用interval操作符我们可以这样写:

JavaScript
1
2
3
4
5
6
const ob$ = interval(1000)

ob$.subscribe(x => console.log(x))

// 同样输出: 0, 1, 2, 3, 4......
// 每个值间隔1s

interval操作符接收一个表示间隔时间(以ms计)的数字值作为参数,间隔此时间,持续发送一个从0开始无限递增的整数。

interval在被订阅后,并不会立即发出第一个值,而是在第一个间隔时间段过去后才开始发送。

也就是说,上面例子中的 0 是在第 1s 时发出的。

这和setInterval的表现是一致的。

timer

timer操作符类似于上面的interval操作符,可以根据一个固定的间隔来发送递增的整数序列。但timer操作符还可以定义第一次发送值之前的延时。也就是说,你可以指定什么时候开始发送值。

它接受两个参数,第一个参数用来指定初始延时,第二个参数指定开始发送值之后每个值的间隔时间。

来看一个例子,我们在5秒后开始,每隔一秒发送一个自增的数字。

JavaScript
1
2
3
4
5
const ob$ = timer(5000, 1000)

ob$.subscribe(x => console.log(x))

// 五秒后,每隔1s输出:0, 1, 2,3, 4, 5....

另外,当timer操作符只被传递了一个参数时,则只会在此参数对应时间后返回一个0,类似 JS 中的setTimeout

JavaScript
1
2
3
4
5
const ob$ = timer(5000)

ob$.subscribe(x => console.log(x))

// 五秒后输出:0

timer操作符第一个参数除了是一个数字之外,还可以是一个Date对象,表示开始推送值的具体时间。这在某些时候会很有用。

empty

empty用于创建一个空的Observable,什么是空的Observable,就是当你订阅这个Observable时,它不会返回任何数据,而是直接发出complete表示Observable已完成。

JavaScript
1
2
3
4
5
6
7
8
9
const emptyOb$ = empty()

emptyOb$.subscribe({
next: x => console.log(x),
error: err => console.log(err),
complete: _ => console.log('complete')
})

// 输出: complete

通俗的来讲,empty创建一个什么也不做的Observable,但它会告诉你它什么都没做(怎么这么不要脸这个操作符)

这个操作符创建的Observable一般会跟其他Observable通过各种操作符组合来发挥作用,例如switchMapmergeMap等,这个我们留到以后再细说。

另外,我们还可以通过 RxJs v6版本新加入的EMPTY常量来直接得到一个空的Observable

1
const emptyOb$ = EMPTY

throwError

empty操作符创建的Observable是在订阅后立即发出一个complete消息,而throwError操作符创建的Observable则是在被订阅后立即发出一个错误通知。

它可以接受一个错误对象作为参数,并在其创建的Observable被订阅后立即发出这个错误。

empty类似,它创建的Observable通常也是用来测试或与其它Observable组合使用。

JavaScript
1
2
3
4
5
6
7
const errorOb$ = throwError(new Error('attention, error'))

errorOb$.subscribe({
next: x => console.log(x),
error: err => console.log(err),
complete: _ => console.log('complete')
})

never

never操作符,用来创建一个永远不结束的操作符,也就是never end。不过,虽然它永远不结束,但它也永远不会发出值。换句话说,观察者的nexterrorcomplet三个函数,它永远一个也不会去触发。我们订阅它后,什么事也不会发生。

JavaScript
1
2
3
4
5
6
7
8
9
const neverOb$ = never()

neverOb$.subscribe({
next: x => console.log(x),
error: err => console.log(err),
complete: _ => console.log('complete')
})

// 输出: 并不会有什么输出....

另外有一点需要注意,因为这个Observable永远也不会结束,也意味着永远都不会被清理。所以我们在必要的时候,要记得去手动清理掉它,防止它真的无限的持续下去。

还记得我们上篇博客说的吗?当订阅一个Observable时,会返回一个订阅关系Subscription,调用Subscriptionunsubscribe即可清理Observalbe

JavaScript
1
2
3
4
5
6
7
8
9
10
const neverOb$ = Rx.Observable.never()

const subscription = neverOb$.subscribe({
next: x => console.log(x),
error: err => console.log(err),
complete: _ => console.log('complete')
})

// 清理
subscription.unsubscribe()

另外,同样的我们可以使用 RxJs v6 版本新加入的NEVER常量来直接得到一个无限持续的Observable

Ok,常用的RxJs创建操作符,大概就这些了,让我们就在这里结束吧。

感谢阅读,我们下篇博客见。

----本文结束感谢阅读----