Python Kafka 教程2025年03月17日 | 阅读 9 分钟 在以下教程中,我们将讨论 Apache Kafka 及其在 Python 编程语言中的使用。 了解 Apache KafkaApache Kafka 是一个开源流平台,最初由 LinkedIn 设计。后来,它于 2011 年移交给 Apache 基金会并开源。 根据维基百科的定义 Apache Kafka 是 Apache 软件基金会开发的开源平台,用于处理流。它用 Java 和 Scala 编写。该项目的目标是提供一个高吞吐量、统一、低延迟的平台,以处理实时数据流。Apache Kafka 的存储层本质上是一个“设计为分布式事务日志的大规模可扩展发布/订阅消息队列”,这使得它对于企业基础设施处理流数据非常有价值。此外,Kafka 通过 Kafka Connect 连接到外部系统(用于导入和导出数据),并提供 Kafka Streams,一个用于 Java 流处理的库。 ![]() 我们可以把它看作一个巨大的提交日志,我们可以在其中按事件发生的顺序存储数据。该日志的用户可以根据自己的需要访问和利用它。 Apache Kafka 的一些用例我们可以在不同的地方使用 Apache Kafka。让我们考虑一些 Kafka 的用例,这些用例可以帮助我们弄清楚它的用法
理解 Kafka 的概念让我们讨论 Kafka 的核心概念。 ![]()
现在,让我们考虑一个简单的例子,一家餐馆有一个仓库或库房,里面存放着所有原材料,例如蔬菜、大米、面粉等等。 这家餐馆提供各种菜肴,如印度菜、意大利菜、中国菜等等。每种菜肴的厨师都可以参考仓库选择所需的食材并制作菜肴。所有不同菜系厨师有可能使用由原材料制成的相同东西。这可以是每种菜肴中都使用的一种秘制配料。在以下情况下,仓库充当代理,商品的商人是生产者,厨师制作的商品和秘制配料是主题,厨师是消费者。 如何在 Python 中访问 Kafka?Python 编程语言中有各种库可用于使用 Kafka。其中一些库如下所述
安装依赖项我们将使用 Kafka-Python 进行此项目。因此,我们可以使用 pip 安装程序手动安装它,如下所示 语法 现在,让我们开始构建项目。 项目代码在以下示例中,我们将创建一个生成器,生成从 1 到 500 的数字并将它们发送到 Kafka 代理。然后,消费者将从代理读取数据并将其保存在 MongoDB 集合中。 使用 Kafka 的好处之一是,如果消费者发生故障,另一个或修复的消费者将继续从上次中断的地方读取。这是一个很好的方法,可以确认所有数据都已输入到数据库中,而不会丢失数据或重复数据。 在下面的示例中,让我们创建一个名为 produce.py 的新 Python 程序文件,并开始导入一些必需的库和模块。 文件:produce.py 说明 在上面的代码片段中,我们导入了所需的库和模块。现在,让我们初始化一个新的 Kafka 生产者。请注意以下参数
让我们考虑相同的以下代码片段。 文件:produce.py 说明 在上面的代码片段中,我们使用 KafkaProducer() 函数初始化了 Kafka 生产者,其中我们使用了上面描述的参数。 现在,我们必须生成从 1 到 500 的数字。我们可以使用 for 循环来完成此操作,其中我们将每个数字用作字典中的值,只有一个键:num。此键仅用作数据键,而不作为主题键。在同一循环中,我们还将数据发送到代理。 我们可以通过调用生产者的 send 方法并详细说明主题和数据来执行此操作。 注意:值序列化器将自动转换和编码数据。我们可以休息五秒钟以完成迭代。如果我们需要确认代理是否收到消息,建议包含回调。 文件:produce.py 说明 在上面的代码片段中,我们使用了 for 循环来迭代从 1 到 500 的数字。我们还在每次迭代之间添加了五秒的间隔。 如果有人想测试代码,建议创建一个新主题并将数据发送到该新生成的主题。这种方法将避免任何重复值的情况以及在我们将生产者和消费者一起测试时 testnum 主题中可能出现的混淆。 消费数据在我们开始编写消费者代码之前,让我们创建一个新的 Python 程序文件并将其命名为 consume.py。我们将导入一些模块,例如 json.loads、MongoClient 和 KafkaConsumer。由于 PyMongo 超出本教程的范围,我们不会深入研究其代码。 此外,还可以根据需要将 mongo 代码替换为任何其他代码。我们可以编写此代码以将数据输入到另一个数据库,处理数据的代码,或任何其他可以想到的代码。 让我们考虑下面的代码片段,首先。 文件:consume.py 说明 在上面的代码片段中,我们从各自的库中导入了所需的模块。 让我们创建 Kafka 消费者。我们将为此工作使用 KafkaConsumer() 函数;因此让我们仔细看看此函数中使用的参数。
让我们考虑相同的以下代码片段。 文件:consume.py 说明 在上面的代码片段中,我们使用了 KafkaConsumer() 函数来生成 Kafka 消费者。我们还在函数中添加了我们之前研究过的参数。 现在,让我们考虑下面的代码片段来连接到 MongoDB 数据库的 testnum 集合(此集合类似于关系数据库中的表)。 文件:consume.py 说明 在上面的代码片段中,我们将一个变量定义为 my_client,它使用指定了主机和端口的 MongoClient() 函数。然后,我们定义了另一个变量 my_collection,它使用 my_client 变量访问 testnum 主题中的数据。 可以通过循环遍历消费者来从中提取此数据(在这里,消费者可以被视为一个可迭代对象)。消费者将一直监听,直到代理不再响应。我们可以使用 value 属性访问消息值。在这里,我们用消息值覆盖消息。 下一行将数据插入到数据库集合中。最后一行将打印一条确认消息,表明该消息已添加到我们的集合中。 注意:可以为此循环中的所有操作插入回调。文件:consume.py 说明 在上面的代码片段中,我们使用了 for 循环来迭代消费者以提取数据。现在,为了测试代码,可以首先执行 produce.py 文件,然后执行 consume.py。 下一主题Python 中的增广赋值表达式 |
在本教程中,我们将学习 Python 中的协程。我们将详细讨论协程、子例程、协程的执行和协程的关闭。在我们开始学习协程之前,我们必须对 Python 中的子例程有基本的了解。所以,...
7 分钟阅读
? 集合和列表是 Python 中的数据结构,由数据元素组组成。但是,这两种数据结构也存在一些显著差异。具体来说,列表执行少量数学运算,这些运算对于搜索特定元素很有用 -...
阅读 4 分钟
Python 是一种流行的编程语言,用于数据分析、Web 开发和机器学习等各种任务。其受欢迎的原因之一是可用于扩展其功能的库数量众多。这些库,也称为模块,是预先编写的代码,...
阅读 4 分钟
?自然语言处理 (NLP) 是计算机科学的一个领域,与人工智能、信息工程和人机交互相关。该领域的重点是计算机可以编程来处理和分析大量自然语言数据。这并不容易做到,因为...
阅读 3 分钟
TIFF 文件格式用于存储光栅化图像。一个名为 GDAL 地理空间数据抽象库的库专门用于读取这些光栅文件,以及其他文件格式,例如矢量格式。gdal 库是……的一部分
阅读 2 分钟
scipy.stats.lognorm() 描述了对数正态连续随机变量。它是继承自通用方法的 rv_continuous 类的一个实例。它通过添加特定于此分布的详细信息来完善这些方法。给出对数正态分布的概率密度函数由下式给出:概率密度函数...
阅读 3 分钟
Firebase 是 python 提供的一个库,用于使用 Firebase 提供的各种服务,因此为了更好地理解 Firebase 库,我们需要首先了解 Firebase 及其提供的不同服务...
阅读 22 分钟
如今,万物皆有移动应用。从健康到教育,没有一个领域没有应用。脑海中一闪而过的想法,一经查证,往往已经有了相应的应用。问题是,有大量的...
阅读 6 分钟
根据用户与应用的交互方式对用户进行分类的任务被称为应用用户细分。它有助于定位留存用户,确定营销活动的目标客户群体,并解决涉及搜索的各种其他业务难题...
阅读 15 分钟
简介:本教程讨论了如何使用 Python 将 MultiDict 转换为嵌套字典。MultiDict 是一个类似字典的对象,它包含相同键的多个值,使其成为处理表单和查询字符串的合适数据结构。它是 Python 的子类...
阅读 4 分钟
我们请求您订阅我们的新闻通讯以获取最新更新。
我们提供所有技术(如 Java 教程、Android、Java 框架)的教程和面试问题
G-13, 2nd Floor, Sec-3, Noida, UP, 201301, India