Kafka 保留策略2025年5月16日 | 11 分钟阅读 Kafka 的保留机制对于维持效率至关重要,它能确保分布式系统在高吞吐量下处理数据而不会耗尽存储容量。保留机制是 Kafka 用来决定主题中的消息保留多长时间,以及在超过时间或大小限制后如何处理这些消息的过程。本指南将涵盖从基本的保留策略到日志压缩等高级配置,并提供实际示例。 1. 日志保留策略Kafka 将数据存储在主题(topics)中,这些主题被分成多个分区(partitions)以实现可扩展性和并行处理。每个分区由一系列以文件形式存储在磁盘上的日志段(log segments)组成。一个日志段是一系列连续的记录或消息,Kafka 将新记录追加到这些段的末尾。 Kafka 的日志保留策略决定了 Kafka 在日志中保留这些消息多长时间,之后它们将被删除或压缩。有多种机制可以控制消息何时以及如何被丢弃,包括:
Kafka 的默认设置是保留消息 7 天,但这个设置可以在代理级别(broker-level)和主题级别(topic-level)进行自定义,从而让用户能够精细控制 Kafka 保留数据的时间。 通过控制保留策略,Kafka 允许用户优化磁盘使用,同时保留足够的数据供消费者处理。 基于时间的保留Kafka 中的基于时间的保留允许用户将主题中的消息保留一段特定的时间。一旦超过保留时间,Kafka 会自动删除较旧的消息。这种策略适用于消息仅在有限时间内需要,之后可以丢弃的场景。 配置参数
示例:设置基于时间的保留 假设您有一个用例,消息需要保留 1 天(86400000 毫秒)。 在此示例中,Kafka 将在您的主题(your-topic)中保留消息 24 小时。之后,旧消息将被 Kafka 的后台进程自动删除。 基于大小的保留Kafka 中的基于大小的保留旨在限制消息所消耗的磁盘空间。该策略确保当主题的日志总大小超过预定义的大小限制时,删除较旧的消息。 配置参数
示例:设置基于大小的保留 在以下示例中,您希望为特定主题最多保留 2 GB 的数据。 在这里,一旦您的主题(your-topic)的日志总大小超过 2 GB,Kafka 将删除较旧的消息。 保留时间Kafka 中的保留时间指的是消息在被删除之前可以保留在主题中的最长时间。这由 `retention.ms` 配置控制,该配置以毫秒为单位指定时间。在配置的保留时间过去后,Kafka 会自动删除较旧的消息,以确保主题的存储得到有效管理。 保留时间可以在代理级别和主题级别进行配置:
保留时间在 Kafka 中如何工作?Kafka 的日志保留机制基于日志段的原理。Kafka 将消息存储在分区内的日志段中,每个分区包含一系列这些段。当新消息被生产到主题时,Kafka 将它们追加到相关分区的当前日志段的末尾。 一旦日志段达到其大小或时间限制,它将被关闭,并启动一个新的段。然后,Kafka 使用保留时间来决定何时可以删除这些日志段。
保留时间粒度保留时间在日志段级别操作,而不是在单个消息级别。Kafka 检查整个日志段的年龄,如果任何段的年龄超过保留时间,它将删除整个段。这意味着即使段中有最近的消息,如果段的年龄超过了配置的保留时间,Kafka 也会删除整个段。 配置保留时间Kafka 允许灵活配置保留时间。默认保留时间为 7 天(604800000 毫秒),但可以根据应用程序的需求进行自定义。 1. 代理级别配置 在代理级别,`log.retention.ms` 参数定义了所有主题的默认保留时间。如果您希望代理上的所有主题具有相同的保留时间,可以在代理的配置文件中设置此参数。 代理级别配置示例 2.主题级别配置 您可以通过使用 `retention.ms` 参数来覆盖单个主题的默认代理设置。这可以通过使用 Kafka 的命令行工具或 API 更改主题配置来完成。 主题级别配置示例 在此示例中,Kafka 将在 `my-topic` 中保留消息 1 天(86400000 毫秒)。此时间过后,较旧的消息将被自动删除。 保留大小Kafka 中的保留大小是一个配置参数,用于定义主题在删除旧消息之前可以占用的最大磁盘空间。这对于管理存储至关重要,尤其是在处理大量流数据的系统中。保留大小由 `retention.bytes` 参数控制,该参数可以在代理和主题级别设置。 为什么保留大小很重要?
保留大小的关键参数
配置保留大小可以使用 Kafka 命令行工具 `kafka-topics.sh` 或通过修改代理配置来配置保留大小。 1. 代理级别配置 要为所有主题设置默认保留大小,您可以修改 Kafka 代理上的 `server.properties` 文件: 2. 主题级别配置 要为特定主题设置保留大小,您可以使用以下命令: 管理保留大小的示例程序 下面是一个完整的 Kafka 程序示例,它创建一个具有指定保留大小的主题,生产一些消息,然后检查主题的配置以确认保留大小。 步骤 1:启动 Kafka 服务器和 Zookeeper 确保您的 Kafka 和 Zookeeper 正在运行。您可以使用以下命令启动它们: 步骤 2:创建具有保留大小的主题 使用 `kafka-topics.sh` 命令创建一个名为 `my-topic` 的主题,保留大小为 500 MB。 步骤 3:向主题生产消息 接下来,您可以向 `my-topic` 生产消息以填满它。使用 `kafka-console-producer.sh` 脚本: 您可以在控制台中输入一些消息,并在每条消息后按 Enter 键。例如: ![]() 步骤 4:检查主题配置 要确认保留大小设置正确,请使用以下命令: 此命令将输出有关该主题的详细信息,包括其保留大小。您应该看到类似这样的输出: ![]() 步骤 5:生产更多消息 继续生产消息,直到主题中消息的总大小超过 500 MB。一旦超过,Kafka 将开始删除最旧的消息以释放空间。 步骤 6:监控主题 您可以监控日志和主题的大小,以了解 Kafka 如何管理保留大小。您可能需要检查 Kafka 日志目录中的磁盘使用情况,该目录通常位于 Kafka 安装目录下的 `logs/` 中。 步骤 7:测试保留大小 要测试保留大小的工作方式,您可以生产足够的消息以超过保留限制,并观察旧消息是如何被删除的。例如: 日志压缩日志压缩是 Apache Kafka 中的一种保留机制,它允许主题只保留日志中每个键的最新值。此功能对于最新状态比保留所有历史数据更重要的用例特别有用。日志压缩确保 Kafka 有效管理存储,同时提供一种恢复数据最新状态的方法。 日志压缩的关键概念
配置日志压缩要在 Kafka 主题上启用日志压缩,您可以将 `cleanup.policy` 参数设置为 `compact`。以下是演示如何创建具有日志压缩的主题、生产消息并查看压缩效果的步骤。 日志压缩的分步程序 步骤 1:启动 Kafka 服务器和 Zookeeper 确保您的 Kafka 和 Zookeeper 服务器正在运行。您可以使用以下命令启动它们: 步骤 2:创建具有日志压缩的主题 使用 `kafka-topics.sh` 命令创建一个名为 `compaction-topic` 且启用了日志压缩的主题。 步骤 3:向主题生产消息 接下来,使用 `kafka-console-producer.sh` 脚本生产带键的消息。这至关重要,因为日志压缩是基于键工作的。 您可以输入格式为 `key:value` 的消息。 ![]() 步骤 4:验证主题中的消息 要检查主题中当前的消息,您可以使用 `kafka-console-consumer.sh` 脚本从头开始消费消息: 输出 ![]() 步骤 5:观察日志压缩 Kafka 在生产消息后不会立即压缩日志。您可能需要等待一段时间才能运行压缩过程。您也可以通过临时将 `min.cleanable.dirty.ratio` 调整为较低的值来强制进行日志压缩,但通常 Kafka 会根据其内部设置自动处理此过程。 步骤 6:消费压缩后的消息 一段时间后,再次消费消息: 输出 ![]() 在这里,键 1 的第一条和第二条消息已被删除,因为它们比最新版本(第三条消息)旧,而键 2 只保留了最后一条消息。 段配置段配置是 Kafka 架构的一个重要方面,它管理数据如何存储在日志文件中。Kafka 主题被划分为段,这些段是磁盘上存储一系列消息的不可变文件。一旦满足某些条件,每个段都会被关闭,新消息会追加到一个新段中。正确配置段设置对于优化性能、存储效率和资源管理至关重要。 段配置的关键组件1. 段大小 (log.segment.bytes)
2. 段时间 (log.segment.ms)
3. 段索引 (index.interval.bytes)
4. 段的保留 (log.retention.check.interval.ms)
5. 最小可清理比率 (min.cleanable.dirty.ratio)
配置段参数您可以在 Kafka 代理的 `server.properties` 文件中或在主题级别使用 `kafka-topics.sh` 脚本配置段参数。以下是如何设置这些参数的示例。 1. 代理级别配置 要为所有主题设置默认段参数,请修改 `server.properties` 文件: 2. 主题级别配置 要为特定主题设置段参数,请使用以下命令: bash 复制代码 示例程序:监控段配置您可以使用 `kafka-topics.sh` 命令监控和验证 Kafka 主题的段配置设置: 输出 ![]() 下一个主题Kafka-streams-dsl |
我们请求您订阅我们的新闻通讯以获取最新更新。