Apache Spark 處理簡介

IDC 預測,資料中心、邊緣運算和物聯網產生的資料將在未來 7 年內增加 5 倍,達到 175 ZB。伴隨資料的巨量增長,Apache 軟體基金會的 Apache Spark 儼然已成為分散式擴充資料處理最熱門的框架之一,並在本機和雲端的數百萬台伺服器上運作。本章將介紹 Spark 框架並說明該框架執行應用程式的方式。 

Apache Spark 是用於大規模資料處理的快速通用分析引擎,可在 Hadoop、Apache Mesos、Kubernetes、單機或在雲端上執行。Spark 能讓高階操作員透過互動式殼層、筆記本或套件應用程式,輕鬆在 Scala、Python、R 或 SQL 中建立平行應用程式。 

除了 Spark 核心資料處理引擎,還有 SQL 及 DataFrame、機器學習、GraphX、圖表運算及串流處理專用的函式庫。這些函式庫可併用於各種資料來源的大量資料集中,例如 HDFS、Alluxio、Apache Cassandra、Apache Hbase 或 Apache Hive。

Spark 在叢集上的執行方式

Spark 應用程式是叢集節點執行程式流程內的平行工作,以協調驅動程式內 SparkSession 物件與叢集上資源或叢集管理員 (Standalone、Mesos、YARN 或 Kubernetes) 的方式執行。

Spark 也可在單一機器上執行,此即本機模式。在本機模式下,驅動程式和工作在同個 Java 虛擬機器中以執行緒的方式執行。  本機模式在原型製作、開發、偵錯和測試時相當實用。  然而,本機模式並非專為執行生產應用程式而設計。 

用檔案建立 DataFrame

Spark DataFrame 是 org.apache.spark.sql.Row 物件的分散式資料集,這些物件分隔在叢集的多個節點中,可以平行方式操作。DataFrame 以包含列與欄的資料表顯示,與 R 或 Python 的 DataFrame 相似,不過 DataFrame 具備 Spark 最佳化功能。DataFrame 由分割區組成,每個分割區在資料節點快取中皆以一系列的列來表示。

DataFrame 可根據資料來源建立,例如 csv、parquet、JSON 檔案、Hive 表格或外部資料庫。只要透過關聯式轉換及 Spark SQL 查詢,即可在 DataFrame 上操作。 

Spark 殼層或 Spark 筆記本提供了輕鬆與 Spark 互動的方式。 使用以下指令即可在本機模式中啟動殼層: 

$ /[installation path]/bin/spark-shell --master local[2]

接著只要將本章中其他程式碼輸入殼層,即可以互動方式查看結果。 在程式碼範例中,殼層的輸出便以結果作為開頭。

為了協調應用驅動程式和叢集管理員之間的執行過程,請在程式中建立一個 SparkSession 物件,如下方程式碼範例所示:

val spark =SparkSession.builder.appName("Simple Application").master("local[2]").getOrCreate()

Spark 應用程式啟動後,便會透過主要 URL 連接叢集管理員。主要 URL 可設定為叢集管理員或本機 [N],以便於建立 SparkSession 物件或於提交 Spark 應用程式時以 N 執行緒於本機執行。使用 Spark 殼層或筆記本時,SparkSession 物件已建立且可作為變數 Spark 使用。只要連接上,叢集管理員便會根據叢集中的節點配置分配資源並啟動執行程式流程。執行 Spark 應用程式時,SparkSession 會將工作發送至執行程式來運作。

透過 SparkSession Read 方法,即可將檔案資料讀取為 DataFrame,同時指定檔案類型、檔案路徑以及用於架構的輸入選項。

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val schema =
  StructType(Array(
    StructField("vendor_id", DoubleType),
    StructField("passenger_count", DoubleType),
    StructField("trip_distance", DoubleType),
    StructField("pickup_longitude", DoubleType),
    StructField("pickup_latitude", DoubleType),
    StructField("rate_code", DoubleType),
    StructField("store_and_fwd", DoubleType),
    StructField("dropoff_longitude", DoubleType),
    StructField("dropoff_latitude", DoubleType),
    StructField("fare_amount", DoubleType),
    StructField("hour", DoubleType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", DoubleType),
    StructField("day_of_week", DoubleType),
    StructField("is_weekend", DoubleType)
  ))

val file ="/data/taxi_small.csv"

val df =spark.read.option("inferSchema", "false")
.option("header", true).schema(schema).csv(file)

result: 
df: org.apache.spark.sql.DataFrame =[vendor_id: double, passenger_count:
double ... 14 more fields]

Take 方法會傳回此 DataFrame 中的物件陣列,這些物件屬於 org.apache.spark.sql.Row 類型。 

df.take(1)
result: 
Array[org.apache.spark.sql.Row] =
Array([4.52563162E8,5.0,2.72,-73.948132,40.829826999999995,-6.77418915E8,-1.0,-73.969648,40.797472000000006,11.5,10.0,2012,11,13.0,6.0,1.0])

DataFrame 轉換與動作

DataFrame 為結構化資料處理提供領域特定語言 API,這就是所謂的轉換。轉換功能可將現有的 DataFrame 轉換為新的 DataFrame,並以延遲的方式加以評估。轉換會在經動作觸發時執行,執行後會將結果傳回驅動程式或寫入磁碟。動作一經執行且相關數值傳回後,除非經過快取,否則 DataFrame 將自記憶體中消失。Spark 可透過以下指令,用記憶體內的欄式格式快取 DataFrame:dataFrame.cache()

以下是一些常用的 DataFrame 轉換清單。

  • select   :選擇一組資料欄
  • join:用特定的聯結運算式加入另一個 DataFrame
  • groupBy:用指定資料欄將 DataFrame 分組

以下 groupBy 轉換範例將計程車 DataFrame 以一天中每小時為單位分組,count 動作則會加總計程車每小時的行程次數。顯示動作會以表格格式列印 DataFrame 列的結果。 

df.groupBy("hour").count().show(4)

result: 
+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   49|
| 2.0|  658|
| 3.0|  742|
+----+-----+

以下是一些常用的 DataFrame 動作清單。

  • show(n):以表格格式顯示最前面的 n 列
  • take(n):以陣列方式傳回 DataFrame 最前面的 n 列
  • count:傳回 DataFrame 中的列數

縮窄與廣泛相依性的 DataFrame 轉換

DataFrame 轉換有兩種類型,一種具有縮窄相依性,一種具有廣泛相依性。用現有 DataFrame 建立新 DataFrame 時,若使用縮窄相依性轉換,則無需在分割區之間移動資料。縮窄轉換的範例是 filter(),用於根據特定的 SQL 運算式從 DataFrame 中進行列的篩選。以下範例針對小時值 = 0 進行篩選。

多個縮窄轉換可透過名為流程化 (pipelining) 的過程在記憶體中的 DataFrame 上執行,此過程可讓縮窄轉換有效進行。下列範例運用篩選與選擇等縮窄轉換取回一天當中 0 小時的計程車車資 (fare_amounts)。

// 選擇與篩選為縮窄轉換 (narrow transformation)
df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).show(2)

result:
+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0|       10.5|
| 0.0|       12.5|
+----+-----------+

用現有 DataFrame 建立新 DataFrame 時,相依性廣泛的轉換項目必須在分割區之間移動資料,此過程稱為隨機置換 (shuffle)。隨機置換會透過網路將資料發送至其他節點並寫入磁碟,造成網路與磁碟輸入/輸出 (I/O)。廣泛轉換的範例包括分組方式 (groupBy)、彙總 (agg)、分類方式 (sortBy) 及排序方式 (orderBy)。廣泛轉換會依小時的值顯示分組。

以下的廣泛轉換為依小時的值分組,並依小時計算計程車行程次數。 

df.groupBy("hour").count().show(4)

result: 
+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   49|
| 2.0|  658|
| 3.0|  742|
+----+-----+