使用 Kafka 向 Cosmos DB 进行实时流式传输

2025 年 5 月 16 日 | 阅读 8 分钟

实时数据流式传输对于需要快速处理和响应数据的现代应用程序至关重要。Apache Kafka 是创建实时数据管道的流行工具,而 Microsoft Azure Cosmos DB 是专为快速访问和高可用性而设计的全局 NoSQL 数据库。将 Kafka 与 Cosmos DB 集成,使企业能够摄取海量数据,实时处理它们,并将结果存储用于分析或事务目的。

1. 设置 Kafka

在开始将数据流式传输到 Cosmos DB 之前,我们需要一个正在运行的 Kafka 集群。在本指南中,我们可以使用本地设置或托管服务,例如 Confluent Cloud 或 Azure Event Hubs for Kafka。

本地设置 Kafka 的步骤

1. 下载 Apache Kafka

2. 启动 ZooKeeper Kafka 需要 Zookeeper 来管理其集群。

3. 启动 Kafka Broker

4. 创建 Kafka Topic

配置 Azure Cosmos DB

Azure Cosmos DB 是一项全局分布式、多模型数据库服务,专为实时应用程序而设计。在本节中,我们将介绍配置 Azure Cosmos DB 以存储来自 Kafka 的流数据的过程。在本节结束时,您将拥有一个已完全配置好的 Cosmos DB 实例,可用于实时摄取。

步骤 1:创建 Azure Cosmos DB 帐户

  1. 登录 Azure 门户: 转到 Azure 门户并使用您的凭据登录。
  2. 创建新的 Cosmos DB 帐户
    在左侧菜单中,选择 **创建资源 > 数据库 > Azure Cosmos DB**。

选择 **Core (SQL)** 作为 API 选项,因为我们将使用文档 (JSON) 模型。

输入以下详细信息

  • 订阅: 选择您的订阅。
  • 资源组: 创建新的资源组或使用现有的资源组。
  • 帐户名称: 为您的 Cosmos DB 帐户提供一个唯一的名称。
  • 位置: 选择一个靠近您的 Kafka 集群的区域。
  • 容量模式: 选择 **预配吞吐量**。

点击 **查看 + 创建**,然后点击 **创建**。

此过程可能需要几分钟。完成后,您的 Cosmos DB 帐户将准备就绪。

步骤 2:创建数据库和容器

  1. 导航到您的 Cosmos DB 帐户: 您的 Cosmos DB 帐户创建后,转到 **数据资源管理器**
  2. 创建新数据库
    • 点击 **新建数据库**。
    • 输入数据库 ID,例如 StreamingDB。
    • 对于吞吐量,选择 **手动** 并将值设置为 **400 RU/秒**(每秒请求单位)。
    • 单击确定
  3. 创建新容器
    • 在新建的数据库下,点击 **新建容器**。
    • 输入以下详细信息
      • 容器 ID: StreamingDataContainer
      • 分区键: /song_id
    • 单击确定

说明

  • 数据库 StreamingDB 将作为集合的逻辑分组。
  • 容器 StreamingDataContainer 是 Kafka 的 JSON 数据将存储的地方。
  • 分区键 /song_id 可确保数据在分区之间均匀分布,这有助于优化查询性能。

步骤 3:配置访问密钥

  1. 检索主连接字符串
    • 转到 Cosmos DB 帐户的 **密钥** 选项卡。
    • 复制 **主连接字符串**。Kafka 客户端将使用此字符串连接到 Cosmos DB。
  2. 安全保存连接字符串: 将连接字符串存储在安全的位置,例如环境变量或安全配置文件。

示例环境变量配置

步骤 4:测试 Cosmos DB 设置

在集成 Kafka 之前,最好通过插入一些示例数据来测试 Cosmos DB 设置。

1. 使用数据资源管理器插入数据

点击保存。

2. 查询数据

  • 在数据资源管理器中点击 **新建 SQL 查询**。
  • 输入以下查询以检索所有项
  • 点击 **执行查询**。

输出

Real-time Streaming to Cosmos DB with Kafka

步骤 5:设置 Cosmos DB 的本地开发环境(可选)

出于开发目的,您可以设置本地 Azure Cosmos DB 模拟器。

  1. 下载并安装 Cosmos DB 模拟器: 转到 Azure Cosmos DB 模拟器页面并在您的机器上安装模拟器。
  2. 启动模拟器: 安装后,启动模拟器。默认情况下,它运行在 https://:8081。
  3. 检索连接字符串: 打开模拟器并复制连接字符串。在您的本地开发环境中使用此字符串。

注意:本地模拟器模拟 Azure Cosmos DB 服务,允许您在不产生费用的情况下开发和测试应用程序。

步骤 6:理解吞吐量和分区

吞吐量: Cosmos DB 中的吞吐量以请求单位 (RU/秒) 为单位。每个操作(例如,读取、写入或查询)根据操作的复杂性消耗一定数量的 RU。

  • 预配吞吐量
    • 您为数据库或容器分配固定的每秒 RU 数量。
    • 适用于具有可预测工作负载的场景。
  • 自动缩放吞吐量
    • Cosmos DB 根据工作负载自动缩放 RU。
    • 适用于具有可变工作负载的场景。

分区: Cosmos DB 中的分区可确保数据分布在多个节点上,以实现可伸缩性和性能。

  • 分区键: Cosmos DB 中的分区有助于将数据分布到不同的节点上,以提高可伸缩性和性能。

开发 Kafka Producer

在本节中,我们将开发一个 Kafka producer,它模拟实时事件生成并将这些事件发送到 Kafka 主题 streaming-data。producer 将生成代表用户与音乐流媒体应用程序交互的 JSON 数据,例如播放歌曲。这些数据稍后将被消耗并存储在 Azure Cosmos DB 中。

编写 Kafka Producer 代码

将 JSON 消息发送到主题 streaming-data 的 Kafka producer。

KafkaProducerExample.java

输出

Real-time Streaming to Cosmos DB with Kafka

说明

  • 每条消息都将具有唯一的键(song0, song1 等)和随机生成的值发送到主题 streaming-data。
  • 回调函数会记录每条成功发送消息的分区和偏移量。

通过错误处理和配置增强 Producer

为了提高 producer 的可靠性和性能,请考虑以下增强功能:

1. 重试: 在出现暂时性错误时设置重试次数。

2. 确认: 配置确认级别以确保消息持久性。

3. 批处理: 启用批处理,通过在单个请求中发送多个记录来提高吞吐量。

4. 压缩: 减小数据大小并减少通过网络发送的数据量。

创建用于 Cosmos DB 的 Kafka Consumer

在本节中,我们将开发一个 Kafka consumer,它从 streaming-data 主题读取消息并将它们写入 Azure Cosmos DB。consumer 将处理 JSON 消息并将它们存储在前面配置的 Cosmos DB 帐户的 StreamingDataContainer 中。

步骤 1:设置开发环境

确保满足以下先决条件:

1. Kafka 集群: 应已设置并运行 Kafka 集群。

2. Azure Cosmos DB 帐户: Cosmos DB 帐户应配置有数据库 (StreamingDB) 和容器 (StreamingDataContainer)。

3. Azure Cosmos DB Java SDK: 将以下依赖项添加到您的 pom.xml 中,以使用 Cosmos DB Java SDK:

4. IDE: 使用 IntelliJ IDEA 或 Eclipse 等 IDE 来编写和运行 Java 代码。

步骤 2:编写 Kafka Consumer 代码

从“streaming-data”主题读取消息并将其保存到 Cosmos DB 的 Kafka consumer。

KafkaConsumerToCosmos.java

输出

Real-time Streaming to Cosmos DB with Kafka

说明

  • consumer 从 Kafka 主题 streaming-data 读取消息。
  • 每条消息都会被处理并插入到 Cosmos DB 容器 StreamingDataContainer 中。
  • 输出显示了消耗的记录详细信息以及 Cosmos DB 中相应的请求费用。

创建用于 Cosmos DB 的 Kafka Consumer

Kafka consumer 将从 Kafka 主题读取数据并将其插入到 Cosmos DB。

Kafka Consumer 代码示例

说明

  • CosmosClient: 使用端点和主密钥连接到 Azure Cosmos DB 帐户。
  • get_database_client: 检索名为 RealTimeDataDB 的数据库。
  • get_container_client: 检索名为 SensorData 的容器。
  • KafkaConsumer: 初始化一个 Kafka consumer,用于从主题 real-time-data 读取消息。
  • container.create_item: 将每条消息插入 Cosmos DB 容器。

示例输出

Real-time Streaming to Cosmos DB with Kafka

端到端代码示例及输出

步骤 1

启动 Zookeeper 和 Kafka Broker。

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

步骤 2

运行 Kafka producer 脚本。

python kafka_producer.py

步骤 3

运行 Kafka consumer 脚本。

python kafka_consumer_cosmosdb.py

步骤 4

在 Cosmos DB 中验证数据。

  • 转到 Azure 门户,导航到 Cosmos DB 帐户,然后打开 SensorData 容器。
  • 您应该会看到插入的文档。

最佳实践和性能优化

1. 使用批量插入

Cosmos DB 支持批量操作,这可以显着提高吞吐量。

2. 优化 Kafka Consumer 配置

  • 使用更高的 fetch.max.bytes 和 max.poll.records 来减少网络调用次数。
  • 启用 enable_auto_commit 以确保定期提交偏移量。

3. 分区策略

确保 Kafka 主题和 Cosmos DB 容器具有兼容的分区策略,以避免热点。

4. 监控吞吐量

使用 Azure Monitor 来跟踪 Cosmos DB RU(请求单位)使用情况和 Kafka 延迟指标。

5. 错误处理

在 Kafka consumer 中实现健壮的错误处理和重试逻辑。