Apache Airflow 与对象存储2025年6月8日 | 阅读 8 分钟 引言Apache Airflow 是一个开源工作流自动化工具,可以让你高效地编排复杂的数据工作流。最常见的用例之一是将 Apache Airflow 与对象存储解决方案集成,如 Amazon S3、Google Cloud Storage (GCS) 和 Azure Blob Storage。对象存储是存储结构化和非结构化数据的可扩展、经济高效的解决方案。 本指南将探讨如何利用 Apache Airflow 管理对象存储中的数据。 - 前提条件
- 创建 ObjectStoragePath
- 将数据保存到对象存储
- 分析存储在对象存储中的数据
- 整合
前提条件在将 Apache Airflow 与对象存储集成之前,请确保你具备以下条件: Apache Airflow 安装你需要一个可用的 Apache Airflow 环境。你可以使用以下方法进行安装: 如果你使用的是特定的对象存储提供商(AWS、GCP、Azure),请安装相关的提供商包。 云提供商认证为你的对象存储服务设置认证。 - Amazon S3:通过 IAM 角色、~/.aws/credentials 或环境变量使用 AWS 凭证。
- Google Cloud Storage:设置服务账户并提供密钥 JSON 文件。
- Azure Blob Storage:使用存储账户名称和密钥或 Azure Active Directory 认证。
Airflow 连接通过 Admin -> Connections UI 或使用环境变量在 Airflow 中定义连接凭证。 AWS S3 (aws_default)- 连接 ID:aws_default
- 连接类型:Amazon Web Services
- Extra:{"aws_access_key_id": "your_access_key", "aws_secret_access_key": "your_secret_key"}(如果未使用 IAM 角色,则可选)
- 可以使用环境变量进行配置:AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
字段 | 值 |
---|
连接 ID | aws_default | 连接类型 | 亚马逊网络服务 | 描述 | 用于 S3 访问的 AWS 连接 | Host | (留空) | 模式 | (留空) | 登录 | AWS 访问密钥 ID(如果使用 IAM 角色,则可选) | 密码 | AWS 秘密访问密钥(如果使用 IAM 角色,则可选) | 额外 | {"aws_access_key_id": "your_access_key", "aws_secret_access_key": "your_secret_key"}(如果未使用 IAM 角色,则可选) | 端口 | (留空) |
示例 Extra 字段 JSON(可选) 注意 - 如果你使用 IAM 角色(出于安全考虑推荐),则无需指定 aws_access_key_id 和 aws_secret_access_key。
- 连接可以通过环境变量进行认证,如下所述。
使用环境变量(出于安全考虑推荐)与其在 Airflow UI 中存储凭证,不如使用环境变量配置 AWS 凭证。 在你的 Airflow 环境中设置以下变量: - Airflow 将自动使用这些环境变量进行 AWS 认证。
- 如果在 Docker 或 Kubernetes 中运行 Airflow,可以将这些变量添加到你的容器配置中。
使用 IAM 角色(AWS 安全最佳实践)如果你的 Airflow 实例运行在 AWS 的 EC2、ECS 或 EKS 上,则可以使用 IAM 角色而不是硬编码凭证。 使用 IAM 角色的步骤 将 IAM 角色附加到你的 EC2 实例、ECS 任务或 EKS Pod。 授予角色适当的权限 将以下 IAM 策略附加到角色: 在 Airflow UI 中留空 aws_default 连接(它将自动使用 IAM 角色)。 验证连接设置 AWS 连接后,在 Airflow 中对其进行测试: - 打开 Airflow UI → Admin → Connections → 搜索 aws_default。
- 点击 Test 验证连接。
或者,使用 Airflow CLI 进行测试: 在 DAG 中使用 AWS 连接设置好连接后,你可以在 Airflow 任务中使用它。 上传文件到 S3 从 S3 下载文件Google Cloud Storage (google_cloud_default)- 连接 ID:google_cloud_default
- 连接类型:Google Cloud Platform
- Keyfile Path:指向你的服务账户密钥 JSON 文件的路径
- Extra:{"extra__google_cloud_platform__key_path": "/path/to/your-key.json"}
- 或者,使用 GOOGLE_APPLICATION_CREDENTIALS 环境变量。
设置 GCS 的 Airflow 连接你可以使用 Airflow UI、环境变量或 IAM 角色来配置连接。 选项 1:在 Airflow UI 中配置 - 导航到 Airflow UI → Admin → Connections。
- 点击 + Add Connection。
输入以下详细信息: 字段 | 值 |
---|
连接 ID | google_cloud_default | 连接类型 | Google Cloud Platform | Key file Path | (/path/to/your-key.json(如果使用 IAM 角色,则留空) | 额外 | {"extra__google_cloud_platform__key_path": "/path/to/your-key.json"} | Project ID | (可选,可以在 Extra 字段中设置) |
示例 Extra 字段 JSON(如果未使用 IAM 角色) - key_path:指定服务账户 JSON 文件。
- Project:定义 GCP 项目。
Azure Blob Storage (azure_blob_default)- 连接 ID:azure_blob_default
- 连接类型:Azure Blob Storage
- Extra:{"account_name": "your_storage_account", "account_key": "your_account_key"}
- 可以使用环境变量进行配置:AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_KEY
设置 Azure Blob Storage 的 Airflow 连接你可以三种不同的方式配置连接: - 选项 1:使用 Airflow UI(将凭证存储在 Airflow 中)。
- 选项 2:使用环境变量(更安全)。
- 选项 3:使用托管标识/IAM 角色(安全最佳实践)。
选项 1:在 Airflow UI 中配置 Azure Blob Storage - 打开 Airflow UI → 转到 Admin → Connections。
- 点击 + Add Connection 并输入以下详细信息:
字段 | 值 |
---|
连接 ID | azure_blob_default | 连接类型 | Azure Blob 存储 | 额外 | {"account_name": "your_storage_account", "account_key": "your_account_key"} |
示例 Extra 字段 JSON - account_name:你的 Azure 存储账户名称。
- account_key:你的 Azure 存储账户密钥(敏感信息)。
警告:不建议在生产环境中使用 Airflow UI 存储秘密信息。请改用环境变量或托管标识。 创建 ObjectStoragePathObjectStoragePath 本质上是对对象存储中文件或数据集存储位置的引用。Apache Airflow 提供了钩子和操作符来与对象存储解决方案进行交互。这些路径允许 Airflow 高效地读写云存储。 定义对象存储路径 Apache Airflow 提供了特定的钩子来与不同的对象存储解决方案进行交互。这些钩子充当执行上传、下载和管理文件等各种操作的接口。 Amazon S3- S3Hook 允许与 S3 进行交互,例如列出、下载和上传文件。
- aws_conn_id 指定用于认证的 Airflow 连接 ID。
- object_path 定义 S3 存储桶中的位置。
Google Cloud Storage- GCSHook 用于与 Google Cloud Storage 进行交互。
- gcp_conn_id 引用为 GCP 配置的 Airflow 连接 ID。
- object_path 定义 GCS 存储桶中的位置。
Azure Blob 存储- WasbHook 用于 Azure Blob Storage 操作。
- wasb_conn_id 引用 Azure 的 Airflow 连接 ID。
- object_path 指定 Azure 存储容器中的位置。
用例 - 数据摄取:从各种来源读取数据并将其存储在对象存储中。
- 数据湖集成:将原始数据和处理后的数据存储在基于云的数据湖中。
- 管道暂存:在进一步处理之前临时存储中间数据。
将数据保存到对象存储一旦我们有了对象存储路径,就可以使用 Apache Airflow 操作符来保存数据。 将文件上传到 Amazon S3- 将本地文件(/tmp/output.csv)上传到 S3 存储桶(my-bucket)。
- dest_key 指定存储桶内的目标路径。
将文件上传到 Google Cloud Storage- 将本地文件上传到 GCS 存储桶。
- DST 参数指定存储桶内的目标。
将文件上传到 Azure Blob Storage- 将文件上传到 Azure Blob Storage。
- blob_name 参数指定容器中的对象名称。
这些操作符可以轻松地在 Airflow DAG 中自动化文件上传,确保在云对象存储中进行高效的数据管理。 分析数据一旦数据存储在对象存储中,我们就可以使用 Pandas 和 Spark 等不同工具进行分析,或者通过 Athena(用于 AWS)进行查询。 使用 Pandas 从 S3 读取数据 - 从 S3 存储桶检索 CSV 文件。
- 使用 boto3 与 AWS S3 进行交互。
- 将数据读取到 Pandas DataFrame 中以供进一步分析。
使用 Pandas 从 GCS 读取数据 - 使用 cloud.storage 从 GCS 检索文件。
- 直接将数据读取到 Pandas 中。
使用 Pandas 从 Azure Blob Storage 读取数据 - 使用 storage.blob 从 Azure Blob Storage 下载文件。
- 将文件读取到 Pandas 中以供进一步分析。
数据分析的用例 - 数据验证:加载和检查数据质量。
- 探索性数据分析 (EDA):对存储的数据进行统计分析。
- 机器学习:加载数据以进行训练和评估。
- BI 和报告:与 Tableau 或 Power BI 等可视化工具连接。
通过将 Apache Airflow 与对象存储集成,你可以自动化端到端的数据工作流,从摄取到分析。 整合现在,我们可以在 Apache Airflow 中编排一个端到端的工作流,该工作流将: - 提取数据(从源)。
- 将数据保存到对象存储。
- 执行存储数据的分析。
Airflow 示例 DAG 此 Airflow DAG 执行以下任务: - 提取数据:生成一个示例数据集并将其保存为 CSV 文件。
- 上传到对象存储:将文件传输到 Amazon S3 存储桶。
- 执行分析:从 S3 读取数据,进行处理,并记录结果。
将 Apache Airflow 与对象存储集成对于现代数据工作流至关重要。无论是使用 S3、GCS 还是 Azure Blob Storage,Airflow 都提供了强大的工具来自动化数据移动和分析。通过正确的设置,你可以创建健壮、可扩展的管道来高效地处理大数据。
|