使用工作队列进行粗粒度并行处理

在此示例中,你将运行一个具有多个并行工作进程的 Kubernetes Job。

在此示例中,当每个 Pod 被创建时,它会从任务队列中获取一个工作单元,完成它,将其从队列中删除,然后退出。

以下是此示例中的步骤概述

  1. 启动消息队列服务。 在此示例中,你使用 RabbitMQ,但你也可以使用其他服务。在实践中,你将设置一次消息队列服务,并将其重用于许多 Job。
  2. 创建一个队列,并用消息填充它。 每条消息代表一项要完成的任务。在此示例中,消息是一个整数,我们将对其进行耗时的计算。
  3. 启动一个处理队列中任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,处理它,然后退出。

开始之前

你应该已经熟悉 Job 的基本、非并行用法。

你需要有一个 Kubernetes 集群,并且必须配置 kubectl 命令行工具以与你的集群通信。建议在至少有两个不充当控制平面主机的节点的集群上运行本教程。如果你还没有集群,可以使用 minikube 创建一个集群,或者你可以使用以下 Kubernetes 游乐场之一

你将需要一个容器镜像注册表,你可以在其中上传镜像以在你的集群中运行。

此任务示例还假设你在本地安装了 Docker。

启动消息队列服务

此示例使用 RabbitMQ,但是你可以调整该示例以使用其他 AMQP 类型的消息服务。

在实践中,你可以在集群中设置一次消息队列服务,并将其重用于许多 Job 以及长时间运行的服务。

按如下方式启动 RabbitMQ

# make a Service for the StatefulSet to use
kubectl create -f https://kubernetes.ac.cn/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.ac.cn/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created

测试消息队列服务

现在,我们可以尝试访问消息队列。我们将创建一个临时的交互式 Pod,在其上安装一些工具,并尝试队列。

首先创建一个临时的交互式 Pod。

# Create a temporary interactive container
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...

请注意,你的 Pod 名称和命令提示符将有所不同。

接下来安装 amqp-tools,以便你可以使用消息队列。接下来的命令显示你需要在该 Pod 中的交互式 shell 中运行的内容

apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils

稍后,你将创建一个包含这些软件包的容器镜像。

接下来,你将检查是否可以发现 RabbitMQ 的服务

# Run these commands inside the Pod
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
nslookup rabbitmq-service
Server:        10.0.0.10
Address:    10.0.0.10#53

Name:    rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152

(IP 地址会有所不同)

如果 kube-dns 插件未正确设置,则上一步可能对你无效。你还可以在环境变量中找到该服务的 IP 地址

# run this check inside the Pod
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152

(IP 地址会有所不同)

接下来,你将验证是否可以创建队列,以及发布和使用消息。

# Run these commands inside the Pod
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached.  5672 is the standard port for rabbitmq.
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# Now create a queue:

/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

将一条消息发布到队列

/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# And get it back.

/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
Hello

在最后一个命令中,amqp-consume 工具从队列中获取一条消息 (-c 1),并将该消息传递给任意命令的标准输入。在这种情况下,程序 cat 打印从标准输入读取的字符,并且 echo 添加回车符,以便示例可读。

用任务填充队列

现在,用一些模拟的任务填充队列。在此示例中,任务是要打印的字符串。

在实践中,消息的内容可能是

  • 需要处理的文件的名称
  • 程序的额外标志
  • 数据库表中的键范围
  • 模拟的配置参数
  • 要渲染的场景的帧数

如果 Job 的所有 Pod 以只读模式都需要大量数据,你通常将其放在像 NFS 这样的共享文件系统中,并在所有 Pod 上以只读方式挂载,或者在 Pod 中编写程序,以便它可以从集群文件系统(例如:HDFS)中原生读取数据。

对于此示例,你将使用 AMQP 命令行工具创建队列并填充它。在实践中,你可以编写一个程序以使用 AMQP 客户端库填充队列。

# Run this on your computer, not in the Pod
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d
job1

将项添加到队列

for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done

你向队列添加了 8 条消息。

创建容器镜像

现在你已经准备好创建一个将作为 Job 运行的镜像。

该 Job 将使用 amqp-consume 实用程序从队列中读取消息并运行实际工作。这是一个非常简单的示例程序

#!/usr/bin/env python

# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)

授予脚本执行权限

chmod +x worker.py

现在,构建一个镜像。创建一个临时目录,更改到该目录,下载 Dockerfileworker.py。在这两种情况下,都使用以下命令构建镜像

docker build -t job-wq-1 .

对于 Docker Hub,使用你的用户名标记你的应用程序镜像,并使用以下命令将其推送到 Hub。将 <username> 替换为你的 Hub 用户名。

docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1

如果你使用的是其他容器镜像注册表,请标记该镜像并将其推送到那里。

定义 Job

这是一个 Job 的清单。你需要复制 Job 清单(将其命名为 ./job.yaml),并编辑容器镜像的名称以匹配你使用的名称。

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

在此示例中,每个 Pod 处理队列中的一项,然后退出。因此,Job 的完成计数对应于完成的工作项数。这就是为什么示例清单将 .spec.completions 设置为 8 的原因。

运行 Job

现在,运行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

你可以等待 Job 成功,并设置超时时间

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1

接下来,检查 Job

kubectl describe jobs/job-wq-1
Name:             job-wq-1
Namespace:        default
Selector:         controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-1
Annotations:      <none>
Parallelism:      2
Completions:      8
Start Time:       Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses:    0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                job-name=job-wq-1
  Containers:
   c:
    Image:      container-registry.example/causal-jigsaw-637/job-wq-1
    Port:
    Environment:
      BROKER_URL:       amqp://guest:guest@rabbitmq-service:5672
      QUEUE:            job1
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen  LastSeen   Count    From    SubobjectPath    Type      Reason              Message
  ─────────  ────────   ─────    ────    ─────────────    ──────    ──────              ───────
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-hcobb
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-weytj
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-qaam5
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-b67sr
  26s        26s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-xe5hj
  15s        15s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-w2zqe
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-d6ppa
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-p17e0

该 Job 的所有 Pod 都成功了!你已经完成了。

替代方案

这种方法的优点是,你无需修改你的“工作”程序就可以知道存在工作队列。你可以将工作程序原封不动地包含在你的容器镜像中。

使用此方法确实需要你运行消息队列服务。如果运行队列服务不方便,你可能需要考虑其他 Job 模式之一。

这种方法为每个工作项创建一个 Pod。但是,如果你的工作项仅需几秒钟,则为每个工作项创建一个 Pod 可能会增加很多开销。请考虑其他设计,例如在 细粒度并行工作队列示例中,每个 Pod 执行多个工作项。

在此示例中,你使用 amqp-consume 实用程序从队列中读取消息并运行实际程序。这种方法的优点是,你无需修改你的程序就可以知道队列的存在。细粒度并行工作队列示例展示了如何使用客户端库与工作队列通信。

注意事项

如果完成数设置为小于队列中的项数,则并非所有项都将被处理。

如果完成数设置为大于队列中的项数,则即使队列中的所有项都已处理,Job 也不会显示已完成。它将启动其他 Pod,这些 Pod 将阻塞等待消息。你需要自己建立一种机制来发现何时有工作要做并测量队列的大小,将完成数设置为匹配。

这种模式下存在一种不太可能出现的竞态条件。如果在 amqp-consume 命令确认消息之后,容器成功退出之前,容器被杀死,或者节点在 kubelet 将 Pod 成功状态发布回 API 服务器之前崩溃,那么即使队列中的所有项目都已处理完毕,Job 也不会显示为已完成。

上次修改时间:太平洋标准时间 2024 年 3 月 16 日凌晨 2:39: 修复并行处理工作队列任务的文档。(bed970676c)