使用工作队列进行细粒度并行处理
在此示例中,你将运行一个 Kubernetes Job,它以工作进程的形式运行多个并行任务,每个任务都作为单独的 Pod 运行。
在此示例中,随着每个 Pod 的创建,它从任务队列中获取一个工作单元,对其进行处理,并重复此过程直到队列末尾。
以下是此示例中的步骤概览:
- 启动一个存储服务来保存工作队列。在此示例中,你将使用 Redis 来存储工作项。在上一个示例中,你使用了 RabbitMQ。在此示例中,你将使用 Redis 和自定义工作队列客户端库;这是因为 AMQP 没有提供一种很好的方式让客户端检测有限长度工作队列何时为空。实际上,你通常会一次性设置一个存储(例如 Redis),然后将其用于许多 Job 的工作队列以及其他用途。
- 创建一个队列,并填充消息。每条消息代表一个要完成的任务。在此示例中,一条消息是一个整数,我们将对其进行长时间计算。
- 启动一个处理队列任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中取出一个任务,处理它,并重复此过程直到队列末尾。
开始之前
你需要有一个 Kubernetes 集群,并且 kubectl 命令行工具已配置为与你的集群通信。建议在至少有两个非控制平面主机节点的集群上运行本教程。如果你还没有集群,可以使用 minikube 创建一个,或者使用以下 Kubernetes 演练场之一:
你将需要一个容器镜像仓库,以便将镜像上传到你的集群中运行。本示例使用 Docker Hub,但你可以将其适配到不同的容器镜像仓库。
此任务示例还假设你已在本地安装 Docker。你使用 Docker 来构建容器镜像。
熟悉 Job 的基本非并行用法。
启动 Redis
为了简化本示例,你将启动一个 Redis 单实例。有关如何可伸缩和冗余地部署 Redis 的示例,请参阅 Redis 示例。
你也可以直接下载以下文件:
要启动一个 Redis 单实例,你需要创建 redis Pod 和 redis Service:
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml
向队列中填充任务
现在,我们向队列中填充一些“任务”。在本示例中,任务是要打印的字符串。
启动一个临时的交互式 Pod 来运行 Redis CLI。
kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt
现在按回车,启动 Redis CLI,并创建一个包含一些工作项的列表。
redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
因此,键为 job2
的列表将成为工作队列。
注意:如果你的 Kube DNS 设置不正确,你可能需要将上述块的第一步更改为 redis-cli -h $REDIS_SERVICE_HOST
。
创建容器镜像
现在你已准备好创建将处理该队列中工作的镜像。
你将使用带有 Redis 客户端的 Python 工作程序来从消息队列读取消息。
提供了一个简单的 Redis 工作队列客户端库,名为 rediswq.py
(下载)。
Job 中每个 Pod 的“工作程序”使用工作队列客户端库来获取工作。如下所示:
#!/usr/bin/env python
import time
import rediswq
host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")
q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
item = q.lease(lease_secs=10, block=True, timeout=2)
if item is not None:
itemstr = item.decode("utf-8")
print("Working on " + itemstr)
time.sleep(10) # Put your actual work here instead of sleep.
q.complete(item)
else:
print("Waiting for work")
print("Queue empty, exiting")
你还可以下载 worker.py
、rediswq.py
和 Dockerfile
文件,然后构建容器镜像。以下是使用 Docker 进行镜像构建的示例:
docker build -t job-wq-2 .
推送镜像
对于 Docker Hub,使用你的用户名标记你的应用程序镜像,然后使用以下命令推送到 Hub。将 <username>
替换为你的 Hub 用户名。
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
你需要推送到一个公共仓库,或者配置你的集群以访问你的私有仓库。
定义 Job
以下是你将创建的 Job 的清单:
apiVersion: batch/v1
kind: Job
metadata:
name: job-wq-2
spec:
parallelism: 2
template:
metadata:
name: job-wq-2
spec:
containers:
- name: c
image: gcr.io/myproject/job-wq-2
restartPolicy: OnFailure
注意
请务必编辑清单,将gcr.io/myproject
更改为你自己的路径。在本示例中,每个 Pod 处理队列中的多个项,并在没有更多项时退出。由于工作进程本身会检测工作队列何时为空,而 Job 控制器不知道工作队列的情况,因此它依赖于工作进程来发出完成工作的信号。工作进程通过成功退出信号告知队列为空。因此,一旦任何工作进程成功退出,控制器就知道工作已经完成,并且其他 Pod 也将很快退出。所以,你需要保持 Job 的完成数(completion count)未设置。Job 控制器也会等待其他 Pod 完成。
运行 Job
现在运行 Job:
# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml
现在等待片刻,然后检查 Job 状态:
kubectl describe jobs/job-wq-2
Name: job-wq-2
Namespace: default
Selector: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
job-name=job-wq-2
Annotations: <none>
Parallelism: 2
Completions: <unset>
Start Time: Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses: 1 Running / 0 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
job-name=job-wq-2
Containers:
c:
Image: container-registry.example/exampleproject/job-wq-2
Port:
Environment: <none>
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
--------- -------- ----- ---- ------------- -------- ------ -------
33s 33s 1 {job-controller } Normal SuccessfulCreate Created pod: job-wq-2-lglf8
你可以等待 Job 成功完成,并设置超时时间:
# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon
正如你所见,此 Job 的一个 Pod 处理了多个工作单元。
替代方案
如果运行队列服务或修改容器以使用工作队列不方便,你可能需要考虑其他Job 模式。
如果你有持续的后台处理工作要运行,则可以考虑使用 ReplicaSet 运行后台工作进程,并考虑使用 https://github.com/resque/resque 等后台处理库。