Kafka 偏移量管理14 2025年5月 | 阅读 24 分钟 Kafka 偏移量简介Kafka 偏移量是分区中消息的唯一标识符。Kafka 分区中的每条消息都分配了一个递增的偏移量编号,从 0 开始。当消费者读取消息时,它们会从一个偏移量移到下一个偏移量,偏移量帮助 Kafka 跟踪最后读取的消息,允许消费者根据配置要么从最后一个已提交的偏移量恢复,要么从头开始恢复。 偏移量至关重要,因为 Kafka 是一个分布式系统,旨在处理跨多个消费者和代理的容错和可伸缩性。在这样的系统中,偏移量成为跟踪消息消费状态的基础,特别是对于可能离线或意外崩溃的消费者。 Kafka 偏移量管理涉及的核心组件包括:
Kafka 偏移量类型偏移量是管理 Apache Kafka 中消息消费的关键方面。它们允许消费者跟踪其在消息流中的位置。理解 Kafka 偏移量管理的关键在于区分**已提交偏移量和当前偏移量**,偏移量如何管理(自动或手动),以及消费者组在这些偏移量管理中的作用。 在本节中,我们将详细探讨不同类型的 Kafka 偏移量,包括代码示例,以演示它们在现实场景中的用法。 已提交偏移量与当前偏移量
当前偏移量与已提交偏移量示例这是一个示例,演示了当前偏移量和已提交偏移量之间的区别。在此示例中,我们将从 Kafka 读取消息并打印当前偏移量和已提交偏移量。 在此程序中
输出 ![]() Kafka 提供了两种管理偏移量的机制:**自动偏移量管理和手动偏移量管理**。这些方法决定了何时提交偏移量以及消费者如何处理其在主题中的位置跟踪。 自动偏移量管理在**自动偏移量管理**中,Kafka 根据配置属性以固定的时间间隔自动提交偏移量。这是一种简单的方法,减少了开发人员手动管理偏移量的需求。然而,对于关键任务应用程序,这种方法不太可靠,因为消息可能在完全处理之前就被提交,如果消费者崩溃,可能会导致消息丢失。 通过将 enable.auto.commit 属性设置为 true 来启用自动偏移量管理,提交间隔由 auto.commit.interval.ms 属性控制(默认为 5 秒)。 示例:自动偏移量管理 在此示例中
![]() 手动偏移量管理通过**手动偏移量管理**,开发人员可以完全控制何时提交偏移量。这可以使用 commitSync() 或 commitAsync() 方法完成。手动偏移量管理提供了更高的可靠性和灵活性,因为它确保偏移量仅在成功处理消息后才提交。 示例:手动偏移量管理(同步提交) 输出 ![]() 在此示例中
示例:手动偏移量管理(异步提交) 虽然同步的 commitSync() 方法会阻塞消费者直到偏移量被提交,但 Kafka 还提供了 commitAsync() 方法用于异步偏移量提交。异步提交通过允许消费者在后台提交偏移量的同时继续处理来提高性能。 输出 ![]() 在此示例中
消费者组在管理偏移量中的作用消费者组在 Kafka 的偏移量管理系统中起着至关重要的作用。消费者组是一组共同处理 Kafka 主题数据的消费者。组中的每个消费者负责一部分分区,Kafka 会跟踪每个分区和消费者组组合的已提交偏移量。 关键概念
示例:具有多个消费者的消费者组 输出 ![]() 在此示例中,两个消费者属于同一个消费者组(my-consumer-group)。Kafka 会将 my-topic 的分区分配给这些消费者。每个消费者将仅接收其被分配分区的消息,并且每个分区的已提交偏移量将独立跟踪。 Kafka 中的偏移量工作原理偏移量是 Kafka 架构的核心,它允许消费者了解其在 Kafka 主题分区中读取消息的进度。在本节中,我们将深入探讨 Kafka 中偏移量的作用,涵盖三个关键方面:**分区和偏移量、消费者偏移量跟踪以及偏移量存储和保留。**我们还将为每个概念提供合适的示例和程序。 分区和偏移量Kafka 主题被划分为分区,这些分区分布在 Kafka 集群的代理之间。每个分区都是一个有序的、不可变的记录(消息)序列。分区中的每条记录都分配有一个**偏移量**,这是一个唯一的顺序编号,用作该记录在分区内的标识符。偏移量特定于分区,并表示记录在分区中的位置。 关键概念
将消息生产到 Kafka 分区 在以下程序中,我们将消息生产到已分区的 Kafka 主题,并观察如何在不同分区内的每条消息中分配偏移量。 在此示例中
输出 ![]() 如输出所示,每个分区都有自己的偏移量集,从 0 开始。偏移量在一个分区内是连续的,但在分区之间是独立的。 消费者偏移量跟踪当消费者从 Kafka 主题读取消息时,它使用偏移量来跟踪已处理的消息和待处理的消息。Kafka 消费者通过提交其已消费消息的偏移量来维护其进度。 关键概念
示例:跟踪消费者偏移量 让我们创建一个从主题读取消息并在处理消息后手动提交偏移量的消费者。这使我们能够控制何时提交偏移量。 输出 ![]() 在此示例中
如果消费者崩溃或重启,它将从每个分区的最后一个已提交偏移量恢复,确保不会跳过或丢失任何消息。 偏移量存储和保留Kafka 在一个名为 __consumer_offsets 的内部主题中存储已提交的偏移量。此主题中的每个分区对应于原始 Kafka 主题中的一个分区,并且每个消费者组在该主题中都有自己的条目用于跟踪偏移量。偏移量会保留一段可配置的时间。 关键概念
配置偏移量保留要更改偏移量保留期,可以修改 Kafka 代理中的 offsets.retention.minutes 配置。例如,要将偏移量保留 30 天(43,200 分钟),可以在 server.properties 文件中添加此设置。 示例:偏移量保留和消费者重启 在以下示例中,我们模拟了一个在处理了几条消息后重启的消费者,展示了 Kafka 如何使用已提交的偏移量允许消费者从正确的位置恢复。 输出 ![]() 在此示例中
输出 ![]() 这演示了 Kafka 的偏移量存储如何允许消费者在故障场景中进行恢复和从上次中断的地方继续,确保不会丢失数据。 Kafka 中的偏移量提交策略Kafka 消费者需要一种机制来跟踪哪些消息已被成功处理,以免在失败或重启时重新处理相同的消息。这种机制通过**偏移量提交**来实现。Kafka 提供了几种管理和提交偏移量的策略:自动和手动提交、同步和异步提交。每种策略都有其优点和缺点,选择取决于应用程序对性能、容错和消息处理保证的要求。 在本说明中,我们将详细介绍以下策略,并附带合适的代码示例:
1. 自动提交策略**自动提交**策略是管理偏移量最简单的方法。在此方法中,Kafka 会定期自动提交偏移量,而无需消费者手动管理提交。 它的工作原理
关键配置属性
程序示例:自动提交策略 输出 ![]() 2. 手动提交策略**手动提交**策略使消费者能够明确控制何时提交偏移量。当您想确保一条消息在被标记为已消费之前已完全处理时,这非常有用。 它的工作原理
关键配置属性
程序示例:手动提交策略(同步提交) 在下面的示例中,消费者读取消息,并在处理完消息后才提交偏移量。我们将使用**同步提交**来确保在继续之前完成偏移量提交。 输出 ![]() 3. 同步提交与异步提交在使用手动提交时,Kafka 提供了两种提交偏移量的方法:**同步提交 (commitSync()) 和异步提交 (commitAsync())**。每种方法都有其优点,取决于应用程序的性能和可靠性要求。 同步提交 (commitSync())
程序示例:同步提交 在前面手动提交的示例中,我们使用了 commitSync()。这种方法确保在消费者处理下一批记录之前提交偏移量。 commitSync() 的优点
commitSync() 的缺点
异步提交 (commitAsync())
程序示例:异步提交 在此示例中,我们将修改之前的手动提交示例以使用**异步提交**。 输出 ![]() Kafka Streams 中的偏移量管理Kafka Streams 是一个强大的流处理库,构建在 Apache Kafka 之上,允许开发人员以分布式和可伸缩的方式处理实时数据流。在本节中,我们将探讨 Kafka Streams 中的**偏移量管理**,重点关注三个关键领域:
Kafka Streams 提供了复杂的机制来管理偏移量,确保即使在发生故障的情况下,消息也能被可靠一致地处理。本指南介绍了 Streams API 中偏移量如何被处理,以及 Kafka Streams 如何通过事务性保证来实现**一次性处理语义**。 1. Streams API 和偏移量处理Kafka Streams 抽象了偏移量管理的许多复杂性,使开发人员可以轻松构建容错的有状态应用程序。与典型的 Kafka 消费者不同,Kafka Streams 在后台自动管理偏移量。 关键概念
程序示例:简单的 Kafka Streams 应用程序 在此示例中,一个简单的 Kafka Streams 应用程序从“input-topic”读取,通过将值转换为大写来处理每条记录,并将转换后的记录写入“output-topic”。Kafka Streams 自动管理偏移量,在成功处理记录后将其提交。 要点
2. 偏移量管理中的事务性语义在 Kafka Streams 中,事务性语义在偏移量管理中起着至关重要的作用,尤其是在处理有状态流处理时。Kafka Streams 与 Kafka 的事务性 API 集成,以确保**状态更改**和**偏移量提交**都是原子的。这保证了状态和偏移量要么都被提交,要么都不被提交,从而避免了处理管道中的不一致。 事务如何工作
启用 Kafka Streams 中的事务 要启用事务性语义,您需要在 Streams 配置中将 processing.guarantee 配置参数设置为“exactly_once_v2”。 这告诉 Kafka Streams 使用事务进行处理,并原子地提交状态更改和偏移量。 程序示例:具有事务性语义的 Kafka Streams 在此示例中,应用程序配置为一次性语义,确保记录的处理、状态更新和偏移量提交都包含在一个事务中。 要点
3. Kafka Streams 中的一次性处理Kafka Streams 支持**一次性处理**,以确保即使在发生故障的情况下,每条记录也只处理一次。Kafka 的底层架构使用**幂等生产者**和**事务性消费者**来实现此保证。 一次性处理与至少一次处理语义
启用一次性处理 可以通过将 processing.guarantee 配置设置为 EXACTLY_ONCE_V2 来启用一次性处理。 程序示例:具有一次性处理的 Kafka Streams 让我们修改前面的示例,以在有状态处理场景中演示一次性语义。 通过启用**一次性处理**,Kafka Streams 可确保即使在处理过程中发生故障,也不会重复或丢失任何记录。Kafka Streams 通过使用幂等写入输出主题和用于偏移量提交的事务性保证来实现这一点。 要点
监控和管理 Kafka 中的偏移量偏移量管理在确保 Kafka 消费者可靠地从流的正确点读取数据方面起着至关重要的作用。Kafka 提供了多种工具和技术来监控和管理偏移量,以保持数据一致性、管理消费者延迟并实时检测问题。在本节中,我们将介绍:
在本节结束时,您将熟悉用于在流处理应用程序中监控和管理 Kafka 偏移量的实用技术和脚本。 1. 监控消费者延迟的工具消费者延迟是消费者的最后一个已提交偏移量与分区的当前偏移量(即,最新生产的消息)之间的差异。监控消费者延迟对于确保消费者高效地近乎实时地处理数据至关重要。较大的消费者延迟可能表明消费者吞吐量或性能存在问题。 Kafka 偏移量关键监控工具
示例:使用 Burrow 监控消费者延迟 Burrow 是 LinkedIn 开发的一个用于跟踪 Kafka 消费者组及其延迟的监控工具。它提供了一个 REST API,允许您监控消费者的状态。 以下是 Burrow 工作原理的简要概述:
Burrow 示例配置 要使用 Burrow 监控 Kafka 消费者延迟,您需要配置 Burrow 以连接到您的 Kafka 集群。下面是 Burrow 的示例配置文件: Burrow 运行后,您可以使用其 REST API 检索消费者延迟指标。例如,要获取消费者组的延迟信息,您可以查询: 示例输出 此 API 提供实时消费者延迟数据,使您能够监控消费者是否落后。 2. 使用 kafka-consumer-groups.sh 跟踪偏移量Kafka 附带一个名为 kafka-consumer-groups.sh 的内置命令行工具,它提供有关消费者组及其偏移量的信息。此工具可用于监控消费者延迟和手动管理偏移量。 kafka-consumer-groups.sh 的主要功能
基本命令 1. 列出所有消费者组 2. 描述特定的消费者组 输出 ![]() 在此输出中,您可以看到每个分区的当前偏移量、日志结束偏移量(最新生产的消息)以及**延迟**(两者之间的差值)。 3. 重置消费者组的偏移量您可以将偏移量重置为最早、最新或特定时间戳。当您想重新处理数据或跳过特定消息时,这很有用。
在这种情况下,消费者组的偏移量将重置为指定时间戳之后的可用最早消息。 程序示例:使用 kafka-consumer-groups.sh 监控 Kafka 偏移量 例如,系统管理员可能定期运行以下脚本来检查一组消费者组的消费者延迟: 输出 ![]() 3. 跟踪偏移量的最佳实践有效跟踪和管理 Kafka 偏移量对于确保数据管道的可靠性至关重要。以下是处理 Kafka 偏移量时应遵循的一些最佳实践: 1. 启用消费者延迟监控
2. 对关键应用程序使用手动偏移量管理
示例:Kafka 消费者中的手动偏移量提交 输出 ![]() 在此示例中,消费者配置为禁用自动提交,并在处理完记录后手动提交偏移量。此策略可确保仅在处理完成后才提交偏移量。 3. 确保偏移量保留符合您的用例 Kafka 会在一段时间内为消费者组保留偏移量,之后偏移量将被删除。此保留期可以使用 offsets.retention.minutes 参数进行配置。
4. 有效使用消费者组监控工具
下一个主题Kafka 消费者偏移量跟踪 |
我们请求您订阅我们的新闻通讯以获取最新更新。