Apache Airflow 执行器

9 Jun 2025 | 9分钟阅读

引言

Apache Airflow 是一个开源平台,它允许开发人员以有向无环图 (DAG) 的形式以编程方式编写、调度和监控工作流。执行器在定义 Airflow 如何执行任务方面起着至关重要的作用,并且有多种技术和运算符可用于处理任务依赖关系和自定义设置。

执行器负责在 Airflow DAG 中执行任务。它们定义了任务的运行方式和位置,并管理执行所需的基础设施。

执行器类型

SequentialExecutor

  • 描述
    • 新 Airflow 安装的默认执行器。
    • 顺序执行任务,一次一个。
  • 用例
    • 适用于开发、测试或最小工作负载,这些工作负载不需要任务并发。
  • 局限性
    • 不支持并行。
    • 不适用于生产环境。

LocalExecutor

  • 描述
    • 在单台机器上实现并行任务执行。
    • 利用 Python 的 multiprocessing 库同时运行多个任务。
  • 用例
    • 适用于可以在单台主机上运行的中等规模工作流。
    • 在不需要扩展到单台机器以上的生产环境中很常见。
  • 局限性
    • 仅限于单台机器上的可用资源。

CeleryExecutor

  • 描述
    • 一个分布式执行器,可将任务执行扩展到多个工作节点。
    • 利用 Celery,一个分布式任务队列,并需要一个消息代理,如 RabbitMQRedis
  • 用例
    • 最适合需要高并发和任务隔离的大规模分布式工作流。
  • 组成部分
    • 工作节点: 执行任务。
    • 消息代理: 管理任务队列和通信。
    • 结果后端: 存储任务执行结果(例如,数据库或文件存储)。
  • 局限性
    • 需要额外设置和管理 Celery、消息代理和后端服务。

KubernetesExecutor

  • 描述
    • 为任务执行动态启动 Kubernetes pod
    • 与容器化工作流和 Kubernetes 集群无缝集成。
  • 用例
    • 非常适合云原生、容器化的环境。
    • 通过利用 Kubernetes 的编排能力进行水平扩展。
  • 优点
    • 通过专用 pod 为每个任务提供资源隔离。
    • 根据任务数量自动扩展资源。
  • 局限性
    • 需要 Kubernetes 集群并熟悉 Kubernetes 概念。
    • 设置和维护集群的额外开销。

自定义执行器

  • 描述
    • 可以通过扩展 Airflow 中的 BaseExecutor 类来开发自定义执行器。
  • 用例
    • 对于内置执行器无法满足的特定执行需求非常有用。
    • 例如,与专有作业调度程序或资源管理器集成。
  • 实现步骤
    • 继承 BaseExecutor。
    • 实现必需的方法,如 start、execute_async、end 和 sync。
    • 在 Airflow 配置 (airflow.cfg) 中注册自定义执行器。
  • 挑战
    • 需要深入了解 Airflow 的执行模型。
    • 自定义代码的维护和调试。

选择合适的执行器

执行器的选择取决于您的用例、基础设施和可扩展性要求。

  • 开发/测试: SequentialExecutor
  • 小型到中型工作负载: LocalExecutor
  • 大规模分布式工作负载: CeleryExecutor
  • 容器化环境: KubernetesExecutor
  • 特殊需求: 自定义执行器

在 Airflow 中定义 DAG

DAG (有向无环图) 表示一组任务,这些任务被组织起来以反映它们的依赖关系和执行顺序。DAG 使用 Python 代码定义。

基本 DAG 定义

实例化 DAG

在定义任务之前必须实例化 DAG 对象。在上面的示例中,dag 对象封装了诸如 schedule_interval、start_date 和 catchup 之类的配置详细信息。

重用装饰的任务

Apache Airflow 2.0 中,引入了 @task 装饰器,它简化了将任务定义为 Python 函数的过程。可重用任务的目标是减少冗余,使代码更具模块化,并促进整个 DAG 的一致性。

让我们分解您提供的示例,以解释可重用任务在 Airflow 中的工作方式。

  • from airflow.decorators import task, dag: 这导入了 @task 装饰器(用于定义任务)和 @dag 装饰器(用于定义有向无环图,即 DAG)。
  • from datetime import datetime: 导入 datetime 以指定 DAG 的开始日期和调度。

使用 @dag 装饰器的 DAG 定义

  • @dag: 此装饰器用于定义 DAG 函数。它封装了调度和组织任务的逻辑。DAG 的参数是
    • schedule_interval='@daily': 指定 DAG 每天运行一次。
    • start_date=datetime(2023, 1, 1): 将 DAG 的开始日期设置为 2023 年 1 月 1 日。
    • catchup=False: 防止 Airflow 为过去未运行的日期运行 DAG(当 DAG 新建或处于开发阶段时,这很有用)。

使用 @task 装饰器的任务定义

在 my_dag DAG 函数内,使用 @task 装饰器定义任务。

获取数据任务

  • @task: fetch_data 函数被装饰为任务。这意味着 Airflow 调度程序现在可以执行它。
  • 功能: 此任务仅返回字符串 "data_fetched",模拟数据检索。

处理数据任务

  • @task: process_data 函数也被装饰为任务。此任务处理输入数据(作为参数接收)。
  • 功能: 它接收 fetch_data 任务的输出(即字符串 "data_fetched")并返回一个处理过的版本(即 "processed_data_fetched")。

保存数据任务

  • @task: save_data 函数被装饰为任务。它旨在保存或打印处理过的数据。
  • 功能: 它接收处理过的数据作为输入并将其打印出来。在实际场景中,这可以替换为将数据保存到数据库或文件的逻辑。
  • raw_data = fetch_data(): 调用 fetch_data 任务。其结果将传递给下一个任务(process_data)。
  • processed = process_data(raw_data): fetch_data 任务的输出作为参数传递给 process_data 任务。
  • save_data(processed): 最后,处理过的数据被传递给 save_data 任务以被“保存”(在本例中为打印)。

这展示了如何通过按顺序调用任务来重用它们。如果您需要在 DAG 的其他部分或不同 DAG 之间重用任务,可以在需要的地方引用任务函数。

使用 TaskFlow API 处理复杂/冲突的 Python 依赖项

Airflow 提供了多种技术来管理有向无环图 (DAG) 中任务之间的复杂或冲突的 Python 依赖项。最有效的方法之一是 @task.virtualenv 装饰器,它为每个任务创建隔离的虚拟环境。这确保了每个任务都需要不同的库版本或配置,可以在同一个 DAG 中共存而不会出现依赖关系冲突。

以下是有关如何使用 @task.virtualenv 以及 DockerOperator(用于预配置的 Python 环境)来处理冲突依赖关系和隔离任务的详细说明。

它的工作原理

  • @task.virtualenv 装饰器 在任务运行时会自动创建并激活该任务的虚拟环境。
  • 您可以指定需要在该虚拟环境中安装的依赖项(要求)列表。每个任务都在其隔离的环境中运行,因此依赖项不会与其他任务冲突。
  • system_site_packages=False 参数确保任务不使用系统范围的包,从而进一步隔离任务环境。

示例代码

  • @task.virtualenv: 此装饰器表示任务将在隔离的虚拟环境中运行。
  • requirements=['pandas==1.3.3', 'numpy==1.21.2']: 指定任务所需的库版本。每次执行任务时,这些依赖项都会安装在隔离的环境中。
  • system_site_packages=False: 确保任务不会继承任何系统范围的 Python 包,这有助于避免与系统依赖项的任何潜在冲突。

使用 Docker 或 Kubernetes 进行依赖项分离

何时使用 Docker 或 Kubernetes

虽然 @task.virtualenv 在隔离 Python 依赖项方面非常有用,但某些任务可能不仅需要不同的 Python 包,还需要完全不同的操作系统库、不同的 Python 版本或复杂的配置。在这种情况下,可以使用 DockerKubernetes 在完全隔离的容器中运行任务,从而允许您为每个任务定义完整的环境。

Airflow 提供了 DockerOperatorKubernetesPodOperator 来实现这一点,它们都允许您分别在 Docker 容器或 Kubernetes pod 中运行任务。

使用 DockerOperator 进行依赖项分离

Airflow 中的 DockerOperator 使您能够在 Docker 容器中运行任务。此运算符允许您完全隔离每个任务的环境,包括操作系统、Python 版本和库。

它的工作原理

  • DockerOperator 从指定的映像启动容器并在该容器内执行任务。
  • 您可以使用预配置的 Docker 映像或指定包含所需依赖项的自定义 Docker 映像。
  • Docker 确保每个任务都有自己的环境,与其他任务无关。

示例代码

  • image='python:3.8-slim': 指定任务使用的 Docker 映像。在这种情况下,任务将使用官方的 python:3.8-slim 映像运行,该映像包含 Python 3.8 和一套最小的库。
  • command='python -c "print(\'Hello from Docker\')"': 这是将在 Docker 容器内执行的命令。在这里,它只是打印一条消息,但在实际用例中,您可以运行一个依赖于容器内库和环境的 Python 脚本。
  • task_id='run_in_docker': 任务的唯一标识符。

使用 Kubernetes Pod Operator 进行依赖项分离

KubernetesPodOperator 使任务能够在具有自定义配置的 Kubernetes pod 中运行。

使用 TaskFlow API 和 Sensor Operators

Sensor Operators 是特殊任务,它们等待特定条件满足后继续执行。

示例:等待文件

有条件地跳过任务

在 Apache Airflow 中,可以根据条件动态控制任务的执行。有时,您可能希望在某些情况下跳过 DAG 中的任务。例如,如果某些数据不可用,可能会跳过某个任务,或者如果某个任务的结果不符合某些标准,则可能不需要运行它。Airflow 提供了多种条件性跳过任务的方法,从而实现了更灵活高效的工作流。

使用 TaskInstance 的 skip 方法

TaskInstance 对象允许您通过在任务的执行逻辑中使用 skip() 方法来手动跳过任务。此方法可用于基于参数值或上游任务来跳过任务。

在此示例中,如果 check_condition() 提供的条件评估为 False,则会跳过 do_something 任务。

使用 BranchPythonOperator 进行条件性跳过

BranchPythonOperator 是 Airflow 中的一种特殊类型的运算符,它允许您根据分支决策有条件地跳过任务。而不是运行所有下游任务,只有其中一个下游任务会根据分支函数所做的决定来执行。其他任务会自动跳过。

  • branch_task 根据某个条件决定 DAG 应该遵循哪个路径。
  • 然后可以使用 BashOperator、PythonOperator 或任何其他运算符来定义每个分支中的任务。

说明

  • BranchPythonOperator: branch_task 根据 check_condition() 的返回值决定遵循哪个路径。如果返回的任务 ID 是 'skip_task',它将跳过 execute_task 任务。
  • skip_task()execute_task() 任务被定义并依赖于 check_condition() 的输出。
  • 分支逻辑: 如果 check_condition() 任务返回 'skip_task',则执行 skip_task,而另一个任务将被跳过。如果它返回 'execute_task',则会发生相反的情况。