Observable
是RxJs
中的核心概念,也是RxJs
最为基础的构造类。我们通过将数据流封装为Observable
,来进一步处理事件流程和状态分发等行为。将值封装为
Observable
之后,我们就需要提供一个Observer
来对值形成的数据流进行订阅,从而对每个值进行处理。本篇博客,主要就
Observable
和Observer
这两个概念进行一个介绍。理解了
Observable
和Observer
,对RxJs
也就理解了一大部分,后续的深入学习也会更加的得心应手和轻松,所以我会尽可能详细的进行说明,同时也尽量保证文章的通俗易懂。对于文中涉及到的
RxJs
的其他概念,如果不明白也不要紧,先跳过去,后续博客我会再进行介绍。Ok,废话不多说。下面进入正题。
Observable
,在RxJs
中又被称作可观察对象。表示的是一系列值或事件的集合。这些值和事件,可以是HTTP请求,可以是DOM事件,可以是一个迭代器函数,甚至可以是一个简单的数组。归根结底的来说,其实RxJs
并不是非常关心你的值是如何产生的,是同步还是异步。它更关注的部分是在这些值随着时间线需要进行哪些处理,通过关注时间线上值的状态和对时间轴的丰富操作性,自然而然的将异步消化。
Observer
,又被称为观察者,用于接受经过处理后的值,就像一个数据管道的最末端。
笼统的来说,Observable
就相当于一个生产者,不停的生产出新的商品,也就是我们数据流中的值。而Observer
就相当于一个消费者,来接收并使用这个商品。
我们来看一个最简单的表达两者关系的一个例子:
1 | import { Observable } from 'rxjs' |
很多同学看到这个例子,可能会觉得似曾相识,看起来跟Promise
很像啊。我们来用Promise
重写一下这个例子:
1 | const promise = new Promise(resolve => { |
看出区别了吧 ?虽然看起来形式很相近,两者也都是以推送的形式发送值,但不同的是,RxJs
可以推送多个值到观察者,而Promise
在推送一个值以后,就会进入resolved
状态,不会再有后续值的推送了。
可能又有同学要问了,那Rxjs
跟我们普通的事件广播和事件观察者有什么区别呢?主要区别是以下两点:
Event
触发时,EventEmitters
会向所有观察者广播同一个副作用,而Rxjs
对每个观察者的推送都是单独的,每个观察者都各自触发一个单独的副作用,就像我们调用一个函数多次一样。Event
的执行与是否有观察者无关,而Observable
是惰性的,只有当观察者存在时,它才会执行。
Rxjs
通过连续推送和强大的操作符以及惰性机制相结合,可以动态的处理多个时间轴上的连续值,并为每个观察者提供单独的值推送,这种情况使用Promise
或者eventemmiter
是难以实现或者实现起来非常复杂的。
在Observable
的生命周期中,直观的有以下四个比较重要的核心关注点:
- 创建
- 订阅
- 执行
- 清理
我们按顺序的来说明一下这四部分的内容。
创建
在上面例子中,我们使用实例化Observable
类的方式创建了一个Observable
。
Observable
类构造函数接受一个subscribe
函数作为参数,subscribe
函数的参数为observer
,通过observer
的next
,complete
,error
方法分别可以传递值信息,完成信息和错误信息。
我们可以调用任意次的next
方法来传递多个值,但一旦调用了complete
传递完成消息或者调用error
传递错误消息,则意味着推送结束,Observable
后续就不会再推送任何值了。
如下所示:
1 | const observable$ = new Observable(function subscribe(observer) { |
可以看出,Observable
通过它的约定和实现,将我们提供的观察者对象observer
的回调方法对应到了我们在构造函数中使用的observer
参数的方法,通过这种方式解决了回调函数的位置造成的难以维护的问题。
当然,一般的,我们并不会通过实例化Observable
类这种方式来进行Observable
的创建,而是更多的通过创建操作符来进行Observable
的创建,这些创建操作符内部封装了各种各样的从不同数据类型中创建Observable
的方法,使我们可以方便快速的创建一个Observable
。
1 | // 从一个数组或者类数组对象中创建一个Observable |
还有许多的创建操作符,在这里就不一一列举了。
通过这些操作符,我们就几乎具有了从任意的数据形式中快速创建Observable
的能力。
订阅
像上面的例子中一样,我们通过Observable
的subscribe
方法来对其进行订阅。
我们还需要向subscribe
传递一个观察者对象作为参数来对接受到的值进行处理。
所谓观察者对象,也就是一个拥有next
,complte
,error
方法的对象,作为Observable
推送值的消费者,它这三种方法分别对应Observable
发送的三种消息。
1 | const observable$ = from([1,2,3]) |
我们也可以只传递一个回调函数,此时Observable
会使用内部方法创建一个观察者并使用此回调函数作为观察者的next
方法。
1 | observable$.subscribe(x => console.log(x)) |
我们也可以直接传递三个函数,这三个函数同样会被当做内部创建出的观察者的next
,error
,complete
方法。
1 | observable$.subscribe( |
执行
在上面我们提到一句,Observable
是惰性的,这是什么意思呢?
其实就是一句话而已,Observable
只有在每个观察者订阅之后才会执行。如果没有观察者,Observable
内部的代码永远不会被执行。
同时,Observable
的执行可以是同步的,也可以是异步的。
1 | const observable$ = new Observable(function subscribe(observer) { |
如上所示,Observerable
的内部执行是同步还是异步并无关紧要。重要的是,只有观察者存在时,它才会执行。
需要注意的是,订阅在同一个Observable
的观察者之间是不共享的,每个订阅都会触发针对当前观察者的独立的设置和独立执行。这也是Rxjs
与传统的事件和观察者模式之间最大的区别。
清理
对于使用创建操作符进行创建的Observable
,在订阅时,会返回一个Subscription
,直白的说,就是订阅关系,在Rxjs 4.0
之前,也叫做disposable
,翻译过来就是可清理对象。
我们可以调用Subscription
的unsubscribe
方法,来取消订阅,Observable
内部会自动的进行相关的资源清理和关闭。
1 | // 创建一个间隔一秒推送一个值的Observable |
Subscription
还有一个add
方法,用于将其他的Subscription
合并进当前Subscription
,当调用unsubscribe
方法时,被合并进来的subscription
也会被取消。
对应add
还有一个remove
方法,用于撤销已经添加进去的子Subscription
。
对于通过构造函数创建的Observable
,我们需要显式的返回一个自定义函数unsubscribe
来进行相应的清理执行资源和取消执行。
1 | const observable$ = new Observable(observer => { |
通过以上的介绍,相信大家对于Observable
已经有了相对清晰和简单的认识。万事开头难,只要跨过了这个门槛,你会很快的发现RxJs
特别的魅力,它是如此优雅和高效,其中蕴含的编程思想和智慧在某些时刻让人喜悦不已。
我想这大概也是计算机科学最大的魅力,一句简单的大道至简背后,是无数功力和智慧才铸就的举重若轻,实在让我这等凡夫俗子愚笨之徒艳羡。
ok,这篇博客就到这里,夜已深,不多说废话啦。多谢阅读,我们下篇博客再见^_^。