本文超过一年。较旧的文章可能包含过时的内容。请检查页面中的信息自发布以来是否已变得不正确。

使用 Spark 和 Zeppelin 在 Kubernetes 1.2 上处理大数据

编者按:这是关于 Kubernetes 1.2 新功能的系列深度文章中的第五篇。

随着大数据使用呈指数级增长,许多 Kubernetes 客户表示有兴趣在其 Kubernetes 集群上运行 Apache Spark,以利用容器的可移植性和灵活性。幸运的是,借助 Kubernetes 1.2,您现在可以拥有一个同时运行 Spark、Zeppelin 和其他应用程序的平台。

为什么选择 Zeppelin?

Apache Zeppelin 是一款基于 Web 的笔记本,可进行交互式数据分析。作为其后端之一,Zeppelin 连接到 Spark。Zeppelin 允许用户以简单的方式与 Spark 集群交互,而无需处理命令行解释器或 Scala 编译器。

为什么选择 Kubernetes?

在 Kubernetes 之外运行 Spark 的方法有很多

  • 您可以使用专用资源以独立模式运行它
  • 您可以在 YARN 集群上运行它,与 Hadoop 和 HDFS 共存
  • 您可以在 Mesos 集群上与其他 Mesos 应用程序一起运行它

那么,为什么要在 Kubernetes 上运行 Spark 呢?

  • 集群的单一统一界面:Kubernetes 可以管理各种工作负载;无需为数据处理使用 YARN/HDFS,也无需为其他应用程序使用单独的容器编排器。
  • 提高服务器利用率:在 Spark 和云原生应用程序之间共享节点。例如,您可能有一个正在运行的流式应用程序来馈送流式 Spark 管道,或者一个 nginx pod 来服务 Web 流量——无需静态分区节点。
  • 工作负载之间的隔离:Kubernetes 的服务质量机制允许您安全地在与延迟敏感型服务器相同的节点上共同安排 Spark 等批处理工作负载。

启动 Spark

在本演示中,我们将使用 Google 容器引擎 (GKE),但这应该适用于您安装 Kubernetes 集群的任何地方。首先,创建一个具有 storage-full 作用域的容器引擎集群。这些 Google Cloud Platform 作用域将允许集群写入私有 Google Cloud Storage 存储桶(稍后我们将解释为什么需要它):

$ gcloud container clusters create spark --scopes storage-full
--machine-type n1-standard-4

注意:我们使用的是 n1-standard-4(比默认节点大小更大)来演示水平 Pod 自动缩放的一些功能。但是,Spark 在默认节点大小 n1-standard-1 上也能正常工作。

创建集群后,您就可以使用 Kubernetes GitHub 存储库中的配置文件在 Kubernetes 上启动 Spark

$ git clone https://github.com/kubernetes/kubernetes.git
$ kubectl create -f kubernetes/examples/spark

“kubernetes/examples/spark”是一个目录,因此此命令告诉 kubectl 创建该目录中所有 YAML 文件中定义的所有 Kubernetes 对象。您不必克隆整个存储库,但这会使本演示的步骤更容易一些。

这些 pod(尤其是 Apache Zeppelin) agak besar,因此 Docker 可能需要一些时间才能提取镜像。一切运行后,您应该会看到类似于以下内容

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-master-controller-v4v4y 1/1 Running 0 21h
spark-worker-controller-7phix 1/1 Running 0 21h
spark-worker-controller-hq9l9 1/1 Running 0 21h
spark-worker-controller-vwei5 1/1 Running 0 21h
zeppelin-controller-t1njl 1/1 Running 0 21h

您可以看到 Kubernetes 正在运行一个 Zeppelin 实例、一个 Spark 主节点和三个 Spark 工作节点。

设置到 Zeppelin 的安全代理

接下来,您将设置从本地机器到 Zeppelin 的安全代理,以便您可以从您的机器访问集群中运行的 Zeppelin 实例。(注意:您需要将此命令更改为您集群上创建的实际 Zeppelin pod。)

$ kubectl port-forward zeppelin-controller-t1njl 8080:8080

这将建立到 Kubernetes 集群和 pod (zeppelin-controller-t1njl) 的安全链接,然后将相关端口 (8080) 转发到本地端口 8080,这将允许您安全地使用 Zeppelin。

现在我已经启动并运行了 Zeppelin,我该怎么办?

在我们的示例中,我们将向您展示如何构建一个简单的电影推荐模型。这是基于Spark 网站上的代码,并稍作修改以使其对 Kubernetes 更感兴趣。

现在安全代理已启动,请访问 https://127.0.0.1:8080/。您应该会看到一个类似这样的介绍页面

点击“导入笔记”,给它一个任意名称(例如“电影”),然后点击“从 URL 添加”。对于 URL,输入

https://gist.githubusercontent.com/zmerlynn/875fed0f587d12b08ec9/raw/6
eac83e99caf712482a4937800b17bbd2e7b33c4/movies.json

然后点击“导入笔记”。这将为您提供一个现成的 Zeppelin 笔记,用于本演示。您现在应该有一个“电影”笔记本(或您给它命名的任何名称)。如果您点击该笔记,您应该会看到一个类似于此的屏幕

您现在可以点击 PySpark 代码块右上角附近的“播放”按钮,您将创建一个新的内存电影推荐模型!在 Spark 应用程序模型中,Zeppelin 充当Spark 驱动程序,与 Spark 集群主节点交互以完成其工作。在本例中,在 Zeppelin pod 中运行的驱动程序获取数据并将其发送到 Spark 主节点,Spark 主节点将其分发给工作节点,工作节点使用驱动程序中的代码生成电影推荐模型。使用 Google Cloud Storage (GCS) 中更大的数据集,也可以轻松地从 GCS 中提取数据。在下一节中,我们将讨论如何将数据保存到 GCS。

使用 Google Cloud Storage(可选)

在本演示中,我们将使用 Google Cloud Storage,这将使我们能够将模型数据存储在单个 pod 的生命周期之外。Kubernetes 的 Spark 内置了 Google Cloud Storage 连接器。只要您可以从 Kubernetes 节点所在的 Google 容器引擎项目中的虚拟机访问您的数据,您就可以使用 Spark 镜像上的 GCS 连接器访问您的数据。

如果需要,您可以更改笔记顶部的变量,以便该示例实际保存和恢复电影推荐引擎的模型 - 只需将这些变量指向您可以访问的 GCS 存储桶即可。如果要创建 GCS 存储桶,可以在命令行上执行以下操作

$ gsutil mb gs://my-spark-models

您需要将此 URI 更改为您唯一的 URI。这将创建一个您可以在上面示例中使用的存储桶。

将 Horizontal Pod Autoscaling 与 Spark 结合使用(可选)

Spark 对 worker 的来去具有一定的弹性,这意味着我们有一个机会:我们可以使用Kubernetes Horizontal Pod Autoscaling来自动扩展 Spark worker 池,为 worker 设置目标 CPU 阈值和最小/最大池大小。这避免了手动配置 worker 副本数量的需要。

像这样创建自动缩放器(注意:如果您没有更改集群的机器类型,您可能希望将 --max 限制为更小的值):

$ kubectl autoscale --min=1 --cpu-percent=80 --max=10 \
  rc/spark-worker-controller

要查看自动缩放的完整效果,请等待复制控制器稳定到一个副本。使用 ‘kubectl get rc’ 并等待 spark-worker-controller 上的“副本”列回落到 1。

我们之前运行的工作负载运行速度太快,对于 HPA 来说没有什么意义。要更改工作负载以使其实际运行足够长的时间以看到自动缩放生效,请将代码中的“rank = 100”行更改为“rank = 200”。点击播放后,Spark worker 池应迅速增加到 20 个 pod。作业完成后,最多需要 5 分钟,worker 池才会回落到一个副本。

结论

在本文中,我们向您展示了如何在 Kubernetes 上运行 Spark 和 Zeppelin,以及如何使用 Google Cloud Storage 存储 Spark 模型以及如何使用 Horizontal Pod Autoscaling 动态调整 Spark worker 池的大小。

这是我们将发布的一系列关于如何在 Kubernetes 上运行大数据框架的文章中的第一篇——敬请期待!

请加入我们的社区,帮助我们构建 Kubernetes 的未来!参与的方式有很多。如果您对 Kubernetes 和大数据特别感兴趣,您会对以下内容感兴趣