Apache Airflow DAG8 Jun 2025 | 13 分钟阅读 引言Apache Airflow 是一个强大的开源平台,用于协调工作流。Airflow 的核心功能是 DAG(有向无环图),它定义了任务的执行顺序及其依赖关系。本指南将深入探讨 DAG 是什么,它的结构,它在 Airflow 中的工作原理,以及创建 DAG 的一些最佳实践。 DAG 是 Apache Airflow 的支柱,它允许用户轻松地编排复杂的工作流;监控 DAG 的健康状况和参数化工作流可以帮助确保您的 Airflow DAG 高效、可扩展且可靠。 什么是 DAG?DAG,即有向无环图,是一组具有方向关系的节点(任务)的集合。在 Apache Airflow 的上下文中,DAG 定义了一个工作流。 有向:节点(任务)之间的边指向一个方向,表示执行顺序。 无环:图中没有循环,这意味着任务之间没有循环依赖。 在 Airflow 中,DAG 将任务组织成一个反映依赖关系和执行顺序的序列。 Airflow 中 DAG 的结构Apache Airflow 中的 DAG 在 Python 中定义。这种灵活性允许您动态创建复杂的工作流。DAG 的结构通常包括: DAG 对象:封装任务及其关系的 मुख्य 实体。 任务:工作流中的单个步骤,执行某个操作(例如,运行脚本、传输数据)。 运算符 (Operators):定义任务做什么,例如 Bash Operator、Python Operator 等。 任务依赖关系:任务之间的关系,定义它们的执行顺序。 示例 DAG 定义 在此示例中
DAG 的关键组成部分DAG 对象定义了工作流的元数据,包括计划、开始日期和各种可选参数。一些关键参数包括:
任务每个任务都是一个运算符的实例,定义了在工作流的特定步骤中应该完成的工作。任务是 DAG 的构建块。 一些熟悉的运算符包括:
任务依赖项 DAG 中的任务按依赖关系排列。您可以使用以下方式定义依赖关系:
依赖关系允许您编排复杂的工作流,确保任务按正确的顺序运行。 DAG 调度Airflow DAG 根据 **`schedule_interval`** 参数进行调度。这定义了 DAG 的触发频率(例如,每日、每小时等)。Airflow 使用类 cron 语法进行调度,但您也可以使用预定义的选项,如 `@daily`、`@hourly` 和 `@weekly`。 例如
DAG 的关键参数和选项 定义 DAG 时,有几个基本参数和选项需要考虑:
使用 default_args 的示例 任务重试和错误处理Airflow 允许您配置失败任务的重试次数以及如何处理错误。
示例 在这种情况下,如果任务失败,它将重试 3 次,重试之间间隔 10 分钟。 监控 DAGAirflow 提供了一个强大的 **UI** 来监控 DAG 和任务的状态。该 UI 允许您:
除了 UI,Airflow 的指标和日志记录功能还允许对任务和 DAG 执行进行详细跟踪,这些可以与外部系统(例如,Elasticsearch、Prometheus)集成。 ![]() Apache Airflow 中的有向无环图 (DAG),其中任务标记为 a、b、c 和 d。每个任务都用蓝色矩形表示,并使用 EmptyOperator,这意味着这些任务是占位符,不执行任何特定操作,但有助于定义 DAG 结构。 这是任务及其依赖关系的细分: 任务 a:这是 DAG 的起点。它首先完成,然后允许任何其他任务继续。 任务 b 和 c:这两个任务都依赖于任务 A 的成功完成。这意味着任务 A 必须先完成,然后任务 B 和 c 才能开始。这两个任务 b 和 c 之间没有依赖关系,因此在任务 a 完成后并行运行。 任务 d:此任务依赖于任务 B 和任务 C 的成功完成。一旦 b 和 c 都完成,任务 D 将执行。 此 DAG 结构代表了一个扇入、扇出模式。 任务 A 启动工作流。任务 B 和任务 C 在任务 A 之后并行执行。任务 D 在任务 B 和任务 C 都完成后才运行。 ![]() Apache Airflow 中的 DAG,具有分支工作流。分支允许 DAG 根据条件或业务逻辑决定采取哪条路径,创建动态工作流,其中仅执行特定的任务,具体取决于所选的分支。 任务和工作流的细分 任务:run _this _first 这是 DAG 的起点。 它使用 Empty Operator,这意味着它不执行任何操作,而是作为工作流的启动占位符。 任务:branching 此任务代表一个分支点。
Branch _a 和 branch _b
任务:follow _branch _a
这是一个简化的代码示例,用于表示此 DAG 结构:
![]() Apache Airflow 中名为 latest_only_with_trigger 的 DAG 的 DAG 图视图。此 DAG 似乎使用 Latest OnlyOperator 来确保只有最新的 DAG 运行执行特定的下游函数,而较旧的回填运行则会跳过某些任务。 Latest _ only Task
下游任务(task1、task2、task3、task4) 任务依赖关系:latest_only 任务是 task1 的上游,task1 进而触发 task2 和 task3。任务完成后,它会触发任务。
执行状态 task1 和 task3:这些任务被标记为已跳过。这意味着它们没有运行,因为它们是 latest_only 任务的下游,而当前 DAG 运行不是最新的。 task2 和 task4:这些任务显示成功状态,意味着它们已成功执行。这可能表明 LatestOnlyOperator 中的逻辑允许它们执行,因为此 DAG 运行实际上是当时最新的计划运行。 DAG 状态和执行时间线左侧显示了 DAG 运行的历史记录,其中各种运行显示了不同的任务执行状态(绿色表示成功,红色表示失败,粉色表示跳过)。 ![]() 这两张图代表了 Apache Airflow 中的有向无环图 (DAG),使用 `EmptyOperator` 作为占位符任务,演示了分支和合并工作流。 以下是 DAG 中每个步骤及其关系的细分: Run _this _first
分支
Branch _a
Follow _Branch _a
连接
工作流的关键点
![]() DAG 中的步骤 开始 (start)
some-other-task
End
打包有向无环图 (DAG) 在 Apache Airflow 中打包有向无环图 (DAG) 对于高效地组织和部署工作流至关重要。通过正确构建和打包 DAG,您可以确保您的 Airflow 工作流是可扩展、可管理和可重用的。 以下是有关有效打包 DAG 的详细指南。 Apache Airflow 中的 DAG 代表了一个工作流,其中节点代表任务,有向边代表任务之间的依赖关系。每个 DAG 根据计划运行,或者可以手动触发,并且可以包含涉及并行处理、分支和条件任务执行的复杂工作流。
DAG 的文件夹结构要打包 DAG,您应该以清晰、一致的结构组织文件。一个组织良好的 DAG 文件夹结构可能如下所示: 关键目录和文件DAG 文件:这是定义 DAG 的主 .py 文件。它包含 DAG 定义、任务运算符和调度逻辑。 Utils:包含 DAG 中使用的任何辅助函数或自定义类。 SQL:将 SQL 查询存储为 .sql 文件,将复杂的查询与主 DAG 文件分开。 Config:用于可能需要在不同环境之间更改的参数的配置文件,例如文件路径或 API 密钥。 Plugins:如果您使用自定义运算符、Hook 或其他插件,可以将它们放在此处。Airflow 将自动从该文件夹加载插件。 使用自定义运算符和插件Airflow 的内置运算符涵盖了许多用例,但您可能需要创建自定义运算符来处理特定任务。将自定义运算符打包到单独的文件或作为插件中可以提高模块化和可重用性。 要创建自定义运算符: 定义运算符:创建一个新的 Python 文件,例如 custom_operators.py。 在 DAG 中引用运算符:在主 DAG 文件中导入您的自定义运算符。 提示:如果您的自定义运算符是通用的,请考虑将它们作为插件放在 plugins/ 文件夹中,以便它们可以在多个 DAG 中重用。 依赖管理许多 DAG 需要外部库,这些库应作为部署的一部分进行打包。为了管理依赖关系: 使用 Requirements 文件:创建一个 requirements.txt 文件,列出您的 DAG 使用的所有库。 Makefile 在 Airflow 环境中安装依赖项:确保所有 DAG 依赖项都已安装在 Airflow 环境中。部署时,您可以使用 Docker 镜像或在部署脚本中指定依赖项。 Python 虚拟环境:如果您在本地运行 Airflow,请考虑使用虚拟环境来隔离不同项目的依赖项。 外部化配置 API 密钥、特定于环境的 URL 或文件路径等配置详细信息不应硬编码在 DAG 中。 相反: 使用环境变量:Airflow 可以访问环境变量,这使得在不同环境(例如,开发、暂存、生产)之间更改配置设置更加容易。 使用配置文件:将配置参数存储在 YAML 或 JSON 文件中,并通过辅助函数在 DAG 中读取它们。 加载 YAML 配置文件示例 测试和验证在部署之前测试 DAG 有助于及早发现错误。测试技术包括: 单元测试:使用 Python 的单元测试或 pytest 模块来测试单个函数或运算符。 DAG 验证:通过运行以下命令确保 DAG 文件在语法上是正确的: Airflow dags test my_dag 2024-11-10 任务级别测试:隔离并测试 DAG 中的单个任务,尤其是在涉及复杂逻辑或数据处理时。 打包以进行部署当您的 DAG 准备就绪时,请打包以进行部署: 压缩 DAG 目录:如果部署到远程环境,请归档整个 DAG 目录。 使用 tar 或 zip 复制到 Airflow DAG 文件夹:对于本地部署,将文件直接复制到 Airflow 的 DAG 目录,通常是 **AIRFLOW_HOME/dags/。** 使用 CI/CD 管道:在生产环境中,使用 CI/CD 管道自动进行打包和部署,这些管道还可以处理依赖项安装和环境配置。 DAG 打包最佳实践
部署和验证 DAG将打包的 DAG 部署到 Airflow 环境后: 刷新 Airflow UI:Airflow 会自动检测新的 DAG 文件。确认您的 DAG 出现在 UI 中。 触发测试运行:执行测试运行以确保 DAG 按预期运行。 监控 DAG:使用 Airflow 的监控工具来跟踪任务执行,识别瓶颈,并检查任何错误。 |
我们请求您订阅我们的新闻通讯以获取最新更新。