RxJS 主题

2025年3月17日 | 阅读 7 分钟

RxJS Subject 就像一个 Observable。它是一种特殊的 Observable,允许将值多播到多个 Observer。简单来说,我们可以说 RxJS subject 是一个可以多播或与多个观察者通信的 Observable。

根据其官方定义,“Subject 就像一个 Observable,但可以多播到多个 Observer。Subjects 就像 EventEmitters:它们维护着许多监听器的注册表。”

一个 RxJS subject 可以被订阅,就像我们通常对 Observables 所做的那样。它也具有 next()、error() 和 complete() 等方法,我们在 Observable 创建函数中已经看到和使用过这些方法。

RxJS Observable 和 RxJS Subject 之间的区别

每个 Subject 都是一个 Observable。我们可以像订阅 observable 一样订阅给定的 Subject,它通常会开始接收值。从 Observer 的角度来看,无法确定 Observable 执行来自普通的单播 Observable 还是 Subject。

Observable 和 Subject 之间的主要区别在于,默认情况下,普通的 Observable 是单播的。这意味着每个订阅的 Observer 都拥有 Observable 的独立执行。另一方面,Subjects 是多播的。Subject 就像一个 Observable,但它可以多播到多个 Observer。使用 Subjects 的主要原因是进行多播。

Observable 的示例

示例 1

执行上述示例后,我们将看到以下结果。在这里,您将看到 Observables 在设计上是单播的,因此每次执行上述示例时,它们都会产生不同的随机结果。请参见以下输出

输出

RxJS Subjects

在这里,您可以看到每次我们执行程序时,两个订阅的结果都不同。如果您期望每个订阅者都收到相同的值,这不是很令人愉快。Subjects 用于克服此问题,因为 subjects 可以多播。这意味着一个 Observable 执行在多个订阅者之间共享。

Subjects 就像 EventEmitters。它们用于维护许多监听器的注册表,因此当我们调用 subject 的 subscribe 时,它不会调用新的执行。它只是在 Observers 列表中注册给定的 Observer。

使用 RxJS Subjects

让我们看看如何使用 Subjects 进行多播并克服上述问题。

示例 2

输出

执行上述示例后,我们将看到以下结果。

RxJS Subjects

在这里,您可以看到每次我们执行程序时,它都会显示不同的随机数,但是两个订阅的值是相同的。这意味着两个订阅正在获取相同的数据。

如何将 Observables 从单播转换为多播

RxJS Observables 只是数据生产者,但是 RxJS Subjects 可以用作数据生产者以及数据消费者。通过将 Subjects 用作数据消费者,我们可以使用它们将 Observables 从单播转换为多播。请参见以下示例

示例 3

输出

执行上述示例后,我们将看到以下结果。

RxJS Subjects

在这里,我们将 Subject 传递给 subscribe 函数。它获取了 Observable 的值,然后该 Subject 的所有订阅者立即接收该值。

如何创建 RxJS Subject?

让我们看看如何使用 RxJS subjects。要使用 RxJS subject,我们需要以下列方式导入 Subject

现在,使用以下方法创建一个 subject 对象

与 RxJS Observables 相同,RxJS Subject 也具有以下三个方法

  • next(v)
  • error(e)
  • complete()

如何订阅 RxJS Subject?

创建 RxJS subject 后,我们必须订阅它。Subscription 是一个用于表示可释放资源的对象,通常是 Subject 的执行。通过使用以下方法,我们可以轻松地在 Subject 上创建多个订阅

如何将数据传递给 Subject?

订阅后,我们需要将数据传递到我们创建的 subject。我们可以使用 next() 方法来做到这一点。

此数据将被传递给添加到 subject 的所有订阅。

让我们看一个完整的 RxJS Subject 示例。在这里,我们将使用以上三个方法:next(v)、error(e) 和 complete()

使用 next(v) 方法的示例

在这里,我们通过调用 new Subject() 创建了一个名为“subject_test”的对象。 之后,subject_test 对象具有对 next() 方法的引用。

输出

RxJS Subjects

使用 complete() 方法的示例

我们可以使用 complete() 方法停止 subject 的执行。请参见以下示例

输出

RxJS Subjects

使用 error() 方法的示例

让我们看看如何调用 error () 方法。

输出

RxJS Subjects

RxJS Subjects 的类型

RxJS subjects 主要有四种变体

  1. Subject - 这是标准的 RxJS Subject。它没有任何初始值或重放行为。
  2. BehaviorSubject - RxJS subject 的此变体需要一个初始值,并将当前值(上次发出的项目)发送给新的订阅者。
  3. ReplaySubject - RxJS subject 的此变体用于将指定数量的上次发出的值(重放)发送给新的订阅者。
  4. AsyncSubject - AsyncSubject 在完成后将最新的值发送给观察者。

BehaviorSubject

BehaviorSubject 用于表示“调用时的当前和最新值”。它存储发送给其消费者的最新值,并且每当有新的 Observer 订阅时,它将立即从 BehaviorSubject 接收“当前值”。

BehaviorSubjects 主要用于表示“随时间变化的值”。例如,生日的事件流是一个 Subject,但人的年龄流将是一个 BehaviorSubject。

语法

示例

输出

RxJS Subjects

ReplaySubject

ReplaySubject 与 BehaviorSubject 非常相似。它也用于将旧值发送给新的订阅者,但它也可以记录 Observable 执行的一部分。ReplaySubject 记录 Observable 执行中的多个值,并将它们重放给新的订阅者。

语法

创建 ReplaySubject 时,您可以指定要重放多少个值。请参见以下示例

示例

输出

RxJS Subjects

如何设置窗口时间?

除了缓冲区大小之外,您还可以指定一个以毫秒为单位的窗口时间,以确定记录的值可以有多旧。

请参见以下示例,我们在其中使用了 100 的大缓冲区大小,但窗口时间参数仅为 500 毫秒。

示例

输出

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 3
observerB: 4
observerA: 5
observerB: 5
observerA: 6
observerB: 6
//..... and so on.
RxJS Subjects

AsyncSubject

在这种 RxJS Subject 变体中,只有 Observable 执行的最后一个值被发送给其观察者,并且也在调用 complete() 方法之后完成。

语法

示例

输出

RxJS Subjects
下一个主题RxJS 调度器