Apache Airflow 任务生命周期2025 年 6 月 10 日 | 14 分钟阅读 引言Apache Airflow 是一个强大的开源平台,旨在管理复杂的工作流、调度任务和监控它们的执行。在 Airflow 中,任务是基础组件,使用户能够设计由独立、自包含的工作单元组成的工作流。这种模块化方法在数据工程、ETL 和机器学习管道中至关重要,这些工作流通常由许多相互依赖的步骤组成。 什么是 Airflow 任务?在 Airflow 中,任务是最小的执行单元,代表工作流中的一个步骤。每个任务执行一个特定的作业,例如运行脚本、移动数据或发送通知。多个任务可以在 DAG 中以顺序或条件顺序链接在一起,从而创建可以自动化复杂流程并高效处理依赖关系的工作流。 Airflow 任务高度灵活,允许进行广泛的操作,包括: - 运行 Python 脚本或 shell 命令
- 在数据库和云存储之间移动数据
- 触发通知和警报
- 在工作流中前进之前检查条件
DAG 中的每个任务都是独立执行的,这使得工作流可以分解为可管理的步骤,并且更容易监控、重试和调试过程的特定部分。 Airflow 任务生命周期和状态Airflow 中的任务实例 在 Airflow 中,任务实例表示特定 DAG(有向无环图)中任务在特定运行中的特定执行。每个任务实例跟踪该任务在 DAG 工作流中进展时的状态和情况。一个任务实例可以在不同的 DAG 运行中甚至在重试中多次运行,并且每次运行都有其独特的生命周期和独特的历史状态。  任务实例生命周期状态分解 此生命周期图显示了任务实例如何在不同的 Airflow 组件(包括 调度器、执行器 和 工作器)管理下在各种状态之间转换: - 无: 任务实例创建但尚未安排执行时的初始状态。这是默认状态,不表示 Airflow 系统有任何主动操作。
- 已调度: 当任务准备好执行时,它进入“已调度”状态。此状态表示调度器已根据其调度间隔和依赖关系为任务分配了执行。当资源可用时,任务将移动到“已排队”状态。
- 已排队: 一旦调度器将其添加到队列中并且执行器准备将其分配给工作器,任务就会移动到“已排队”状态。此状态表示任务正在等待资源分配,例如可用的工作器插槽。
- 上游失败: 如果任务依赖于其他上游任务且这些任务失败,则任务可以进入“上游失败”状态。此状态表示任务将不会执行,因为所需的上游任务未能成功完成。
- 已移除: 当任务从 DAG 中移除时(无论是由于 DAG 定义的更改还是因为它在工作流中不再需要),将分配此状态。这是一种终止状态,意味着任务将不会执行。
- 待重新调度: 对于需要重新调度的任务,使用此状态。此状态通常适用于传感器或长时间运行的任务,这些任务等待满足外部条件才能继续。
- 运行中: 当工作器从队列中取出任务并开始执行时,任务进入“运行中”状态。在此阶段,任务正在积极执行其分配的工作,例如运行代码或移动数据。
- 成功: 如果任务没有错误地完成其工作,它将进入“成功”状态。
- 失败: 如果任务遇到错误并未能完成,它将进入“失败”状态。此失败可能是由于运行时错误、未满足的条件或系统问题。根据配置,如果允许重试,任务可能会移动到“待重试”状态。
- 待重试: 如果任务失败但已配置重试,它将进入“待重试”状态。任务将等待指定的时间间隔,然后重新进入“已调度”状态以进行另一次尝试。如果所有重试都耗尽而未成功,任务将保持“失败”状态。
- 重新启动中: 此状态表示任务在手动或自动触发后正在重新启动。重新启动后,任务通常会重新进入“已调度”状态。
- 已关闭: 当任务被手动停止或系统在完成前终止它时,它将进入“已关闭”状态。这可能是由于用户干预、系统错误或阻止任务执行的外部因素造成的。
生命周期图的其他说明 - 颜色编码
- 不同的 Airflow 组件参与管理任务状态
- 调度器: 处理初始调度和依赖关系检查。
- 执行器: 管理队列并将任务分配给工作器。
- 工作器: 执行任务并使其在“运行中”、“成功”或“失败”状态之间转换。
- 流路径
- 箭头表示状态转换,显示任务在执行期间可以采取的路径。
- 例如,任务可以从“运行中”变为“失败”,如果启用了重试,它将进入“待重试”状态,然后最终达到“成功”或“失败”状态。
- 终止状态
- “成功”、“失败”、“已移除”和“已关闭”等状态是终止状态,这意味着任务实例的生命周期在这些状态下结束。
关系术语上游和下游任务- 上游任务: 必须在另一个任务开始之前完成的任务。换句话说,上游任务是其下游任务的先决条件。
- 下游任务: 依赖于上游任务完成的任务。它不能在其上游任务成功完成后才开始。
- 示例:如果任务 A 必须在任务 B 开始之前完成,则任务 A 是任务 B 的上游,任务 B 是任务 A 的下游。
父任务和子任务- 父任务: 上游任务的另一个术语。父任务必须在子任务可以继续之前完成。
- 子任务: 下游任务的另一个术语。它依赖于其父任务的完成才能开始。
依赖关系- 依赖: 一种关系,其中一个任务在进行之前依赖于另一个任务的输出或完成。依赖关系控制 DAG 中任务的执行顺序。
- 在 Airflow 中,通常使用 >>(右移)或 <<(左移)运算符建立依赖关系。
触发规则- 触发规则: 一个规则,根据其上游函数的完成状态来确定何时应触发下游任务。默认情况下,Airflow 使用 all_success 触发规则,这意味着所有上游任务都必须成功,下游任务才能开始。
- 其他触发规则包括
- all_success: 如果所有上游任务都成功,则运行。
- all_failed: 如果所有上游任务都失败,则运行。
- one_success: 如果至少一个上游任务成功,则运行。
- one_failed: 如果至少一个上游任务失败,则运行。
- none_failed: 如果没有上游任务失败,则运行。
- none_skipped: 如果没有上游任务被跳过,则运行。
- Always: 无论上游任务状态如何,都运行。
分支- 分支: 一种根据任务结果或特定条件在 DAG 中创建条件路径的方法。分支允许 DAG 遵循不同的执行路径,仅执行所选路径上的下游任务。
- BranchPythonOperator 通常用于在 Airflow 中实现分支逻辑。
任务组- 任务组: 一种在 DAG 中对相关任务进行分组的方法,以可视化地组织复杂的工作流。任务组允许更轻松地管理依赖关系,并通过将任务分组到集群中来使 DAG 更具可读性。
- 任务组不改变依赖关系,但为 DAG 提供了更透明的结构。
# 这将 task1 和 task2 分组到 'data_processing' 下 XCom(交叉通信)- XCom: Airflow 中用于在任务之间共享数据的机制。XCom(“交叉通信”的缩写)允许任务向共享数据存储推送和拉取少量数据,从而实现函数之间的信息共享。
- XCom 通常用于在 DAG 中的任务之间传递参数或中间结果。
一对多和多对一关系- 一对多: 单个任务具有多个下游任务。这允许一个任务在完成后触发多个依赖函数。
- 多对一: 多个上游任务都汇聚到一个下游任务。此下游任务等待所有上游任务完成才能开始。
在 Apache Airflow 中,超时用于控制任务或 DAG 在自动终止之前允许运行的时间。这对于管理系统资源、防止任务无限期挂起以及确保工作流在合理时间内完成非常有用。Airflow 提供了多种类型的超时配置,每种配置根据您希望如何处理任务或 DAG 执行时间限制而服务于不同的目的。 Airflow 中的超时类型任务超时- 任务级超时为单个任务的执行设置最长时间限制。
- 如果任务超出指定时间,它将被终止并标记为失败。
- 您可以使用任务运算符中的 execution_timeout 参数配置任务超时。
- 这对于可能挂起或执行时间不可预测的任务特别有用,例如依赖外部系统(例如 API 调用或数据库查询)的任务。
- 在此示例中,如果 timeout_task 运行时间超过 30 秒,它将被终止并标记为失败。
DAG 超时- DAG 级超时为整个 DAG 运行设置最大运行时限制。
- 这可确保 DAG 中的所有任务在特定时间范围内完成。如果超出时间限制,任何正在运行的任务都将被停止,并且 DAG 运行将被标记为失败。
- DAG 超时使用 DAG 定义中的 dagrun_timeout 参数设置。
- 在此示例中,如果任务 1 和任务 2 的总执行时间超过 5 分钟,则 DAG 运行将被终止,并且任何正在进行的任务都将被标记为失败。
运算符特定超时- 一些 Airflow 运算符,特别是那些与外部系统交互的运算符,提供自己的超时配置。
- 例如,HttpSensor 运算符有一个 timeout 参数,它指定在失败之前应等待响应多长时间。
- 这些运算符特定的超时允许对任务行为进行细粒度控制,而不会影响整体任务或 DAG 超时设置。
- 在此示例中,如果 HttpSensor 在 20 秒内未收到来自端点的响应,它将失败,从而允许您继续或优雅地处理失败。
带超时的重试延迟- 除了设置超时,您还可以配置重试延迟以确定 Airflow 在重试失败任务之前应等待多长时间。
- 结合超时,这有助于控制执行时间,而不会无限期地重试失败的任务。
- 重试使用 retries 和 retry_delay 等参数进行配置。
- 在此示例中,task_with_retry_and_timeout 如果每次尝试超过 30 秒,它将失败。如果失败,它将重试最多 3 次,每次重试之间有 10 秒的延迟。
传感器超时(针对传感器任务)- Airflow 中的传感器任务是专门的任务,它们“等待”满足特定条件才能继续。
- 传感器运算符中的 timeout 参数控制传感器应等待条件满足多长时间才能失败。
- 如果条件永远无法满足,这对于避免无限期等待至关重要。
- 在这里,FileSensor 将每 10 秒检查一次 /path/to/file 处是否存在文件。如果在 5 分钟内未找到文件,则传感器任务将失败。
在 Airflow 中使用超时的关键注意事项 - 资源管理
- 超时有助于防止任务消耗资源过长时间,尤其是在它们可能由于外部系统问题而挂起或失败的情况下。
- 错误处理
- 设置超时时,请考虑因超时而导致任务失败的错误处理策略。例如,您可能希望重试任务、发送警报或在下游逻辑中处理失败的任务。
- 超时和重试之间的平衡
- 结合超时和重试可以帮助管理具有间歇性问题的任务。但是,请注意重试允许的总执行时间,尤其是在需要满足严格截止日期的工作流中。
- 测试和优化
- 根据历史任务运行时间和外部系统行为调整超时。这将有助于避免不必要的失败,并允许任务在其通常的运行时间内成功完成。
- 生产中的超时
在 Apache Airflow 中,服务级别协议 (SLA) 用于定义和监控任务或工作流完成所需的最长可接受时间。SLA 有助于确保及时的数据可用性,并在任务超出其指定执行窗口时提供警报。它们在及时处理至关重要的生产环境中发挥着重要作用。 SLA 在 Airflow 中如何工作SLA 本质上是 DAG 中特定任务的时间限制。如果任务超出此时间限制,则被认为是延迟或违规。Airflow 中的 SLA 是按任务设置的,如果任务超出其 SLA,Airflow 可以配置为发送警报、记录事件或触发自定义错误处理机制。 在 DAG 中定义任务时,使用 SLA 参数定义 SLA,为该任务在计划开始时间后完成设置时间限制。 SLA 的特点SLA 违规检测 - 当任务超出其 SLA 时,会触发 SLA 违规。
- Airflow 在 UI 中跟踪这些违规,从而轻松监控哪些任务持续未达到其 SLA。
SLA 通知 - 当发生 SLA 违规时,Airflow 可以发送通知邮件以提醒您延迟。此邮件通知可以在 Airflow 设置中配置。
自动记录 SLA 未达 - Airflow 会自动记录 SLA 违规,以便您可以在 Airflow Web UI 中查看它们。
- 这些日志允许您跟踪 SLA 违规模式,有助于故障排除和性能优化。
全局和任务级 SLA - SLA 可以为 DAG 中的单个任务或整个 DAG 的全局级别配置,尽管按任务 SLA 更常见。
- SLA 是任务特定的,使用每个任务中的 SLA 参数设置,指定该任务完成的最长时间。
在 Airflow 中设置 SLA通过将时间增量对象传递给任务定义中的 sla 参数来设置 SLA。这是一个示例 在此示例中 - 如果 task_with_sla 完成时间超过 30 秒,则它违反了 SLA。
- Airflow 会记录 SLA 违规,如果配置了电子邮件通知,则可以发送电子邮件通知。
SLA 通知如何工作对于 SLA 违规通知,Airflow 依赖于 Airflow 配置文件 (airflow. cfg) 中配置的电子邮件设置。以下是如何设置 SLA 通知 - 启用 SLA 电子邮件警报
- 确保您的 Airflow 配置在 airflow. cfg 中的 [smtp] 下正确设置了 smtp_* 设置。
- 在每个任务的 email 参数中设置电子邮件地址,或在 Airflow 配置中配置全局电子邮件地址。
- 配置 SLA 违规的电子邮件收件人
- 使用任务定义中的 email 参数指定 SLA 违规警报的收件人。
- SLA 通知配置示例
在此设置中 - 如果 task_with_sla 违反其 SLA,则会向 alert@example.com 发送电子邮件。
- 可以为任务失败或重试事件配置其他通知。
使用 SLA 的注意事项- 资源限制
- SLA 应根据任务的实际完成时间设置。过于激进的 SLA 可能会导致不必要的警报。
- 资源密集型任务可能会由于系统限制而超出 SLA,因此建议根据历史任务性能调整 SLA。
- 对下游任务的影响
- SLA 违规不会直接影响任务依赖关系或下游任务。但是,您可以设置自定义逻辑来处理 SLA 违规,例如触发替代工作流或调整任务重试。
- SLA 报告
- Airflow 具有可通过 Airflow UI 访问的 SLA 报告,该报告提供跨 DAG 运行的 SLA 违规的综合视图。这允许您随着时间的推移监控和优化 DAG 性能。
- SLA 未达与任务失败
- SLA 违规并不意味着任务失败;它只表示任务花费的时间比预期长。任务仍然可以独立于 SLA 成功或失败。
- 自定义 SLA 处理
- Airflow 的默认 SLA 警报可能不足以满足所有情况。对于高级 SLA 处理,您可以添加自定义 Python 逻辑来处理 SLA 违规事件,例如向外部系统发送消息或调整 DAG 参数。
- 性能调优
- 使用 SLA 违规作为性能调整的指标。持续的 SLA 违规可能会突出资源瓶颈、次优代码或其他性能问题。
示例:监控和处理 SLA 违规 如果您需要对 SLA 监控进行更多控制,可以添加自定义回调函数来处理 SLA 违规。此函数可用于在 SLA 违规时记录、通知或采取特定操作。 在此示例中 - 如果 DAG 中的任何任务超出其 SLA,custom_sla_alert 函数将发送电子邮件警报。
- 这种方法允许您自定义 SLA 处理,超越 Airflow 中的默认 SLA 行为。
Apache Airflow 中的 SLA 提供了一种为任务完成设置时间限制的方法,确保任务在可接受的时间范围内完成。通过监控 SLA,您可以 - 设定任务性能预期。
- 接收任务执行时间超出预期的警报。
- 识别并解决 DAG 中的瓶颈。
- 使用自定义 SLA 处理为违规任务创建响应系统。
SLA 是生产管道的强大功能,有助于确保工作流满足组织的性能和时间需求。
|