利用 Kafka Connect 进行 Cosmos DB 集成

2025 年 5 月 16 日 | 阅读 8 分钟

Kafka Connect 和 Cosmos DB 简介

Kafka Connect 是一个强大且可靠的数据集成工具,用于在 Apache Kafka 和外部系统之间进行数据集成。通过 Kafka Connect 将 Kafka 与 Azure Cosmos DB 集成,可以使组织能够高效地摄取、处理和分析实时数据。

什么是 Kafka Connect?

Kafka Connect 是一个分布式数据集成框架,旨在以容错且可扩展的方式在大规模数据在 Apache Kafka 和外部系统之间传输。它提供了一种结构化的方法,可以将数据源(如 数据库、文件、云存储)连接到 Kafka 主题,并将 Kafka 主题数据传输到外部目的地,如 NoSQL 数据库、数据仓库或分析平台。

Kafka Connect 可以运行在两种模式下:

  • Standalone 模式:最适合单进程部署,通常用于简单的集成。
  • Distributed 模式:通过利用多个工作节点,实现了高可用性、可扩展性和容错性。

Kafka Connect 架构

Kafka Connect 采用插件式架构,包含工作节点、连接器和任务,实现了 Kafka 与外部系统之间的无缝集成。

Kafka Connect 组件

  1. Worker:运行和管理连接器和任务。
  2. Connector:定义了如何在 Kafka 和外部系统之间导入(source)或导出(sink)数据。
  3. Task:是移动数据的最小工作单元,由连接器创建以并行化数据移动。
  4. Converter:负责 Kafka 和外部存储之间数据的序列化和反序列化。

Kafka Connect 工作原理

  1. Source Connector 从外部系统拉取数据并将其推送到 Kafka 主题。
  2. Sink Connector 从 Kafka 主题读取数据并将其推送到外部系统。
  3. Kafka Connect 通过跟踪已处理记录的偏移量来确保容错性。

示例:实现一个简单的 Kafka Connect Source Connector

在此示例中,我们将创建一个简单的 source connector,它从文件读取数据并将其流式传输到 Kafka 主题。

步骤 1:创建 Kafka 主题

运行以下命令创建名为 test-topic 的主题

步骤 2:实现一个简单的 Kafka Connect Source Connector

创建 config/source-config.properties 文件

此配置指定 Kafka Connect 将从 test.txt 读取数据并将其发布到 test-topic Kafka 主题。

步骤 3:启动 Source Connector

此命令以 standalone 模式启动 Kafka Connect,并启动 source connector。

步骤 4:向源文件写入数据

添加到 test.txt 的任何新行都将自动流式传输到 Kafka 主题。

步骤 5:从 Kafka 消费消息

预期输出

Kafka Connect 太棒了!

这证实了 Kafka Connect source connector 已成功将数据从文件流式传输到 Kafka 主题。

示例:实现一个简单的 Kafka Connect Sink Connector

现在,我们将实现一个简单的 sink connector,它从 Kafka 主题读取数据并将其写入文件。

步骤 1:创建 Kafka 主题

步骤 2:创建 Sink Connector 配置文件

创建 config/sink-config.properties 文件

此配置指示 Kafka Connect 从 sink-topic 读取数据并将其写入 sink-output.txt。

步骤 3:启动 Sink Connector

步骤 4:将消息发布到 Sink 主题

输入一些消息

步骤 5:验证 Sink 文件中的输出

预期输出

Leveraging Kafka Connect for Cosmos DB Integration

这表明我们的 Kafka Connect sink connector 成功地将消息从 Kafka 主题写入了文件。

Azure Cosmos DB 概述

什么是 Azure Cosmos DB?

Azure Cosmos DB 是由 Microsoft Azure 提供的一项全局分布式的多模型 NoSQL 数据库服务。它专为跨多个地理区域提供高可用性、可扩展性和低延迟数据访问而设计。 Cosmos DB 支持多种数据模型,包括键值、文档、列族和图数据库,使其适用于各种实时应用程序。

Cosmos DB 架构

Azure Cosmos DB 由以下核心组件组成:

  • Containers:存储项的逻辑存储单元,类似于集合或表。
  • Databases:管理用户和权限的容器的逻辑分组。
  • Items:存储在容器中的单个 JSON 文档。
  • Partitions:用于将数据分布到多个节点以实现可扩展性的物理存储单元。
  • Request Units (RUs):吞吐量的度量单位,定义了读/写操作的成本。

设置 Azure Cosmos DB

步骤 1:创建 Cosmos DB 帐户

  1. 登录 Azure Portal。
  2. 点击 **创建资源 > 数据库 > Azure Cosmos DB。**
  3. 选择一个 API(本示例我们将使用 **SQL API**)。
  4. 设置帐户名称、区域和吞吐量设置。
  5. 点击 **审阅 + 创建**,等待部署完成。

步骤 2:创建数据库和容器

  1. 在 Azure Portal 中导航到 Cosmos DB 帐户。
  2. 点击 **Data Explorer > 新建数据库。**
  3. 输入数据库名称(例如,kafka_cosmos_db)。
  4. 点击 **新建容器,** 指定容器 ID(例如,kafka_messages),并设置分区键(/id)。
  5. 点击 **确定** 创建容器。

示例:使用 Python 将数据写入 Cosmos DB

我们将使用 Azure Cosmos DB Python SDK 将数据插入我们新创建的容器。

安装所需软件包

将数据插入 Cosmos DB 的 Python 代码

预期输出

Leveraging Kafka Connect for Cosmos DB Integration

这证实了我们的消息已成功写入 Cosmos DB。

从 Cosmos DB 查询数据

一旦数据存储完毕,我们就可以使用类似 SQL 的语法进行查询。

查询数据的 Python 代码

预期输出

Leveraging Kafka Connect for Cosmos DB Integration

Kafka Connect

Kafka Connect 是一个以可扩展且容错的方式集成 Apache Kafka 与外部数据系统的框架。它简化了 Kafka 与各种数据库、键值存储、云服务和搜索引擎之间的数据流式传输过程。

Kafka Connect 架构

Kafka Connect 由以下部分组成:

  • Connectors:促进数据移动的预构建插件。
  • Workers:负责执行连接器任务的进程。
  • Tasks:连接器的子组件,负责数据摄取。
  • Converters:序列化和反序列化数据格式(JSON、Avro、Protobuf)。
  • Transforms:在将数据发送到 Kafka 之前对其进行修改。

安装和配置 Kafka Connect

步骤 1:下载和安装 Kafka

从官方网站下载 Apache Kafka

解压并移入 Kafka 目录

启动 Zookeeper 和 Kafka

步骤 2:安装 Kafka Connect

Kafka Connect 已随 Kafka 一起打包。配置 connect-distributed.properties

更新

启动 Kafka Connect

设置 Kafka Connect 以用于 Cosmos DB

步骤 1:安装 Kafka Connect Azure Cosmos DB 连接器

安装后重启 Kafka Connect。

步骤 2:配置 Cosmos DB Sink Connector

创建 cosmosdb-sink.properties

加载连接器

步骤 3:创建 Kafka 主题

3.5 将数据发送到 Kafka 主题

使用 Kafka 控制台生产者

发送 JSON 消息

3.6 从 Cosmos DB 查询数据

使用 Python 脚本查询数据

预期输出

Leveraging Kafka Connect for Cosmos DB Integration

利用 Kafka Connect 和 Cosmos DB 的性能

1. 调优 Kafka Connect 以获得高吞吐量

Kafka Connect 是在 Kafka 和 Cosmos DB 等外部系统之间流式传输数据的可扩展且可靠的方式。要实现高吞吐量,您需要微调源连接器和目标连接器的参数。

关键 Kafka Connect 配置参数

  • tasks.max:定义并发任务的数量。更多的任务会增加并行度。
  • batch.size:确定一次处理多少条记录。
  • linger.ms:延迟发送消息以允许批处理。
  • buffer.memory:控制生产者操作的缓冲区大小。
  • compression.type:启用压缩(例如,snappy、gzip、lz4)以减少网络带宽。

针对 Cosmos DB 优化的 Kafka Connect Sink 配置

预期输出

使用优化设置运行时,您应该会看到 Cosmos DB 中的摄取速率增加

Leveraging Kafka Connect for Cosmos DB Integration

2. 利用 Cosmos DB 处理 Kafka 工作负载

Cosmos DB 具有多项关键优化功能,可处理大量 Kafka 数据:

关键优化

  • 分区策略:使用高基数字段(例如,order_id)作为分区键,以均匀分布负载。
  • 索引策略:修改索引以仅索引必要的字段并减少 RU 消耗。
  • 吞吐量预配:根据 Kafka 吞吐量使用自动缩放模式或预配足够的请求单位(RU/s)。
  • 批量写入:Cosmos DB SDK 提供批量插入以降低请求延迟。

用于高吞吐量 Kafka 流的优化 Cosmos DB 设置

预期的性能提升

  • 提高写入吞吐量:更均匀地分布写入。
  • 降低 RU 消耗:仅索引必要字段可降低成本。
  • 更快的读取操作:分区查询可以高效执行。

3. 确保数据流的低延迟

Kafka Connect 和 Cosmos DB 的延迟优化确保了实时数据传输。

低延迟技术

  • 减少 Kafka 生产者的 linger.ms 以最小化批处理等待时间。
  • 使用 Cosmos DB Change Feed 进行事件驱动架构。
  • 启用 Kafka Producer Acknowledgments (acks=all) 以保证交付。
  • 使用 max.poll.records 来控制消费者批次大小。

优化的 Kafka Producer 配置

预期输出

Leveraging Kafka Connect for Cosmos DB Integration

4. 高效扩展 Kafka Connect Worker

随着数据量的增长,Kafka Connect Worker 需要高效扩展。

扩展策略

  • 增加 tasks.max:允许更多并行 Kafka Connect 任务。
  • 在分布式模式下部署 Kafka Connect:支持跨 Worker 进行负载均衡。
  • 使用 Kubernetes 或 Docker Swarm:确保自动化扩展和故障转移。

优化的 Kafka Connect 分布式模式配置

预期的扩展效益

  • 同时处理更多 Kafka 主题
  • 避免数据处理瓶颈
  • 支持 Worker 故障时的故障转移

5. 监控和调试性能问题

为确保稳定性,请监控 Kafka Connect 和 Cosmos DB 指标。

关键监控工具

  • Prometheus + Grafana:可视化 Kafka Connect 和 Cosmos DB 指标。
  • Azure Monitor:跟踪 Cosmos DB 请求单位(RU/s)和延迟。
  • Kafka Connect REST API:监控 Worker 健康状况。

示例 Kafka Connect 监控 API 调用

示例输出

Leveraging Kafka Connect for Cosmos DB Integration

预期操作

  • 自动重启失败的任务
  • 如果延迟飙升,则增加 Cosmos DB 中的 RU
  • 根据负载扩展 Kafka Connect Worker