Kafka 高级处理流

2025 年 1 月 23 日 | 阅读 16 分钟

Kafka Streams 简介

Kafka Streams 是 Apache Kafka 的一个强大的客户端库,旨在构建可伸缩、容错且实时的流处理应用程序和微服务。利用 Kafka 的分布式和持久化消息平台,Kafka Streams 提供了一种强大而简单的方式来处理和转换正在传输中的数据。

Kafka Advance Processing Streams

核心概念

流和表

  • 流: Kafka Streams 中的流表示连续的不可变数据记录流。流中的每条记录都是一个键值对。
  • 表: 表是每个键的最新值的快照,代表状态。它们可以被视为从流派生的物化视图。

Kafka Streams 中的处理拓扑

Kafka Streams 应用程序本质上由其处理拓扑定义。此拓扑是一个有向无环图 (DAG),由流处理节点组成,每个节点代表一个特定的操作,例如过滤、映射或聚合数据。理解处理拓扑对于使用 Kafka Streams 构建高效可伸缩的流处理应用程序至关重要。

1. 处理拓扑概述

有向无环图 (DAG)

Kafka Streams 中的处理拓扑被构建为 DAG。在此图中

  • 节点代表单个流处理操作。
  • 代表这些操作之间的数据流。

图的无环性质确保没有循环,这意味着数据从源到汇单向流动,而不会循环回。

拓扑的组成部分

  • 源节点: 代表数据从中读取的 Kafka 主题的节点。
  • 处理器节点: 对数据应用转换或处理逻辑的节点。
  • 汇节点: 将处理后的数据写回 Kafka 主题的节点。

2. 构建处理拓扑

StreamsBuilder

StreamsBuilder 类用于构建处理拓扑。它提供了定义源节点(来自 Kafka 主题的流)、应用转换和定义汇节点(流到 Kafka 主题)的方法。

示例:创建 StreamsBuilder

定义源节点

源节点是通过从 Kafka 主题读取来创建的。这是通过 StreamsBuilder 的 stream 或 table 方法完成的。

Kafka Advance Processing Streams

示例:定义源节点

应用转换

转换通过 KStream 或 KTable 接口提供的各种方法应用于流。这些方法代表不同类型的处理节点。

  • Map: 将每条记录转换为一条新记录。
  • Filter: 保留满足谓词的记录。
  • FlatMap: 将每条记录转换为零条或多条记录。
  • GroupBy: 按键对记录进行分组。
  • Aggregate: 将记录聚合为新形式(例如,计数、求和)。

示例:应用转换

定义汇节点

通过使用 to 方法将处理后的流写回 Kafka 主题来创建汇节点。

示例:定义汇节点

3. 理解处理拓扑中的节点

源节点

源节点是拓扑的入口点,从 Kafka 主题消费数据。从主题读取的每条记录都成为流中的一个事件。

示例:源节点

处理器节点

处理器节点执行实际的流处理操作。它们使用 KStream 和 KTable 上的转换方法定义。

  • 无状态处理节点: 不在记录之间维护状态的操作。
    • map:单独转换每条记录。
    • filter:根据条件过滤记录。
  • 有状态处理节点: 维护跨记录状态的操作。
    • aggregate:在一段时间或组上聚合记录。
    • join:基于键组合两个流的记录

示例:处理器节点

汇节点

汇节点是拓扑的出口点,处理后的数据在此处写回 Kafka 主题。

示例:汇节点

4. 完整处理拓扑的示例

让我们考虑一个更全面的示例,该示例说明了从读取数据、处理数据到将其写回 Kafka 主题的完整过程。

场景:处理用户活动日志流,过滤掉短暂的活动,将活动描述转换为大写,并计算每种活动类型的出现次数。

示例:完整拓扑

5. 处理拓扑中的高级概念

窗口

窗口操作允许您将记录分组到基于时间的窗口中进行聚合。这对于实时分析等场景至关重要。

示例:窗口聚合

连接

Joining streams 基于公共键将两个或多个流的记录组合在一起。Kafka Streams 支持各种连接类型,包括内连接、左连接和外连接。

示例:流-流连接

GlobalKTables

GlobalKTables 是一种特殊的 KTable,它被复制到 Kafka Streams 应用程序的所有实例中。这对于需要全局数据视图的查找非常有用。

示例:使用 GlobalKTable

6. 调试和监控拓扑

调试

可以使用拓扑可视化器或记录拓扑描述来帮助理解和调试处理拓扑。

示例:记录拓扑

监控

Kafka Streams 提供了各种指标,可用于监控拓扑的性能和运行状况。这些指标可以与 Prometheus 和 Grafana 等监控工具集成。

示例:启用 JMX 监控

状态存储

Kafka Streams 通过状态存储实现有状态处理,这些存储可以是内存中的或持久化的。这些存储保存状态信息,这些信息对于窗口聚合或连接等操作是必需的。

处理保证

Kafka Streams 为处理数据提供了强大的保证,包括至少一次和精确一次语义。这可确保数据得到可靠一致的处理,不会丢失或重复。

Kafka Advance Processing Streams

主要特点

高级 DSL

Kafka Streams DSL(领域特定语言)提供了一个高级 API,用于流处理。它允许开发人员轻松构建复杂的处理管道,提供 map、filter、join 和 aggregate 等操作。

示例:单词计数应用程序

处理器 API

Processor API 是 Apache Kafka 的一个组件,它允许您对 Kafka 流执行比 Streams DSL 更复杂的数据处理。它是一个较低级别的 API,可提供对记录处理的更多控制。

以下是它提供的功能分解

主要特点

  1. 精细控制:与抽象了许多复杂性的 Streams DSL 不同,Processor API 允许您直接访问处理逻辑,从而可以微调记录的处理方式。
  2. 自定义处理逻辑:您可以通过定义处理器并将数据通过这些处理器路由来实现自定义处理逻辑。这对于复杂的转换或与外部系统集成很有用。
  3. 有状态处理:它支持有状态处理,您可以在其中维护跨记录的状态。这是通过状态存储完成的,这些存储可用于在处理步骤之间跟踪信息。
  4. 与 Kafka 主题集成:处理器可以直接与 Kafka 主题交互,从而可以进行自定义路由和消息处理。
  5. 自定义处理器:您可以通过实现 Processor 接口来创建自定义处理器。这些处理器可用于处理记录、执行操作并将结果发送到其他主题。

组成部分

  • Processor:您需要实现以定义处理逻辑的接口。
  • ProcessorContext:提供对处理器信息的访问,包括状态存储和调度处理的能力。
  • StateStore:允许您跨记录维护和访问状态。有各种类型的状态存储,包括键值存储和窗口存储。
  • Topology:通过将处理器和状态存储连接在一起,定义整体处理逻辑。

用例示例

假设您想处理记录流以执行一些自定义逻辑,例如以特定方式聚合数据,然后将其发送到另一个主题。使用 Processor API,您可以

  1. 定义一个执行聚合的自定义处理器。
  2. 使用状态存储来跟踪中间结果。
  3. 将处理后的数据路由到另一个 Kafka 主题。
Kafka Advance Processing Streams

示例代码

以下是定义处理器的一些基本示例

然后,您将构建一个使用此处理器的拓扑

Processor API 功能强大且灵活,但它也需要对 Kafka 的内部原理有更深入的了解。它最适合需要执行使用更高级别抽象难以实现的复杂处理的场景。

Kafka Streams 交互式查询

Kafka Streams 是一个用于构建实时、可伸缩且容错的流处理应用程序的强大框架。其突出功能之一是能够对流处理状态执行交互式查询,允许开发人员直接从应用程序内部查询状态存储。此功能特别适用于构建需要实时洞察和即时访问处理状态的应用程序。

Kafka Advance Processing Streams

1. 交互式查询简介

交互式查询使 Kafka Streams 应用程序能够将其状态公开给外部系统或用户进行实时查询。此功能允许应用程序在无需处理新消息或事件的情况下访问 KTable 和全局表的状态。从本质上讲,它将流处理应用程序转变为可查询数据库,您可以在其中按需查找当前状态信息。

2. 设置交互式查询

定义状态存储

Kafka Streams 中的状态存储用于维护有状态操作(如聚合、连接和窗口计算)所需的状态。这些存储由持久化存储(例如 RocksDB)支持,并在 Kafka 集群中进行复制以实现容错。

要使用交互式查询,您需要显式定义和物化状态存储。

示例:物化状态存储

启用交互式查询

要启用交互式查询,您必须将 Kafka Streams 应用程序配置为公开状态存储。这通常涉及设置一个查询端点。

示例:启用交互式查询

3. 查询状态存储

一旦状态存储被物化并且 Kafka Streams 应用程序正在运行,您就可以使用 KafkaStreams API 查询状态存储。

访问状态存储

要执行查询,您首先需要获取状态存储的引用。

示例:访问键值存储

执行查询

获取状态存储引用后,您可以执行各种类型的查询,例如点查找或范围查询。

示例:点查找

示例:范围查询

4. 通过 REST API 公开状态存储

为了使交互式查询能够从外部访问,您可以将它们通过 REST API 公开。这种方法允许其他服务或用户通过 HTTP 查询状态存储。

示例:交互式查询的 REST API

在此示例中,创建了一个简单的 Spring Boot REST 控制器来公开单词计数状态存储。当对 /wordcount/{word} 发出 GET 请求时,控制器将查询状态存储并返回指定单词的当前计数。

5. 处理状态存储更改

状态存储恢复

Kafka Streams 在应用程序重启或故障期间会自动处理状态存储的恢复。通过重放与状态存储相关的变更日志主题来重建状态。

示例:状态恢复日志

6. 可伸缩性和容错性

分区和复制

Kafka Streams 中的状态存储跨 Kafka 集群进行分区和复制。这可确保状态分布以实现可伸缩性,并在节点发生故障时进行恢复。

查询路由

在运行分布式 Kafka Streams 应用程序时,必须将查询路由到持有所请求键状态的正确实例。Kafka Streams 提供了一个元数据 API 来查找持有该键的实例。

示例:查询路由

7. 高级查询

窗口存储查询

对于窗口聚合,您可以查询窗口状态存储。这些存储为每个窗口维护状态,并可以查询特定窗口。

示例:窗口查询

Kafka Streams:源和汇处理器

在 Kafka Streams 中,源和汇处理器在定义数据如何进入和离开流处理拓扑方面起着至关重要的作用。理解这些处理器对于构建有效且可伸缩的流处理应用程序至关重要。本说明将深入探讨源和汇处理器的具体细节、它们的作用、配置以及它们如何融入整体 Kafka Streams 架构。

Kafka Advance Processing Streams

源处理器

源处理器是数据进入 Kafka Streams 应用程序的入口点。它们从 Kafka 主题中消费记录,并将它们转换为可以在拓扑内进一步处理的流。

汇处理器

汇处理器是数据从 Kafka Streams 应用程序退出的出口点。它们获取处理后的记录并将其写入 Kafka 主题,使结果可供下游使用者或其他系统使用。

示例:使用 KStream 定义源处理器

在此示例中,builder.stream("source-topic") 调用创建了一个 KStream,该 KStream 消费来自 source-topic Kafka 主题的数据。此主题中的每条记录都成为 KStream 中的一个事件。

源处理器类型

  • KStream:表示记录流,其中每条记录都会被独立处理。
  • KTable:表示一个变更日志流,其中每条记录都被解释为键值存储的更新。
  • GlobalKTable:一种特殊的 KTable,它在 Kafka Streams 应用程序的所有实例中完全复制。

示例:使用 KTable 定义源处理器

源处理器的配置

源处理器可以配置其他参数,例如键和值的反序列化器以及时间戳提取器。

示例:配置源处理器

在此示例中,Consumed.with(Serdes.String(), Serdes.String()) 方法指定键和值都应反序列化为字符串。

示例:定义汇处理器

在此示例中,sourceStream.to("sink-topic") 调用将 sourceStream 中的记录写入 sink-topic Kafka 主题。

汇处理器配置

汇处理器可以配置其他参数,例如键和值的序列化器、分区器和主题配置。

示例:配置汇处理器

在此示例中,Produced.with(Serdes.String(), Serdes.String()) 方法指定在写入 sink-topic 时键和值都应序列化为字符串。

拓扑中的源和汇处理器

源和汇处理器是 Kafka Streams 拓扑的组成部分。拓扑是一个有向无环图 (DAG),其中源处理器是入口点,汇处理器是出口点。

示例:具有源和汇处理器的简单拓扑

在此示例中,数据通过 sourceStream 从 source-topic 流动,通过将值转换为大写进行转换,然后通过 sink-topic 流出。

高级用例和配置

处理多个源主题

Kafka Streams 允许您在一个拓扑中从多个源主题进行消费。这对于需要处理来自各种来源数据的应用程序很有用。

示例:从多个源主题消费

在此示例中,来自 source-topic-1 和 source-topic-2 的数据合并为单个流并写入 sink-topic。

汇处理器的自定义分区

自定义分区器可用于控制记录如何在汇主题的分区之间分发。

示例:使用自定义分区器

在此示例中,使用 CustomPartitioner 来确定写入 sink-topic 的每条记录的分区。

容错和可伸缩性

容错性

Kafka Streams 应用程序设计为容错。源和汇处理器受益于 Kafka 的内在容错功能,例如数据复制和消费者组重新平衡。

可扩展性

Kafka Streams 应用程序通过运行应用程序的多个实例来水平扩展。源和汇处理器在分布式环境中无缝工作,确保数据在多个实例之间并行处理。

示例:端到端流处理

让我们考虑一个更全面的示例,该示例说明了源和汇处理器在端到端流处理应用程序中的使用。

场景:一个应用程序,它从源主题消费用户活动日志,过滤掉不相关的活动,转换相关数据,并将结果写入汇主题。

示例:端到端流处理

在此示例中,应用程序

  1. 从 user-activity-topic(源处理器)消费用户活动日志。
  2. 过滤掉不包含关键字“important”的活动。
  3. 通过添加“Processed:”前缀来转换剩余的活动。
  4. 将处理后的活动写入 processed-activity-topic(汇处理器)。

Kafka Streams 中的转换

Kafka Streams 是构建实时应用程序和微服务的强大库,其中输入数据被持续处理。Kafka Streams 中的转换是您可以在数据流经流处理拓扑时应用于数据的操作。这些转换允许您操作、过滤、聚合和连接数据流以获得有意义的见解。以下是 Kafka Streams 提供的各种转换操作的全面概述。

Kafka Advance Processing Streams

1. 转换概述

Kafka Streams 中的转换可分为无状态和有状态操作

  • 无状态转换:这些操作在处理单个记录之间不维护任何状态。
  • 有状态转换:这些操作维护状态以对多个记录执行计算,例如聚合或连接。

2. 无状态转换

a. Map 和 MapValues

  • map:将每条记录转换为新的键值对。
  • mapValues:转换每条记录的值,同时保持键不变。

示例:使用 map 和 mapValues

b. FlatMap 和 FlatMapValues

  • flatMap:将每条记录转换为零条或多条键值对。
  • flatMapValues:将每条记录的值转换为零个或多个值,同时保持键不变。

示例:使用 flatMap 和 flatMapValues

c. Filter 和 FilterNot

  • filter:保留满足给定谓词的记录。
  • filterNot:保留不满足给定谓词的记录。

示例:使用 filter 和 filterNot

d. Peek

  • peek:允许您为每条记录执行副作用操作,而无需修改流。

示例:使用 peek

3. 有状态转换

a. groupBy 和 groupByKey

  • groupBy:按新键对记录进行分组,创建 KGroupedStream。
  • groupByKey:按现有键对记录进行分组。

示例:使用 groupBy 和 groupByKey

b. Aggregate

  • aggregate:将记录聚合为新形式,例如计数、求和或规约。

示例:使用 aggregate

d. Count

  • count:计算每个组中的记录数。

示例:使用 count

e. Reduce

  • reduce:使用规约函数组合相同键的记录。

示例:使用 reduce

4. 连接流

Kafka Streams 提供了几种连接类型,用于组合来自两个流或表中的记录。

a. 流-流连接

根据公共键和时间窗口组合两个 KStreams 的记录。

示例:流-流连接

b. 流-表连接

根据公共键组合 KStream 和 KTable 的记录。

示例:流-表连接

c. 表-表连接

根据公共键组合两个 KTable 的记录。

示例:表-表连接

5. 窗口

窗口操作允许您将记录分组到固定大小的时间窗口中进行处理。

示例:滚动窗口

示例:跳跃窗口

6. GlobalKTable

GlobalKTable 是一种特殊的 KTable,它在 Kafka Streams 应用程序的所有实例中完全复制,允许进行全局查找。

示例:使用 GlobalKTable


下一主题Kafka-api