使用 Kafka 主题流和 Cosmos DB 进行数据丰富2025 年 5 月 15 日 | 阅读 5 分钟 引言数据丰富是指通过与额外数据集进行连接来增强原始流数据的过程,以提供更多上下文并使其更有价值。在实时数据管道中,可以使用 Apache Kafka Streams 在将数据持久化到目标数据库(如 Azure Cosmos DB)之前,对其进行处理、转换和丰富。 Kafka Streams + Cosmos DB 数据丰富架构概述使用 Kafka Streams 和 Cosmos DB 的典型数据丰富管道由多个组件组成 架构组件
设置环境在实施架构之前,请确保已安装并配置以下内容
实现 Kafka Producer(原始数据流式传输)Kafka producer 会将原始事件(例如,用户交互)流式传输出去。 示例原始数据 Kafka Producer 的 Java 代码 输出 ![]() 实现 Kafka Streams(数据丰富)Kafka Streams 从 Cosmos DB 获取外部元数据并丰富事件。 Cosmos DB 中的外部元数据 Cosmos DB 存储附加数据,例如,歌曲元数据 Kafka Streams Processor 的 Java 代码 输出 ![]() 实现 Kafka Consumer消费丰富后的数据以进行进一步处理。 Kafka Consumer 的 Java 代码 输出 ![]() 将丰富后的数据流式传输到 Cosmos DB使用 Kafka Connect 和 Cosmos DB Sink Connector 用于数据丰富的 Kafka StreamsKafka Streams 是一个强大的流处理库,它允许我们通过将实时事件与静态或动态数据集连接来丰富流数据。 用例示例想象一个音乐流媒体服务,其中原始的歌曲播放事件是实时生成的。然而,原始事件只包含歌曲 ID,我们需要在将它们存储到 Cosmos DB 之前,用歌曲元数据(例如艺术家姓名、流派和专辑)来丰富它们。
![]() Kafka Streams 将实时连接 song-plays 主题和song-metadata 主题,并生成一个丰富后的事件。 设置 Kafka Topics我们需要三个主题
使用 Kafka CLI 创建这些主题 用于原始歌曲播放事件的 Kafka Producer以下 Python producer 将原始歌曲播放事件发送到 song-plays 主题。 song_plays_producer.py 输出 ![]() 用于歌曲元数据的 Kafka Producer此 producer 将歌曲元数据发送到 song-metadata 主题。 song_metadata_producer.py 输出 ![]() 实现用于数据丰富功能的 Kafka Streams我们在 Java 中使用 Kafka Streams 连接 song-plays 主题和 song-metadata 主题。 KafkaStreamsEnrichment.java 用于将数据存储到 Cosmos DB 的 Kafka Consumer 我们使用 Python 消费丰富后的数据并将其存储到 Cosmos DB。 cosmosdb_consumer.py 输出 ![]() 下一个主题动态主题分区策略 |
我们请求您订阅我们的新闻通讯以获取最新更新。