Kafka CLI 和工具2025 年 1 月 23 日 | 阅读 16 分钟 ![]() Apache Kafka 是一个分布式流媒体平台,用于构建实时数据管道和流应用程序。它旨在处理高吞吐量、容错性和可伸缩性。Kafka Consumer CLI(命令行接口)是一个用于从 Kafka 主题消费消息的工具。以下是其关键组件和功能的概述 Kafka Consumer CLI 关键组件
Kafka Consumer CLI 功能
基本的 Kafka Consumer CLI 命令1. 启动 Kafka 消费者 2. 从特定偏移量消费消息 3. 作为消费者组的一部分消费消息 4. 使用键值反序列化消费消息 Kafka 主题 CLIKafka Topics CLI 是 Apache Kafka 提供的命令行接口工具,旨在有效管理 Kafka 主题。Kafka 主题是 Kafka 中必不可少的组件,充当消息发布和存储的类别或通道。Kafka Topics CLI 使管理员和开发人员能够对这些主题执行各种操作,例如创建、列出、描述、修改和删除它们。 ![]() 主要功能
Kafka CLI 工具Kafka 提供了一套命令行接口 (CLI) 工具,用于与 Kafka 集群、主题、生产者和消费者进行交互和管理。这些工具对于 Kafka 管理员和开发人员高效管理 Kafka 操作和调试问题至关重要。 ![]() Kafka CLI 工具包含在 Kafka 分发包中,并提供各种命令来与 Kafka 生态系统的不同组件进行交互。这些工具根据其功能分为几类
1. Apache Kafka 中的主题管理工具Apache Kafka 提供了一套用于管理 Kafka 主题的命令行接口 (CLI) 工具。这些工具使用户能够在 Kafka 集群中创建、列出、描述、修改和删除主题。正确的主题管理对于优化 Kafka 的性能、可靠性和可伸缩性至关重要。本节全面概述 Kafka 的主题管理工具,主要关注 kafka-topics.sh。 kafka-topics.shkafka-topics.sh 脚本是管理 Kafka 主题的主要工具。它提供了各种命令来处理主题生命周期和配置。 下面是其功能的详细说明 创建主题 创建主题是 Kafka 中的基本操作之一。创建主题时,可以指定分区数量和复制因子。 命令 参数
示例 kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 5 --topic my_topic 此命令创建一个名为 my_topic 的主题,该主题具有 5 个分区和 3 个复制因子。 列出主题 列出主题提供了一种快速查看 Kafka 集群中所有主题的方法。 命令 参数
示例 此命令列出 Kafka 集群中所有可用的主题。 描述主题 描述主题提供有关主题配置、分区和副本的详细信息。 命令 参数
示例 此命令提供有关 my_topic 的详细信息,包括其分区、副本和配置。 修改主题修改主题允许您更改其配置,例如增加分区数量或修改主题级配置。 增加分区 参数
示例 此命令将 my_topic 中的分区数量增加到 10。 修改配置参数
示例 此命令将 my_topic 的保留时间设置为 2 小时。 删除主题删除主题会将其从 Kafka 集群中删除。此操作不可逆,因此应谨慎执行。 命令 参数
示例 此命令从 Kafka 集群中删除 my_topic。 详细示例和用例使用自定义配置创建主题 有时您可能需要使用特定配置创建主题,例如自定义保留策略或压缩类型。 命令 此命令创建一个名为 custom_topic 的主题,该主题具有 3 个分区、2 个复制因子、1 天的保留策略(86400000 毫秒),并使用 gzip 压缩消息。 描述多个主题您可以通过在一个命令中列出多个主题来一次描述它们。 命令 此命令提供有关 topic1、topic2 和 topic3 的详细信息。 修改主题的保留配置 更改主题的保留策略有助于管理 Kafka 使用的存储空间。 命令 此命令将 my_topic 的保留策略设置为 12 小时(43200000 毫秒)。 检查主题是否存在 在对主题执行操作之前,检查主题是否存在通常很有用。 命令 此命令列出所有主题并过滤输出以检查 my_topic 是否存在。 2. 数据生产工具:kafka-console-producer.shkafka-console-producer.sh 工具用于从命令行向 Kafka 主题生产消息。此工具对于测试和开发目的非常方便,用户可以手动输入消息或与脚本集成以将数据发送到 Kafka。 基本用法要开始向 Kafka 主题生产消息,请使用以下命令格式 afka-console-producer.sh --broker-list <broker_list> --topic <topic_name>
示例 消息输入生产者启动后,它进入交互模式,可以直接在控制台中键入消息。在控制台中输入的每一行文本都将被视为单独的消息并发送到指定的 Kafka 主题。 附加选项
此示例设置消息的键和值分隔符。
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic --producer-property acks=all 这确保消息在确认之前成功写入所有副本。
3. 数据消费工具:kafka-console-consumer.shkafka-console-consumer.sh 工具用于从 Kafka 主题消费消息并将其打印到控制台。它支持各种消费模式,对于监控和调试 Kafka 数据流至关重要。 ![]() 基本用法要开始从 Kafka 主题消费消息,请使用以下命令格式
示例 消息输出消费者启动后,它会不断从指定主题中获取消息并将其打印到控制台。每条消息都包含键、值、分区和偏移量,从而提供对消息流的洞察。 附加选项
kafka-configs.shkafka-configs.sh 工具用于管理 Kafka 实体的配置,例如主题、代理和客户端。它允许用户动态查看、修改、添加和删除配置。 查看配置 要查看 Kafka 实体(主题、代理、客户端)的配置,请使用 --describe 命令
示例(描述主题配置) 修改配置 要修改 Kafka 实体的配置,请使用 --alter 命令
示例(修改主题配置) 删除配置 要删除 Kafka 实体的配置,请使用 --alter 命令和 --delete-config 选项
示例(删除主题配置) 配置管理最佳实践
用例设置代理配置 要配置 Kafka 代理,您可以动态修改日志保留设置或安全配置等属性 管理主题配置 对于主题,您可能会根据数据保留策略调整保留期限或段大小 客户端配置 管理 Kafka 客户端的配置以优化性能或调整安全设置 4. Kafka 集群管理工具Kafka 代理管理
集群监控和健康检查
扩展和弹性
安全管理
Kafka 控制台生产者 CLIApache Kafka 是一个强大的分布式事件流平台,专为高吞吐量和低延迟数据流而设计。Kafka 提供的关键实用程序之一是 Kafka 控制台生产者 CLI,它允许用户直接从命令行向 Kafka 主题生产消息。此工具对于测试、调试和获取 Kafka 实践经验非常有价值,而无需编写自定义代码。 Kafka 控制台生产者 CLI,通常使用 kafka-console-producer.sh 调用,是一个命令行工具,允许用户向 Kafka 主题生产消息。此实用程序是 Kafka 分发包的一部分,提供了一种直接的方式向主题发送消息以进行测试和调试。 ![]() 基本用法Kafka 控制台生产者 CLI 使用 kafka-console-producer.sh 脚本调用。基本语法是
关键选项和示例生产消息 要向主题生产消息,可以使用以下命令 运行此命令后,您可以在终端中键入消息,每行将作为单独的消息发送到指定的主题。 为消息指定键Kafka 消息可以有键。要为每条消息指定键,请使用 --property 选项,其中包含 parse.key=true 并定义键分隔符 例如,键入 key1:value1 将发送一条消息,其中 key1 作为键,value1 作为值。 指定分区 默认情况下,消息使用默认分区策略分布到可用的分区中。此命令确保所有消息都发送到主题的分区 0。要将消息发送到特定分区,请使用 --partition 选项 配置确认 确认确保消息可靠地发送到 Kafka 集群。使用 acks 属性配置此项 acks 属性可以设置为 0(无确认)、1(仅来自领导者的确认)或 all(来自所有副本的确认)。 高级配置 自定义序列化器 默认情况下,Kafka 控制台生产者 CLI 对键和值都使用 StringSerializer。要使用自定义序列化器,请指定序列化器类 您可以根据需要将 StringSerializer 替换为其他序列化器。 配置压缩 为了优化性能并降低存储成本,Kafka 支持消息压缩。您可以使用 compression.type 属性配置压缩 Kafka 支持多种压缩类型,包括 gzip、snappy、lz4 和 zstd。 批处理大小和停留时间 Kafka 可以在将多条消息发送到代理之前将其批量处理。这提高了吞吐量和效率。使用 batch.size 和 linger.ms 属性配置批处理大小和停留时间
实际用例测试和调试 Kafka 控制台生产者 CLI 对于测试和调试 Kafka 配置非常宝贵。通过手动向主题生产消息,您可以验证消费者是否正确处理它们。 用于开发的数据注入 在开发过程中,您可能需要将测试数据注入 Kafka 主题。Kafka 控制台生产者 CLI 提供了一种简单的方法,无需编写自定义生产者应用程序即可完成此操作。 监控和故障排除 对于监控和故障排除,Kafka 控制台生产者 CLI 允许您快速生产消息并检查 Kafka 主题和代理的运行状况。 如何使用 CLI 消费 Kafka 主题中的数据?![]() 使用命令行接口 (CLI) 消费 Kafka 主题中的数据既简单又适用于测试、调试和监控目的。以下是使用 Kafka 控制台消费者 CLI (kafka-console-consumer.sh) 消费 Kafka 主题中数据的分步指南 前提条件在开始之前,请确保已安装并运行 Apache Kafka。如果没有,请按照 Apache Kafka 提供的 Kafka 安装说明进行操作。 从 Kafka 主题消费数据的步骤1. 导航到 Kafka 目录 打开终端并导航到您的 Kafka 安装目录。 2. 启动 Kafka 消费者 使用 kafka-console-consumer.sh 脚本开始从 Kafka 主题消费消息。基本命令结构是
例如,要从名为 my-topic 的主题消费消息,其中 Kafka 代理在 localhost 和端口 9092 上运行 3. 消费消息 运行命令后,Kafka 控制台消费者 CLI 将开始从指定主题消费消息。消息将在收到时显示在终端窗口中。 附加选项 从开头消费 默认情况下,消费者从主题中的最新偏移量开始读取消息。要从主题的开头消费消息,请添加 --from-beginning 标志 指定消费者组 消费者可以属于一个消费者组,这可以实现组管理和负载平衡。使用 --group 标志指定消费者组 指定键和值反序列化器 默认情况下,Kafka 控制台消费者 CLI 假定键和值都是字符串。使用 --key-deserializer 和 --value-deserializer 标志指定自定义反序列化器 从特定分区消费 要从主题的特定分区消费消息,请使用 --partition 标志 使用偏移量消费 您可以使用 --offset 标志指定开始消费消息的起始偏移量 将 <offset_value> 替换为您要开始消费消息的特定偏移量值。 使用时间戳消费 Kafka 消息带有时间戳。您可以使用 --offset 和 --timestamp 标志指定开始消费消息的起始时间戳 将 <timestamp_value> 替换为时间戳值,将 <offset_value> 替换为您要开始消费消息的特定偏移量值。 Kafka 控制台消费者 CLIKafka 控制台消费者 CLI (kafka-console-consumer.sh) 是 Apache Kafka 提供的用于从 Kafka 主题消费消息的强大命令行工具。它对于直接从命令行测试、调试和监控 Kafka 主题特别有用,而无需编写自定义消费者应用程序。本指南将介绍 Kafka 控制台消费者 CLI 的基本用法、主要功能和一些高级选项。 基本用法要使用 Kafka 控制台消费者 CLI,请遵循以下基本步骤
将 <topic_name> 替换为您要消费的 Kafka 主题的名称,将 <broker_list> 替换为 Kafka 代理(引导服务器)列表,格式为 host1:port1,host2:port2。 例如 主要特点1. 从开头消费 默认情况下,消费者从主题中的最新偏移量开始消费消息。要从主题的开头消费消息,请使用 --from-beginning 标志 2. 指定消费者组 消费者可以属于一个消费者组,这可以实现负载平衡和容错。使用 --group 标志指定消费者组 3. 指定反序列化器 默认情况下,Kafka 控制台消费者 CLI 假定键和值都是字符串。您可以使用 --key-deserializer 和 --value-deserializer 标志指定自定义反序列化器。例如 4. 消费特定分区 要从主题的特定分区消费消息,请使用 --partition 标志 高级选项1. 使用偏移量消费 您可以使用 --offset 标志指定开始消费消息的起始偏移量 将 <offset_value> 替换为您要开始消费消息的特定偏移量值。 2. 使用时间戳消费 Kafka 消息带有时间戳。您可以使用 --offset 和 --timestamp 标志指定开始消费消息的起始时间戳 将 <timestamp_value> 替换为时间戳值,将 <offset_value> 替换为您要开始消费消息的特定偏移量值。 示例示例 1:从消费者组的开头开始消费 示例 2:使用自定义反序列化器从特定分区消费 |
我们请求您订阅我们的新闻通讯以获取最新更新。