使用 Apache Airflow 构建和运行管道

2025 年 6 月 10 日 | 阅读 8 分钟

引言

Apache Airflow 是一个开源平台,旨在以编程方式创建、调度和监控工作流。通过使用 Python 将工作流定义为有向无环图 (DAG),Airflow 使用户能够轻松管理复杂的工作流。本文将深入探讨构建和运行数据管道,涵盖初始设置、表创建任务、SQL 集成、数据检索、数据合并以及最终的 DAG。

初始设置

Apache Airflow 是一个用于编排复杂工作流的平台。要正确设置它,请遵循这些详细步骤。

前提条件

Python 环境

  • 确保您的系统已安装 Python 3.7 或更高版本。
  • 检查您的 Python 版本

如果未安装 Python,请从 python.org 下载并遵循安装说明。

虚拟环境

  • 使用虚拟环境可以隔离 Airflow 依赖项,避免与其他 Python 项目发生冲突。
  • 创建虚拟环境
  • 激活虚拟环境
  • Linux/MacOS 上:
  • Windows 上:

安装 Apache Airflow

  • 进入虚拟环境后,安装 Apache Airflow。
  • 根据您的需求,您可能需要指定版本或包含额外的功能(例如 Postgres、celery)。

数据库初始化

Apache Airflow 使用元数据数据库来存储有关 DAG(有向无环图)、任务状态和执行日志的信息。

初始化数据库

  • 运行以下命令来设置 SQLite 数据库(默认)或其他已配置的数据库。
  • 这将创建必要的表并初始化元数据。

启动 Airflow 服务

启动 Airflow Webserver

  • 启动 Airflow Web 界面。
  • 默认情况下,Webserver 运行在端口 8080 上。您可以通过指定其他端口来更改它。

启动 Scheduler

  • 打开另一个终端(保持 Webserver 运行),并激活相同的虚拟环境。
  • 启动 Scheduler 来执行 DAG 中的任务。

访问 Airflow UI

  • 打开浏览器并导航到:
  • 使用默认凭据登录
    • 用户名: admin
    • 密码: admin
  • 首次登录后请更改凭据。

创建 DAGs 目录

默认位置

  • Airflow 的 DAG 文件默认目录是:

验证目录

  • 确保此目录存在或创建它。

配置自定义 DAG 目录(可选)

  • 如果您想使用其他目录来存放 DAG,请编辑 airflow.cfg 文件。

找到 `dags_folder` 设置并更新为您的首选目录路径。

放置您的 DAG

  • 将您的 DAG Python 脚本保存在 `dags` 目录中。Airflow 会自动检测并加载它们。

表创建任务

在此示例中,我们将创建一个管道来处理数据并将其合并到数据库表中。首先,定义用于创建必要表的任务。

步骤 1:定义 DAG

在 `dags` 目录中创建一个 Python 文件 `data_pipeline.py`。

步骤 2:添加表创建任务

创建任务来设置两个表:`source_table` 和 `destination_table`。

可选:使用文件中的 SQL

为了提高代码的可读性,请将 SQL 查询存储在项目目录中的 `SQL` 文件夹内的单独 `.sql` 文件中。更新任务以引用这些文件。

确保 `sql/create_source_table.sql` 文件包含:

数据检索任务

步骤 1:定义提取函数

`fetch_data` 函数负责将模拟数据插入 SQLite 数据库。

关键点

  1. 数据库连接
    • 创建或连接到 `airflow.db` SQLite 数据库。
    • 在插入数据之前,确保 `source_table` 表存在。
  2. 数据插入
    • 将模拟数据(三行:Alice、Bob、Charlie)插入 `source_table`。
  3. 最佳实践
    • 错误处理:在生产环境中,添加 try-except 块来处理数据库连接问题或 SQL 语法错误等问题。
    • 日志记录:使用 Airflow 的日志记录系统以获得更好的任务监控。

步骤 2:向 DAG 添加任务

Airflow 中的 `PythonOperator` 将 Python 函数作为任务在工作流中执行。

说明

  1. DAG 初始化
    • `dag_id`:DAG 的唯一标识符。
    • `start_date`:DAG 可以运行的最早日期。
    • `schedule_interval`:定义 DAG 的运行时间(设置为 `None` 以手动触发)。
  2. 任务定义
    • `task_id`:DAG 中任务的唯一标识符。
    • `python_callable`:要执行的函数(`fetch_data`)。
  3. 任务依赖项
    • `create_destination_table >> fetch_data_task` 确保 `fetch_data_task` 仅在 `create_destination_table` 任务完成后运行。

执行流程

  1. 启动 Airflow 服务(Webserver 和 Scheduler)。
  2. 将此 DAG 脚本放在您的 `~/airflow/dags` 目录中。
  3. 导航到 Airflow UI (https://:8080)。
  4. 手动触发 DAG 或等待其根据计划运行。

其他说明

  • 测试:在将 `fetch_data` 添加到 DAG 之前,单独运行它以确保它按预期工作。
  • 数据库选择:为简单起见,使用了 SQLite。在生产环境中,请考虑使用 PostgreSQL 或 MySQL 等功能更强大的数据库。
  • 日志记录:使用 Airflow 的内置日志来跟踪 `fetch_data_task` 的执行情况。

此设置将成功模拟数据检索并将其集成到 Airflow 管道中。如果您想进一步扩展此管道,请告诉我!

数据合并任务

步骤 1:定义合并函数

  • 数据库连接:连接到 `source_table` 所在的同一个 `airflow.db` SQLite 数据库。
  • 目标表创建:通过创建具有以下列的 `destination_table` 来确保其存在:
    • Id:主键。
    • Name:人员姓名。
    • Total:聚合值。
  • 数据合并逻辑:使用 SQL `INSERT INTO ... SELECT` 查询来:
    • 从 `source_table` 中选择 id、name 和按 id 和 name 分组的值的总和。
    • 将结果插入 `destination_table`。
  • 事务管理
    • `conn.commit()` 调用确保所有更改都已保存。
    • `conn.close()` 语句释放数据库连接。
  • 错误处理(可选)
    • 在生产环境中,请使用 try-except 块包含错误处理,以捕获模式不匹配或 SQL 错误等问题。

步骤 2:向 DAG 添加任务

说明

  • 任务定义
    • `task_id`:此任务在 DAG 中的唯一标识符。
    • `python_callable`:此任务将执行的函数 (`merge_data`)。
  • 任务依赖项
    • 行 `fetch_data_task >> merge_data_task` 确保:
    • `merge_data_task` 仅在 `fetch_data_task` 完成后开始。

执行流程

  1. 准备环境
    • 确保 `source_table` 和 `destination_table` 都存在,或者在任务执行期间创建它们。
  2. 运行 DAG
    • 将更新后的 DAG 脚本放在 `~/airflow/dags` 目录中。
    • 启动 Airflow 服务(Webserver 和 Scheduler)。
    • 打开 Airflow UI (https://:8080)。
    • 手动触发 DAG 或等待其根据定义的计划运行。
  3. 监控结果
    • 通过直接查询 SQLite 数据库来验证 `destination_table` 中的数据。

附加考虑事项

测试:在将 `merge_data` 函数集成到 DAG 之前,独立测试它以确认它按预期合并数据。

可扩展性:在生产环境中使用更强大的数据库(例如 PostgreSQL 或 MySQL)以获得更好的可扩展性。

  • 如果切换到其他系统,请用与数据库无关的查询替换特定于 SQLite 的查询。

日志记录:使用 Airflow 的日志记录机制来记录关键事件,例如处理的行数或任何 SQL 错误。

错误处理:添加回退逻辑或通知(例如,电子邮件警报)以防失败。

完成我们的 DAG

完成的 DAG 编排了从表创建到数据合并的整个工作流。这是最终的 `data_pipeline.py`。

  • 使用 Apache Airflow 构建和运行管道可以实现强大的工作流编排。
  • 通过遵循上述步骤,您可以创建一个能够初始化表、检索和处理数据、并有效合并结果的管道。

这种模块化方法确保了工作流的可扩展性、可维护性和清晰性。凭借 Airflow 的灵活性,您可以扩展此管道以包含更复杂的转换和集成,使其成为您数据工程任务的基石。