使用工作佇列進行細粒度並行處理

在此示例中,你將執行一個 Kubernetes Job,該 Job 將多個並行任務作為工作程序執行,每個任務都作為單獨的 Pod 執行。

在此示例中,每建立一次 Pod,它就會從任務佇列中獲取一個工作單元,對其進行處理,並重復此過程,直到佇列結束。

以下是本示例中的步驟概述:

  1. 啟動儲存服務以儲存工作佇列。在此示例中,你將使用 Redis 來儲存工作項。在上一個示例中,你使用了 RabbitMQ。在此示例中,你將使用 Redis 和自定義工作佇列客戶端庫;這是因為 AMQP 沒有提供一種很好的方式讓客戶端檢測有限長度的工作佇列何時為空。在實踐中,你通常會設定一次 Redis 等儲存,並將其用於多個 Job 的工作佇列和其他用途。
  2. 建立一個佇列,並用訊息填充它。每條訊息代表一個待完成的任務。在此示例中,訊息是一個整數,我們將對其進行耗時計算。
  3. 啟動一個從佇列中處理任務的 Job。該 Job 啟動多個 Pod。每個 Pod 從訊息佇列中獲取一個任務,對其進行處理,並重復此過程,直到佇列結束。

準備工作

你需要有一個 Kubernetes 叢集,並且 kubectl 命令列工具已配置為與你的叢集通訊。建議在至少有兩個不作為控制平面主機的節點的叢集上執行本教程。如果你還沒有叢集,可以使用 minikube 建立一個,或者使用以下 Kubernetes 演練場之一。

你將需要一個容器映象倉庫,以便將映象上傳到叢集中執行。本示例使用 Docker Hub,但你可以將其適配到其他容器映象倉庫。

此任務示例還假設你已在本地安裝 Docker。你使用 Docker 構建容器映象。

熟悉 Job 的基本非並行用法。

啟動 Redis

為了簡化此示例,你將啟動一個 Redis 例項。有關如何可伸縮和冗餘部署 Redis 的示例,請參閱 Redis 示例

您也可以直接下載以下檔案

要啟動一個 Redis 例項,你需要建立 redis pod 和 redis service。

kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml

用任務填充佇列

現在,讓我們用一些“任務”來填充佇列。在這個例子中,任務是需要列印的字串。

啟動一個臨時互動式 Pod 來執行 Redis CLI。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

現在按回車鍵,啟動 Redis CLI,然後建立一個包含一些工作項的列表。

redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,鍵為 job2 的列表將是工作佇列。

注意:如果 Kube DNS 沒有正確設定,你可能需要將上述程式碼塊的第一步改為 redis-cli -h $REDIS_SERVICE_HOST

建立容器映象

現在你已準備好建立將處理佇列中工作的映象。

你將使用一個帶有 Redis 客戶端的 Python 工作程式來從訊息佇列中讀取訊息。

我們提供了一個名為 rediswq.py 的簡單 Redis 工作佇列客戶端庫(下載)。

Job 中每個 Pod 的“工作程式”程式都使用工作佇列客戶端庫來獲取工作。其程式碼如下:

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2) 
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

您還可以下載 worker.pyrediswq.pyDockerfile 檔案,然後構建容器映象。這是一個使用 Docker 構建映象的示例:

docker build -t job-wq-2 .

推送映象

對於 Docker Hub,使用您的使用者名稱標記您的應用程式映象,並使用以下命令將其推送到 Hub。將 <username> 替換為您的 Hub 使用者名稱。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

你需要推送到公共倉庫,或者配置你的叢集以訪問你的私有倉庫

定義 Job

這是您將要建立的 Job 的清單

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

在此示例中,每個 Pod 處理佇列中的多個項,然後當沒有更多項時退出。由於 worker 自身檢測工作佇列何時為空,而 Job 控制器不知道工作佇列,因此它依賴 worker 在工作完成後發出訊號。worker 透過成功退出表示佇列為空。因此,一旦**任何** worker 成功退出,控制器就知道工作已完成,並且 Pod 將很快退出。因此,你需要將 Job 的完成計數留空。Job 控制器也會等待其他 Pod 完成。

執行 Job

所以,現在執行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

現在稍等片刻,然後檢查 Job

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              container-registry.example/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

你可以等待 Job 成功,並設定超時:

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

如你所見,該 Job 的其中一個 Pod 處理了多個工作單元。

替代方案

如果執行佇列服務或修改容器以使用工作佇列不方便,你可能需要考慮其他作業模式

如果你有持續的後臺處理工作要執行,那麼可以考慮使用 ReplicaSet 來執行你的後臺工作程序,並考慮執行一個後臺處理庫,例如 https://github.com/resque/resque

上次修改時間:2024 年 3 月 16 日太平洋標準時間凌晨 2:39:修復並行處理工作佇列任務的文件 (bed970676c)