Kafka 生产者回调

2025年3月17日 | 阅读 3 分钟

没有键的生产者

在上一节中,我们看到了生产者如何将数据发送到 Kafka。为了更深入地理解,即数据是否正确生成、在哪里生成、关于其偏移量和分区值等。让我们学习更多。

为了执行回调,用户需要实现一个回调函数。此函数用于异步处理请求完成。这就是为什么它的返回类型将是 void。此函数将在生产者将数据发送到 Kafka 的块中实现。不需要更改其他代码块。

生产者使用的回调函数是 onCompletion()。基本上,此方法需要两个参数

记录的元数据: 记录的元数据意味着获取有关分区及其偏移量的信息。如果它不为 null,则会抛出错误。

异常: 处理时可能抛出以下异常

1) 可重试异常: 此异常表示消息可能已发送。

2) 不可重试异常: 此异常抛出错误,表示消息将永远不会被发送。

让我们在下面的快照中查看生产者回调的实现

Kafka Producer Callbacks

已创建一个 'Logger' 对象,它允许导入 'slf4j.Logger' 和 'slf4j.LoggerFactory'。此 logger 对象将记录有关分区、偏移量以及时间戳的信息。如果异常值等于 null,则 logger 将显示信息,否则将显示错误。当执行上述代码时,用户将知道主题名称、分区号、时间戳、消息发送的偏移量值。

下面显示了输出的快照

Kafka Producer Callbacks

在上面的输出中,可以看到消息被生成到 'my_first',存储在 '分区 0',偏移量值为 9'。

注意: 我们到目前为止发送的消息没有键,因此没有键的消息存储在随机分区中并异步运行。

带有键的生产者

当用户希望将消息发送到同一分区时,键变得有用。为了发送数据,用户需要指定一个键。该键将唯一地标识该分区与其他分区。用户需要将同步消息发送到 Kafka。

下面显示了一种实现键的方法

Kafka Producer Callbacks
Kafka Producer Callbacks

在上面的快照中,我们指定了主题名称、其值和键。在创建 ProducerRecord 时,将它们三个作为参数传递。如果异常 'e' 将等于 null,则 logger 将获取有关 key 的信息。最后,当数据发送到 Kafka 时,使用一个 get() 函数。此方法同步且强制地发送数据。用户可以尝试自己的方法来实现键。

注意: 使用 get(),会出现一条红色的下划线。按 alt+enter,它会说“将异常添加到方法签名”,选择它。这将在 main() 中添加两个异常,如上所示。此外,它会将 'java.util.concurrent.ExecutionException' 导入到代码中。

当执行上述代码时,输出将显示为

Kafka Producer Callbacks

输出中突出显示的部分说明了键值、主题名称、分区号、偏移量值以及时间戳。消息 'OneTwo' 现在将始终转到指定的分区。

因此,通过这种方式,生产者可以发送带有和不带有键的数据到 Kafka。