使用 Kafka Connect 演进旧系统2025年5月15日 | 阅读 7 分钟 现代企业面临着维护遗留系统和同时适应快速发展的技术格局的双重挑战。遗留系统通常包含关键数据和业务逻辑,对日常运营至关重要。然而,这些系统往往缺乏当今互联环境所需的 scalability、灵活性和集成能力。Apache Kafka 的关键组件 Kafka Connect 有助于将旧系统与新数据系统连接起来,从而使企业能够在不中断当前运营的情况下实现增长。 1. 理解 Kafka ConnectKafka Connect 有助于将 Kafka 与其他系统连接起来。它使得数据进出 Kafka 变得容易,从而使旧系统和新应用程序能够顺畅地协同工作。 1.1 Kafka Connect 的主要特性
1.2 为什么为遗留系统使用 Kafka Connect?
2. 遗留系统的挑战由于架构限制和对过时技术的依赖,遗留系统在现代化改造过程中常常会带来重大障碍。Kafka Connect 通过提供一个强大的框架来实现无缝集成,从而解决了这些挑战。 2.1 常见挑战
2.2 使用 Kafka Connect 解决挑战
演进遗留系统的数据源和主题 现代化遗留系统通常需要从各种传统数据源(如关系数据库、大型机和平面文件)进行连接和数据提取。每种源类型在与 Kafka Connect 集成时都呈现出独特的挑战和解决方案。 关系数据库 (RDBMS)像 MySQL、PostgreSQL、Oracle 和 SQL Server 这样的关系数据库是最常见的数据源之一。这些数据库通常存储关键的事务数据,并且需要有效的方法来提取、转换和加载 (ETL) 数据到现代系统中。 关键主题 a. 变更数据捕获 (CDC) CDC 允许 Kafka Connect 只捕获对数据库所做的更改(插入、更新、删除),从而减少延迟和带宽使用。 示例:使用 Debezium 连接器进行 MySQL CDC 1. 用于 CDC 的 Kafka Connect 配置 输出:捕获的更改事件被流式传输到 Kafka 主题,例如 mysql_server.inventory.products: b. 增量数据加载 在 CDC 不可行的情况下,可以使用基于时间戳或唯一键的增量加载。 示例:JDBC 连接器增量模式 1. Kafka Connect 配置 输出:新行被流式传输到 Kafka 主题,例如 jdbc.inventory.products: ![]() c. 模式演进和管理 当数据库中的模式发生更改时,需要将它们传播到下游消费者。使用 Schema Registry 可确保兼容性。 示例:使用 Avro 进行模式演进
![]() Schema Registry 确保此更新对下游消费者保持向后兼容。 大型机大型机,如 IBM Db2 和 VSAM,存储着海量的关键数据。对这些系统进行现代化改造涉及克服独特的挑战,例如批量处理和封闭架构。 a. 大型机集成挑战 大型机通常使用专有格式,导致直接集成困难。使用 Confluent 或 Syncsort 等专用连接器可以弥合这一差距。 示例:使用 Confluent 的 IBM MQ 源连接器 1. Kafka Connect 配置 输出:来自大型机队列的消息被流式传输到 Kafka ![]() b. 批量到实时转换 大型机传统上是面向批量的。Kafka Connect 通过将批处理数据分解为事件来实现实时数据流。 示例:使用 Kafka Connect 进行 ETL 转换 1. 使用文件源连接器将批处理数据加载到 Kafka 中 输出:来自 CSV 的行被流式传输到 Kafka ![]() 平面文件许多遗留工作流程依赖于平面文件(CSV、XML、JSON)进行数据交换。必须将这些文件高效地摄取到 Kafka 中以进行实时处理。 关键主题 a. 基于文件的源连接器 平面文件连接器从文件系统读取数据并将其流式传输到 Kafka 主题。 示例:FileStream 源连接器 1. 配置 2. 输入文件 3. 输出 ![]() b. 处理半结构化数据 平面文件通常包含 XML 或 JSON,需要在下游消费之前进行转换。 示例:使用 Kafka Streams 进行 JSON 转换 1. 输入 JSON 2. 用于丰富和格式化的 Kafka Streams 应用程序 3. 输出 ![]() 3. 使用程序和输出的实现3.1 部署 Kafka 和 Kafka Connect为了实现数据集成,请部署 Kafka 和 Kafka Connect。下面是使用 Docker Compose 的程序化示例。 输出 运行的 Kafka 和 Kafka Connect 设置可在 localhost 上访问。 3.2 配置连接器连接器是 Kafka Connect 的核心组件。以下是源连接器和目标连接器配置的示例。 示例:JDBC 源连接器 此配置将数据从 MySQL 数据库流式传输到 Kafka 主题。 示例:Elasticsearch 目标连接器 此配置将数据从 Kafka 主题流式传输到 Elasticsearch 以进行分析。 输出 来自 Kafka 中 legacy-orders 主题的数据被索引到 Elasticsearch 中,用于搜索和分析。 3.3 测试管道配置连接器后,验证数据管道以确保数据流准确一致。 示例:从 Kafka 主题消费消息 使用 Kafka 的控制台消费者验证 Kafka 主题中的数据。 kafka-console-consumer --bootstrap-server localhost:9092 --topic legacy-orders --from-beginning 输出 从 orders 表中实时提取的数据显示在控制台上 ![]() 示例:查询 Elasticsearch 中的数据 使用 cURL 验证索引到 Elasticsearch 中的数据。 索引到 Elasticsearch 中的 Kafka 主题数据显示 3.4 监控和维护管道示例:使用 Prometheus 和 Grafana 进行监控
4. 技术深入:Kafka Connect 配置Kafka Connect 是一个强大的工具,用于将不同系统与 Apache Kafka 集成,从而在源和目标之间实现无缝的数据流。 理解 Kafka Connect 架构Kafka Connect 作为分布式框架运行。其主要组件包括
关键配置参数Kafka Connect 配置使用 JSON 或属性文件进行管理。关键配置参数包括 工作进程配置 连接器配置 连接器配置定义数据源或目标。 示例:JDBC 源连接器 说明
输出:来自 users 表的数据以 test- 为前缀流式传输到 Kafka 主题。 设置 Kafka Connect使用 Docker Compose 部署 Kafka Connect 注册连接器部署后,使用 REST API 注册连接器。 注册源连接器 输出:连接器 jdbc-source-connector 处于活动状态并正在流式传输数据。 监控和扩展 Kafka Connect监控指标 Kafka Connect 通过 JMX 公开指标。示例指标包括
示例:Prometheus 配置 扩展工作进程 通过向集群添加工作进程来扩展 Kafka Connect。确保对配置和偏移量主题有共享访问权限。 命令 输出:增加了工作进程容量以处理更多任务。 5. 高级功能单消息转换 (SMT)SMT 允许在传输过程中进行数据转换。 示例:屏蔽敏感字段 输出:信用卡号在到达目标之前被屏蔽。 错误处理定义数据处理过程中错误的策略。 说明
6. 测试配置示例:控制台消费者 输出:test-users 主题的实时数据显示在控制台上。 |
我们请求您订阅我们的新闻通讯以获取最新更新。