Apache Airflow 核心额外功能2025 年 6 月 8 日 | 9 分钟阅读 引言Apache Airflow 是一个开源的工作流自动化和编排工具,允许用户调度、监控和管理工作流。Airflow 的功能可以通过 “extras”(可选依赖项)进行扩展,这些扩展项可以实现与额外服务、数据库和云提供商的集成。这些扩展项有助于在不同的环境和用例中配置 Airflow。 以下是 Apache Airflow 包扩展的全面参考指南,按不同类别进行了划分。 核心 Airflow 扩展核心扩展在不要求第三方提供者的情况下,提供了 Airflow 核心的额外功能。这些功能包括身份验证、日志记录和 API 扩展。 示例 - Airflow [async]: 添加对异步执行的支持。
- Airflow [celery]: 安装 CeleryExecutor 以实现分布式任务执行。
- Airflow [dask]: 启用 DaskExecutor 以实现任务并行。
- Airflow [postgres]: 添加对 PostgreSQL 元数据数据库的支持。
- Airflow [mysql]: 添加对 MySQL 元数据数据库的支持。
- Airflow [password]: 在 Web UI 中启用密码身份验证。
- Airflow [ldap]: 启用 LDAP 身份验证。
- Airflow [kerberos]: 添加 Kerberos 身份验证支持。
执行与编排扩展这些扩展增强了 Airflow 的执行能力,允许分布式或并行执行任务。 - Airflow [async]: 添加对异步执行的支持,在处理高吞吐量工作负载时可以提高性能。
- Airflow [celery]: 安装 CeleryExecutor,支持在多个工作节点上进行分布式任务执行。这对于扩展 Airflow 工作流很有用。
- Airflow [dask]: 启用 DaskExecutor 的使用,它使用 Dask 并行计算框架来分发任务。这对于大规模并行任务执行很有帮助。
元数据数据库支持扩展Airflow 使用元数据数据库来存储任务状态和 DAG 相关信息。这些扩展允许用户选择自己偏好的数据库。 - Airflow [postgres]: 提供使用 PostgreSQL 作为 Airflow 元数据数据库的支持。由于其稳定性和性能,PostgreSQL 是生产环境中推荐的选择。
- Airflow [mysql]: 启用对 MySQL 作为元数据数据库的支持。虽然 MySQL 被广泛使用,但它在 Airflow 中的某些功能可能不如 PostgreSQL 强大。
身份验证与安全扩展这些扩展通过启用 Airflow Web UI 的不同身份验证机制来增强安全性。 - Airflow [password]: 启用 基于密码的身份验证,允许用户使用用户名和密码登录 Airflow Web 界面。
- Airflow [ldap]: 集成 轻量级目录访问协议 (LDAP) 身份验证,允许组织使用集中的身份验证和授权策略。
- Airflow [kerberos]: 添加对 Kerberos 身份验证的支持,这对于在需要强大身份验证机制的企业环境中保护 Airflow 非常有用。
API 与日志记录扩展这些扩展增强了 Airflow 中的 API 和日志记录功能。 - Airflow [api]: 扩展了 Airflow 的 API 功能,允许对工作流和 DAG 进行更高级的控制。
- Airflow [logging]: 通过允许附加的日志处理程序(例如远程日志记录到 Elasticsearch 或 Google Cloud Storage 等外部系统)来增强 Airflow 的日志记录框架。
如何安装核心扩展 要安装这些扩展,请使用以下语法 例如,要安装带有 CeleryExecutor 和 PostgreSQL 支持的 Airflow 通过利用这些 核心扩展,用户可以根据自己的基础设施和安全要求扩展 Airflow 的功能,而无需第三方提供者。 提供者扩展提供者扩展安装与第三方云服务、数据库和 API 的集成。这些都在 apache-airflow-providers-* 包下维护。 示例 - airflow[amazon]: 安装与 AWS 相关的操作符和钩子(S3、EC2、Lambda 等)。
- Airflow [google]: 添加对 Google Cloud 服务(BigQuery、GCS、GKE 等)的支持。
- Airflow [azure]: 提供与 Microsoft Azure 相关的组件。
- Airflow [snowflake]: 安装 Snowflake 连接器。
- Airflow [salesforce]: 添加 Salesforce 集成。
- Airflow [mongo]: 启用 MongoDB 集成。
- Airflow [oracle]: 安装 Oracle 数据库支持。
- Airflow [mssql]: 添加 Microsoft SQL Server 连接器。
- Airflow [slack]: 提供 Slack 通知和交互功能。
提供者扩展安装与第三方 云服务、数据库、API 和消息平台 的集成。这些都分发在 apache-airflow-providers-* 包下,允许用户通过 Airflow 的操作符、传感器和钩子与外部系统无缝交互。 云服务提供商AWS 集成 安装 Apache Airflow Amazon Provider 以实现与 Amazon Web Services (AWS) 的交互。此包为各种 AWS 服务提供了钩子、操作符和传感器,包括 - S3: 上传/下载 Amazon S3 中的数据。
- EC2: 在 AWS 中管理和配置虚拟机。
- Lambda: 从 Airflow DAGs 触发 AWS Lambda 函数。
- DynamoDB: 对 DynamoDB 执行 CRUD 操作。
- Redshift: 在 Amazon Redshift 中加载和查询数据。
使用以下命令安装 Google Cloud 集成通过安装 Apache Airflow Google Provider,为 Google Cloud Platform (GCP) 服务添加支持。它包括以下操作符、钩子和传感器: - BigQuery: 运行查询并将数据加载到 Google BigQuery。
- Cloud Storage (GCS): 上传/下载文件到/从 Google Cloud Storage。
- GKE (Kubernetes Engine): 管理 GKE 集群和工作负载。
- Dataflow: 在 Google Dataflow 上执行 Apache Beam 管道。
- Cloud Composer: 与 Google 的托管 Airflow 服务集成。
使用以下命令安装 Microsoft Azure 集成为 Microsoft Azure 服务 提供连接器和操作符,包括: - Azure Blob Storage: 上传和下载 Azure 存储中的数据。
- Azure Data Lake: 在 Azure 上管理和处理大数据。
- Azure Functions: 在 Airflow DAGs 中触发 Azure Functions。
- Azure Kubernetes Service (AKS): 部署和管理 Kubernetes 工作负载。
使用以下命令安装 数据库与数据仓库提供者Snowflake 连接器启用与 Snowflake(一个云数据仓库)的连接。它提供了: - 用于在 Snowflake 中执行 SQL 查询的钩子。
- Snowflake 连接的身份验证支持。
使用以下命令安装 airflow[oracle]添加与 Oracle Database 的集成,允许 Airflow DAGs: - 在 Oracle 上执行 SQL 查询。
- 管理 Oracle 数据库连接。
- 从 Oracle 表中获取和处理数据。
使用以下命令安装 airflow[mssql] – Microsoft SQL Server 集成提供与 Microsoft SQL Server (MSSQL) 交互的钩子和操作符。这使得 Airflow 能够: - 运行存储过程并执行 SQL 命令。
- 在 MSSQL 和其他数据库之间连接和传输数据。
- 监控 MSSQL 任务和查询结果。
使用以下命令安装 airflow[mongo] – MongoDB 支持启用与 NoSQL 数据库 MongoDB 的集成。该提供者提供: - 用于插入、更新和查询 MongoDB 集合的钩子。
- 用于批量数据导入的操作符。
- MongoDB 连接的身份验证支持。
使用以下命令安装 CRM 与业务应用程序提供者airflow[salesforce] – Salesforce 集成 提供 Airflow 操作符和钩子以与 Salesforce CRM 交互,支持: - 从 Salesforce 对象(例如,潜在客户、客户)提取数据。
- 将数据从外部源加载到 Salesforce。
- 通过 API 调用自动化 Salesforce 工作流。
使用以下命令安装 消息与通知提供者airflow[slack] – Slack 通知 添加与 Slack 的集成,允许 Airflow 任务: - 发送通知到 Slack 频道。
- 与 Slack 机器人交互以进行工作流监控。
- 将 DAG 执行结果发布到 Slack。
使用以下命令安装 如何安装多个提供者扩展您可以通过在命令中指定它们来一次性安装多个提供者。例如: 通过利用 提供者扩展,Airflow 可以无缝地与云平台、数据库、消息服务和业务应用程序集成,从而构建可扩展的自动化工作流。 Apache 软件扩展Apache 软件扩展将 Airflow 与其他 Apache 项目集成,以增强其在大数据、消息传递和分布式处理方面的能力。Apache 软件扩展 通过与其他 Apache 项目 结合来扩展 Airflow 的功能,从而实现 大数据处理、事件流和分布式计算 中的工作流。这些集成允许 Airflow 协调 Hadoop、Spark、Kafka、Flink 和 Beam 等 Apache 工具中的作业。以下是这些扩展的详细 breakdown。 示例 - Airflow [hdfs]: 提供 Hadoop 分布式文件系统 (HDFS) 支持。
- Airflow [hive]: 添加 Apache Hive 集成。
- Airflow [kafka]: 安装 Kafka 集成以实现事件流。
- Airflow [spark]: 提供 Apache Spark 操作符和钩子。
- Airflow [beam]: 启用 Apache Beam 管道执行。
- Airflow [flink]: 安装 Apache Flink 连接器。
大数据与分布式文件系统airflow[hdfs] – Hadoop 分布式文件系统 (HDFS) 集成 此扩展使 Airflow 能够与 HDFS 交互,HDFS 是 Apache Hadoop 的主要存储系统,用于存储大规模分布式数据。它包括: - 用于从 HDFS 目录 读取和写入的钩子和操作符。
- 在 Airflow DAGs 中支持 Hadoop CLI 命令。
- 与 Apache Hive 和 Apache Spark 等基于 HDFS 的工具的兼容性。
安装 airflow[hive]: Apache Hive 集成Apache Hive 是构建在 Hadoop 之上的数据仓库系统,用于使用类 SQL 查询(HiveQL)查询和管理大型数据集。此扩展提供: - 用于在 Hive 中执行 HiveQL 查询 的钩子。
- 用于管理 Hive 表、分区和查询 的操作符。
- 支持从 Hive 表 加载和提取数据。
- 与 HDFS 和 Hadoop 生态系统 的兼容性。
安装 事件流与消息传递airflow[kafka] Apache Kafka 是一个分布式事件流平台,用于实时数据管道和消息传递。此扩展允许 Airflow: - 从 Kafka 主题 生产和消费消息。
- 基于 Kafka 事件触发 Airflow DAGs。
- 管理用于流式处理工作流的 Kafka 生产者和消费者。
安装 分布式数据处理与分析airflow[spark] Apache Spark 是一个强大的分布式数据处理引擎,用于大数据分析和机器学习。此扩展使 Airflow 能够: - 提交 Spark 作业(批处理和流式处理)。
- 执行 PySpark 和 Spark SQL 查询。
- 在 YARN、Kubernetes 或独立集群 上运行 Spark 应用程序。
- 从 Airflow 管理 Spark 配置和日志记录。
安装 airflow[beam] – Apache Beam 集成Apache Beam 是一个统一的批处理和流式数据处理编程模型,通常与 Google Dataflow、Apache Spark 和 Apache Flink 一起使用。此扩展允许 Airflow: - 编排 Apache Beam 管道。
- 以 Java、Python 或 Go 执行 Beam 作业。
- 支持 批处理和流式 数据处理任务。
- 在 Google Dataflow、Spark 或 Flink 上运行 Beam 工作流。
安装 airflow[flink] – Apache Flink 集成Apache Flink 是一个强大的 流式数据处理 框架,专为实时事件处理而设计。此扩展提供: - 用于 提交和监控 Flink 作业 的钩子和操作符。
- 对 Flink SQL 查询和 Table API 的支持。
- 与 Kafka、Beam 和其他流处理工具 的兼容性。
- 在 Airflow DAGs 中执行 Flink 批处理和流式作业。
安装 如何安装多个 Apache 软件扩展您可以通过在命令中指定它们来一次性安装多个 Apache 集成。例如: 这些 Apache 软件扩展 增强了 Airflow 在 大数据、分布式计算和实时流处理 方面的能力,使其成为 数据工程管道 的强大编排工具。 本地安装软件扩展一些扩展提供了对本地安装软件的支持,允许 Airflow 与主机上的不同工具集成。 示例 - Airflow [jdbc]: 启用 JDBC 连接以访问外部数据库。
- Airflow [odbc]: 添加 ODBC 支持。
- Airflow [docker]: 安装与 Docker 相关的依赖项。
- Airflow [ssh]: 提供 SSH 和 SFTP 操作符。
- Airflow [grpc]: 启用 gRPC 通信。
- Airflow [numpy]: 安装 NumPy 用于数值计算。
- Airflow [pandas]: 添加 Pandas 用于数据操作。
一些 Apache Airflow 扩展支持 本地安装的软件,允许 Airflow 与主机上可用的工具集成,而不是外部云服务或分布式系统。这些扩展支持 数据库连接、远程执行、容器化、数值计算和进程间通信。 以下是每个 本地安装软件扩展 的详细解释: 数据库连接扩展airflow[jdbc] – JDBC (Java Database Connectivity) 集成 JDBC 是一个标准的 API,用于使用 Java 连接到关系型数据库。此扩展支持 Airflow: - 建立基于 JDBC 的数据库连接。
- 在 支持 JDBC 的数据库(例如 Oracle、MySQL、PostgreSQL、SQL Server)上运行 SQL 查询。
- 使用安装在本地机器上的 JDBC 驱动程序。
注意:需要本地安装 Java 运行时环境 (JRE) 和 JDBC 驱动程序。安装 airflow[odbc] ODBC (Open Database Connectivity) 支持ODBC 是一个标准的 API,用于访问与数据库系统无关的数据库。此扩展允许 Airflow: - 连接到 ODBC 兼容数据库(例如 Microsoft SQL Server、Snowflake、IBM Db2)。
- 通过 ODBC 连接执行 SQL 查询。
- 使用安装在本地的 Windows、Linux 和 macOS ODBC 驱动程序。
注意:需要将目标数据库的相应 ODBC 驱动程序安装在宿主系统上。安装 远程执行与文件传输扩展airflow[ssh] :SH 与 SFTP 支持 此扩展通过 安全外壳 (SSH) 和安全文件传输协议 (SFTP) 支持 远程执行 和 文件传输。它提供: - 用于在 远程服务器上执行命令 的 SSH 钩子和操作符。
- SFTP 支持,用于 安全地传输文件 之间。
- 与 基于 Linux 的系统、云虚拟机和本地服务器 的集成。
安装 gRPC 是 Google 开发的高性能 RPC(远程过程调用)框架。此扩展允许 Airflow: - 与 gRPC 服务 通信。
- 在 DAG 中调用远程 gRPC 过程。
- 将 gRPC 用于 基于微服务的架构。
安装 容器与虚拟化支持airflow[docker]: Docker 集成 Docker 允许将应用程序打包成轻量级、可移植的容器。此扩展允许 Airflow: - 在 DAG 中运行 Docker 容器。
- 管理 Docker 镜像 和 容器化应用程序。
- 使用容器执行 任务隔离。
注意:需要在本地机器上安装并运行 Docker Engine。安装 数据处理与数值计算扩展airflow[numpy] – NumPy 用于数值计算 NumPy 是 Python 中 科学计算 的基础包,广泛用于处理大型数值数据集。此扩展允许 Airflow: - 在 DAG 中执行 数值运算。
- 处理 数组、矩阵和数学函数。
- 提高涉及数值数据的数据管道的 性能。
安装
|