開始使用 GPU 加速的 Apache Spark 3

在第 3 章中,我們探討了 GPU 加速在 Spark 3.x 的功能。在本章中,我們會介紹使用適用於 Apache Spark 3.x 的全新 RAPIDS 加速器的入門基礎知識,這款加速器透過 RAPIDS 函式庫運用 GPU 加速處理 (詳情請參閱開始使用適用於 Apache Spark 的 RAPIDS 加速器)。

適用於 Apache Spark 的 RAPIDS 加速器具有以下功能和限制:

  • 可以欄式處理在 GPU 上執行 Spark SQL
  • 使用者無需變更 API
  • 可從列式轉換為欄式,反之亦然
  • 採用 Rapids cuDF 函式庫
  • 可在 GPU 上執行支援的 SQL 作業,若作業未採用或與 GPU 不相容,則會切換回 Spark CPU 版本。 
  • 外掛程式無法加速直接操控彈性分散式資料集 (RDD) 的作業。
  • 加速器函式庫亦提供 Spark 隨機置換功能實作,在盡可能保留 GPU 中資料的情況下運用 UCX 最佳化 GPU 資料傳輸,並繞過 CPU 執行 GPU 對 GPU 傳輸。

若要啟用這項 GPU 加速,需要以下項目:

  • Apache Spark 3.0 或以上版本
  • 以符合 RAPIDS Dataframe 函式庫 cuDF 版本要求的 GPU 所設定的 Spark 叢集。
    • 每個執行程式一個 GPU。
  • 請新增以下 jar:
    • 與叢集上可用 CUDA 版本相符的 cudf jar。
    • RAPIDS Spark 加速器外掛程式 jar。
  • 將 spark.plugins 設定為 com.nvidia.spark.SQLPlugin

安裝及設定

部署 Spark 的方式會影響安裝與設定 Spark 及適用於 Spark 的 RAPIDS 加速器時必須採取的步驟。部署 Spark 的主要方法為:

  • 本機模式 - 驅動程式計畫和工作在同一個 Java 虛擬機器中運作。此模式僅適用於開發及測試,並非專為執行生產應用程式而設計。  
  • 在有叢集管理員的叢集上:

安裝

基本上安裝時需要 Spark 3.x、Spark jar 適用的 RAPIDS 加速器,並在每個背景工作節點上配置 GPU 探索指令碼。使用本機模式會安裝在本機上。使用 Spark 獨立模式,則會安裝在要使用的所有節點上。使用 Yarn,會安裝在啟動器節點上並由 Yarn 視需求運送至節點。  使用 Kubernetes,則會將所需的一切放入 docker 映像,或放在 Spark 應用程式運作時裝載的磁碟機上。詳細的安裝資訊,請參閱開始使用適用於 Apache Spark 的 RAPIDS 加速器

設定

Spark 殼層與 ./bin/spark-submit 支援透過 conf 等命令列選項,或透過讀取 conf/spark-defaults.conf 設定選項動態載入設定屬性。(請參閱 Spark 設定指南瞭解 Spark 設定的概述與詳細資訊。)

On startup use: --conf [conf key]=[conf value]. 例如:

${SPARK_HOME}/bin/spark --jars 'rapids-4-spark_2.12-0.1.0.jar,cudf-0.14.jar' \
      --conf spark.plugins=com.nvidia.spark.SQLPlugin \
      --conf spark.rapids.sql.incompatibleOps.enabled=true

執行階段請使用:spark.conf.set("[conf key]", [conf value])。例如:

   scala>spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)

GPU 排程

可用 --conf 索引鍵/值組要求 GPU 並為其分配工作。實際使用的設定會依叢集管理員而異。以下是一些用於分配 GPU 的設定索引鍵/值屬性:

  • 要求執行程式擁有 GPU:
  • --conf spark.executor.resource.gpu.amount=1 

  • 指定每個工作的 GPU 數量:
  • --conf spark.task.resource.gpu.amount=1

  • 指定 discoveryScript (YARN 與 K8S 需要):
  • --conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh

提醒您,spark.task.resource.gpu.amount 可以有小數點,因此,如果您希望在執行程式上同時執行多個任務並分配給同一個 GPU,您可以將此值設定為小於 1 的小數值。您要將此設定對應於 spark.executor.cores 的設定。例如,如果您的 spark.executor.cores=2,表示允許在每個執行程式上執行 2 個任務,而您希望這 2 個任務在同一個 GPU 上執行,則需將 spark.task.resource.gpu.amount 設定為 0.5。

調整

剛開始時建議使用以下設定,但實際設定需依叢集與應用程式而定:

  • 每個 GPU 只搭配一個執行程式。在一個執行程式中,不要執行多個 GPU。每個執行程式可執行多項任務,取決於每個框上的核心及 GPU 數量。在每個執行程式上,只執行一個 GPU,您可在執行程式之間平均區分核心。例如,如果您每個主機有 24 個核心和 4 個 GPU,則可執行 6 個核心 (--conf spark.executor.cores=6)。這會控制 Spark 一次放上執行程式的工作數量。要一次控制同時在 GPU 上執行的工作數量,可以設定 spark.rapids.sql.concurrentGpuTasks。剛開始時,不妨允許兩個工作同時在 GPU 上執行:

    (--conf spark.rapids.sql.concurrentGpuTasks=2), 

    如有記憶體不足或效能緩慢等問題,則變更至 1。二者的不同之處在於,當其他工作在 GPU 上執行時,工作仍可使用 CPU。以目前的情況來說,同時在 GPU 上執行過多工作無法締造效能優勢,而且每項工作都會使用記憶體,因此我們通常會限制同時在 GPU 上執行的工作數量。
  • 設定輸入的規模。在 GPU 上執行更大批的資料可有效提升效能。  然而,輸入規模會依當下讀取的檔案類型和執行的操作而異。
    • 如果本來使用的是 Spark 資料來源 API (spark.read 之類),請改用:
    •  --conf spark.sql.files.maxPartitionBytes=512m

    • 如果使用 Spark/Hive api 讀取 Hive 表格中的資料,請改用:
    • ---conf
      spark.hadoop.mapreduce.input.fileinputformat.split.minsize=536870912
      --conf spark.hadoop.mapred.min.split.size=536870912

  • 設定 spark.sql.shuffle.partitions 的數字。Spark 預設為 200,而這往往導致分割區過小。各分割區的資料規模不應過小,以保持 GPU 的處理效率,因此請盡可能讓分割區的數量越少越好。請根據應用程式資料調整這項數字與輸入規模。
    如果將 KryoSerializer 與 Spark 搭配使用
  • (--conf spark.serializer=org.apache.spark.serializer.KryoSerializer)

    請註冊 GpuKryoRegistrator 類別,例如:

     --conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator

  • 請比照一般 Spark 應用程式設定執行程式記憶體的數量。

一般性建議

  • 少量大型輸入檔案比大量小型檔案好。您不一定能控制,但還是知道一下比較好。  
  • Larger input sizes spark.sql.files.maxPartitionBytes=只要 GPU 裝得下,512m 通常比較好。
  • The GPU does better with larger data chunks as long as they fit into memory.  When using the default spark.sql.shuffle.partitions=200 把這個數字減少一點比較好。  請根據工作正在讀取的資料量調整此數字。從每項工作 512MB 開始。

進階設定

除了一般設定,我們還有其他外掛程式專用的設定,只要滿足特定要求,就有可能提升效能。這些設定能控制 GPU 上可執行的作業 (參見下表)。啟用這些設定可讓 GPU 最佳化並執行更多內容,但請務必瞭解各設定的功用。例如,GPU 可能無法與 CPU 版本 100% 相容。例如,浮點數可能略有不同。如需設定詳細資訊,請參閱適用於 Spark 設定的 RAPIDS 加速器

使用實體計畫監測

Spark 適用的 RAPIDS 加速器無需使用者變更 API,且會用 GPU 作業取代其所支援的 SQL 操作。如要查看哪些作業受到 GPU 作業所取代,可呼叫 explain 方法印出 DataFrame 的實體計畫,該計畫詳載 GPU 上所有以 GPU 為前綴的作業。 

現在請針對我們在第 1 章探討過的部分查詢項目,比較 DataFrame 實體計畫與 GPU 處理的結果。在下方的實體計畫中,DAG 包括一個 GpuBatchScan、一個以小時為單位的 GpuFilter、一個以小時為單位的 GpuProject (選擇欄)、fare_amount (車資) 以及 day_of_week (星期)。用 CPU 處理時,這份計畫包括 FileScan、Filter 及 Project。

// 選擇與篩選為縮窄轉換 (narrow transformation)
df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).show(2)

result:
+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0|       10.5|
| 0.0|       12.5|
+----+-----------+

df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).explain

result:
== Physical Plan ==
*(1) GpuColumnarToRow false<
+- !GpuProject [hour#10, fare_amount#9]
   +- GpuCoalesceBatches TargetSize(1000000,2147483647)
      +- !GpuFilter (gpuisnotnull(hour#10) AND (hour#10 =0.0))
      +- GpuBatchScan[fare_amount#9, hour#10] GpuCSVScan Location:
InMemoryFileIndex[s3a://spark-taxi-dataset/raw-small/train], ReadSchema: struct<fare_amount:double,hour:double>

請注意,原始計畫中的多數節點皆已由 GPU 版本所取代。RAPIDS 加速器插入資料格式轉換節點,例如 GpuColumnarToRow 及 GpuRowToColumnar,以便針對會在 GPU 上執行之節點的欄式處理及會在 CPU 上執行之節點的列處理之間轉換。  如要瞭解部分查詢無法於 GPU 上執行的原因,請將設定 spark.rapids.sql.explain 改為 true。輸出會記錄於驅動程式的記錄檔或以互動模式顯示於螢幕。

使用 Spark Web UI 監測

SQL 分頁

如要查看 GPU 正在執行的內容,最簡單的方法是檢視 Spark Web UI 中的 "SQL" 分頁。在 SQL 分頁針對下方查詢顯示的 DAG 圖表中,我們能看到實體計畫包括 GPUBatchScan、Project、GPUHashAggregate 及 GPUHashAggregate。用 CPU 處理時,Spark 會先針對各分割區執行雜湊彙總,然後再於 Exchange 中進行資料隨機置換以供廣泛轉換 (wide transformation)。經過交換後,會產生前一個子彙總的雜湊彙總。請注意,GPU 處理時不會於 Exchange 進行隨機置換。

val df3 =df2.groupBy("month").count
.orderBy(asc("month"))show(5)

階段分頁

請用階段詳細資訊頁面查看階段詳細資訊 DAG,其中藍色頂點 (方框) 代表 RDD 或 DataFrame,邊緣 (方框之間的箭頭) 則代表套用於 DataFrame 的作業。

環境分頁

請用「環境」分頁檢視並確認 GPU 設定的屬性正確無誤,例如 Spark.executor.resource.gpu.amount 及 spark.executor.resource.gpu.discoveryScript properties。您也可在此檢視「系統屬性」classpath 項目,確認外掛程式 jar 皆在 Java 虛擬機器 (JVM) classpath 中。

表 1。    Spark 屬性

名稱
spark.executor.resource.gpu.amount 1
spark.executor.resource.gpu.discoveryScript /home/ubuntu/getGpusResources.sh

執行程式分頁

您可利用「執行程式」分頁查看分配至應用程式執行程式的資源。在本例中,執行程式已分配到一個 GPU。

偵錯

目前,偵錯的最佳方式和您通常在 Spark 上偵錯的方式相同。查看 UI 和記錄檔案,檢查哪裡失敗。如果您有來自 GPU 的區段錯誤故障,請找一下 hs_err_pid.log 檔案。為了確保您的 hs_err_pid.log 檔案可直接進入 YARN 應用程式記錄目錄,您可於設定中新增:--conf spark.executor.extraJavaOptions="-XX:ErrorFile=<LOG_DIR>/hs_err_pid_%p.log"。

如果您想瞭解為什麼 GPU 上沒有執行某項操作,請開啟設定:--conf spark.rapids.sql.explain=NOT_ON_GPU。Spark 作業無法於 GPU 上執行的原因已透過記錄檔訊息輸出至驅動程式記錄。

GPU 記憶體不足

GPU 記憶體不足時會以多種方式顯示。  可能會出現記憶體不足的錯誤訊息,也可能只表示當機。通常這表示分割區規模過大,請回到「設定」區段設定分割區規模和/或分割區數量。請嘗試將同時執行的 GPU 工作數縮減到一個。您也可從 Spark UI 得知資料的規模。請檢視失敗階段的輸入資料或隨機置換資料規模。

摘要

在本章中,我們探討了開始使用適用於 Apache Spark 3 x 的全新 RAPIDS API 外掛程式基礎知識。這款外掛程式運用了 GPU 來加快處理速度。如需瞭解更多資訊,請參閱適用於 Spark 的 RAPIDS 加速器指南