Cosmos DB 作为 Apache Kafka 主题的接收器

2025年5月16日 | 阅读 5 分钟

将 Cosmos DB 用作 Apache Kafka 主题的接收器是一个强大的组合,它允许将实时流数据摄取到像 Azure Cosmos DB 这样全球分布式、多模型数据库中。这使得企业能够高效地处理、分析和可视化数据。

1. Cosmos DB 和 Kafka 简介

Cosmos DB 概述

Azure Cosmos DB 是一个完全托管的 NoSQL 数据库服务,专为现代应用程序开发而设计。主要功能包括:

  • 多模型支持:支持文档、键值、图和列族数据模型。
  • 全球分布式 允许跨多个区域进行复制。
  • 低延迟:保证个位数毫秒级的响应时间。

Apache Kafka 概述

Apache Kafka 是一个分布式事件流平台,用于构建实时数据管道和流应用程序。主要组件包括:

  • 生产者:将数据发布到 Kafka 主题。
  • 主题:对数据流进行分类和存储。
  • 消费者:从主题读取和处理数据。

为什么使用 Cosmos DB 作为接收器?

  • 实时数据处理:将流数据存储在 Cosmos DB 中,用于分析和可视化。
  • 可扩展性:处理高速度的数据流。
  • 无缝集成:利用 Cosmos DB Kafka 连接器实现平稳摄取。

2. 架构概述

将 Cosmos DB 用作接收器的典型架构包括以下组件:

Kafka 生产者

生产者将实时事件发送到 Kafka 主题。生产者通常使用 Java 或 Python 等编程语言中的 Kafka 库来实现。

Kafka 主题

主题作为流数据的存储单元。生产者写入主题,消费者(如 Cosmos DB Sink Connector)从中读取。

Kafka Connect

Kafka Connect 简化了 Kafka 与外部系统的集成。Cosmos DB Sink Connector 是 Kafka Connect 的一个插件。

Cosmos DB Sink Connector

Cosmos DB Sink Connector 从 Kafka 主题读取数据并将其写入 Cosmos DB 容器。它负责将 Kafka 主题模式映射到 Cosmos DB 文档结构。

3. 环境设置

前提条件

  1. Azure 订阅:可以访问 Azure Portal 来设置 Cosmos DB。
  2. Apache Kafka已安装并在本地或云环境中运行。
  3. Kafka Connect:作为 Kafka 生态系统的一部分已安装。
  4. Cosmos DB Kafka Connector:从 Confluent Hub 下载。
  5. Java Development Kit (JDK):用于生产者开发。
  6. Maven 或 Gradle:用于依赖管理。

设置 Apache Kafka

1. 安装 Kafka

2. tar -xvf kafka_2.13-3.5.1.tgz

3. 启动 Kafka

4. 创建一个主题

配置 Azure Cosmos DB

  1. 创建 Cosmos DB 帐户
    • 导航到 Azure 门户。
    • 选择 **Azure Cosmos DB** 并创建一个新帐户,使用 **Core (SQL)**(SQL
  2. 创建数据库和容器
    • 创建一个数据库(例如,kafka_db)。
    • 添加一个容器(例如,kafka_data),并指定一个分区键(例如,/id)。
  3. 获取连接详细信息
    • 从 Cosmos DB 帐户复制 **Primary Key** 和 **Connection String**。

4. 开发 Kafka 生产者

Apache Kafka 是一个强大的消息和流平台,它促进了生产者和消费者之间无缝的数据交换。当 Cosmos DB 用作接收器时,Kafka 成为摄取、处理并将数据传输到 Azure Cosmos DB 进行存储和分析的通道。

实现 Kafka 生产者以用于 Cosmos DB 的步骤

  1. 设置 Kafka 和 Cosmos DB 环境
    • 安装 Apache Kafka,并确保 Broker 和 Zookeeper 正在运行。
    • 设置 Cosmos DB,并使用 Kafka Connect 框架将其配置为充当 Kafka 接收器。
  2. 将 Kafka Connect 配置为使用 Cosmos DB Sink
    • 使用 Cosmos DB sink 连接器。
    • 使用端点 URL、访问密钥和数据库名称等必要参数配置连接器。
  3. Java 中开发 Kafka 生产者
    • 创建一个 Java 应用程序,该应用程序生成样本数据并将其发送到 Kafka 主题。

前提条件

  • 已安装 Java Development Kit (JDK)(8 或更高版本)。
  • 已安装 Apache Kafka,并在本地或服务器上运行。
  • 拥有创建容器和数据库权限的 Azure Cosmos DB 帐户。
  • 已安装带有 Cosmos DB Sink Connector 的 Kafka Connect。

Code Example

1. Maven 项目结构

确保您的项目使用 Maven 来管理依赖项。以下是一个示例 pom.xml:

2. Kafka 生产者实现

创建一个用于 Kafka 生产者的 Java 类。

3. 使用 Cosmos DB Sink 配置 Kafka Connect

1. 安装 Cosmos DB Sink Connector。

  • 下载 Cosmos DB Kafka Connector JAR 文件,并将其放置在 Kafka 的 libs 目录中。

2. 使用属性文件(cosmosdb-sink.properties)配置 sink 连接器。

3. 启动 Kafka Connect worker 并加载连接器配置。

4. 运行示例

1. 启动 Kafka 组件

  • 启动 Zookeeper 和 Kafka broker。
  • 使用 kafka-topics.sh 验证 Kafka 主题。

2. 运行生产者

  • 编译并运行生产者代码。
  • 检查 Kafka 日志以验证消息是否已生成。

3. 验证 Cosmos DB

  • 检查 Cosmos DB 容器以查看已摄取的数据。

示例输出

生产者控制台输出

Cosmos DB as a Sink for Apache Kafka Topics

Cosmos DB 数据

Cosmos DB as a Sink for Apache Kafka Topics

5. 配置 Cosmos DB Sink Connector

安装和设置

可以使用 Confluent Hub CLI 安装 Cosmos DB Sink Connector。

安装后,通过启动 Kafka Connect worker 来确保其正在运行。

配置参数

创建一个 Cosmos DB Sink Connector 的配置文件(例如,cosmos_sink_connector.json),其中包含以下详细信息:

关键参数说明

  • connector.class:指定要使用的连接器类。
  • tasks.max:连接器的任务数量。
  • topics:连接器将从中读取的 Kafka 主题。
  • cosmos.connection.endpoint:Cosmos DB 帐户的 URI。
  • cosmos.connection.key:Cosmos DB 帐户的主密钥。
  • cosmos.database.name:目标 Cosmos DB 数据库。
  • cosmos.container.name:目标 Cosmos DB 数据库中的容器。
  • cosmos.key.converter 和 cosmos.value.converter:键和值的格式转换器。
  • cosmos.max.batch.size:每个批次的最大记录数。
  • cosmos.batch.flush.timeout.ms:刷新批次之前的超时时间(以毫秒为单位)。