一直以来,响应式编程都是业界讨论的热门话题之一。为了推广响应式编程,ReactiveX 社区几乎为每一种编程语言设计实现了一种对应的响应式编程框架。RxSwift 就是针对 Swift 所开发的响应式框架。
关于 RxSwift,网上有不少相关的学习资料,但绝大多数都是 RxSwift 的使用说明,鲜有文章介绍 RxSwift 背后的设计原理。通过阅读源码,查阅资料,正向设计,我逐步理解了 RxSwift 的设计思想。因此,趁热打铁,记录并总结一下我的理解。
下文我们首先介绍一下 RxSwift 中所涉及的基本概念。然后,从零开始设计并实现 RxSwift,从而逐步理解 RxSwift 的设计理念。
本文所实现的 RxSwift 代码已在 Github 开源——传送门。参照源码阅读本文效果更佳。
RxSwift 主要蕴含了以下几种设计思想:
- 发布-订阅模式
- 流编程
- 函数式编程
下面,我们依次来进行介绍。
发布-订阅模式 是 RxSwift 所呈现的一种最直观思想。发布-订阅模式可以分为两个角色:发布者、订阅者。
订阅者的主要职责是:
- 订阅:监听并处理某个事件。其本质就是向发布者注册一个处理某个事件的闭包。
- 取消(订阅)
发布者的主要职责是:
- 发布:分发某个事件

发布-订阅模式的基本原理是:
- 订阅者调用发布者提供的订阅方法进行订阅,从而在发布者内部注册订阅者。
- 发布者内部会维护一个订阅者列表。
- 当发布者发布事件时,会遍历订阅者列表,执行其中的处理方法(将事件作为参数传递给闭包,并执行)。

在日常开发中,发布-订阅模式是一种广泛被应用的设计模式。比如:iOS 中的 NotificationCenter、Flutter 中的事件总线都是基于这种模式实现的。如下所示为 Flutter 中事件总线的一种实现方式,其中的代码逻辑基本遵循了上述所描述的发布-订阅模式的基本原理。
思考:为什么 iOS 中一个监听了某个通知的类必须要在 时要执行 方法?
流(stream) 是 RxSwift 另一个重要的设计思想。 是 Rx 框架的基础,也被称为 可观察序列。它的作用是可以异步地产生一系列数据,即一个 对象会随着时间的推移不定期地发出 。数据就像水流一样持续不断地在流动,顾名思义,这也被称为 流编程。
关于流编程,《计算机程序的构造与解释》一书中认为 流编程是一种调用驱动的编程思想。流编程的基本思想是:一般情况下,只是部分地构造出流的结构,并将这样的部分结构传给使用流的程序。如果使用者需要访问这个流中未构造出的那个部分,那么这个流就会自动地继续构造下去,但是只做出满足当时需要的那一部分。
如下所示是 RxSwift 中常见使用形式, 是数据源,不断地发出数据,如果水流一样,最终流向 中的闭包。期间会流经 , 等操作符,经过转换或过滤。这就是流编程的思想。
流编程底层实现的本质则是 闭包的延迟执行和强制执行。具体来说是基于一种称为 的特殊形式,对于 的求值不会对 求值,而是返回一个称为 延时对象 的对象。它可以看做是对未来的某个时间求值 的允诺。在各类编程语言中,返回特定类型的闭包常被用于描述一个延迟对象。与 配对的是 的过程。它以一个延时对象为参数,执行相应的求值工作,即迫使 完成其所允诺的求值。在各类编程语言中,执行返回特定类型的闭包常被用于描述 的过程。
RxSwift 提供了大量无副作用的操作符,无副作用也是函数式编程的一种重要特性。RxSwift 能够实现操作符的链式调用,一个重要的前提是:提供操作符的类型和操作符的返回类型必须保持一致。这里就涉及到了函数式编程中的一些高阶概念:函子(Functor)、适用函子(Applicative)、单子(Monad)。详细内容可参见《函数式编程——Functor、Applicative、Monad》一文。
不过,RxSwift 操作符中运用最多的是 函子(Functor),什么是函子?简而言之,函子能够将普通函数应用到一个包装类型。如下图及代码所示,一个包装类型包含了一个原始值,当函子作用在其上后,使用普通函数对原始值进行转换,最终将结果值放入包装类型中返回。

以 RxSwift 中最常用的 操作符为例,如下所示。 方法扩展自 ,其能够将普通函数 应用到 包装类型。这其实就是一种典型的 函子 应用。
下面我们将以正向设计的方式,结合 RxSwift 中的设计思想,手动实现一个 RxSwift 的核心部分。
首先,我们来定义事件,RxSwift 中有三种类型的事件,如下所示:
其次,我们来定义订阅者,在 中,我们先忽略 取消订阅 的功能,如下所示。订阅者必须遵循订阅者协议,需要实现 监听事件 的方法 。订阅者内部维护一个处理事件的闭包 。当监听事件方法触发时,会立即执行处理事件闭包。
最后,我们来定义发布者,如下所示。发布者必须遵循发布者协议,需要实现 订阅操作 的方法 。发布者内部维护一个发布事件的闭包 。当订阅发生时(订阅操作方法被执行时),会立即执行发布事件的闭包。
接下来,我们来试用一下 RxDemo-01 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式基本吻合。
下图所示为 RxDemo-01 实现的 RxSwift 的内部调用关系。通过闭包实现将 传递给 ,在发布事件时能够将事件传递给 ,从而形成一条看似自左向右流动的数据流。

RxDemo-01 有一个明显的缺陷——无法取消订阅。我们来参考 RxSwift 的实现,它 并不是直接让订阅者支持取消订阅,而是通过一个第三方类型 对订阅进行管理。 的核心作用是 提供一个状态位标识订阅是否已经取消。
关于由第三方类来管理订阅,而不是让订阅者自己管理的原因,我猜测有两个:一是出于职责单一的原则;二是为了支持函数式编程,抽取一个第三方类型作为返回值,从而在链式调用时保持类型一致。
协议要求所有的 类型都实现 方法,表示取消订阅。
这里我们定义两个遵循 协议的类型: 和 。
作为一个匿名 ,在本例中作为最底层的 并没有什么作用,只是为了实现模式统一而已。
作为一个可管理多个 的容器,它内部维持一个标志位表示订阅是否被取消。 所实现的 方法真正改变了标志位,并对其所维护的所有 执行各自的 方法,从而完成它们定义的在取消订阅时需要执行的附带操作。
很显然,这里执行 操作只是修改了状态,并没有释放订阅资源。只有当 对象被释放后才算真正释放资源。在原版 RxSwift 中, 差不多就是 。这样也是为什么我们要把订阅交给 来进行管理, 作为某个对象的属性,会随着对象的释放,从而自动释放真正的订阅资源。
为了支持 ,我们需要在 RxDemo-01 的基础上稍作修改即可,主要是修改发布者 。
接下来,我们来试用一下 RxDemo-02 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式进一步吻合。这里,我们实现了取消订阅的功能。
RxDemo-02 的核心思想是在 RxDemo-01 的基础上对事件进行拦截和过滤,如下图所示。

具体的实现方式如下所示,通过 管理 (原始 中的闭包执行后的返回类型)。同时,在执行发布事件时,使用一个中间 Observer 接收原始事件,中间 Observer 引用外部 的状态决定是否将事件发送给原始 Observer。

使用 的本质就是添加了一个中间层来解决管理订阅的问题。

在 RxDemo-02 中, 引用了外部的 中的订阅状态,从而决定事件的传递方向。这种代码逻辑由内而外实现,并不是很直观。
为了更加清晰地描述这个事件流动方向,RxDemo-03 通过增加一个中间层,将原始 observer、中间 observer、事件转发逻辑聚合在同一层级下,让代码具有更好的可读性。这里我们实现一个遵循 协议的 类型。 是 水槽 的意思,象征着这里我们通过它来控制事件的流动方向,暗示了这个类的作用。
在定义了 之后,我们就可以简化 中 方法的具体实现。
引用了原始的事件发生器,并定义一个中间 Observer 转入至原始事件发生器,从而让中间 observer 接收原始事件。除此之外, 还引用了原始 observer,当中间 observer 处理原始事件时,会判断订阅是否已经取消,从而决定是否将原始事件转发给原始 observer。此时,RxDemo-03 实现的 RxSwift 的内部调用关系如下所示。

下面,我们来实现操作符,RxSwift 中包含了大量的操作符,它们基本上都是对函数式编程中 函子、单子 等进行了应用。我们以 为例进行介绍。
如下所示, 方法能够将一个普通函数应用到包装类型 上。 方法最终返回一个 类型(同样遵循 协议),因此能够完美支持链式操作。
接下来,我们来试用一下 RxDemo-04 所实现的 RxSwift。这里,我们实现了 操作符的功能。
RxDemo-04 中由于增加了操作符功能,其内部的调用关系也发生了变化。特别是,发布事件和过滤事件的逻辑,很显然,每增加一个操作符,就会增加一个 中间层,调用栈也会更深。

如果我们仔细分析 RxDemo-04,其实我们可以发现内部隐藏了如下所示的调用关系链:observable -> MapObserver -> MapObservable -> observer。实际执行时,调用关系如下所示:
- observable 调用 MapObserver.on 方法,将原始事件传递给 MapObserver;
- MapObserver 使用 map 方法将原始事件转换成 map 事件(即 map 后的数据),作为 MapObservable 发出的事件;
- MapObservable 调用 observer.on 方法,将 map 事件传给 observer。

在 RxDemo-04 中,为了增加 操作符,对 进行了扩展,其本质就是在原始的 Observable 和 Observer 之间插入了一个 MapObserver 和一个 MapObservable。本节,我们继续进行优化,对中间类也进行细分和定义。
在 RxDemo-04 中, 的主要作用是 根据订阅是否取消决定是否拦截事件的传递。这里我们可能会想到:中间订阅者(如:MapObserver)本身是不是就应该具备 的这种功能呢?事实上,RxSwift 就是让 作为所有 Observer 的基类。
对于操作符,我们可以实现上述的模式;但是,对于不带操作符的情况,该如何处理呢?为了实现模式的统一,我们可以认为不带操作符的情况等同于带了 只返回原值的匿名操作符,即等同于 。针对此情况,我们也需要定义两个类 和 。
下面,我们来进行优化改进。
首先,我们将原来的 改成抽象基类。
其次,我们再一定一个新的类 代替 。 继承自 ,作为发布者的基类,该类内部没有时间生成器 的闭包,由子类选择性进行定义。
然后,我们来对 进行修改。在 RxDemo-04 中, 即提供了事件生成的功能和事件转发的功能。这里,我们让 的职责更加单一,仅仅是提供事件转发的功能。修改结果如下:
为了便于泛型类型的转换,我们给 协议增加一个方法,并由 予以实现。
接下来,我们来分别实现 的子类 和 以及 的子类 和 。
现在我们再来看发布者、操作符,其实两者的本质都是一样,都是创建了一个发布者。对此,我们采用扩展的方式来提供相应的方法。如下所示:
最后,我们再来验证一下实现结果,如下所示。
从运行结果而言,RxDemo-05 基本实现了分类细化,并且达到了取消订阅的功能。此时,上述例子中实际的订阅关系如下所示。相比 RxDemo-04,多了 和 ,但是整体的内部订阅关系链更加清晰了。

此时,我们再来对照一下 RxSwift 和 RxDemo-05 中类的定义,如下表所示。各个类的功能基本相同,整体结构也是大同小异,只是在类名上略有差异。
如果,我们仔细对比,可以发现 RxSwift 中 的 方法内部与 RxDemo-05 还是不太一样,前者内部还引用了一个 类。其作用是什么呢?事实上,其主要作用是管理 。细心的同学可能会发现 RxDemo-05 中有个 BUG:取消订阅时没有执行 闭包。
对此,我们也可以用类似的方式来解决,通过增加一个 类来进行管理。具体代码见:RxDemo-06。
当订阅发生时(即执行 方法时),内部会产生一个递归的控制流,如下图所示。

通过递归返回的方式构建整个订阅管理关系链,如下图所示。 是 方法最终返回的 对象。当我们对 执行 方法时,内部会递归地执行 方法,最终取消订阅链中所有的订阅。
注意:这里面会有循环引用,如 内部又引用了 ,RxDemo-06 以及 RxSwift 中的处理是给 类内部添加一个状态,表示是否已经取消了订阅,从而避免循环引用导致的循环调用。这里的循环引用并不一定是坏事,它在下述的场景下起到了非常关键的作用。

当事件中出现一个 或 事件时,由于事件会依次传递至 ,最后一次传递时,即 进行传递时,会判断是否是 或 事件,从而决定是否执行 方法。当 执行 方法时,会通过上述的循环引用,调用 执行 方法,从而实现整体取消订阅。
本文通过逐步实现 RxSwift 核心部分中的功能,一窥其背后的设计思路。从中我们也看到了其对函数式编程的应用,以及其所呈现出来的流编程模式的底层实现原理。
后续,我们将进一步探索原版 RxSwift 中其他的一些内容。
- RxSwift repo
- ReactiveX
- 函数式编程——Functor、Applicative、Monad
- 《计算机程序的构造与解释》
- Modern RxSwift Architectures
- Learn Rx by implementing Observable
版权声明:
本文来源网络,所有图片文章版权属于原作者,如有侵权,联系删除。
本文网址:https://www.mushiming.com/mjsbk/3991.html