使用 Kafka Connect 演进旧系统

2025年5月15日 | 阅读 7 分钟

现代企业面临着维护遗留系统和同时适应快速发展的技术格局的双重挑战。遗留系统通常包含关键数据和业务逻辑,对日常运营至关重要。然而,这些系统往往缺乏当今互联环境所需的 scalability、灵活性和集成能力。Apache Kafka 的关键组件 Kafka Connect 有助于将旧系统与新数据系统连接起来,从而使企业能够在不中断当前运营的情况下实现增长。

1. 理解 Kafka Connect

Kafka Connect 有助于将 Kafka 与其他系统连接起来。它使得数据进出 Kafka 变得容易,从而使旧系统和新应用程序能够顺畅地协同工作。

1.1 Kafka Connect 的主要特性

  • 分布式和独立模式:在独立模式下运行以进行本地任务,或在分布式模式下运行以实现可伸缩性和容错性。
  • 连接器生态系统:利用流行的数据库、文件系统和云服务的预构建连接器。
  • 模式管理:使用 Confluent Schema Registry 来强制执行数据模式并确保一致性。

1.2 为什么为遗留系统使用 Kafka Connect?

  • 最小化中断:集成遗留系统而无需进行重大更改。
  • 实时数据集成:启用流数据管道以进行实时分析。

2. 遗留系统的挑战

由于架构限制和对过时技术的依赖,遗留系统在现代化改造过程中常常会带来重大障碍。Kafka Connect 通过提供一个强大的框架来实现无缝集成,从而解决了这些挑战。

2.1 常见挑战

  • 数据孤岛:遗留系统通常独立运行,导致数据共享困难。
  • 可伸缩性有限:旧系统可能难以处理增加的工作负载。
  • 专有协议:许多遗留系统使用非标准通信协议。

2.2 使用 Kafka Connect 解决挑战

  • 打破孤岛:将遗留系统中的数据流式传输到统一的基于 Kafka 的平台。
  • 可伸缩架构:将工作负载分布在多个 Kafka Connect 工作进程中。
  • 协议桥接:使用连接器在遗留协议和现代协议之间进行转换。

演进遗留系统的数据源和主题

现代化遗留系统通常需要从各种传统数据源(如关系数据库、大型机和平面文件)进行连接和数据提取。每种源类型在与 Kafka Connect 集成时都呈现出独特的挑战和解决方案。

关系数据库 (RDBMS)

MySQLPostgreSQL、Oracle 和 SQL Server 这样的关系数据库是最常见的数据源之一。这些数据库通常存储关键的事务数据,并且需要有效的方法来提取、转换和加载 (ETL) 数据到现代系统中。

关键主题

a. 变更数据捕获 (CDC)

CDC 允许 Kafka Connect 只捕获对数据库所做的更改(插入、更新、删除),从而减少延迟和带宽使用。

示例:使用 Debezium 连接器进行 MySQL CDC

1. 用于 CDC 的 Kafka Connect 配置

输出:捕获的更改事件被流式传输到 Kafka 主题,例如 mysql_server.inventory.products:

b. 增量数据加载

在 CDC 不可行的情况下,可以使用基于时间戳或唯一键的增量加载。

示例:JDBC 连接器增量模式

1. Kafka Connect 配置

输出:新行被流式传输到 Kafka 主题,例如 jdbc.inventory.products:

Evolving Legacy Systems with Kafka Connect

c. 模式演进和管理

当数据库中的模式发生更改时,需要将它们传播到下游消费者。使用 Schema Registry 可确保兼容性。

示例:使用 Avro 进行模式演进

  1. 更新数据库模式
    ALTER TABLE products ADD COLUMN stock INT;
  2. Kafka 主题消息
Evolving Legacy Systems with Kafka Connect

Schema Registry 确保此更新对下游消费者保持向后兼容。

大型机

大型机,如 IBM Db2 和 VSAM,存储着海量的关键数据。对这些系统进行现代化改造涉及克服独特的挑战,例如批量处理和封闭架构。

a. 大型机集成挑战

大型机通常使用专有格式,导致直接集成困难。使用 Confluent 或 Syncsort 等专用连接器可以弥合这一差距。

示例:使用 Confluent 的 IBM MQ 源连接器

1. Kafka Connect 配置

输出:来自大型机队列的消息被流式传输到 Kafka

Evolving Legacy Systems with Kafka Connect

b. 批量到实时转换

大型机传统上是面向批量的。Kafka Connect 通过将批处理数据分解为事件来实现实时数据流。

示例:使用 Kafka Connect 进行 ETL 转换

1. 使用文件源连接器将批处理数据加载到 Kafka 中

输出:来自 CSV 的行被流式传输到 Kafka

Evolving Legacy Systems with Kafka Connect

平面文件

许多遗留工作流程依赖于平面文件(CSV、XMLJSON)进行数据交换。必须将这些文件高效地摄取到 Kafka 中以进行实时处理。

关键主题

a. 基于文件的源连接器

平面文件连接器从文件系统读取数据并将其流式传输到 Kafka 主题。

示例:FileStream 源连接器

1. 配置

2. 输入文件

3. 输出

Evolving Legacy Systems with Kafka Connect

b. 处理半结构化数据

平面文件通常包含 XML 或 JSON,需要在下游消费之前进行转换。

示例:使用 Kafka Streams 进行 JSON 转换

1. 输入 JSON

2. 用于丰富和格式化的 Kafka Streams 应用程序

3. 输出

Evolving Legacy Systems with Kafka Connect

3. 使用程序和输出的实现

3.1 部署 Kafka 和 Kafka Connect

为了实现数据集成,请部署 Kafka 和 Kafka Connect。下面是使用 Docker Compose 的程序化示例。

输出

运行的 Kafka 和 Kafka Connect 设置可在 localhost 上访问。

3.2 配置连接器

连接器是 Kafka Connect 的核心组件。以下是源连接器和目标连接器配置的示例。

示例:JDBC 源连接器

此配置将数据从 MySQL 数据库流式传输到 Kafka 主题。

示例:Elasticsearch 目标连接器

此配置将数据从 Kafka 主题流式传输到 Elasticsearch 以进行分析。

输出

来自 Kafka 中 legacy-orders 主题的数据被索引到 Elasticsearch 中,用于搜索和分析。

3.3 测试管道

配置连接器后,验证数据管道以确保数据流准确一致。

示例:从 Kafka 主题消费消息

使用 Kafka 的控制台消费者验证 Kafka 主题中的数据。

kafka-console-consumer --bootstrap-server localhost:9092 --topic legacy-orders --from-beginning

输出

从 orders 表中实时提取的数据显示在控制台上

Evolving Legacy Systems with Kafka Connect

示例:查询 Elasticsearch 中的数据

使用 cURL 验证索引到 Elasticsearch 中的数据。

索引到 Elasticsearch 中的 Kafka 主题数据显示

3.4 监控和维护管道

示例:使用 Prometheus 和 Grafana 进行监控

  • 使用 JMX exporter 将 Kafka Connect 指标暴露给 Prometheus。
  • 创建 Grafana 仪表板以可视化吞吐量和任务状态等指标。

4. 技术深入:Kafka Connect 配置

Kafka Connect 是一个强大的工具,用于将不同系统与 Apache Kafka 集成,从而在源和目标之间实现无缝的数据流。

理解 Kafka Connect 架构

Kafka Connect 作为分布式框架运行。其主要组件包括

  • 连接器:与外部系统的接口(例如,数据库、文件系统)。
  • 任务:读取或写入数据的任务单元。
  • 工作进程:运行连接器和任务的进程。
  • 配置存储:跟踪连接器配置。
  • 偏移量存储:跟踪任务的进度。

关键配置参数

Kafka Connect 配置使用 JSON 或属性文件进行管理。关键配置参数包括

工作进程配置

连接器配置

连接器配置定义数据源或目标。

示例:JDBC 源连接器

说明

  • connector.class:指定连接器类型。
  • tasks.max:最大并行任务数。
  • connection.url:数据库连接详细信息。
  • table.whitelist:要包含的表。
  • mode:同步模式(递增、时间戳或两者)。
  • topic.prefix:Kafka 主题的前缀。

输出:来自 users 表的数据以 test- 为前缀流式传输到 Kafka 主题。

设置 Kafka Connect

使用 Docker Compose 部署 Kafka Connect

注册连接器

部署后,使用 REST API 注册连接器。

注册源连接器

输出:连接器 jdbc-source-connector 处于活动状态并正在流式传输数据。

监控和扩展 Kafka Connect

监控指标

Kafka Connect 通过 JMX 公开指标。示例指标包括

  • 任务吞吐量。
  • 连接器状态。
  • 错误率。

示例:Prometheus 配置

扩展工作进程

通过向集群添加工作进程来扩展 Kafka Connect。确保对配置和偏移量主题有共享访问权限。

命令

输出:增加了工作进程容量以处理更多任务。

5. 高级功能

单消息转换 (SMT)

SMT 允许在传输过程中进行数据转换。

示例:屏蔽敏感字段

输出:信用卡号在到达目标之前被屏蔽。

错误处理

定义数据处理过程中错误的策略。

说明

  • errors.tolerance:在错误时忽略或失败。
  • errors.deadletterqueue.topic.name:用于有问题消息的主题。
  • errors.log.enable:记录错误以进行调试。

6. 测试配置

示例:控制台消费者

输出:test-users 主题的实时数据显示在控制台上。