使用工作队列进行粗略并行处理

在这个示例中,您将运行一个具有多个并行工作进程的 Kubernetes Job。

在这个示例中,随着每个 Pod 的创建,它会从任务队列中获取一个工作单元,完成它,从队列中删除它,然后退出。

以下是此示例中步骤的概述

  1. 启动消息队列服务。 在这个示例中,您使用 RabbitMQ,但您也可以使用其他服务。在实际应用中,您只需设置一次消息队列服务,然后在多个作业中重复使用它。
  2. 创建一个队列,并用消息填充它。 每个消息代表一项需要完成的任务。在这个示例中,消息是一个整数,我们将在其上进行冗长的计算。
  3. 启动一个从队列中处理任务的 Job。 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,处理它,然后退出。

开始之前

您应该已经熟悉基本的非并行 Job 使用方法。

您需要有一个 Kubernetes 集群,并且 kubectl 命令行工具必须配置为与您的集群通信。建议在至少有两个节点(不充当控制平面主机)的集群上运行本教程。如果您还没有集群,可以使用 minikube 创建一个,或者您可以使用以下 Kubernetes 游乐场之一

您需要一个容器镜像仓库,您可以在其中上传要在集群中运行的镜像。

此任务示例还假定您在本地安装了 Docker。

启动消息队列服务

此示例使用 RabbitMQ,但是,您可以将示例调整为使用其他 AMQP 类型的消息服务。

在实际应用中,您可以在集群中设置一次消息队列服务,并在多个作业中重复使用它,以及用于长期运行的服务。

按如下步骤启动 RabbitMQ

# make a Service for the StatefulSet to use
kubectl create -f https://kubernetes.ac.cn/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.ac.cn/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created

测试消息队列服务

现在,我们可以尝试访问消息队列。我们将创建一个临时交互式 Pod,在其中安装一些工具,并尝试使用队列。

首先创建一个临时交互式 Pod。

# Create a temporary interactive container
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...

请注意,您的 Pod 名称和命令提示符会有所不同。

接下来安装 amqp-tools,以便您可以使用消息队列。以下命令展示了您需要在该 Pod 中的交互式 Shell 中运行的内容

apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils

稍后,您将创建一个包含这些包的容器镜像。

接下来,您将检查是否可以发现 RabbitMQ 的服务

# Run these commands inside the Pod
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
nslookup rabbitmq-service
Server:        10.0.0.10
Address:    10.0.0.10#53

Name:    rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152

(IP 地址会有所不同)

如果 kube-dns 附加组件配置不正确,则上一步可能对您不起作用。您也可以在环境变量中找到该服务的 IP 地址

# run this check inside the Pod
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152

(IP 地址会有所不同)

接下来,您将验证是否可以创建队列,以及发布和消费消息。

# Run these commands inside the Pod
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached.  5672 is the standard port for rabbitmq.
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# Now create a queue:

/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

发布一条消息到队列中

/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# And get it back.

/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
Hello

在最后一个命令中,amqp-consume 工具从队列中获取了一条消息 (-c 1),并将该消息传递给任意命令的标准输入。在这种情况下,cat 程序打印从标准输入读取的字符,echo 添加一个回车符,以便示例可读。

用任务填充队列

现在,用一些模拟任务填充队列。在这个示例中,任务是需要打印的字符串。

在实际应用中,消息的内容可能是

  • 需要处理的文件名称
  • 程序的额外标志
  • 数据库表中的键范围
  • 模拟的配置参数
  • 需要渲染的场景的帧编号

如果存在大量数据需要 Job 的所有 Pod 以只读模式读取,您通常将这些数据放在共享文件系统(例如 NFS)中,并在所有 Pod 上挂载为只读,或者在 Pod 中编写程序,以便它可以从集群文件系统(例如:HDFS)本机读取数据。

对于此示例,您将使用 AMQP 命令行工具创建队列并用数据填充它。在实际应用中,您可能会编写一个程序使用 AMQP 客户端库来填充队列。

# Run this on your computer, not in the Pod
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d
job1

向队列添加项

for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done

您已向队列添加了 8 条消息。

创建容器镜像

现在您已准备好创建一个将作为 Job 运行的镜像。

Job 将使用 amqp-consume 实用程序从队列中读取消息并运行实际工作。这是一个非常简单的示例程序

#!/usr/bin/env python

# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)

赋予脚本执行权限

chmod +x worker.py

现在,构建一个镜像。创建一个临时目录,更改到该目录,下载 Dockerfileworker.py。无论哪种情况,都使用以下命令构建镜像

docker build -t job-wq-1 .

对于 Docker Hub,使用您的用户名标记您的应用程序镜像,并使用以下命令将其推送到 Hub。将 <username> 替换为您的 Hub 用户名。

docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1

如果您使用的是其他容器镜像仓库,请标记镜像并将其推送到那里。

定义 Job

这是一个 Job 的清单文件。您需要复制 Job 清单文件(将其命名为 ./job.yaml),并编辑容器镜像的名称以匹配您使用的名称。

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

在这个示例中,每个 Pod 处理队列中的一个项目,然后退出。因此,Job 的完成计数对应于完成的工作项数量。这就是为什么示例清单文件将 .spec.completions 设置为 8 的原因。

运行 Job

现在,运行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

您可以等待 Job 成功,并设置超时时间

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1

接下来,检查 Job

kubectl describe jobs/job-wq-1
Name:             job-wq-1
Namespace:        default
Selector:         controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-1
Annotations:      <none>
Parallelism:      2
Completions:      8
Start Time:       Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses:    0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                job-name=job-wq-1
  Containers:
   c:
    Image:      container-registry.example/causal-jigsaw-637/job-wq-1
    Port:
    Environment:
      BROKER_URL:       amqp://guest:guest@rabbitmq-service:5672
      QUEUE:            job1
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen  LastSeen   Count    From    SubobjectPath    Type      Reason              Message
  ─────────  ────────   ─────    ────    ─────────────    ──────    ──────              ───────
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-hcobb
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-weytj
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-qaam5
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-b67sr
  26s        26s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-xe5hj
  15s        15s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-w2zqe
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-d6ppa
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-p17e0

该 Job 的所有 Pod 都成功了!您已完成。

替代方法

这种方法的优点是,您不需要修改“worker”程序以使其知道存在工作队列。您可以将 worker 程序不加修改地包含在您的容器镜像中。

使用这种方法确实需要运行消息队列服务。如果运行队列服务不方便,您可能需要考虑其他 作业模式 之一。

这种方法为每个工作项创建一个 Pod。但是,如果您的工作项只需要几秒钟,则为每个工作项创建一个 Pod 可能会增加大量开销。请考虑其他设计,例如在 细粒度并行工作队列示例 中,每个 Pod 执行多个工作项。

在这个示例中,您使用了 amqp-consume 实用程序从队列中读取消息并运行实际程序。这具有不需要修改程序以使其知道队列的优点。 细粒度并行工作队列示例 展示了如何使用客户端库与工作队列进行通信。

注意事项

如果完成数量设置为小于队列中的项目数量,则不会处理所有项目。

如果完成数量设置为大于队列中的项目数量,则 Job 不会显示已完成,即使队列中的所有项目都已处理完毕。它将启动额外的 Pod,这些 Pod 会阻塞等待消息。您需要创建自己的机制来发现何时有工作要做,并测量队列的大小,并将完成数量设置为匹配。

这种模式下存在一个不太可能的竞争条件。如果容器在 amqp-consume 命令确认消息与容器成功退出之间被杀死,或者节点在 kubelet 能够将 Pod 的成功状态回发到 API 服务器之前崩溃,则 Job 不会显示已完成,即使队列中的所有项目都已处理完毕。

最后修改时间:2024 年 3 月 16 日凌晨 2:39 PST:修复并行处理工作队列任务的文档。(bed970676c)