Kafka 偏移量管理

14 2025年5月 | 阅读 24 分钟

Kafka 偏移量简介

Kafka 偏移量是分区中消息的唯一标识符。Kafka 分区中的每条消息都分配了一个递增的偏移量编号,从 0 开始。当消费者读取消息时,它们会从一个偏移量移到下一个偏移量,偏移量帮助 Kafka 跟踪最后读取的消息,允许消费者根据配置要么从最后一个已提交的偏移量恢复,要么从头开始恢复。

偏移量至关重要,因为 Kafka 是一个分布式系统,旨在处理跨多个消费者和代理的容错和可伸缩性。在这样的系统中,偏移量成为跟踪消息消费状态的基础,特别是对于可能离线或意外崩溃的消费者。

Kafka 偏移量管理涉及的核心组件包括:

  • 生产者:将消息写入特定的 Kafka 主题和分区,这些消息随后会被分配偏移量。
  • 代理:Kafka 代理存储消息并在其系统范围内管理分区。它还处理与偏移量信息相关的客户端请求。
  • 消费者:消费者从特定分区读取消息并使用偏移量跟踪其进度。
  • 消费者组:一组逻辑上分组的消费者,它们共同负责消费一组分区中的消息。Kafka 独立管理每个消费者组的偏移量。

Kafka 偏移量类型

偏移量是管理 Apache Kafka 中消息消费的关键方面。它们允许消费者跟踪其在消息流中的位置。理解 Kafka 偏移量管理的关键在于区分**已提交偏移量和当前偏移量**,偏移量如何管理(自动或手动),以及消费者组在这些偏移量管理中的作用。

在本节中,我们将详细探讨不同类型的 Kafka 偏移量,包括代码示例,以演示它们在现实场景中的用法。

已提交偏移量与当前偏移量

  • 已提交偏移量:这是消费者已处理并明确标记为完成的偏移量。它存储在 Kafka 的内部主题(__consumer_offsets)中,可用于在重启或故障后恢复消费者的进度。提交偏移量可确保 Kafka 知道消费者在读取和处理消息方面已取得多大进展。
  • 当前偏移量:这指的是消费者当前正在读取的消息的偏移量,但它可能尚未提交。当前偏移量是消费者本地的,可能代表已获取的最新消息,但不能保证已完全处理。

当前偏移量与已提交偏移量示例

这是一个示例,演示了当前偏移量和已提交偏移量之间的区别。在此示例中,我们将从 Kafka 读取消息并打印当前偏移量和已提交偏移量。

在此程序中

  • 我们将 enable.auto.commit 设置为 false,以禁用自动偏移量提交,从而允许我们在处理每条消息后手动提交偏移量。
  • 当前偏移量是正在处理的消息的偏移量,调用 commitSync() 后,已提交偏移量将反映下一个要消费的消息。

输出

Kafka Offset Management

Kafka 提供了两种管理偏移量的机制:**自动偏移量管理和手动偏移量管理**。这些方法决定了何时提交偏移量以及消费者如何处理其在主题中的位置跟踪。

自动偏移量管理

在**自动偏移量管理**中,Kafka 根据配置属性以固定的时间间隔自动提交偏移量。这是一种简单的方法,减少了开发人员手动管理偏移量的需求。然而,对于关键任务应用程序,这种方法不太可靠,因为消息可能在完全处理之前就被提交,如果消费者崩溃,可能会导致消息丢失。

通过将 enable.auto.commit 属性设置为 true 来启用自动偏移量管理,提交间隔由 auto.commit.interval.ms 属性控制(默认为 5 秒)。

示例:自动偏移量管理

在此示例中

  • Kafka 将每秒(1000 毫秒)自动提交一次偏移量。
  • 这种方法简化了消费者逻辑,但如果消费者在处理完所有消息之前失败,可能会导致数据丢失。
Kafka Offset Management

手动偏移量管理

通过**手动偏移量管理**,开发人员可以完全控制何时提交偏移量。这可以使用 commitSync() 或 commitAsync() 方法完成。手动偏移量管理提供了更高的可靠性和灵活性,因为它确保偏移量仅在成功处理消息后才提交。

示例:手动偏移量管理(同步提交)

输出

Kafka Offset Management

在此示例中

  • enable.auto.commit 属性设置为 false,因此不会自动提交偏移量。
  • commitSync() 方法用于在处理完所有消息后同步提交偏移量。这可确保在发生故障时不会跳过或丢失任何消息,使这种方法成为需要可靠消息处理的场景的理想选择。

示例:手动偏移量管理(异步提交)

虽然同步的 commitSync() 方法会阻塞消费者直到偏移量被提交,但 Kafka 还提供了 commitAsync() 方法用于异步偏移量提交。异步提交通过允许消费者在后台提交偏移量的同时继续处理来提高性能。

输出

Kafka Offset Management

在此示例中

  • commitAsync() 方法允许消费者在后台提交偏移量的同时处理新消息。
  • 如果偏移量提交失败,将在回调中处理异常。

消费者组在管理偏移量中的作用

消费者组在 Kafka 的偏移量管理系统中起着至关重要的作用。消费者组是一组共同处理 Kafka 主题数据的消费者。组中的每个消费者负责一部分分区,Kafka 会跟踪每个分区和消费者组组合的已提交偏移量。

关键概念

  1. 分区分配:当消费者组订阅一个主题时,Kafka 会将分区分配给每个消费者。每个分区只分配给组内的一个消费者。这允许水平扩展,因为多个消费者可以并行处理同一主题的消息。
  2. 偏移量跟踪:Kafka 在 __consumer_offsets 主题中跟踪每个分区和消费者组的已提交偏移量。这可确保在重启或故障后,组中的每个消费者都从最后一个已提交的偏移量继续。
  3. 再平衡:当消费者加入或离开组时,Kafka 会触发再平衡,将分区重新分配给消费者。再平衡后,消费者将从其最后一个已提交的偏移量恢复。

示例:具有多个消费者的消费者组

输出

Kafka Offset Management

在此示例中,两个消费者属于同一个消费者组(my-consumer-group)。Kafka 会将 my-topic 的分区分配给这些消费者。每个消费者将仅接收其被分配分区的消息,并且每个分区的已提交偏移量将独立跟踪。

Kafka 中的偏移量工作原理

偏移量是 Kafka 架构的核心,它允许消费者了解其在 Kafka 主题分区中读取消息的进度。在本节中,我们将深入探讨 Kafka 中偏移量的作用,涵盖三个关键方面:**分区和偏移量、消费者偏移量跟踪以及偏移量存储和保留。**我们还将为每个概念提供合适的示例和程序。

分区和偏移量

Kafka 主题被划分为分区,这些分区分布在 Kafka 集群的代理之间。每个分区都是一个有序的、不可变的记录(消息)序列。分区中的每条记录都分配有一个**偏移量**,这是一个唯一的顺序编号,用作该记录在分区内的标识符。偏移量特定于分区,并表示记录在分区中的位置。

关键概念

  • 分区:Kafka 主题被分割成分区,以实现水平可伸缩性。每个分区都可以独立消费,这使得 Kafka 对于多个消费者进行并行处理具有高度可伸缩性。
  • 偏移量:偏移量是分区内每条记录的唯一标识符。偏移量从第一个消息的 0 开始,并随着新消息附加到分区而顺序增加。

将消息生产到 Kafka 分区

在以下程序中,我们将消息生产到已分区的 Kafka 主题,并观察如何在不同分区内的每条消息中分配偏移量。

在此示例中

  • 我们将 10 条消息生产到名为“my-partitioned-topic”的主题。
  • 每条消息发送到分区 0 或分区 1(基于键的索引模 2)。
  • RecordMetadata 对象使我们能够访问每条消息的分区和偏移量。

输出

Kafka Offset Management

如输出所示,每个分区都有自己的偏移量集,从 0 开始。偏移量在一个分区内是连续的,但在分区之间是独立的。

消费者偏移量跟踪

当消费者从 Kafka 主题读取消息时,它使用偏移量来跟踪已处理的消息和待处理的消息。Kafka 消费者通过提交其已消费消息的偏移量来维护其进度。

关键概念

  • 当前偏移量:消费者将要从分区读取的下一条记录的偏移量。
  • 已提交偏移量:消费者已成功处理并提交的最后一条记录的偏移量。如果消费者重启或发生故障,它将从已提交的偏移量恢复。
  • 消费者组:Kafka 按消费者组进行偏移量跟踪。每个消费者组可以有多个消费者从不同的分区读取。Kafka 确保每个分区仅分配给组内的其中一个消费者,这使得消费者组成为 Kafka 可伸缩性和并行处理的重要组成部分。

示例:跟踪消费者偏移量

让我们创建一个从主题读取消息并在处理消息后手动提交偏移量的消费者。这使我们能够控制何时提交偏移量。

输出

Kafka Offset Management

在此示例中

  • 消费者从“my-partitioned-topic”读取消息。
  • 在处理完每批消息后,它使用 commitSync() 手动提交偏移量。
  • 这确保已提交的偏移量仅反映已成功处理的消息。

如果消费者崩溃或重启,它将从每个分区的最后一个已提交偏移量恢复,确保不会跳过或丢失任何消息。

偏移量存储和保留

Kafka 在一个名为 __consumer_offsets 的内部主题中存储已提交的偏移量。此主题中的每个分区对应于原始 Kafka 主题中的一个分区,并且每个消费者组在该主题中都有自己的条目用于跟踪偏移量。偏移量会保留一段可配置的时间。

关键概念

  • 偏移量存储:已提交的偏移量存储在 __consumer_offsets 主题中。这允许 Kafka 跟踪消费者进度并在故障后实现消费者恢复。
  • 保留策略:默认情况下,Kafka 会保留已提交偏移量 7 天。这意味着,如果消费者组中的某个消费者在 7 天内未从某个分区读取,则该分区的偏移量将被删除。可以通过 offsets.retention.minutes 设置来配置保留期。

配置偏移量保留

要更改偏移量保留期,可以修改 Kafka 代理中的 offsets.retention.minutes 配置。例如,要将偏移量保留 30 天(43,200 分钟),可以在 server.properties 文件中添加此设置。

示例:偏移量保留和消费者重启

在以下示例中,我们模拟了一个在处理了几条消息后重启的消费者,展示了 Kafka 如何使用已提交的偏移量允许消费者从正确的位置恢复。

输出

Kafka Offset Management

在此示例中

  • 消费者处理 5 条消息并提交偏移量。
  • 然后消费者重启,模拟崩溃或故障。
  • 重启后,消费者将从最后一个已提交的偏移量恢复并处理另外 5 条消息。

输出

Kafka Offset Management

这演示了 Kafka 的偏移量存储如何允许消费者在故障场景中进行恢复和从上次中断的地方继续,确保不会丢失数据。

Kafka 中的偏移量提交策略

Kafka 消费者需要一种机制来跟踪哪些消息已被成功处理,以免在失败或重启时重新处理相同的消息。这种机制通过**偏移量提交**来实现。Kafka 提供了几种管理和提交偏移量的策略:自动和手动提交、同步和异步提交。每种策略都有其优点和缺点,选择取决于应用程序对性能、容错和消息处理保证的要求。

在本说明中,我们将详细介绍以下策略,并附带合适的代码示例:

  1. 自动提交策略
  2. 手动提交策略
  3. 同步提交与异步提交

1. 自动提交策略

**自动提交**策略是管理偏移量最简单的方法。在此方法中,Kafka 会定期自动提交偏移量,而无需消费者手动管理提交。

它的工作原理

  • Kafka 在消费者轮询记录后自动提交偏移量。
  • 您可以使用 auto.commit.interval.ms 配置来控制自动提交的频率。
  • 这种策略非常适合那些不要求每次处理一条记录的精确性,并且更看重简洁性而不是严格的消息处理保证的应用程序。

关键配置属性

  • enable.auto.commit = true:启用自动偏移量提交。
  • auto.commit.interval.ms = <interval>:Kafka 自动提交偏移量的间隔(以毫秒为单位)。

程序示例:自动提交策略

输出

Kafka Offset Management

2. 手动提交策略

**手动提交**策略使消费者能够明确控制何时提交偏移量。当您想确保一条消息在被标记为已消费之前已完全处理时,这非常有用。

它的工作原理

  • 消费者使用 commitSync() 或 commitAsync() 手动提交偏移量。
  • 您可以精确控制偏移量的提交时间,这使得您可以在完成特定的业务逻辑后提交。
  • 这种策略对于需要一次或至少一次消息处理保证的应用程序至关重要。

关键配置属性

  • enable.auto.commit = false:禁用自动偏移量提交。
  • commitSync() 或 commitAsync():用于手动提交偏移量。

程序示例:手动提交策略(同步提交)

在下面的示例中,消费者读取消息,并在处理完消息后才提交偏移量。我们将使用**同步提交**来确保在继续之前完成偏移量提交。

输出

Kafka Offset Management

3. 同步提交与异步提交

在使用手动提交时,Kafka 提供了两种提交偏移量的方法:**同步提交 (commitSync()) 和异步提交 (commitAsync())**。每种方法都有其优点,取决于应用程序的性能和可靠性要求。

同步提交 (commitSync())

  • 阻塞:消费者会阻塞,直到 Kafka 代理确认提交请求。
  • 保证送达:如果 commitSync() 失败,应用程序可以重试,确保偏移量成功提交。
  • 性能权衡:由于消费者会等待提交完成,如果提交需要较长时间,这可能会降低吞吐量。

程序示例:同步提交

在前面手动提交的示例中,我们使用了 commitSync()。这种方法确保在消费者处理下一批记录之前提交偏移量。

commitSync() 的优点

  • 可靠,并保证在前进之前提交偏移量。

commitSync() 的缺点

  • 由于阻塞行为,可能导致性能下降。

异步提交 (commitAsync())

  • 非阻塞:提交被异步发送到 Kafka,消费者可以在不等待提交完成的情况下继续处理消息。
  • 性能更好:由于消费者不会阻塞,因此可以实现更高的吞吐量。
  • 保证较少:不保证提交会成功。失败回调可用于处理提交失败。

程序示例:异步提交

在此示例中,我们将修改之前的手动提交示例以使用**异步提交**。

输出

Kafka Offset Management

Kafka Streams 中的偏移量管理

Kafka Streams 是一个强大的流处理库,构建在 Apache Kafka 之上,允许开发人员以分布式和可伸缩的方式处理实时数据流。在本节中,我们将探讨 Kafka Streams 中的**偏移量管理**,重点关注三个关键领域:

  1. Streams API 和偏移量处理
  2. 偏移量管理中的事务性语义
  3. 一次性处理

Kafka Streams 提供了复杂的机制来管理偏移量,确保即使在发生故障的情况下,消息也能被可靠一致地处理。本指南介绍了 Streams API 中偏移量如何被处理,以及 Kafka Streams 如何通过事务性保证来实现**一次性处理语义**。

1. Streams API 和偏移量处理

Kafka Streams 抽象了偏移量管理的许多复杂性,使开发人员可以轻松构建容错的有状态应用程序。与典型的 Kafka 消费者不同,Kafka Streams 在后台自动管理偏移量。

关键概念

  • 偏移量处理:Kafka Streams 管理其处理的每个分区的偏移量,并在处理完消息后将这些偏移量提交给 Kafka。
  • 有状态处理与无状态处理:偏移量的处理方式取决于您使用的是有状态处理还是无状态处理。
  • 变更日志主题:对于有状态处理,Kafka Streams 使用内部变更日志主题来存储应用程序的状态,确保在发生故障后可以重建处理状态。

程序示例:简单的 Kafka Streams 应用程序

在此示例中,一个简单的 Kafka Streams 应用程序从“input-topic”读取,通过将值转换为大写来处理每条记录,并将转换后的记录写入“output-topic”。Kafka Streams 自动管理偏移量,在成功处理记录后将其提交。

要点

  • Kafka Streams 在成功处理记录后自动提交偏移量。
  • 由于 Streams API 抽象了这种复杂性,因此无需手动提交偏移量。
  • 在像这样的无状态转换中,Kafka Streams 通过仅在记录被处理并写入输出主题后才提交偏移量,从而高效地处理偏移量管理。

2. 偏移量管理中的事务性语义

在 Kafka Streams 中,事务性语义在偏移量管理中起着至关重要的作用,尤其是在处理有状态流处理时。Kafka Streams 与 Kafka 的事务性 API 集成,以确保**状态更改**和**偏移量提交**都是原子的。这保证了状态和偏移量要么都被提交,要么都不被提交,从而避免了处理管道中的不一致。

事务如何工作

  • 事务 ID:每个 Kafka Streams 实例使用唯一的事务 ID 来标识其事务,从而能够提供事务性保证。
  • 原子性:Kafka Streams 确保偏移量和任何状态更新(例如,变更日志主题)都作为单个事务的一部分进行提交。
  • 隔离性:Kafka Streams 利用底层的 Kafka 事务支持来确保中间结果在事务完全提交之前不会被下游消费者看到。

启用 Kafka Streams 中的事务

要启用事务性语义,您需要在 Streams 配置中将 processing.guarantee 配置参数设置为“exactly_once_v2”。

这告诉 Kafka Streams 使用事务进行处理,并原子地提交状态更改和偏移量。

程序示例:具有事务性语义的 Kafka Streams

在此示例中,应用程序配置为一次性语义,确保记录的处理、状态更新和偏移量提交都包含在一个事务中。

要点

  • Kafka Streams 的事务性语义可确保状态和偏移量的原子更新。
  • 如果在处理过程中发生故障,Kafka Streams 可确保状态更新和偏移量都不会被提交,从而防止不一致。
  • 事务对于有状态处理至关重要,在这些处理中,一致性是关键的,例如在金融应用程序或订单处理系统中。

3. Kafka Streams 中的一次性处理

Kafka Streams 支持**一次性处理**,以确保即使在发生故障的情况下,每条记录也只处理一次。Kafka 的底层架构使用**幂等生产者**和**事务性消费者**来实现此保证。

一次性处理与至少一次处理语义

  • 至少一次:在处理记录后提交偏移量,因此在发生故障的情况下可能会重新处理,但不会丢失数据。
  • 一次性:偏移量和状态更改被原子地提交,确保在发生故障的情况下既不会重新处理也不会丢失记录。

启用一次性处理

可以通过将 processing.guarantee 配置设置为 EXACTLY_ONCE_V2 来启用一次性处理。

程序示例:具有一次性处理的 Kafka Streams

让我们修改前面的示例,以在有状态处理场景中演示一次性语义。

通过启用**一次性处理**,Kafka Streams 可确保即使在处理过程中发生故障,也不会重复或丢失任何记录。Kafka Streams 通过使用幂等写入输出主题和用于偏移量提交的事务性保证来实现这一点。

要点

  • Kafka Streams 支持一次性处理,确保即使在发生故障的情况下,每条记录也只处理一次。
  • 通过将 processing.guarantee 参数配置为 EXACTLY_ONCE_V2 来启用一次性处理。
  • 这种保证对于有状态处理或在不允许重复或数据丢失的应用程序中特别有用。

监控和管理 Kafka 中的偏移量

偏移量管理在确保 Kafka 消费者可靠地从流的正确点读取数据方面起着至关重要的作用。Kafka 提供了多种工具和技术来监控和管理偏移量,以保持数据一致性、管理消费者延迟并实时检测问题。在本节中,我们将介绍:

  1. 监控消费者延迟的工具
  2. 使用 kafka-consumer-groups.sh 跟踪偏移量
  3. 跟踪偏移量的最佳实践

在本节结束时,您将熟悉用于在流处理应用程序中监控和管理 Kafka 偏移量的实用技术和脚本。

1. 监控消费者延迟的工具

消费者延迟是消费者的最后一个已提交偏移量与分区的当前偏移量(即,最新生产的消息)之间的差异。监控消费者延迟对于确保消费者高效地近乎实时地处理数据至关重要。较大的消费者延迟可能表明消费者吞吐量或性能存在问题。

Kafka 偏移量关键监控工具

  • Kafka 命令行工具:Kafka 提供了多种命令行工具,如 kafka-consumer-groups.sh 和 kafka-run-class.sh,用于监控消费者延迟和偏移量。
  • 第三方监控工具:Prometheus、Grafana、LinkedIn 的 Burrow 和 Confluent Control Center 等工具可以监控 Kafka 集群,包括消费者偏移量、延迟和分区性能。

示例:使用 Burrow 监控消费者延迟

Burrow 是 LinkedIn 开发的一个用于跟踪 Kafka 消费者组及其延迟的监控工具。它提供了一个 REST API,允许您监控消费者的状态。

以下是 Burrow 工作原理的简要概述:

  • Burrow 持续监控 Kafka 消费者偏移量,并将其与分区中的最新偏移量进行比较。
  • 它实时计算延迟,并通过 REST API 公开此信息。

Burrow 示例配置

要使用 Burrow 监控 Kafka 消费者延迟,您需要配置 Burrow 以连接到您的 Kafka 集群。下面是 Burrow 的示例配置文件:

Burrow 运行后,您可以使用其 REST API 检索消费者延迟指标。例如,要获取消费者组的延迟信息,您可以查询:

示例输出

此 API 提供实时消费者延迟数据,使您能够监控消费者是否落后。

2. 使用 kafka-consumer-groups.sh 跟踪偏移量

Kafka 附带一个名为 kafka-consumer-groups.sh 的内置命令行工具,它提供有关消费者组及其偏移量的信息。此工具可用于监控消费者延迟和手动管理偏移量。

kafka-consumer-groups.sh 的主要功能

  • 查看消费者组详细信息,包括分区分配、已提交偏移量和延迟。
  • 重置消费者组的偏移量。
  • 列出集群中当前所有活动的消费者组。

基本命令

1. 列出所有消费者组

2. 描述特定的消费者组

输出

Kafka Offset Management

在此输出中,您可以看到每个分区的当前偏移量、日志结束偏移量(最新生产的消息)以及**延迟**(两者之间的差值)。

3. 重置消费者组的偏移量

您可以将偏移量重置为最早、最新或特定时间戳。当您想重新处理数据或跳过特定消息时,这很有用。

  • 重置为最早偏移量
  • 通过时间戳重置为特定偏移量

在这种情况下,消费者组的偏移量将重置为指定时间戳之后的可用最早消息。

程序示例:使用 kafka-consumer-groups.sh 监控 Kafka 偏移量

例如,系统管理员可能定期运行以下脚本来检查一组消费者组的消费者延迟:

输出

Kafka Offset Management

3. 跟踪偏移量的最佳实践

有效跟踪和管理 Kafka 偏移量对于确保数据管道的可靠性至关重要。以下是处理 Kafka 偏移量时应遵循的一些最佳实践:

1. 启用消费者延迟监控

  • 定期监控消费者延迟:使用 Burrow、Prometheus 或 Confluent Control Center 等工具定期监控消费者延迟。监控工具可以在消费者落后时向您发出警报,指示性能瓶颈。
  • 设置延迟阈值:配置警报阈值,以便在延迟超过可接受限制时通知管理员。

2. 对关键应用程序使用手动偏移量管理

  • 手动偏移量提交:对于需要对偏移量提交时间进行更多控制的应用程序(例如,一次性语义或事务性工作流),请考虑使用手动偏移量提交策略。这确保仅在记录完全处理后才提交偏移量。

示例:Kafka 消费者中的手动偏移量提交

输出

Kafka Offset Management

在此示例中,消费者配置为禁用自动提交,并在处理完记录后手动提交偏移量。此策略可确保仅在处理完成后才提交偏移量。

3. 确保偏移量保留符合您的用例

Kafka 会在一段时间内为消费者组保留偏移量,之后偏移量将被删除。此保留期可以使用 offsets.retention.minutes 参数进行配置。

  • 调整偏移量保留:如果您的应用程序需要在较长一段时间后(例如,几天或几周)从最后一个已提交的偏移量重新处理消息,请确保将 offsets.retention.minutes 设置调整为保留适当时间的偏移量。

4. 有效使用消费者组监控工具

  • 跟踪消费者组指标:使用 Kafka 内置的 kafka-consumer-groups.sh 工具监控您的消费者组并实时跟踪偏移量。
  • 平衡组内消费者:确保消费者组在分区之间正确平衡,以避免某些消费者过载而其他消费者空闲。