GPU 가속 XGBoost를 사용하여 택시 요금 예측하기

빅데이터는 도시를 개선하는 데 사용되는 10개 주요 분야 중 하나입니다. 도시 내의 위치 및 행동 패턴을 분석하면 교통을 최적화하고, 더 나은 계획 결정을 내릴 수 있으며, 더 스마트한 광고가 가능합니다. 예를 들어, GPS 자동차 데이터를 분석하면 도시는 실시간 교통 정보를 기반으로 교통 흐름을 최적화할 수 있습니다. 통신사는 휴대 전화 위치 데이터를 사용하여 대도시 지역 인구의 위치 활동 동향과 패턴을 식별 및 예측하여 인사이트를 제공하고 있습니다. 또한, 지리 위치 데이터에 적용된 머신 러닝(ML)은 통신, 여행, 마케팅 및 제조 산업의 패턴과 동향을  식별하는 데 중요한 역할을 하고 있습니다. 

이 챕터에서는 뉴욕 택시 여행 공용 데이터를 사용하여 NYC 택시 요금을 예측하는 것에 관한 택시 여행 데이터 회귀 분석을 살펴봅니다. XGBoost 알고리즘 개요로 시작한 다음 사용 사례를 살펴보겠습니다.

XGBoost

Extreme Gradient Boosting의 약자인 XGBoost는 확장 가능한 분산된 그래디언트 부스트 의사결정 트리(GBDT) 머신 러닝 라이브러리입니다. XGBoost는 병렬 트리 부스팅을 제공하며 회귀, 분류 및 순위 문제에 대한 선도적인 ML 라이브러리입니다. RAPIDS 팀은 DMLC(Distributed Machine Learning Common) XGBoost 조직과 긴밀하게 협력하고 있으며, XGBoost에는 이제 원활한 드롭인 GPU 가속이 포함되어 모델 교육 속도를 크게 높이고 더 나은 예측을 위한 정확도를 향상해줍니다.

그래디언트 부스트 의사결정 트리(GBDT)는 랜덤 포레스트와 유사한 의사결정 트리 앙상블 알고리즘으로, 트리가 구축 및 결합되는 방식에 차이가 있습니다. 랜덤 포레스트는 데이터세트의 부트스트랩 샘플에서 병렬로 완전한 의사결정 트리를 구축하기 위해 배깅(bagging)이라는 기술을 사용합니다. 최종 예측은 모든 의사결정 트리 예측의 평균값입니다. 그래디언트 부스트 의사결정 트리는 다음 트리의 오류를 줄이기 위해, 각 반복에서 올바르게 예측하지 못한 이전 샘플의 레코드에 주어진 가중치를 사용하여 전체적인 얕은 의사결정 트리를 반복적으로 트레이닝하기 위해 부스팅(boosting)이라는 기술을 사용합니다. 최종 예측은 모든 의사결정 트리 예측의 가중치 평균입니다. 배깅은 분산과 과적합(Overfitting)을 최소화하여 편향과 과소적합(Underfitting)을 최소화합니다.

XGBoost는 GBDT의 변형입니다. GBDT를 사용하면 의사결정 트리가 순차적으로 빌드됩니다. XGBoost를 사용하면 수준별 전략에 따라 트리가 병렬로 빌드되어 그래디언트 값을 스캔하고, 이러한 부분합을 사용하여 트레이닝 세트의 가능한 모든 스플릿에서 스플릿 품질을 평가합니다. 

GPU 가속 XGBoost

GPU 가속 XGBoost 알고리즘은 빠른 병렬 전위합 작업을 활용하여 가능한 모든 스플릿을 스캔하고, 병렬근 정렬을 활용하여 데이터를 재분할합니다. 이 알고리즘은 주어진 부스팅 반복에 관한 의사결정 트리를 한 번에 한 단계씩 빌드하여 GPU에서 전체 데이터세트를 동시에 처리합니다.

GPU 가속 Spark XGBoost는 다음과 같은 주요 기능을 제공합니다.

  • 멀티 GPU 전반에서 ORC, CSV 및 Parquet 입력 파일의 분할
    근본적으로 지원되는 입력 파일 형식의 모든 수/크기를 다른 트레이닝 노드 간에 균등하게 나눌 수 있습니다.
  • GPU 가속 트레이닝
    다양한 트레이닝 인스턴스 중 가장 많은 수의 특성을 기반으로 고정된 메모리 내 표현이 아니라 데이터세트의 희소성을 기반으로 하여 특성을 최적으로 저장하는 트레이닝 데이터의 동적인 메모리 내 표현을 통해 XGBoost 트레이닝 시간을 개선했습니다. 의사결정 트리는 메모리를 아끼기 위해 재사용할 수 있는 그래디언트 쌍을 통해 빌드되어, 복사본을 줄여 성능을 향상합니다.
  • 효율적인 GPU 메모리 사용률
    XGBoost는 데이터가 메모리에 맞도록 요구하므로 단일 GPU 또는 분산 멀티 GPU 멀티 노드 트레이닝을 사용하여 데이터 크기에 제한이 발생합니다. 이제 GPU 메모리 사용률이 향상됨에 따라 사용자는 첫 번째 버전과 비교하여 5배의 크기를 가진 데이터로 트레이닝할 수 있습니다. 이는 성능에 영향을 주지 않고 총 트레이닝 비용을 개선하는 중요한 요소 중 하나입니다.

예시 사용 사례 데이터세트

예제 데이터세트는 이 Spark ETL 노트북을 사용하여 하버사인 거리 등의 특성을 추가하도록 이미 정리 및 변환된 뉴욕 택시 데이터세트입니다.

이 시나리오에서는 다음 특성을 기반으로 택시 운임 금액을 예측하는 모델을 빌드합니다. 

  • 레이블 운임 금액
  • 특성 {승객 수, 이동 거리, 픽업 경도, 픽업 위도, 요금 코드, 하차 경도, 하차 위도, 시간, 요일, 주말 여부}

파일에서 DataFrame으로 데이터 로드

먼저 Spark xgboost의 GPU 버전과 CPU 버전 모두에 필요한 패키지를 가져옵니다.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import org.apache.spark.sql.types._
import ml.dmlc.xgboost4j.scala.spark.{XGBoostRegressor, XGBoostRegressionModel}

Spark xgboost의 GPU 버전의 경우 다음 파일도 가져와야 합니다.

import ml.dmlc.xgboost4j.scala.spark.rapids.{GpuDataReader, GpuDataset}

Spark StructType을 통해 스키마를 지정합니다.

lazy val schema =
  StructType(Array(
    StructField("vendor_id", DoubleType),
    StructField("passenger_count", DoubleType),
    StructField("trip_distance", DoubleType),
    StructField("pickup_longitude", DoubleType),
    StructField("pickup_latitude", DoubleType),
    StructField("rate_code", DoubleType),
    StructField("store_and_fwd", DoubleType),
    StructField("dropoff_longitude", DoubleType),
    StructField("dropoff_latitude", DoubleType),
    StructField(labelName, DoubleType),
    StructField("hour", DoubleType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", DoubleType),
    StructField("day_of_week", DoubleType),
    StructField("is_weekend", DoubleType)
  )) 

다음 코드에서는 Spark 세션을 만들고 트레이닝 및 평가 데이터 파일 경로를 설정합니다. (참고: 노트북을 사용하는 경우 SparkSession을 만들 필요가 없습니다.)

val trainPath = "/FileStore/tables/taxi_tsmall.csv"
val evalPath  = "/FileStore/tables/taxi_esmall.csv"
val spark = SparkSession.builder().appName("Taxi-GPU").getOrCreate

아래에 나타난 대로 DataFrame에 로드할 데이터 소스와 스키마를 지정하여 CSV 파일의 데이터를 Spark DataFrame에 로드합니다. 

val tdf = spark.read.option("inferSchema",
"false").option("header", true).schema(schema).csv(trainPath)
val edf = spark.read.option("inferSchema", "false").option("header",
true).schema(schema).csv(evalPath)

DataFrame show(5)에는 처음 5개의 행이 표시됩니다.

tdf.select("trip_distance", "rate_code","fare_amount").show(5)
결과:
+------------------+-------------+-----------+
|     trip_distance|    rate_code|fare_amount|
+------------------+-------------+-----------+
|              2.72|-6.77418915E8|       11.5|
|              0.94|-6.77418915E8|        5.5|
|              3.63|-6.77418915E8|       13.0|
|             11.86|-6.77418915E8|       33.5|
|              3.03|-6.77418915E8|       11.0|
+------------------+-------------+-----------+

다음 함수는 각 숫자 열에 대한 개수, 평균, 표준 편차, 최솟값 및 최댓값 등의 기술적 요약 통계가 포함된 DataFrame을 반환값을 서술합니다.

tdf.select("trip_distance", "rate_code","fare_amount").describe().show
+-------+------------------+--------------------+------------------+
|summary|     trip_distance|           rate_code|       fare_amount|
+-------+------------------+--------------------+------------------+
|  count|              7999|                7999|              7999|
|   mean| 3.278923615451919|-6.569284350812602E8|12.348543567945994|
| stddev|3.6320775770793547|1.6677419425906155E8|10.221929466939088|
|    min|               0.0|       -6.77418915E8|               2.5|
|    max|35.970000000000006|       1.957796822E9|             107.5|
+-------+------------------+--------------------+------------------+

다음 산포도는 운임 금액과 이동 거리 사이의 상관 관계를 탐색하는 데 사용됩니다.

%sql
trip_distance, fare_amount 선택
택시에서

특성 배열 정의

ML 알고리즘에서 사용할 특성은 각 특성의 값을 나타내는 숫자의 벡터인 특성 벡터로 변환 및 표현됩니다. 아래에서 VectorAssembler 트랜스포머는 레이블과 벡터 특성 열이 있는 새 DataFrame을 반환하는 데 사용됩니다. 

// 기능 열 이름
val featureNames = Array("passenger_count","trip_distance", "pickup_longitude","pickup_latitude","rate_code","dropoff_longitude", "dropoff_latitude", "hour", "day_of_week","is_weekend")
// 트랜스포머 생성
object Vectorize {
  def apply(df: DataFrame, featureNames: Seq[String], labelName: String): DataFrame = {
    val toFloat = df.schema.map(f => col(f.name).cast(FloatType))
    new VectorAssembler()
      .setInputCols(featureNames.toArray)
      .setOutputCol("features")
      .transform(df.select(toFloat:_*))
      .select(col("features"), col(labelName))
  }
}
// 변환 방법이 특성 열 추가
var trainSet = Vectorize(tdf, featureNames, labelName)
var evalSet = Vectorize(edf, featureNames, labelName)
trainSet.take(1)
결과:
res8: Array[org.apache.spark.sql.Row] =
Array([[5.0,2.7200000286102295,-73.94813537597656,40.82982635498047,-6.77418944E8,-73.96965026855469,40.79747009277344,10.0,6.0,1.0],11.5])

XGBoost GPU 버전을 사용하는 경우 VectorAssembler가 필요하지 않습니다.

CPU 버전의 경우 num_workers는 CPU 코어 수, tree_method는 "hist", 특성 열은 Vector Assembler의 출력 특성 열로 설정해야 합니다.

lazy val paramMap = Map(
  "learning_rate" -> 0.05,
  "max_depth" -> 8,
  "subsample" -> 0.8,
  "gamma" -> 1,
  "num_round" -> 500
)
// xgboost 매개변수 설정
val xgbParamFinal = paramMap ++ Map("tree_method" -> "hist", "num_workers" -> 12)
// xgboostregressor 추정자 생성
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
      .setLabelCol(labelName)
      .setFeaturesCol("features")

GPU 버전의 경우 num_workers는 Spark 클러스터에서 GPU가 있는 시스템 수, tree_method는 "gpu_hist", 특성 열은 특성 이름이 포함된 스트링 배열로 설정해야 합니다.

val xgbParamFinal = paramMap ++ Map("tree_method" -> "gpu_hist",
"num_workers" -> 1)
// 추정자 생성
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
  .setLabelCol(labelName)
  .setFeaturesCols(featureNames)

다음 코드는 트레이닝 데이터세트의 XGBoostRegresor 추정자 맞춤 방법을 사용하여 XGBoostRegresor 모델을 트레이닝하고 반환합니다. 또한 모델을 트레이닝하는 시간을 반환하는 데 시간 방법을 사용하며, CPU 및 GPU를 통한 트레이닝 시간을 비교하기 위해 이 방법을 사용합니다.

object Benchmark {
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // call-by-name
    val t1 = System.currentTimeMillis
    println("Elapsed time [" + phase + "]: " + 
    ((t1 - t0).toFloat / 1000) + "s")
    (result, (t1 - t0).toFloat / 1000)
  }
}
// 추정자를 사용하여 모델 맞춤(트레이닝)
val (model, _) = Benchmark.time("train") {
  xgbRegressor.fit(trainSet)
}

트레이닝에 사용되지 않은 eval 데이터세트를 사용하여 모델의 성능을 평가할 수 있습니다. 모델 변환 방법을 사용하여 테스트 데이터에 대한 예측을 얻을 수 있습니다. 

모델은 트레이닝된 XGBoost 모델을 통해 추정을 수행한 다음, 반환된 DataFrame의 새 예측 열에 운임 금액 예측을 반환합니다. 여기서도 예측 시간을 비교하기 위해 벤치마크 시간 방법을 사용합니다. 

val (prediction, _) = Benchmark.time("transform") {
  val ret = model.transform(evalSet).cache()
  ret.foreachPartition(_ => ())
  ret
}
prediction.select( labelName, "prediction").show(10)
결과:
+-----------+------------------+
|fare_amount|        prediction|
+-----------+------------------+
|        5.0| 4.749197959899902|
|       34.0|38.651187896728516|
|       10.0|11.101678848266602|
|       16.5| 17.23284912109375|
|        7.0| 8.149757385253906|
|        7.5|7.5153608322143555|
|        5.5| 7.248467922210693|
|        2.5|12.289423942565918|
|        9.5|10.893491744995117|
|       12.0| 12.06682014465332|
+-----------+------------------+

RegressionEvaluator 평가 방법은 예측 및 레이블 열에서 평균 제곱 오차의 제곱근인 제곱평균제곱근 오차를 계산합니다. 

val evaluator = new RegressionEvaluator().setLabelCol(labelName)
val (rmse, _) = Benchmark.time("evaluation") {
  evaluator.evaluate(prediction)
}
println(s"RMSE == $rmse")
결과:
경과 시간 [평가]: 0.356초
RMSE == 2.6105287283128353

모델 저장

모델은 나중에 사용할 수 있도록 아래와 같이 디스크에 저장할 수 있습니다.

model.write.overwrite().save(savepath)

모델을 저장한 결과는 메타데이터에 대한 JSON 파일과 모델 데이터에 대한 Parquet 파일입니다. load 명령을 통해 모델을 다시 로드할 수 있습니다. 원래 모델 및 다시 로드된 모델은 동일합니다.

val sameModel = XGBoostRegressionModel.load(savepath)

요약

이 챕터에서는 XGBoost의 작동 방식의 기본 사항과 Spark와 XGBoost 회귀를 사용하여 택시 운임 금액을 예측하는 방법을 다루었습니다. 이제 CPU 및 GPU에서 더 큰 데이터세트에서 이 예제를 실행하여 예측에 걸리는 시간과 정확성을 비교할 수 있습니다.