Apache Airflow 与对象存储

2025年6月8日 | 阅读 8 分钟

引言

Apache Airflow 是一个开源工作流自动化工具,可以让你高效地编排复杂的数据工作流。最常见的用例之一是将 Apache Airflow 与对象存储解决方案集成,如 Amazon S3Google Cloud Storage (GCS)Azure Blob Storage。对象存储是存储结构化和非结构化数据的可扩展、经济高效的解决方案。

本指南将探讨如何利用 Apache Airflow 管理对象存储中的数据。

  • 前提条件
  • 创建 ObjectStoragePath
  • 将数据保存到对象存储
  • 分析存储在对象存储中的数据
  • 整合

前提条件

在将 Apache Airflow 与对象存储集成之前,请确保你具备以下条件:

Apache Airflow 安装

你需要一个可用的 Apache Airflow 环境。你可以使用以下方法进行安装:

如果你使用的是特定的对象存储提供商(AWS、GCP、Azure),请安装相关的提供商包。

云提供商认证

为你的对象存储服务设置认证。

  • Amazon S3:通过 IAM 角色、~/.aws/credentials 或环境变量使用 AWS 凭证。
  • Google Cloud Storage:设置服务账户并提供密钥 JSON 文件。
  • Azure Blob Storage:使用存储账户名称和密钥或 Azure Active Directory 认证。

Airflow 连接

通过 Admin -> Connections UI 或使用环境变量在 Airflow 中定义连接凭证。

AWS S3 (aws_default)

  • 连接 ID:aws_default
  • 连接类型:Amazon Web Services
  • Extra:{"aws_access_key_id": "your_access_key", "aws_secret_access_key": "your_secret_key"}(如果未使用 IAM 角色,则可选)
  • 可以使用环境变量进行配置:AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
字段
连接 IDaws_default
连接类型亚马逊网络服务
描述用于 S3 访问的 AWS 连接
Host(留空)
模式(留空)
登录AWS 访问密钥 ID(如果使用 IAM 角色,则可选)
密码AWS 秘密访问密钥(如果使用 IAM 角色,则可选)
额外{"aws_access_key_id": "your_access_key", "aws_secret_access_key": "your_secret_key"}(如果未使用 IAM 角色,则可选)
端口(留空)

示例 Extra 字段 JSON(可选)

注意

  • 如果你使用 IAM 角色(出于安全考虑推荐),则无需指定 aws_access_key_id 和 aws_secret_access_key。
  • 连接可以通过环境变量进行认证,如下所述。

使用环境变量(出于安全考虑推荐)

与其在 Airflow UI 中存储凭证,不如使用环境变量配置 AWS 凭证。

在你的 Airflow 环境中设置以下变量:

  • Airflow 将自动使用这些环境变量进行 AWS 认证。
  • 如果在 Docker 或 Kubernetes 中运行 Airflow,可以将这些变量添加到你的容器配置中。

使用 IAM 角色(AWS 安全最佳实践)

如果你的 Airflow 实例运行在 AWS 的 EC2、ECS 或 EKS 上,则可以使用 IAM 角色而不是硬编码凭证。

使用 IAM 角色的步骤

将 IAM 角色附加到你的 EC2 实例、ECS 任务或 EKS Pod。

授予角色适当的权限

将以下 IAM 策略附加到角色:

在 Airflow UI 中留空 aws_default 连接(它将自动使用 IAM 角色)。

验证连接

设置 AWS 连接后,在 Airflow 中对其进行测试:

  1. 打开 Airflow UI → AdminConnections → 搜索 aws_default。
  2. 点击 Test 验证连接。

或者,使用 Airflow CLI 进行测试:

在 DAG 中使用 AWS 连接

设置好连接后,你可以在 Airflow 任务中使用它。

上传文件到 S3

从 S3 下载文件

Google Cloud Storage (google_cloud_default)

  • 连接 ID:google_cloud_default
  • 连接类型:Google Cloud Platform
  • Keyfile Path:指向你的服务账户密钥 JSON 文件的路径
  • Extra:{"extra__google_cloud_platform__key_path": "/path/to/your-key.json"}
  • 或者,使用 GOOGLE_APPLICATION_CREDENTIALS 环境变量。

设置 GCS 的 Airflow 连接

你可以使用 Airflow UI、环境变量或 IAM 角色来配置连接。

选项 1:在 Airflow UI 中配置

  1. 导航到 Airflow UI → Admin → Connections。
  2. 点击 + Add Connection

输入以下详细信息:

字段
连接 IDgoogle_cloud_default
连接类型Google Cloud Platform
Key file Path(/path/to/your-key.json(如果使用 IAM 角色,则留空)
额外{"extra__google_cloud_platform__key_path": "/path/to/your-key.json"}
Project ID(可选,可以在 Extra 字段中设置)

示例 Extra 字段 JSON(如果未使用 IAM 角色)

  • key_path:指定服务账户 JSON 文件。
  • Project:定义 GCP 项目。

Azure Blob Storage (azure_blob_default)

  • 连接 ID:azure_blob_default
  • 连接类型:Azure Blob Storage
  • Extra:{"account_name": "your_storage_account", "account_key": "your_account_key"}
  • 可以使用环境变量进行配置:AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_KEY

设置 Azure Blob Storage 的 Airflow 连接

你可以三种不同的方式配置连接:

  • 选项 1:使用 Airflow UI(将凭证存储在 Airflow 中)。
  • 选项 2:使用环境变量(更安全)。
  • 选项 3:使用托管标识/IAM 角色(安全最佳实践)。

选项 1:在 Airflow UI 中配置 Azure Blob Storage

  1. 打开 Airflow UI → 转到 AdminConnections。
  2. 点击 + Add Connection 并输入以下详细信息:
字段
连接 IDazure_blob_default
连接类型Azure Blob 存储
额外{"account_name": "your_storage_account", "account_key": "your_account_key"}

示例 Extra 字段 JSON

  • account_name:你的 Azure 存储账户名称。
  • account_key:你的 Azure 存储账户密钥(敏感信息)。

警告:不建议在生产环境中使用 Airflow UI 存储秘密信息。请改用环境变量托管标识

创建 ObjectStoragePath

ObjectStoragePath 本质上是对对象存储中文件或数据集存储位置的引用。Apache Airflow 提供了钩子和操作符来与对象存储解决方案进行交互。这些路径允许 Airflow 高效地读写云存储。

定义对象存储路径

Apache Airflow 提供了特定的钩子来与不同的对象存储解决方案进行交互。这些钩子充当执行上传、下载和管理文件等各种操作的接口。

Amazon S3

  • S3Hook 允许与 S3 进行交互,例如列出、下载和上传文件。
  • aws_conn_id 指定用于认证的 Airflow 连接 ID。
  • object_path 定义 S3 存储桶中的位置。

Google Cloud Storage

  • GCSHook 用于与 Google Cloud Storage 进行交互。
  • gcp_conn_id 引用为 GCP 配置的 Airflow 连接 ID。
  • object_path 定义 GCS 存储桶中的位置。

Azure Blob 存储

  • WasbHook 用于 Azure Blob Storage 操作。
  • wasb_conn_id 引用 Azure 的 Airflow 连接 ID。
  • object_path 指定 Azure 存储容器中的位置。

用例

  • 数据摄取:从各种来源读取数据并将其存储在对象存储中。
  • 数据湖集成:将原始数据和处理后的数据存储在基于云的数据湖中。
  • 管道暂存:在进一步处理之前临时存储中间数据。

将数据保存到对象存储

一旦我们有了对象存储路径,就可以使用 Apache Airflow 操作符来保存数据。

将文件上传到 Amazon S3

  • 将本地文件(/tmp/output.csv)上传到 S3 存储桶(my-bucket)。
  • dest_key 指定存储桶内的目标路径。

将文件上传到 Google Cloud Storage

  • 将本地文件上传到 GCS 存储桶。
  • DST 参数指定存储桶内的目标。

将文件上传到 Azure Blob Storage

  • 将文件上传到 Azure Blob Storage。
  • blob_name 参数指定容器中的对象名称。

这些操作符可以轻松地在 Airflow DAG 中自动化文件上传,确保在云对象存储中进行高效的数据管理。

分析数据

一旦数据存储在对象存储中,我们就可以使用 Pandas 和 Spark 等不同工具进行分析,或者通过 Athena(用于 AWS)进行查询。

使用 Pandas 从 S3 读取数据

  • 从 S3 存储桶检索 CSV 文件。
  • 使用 boto3 与 AWS S3 进行交互。
  • 将数据读取到 Pandas DataFrame 中以供进一步分析。

使用 Pandas 从 GCS 读取数据

  • 使用 cloud.storage 从 GCS 检索文件。
  • 直接将数据读取到 Pandas 中。

使用 Pandas 从 Azure Blob Storage 读取数据

  • 使用 storage.blob 从 Azure Blob Storage 下载文件。
  • 将文件读取到 Pandas 中以供进一步分析。

数据分析的用例

  • 数据验证:加载和检查数据质量。
  • 探索性数据分析 (EDA):对存储的数据进行统计分析。
  • 机器学习:加载数据以进行训练和评估。
  • BI 和报告:与 Tableau 或 Power BI 等可视化工具连接。

通过将 Apache Airflow 与对象存储集成,你可以自动化端到端的数据工作流,从摄取到分析。

整合

现在,我们可以在 Apache Airflow 中编排一个端到端的工作流,该工作流将:

  1. 提取数据(从源)。
  2. 将数据保存到对象存储。
  3. 执行存储数据的分析

Airflow 示例 DAG

此 Airflow DAG 执行以下任务:

  1. 提取数据:生成一个示例数据集并将其保存为 CSV 文件。
  2. 上传到对象存储:将文件传输到 Amazon S3 存储桶。
  3. 执行分析:从 S3 读取数据,进行处理,并记录结果。

将 Apache Airflow 与对象存储集成对于现代数据工作流至关重要。无论是使用 S3、GCS 还是 Azure Blob Storage,Airflow 都提供了强大的工具来自动化数据移动和分析。通过正确的设置,你可以创建健壮、可扩展的管道来高效地处理大数据。