使用 Kafka 和 Cosmos DB 构建可扩展的管道2025 年 5 月 16 日 | 阅读 8 分钟 当今世界,处理和理解大量实时数据至关重要。创建快速且可扩展的数据管道有助于应用程序满足现代任务的需求。Apache Kafka 和 Azure Cosmos DB 是两个功能强大的工具,它们协同工作,构建可靠的数据管道。 1. Kafka 和 Cosmos DB 简介Apache Kafka 概述Apache Kafka 是一个分布式事件流平台,用于
核心概念
Azure Cosmos DB 概述Azure Cosmos DB 专为高可用性、可扩展性和低延迟而设计。 主要特点
为何结合 Kafka 和 Cosmos DB?这种组合提供了
程序示例:Kafka 生产者到 Cosmos DB 以下是如何使用 Kafka 将实时流式数据发送到 Cosmos DB 的示例。 步骤 1:Kafka 生产者 Kafka 生产者将用户活动事件发布到某个主题。 输出 ![]() 步骤 2:Kafka 消费者到 Cosmos DB Kafka 消费者读取消息并将其插入 Cosmos DB。 输出 ![]() 可扩展数据管道的架构可扩展数据管道的架构旨在处理大容量、实时和容错处理,确保数据以最小的延迟高效地流经各个阶段。 架构概述
1. Kafka 生产者代码 此 Kafka 生产者模拟实时事件数据,用于流式传输到 Kafka。 输出 ![]() 2. Spark Streaming 消费者代码 此 Spark Streaming 消费者处理来自 Kafka 的数据,执行转换,并将结果存储在 BigQuery 中。 3. 用于数据存储的 BigQuery Schema 数据由 Spark Streaming 处理后,它将被存储在具有以下 Schema 的 BigQuery 表中
您需要确保 Spark 写入 Google BigQuery 的必要权限已设置。 4. 处理容错与可扩展性
输出 ![]() 安装和配置 Kafka步骤 1:安装 Apache Kafka步骤 2:启动 Zookeeper 服务器Kafka 依赖 Zookeeper 来管理其集群元数据。使用提供的配置文件启动 ZooKeeper 服务。 步骤 3:启动 Kafka BrokerZookeeper 运行后,您可以启动 Kafka Broker。 步骤 4:验证 Kafka 是否正在运行要检查 Kafka 是否正在运行,请使用 Kafka 服务器的 kafka-topics.sh 命令列出主题。 为数据管道设置主题步骤 1:创建 Kafka 主题Kafka 主题是数据管道的基本构建块。您可以使用以下命令创建主题。 此命令创建一个名为 <your_topic_name> 的主题,包含 3 个分区和 1 个复制因子。 步骤 2:验证主题创建创建主题后,您可以通过再次列出主题来验证它。 步骤 3:设置主题配置(可选)您可以使用以下命令配置主题的设置,如保留期或清理策略。 此命令将保留期设置为 7 天(以毫秒为单位)。 步骤 4:向主题发布消息您可以使用生产者开始向 Kafka 主题发送消息。 执行命令后,在终端中键入消息。它们将被发送到 Kafka 主题。 步骤 5:从主题消费消息您可以通过消费消息来验证消息是否已发布到主题。 此命令将打印发送到该主题的所有消息,从头开始。 Cosmos DB 配置代码示例使用 Azure CLI 5. 开发 Kafka 生产者开发 Kafka 生产者在 Kafka 中,生产者负责将记录(消息)发送到 Kafka 主题。让我们逐步完成开发 Kafka 生产者、编写 Kafka 生产者代码并运行它的过程。 1. 编写 Kafka 生产者代码 要编写 Kafka 生产者,您需要使用 Kafka 的生产者 API,该 API 可以用 Java、Python 等语言编写。以下是使用 Java 编写 Kafka 生产者的示例。 Java Kafka 生产者示例 1. 添加 Kafka 依赖项: 如果您使用 Maven,则需要在 pom.xml 中添加以下依赖项。 2. 编写 Kafka 生产者代码 说明
2. 运行 Kafka 生产者 编写完 Kafka 生产者代码后,您可以在开发环境中编译并运行它。以下是步骤: 启动 Kafka 服务器: 确保 Kafka 服务器已启动并正在运行。如果您使用的是本地设置,则可以通过执行以下命令来运行 Kafka 服务器:
创建 Kafka 主题: 在运行生产者之前,您需要创建 Kafka 主题(在本例中为 test-topic)。 编译生产者代码: 如果您使用 Maven,可以使用以下命令进行编译: 运行 Kafka 生产者代码: 要运行生产者,请使用以下命令: 或者,如果您使用的是 IntelliJ IDEA 或 Eclipse 等 IDE,您可以直接运行 MyKafkaProducer 类。 预期输出 当生产者成功执行时,将显示以下输出: ![]() 您还可以检查日志以验证消息是否正在发布。为此,您可以运行 Kafka 消费者来验证数据。 这应该会显示生产者发送的消息,确认 Kafka 生产者工作正常。 6. 为 Cosmos DB 创建 Kafka 消费者编写 Kafka 消费者代码Kafka 消费者从 Kafka 读取数据并将其插入 Cosmos DB。 Code Example 输出 ![]() |
我们请求您订阅我们的新闻通讯以获取最新更新。