Kafka 到 Cosmos DB 故障的死信队列2025 年 5 月 16 日 | 阅读 9 分钟 Kafka 死信队列 (DLQ) 简介什么是死信队列 (DLQ)?死信队列 (DLQ) 是 Kafka 中的一个特殊主题,用于存储因各种原因导致处理失败的消息,例如:
与其丢弃或无限重试失败的消息,不如DLQ 会隔离坏消息,从而便于调试、重新处理或发出警报。 为什么 Kafka 到 Cosmos DB 管道需要 DLQ?当集成 Kafka 与 Azure Cosmos DB 时,可能会在多个点发生故障。DLQ 可确保
Kafka 到 Cosmos DB 的 DLQ 架构概述
代码示例:设置基本的 Kafka DLQ 我们将创建
步骤 1:启动 Kafka 并创建主题运行这些命令来设置 Kafka 主题 步骤 2:Kafka 生产者 (向 Kafka 发送消息) 此生产者会将消息发送到main-topic。 producer.py 输出 ![]() 步骤 3:Kafka 消费者 (处理消息 & 处理故障)此消费者从main-topic读取,模拟故障,并将失败的消息发送到 DLQ。 consumer.py 输出 ![]() 步骤 4:DLQ 消费者 (检查失败的消息)此消费者从dlq-topic读取消息,使我们能够调试故障。 dlq_consumer.py 输出 ![]() 步骤 5:运行管道在单独的终端中运行这些命令 1. 启动 Kafka & 创建主题 2. 运行生产者 3. 运行消费者 4. 运行 DLQ 消费者 在 Kafka 中为 Cosmos DB 故障实现 DLQ为什么在 DLQ 之前实现重试机制?与其立即将失败的消息推送到死信队列 (DLQ),不如
这样可以减少DLQ 中的消息量,并防止不必要的故障。 处理 Cosmos DB 写入失败Cosmos DB 写入失败的常见原因
而不是丢弃消息,我们将
代码实现:带有重试机制的 Kafka 消费者(在发送到 DLQ 之前)
步骤 1:安装依赖项步骤 2:Kafka 生产者 (向 Kafka 发送消息)producer.py 输出 ![]() 步骤 3:Kafka 消费者 (重试,然后发送到 DLQ)此消费者 从main-topic读取 尝试写入Cosmos DB 重试3 次 (指数退避) 如果所有重试都失败,则发送到 DLQ consumer.py 输出 ![]() 步骤 4:DLQ 消费者 (处理失败的消息)dlq_consumer.py 输出 ![]() 步骤 5:运行管道在单独的终端中运行这些命令 1. 启动 Kafka & 创建主题 2. 运行生产者 3. 运行消费者 (重试,然后发送到 DLQ) 4. 运行 DLQ 消费者 监视和自动化 Kafka 中用于 Cosmos DB 故障的 DLQ 处理为什么监视 DLQ?DLQ不是最终目的地。它们存储失败的消息,但我们必须
使用 Prometheus & Grafana 设置 DLQ 监视 我们将集成Prometheus来收集 Kafka DLQ 指标,并使用Grafana来可视化它们。 步骤 1:安装 Prometheus & Grafana 步骤 2:配置 Prometheus 以监视 Kafka 修改Prometheus 配置 (prometheus.yml) 启动 Prometheus 设置 DLQ 指标生产者 我们将创建一个Kafka 消费者,它将 读取 DLQ 消息 记录失败次数 将指标暴露给 Prometheus dlq_metrics.py 步骤 3:在 Prometheus 中查看 DLQ 指标 访问https://:8000/metrics 您应该会看到 步骤 4:为 Kafka DLQ 监视设置 Grafana 仪表板 1. 打开 Grafana (https://:3000) 2. 将 Prometheus 添加为数据源 3. 创建一个新仪表板 4. 添加一个图表面板 5. 使用此 PromQL 查询 6. 点击保存并应用 代码:DLQ 自动重试回放 消费 DLQ 消息 尝试将它们重新发送到 Cosmos DB 如果成功 → 从 DLQ 中删除 如果失败 → 保留在 DLQ 中 dlq_replayer.py 步骤 5:运行回放过程 在修复问题后运行此脚本 (例如,修复 Cosmos DB 中的模式问题)。 步骤 6:预期输出 在修复问题之前 (第一次运行) ![]() 在修复问题之后 (第二次运行) ![]() 扩展死信队列 (DLQ) 以应对大规模工作负载大规模 DLQ 处理中的挑战 随着数据量的增加,如果处理效率不高,DLQ 可能会成为瓶颈。 一些关键挑战包括
为高吞吐量故障优化 DLQ 为了提高性能,我们可以 分区 DLQ - 将负载分散到多个消费者。 并行处理 - 使用 Kafka 消费者组进行回放。 Kafka Streams 进行智能过滤 - 自动分类错误。 步骤 1:为提高效率而分区 DLQ而不是一个 DLQ 主题,我们可以创建多个分区来并行化处理。 创建分区 DLQ 主题 现在,Kafka 将失败的消息分布到3 个分区,以便更快地处理。 步骤 2:更新 DLQ 消费者以处理分区我们将修改我们的DLQ 消费者以 使用 Kafka 消费者组 并行处理消息 dlq_consumer_partitioned.py 步骤 3:使用 Kafka Streams 进行高级 DLQ 处理而不是盲目重试,我们可以在重试前分析故障。 Kafka Streams 允许我们 过滤瞬时错误和永久性错误 将瞬时错误路由到自动重试 将永久性错误发送到单独的存档 步骤 3.1:实现 Kafka Streams 来分类 DLQ 消息 我们创建一个Kafka Streams 处理器来 检查错误类型 仅重试瞬时故障 存档永久性故障 dlq_streams_processor.py 输出 ![]() 步骤 4:实现自动重试消费者retry-topic 包含可以安全重试的消息。 我们将创建一个消费者,它 重试将数据发送到 Cosmos DB 将失败的重试移回 DLQ retry_consumer.py 输出 ![]() 步骤 5:运行优化后的 DLQ 管道1.启动 Kafka & 创建主题 |
我们请求您订阅我们的新闻通讯以获取最新更新。