Observable是RxJs中的核心概念,也是RxJs最为基础的构造类。我们通过将数据流封装为Observable,来进一步处理事件流程和状态分发等行为。将值封装为
Observable之后,我们就需要提供一个Observer来对值形成的数据流进行订阅,从而对每个值进行处理。本篇博客,主要就
Observable和Observer这两个概念进行一个介绍。理解了
Observable和Observer,对RxJs也就理解了一大部分,后续的深入学习也会更加的得心应手和轻松,所以我会尽可能详细的进行说明,同时也尽量保证文章的通俗易懂。对于文中涉及到的
RxJs的其他概念,如果不明白也不要紧,先跳过去,后续博客我会再进行介绍。Ok,废话不多说。下面进入正题。
Observable,在RxJs中又被称作可观察对象。表示的是一系列值或事件的集合。这些值和事件,可以是HTTP请求,可以是DOM事件,可以是一个迭代器函数,甚至可以是一个简单的数组。归根结底的来说,其实RxJs并不是非常关心你的值是如何产生的,是同步还是异步。它更关注的部分是在这些值随着时间线需要进行哪些处理,通过关注时间线上值的状态和对时间轴的丰富操作性,自然而然的将异步消化。
Observer,又被称为观察者,用于接受经过处理后的值,就像一个数据管道的最末端。
笼统的来说,Observable就相当于一个生产者,不停的生产出新的商品,也就是我们数据流中的值。而Observer就相当于一个消费者,来接收并使用这个商品。
我们来看一个最简单的表达两者关系的一个例子:
1 | import { Observable } from 'rxjs' |
很多同学看到这个例子,可能会觉得似曾相识,看起来跟Promise很像啊。我们来用Promise重写一下这个例子:
1 | const promise = new Promise(resolve => { |
看出区别了吧 ?虽然看起来形式很相近,两者也都是以推送的形式发送值,但不同的是,RxJs可以推送多个值到观察者,而Promise在推送一个值以后,就会进入resolved状态,不会再有后续值的推送了。
可能又有同学要问了,那Rxjs跟我们普通的事件广播和事件观察者有什么区别呢?主要区别是以下两点:
Event触发时,EventEmitters会向所有观察者广播同一个副作用,而Rxjs对每个观察者的推送都是单独的,每个观察者都各自触发一个单独的副作用,就像我们调用一个函数多次一样。Event的执行与是否有观察者无关,而Observable是惰性的,只有当观察者存在时,它才会执行。
Rxjs通过连续推送和强大的操作符以及惰性机制相结合,可以动态的处理多个时间轴上的连续值,并为每个观察者提供单独的值推送,这种情况使用Promise或者eventemmiter是难以实现或者实现起来非常复杂的。
在Observable的生命周期中,直观的有以下四个比较重要的核心关注点:
- 创建
- 订阅
- 执行
- 清理
我们按顺序的来说明一下这四部分的内容。
创建
在上面例子中,我们使用实例化Observable类的方式创建了一个Observable。
Observable类构造函数接受一个subscribe函数作为参数,subscribe函数的参数为observer,通过observer的next,complete,error方法分别可以传递值信息,完成信息和错误信息。
我们可以调用任意次的next方法来传递多个值,但一旦调用了complete传递完成消息或者调用error传递错误消息,则意味着推送结束,Observable后续就不会再推送任何值了。
如下所示:
1 | const observable$ = new Observable(function subscribe(observer) { |
可以看出,Observable通过它的约定和实现,将我们提供的观察者对象observer的回调方法对应到了我们在构造函数中使用的observer参数的方法,通过这种方式解决了回调函数的位置造成的难以维护的问题。
当然,一般的,我们并不会通过实例化Observable类这种方式来进行Observable的创建,而是更多的通过创建操作符来进行Observable的创建,这些创建操作符内部封装了各种各样的从不同数据类型中创建Observable的方法,使我们可以方便快速的创建一个Observable。
1 | // 从一个数组或者类数组对象中创建一个Observable |
还有许多的创建操作符,在这里就不一一列举了。
通过这些操作符,我们就几乎具有了从任意的数据形式中快速创建Observable的能力。
订阅
像上面的例子中一样,我们通过Observable的subscribe方法来对其进行订阅。
我们还需要向subscribe传递一个观察者对象作为参数来对接受到的值进行处理。
所谓观察者对象,也就是一个拥有next,complte,error方法的对象,作为Observable推送值的消费者,它这三种方法分别对应Observable发送的三种消息。
1 | const observable$ = from([1,2,3]) |
我们也可以只传递一个回调函数,此时Observable会使用内部方法创建一个观察者并使用此回调函数作为观察者的next方法。
1 | observable$.subscribe(x => console.log(x)) |
我们也可以直接传递三个函数,这三个函数同样会被当做内部创建出的观察者的next,error,complete 方法。
1 | observable$.subscribe( |
执行
在上面我们提到一句,Observable是惰性的,这是什么意思呢?
其实就是一句话而已,Observable只有在每个观察者订阅之后才会执行。如果没有观察者,Observable内部的代码永远不会被执行。
同时,Observable的执行可以是同步的,也可以是异步的。
1 | const observable$ = new Observable(function subscribe(observer) { |
如上所示,Observerable的内部执行是同步还是异步并无关紧要。重要的是,只有观察者存在时,它才会执行。
需要注意的是,订阅在同一个Observable的观察者之间是不共享的,每个订阅都会触发针对当前观察者的独立的设置和独立执行。这也是Rxjs与传统的事件和观察者模式之间最大的区别。
清理
对于使用创建操作符进行创建的Observable,在订阅时,会返回一个Subscription,直白的说,就是订阅关系,在Rxjs 4.0之前,也叫做disposable,翻译过来就是可清理对象。
我们可以调用Subscription的unsubscribe方法,来取消订阅,Observable内部会自动的进行相关的资源清理和关闭。
1 | // 创建一个间隔一秒推送一个值的Observable |
Subscription还有一个add方法,用于将其他的Subscription合并进当前Subscription,当调用unsubscribe方法时,被合并进来的subscription也会被取消。
对应add还有一个remove方法,用于撤销已经添加进去的子Subscription。
对于通过构造函数创建的Observable,我们需要显式的返回一个自定义函数unsubscribe来进行相应的清理执行资源和取消执行。
1 | const observable$ = new Observable(observer => { |
通过以上的介绍,相信大家对于Observable已经有了相对清晰和简单的认识。万事开头难,只要跨过了这个门槛,你会很快的发现RxJs特别的魅力,它是如此优雅和高效,其中蕴含的编程思想和智慧在某些时刻让人喜悦不已。
我想这大概也是计算机科学最大的魅力,一句简单的大道至简背后,是无数功力和智慧才铸就的举重若轻,实在让我这等凡夫俗子愚笨之徒艳羡。
ok,这篇博客就到这里,夜已深,不多说废话啦。多谢阅读,我们下篇博客再见^_^。
