使用 Kafka 主题流和 Cosmos DB 进行数据丰富

2025 年 5 月 15 日 | 阅读 5 分钟

引言

数据丰富是指通过与额外数据集进行连接来增强原始流数据的过程,以提供更多上下文并使其更有价值。在实时数据管道中,可以使用 Apache Kafka Streams 在将数据持久化到目标数据库(如 Azure Cosmos DB)之前,对其进行处理、转换和丰富。

Kafka Streams + Cosmos DB 数据丰富架构概述

使用 Kafka Streams 和 Cosmos DB 的典型数据丰富管道由多个组件组成

架构组件

  1. Kafka Producer:将原始事件(例如,用户操作、日志、交易)流式传输到 Kafka 主题。
  2. Kafka Streams 应用程序
    • 从 Kafka 消费原始事件。
    • 通过与外部数据(来自 Cosmos DB)连接来丰富事件。
    • 将丰富后的数据发布到另一个 Kafka 主题。
  3. Kafka Consumer:读取丰富后的数据以进行进一步处理。
  4. Cosmos DB:存储丰富后的数据以供实时查询和分析。
  5. Kafka Connect Sink Connector:将丰富后的数据从 Kafka 流式传输到 Cosmos DB。

设置环境

在实施架构之前,请确保已安装并配置以下内容

  • Apache Kafka(带 Zookeeper)
  • Azure Cosmos DBNoSQL 数据库)
  • Kafka Connect 和 Cosmos DB Sink Connector
  • JavaKafka Streams API
  • 用于 Kafka Producer/Consumer 的 Spring Boot

实现 Kafka Producer(原始数据流式传输)

Kafka producer 会将原始事件(例如,用户交互)流式传输出去。

示例原始数据

Kafka Producer 的 Java 代码

输出

Data Enrichment with Kafka Topics Streams and Cosmos DB

实现 Kafka Streams(数据丰富)

Kafka Streams 从 Cosmos DB 获取外部元数据并丰富事件。

Cosmos DB 中的外部元数据

Cosmos DB 存储附加数据,例如,歌曲元数据

Kafka Streams Processor 的 Java 代码

输出

Data Enrichment with Kafka Topics Streams and Cosmos DB

实现 Kafka Consumer

消费丰富后的数据以进行进一步处理。

Kafka Consumer 的 Java 代码

输出

Data Enrichment with Kafka Topics Streams and Cosmos DB

将丰富后的数据流式传输到 Cosmos DB

使用 Kafka Connect 和 Cosmos DB Sink Connector

用于数据丰富的 Kafka Streams

Kafka Streams 是一个强大的流处理库,它允许我们通过将实时事件与静态或动态数据集连接来丰富流数据。

用例示例

想象一个音乐流媒体服务,其中原始的歌曲播放事件是实时生成的。然而,原始事件只包含歌曲 ID,我们需要在将它们存储到 Cosmos DB 之前,用歌曲元数据(例如艺术家姓名、流派和专辑)来丰富它们。

  • 原始事件(Kafka Topic: song-plays)
  • 丰富数据(Kafka Topic: song-metadata)
  • 丰富后的数据输出(Kafka Topic: enriched-song-plays)
Data Enrichment with Kafka Topics Streams and Cosmos DB

Kafka Streams 将实时连接 song-plays 主题song-metadata 主题,并生成一个丰富后的事件。

设置 Kafka Topics

我们需要三个主题

  • song-plays:存储原始歌曲播放事件。
  • song-metadata:存储歌曲元数据。
  • enriched-song-plays:存储丰富后的歌曲播放事件。

使用 Kafka CLI 创建这些主题

用于原始歌曲播放事件的 Kafka Producer

以下 Python producer 将原始歌曲播放事件发送到 song-plays 主题。

song_plays_producer.py

输出

Data Enrichment with Kafka Topics Streams and Cosmos DB

用于歌曲元数据的 Kafka Producer

此 producer 将歌曲元数据发送到 song-metadata 主题。

song_metadata_producer.py

输出

Data Enrichment with Kafka Topics Streams and Cosmos DB

实现用于数据丰富功能的 Kafka Streams

我们在 Java 中使用 Kafka Streams 连接 song-plays 主题和 song-metadata 主题。

KafkaStreamsEnrichment.java

用于将数据存储到 Cosmos DB 的 Kafka Consumer

我们使用 Python 消费丰富后的数据并将其存储到 Cosmos DB。

cosmosdb_consumer.py

输出

Data Enrichment with Kafka Topics Streams and Cosmos DB