In Chapter 3, we discussed the features of GPU acceleration in Spark 3.x. In this chapter, we go over the basics of getting started with the new RAPIDS Accelerator for Apache Spark 3.x, which leverages GPUs to accelerate processing via the RAPIDS libraries (for details, refer to the Getting Started with the RAPIDS Accelerator for Apache Spark guide).
The RAPIDS Accelerator for Apache Spark has the following features and limitations:
To enable this GPU acceleration, you will need:
The way you deploy Spark affects the steps you must take to install and set up Spark and the RAPIDS Accelerator for Spark. The primary deployment methods are local mode, Spark Standalone, YARN, and Kubernetes.
For installation, you basically need Spark 3.x, the RAPIDS Accelerator for Spark jars, and a GPU discovery script available on every worker node. In local mode, you install these on your local machine. With Spark Standalone, you install them on all of the nodes you will use. With YARN, you install them on a launcher node and YARN ships them to the nodes as needed. With Kubernetes, you either put everything you need in a Docker image or on a drive that is mounted when your Spark application runs. For installation details, refer to the Getting Started with the RAPIDS Accelerator for Apache Spark guide.
The Spark shell and ./bin/spark-submit support loading configuration properties dynamically via command-line options such as --conf, or by reading configuration options from conf/spark-defaults.conf. (Refer to the Spark Configuration Guide for an overview and details on Spark configurations.)
On startup use: --conf [conf key]=[conf value]. For example:
${SPARK_HOME}/bin/spark-shell \
  --jars 'rapids-4-spark_2.12-0.1.0.jar,cudf-0.14.jar' \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.incompatibleOps.enabled=true
At runtime use: spark.conf.set("[conf key]", [conf value]). For example:
scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)
You can use --conf key-value pairs to request GPUs and assign them to tasks. The exact configuration you use will vary depending on your cluster manager. Here are a few of the properties for assigning GPUs:
--conf spark.executor.resource.gpu.amount=1
--conf spark.task.resource.gpu.amount=1
--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh
Note that spark.task.resource.gpu.amount can be a decimal, so if you want multiple tasks to run on an executor at the same time and share the same GPU, you can set it to a decimal value less than 1. This setting should correspond to the spark.executor.cores setting. For instance, if you have spark.executor.cores=2, which allows two tasks to run on each executor, and you want those two tasks to run on the same GPU, then you would set spark.task.resource.gpu.amount=0.5.
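Below is a minimal sketch of how these settings fit together, assuming the SparkSession is created in application code (for example in local or client mode); on YARN or Kubernetes you would typically pass the same keys with --conf at submit time, and the application name here is purely hypothetical.

import org.apache.spark.sql.SparkSession

// Two executor cores sharing one GPU means 0.5 GPU per task, as described above.
val spark = SparkSession.builder()
  .appName("rapids-gpu-scheduling-example")  // hypothetical name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.executor.cores", "2")
  .config("spark.executor.resource.gpu.amount", "1")
  .config("spark.task.resource.gpu.amount", "0.5")
  .config("spark.executor.resource.gpu.discoveryScript", "./getGpusResources.sh")
  .getOrCreate()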
The following configs are recommended to get started but must be configured based on your cluster and application:
--conf spark.rapids.sql.concurrentGpuTasks=2
--conf spark.sql.files.maxPartitionBytes=512m
--conf spark.hadoop.mapreduce.input.fileinputformat.split.minsize=536870912
--conf spark.hadoop.mapred.min.split.size=536870912
If you use the Kryo serializer (--conf spark.serializer=org.apache.spark.serializer.KryoSerializer), you need to register the GpuKryoRegistrator class, e.g.:
--conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator
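As a rough illustration, the SQL-level settings in the list above can also be adjusted from an existing session at runtime, while the Hadoop split sizes, serializer, and Kryo registrator are normally passed with --conf at launch. The values below are only starting points, not recommendations for any particular cluster.

// Runtime-tunable starting points; adjust for your cluster and data.
spark.conf.set("spark.rapids.sql.concurrentGpuTasks", "2")
spark.conf.set("spark.sql.files.maxPartitionBytes", "512m")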
Beyond these configurations, there are other plugin-specific configurations that may help performance as long as certain requirements are met. These configurations control which operations can run on the GPU (see the following table). Enabling them allows more of a query to be optimized and run on the GPU, but make sure you understand what they do; for example, the GPU results may not be 100% identical to the CPU version, and floating-point results may differ slightly. For more details on configuration, refer to the RAPIDS Accelerator for Spark Configuration.
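One quick way to see which plugin settings are already active in your session before enabling more of these operations is to filter the session configuration for the spark.rapids prefix; the sketch below only lists properties that have been explicitly set.

// Print every explicitly set property whose key starts with "spark.rapids".
spark.conf.getAll
  .filter { case (key, _) => key.startsWith("spark.rapids") }
  .foreach { case (key, value) => println(s"$key = $value") }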
The RAPIDS Accelerator for Spark requires no API changes from the user, and it will replace the SQL operations it supports with GPU operations. To see which operations were replaced with GPU operations, you can print out the physical plan for a DataFrame by calling the explain method; all of the operations prefixed with Gpu take place on the GPU.
Now, compare the physical plan for a DataFrame with GPU processing for some of the same queries we looked at in Chapter 1. In the physical plan below, the DAG consists of a GpuBatchScan, a GpuFilter on hour, and a GpuProject (selecting columns) on hour and fare_amount. With CPU processing it consisted of a FileScan, a Filter, and a Project.
// select and filter are narrow transformations
df.select($"hour", $"fare_amount").filter($"hour" === "0.0").show(2)

result:
+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0|       10.5|
| 0.0|       12.5|
+----+-----------+

df.select($"hour", $"fare_amount").filter($"hour" === "0.0").explain

result:
== Physical Plan ==
*(1) GpuColumnarToRow false
+- !GpuProject [hour#10, fare_amount#9]
   +- GpuCoalesceBatches TargetSize(1000000,2147483647)
      +- !GpuFilter (gpuisnotnull(hour#10) AND (hour#10 = 0.0))
         +- GpuBatchScan[fare_amount#9, hour#10] GpuCSVScan Location: InMemoryFileIndex[s3a://spark-taxi-dataset/raw-small/train], ReadSchema: struct<fare_amount:double,hour:double>
Notice how most of the nodes in the original plan have been replaced with GPU versions. The RAPIDS Accelerator inserts data format conversion nodes, such as GpuColumnarToRow and GpuRowToColumnar, to convert between columnar processing for nodes that will execute on the GPU and row processing for nodes that will execute on the CPU. To see why some parts of your query did not run on the GPU, set the config spark.rapids.sql.explain (for example, to NOT_ON_GPU, as described in the debugging discussion later in this chapter). The output will be logged to the driver's log or to the screen in interactive mode.
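For example, from an interactive session you could enable this and re-run the explain call on the df DataFrame used above; this is just a sketch of the same setting shown again in the debugging discussion below.

// Log why any operator in subsequent queries stays on the CPU.
spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")
df.select($"hour", $"fare_amount").filter($"hour" === "0.0").explain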
The easiest way to see what is running on the GPU is to look at the "SQL" tab in the Spark Web UI. In the DAG diagram from the SQL tab for the query below, we see that the physical plan consists of a GpuBatchScan, a Project, a GpuHashAggregate, and another GpuHashAggregate. With CPU processing, Spark performs a hash aggregation for each partition before shuffling the data in the Exchange for the wide transformation; after the Exchange, it performs a hash aggregation of the previous sub-aggregations. Note that with GPU processing, the Exchange shuffle has been avoided.
val df3 = df2.groupBy("month").count.orderBy(asc("month"))
df3.show(5)
You can use the stage details page to view a stage details DAG, where the blue vertices (boxes) represent the RDDs or DataFrames and the edges (arrows between boxes) represent the operation applied to a DataFrame.
You can use the Environment tab to view and check whether your GPU configuration properties have been set correctly, for example the spark.executor.resource.gpu.amount and spark.executor.resource.gpu.discoveryScript properties. Here you can also view the System Properties classpath entries to check that the plugin jars are in the JVM classpath.
Table 1. Spark Properties
You can use the Executors tab to see which resources have been allocated for the executors of your application. In this instance, one GPU has been allocated.
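If you prefer to confirm the same information from code rather than from the Web UI, Spark 3.x exposes the assigned resources through SparkContext.resources and TaskContext.resources(). The following sketch prints the GPU addresses each task sees; the output from foreachPartition lands in the executor logs (or on your console in local mode).

import org.apache.spark.TaskContext

// Resources assigned to the driver itself (usually empty unless the driver requested a GPU).
println(spark.sparkContext.resources)

// Print the GPU address(es) visible to each task on its executor.
spark.sparkContext.parallelize(1 to 4, 4).foreachPartition { _ =>
  val ctx = TaskContext.get()
  val gpus = ctx.resources().get("gpu").map(_.addresses.mkString(",")).getOrElse("none")
  println(s"partition ${ctx.partitionId()} -> gpu(s): $gpus")
}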
For now, the best way to debug is the way you would normally do it on Spark: look at the UI and the log files to see what failed. If you get a segmentation fault from the GPU, find the hs_err_pid.log file. To make sure the hs_err_pid.log file goes into the YARN application log directory, you can add the config: --conf spark.executor.extraJavaOptions="-XX:ErrorFile=<LOG_DIR>/hs_err_pid_%p.log".
If you want to see why an operation did not run on the GPU, turn on the configuration: --conf spark.rapids.sql.explain=NOT_ON_GPU. A log message explaining why a Spark operation could not run on the GPU is then output to the driver log.
GPU out-of-memory failures can show up in multiple ways: you may see an explicit out-of-memory error, or it may simply manifest as a crash. Generally this means your partition size is too big; go back to the Configuration section for the partition size and/or the number of partitions, and possibly reduce the number of concurrent GPU tasks to one. The Spark UI can give you a hint about the size of the data: look at either the input data size or the shuffle data size for the stage that failed.
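As a rough starting point, and assuming your session matches the examples earlier in this chapter, these are the knobs to dial down from a running session; the values are illustrative rather than prescriptive.

// Illustrative values for recovering from GPU out-of-memory failures.
spark.conf.set("spark.rapids.sql.concurrentGpuTasks", "1")   // one task on the GPU at a time
spark.conf.set("spark.sql.files.maxPartitionBytes", "256m")  // smaller input partitions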
In this chapter, we covered the basics of getting started with the new RAPIDS Accelerator for Apache Spark 3.x, which leverages GPUs to accelerate processing. For more information, refer to the RAPIDS Accelerator for Spark guide.