Kafka 保留策略

2025年5月16日 | 11 分钟阅读

Kafka 的保留机制对于维持效率至关重要,它能确保分布式系统在高吞吐量下处理数据而不会耗尽存储容量。保留机制是 Kafka 用来决定主题中的消息保留多长时间,以及在超过时间或大小限制后如何处理这些消息的过程。本指南将涵盖从基本的保留策略到日志压缩等高级配置,并提供实际示例。

1. 日志保留策略

Kafka 将数据存储在主题(topics)中,这些主题被分成多个分区(partitions)以实现可扩展性和并行处理。每个分区由一系列以文件形式存储在磁盘上的日志段(log segments)组成。一个日志段是一系列连续的记录或消息,Kafka 将新记录追加到这些段的末尾。

Kafka 的日志保留策略决定了 Kafka 在日志中保留这些消息多长时间,之后它们将被删除或压缩。有多种机制可以控制消息何时以及如何被丢弃,包括:

  • 基于时间的保留:消息保留一段特定时间,之后被删除。
  • 基于大小的保留:消息被保留,直到日志总大小超过预定义限制。

Kafka 的默认设置是保留消息 7 天,但这个设置可以在代理级别(broker-level)和主题级别(topic-level)进行自定义,从而让用户能够精细控制 Kafka 保留数据的时间。

通过控制保留策略,Kafka 允许用户优化磁盘使用,同时保留足够的数据供消费者处理。

基于时间的保留

Kafka 中的基于时间的保留允许用户将主题中的消息保留一段特定的时间。一旦超过保留时间,Kafka 会自动删除较旧的消息。这种策略适用于消息仅在有限时间内需要,之后可以丢弃的场景。

配置参数

  • retention.ms: 该参数控制主题中消息的保留时间。它定义了 Kafka 在消息符合删除条件之前保留它们的最长时间。该值以毫秒为单位指定,默认值为 7 天(604800000 毫秒)。

示例:设置基于时间的保留

假设您有一个用例,消息需要保留 1 天(86400000 毫秒)。

在此示例中,Kafka 将在您的主题(your-topic)中保留消息 24 小时。之后,旧消息将被 Kafka 的后台进程自动删除。

基于大小的保留

Kafka 中的基于大小的保留旨在限制消息所消耗的磁盘空间。该策略确保当主题的日志总大小超过预定义的大小限制时,删除较旧的消息。

配置参数

  • retention.bytes: 此参数定义了主题可以保留的最大大小(以字节为单位)。一旦日志总大小超过此值,Kafka 将开始删除最旧的消息以释放空间。

示例:设置基于大小的保留

在以下示例中,您希望为特定主题最多保留 2 GB 的数据。

在这里,一旦您的主题(your-topic)的日志总大小超过 2 GB,Kafka 将删除较旧的消息。

保留时间

Kafka 中的保留时间指的是消息在被删除之前可以保留在主题中的最长时间。这由 `retention.ms` 配置控制,该配置以毫秒为单位指定时间。在配置的保留时间过去后,Kafka 会自动删除较旧的消息,以确保主题的存储得到有效管理。

保留时间可以在代理级别主题级别进行配置:

  • 代理级别配置: 所有主题的默认保留时间在代理级别设置,这意味着它适用于每个主题,除非在主题级别被覆盖。
  • 主题级别配置: 可以为单个主题配置自己的保留时间设置,这将覆盖该主题的代理默认保留配置。

保留时间在 Kafka 中如何工作?

Kafka 的日志保留机制基于日志段的原理。Kafka 将消息存储在分区内的日志段中,每个分区包含一系列这些段。当新消息被生产到主题时,Kafka 将它们追加到相关分区的当前日志段的末尾。

一旦日志段达到其大小或时间限制,它将被关闭,并启动一个新的段。然后,Kafka 使用保留时间来决定何时可以删除这些日志段。

  • 每个日志段都有一个与之关联的时间戳。
  • Kafka 定期检查日志段的年龄与配置的 `retention.ms` 值。
  • 如果日志段的时间戳超过了保留时间,Kafka 会删除该段,从而释放存储空间。

保留时间粒度

保留时间在日志段级别操作,而不是在单个消息级别。Kafka 检查整个日志段的年龄,如果任何段的年龄超过保留时间,它将删除整个段。这意味着即使段中有最近的消息,如果段的年龄超过了配置的保留时间,Kafka 也会删除整个段。

配置保留时间

Kafka 允许灵活配置保留时间。默认保留时间为 7 天(604800000 毫秒),但可以根据应用程序的需求进行自定义。

1. 代理级别配置

在代理级别,`log.retention.ms` 参数定义了所有主题的默认保留时间。如果您希望代理上的所有主题具有相同的保留时间,可以在代理的配置文件中设置此参数。

代理级别配置示例

2.主题级别配置

您可以通过使用 `retention.ms` 参数来覆盖单个主题的默认代理设置。这可以通过使用 Kafka 的命令行工具或 API 更改主题配置来完成。

主题级别配置示例

在此示例中,Kafka 将在 `my-topic` 中保留消息 1 天(86400000 毫秒)。此时间过后,较旧的消息将被自动删除。

保留大小

Kafka 中的保留大小是一个配置参数,用于定义主题在删除旧消息之前可以占用的最大磁盘空间。这对于管理存储至关重要,尤其是在处理大量流数据的系统中。保留大小由 `retention.bytes` 参数控制,该参数可以在代理和主题级别设置。

为什么保留大小很重要?

  • 存储管理: 保留大小通过在总大小超过指定限制时自动删除旧消息,确保 Kafka 不会耗尽磁盘空间。
  • 成本控制: 在云环境中,存储成本可能迅速累积,设置保留大小有助于通过限制存储的数据量来控制开销。

保留大小的关键参数

  • retention.bytes: 该参数指定主题日志可以占用的最大大小(以字节为单位)。当日志大小超过此值时,Kafka 会删除最旧的段。
  • 默认值: 默认设置为 -1,表示对主题的大小没有限制。

配置保留大小

可以使用 Kafka 命令行工具 `kafka-topics.sh` 或通过修改代理配置来配置保留大小。

1. 代理级别配置

要为所有主题设置默认保留大小,您可以修改 Kafka 代理上的 `server.properties` 文件:

2. 主题级别配置

要为特定主题设置保留大小,您可以使用以下命令:

管理保留大小的示例程序

下面是一个完整的 Kafka 程序示例,它创建一个具有指定保留大小的主题,生产一些消息,然后检查主题的配置以确认保留大小。

步骤 1:启动 Kafka 服务器和 Zookeeper

确保您的 Kafka 和 Zookeeper 正在运行。您可以使用以下命令启动它们:

步骤 2:创建具有保留大小的主题

使用 `kafka-topics.sh` 命令创建一个名为 `my-topic` 的主题,保留大小为 500 MB。

步骤 3:向主题生产消息

接下来,您可以向 `my-topic` 生产消息以填满它。使用 `kafka-console-producer.sh` 脚本:

您可以在控制台中输入一些消息,并在每条消息后按 Enter 键。例如:

Kafka Retention

步骤 4:检查主题配置

要确认保留大小设置正确,请使用以下命令:

此命令将输出有关该主题的详细信息,包括其保留大小。您应该看到类似这样的输出:

Kafka Retention

步骤 5:生产更多消息

继续生产消息,直到主题中消息的总大小超过 500 MB。一旦超过,Kafka 将开始删除最旧的消息以释放空间。

步骤 6:监控主题

您可以监控日志和主题的大小,以了解 Kafka 如何管理保留大小。您可能需要检查 Kafka 日志目录中的磁盘使用情况,该目录通常位于 Kafka 安装目录下的 `logs/` 中。

步骤 7:测试保留大小

要测试保留大小的工作方式,您可以生产足够的消息以超过保留限制,并观察旧消息是如何被删除的。例如:

日志压缩

日志压缩是 Apache Kafka 中的一种保留机制,它允许主题只保留日志中每个键的最新值。此功能对于最新状态比保留所有历史数据更重要的用例特别有用。日志压缩确保 Kafka 有效管理存储,同时提供一种恢复数据最新状态的方法。

日志压缩的关键概念

  1. 保留策略: 与基于时间或大小的保留策略(无论其键如何都删除旧消息)不同,日志压缩根据消息键保留消息。
  2. 墓碑消息(Tombstones): 当一个键被删除时,会发送一个称为墓碑消息的特殊消息。这表示该键应从日志中删除。墓碑消息也受保留策略的约束。
  3. 清理: Kafka 定期压缩日志,删除同一键的旧版本,只保留最新的值。
  4. 配置: 日志压缩使用主题配置中的 `cleanup.policy` 参数进行配置。将其设置为 `compact` 可为该主题启用日志压缩。

配置日志压缩

要在 Kafka 主题上启用日志压缩,您可以将 `cleanup.policy` 参数设置为 `compact`。以下是演示如何创建具有日志压缩的主题、生产消息并查看压缩效果的步骤。

日志压缩的分步程序

步骤 1:启动 Kafka 服务器和 Zookeeper

确保您的 Kafka 和 Zookeeper 服务器正在运行。您可以使用以下命令启动它们:

步骤 2:创建具有日志压缩的主题

使用 `kafka-topics.sh` 命令创建一个名为 `compaction-topic` 且启用了日志压缩的主题。

步骤 3:向主题生产消息

接下来,使用 `kafka-console-producer.sh` 脚本生产带键的消息。这至关重要,因为日志压缩是基于键工作的。

您可以输入格式为 `key:value` 的消息。

Kafka Retention

步骤 4:验证主题中的消息

要检查主题中当前的消息,您可以使用 `kafka-console-consumer.sh` 脚本从头开始消费消息:

输出

Kafka Retention

步骤 5:观察日志压缩

Kafka 在生产消息后不会立即压缩日志。您可能需要等待一段时间才能运行压缩过程。您也可以通过临时将 `min.cleanable.dirty.ratio` 调整为较低的值来强制进行日志压缩,但通常 Kafka 会根据其内部设置自动处理此过程。

步骤 6:消费压缩后的消息

一段时间后,再次消费消息:

输出

Kafka Retention

在这里,键 1 的第一条和第二条消息已被删除,因为它们比最新版本(第三条消息)旧,而键 2 只保留了最后一条消息。

段配置

段配置是 Kafka 架构的一个重要方面,它管理数据如何存储在日志文件中。Kafka 主题被划分为段,这些段是磁盘上存储一系列消息的不可变文件。一旦满足某些条件,每个段都会被关闭,新消息会追加到一个新段中。正确配置段设置对于优化性能、存储效率和资源管理至关重要。

段配置的关键组件

1. 段大小 (log.segment.bytes)

  • 此参数指定每个日志段文件的最大大小(以字节为单位)。一旦段达到此大小,它将被关闭,并创建一个新的段文件。
  • 默认值: 默认设置为 1 GB (1073741824 字节)。

2. 段时间 (log.segment.ms)

  • 此参数定义了 Kafka 允许一个段保持打开的最长时间(以毫秒为单位),无论其大小如何。当达到此时间限制时,将创建一个新段。

3. 段索引 (index.interval.bytes)

  • 此参数指定 Kafka 为写入段的消息创建索引条目的频率。
  • 默认值: 默认值为 4 KB (4096 字节)。
  • 功能: 较低的值将创建更频繁的索引条目,从而可以更快地访问消息,但会消耗更多的磁盘空间。

4. 段的保留 (log.retention.check.interval.ms)

  • 此参数设置 Kafka 检查日志段保留策略的频率。

5. 最小可清理比率 (min.cleanable.dirty.ratio)

  • 虽然不严格属于段配置,但此参数在压缩期间与段协同工作。它确定在触发压缩之前日志中脏数据(可压缩)的最小比率。

配置段参数

您可以在 Kafka 代理的 `server.properties` 文件中或在主题级别使用 `kafka-topics.sh` 脚本配置段参数。以下是如何设置这些参数的示例。

1. 代理级别配置

要为所有主题设置默认段参数,请修改 `server.properties` 文件:

2. 主题级别配置

要为特定主题设置段参数,请使用以下命令:

bash

复制代码

示例程序:监控段配置

您可以使用 `kafka-topics.sh` 命令监控和验证 Kafka 主题的段配置设置:

输出

Kafka Retention
下一个主题Kafka-streams-dsl