巨量資料是都市改善的十大領域之一。分析都市內的地點和行為模式,可最佳化交通運輸、做出更完善的規劃決策,並提供更具智慧性的廣告。例如,對 GPS 汽車資料的分析讓都市能夠根據即時交通資訊,使交通運輸流量最佳化。電信公司利用行動電話定位資料,透過識別和預測大都會地區人口的地點活動趨勢和模式,提供相關見解。除此之外,將機器學習 (ML) 應用於地理定位資料,對於識別 電信、旅遊、行銷和製造業 中的模式和趨勢上也有所助益。
在這一章節中,我們會使用紐約的公共計程車車程資料來檢查其迴歸分析,因為此資料和預測紐約市計程車車資有所關聯。我們會從 XGBoost 演算法的概述開始,然後探討使用案例。
XGBoost,代表極端梯度提升 (Extreme Gradient Boosting),是種可擴充的分散式梯度提升決策樹 (GBDT) 機器學習函式庫。XGBoost 提供的平行決策樹提升,是處理迴歸、分類和排序問題的主要機器學習函式庫。RAPIDS 團隊與 Distributed Machine Learning Common (DMLC) XGBoost 組織密切合作,現在 XGBoost 已加入了流暢的拖曳式 (drop-in) GPU 加速,能顯著加速模型訓練並提高預測準確性。
梯度提升決策樹 (GBDT) 是種類似隨機森林的決策樹集成演算法,區別在於決策樹的建造和組合方式。隨機森林使用一種稱為 Bagging 的技術,從資料集的自助樣本中平行建立完整的決策樹。最終預測是所有決策樹預測的平均值。梯度提升決策樹使用一種稱為提升的技術,來反覆訓練一個淺度決策樹的集成,每次迭代時對於在前一個樣本中未正確預測的紀錄均給予權重,透過這個方式來減少後續決策樹的錯誤。最終預測是所有決策樹預測的加權平均值。Bagging 可將差異和過度擬合最小化,而提升可將偏差和欠擬合 (underfitting) 最小化。
XGBoost 是 GBDT 的一個變異。透過 GBDT,決策樹是按順序建立的。透過 XGBoost,決策樹是平行建立的,遵循同一層無差別 (level-wise) 的策略進行跨梯度值掃描,並使用這些部分加總來評估訓練集內每個可能分段 (split) 的分段品質。
GPU 加速的 XGBoost 演算法使用快速平行前置加總運算來掃描所有可能的分段,以及平行基數排序 (parallel radix sorting) 以重新分割資料。這種演算法為指定的提升反覆迭代建立決策樹,一次建立一個層級,同時在 GPU 上處理整個資料集。
GPU 加速的 XGBoost 提供以下主要功能:
範例資料集是紐約計程車資料集,並已經過清理和轉換來新增特徵,例如使用此 Spark ETL 筆記型電腦計算的半正矢距離 (Haversine Distance)。
在這個案例中,我們會根據以下特徵建立模型來預測計程車車資金額:
首先,我們匯入 Spark xgboost 的 GPU 版本和 CPU 版本所需的套件:
import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql._ import org.apache.spark.ml._ import org.apache.spark.ml.feature._ import org.apache.spark.ml.evaluation._ import org.apache.spark.sql.types._ import ml.dmlc.xgboost4j.scala.spark.{XGBoostRegressor, XGBoostRegressionModel}
針對 Spark xgboost 的 GPU 版本,您需要匯入以下內容:
import ml.dmlc.xgboost4j.scala.spark.rapids.{GpuDataReader, GpuDataset}
我們指定架構為 Spark StructType。
lazy val schema = StructType(Array( StructField("vendor_id", DoubleType), StructField("passenger_count", DoubleType), StructField("trip_distance", DoubleType), StructField("pickup_longitude", DoubleType), StructField("pickup_latitude", DoubleType), StructField("rate_code", DoubleType), StructField("store_and_fwd", DoubleType), StructField("dropoff_longitude", DoubleType), StructField("dropoff_latitude", DoubleType), StructField(labelName, DoubleType), StructField("hour", DoubleType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", DoubleType), StructField("day_of_week", DoubleType), StructField("is_weekend", DoubleType) ))
在以下程式碼中,我們將建立一個 Spark 工作階段並設定訓練和評估資料的檔案路徑。(注意:如果您使用的是筆記型電腦,則不需要建立 Spark 工作階段。)
val trainPath ="/FileStore/tables/taxi_tsmall.csv" val evalPath ="/FileStore/tables/taxi_esmall.csv" val spark =SparkSession.builder().appName("Taxi-GPU").getOrCreate
我們將 CSV 檔案中的資料載入至 Spark DataFrame 中,指定資料來源和架構以載入 DataFrame 中,如下所示。
val tdf =spark.read.option("inferSchema", "false").option("header", true).schema(schema).csv(trainPath) val edf =spark.read.option("inferSchema", "false").option("header", true).schema(schema).csv(evalPath)
DataFrame show(5) 顯示前 5 列:
tdf.select("trip_distance", "rate_code","fare_amount").show(5) result: +------------------+-------------+-----------+ | trip_distance| rate_code|fare_amount| +------------------+-------------+-----------+ | 2.72|-6.77418915E8| 11.5| | 0.94|-6.77418915E8| 5.5| | 3.63|-6.77418915E8| 13.0| | 11.86|-6.77418915E8| 33.5| | 3.03|-6.77418915E8| 11.0| +------------------+-------------+-----------+
Describe() 函式回傳包含描述性摘要統計的 DataFrame,例如每個數值欄的計數、平均值、標準偏差及最小值和最大值。
tdf.select("trip_distance", "rate_code","fare_amount").describe().show +-------+------------------+--------------------+------------------+ |summary| trip_distance| rate_code| fare_amount| +-------+------------------+--------------------+------------------+ | count| 7999| 7999| 7999| | mean| 3.278923615451919|-6.569284350812602E8|12.348543567945994| | stddev|3.6320775770793547|1.6677419425906155E8|10.221929466939088| | min| 0.0| -6.77418915E8| 2.5| | max|35.970000000000006| 1.957796822E9| 107.5| +-------+------------------+--------------------+------------------+
下方的散佈圖用於探索車資金額和車程距離之間的相關性。
%sql select trip_distance, fare_amount from taxi
要使機器學習演算法使用特徵,必須將其轉換並放入特徵向量中,也就是代表每個特徵值的數字向量。以下內容使用向量合併 (VectorAssembler) 轉換器來回傳具有標籤和向量特徵的全新 DataFrame。
// 特徵欄名稱 val featureNames =Array("passenger_count","trip_distance", "pickup_longitude","pickup_latitude","rate_code","dropoff_longitude", "dropoff_latitude", "hour", "day_of_week","is_weekend") // 建立轉換器 object Vectorize { def apply(df: DataFrame, featureNames: Seq[String], labelName: String): DataFrame = { val toFloat = df.schema.map(f =>col(f.name).cast(FloatType)) new VectorAssembler() .setInputCols(featureNames.toArray) .setOutputCol("features") .transform(df.select(toFloat:_*)) .select(col("features"), col(labelName)) } } // 轉換方法加入特徵欄 var trainSet =Vectorize(tdf, featureNames, labelName) var evalSet =Vectorize(edf, featureNames, labelName) trainSet.take(1) result: res8: Array[org.apache.spark.sql.Row] = Array([[5.0,2.7200000286102295,-73.94813537597656,40.82982635498047,-6.77418944E8,-73.96965026855469,40.79747009277344,10.0,6.0,1.0],11.5])
使用 XGBoost GPU 版本時,不需要向量合併器。
在 CPU 版本中,num_workers 應設定為 CPU 核心的數量、tree_method 設為「hist」,及將特徵欄設為在向量合併器中的輸出特徵欄。
lazy val paramMap =Map( "learning_rate" -> 0.05, "max_depth" -> 8, "subsample" -> 0.8, "gamma" -> 1, "num_round" -> 500 ) // 設定 xgboost 參數 val xgbParamFinal =paramMap ++ Map("tree_method" -> "hist", "num_workers" -> 12) // 建立 xgboostregressor 估計器 val xgbRegressor =new XGBoostRegressor(xgbParamFinal) .setLabelCol(labelName) .setFeaturesCol("features")
在 GPU 版本中,num_workers 應設定為 Spark 叢集中具備 GPU 的機器數量、tree_method 設為 「gpu_hist」,及將特徵欄設為包含特徵名稱的陣列字串。
val xgbParamFinal =paramMap ++ Map("tree_method" -> "gpu_hist", "num_workers" -> 1) // 建立估計器 val xgbRegressor =new XGBoostRegressor(xgbParamFinal) .setLabelCol(labelName) .setFeaturesCols(featureNames)
以下程式碼使用訓練資料集中的 XGBoostRegressor 估計器擬合方法來訓練,並回傳 XGBoostRegressor 模型。我們亦使用時間方法來回傳時間以進行模型訓練,並用它來比較 CPU 和 GPU 之間的時間訓練。
object Benchmark { def time[R](phase: String)(block: => R): (R, Float) = { val t0 =System.currentTimeMillis val result =block // call-by-name val t1 =System.currentTimeMillis println("Elapsed time [" + phase + "]: " + ((t1 - t0).toFloat / 1000) + "s") (result, (t1 - t0).toFloat / 1000) } } // 使用估計器來擬合 (訓練) 模型 val (model, _) =Benchmark.time("train") { xgbRegressor.fit(trainSet) }
模型的效能可利用尚未用於訓練的評估版資料集進行評估。我們使用模型轉換方法取得測試資料的預測。
該模型會使用訓練過的 XGBoost 模型進行估計,然後在回傳 DataFrame 的新預測欄中,回傳車資金額預測。在這裡我們再次使用基準時間法來比較預測時間。
val (prediction, _) =Benchmark.time("transform") { val ret =model.transform(evalSet).cache() ret.foreachPartition(_ => ()) ret } prediction.select( labelName, "prediction").show(10) Result: +-----------+------------------+ |fare_amount| prediction| +-----------+------------------+ | 5.0| 4.749197959899902| | 34.0|38.651187896728516| | 10.0|11.101678848266602| | 16.5| 17.23284912109375| | 7.0| 8.149757385253906| | 7.5|7.5153608322143555| | 5.5| 7.248467922210693| | 2.5|12.289423942565918| | 9.5|10.893491744995117| | 12.0| 12.06682014465332| +-----------+------------------+
迴歸評估器 (RegressionEvaluator) 評估方法從預測和標籤欄中計算均方根誤差,即平均平方誤差的平方根。
val evaluator =new RegressionEvaluator().setLabelCol(labelName) val (rmse, _) =Benchmark.time("evaluation") { evaluator.evaluate(prediction) } println(s"RMSE ==$rmse") Result: Elapsed time [evaluation]: 0.356s RMSE ==2.6105287283128353
如下所示,模型可儲存至磁碟中以供日後使用。
model.write.overwrite().save(savepath)
儲存模型的結果是用於中繼資料的 JSON 檔案和用於模型資料的 Parquet 檔案。我們可以用重新載入命令將模型重新載入。原始和重新載入的模型是相同的。
val sameModel =XGBoostRegressionModel.load(savepath)
在本章中,我們介紹了 XGBoost 的基本知識,包含 XGBoost 的運作方式以及如何透過 Spark 使用 XGBoost 迴歸預測計程車車資金額。現在,您可以在 CPU 和 GPU 上執行此範例,運用更大的資料集來比較預測的時間和準確性。