Airflow 的公共接口

2025年6月11日 | 阅读 8 分钟

引言

Apache Airflow 的公共接口提供了一种与平台组件交互的结构化方式。该接口旨在保持稳定性,确保向后兼容性和一致的功能。公共接口包括明确用于开发者和系统集成商外部使用的模块、类、函数和实用程序。

使用 Airflow 公共接口

Airflow 的公共接口作为与其核心功能交互的入口点,无论是定义 DAG、管理任务执行还是扩展 Airflow 的功能。通过使用公共 API 和类,开发者可以确保与未来版本的 Airflow 兼容。

示例

假设您想通过公共接口以编程方式创建 DAG。明确从其公共路径导入必要的组件

使用公共接口进行 DAG 作者

DAG 作者主要与 Airflow 的公共接口交互,以定义工作流、管理参数和处理依赖项。

DAG

有向无环图 (DAG) 是 Airflow 中的核心抽象。Airflow。它们定义了任务的执行顺序。

该模块提供了 DAG 类来构建工作流。

示例

Airflow. Models. ragbag

DAGBag 管理 DAG 的集合,从而可以从 Python 文件加载和解析 DAG。

示例

参数可用于通过允许对 DAG 的行为进行参数化来动态化 DAG。

示例

表示 DAG 的特定运行。

示例

从 airflow. Models.dag run import DagRun

运算符

Operators 是工作流的构建块,代表单个任务。它们定义了特定任务将做什么以及如何执行。

示例

任务实例

任务实例代表 DAG 运行中单个任务的执行。它们存储有关任务执行的元数据,包括其状态、开始时间和日志。

Airflow. Models. task instance

提供对任务实例详细信息的访问,包括状态和日志。

示例

任务实例键

TaskInstanceKey 唯一标识任务实例。它由 DAG ID、任务 ID 和执行日期或运行 ID 组成。

Airflow. Models. taskinstancekey

示例

钩子

Hooks 提供与外部系统的接口,例如数据库或云服务。

  • Airflow.hooks: 公共 hooks 允许与 S3、MySQL 或 HTTP API 等服务进行交互。

示例

公共 Airflow 实用程序

Airflow 包含一些用于高级功能的实用程序,使用户能够有效地管理配置、在任务之间共享少量数据、处理异常以及管理任务状态。以下是对关键实用程序及其使用示例的详细解释。

连接

Connections 定义了 Airflow 与数据库、API 和消息代理等外部系统之间的链接。这些通常在 Airflow UI 中配置,但也可以以编程方式创建。

  • 模块: airflow.models.connection

示例

变量

Variables 是键值对,用于存储可在 DAG 中的任务之间访问的动态值。它们对于存储敏感信息、配置设置或运行时参数非常有用。

  • 模块: airflow.models.variable

示例

XComs

XComs(跨通信的缩写)允许在 DAG 运行期间在任务之间共享少量数据。它们存储在 Airflow 元数据数据库中。

  • 模块: airflow.models.com

示例

公共 Airflow 实用程序

airflow. Models. connection

Connection 类表示外部系统的连接配置。这些配置可以通过 Airflow UI 或 CLI 进行管理。

Airflow. Models.variable

Variable 类支持存储和检索键值对以进行动态配置。

示例

XCom 类允许任务实例交换少量数据。数据可以在任务之间推送和拉取。

公共异常

airflow.exceptions

airflow.exceptions 模块提供了一系列内置异常类,用于以结构化的方式处理错误。这些异常包括

  • AirflowException: Airflow 特定问题的通用异常。
  • DagNotFound: 当找不到请求的 DAG 时引发。
  • TaskInstanceNotFound: 当找不到任务实例时引发。
  • AirflowSensorTimeout: 当传感器任务超过其超时时间时引发。
  • AirflowSkipException: 用于在执行期间跳过任务。
  • AirflowDagCycleException: 当在 DAG 中检测到循环依赖时引发。

这些异常的关键特性包括详细的错误消息、与 Airflow 日志系统的集成以及与任务重试机制的兼容性。这些异常可用于在自定义工作流和插件中实现健壮的错误处理和日志记录策略。

示例

公共实用程序类

airflow.Utils. state

airflow.Utils. state 模块提供常量和实用函数,用于管理和检查 DAG 和任务的状态。关键状态包括

  • State.SUCCESS: 表示执行成功。
  • State.FAILED: 表示任务或 DAG 失败。
  • State.RUNNING: 表示任务或 DAG 正在运行。
  • State.SKIPPED: 表示任务被故意跳过。
  • State.UP_FOR_RETRY: 表示任务因先前的失败正在重试。
  • State.UPSTREAM_FAILED: 表示任务的上游依赖项失败,阻止了其执行。
  • State.REMOVED: 表示任务已从 DAG 定义中移除。

实用函数包括

  • State.is_stateful(state): 确定一个状态是否为预定义的 States 之一。
  • State.task_state_transitions: 定义任务的有效状态转换。
  • State.dag_state_transitions: 定义 DAG 的有效状态转换。

这些实用程序有助于构建具有状态感的工作流并在任务或 DAG 状态的基础上实现自定义逻辑。

触发器

Triggers 提供了一种在 Airflow 中实现异步操作的机制。它们使任务能够等待外部事件或条件,而不会阻塞工作节点资源。这对于实现依赖于实时事件或具有长时间等待周期的工作流很有用。

Airflow. triggers

airflow. trigger 模块包含内置触发器和创建自定义触发器的工具。内置触发器支持诸如等待文件、监视数据库条件或侦听 API 响应之类的用例。

自定义触发器示例

触发器可以通过传感器运算符或其他异步兼容运算符与任务关联。

Timetables

Timetables 提供了一种灵活的方式来为 DAG 定义自定义计划,超越 cron 表达式。它们允许对执行间隔和计时逻辑进行细粒度控制。

airflow.timetables

airflow. Timetables 模块包括用于创建和管理 timetables 的基类和实用程序。开发者可以通过继承 airflow.timetables.base.Timetable 来定义自定义 timetables,并实现必要的方法。

自定义 timetable 示例

监听器

Listeners 允许开发者挂钩 Airflow 中的各种生命周期事件,例如任务状态更改或 DAG 执行。它们提供了一个可扩展性点,可以在工作流执行期间执行自定义逻辑。

注册 listener 的示例

Listeners 可以全局注册或在特定级别注册,从而在事件处理方面提供灵活性。

Extra Links

Extra Links 允许向 Airflow Web UI 添加自定义链接到任务实例。这些链接可以将用户引导至外部系统或提供额外的任务相关信息。

BaseOperatorLink 类允许开发者定义出现在 Airflow UI 中的自定义链接。这些链接与特定的运算符或任务相关联。

自定义运算符链接示例

然后可以将自定义链接与运算符关联

在 UI 中查看任务实例时,将显示自定义链接。

扩展 Airflow 功能

Airflow 的模块化架构鼓励定制和扩展。开发者可以使用公共接口来

创建自定义运算符

继承自 airflow. Models.BaseOperator 并实现自定义逻辑。

编写自定义 Hooks

  • 继承 airflow.hooks.base.BaseHook 以添加对新服务或协议的支持。

开发插件

  • 创建可重用的插件来添加自定义视图、运算符或 hooks,而无需修改核心 Airflow 代码。

Apache Airflow 的公共接口不包含哪些内容?

公共接口明确排除

  1. 私有模块和类
    • 任何未明确记录为公共接口一部分的模块、类或函数,都不保证保持稳定。
    • 例如,airflow.models.dagrun 的内部结构可能在不同版本之间发生变化。
  2. 内部实现细节
    • 公共类的代码结构、命名约定或内部方法未标记为外部使用。
  3. 未记录的功能
    • 未包含在官方文档中的实验性或内部功能。
  4. 直接访问元数据数据库
    • 虽然可以直接查询元数据数据库,但这会绕过 Airflow API,并存在兼容性风险。