Airflow 机器学习2025 年 1 月 7 日 | 阅读 9 分钟 引言Apache Airflow 是一个平台,可以帮助人们创建调度和监控工作流。它在管理机器学习管道方面尤其有用,通常涉及数据提取、预处理、模型训练和部署等相互关联的任务。将每个任务定义为有向无环图 (DAG) 可以确保正确的执行顺序并自动处理依赖关系。 什么是 Airflow?在机器学习 (ML) 中,“airflow”通常指 Apache Airflow,这是一个用于以编程方式创建、调度和监控工作流的开源平台。它在机器学习中对于组织和自动化模型训练、验证和部署所需的数据管道特别有用。以下是一些关于 Apache Airflow 如何在机器学习领域提供帮助的重点: 1. 数据管道编排 它允许用户在 Airflow 中将工作流设计为作业的有向无环图 (DAG)。DAG 中的这些任务可以代表 ML 管道的阶段,例如数据摄取、处理、模型构建、验证和部署。它确保正确的工作按正确的顺序执行,并具有适当的依赖关系处理。 2. 调度和监控 Airflow 拥有强大的调度功能,允许您按不同的时间间隔运行作业,例如每天或每小时。它还具有 Web UI 等监控工具,可帮助跟踪进程进度,从而更容易识别和解决问题。 3. 可扩展性 Airflow 通过将作业分布在多个工作节点上来实现扩展;这对于需要大量计算资源(例如,在海量数据集上训练模型)的机器学习工作负载尤其有用。 4. 与 ML 工具集成 Airflow 可与各种 ML 和数据处理工具及平台一起使用,包括 - 数据存储:与数据库、数据仓库和云存储(例如 Amazon S3、Google Cloud Storage)集成
- 数据处理:支持 Apache Spark 和 Hadoop 等数据处理框架。
- ML 库:与 TensorFlow、PyTorch、Scikit-Learn 等 ML 库和框架配合使用
- 云服务:支持云 ML 服务,例如 Google AI Platform、Amazon SageMaker、Azure ML 等。
5. 可扩展性 这意味着 Airflow 在开发方面具有高度灵活性。此外,还可以创建自定义操作符来执行特定操作,甚至自定义其机器学习作业,例如,可以对操作符进行编程以执行训练机器学习模型的 Python 脚本。 6. 版本控制和实验 Airflow 通过脚本化流程来简化机器学习管道的版本控制。这增加了可重复性和实验性,这对于构建健壮的 ML 模型至关重要。 核心组件- 调度器 (Scheduler):调度器协调 DAG 中定义的任务的执行。它根据定义的计划和依赖关系确定每个作业应何时运行。
- 执行器 (Executor):执行器为工作节点完成任务。Airflow 具有多种类型的执行器,包括 SequentialExecutor(用于单任务执行)、LocalExecutor(用于在单台计算机上并行执行)和 CeleryExecutor(用于在多个工作节点上分布式执行)。
- DAG:有向无环图 (DAG) 是 Airflow 的基础。它们将流程显示为一组相关的作业,每个 DAG 指定如何执行任务。
- 操作符 (Operators):操作符在 DAG 中创建离散的作业。Airflow 中的许多内置操作符可以执行常见任务,例如 BashOperator(运行 bash 命令)和 PythonOperator(执行 Python 函数);用户还可以创建自定义操作符来执行特定任务。
- 任务实例 (Task Instances):任务实例发生在 DAG 执行过程中。UI 可视化 DAG 和任务关系。
- Web UI (用户界面):Airflow 提供了一个基于 Web 的 UI,允许用户监控 DAG、查看任务状态、检查日志以及手动触发任务执行。UI 提供 DAG 和任务依赖关系的图形表示。
- 插件 (Plugins):AIRFLOW 具有插件,可以通过添加新的操作符、传感器和钩子等来扩展其功能。用户可以创建自定义插件,将 airflow 与其他系统和服务集成。
- 配置文件:Airflow 需要一个配置文件(通常命名为 airflow.cfg)来确定其环境中参数,例如元数据数据库文件、执行器配置、身份验证设置的位置。
Airflow 的基本概念Apache Airflow 是一个强大的平台,用于管理复杂流程。掌握其基本概念对于正确使用 Airflow 至关重要。 1. DAG (有向无环图) 定义:DAG 根据任务的链接和依赖关系组织任务。 结构:它描述工作流,其中节点代表任务,边表示依赖关系。 执行:Airflow 保证作业按照 DAG 定义的顺序执行。 示例 说明 - from airflow import DAG: 从 Apache Airflow 导入 DAG 类以定义工作流。
- from airflow.operators.python_operator import PythonOperator: 导入 PythonOperator 以将 Python 函数作为任务执行。
- from datetime import datetime: 导入 datetime 类以处理日期和时间信息。
- def task1():: 定义一个名为 task1 的 Python 函数。
- def task2():: 定义另一个名为 task2 的 Python 函数。
- default_args = {...}: 定义一个包含 DAG 默认参数的字典。
- 'owner': 'airflow': 指定 DAG 的所有者。
- 'start_date': datetime(2024, 5, 24): 将 DAG 执行的开始日期设置为 2024 年 5 月 24 日。
- dag = DAG('example_dag', default_args=default_args, schedule_interval='@daily'): 创建一个名为 'example_dag' 的新 DAG。
- default_args=default_args: 设置 DAG 的默认参数。
- schedule_interval='@daily': 定义 DAG 的调度间隔为每天运行。
- task_1 = PythonOperator(task_id='task1', python_callable=task1, dag=dag): 使用 PythonOperator 定义一个名为 task1 的任务。它执行 task1 函数。
- task_2 = PythonOperator(task_id='task2', python_callable=task2, dag=dag): 使用 PythonOperator 定义一个名为 task2 的任务。它执行 task2 函数。
- task_1 >> task_2: 指定 task2 应在 task1 之后运行。
2. 任务 (Tasks) 定义:任务是 Airflow 中工作的基本单位。每个作业代表对脚本、命令或函数的单次执行。 操作符:任务由操作符创建,操作符指定要执行的工作类型(例如,BashOperator 用于运行 bash 命令,PythonOperator 用于执行 Python 函数)。 示例 说明 - def my_task():: 定义一个名为 my_task 的 Python 函数。此函数不接受任何参数。
- task = PythonOperator(: 使用 Apache Airflow 提供的 PythonOperator 类初始化任务。
- task_id='my_task': 为任务分配标识符 'my_task'。此 ID 用于在 DAG 中引用任务。
- python_callable=my_task: 指定在任务运行时要执行的 Python 函数 my_task。
- dag=dag): 将任务与特定的 DAG (dag) 相关联,表明此任务是该 DAG 的一部分。
3. 操作符 (Operators) 类型:操作符提供要执行的操作。常见的类型包括 BashOperator:运行 Bash 命令。 PythonOperator:运行 Python 函数。 PostgresOperator:在 Postgres 数据库上运行 SQL 语句。 传感器 (Sensor):等待满足特定条件后再继续。 自定义操作符:用户可以开发自定义操作符以满足特定要求。 示例 说明 - 此任务执行一个简单的 bash 命令(echo Hello World)。
- 它使用 BashOperator 定义,ID 为 'bash_task'。
- 任务已添加到 DAG (dag)。
- 此任务在执行时运行一个 Python 函数 (my_python_task)。
- 它使用 PythonOperator 定义,ID 为 'python_task'。
- 任务已添加到 DAG (dag)。
- 此任务在 PostgreSQL 数据库上执行 SQL 查询(SELECT * FROM my_table)。
- 它使用 PostgresOperator 定义,ID 为 'sql_task'。
- 它需要连接到 PostgreSQL 数据库(postgres_conn_id='my_postgres_conn')。
- 任务已添加到 DAG (dag)。
4. 任务实例 (Task Instance) 定义:任务实例是单个任务的独立执行。它描述了任务在特定时间点的状态。 状态:任务实例可以处于多种状态,包括运行中、成功、失败和跳过。 示例 Airflow 的在线 UI 允许您查看和监控任务实例。 5. 调度器 (Scheduler) 作用:调度器负责安排作业运行。它根据 DAG 的计划和依赖关系选择执行顺序。 功能:持续监控 DAG 和任务的状态以确定下一步操作。 6. 执行器 (Executor) 定义:执行器是执行任务的组件。 类型:有几种执行器可用 - SequentialExecutor:连续执行任务。
- LocalExecutor:在同一系统上并行执行任务。
- CeleryExecutor 将任务分布在多个工作节点上,因此适用于大规模工作负载。
7. Web UI 目的:Web UI 提供了一个基于 Web 的界面用于流程管理和监控。 功能:它包括检查 DAG、监控任务状态、读取日志和手动触发任务等功能。 8. 元数据数据库 作用:它存储有关 DAG、任务实例、用户和配置参数的信息。 后端选项:可以设置为使用 SQLite、MySQL 或 PostgreSQL 数据库。 9. 钩子 (Hooks) 定义:钩子是到外部系统的连接器,允许操作符执行操作。 功能:启用与数据库、云存储和 API 等服务的接口。 示例
说明 - hook = PostgresHook(postgres_conn_id='my_postgres_conn'): 初始化一个 PostgresHook,它提供到名为 'my_postgres_conn' 的 PostgreSQL 数据库的连接。
- hook.run('SELECT * FROM my_table'): 在连接的 PostgreSQL 数据库上执行 SQL 查询 'SELECT * FROM my_table'。
10. 插件 (Plugins) 目的:插件通过包含自定义操作符、传感器、钩子或 UI 组件来增强 Airflow 的功能。 用法:非常适合将 Airflow 与新服务连接或创建自定义功能。 11. 连接 (Connections) 定义:用于连接到其他系统的信息,例如凭据、URL 和其他设置。 配置:可以通过 Airflow UI 或在配置文件中进行配置。 12. 变量 (Variables) 目的:允许您将任意设置保存并检索为键值对。 用法:非常适合动态定制流程而无需修改代码。 示例
说明 - my_var = Variable.get("my_key", default_var="default_value"): 从 Airflow 的 Variable 存储中检索名为 "my_key" 的变量的值。如果 "my_key" 不存在,则将值 "default_value" 分配给 my_var。
- print(my_var): 打印检索到的变量的值。如果 "my_key" 存在,则打印其值;否则,打印 "default_value"。
Airflow 在机器学习管道中的应用机器学习管道可能包含几个相互关联的阶段,每个阶段都需要仔细的管理和编排。以下是 Airflow 在机器学习管道的每个阶段中的使用方式: 1. 数据收集和摄取。 每个机器学习过程的初始阶段是数据收集。它包括从各种来源检索数据,包括数据库、API 和云存储。 用于数据摄取的 Airflow 操作符 - HTTP 操作符:用于从 Web API 检索数据。
- S3 和 GCS 操作符:用于与 Amazon S3 和 Google Cloud Storage 等云存储提供商进行通信。
- SQL 操作符:用于查询数据库和提取数据。
2. 数据预处理。 在用于训练 ML 模型之前,原始数据通常需要进行清理、处理和规范化。 用于数据预处理的 Airflow 操作符 3. 模型训练 训练机器学习模型可能非常耗时且计算密集。Airflow 可以管理整个训练过程,包括超参数调整和模型评估。 用于模型训练的 Airflow 操作符 4. 模型评估和验证。 训练模型后,必须对其进行测试以确保其满足所需的性能指标。 用于模型评估的 Airflow 操作符 5. 模型部署。 部署模型意味着将其引入生产环境,以便它可以开始对新数据进行预测。 用于模型部署的 Airflow 操作符 6. 监控与重新训练 部署后,必须检查模型以防性能下降,并定期使用新数据进行重新训练。 用于监控和重新训练的 Airflow 操作符 结论Apache Airflow 是一个非常有效的工具,用于组织机器学习操作。它管理复杂关系、随数据和处理需求扩展以及提供全面监控的能力,使其成为管理机器学习管道的绝佳选择。数据科学家和ML 工程师可以利用 Airflow 来专注于构建模型,而不是管理使这些模型变为现实的复杂流程。 - Python 操作符:运行自定义 Python 程序进行数据清理和操作。
- Bash 操作符:用于执行 shell 命令和脚本。
- Docker 操作符:在 Docker 容器内执行预处理活动,以确保一致性和隔离性。
- KubernetesPod 操作符:用于在 Kubernetes 集群上执行训练任务。
- AWS SageMaker 操作符:使用 Amazon SageMaker 训练模型。
- DataProcPySpark 操作符:用于在 Google Cloud Dataproc 上使用 Spark 训练模型。
- Python 操作符:运行自定义评估例程。
- BigQuery 操作符:使用 Google BigQuery 评估模型。
- SQL 传感器:等待存储在数据库中的评估结果。
- SSH 操作符:用于将模型部署到远程服务器。
- Docker 操作符:在 Docker 容器内部署模型。
- KubernetesPod 操作符:用于在 Kubernetes 集群上部署模型。
- HTTP 传感器:用于监控端点和模型性能。
- 时间传感器:用于调度定期重新训练。
- 自定义操作符:用于与 Prometheus 或 Grafana 等监控技术集成。
|