Spark 應用程式如何執行?

執行 Spark 查詢時會經過以下步驟: 

  • 建立邏輯計畫
  • 將邏輯計畫轉換為實體計畫
  • 產生程式碼
  • 在叢集上執行工作

您可以使用 Apache Spark 提供的 Web UI,查看以有向無環圖 (DAG) 視覺呈現的計畫。藉由 Web Ui,您還可以瞭解 計畫執行方式,並監控 Spark 叢集上的狀態和資源消耗。您可以使用此 URL 即時查看 Web UI:http://<driver-node>:4040。 如果存在應用程式的事件記錄,您可以在執行後透過 Spark 的歷史伺服器查看 Web UI: http://<server-url>:18080。  

第一步,為提交的 SQL 或 DataFrame 建立邏輯計畫。邏輯計畫顯示即將執行的抽象轉換集合。Spark 分析工具使用中繼資料目錄來解決表格和欄位,然後將計畫傳遞至 Catalyst 最佳化工具,使用篩選下推等規則將計畫最佳化。  

動作會觸發邏輯 DAG 轉換為實體執行計畫。實體計畫使用不同執行策略的成本模型以識別執行計畫的資源。廣播聯結 (broadcast join) 和雜湊聯結 (hash join) 即是此類範例。  

查看實體計畫

您可以呼叫 explain (「格式化」) 方法查看 DataFrame 的格式化實體計畫。在下面的實體計畫中,df2 的 DAG 包括掃描 CSV 檔、day_of_week 上的篩選條件以及 hour、fare_amount 和 day_of_week 組成的專案 (選擇欄位)。 

val df =spark.read.option("inferSchema", "false") .option("header", true).schema(schema).csv(file)
val df2 =df.select($"hour", $"fare_amount", $"day_of_week").filter($"day_of_week" === "6.0" )
df2.show(3)
result: 
+----+-----------+-----------+
|hour|fare_amount|day_of_week|
+----+-----------+-----------+
|10.0|       11.5|        6.0|
|10.0|        5.5|        6.0|
|10.0|       13.0|        6.0|
+----+-----------+-----------+
df2.explain(“formatted”)
result:
== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- Scan csv  (1)

(1) Scan csv
Location: [dbfs:/FileStore/tables/taxi_tsmall.csv] 
Output [3]: [fare_amount#143, hour#144, day_of_week#148]
PushedFilters: [IsNotNull(day_of_week), EqualTo(day_of_week,6.0)]

(2) Filter [codegen id : 1]
Input [3]: [fare_amount#143, hour#144, day_of_week#148]
Condition : (isnotnull(day_of_week#148) AND (day_of_week#148 =6.0))

(3) Project [codegen id : 1]
Output [3]: [hour#144, fare_amount#143, day_of_week#148]
Input [3]: [fare_amount#143, hour#144, day_of_week#148]

歡迎在 Web UI SQL 分頁上查看 Catalyst 產生計畫的更多詳細資訊。按一下查詢描述連結以查看 DAG 和查詢的詳細資訊。

經過解釋後,我們可以從以下的程式碼中瞭解到,df3 的實體計畫包括掃描、篩選條件、專案、雜湊彙總 (HashAggregate)、Exchange 和雜湊彙總。交換是指以 groupBy 轉換進行隨機置換。在隨機置換 Exchange 中的資料前,Spark 會對每個分割執行雜湊彙總。經過交換後,會產生前一個子彙總的雜湊彙總。請注意,如果快取 df2,則此 DAG 中會進行記憶體內掃描而非檔案掃描。

val df3 =df2.groupBy("hour").count
df3.orderBy(asc("hour"))show(5)
result:
+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   47|
| 2.0|  658|
| 3.0|  742|
| 4.0|  812|
+----+-----+

df3.explain
result:
== Physical Plan ==
* HashAggregate (6)
+- Exchange (5)
   +- * HashAggregate (4)
      +- * Project (3)
         +- * Filter (2)
            +- Scan csv  (1)
(1) Scan csv 
Output [2]: [hour, day_of_week]
(2) Filter [codegen id : 1]
Input [2]: [hour, day_of_week]
Condition : (isnotnull(day_of_week) AND (day_of_week =6.0))
(3) Project [codegen id : 1]
Output [1]: [hour]
Input [2]: [hour, day_of_week]
(4) HashAggregate [codegen id : 1]
Input [1]: [hour]
Functions [1]: [partial_count(1) AS count]
Aggregate Attributes [1]: [count]
Results [2]: [hour, count]
(5) Exchange
Input [2]: [hour, count]
Arguments: hashpartitioning(hour, 200), true, [id=]
(6) HashAggregate [codegen id : 2]
Input [2]: [hour, count]
Keys [1]: [hour]
Functions [1]: [finalmerge_count(merge count) AS count(1)]
Aggregate Attributes [1]: [count(1)]
Results [2]: [hour, count(1) AS count]

按一下此查詢的 SQL 分頁連結以顯示作業的 DAG。

選擇「展開詳細資訊」核取方塊,顯示每個階段的詳細資訊。第一個區塊「全階段程式碼」(WholeStageCodegen) 將多個運算子 (掃描 CSV 檔、篩選條件、專案和雜湊彙總) 編譯成一個單一的 Java 函數,以提高效能。下方螢幕顯示列數和溢出大小 (spill size) 等指標。

第二個區塊標題為 Exchange,顯示隨機置換的指標,包括寫入換的隨機置記錄數量和總資料大小。

在叢集上執行工作

第三步,排程工作並在叢集上執行。

排程器依據轉換將圖形分為不同階段。縮窄轉換 (資料不移動的轉換) 會分組 (流程化) 至單一階段。此範例的實體計畫分為兩個階段,交換前的所有資料在第一階段。Spark 在執行階段執行進一步最佳化,包括全階段 Java 程式碼生成。如此一來,便可為 SQL 查詢中的運算子集產生最佳化的單一 Java 函數的位元組程式碼 (如果可行),以最佳化 CPU 使用率,而非透過為每個運算子生成迭代程式碼的方式來達成目的。

各階段皆由工作組成,以 DataFrame 的分割區為基礎,同時執行相同運算。

接下來,排程器將設置的階段工作集合提交至工作排程器,以將工作傳送至執行程式以執行。

工作完成後,依照動作不同,動作值將會傳回至驅動程式或寫入磁碟。

按一下 Web UI 作業分頁可瞭解工作進度的詳細資訊,包括階段和工作。在下列範例中,作業包括兩個階段,隨機置換前的階段有兩個工作,隨機置換後的階段有 200 個工作。工作數量與分割區數量相對應。在讀入第一階段的檔案後,會有兩個分割區。

隨機置換後,分割區的預設數量為 200。(使用 spark.sql.shuffle.partitions 屬性隨機置換資料時,可以設定要使用的分割區數量)。

摘要

我們在本章介紹了 Spark,展示 Spark 如何在叢集上執行您的程式碼,以及如何使用 Spark Web UI 監控此程式碼。在偵錯、分析和調整應用程式效能時,瞭解 Spark 如何執行應用程式相當重要。