使用工作队列进行粗粒度并行处理
在此示例中,你将运行一个具有多个并行工作进程的 Kubernetes Job。
在此示例中,当每个 Pod 被创建时,它会从任务队列中获取一个工作单元,完成它,将其从队列中删除,然后退出。
以下是此示例中的步骤概述
- 启动消息队列服务。 在此示例中,你使用 RabbitMQ,但你也可以使用其他服务。在实践中,你将设置一次消息队列服务,并将其重用于许多 Job。
- 创建一个队列,并用消息填充它。 每条消息代表一项要完成的任务。在此示例中,消息是一个整数,我们将对其进行耗时的计算。
- 启动一个处理队列中任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,处理它,然后退出。
开始之前
你应该已经熟悉 Job 的基本、非并行用法。
你需要有一个 Kubernetes 集群,并且必须配置 kubectl 命令行工具以与你的集群通信。建议在至少有两个不充当控制平面主机的节点的集群上运行本教程。如果你还没有集群,可以使用 minikube 创建一个集群,或者你可以使用以下 Kubernetes 游乐场之一
你将需要一个容器镜像注册表,你可以在其中上传镜像以在你的集群中运行。
此任务示例还假设你在本地安装了 Docker。
启动消息队列服务
此示例使用 RabbitMQ,但是你可以调整该示例以使用其他 AMQP 类型的消息服务。
在实践中,你可以在集群中设置一次消息队列服务,并将其重用于许多 Job 以及长时间运行的服务。
按如下方式启动 RabbitMQ
# make a Service for the StatefulSet to use
kubectl create -f https://kubernetes.ac.cn/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.ac.cn/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created
测试消息队列服务
现在,我们可以尝试访问消息队列。我们将创建一个临时的交互式 Pod,在其上安装一些工具,并尝试队列。
首先创建一个临时的交互式 Pod。
# Create a temporary interactive container
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
请注意,你的 Pod 名称和命令提示符将有所不同。
接下来安装 amqp-tools
,以便你可以使用消息队列。接下来的命令显示你需要在该 Pod 中的交互式 shell 中运行的内容
apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils
稍后,你将创建一个包含这些软件包的容器镜像。
接下来,你将检查是否可以发现 RabbitMQ 的服务
# Run these commands inside the Pod
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
nslookup rabbitmq-service
Server: 10.0.0.10
Address: 10.0.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152
(IP 地址会有所不同)
如果 kube-dns 插件未正确设置,则上一步可能对你无效。你还可以在环境变量中找到该服务的 IP 地址
# run this check inside the Pod
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
(IP 地址会有所不同)
接下来,你将验证是否可以创建队列,以及发布和使用消息。
# Run these commands inside the Pod
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached. 5672 is the standard port for rabbitmq.
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
# Now create a queue:
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo
将一条消息发布到队列
/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
# And get it back.
/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
Hello
在最后一个命令中,amqp-consume
工具从队列中获取一条消息 (-c 1
),并将该消息传递给任意命令的标准输入。在这种情况下,程序 cat
打印从标准输入读取的字符,并且 echo 添加回车符,以便示例可读。
用任务填充队列
现在,用一些模拟的任务填充队列。在此示例中,任务是要打印的字符串。
在实践中,消息的内容可能是
- 需要处理的文件的名称
- 程序的额外标志
- 数据库表中的键范围
- 模拟的配置参数
- 要渲染的场景的帧数
如果 Job 的所有 Pod 以只读模式都需要大量数据,你通常将其放在像 NFS 这样的共享文件系统中,并在所有 Pod 上以只读方式挂载,或者在 Pod 中编写程序,以便它可以从集群文件系统(例如:HDFS)中原生读取数据。
对于此示例,你将使用 AMQP 命令行工具创建队列并填充它。在实践中,你可以编写一个程序以使用 AMQP 客户端库填充队列。
# Run this on your computer, not in the Pod
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
将项添加到队列
for f in apple banana cherry date fig grape lemon melon
do
/usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
你向队列添加了 8 条消息。
创建容器镜像
现在你已经准备好创建一个将作为 Job 运行的镜像。
该 Job 将使用 amqp-consume
实用程序从队列中读取消息并运行实际工作。这是一个非常简单的示例程序
#!/usr/bin/env python
# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)
授予脚本执行权限
chmod +x worker.py
现在,构建一个镜像。创建一个临时目录,更改到该目录,下载 Dockerfile 和 worker.py。在这两种情况下,都使用以下命令构建镜像
docker build -t job-wq-1 .
对于 Docker Hub,使用你的用户名标记你的应用程序镜像,并使用以下命令将其推送到 Hub。将 <username>
替换为你的 Hub 用户名。
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
如果你使用的是其他容器镜像注册表,请标记该镜像并将其推送到那里。
定义 Job
这是一个 Job 的清单。你需要复制 Job 清单(将其命名为 ./job.yaml
),并编辑容器镜像的名称以匹配你使用的名称。
apiVersion: batch/v1
kind: Job
metadata:
name: job-wq-1
spec:
completions: 8
parallelism: 2
template:
metadata:
name: job-wq-1
spec:
containers:
- name: c
image: gcr.io/<project>/job-wq-1
env:
- name: BROKER_URL
value: amqp://guest:guest@rabbitmq-service:5672
- name: QUEUE
value: job1
restartPolicy: OnFailure
在此示例中,每个 Pod 处理队列中的一项,然后退出。因此,Job 的完成计数对应于完成的工作项数。这就是为什么示例清单将 .spec.completions
设置为 8
的原因。
运行 Job
现在,运行 Job
# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml
你可以等待 Job 成功,并设置超时时间
# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1
接下来,检查 Job
kubectl describe jobs/job-wq-1
Name: job-wq-1
Namespace: default
Selector: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Annotations: <none>
Parallelism: 2
Completions: 8
Start Time: Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Containers:
c:
Image: container-registry.example/causal-jigsaw-637/job-wq-1
Port:
Environment:
BROKER_URL: amqp://guest:guest@rabbitmq-service:5672
QUEUE: job1
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
───────── ──────── ───── ──── ───────────── ────── ────── ───────
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-hcobb
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-weytj
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-qaam5
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-b67sr
26s 26s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-xe5hj
15s 15s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-w2zqe
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-d6ppa
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-p17e0
该 Job 的所有 Pod 都成功了!你已经完成了。
替代方案
这种方法的优点是,你无需修改你的“工作”程序就可以知道存在工作队列。你可以将工作程序原封不动地包含在你的容器镜像中。
使用此方法确实需要你运行消息队列服务。如果运行队列服务不方便,你可能需要考虑其他 Job 模式之一。
这种方法为每个工作项创建一个 Pod。但是,如果你的工作项仅需几秒钟,则为每个工作项创建一个 Pod 可能会增加很多开销。请考虑其他设计,例如在 细粒度并行工作队列示例中,每个 Pod 执行多个工作项。
在此示例中,你使用 amqp-consume
实用程序从队列中读取消息并运行实际程序。这种方法的优点是,你无需修改你的程序就可以知道队列的存在。细粒度并行工作队列示例展示了如何使用客户端库与工作队列通信。
注意事项
如果完成数设置为小于队列中的项数,则并非所有项都将被处理。
如果完成数设置为大于队列中的项数,则即使队列中的所有项都已处理,Job 也不会显示已完成。它将启动其他 Pod,这些 Pod 将阻塞等待消息。你需要自己建立一种机制来发现何时有工作要做并测量队列的大小,将完成数设置为匹配。
这种模式下存在一种不太可能出现的竞态条件。如果在 amqp-consume
命令确认消息之后,容器成功退出之前,容器被杀死,或者节点在 kubelet 将 Pod 成功状态发布回 API 服务器之前崩溃,那么即使队列中的所有项目都已处理完毕,Job 也不会显示为已完成。