在 Java 中创建 Kafka 消费者2025年2月12日 | 阅读 6 分钟 在上一个章节,我们学习了如何在 Java 中创建生产者。在本章节,我们将学习如何在 Java 中实现 Kafka 消费者。 以下是创建消费者的步骤:
让我们讨论每个步骤,以学习在 Java 中实现消费者。 创建 LoggerLogger 用于在程序执行期间写入日志消息。用户需要创建一个 Logger 对象,该对象需要导入 'org.slf4j class'。以下快照显示了 Logger 的实现 ![]() 创建消费者属性与生产者属性类似,Apache Kafka 也提供了各种不同的属性来创建消费者。要了解每个消费者属性,请访问 Apache Kafa>Documentation>Configuration>Consumer Configs 的官方网站。在这里,我们将列出消费者的必需属性,例如 key.deserializer: 它是键的 Deserializer 类,用于实现 'org.apache.kafka.common.serialization.Deserializer' 接口。 value.deserializer: 它是值的 Deserializer 类,用于实现 'org.apache.kafka.common.serialization.Desrializer' 接口。 bootstrap.servers: 它是主机/端口对的列表,用于建立与 Kafka 集群的初始连接。它不包含客户端所需的全部服务器集。只需要引导所需的服务器。 group.id: 它是唯一标识消费者组的消费者的字符串。当消费者使用基于 Kafka 的偏移量管理策略或通过订阅主题进行组管理功能时,需要此属性。 auto.offset.reset: 当不存在初始偏移量或服务器上不再存在当前偏移量时,需要此属性。以下值用于重置偏移量值 earliest: 此偏移量变量自动将其值重置为最早的偏移量。 latest: 此偏移量变量将偏移量值重置为最新的偏移量。 none: 如果找不到先前组的先前偏移量,则会向消费者抛出异常。 其他任何内容: 它会向消费者抛出异常。 注意: 在我们的代码中,我们使用了“earliest”变量将值重置为最早的值。这些是实现消费者所需的一些基本属性。 让我们使用 IntelliJ IDEA 实现它。 步骤 1) 定义一个新的 Java 类,名为 'Consumer1.java'。 步骤 2) 在类中描述消费者属性,如下面的快照所示 ![]() 在快照中,描述了所有必需的属性。 创建消费者创建 KafkaConsumer 对象以创建消费者,如下所示 ![]() 在创建消费者时传递了上面描述的属性。 订阅消费者要从主题读取消息,我们需要将消费者连接到指定的主题。可以通过各种订阅 API 订阅消费者。 在这里,我们使用了 Arrays.asList(),因为用户可能希望订阅一个或多个主题。 因此,Arrays.asList() 允许消费者订阅多个主题。 以下代码显示了消费者订阅的实现 ![]() 用户需要直接或通过字符串变量指定主题名称才能读取消息。 也可以有多个主题,以逗号分隔。 轮询新数据消费者通过轮询方法从 Kafka 读取数据。 ![]() 轮询方法返回从当前分区偏移量获取的数据。指定的时间段内等待数据,否则向消费者返回一个空的 ConsumerRecord。 此外,logger 将获取记录键、分区、记录偏移量及其值。 创建 Java 消费者的完整代码如下 文件名:Consumer1.java 这样,消费者可以通过依次执行每个步骤来读取消息。 消费者实现的输出可以在下面的快照中看到 ![]() 键值为 null。 这是因为我们之前没有指定任何键。 由于 'earliest',将显示从头开始的所有消息。 在消费者组中读取数据用户可以有多个消费者一起读取数据。 这可以通过消费者组来完成。 在消费者组中,一个或多个消费者将能够从 Kafka 读取数据。 如果用户想要从头开始读取消息,请重置 group_id 或更改 group_id。 这将重置用户的应用程序,并将从头开始显示消息。 消费者的优雅关闭在当前的实现中,Kafka 消费者使用 while(true) 在无限循环中持续处理消息。 为了确保优雅终止,可以使用 consumer.wakeup() 中断消费者,这会导致它在下次调用 .poll() 方法期间抛出 WakeupException。 此 WakeupException 不需要特定的处理,因为它的目的是中断循环。 相反,清理活动集中在 finally 块中,其中调用了 consumer.close()。 此方法调用至关重要,因为它确保适当地提交任何未提交的偏移量,并干净地终止与 Kafka 代理的连接。 为了有效地触发 consumer.wakeup(),使用了关闭钩子。 此钩子至关重要,因为它确保 wakeup() 方法响应终止信号(如 CTRL+C)而被调用。 此外,关闭钩子已链接到主线程。 此链接允许程序等待所有处理线程在应用程序完全关闭之前完成其任务,从而确保干净有序的关闭。 文件名 输出 [Thread-0] INFO io.conduktor.demos.kafka.ConsumerDemo - Detected a shutdown, let's exit by calling consumer.wakeup()... [main] INFO io.conduktor.demos.kafka.ConsumerDemo - Wake up exception! [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fifth-application-1, groupId=my-fifth-application] Revoke previously assigned partitions demo_java-0, demo_java-1, demo_java-2 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fifth-application-1, groupId=my-fifth-application] Member consumer-my-fifth-application-1-4498399b-0c5d-45a9-958f-8b31f124a3d9 sending LeaveGroup request to coordinator 127.0.0.1:9094 (id: 2147483644 rack: null) due to the consumer is being closed [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fifth-application-1, groupId=my-fifth-application] Resetting generation due to: consumer pro-actively leaving the group [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-fifth-application-1, groupId=my-fifth-application] Request joining group due to: consumer pro-actively leaving the group [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-my-fifth-application-1 unregistered [main] INFO io.conduktor.demos.kafka.ConsumerDemo - The consumer is now gracefully closed. 下一个主题Kafka 实时示例 |
我们请求您订阅我们的新闻通讯以获取最新更新。