从 Kafka 到 Cosmos DB 的变更数据捕获 (CDC)

2025年5月16日 | 阅读 5 分钟

引言

数据更改捕获 (CDC) 是一种用于检测和捕获对数据库所做的更改的技术,以便这些更改可以在其他地方进行处理,例如在数据管道或分析系统中。Apache Kafka 是一个分布式事件流平台,是实现 CDC 管道的流行工具,它提供了可伸缩性、容错性和高吞吐量。Azure Cosmos DB 是一个全球分布式 NoSQL 数据库,由于其可伸缩性和低延迟操作,通常是此类 CDC 管道的目标。

架构概述

该架构包含以下组件

  1. CDC 工具: Debezium 等工具可以从源数据库捕获更改,并将它们流式传输到 Kafka
  2. Kafka: 作为消息传递层来处理 CDC 事件。
  3. Cosmos DB: 捕获到的更改将被存储的目标数据库。
  4. Kafka 消费者: 从 Kafka 读取 CDC 事件并将其写入 Cosmos DB。

架构流程

为了实现下面的架构流程,我们提供了一个简化的 Python 脚本来模拟以下步骤

  1. 模拟源数据库中的更改: 我们使用模拟数据更改,而不是实际的 数据库
  2. 将更改发布到 Kafka: 使用 Kafka producer 将 CDC 事件发送到主题。
  3. 从 Kafka 消费更改并写入 Cosmos DB: 使用 Kafka consumer 处理这些事件并将它们存储在 Cosmos DB 中。

这是完整的程序

组合的 CDC 程序

示例输出

模拟数据库更改

Change Data Capture (CDC) from Kafka to Cosmos DB

Kafka 消费者和 Cosmos DB 写入

Change Data Capture (CDC) from Kafka to Cosmos DB

程序说明

  1. 模拟数据库更改
    • simulate_database_changes 函数将模拟的 CDC 事件生成为 JSON 对象,并使用 KafkaProducer 将它们发送到 Kafka 主题。
  2. 发布到 Kafka
    • 每个更改都会被序列化为 JSON 并发送到 cdc_topic Kafka 主题。
  3. 消费事件
    • KafkaConsumer 监听 cdc_topic 并处理每个事件。
  4. 写入 Cosmos DB
    • container.upsert_item 方法将事件写入 Cosmos DB,如果记录存在则更新,如果不存在则插入。

要点

  • 单个程序: 此脚本整合了整个 CDC 管道流程,以简化操作。
  • 实时处理: 该管道在事件发生时处理它们,并近实时地更新 Cosmos DB。
  • 可伸缩架构: 您可以通过添加更多 Kafka 分区、消费者或 Cosmos DB 容器来按需扩展此架构。

设置 Kafka

步骤 1:安装 Kafka

下载并安装 Kafka。请按照官方文档进行设置。以下是本地 Kafka 安装的快速入门指南

步骤 2:创建 Kafka 主题

为 CDC 事件创建一个主题

源数据库设置

步骤 1:设置示例数据库

在本指南中,我们使用 MySQL 作为源数据库

步骤 2:配置 Debezium 以进行 CDC

Debezium 是一个 CDC 工具,与 Kafka 集成良好。按照以下步骤设置 Debezium

  1. 将 Debezium MySQL 连接器插件添加到您的 Kafka Connect 安装中。
  2. 配置 Debezium MySQL 连接器

在 Kafka Connect 中部署此配置,以开始将更改从 MySQL 流式传输到 Kafka。

Azure Cosmos DB 设置

步骤 1:创建 Cosmos DB 实例

  1. 登录到 Azure 门户。
  2. 创建一个新的 Cosmos DB 帐户。选择 API 为 Core (SQL)
  3. 创建数据库和容器
    • 数据库名称:cdc_demo
    • 容器名称:employees
    • 分区键:/id

步骤 2:记下连接详细信息

从 Azure 门户检索 Cosmos DB 连接字符串和密钥。

代码实现

以下是一个基于 Python 的 Kafka 消费者,它将 CDC 事件提取到 Cosmos DB 中

说明

  1. KafkaConsumer: 从 Kafka 主题读取消息。
  2. CosmosClient: 与 Cosmos DB 交互以插入或更新记录。
  3. Upsert Item: 确保记录根据其 id 被插入或更新。

端到端流程

步骤 1:测试 CDC 管道

1. 在 MySQL 中插入一条新记录

2. 观察 Kafka 消费者日志

3. 收到的事件:{"id": 3, "name": "Charlie", "position": "Analyst", "salary": 60000}

4. 使用 Azure 门户或 SDK 在 Cosmos DB 中验证记录。

步骤 2:更新记录

在 MySQL 中更新一条记录

Kafka 消费者日志

最佳实践

  1. 分区: 在 Cosmos DB 中使用逻辑分区键来优化查询性能。
  2. 错误处理: 为瞬态错误实现重试机制。
  3. 监控: 使用 Prometheus 和 Grafana 等工具监控管道。
  4. 模式演进: 为源数据库中的模式更改做好计划。