向 Kafka 主题发送数据

2025年3月17日 | 阅读 3 分钟

Kafka 控制台生产者

为了向 Kafka 主题发送数据,需要一个生产者。生产者的作用是向 Kafka 主题发送或写入数据/消息。

在本节中,我们将学习生产者如何向 Kafka 主题发送消息。

启动生产者需要以下步骤

步骤 1: 启动 zookeeper 和 Kafka 服务器。

步骤 2: 在命令行中键入命令:'kafka-console-producer'。这将帮助用户从标准输入中读取数据并将其写入 Kafka 主题。

注意: 根据操作系统选择 '.bat' 或 '.sh'。

Sending data to Kafka Topics
Sending data to Kafka Topics

突出显示的文本表示需要 'broker-list' 和 'topic id' 才能生成消息。这是因为生产者必须知道要将数据写入的主题的 ID。

步骤 3: 了解所有要求后,尝试使用以下命令向主题生成消息

'kafka-console-producer -broker-list localhost:9092 -topic <topic_name>'。按 Enter 键。

Sending data to Kafka Topics

注意: 在这里,9092 是 Kafka 服务器的端口号。

在这里,选择“myfirst”主题来写入消息。

新行中将出现一个“>”。开始生成一些消息,如下所示

Sending data to Kafka Topics

步骤 4: 按 'Ctrl+c' 并按 'Y' 键退出。

因此,通过这种方式,生产者可以向 Kafka 主题生成/发送多条消息。

带有键的生产者

Kafka 生产者可以使用或不使用键将数据写入主题。如果生产者没有指定键,则数据将存储到任何 key=null 的分区中,否则数据将仅存储到指定的分区中。需要一个 'parse.key' 和一个 'key.seperator' 来为主题指定一个键。使用的命令是

在这里,key 是特定的分区,value 是生产者要写入主题的消息。

当主题不存在时?

假设生产者要向一个尚未存在的新主题发送消息。在这种情况下,生成消息后,将出现一个警告,如以下快照所示。这只是一个警告。

Sending data to Kafka Topics

为什么会出现此警告?

出现警告是因为之前主题“demo”不存在。但是,当生产者写入一条消息时,Kafka 设法创建了该主题。尽管,对于这个意外的主题,没有进行任何领导者选举,但可以看到“LEADER_NOT_AVAILABLE”错误。但是,下一次,生产者可以继续写入更多消息,因为不会再次出现警告。这是因为该主题现在出现在现有列表中。

用户可以使用 '-list' 命令进行检查,如下所示

Sending data to Kafka Topics

可以在列表中看到主题“demo”。

描述新主题

像这样,由生产者直接创建的主题会抓取默认的分区数,其复制因子为 1。

例如:

Sending data to Kafka Topics

使用 '-describe' 命令描述主题“demo”时,会给出“PartitionCount”和“ReplicationFactor”的值为 1(默认值)。因此,在向主题生成消息之前创建一个主题始终是一个更好的选择。

更改默认值

按照以下步骤更改新主题的默认值

  1. 使用 Notepad++ 或任何其他文本编辑器打开 'server.properties' 文件。
  2. 将 num.partitions=1 的值编辑为一个新值。设为 3。因此,每当引入此类新主题时,PartitionCount 和 ReplicationFactor 的数量将为 3(无论用户设置什么)。
  3. 保存文件。

但是,始终先创建主题。