Apache Airflow 编写插件

2025年6月8日 | 阅读 8 分钟

引言

Apache Airflow 是一个强大而灵活的平台,用于以编程方式编写、调度和监控工作流。Airflow 的可扩展性是其最强大的特性之一,它允许用户将其适应广泛的应用场景。通过插件扩展 Airflow 功能的一种方式是利用插件。

有什么用?

Apache Airflow 中的插件旨在扩展平台的核心功能。它们允许您:

  • 集成外部系统:添加自定义操作符、钩子和传感器以与第三方系统进行交互。
  • 定制 Web 界面:通过添加新的视图、菜单项或仪表板元素来扩展或修改 Airflow UI。
  • 添加自定义组件:定义宏、执行器、身份验证后端或日志记录器。
  • 简化可重用性:跨不同的 Airflow 部署打包和分发自定义逻辑。

使用插件可确保您无需直接修改 Airflow 源代码,从而使升级更容易并最大程度地降低冲突风险。

为什么要基于 Airflow 构建?

基于 Airflow 构建具有多项优势:

  • 可扩展性:Airflow 的分布式架构支持跨多个节点扩展工作流。
  • 标准化:Airflow 的 DAG(有向无环图)范例确保工作流以清晰且可重复的方式进行定义。
  • 灵活性:插件架构使您无需更改 Airflow 的核心功能即可将其适应特定需求。
  • 社区支持:Airflow 庞大的用户群和详尽的文档使查找插件开发资源和示例变得更加容易。
  • 模块化设计:Airflow 鼓励模块化开发,允许在团队或组织之间共享和重用插件。

插件何时(重新)加载?

Airflow 插件在 Airflow 应用程序生命周期的特定时间加载。理解插件何时以及如何加载对于开发和部署都至关重要。详细信息如下:

调度器和工作节点

  • 插件在调度器和工作节点进程启动时加载到内存中。
  • 这确保了自定义组件(如操作符、钩子和传感器)在任务执行期间可用。
  • 这些插件组件的任何更新都需要重新启动调度器和工作节点进程才能生效。

Web 服务器

  • 修改 Airflow 用户界面的插件(例如添加新视图、Flask 蓝图或菜单项)会在 Web 服务器启动或重新加载时加载。
  • 如果您启用了自动重新加载(下文将讨论),Web 服务器将能够检测并应用插件的更改,而无需完全重新启动。
  • 加载过程包括:
    • 初始化 Flask 蓝图。
    • 注册菜单链接和操作符额外链接。
    • 加载静态资源(如果包含在插件中)。

自动重新加载

  • 在开发过程中,频繁更改插件可能会使重新启动 Web 服务器变得繁琐。启用自动重新加载可以解决此问题。
  • 配置
    • 在 airflow.cfg 文件的 [webserver] 部分设置 reload_on_plugin_change 参数。
    • [webserver]
      reload_on_plugin_change = True
    • 这使得 Web 服务器能够监视插件目录中的更改并动态重新加载插件。

注意事项

  • 自动重新加载适用于开发环境,出于潜在的性能开销考虑,不应在生产环境中使用。

插件版本控制

正确的版本控制和依赖项管理对于确保 Airflow 插件的无缝集成和功能至关重要。以下是详细的注意事项:

需要重启

  • 当插件更新时,Airflow 不会在所有组件中自动重新加载插件。
  • 如果修改了插件,则需要重新启动调度器、工作节点和 Web 服务器以加载新版本。不重新启动,任务可能会继续使用过时的逻辑,从而导致不一致或失败。

语义化版本控制

  • 使用语义化版本控制(例如 1.0.0、1.1.0、2.0.0)来跟踪插件的更改。
  • 遵循此结构:
    • 主版本:引入了破坏性更改。
    • 次版本:添加新功能,而不破坏现有功能。
    • 补丁版本:修复错误或进行小的改进。
  • 示例
    • 添加自定义操作符的新功能可能会将版本从 1.0.0 增加到 1.1.0。
    • 现有钩子中的错误修复可能会将版本从 1.1.0 更新到 1.1.1。

依赖管理

  • 在插件中明确定义和管理依赖项,以避免冲突。
    • 对于打包的插件,请使用 requirements.txt 文件或在 setup.py 的 install_requires 部分定义依赖项。
    • 示例 setup.py 依赖项配置
  • 确保依赖项不会与 Airflow 的核心依赖项或环境中的其他插件冲突。

部署前测试

  • 在镜像生产环境的暂存环境中测试更新的插件。
  • 验证以下内容:
    • 使用自定义操作符、传感器或钩子的任务按预期执行。
    • UI 组件(如自定义视图或仪表板)在 Web 服务器中正确显示。
    • Airflow 组件启动期间没有意外的错误或警告日志。

变更日志维护

维护一个 CHANGELOG 文件,以记录插件每个版本的更新、错误修复和破坏性更改。

示例条目

Interface

Airflow 中的插件接口很简单。您通过创建一个继承自 AirflowPlugin 的 Python 类来定义一个插件。此类充当要注册到 Airflow 的所有组件的容器。

AirflowPlugin 的属性

属性描述
name插件的唯一名称。
operators要注册的自定义操作符列表。
sensors要注册的自定义传感器列表。
钩子 (hooks)要注册的自定义钩子列表。
executors要注册的自定义执行器列表。
macros要注册的自定义宏列表。
flask_blueprints用于扩展 Web 服务器 UI 的 Flask 蓝图列表。
menu_linksAirflow Web 界面的自定义菜单项列表。
global_operator_extra_linksUI 中所有操作符可用的全局链接列表。
operator_extra_linksUI 中特定于操作符的链接列表。

示例

这是一个简单的 Airflow 插件示例,它定义了一个自定义操作符和一个新的 Web 视图。

文件

使用步骤

  1. 将 my_plugin.py 保存在 Airflow 安装的插件目录中。
  2. 重新启动 Airflow Web 服务器和调度器。
  3. 在 DAG 中使用 MyCustomOperator,或通过 http://<your-airflow-host>/my_plugin/ping 访问自定义视图。

排除视图免受 CSRF 保护

Airflow 的 Web 服务器强制对所有视图执行跨站请求伪造(CSRF)保护。此安全功能可防止用户通过他们信任的 Web 应用程序发送未经授权的命令。但是,在某些情况下,您可能希望将特定视图(例如通过插件添加的 API 端点)排除在 CSRF 保护之外。这可以通过配置 airflow.cfg 文件中的 Flask 应用来完成。

排除视图的步骤

  1. 识别要排除的视图
    • 确定要从 CSRF 保护中排除的特定视图或端点。例如,如果您在自定义插件 my_plugin 中添加了一个名为 ping 的视图,那么这就是需要配置的视图。
  2. 修改 airflow.cfg 文件
    • 在 airflow.cfg 文件中的 [webserver] 部分下添加 csrf_exempt_list 配置。指定要排除的视图的完全限定名称。
      示例配置
      csrf_exempt_list = my_plugin.ping
      • my_plugin 指的是定义视图的插件名称。
      • Ping 是处理端点的视图或函数的名称。
  3. 重新启动 Web 服务器
    • airflow.cfg 文件的更改需要重新启动 Airflow Web 服务器才能使新配置生效。

注意事项

  • 安全含义
    • CSRF 保护是一项关键的安全措施。应仅在必要时才将视图排除在此保护之外,并且仅限于受信任的端点。
    • 确保排除的视图不处理敏感操作,否则应采取额外的安全措施,如基于令牌的身份验证或 IP 允许列表。
  • 测试
    • 在更改配置后,验证排除的视图是否按预期工作。
    • 使用 Curl 或 Postman 等工具测试到该端点的请求,并确认不再应用 CSRF 保护。
  • 日志记录和监控
    • 监视日志,以确保排除的视图未被滥用或以意外方式访问。
    • 为排除的端点启用额外的日志记录,以跟踪使用情况并检测潜在的滥用。

用例示例

假设您有一个自定义插件 my_plugin,其中定义了一个简单的健康检查端点,如下所示:

在此示例中

  • ping 端点用于健康检查。
  • 由于它不修改任何数据或执行敏感操作,因此您决定将其排除在 CSRF 保护之外。

更新 airflow.cfg 文件

重新启动 Airflow Web 服务器后,ping 端点将不再强制执行 CSRF 保护,允许在没有 CSRF 令牌的情况下进行请求。

插件作为 Python 包

为了更好地组织和分发,您可以将插件打包为 Python 模块。这对于在组织内共享插件或将其发布到 PyPI 特别有用。

setup.py

使用以下命令安装插件:

自动重新加载 Web 服务器

在开发过程中,您可以启用 Airflow Web 服务器的自动重新加载,以反映插件的更改,而无需手动重新启动服务器。此功能通过减少重复重启的需要来简化开发流程。

启用自动重新加载的步骤

  1. 更新 airflow.cfg 文件
    • 在 Airflow 安装目录中找到 airflow.cfg 文件。
    • 在 [webserver] 部分下添加或修改 reload_on_plugin_change 设置:
    • [webserver]
      reload_on_plugin_change = True
  2. 重新启动 Web 服务器
    • 更新配置后,重新启动 Airflow Web 服务器以应用更改。
  3. 修改插件文件
    • 修改您的插件文件(例如,添加或更新操作符、钩子或 UI 组件)。
    • Web 服务器将自动检测更改并重新加载受影响的插件,而无需手动重启。

局限性

  • 快速迭代
    • 自动重新加载允许开发人员几乎立即看到其更改的效果,从而提高插件开发期间的生产力。
  • 更改范围
    • 插件目录中的 Python 文件更改会被监视。
    • 非 Python 文件(如静态资源或模板)可能不会触发重新加载,并且可能需要手动刷新浏览器。

示例工作流程

在插件目录中创建自定义插件。

添加新功能,例如自定义操作符,并保存文件。

观察 Web 服务器自动重新加载插件,并在 Airflow UI 中或在任务执行期间测试更改。

故障排除

在开发或部署 Airflow 插件时,您可能会遇到问题。

以下是一些常见问题及解决方案:

  • 插件未加载
    • 确保插件文件位于插件目录中。
    • 验证 AirflowPlugin 类中的 name 属性是否唯一。
    • 在启动期间检查 Airflow 日志以查看错误。
  • Web 服务器错误
    • 确认 Flask 蓝图和路由已正确定义。
    • 在使用自定义视图时,检查 CSRF 保护问题。
  • 任务错误
    • 通过记录执行上下文来调试自定义操作符或钩子。
    • 验证是否将所有必需的参数传递给操作符。
  • 依赖项冲突
    • 使用虚拟环境隔离依赖项。
    • 在打包插件的 setup.py 文件中指定版本约束。
  • 性能问题
    • 如果您的插件与 Airflow 元数据数据库交互,请优化数据库查询。
    • 避免在 Web 服务器或调度器进程中执行繁重计算。