使用工作队列进行细粒度并行处理

在此示例中,你将运行一个 Kubernetes Job,该 Job 将多个并行任务作为工作进程运行,每个任务都作为单独的 Pod 运行。

在此示例中,每创建一次 Pod,它就会从任务队列中获取一个工作单元,对其进行处理,并重复此过程,直到队列结束。

以下是本示例中的步骤概述:

  1. 启动存储服务以保存工作队列。在此示例中,你将使用 Redis 来存储工作项。在上一个示例中,你使用了 RabbitMQ。在此示例中,你将使用 Redis 和自定义工作队列客户端库;这是因为 AMQP 没有提供一种很好的方式让客户端检测有限长度的工作队列何时为空。在实践中,你通常会设置一次 Redis 等存储,并将其用于多个 Job 的工作队列和其他用途。
  2. 创建一个队列,并用消息填充它。每条消息代表一个待完成的任务。在此示例中,消息是一个整数,我们将对其进行耗时计算。
  3. 启动一个从队列中处理任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,对其进行处理,并重复此过程,直到队列结束。

准备工作

你需要有一个 Kubernetes 集群,并且 kubectl 命令行工具已配置为与你的集群通信。建议在至少有两个不作为控制平面主机的节点的集群上运行本教程。如果你还没有集群,可以使用 minikube 创建一个,或者使用以下 Kubernetes 演练场之一。

你将需要一个容器镜像仓库,以便将镜像上传到集群中运行。本示例使用 Docker Hub,但你可以将其适配到其他容器镜像仓库。

此任务示例还假设你已在本地安装 Docker。你使用 Docker 构建容器镜像。

熟悉 Job 的基本非并行用法。

启动 Redis

为了简化此示例,你将启动一个 Redis 实例。有关如何可伸缩和冗余部署 Redis 的示例,请参阅 Redis 示例

您也可以直接下载以下文件

要启动一个 Redis 实例,你需要创建 redis pod 和 redis service。

kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml

用任务填充队列

现在,让我们用一些“任务”来填充队列。在这个例子中,任务是需要打印的字符串。

启动一个临时交互式 Pod 来运行 Redis CLI。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

现在按回车键,启动 Redis CLI,然后创建一个包含一些工作项的列表。

redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,键为 job2 的列表将是工作队列。

注意:如果 Kube DNS 没有正确设置,你可能需要将上述代码块的第一步改为 redis-cli -h $REDIS_SERVICE_HOST

创建容器镜像

现在你已准备好创建将处理队列中工作的镜像。

你将使用一个带有 Redis 客户端的 Python 工作程序来从消息队列中读取消息。

我们提供了一个名为 rediswq.py 的简单 Redis 工作队列客户端库(下载)。

Job 中每个 Pod 的“工作程序”程序都使用工作队列客户端库来获取工作。其代码如下:

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2) 
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

您还可以下载 worker.pyrediswq.pyDockerfile 文件,然后构建容器镜像。这是一个使用 Docker 构建镜像的示例:

docker build -t job-wq-2 .

推送镜像

对于 Docker Hub,使用您的用户名标记您的应用程序镜像,并使用以下命令将其推送到 Hub。将 <username> 替换为您的 Hub 用户名。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

你需要推送到公共仓库,或者配置你的集群以访问你的私有仓库

定义 Job

这是您将要创建的 Job 的清单

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

在此示例中,每个 Pod 处理队列中的多个项,然后当没有更多项时退出。由于 worker 自身检测工作队列何时为空,而 Job 控制器不知道工作队列,因此它依赖 worker 在工作完成后发出信号。worker 通过成功退出表示队列为空。因此,一旦**任何** worker 成功退出,控制器就知道工作已完成,并且 Pod 将很快退出。因此,你需要将 Job 的完成计数留空。Job 控制器也会等待其他 Pod 完成。

运行 Job

所以,现在运行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

现在稍等片刻,然后检查 Job

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              container-registry.example/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

你可以等待 Job 成功,并设置超时:

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

如你所见,该 Job 的其中一个 Pod 处理了多个工作单元。

替代方案

如果运行队列服务或修改容器以使用工作队列不方便,你可能需要考虑其他作业模式

如果你有持续的后台处理工作要运行,那么可以考虑使用 ReplicaSet 来运行你的后台工作进程,并考虑运行一个后台处理库,例如 https://github.com/resque/resque

上次修改时间:2024 年 3 月 16 日太平洋标准时间凌晨 2:39:修复并行处理工作队列任务的文档 (bed970676c)