Apache Airflow Providers Druid

9 Jun 2025 | 9分钟阅读

引言

Apache Airflow 是一个用于以编程方式创建、调度和监控工作流的开源平台。它因其灵活性和可扩展性而在数据工程师和分析师中广受欢迎。凭借丰富的提供程序生态系统,Airflow 能够与各种外部系统无缝集成,让用户能够自动化复杂的数据管道。

另一方面,Druid 是一个高性能的实时分析数据库,专为处理大数据集的快速聚合和低延迟查询而设计。Druid 被广泛用于支持实时仪表板和分析应用程序,在需要对事件驱动数据进行高查询性能的场景中表现出色。

Druid 提供程序的特点

Druid Hook

Druid Hook 提供了一个基于 Python 的接口,可以与 Druid 端点无缝交互。它充当 Airflow 和 Druid 之间的桥梁,使开发人员能够以编程方式执行任务,例如提交摄取规范或运行 SQL 查询。

主要功能包括

  • 连接到 Druid 的 Overlord、Router 或 Coordinator API。
  • 管理摄取作业,包括提交、监控和停止任务。
  • 支持高级用例,如动态查询生成。

Druid Operators

该提供程序包含预定义的运算符,用于执行 Druid 特定的操作,从而在无需自定义代码的情况下更轻松地自动化复杂工作流。这些运算符是为常见的 Druid 任务量身定制的,例如:

  • 数据摄取:自动化将摄取规范提交到 Druid,用于批量或流式数据加载。
  • 查询执行:运行 SQL 或原生 Druid 查询,并捕获结果供工作流进一步处理。
  • 任务监控:管理和观察摄取作业的状态,以确保成功完成。

通过使用这些运算符,用户可以标准化 Druid 相关的工作流,同时减少开发工作量。

连接管理

Druid 提供程序通过利用 Airflow 内置的连接框架,简化了连接设置和管理。用户可以:

  • 通过 Airflow 的 UI 或 airflow.cfg 文件配置 Druid 连接。
  • 安全地存储 API 密钥、URL 和凭证等敏感连接详细信息。
  • 直接从 UI 测试连接以验证设置。

这种简化的方法可以最大限度地减少错误,并确保到 Druid 集群的安全、一致的连接。

任务日志记录

该提供程序增强了 Druid 相关工作流的任务日志记录功能,提供了对任务执行的详细可见性。好处包括:

  • 摄取任务提交、查询执行结果和 API 交互的日志。
  • 失败任务的错误报告,以简化调试。
  • Druid 任务生命周期中每个步骤的详细信息,可实现有效的监控和故障排除。

有效的日志记录可提高运营可靠性,并帮助团队快速解决生产管道中的问题。

版本兼容性

Druid 提供程序会定期更新,以保持与最新版 Apache Airflow 和 Apache Druid 的兼容性。这些更新可确保:

  • 访问 Airflow 或 Druid 中引入的新功能。
  • 修复不同版本之间的错误和兼容性问题。
  • 通过解决过时组件中的漏洞来提高安全性。

在 Airflow 中设置 Druid 提供程序

安装

要使用 Druid 提供程序,您需要使用 pip 进行安装。

该软件包可在 PyPI 上找到,并可按如下方式安装:

确保您的 Airflow 环境满足 Druid 提供程序的所需版本先决条件。

配置连接

要与 Druid 集群交互,您需要在 Airflow 中配置一个连接。以下是设置方法:

  • 访问 Airflow UI:登录 Airflow Web 界面。
  • 导航到“连接”:转到“管理”菜单,然后单击“连接”。
  • 添加新连接:单击“+”按钮创建新连接。

填写连接详细信息

  • 连接 ID:唯一的标识符,例如 druid_default。
  • 连接类型:选择 Druid。
  • 主机:您的 Druid 集群的主机名或 IP 地址。
  • 端口: Druid 服务的端口号(默认是 Router 或 Broker 服务的 8082)。
  • 附加信息:以 JSON 格式的其他参数,例如身份验证详细信息。

验证安装

配置完连接后,通过运行一个与 Druid 交互的简单 DAG(有向无环图)来测试集成。在继续进行更复杂的工作流之前,请确保连接功能正常。

Druid 提供程序的核心组件

Druid Hook:Druid Hook (DruidHook) 是与 Druid 交互的核心接口。它提供了提交任务、查询数据和监控摄取作业的方法。

  • submit_indexing_job:将摄取任务提交到 Druid。
  • check_ingestion_status:监控摄取作业的状态。
  • run_query:执行 Druid 查询并检索结果。

示例

DruidHook 对于将 Airflow 与 Druid 连接至关重要。通过使用 hook,用户可以以编程方式运行查询、摄取数据并跟踪作业状态。以下是分步演示:

步骤 1:导入 DruidHook 类

首先,从 airflow.providers.druid.hooks.druid 模块导入 DruidHook 类:

步骤 2:建立连接

创建 DruidHook 的实例,指定在 Airflow 环境中配置的连接 ID:

步骤 3:定义查询

以 JSON 格式创建 Druid 查询。查询类型、数据源、时间间隔、粒度、维度和聚合都应与您的用例保持一致:

步骤 4:执行查询

使用 DruidHook 的 run_query 方法执行查询并捕获结果:

此代码向 Druid 提交一个 groupBy 查询,请求指定时间间隔内按国家分组的每日用户计数。请用与您的用例相关的那个替换数据源和查询详细信息。

Druid Operators

DruidOperator 用于在 Druid 中提交和监控摄取作业。

参数

  • task_id: Airflow 任务的唯一 ID。
  • druid_ingest_conn_id: Druid 的连接 ID。
  • json_index_file:包含摄取规范的 JSON 文件的路径。

示例

以下 DAG 演示了如何使用 DruidOperator 提交摄取作业:

此 DAG 定义了一个将摄取作业提交到 Druid 的单个任务。json_index_file 参数指定了摄取规范,该规范描述了数据应如何摄取到 Druid 中。确保 JSON 规范文件格式正确且 Airflow 环境可访问。

Druid CheckOperator

DruidCheckOperator 旨在通过运行查询并根据预期值验证结果来验证 Druid 中的数据。这对于在工作流中实现数据质量检查特别有用。

参数

  • task_id: Airflow 任务的唯一 ID。
  • druid_broker_conn_id: Druid Broker 的连接 ID。
  • SQL:在 Druid 集群上执行的查询。
  • expected_result:查询的预期结果。

示例

以下是如何使用 DruidCheckOperator 来验证 Druid 中的数据:

data_quality_check

在此示例中,DruidCheckOperator 执行一个查询,以计数 my_datasource 数据源中 country 为“US”的记录数。它将结果与预期值(100)进行比较。如果结果不匹配,任务将失败,这表示潜在的数据质量问题。

Druid 连接设置

在 Airflow UI 中或通过环境变量配置的连接可简化对 Druid 集群的访问。这些连接集中了身份验证和连接详细信息,减少了任务之间的冗余。通过正确设置的连接,您可以增强工作流的可维护性、安全性和可重用性。

Druid 连接的组件

  • 连接 ID
    • 连接的唯一标识符,例如 druid_default。
  • 连接类型
    • 将 Druid 指定为连接类型,以确保与 Druid 提供程序的兼容性。
  • 主机和端口
    • 配置 Druid Router 或 Broker 服务的​​主机名和端口。例如,Druid 的默认端口是 8082。
  • 认证
    • 如果您的 Druid 集群需要身份验证,请在连接设置中或通过 JSON 格式的“附加信息”字段包含必要的凭证。
  • 附加参数
    • 此字段允许您添加其他设置,例如自定义标头、身份验证机制或 SSL 配置。

连接配置

以下是使用 Airflow UI 的 Druid 连接配置示例:

连接 ID:druid_default

连接类型:Druid

主机:druid-cluster.company.com

端口 8082

额外

通过环境变量设置连接

除了 Airflow UI,您还可以通过环境变量定义连接,以实现自动化部署或 CI/CD 流水线。

示例环境变量配置

通过将连接定义为环境变量,您可以无需通过 UI 进行手动配置,这对于容器化部署非常有利。

连接设置的最佳实践

  • 使用安全连接
    始终启用 SSL/TLS 以确保 Airflow 和 Druid 之间的安全通信。安全连接可防止未经授权的访问,并保护传输中的敏感数据。验证所有端点是否配置为支持加密,并强制使用 HTTPS 进行数据传输。
  • 集中化身份验证
    使用环境变量或密钥管理工具(如 HashiCorp Vault、AWS Secrets Manager 或 Azure Key Vault)安全地存储凭证。这可确保 Druid API 密钥或数据库密码等敏感信息不会硬编码在 DAG 或配置文件中。
  • 测试连接
    在将连接集成到工作流之前,通过向 Druid 集群运行简单的测试查询来验证您的设置。此步骤有助于及早发现潜在问题,例如配置错误的端点或错误的凭证。
  • 记录连接详细信息
    维护连接配置的清晰、全面的文档,包括身份验证机制、URL、端口和任何自定义设置。正确的文档简化了故障排除,并加快了新团队成员的入职流程。
  • 优化摄取规范
    定制 Druid 摄取规范以满足数据的特定需求。这包括定义最佳段粒度、分区策略以及调整摄取参数以提高性能并减少存储开销。
  • 监控资源使用情况
    定期监控摄取和查询任务期间 Druid 集群的资源利用率。诸如 Druid 的内置监控仪表板或第三方可观测性平台之类的工具可以帮助跟踪 CPU、内存和磁盘使用情况,确保集群高效运行。
  • 利用模块化 DAG
    设计用于处理 Druid 任务的模块化且可重用的有向无环图 (DAG)。模块化 DAG 可提高可维护性,并使扩展工作流或适应数据管道中的更改更加容易。
  • 安全连接(重申)
    强调配置安全连接的重要性,使用 SSL/TLS 和适当的身份验证机制(如 Kerberos 或 OAuth),以防止数据泄露和未经授权的访问。
  • 定期更新提供程序
    保持 Airflow Druid 提供程序和其他相关依赖项的最新。定期更新可确保您受益于最新功能、安全补丁和性能改进,这对于稳定且安全的集成至关重要。

Druid 提供程序的使用场景

  • 实时数据摄取:Airflow 工作流可以触发 Druid 摄取任务来处理实时流数据。这对于近乎实时地更新仪表板或分析应用程序特别有用。
  • 批量处理:计划和监控大型数据集的批量摄取作业。这非常适合夜间或每小时的数据管道执行。
  • 自动化数据质量检查:使用 DruidCheckOperator,您可以自动化数据验证任务,确保 Druid 集群中的数据准确性和完整性。
  • 查询自动化:利用 Druid Hook 执行和自动化重复的分析查询。结果可以为下游工作流提供支持,或用于报告目的。
  • 监控和警报:将 Druid 摄取状态检查与 Airflow 的警报机制集成,以通知团队有关摄取失败或延迟的情况。

疑难解答常见问题

  • 连接错误
    • 在 Airflow 中验证连接设置。
    • 确保 Airflow 环境可以访问 Druid 集群。
  • 任务失败
    • 检查任务日志以获取详细的错误消息。
    • 验证摄取规范是否存在语法和配置问题。
  • 查询性能
    • 优化查询粒度和过滤器。
    • 使用适当的聚合来降低查询复杂性。
  • 提供程序兼容性
    • 确认已安装的提供程序版本与您的 Airflow 和 Druid 版本兼容。

apache-airflow-providers-druid 包弥合了 Apache Airflow 和 Apache Druid 之间的差距,实现了数据工作流的无缝自动化。凭借其强大的 hooks、operators 和连接管理功能,此提供程序使用户能够高效地集成、管理和监控数据管道。

  • 通过利用 Airflow 和 Druid 的功能,组织可以构建可扩展、可靠且高性能的数据架构,从而实时推动洞察和决策。
  • 无论您处理的是流数据、批量处理还是复杂查询,Druid 提供程序都能确保您的工作流既流畅又有效。
  • Apache Airflow 和 Druid 之间的集成通过 apache-airflow-providers-druid 包实现。
  • 此提供程序允许用户轻松地将 Airflow 工作流连接到 Druid,从而能够在 Druid 生态系统中高效地摄取、查询和管理数据。

apache-airflow-providers-druid 包是 Airflow Providers 生态系统的一部分。Airflow 中的 Providers 是包含各种外部系统、工具和数据库集成模块。Druid 提供程序为 Airflow 用户提供了以编程方式与 Druid 集群交互所需的工具。