Kafka CLI 和工具

2025 年 1 月 23 日 | 阅读 16 分钟
Kafka CLI and Tools

Apache Kafka 是一个分布式流媒体平台,用于构建实时数据管道和流应用程序。它旨在处理高吞吐量、容错性和可伸缩性。Kafka Consumer CLI(命令行接口)是一个用于从 Kafka 主题消费消息的工具。以下是其关键组件和功能的概述

Kafka Consumer CLI 关键组件

  1. Kafka 消费者: 读取 Kafka 主题消息的主要组件。它订阅一个或多个主题并处理记录。
  2. 主题: 存储和发布记录的类别或源名称。
  3. 分区: Kafka 主题分为多个分区,以实现并行处理。
  4. 偏移量: 记录在分区中的位置。消费者使用偏移量来跟踪已读取的消息。

Kafka Consumer CLI 功能

  1. 订阅主题: 允许消费者订阅一个或多个主题以开始读取消息。
  2. 轮询消息: 从消费者订阅的主题中检索记录。轮询方法返回一批记录。
  3. 提交偏移量: 跟踪消费者读取的消息。偏移量可以手动或自动提交。
  4. 消费者组: 一组消费者协同工作以从主题读取数据。组中的每个消费者从唯一的分区读取。
  5. 消息处理: 消息被消费后,可以根据应用程序逻辑进行处理。
  6. 错误处理: 处理消息消费过程中可能发生的错误。

基本的 Kafka Consumer CLI 命令

1. 启动 Kafka 消费者

2. 从特定偏移量消费消息

3. 作为消费者组的一部分消费消息

4. 使用键值反序列化消费消息

Kafka 主题 CLI

Kafka Topics CLI 是 Apache Kafka 提供的命令行接口工具,旨在有效管理 Kafka 主题。Kafka 主题是 Kafka 中必不可少的组件,充当消息发布和存储的类别或通道。Kafka Topics CLI 使管理员和开发人员能够对这些主题执行各种操作,例如创建、列出、描述、修改和删除它们。

Kafka CLI and Tools

主要功能

  1. 创建主题
    • 该工具允许用户创建新主题,这涉及指定分区数量和复制因子等参数。
    • 分区将主题分为多个段,允许并行处理和可伸缩性。
    • 复制因子确定每个分区在不同代理之间维护多少个副本,以实现容错和数据冗余。
  2. 列出主题
    • 此功能列出 Kafka 集群中的所有现有主题,提供可用主题及其当前状态的概述。
  3. 描述主题
    • 描述主题提供有关主题的详细信息,包括分区数量、每个分区的领导者、副本和同步副本 (ISR) 等元数据。
    • 此信息对于监控和管理主题的健康状况和性能至关重要。
  4. 删除主题
    • 用户可以删除指定的主题,将其及其数据从 Kafka 集群中删除。
    • 删除主题必须谨慎进行,因为它是不可逆的操作,会导致数据丢失。
  5. 修改主题
    • 该工具允许修改现有主题配置。常见的更改包括向主题添加分区以处理增加的负载或更改保留策略等配置。

Kafka CLI 工具

Kafka 提供了一套命令行接口 (CLI) 工具,用于与 Kafka 集群、主题、生产者和消费者进行交互和管理。这些工具对于 Kafka 管理员和开发人员高效管理 Kafka 操作和调试问题至关重要。

Kafka CLI and Tools

Kafka CLI 工具包含在 Kafka 分发包中,并提供各种命令来与 Kafka 生态系统的不同组件进行交互。这些工具根据其功能分为几类

  1. 主题管理: 创建、列出、描述、修改和删除主题。
  2. 数据生产和消费: 生产和消费消息。
  3. 配置管理: 管理主题、代理和其他组件的配置。
  4. 集群管理: 管理 Kafka 代理和集群。
  5. 实用工具: 用于检查和调试 Kafka 设置的各种实用工具。

1. Apache Kafka 中的主题管理工具

Apache Kafka 提供了一套用于管理 Kafka 主题的命令行接口 (CLI) 工具。这些工具使用户能够在 Kafka 集群中创建、列出、描述、修改和删除主题。正确的主题管理对于优化 Kafka 的性能、可靠性和可伸缩性至关重要。本节全面概述 Kafka 的主题管理工具,主要关注 kafka-topics.sh。

kafka-topics.sh

kafka-topics.sh 脚本是管理 Kafka 主题的主要工具。它提供了各种命令来处理主题生命周期和配置。

下面是其功能的详细说明

创建主题

创建主题是 Kafka 中的基本操作之一。创建主题时,可以指定分区数量和复制因子。

命令

参数

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理。
  • --replication-factor <replication_factor>: 设置主题的副本数量。
  • --partitions <num_partitions>: 定义主题的分区数量。
  • --topic <topic_name>: 要创建的主题名称。

示例

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 5 --topic my_topic

此命令创建一个名为 my_topic 的主题,该主题具有 5 个分区和 3 个复制因子。

列出主题

列出主题提供了一种快速查看 Kafka 集群中所有主题的方法。

命令

参数

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理。

示例

此命令列出 Kafka 集群中所有可用的主题。

描述主题

描述主题提供有关主题配置、分区和副本的详细信息。

命令

参数

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理。
  • --topic <topic_name>: 要描述的主题名称。

示例

此命令提供有关 my_topic 的详细信息,包括其分区、副本和配置。

修改主题

修改主题允许您更改其配置,例如增加分区数量或修改主题级配置。

增加分区

参数

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理。
  • --topic <topic_name>: 要修改的主题名称。
  • --partitions <new_num_partitions>: 主题的新分区数量。

示例

此命令将 my_topic 中的分区数量增加到 10。

修改配置

参数

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理。
  • --entity-type topics: 指示配置适用于主题。
  • --entity-name <topic_name>: 要修改的主题名称。
  • --add-config <config_name>=<config_value>: 指定要修改的配置参数。

示例

此命令将 my_topic 的保留时间设置为 2 小时。

删除主题

删除主题会将其从 Kafka 集群中删除。此操作不可逆,因此应谨慎执行。

命令

参数

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理。
  • --topic <topic_name>: 要删除的主题名称。

示例

此命令从 Kafka 集群中删除 my_topic。

详细示例和用例

使用自定义配置创建主题

有时您可能需要使用特定配置创建主题,例如自定义保留策略或压缩类型。

命令

此命令创建一个名为 custom_topic 的主题,该主题具有 3 个分区、2 个复制因子、1 天的保留策略(86400000 毫秒),并使用 gzip 压缩消息。

描述多个主题

您可以通过在一个命令中列出多个主题来一次描述它们。

命令

此命令提供有关 topic1、topic2 和 topic3 的详细信息。

修改主题的保留配置

更改主题的保留策略有助于管理 Kafka 使用的存储空间。

命令

此命令将 my_topic 的保留策略设置为 12 小时(43200000 毫秒)。

检查主题是否存在

在对主题执行操作之前,检查主题是否存在通常很有用。

命令

此命令列出所有主题并过滤输出以检查 my_topic 是否存在。

2. 数据生产工具:kafka-console-producer.sh

kafka-console-producer.sh 工具用于从命令行向 Kafka 主题生产消息。此工具对于测试和开发目的非常方便,用户可以手动输入消息或与脚本集成以将数据发送到 Kafka。

基本用法

要开始向 Kafka 主题生产消息,请使用以下命令格式

afka-console-producer.sh --broker-list <broker_list> --topic <topic_name>

  • --broker-list <broker_list>: 指定要连接的 Kafka 代理列表。应以 <host>:<port> 格式提供。
  • --topic <topic_name>: 指定将消息生产到的 Kafka 主题的名称。

示例

消息输入

生产者启动后,它进入交互模式,可以直接在控制台中键入消息。在控制台中输入的每一行文本都将被视为单独的消息并发送到指定的 Kafka 主题。

附加选项

  • 消息键: 您可以使用 --property 选项为每条消息指定消息键

此示例设置消息的键和值分隔符。

  • 消息确认: 默认情况下,消息是异步发送的。您可以使用 --producer-property 选项控制确认

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic --producer-property acks=all

这确保消息在确认之前成功写入所有副本。

  • 安全设置: 如果您的 Kafka 集群启用了安全功能 (SSL/TLS, SASL),您可以使用 --producer.config 传递安全配置

3. 数据消费工具:kafka-console-consumer.sh

kafka-console-consumer.sh 工具用于从 Kafka 主题消费消息并将其打印到控制台。它支持各种消费模式,对于监控和调试 Kafka 数据流至关重要。

Kafka CLI and Tools

基本用法

要开始从 Kafka 主题消费消息,请使用以下命令格式

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理列表。
  • --topic <topic_name>: 指定将从中消费消息的 Kafka 主题的名称。
  • --from-beginning: 可选地从主题的开头开始消费消息。对于调试或重新处理很有用。
  • --group <consumer_group>: 指定消费者所属的消费者组。此选项在作为消费者组的一部分消费消息时使用。

示例

消息输出

消费者启动后,它会不断从指定主题中获取消息并将其打印到控制台。每条消息都包含键、值、分区和偏移量,从而提供对消息流的洞察。

附加选项

  • 消费者组: 作为消费者组的一部分消费消息时,请使用 --group 选项
  • 偏移量管理: 使用 --offset 指定开始消费的特定偏移量
  • 安全设置: 与生产者类似,消费者可以使用 --consumer.config 传递安全配置
  • 消息格式化: 使用 --formatter 等选项调整消息格式,以控制消息的显示方式。

kafka-configs.sh

kafka-configs.sh 工具用于管理 Kafka 实体的配置,例如主题、代理和客户端。它允许用户动态查看、修改、添加和删除配置。

查看配置

要查看 Kafka 实体(主题、代理、客户端)的配置,请使用 --describe 命令

  • --bootstrap-server <broker_list>: 指定要连接的 Kafka 代理列表。
  • --entity-type <entity_type>: 指定实体类型(主题、代理、客户端)。
  • --entity-name <entity_name>: 指定要描述配置的实体名称。

示例(描述主题配置)

修改配置

要修改 Kafka 实体的配置,请使用 --alter 命令

  • --add-config <config_key>=<config_value>: 指定要添加或修改的配置参数。

示例(修改主题配置)

删除配置

要删除 Kafka 实体的配置,请使用 --alter 命令和 --delete-config 选项

  • --delete-config <config_key>: 指定要删除的配置参数。

示例(删除主题配置)

配置管理最佳实践

  1. 配置版本控制: 维护配置的版本控制或备份,以方便回滚和审计跟踪。
  2. 动态配置: 利用动态配置更改(--alter)来调整 Kafka 集群设置,而无需停机。
  3. 监控: 定期监控配置的一致性和是否符合最佳实践。
  4. 安全: 通过限制对工具的访问和实施最小权限原则来保护配置。
  5. 自动化: 使用脚本或自动化工具来高效管理多个 Kafka 集群的配置。

用例

设置代理配置

要配置 Kafka 代理,您可以动态修改日志保留设置或安全配置等属性

管理主题配置

对于主题,您可能会根据数据保留策略调整保留期限或段大小

客户端配置

管理 Kafka 客户端的配置以优化性能或调整安全设置

4. Kafka 集群管理工具

Kafka 代理管理

  1. kafka-server-start.sh / kafka-server-stop.sh
    • 这些脚本用于单独启动和停止 Kafka 代理。它们对于管理集群中 Kafka 代理的生命周期至关重要。
  2. kafka-configs.sh
    • 如前所述,kafka-configs.sh 用于管理 Kafka 代理的配置(--entity-type brokers),允许动态更改日志保留或安全设置等属性。

集群监控和健康检查

  1. kafka-topics.sh
    • 虽然主要用于主题管理,但 kafka-topics.sh 也可以列出(--list)集群中的所有主题,提供跨代理的主题分布概览。
  2. kafka-consumer-groups.sh
    • 此工具帮助监控和管理 Kafka 集群中的消费者组。它提供有关消费者组滞后、偏移量和消费进度(--describe)的信息。
  3. kafka-console-consumer.sh
    • 尽管主要用于数据消费,但 kafka-console-consumer.sh 可以有助于监控消息流并验证跨代理的数据可用性和一致性。

扩展和弹性

  1. 分区重分配工具
    • Kafka 提供实用程序 (kafka-reassign-partitions.sh) 来手动在代理之间重新分配分区。此工具对于通过添加新代理来重新平衡工作负载或横向扩展至关重要。
  2. 集群扩展
    • 向集群添加新代理 (kafka-server-start.sh) 需要调整配置,并可能需要重新分配分区以确保工作负载的均匀分布。

安全管理

  1. 安全配置工具
    • 管理安全设置 (kafka-configs.sh),例如 SSL/TLS 配置或 SASL 身份验证,对于保护 Kafka 集群免受未经授权的访问至关重要。

Kafka 控制台生产者 CLI

Apache Kafka 是一个强大的分布式事件流平台,专为高吞吐量和低延迟数据流而设计。Kafka 提供的关键实用程序之一是 Kafka 控制台生产者 CLI,它允许用户直接从命令行向 Kafka 主题生产消息。此工具对于测试、调试和获取 Kafka 实践经验非常有价值,而无需编写自定义代码。

Kafka 控制台生产者 CLI,通常使用 kafka-console-producer.sh 调用,是一个命令行工具,允许用户向 Kafka 主题生产消息。此实用程序是 Kafka 分发包的一部分,提供了一种直接的方式向主题发送消息以进行测试和调试。

Kafka CLI and Tools

基本用法

Kafka 控制台生产者 CLI 使用 kafka-console-producer.sh 脚本调用。基本语法是

  • --broker-list [broker_address]: 指定要连接的 Kafka 代理,通常是 localhost:9092。
  • --topic [topic_name]: 指定将消息发送到的主题。
  • [additional_options]: 用于自定义生产者行为的各种选项。

关键选项和示例

生产消息

要向主题生产消息,可以使用以下命令

运行此命令后,您可以在终端中键入消息,每行将作为单独的消息发送到指定的主题。

为消息指定键

Kafka 消息可以有键。要为每条消息指定键,请使用 --property 选项,其中包含 parse.key=true 并定义键分隔符

例如,键入 key1:value1 将发送一条消息,其中 key1 作为键,value1 作为值。

指定分区

默认情况下,消息使用默认分区策略分布到可用的分区中。此命令确保所有消息都发送到主题的分区 0。要将消息发送到特定分区,请使用 --partition 选项

配置确认

确认确保消息可靠地发送到 Kafka 集群。使用 acks 属性配置此项

acks 属性可以设置为 0(无确认)、1(仅来自领导者的确认)或 all(来自所有副本的确认)。

高级配置

自定义序列化器

默认情况下,Kafka 控制台生产者 CLI 对键和值都使用 StringSerializer。要使用自定义序列化器,请指定序列化器类

您可以根据需要将 StringSerializer 替换为其他序列化器。

配置压缩

为了优化性能并降低存储成本,Kafka 支持消息压缩。您可以使用 compression.type 属性配置压缩

Kafka 支持多种压缩类型,包括 gzip、snappy、lz4 和 zstd。

批处理大小和停留时间

Kafka 可以在将多条消息发送到代理之前将其批量处理。这提高了吞吐量和效率。使用 batch.size 和 linger.ms 属性配置批处理大小和停留时间

  • batch.size 定义了批处理的最大大小。
  • linger.ms 设置发送批处理之前等待的最长时间。

实际用例

测试和调试

Kafka 控制台生产者 CLI 对于测试和调试 Kafka 配置非常宝贵。通过手动向主题生产消息,您可以验证消费者是否正确处理它们。

用于开发的数据注入

在开发过程中,您可能需要将测试数据注入 Kafka 主题。Kafka 控制台生产者 CLI 提供了一种简单的方法,无需编写自定义生产者应用程序即可完成此操作。

监控和故障排除

对于监控和故障排除,Kafka 控制台生产者 CLI 允许您快速生产消息并检查 Kafka 主题和代理的运行状况。

如何使用 CLI 消费 Kafka 主题中的数据?

Kafka CLI and Tools

使用命令行接口 (CLI) 消费 Kafka 主题中的数据既简单又适用于测试、调试和监控目的。以下是使用 Kafka 控制台消费者 CLI (kafka-console-consumer.sh) 消费 Kafka 主题中数据的分步指南

前提条件

在开始之前,请确保已安装并运行 Apache Kafka。如果没有,请按照 Apache Kafka 提供的 Kafka 安装说明进行操作。

从 Kafka 主题消费数据的步骤

1. 导航到 Kafka 目录

打开终端并导航到您的 Kafka 安装目录。

2. 启动 Kafka 消费者

使用 kafka-console-consumer.sh 脚本开始从 Kafka 主题消费消息。基本命令结构是

  • <topic_name>: 将此替换为您要消费的 Kafka 主题的名称。
  • <broker_list>: 将此替换为 Kafka 代理(引导服务器)列表,格式为 host1:port1,host2:port2。

例如,要从名为 my-topic 的主题消费消息,其中 Kafka 代理在 localhost 和端口 9092 上运行

3. 消费消息

运行命令后,Kafka 控制台消费者 CLI 将开始从指定主题消费消息。消息将在收到时显示在终端窗口中。

附加选项

从开头消费

默认情况下,消费者从主题中的最新偏移量开始读取消息。要从主题的开头消费消息,请添加 --from-beginning 标志

指定消费者组

消费者可以属于一个消费者组,这可以实现组管理和负载平衡。使用 --group 标志指定消费者组

指定键和值反序列化器

默认情况下,Kafka 控制台消费者 CLI 假定键和值都是字符串。使用 --key-deserializer 和 --value-deserializer 标志指定自定义反序列化器

从特定分区消费

要从主题的特定分区消费消息,请使用 --partition 标志

使用偏移量消费

您可以使用 --offset 标志指定开始消费消息的起始偏移量

将 <offset_value> 替换为您要开始消费消息的特定偏移量值。

使用时间戳消费

Kafka 消息带有时间戳。您可以使用 --offset 和 --timestamp 标志指定开始消费消息的起始时间戳

将 <timestamp_value> 替换为时间戳值,将 <offset_value> 替换为您要开始消费消息的特定偏移量值。

Kafka 控制台消费者 CLI

Kafka 控制台消费者 CLI (kafka-console-consumer.sh) 是 Apache Kafka 提供的用于从 Kafka 主题消费消息的强大命令行工具。它对于直接从命令行测试、调试和监控 Kafka 主题特别有用,而无需编写自定义消费者应用程序。本指南将介绍 Kafka 控制台消费者 CLI 的基本用法、主要功能和一些高级选项。

基本用法

要使用 Kafka 控制台消费者 CLI,请遵循以下基本步骤

  1. 导航到 Kafka 目录: 打开终端并导航到您的 Kafka 安装目录。
  2. 启动消费者: 使用 kafka-console-consumer.sh 脚本开始从 Kafka 主题消费消息。基本命令结构是

将 <topic_name> 替换为您要消费的 Kafka 主题的名称,将 <broker_list> 替换为 Kafka 代理(引导服务器)列表,格式为 host1:port1,host2:port2。

例如

主要特点

1. 从开头消费

默认情况下,消费者从主题中的最新偏移量开始消费消息。要从主题的开头消费消息,请使用 --from-beginning 标志

2. 指定消费者组

消费者可以属于一个消费者组,这可以实现负载平衡和容错。使用 --group 标志指定消费者组

3. 指定反序列化器

默认情况下,Kafka 控制台消费者 CLI 假定键和值都是字符串。您可以使用 --key-deserializer 和 --value-deserializer 标志指定自定义反序列化器。例如

4. 消费特定分区

要从主题的特定分区消费消息,请使用 --partition 标志

高级选项

1. 使用偏移量消费

您可以使用 --offset 标志指定开始消费消息的起始偏移量

将 <offset_value> 替换为您要开始消费消息的特定偏移量值。

2. 使用时间戳消费

Kafka 消息带有时间戳。您可以使用 --offset 和 --timestamp 标志指定开始消费消息的起始时间戳

将 <timestamp_value> 替换为时间戳值,将 <offset_value> 替换为您要开始消费消息的特定偏移量值。

示例

示例 1:从消费者组的开头开始消费

示例 2:使用自定义反序列化器从特定分区消费