Kafka 到 Cosmos DB 故障的死信队列

2025 年 5 月 16 日 | 阅读 9 分钟

Kafka 死信队列 (DLQ) 简介

什么是死信队列 (DLQ)?

死信队列 (DLQ)Kafka 中的一个特殊主题,用于存储因各种原因导致处理失败的消息,例如:

  • 消息格式问题 (无效的 JSON、字段缺失)
  • 序列化/反序列化失败
  • 下游服务失败 (例如,Cosmos DB 不可用)
  • 时效性数据过期

与其丢弃或无限重试失败的消息,不如DLQ 会隔离坏消息,从而便于调试、重新处理或发出警报

为什么 Kafka 到 Cosmos DB 管道需要 DLQ?

当集成 KafkaAzure Cosmos DB 时,可能会在多个点发生故障。DLQ 可确保

  • 数据完整性:失败的消息不会丢失。
  • 错误恢复:修复问题后可以重放失败的消息。
  • 操作效率:减少重试次数,防止阻塞健康的消息。

Kafka 到 Cosmos DB 的 DLQ 架构概述

  1. Kafka 生产者: 将消息发布到主主题 (Main Topic)
  2. Kafka 消费者:主主题读取并写入Cosmos DB
  3. 故障检测: 如果消息失败,则将其推送到DLQ 主题
  4. DLQ 处理器: 稍后,将从 DLQ 中重试或手动检查消息

代码示例:设置基本的 Kafka DLQ

我们将创建

  • 一个 Kafka 生产者来发送消息。
  • 一个 Kafka 消费者来处理消息并模拟故障。
  • 一个 DLQ 生产者将失败的消息存储在 DLQ 中。

步骤 1:启动 Kafka 并创建主题

运行这些命令来设置 Kafka 主题

步骤 2:Kafka 生产者 (向 Kafka 发送消息)

此生产者会将消息发送到main-topic

producer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 3:Kafka 消费者 (处理消息 & 处理故障)

此消费者从main-topic读取,模拟故障,并将失败的消息发送到 DLQ

consumer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 4:DLQ 消费者 (检查失败的消息)

此消费者从dlq-topic读取消息,使我们能够调试故障。

dlq_consumer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 5:运行管道

单独的终端中运行这些命令

1. 启动 Kafka & 创建主题

2. 运行生产者

3. 运行消费者

4. 运行 DLQ 消费者

在 Kafka 中为 Cosmos DB 故障实现 DLQ

为什么在 DLQ 之前实现重试机制?

与其立即将失败的消息推送到死信队列 (DLQ),不如

  1. 重试处理几次 (例如,网络问题可能是暂时的)。
  2. 仅在所有重试都失败后才发送到DLQ

这样可以减少DLQ 中的消息量,并防止不必要的故障。

处理 Cosmos DB 写入失败

Cosmos DB 写入失败的常见原因

  • 超出速率限制 (请求单位 - RU)
  • 瞬时网络问题
  • 模式验证错误
  • 重复 ID 约束

而不是丢弃消息,我们将

  1. 重试消息 (针对瞬时问题)。
  2. 记录错误 (针对永久性问题)。
  3. 发送到 DLQ (如果所有重试都失败)。

代码实现:带有重试机制的 Kafka 消费者(在发送到 DLQ 之前)

  • 我们将修改我们的Kafka 消费者
    在发送到 DLQ 之前重试 Cosmos DB 写入
    记录失败原因
    使用指数退避策略进行重试

步骤 1:安装依赖项

步骤 2:Kafka 生产者 (向 Kafka 发送消息)

producer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 3:Kafka 消费者 (重试,然后发送到 DLQ)

此消费者

main-topic读取

尝试写入Cosmos DB

重试3 次 (指数退避)

如果所有重试都失败,则发送到 DLQ

consumer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 4:DLQ 消费者 (处理失败的消息)

dlq_consumer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 5:运行管道

单独的终端中运行这些命令

1. 启动 Kafka & 创建主题

2. 运行生产者

3. 运行消费者 (重试,然后发送到 DLQ)

4. 运行 DLQ 消费者

监视和自动化 Kafka 中用于 Cosmos DB 故障的 DLQ 处理

为什么监视 DLQ?

DLQ不是最终目的地。它们存储失败的消息,但我们必须

  • 跟踪有多少消息进入 DLQ。
  • 识别常见的失败原因
  • 在错误修复后重新处理消息。

使用 Prometheus & Grafana 设置 DLQ 监视

我们将集成Prometheus来收集 Kafka DLQ 指标,并使用Grafana来可视化它们。

步骤 1:安装 Prometheus & Grafana

步骤 2:配置 Prometheus 以监视 Kafka

修改Prometheus 配置 (prometheus.yml)

启动 Prometheus

设置 DLQ 指标生产者

我们将创建一个Kafka 消费者,它将

读取 DLQ 消息

记录失败次数

将指标暴露给 Prometheus

dlq_metrics.py

步骤 3:在 Prometheus 中查看 DLQ 指标

访问https://:8000/metrics

您应该会看到

步骤 4:为 Kafka DLQ 监视设置 Grafana 仪表板

1. 打开 Grafana (https://:3000)

2. 将 Prometheus 添加为数据源

3. 创建一个新仪表板

4. 添加一个图表面板

5. 使用此 PromQL 查询

6. 点击保存并应用

代码:DLQ 自动重试回放

消费 DLQ 消息

尝试将它们重新发送到 Cosmos DB

如果成功 → 从 DLQ 中删除

如果失败 → 保留在 DLQ 中

dlq_replayer.py

步骤 5:运行回放过程

修复问题后运行此脚本 (例如,修复 Cosmos DB 中的模式问题)。

步骤 6:预期输出

在修复问题之前 (第一次运行)

Dead Letter Queues for Kafka to Cosmos DB Failures

在修复问题之后 (第二次运行)

Dead Letter Queues for Kafka to Cosmos DB Failures

扩展死信队列 (DLQ) 以应对大规模工作负载

大规模 DLQ 处理中的挑战

随着数据量的增加,如果处理效率不高,DLQ 可能会成为瓶颈

一些关键挑战包括

  1. 高失败率 - 数千条消息进入 DLQ。
  2. 重处理延迟 - 缓慢的重试导致积压。
  3. 单一消费者瓶颈 - 单个消费者可能无法跟上。
  4. 存储开销 - DLQ 无限增长。

为高吞吐量故障优化 DLQ

为了提高性能,我们可以

分区 DLQ - 将负载分散到多个消费者。

并行处理 - 使用 Kafka 消费者组进行回放。

Kafka Streams 进行智能过滤 - 自动分类错误。

步骤 1:为提高效率而分区 DLQ

而不是一个 DLQ 主题,我们可以创建多个分区来并行化处理。

创建分区 DLQ 主题

现在,Kafka 将失败的消息分布到3 个分区,以便更快地处理。

步骤 2:更新 DLQ 消费者以处理分区

我们将修改我们的DLQ 消费者

使用 Kafka 消费者组

并行处理消息

dlq_consumer_partitioned.py

步骤 3:使用 Kafka Streams 进行高级 DLQ 处理

而不是盲目重试,我们可以在重试前分析故障

Kafka Streams 允许我们

过滤瞬时错误和永久性错误

将瞬时错误路由到自动重试

将永久性错误发送到单独的存档

步骤 3.1:实现 Kafka Streams 来分类 DLQ 消息

我们创建一个Kafka Streams 处理器

检查错误类型

仅重试瞬时故障

存档永久性故障

dlq_streams_processor.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 4:实现自动重试消费者

retry-topic 包含可以安全重试的消息。

我们将创建一个消费者,它

重试将数据发送到 Cosmos DB

将失败的重试移回 DLQ

retry_consumer.py

输出

Dead Letter Queues for Kafka to Cosmos DB Failures

步骤 5:运行优化后的 DLQ 管道

1.启动 Kafka & 创建主题