Kafka 消费者组 CLI

17 Mar 2025 | 5 分钟阅读

通常,一个 Kafka 消费者属于一个特定的消费者组。一个消费者组基本上代表一个应用程序的名称。为了在消费者组中消费消息,使用 '-group' 命令。

让我们看看消费者将如何从 Kafka 主题中消费消息

步骤 1: 打开 Windows 命令提示符。

步骤 2: 使用 '-group' 命令:'kafka-console-consumer -bootstrap-server localhost:9092 -topic-group <group_name>'。 给该组取个名字。 按回车键。

Kafka Consumer Group CLI

在上图中,该组的名称是 'first_app'。 看起来没有显示任何消息,因为没有新消息产生到这个主题。如果使用 '-from-beginning' 命令,将显示所有以前的消息。

步骤 3: 要查看一些新消息,从生产者控制台(如上一节所做的那样)生成一些即时消息。

Kafka Consumer Group CLI

因此,生产者产生的新消息可以在消费者的控制台中看到。

步骤 4: 但是,这是该组中单个消费者读取数据。让我们创建更多消费者以了解消费者组的强大功能。为此,打开一个新的终端,并键入完全相同的消费者命令

'kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic <topic_name> --group <group_name>'。

Kafka Consumer Group CLI
Kafka Consumer Group CLI

在上图中,很明显,生产者正在将数据发送到 Kafka 主题。两个消费者正在消费消息。查看消息的顺序。由于为“myfirst”主题创建了三个分区(前面讨论过),因此消息仅按该顺序拆分。

我们可以在同一个组下创建更多消费者,每个消费者将根据分区的数量来消费消息。自己尝试一下,以便更好地理解。

注意: 组 ID 应该相同,这样消息才能在消费者之间拆分。

但是,如果任何一个消费者被终止,分区将重新分配给活动消费者,并且这些活动消费者将接收消息。

因此,通过这种方式,消费者组中的各种消费者会从 Kafka 主题中消费消息。

带有键的消费者

当生产者将键值与数据关联时,它将存储到指定的分区。如果未指定键值,数据将移动到任何分区。因此,当消费者读取带有键的消息时,如果没有指定键,它将显示为 null。为了从 Kafka 主题中消费消息,需要一个“print.key”和一个“key.seperator”。使用的命令是

'kafka-console-consumer -bootstrap-server localhost:9092 -topic <topic_name> --from-beginning -property print.key=true -property key.seperator=,'

使用上述命令,消费者可以读取带有指定键的数据。

关于消费者组的更多信息

'--from-beginning' 命令

此命令用于从开始处读取消息(前面讨论过)。因此,在消费者组中使用它将给出以下输出

Kafka Consumer Group CLI

可以看出,使用一个新的消费者组 'second_app' 来从头开始读取消息。如果再次运行相同的命令,它将不会显示任何输出。这是因为偏移量已在 Apache Kafka 中提交。因此,一旦一个消费者组读取了所有直到写入的消息,下一次,它将仅读取新消息。

例如,在下面的快照中,当再次使用 '-from-beginning' 命令时,仅读取新消息。这是因为所有以前的消息都已在之前被消费了。

Kafka Consumer Group CLI

'kafka-consumer-groups' 命令

此命令提供整个文档以列出所有组,描述组、删除消费者信息或重置消费者组偏移量。

Kafka Consumer Group CLI

它需要一个引导服务器,供客户端对消费者组执行不同的功能。

列出消费者组

使用 '-list' 命令列出 Kafka 集群中可用的消费者组的数量。 该命令的使用方法是

'kafka-consumer-groups.bat -bootstrap-server localhost:9092 -list'。

下面显示了一个快照,存在三个消费者组。

Kafka Consumer Group CLI

描述消费者组

使用 '--describe' 命令描述一个消费者组。该命令的使用方法是

'kafka-consumer-groups.bat -bootstrap-server localhost:9092 -describe group <group_name>'

Kafka Consumer Group CLI

此命令描述是否存在任何活动消费者,当前偏移量值,滞后值为 0 - 表明消费者已读取所有数据。

重置偏移量

偏移量已在 Apache Kafka 中提交。因此,如果用户想再次读取消息,则需要重置偏移量值。 'Kafka-consumer-groups' 命令提供了一个重置偏移量的选项。重置偏移量值意味着定义用户希望再次从中读取消息的点。它一次只支持一个消费者组,并且该组不应有任何活动实例。

在重置偏移量时,用户需要选择三个参数

  1. 一个执行选项
  2. 重置规范
  3. 范围

有两个可用的执行选项

'-dry-run': 这是默认的执行选项。此选项用于规划那些需要重置的偏移量。

'

-execute': 此选项用于更新偏移量值。

可以使用以下重置规范

'-to-datetime': 它根据日期时间(从日期时间偏移量)重置偏移量。使用的格式为:'YYYY-MM-DDTHH:mm:SS.sss'。

'--to-earliest': 它将偏移量重置为最早的偏移量。

' --to-latest': 它将偏移量重置为最新的偏移量。

'--shift-by': 它通过将当前偏移量值移动 'n' 来重置偏移量。'n' 的值可以为正数或负数。

'--from-file': 它将偏移量重置为 CSV 文件中定义的值。

' --to-current': 它将偏移量重置为当前偏移量。

有两个作用域可供定义

'-all-topics': 它将重置组中所有可用主题的偏移量值。

'-topics': 它仅重置指定主题的偏移量值。用户需要指定主题名称以重置偏移量值。

让我们尝试看看

1) 使用 '-to-earliest' 命令

Kafka Consumer Group CLI

在上图中,偏移量被重置为新偏移量 0。这是因为使用了 '-to-earliest' 命令,该命令将偏移量值重置为 0。

2) 使用 '-shift-by' 命令

Kafka Consumer Group CLI
Kafka Consumer Group CLI

在第一个快照中,偏移量值从 '0' 移动到 '+2'。 在第二个快照中,偏移量值从 '2' 移动到 '-1'。

注意: 要将偏移量值移动到正数,不必使用 '+' 符号。 默认情况下,它将被视为正数。