Apache Airflow 调度

2025 年 6 月 10 日 | 阅读 8 分钟

引言

Apache Airflow 中的调度是其核心功能,可实现工作流的高效编排。它确保任务在特定时间或时间间隔执行,同时尊重依赖关系并优化资源利用率。凭借强大的调度能力,Airflow 可满足广泛的用例,从简单的基于时间的触发器到复杂的数据驱动型工作流。

Cron 和时间间隔

Apache Airflow 支持使用类 cron 表达式和时间间隔来调度工作流。

  • Cron 表达式: Cron 表达式是用于定义精确计划的灵活字符串。每个表达式由五个字段组成,分别代表不同的时间单位:分钟、小时、月份中的日期、月份和星期几。例如,表达式 0 12 会将任务安排为每天中午运行。以下是字段的细分:
字段允许的值描述
分钟0-59指定分钟
小时0-23指定小时
月份中的日期1-31指定月份中的日期
月份1-12 或 JAN-DEC指定月份
星期几0-6 或 SUN-SAT指定星期几
  • 像 *、- 和 / 这样的特殊字符可用于更高级的调度模式。例如,*/15 表示每 15 分钟运行一次任务。
  • 时间间隔: Airflow 中的时间间隔允许您指定任务执行之间固定的持续时间。这可以通过使用预定义的预设或直接指定持续时间来实现。常见选项包括:
    • @hourly: 每小时运行一次任务。
    • @daily: 每天运行一次任务。
    • 时间差 (hours=6): 每六小时执行一次任务。

这些间隔对于需要一致且可重复的执行周期而无需依赖特定日历日期的工作流特别有用。

Cron 预设

Airflow 为常见的调度需求提供了预定义的 Cron 预设。

  • @daily: 每天午夜 (UTC 时间 00:00) 运行一次。适用于日常数据聚合或报告等任务。
  • @hourly: 每小时在整点运行一次。适用于监控和日志更新等频繁作业。
  • @weekly: 每周日午夜 (UTC 时间 00:00) 运行一次。非常适合每周报告或备份。
  • @monthly: 每月第一天午夜 (UTC 时间 00:00) 运行一次。常用于每月账单或数据归档。
  • @yearly: 每年 1 月 1 日午夜 (UTC 时间 00:00) 运行一次。非常适合年度审计或归档流程。
  • @once: 任务触发器在 DAG 被手动触发或激活后执行一次。适用于一次性操作,如数据迁移。

这些预设简化了常用模式的调度配置,无需手动定义复杂的 Cron 表达式。

时区

Airflow 具有时区感知能力,允许您

  • 配置 DAG 在特定时区运行: 使用 timezone 参数确保工作流与本地工作时间或其他特定时区要求保持一致。
  • 默认使用 UTC 以保持一致性: Airflow 默认使用 UTC,这非常适合跨分布式系统标准化执行时间。
  • 有效处理夏令时更改: 自动调整遵守夏令时的地区的计划,以防止执行时间出现问题。

示例

说明

  • datetime(2023, 1, 1, tzinfo=timezone('US/Eastern')): 在美国东部时间设置开始日期。
  • schedule_interval='@daily': 配置 DAG 每天运行。
  • catchup=False: 确保 DAG 不会执行错过的运行。

Web UI

Airflow 的 Web UI 是一个强大的工具,提供了一个用户友好的界面来管理工作流。可以通过 Web 浏览器访问它,并提供各种功能来高效地监控、调试和配置工作流。

  • 监控 DAG 运行和任务
    • 通过图形视图可视化 DAG 执行。
    • 实时跟踪任务状态(例如,成功、运行中、失败)。
    • 访问每个任务实例的详细日志以调查问题。
  • 修改计划和设置
    • 直接从 UI 调整 schedule_interval 或其他 DAG 配置。
    • 单击即可启用或禁用 DAG。
    • 手动触发 DAG 以进行测试或重新执行。
  • 查看日志和执行状态
    • 访问任务实例的全面日志。
    • 按执行状态(例如,排队、运行中、失败)筛选和排序任务。
    • 浏览执行历史以分析趋势和性能。
  • 调试失败的任务
    • 使用颜色编码的可视化快速识别失败的任务。
    • 直接从日志中检查错误消息和堆栈跟踪。

导航面板

  • DAGs 仪表板
    • 列出所有可用的 DAG 以及它们的当前状态和上次运行时间。
    • 提供对触发、暂停或删除 DAG 等操作的快速访问。
  • 图表视图
    • 将任务显示为有向无环图中的节点,显示依赖关系和执行流程。
  • 树状视图
    • 提供任务随时间推移执行的分层视图,非常适合跟踪多个运行的进度。
  • 甘特图
    • 可视化任务持续时间和重叠,有助于优化性能。
  • 代码视图
    • 允许直接在 UI 中查看 DAG 的 Python 代码。

用户角色和权限

Airflow 支持基于角色的访问控制 (RBAC),允许管理员为不同用户角色定义权限。这可确保对工作流的安全和受控访问。

示例

  • 管理员: 完全访问所有功能。
  • 查看者: 对 DAG 和日志具有只读访问权限。
  • 用户: 触发 DAG 和管理任务实例的权限。

可访问性

Web UI 通常托管在特定端口(默认:8080),可以通过 http://<your-airflow-host>:8080 访问。

概念

时区感知的 DAG

Airflow DAG 可以配置为在特定时区运行,以确保工作流与业务需求保持一致。默认情况下,Airflow 使用 UTC,但您可以根据本地业务运营进行自定义。建议使用 pendulum 库来处理 Airflow 中的时区。

示例

好处

  • 与本地或全球时区精确同步的调度。
  • 无缝处理夏令时更改。

数据感知调度

数据感知调度利用数据集动态地根据数据可用性触发工作流。这种方法确保任务仅在必要数据就绪时执行,从而减少资源浪费并提高效率。

  • 当数据更新时,工作流会自动触发。
  • 消除了对静态基于时间的计划的依赖。
  • 提高对实时数据更改的响应能力。

示例

快速入门

要快速设置 Airflow 中的调度

  • 在 DAG 中定义 schedule_interval: 指定 Cron 表达式或预设以配置 DAG 的运行时间。
  • 使用 Airflow Web UI 监控和调试调度。
    • 访问 Web UI 以查看 DAG 运行和任务状态。
    • 可视化依赖关系和执行流程。
    • 本地测试配置: 使用 airflow dags trigger <dag_id> 命令手动测试 DAG 设置。

示例

Airflow 中的数据集

什么是“数据集”?

Airflow 中的数据集代表一个可以触发工作流的数据单元。它定义了基于数据可用性的依赖关系,而不是基于时间的依赖关系。

  • 您可以为单个 DAG 定义多个数据集作为依赖项。这些数据集可以位于不同的存储系统或具有不同的更新触发器。

示例

在此示例中

  • 仅当 dataset_1 和 dataset_2 都更新后,DAG 才会执行。
  • Airflow 无缝管理依赖关系,确保数据集之间的同步。

使用多个数据集的优势

  • 提高灵活性
    • 使工作流能够依赖于多个数据源或类型。
    • 有助于设计依赖于不同输入的复杂管道。
  • 改进资源利用率
    • DAG 仅在所有数据集都准备好后执行,避免了不必要的任务运行。
  • 实时响应能力
    • 非常适合工作流必须响应来自多个源的动态数据更新的场景。

高级用例

  • 优先处理数据集依赖项
    • 您可以使用任务中的额外逻辑,根据哪个数据集先更新来优先执行。
  • 条件执行
    • 实现条件表达式,根据数据集特定属性(例如,文件大小、更新频率)来控制执行。

示例

  • 跨 DAG 数据集依赖项
    • 多个数据集可以跨不同的 DAG 共享,创建复杂的相互依赖关系,同时保持模块化工作流。
  • 动态数据集更新
    • 使用 Airflow 的 API 根据管道需求动态更新数据集定义和计划。

什么是有效的 URI?

Airflow 中的有效 URI(统一资源标识符)唯一标识一个数据集。它遵循标准结构,以确保一致性和准确的引用。

以下是定义有效 URI 的内容:

有效 URI 的特征

  • 标准格式: URI 应遵循 scheme://path/to/resource 格式。这确保了不同数据集和存储类型之间的统一性。
    • Scheme: 标识协议或存储类型(例如,s3、file、http)。
    • Path: 指定资源的路径,例如文件路径或存储桶位置。
  • 有效 URI 的示例
    • s3://my-bucket/data.csv: 指的是存储在 Amazon S3 存储桶中的文件。
    • file:///path/to/file: 指向系统上的本地文件。
    • http://example.com/resource: 表示可通过 HTTP 访问的资源。

有效 URI 的重要性

  • 唯一性: 通过确保每个数据集都得到明确标识来防止冲突。
  • 互操作性: 支持跨各种存储系统和工具的集成。
  • 一致性: 促进无缝调度和工作流自动化。

关于数据集的额外信息

数据集可以包含元数据,例如:

  • Timestamp: 数据集上次更新的时间。
  • Schema: 数据结构详细信息。
  • Tags: 用于分类的附加上下文。

在 DAG 中使用数据集

多个数据集

DAG 可以依赖多个数据集来触发执行。

示例

向发出的数据集事件附加额外信息

  • 您可以将元数据附加到数据集事件中以获得更丰富的上下文。

从触发的数据集事件中获取信息

  • 在任务中访问触发的数据集事件详细信息以实现动态工作流。

通过 REST API 操作排队的数据集事件

使用 REST API 进行

  • 检查排队的数据集事件。
  • 以编程方式修改或删除事件。

使用条件表达式进行高级数据集调度

通过条件逻辑增强调度

示例

使用示例

通过 DatasetAlias 动态数据事件发出和数据集创建

简化数据集处理,使用别名

组合数据集和基于时间的计划

混合数据和基于时间的触发器以实现灵活的调度

时间表

内置时间表

Airflow 包含以下时间表:

  • 基于间隔的计划: 自动安排任务以固定间隔运行,例如每小时或每天。使用 schedule_interval 参数进行配置。
  • 类 Cron 的计划: 使用 Cron 表达式提供对计划的精确控制。适用于需要特定开始时间和日期的工作流。
  • 通过插件自定义计划: 允许创建用户定义的计时器以满足专业的调度需求,从而提供更大的灵活性和定制性。

时间表比较

要评估哪个时间表最适合您的需求,请考虑以下几点:

  • 效率
    • 基于间隔的时间表非常简单,对于定期任务只需最少的配置。
    • 类 Cron 的计划对于对特定时间点进行精细控制非常高效。
  • 灵活性
    • 自定义时间表提供最大的灵活性,可适应独特的业务需求和边缘情况。
  • 适用于业务需求
    • 为可预测的常规任务选择基于间隔的计划。
    • 当精确计时至关重要时,请选择类 Cron 的计划。