Fork me on GitHub

RxJs系列(二):Observable和Observer

ObservableRxJs中的核心概念,也是RxJs最为基础的构造类。我们通过将数据流封装为Observable,来进一步处理事件流程和状态分发等行为。

将值封装为Observable之后,我们就需要提供一个Observer来对值形成的数据流进行订阅,从而对每个值进行处理。

本篇博客,主要就ObservableObserver这两个概念进行一个介绍。

理解了ObservableObserver,对RxJs也就理解了一大部分,后续的深入学习也会更加的得心应手和轻松,所以我会尽可能详细的进行说明,同时也尽量保证文章的通俗易懂。

对于文中涉及到的RxJs的其他概念,如果不明白也不要紧,先跳过去,后续博客我会再进行介绍。

Ok,废话不多说。下面进入正题。

Observable,在RxJs中又被称作可观察对象。表示的是一系列值或事件的集合。这些值和事件,可以是HTTP请求,可以是DOM事件,可以是一个迭代器函数,甚至可以是一个简单的数组。归根结底的来说,其实RxJs并不是非常关心你的值是如何产生的,是同步还是异步。它更关注的部分是在这些值随着时间线需要进行哪些处理,通过关注时间线上值的状态和对时间轴的丰富操作性,自然而然的将异步消化。

Observer,又被称为观察者,用于接受经过处理后的值,就像一个数据管道的最末端。

笼统的来说,Observable就相当于一个生产者,不停的生产出新的商品,也就是我们数据流中的值。而Observer就相当于一个消费者,来接收并使用这个商品。

我们来看一个最简单的表达两者关系的一个例子:

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Observable } from 'rxjs'

const observable$ = new Observable(function subscribe(observer) {
observer.next(1)
observer.next(2)
})

const observer = {
next: (x) => console.log(x)
}

observable$.subscribe(observer)

// 输出
1
2

很多同学看到这个例子,可能会觉得似曾相识,看起来跟Promise很像啊。我们来用Promise重写一下这个例子:

JavaScript
1
2
3
4
5
6
7
8
9
const promise = new Promise(resolve => {
resolve(1)
resolve(2)
})

promise.then(x => console.log(x))

// 输出
1

看出区别了吧 ?虽然看起来形式很相近,两者也都是以推送的形式发送值,但不同的是,RxJs可以推送多个值到观察者,而Promise在推送一个值以后,就会进入resolved状态,不会再有后续值的推送了。

可能又有同学要问了,那Rxjs跟我们普通的事件广播和事件观察者有什么区别呢?主要区别是以下两点:

  1. Event 触发时,EventEmitters 会向所有观察者广播同一个副作用,而Rxjs对每个观察者的推送都是单独的,每个观察者都各自触发一个单独的副作用,就像我们调用一个函数多次一样。
  2. Event的执行与是否有观察者无关,而Observable是惰性的,只有当观察者存在时,它才会执行。

Rxjs通过连续推送和强大的操作符以及惰性机制相结合,可以动态的处理多个时间轴上的连续值,并为每个观察者提供单独的值推送,这种情况使用Promise或者eventemmiter是难以实现或者实现起来非常复杂的。

Observable的生命周期中,直观的有以下四个比较重要的核心关注点:

  • 创建
  • 订阅
  • 执行
  • 清理

我们按顺序的来说明一下这四部分的内容。

创建

在上面例子中,我们使用实例化Observable类的方式创建了一个Observable

Observable类构造函数接受一个subscribe函数作为参数,subscribe函数的参数为observer,通过observernextcompleteerror方法分别可以传递值信息,完成信息和错误信息。

我们可以调用任意次的next方法来传递多个值,但一旦调用了complete传递完成消息或者调用error传递错误消息,则意味着推送结束,Observable后续就不会再推送任何值了。

如下所示:

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const observable$ = new Observable(function subscribe(observer) {
try{
observer.next(1)
observer.complte()
} catch(err) {
observer.error(err)
}
})

const observer = {
next: (x) => console.log(x),
complte: () => console.log('complete'),
error: (err) => console.log(err)
}

observable$.subscribe(observer)

可以看出,Observable通过它的约定和实现,将我们提供的观察者对象observer的回调方法对应到了我们在构造函数中使用的observer参数的方法,通过这种方式解决了回调函数的位置造成的难以维护的问题。

当然,一般的,我们并不会通过实例化Observable类这种方式来进行Observable的创建,而是更多的通过创建操作符来进行Observable的创建,这些创建操作符内部封装了各种各样的从不同数据类型中创建Observable的方法,使我们可以方便快速的创建一个Observable

JavaScript
1
2
3
4
5
6
7
8
// 从一个数组或者类数组对象中创建一个Observable
from([1,2,3,4,5])

// 从一个给定的事件类型中创建一个Observable
fromEvent($('button'), 'click')

// 从一个Promise中创建一个Observable
from(fetch('a/b'))

还有许多的创建操作符,在这里就不一一列举了。

通过这些操作符,我们就几乎具有了从任意的数据形式中快速创建Observable的能力。

订阅

像上面的例子中一样,我们通过Observablesubscribe方法来对其进行订阅。

我们还需要向subscribe传递一个观察者对象作为参数来对接受到的值进行处理。

所谓观察者对象,也就是一个拥有nextcomplteerror方法的对象,作为Observable推送值的消费者,它这三种方法分别对应Observable发送的三种消息。

JavaScript
1
2
3
4
5
6
7
const observable$ = from([1,2,3])

observable$.subscribe({
next: (x) => console.log(x),
complete: () => console.log('complete')
error: (err) => console.error(err),
})

我们也可以只传递一个回调函数,此时Observable会使用内部方法创建一个观察者并使用此回调函数作为观察者的next方法。

JavaScript
1
observable$.subscribe(x => console.log(x))

我们也可以直接传递三个函数,这三个函数同样会被当做内部创建出的观察者的nexterrorcomplete 方法。

JavaScript
1
2
3
4
5
observable$.subscribe(
x => console.log(x),
err => console.error(err),
() => console.log('complete')
);

执行

在上面我们提到一句,Observable是惰性的,这是什么意思呢?

其实就是一句话而已,Observable只有在每个观察者订阅之后才会执行。如果没有观察者,Observable内部的代码永远不会被执行。

同时,Observable的执行可以是同步的,也可以是异步的。

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const observable$ = new Observable(function subscribe(observer) {
observer.next(1)
observer.next(2)
setTimeout(() => {
observer.next(3)
}, 5000)
})

const observer = {
next: (x) => console.log(x)
}

observable$.subscribe(observer)

// 输出
1
2
3

如上所示,Observerable的内部执行是同步还是异步并无关紧要。重要的是,只有观察者存在时,它才会执行。

需要注意的是,订阅在同一个Observable的观察者之间是不共享的,每个订阅都会触发针对当前观察者的独立的设置和独立执行。这也是Rxjs与传统的事件和观察者模式之间最大的区别。

清理

对于使用创建操作符进行创建的Observable,在订阅时,会返回一个Subscription,直白的说,就是订阅关系,在Rxjs 4.0之前,也叫做disposable,翻译过来就是可清理对象

我们可以调用Subscriptionunsubscribe方法,来取消订阅,Observable内部会自动的进行相关的资源清理和关闭。

JavaScript
1
2
3
4
5
6
7
8
// 创建一个间隔一秒推送一个值的Observable
const observable$ = interval(1000)

// 订阅并得到Subscription
const subscription = observable.subscribe({(x) => console.log(x)})

// 取消订阅
subscription.unsubscribe()

Subscription还有一个add方法,用于将其他的Subscription合并进当前Subscription,当调用unsubscribe方法时,被合并进来的subscription也会被取消。

对应add还有一个remove方法,用于撤销已经添加进去的子Subscription

对于通过构造函数创建的Observable,我们需要显式的返回一个自定义函数unsubscribe来进行相应的清理执行资源和取消执行。

JavaScript
1
2
3
4
5
6
7
8
const observable$ = new Observable(observer => {
var intervalId = setInterval(() => {
observer.next('hi')
},1000)
return function unsubscribe() {
clearInterval(intervalID);
};
})

通过以上的介绍,相信大家对于Observable已经有了相对清晰和简单的认识。万事开头难,只要跨过了这个门槛,你会很快的发现RxJs特别的魅力,它是如此优雅和高效,其中蕴含的编程思想和智慧在某些时刻让人喜悦不已。

我想这大概也是计算机科学最大的魅力,一句简单的大道至简背后,是无数功力和智慧才铸就的举重若轻,实在让我这等凡夫俗子愚笨之徒艳羡。

ok,这篇博客就到这里,夜已深,不多说废话啦。多谢阅读,我们下篇博客再见^_^。

----本文结束感谢阅读----