使用 Kafka 向 Cosmos DB 进行实时流式传输2025 年 5 月 16 日 | 阅读 8 分钟 实时数据流式传输对于需要快速处理和响应数据的现代应用程序至关重要。Apache Kafka 是创建实时数据管道的流行工具,而 Microsoft Azure Cosmos DB 是专为快速访问和高可用性而设计的全局 NoSQL 数据库。将 Kafka 与 Cosmos DB 集成,使企业能够摄取海量数据,实时处理它们,并将结果存储用于分析或事务目的。 1. 设置 Kafka在开始将数据流式传输到 Cosmos DB 之前,我们需要一个正在运行的 Kafka 集群。在本指南中,我们可以使用本地设置或托管服务,例如 Confluent Cloud 或 Azure Event Hubs for Kafka。 本地设置 Kafka 的步骤1. 下载 Apache Kafka 2. 启动 ZooKeeper: Kafka 需要 Zookeeper 来管理其集群。 3. 启动 Kafka Broker 4. 创建 Kafka Topic 配置 Azure Cosmos DBAzure Cosmos DB 是一项全局分布式、多模型数据库服务,专为实时应用程序而设计。在本节中,我们将介绍配置 Azure Cosmos DB 以存储来自 Kafka 的流数据的过程。在本节结束时,您将拥有一个已完全配置好的 Cosmos DB 实例,可用于实时摄取。 步骤 1:创建 Azure Cosmos DB 帐户
选择 **Core (SQL)** 作为 API 选项,因为我们将使用文档 (JSON) 模型。 输入以下详细信息
点击 **查看 + 创建**,然后点击 **创建**。 此过程可能需要几分钟。完成后,您的 Cosmos DB 帐户将准备就绪。 步骤 2:创建数据库和容器
说明
步骤 3:配置访问密钥
示例环境变量配置 步骤 4:测试 Cosmos DB 设置在集成 Kafka 之前,最好通过插入一些示例数据来测试 Cosmos DB 设置。 1. 使用数据资源管理器插入数据 点击保存。 2. 查询数据
输出 ![]() 步骤 5:设置 Cosmos DB 的本地开发环境(可选)出于开发目的,您可以设置本地 Azure Cosmos DB 模拟器。
注意:本地模拟器模拟 Azure Cosmos DB 服务,允许您在不产生费用的情况下开发和测试应用程序。步骤 6:理解吞吐量和分区吞吐量: Cosmos DB 中的吞吐量以请求单位 (RU/秒) 为单位。每个操作(例如,读取、写入或查询)根据操作的复杂性消耗一定数量的 RU。
分区: Cosmos DB 中的分区可确保数据分布在多个节点上,以实现可伸缩性和性能。
开发 Kafka Producer在本节中,我们将开发一个 Kafka producer,它模拟实时事件生成并将这些事件发送到 Kafka 主题 streaming-data。producer 将生成代表用户与音乐流媒体应用程序交互的 JSON 数据,例如播放歌曲。这些数据稍后将被消耗并存储在 Azure Cosmos DB 中。 编写 Kafka Producer 代码将 JSON 消息发送到主题 streaming-data 的 Kafka producer。 KafkaProducerExample.java 输出 ![]() 说明
通过错误处理和配置增强 Producer为了提高 producer 的可靠性和性能,请考虑以下增强功能: 1. 重试: 在出现暂时性错误时设置重试次数。 2. 确认: 配置确认级别以确保消息持久性。 3. 批处理: 启用批处理,通过在单个请求中发送多个记录来提高吞吐量。 4. 压缩: 减小数据大小并减少通过网络发送的数据量。 创建用于 Cosmos DB 的 Kafka Consumer在本节中,我们将开发一个 Kafka consumer,它从 streaming-data 主题读取消息并将它们写入 Azure Cosmos DB。consumer 将处理 JSON 消息并将它们存储在前面配置的 Cosmos DB 帐户的 StreamingDataContainer 中。 步骤 1:设置开发环境确保满足以下先决条件: 1. Kafka 集群: 应已设置并运行 Kafka 集群。 2. Azure Cosmos DB 帐户: Cosmos DB 帐户应配置有数据库 (StreamingDB) 和容器 (StreamingDataContainer)。 3. Azure Cosmos DB Java SDK: 将以下依赖项添加到您的 pom.xml 中,以使用 Cosmos DB Java SDK: 4. IDE: 使用 IntelliJ IDEA 或 Eclipse 等 IDE 来编写和运行 Java 代码。 步骤 2:编写 Kafka Consumer 代码从“streaming-data”主题读取消息并将其保存到 Cosmos DB 的 Kafka consumer。 KafkaConsumerToCosmos.java 输出 ![]() 说明
创建用于 Cosmos DB 的 Kafka ConsumerKafka consumer 将从 Kafka 主题读取数据并将其插入到 Cosmos DB。 Kafka Consumer 代码示例 说明
示例输出 ![]() 端到端代码示例及输出步骤 1启动 Zookeeper 和 Kafka Broker。 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties 步骤 2运行 Kafka producer 脚本。 python kafka_producer.py 步骤 3运行 Kafka consumer 脚本。 python kafka_consumer_cosmosdb.py 步骤 4在 Cosmos DB 中验证数据。
最佳实践和性能优化1. 使用批量插入Cosmos DB 支持批量操作,这可以显着提高吞吐量。 2. 优化 Kafka Consumer 配置
3. 分区策略确保 Kafka 主题和 Cosmos DB 容器具有兼容的分区策略,以避免热点。 4. 监控吞吐量使用 Azure Monitor 来跟踪 Cosmos DB RU(请求单位)使用情况和 Kafka 延迟指标。 5. 错误处理在 Kafka consumer 中实现健壮的错误处理和重试逻辑。 |
我们请求您订阅我们的新闻通讯以获取最新更新。