Python 中的 InfluxDB

2024年8月29日 | 阅读 8 分钟

在搜索监控基础设施或第三方应用程序时,Telegraph 的内置插件对我们来说是一个很好的选择。或者,我们在搜索系统资源,如磁盘和网络利用率,或者 MySQL 数据库的性能。

如果我们正在创建一个应用程序,但又希望将用户数据存储在时间序列数据库中呢?也许我们可以将其视为一个物联网 (IoT) 或基于智能家居的应用程序,并且每个用户都需要访问例如他们的智能牙刷的读数。我们想存储每次刷牙的时间和时长,我们可以发送提醒来提醒孩子刷牙,并跟踪电池健康状况以及现有刷头的使用时长等信息。

收集自定义数据,无论是为了面向用户的应用程序还是为了 Telegraf 插件尚未涵盖的基础设施需求,都可能需要编写新的代码块。

让我们以智能牙刷为例,我们有一个运行嵌入式 Linux 的基站,并通过蓝牙与牙刷通信。我们已经写好了监听传入数据的代码块,并且似乎运行良好;现在,我们想将其导入 InfluxDB。

一种方法是与应用程序一起运行 Telegraf,并通过 UDP、Unix 或 TCP 套接字发送数据,让 Telegraf 处理到 InfluxDB 的连接以及数据点的批量处理和写入。

如果我们只需要数据收集,这种方法是完全可以的;但是,如果我们想查询并为用户获取这些数据,我们可能需要利用 InfluxDB 提供的不同语言的库之一,以便在应用程序本身内部处理与 InfluxDB 的交互。

市面上有多种语言已经拥有 InfluxDB 库,其中许多是由其社区管理的。我们将了解 influxdb-python 库的用法。

那么,让我们开始吧。

了解 InfluxDB Python 客户端库

InfluxDB 是一个开源的时间序列数据库或 TSDB,由 InfluxData 公司设计和开发。它采用 Go 编程语言编写,用于存储和检索时间序列数据,应用于运维监控、物联网传感器数据、应用程序指标和实时分析等领域。它还支持从 Graphite 导入数据。

Influxdb-python 库充当与 InfluxDB 交互的 Python 客户端。该库托管在 InfluxDB 的 GitHub 账户上,并由三名社区志愿者维护。

安装库

我们可以使用 pip 安装程序,这是在 Python 中安装库最简单的方法。安装 influxdb 库的语法如下所示

语法

安装完成后,我们可以通过创建一个新的 Python 程序文件并输入以下代码片段来验证它。

语法

现在,让我们保存文件并尝试执行它。如果没有引发错误,则表示库已正确安装。但是,如果出现任何异常,请尝试重新安装或查阅官方文档。

创建连接

下一步,我们将创建一个新的 InfluxDBClient 实例,使用有关我们必须访问的服务器的信息。我们可以使用以下代码片段,它将 host 和 port 的值替换为 InfluxDB 主机的相应 URL/IP 地址和端口。在以下情况下,我们将在默认端口上本地运行

示例

说明

在上面的代码片段中,我们从 influxdb 库中导入了 InfluxDBClient 模块。然后,我们使用 InfluxDBClient() 函数定义了名为 my_Client 的变量的 host 和 port,其中我们分别定义了 host 和 port 参数的值。

InfluxDBClient 构造函数还提供了一些其他参数,包括用户名和密码、要连接的数据库、是否使用 SSL、超时时间和 UDP 参数。

如果我们需要连接到远程主机 somedomain.com 上的 8086 端口,并使用用户名(例如 anonymous)和密码(例如 somepass),并借助 SSL,我们可以使用以下代码片段,通过两个额外参数 ssl = Truessl_verify = True 来启用 SSL 和 SSL 验证:

示例

说明

在上面的代码片段中,我们从 influxdb 库中导入了 InfluxDBClient 模块。然后,我们使用该模块定义了 host、port、username、password、sslverify_ssl,并将值存储在 my_Client 变量中。

现在,让我们创建一个名为 mydatabase 的新数据库,以便按如下方式存储数据:

示例

说明

在上面的代码片段中,我们使用 my_Clientcreate_database 创建了一个名为 mydatabase 的新数据库。

我们可以使用 my_Clientget_list_database() 函数来检查数据库是否已创建,如下所示:

示例

输出

[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'mydatabase'}]

说明

在上面的代码片段中,我们使用 get_list_database() 函数来验证数据库是否已创建。结果是,我们可以看到名为 mydatabase 的数据库,以及安装时存在的 telegraf_internal 数据库。

最后,我们可以设置客户端以使用此数据库,如下面的代码片段所示:

示例

说明

在上面的代码片段中,我们使用 switch_database 将客户端设置为使用指定的数据库,即 mydatabase

插入数据

现在我们已经有了一个可以写入数据的数据库,并且客户端已正确配置,是时候添加一些数据了。我们将利用客户端的 write_points() 方法来实现这一点。此方法接受一个点列表以及其他一些参数,包括 “batch size”,它使我们能够批量插入数据而不是一次全部插入。我们可以使用它来插入大量数据。

write_points() 方法有一个称为 my_points 的参数,它是一个字典列表,包含需要写入数据库的点。现在让我们创建一些示例数据并插入它。首先,让我们将三个 JSON 格式的点插入到一个名为 json_body 的变量中,如下面的代码片段所示:

示例

说明

上面的代码片段显示了智能牙刷的 “刷牙事件”;每个事件都发生在早上八点左右,并带有使用牙刷的人的用户名和牙刷的 ID(这有助于我们跟踪同一刷头的使用时长),并且有一个字段包含用户使用牙刷的时长,单位为秒。

由于我们已经设置了数据库,并且 write_points() 的默认输入是 JSON,我们可以仅以 json_body 变量作为参数来调用该方法,如下所示:

示例

输出

True

说明

在上面的代码片段中,我们使用 write_points() 并以 json_body 变量作为参数。结果是,我们收到了一个布尔值 true 作为响应,如果写入操作成功,则函数返回 true。如果我们创建一个应用程序,我们需要自动进行数据收集,在用户尝试与牙刷交互时将点插入数据库。

查询数据

一旦数据进入数据库,我们就可以尝试使用一些查询来检索它。我们将使用与写入数据相同的客户端对象,不同的是这次我们将执行 InfluxDB 查询并通过客户端的 query() 函数获取结果。

示例

说明

在上面的代码片段中,我们使用了 query() 函数,它返回一个 ResultSet 对象,其中包含所有输出数据以及一些方便的方法。我们的查询请求 mydatabase 数据库中的所有度量,并按用户分组。我们可以使用名为 .raw 的参数来访问来自 InfluxDB 的原始 JSON 响应。

示例

输出

{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Derek'}, 'columns': ['time', 'duration'], 'values': [['2021-08-04T08:01:00Z', 147], ['2021-08-05T08:04:00Z', 131], ['2018-08-06T08:02:00Z', 124]]}]}

说明

在上面的代码片段中,我们使用 raw 参数来访问来自 InfluxDB 的原始 JSON 响应。在大多数情况下,我们不需要直接访问 JSON。相反,我们可以使用 ResultSetget_points() 方法来获取请求的度量,按标签或字段进行过滤。如果我们想遍历 Derek 的所有刷牙会话;我们可以获取所有标记为 “user” 且值为 “Derek” 的点,使用以下命令:

示例

说明

在上面的示例中,my_points 变量是一个 Python Generator,它是一个工作类似于 Iterator 的函数;我们可以使用 for x in y 循环来迭代它,如下所示:

示例

输出

Time: 2021-08-04T08:01:00Z, Duration: 147
Time: 2021-08-05T08:04:00Z, Duration: 131
Time: 2021-08-06T08:02:00Z, Duration: 124

说明

在上面的代码片段中,我们使用 for 循环打印了用户的每次刷牙时间和时长。根据应用程序的不同,我们可能会迭代这些点来计算用户的平均刷牙时间,或者只是验证每天是否有 X 次刷牙事件。


下一个主题Python Kafka 教程