本文发表于一年多前。旧文章可能包含过时内容。请检查页面中的信息自发布以来是否已变得不正确。

Kubernetes 上的 Airflow(第 1 部分):一种不同类型的 Operator

引言

作为彭博社持续致力于发展 Kubernetes 生态系统的一部分,我们很高兴地宣布推出 Kubernetes Airflow Operator;这是一种机制,用于让流行的工作流编排框架 Apache Airflow 能够使用 Kubernetes API 原生启动任意 Kubernetes Pod。

什么是 Airflow?

Apache Airflow 是“配置即代码”DevOps 理念的一种实现。Airflow 允许用户使用简单的 Python 对象 DAG(有向无环图)启动多步骤管道。您可以定义依赖项,以编程方式构建复杂的工作流,并在易于阅读的 UI 中监控计划作业。

Airflow DAGs Airflow UI

为什么选择 Kubernetes 上的 Airflow?

自诞生以来,Airflow 最大的优势一直是其灵活性。Airflow 为从 Spark 和 HBase 到各种云提供商服务等一系列服务提供了广泛的集成。Airflow 还通过其插件框架提供了简单的可扩展性。然而,该项目的一个限制是 Airflow 用户在执行时只能使用 Airflow worker 上存在的框架和客户端。一个组织可以拥有各种 Airflow 工作流,从数据科学管道到应用程序部署。这种用例差异会在依赖管理中产生问题,因为两个团队可能为其工作流使用截然不同的库。

为了解决这个问题,我们利用 Kubernetes 允许用户启动任意 Kubernetes pod 和配置。Airflow 用户现在可以完全控制他们的运行时环境、资源和秘密,基本上将 Airflow 变成了一个“任何你想要的任务”工作流编排器。

Kubernetes Operator

在我们进一步讨论之前,我们应该澄清一下,在 Airflow 中,Operator 是一个任务定义。当用户创建 DAG 时,他们会使用像“SparkSubmitOperator”或“PythonOperator”这样的 Operator 来分别提交/监控 Spark 任务或 Python 函数。Airflow 内置了 Apache Spark、BigQuery、Hive 和 EMR 等框架的 Operator。它还提供了插件入口点,允许 DevOps 工程师开发自己的连接器。

Airflow 用户一直在寻找简化部署和 ETL 管道管理的方法。任何解耦管道步骤同时增加监控的机会都可以减少未来的中断和紧急修复。以下是 Airflow Kubernetes Operator 提供的优势列表:

  • 提高部署灵活性
    Airflow 的插件 API 一直为希望在其 DAG 中测试新功能的工程师提供了巨大的便利。缺点是,每当开发人员想要创建一个新的 Operator 时,他们都必须开发一个全新的插件。现在,任何可以在 Docker 容器中运行的任务都可以通过完全相同的 Operator 访问,无需维护额外的 Airflow 代码。

  • 配置和依赖的灵活性: 对于在静态 Airflow worker 中运行的 Operator,依赖管理可能会变得相当困难。如果开发人员想要运行一个需要 SciPy 的任务和另一个需要 NumPy 的任务,开发人员将不得不要么在所有 Airflow worker 中维护这两个依赖项,要么将任务卸载到外部机器(如果该外部机器以未跟踪的方式更改,则可能导致错误)。自定义 Docker 镜像允许用户确保任务环境、配置和依赖项完全幂等。

  • 使用 Kubernetes 秘密以增加安全性: 处理敏感数据是任何 DevOps 工程师的核心职责。Airflow 用户希望在任何时候都严格按照需要知道的原则隔离任何 API 密钥、数据库密码和登录凭据。通过 Kubernetes Operator,用户可以利用 Kubernetes Vault 技术存储所有敏感数据。这意味着 Airflow worker 将永远无法访问这些信息,并且可以简单地请求仅使用其所需的秘密构建 pod。

架构

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python 客户端生成一个由 APIServer 处理的请求(1)。然后 Kubernetes 将使用您定义的任何规范启动您的 pod(2)。镜像将加载所有必要的环境变量、秘密和依赖项,执行单个命令。一旦任务启动,Operator 只需要监控跟踪日志的健康状况(3)。用户可以选择将日志收集到本地调度程序或其 Kubernetes 集群中当前存在的任何分布式日志服务。

使用 Kubernetes Operator

一个基本示例

以下 DAG 可能是我们能够编写的最简单的示例,用于展示 Kubernetes Operator 的工作原理。此 DAG 在 Kubernetes 上创建了两个 Pod:一个带有 Python 的 Linux 发行版和一个没有 Python 的基础 Ubuntu 发行版。Python Pod 将正确运行 Python 请求,而没有 Python 的 Pod 将向用户报告失败。如果 Operator 正常工作,`passing-task` Pod 应该完成,而 `failing-task` Pod 将向 Airflow webserver 返回失败。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
Basic DAG Run

但这与我的工作流程有什么关系呢?

虽然这个示例只使用了基本的镜像,但 Docker 的魔力在于这个相同的 DAG 可以用于任何你想要的镜像/命令组合。以下是运行生产就绪代码在 Airflow DAG 上的推荐 CI/CD 管道。

1:GitHub 中的 PR

使用 Travis 或 Jenkins 运行单元和集成测试,贿赂你最喜欢的队友对你的代码进行 PR,然后合并到主分支以触发自动化 CI 构建。

2:通过 Jenkins 进行 CI/CD -> Docker 镜像

在 Jenkins 构建中生成 Docker 镜像并提升发布版本.

3:Airflow 启动任务

最后,更新你的 DAG 以反映新的发布版本,你就可以开始了!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

启动测试部署

由于 Kubernetes Operator 尚未发布,我们还没有发布官方的 helm chart 或 Operator(不过两者都在进行中)。但是,我们在下面提供了基本部署的说明,并且正在积极寻找敢于冒险的 Beta 测试人员来尝试这个新功能。要试用此系统,请按照以下步骤操作:

步骤 1:将 kubeconfig 指向 Kubernetes 集群

步骤 2:克隆 Airflow 仓库

运行 `git clone https://github.com/apache/incubator-airflow.git` 克隆官方 Airflow 仓库。

步骤 3:运行

为了运行这个基本部署,我们借用了目前用于 Kubernetes Executor 的集成测试脚本(将在本系列下一篇文章中解释)。要启动此部署,请运行以下三个命令:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在继续之前,让我们讨论一下这些命令正在做什么:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

Kubernetes Executor 是 Airflow 的另一个功能,它允许将任务动态分配为幂等 Pod。我们将其切换到 LocalExecutor 的原因仅仅是为了逐个介绍功能。如果您想尝试 Kubernetes Executor,非常欢迎您跳过此步骤,但我们将在未来的文章中详细介绍。

./scripts/ci/kubernetes/Docker/build.sh

此脚本将打包 Airflow 主源代码并基于 Airflow 分发版构建 Docker 容器

./scripts/ci/kubernetes/kube/deploy.sh

最后,我们在您的集群上创建一个完整的 Airflow 部署。这包括 Airflow 配置、Postgres 后端、Webserver + 调度程序以及所有必要的服务。需要注意的是,提供的角色绑定是集群管理员,因此如果您在集群上没有该级别的权限,可以在 scripts/ci/kubernetes/kube/airflow.yaml 中修改。

步骤 4:登录您的 Web 服务器

现在您的 Airflow 实例已运行,让我们看看 UI!UI 位于 Airflow Pod 的 8080 端口,只需运行:

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

现在 Airflow UI 将存在于 https://:8080。要登录,只需输入 `airflow` / `airflow`,您就应该拥有对 Airflow Web UI 的完全访问权限。

步骤 5:上传测试文档

要修改/添加您自己的 DAG,您可以使用 `kubectl cp` 将本地文件上传到 Airflow 调度程序的 DAG 文件夹中。Airflow 将读取新的 DAG 并自动将其上传到其系统。以下命令将把任何本地文件上传到正确的目录:

kubectl cp <本地文件> <命名空间>/<pod>:/root/airflow/dags -c scheduler

步骤 6:享受!

那么什么时候我才能使用这个功能呢?

虽然此功能仍处于早期阶段,但我们希望它能在未来几个月内广泛发布。

参与其中

此功能只是改进 Apache Airflow 与 Kubernetes 集成多项重大努力的开始。Kubernetes Operator 已合并到 Airflow 的 1.10 发布分支(执行器处于实验模式),以及一个完全 Kubernetes 原生调度器,称为 Kubernetes Executor(文章即将发布)。这些功能仍处于早期采用者/贡献者可以对这些功能的未来产生巨大影响的阶段。

对于那些有兴趣加入这些工作的人,我建议查看以下步骤:

  • 加入 airflow-dev 邮件列表:dev@airflow.apache.org
  • Apache Airflow JIRA 中提交问题
  • 每周三太平洋时间上午 10 点参加我们的 SIG-BigData 会议。
  • 在 kubernetes.slack.com 的 #sig-big-data 上联系我们

特别感谢 Apache Airflow 和 Kubernetes 社区,特别是 Grant Nicholas、Ben Goldberg、Anirudh Ramanathan、Fokko Dreisprong 和 Bolke de Bruin,感谢你们在这些功能以及我们未来的努力中提供的出色帮助。