Apache Airflow DAG 运行

2025年6月8日 | 阅读11分钟

引言

在数据编排领域,Apache Airflow 已成为管理复杂工作流的领先工具。Airflow 的核心是有向无环图 (DAG),它通过一系列相互连接的任务来表示工作流。DAG 的每次执行都称为“DAG 运行”。

什么是 DAG 运行?

DAG 运行是 DAG 在特定时间点执行的实例。每个 DAG 运行对应一个执行日期,该日期用作跟踪历史数据处理作业和 DAG 中每个任务状态的参考。这个概念允许用户清晰地查看工作流执行,从而促进调试和性能监控。

DAG 运行的特点

执行日期

  • 每个 DAG 运行都与特定的执行日期相关联。此日期表示运行的调度时间,并有助于管理数据依赖关系。
  • 执行日期对于涉及回填的场景至关重要,在这些场景中,用户可能需要补上错过的运行。

任务实例

  • DAG 运行由多个任务实例组成,每个任务实例表示 DAG 中定义的特定任务。
  • 每个任务实例都可以有其状态(例如,成功、失败),并有助于 DAG 运行的整体状态。

国家

  • DAG 运行可以有多种状态,包括“运行中”、“成功”、“失败”、“跳过”和“上游失败”。
  • 这些状态提供了对工作流执行和结果的洞察,对于监控至关重要。

回填

  • 回填允许用户调度过去执行日期的 DAG 运行,这对于确保数据完整性特别有用。
  • 例如,如果由于系统中断导致 DAG 几天未执行,回填允许用户为每个错过的日期运行 DAG。

为了说明这个概念,让我们看一个处理数据的 DAG 的简单示例

在此示例中,`data_processing_dag` DAG 定义了一个处理数据的单个任务。每次 DAG 运行时,都会创建一个新的 DAG 运行,并关联一个执行日期。

DAG 运行状态

DAG 运行的状态是监控和管理 Apache Airflow 中工作流的关键方面。了解这些状态有助于诊断问题并确保工作流按预期运行。

常见的 DAG 运行状态

运行中 (Running)

  • 表示 DAG 运行当前正在执行。运行中的任务正在积极处理。
  • 此状态是动态的,会随着任务的完成或失败而改变。

成功

  • 这表示 DAG 运行中的所有任务都已成功完成。
  • 当 DAG 运行达到此状态时,意味着预期的工作流已成功执行,没有出现问题。

Failed

  • 当 DAG 运行中的一个或多个任务未能按预期执行时,会分配此状态。
  • 失败可能由于各种原因而发生,例如代码错误、数据问题或资源不可用。

跳过

  • 如果其上游依赖项未成功完成,则任务可能会被跳过。
  • 如果任务定义中的条件逻辑导致任务被绕过,也会发生这种情况。

上游失败

  • 表示 DAG 运行受到一个或多个上游任务失败的影响。
  • 如果任务由于先决任务失败而无法继续,则会将其标记为上游失败。

排队

  • 当 DAG 运行在队列中等待资源或执行槽时,它被标记为排队。
  • 这通常发生在多个 DAG 并发运行的环境中。

重试

  • 如果任务失败但已配置为重试,则状态将指示它正在重试任务。
  • 重试次数和尝试之间的延迟可以在任务定义中配置。

监控 DAG 运行状态

Airflow UI:Web 界面允许用户通过用户友好的仪表板可视化 DAG 运行及其状态。用户可以深入到特定任务以调查任何问题或查看日志。

日志:每个任务的详细日志提供执行详细信息的洞察,有助于识别故障和性能瓶颈。日志可以通过 Airflow UI 或直接从文件系统访问。

警报和通知:为失败的 DAG 运行设置警报可以帮助团队快速响应问题并减少数据处理中的延迟。Airflow 支持与各种通知系统集成,例如电子邮件和 Slack。

这是一个如何以编程方式检查 DAG 运行状态的示例

重新运行 DAG

重新运行 DAG 是数据编排中的常见任务,特别是在发生故障或需要更新数据时。Airflow 提供了重新运行 DAG 的灵活性,允许用户指定重新执行的范围和条件。

重新运行 DAG 的场景

任务失败

  • 选择性重新运行:当任务失败时,Airflow 允许用户仅重新运行失败的任务,而不是整个 DAG,从而节省资源并减少停机时间。
  • 示例用例:如果数据提取任务由于临时系统问题而失败,则仅重新运行该任务可以快速恢复工作流,而无需不必要的重新处理。

数据更正

  • 确保数据完整性:在数据更正的情况下,重新运行受影响的 DAG 可确保下游任务反映最新的、更正的数据。这对于维护工作流之间的一致性至关重要。
  • 示例用例:当上游源中的数据差异得到纠正时,可以重新执行所有下游任务,以确保它们包含修订后的数据。

调度回填

  • 回填功能:Airflow 的回填功能允许用户重新处理过去的 DAG 运行,这对于补上错过的任务或填补历史数据中的空白至关重要。
  • 示例用例:如果由于系统停机而错过了 DAG 运行,回填可确保为这些日期完成所有预期的数据处理。

如何重新运行 DAG

在 Airflow 中重新运行 DAG 可以通过多种方法实现

手动触发:用户可以从 Airflow UI 手动触发 DAG 运行。这种方法简单明了,并提供了执行工作流的可视化方式。

命令行界面 (CLI):您可以使用 Airflow CLI 触发和重新运行 DAG。

这是重新运行 DAG 的命令

此命令会为指定的执行日期触发 `data_processing_dag`。用户可以根据需要自定义执行日期。

API:您还可以使用 REST API 触发 DAG 运行。下面是一个使用 `requests` 库触发 DAG 的简单 Python 脚本

重新运行 DAG 时的注意事项

幂等性

确保任务设计为幂等的,这意味着多次运行它们不会导致意外的副作用。这对于向数据库或文件系统写入数据的任务尤其重要。

资源管理

有效的资源管理对于使用 Apache Airflow 至关重要,尤其是在计算资源可能有限的共享环境中。重新运行多个 DAG 时,必须注意任务的调度和执行方式,因为同时运行可能导致资源争用,从而导致性能下降或任务失败。

DAG 依赖项

了解 DAG 中的依赖项对于有效的工作流管理至关重要。DAG 中的每个任务都可以具有上游和下游依赖项,这意味着一个任务的执行状态可以直接影响其他任务。

  • 重新运行 DAG 时,必须考虑这将如何影响相关任务;例如,如果上游任务失败或被跳过,下游任务可能也需要重新评估。
  • 如果不妥善管理,这种连锁效应可能导致意想不到的结果。仔细规划任务依赖项和在任务执行中使用条件逻辑可以帮助降低这些风险。
  • 此外,在 Airflow UI 中可视化 DAG 依赖项可以提供清晰度,并有助于排查与任务执行顺序相关的问题。

数据一致性

维护数据一致性是一个关键问题,尤其是在重新运行修改或转换数据的任务时。在数据完整性至关重要的场景中,可能需要实施保障措施,以确保以前的状态不会因重新执行而受到不利影响。

  • 这可能涉及在重新运行任务之前清理或将数据重置为已知良好状态,以防止重复条目或损坏的数据集。
  • 此外,实施幂等任务——那些可以安全地多次执行而不会产生意外副作用的任务——可以显著增强工作流的健壮性。
  • 建议在将数据更改应用于生产环境之前进行彻底的测试和验证,确保任何重新执行都符合组织的整体数据治理策略。

假设我们有一个 DAG,它从源系统提取数据,对其进行转换,然后将其加载到数据库中。如果提取任务失败,我们可能只想重新运行该任务。以下是我们如何实现此目的的方法

识别失败的任务:检查 Airflow UI 或日志以确定哪个任务失败。

重新运行任务:使用 Airflow UI 或 CLI 触发特定任务的重新运行。

重新运行失败任务的 CLI 命令示例

此命令清除指定执行日期的 `extract_data_task` 的状态,允许它重新运行。

外部触发器

Airflow 中的外部触发器允许用户根据源自 Airflow 环境之外的事件或条件启动 DAG 运行。此功能增强了数据工作流的灵活性和响应能力。

外部触发器的用例

事件驱动的工作流

  • 动态触发:外部触发器允许 DAG 响应实时事件运行,从而创建高度响应的工作流系统。
  • 示例用例:如果新文件到达云存储,可以触发 Airflow 立即处理该文件,从而减少数据工作流中的延迟。

与其他系统的集成

  • 无缝集成:外部触发器使 Airflow 更容易与各种工具和服务集成,根据这些系统中的特定事件或条件启动工作流。
  • 示例用例:业务应用程序可能会在用户完成操作时发送 webhook 以启动 DAG,使工作流对用户行为做出反应。

API 驱动的执行

  • 编程控制:Airflow 的 API 使外部应用程序能够以编程方式触发 DAG,为高级自动化和自定义集成提供了灵活性。
  • 示例用例:自动化工具可以根据 Airflow 内置调度程序之外的复杂调度或依赖项启动 DAG 运行,从而实现对工作流更精细的控制。

如何设置外部触发器

定义触发条件:确定将触发 DAG 运行的特定事件或条件。这可能包括文件上传、API 调用或计划事件。

在此示例中,Flask 应用程序侦听对 `/trigger_dag` 端点的 POST 请求,根据传入的请求数据触发指定的 DAG。

管理 DAG 运行的最佳实践

调度和依赖项

  • 正确调度:精心设计的调度可确保 DAG 以最佳间隔运行,而不会使系统过载。使用 cron 语法或 Airflow 支持的其他调度间隔选择与您的数据更新频率或业务需求相匹配的调度。

资源管理

  • 监控资源使用情况:密切关注 CPU 和内存使用情况对于防止系统过载至关重要,尤其是在运行多个 DAG 或高频任务时。定期监控允许根据需要扩展资源。

任务超时和重试

配置超时:根据历史数据设置任务超时,考虑每个任务的合理时间。这有助于避免不必要的长时间任务执行,这可能会影响整体工作流时间线。

日志记录和监控

详细日志记录:全面的日志记录对于诊断问题至关重要。启用健壮的日志记录以捕获执行详细信息和错误,从而简化故障排除。

监控和仪表板:Airflow 的内置监控和仪表板功能以及指标提供了对 DAG 性能的洞察。这些指标可以帮助查明瓶颈并识别优化机会。

测试和验证

生产前测试:在部署前测试 DAG 至关重要。这包括验证任务配置、依赖项和预期行为。在暂存环境中测试 DAG 对于大型工作流尤其有用。

  • 边缘情况验证:应验证数据处理逻辑和边缘情况处理,以确保可靠运行。这可能涉及对数据完整性、准确性和非典型数据模式处理的测试。

文档

  • 详细文档:维护文档对于团队之间的清晰度和知识共享至关重要。记录任务描述、依赖项、配置和目的等关键信息。
  • 故障排除指南:包含常见问题和故障排除步骤的说明可以在将来节省时间,特别是对于重复出现的问题或容易出现特定错误的工作流。

版本控制

  • 使用 Git 或类似工具:版本控制对于跟踪更改和在需要时回滚到以前的版本至关重要。这确保了工作流保持稳定,并使理解最近更改的影响变得更容易。
  • 语义版本控制:采用语义版本控制来系统地管理 DAG 版本,这在大型团队或频繁更新的环境中特别有用。

安全最佳实践

  • 访问控制:为 Airflow 实施安全访问控制,根据角色或需要限制访问。将管理权限限制为受信任的用户,并尽可能使用加密连接来保护敏感数据。
  • 定期审计:进行定期权限审计并审查访问日志,以维护与组织策略的合规性。这对于满足法规要求尤其重要。

性能优化

  • 优化数据移动:通过尽可能靠近源处理数据来减少数据移动。传输大型数据集时,考虑使用 Airflow 的远程日志记录或分布式执行功能来卸载处理。
  • 明智使用 XCom:虽然 XComs 有助于在任务之间共享数据,但过度使用它们会产生不必要的开销。对于具有高数据共享需求的工作流,考虑替代方案,例如将数据存储在临时文件或数据库中。

社区和支持

  • 参与社区:Airflow 社区提供宝贵的见解和支持。参与论坛和讨论可以帮助您学习最佳实践、解决问题并及时了解新功能或错误修复。
  • 回馈:考虑通过报告问题、分享知识甚至提交代码改进来为开源项目做出贡献。这有助于改善生态系统并促进协作开发环境。