这篇文章已超过一年。较旧的文章可能包含过时内容。请检查页面中的信息自发布以来是否已不再正确。

使用 Spark 和 Zeppelin 在 Kubernetes 1.2 上处理大数据

编者按:这是关于 Kubernetes 1.2 新特性系列深度文章中的第五篇

随着大数据的使用呈指数级增长,许多 Kubernetes 客户表示有兴趣在其 Kubernetes 集群上运行 Apache Spark,以利用容器的可移植性和灵活性。幸运的是,借助 Kubernetes 1.2,您现在可以拥有一个能够同时运行 Spark、Zeppelin 和其他应用的平台。

为什么选择 Zeppelin?

Apache Zeppelin 是一个基于 Web 的笔记本,可用于交互式数据分析。作为其后端之一,Zeppelin 连接到 Spark。Zeppelin 允许用户以简单的方式与 Spark 集群交互,而无需处理命令行解释器或 Scala 编译器。

为什么选择 Kubernetes?

在 Kubernetes 之外运行 Spark 有许多方法

  • 您可以在独立模式下使用专用资源运行它
  • 您可以在与 Hadoop 和 HDFS 共存的 YARN 集群上运行它
  • 您可以在 Mesos 集群上与其他 Mesos 应用一起运行它

那么,为什么要在 Kubernetes 上运行 Spark 呢?

  • 集群的单一、统一接口:Kubernetes 可以管理广泛的工作负载;无需为数据处理处理 YARN/HDFS,也无需为其他应用处理单独的容器编排器。
  • 提高服务器利用率:在 Spark 和云原生应用之间共享节点。例如,您可能正在运行流应用来供给流式 Spark 管线,或者运行一个 nginx Pod 来服务 Web 流量——无需静态地对节点进行分区。
  • 工作负载之间的隔离:Kubernetes 的服务质量 (Quality of Service) 机制允许您安全地将 Spark 等批处理工作负载与延迟敏感型服务器在同一节点上共同调度。

启动 Spark

在此演示中,我们将使用Google Container Engine (GKE),但这应该适用于您安装了 Kubernetes 集群的任何地方。首先,创建一个具有 storage-full 范围的 Container Engine 集群。这些 Google Cloud Platform 范围将允许集群写入私有的 Google Cloud Storage Bucket(稍后我们会解释原因):

$ gcloud container clusters create spark --scopes storage-full
--machine-type n1-standard-4

注意:我们使用 n1-standard-4(大于默认节点大小)来演示水平 Pod 自动扩缩 (Horizontal Pod Autoscaling) 的一些特性。但是,Spark 在默认节点大小 n1-standard-1 上也能正常工作。

集群创建后,您就可以使用 Kubernetes GitHub 仓库中的配置文件在 Kubernetes 上启动 Spark 了

$ git clone https://github.com/kubernetes/kubernetes.git
$ kubectl create -f kubernetes/examples/spark

‘kubernetes/examples/spark’ 是一个目录,因此此命令告诉 kubectl 创建该目录中所有 YAML 文件中定义的 Kubernetes 对象。您不必克隆整个仓库,但这会使本演示的步骤稍微简单一些。

Pod (尤其是 Apache Zeppelin) 有点大,因此 Docker 拉取镜像可能需要一些时间。一切运行正常后,您应该会看到类似以下内容:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-master-controller-v4v4y 1/1 Running 0 21h
spark-worker-controller-7phix 1/1 Running 0 21h
spark-worker-controller-hq9l9 1/1 Running 0 21h
spark-worker-controller-vwei5 1/1 Running 0 21h
zeppelin-controller-t1njl 1/1 Running 0 21h

您可以看到 Kubernetes 正在运行一个 Zeppelin 实例、一个 Spark master 和三个 Spark worker。

设置到 Zeppelin 的安全代理

接下来,您将设置从本地机器到 Zeppelin 的安全代理,以便您可以从您的机器访问在集群中运行的 Zeppelin 实例。(注意:您需要将此命令更改为在您的集群上创建的实际 Zeppelin Pod 的名称。)

$ kubectl port-forward zeppelin-controller-t1njl 8080:8080

这会建立到 Kubernetes 集群和 Pod (zeppelin-controller-t1njl) 的安全连接,然后将相关端口 (8080) 转发到本地端口 8080,这将允许您安全地使用 Zeppelin。

Zeppelin 已经启动并运行了,接下来我该怎么做?

在我们的示例中,我们将向您展示如何构建一个简单的电影推荐模型。这基于Spark 网站上的代码,并稍作修改以使其对 Kubernetes 更具吸引力。

安全代理启动后,访问 http://localhost:8080/。您应该会看到类似这样的介绍页面:

点击“Import note”,给它一个任意名称(例如,“Movies”),然后点击“Add from URL”。URL 输入:

https://gist.githubusercontent.com/zmerlynn/875fed0f587d12b08ec9/raw/6
eac83e99caf712482a4937800b17bbd2e7b33c4/movies.json

然后点击“Import Note”。这将为您提供一个现成的 Zeppelin 笔记本,用于此演示。您现在应该有一个“Movies”笔记本(或您给它起的任何名称)。如果点击该笔记本,您应该会看到类似这样的屏幕:

现在,您可以点击 PySpark 代码块右上角附近的 Play 按钮,您将创建一个新的内存中电影推荐模型!在 Spark 应用模型中,Zeppelin 充当 Spark Driver Program,与 Spark 集群 master 交互以完成工作。在此示例中,在 Zeppelin Pod 中运行的 driver program 获取数据并将其发送给 Spark master,Spark master 再将其分发给 workers,workers 使用 driver 中的代码计算出电影推荐模型。如果使用 Google Cloud Storage (GCS) 中的更大数据集,也可以轻松地从 GCS 中拉取数据。在下一节中,我们将讨论如何将数据保存到 GCS。

使用 Google Cloud Storage (可选)

在此演示中,我们将使用 Google Cloud Storage,它允许我们在单个 Pod 生命周期之外存储模型数据。针对 Kubernetes 的 Spark 内置了 Google Cloud Storage 连接器。只要您可以从运行 Kubernetes 节点的 Google Container Engine 项目中的虚拟机访问您的数据,您就可以使用 Spark 镜像上的 GCS 连接器访问您的数据。

如果您愿意,可以更改笔记本顶部的变量,这样示例就会真正保存和恢复电影推荐引擎的模型——只需将这些变量指向您有权访问的 GCS Bucket 即可。如果要创建 GCS Bucket,可以在命令行执行以下操作:

$ gsutil mb gs://my-spark-models

您需要将此 URI 更改为您独有的名称。这将创建一个您可以在上述示例中使用的 Bucket。

将水平 Pod 自动扩缩 (HPA) 与 Spark 结合使用 (可选)

Spark 对 worker 的进出具有一定的弹性,这意味着我们有机会:我们可以使用 Kubernetes 水平 Pod 自动扩缩 (Horizontal Pod Autoscaling) 自动扩展 Spark worker 池,为 worker 设置目标 CPU 阈值和最小/最大池大小。这避免了手动配置 worker 副本数量的需要。

创建 Autoscaler,如下所示(注意:如果您没有更改集群的机器类型,您可能希望将 --max 限制得小一些):

$ kubectl autoscale --min=1 --cpu-percent=80 --max=10 \
  rc/spark-worker-controller

要查看自动扩缩的完整效果,请等待复制控制器稳定回一个副本。使用 ‘kubectl get rc’ 并等待 spark-worker-controller 上的“replicas”列回落到 1。

我们之前运行的工作负载运行得太快,对于 HPA 来说不太有趣。要更改工作负载,使其运行足够长的时间以观察自动扩缩变为活跃状态,请将代码中的“rank = 100”行更改为“rank = 200”。点击 Play 后,Spark worker 池应会迅速增加到 20 个 Pod。作业完成后, worker 池需要最多 5 分钟才能回落到单个副本。

结论

在本文中,我们向您展示了如何在 Kubernetes 上运行 Spark 和 Zeppelin,以及如何使用 Google Cloud Storage 存储您的 Spark 模型,以及如何使用水平 Pod 自动扩缩 (Horizontal Pod Autoscaling) 动态调整 Spark worker 池的大小。

这是我们将要发表的关于如何在 Kubernetes 上运行大数据框架系列文章中的第一篇——敬请关注!

请加入我们的社区,帮助我们构建 Kubernetes 的未来!有许多参与方式。如果您对 Kubernetes 和大数据特别感兴趣,您会感兴趣的是: