Kafka-to-Cosmos DB 管道中的模式演进和序列化

2025 年 5 月 16 日 | 阅读 9 分钟

模式演进和序列化是 Kafka 到 Cosmos DB 管道的关键组成部分。这些概念确保数据在系统之间无缝地进行序列化和传输,从而在模式发生变化时提供灵活性,同时保持数据完整性和一致性。

Kafka 到 Cosmos DB 管道中模式演进的概述

模式演进允许在不破坏 Kafka 到 Cosmos DB 管道的情况下更改数据结构。通过正确定义模式并仔细管理更改,我们确保生产者和消费者在模式更新后仍然兼容。

关键概念

  1. 向后兼容性
    • 较新的模式可以被旧的消费者读取。
    • 更改是添加性的(例如,添加可选字段)。
  2. 向前兼容性
    • 旧的模式可以被新的消费者读取。
    • 避免删除或重命名字段。
  3. 完全兼容性
    • 确保向后和向前兼容性。
    • 更改必须是严格添加性的且无破坏性的。

使用 Avro 在 Kafka 到 Cosmos DB 管道中实现模式演进

Apache Avro 因其通过模式注册表(例如,Confluent Schema Registry)支持模式版本控制而被广泛用于模式演进。

1. 设置 Kafka 和模式注册表

确保以下服务正在运行

  • Kafka Broker
  • Kafka 模式注册表
  • Kafka 主题
  • Cosmos DB Sink Connector

模式演进示例

A. 初始模式(版本 1)

我们为将存储在 Cosmos DB 中的 Kafka 中的事件消息定义一个 Avro 模式。

1. 定义初始 Avro 模式 (user_v1.avsc)

  • 字段:user_id,name
  • 此模式是版本 1。

2. Kafka Producer(使用 Avro 模式)

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines
  • 使用V1 模式生成消息。

B. 向后兼容性(版本 2)

现在,我们向模式添加了一个新的可选字段。旧的消费者仍应能够读取新消息。

1. 更新的模式(版本 2)

  • 新增字段 email,并设置了默认值(null)。
  • 确保向后兼容性。

2. 使用新模式的 Kafka Producer

  • 旧的消费者仍然可以处理此消息,因为 email 是可选的。

3. 处理旧消息和新消息的 Kafka Consumer

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines
  • 旧的消费者忽略新字段 email 并继续处理。

C. 向前兼容性(版本 3)

现在,我们删除一个字段(name)来查看旧的消费者是否仍然可以正常工作。

1. 更新的模式(版本 3)

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines
  • 字段 name 已删除。
  • 破坏了向前兼容性——旧的消费者期望 name 但找不到它。

修复:与其删除 name,不如弃用它并将其标记为可选

  • 旧的消费者仍然可以无故障地读取消息。

D. 完全兼容性(向后 + 向前)

要维护完全兼容性,我们必须

  • 仅添加带默认值的字段
  • 切勿删除现有字段
  • 切勿更改现有字段的数据类型

最终模式

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines
  • 确保旧的和新的消费者都可以读取消息。

模式注册表

模式注册表充当 Kafka 生态系统中管理模式的中央存储库。它提供版本控制和验证,确保模式更改兼容。

例如

  • 生产者以 JSON 或 Avro 格式写入数据。
  • 模式注册表验证模式并为其分配唯一 ID。
  • 消费者使用 ID 检索模式以反序列化数据。

Kafka 到 Cosmos DB 管道中的序列化

序列化涉及将结构化数据转换为可以跨 Kafka 主题传输的格式。在 Kafka 到 Cosmos DB 管道中,序列化使得 Kafka 生产者和 Cosmos DB 消费者之间能够高效地传输数据。

常见的序列化格式包括

  • Avro:紧凑、快速,并支持模式演进。
  • JSON人类可读,但在大规模系统中效率较低。
  • Protobuf:高效且紧凑的二进制格式。
  • Parquet:针对分析工作负载进行了优化。

Kafka 到 Cosmos DB 管道中的模式演进和序列化工作流

模式演进确保即使模式随时间变化也能实现平稳的数据处理。在Kafka 到Cosmos DB管道中,序列化和反序列化在处理模式更新而不破坏消费者方面发挥着至关重要的作用。

工作流步骤

  1. 制作人
    • 使用预定义的模式序列化数据。
    • 将模式注册到模式注册表。
  2. Kafka
    • 数据以序列化格式发布到 Kafka 主题。
  3. Consumer
    • 从模式注册表检索模式。
    • 使用模式反序列化数据。
  4. Cosmos DB
    • 反序列化的数据以 JSON 格式写入 Cosmos DB。

实施

我们将为此实现使用Apache Kafka、Confluent Schema Registry、Python Kafka 客户端和 Azure Cosmos DB SDK。

1. 设置环境

确保您拥有

  • 已安装并运行 Apache Kafka。
  • Confluent Schema Registry 正在 https://:8081 上运行。
  • 已设置具有数据库和容器的 Azure Cosmos DB。

2. 定义 Avro 模式

创建版本 1 的模式文件user.avsc

  • 此模式将存储在模式注册表中。
  • email 是可选的,以支持向后兼容性。

生产者 - 序列化数据并发布到 Kafka

我们将使用Confluent Kafka Avro producerPython编写一个 Kafka producer。

Kafka Producer 代码 (producer.py)

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines
  • 使用 Avro 序列化数据。
  • 在模式注册表中注册模式。
  • 将事件发布到 Kafka。

Kafka 存储序列化数据

一旦生产者发送了消息,Kafka 就会以序列化的 Avro 格式存储它们。

要验证消息

  • 消息将显示为二进制 Avro 格式。

消费者 - 反序列化数据

消费者从 Kafka 检索消息,从模式注册表中获取模式,然后反序列化它们。

Kafka Consumer 代码 (consumer.py)

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines
  • 从模式注册表检索模式。
  • 将 Avro 消息反序列化为 JSON。
  • 将消息存储到 Cosmos DB。

Cosmos DB 存储反序列化的数据

一旦消费者读取并反序列化了数据,它就会将其写入Azure Cosmos DB。

验证 Cosmos DB 中的数据

运行此Azure CLI 命令来验证存储的记录

  • 应显示JSON 数据:
  • 模式演进得以维持(旧消息仍然有效)。

处理模式演进

现在,让我们修改模式(版本 2)添加一个新字段(phone)。

新模式(版本 2)

  • 新字段 phone 是可选的 → 向后兼容。

生产者使用新模式发送消息

  • 旧的消费者仍然有效,因为 phone 是可选的。

消费者将新数据存储到 Cosmos DB

  • 旧消息仍然可以正确加载。
  • 新消息存储额外数据(phone)。

带有模式演进的 Kafka 到 Cosmos DB 管道

1. 设置 Kafka 和模式注册表

使用 Docker Compose 启动 Kafka 和模式注册表

2. 定义 Avro 模式

为音乐曲目创建 Avro 模式

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines

使用REST API或 Confluent 的 Kafka Python 客户端等库将此模式注册到模式注册表。

3. 带有模式演进的 Kafka Producer

这是一个使用 confluent-kafka 的 Python producer

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines

4. 用于 Cosmos DB 的 Kafka Consumer

消费者从 Kafka 读取,反序列化 Avro 数据,并将其写入 Cosmos DB。

安装 Cosmos DB SDK

Python 消费者代码

输出

Schema Evolution and Serialization in Kafka-to-Cosmos DB Pipelines

5. 模式演进:添加字段

要演进模式,请添加一个名为 genre 的新可选字段

使用新模式的生产者可以发送包含或不包含 genre 字段的消息。消费者将能够优雅地反序列化数据,因为该字段是可选的。

模式演进和序列化的最佳实践

  1. 使用 Avro 或 Protobuf
    • 这些格式支持模式演进,并且对于大规模系统来说效率很高。
  2. 模式注册表
    • 始终使用模式注册表验证模式更改的兼容性。
  3. 向后兼容性
    • 设计模式以向后兼容,以避免破坏下游系统。
  4. 监控 Cosmos DB 性能
    • 优化 Cosmos DB 中的索引和分区以实现实时写入。