用 Java 创建 Kafka 生产者17 Mar 2025 | 5 分钟阅读 在上一节中,我们学习了创建 Kafka 项目的基本步骤。现在,在用 Java 创建 Kafka 生产者之前,我们需要定义必要的项目依赖项。在我们的项目中,将需要两个依赖项
设置依赖项需要以下步骤 步骤 1: 构建工具 Maven 包含一个“pom.xml”文件。“pom.xml”是一个默认的 XML 文件,其中包含有关 GroupID、ArtifactID 以及版本值的所有信息。用户需要在“pom.xml”文件中定义所有必要的项目依赖项。转到“pom.xml”文件。 步骤 2: 首先,我们需要定义 Kafka 依赖项。在其中创建一个“<dependencies>...</dependencies>”块,我们将在其中定义所需的依赖项。 步骤 3: 现在,打开一个网络浏览器并搜索“Kafka Maven”,如下所示 ![]() 单击突出显示的链接并选择“Apache Kafka,Kafka-Clients”存储库。下面显示了一个示例 ![]() 步骤 4: 根据系统上下载的 Kafka 版本选择存储库版本。例如,在本教程中,我们使用“Apache Kafka 2.3.0”。因此,我们需要存储库版本 2.3.0(突出显示的版本)。 ![]() 步骤 5: 单击存储库版本后,将打开一个新窗口。从那里复制依赖项代码。 ![]() 由于我们正在使用 Maven,请复制 Maven 代码。如果用户正在使用 Gradle,请复制 Gradle 编写的代码。 步骤 6: 将复制的代码粘贴到“<dependencies>...</dependencies>”块中,如下所示 ![]() 如果版本号显示为红色,则表示用户错过了启用“自动导入”选项。如果是这样,请转到 查看>工具窗口>Maven。屏幕右侧将出现一个 Maven 项目窗口。单击显示在那里的“刷新”按钮。这将启用错过的自动导入 Maven 项目。如果颜色变为黑色,则表示错过的依赖项已下载。用户可以继续下一步。 步骤 7: 现在,打开网络浏览器并搜索“SL4J Simple”,然后打开下面快照中显示的突出显示的链接 ![]() 将出现一堆存储库。单击适当的存储库。 ![]() 要了解适当的存储库,请查看 Maven 项目窗口,并在“依赖项”下查看 slf4j 版本。 ![]() 单击相应的版本并复制代码,然后粘贴到“pom.xml”文件中 Kafka 依赖项下方,如下所示 ![]() 注意:在代码中放置注释或删除 <scope> test</scope> 标记行。因为此作用域标记定义了依赖项的有限作用域,并且我们需要将此依赖项用于所有代码,因此不应限制该作用域。现在,我们已经设置了所有必需的依赖项。让我们尝试“简单的 Hello World”示例。 首先,创建一个 java 包,例如“com.firstgroupapp.aktutorial”及其下的一个 java 类。创建 java 包时,请遵循包命名约定。最后,创建“hello world”程序。 ![]() 执行“producer1.java”文件后,输出将成功显示为“Hello World”。这说明 IntelliJ IDEA 运行成功。 创建 Java 生产者基本上,创建 Java 生产者有四个步骤,如前所述
创建生产者属性Apache Kafka 提供了各种用于创建生产者的 Kafka 属性。要了解每个属性,请访问 Apache 的官方网站,即“https://kafka.apache.org”。转到 Kafka>文档>配置>生产者配置。 在那里,用户可以了解 Apache Kafka 提供的所有生产者属性。在这里,我们将讨论所需的属性,例如
现在,让我们看看 IntelliJ IDEA 中生产者属性的实现。 ![]() 当我们创建属性时,它将“java.util.Properties”导入到代码中。 因此,以这种方式,创建生产者属性的第一步就完成了。 创建生产者要创建 Kafka 生产者,我们只需要创建 KafkaProducer 的一个对象。 KafkaProducer 的对象可以创建为 这里,“first_producer”是我们选择的生产者的名称。用户可以相应地选择。 让我们在下面的快照中看看 ![]() 创建生产者记录为了将数据发送到 Kafka,用户需要创建一个 ProducerRecord。这是因为所有生产者都位于生产者记录中。在这里,生产者指定了主题名称以及要传递给 Kafka 的消息。 可以创建 ProducerRecord,如下所示 这里,“record”是为创建生产者记录选择的名称,“my_first”是主题名称,“Hye Kafka”是消息。用户可以相应地选择。 让我们在下面的快照中看看 ![]() 发送数据现在,用户已准备好将数据发送到 Kafka。生产者只需要调用 ProducerRecord 的对象,如下所示 让我们在下面的快照中看看 ![]() 要了解上述代码的输出,请使用以下命令在 CLI 上打开“kafka-console-consumer” “kafka-console-consumer -bootstrap-server 127.0.0.1:9092 -topic my_first -group first_app” 生产者产生的数据是异步的。因此,需要两个附加函数,即 flush() 和 close()(如上图所示)。flush() 将强制生成所有数据,而 close() 将停止生产者。如果未执行这些功能,则数据将永远不会发送到 Kafka,并且使用者将无法读取它。 下面显示了使用者控制台上代码的输出,如下所示 ![]() ![]() 在终端上,用户可以看到各种日志文件。终端上的最后一行显示 Kafka 生产者已关闭。因此,消息以异步方式显示在使用者控制台上。 下一主题Kafka 生产者回调 |
我们请求您订阅我们的新闻通讯以获取最新更新。