*Redis Streams 介绍
Stream 是 Redis 5.0 引入的全新数据类型,它以更抽象的方式对 日志数据结构 进行建模,然而日志的本质仍然保持不变:就像日志文件一样,通常以只追加模式打开的文件来实现,Redis Stream 主要是一种只追加的数据结构。至少在概念上如此,因为作为在内存中表示的抽象数据类型,Redis Stream 实现了更强大的操作,以克服日志文件本身的限制。
尽管数据结构本身相当简单,但使 Redis Stream 成为 Redis 中最复杂类型的是它实现了额外的、非强制性的功能:一组阻塞操作,允许消费者等待生产者添加到流中的新数据,以及一个称为 消费者组 的概念。
消费者组最初由流行的消息系统 Kafka (TM) 引入。Redis 以完全不同的术语重新实现了类似的想法,但目标是相同的:允许一组客户端协作消费同一消息流的不同部分。
*Stream 基础
为了理解 Redis Stream 是什么以及如何使用它们,我们将忽略所有高级功能,而专注于数据结构本身,即用于操作和访问它的命令。这基本上是与大多数其他 Redis 数据类型(如列表、集合、有序集合等)共有的部分。然而,请注意,列表也有一个可选的更复杂的阻塞 API,由 BLPOP 等命令导出。因此,Stream 在这方面与列表没有太大不同,只是额外的 API 更复杂、更强大。
由于 Stream 是一种只追加的数据结构,称为 XADD 的基本写入命令将新条目追加到指定的流中。流条目不仅仅是字符串,而是由一或多个字段-值对组成。这样,流的每个条目已经是结构化的,就像以 CSV 格式写入的只追加文件一样,每行都有多个分隔的字段。
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
上面调用 XADD 命令将条目 sensor-id: 1234, temperature: 19.8 添加到键 mystream 的流中,使用自动生成的条目 ID,即命令返回的 1518951480106-0。它的第一个参数是键名 mystream,第二个参数是标识流中每个条目的条目 ID。然而,在这种情况下,我们传递了 *,因为我们希望服务器为我们生成一个新的 ID。每个新 ID 将单调递增,因此更简单地说,每个添加的新条目将比所有过去的条目具有更高的 ID。由服务器自动生成 ID 几乎总是你想要的,显式指定 ID 的原因非常罕见。我们稍后会更多地讨论这个问题。每个 Stream 条目都有一个 ID 这一事实是它与日志文件的另一个相似之处,在日志文件中,行号或文件内的字节偏移量可用于标识给定条目。回到我们的 XADD 示例,在键名和 ID 之后,下一个参数是组成我们流条目的字段-值对。
仅使用 XLEN 命令就可以获取流中的项目数量:
> XLEN mystream
(integer) 1
*条目 ID
XADD 命令返回的条目 ID 唯一标识给定流中的每个条目,由两部分组成:
<毫秒时间>-<序列号>
毫秒时间部分实际上是生成流 ID 的本地 Redis 节点的本地时间,但是如果当前毫秒时间恰好小于前一个条目时间,则使用前一个条目时间,因此如果时钟向后跳转,单调递增 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号是 64 位宽的,实际上在同一毫秒内可以生成的条目数量没有限制。
这种 ID 的格式乍一看可能很奇怪,细心的读者可能会想为什么时间是 ID 的一部分。原因是 Redis Stream 支持按 ID 进行范围查询。由于 ID 与条目生成的时间相关,这赋予了基本上免费地按时间范围进行查询的能力。我们在介绍 XRANGE 命令时很快就会看到这一点。
如果由于某种原因,用户需要与时间无关的递增 ID,但实际上与另一个外部系统 ID 相关联,正如前面已经观察到的,XADD 命令可以接受显式 ID 而不是触发自动生成的 * 通配符 ID,如以下示例所示:
> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2
请注意,在这种情况下,最小 ID 是 0-1,并且命令不会接受等于或小于前一个 ID 的 ID:
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
*从 Stream 获取数据
现在我们终于能够通过 XADD 在流中追加条目了。然而,虽然向流追加数据相当明显,但查询流以提取数据的方式并不明显。如果我们继续与日志文件进行类比,一种明显的方法是模仿我们通常使用 Unix 命令 tail -f 所做的事情,也就是说,我们可以开始监听以获取追加到流的新消息。请注意,与 Redis 的阻塞列表操作不同,在阻塞列表操作中,给定元素将到达单个客户端,该客户端在类似 弹出 的操作(如 BLPOP)中阻塞,使用流时,我们希望多个消费者可以看到追加到流的新消息,就像许多 tail -f 进程可以看到添加到日志的内容一样。使用传统术语,我们希望流能够将消息 扇出 给多个客户端。
然而,这只是一个潜在的访问模式。我们还可以以相当不同的方式看待流:不是作为消息传递系统,而是作为 时间序列存储。在这种情况下,也许获取追加的新消息也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量方式检查所有历史记录。这绝对是另一种有用的访问模式。
最后,如果我们从消费者的角度看待流,我们可能希望以另一种方式访问流,即作为可以分区给多个正在处理此类消息的消费者的消息流,这样消费者组只能看到到达单个流的消息子集。通过这种方式,可以在不同的消费者之间扩展消息处理,而无需单个消费者处理所有消息:每个消费者只会获得要处理的不同消息。这基本上是 Kafka (TM) 对消费者组所做的。通过消费者组读取消息是从 Redis Stream 读取的另一种有趣模式。
Redis Stream 通过不同的命令支持上述所有三种查询模式。接下来的部分将展示所有这些模式,从最简单、最直接使用的开始:范围查询。
*范围查询:XRANGE 和 XREVRANGE
要通过范围查询流,我们只需要指定两个 ID:起始 和 结束。返回的范围将包含具有起始或结束 ID 的元素,因此范围是包含的。两个特殊 ID - 和 + 分别表示可能的最小和最大 ID。
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
返回的每个条目都是两个项目的数组:ID 和字段-值对列表。我们已经说过,条目 ID 与时间有关系,因为 - 字符左侧的部分是创建流条目的本地节点创建条目时的 Unix 时间(以毫秒为单位)(但是请注意,流以完全指定的 XADD 命令复制,因此副本将具有与主节点相同的 ID)。这意味着我可以使用 XRANGE 查询时间范围。为了做到这一点,然而,我可能希望省略 ID 的序列部分:如果省略,在范围的起始处将假定为 0,而在结束部分将假定为可用的最大序列号。这样,仅使用两个毫秒 Unix 时间进行查询,我们就能获得在该时间范围内生成的所有条目,以包含方式。例如,我可能想要查询两毫秒的时间段:
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
我在这个范围内只有一个条目,然而在真实的数据集中,我可以查询数小时的范围,或者仅在两毫秒内就可能有多个项目,返回的结果可能非常大。因此,XRANGE 支持末尾的可选 COUNT 选项。通过指定计数,我可以只获取前 N 个项目。如果我想要更多,我可以获取最后返回的 ID,将序列部分增加一,然后再次查询。让我们在以下示例中看看这一点。我们开始使用 XADD 添加 10 个项目(我不会展示这一点,已经假设流 mystream 已填充了 10 个项目)。为了开始我的迭代,每次命令获取 2 个项目,我从完整的范围开始,但计数为 2。
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
为了继续迭代接下来的两个项目,我必须选择最后返回的 ID,即 1519073279157-0,并将 ID 的序列号部分加 1。请注意,序列号是 64 位的,因此不需要检查溢出。结果 ID,即本例中的 1519073279157-1,现在可以用作下一次 XRANGE 调用的新 起始 参数:
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
依此类推。由于 XRANGE 的复杂度是用于查找的 O(log(N)),然后是返回 M 个元素的 O(M),使用较小的计数,该命令具有对数时间复杂度,这意味着迭代的每一步都很快。因此 XRANGE 也是事实上的 流迭代器,不需要 XSCAN 命令。
XREVRANGE 命令相当于 XRANGE,但以相反顺序返回元素,因此 XREVRANGE 的一个实际用途是检查流中的最后一项:
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
请注意,XREVRANGE 命令以相反的顺序接受 起始 和 结束 参数。
*使用 XREAD 监听新项目
当我们不想通过范围访问流中的项目时,通常我们想要的是 订阅 到达流的新项目。这个概念可能看起来与 Redis Pub/Sub 有关,在 Redis Pub/Sub 中,你订阅一个频道,或者与 Redis 阻塞列表有关,在 Redis 阻塞列表中,你等待键获得新元素以获取,但是消费流的方式存在根本差异:
- 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目将传递给在给定流中等待数据的所有消费者。这种行为与阻塞列表不同,在阻塞列表中,每个消费者将获得不同的元素。然而,向多个消费者 扇出 的能力类似于 Pub/Sub。
- 虽然在 Pub/Sub 中消息是 即发即弃 的,并且无论如何都不会存储,而在使用阻塞列表时,当客户端收到消息时,它会被 弹出(实际上被移除)从列表中,流的工作方式根本不同。所有消息都无限期地追加在流中(除非用户显式要求删除条目):不同的消费者将通过记住收到的最后一条消息的 ID 来知道从其角度来看什么是新消息。
- Stream 消费者组提供了一种 Pub/Sub 或阻塞列表无法实现的控制级别,同一流有不同的组,显式确认已处理的项目,检查待处理项目的能力,认领未处理的消息,以及对每个单个客户端一致的历史可见性,该客户端只能看到其自己的私有消息历史。
提供监听到达流的新消息能力的命令称为 XREAD。它比 XRANGE 稍微复杂一些,因此我们将从展示简单形式开始,稍后提供完整的命令布局。
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
上面是 XREAD 的非阻塞形式。请注意,COUNT 选项不是强制性的,实际上命令的唯一强制选项是 STREAMS 选项,它指定一个键列表以及调用消费者已经为每个流看到的相应最大 ID,以便命令仅向客户端提供 ID 大于我们指定的消息。
在上面的命令中,我们写了 STREAMS mystream 0,因此我们想要流 mystream 中 ID 大于 0-0 的所有消息。正如你在上面的示例中看到的,命令返回键名,因为实际上可以使用多个键调用此命令以同时从不同的流读取。我可以写,例如:STREAMS mystream otherstream 0 0。请注意,在 STREAMS 选项之后,我们需要提供键名,然后是 ID。因此,STREAMS 选项必须始终是最后一个选项。
除了 XREAD 可以一次访问多个流,以及我们能够指定我们拥有的最后 ID 以仅获取新消息之外,在这种简单形式中,该命令与 XRANGE 做的事情并没有太大不同。然而,有趣的部分是我们可以很容易地将 XREAD 变成一个 阻塞命令,通过指定 BLOCK 参数:
> XREAD BLOCK 0 STREAMS mystream $
请注意,在上面的示例中,除了删除 COUNT 之外,我还指定了新的 BLOCK 选项,超时时间为 0 毫秒(这意味着永不超时)。此外,我没有为流 mystream 传递普通 ID,而是传递了特殊 ID $。这个特殊 ID 意味着 XREAD 应该使用流 mystream 中已存储的最大 ID 作为最后 ID,因此我们将只接收 新 消息,从我们开始监听的时间开始。这在某种程度上类似于 Unix 命令 tail -f。
请注意,当使用 BLOCK 选项时,我们不必使用特殊 ID $。我们可以使用任何有效的 ID。如果命令能够立即服务我们的请求而不阻塞,它就会这样做,否则它会阻塞。通常,如果我们想从新条目开始消费流,我们从 ID $ 开始,之后我们继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。
XREAD 的阻塞形式也能够监听多个流,只需指定多个键名。如果请求可以同步服务,因为至少有一个流的元素大于我们指定的相应 ID,它将返回结果。否则,命令将阻塞,并返回获得新数据的第一个流的元素(根据指定的 ID)。
与阻塞列表操作类似,从等待数据的客户端的角度来看,阻塞流读取是 公平 的,因为语义是 FIFO 风格的。第一个为给定流阻塞的客户端将是新项目可用时第一个被解除阻塞的客户端。
XREAD 除了 COUNT 和 BLOCK 之外没有其他选项,因此它是一个具有特定目的的基本命令,用于将消费者附加到一个或多个流。消费流的更强大功能可通过消费者组 API 获得,然而通过消费者组读取由另一个名为 XREADGROUP 的命令实现,在本指南的下一节中介绍。
*消费者组
当任务是从不同的客户端消费同一流时,XREAD 已经提供了一种向 N 个客户端 扇出 的方法,可能还使用副本以提供更多的读取可扩展性。然而,在某些问题中,我们想做的不是向许多客户端提供相同的消息流,而是向许多客户端提供来自同一流的 不同子集 的消息。一个明显有用的案例是处理消息缓慢的情况:拥有 N 个不同的工作者,它们将接收流的不同部分,使我们能够通过将不同的消息路由到准备做更多工作的不同工作者来扩展消息处理。
实际上,如果我们想象有三个消费者 C1、C2、C3,以及一个包含消息 1、2、3、4、5、6、7 的流,那么我们希望按以下图表提供服务:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了获得这种效果,Redis 使用了一个称为 消费者组 的概念。非常重要的是要理解,Redis 消费者组从实现的角度来看与 Kafka (TM) 消费者组没有任何关系,但它们仅在实现的概念上相似,因此我决定不改变与最初推广这种想法的软件产品相比的术语。
消费者组就像一个 伪消费者,它从流中获取数据,并实际服务于多个消费者,提供某些保证:
- 每条消息都服务于不同的消费者,因此不可能将同一条消息传递给多个消费者。
- 消费者组内的消费者由名称标识,名称是客户端实现消费者必须选择的区分大小写的字符串。这意味着即使断开连接后,流消费者组也会保留所有状态,因为客户端将再次声明自己是同一消费者。然而,这也意味着由客户端提供唯一标识符。
- 每个消费者组都有 第一个从未消费过的 ID 的概念,因此,当消费者请求新消息时,它可以只提供以前从未交付过的消息。
- 消费消息需要使用特定命令进行显式确认,以说明:此消息已正确处理,因此可以从消费者组中删除。
- 消费者组跟踪所有当前待处理的消息,即已交付给消费者组中的某个消费者但尚未确认为已处理的消息。多亏了此功能,当访问流的消息历史时,每个消费者 只会看到已交付给它的消息。
在某种程度上,消费者组可以被想象为关于流的 一定量的状态:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
如果你从这个角度来看,就很容易理解消费者组可以做什么,它如何能够为消费者提供其待处理消息的历史,以及请求新消息的消费者将如何只获得 ID 大于 last_delivered_id 的消息。同时,如果你将消费者组视为 Redis Stream 的辅助数据结构,很明显单个流可以有多个消费者组,它们有一组不同的消费者。实际上,甚至同一流也可以有客户端通过 XREAD 不使用消费者组进行读取,以及在不同消费者组中通过 XREADGROUP 进行读取的客户端。
现在是时候放大查看基本的消费者组命令了,它们是以下命令:
- XGROUP 用于创建、销毁和管理消费者组。
- XREADGROUP 用于通过消费者组从流中读取。
- XACK 是允许消费者将待处理消息标记为正确处理的命令。
*创建消费者组
假设我已经有一个类型为流的现有键 mystream,为了创建一个消费者组,我只需要执行以下操作:
> XGROUP CREATE mystream mygroup $
OK
正如你在上面的命令中看到的,当创建消费者组时,我们必须指定一个 ID,在示例中就是 $。这是必需的,因为消费者组除了其他状态外,还必须知道在第一个消费者连接时接下来要提供什么消息,也就是说,当组刚刚创建时,当前的 最后一条消息 ID 是什么?如果我们像上面那样提供 $,那么从现在开始到达流的新消息才会提供给组中的消费者。如果我们改为指定 0,消费者组将消费流历史中的 所有 消息以开始。当然,你可以指定任何其他有效的 ID。你知道的是,消费者组将开始提供大于你指定的 ID 的消息。因为 $ 表示流中当前最大的 ID,指定 $ 将产生只消费新消息的效果。
XGROUP CREATE 还支持在流不存在时自动创建流,使用可选的 MKSTREAM 子命令作为最后一个参数:
> XGROUP CREATE newstream mygroup $ MKSTREAM
OK
现在消费者组已经创建,我们可以立即开始尝试通过消费者组读取消息,方法是使用 XREADGROUP 命令。我们将从称为 Alice 和 Bob 的消费者那里读取,以查看系统将如何向 Alice 和 Bob 返回不同的消息。
XREADGROUP 与 XREAD 非常相似,并提供相同的 BLOCK 选项,否则它是一个同步命令。然而有一个 强制 选项必须始终指定,即 GROUP,它有两个参数:消费者组的名称和尝试读取的消费者的名称。COUNT 选项也受支持,与 XREAD 中的相同。
在从流中读取之前,让我们在里面放一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
注意:这里 message 是字段名,水果是关联的值,请记住流条目是小字典。
是时候尝试使用消费者组读取一些东西了:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
XREADGROUP 回复与 XREAD 回复完全相同。然而请注意上面提供的 GROUP <group-name> <consumer-name>,它表明我想使用消费者组 mygroup 从流中读取,我是消费者 Alice。每次消费者使用消费者组执行操作时,它必须在组内唯一标识此消费者的名称。
命令行中还有另一个非常重要的细节,在强制性的 STREAMS 选项之后,为键 mystream 请求的 ID 是特殊 ID >。这个特殊 ID 仅在消费者组的上下文中有效,它意味着:到目前为止从未交付给其他消费者的消息。
这几乎总是你想要的,然而也可以指定一个真实的 ID,例如 0 或任何其他有效的 ID,在这种情况下,我们请求 XREADGROUP 仅向我们提供其 待处理消息的历史记录,在这种情况下,将永远不会在组中看到新消息。因此,基本上 XREADGROUP 根据我们指定的 ID 具有以下行为:
- 如果 ID 是特殊 ID
>,则命令将仅返回到目前为止从未交付给其他消费者的新消息,并且作为副作用,将更新消费者组 最后 ID。 - 如果 ID 是任何其他有效的数字 ID,则命令将让我们访问我们的 待处理消息历史记录。也就是说,已交付给此指定消费者(由提供的名称标识)且到目前为止从未使用 XACK 确认的消息集。
我们可以立即通过指定 ID 为 0 来测试此行为,不使用任何 COUNT 选项:我们只会看到唯一的待处理消息,即关于苹果的消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
然而,如果我们确认消息已处理,它将不再是待处理消息历史的一部分,因此系统将不再报告任何内容:
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
如果你还不了解 XACK 的工作原理,这个概念只是已处理的消息不再是我们可以访问的历史的一部分。
现在轮到 Bob 读取一些东西了:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"
Bob 要求最多两条消息,并通过同一组 mygroup 读取。因此发生的情况是 Redis 仅报告 新 消息。正如你所看到的,"apple" 消息没有被传递,因为它已经传递给了 Alice,所以 Bob 获得了 orange 和 strawberry,依此类推。
这样 Alice、Bob 和组中的任何其他消费者,都能够从同一流中读取不同的消息,读取其尚未处理的待处理消息历史,或将消息标记为已处理。这允许创建不同的拓扑和语义来从流中消费消息。
有几件事要记住:
- 消费者在第一次被提及时自动创建,无需显式创建。
- 即使使用 XREADGROUP,你也可以同时从多个键读取,但是要使这项工作正常进行,你需要在每个流中创建一个具有相同名称的消费者组。这不是一个常见的需求,但值得提及该功能在技术上可用。
- XREADGROUP 是一个 写入命令,因为即使它从流中读取,消费者组也会作为读取的副作用而被修改,因此只能在主实例上调用。
使用消费者组的消费者实现示例,用 Ruby 语言编写,可能如下所示。Ruby 代码的编写方式旨在让任何经验丰富的用其他语言编程且不了解 Ruby 的程序员都能阅读:
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
# 根据迭代选择 ID:第一次我们想
# 读取我们的待处理消息,以防我们崩溃并正在恢复。
# 一旦我们消费了我们的历史记录,我们就可以开始获取新消息。
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# 如果我们收到空回复,这意味着我们正在消费我们的历史记录
# 并且历史记录现在为空。让我们开始消费新消息。
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# 处理消息
process_message(id,fields)
# 确认消息已处理
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end
正如你所看到的,这里的想法是开始消费历史记录,即我们的待处理消息列表。这很有用,因为消费者可能之前崩溃了,因此在重新启动时,我们希望重新读取交付给我们但没有得到确认的消息。这样我们可以多次处理一条消息或一次(至少在消费者故障的情况下,但还涉及 Redis 持久性和复制的限制,请参阅关于此主题的特定部分)。
一旦历史记录被消费,并且我们获得空的消息列表,我们可以切换为使用 > 特殊 ID 来消费新消息。
*从永久故障中恢复
上面的示例允许我们编写参与同一消费者组的消费者,每个消费者接收要处理的子集消息,并从故障中恢复,重新读取仅交付给它们的待处理消息。然而,在现实世界中,消费者可能会永久故障并且永远不会恢复。在因任何原因停止后从未恢复的消费者,其待处理消息会发生什么情况?
Redis 消费者组提供了一种功能,在这种情况下用于 认领 给定消费者的待处理消息,以便此类消息将更改所有权,并将重新分配给不同的消费者。该功能非常明确,消费者必须检查待处理消息列表,并且必须使用特殊命令认领特定消息,否则服务器将永远将待处理消息分配给旧消费者,这样不同的应用程序可以选择是否使用此类功能,以及确切的使用方式。
此过程的第一步只是一个提供消费者组中待处理条目可观察性的命令,称为 XPENDING。这只是一个只读命令,始终安全可调用,并且不会更改任何消息的所有权。在其最简单的形式中,该命令只需使用两个参数调用,即流的名称和消费者组的名称。
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
当以这种方式调用时,该命令仅输出消费者组中的待处理消息总数,在本例中只是两条消息,待处理消息中的最低和最高消息 ID,最后是消费者列表及其拥有的待处理消息数量。我们只有 Bob 有两条待处理消息,因为 Alice 请求的唯一消息已使用 XACK 确认。
我们可以通过向 XPENDING 提供更多参数来请求更多信息,因为完整的命令签名如下:
XPENDING <key> <groupname> [<start-id> <end-id> <count> [<consumer-name>]]
通过提供起始和结束 ID(可以是 XRANGE 中的 - 和 +)以及用于控制命令返回信息量的计数,我们能够了解更多关于待处理消息的信息。可选的最终参数,即消费者名称,用于如果我们只想将输出限制为仅某个消费者的待处理消息,但在以下示例中我们不会使用此功能。
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
现在我们有了每条消息的详细信息:ID、消费者名称、空闲时间(以毫秒为单位),即自上次将消息交付给某个消费者以来经过了多少毫秒,最后是给定消息被交付的次数。我们有两条来自 Bob 的消息,它们空闲了 74170458 毫秒,大约 20 小时。
请注意,没有什么能阻止我们检查第一条消息的内容,只需使用 XRANGE。
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
我们只需在参数中重复相同的 ID 两次。现在我们有了一些想法,Alice 可能决定在 20 小时没有处理消息后,Bob 可能不会及时恢复,是时候 认领 这些消息并代替 Bob 恢复处理了。为此,我们使用 XCLAIM 命令。
此命令在其完整形式中非常复杂且充满选项,因为它用于复制消费者组更改,但我们通常只使用我们需要的参数。在这种情况下,调用它就像这样简单:
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
基本上我们说,对于这个特定的键和组,我们希望指定的消息 ID 更改所有权,并将分配给指定的消费者名称 <consumer>。然而,我们还提供了一个最小空闲时间,以便该操作仅在所提及消息的空闲时间大于指定的空闲时间时才有效。这很有用,因为也许两个客户端同时尝试认领一条消息:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
然而认领一条消息,作为副作用,将重置其空闲时间!并且会增加其交付次数计数器,因此第二个客户端将认领失败。通过这种方式,我们避免了消息的简单重新处理(即使在一般情况下你无法获得恰好一次处理)。
这是命令执行的结果:
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
消息已成功被 Alice 认领,她现在可以处理消息并确认它,即使原始消费者没有恢复,也能推动事情向前发展。
从上面的例子可以清楚地看出,作为成功认领给定消息的副作用,XCLAIM 命令还会返回它。然而这不是强制性的。可以使用 JUSTID 选项,以便仅返回成功认领的消息的 ID。如果你希望减少客户端和服务器之间使用的带宽,以及命令的性能,并且你对消息不感兴趣,因为你的消费者以实现方式定期重新扫描待处理消息的历史记录,这很有用。
认领也可以由单独的进程实现:一个只检查待处理消息列表,并将空闲消息分配给看起来活跃的消费者。可以使用 Redis Stream 的可观察性功能之一获得活跃的消费者。这是下一节的主题。
*认领和交付计数器
你在 XPENDING 输出中观察到的计数器是每条消息的交付次数。当消息通过 XCLAIM 成功认领或通过 XREADGROUP 调用访问待处理消息的历史记录时,此类计数器会递增。
当出现故障时,消息被多次交付是正常的,但最终它们通常会被处理。然而,有时存在无法处理给定特定消息的问题,因为它已损坏或以触发处理代码中错误的方式精心制作。在这种情况下,消费者将不断无法处理此特定消息。由于我们有交付尝试次数计数器,我们可以使用该计数器来检测由于某种原因根本无法处理的消息。因此,一旦交付计数器达到你选择的一个大数字,将此类消息放入另一个流并向系统管理员发送通知可能是更明智的。这基本上是 Redis Stream 实现 死信 概念的方式。
*Stream 可观察性
缺乏可观察性的消息传递系统很难处理。不知道谁正在消费消息,什么消息是待处理的,给定流中的活动消费者组集是什么,使一切不透明。因此,Redis Stream 和消费者组有不同的方式来观察正在发生的事情。我们已经介绍了 XPENDING,它允许我们在给定时刻检查正在处理的消息列表,以及它们的空闲时间和交付次数。
然而我们可能想做得更多,而 XINFO 命令是一个可观察性接口,可用于子命令以获取有关流或消费者组的信息。
此命令使用子命令来显示有关流及其消费者组状态的不同信息。例如 XINFO STREAM
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1526569495631-0
2) 1) "message"
2) "apple"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
输出显示了有关流在内部如何编码的信息,还显示了流中的第一条和最后一条消息。另一个可用的信息是与此流值关联的消费者组数量。我们可以进一步询问有关消费者组的更多信息。
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
7) last-delivered-id
8) "1588152489012-0"
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
7) last-delivered-id
8) "1588152498034-0"
正如你在上面和之前的输出中看到的,XINFO 命令输出一系列字段-值项目。因为它是一个可观察性命令,这允许人类用户立即理解报告了什么信息,并允许命令在未来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须更高效地使用带宽的命令,如 XPENDING,只报告信息而不带字段名称。
上面使用 GROUPS 子命令的示例输出,通过观察字段名称应该很清楚。我们可以通过检查注册在该组中的消费者来更详细地检查特定消费者组的状态。
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
如果你不记得命令的语法,只需向命令本身请求帮助:
> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.
3) GROUPS <key> -- Show the stream consumer groups.
4) STREAM <key> -- Show information about the stream.
5) HELP -- Print this help.
*与 Kafka (TM) 分区的区别
Redis Stream 中的消费者组在某些方面可能类似于 Kafka (TM) 基于分区的消费者组,然而请注意,Redis Stream 实际上非常不同。分区只是 逻辑 上的,消息只是放入单个 Redis 键中,因此不同客户端被服务的方式基于谁准备好处理新消息,而不是客户端从哪个分区读取。例如,如果消费者 C3 在某个点永久故障,Redis 将继续向 C1 和 C2 提供所有到达的新消息,就好像现在只有两个 逻辑 分区一样。
同样,如果给定消费者处理消息的速度比其他消费者快得多,该消费者将在同一单位时间内按比例接收更多消息。这是可能的,因为 Redis 显式跟踪所有未确认的消息,并记住谁收到了哪条消息以及第一条从未交付给任何消费者的消息的 ID。
然而,这也意味着在 Redis 中,如果你真的想将同一流中的消息分区到多个 Redis 实例中,你必须使用多个键和一些分片系统,如 Redis 集群或某些其他特定于应用程序的分片系统。单个 Redis 流不会自动分区到多个实例中。
我们可以说,在示意图上,以下情况是真实的:
- 如果你使用 1 个流 -> 1 个消费者,你正在按顺序处理消息。
- 如果你使用 N 个流与 N 个消费者,因此只有给定消费者命中 N 个流的子集,你可以扩展上述 1 个流 -> 1 个消费者的模型。
- 如果你使用 1 个流 -> N 个消费者,你正在向 N 个消费者进行负载均衡,然而在这种情况下,关于同一逻辑项的消息可能会无序消费,因为给定消费者可能比另一个消费者处理消息 3 更快,而另一个消费者正在处理消息 4。
因此,基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键。而 Redis 消费者组是一个服务器端负载均衡系统,将来自给定流的消息均衡到 N 个不同的消费者。
*有上限的 Stream
许多应用程序不想永远收集流中的数据。有时,在流中拥有最大给定数量的项目是有用的,其他时候,一旦达到给定大小,将数据从 Redis 移动到不在内存中且不那么快但适合将历史记录保存数十年的存储中是有用的。Redis Stream 对此有一些支持。一个是 XADD 命令的 MAXLEN 选项。这个选项非常容易使用:
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
使用 MAXLEN,当达到指定长度时,旧条目会自动驱逐,以便流保持恒定大小。目前没有选项告诉流只保留不超过给定数量的项目,因为这样的命令为了保持一致运行,可能必须阻塞很长时间才能驱逐项目。想象一下,例如,如果有一个插入高峰,然后是一个长时间的暂停,然后再一次插入,所有这些都具有相同的最大时间。流将在暂停期间阻止驱逐变得太旧的数据。因此,由用户进行一些规划并了解所需的最大流长度。此外,虽然流的长度与使用的内存成正比,但按时间修剪不太容易控制和预测:它取决于插入速率,这是一个经常随时间变化的变量(当它不改变时,那么仅按大小修剪是微不足道的)。
然而,使用 MAXLEN 进行修剪可能很昂贵:流由基数树中的宏节点表示,以便非常节省内存。改变由几十个元素组成的单个宏节点不是最优的。因此可以使用以下特殊形式的命令:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
MAXLEN 选项和实际计数之间的 ~ 参数意味着,我不真正需要这恰好是 1000 个项目。它可以是 1000 或 1010 或 1030,只要确保保存至少 1000 个项目即可。使用此参数,仅当我们能够删除整个节点时才执行修剪。这使其效率更高,而且通常正是你想要的。
还有 XTRIM 命令,它执行与上面 MAXLEN 选项非常相似的操作,除了它可以单独运行:
> XTRIM mystream MAXLEN 10
或者,与 XADD 选项一样:
> XTRIM mystream MAXLEN ~ 10
然而,XTRIM 旨在接受不同的修剪策略,即使目前仅实现了 MAXLEN。
由于 XTRIM 是一个显式命令,用户应该了解不同修剪策略可能的缺点。因此,稍后可能会实现按时间修剪。
XTRIM 稍后可能学习的另一个有用的驱逐策略是按 ID 范围删除,以便在需要时更轻松地使用 XRANGE 和 XTRIM 将数据从 Redis 移动到其他存储系统。
*Stream API 中的特殊 ID
你可能已经注意到,Redis API 中有几个特殊 ID 可以使用。这里是一个简短的回顾,以便它们在未来更有意义。
前两个特殊 ID 是 - 和 +,它们用于使用 XRANGE 命令进行范围查询。这两个 ID 分别表示可能的最小 ID(基本上是 0-1)和可能的最大 ID(即 18446744073709551615-18446744073709551615)。正如你所看到的,写 - 和 + 比写那些数字要干净得多。
然后有一些 API,我们想说的是,流中 ID 最大的项目的 ID。这就是 $ 的含义。因此,例如,如果我只想要 XREADGROUP 的新条目,我使用这样的 ID 来告诉我已经拥有所有现有条目,但未来插入的新条目还没有。类似地,当我创建或设置消费者组的 ID 时,我可以将最后交付的项目设置为 $,以便只向使用该组的消费者交付新条目。
正如你所看到的,$ 并不意味着 +,它们是两回事,因为 + 是每个可能的流中可能的最大 ID,而 $ 是给定包含特定条目的流中的最大 ID。此外,API 通常只会理解 + 或 $,然而避免为给定符号加载多个含义是有用的。
另一个特殊 ID 是 >,它只与消费者组相关,并且仅在使用 XREADGROUP 命令时使用。这个特殊 ID 意味着我们只想要到目前为止从未交付给其他消费者的条目。因此,基本上 > ID 是消费者组的 最后交付 ID。
最后,特殊 ID *,只能与 XADD 命令一起使用,意味着为我们自动选择新条目的 ID。
所以我们有 -、+、$、> 和 *,它们都有不同的含义,而且大多数时候,可以在不同的上下文中使用。
*持久性、复制和消息安全性
Stream 与任何其他 Redis 数据结构一样,异步复制到副本并持久化到 AOF 和 RDB 文件中。然而,可能不那么明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本,因此如果消息在主节点中待处理,副本也将具有相同的信息。同样,在重新启动后,AOF 将恢复消费者组状态。
然而请注意,Redis Stream 和消费者组使用 Redis 默认复制进行持久化和复制,因此:
- 如果消息的持久性对您的应用程序很重要,则 AOF 必须与强 fsync 策略一起使用。
- 默认情况下,异步复制不能保证 XADD 命令或消费者组状态更改被复制:在故障转移后,可能会丢失某些内容,具体取决于副本从主节点接收数据的能力。
- WAIT 命令可用于强制将更改传播到一组副本。然而请注意,虽然这使得数据丢失非常不可能,但由 Sentinel 或 Redis 集群操作的 Redis 故障转移过程仅执行 尽力而为 检查以故障转移到最更新的副本,并且在某些特定故障下可能会提升缺少某些数据的副本。
因此,在使用 Redis Stream 和消费者组设计应用程序时,请确保了解应用程序在故障期间应具有什么样的语义属性,并相应地配置事物,评估它是否对您的用例足够安全。
*从流中删除单个项目
Stream 还有一个特殊的命令,用于从流的中间按 ID 删除项目。通常对于只追加数据结构,这可能看起来是一个奇怪的功能,但它实际上对涉及隐私法规的应用程序很有用。该命令称为 XDEL,只需获取流名称,然后是删除的 ID:
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"
然而,在当前实现中,直到宏节点完全为空时,内存才真正被回收,因此你不应该滥用此功能。
*零长度流
Stream 与其他 Redis 数据结构之间的一个区别是,当其他数据结构不再具有元素时,由于调用删除元素的命令的副作用,键本身将被删除。因此,例如,当调用 ZREM 移除有序集合中的最后一个元素时,有序集合将被完全删除。然而 Stream 允许保持在零元素,既作为使用计数为零的 MAXLEN 选项(XADD 和 XTRIM 命令)的结果,也因为调用了 XDEL。
存在这种不对称的原因是,Stream 可能有相关联的消费者组,我们不希望仅仅因为流中不再有项目就丢失消费者组定义的状态。目前,即使没有相关联的消费者组,流也不会被删除,但这在将来可能会改变。
*消费消息的总延迟
非阻塞流命令(如 XRANGE 和 XREAD 或不带 BLOCK 选项的 XREADGROUP)像任何其他 Redis 命令一样同步服务,因此讨论此类命令的延迟是没有意义的:更有趣的是在 Redis 文档中检查命令的时间复杂度。应该足以说,流命令在提取范围时至少与有序集合命令一样快,并且 XADD 非常快,如果在平均机器上使用流水线,每秒可以轻松插入 50 万到 100 万个项目。
然而,如果我们想了解处理消息的延迟,在消费者组中阻塞消费者的上下文中,从通过 XADD 生成消息的时刻,到消费者获得消息因为 XREADGROUP 返回消息的时刻,延迟成为一个有趣的参数。
*如何为阻塞的消费者提供服务
在提供已执行测试的结果之前,了解 Redis 使用什么模型来路由流消息(以及通常实际上如何管理任何等待数据的阻塞操作)是很有趣的。
- 阻塞客户端被引用在一个哈希表中,该表将至少有一个阻塞消费者的键映射到等待此类键的消费者列表。这样,给定一个接收到数据的键,我们可以解析所有等待此类数据的客户端。
- 当写入发生时,在这种情况下,当调用 XADD 命令时,它调用
signalKeyAsReady()函数。此函数将键放入需要处理的键列表中,因为这些键可能有新的数据供阻塞的消费者使用。请注意,此类 就绪键 将在稍后处理,因此在同一事件循环周期中,键可能会收到其他写入。 - 最后,在返回到事件循环之前,就绪键 最终被处理。对于每个键,运行等待数据的客户端列表,如果适用,这些客户端将收到到达的新数据。对于流,数据是消费者请求的适用范围内的消息。
正如你所看到的,基本上,在返回到事件循环之前,XADD 的调用者和阻塞以消费消息的客户端都将在输出缓冲区中收到它们的回复,因此 XADD 的调用者应该在消费者收到新消息的同时收到来自 Redis 的回复。
此模型是 基于推送 的,因为将数据添加到消费者缓冲区将由调用 XADD 的操作直接执行,因此延迟往往非常可预测。
*延迟测试结果
为了检查这种延迟特性,使用多个 Ruby 程序实例进行了测试,这些程序推送消息,其附加字段为计算机毫秒时间,以及从消费者组读取消息并处理它们的 Ruby 程序。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。
此类程序未经过优化,并在同时运行 Redis 的小型双核实例中执行,以试图提供您可以在非最佳条件下预期的延迟数字。消息以每秒 10k 的速度生成,十个同时的消费者从同一 Redis 流和消费者组中消费和确认消息。
获得的结果:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
因此,99.9% 的请求延迟 <= 2 毫秒,异常值仍然非常接近平均值。
向流中添加数百万条未确认的消息不会改变基准测试的要点,大多数查询仍然以非常短的延迟处理。
一些备注:
- 在这里,我们每次迭代处理多达 10k 条消息,这意味着 XREADGROUP 的
COUNT参数设置为 10000。这增加了很多延迟,但是需要允许慢速消费者能够跟上消息流。因此,您可以预期现实世界中的延迟要小得多。 - 用于此基准测试的系统与今天的标准相比非常慢。