Python Kafka 教程

2025年03月17日 | 阅读 9 分钟

在以下教程中,我们将讨论 Apache Kafka 及其在 Python 编程语言中的使用。

了解 Apache Kafka

Apache Kafka 是一个开源流平台,最初由 LinkedIn 设计。后来,它于 2011 年移交给 Apache 基金会并开源。

根据维基百科的定义

Apache Kafka 是 Apache 软件基金会开发的开源平台,用于处理流。它用 Java 和 Scala 编写。该项目的目标是提供一个高吞吐量、统一、低延迟的平台,以处理实时数据流。Apache Kafka 的存储层本质上是一个“设计为分布式事务日志的大规模可扩展发布/订阅消息队列”,这使得它对于企业基础设施处理流数据非常有价值。此外,Kafka 通过 Kafka Connect 连接到外部系统(用于导入和导出数据),并提供 Kafka Streams,一个用于 Java 流处理的库。

HKafka Tutorial in Python

我们可以把它看作一个巨大的提交日志,我们可以在其中按事件发生的顺序存储数据。该日志的用户可以根据自己的需要访问和利用它。

Apache Kafka 的一些用例

我们可以在不同的地方使用 Apache Kafka。让我们考虑一些 Kafka 的用例,这些用例可以帮助我们弄清楚它的用法

  1. 活动监控:我们可以使用 Kafka 监控活动。活动可能属于物理传感器和设备或网站。生产者可以发布来自数据源的原始数据,这些数据稍后可用于查找趋势和模式。
  2. 消息传递:我们还可以将 Kafka 用作服务之间的消息代理。如果我们要实现微服务架构,我们可以将一个微服务作为生产者,另一个作为消费者。例如,我们有一个微服务负责创建新帐户并向用户发送与帐户创建相关的电子邮件。
  3. 日志聚合:我们还可以利用 Kafka 从不同的系统收集日志,并将它们存储在集中式系统中以进行进一步处理。
  4. ETL: Kafka 提供了几乎实时流的功能;因此,我们可以根据需求开发基于 ETL 的系统。
  5. 数据库:根据我们前面提到的内容,我们可以说 Kafka 也充当数据库。它不是一个典型的具有按需查询数据功能的数据库,但 Kafka 可以根据需要存储数据而无需消费它。

理解 Kafka 的概念

让我们讨论 Kafka 的核心概念。

Kafka Tutorial in Python
  1. 主题:输入到系统的每条消息都必须是某个主题的一部分。主题是记录流。消息以键值对的形式存储。每条消息都被分配一个序列,称为偏移量。一条消息的结果可能是另一条消息的输入,用于进一步处理。
  2. 生产者:生产者是负责将数据发布到 Kafka 系统的应用程序。它们将数据发布到它们选择的主题。
  3. 消费者: 有消费者应用程序使用发布到主题中的消息。消费者订阅其偏好的主题并消费数据。
  4. 代理: 代理是 Kafka 的一个实例,负责消息交换。我们可以将 Kafka 作为集群的一部分或作为独立机器使用。

现在,让我们考虑一个简单的例子,一家餐馆有一个仓库或库房,里面存放着所有原材料,例如蔬菜、大米、面粉等等。

这家餐馆提供各种菜肴,如印度菜、意大利菜、中国菜等等。每种菜肴的厨师都可以参考仓库选择所需的食材并制作菜肴。所有不同菜系厨师有可能使用由原材料制成的相同东西。这可以是每种菜肴中都使用的一种秘制配料。在以下情况下,仓库充当代理,商品的商人是生产者,厨师制作的商品和秘制配料是主题,厨师是消费者。

如何在 Python 中访问 Kafka?

Python 编程语言中有各种库可用于使用 Kafka。其中一些库如下所述

序号。描述
1Kafka-Python这是一个由 Python 社区设计的开源库。
2PyKafka该库由 Parsly 维护,并声称是一个 Pythonic API。但是,我们不能像 Kafka-Python 那样在该库中创建动态主题。
3Confluent Python Kafka该库由 Confluent 提供,作为 librdkafka 的一个薄包装器。因此,它的性能优于上述两个。

安装依赖项

我们将使用 Kafka-Python 进行此项目。因此,我们可以使用 pip 安装程序手动安装它,如下所示

语法

现在,让我们开始构建项目。

项目代码

在以下示例中,我们将创建一个生成器,生成从 1 到 500 的数字并将它们发送到 Kafka 代理。然后,消费者将从代理读取数据并将其保存在 MongoDB 集合中。

使用 Kafka 的好处之一是,如果消费者发生故障,另一个或修复的消费者将继续从上次中断的地方读取。这是一个很好的方法,可以确认所有数据都已输入到数据库中,而不会丢失数据或重复数据。

在下面的示例中,让我们创建一个名为 produce.py 的新 Python 程序文件,并开始导入一些必需的库和模块。

文件:produce.py

说明

在上面的代码片段中,我们导入了所需的库和模块。现在,让我们初始化一个新的 Kafka 生产者。请注意以下参数

  1. bootstrap_servers = ['localhost: 9092']: 此参数设置与生产者联系以引导初始集群元数据的主机和端口。在此处设置此参数并非强制性,因为默认主机和端口是 localhost: 9092。
  2. value_serializer = lambda x: dumps(x).encode('utf-8'): 此参数在将数据发送到代理之前对数据进行序列化。在这里,我们将数据转换为 JSON 文件并将其编码为 UTF-8。

让我们考虑相同的以下代码片段。

文件:produce.py

说明

在上面的代码片段中,我们使用 KafkaProducer() 函数初始化了 Kafka 生产者,其中我们使用了上面描述的参数。

现在,我们必须生成从 1 到 500 的数字。我们可以使用 for 循环来完成此操作,其中我们将每个数字用作字典中的值,只有一个键:num。此键仅用作数据键,而不作为主题键。在同一循环中,我们还将数据发送到代理。

我们可以通过调用生产者的 send 方法并详细说明主题和数据来执行此操作。

注意:值序列化器将自动转换和编码数据。

我们可以休息五秒钟以完成迭代。如果我们需要确认代理是否收到消息,建议包含回调。

文件:produce.py

说明

在上面的代码片段中,我们使用了 for 循环来迭代从 1 到 500 的数字。我们还在每次迭代之间添加了五秒的间隔。

如果有人想测试代码,建议创建一个新主题并将数据发送到该新生成的主题。这种方法将避免任何重复值的情况以及在我们将生产者和消费者一起测试时 testnum 主题中可能出现的混淆。

消费数据

在我们开始编写消费者代码之前,让我们创建一个新的 Python 程序文件并将其命名为 consume.py。我们将导入一些模块,例如 json.loads、MongoClientKafkaConsumer。由于 PyMongo 超出本教程的范围,我们不会深入研究其代码。

此外,还可以根据需要将 mongo 代码替换为任何其他代码。我们可以编写此代码以将数据输入到另一个数据库,处理数据的代码,或任何其他可以想到的代码。

让我们考虑下面的代码片段,首先。

文件:consume.py

说明

在上面的代码片段中,我们从各自的库中导入了所需的模块。

让我们创建 Kafka 消费者。我们将为此工作使用 KafkaConsumer() 函数;因此让我们仔细看看此函数中使用的参数。

  1. 主题: KafkaConsumer() 函数的第一个参数是主题。在下面的例子中,它是 testnum
  2. bootstrap_servers = ['localhost: 9092']: 此参数与生产者相同。
  3. auto_offset_reset = 'earliest': 此参数是其他重要参数之一。它处理消费者在关闭或发生故障后从何处重新开始读取,我们可以将其设置为最新或最早。每当我们将其设置为最早时,消费者将从最新提交的偏移量开始读取。每当我们将其设置为最新时,消费者将从日志的末尾开始读取。这正是我们此处需要的。
  4. enable_auto_commit = True: 此参数确认消费者是否每个间隔提交其读取偏移量。
  5. auto_commit_interval_ms = 1000ms: 此参数用于设置两次提交之间的间隔。由于消息每五秒钟到达一次,因此每秒提交一次似乎是公平的。
  6. group_id = 'counters': 此参数是消费者所属的消费者组。请注意,消费者必须是消费者组的一部分才能使其自动提交。
  7. deserializer 用于将数据反序列化为通用 JSON 格式,与值序列化器的工作相反。

让我们考虑相同的以下代码片段。

文件:consume.py

说明

在上面的代码片段中,我们使用了 KafkaConsumer() 函数来生成 Kafka 消费者。我们还在函数中添加了我们之前研究过的参数。

现在,让我们考虑下面的代码片段来连接到 MongoDB 数据库的 testnum 集合(此集合类似于关系数据库中的表)。

文件:consume.py

说明

在上面的代码片段中,我们将一个变量定义为 my_client,它使用指定了主机和端口的 MongoClient() 函数。然后,我们定义了另一个变量 my_collection,它使用 my_client 变量访问 testnum 主题中的数据。

可以通过循环遍历消费者来从中提取此数据(在这里,消费者可以被视为一个可迭代对象)。消费者将一直监听,直到代理不再响应。我们可以使用 value 属性访问消息值。在这里,我们用消息值覆盖消息。

下一行将数据插入到数据库集合中。最后一行将打印一条确认消息,表明该消息已添加到我们的集合中。

注意:可以为此循环中的所有操作插入回调。

文件:consume.py

说明

在上面的代码片段中,我们使用了 for 循环来迭代消费者以提取数据。现在,为了测试代码,可以首先执行 produce.py 文件,然后执行 consume.py