本文发布已超过一年。较旧的文章可能包含过时内容。请检查页面中的信息自发布以来是否已发生变化。

Kubernetes 上的 Airflow(第一部分):一种不同类型的 Operator

介绍

作为彭博社持续致力于发展 Kubernetes 生态系统的一部分,我们很高兴宣布 Kubernetes Airflow Operator;这是一种用于Apache Airflow(一个流行的工作流编排框架)的机制,可以使用 Kubernetes API 本地启动任意的 Kubernetes Pod。

什么是 Airflow?

Apache Airflow 是“配置即代码”DevOps 理念的一种实现。Airflow 允许用户使用简单的 Python 对象 DAG(有向无环图)启动多步管道。您可以在易于阅读的 UI 中定义依赖关系、以编程方式构建复杂工作流并监控计划作业。

Airflow DAGs Airflow UI

为什么要在 Kubernetes 上运行 Airflow?

自诞生以来,Airflow 最大的优势在于其灵活性。Airflow 为从 Spark 和 HBase 到各种云提供商的服务提供了广泛的集成。Airflow 还通过其插件框架提供了轻松的可扩展性。然而,该项目的一个局限性是,Airflow 用户只能使用在执行时存在于 Airflow Worker 上的框架和客户端。一个组织可以有各种 Airflow 工作流,从数据科学管道到应用部署。这种用例上的差异会在依赖管理方面产生问题,因为两组团队可能为他们的工作流使用截然不同的库。

为了解决这个问题,我们利用 Kubernetes 允许用户启动任意的 Kubernetes Pod 和配置。Airflow 用户现在可以完全掌控他们的运行时环境、资源和 Secrets,基本上将 Airflow 变成一个“任意作业皆可”的工作流编排器。

Kubernetes Operator

在我们进一步讨论之前,我们应该澄清,Airflow 中的Operator是一种任务定义。当用户创建 DAG 时,他们会使用诸如“SparkSubmitOperator”或“PythonOperator”之类的 Operator 分别提交/监控 Spark 作业或 Python 函数。Airflow 内置了用于 Apache Spark、BigQuery、Hive 和 EMR 等框架的 Operator。它还提供了一个 Plugins 入口,允许 DevOps 工程师开发自己的连接器。

Airflow 用户一直在寻找简化部署和 ETL 管道管理的方法。任何可以在提高监控能力的同时解耦管道步骤的机会,都可以减少未来的中断和紧急修复。以下是 Airflow Kubernetes Operator 提供的好处列表:

  • 提高了部署的灵活性
    Airflow 的插件 API 一直是希望在 DAG 中测试新功能的工程师的一大福音。不利的一面是,每当开发人员想要创建新的 Operator 时,他们都必须开发一个全新的插件。现在,任何可以在 Docker 容器中运行的任务都可以通过完全相同的 Operator 访问,无需维护额外的 Airflow 代码。

  • 配置和依赖的灵活性:对于在静态 Airflow Worker 中运行的 Operator,依赖管理会变得相当困难。如果开发人员想要运行一个需要 SciPy 的任务,以及另一个需要 NumPy 的任务,开发人员将不得不在所有 Airflow Worker 中维护这两个依赖项,或将任务分载到外部机器(如果该外部机器以未跟踪的方式更改,可能会导致错误)。自定义 Docker 镜像允许用户确保任务的环境、配置和依赖项完全是幂等的。

  • 使用 kubernetes secrets 增强安全性:处理敏感数据是任何 DevOps 工程师的核心职责。在任何可能的机会下,Airflow 用户都希望严格按需隔离任何 API Key、数据库密码和登录凭据。使用 Kubernetes Operator,用户可以利用 Kubernetes Vault 技术存储所有敏感数据。这意味着 Airflow Worker 将永远无法访问这些信息,并且可以简单地请求构建仅包含所需 Secrets 的 Pod。

架构

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python 客户端生成一个由 APIServer (1) 处理的请求。然后 Kubernetes 将根据您定义的规格启动您的 Pod (2)。镜像将加载所有必需的环境变量、Secrets 和依赖项,执行一个命令。作业启动后,Operator 只需监控健康状况并跟踪日志 (3)。用户可以选择将日志收集到调度器本地或其 Kubernetes 集群中当前任何分布式日志服务。

使用 Kubernetes Operator

基本示例

以下 DAG 可能是展示 Kubernetes Operator 如何工作的最简单示例。此 DAG 在 Kubernetes 上创建两个 Pod:一个带有 Python 的 Linux 发行版和一个不带 Python 的基础 Ubuntu 发行版。带有 Python 的 Pod 将正确运行 Python 请求,而不带 Python 的 Pod 将向用户报告失败。如果 Operator 工作正常,passing-task Pod 应该完成,而 failing-task Pod 则向 Airflow Web 服务器返回失败。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
Basic DAG Run

但这与我的工作流有什么关系?

虽然此示例仅使用基本镜像,但 Docker 的神奇之处在于,此同一 DAG 可用于您想要的任何镜像/命令组合。以下是在 Airflow DAG 上运行生产就绪代码的推荐 CI/CD 管道。

1:在 Github 中 PR

使用 Travis 或 Jenkins 运行单元和集成测试,贿赂您最喜欢的队友帮您 PR 代码,然后合并到 master 分支以触发自动 CI 构建。

2:通过 Jenkins -> Docker Image 的 CI/CD

在您的 Jenkins 构建中生成 Docker 镜像并提升发布版本.

3:Airflow 启动任务

最后,更新您的 DAG 以反映新的发布版本,您就可以开始了!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

启动测试部署

由于 Kubernetes Operator 尚未发布,我们还没有发布官方的helm chart 或 Operator(不过两者都在进行中)。但是,我们在此提供基本部署的说明,并且正在积极寻找愿意尝试此新功能的勇士测试者。要试用此系统,请按照以下步骤操作:

步骤 1:设置 kubeconfig 指向一个 Kubernetes 集群

步骤 2:克隆 Airflow 仓库

运行 git clone https://github.com/apache/incubator-airflow.git 克隆官方 Airflow 仓库。

步骤 3:运行

要运行此基本部署,我们借用了当前用于 Kubernetes Executor 的集成测试脚本(将在本系列下一篇文章中解释)。要启动此部署,请运行以下三个命令:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在继续之前,让我们讨论一下这些命令的作用:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

Kubernetes Executor 是 Airflow 的另一个功能,它允许将任务作为幂等 Pod 进行动态分配。我们将其切换到 LocalExecutor 的原因仅仅是为了逐步引入一个功能。如果您想尝试 Kubernetes Executor,非常欢迎跳过此步骤,但我们将在以后的文章中详细介绍。

./scripts/ci/kubernetes/Docker/build.sh

此脚本将打包 Airflow master 源代码并构建基于 Airflow 分发的 Docker 容器。

./scripts/ci/kubernetes/kube/deploy.sh

最后,我们在您的集群上创建一个完整的 Airflow 部署。这包括 Airflow 配置、一个 postgres 后端、web 服务器 + 调度器,以及之间所有必要的服务。需要注意的一点是,提供的角色绑定是 cluster-admin,因此如果您在集群上没有该级别的权限,可以在 scripts/ci/kubernetes/kube/airflow.yaml 中修改。

步骤 4:登录 Web 服务器

现在您的 Airflow 实例正在运行,让我们看看 UI!UI 位于 Airflow Pod 的 8080 端口,因此只需运行:

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

现在 Airflow UI 将位于 http://localhost:8080。只需输入 airflow/airflow 即可登录,然后您将完全访问 Airflow Web UI。

步骤 5:上传测试文档

要修改/添加您自己的 DAG,您可以使用 kubectl cp 将本地文件上传到 Airflow 调度器的 DAG 文件夹。Airflow 将读取新的 DAG 并自动将其上传到其系统中。以下命令将把任何本地文件上传到正确的目录:

kubectl cp <local file> <namespace>/<pod>:/root/airflow/dags -c scheduler

第 6 步:享受!

那么我什么时候可以使用这个功能呢?

虽然此功能仍处于早期阶段,但我们希望在接下来的几个月内看到它广泛发布。

参与其中

此功能仅仅是改进 Apache Airflow 与 Kubernetes 集成的一系列重大工作的开端。Kubernetes Operator 已合并到 Airflow 的 1.10 发布分支 中(executor 处于实验模式),同时还有一个完全 k8s 原生的 scheduler,称为 Kubernetes Executor(后续将发布文章介绍)。这些功能仍处于早期阶段,早期采用者/贡献者可以在这些功能的未来发展中发挥巨大影响。

对于有兴趣加入这些工作的人,我建议遵循以下步骤

  • 加入 airflow-dev 邮件列表:dev@airflow.apache.org
  • Apache Airflow JIRA 中提交问题
  • 加入我们每周三太平洋标准时间上午 10 点的 SIG-BigData 会议。
  • 在 Slack 上联系我们,频道为 kubernetes.slack.com 上的 #sig-big-data。

特别感谢 Apache Airflow 和 Kubernetes 社区,特别是 Grant Nicholas、Ben Goldberg、Anirudh Ramanathan、Fokko Dreisprong 和 Bolke de Bruin,感谢你们在这些功能以及我们未来工作上的出色帮助。