本文发布已超过一年。较旧的文章可能包含过时内容。请检查页面中的信息自发布以来是否已发生变化。
Kubernetes 上的 Airflow(第一部分):一种不同类型的 Operator
介绍
作为彭博社持续致力于发展 Kubernetes 生态系统的一部分,我们很高兴宣布 Kubernetes Airflow Operator;这是一种用于Apache Airflow(一个流行的工作流编排框架)的机制,可以使用 Kubernetes API 本地启动任意的 Kubernetes Pod。
什么是 Airflow?
Apache Airflow 是“配置即代码”DevOps 理念的一种实现。Airflow 允许用户使用简单的 Python 对象 DAG(有向无环图)启动多步管道。您可以在易于阅读的 UI 中定义依赖关系、以编程方式构建复杂工作流并监控计划作业。


为什么要在 Kubernetes 上运行 Airflow?
自诞生以来,Airflow 最大的优势在于其灵活性。Airflow 为从 Spark 和 HBase 到各种云提供商的服务提供了广泛的集成。Airflow 还通过其插件框架提供了轻松的可扩展性。然而,该项目的一个局限性是,Airflow 用户只能使用在执行时存在于 Airflow Worker 上的框架和客户端。一个组织可以有各种 Airflow 工作流,从数据科学管道到应用部署。这种用例上的差异会在依赖管理方面产生问题,因为两组团队可能为他们的工作流使用截然不同的库。
为了解决这个问题,我们利用 Kubernetes 允许用户启动任意的 Kubernetes Pod 和配置。Airflow 用户现在可以完全掌控他们的运行时环境、资源和 Secrets,基本上将 Airflow 变成一个“任意作业皆可”的工作流编排器。
Kubernetes Operator
在我们进一步讨论之前,我们应该澄清,Airflow 中的Operator是一种任务定义。当用户创建 DAG 时,他们会使用诸如“SparkSubmitOperator”或“PythonOperator”之类的 Operator 分别提交/监控 Spark 作业或 Python 函数。Airflow 内置了用于 Apache Spark、BigQuery、Hive 和 EMR 等框架的 Operator。它还提供了一个 Plugins 入口,允许 DevOps 工程师开发自己的连接器。
Airflow 用户一直在寻找简化部署和 ETL 管道管理的方法。任何可以在提高监控能力的同时解耦管道步骤的机会,都可以减少未来的中断和紧急修复。以下是 Airflow Kubernetes Operator 提供的好处列表:
提高了部署的灵活性
Airflow 的插件 API 一直是希望在 DAG 中测试新功能的工程师的一大福音。不利的一面是,每当开发人员想要创建新的 Operator 时,他们都必须开发一个全新的插件。现在,任何可以在 Docker 容器中运行的任务都可以通过完全相同的 Operator 访问,无需维护额外的 Airflow 代码。配置和依赖的灵活性:对于在静态 Airflow Worker 中运行的 Operator,依赖管理会变得相当困难。如果开发人员想要运行一个需要 SciPy 的任务,以及另一个需要 NumPy 的任务,开发人员将不得不在所有 Airflow Worker 中维护这两个依赖项,或将任务分载到外部机器(如果该外部机器以未跟踪的方式更改,可能会导致错误)。自定义 Docker 镜像允许用户确保任务的环境、配置和依赖项完全是幂等的。
使用 kubernetes secrets 增强安全性:处理敏感数据是任何 DevOps 工程师的核心职责。在任何可能的机会下,Airflow 用户都希望严格按需隔离任何 API Key、数据库密码和登录凭据。使用 Kubernetes Operator,用户可以利用 Kubernetes Vault 技术存储所有敏感数据。这意味着 Airflow Worker 将永远无法访问这些信息,并且可以简单地请求构建仅包含所需 Secrets 的 Pod。
架构

Kubernetes Operator 使用 Kubernetes Python 客户端生成一个由 APIServer (1) 处理的请求。然后 Kubernetes 将根据您定义的规格启动您的 Pod (2)。镜像将加载所有必需的环境变量、Secrets 和依赖项,执行一个命令。作业启动后,Operator 只需监控健康状况并跟踪日志 (3)。用户可以选择将日志收集到调度器本地或其 Kubernetes 集群中当前任何分布式日志服务。
使用 Kubernetes Operator
基本示例
以下 DAG 可能是展示 Kubernetes Operator 如何工作的最简单示例。此 DAG 在 Kubernetes 上创建两个 Pod:一个带有 Python 的 Linux 发行版和一个不带 Python 的基础 Ubuntu 发行版。带有 Python 的 Pod 将正确运行 Python 请求,而不带 Python 的 Pod 将向用户报告失败。如果 Operator 工作正常,passing-task
Pod 应该完成,而 failing-task
Pod 则向 Airflow Web 服务器返回失败。
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.utcnow(),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))
start = DummyOperator(task_id='run_this_first', dag=dag)
passing = KubernetesPodOperator(namespace='default',
image="Python:3.6",
cmds=["Python","-c"],
arguments=["print('hello world')"],
labels={"foo": "bar"},
name="passing-test",
task_id="passing-task",
get_logs=True,
dag=dag
)
failing = KubernetesPodOperator(namespace='default',
image="ubuntu:1604",
cmds=["Python","-c"],
arguments=["print('hello world')"],
labels={"foo": "bar"},
name="fail",
task_id="failing-task",
get_logs=True,
dag=dag
)
passing.set_upstream(start)
failing.set_upstream(start)

但这与我的工作流有什么关系?
虽然此示例仅使用基本镜像,但 Docker 的神奇之处在于,此同一 DAG 可用于您想要的任何镜像/命令组合。以下是在 Airflow DAG 上运行生产就绪代码的推荐 CI/CD 管道。
1:在 Github 中 PR
使用 Travis 或 Jenkins 运行单元和集成测试,贿赂您最喜欢的队友帮您 PR 代码,然后合并到 master 分支以触发自动 CI 构建。
2:通过 Jenkins -> Docker Image 的 CI/CD
在您的 Jenkins 构建中生成 Docker 镜像并提升发布版本.
3:Airflow 启动任务
最后,更新您的 DAG 以反映新的发布版本,您就可以开始了!
production_task = KubernetesPodOperator(namespace='default',
# image="my-production-job:release-1.0.1", <-- old release
image="my-production-job:release-1.0.2",
cmds=["Python","-c"],
arguments=["print('hello world')"],
name="fail",
task_id="failing-task",
get_logs=True,
dag=dag
)
启动测试部署
由于 Kubernetes Operator 尚未发布,我们还没有发布官方的helm chart 或 Operator(不过两者都在进行中)。但是,我们在此提供基本部署的说明,并且正在积极寻找愿意尝试此新功能的勇士测试者。要试用此系统,请按照以下步骤操作:
步骤 1:设置 kubeconfig 指向一个 Kubernetes 集群
步骤 2:克隆 Airflow 仓库
运行 git clone https://github.com/apache/incubator-airflow.git
克隆官方 Airflow 仓库。
步骤 3:运行
要运行此基本部署,我们借用了当前用于 Kubernetes Executor 的集成测试脚本(将在本系列下一篇文章中解释)。要启动此部署,请运行以下三个命令:
sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh
在继续之前,让我们讨论一下这些命令的作用:
sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
Kubernetes Executor 是 Airflow 的另一个功能,它允许将任务作为幂等 Pod 进行动态分配。我们将其切换到 LocalExecutor 的原因仅仅是为了逐步引入一个功能。如果您想尝试 Kubernetes Executor,非常欢迎跳过此步骤,但我们将在以后的文章中详细介绍。
./scripts/ci/kubernetes/Docker/build.sh
此脚本将打包 Airflow master 源代码并构建基于 Airflow 分发的 Docker 容器。
./scripts/ci/kubernetes/kube/deploy.sh
最后,我们在您的集群上创建一个完整的 Airflow 部署。这包括 Airflow 配置、一个 postgres 后端、web 服务器 + 调度器,以及之间所有必要的服务。需要注意的一点是,提供的角色绑定是 cluster-admin,因此如果您在集群上没有该级别的权限,可以在 scripts/ci/kubernetes/kube/airflow.yaml 中修改。
步骤 4:登录 Web 服务器
现在您的 Airflow 实例正在运行,让我们看看 UI!UI 位于 Airflow Pod 的 8080 端口,因此只需运行:
WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080
现在 Airflow UI 将位于 http://localhost:8080。只需输入 airflow
/airflow
即可登录,然后您将完全访问 Airflow Web UI。
步骤 5:上传测试文档
要修改/添加您自己的 DAG,您可以使用 kubectl cp
将本地文件上传到 Airflow 调度器的 DAG 文件夹。Airflow 将读取新的 DAG 并自动将其上传到其系统中。以下命令将把任何本地文件上传到正确的目录:
kubectl cp <local file> <namespace>/<pod>:/root/airflow/dags -c scheduler
第 6 步:享受!
那么我什么时候可以使用这个功能呢?
虽然此功能仍处于早期阶段,但我们希望在接下来的几个月内看到它广泛发布。
参与其中
此功能仅仅是改进 Apache Airflow 与 Kubernetes 集成的一系列重大工作的开端。Kubernetes Operator 已合并到 Airflow 的 1.10 发布分支 中(executor 处于实验模式),同时还有一个完全 k8s 原生的 scheduler,称为 Kubernetes Executor(后续将发布文章介绍)。这些功能仍处于早期阶段,早期采用者/贡献者可以在这些功能的未来发展中发挥巨大影响。
对于有兴趣加入这些工作的人,我建议遵循以下步骤
- 加入 airflow-dev 邮件列表:dev@airflow.apache.org。
- 在 Apache Airflow JIRA 中提交问题
- 加入我们每周三太平洋标准时间上午 10 点的 SIG-BigData 会议。
- 在 Slack 上联系我们,频道为 kubernetes.slack.com 上的 #sig-big-data。
特别感谢 Apache Airflow 和 Kubernetes 社区,特别是 Grant Nicholas、Ben Goldberg、Anirudh Ramanathan、Fokko Dreisprong 和 Bolke de Bruin,感谢你们在这些功能以及我们未来工作上的出色帮助。