rxjs, redux-observable 入门

ep5OIJ.png

# 前言

RxJS 是使用 Observables 的响应式编程的库,它使编写异步或基于回调的代码更容易。这个项目是 Reactive-Extensions/RxJS(RxJS 4) 的重写,具有更好的性能、更好的模块性、更好的可调试调用堆栈,同时保持大部分向后兼容,只有一些破坏性的变更(breaking changes)是为了减少外层的 API,官方文档称是能优雅地解决异步问题的 lodash。

粗略了解了一下感觉说 rxjs 是解决异步问题的 loadash 不太准确,因为 async.js 才是解决异步问题的 lodash。rxjs 更像一种状态管理库。不过并没有热门的 rxjs-react 之类的库实现对 React 组件的状态注入。倒是 redux 的中间件 - redux-observable 跟 rxjs 联系较紧密。

# Observable - 创建可观察对象

create

import { Observable } from 'rxjs'
const hello = Observable.create(function (observer) {
  observer.next('Hello')
  observer.next('World')
})
// 输出: 'Hello'...'World'
const subscribe = hello.subscribe((val) => console.log(val))

这个 create 是比较难理解的一点。因为通常的数据观察库像 redux, mobx, dobjs, 还有 vue 里面的双向绑定创建的往往是一个可观察对象。

以 mobx 为例:

import { observable, computed, action, reaction } from 'mobx'
const store = new (class {
  @observable store = {
    count: 0
  }
  @computed get count() {
    return this.store.count
  }
  @action addcount = () => {
    this.store.count++
  }
})()
reaction(
  () => store.count,
  (count, reaction) => {
    ref.current && (ref.current.innerHTML = count)
    // reaction.dispose() // 只响应一次
  }
)
const addcount = () => {
  store.addcount()
}
// ...

然而 RxJS 主张流式数据和链式操作,因此和常规方法不太相同。上述 create 的写法类似迭代器一样,那么它的可观察性体现在哪里呢?常规的观察者模式实现方法有 Object.definePropertyProxy,显然 RxJS 两者都不是。

官方的解释是 RxJS 实现了一种数据推送机制。什么是推送? - 在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。

RxJS 引入了 Observables,一个新的 JavaScript 推送体系。Observable 是多个值的生产者,并将值“推送”给观察者(消费者)。

subscribe

observable.subscribe((x) => console.log(x))
setTimeout(() => {
  observable.subscribe((x) => console.log(x))
}, 3000)
// 输出 hello world
// 3s 后输出 hellow world

unsubscribe

subscribe 方法返回一个 subscription,subscription 上面有个 unsubscribe 方法,用于取消订阅

var observable = Rx.Observable.from([10, 20, 30])
var subscription = observable.subscribe((x) => console.log(x))
// 稍后:
subscription.unsubscribe()

Observable 是惰性运算,只有在每个观察者订阅后才会执行

Observer

RxJS 里面观察者即消费者

const observer = {
  next: (x) => console.log('Observer got a next value: ' + x),
  error: (err) => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification')
}
observable.subscribe(observer)

subject

Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

Subject 是 Observer,它可以被 Subscribe。

Subject 可能是 RxJS 中最难理解的概念了,因为 Subject 既可以作为一个可观察对象(Observable)去订阅其它 Observer,也可以作为一个 Observer 被 Subscribe。

var subject = new Rx.Subject()

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
})
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
})

subject.next(1)
subject.next(2)

BehaviorSubject

有初始值的 Subject,官方解释是:

BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。

简直了,就不能给 Subject 加个初始值么 🤦‍,非要强加一个概念。

# Operators - 操作符

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

# 关于流

计算机中的流分为二进制流和文本流

  • 文本流是解释性的,最长可达 255 个字符,其中回车/换行将被转换为换行符“\n”,(如果以"文本"方式打开一个文件,那么在读字符的时候,系统会把所有的"\r\n"序列转成"\n",在写入时把"\n"转成"\r\n")。注: \n 一般会操作系统被翻译成"行的结束",即 LF(Line-Feed); \r 会被翻译成"回车",即 CR(Cariage-Return)
  • 而二进制流是非解释性的,一次处理一个字符,并且不转换字符。

流的特性

  • 多个流可合并成一个流
  • 一个流可拆分成多个流
  • 可在管道中处理

RxJS 中的流

  • 只是引用了流的概念,并非二进制流和文本流