本文發表於一年多前。舊文章可能包含過時內容。請檢查頁面中的資訊自發布以來是否已變得不正確。

Kubernetes 上的 Airflow(第 1 部分):一種不同型別的 Operator

引言

作為彭博社持續致力於發展 Kubernetes 生態系統的一部分,我們很高興地宣佈推出 Kubernetes Airflow Operator;這是一種機制,用於讓流行的工作流編排框架 Apache Airflow 能夠使用 Kubernetes API 原生啟動任意 Kubernetes Pod。

什麼是 Airflow?

Apache Airflow 是“配置即程式碼”DevOps 理念的一種實現。Airflow 允許使用者使用簡單的 Python 物件 DAG(有向無環圖)啟動多步驟管道。您可以定義依賴項,以程式設計方式構建複雜的工作流,並在易於閱讀的 UI 中監控計劃作業。

Airflow DAGs Airflow UI

為什麼選擇 Kubernetes 上的 Airflow?

自誕生以來,Airflow 最大的優勢一直是其靈活性。Airflow 為從 Spark 和 HBase 到各種雲提供商服務等一系列服務提供了廣泛的整合。Airflow 還透過其外掛框架提供了簡單的可擴充套件性。然而,該專案的一個限制是 Airflow 使用者在執行時只能使用 Airflow worker 上存在的框架和客戶端。一個組織可以擁有各種 Airflow 工作流,從資料科學管道到應用程式部署。這種用例差異會在依賴管理中產生問題,因為兩個團隊可能為其工作流使用截然不同的庫。

為了解決這個問題,我們利用 Kubernetes 允許使用者啟動任意 Kubernetes pod 和配置。Airflow 使用者現在可以完全控制他們的執行時環境、資源和秘密,基本上將 Airflow 變成了一個“任何你想要的任務”工作流編排器。

Kubernetes Operator

在我們進一步討論之前,我們應該澄清一下,在 Airflow 中,Operator 是一個任務定義。當用戶建立 DAG 時,他們會使用像“SparkSubmitOperator”或“PythonOperator”這樣的 Operator 來分別提交/監控 Spark 任務或 Python 函式。Airflow 內建了 Apache Spark、BigQuery、Hive 和 EMR 等框架的 Operator。它還提供了外掛入口點,允許 DevOps 工程師開發自己的聯結器。

Airflow 使用者一直在尋找簡化部署和 ETL 管道管理的方法。任何解耦管道步驟同時增加監控的機會都可以減少未來的中斷和緊急修復。以下是 Airflow Kubernetes Operator 提供的優勢列表:

  • 提高部署靈活性
    Airflow 的外掛 API 一直為希望在其 DAG 中測試新功能的工程師提供了巨大的便利。缺點是,每當開發人員想要建立一個新的 Operator 時,他們都必須開發一個全新的外掛。現在,任何可以在 Docker 容器中執行的任務都可以透過完全相同的 Operator 訪問,無需維護額外的 Airflow 程式碼。

  • 配置和依賴的靈活性: 對於在靜態 Airflow worker 中執行的 Operator,依賴管理可能會變得相當困難。如果開發人員想要執行一個需要 SciPy 的任務和另一個需要 NumPy 的任務,開發人員將不得不要麼在所有 Airflow worker 中維護這兩個依賴項,要麼將任務解除安裝到外部機器(如果該外部機器以未跟蹤的方式更改,則可能導致錯誤)。自定義 Docker 映象允許使用者確保任務環境、配置和依賴項完全冪等。

  • 使用 Kubernetes 秘密以增加安全性: 處理敏感資料是任何 DevOps 工程師的核心職責。Airflow 使用者希望在任何時候都嚴格按照需要知道的原則隔離任何 API 金鑰、資料庫密碼和登入憑據。透過 Kubernetes Operator,使用者可以利用 Kubernetes Vault 技術儲存所有敏感資料。這意味著 Airflow worker 將永遠無法訪問這些資訊,並且可以簡單地請求僅使用其所需的秘密構建 pod。

架構

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python 客戶端生成一個由 APIServer 處理的請求(1)。然後 Kubernetes 將使用您定義的任何規範啟動您的 pod(2)。映象將載入所有必要的環境變數、秘密和依賴項,執行單個命令。一旦任務啟動,Operator 只需要監控跟蹤日誌的健康狀況(3)。使用者可以選擇將日誌收集到本地排程程式或其 Kubernetes 叢集中當前存在的任何分散式日誌服務。

使用 Kubernetes Operator

一個基本示例

以下 DAG 可能是我們能夠編寫的最簡單的示例,用於展示 Kubernetes Operator 的工作原理。此 DAG 在 Kubernetes 上建立了兩個 Pod:一個帶有 Python 的 Linux 發行版和一個沒有 Python 的基礎 Ubuntu 發行版。Python Pod 將正確執行 Python 請求,而沒有 Python 的 Pod 將向用戶報告失敗。如果 Operator 正常工作,`passing-task` Pod 應該完成,而 `failing-task` Pod 將向 Airflow webserver 返回失敗。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
Basic DAG Run

但這與我的工作流程有什麼關係呢?

雖然這個示例只使用了基本的映象,但 Docker 的魔力在於這個相同的 DAG 可以用於任何你想要的映象/命令組合。以下是執行生產就緒程式碼在 Airflow DAG 上的推薦 CI/CD 管道。

1:GitHub 中的 PR

使用 Travis 或 Jenkins 執行單元和整合測試,賄賂你最喜歡的隊友對你的程式碼進行 PR,然後合併到主分支以觸發自動化 CI 構建。

2:透過 Jenkins 進行 CI/CD -> Docker 映象

在 Jenkins 構建中生成 Docker 映象並提升釋出版本.

3:Airflow 啟動任務

最後,更新你的 DAG 以反映新的釋出版本,你就可以開始了!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

啟動測試部署

由於 Kubernetes Operator 尚未釋出,我們還沒有釋出官方的 helm chart 或 Operator(不過兩者都在進行中)。但是,我們在下面提供了基本部署的說明,並且正在積極尋找敢於冒險的 Beta 測試人員來嘗試這個新功能。要試用此係統,請按照以下步驟操作:

步驟 1:將 kubeconfig 指向 Kubernetes 叢集

步驟 2:克隆 Airflow 倉庫

執行 `git clone https://github.com/apache/incubator-airflow.git` 克隆官方 Airflow 倉庫。

步驟 3:執行

為了執行這個基本部署,我們借用了目前用於 Kubernetes Executor 的整合測試指令碼(將在本系列下一篇文章中解釋)。要啟動此部署,請執行以下三個命令:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在繼續之前,讓我們討論一下這些命令正在做什麼:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

Kubernetes Executor 是 Airflow 的另一個功能,它允許將任務動態分配為冪等 Pod。我們將其切換到 LocalExecutor 的原因僅僅是為了逐個介紹功能。如果您想嘗試 Kubernetes Executor,非常歡迎您跳過此步驟,但我們將在未來的文章中詳細介紹。

./scripts/ci/kubernetes/Docker/build.sh

此指令碼將打包 Airflow 主原始碼並基於 Airflow 分發版構建 Docker 容器

./scripts/ci/kubernetes/kube/deploy.sh

最後,我們在您的叢集上建立一個完整的 Airflow 部署。這包括 Airflow 配置、Postgres 後端、Webserver + 排程程式以及所有必要的服務。需要注意的是,提供的角色繫結是叢集管理員,因此如果您在叢集上沒有該級別的許可權,可以在 scripts/ci/kubernetes/kube/airflow.yaml 中修改。

步驟 4:登入您的 Web 伺服器

現在您的 Airflow 例項已執行,讓我們看看 UI!UI 位於 Airflow Pod 的 8080 埠,只需執行:

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

現在 Airflow UI 將存在於 https://:8080。要登入,只需輸入 `airflow` / `airflow`,您就應該擁有對 Airflow Web UI 的完全訪問許可權。

步驟 5:上傳測試文件

要修改/新增您自己的 DAG,您可以使用 `kubectl cp` 將本地檔案上傳到 Airflow 排程程式的 DAG 資料夾中。Airflow 將讀取新的 DAG 並自動將其上傳到其系統。以下命令將把任何本地檔案上傳到正確的目錄:

kubectl cp <本地檔案> <名稱空間>/<pod>:/root/airflow/dags -c scheduler

步驟 6:享受!

那麼什麼時候我才能使用這個功能呢?

雖然此功能仍處於早期階段,但我們希望它能在未來幾個月內廣泛釋出。

參與其中

此功能只是改進 Apache Airflow 與 Kubernetes 整合多項重大努力的開始。Kubernetes Operator 已合併到 Airflow 的 1.10 釋出分支(執行器處於實驗模式),以及一個完全 Kubernetes 原生排程器,稱為 Kubernetes Executor(文章即將釋出)。這些功能仍處於早期採用者/貢獻者可以對這些功能的未來產生巨大影響的階段。

對於那些有興趣加入這些工作的人,我建議檢視以下步驟:

  • 加入 airflow-dev 郵件列表:dev@airflow.apache.org
  • Apache Airflow JIRA 中提交問題
  • 每週三太平洋時間上午 10 點參加我們的 SIG-BigData 會議。
  • 在 kubernetes.slack.com 的 #sig-big-data 上聯絡我們

特別感謝 Apache Airflow 和 Kubernetes 社群,特別是 Grant Nicholas、Ben Goldberg、Anirudh Ramanathan、Fokko Dreisprong 和 Bolke de Bruin,感謝你們在這些功能以及我們未來的努力中提供的出色幫助。