Apache Airflow 任务生命周期

2025 年 6 月 10 日 | 14 分钟阅读

引言

Apache Airflow 是一个强大的开源平台,旨在管理复杂的工作流、调度任务和监控它们的执行。在 Airflow 中,任务是基础组件,使用户能够设计由独立、自包含的工作单元组成的工作流。这种模块化方法在数据工程、ETL 和机器学习管道中至关重要,这些工作流通常由许多相互依赖的步骤组成。

什么是 Airflow 任务?

Airflow 中,任务是最小的执行单元,代表工作流中的一个步骤。每个任务执行一个特定的作业,例如运行脚本、移动数据或发送通知。多个任务可以在 DAG 中以顺序或条件顺序链接在一起,从而创建可以自动化复杂流程并高效处理依赖关系的工作流。

Airflow 任务高度灵活,允许进行广泛的操作,包括:

  • 运行 Python 脚本或 shell 命令
  • 在数据库和云存储之间移动数据
  • 触发通知和警报
  • 在工作流中前进之前检查条件

DAG 中的每个任务都是独立执行的,这使得工作流可以分解为可管理的步骤,并且更容易监控、重试和调试过程的特定部分。

Airflow 任务生命周期和状态

Airflow 中的任务实例

在 Airflow 中,任务实例表示特定 DAG(有向无环图)中任务在特定运行中的特定执行。每个任务实例跟踪该任务在 DAG 工作流中进展时的状态和情况。一个任务实例可以在不同的 DAG 运行中甚至在重试中多次运行,并且每次运行都有其独特的生命周期和独特的历史状态。

Apache Airflow Tasks Lifecycle

任务实例生命周期状态分解

此生命周期图显示了任务实例如何在不同的 Airflow 组件(包括 调度器、执行器工作器)管理下在各种状态之间转换:

  • 无: 任务实例创建但尚未安排执行时的初始状态。这是默认状态,不表示 Airflow 系统有任何主动操作。
  • 已调度: 当任务准备好执行时,它进入“已调度”状态。此状态表示调度器已根据其调度间隔和依赖关系为任务分配了执行。当资源可用时,任务将移动到“已排队”状态。
  • 已排队: 一旦调度器将其添加到队列中并且执行器准备将其分配给工作器,任务就会移动到“已排队”状态。此状态表示任务正在等待资源分配,例如可用的工作器插槽。
  • 上游失败: 如果任务依赖于其他上游任务且这些任务失败,则任务可以进入“上游失败”状态。此状态表示任务将不会执行,因为所需的上游任务未能成功完成。
  • 已移除: 当任务从 DAG 中移除时(无论是由于 DAG 定义的更改还是因为它在工作流中不再需要),将分配此状态。这是一种终止状态,意味着任务将不会执行。
  • 待重新调度: 对于需要重新调度的任务,使用此状态。此状态通常适用于传感器或长时间运行的任务,这些任务等待满足外部条件才能继续。
  • 运行中: 当工作器从队列中取出任务并开始执行时,任务进入“运行中”状态。在此阶段,任务正在积极执行其分配的工作,例如运行代码或移动数据。
  • 成功: 如果任务没有错误地完成其工作,它将进入“成功”状态。
  • 失败: 如果任务遇到错误并未能完成,它将进入“失败”状态。此失败可能是由于运行时错误、未满足的条件或系统问题。根据配置,如果允许重试,任务可能会移动到“待重试”状态。
  • 待重试: 如果任务失败但已配置重试,它将进入“待重试”状态。任务将等待指定的时间间隔,然后重新进入“已调度”状态以进行另一次尝试。如果所有重试都耗尽而未成功,任务将保持“失败”状态。
  • 重新启动中: 此状态表示任务在手动或自动触发后正在重新启动。重新启动后,任务通常会重新进入“已调度”状态。
  • 已关闭: 当任务被手动停止或系统在完成前终止它时,它将进入“已关闭”状态。这可能是由于用户干预、系统错误或阻止任务执行的外部因素造成的。

生命周期图的其他说明

  • 颜色编码
    • 不同的 Airflow 组件参与管理任务状态
      • 调度器: 处理初始调度和依赖关系检查。
      • 执行器: 管理队列并将任务分配给工作器。
      • 工作器: 执行任务并使其在“运行中”、“成功”或“失败”状态之间转换。
  • 流路径
    • 箭头表示状态转换,显示任务在执行期间可以采取的路径。
    • 例如,任务可以从“运行中”变为“失败”,如果启用了重试,它将进入“待重试”状态,然后最终达到“成功”或“失败”状态。
  • 终止状态
    • “成功”、“失败”、“已移除”和“已关闭”等状态是终止状态,这意味着任务实例的生命周期在这些状态下结束。

关系术语

上游和下游任务

  • 上游任务: 必须在另一个任务开始之前完成的任务。换句话说,上游任务是其下游任务的先决条件。
  • 下游任务: 依赖于上游任务完成的任务。它不能在其上游任务成功完成后才开始。
  • 示例:如果任务 A 必须在任务 B 开始之前完成,则任务 A 是任务 B 的上游,任务 B 是任务 A 的下游。

父任务和子任务

  • 父任务: 上游任务的另一个术语。父任务必须在子任务可以继续之前完成。
  • 子任务: 下游任务的另一个术语。它依赖于其父任务的完成才能开始。

依赖关系

  • 依赖: 一种关系,其中一个任务在进行之前依赖于另一个任务的输出或完成。依赖关系控制 DAG 中任务的执行顺序。
  • 在 Airflow 中,通常使用 >>(右移)或 <<(左移)运算符建立依赖关系。

触发规则

  • 触发规则: 一个规则,根据其上游函数的完成状态来确定何时应触发下游任务。默认情况下,Airflow 使用 all_success 触发规则,这意味着所有上游任务都必须成功,下游任务才能开始。
  • 其他触发规则包括
  • all_success: 如果所有上游任务都成功,则运行。
  • all_failed: 如果所有上游任务都失败,则运行。
  • one_success: 如果至少一个上游任务成功,则运行。
  • one_failed: 如果至少一个上游任务失败,则运行。
  • none_failed: 如果没有上游任务失败,则运行。
  • none_skipped: 如果没有上游任务被跳过,则运行。
  • Always: 无论上游任务状态如何,都运行。

分支

  • 分支: 一种根据任务结果或特定条件在 DAG 中创建条件路径的方法。分支允许 DAG 遵循不同的执行路径,仅执行所选路径上的下游任务。
  • BranchPythonOperator 通常用于在 Airflow 中实现分支逻辑。

任务组

  • 任务组: 一种在 DAG 中对相关任务进行分组的方法,以可视化地组织复杂的工作流。任务组允许更轻松地管理依赖关系,并通过将任务分组到集群中来使 DAG 更具可读性。
  • 任务组不改变依赖关系,但为 DAG 提供了更透明的结构。

# 这将 task1 和 task2 分组到 'data_processing' 下

XCom(交叉通信)

  • XCom: Airflow 中用于在任务之间共享数据的机制。XCom(“交叉通信”的缩写)允许任务向共享数据存储推送和拉取少量数据,从而实现函数之间的信息共享。
  • XCom 通常用于在 DAG 中的任务之间传递参数或中间结果。

一对多和多对一关系

  • 一对多: 单个任务具有多个下游任务。这允许一个任务在完成后触发多个依赖函数。
  • 多对一: 多个上游任务都汇聚到一个下游任务。此下游任务等待所有上游任务完成才能开始。

在 Apache Airflow 中,超时用于控制任务或 DAG 在自动终止之前允许运行的时间。这对于管理系统资源、防止任务无限期挂起以及确保工作流在合理时间内完成非常有用。Airflow 提供了多种类型的超时配置,每种配置根据您希望如何处理任务或 DAG 执行时间限制而服务于不同的目的。

Airflow 中的超时类型

任务超时

  • 任务级超时为单个任务的执行设置最长时间限制。
  • 如果任务超出指定时间,它将被终止并标记为失败。
  • 您可以使用任务运算符中的 execution_timeout 参数配置任务超时。
  • 这对于可能挂起或执行时间不可预测的任务特别有用,例如依赖外部系统(例如 API 调用或数据库查询)的任务。
  • 在此示例中,如果 timeout_task 运行时间超过 30 秒,它将被终止并标记为失败。

DAG 超时

  • DAG 级超时为整个 DAG 运行设置最大运行时限制。
  • 这可确保 DAG 中的所有任务在特定时间范围内完成。如果超出时间限制,任何正在运行的任务都将被停止,并且 DAG 运行将被标记为失败。
  • DAG 超时使用 DAG 定义中的 dagrun_timeout 参数设置。
  • 在此示例中,如果任务 1 和任务 2 的总执行时间超过 5 分钟,则 DAG 运行将被终止,并且任何正在进行的任务都将被标记为失败。

运算符特定超时

  • 一些 Airflow 运算符,特别是那些与外部系统交互的运算符,提供自己的超时配置。
  • 例如,HttpSensor 运算符有一个 timeout 参数,它指定在失败之前应等待响应多长时间。
  • 这些运算符特定的超时允许对任务行为进行细粒度控制,而不会影响整体任务或 DAG 超时设置。
  • 在此示例中,如果 HttpSensor 在 20 秒内未收到来自端点的响应,它将失败,从而允许您继续或优雅地处理失败。

带超时的重试延迟

  • 除了设置超时,您还可以配置重试延迟以确定 Airflow 在重试失败任务之前应等待多长时间。
  • 结合超时,这有助于控制执行时间,而不会无限期地重试失败的任务。
  • 重试使用 retries 和 retry_delay 等参数进行配置。
  • 在此示例中,task_with_retry_and_timeout 如果每次尝试超过 30 秒,它将失败。如果失败,它将重试最多 3 次,每次重试之间有 10 秒的延迟。

传感器超时(针对传感器任务)

  • Airflow 中的传感器任务是专门的任务,它们“等待”满足特定条件才能继续。
  • 传感器运算符中的 timeout 参数控制传感器应等待条件满足多长时间才能失败。
  • 如果条件永远无法满足,这对于避免无限期等待至关重要。
  • 在这里,FileSensor 将每 10 秒检查一次 /path/to/file 处是否存在文件。如果在 5 分钟内未找到文件,则传感器任务将失败。

在 Airflow 中使用超时的关键注意事项

  1. 资源管理
    • 超时有助于防止任务消耗资源过长时间,尤其是在它们可能由于外部系统问题而挂起或失败的情况下。
  2. 错误处理
    • 设置超时时,请考虑因超时而导致任务失败的错误处理策略。例如,您可能希望重试任务、发送警报或在下游逻辑中处理失败的任务。
  3. 超时和重试之间的平衡
    • 结合超时和重试可以帮助管理具有间歇性问题的任务。但是,请注意重试允许的总执行时间,尤其是在需要满足严格截止日期的工作流中。
  4. 测试和优化
    • 根据历史任务运行时间和外部系统行为调整超时。这将有助于避免不必要的失败,并允许任务在其通常的运行时间内成功完成。
  5. 生产中的超时
    • 在生产中,使用超时根据观察到的任务设置实际限制

在 Apache Airflow 中,服务级别协议 (SLA) 用于定义和监控任务或工作流完成所需的最长可接受时间。SLA 有助于确保及时的数据可用性,并在任务超出其指定执行窗口时提供警报。它们在及时处理至关重要的生产环境中发挥着重要作用。

SLA 在 Airflow 中如何工作

SLA 本质上是 DAG 中特定任务的时间限制。如果任务超出此时间限制,则被认为是延迟违规。Airflow 中的 SLA 是按任务设置的,如果任务超出其 SLA,Airflow 可以配置为发送警报、记录事件或触发自定义错误处理机制。

在 DAG 中定义任务时,使用 SLA 参数定义 SLA,为该任务在计划开始时间后完成设置时间限制。

SLA 的特点

SLA 违规检测

  • 当任务超出其 SLA 时,会触发 SLA 违规。
  • Airflow 在 UI 中跟踪这些违规,从而轻松监控哪些任务持续未达到其 SLA。

SLA 通知

  • 当发生 SLA 违规时,Airflow 可以发送通知邮件以提醒您延迟。此邮件通知可以在 Airflow 设置中配置。

自动记录 SLA 未达

  • Airflow 会自动记录 SLA 违规,以便您可以在 Airflow Web UI 中查看它们。
  • 这些日志允许您跟踪 SLA 违规模式,有助于故障排除和性能优化。

全局和任务级 SLA

  • SLA 可以为 DAG 中的单个任务或整个 DAG 的全局级别配置,尽管按任务 SLA 更常见。
  • SLA 是任务特定的,使用每个任务中的 SLA 参数设置,指定该任务完成的最长时间。

在 Airflow 中设置 SLA

通过将时间增量对象传递给任务定义中的 sla 参数来设置 SLA。这是一个示例

在此示例中

  • 如果 task_with_sla 完成时间超过 30 秒,则它违反了 SLA。
  • Airflow 会记录 SLA 违规,如果配置了电子邮件通知,则可以发送电子邮件通知。

SLA 通知如何工作

对于 SLA 违规通知,Airflow 依赖于 Airflow 配置文件 (airflow. cfg) 中配置的电子邮件设置。以下是如何设置 SLA 通知

  1. 启用 SLA 电子邮件警报
    • 确保您的 Airflow 配置在 airflow. cfg 中的 [smtp] 下正确设置了 smtp_* 设置。
    • 在每个任务的 email 参数中设置电子邮件地址,或在 Airflow 配置中配置全局电子邮件地址。
  2. 配置 SLA 违规的电子邮件收件人
    • 使用任务定义中的 email 参数指定 SLA 违规警报的收件人。
  3. SLA 通知配置示例

在此设置中

  • 如果 task_with_sla 违反其 SLA,则会向 alert@example.com 发送电子邮件。
  • 可以为任务失败或重试事件配置其他通知。

使用 SLA 的注意事项

  1. 资源限制
    • SLA 应根据任务的实际完成时间设置。过于激进的 SLA 可能会导致不必要的警报。
    • 资源密集型任务可能会由于系统限制而超出 SLA,因此建议根据历史任务性能调整 SLA。
  2. 对下游任务的影响
    • SLA 违规不会直接影响任务依赖关系或下游任务。但是,您可以设置自定义逻辑来处理 SLA 违规,例如触发替代工作流或调整任务重试。
  3. SLA 报告
    • Airflow 具有可通过 Airflow UI 访问的 SLA 报告,该报告提供跨 DAG 运行的 SLA 违规的综合视图。这允许您随着时间的推移监控和优化 DAG 性能。
  4. SLA 未达与任务失败
    • SLA 违规并不意味着任务失败;它只表示任务花费的时间比预期长。任务仍然可以独立于 SLA 成功或失败。
  5. 自定义 SLA 处理
    • Airflow 的默认 SLA 警报可能不足以满足所有情况。对于高级 SLA 处理,您可以添加自定义 Python 逻辑来处理 SLA 违规事件,例如向外部系统发送消息或调整 DAG 参数。
  6. 性能调优
    • 使用 SLA 违规作为性能调整的指标。持续的 SLA 违规可能会突出资源瓶颈、次优代码或其他性能问题。

示例:监控和处理 SLA 违规

如果您需要对 SLA 监控进行更多控制,可以添加自定义回调函数来处理 SLA 违规。此函数可用于在 SLA 违规时记录、通知或采取特定操作。

在此示例中

  • 如果 DAG 中的任何任务超出其 SLA,custom_sla_alert 函数将发送电子邮件警报。
  • 这种方法允许您自定义 SLA 处理,超越 Airflow 中的默认 SLA 行为。

Apache Airflow 中的 SLA 提供了一种为任务完成设置时间限制的方法,确保任务在可接受的时间范围内完成。通过监控 SLA,您可以

  • 设定任务性能预期。
  • 接收任务执行时间超出预期的警报。
  • 识别并解决 DAG 中的瓶颈。
  • 使用自定义 SLA 处理为违规任务创建响应系统。

SLA 是生产管道的强大功能,有助于确保工作流满足组织的性能和时间需求。