使用工作佇列進行粗粒度並行處理

在此示例中,你將執行一個具有多個並行工作程序的 Kubernetes Job。

在此示例中,每當 Pod 被建立時,它會從任務佇列中取出一個工作單元,完成它,將其從佇列中刪除,然後退出。

以下是本示例中的步驟概述:

  1. 啟動訊息佇列服務。 在此示例中,你使用 RabbitMQ,但也可以使用其他服務。實際上,你只需設定一次訊息佇列服務,然後可以將其用於許多 Job。
  2. 建立一個佇列,並用訊息填充它。 每條訊息代表一個待完成的任務。在此示例中,訊息是一個整數,我們將對其進行耗時的計算。
  3. 啟動一個從佇列中處理任務的 Job。 該 Job 啟動多個 Pod。每個 Pod 從訊息佇列中取出一個任務,處理它,然後退出。

準備工作

你應該已經熟悉 Job 的基本非並行用法。

你需要一個 Kubernetes 叢集,並且 kubectl 命令列工具必須配置為與你的叢集通訊。建議在至少有兩個不充當控制平面主機的節點叢集上執行此教程。如果你尚未擁有叢集,可以使用 minikube 建立一個,或者使用以下 Kubernetes 遊樂場之一:

你需要一個容器映象倉庫,以便將映象上傳到叢集中執行。

此任務示例還假設你已在本地安裝 Docker。

啟動訊息佇列服務

本示例使用 RabbitMQ,但是,你可以修改本示例以使用其他 AMQP 型別訊息服務。

實際上,你可以在叢集中設定一次訊息佇列服務,並將其用於許多 Job 以及長期執行的服務。

按以下步驟啟動 RabbitMQ:

# make a Service for the StatefulSet to use
kubectl create -f https://kubernetes.club.tw/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.club.tw/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created

測試訊息佇列服務

現在,我們可以嘗試訪問訊息佇列。我們將建立一個臨時的互動式 Pod,在其上安裝一些工具,並進行佇列實驗。

首先建立一個臨時的互動式 Pod。

# Create a temporary interactive container
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...

請注意,你的 Pod 名稱和命令提示符將有所不同。

接下來安裝 `amqp-tools`,以便你可以使用訊息佇列。以下命令展示了你需要在該 Pod 的互動式 shell 中執行的內容:

apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils

稍後,你將建立一個包含這些包的容器映象。

接下來,你將檢查是否可以發現 RabbitMQ 的服務。

# Run these commands inside the Pod
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
nslookup rabbitmq-service
Server:        10.0.0.10
Address:    10.0.0.10#53

Name:    rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152

(IP 地址會有所不同)

如果 kube-dns 外掛設定不正確,上一步可能不適用於你。你還可以在環境變數中找到該服務的 IP 地址:

# run this check inside the Pod
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152

(IP 地址會有所不同)

接下來你將驗證是否可以建立佇列,以及釋出和消費訊息。

# Run these commands inside the Pod
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached.  5672 is the standard port for rabbitmq.
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# Now create a queue:

/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

向佇列釋出一條訊息

/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# And get it back.

/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
Hello

在最後一個命令中,`amqp-consume` 工具從佇列中取走一條訊息(`-c 1`),並將該訊息傳遞給任意命令的標準輸入。在此例中,`cat` 程式打印出從標準輸入讀取的字元,而 `echo` 新增一個回車符,使示例更具可讀性。

用任務填充佇列

現在,用一些模擬任務填充佇列。在此示例中,任務是需要列印的字串。

實踐中,訊息的內容可能包括:

  • 需要處理的檔名
  • 程式的額外引數
  • 資料庫表中的鍵範圍
  • 模擬的配置引數
  • 要渲染場景的幀編號

如果 Job 的所有 Pod 都需要只讀模式下的大資料,通常會將其放在像 NFS 這樣的共享檔案系統中,並以只讀方式掛載到所有 Pod 上,或者編寫 Pod 中的程式,使其能夠本地讀取叢集檔案系統(例如:HDFS)中的資料。

對於此示例,你將使用 AMQP 命令列工具建立佇列並填充它。實際上,你可以編寫一個程式,使用 AMQP 客戶端庫來填充佇列。

# Run this on your computer, not in the Pod
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d
job1

向佇列新增專案

for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done

你已向佇列添加了 8 條訊息。

建立容器映象

現在你已準備好建立要作為 Job 執行的映象。

該 Job 將使用 `amqp-consume` 工具從佇列中讀取訊息並執行實際工作。這是一個非常簡單的示例程式:

#!/usr/bin/env python

# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)

賦予指令碼執行許可權

chmod +x worker.py

現在,構建一個映象。建立一個臨時目錄,切換到該目錄,下載 Dockerfileworker.py。在任何一種情況下,都使用此命令構建映象:

docker build -t job-wq-1 .

對於 Docker Hub,使用你的使用者名稱標記你的應用映象,並使用以下命令將其推送到 Hub。將 `` 替換為你的 Hub 使用者名稱。

docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1

如果你使用的是備用容器映象倉庫,請改為在那裡標記並推送映象。

定義一個 Job

這是一個 Job 的清單。你需要複製 Job 清單(命名為 `./job.yaml`),並編輯容器映象名稱以匹配你使用的名稱。

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

在這個例子中,每個 Pod 從佇列中取出一個專案然後退出。因此,Job 的完成計數與完成的工作專案數相對應。這就是為什麼示例清單中的 `.spec.completions` 設定為 `8`。

執行 Job

現在,執行 Job。

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

你可以等待 Job 成功,並設定超時:

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1

接下來,檢查 Job:

kubectl describe jobs/job-wq-1
Name:             job-wq-1
Namespace:        default
Selector:         controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-1
Annotations:      <none>
Parallelism:      2
Completions:      8
Start Time:       Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses:    0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                job-name=job-wq-1
  Containers:
   c:
    Image:      container-registry.example/causal-jigsaw-637/job-wq-1
    Port:
    Environment:
      BROKER_URL:       amqp://guest:guest@rabbitmq-service:5672
      QUEUE:            job1
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen  LastSeen   Count    From    SubobjectPath    Type      Reason              Message
  ─────────  ────────   ─────    ────    ─────────────    ──────    ──────              ───────
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-hcobb
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-weytj
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-qaam5
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-b67sr
  26s        26s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-xe5hj
  15s        15s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-w2zqe
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-d6ppa
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-p17e0

該 Job 的所有 Pod 都成功了!你完成了。

替代方案

這種方法的優勢在於,你不需要修改你的“worker”程式來使其知道存在工作佇列。你可以將未修改的 worker 程式包含在你的容器映象中。

使用這種方法確實需要你執行訊息佇列服務。如果執行佇列服務不方便,你可能需要考慮其他 Job 模式

這種方法為每個工作項建立一個 Pod。但是,如果你的工作項只需要幾秒鐘,那麼為每個工作項建立一個 Pod 可能會增加很多開銷。請考慮另一種設計,例如細粒度並行工作佇列示例中所示,每個 Pod 執行多個工作項。

在此示例中,你使用 `amqp-consume` 工具從佇列中讀取訊息並執行實際程式。這樣做的優點是你無需修改程式使其感知佇列。 細粒度並行工作佇列示例展示瞭如何使用客戶端庫與工作佇列進行通訊。

注意事項

如果完成數設定得小於佇列中的專案數,則並非所有專案都會被處理。

如果完成數設定得大於佇列中的專案數,即使佇列中所有專案都已處理,Job 也不會顯示為已完成。它將啟動額外的 Pod,這些 Pod 將阻塞等待訊息。你需要建立自己的機制來發現何時有工作要做並測量佇列的大小,然後設定完成數以匹配。

這種模式存在一個不太可能發生的競態條件。如果容器在 `amqp-consume` 命令確認訊息到容器成功退出之間被終止,或者如果節點在 kubelet 能夠向 API 伺服器報告 Pod 成功之前崩潰,那麼即使佇列中的所有專案都已處理,Job 也不會顯示為已完成。

上次修改時間:2024 年 3 月 16 日太平洋標準時間凌晨 2:39:修復並行處理工作佇列任務的文件 (bed970676c)