Big data is one of the ten major technology areas being used to improve cities. The analysis of location and behavior patterns within cities allows for optimization of traffic, better planning decisions, and smarter advertising. For example, the analysis of GPS car data enables cities to optimize traffic flows based on real-time traffic information. Telecom companies use mobile phone location data to provide insights, identifying and predicting the location activity trends and patterns of a population in large metropolitan areas. And the application of machine learning (ML) to geolocation data is proving instrumental in identifying patterns and trends for the telecom, travel, marketing, and manufacturing industries.
In this chapter, we’ll use public New York taxi trip data to examine regression analysis as it pertains to predicting taxi fares. We’ll start with an overview of the XGBoost algorithm and then explore the use case.
XGBoost, which stands for Extreme Gradient Boosting, is a scalable, distributed gradient-boosted decision tree (GBDT) machine learning library. XGBoost provides parallel tree boosting and is a leading ML library for regression, classification, and ranking problems. The RAPIDS team works closely with the Distributed Machine Learning Community (DMLC) XGBoost organization, and XGBoost now includes seamless, drop-in GPU acceleration, significantly speeding up model training and improving accuracy for better predictions.
Gradient Boosting Decision Trees (GBDT) is a decision tree ensemble algorithm similar to Random Forest; the difference is in how the trees are built and combined. Random Forest uses a technique called bagging to build full decision trees in parallel from bootstrap samples of the data set, and the final prediction is an average of all of the decision tree predictions. GBDT instead uses a technique called boosting to iteratively train an ensemble of shallow decision trees, with each new tree fit to the errors that the ensemble built so far did not predict correctly, decreasing the error with each succeeding tree. The final prediction is a weighted sum of all of the decision tree predictions. Bagging minimizes variance and overfitting, while boosting minimizes bias and underfitting.
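To make the boosting idea concrete, here is a minimal, self-contained Scala sketch. It is illustrative only: the weak learner is a naive one-feature stump (split at the feature mean), not XGBoost's tree builder, and the names (Stump, fitStump, boost) are our own.

// a single-split "stump": the simplest possible weak learner
case class Stump(feature: Int, threshold: Double, left: Double, right: Double) {
  def predict(x: Array[Double]): Double =
    if (x(feature) <= threshold) left else right
}

// fit a stump to the residuals: split feature 0 at its mean and
// predict the mean residual on each side
def fitStump(xs: Array[Array[Double]], residuals: Array[Double]): Stump = {
  val f = 0
  val t = xs.map(_(f)).sum / xs.length
  val (l, r) = xs.zip(residuals).partition { case (x, _) => x(f) <= t }
  def mean(a: Array[(Array[Double], Double)]) =
    if (a.isEmpty) 0.0 else a.map(_._2).sum / a.length
  Stump(f, t, mean(l), mean(r))
}

// boosting loop: each round fits a new weak learner to the residuals
// (the negative gradient of squared error) of the ensemble so far
def boost(xs: Array[Array[Double]], ys: Array[Double],
          rounds: Int, lr: Double): Array[Double] => Double = {
  var preds = Array.fill(ys.length)(0.0)
  val stumps = (1 to rounds).map { _ =>
    val residuals = ys.zip(preds).map { case (y, p) => y - p }
    val s = fitStump(xs, residuals)
    preds = preds.zip(xs.map(s.predict)).map { case (p, d) => p + lr * d }
    s
  }
  x => stumps.map(s => lr * s.predict(x)).sum
}

Note how each round only tries to explain what the previous rounds got wrong; this is what drives the bias reduction described above.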
XGBoost is a variation of GBDT. With classic GBDT implementations, the nodes of each decision tree are built one at a time. XGBoost still trains trees sequentially, but it parallelizes the construction of each tree, following a level-wise strategy that scans across gradient values and uses these partial sums to evaluate the quality of splits at every possible split point in the training set.
The GPU-accelerated XGBoost algorithm makes use of fast parallel prefix sum operations to scan through all possible splits, as well as parallel radix sorting to repartition the data. For a given boosting iteration, it builds a decision tree one level at a time, processing the entire dataset concurrently on the GPU.
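The following sketch illustrates why prefix sums matter for split finding: once gradients are sorted by feature value, one running sum lets every candidate split be scored in a single scan. Assumptions: a single feature, squared-error loss, and a simplified gain score rather than XGBoost's exact regularized formula.

// score every candidate split of one feature with a single prefix-sum scan
def bestSplit(sortedPairs: Array[(Double, Double)]): (Double, Double) = {
  // sortedPairs: (featureValue, gradient) pairs, sorted by featureValue
  val grads  = sortedPairs.map(_._2)
  val total  = grads.sum
  val prefix = grads.scanLeft(0.0)(_ + _) // prefix(i) = sum of first i gradients
  var bestGain = Double.NegativeInfinity
  var bestThreshold = sortedPairs.head._1
  for (i <- 1 until sortedPairs.length) {
    val gLeft  = prefix(i)      // gradient sum of the i left-most rows
    val gRight = total - gLeft  // gradient sum of the remaining rows
    // simplified similarity-style gain, not XGBoost's exact formula
    val gain = gLeft * gLeft / i + gRight * gRight / (sortedPairs.length - i)
    if (gain > bestGain) { bestGain = gain; bestThreshold = sortedPairs(i)._1 }
  }
  (bestThreshold, bestGain)
}

On the GPU, the prefix sums themselves are computed with a parallel scan, so all candidate splits at a tree level can be scored concurrently.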
GPU-accelerated Spark XGBoost offers the following key features:
- Partitioning of ORC, CSV, and Parquet input files across multiple GPUs
- GPU-accelerated training
- Efficient GPU memory utilization
The example dataset is a New York taxi trip dataset, which has already been cleaned up and transformed to add features, such as the haversine distance, using a Spark ETL notebook.
In this scenario, we’ll build a model to predict the taxi fare amount based on the following label and features:
- Label: fare amount
- Features: passenger count, trip distance, pickup longitude and latitude, rate code, dropoff longitude and latitude, hour, day of week, and is weekend
First, we import the packages needed for both the CPU and GPU versions of Spark XGBoost:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import ml.dmlc.xgboost4j.scala.spark.{XGBoostRegressor, XGBoostRegressionModel}
For the GPU version of Spark XGBoost, you also need the following import:
import ml.dmlc.xgboost4j.scala.spark.rapids.{GpuDataReader, GpuDataset}
We define the label column name (the fare amount, which is what the model will predict) and specify the schema with a Spark StructType.

lazy val labelName = "fare_amount"

lazy val schema = StructType(Array(
  StructField("vendor_id", DoubleType),
  StructField("passenger_count", DoubleType),
  StructField("trip_distance", DoubleType),
  StructField("pickup_longitude", DoubleType),
  StructField("pickup_latitude", DoubleType),
  StructField("rate_code", DoubleType),
  StructField("store_and_fwd", DoubleType),
  StructField("dropoff_longitude", DoubleType),
  StructField("dropoff_latitude", DoubleType),
  StructField(labelName, DoubleType),
  StructField("hour", DoubleType),
  StructField("year", IntegerType),
  StructField("month", IntegerType),
  StructField("day", DoubleType),
  StructField("day_of_week", DoubleType),
  StructField("is_weekend", DoubleType)
))
In the following code we create a Spark session and set the training and evaluation data file paths. (Note: If you are using a notebook, then you do not have to create the SparkSession.)
val trainPath = "/FileStore/tables/taxi_tsmall.csv"
val evalPath  = "/FileStore/tables/taxi_esmall.csv"

val spark = SparkSession.builder().appName("Taxi-GPU").getOrCreate()
We load the data from the CSV files into Spark DataFrames, specifying the data source and schema, as shown below.
val tdf = spark.read.option("inferSchema", "false")
  .option("header", true).schema(schema).csv(trainPath)
val edf = spark.read.option("inferSchema", "false")
  .option("header", true).schema(schema).csv(evalPath)
The DataFrame show(5) method displays the first five rows:
tdf.select("trip_distance", "rate_code", "fare_amount").show(5)

result:
+------------------+-------------+-----------+
|     trip_distance|    rate_code|fare_amount|
+------------------+-------------+-----------+
|              2.72|-6.77418915E8|       11.5|
|              0.94|-6.77418915E8|        5.5|
|              3.63|-6.77418915E8|       13.0|
|             11.86|-6.77418915E8|       33.5|
|              3.03|-6.77418915E8|       11.0|
+------------------+-------------+-----------+
The function describe returns a DataFrame containing descriptive summary statistics, such as count, mean, standard deviation, and minimum and maximum value for each numerical column.
tdf.select("trip_distance", "rate_code", "fare_amount").describe().show

+-------+------------------+--------------------+------------------+
|summary|     trip_distance|           rate_code|       fare_amount|
+-------+------------------+--------------------+------------------+
|  count|              7999|                7999|              7999|
|   mean| 3.278923615451919|-6.569284350812602E8|12.348543567945994|
| stddev|3.6320775770793547|1.6677419425906155E8|10.221929466939088|
|    min|               0.0|       -6.77418915E8|               2.5|
|    max|35.970000000000006|       1.957796822E9|             107.5|
+-------+------------------+--------------------+------------------+
The following scatter plot is used to explore the correlation between the fare amount and the trip distance.
%sql select trip_distance, fare_amount from taxi
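The %sql cell above queries a table named taxi. If that table has not already been registered (for example, by the ETL notebook), you can create a temporary view from the training DataFrame first; this is an assumed setup step, and the view name must match the name used in the query:

// register the DataFrame as a temporary SQL view named "taxi"
tdf.createOrReplaceTempView("taxi")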
Before they can be used by an ML algorithm, the features are transformed and put into feature vectors, which are vectors of numbers representing the value for each feature. Below, a VectorAssembler transformer is used to return a new DataFrame with a label column and a vector features column.
// feature column names
val featureNames = Array("passenger_count", "trip_distance",
  "pickup_longitude", "pickup_latitude", "rate_code",
  "dropoff_longitude", "dropoff_latitude", "hour", "day_of_week",
  "is_weekend")

// create transformer
object Vectorize {
  def apply(df: DataFrame, featureNames: Seq[String], labelName: String): DataFrame = {
    val toFloat = df.schema.map(f => col(f.name).cast(FloatType))
    new VectorAssembler()
      .setInputCols(featureNames.toArray)
      .setOutputCol("features")
      .transform(df.select(toFloat: _*))
      .select(col("features"), col(labelName))
  }
}

// transform method adds features column
var trainSet = Vectorize(tdf, featureNames, labelName)
var evalSet  = Vectorize(edf, featureNames, labelName)

trainSet.take(1)

result:
res8: Array[org.apache.spark.sql.Row] = Array([[5.0,2.7200000286102295,-73.94813537597656,40.82982635498047,-6.77418944E8,-73.96965026855469,40.79747009277344,10.0,6.0,1.0],11.5])
When using the GPU version of XGBoost, the VectorAssembler is not needed, because the GPU reader loads the data directly and maps feature columns by name, as shown below.
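Instead, data for the GPU pipeline is loaded with the GpuDataReader imported earlier, which reads the CSV files directly into GpuDatasets. The following is a sketch based on that import; the exact reader API may vary across RAPIDS XGBoost4J-Spark releases:

// read the training and evaluation CSV files directly into GpuDatasets;
// features are later selected by name via setFeaturesCols, so no
// VectorAssembler step is required
val gpuReader = new GpuDataReader(spark).option("header", true).schema(schema)
val gpuTrainSet: GpuDataset = gpuReader.csv(trainPath)
val gpuEvalSet: GpuDataset  = gpuReader.csv(evalPath)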
For the CPU version, num_workers should be set to the number of CPU cores, tree_method to “hist,” and the features column to the output features column of the VectorAssembler.
lazy val paramMap = Map(
  "learning_rate" -> 0.05,
  "max_depth" -> 8,
  "subsample" -> 0.8,
  "gamma" -> 1,
  "num_round" -> 500
)

// set up xgboost parameters
val xgbParamFinal = paramMap ++ Map("tree_method" -> "hist", "num_workers" -> 12)

// create the xgboostregressor estimator
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
  .setLabelCol(labelName)
  .setFeaturesCol("features")
For the GPU version, num_workers should be set to the number of machines with a GPU in the Spark cluster, tree_method to “gpu_hist,” and the features columns to an array of strings containing the feature names.
val xgbParamFinal = paramMap ++ Map("tree_method" -> "gpu_hist", "num_workers" -> 1)

// create the estimator
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
  .setLabelCol(labelName)
  .setFeaturesCols(featureNames)
The following code uses the XGBoostRegressor estimator fit method on the training dataset to train and return an XGBoostRegressionModel. We also define a Benchmark.time method that returns the time taken to run a block of code, which we use to compare training times on the CPU vs. the GPU.
object Benchmark {
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // call-by-name
    val t1 = System.currentTimeMillis
    println("Elapsed time [" + phase + "]: " + ((t1 - t0).toFloat / 1000) + "s")
    (result, (t1 - t0).toFloat / 1000)
  }
}

// use the estimator to fit (train) a model
val (model, _) = Benchmark.time("train") {
  xgbRegressor.fit(trainSet)
}
The performance of the model can be evaluated using the eval dataset, which was not used for training. We get predictions on this data using the model transform method.

The transform method estimates the fare amount with the trained XGBoost model and returns a DataFrame with the fare amount predictions in a new prediction column. Here again, we use the Benchmark time method in order to compare prediction times.
val (prediction, _) = Benchmark.time("transform") {
  val ret = model.transform(evalSet).cache()
  ret.foreachPartition(_ => ())
  ret
}
prediction.select(labelName, "prediction").show(10)

Result:
+-----------+------------------+
|fare_amount|        prediction|
+-----------+------------------+
|        5.0| 4.749197959899902|
|       34.0|38.651187896728516|
|       10.0|11.101678848266602|
|       16.5| 17.23284912109375|
|        7.0| 8.149757385253906|
|        7.5|7.5153608322143555|
|        5.5| 7.248467922210693|
|        2.5|12.289423942565918|
|        9.5|10.893491744995117|
|       12.0| 12.06682014465332|
+-----------+------------------+
The RegressionEvaluator evaluate method calculates the root mean square error, which is the square root of the mean squared error, from the prediction and label columns.
val evaluator = new RegressionEvaluator().setLabelCol(labelName)
val (rmse, _) = Benchmark.time("evaluation") {
  evaluator.evaluate(prediction)
}
println(s"RMSE == $rmse")

Result:
Elapsed time [evaluation]: 0.356s
RMSE == 2.6105287283128353
The model can be saved to disk, as shown below, for later use. (Here, savepath is a directory path of your choosing.)
model.write.overwrite().save(savepath)
Saving the model produces a JSON file for the metadata and a Parquet file for the model data. We can reload the model with the load method; the original and reloaded models are identical.
val sameModel = XGBoostRegressionModel.load(savepath)
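As a quick sanity check (an assumed verification step, not part of the original walkthrough), you can score the evaluation set with the reloaded model and confirm that it produces the same RMSE as before:

// score the evaluation set with the reloaded model; the RMSE should match
val reloadedPrediction = sameModel.transform(evalSet)
println(s"RMSE (reloaded) == ${evaluator.evaluate(reloadedPrediction)}")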
In this chapter, we covered the basics of how XGBoost works and how to use XGBoost regression with Spark to predict taxi fare amounts. You can now run this example on CPUs and GPUs with a larger dataset to compare the training time and accuracy of predictions.