Kafka Streams DSL

2025 年 1 月 23 日 | 阅读 10 分钟
Kafka Streams DSL

Kafka Streams DSL(领域特定语言)是 Apache Kafka 环境中一个强大的库,允许循环处理包实时转换、混合和分析记录。本手册将深入探讨 Kafka Streams DSL 的复杂性,提供详细的解释、代码示例和实际见解,使其易于研究和理解。

Kafka Streams 简介

Kafka Streams 是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在 Kafka 集群中。它将客户端编写和部署流行 Java 和 Scala 应用程序的简单性与 Kafka 服务器端集群技术的优势相结合。

Kafka Streams 提供两个重要的 API 来构建流处理应用程序:Streams DSL 和 Processor API。本指南侧重于 Streams DSL,它更高级且更具声明性,因此更容易用于大多数常见的流处理任务。

Kafka Streams 的核心概念

在深入了解 Streams DSL 之前,理解一些核心概念至关重要

  • 流:一个无限的、持续更新的数据集。
  • 记录:流中的键值对。
  • 主题:记录发布到的类别。
  • KStream:表示记录流的抽象。
  • KTable:一个变更日志流,其中每个数据记录表示一个更新。
  • GlobalKTable:一个在所有软件实例中完全复制的 KTable。

Kafka Streams DSL 入门

要开始使用 Kafka Streams,您需要将必要的依赖项添加到您的项目中。如果您使用的是 Maven,请将以下内容添加到您的 pom.xml 中

编写您的第一个 Kafka Streams 应用程序

让我们从一个简单的示例开始,它从一个 Kafka 主题读取信息,对其进行处理,并将结果写入另一个 Kafka 主题。

设置 Kafka Streams 配置

首先,您需要配置 Kafka Streams 应用程序。这是一个基本设置

构建流拓扑

拓扑定义了您的 Kafka Streams 实用程序的计算常识。以下是您可以定义简单拓扑的方法

启动 Kafka Streams 应用程序

最后,您需要启动 Kafka Streams 应用程序

使用 Kafka Streams DSL 转换数据

现在您已经运行了一个基本的 Kafka Streams 应用程序,让我们探索一些常见的转换。

1. 过滤记录

过滤允许您根据条件从流中删除数据。这对于排除不相关或错误数据很有用

在此示例中,最有效的记录(其值包含“重要”一词)由过滤后的流传递。

2. 映射记录

映射允许您转换流中的每条记录。例如,您可以将值转换为大写

此转换获取每条记录的值并将其转换为大写,然后将其写入输出主题。

3. 平面映射记录

平面映射允许您将单个记录映射到多个记录。例如,您可以将句子拆分为单个单词

在此处,每个输入句子都被拆分为单词,并且每个单词作为单独的记录发送到输出主题。

使用 Kafka Streams DSL 聚合数据

聚合是 Kafka Streams DSL 的有效功能。它允许您按键对信息进行分组并执行计数、求和和减少等操作。

1. 计数记录

按键计数记录数

此示例将输入句子拆分为单词,按其值对单词进行分组,并计算每个单词的出现次数。

2. 求和记录

要对按键分组的记录值求和,可以使用 aggregate 方法

summedTable.toStream().to("summed-topic");

此示例将值转换为 Long 并按键求和。

3. 减少记录

减少记录涉及将多个记录组合成一个记录

此示例连接具有相同键的记录的值。

使用 Kafka Streams DSL 连接流

连接流是另一个关键功能,允许您组合来自不同流的记录。

1. 流-流连接

流-流连接根据共同键合并来自两个流的记录

在此示例中,来自源流和另一个流的匹配键的记录在 5 分钟窗口内连接。

2. 流-表连接

流-表连接将来自流的记录与来自表的更新相结合

此示例将流中的每个记录与表中的相应记录连接。

在 Kafka Streams DSL 中处理时间和窗口

Kafka Streams DSL 为处理时间和窗口提供了强大的支持,这对于许多循环处理包至关重要。

1. 翻滚窗口

翻滚窗口是固定大小、不重叠的窗口

在此示例中,记录被分组为 1 分钟窗口,并计算每个窗口的计数。

2. 滑动窗口

滑动窗口是固定大小、重叠的窗口

此示例创建大小为 1 分钟、每 30 秒前进一次的重叠窗口。

3. 会话窗口

会话窗口将记录分组到由一段时间的非活动状态分隔的会话中

此示例将记录分组到具有 5 分钟非活动间隔的会话中。

错误处理和状态管理

Kafka Streams DSL 提供了内置的错误处理和国家/地区管理机制,以确保健壮性和可靠性。

1. 处理反序列化错误

您可以配置 Kafka Streams 以优雅地处理反序列化错误

此配置确保反序列化错误不会阻止您的流处理,而是被记录和跳过。

2. 状态存储

Kafka Streams 使用状态存储来跟踪应用程序的状态。这些状态存储可用于持久存储并实现容错。

3. 持久状态存储

RocksDB(一种嵌入式键值存储)可以支持状态存储

此示例使用 RocksDB 创建名为“state-store”的持久状态存储。

4. 内存中状态存储

对于需要低延迟访问状态数据的应用程序,您可以使用内存中状态存储

此示例创建了一个禁用缓存的内存中状态存储。

Kafka Streams DSL 的高级功能

Kafka Streams DSL 还提供高级功能,例如交互式查询、全局 KTable 等。

1. 交互式查询

交互式查询允许您直接查询 Kafka Streams 应用程序的状态

此示例演示了如何查询状态存储“counts-store”以获取特定键的计数。

2. 全局 KTable

全局 KTable 提供了一种在应用程序的所有实例中复制主题的方法

此示例建议了一种将循环与全局复制表连接以丰富移动信息的方法。

监控和调试 Kafka Streams 应用程序

监控和调试对于保持 Kafka Streams 包的健康和性能至关重要。

1. 指标和 JMX

Kafka Streams 提供了一组丰富的可通过 JMX 访问的指标

此配置启用详细的指标记录,可通过 JMX 访问。

2. 日志记录

适当的日志记录可以帮助您调试问题并了解应用程序的行为

此配置指定了状态存储的持久化目录,这对于调试与状态相关的问题非常有用。

实际用例

Kafka Streams DSL 用于各个行业进行实时统计数据处理。以下是一些实际用例

1. 欺诈检测

金融机构使用 Kafka Streams 实时检测欺诈性交易。通过分析交易模式并应用机器学习模型,他们可以在欺诈影响客户之前识别并预防欺诈。

2. 监控物联网设备

在物联网企业中,Kafka Streams 处理来自多个传感器和设备的数据。这些数据可以聚合、转换和分析,以实时监控设备的健康状况和整体性能,从而实现预测性维护并减少停机时间。

3. 社交媒体分析

社交媒体结构使用 Kafka Streams 分析用户交互和内容。通过处理来自用户帖子、评论和反应的数据流,这些结构可以深入了解消费者行为、热门话题和情感分析。

使用 Kafka Streams DSL 的最佳实践

为确保 Kafka Streams 包的最佳性能和可维护性,请遵循以下最佳实践

1. 优化序列化

高效序列化对于性能至关重要。为您的信息类型使用适当的 SerDes(序列化器/反序列化器)。Avro 和 Protocol Buffers 是 Kafka Streams 中流行的序列化选择。

2. 有效管理状态存储

状态存储对于聚合和连接至关重要。确保它们配置良好并受到监控。使用压缩主题进行变更日志以优化存储和性能。

3. 优雅地处理重新平衡

重新平衡可能会中断处理。使用 repartition() 方法来控制数据分布并减少重新平衡的影响。此外,配置您的实用程序以正确处理状态存储恢复。

4. 监控和调整性能

持续揭示 Kafka Streams 软件中 JMX 指标和日志的使用情况。调整配置参数,例如提交周期、缓存大小和线程计数,以优化整体性能。

高级 Kafka Streams DSL 模式

Kafka Streams DSL 提供了高级样式,用于解决复杂的循环处理需求。这些模式扩展了基本的更改和聚合,以提供更有效和灵活的数据处理能力。

1. 事件溯源

事件溯源是一种设计模式,其中状态更改被记录为一系列事件。此方法允许您根据应用程序的事件历史重构其状态。

示例:使用 Kafka Streams 进行事件溯源

在这种情况下,“events-subject matter”中的事件通过组合技术进行处理和聚合。该状态保存在状态商店中,并写入输出主题。

2. CQRS(命令查询职责分离)

CQRS 分离了应用程序的读取和写入模型,允许您以不同方式处理命令(写入)和查询(读取)。Kafka Streams 可用于通过处理活动和更新实体化视图来实现读取版本。

示例:使用 Kafka Streams 实现 CQRS

在此示例中,读取模型通过处理来自“events-topic”的更新事件并将其聚合到存储在状态存储中的视图来更新。

3. 流表对偶性

流表对偶性是一个概念,即流可以被视为表,反之亦然。这允许灵活动态的数据处理模型。

示例:将流转换为表再转回

在此示例中,来自“源主题”的流被转换为表,然后再次转换为流,展示了流表对偶性的灵活性。

4. 滑动窗口

滑动窗口是一种窗口类型,其中窗口以指定的时间间隔在数据上滑动。这允许进行细粒度的基于时间的聚合。

示例:使用滑动窗口

在这种情况下,信息被分组到五分钟的滑动主页窗口中,每 30 秒前进一次。每个窗口的计数存储在一个国家存储中,并写入输出主题。

5. 交互式查询

交互式查询允许您直接查询 Kafka Streams 实用程序的状态,从而提供一种构建实际仪表板和监控工具的方法。

示例:使用交互式查询

在此示例中,查询了“counts-store”的状态以依赖特定键,演示了如何使用交互式查询来检索实时国家统计数据。

6. 模式匹配

模式匹配允许您检测与特定模式匹配的事件序列,这对于欺诈检测或复杂事件处理等场景非常有用。

示例:使用 Kafka Streams 进行模式匹配

在此示例中,来自“源主题”的与指定模式匹配的记录被过滤并写入“匹配主题”。

7. 处理延迟到达的数据

延迟到达的记录可以通过在窗口操作中使用宽限期来处理,从而确保在窗口关闭后到达的记录仍然得到处理。

示例:处理延迟到达的数据

在此示例中,在 5 分钟窗口中添加了 1 分钟的宽限期,允许延迟到达的数据在宽限期内进行处理。

结论

Kafka Streams DSL 是一款有效且灵活的工具,用于构建实际循环处理应用程序。凭借其直观的 API 和强大的功能,它可以轻松转换、混合和分析数据。本手册对 Kafka Streams DSL 进行了深入研究,涵盖了从基本设置到高级功能的全部内容。通过遵循此处提到的示例和概念,您可以开始构建自己的 Kafka Streams 程序并利用实际数据处理的强大功能。