Apache Airflow in Python | Airflow Python Operator

2024 年 8 月 29 日 | 阅读 6 分钟

在本教程中,我们将学习Apache Airflow及其算子。我们将讨论所有Airflow的算子,但我们的主要目标是探索Python算子以及如何使用它。在深入研究这个主题之前,让我们先了解Airflow的基本概念以及它为什么如此受欢迎。

什么是数据管道?

数据管道包含多个必须执行的任务或操作才能获得所需结果。例如 - 我们想创建一个天气API,用于预测未来一周的天气。我们需要执行以下任务来实现这个实时天气仪表板。

  • 从天气API获取天气预报数据。
  • 对获取的数据进行一些转换(例如,将温度从华氏度转换为摄氏度,反之亦然),以便数据适合我们的目的。
  • 将转换后的数据推送到天气仪表板。

正如我们所见,这些是管道中的一系列任务。此外,这些任务需要按特定顺序执行。

什么是Apache Airflow?

Apache Airflow是一个在数据工程领域广受欢迎的工具。它是一个工作流引擎,可以轻松地调度和运行复杂的数据管道。它确保数据管道中的每个任务都按顺序执行,并且每个任务都获得所需的资源。它提供了出色的用户界面,并监控和修复问题。

Airflow使用DAG(有向无环图),它是用户希望运行的所有任务的集合。这些任务的组织方式可以维护其关系和依赖关系。DAG的结构(任务及其依赖关系)以Python脚本代码的形式指定。

数据管道的DAG是使任务关系更加清晰的最佳方式。节点代表任务,有向边代表任务之间的依赖关系。

例如,如果任务X通过边连接并指向任务Y,则任务A必须在任务B开始之前完成。这种方向使其成为有向图。

安装Airflow

  • 要安装Airflow,我们将使用以下pip命令。
  • Airflow安装完成后,使用以下命令初始化元数据数据库(Airflow所有内容存储的数据库)来启动它。
  • 现在,启动apache airflow调度器

在Airflow目录中创建dags文件夹很重要,我们将在这里定义我们的DAG。现在打开Web浏览器并访问 https://:8080/admin/ ,它看起来会像下面一样。

Apache Airflow中的Python Operator

Apache Airflow中有多种算子,例如BashOperator,PythonOperator,EmailOperator,MySqlOperator等。算子指定一个工作流任务,并且算子为不同的任务提供了许多算子。

在本节中,我们将讨论Python算子。

导入库

定义DAG参数

我们需要为每个DAG传递一个参数字典。以下是我们可以传递的参数的描述。

  • owner -工作流所有者的姓名。它应该是字母数字且可以包含下划线,但不能包含空格。
  • depends_on_past -每次运行工作流时,我们需要将过去运行标记为true;否则,将其标记为False。
  • start_date -它指定了工作流的start_date
  • email -它代表我们的电子邮件ID,以便在任何任务因任何原因失败时收到电子邮件。
  • retry_delay - 它表示任务失败的时间以及它应该等待多长时间进行重试。

让我们理解以下示例 -

示例 -

定义Python函数

现在,我们将定义一个Python函数,该函数将使用参数打印给定的字符串,Python算子稍后将使用此函数。

定义DAG

下一步是创建DAG对象并传递dag_id。dag_id是DAG的名称,必须是唯一的。然后传递我们之前定义的参数,并添加描述和schedule_interval。它将在指定的时间间隔后运行DAG。让我们看下面的例子。

定义任务

在工作流中,我们只定义了一个任务 -

  • print - 在任务中,我们将使用python函数打印终端的字符串值。

我们将task_id传递给Python Operator对象。我们将在DAG的Graph视图节点上看到名称。在python_callable参数中,传递我们想要执行的函数名称,并将它的参数值"op_kwargs"作为字典传递,最后,将我们想要链接此任务的DAG对象传递过去。

运行DAG

现在,刷新Airflow仪表板;它将在列表中显示DAG。工作流中的每个步骤都将是一个单独的框;单击DAG并等待其边框变成深绿色,表示它已成功完成。

单击“print”节点以获取有关此步骤的更多详细信息,然后单击“Logs”,您将看到如下输出。

Apache Airflow中的变量是什么?

正如我们所讨论的,Airflow可用于创建和管理复杂的工作流。我们可以同时运行多个工作流。工作流可以使用数据库或相同的文件路径。现在,我们更改用户保存文件或更改数据库配置的目录路径。在这种情况下,我们不想去单独更新每个DAG。

使用Airflow,我们可以创建变量,可以在其中存储和检索运行时在多个DAG中的数据。如果我们想进行更改,我们可以编辑变量,我们的工作流就可以正常运行了。

如何创建变量?

要创建变量,我们打开Airflow并单击顶部菜单中的“Admin”,然后单击“Variables”。

单击“Create”按钮创建一个新变量,将打开一个窗口。现在添加值并提交。

现在,我们将创建一个DAG,在该DAG中我们将计算此文件中文本数据的字数。我们可以导入新创建的变量。让我们来理解下面的例子。

示例 -

现在我们将定义一个将使用该变量、读取它并计算单词数的函数。

现在,步骤与上面相同,我们需要定义DAG和任务,我们的工作流就可以运行了。

当我们运行DAG时,它将显示单词数。我们还可以随时编辑DAG,我们所有的DAG都会得到更新。

结论

在本教程中,我们讨论了Apache Airflow中的Python Operator以及普通变量和分支。我们已经理解了Apache Airflow的基本概念及其安装。


下一个主题Python中的Currying