Kafka-to-Cosmos DB 管道中的模式演进和序列化2025 年 5 月 16 日 | 阅读 9 分钟 模式演进和序列化是 Kafka 到 Cosmos DB 管道的关键组成部分。这些概念确保数据在系统之间无缝地进行序列化和传输,从而在模式发生变化时提供灵活性,同时保持数据完整性和一致性。 Kafka 到 Cosmos DB 管道中模式演进的概述模式演进允许在不破坏 Kafka 到 Cosmos DB 管道的情况下更改数据结构。通过正确定义模式并仔细管理更改,我们确保生产者和消费者在模式更新后仍然兼容。 关键概念
使用 Avro 在 Kafka 到 Cosmos DB 管道中实现模式演进Apache Avro 因其通过模式注册表(例如,Confluent Schema Registry)支持模式版本控制而被广泛用于模式演进。 1. 设置 Kafka 和模式注册表确保以下服务正在运行
模式演进示例A. 初始模式(版本 1)我们为将存储在 Cosmos DB 中的 Kafka 中的事件消息定义一个 Avro 模式。 1. 定义初始 Avro 模式 (user_v1.avsc)
2. Kafka Producer(使用 Avro 模式)输出 ![]()
B. 向后兼容性(版本 2)现在,我们向模式添加了一个新的可选字段。旧的消费者仍应能够读取新消息。 1. 更新的模式(版本 2)
2. 使用新模式的 Kafka Producer
3. 处理旧消息和新消息的 Kafka Consumer 输出 ![]()
C. 向前兼容性(版本 3)现在,我们删除一个字段(name)来查看旧的消费者是否仍然可以正常工作。 1. 更新的模式(版本 3) 输出 ![]()
修复:与其删除 name,不如弃用它并将其标记为可选
D. 完全兼容性(向后 + 向前)要维护完全兼容性,我们必须
最终模式 输出 ![]()
模式注册表模式注册表充当 Kafka 生态系统中管理模式的中央存储库。它提供版本控制和验证,确保模式更改兼容。 例如
Kafka 到 Cosmos DB 管道中的序列化序列化涉及将结构化数据转换为可以跨 Kafka 主题传输的格式。在 Kafka 到 Cosmos DB 管道中,序列化使得 Kafka 生产者和 Cosmos DB 消费者之间能够高效地传输数据。 常见的序列化格式包括
Kafka 到 Cosmos DB 管道中的模式演进和序列化工作流模式演进确保即使模式随时间变化也能实现平稳的数据处理。在Kafka 到Cosmos DB管道中,序列化和反序列化在处理模式更新而不破坏消费者方面发挥着至关重要的作用。 工作流步骤
实施我们将为此实现使用Apache Kafka、Confluent Schema Registry、Python Kafka 客户端和 Azure Cosmos DB SDK。 1. 设置环境确保您拥有
2. 定义 Avro 模式创建版本 1 的模式文件user.avsc
生产者 - 序列化数据并发布到 Kafka我们将使用Confluent Kafka Avro producer用Python编写一个 Kafka producer。 Kafka Producer 代码 (producer.py) 输出 ![]()
Kafka 存储序列化数据一旦生产者发送了消息,Kafka 就会以序列化的 Avro 格式存储它们。 要验证消息
消费者 - 反序列化数据消费者从 Kafka 检索消息,从模式注册表中获取模式,然后反序列化它们。 Kafka Consumer 代码 (consumer.py) 输出 ![]()
Cosmos DB 存储反序列化的数据一旦消费者读取并反序列化了数据,它就会将其写入Azure Cosmos DB。 验证 Cosmos DB 中的数据 运行此Azure CLI 命令来验证存储的记录
处理模式演进现在,让我们修改模式(版本 2)以添加一个新字段(phone)。 新模式(版本 2)
生产者使用新模式发送消息
消费者将新数据存储到 Cosmos DB
带有模式演进的 Kafka 到 Cosmos DB 管道1. 设置 Kafka 和模式注册表使用 Docker Compose 启动 Kafka 和模式注册表 2. 定义 Avro 模式为音乐曲目创建 Avro 模式 输出 ![]() 使用REST API或 Confluent 的 Kafka Python 客户端等库将此模式注册到模式注册表。 3. 带有模式演进的 Kafka Producer这是一个使用 confluent-kafka 的 Python producer 输出 ![]() 4. 用于 Cosmos DB 的 Kafka Consumer消费者从 Kafka 读取,反序列化 Avro 数据,并将其写入 Cosmos DB。 安装 Cosmos DB SDK Python 消费者代码 输出 ![]() 5. 模式演进:添加字段要演进模式,请添加一个名为 genre 的新可选字段 使用新模式的生产者可以发送包含或不包含 genre 字段的消息。消费者将能够优雅地反序列化数据,因为该字段是可选的。 模式演进和序列化的最佳实践
|
我们请求您订阅我们的新闻通讯以获取最新更新。