使用 Kafka 和 Cosmos DB 构建可扩展的管道

2025 年 5 月 16 日 | 阅读 8 分钟

当今世界,处理和理解大量实时数据至关重要。创建快速且可扩展的数据管道有助于应用程序满足现代任务的需求。Apache Kafka 和 Azure Cosmos DB 是两个功能强大的工具,它们协同工作,构建可靠的数据管道。

1. Kafka 和 Cosmos DB 简介

Apache Kafka 概述

Apache Kafka 是一个分布式事件流平台,用于

  1. 实时数据处理: 每秒处理数百万条消息。
  2. 可扩展性: 无缝添加生产者、代理或消费者。
  3. 持久性: 具有复制功能的持久化日志存储,提供容错能力。

核心概念

  • 生产者: 将消息发布到 Kafka 主题。
  • 消费者: 从主题读取消息。
  • 代理: 管理消息分发和存储。
  • 主题: 消息的类别或馈送名称。

Azure Cosmos DB 概述

Azure Cosmos DB 专为高可用性、可扩展性和低延迟而设计。

主要特点

  1. 多模型支持: 包括文档、图形、键值和列族数据模型。
  2. 全局分发: 自动将数据复制到 Azure 各个区域。
  3. 一致性模型: 提供强一致性、最终一致性和有界陈旧性一致性。
  4. 完全托管服务: 处理缩放、备份和维护。

为何结合 Kafka 和 Cosmos DB?

这种组合提供了

  • 实时数据管道: 使用 Kafka 摄取数据流并将其传输到 Cosmos DB。
  • 可扩展的存储和查询: Cosmos DB 提供对流式数据的快速查询。
  • 弹性: Kafka 的容错能力与 Cosmos DB 的全局分发相辅相成。

程序示例:Kafka 生产者到 Cosmos DB

以下是如何使用 Kafka 将实时流式数据发送到 Cosmos DB 的示例。

步骤 1:Kafka 生产者

Kafka 生产者将用户活动事件发布到某个主题。

输出

Building Scalable Pipelines with Kafka and Cosmos DB

步骤 2:Kafka 消费者到 Cosmos DB

Kafka 消费者读取消息并将其插入 Cosmos DB。

输出

Building Scalable Pipelines with Kafka and Cosmos DB

可扩展数据管道的架构

可扩展数据管道的架构旨在处理大容量、实时和容错处理,确保数据以最小的延迟高效地流经各个阶段。

架构概述

  1. 数据摄入
    • 生产者将数据推送到 Kafka 主题。这些可以是实时数据源,如传感器、日志或来自 Web 应用程序的事件驱动数据。
    • Kafka 作为一种持久的分布式消息代理,它缓冲数据并确保无数据丢失。
  2. Stream 处理
    • Kafka 消费者订阅主题并将数据传递给流处理框架,如 Apache Spark Streaming。
    • Spark 根据延迟要求,以微批次或连续方式处理数据。
  3. 数据存储
    • 处理后的数据将推送到数据仓库,如 Google BigQuery,用于分析目的。
    • 存储层设计为随着数据量的增长而水平扩展。
  4. 可视化与监控
    • 使用 Looker Studio 或其他 BI 工具构建仪表板和实时可视化,以深入了解数据。使用 Kafka、Spark Streaming 和 Google BigQuery 构建可扩展数据管道

1. Kafka 生产者代码

此 Kafka 生产者模拟实时事件数据,用于流式传输到 Kafka。

输出

Building Scalable Pipelines with Kafka and Cosmos DB

2. Spark Streaming 消费者代码

此 Spark Streaming 消费者处理来自 Kafka 的数据,执行转换,并将结果存储在 BigQuery 中。

3. 用于数据存储的 BigQuery Schema

数据由 Spark Streaming 处理后,它将被存储在具有以下 Schema 的 BigQuery 表中

  • event_id (INTEGER)
  • event_type (STRING)
  • timestamp (TIMESTAMP)

您需要确保 Spark 写入 Google BigQuery 的必要权限已设置。

4. 处理容错与可扩展性

  • Kafka 通过主题复制(默认 3 个副本)确保消息持久性
  • Spark Streaming 通过利用检查点来处理精确一次语义。
  • 要扩展管道,您可以增加 Kafka 分区、Spark 执行器或 BigQuery 的分区表。

输出

Building Scalable Pipelines with Kafka and Cosmos DB

安装和配置 Kafka

步骤 1:安装 Apache Kafka

步骤 2:启动 Zookeeper 服务器

Kafka 依赖 Zookeeper 来管理其集群元数据。使用提供的配置文件启动 ZooKeeper 服务。

步骤 3:启动 Kafka Broker

Zookeeper 运行后,您可以启动 Kafka Broker。

步骤 4:验证 Kafka 是否正在运行

要检查 Kafka 是否正在运行,请使用 Kafka 服务器的 kafka-topics.sh 命令列出主题。

为数据管道设置主题

步骤 1:创建 Kafka 主题

Kafka 主题是数据管道的基本构建块。您可以使用以下命令创建主题。

此命令创建一个名为 <your_topic_name> 的主题,包含 3 个分区和 1 个复制因子。

步骤 2:验证主题创建

创建主题后,您可以通过再次列出主题来验证它。

步骤 3:设置主题配置(可选)

您可以使用以下命令配置主题的设置,如保留期或清理策略。

此命令将保留期设置为 7 天(以毫秒为单位)。

步骤 4:向主题发布消息

您可以使用生产者开始向 Kafka 主题发送消息。

执行命令后,在终端中键入消息。它们将被发送到 Kafka 主题。

步骤 5:从主题消费消息

您可以通过消费消息来验证消息是否已发布到主题。

此命令将打印发送到该主题的所有消息,从头开始。

Cosmos DB 配置代码示例

使用 Azure CLI

5. 开发 Kafka 生产者

开发 Kafka 生产者

在 Kafka 中,生产者负责将记录(消息)发送到 Kafka 主题。让我们逐步完成开发 Kafka 生产者、编写 Kafka 生产者代码并运行它的过程。

1. 编写 Kafka 生产者代码

要编写 Kafka 生产者,您需要使用 Kafka 的生产者 API,该 API 可以用 Java、Python 等语言编写。以下是使用 Java 编写 Kafka 生产者的示例。

Java Kafka 生产者示例

1. 添加 Kafka 依赖项: 如果您使用 Maven,则需要在 pom.xml 中添加以下依赖项。

2. 编写 Kafka 生产者代码

说明

  • servers:指定 Kafka Broker(此处为 localhost:9092)。
  • serializer 和 value.serializer:定义在发送之前如何序列化键和值。
  • KafkaProducer:与 Kafka Broker 交互的主要生产者类。
  • ProducerRecord:表示将要发送到主题的消息。

2. 运行 Kafka 生产者

编写完 Kafka 生产者代码后,您可以在开发环境中编译并运行它。以下是步骤:

启动 Kafka 服务器: 确保 Kafka 服务器已启动并正在运行。如果您使用的是本地设置,则可以通过执行以下命令来运行 Kafka 服务器:

  • 启动 Zookeeper(如果尚未运行)
  • 启动 Kafka Broker

创建 Kafka 主题: 在运行生产者之前,您需要创建 Kafka 主题(在本例中为 test-topic)。

编译生产者代码: 如果您使用 Maven,可以使用以下命令进行编译:

运行 Kafka 生产者代码: 要运行生产者,请使用以下命令:

或者,如果您使用的是 IntelliJ IDEA 或 Eclipse 等 IDE,您可以直接运行 MyKafkaProducer 类。

预期输出

当生产者成功执行时,将显示以下输出:

Building Scalable Pipelines with Kafka and Cosmos DB

您还可以检查日志以验证消息是否正在发布。为此,您可以运行 Kafka 消费者来验证数据。

这应该会显示生产者发送的消息,确认 Kafka 生产者工作正常。

6. 为 Cosmos DB 创建 Kafka 消费者

编写 Kafka 消费者代码

Kafka 消费者从 Kafka 读取数据并将其插入 Cosmos DB。

Code Example

输出

Building Scalable Pipelines with Kafka and Cosmos DB