用 Java 创建 Kafka 生产者

17 Mar 2025 | 5 分钟阅读

在上一节中,我们学习了创建 Kafka 项目的基本步骤。现在,在用 Java 创建 Kafka 生产者之前,我们需要定义必要的项目依赖项。在我们的项目中,将需要两个依赖项

  1. Kafka 依赖项
  2. 日志记录依赖项,即 SLF4J Logger。

设置依赖项需要以下步骤

步骤 1: 构建工具 Maven 包含一个“pom.xml”文件。“pom.xml”是一个默认的 XML 文件,其中包含有关 GroupID、ArtifactID 以及版本值的所有信息。用户需要在“pom.xml”文件中定义所有必要的项目依赖项。转到“pom.xml”文件。

步骤 2: 首先,我们需要定义 Kafka 依赖项。在其中创建一个“<dependencies>...</dependencies>”块,我们将在其中定义所需的依赖项。

步骤 3: 现在,打开一个网络浏览器并搜索“Kafka Maven”,如下所示

Creating Kafka Producer in Java

单击突出显示的链接并选择“Apache Kafka,Kafka-Clients”存储库。下面显示了一个示例

Creating Kafka Producer in Java

步骤 4: 根据系统上下载的 Kafka 版本选择存储库版本。例如,在本教程中,我们使用“Apache Kafka 2.3.0”。因此,我们需要存储库版本 2.3.0(突出显示的版本)。

Creating Kafka Producer in Java

步骤 5: 单击存储库版本后,将打开一个新窗口。从那里复制依赖项代码。

Creating Kafka Producer in Java

由于我们正在使用 Maven,请复制 Maven 代码。如果用户正在使用 Gradle,请复制 Gradle 编写的代码。

步骤 6: 将复制的代码粘贴到“<dependencies>...</dependencies>”块中,如下所示

Creating Kafka Producer in Java

如果版本号显示为红色,则表示用户错过了启用“自动导入”选项。如果是这样,请转到 查看>工具窗口>Maven。屏幕右侧将出现一个 Maven 项目窗口。单击显示在那里的“刷新”按钮。这将启用错过的自动导入 Maven 项目。如果颜色变为黑色,则表示错过的依赖项已下载。用户可以继续下一步。

步骤 7: 现在,打开网络浏览器并搜索“SL4J Simple”,然后打开下面快照中显示的突出显示的链接

Creating Kafka Producer in Java

将出现一堆存储库。单击适当的存储库。

Creating Kafka Producer in Java

要了解适当的存储库,请查看 Maven 项目窗口,并在“依赖项”下查看 slf4j 版本。

Creating Kafka Producer in Java

单击相应的版本并复制代码,然后粘贴到“pom.xml”文件中 Kafka 依赖项下方,如下所示

Creating Kafka Producer in Java

注意:在代码中放置注释或删除 <scope> test</scope> 标记行。因为此作用域标记定义了依赖项的有限作用域,并且我们需要将此依赖项用于所有代码,因此不应限制该作用域。

现在,我们已经设置了所有必需的依赖项。让我们尝试“简单的 Hello World”示例。

首先,创建一个 java 包,例如“com.firstgroupapp.aktutorial”及其下的一个 java 类。创建 java 包时,请遵循包命名约定。最后,创建“hello world”程序。

Creating Kafka Producer in Java

执行“producer1.java”文件后,输出将成功显示为“Hello World”。这说明 IntelliJ IDEA 运行成功。

创建 Java 生产者

基本上,创建 Java 生产者有四个步骤,如前所述

  1. 创建生产者属性
  2. 创建生产者
  3. 创建生产者记录
  4. 发送数据。

创建生产者属性

Apache Kafka 提供了各种用于创建生产者的 Kafka 属性。要了解每个属性,请访问 Apache 的官方网站,即“https://kafka.apache.org”。转到 Kafka>文档>配置>生产者配置。

在那里,用户可以了解 Apache Kafka 提供的所有生产者属性。在这里,我们将讨论所需的属性,例如

  1. bootstrap.servers: 这是用于建立与 Kafka 集群的初始连接的端口对列表。用户只能使用引导服务器进行初始连接。此服务器以 host:port, host:port,... 的形式存在。
  2. key.serializer: 它是键的序列化程序类的类型,用于实现“org.apache.kafka.common.serialization.Serializer”接口。
  3. value.serializer: 它是序列化程序类的类型,用于实现“org.apache.kafka.common.serialization.Serializer”接口。

现在,让我们看看 IntelliJ IDEA 中生产者属性的实现。

Creating Kafka Producer in Java

当我们创建属性时,它将“java.util.Properties”导入到代码中。

因此,以这种方式,创建生产者属性的第一步就完成了。

创建生产者

要创建 Kafka 生产者,我们只需要创建 KafkaProducer 的一个对象。

KafkaProducer 的对象可以创建为

这里,“first_producer”是我们选择的生产者的名称。用户可以相应地选择。

让我们在下面的快照中看看

Creating Kafka Producer in Java

创建生产者记录

为了将数据发送到 Kafka,用户需要创建一个 ProducerRecord。这是因为所有生产者都位于生产者记录中。在这里,生产者指定了主题名称以及要传递给 Kafka 的消息。

可以创建 ProducerRecord,如下所示

这里,“record”是为创建生产者记录选择的名称,“my_first”是主题名称,“Hye Kafka”是消息。用户可以相应地选择。

让我们在下面的快照中看看

Creating Kafka Producer in Java

发送数据

现在,用户已准备好将数据发送到 Kafka。生产者只需要调用 ProducerRecord 的对象,如下所示

让我们在下面的快照中看看

Creating Kafka Producer in Java

要了解上述代码的输出,请使用以下命令在 CLI 上打开“kafka-console-consumer

kafka-console-consumer -bootstrap-server 127.0.0.1:9092 -topic my_first -group first_app

生产者产生的数据是异步的。因此,需要两个附加函数,即 flush()close()(如上图所示)。flush() 将强制生成所有数据,而 close() 将停止生产者。如果未执行这些功能,则数据将永远不会发送到 Kafka,并且使用者将无法读取它。

下面显示了使用者控制台上代码的输出,如下所示

Creating Kafka Producer in Java
Creating Kafka Producer in Java

在终端上,用户可以看到各种日志文件。终端上的最后一行显示 Kafka 生产者已关闭。因此,消息以异步方式显示在使用者控制台上。