Apache Airflow DAG

8 Jun 2025 | 13 分钟阅读

引言

Apache Airflow 是一个强大的开源平台,用于协调工作流。Airflow 的核心功能是 DAG(有向无环图),它定义了任务的执行顺序及其依赖关系。本指南将深入探讨 DAG 是什么,它的结构,它在 Airflow 中的工作原理,以及创建 DAG 的一些最佳实践。

DAG 是 Apache Airflow 的支柱,它允许用户轻松地编排复杂的工作流;监控 DAG 的健康状况和参数化工作流可以帮助确保您的 Airflow DAG 高效、可扩展且可靠。

什么是 DAG?

DAG,即有向无环图,是一组具有方向关系的节点(任务)的集合。在 Apache Airflow 的上下文中,DAG 定义了一个工作流。

有向:节点(任务)之间的边指向一个方向,表示执行顺序。

无环:图中没有循环,这意味着任务之间没有循环依赖。

在 Airflow 中,DAG 将任务组织成一个反映依赖关系和执行顺序的序列。

Airflow 中 DAG 的结构

Apache Airflow 中的 DAG 在 Python 中定义。这种灵活性允许您动态创建复杂的工作流。DAG 的结构通常包括:

DAG 对象:封装任务及其关系的 मुख्य 实体。

任务:工作流中的单个步骤,执行某个操作(例如,运行脚本、传输数据)。

运算符 (Operators):定义任务做什么,例如 Bash Operator、Python Operator 等。

任务依赖关系:任务之间的关系,定义它们的执行顺序。

示例 DAG 定义

在此示例中

  • 定义了一个名为 `example_dag` 的 DAG。
  • 使用 `Bash Operator` 创建了三个任务,它允许您执行 bash 命令。
  • 使用 `>>` 运算符建立了任务依赖关系,确保 `task_1` 在 `task_2` 之前运行,`task_2` 在 `task_3` 之前运行。

DAG 的关键组成部分

DAG 对象定义了工作流的元数据,包括计划、开始日期和各种可选参数。一些关键参数包括:

  • Dag _id:DAG 的唯一标识符。
  • Description:对 DAG 目的的简短描述。
  • Schedule _interval:定义 DAG 应运行的频率。它可以是 cron 表达式或预定义计划(例如,`@daily`)。
  • Start _date:DAG 开始运行的日期。
  • Catchup:一个布尔值,指示是否运行错过的 DAG 运行(回填)。

任务

每个任务都是一个运算符的实例,定义了在工作流的特定步骤中应该完成的工作。任务是 DAG 的构建块。

一些熟悉的运算符包括:

  • Bash Operator:执行 bash 命令。
  • Python Operator:运行 Python 代码。
  • Email Operator:发送电子邮件。
  • Postgres Operator:对 Postgres 数据库执行 SQL 命令。

任务依赖项

DAG 中的任务按依赖关系排列。您可以使用以下方式定义依赖关系:

  • ``>>``: 任务 A 必须在任务 B 之前运行。
  • ``<<``: 任务 B 必须在任务 A 之后运行。

依赖关系允许您编排复杂的工作流,确保任务按正确的顺序运行。

DAG 调度

Airflow DAG 根据 **`schedule_interval`** 参数进行调度。这定义了 DAG 的触发频率(例如,每日、每小时等)。Airflow 使用类 cron 语法进行调度,但您也可以使用预定义的选项,如 `@daily`、`@hourly` 和 `@weekly`。

例如

  • Schedule _interval=daily: 每天午夜执行 DAG。
  • Schedule _interval=0 12: 每天中午执行 DAG。

DAG 的关键参数和选项

定义 DAG 时,有几个基本参数和选项需要考虑:

  • Dagrun_timeout:定义 DAG 运行在超时之前可以花费的最长时间。
  • Default_args:应用于 DAG 中所有任务的默认参数字典。
  • Catchup:默认情况下,Airflow 会“追溯”所有可能错过的过去 DAG 运行。您可以将 `catchup=False` 来关闭此功能。

使用 default_args 的示例

任务重试和错误处理

Airflow 允许您配置失败任务的重试次数以及如何处理错误。

  • Retries:您可以使用 `retries` 参数指定任务在标记为失败之前应重试多少次。
  • Retry Delay:重试之间等待的时间,使用 `retry_delay` 定义。

示例

在这种情况下,如果任务失败,它将重试 3 次,重试之间间隔 10 分钟。

监控 DAG

Airflow 提供了一个强大的 **UI** 来监控 DAG 和任务的状态。该 UI 允许您:

  • 查看 DAG 运行和任务状态。
  • 重新运行失败的任务。
  • 查看日志以调试任何问题。

除了 UI,Airflow 的指标和日志记录功能还允许对任务和 DAG 执行进行详细跟踪,这些可以与外部系统(例如,Elasticsearch、Prometheus)集成。

Apache Airflow DAG

Apache Airflow 中的有向无环图 (DAG),其中任务标记为 a、b、c 和 d。每个任务都用蓝色矩形表示,并使用 EmptyOperator,这意味着这些任务是占位符,不执行任何特定操作,但有助于定义 DAG 结构。

这是任务及其依赖关系的细分:

任务 a:这是 DAG 的起点。它首先完成,然后允许任何其他任务继续。

任务 b 和 c:这两个任务都依赖于任务 A 的成功完成。这意味着任务 A 必须先完成,然后任务 B 和 c 才能开始。这两个任务 b 和 c 之间没有依赖关系,因此在任务 a 完成后并行运行。

任务 d:此任务依赖于任务 B 和任务 C 的成功完成。一旦 b 和 c 都完成,任务 D 将执行。

此 DAG 结构代表了一个扇入、扇出模式。

任务 A 启动工作流。任务 B 和任务 C 在任务 A 之后并行执行。任务 D 在任务 B 和任务 C 都完成后才运行。

Apache Airflow DAG

Apache Airflow 中的 DAG,具有分支工作流。分支允许 DAG 根据条件或业务逻辑决定采取哪条路径,创建动态工作流,其中仅执行特定的任务,具体取决于所选的分支。

任务和工作流的细分

任务:run _this _first

这是 DAG 的起点。

它使用 Empty Operator,这意味着它不执行任何操作,而是作为工作流的启动占位符。

任务:branching

此任务代表一个分支点。

  • 在 **run _this _first** 完成后,分支任务将确定工作流应遵循的路径。
  • 此决定通常基于 DAG 定义中的逻辑(例如,使用 **Branch Python Operator** 来确定遵循哪个分支)。
  • 根据此分支决策的结果,工作流可以转到 **branch _a 或 branch _b。**

Branch _a 和 branch _b

  • Branch _a 和 branch _b 是工作流在分支后可能遵循的两个替代任务。
  • 将只遵循其中一个路径,具体取决于分支条件。
  • 如果条件导致 **branch _a,** 则将执行 **branch _a** 并继续执行 **follow _branch _a。**
  • 如果条件导致 **branch _b,** 则只执行 **branch _b。**

任务:follow _branch _a

  • 仅当工作流遵循 branch _a 路径时,此任务才会运行。
  • 如果分支决策指示 DAG 执行 branch _a,那么在 branch _a 完成后,将执行 follow _branch _a。
  • 如果选择了 branch _b,则将跳过此任务。

这是一个简化的代码示例,用于表示此 DAG 结构:

  • Run _this _first:DAG 以此占位符任务开始。
  • Branching:此任务使用 Branch Python Operator 来做出分支决策。
  • Branch _decision function:一个包含选择分支逻辑的 Python 函数。它返回 'branch_a' 或 'branch_b',这决定了工作流将采取的路径。
  • Branch _a、follow_branch_a 和 branch_b:这些是代表每个路径的任务。只有一个分支(branch_a 路径或 branch_b)将被执行。
  • Join 最终任务,充当合并点。无论采用哪个分支,该任务都将在 follow_branch_a 或 branch_b 完成后执行。
Apache Airflow DAG

Apache Airflow 中名为 latest_only_with_trigger 的 DAG 的 DAG 图视图。此 DAG 似乎使用 Latest OnlyOperator 来确保只有最新的 DAG 运行执行特定的下游函数,而较旧的回填运行则会跳过某些任务。

Latest _ only Task

  • latest _only 任务使用 Latest OnlyOperator,该运算符旨在确保只有最新的计划 DAG 运行执行下游函数。
  • 如果当前 DAG 运行是最新的计划运行,则下游任务将按常规执行。如果不是最新的运行(例如,回填运行),则将跳过下游函数。

下游任务(task1、task2、task3、task4)

任务依赖关系:latest_only 任务是 task1 的上游,task1 进而触发 task2 和 task3。任务完成后,它会触发任务。

  • 下游任务以一种类似于带有分支的链条的方式连接。

执行状态

task1 和 task3:这些任务被标记为已跳过。这意味着它们没有运行,因为它们是 latest_only 任务的下游,而当前 DAG 运行不是最新的。

task2 和 task4:这些任务显示成功状态,意味着它们已成功执行。这可能表明 LatestOnlyOperator 中的逻辑允许它们执行,因为此 DAG 运行实际上是当时最新的计划运行。

DAG 状态和执行时间线

左侧显示了 DAG 运行的历史记录,其中各种运行显示了不同的任务执行状态(绿色表示成功,红色表示失败,粉色表示跳过)。

Apache Airflow DAG

这两张图代表了 Apache Airflow 中的有向无环图 (DAG),使用 `EmptyOperator` 作为占位符任务,演示了分支和合并工作流。

以下是 DAG 中每个步骤及其关系的细分:

Run _this _first

  • 这是初始任务,最先执行。
  • 它直接依赖于 `branching` 任务。

分支

  • 此任务遵循 **`run _this _first`**。
  • 它充当工作流中的分支点,在那里它将确定 DAG 应遵循的路径。
  • 此分支条件的决定将决定是执行 **`branch_a`** 还是跳过。

Branch _a

  • 此任务遵循 `branching` 任务。
  • 如果满足 `branching` 中设置的条件,将执行 **`branch_a`**。
  • 否则,它将被标记为“已跳过”,DAG 将直接进行到 `join` 任务。

Follow _Branch _a

  • 此任务依赖于 **`branch_a`** 的成功完成。
  • 如果 **`branch_a`** 被跳过,则 **`follow_branch_a`** 不会执行。

连接

  • 此任务充当工作流中的汇合点。
  • 它会等待直到先前的分支决策得到解决。
  • 在 **`branch_a`** 被跳过的情况下,它将无视跳过状态并完成工作流。

工作流的关键点

  • 分支逻辑:`branching` 中的分支条件决定了是否执行 `branch_a` 及其依赖的 `follow_branch_a`。
  • Join 任务:`join` 任务在分支结束时运行,无论执行哪个分支或被跳过。
Apache Airflow DAG

DAG 中的步骤

开始 (start)

  • 这是 DAG 的初始任务,作为起点。
  • 完成后,它会并行触发多个任务。
  • section-1-tasks (section-1-task-1 到 section-1-task-5)
  • 这些是 Section 1 中按顺序连接的五个任务。
  • 每个任务(从 section-1-task-1 到 section-1-task-5)都依赖于前一个任务的完成,形成一个线性序列。
  • Section 1 中的任务都流入到其他一些任务中。
  • section-2-tasks (section-2-task-1 到 section-2-task-5)
  • 这些是 Section 2 中按顺序连接的五个任务,与 Section 1 并行。
  • 每个任务(从 section-2-task-1 到 section-2-task-5)都遵循类似的顺序模式。
  • Section 2 的任务也汇入到其他一些任务中。

some-other-task

  • 此任务充当 DAG 中的检查点或同步点。
  • Section 1 和 Section 2 的任务都汇入到这里。
  • 在 Section 1 和 Section 2 中的所有前置任务都完成后,Some-other-task 才会执行。

End

  • 这是 DAG 中的最后一个任务,标志着工作流的结束。
  • 它依赖于其他一些任务的完成。

打包有向无环图 (DAG)

在 Apache Airflow 中打包有向无环图 (DAG) 对于高效地组织和部署工作流至关重要。通过正确构建和打包 DAG,您可以确保您的 Airflow 工作流是可扩展、可管理和可重用的。

以下是有关有效打包 DAG 的详细指南。

Apache Airflow 中的 DAG 代表了一个工作流,其中节点代表任务,有向边代表任务之间的依赖关系。每个 DAG 根据计划运行,或者可以手动触发,并且可以包含涉及并行处理、分支和条件任务执行的复杂工作流。

  • Airflow 从 Python 文件读取这些 DAG,因此打包 DAG 涉及组织这些文件及其依赖关系,以便能够在不同环境之间轻松部署和管理。

DAG 的文件夹结构

要打包 DAG,您应该以清晰、一致的结构组织文件。一个组织良好的 DAG 文件夹结构可能如下所示:

关键目录和文件

DAG 文件:这是定义 DAG 的主 .py 文件。它包含 DAG 定义、任务运算符和调度逻辑。

Utils:包含 DAG 中使用的任何辅助函数或自定义类。

SQL:将 SQL 查询存储为 .sql 文件,将复杂的查询与主 DAG 文件分开。

Config:用于可能需要在不同环境之间更改的参数的配置文件,例如文件路径或 API 密钥。

Plugins:如果您使用自定义运算符、Hook 或其他插件,可以将它们放在此处。Airflow 将自动从该文件夹加载插件。

使用自定义运算符和插件

Airflow 的内置运算符涵盖了许多用例,但您可能需要创建自定义运算符来处理特定任务。将自定义运算符打包到单独的文件或作为插件中可以提高模块化和可重用性。

要创建自定义运算符:

定义运算符:创建一个新的 Python 文件,例如 custom_operators.py。

在 DAG 中引用运算符:在主 DAG 文件中导入您的自定义运算符。

提示:如果您的自定义运算符是通用的,请考虑将它们作为插件放在 plugins/ 文件夹中,以便它们可以在多个 DAG 中重用。

依赖管理

许多 DAG 需要外部库,这些库应作为部署的一部分进行打包。为了管理依赖关系:

使用 Requirements 文件:创建一个 requirements.txt 文件,列出您的 DAG 使用的所有库。

Makefile

在 Airflow 环境中安装依赖项:确保所有 DAG 依赖项都已安装在 Airflow 环境中。部署时,您可以使用 Docker 镜像或在部署脚本中指定依赖项。

Python 虚拟环境:如果您在本地运行 Airflow,请考虑使用虚拟环境来隔离不同项目的依赖项。

外部化配置

API 密钥、特定于环境的 URL 或文件路径等配置详细信息不应硬编码在 DAG 中。

相反:

使用环境变量:Airflow 可以访问环境变量,这使得在不同环境(例如,开发、暂存、生产)之间更改配置设置更加容易。

使用配置文件:将配置参数存储在 YAML 或 JSON 文件中,并通过辅助函数在 DAG 中读取它们。

加载 YAML 配置文件示例

测试和验证

在部署之前测试 DAG 有助于及早发现错误。测试技术包括:

单元测试:使用 Python 的单元测试或 pytest 模块来测试单个函数或运算符。

DAG 验证:通过运行以下命令确保 DAG 文件在语法上是正确的:

Airflow dags test my_dag 2024-11-10

任务级别测试:隔离并测试 DAG 中的单个任务,尤其是在涉及复杂逻辑或数据处理时。

打包以进行部署

当您的 DAG 准备就绪时,请打包以进行部署:

压缩 DAG 目录:如果部署到远程环境,请归档整个 DAG 目录。

使用 tar 或 zip

复制到 Airflow DAG 文件夹:对于本地部署,将文件直接复制到 Airflow 的 DAG 目录,通常是 **AIRFLOW_HOME/dags/。**

使用 CI/CD 管道:在生产环境中,使用 CI/CD 管道自动进行打包和部署,这些管道还可以处理依赖项安装和环境配置。

DAG 打包最佳实践

  • 使用版本控制:将 DAG 和配置保存在版本控制(例如 Git)中,以便跟踪更改并在必要时回滚。
  • 模块化代码:将复杂逻辑分解为小型、可重用的组件,并避免跨 DAG 复制代码。
  • 保持 DAG 轻量级:仅将必要的文件夹包含在 DAG 包中。避免可能减慢 Airflow 调度器速度的大型文件或数据。
  • 使用日志记录:在任务中实现日志记录,以帮助进行调试和监控。

部署和验证 DAG

将打包的 DAG 部署到 Airflow 环境后:

刷新 Airflow UI:Airflow 会自动检测新的 DAG 文件。确认您的 DAG 出现在 UI 中。

触发测试运行:执行测试运行以确保 DAG 按预期运行。

监控 DAG:使用 Airflow 的监控工具来跟踪任务执行,识别瓶颈,并检查任何错误。