Getting Started with GPU-Accelerated Apache Spark 3

In Chapter 3, we discussed the features of GPU acceleration in Spark 3.x. In this chapter, we go over the basics of getting started with the new RAPIDS Accelerator for Apache Spark 3.x, which leverages GPUs to accelerate processing via the RAPIDS libraries. (For details, refer to the Getting Started with the RAPIDS Accelerator for Apache Spark guide.)

The RAPIDS Accelerator for Apache Spark has the following features and limitations:

  • Allows running Spark SQL on a GPU with columnar processing
  • Requires no API changes from the user
  • Handles transitioning from row to columnar processing and back
  • Uses the RAPIDS cuDF library
  • Runs supported SQL operations on the GPU; if an operation is not implemented or is not compatible with the GPU, it falls back to the Spark CPU version.
  • The plugin cannot accelerate operations that manipulate RDDs directly.
  • The accelerator library also provides an implementation of Spark's shuffle that can leverage UCX to optimize GPU data transfers, keeping as much data on the GPU as possible and bypassing the CPU for GPU-to-GPU transfers.

To enable this GPU acceleration, you will need:

  • Apache Spark 3.0+
  • A Spark cluster configured with GPUs that comply with the requirements for the version of the RAPIDS DataFrame library, cuDF.
    • One GPU per executor.
  • Add the following jars:
    • A cudf jar that corresponds to the version of CUDA available on your cluster.
    • RAPIDS Spark accelerator plugin jar.
  • Set the config spark.plugins to com.nvidia.spark.SQLPlugin

Installation and Configuration

The way you deploy Spark affects the steps you must take to install and set up Spark and the RAPIDS Accelerator for Spark. The primary methods to deploy Spark are:

  • Local mode - the driver program and tasks run in the same Java Virtual Machine. This is useful for development and testing only; it is not meant for running production applications.
  • On a cluster with a cluster manager, such as Spark Standalone, YARN, or Kubernetes.

Installation

For installation, you basically need Spark 3.x, the RAPIDS Accelerator for Spark jars, and a GPU discovery script available on every worker node. In local mode, you install these locally. With Spark Standalone, you install them on all the nodes you will use. With YARN, you install them on a launcher node, and YARN handles shipping them to the nodes as needed. With Kubernetes, you either put everything you need in a Docker image or on a drive that is mounted when your Spark application runs. For details on installation, refer to the Getting Started with the RAPIDS Accelerator for Apache Spark guide.

Configuration

The Spark shell and ./bin/spark-submit support loading configuration properties dynamically, via command line options such as --conf, or by reading configuration options from conf/spark-defaults.conf. (Refer to the Spark Configuration Guide for an overview and details on Spark configurations.)

On startup use: --conf [conf key]=[conf value]. For example:

${SPARK_HOME}/bin/spark-shell --jars 'rapids-4-spark_2.12-0.1.0.jar,cudf-0.14.jar' \
      --conf spark.plugins=com.nvidia.spark.SQLPlugin \
      --conf spark.rapids.sql.incompatibleOps.enabled=true

At runtime use: spark.conf.set("[conf key]", [conf value]). For example:

   scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)

GPU Scheduling

You can use --conf key value pairs to request GPUs and assign them to tasks. The exact configuration you use will vary depending on your cluster manager. Here are a few of the configuration key value properties for assigning GPUs:

  • Request your executor to have GPUs:
    --conf spark.executor.resource.gpu.amount=1

  • Specify the number of GPUs per task:
    --conf spark.task.resource.gpu.amount=1

  • Specify a discovery script (required on YARN and Kubernetes):
    --conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh

Note that spark.task.resource.gpu.amount can be a decimal amount, so if you want multiple tasks to be run on an executor at the same time and assigned to the same GPU you can set this to a decimal value less than 1. You would want this setting to correspond to the spark.executor.cores setting. For instance, if you have spark.executor.cores=2 which would allow 2 tasks to run on each executor and you want those 2 tasks to run on the same GPU then you would set spark.task.resource.gpu.amount=0.5.
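
Putting these together, here is a minimal sketch of how the scheduling settings above might be passed when an application creates its own session. The application name is hypothetical, and getGpusResources.sh is the discovery script referenced above; on a real cluster you would typically pass these with spark-submit --conf instead.

    import org.apache.spark.sql.SparkSession

    // Two CPU cores per executor share one GPU, so each task is assigned
    // half a GPU (0.5), matching the example in the text above.
    val spark = SparkSession.builder()
      .appName("gpu-scheduling-sketch")                        // hypothetical name
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.executor.resource.gpu.amount", "1")       // one GPU per executor
      .config("spark.executor.cores", "2")                     // two concurrent tasks per executor
      .config("spark.task.resource.gpu.amount", "0.5")         // the two tasks share the GPU
      .config("spark.executor.resource.gpu.discoveryScript", "./getGpusResources.sh")
      .getOrCreate()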

Tuning

The following configs are recommended to get started, but they must be tuned for your cluster and application (a combined example follows this list):

  • Run with one executor per GPU. Do not try to run with multiple GPUs per executor. Each executor can run multiple tasks; how many depends on the number of cores and the number of GPUs on each of your boxes. Run one executor per GPU and you can evenly divide your cores among your executors. For instance, if you have 24 cores and four GPUs per host, you can run with six cores per executor (--conf spark.executor.cores=6). This controls the number of tasks that Spark puts on an executor at once. To control the number of tasks that run concurrently on the GPU, configure spark.rapids.sql.concurrentGpuTasks. A good starting point is to allow two tasks to run on the GPU concurrently:

    (--conf spark.rapids.sql.concurrentGpuTasks=2)

    If you have issues with out-of-memory errors or slow performance, change this to 1. The reason more than one task can help is that a task can still use the CPU while other tasks are running on the GPU. Currently we do not get a performance benefit from running too many tasks on the GPU at once, and each of those tasks uses GPU memory, so we generally limit how many run on the GPU at once.
  • Set the size of the input. You will realize a performance improvement when you run bigger batches of data on your GPUs.  However, your input size will depend on the type of file you are reading from and the operation you are performing.
    • If you are using the Spark datasource API (spark.read…), use:
      --conf spark.sql.files.maxPartitionBytes=512m

    • If you are using the Spark/Hive API to read data from a Hive table, use:
      --conf spark.hadoop.mapreduce.input.fileinputformat.split.minsize=536870912
      --conf spark.hadoop.mapred.min.split.size=536870912

  • Configure spark.sql.shuffle.partitions. Spark defaults to 200, which often results in very small partitions. You want the data size of each partition to be large to make processing on the GPU efficient, so try to keep the number of partitions as low as possible. Tune this along with the input size based on your application data.
  • If you are using the KryoSerializer with Spark
    (--conf spark.serializer=org.apache.spark.serializer.KryoSerializer)
    you need to register the GpuKryoRegistrator class, e.g.:
      --conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator

  • Configure the amount of executor memory like you would for a normal Spark application.
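
As a sketch of how these tuning settings might be combined when creating a session for your application: the application name, executor memory, and shuffle partition values here are illustrative assumptions, so adjust them for your own cluster and data.

    import org.apache.spark.sql.SparkSession

    // Illustrative starting values based on the 24-core, four-GPU host above.
    val spark = SparkSession.builder()
      .appName("rapids-tuning-sketch")                             // hypothetical name
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.executor.cores", "6")                         // 24 cores / 4 GPUs per host
      .config("spark.executor.memory", "16g")                      // size as for a normal Spark app (assumed value)
      .config("spark.rapids.sql.concurrentGpuTasks", "2")          // drop to 1 if you hit GPU memory issues
      .config("spark.sql.files.maxPartitionBytes", "512m")         // bigger input batches for the GPU
      .config("spark.sql.shuffle.partitions", "48")                // fewer, larger shuffle partitions (assumed value)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", "com.nvidia.spark.rapids.GpuKryoRegistrator")
      .getOrCreate()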

General Recommendations

  • Fewer large input files are better than lots of small files. You may not have control over this, but it is worth knowing.
  • Larger input sizes (for example, spark.sql.files.maxPartitionBytes=512m) are generally better, as long as the data fits on the GPU.
  • The GPU does better with larger data chunks as long as they fit into memory. When using the default spark.sql.shuffle.partitions=200, it may be beneficial to make this smaller. Base this on the amount of data each task is reading. Start with 512MB per task (see the rough calculation below).
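
As a rough worked example of that 512MB-per-task heuristic (the 24GB input size is hypothetical):

    // Pick a shuffle partition count so that each task processes roughly 512MB.
    val inputSizeBytes = 24L * 1024 * 1024 * 1024             // hypothetical ~24GB of input
    val targetBytesPerTask = 512L * 1024 * 1024               // ~512MB per task
    val shufflePartitions = math.max(1L, inputSizeBytes / targetBytesPerTask)  // 48 partitions here

    spark.conf.set("spark.sql.shuffle.partitions", shufflePartitions.toString)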

Advanced Configuration

Beyond the configurations already discussed, there are other plugin-specific configurations that may help performance as long as certain requirements are met. These configurations control which operations can run on the GPU (see the following table). Enabling them allows more of the query to be optimized and run on the GPU, but make sure you understand what they do; for instance, the GPU version may not be 100% compatible with the CPU version, and floating point results may differ slightly. For more details on configuration, refer to the RAPIDS Accelerator for Spark Configuration documentation.
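
For example, spark.rapids.sql.incompatibleOps.enabled (shown earlier in this chapter) can be toggled at runtime and its effect checked with explain. This is a sketch that assumes a DataFrame named df already exists:

    // Enable operations that are not 100% compatible with the CPU versions,
    // then re-check the physical plan to see which operators now run on the GPU.
    spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", "true")
    df.explain()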

Monitoring Using the Physical Plan

The RAPIDS Accelerator for Spark requires no API changes from the user, and it will replace the SQL operations it supports with GPU operations. To see which operations were replaced with GPU operations, print out the physical plan for a DataFrame by calling the explain method; all of the operations prefixed with Gpu take place on GPUs.

Now, compare the physical plan for a DataFrame with GPU processing for some of the same queries we looked at in Chapter 1. In the physical plan below, the DAG consists of a GpuBatchScan, a GpuFilter on hour, and a GpuProject (selecting columns) on hour, fare_amount, and day_of_week. With CPU processing it consisted of a FileScan, Filter, and a Project.

// select and filter are narrow transformations
df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).show(2)

result:
+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0|       10.5|
| 0.0|       12.5|
+----+-----------+

df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).explain

result:
== Physical Plan ==
*(1) GpuColumnarToRow false
+- !GpuProject [hour#10, fare_amount#9]
   +- GpuCoalesceBatches TargetSize(1000000,2147483647)
      +- !GpuFilter (gpuisnotnull(hour#10) AND (hour#10 = 0.0))
         +- GpuBatchScan[fare_amount#9, hour#10] GpuCSVScan Location:
InMemoryFileIndex[s3a://spark-taxi-dataset/raw-small/train], ReadSchema: struct<fare_amount:double,hour:double>

Notice how most of the nodes in the original plan have been replaced with GPU versions. The RAPIDS Accelerator inserts data format conversion nodes, like GpuColumnarToRow and GpuRowToColumnar, to convert between columnar processing for nodes that will execute on the GPU and row processing for nodes that will execute on the CPU. To see why some parts of your query did not run on the GPU, set the config spark.rapids.sql.explain (for example, to NOT_ON_GPU, as shown in the Debugging section). The output will be logged to the driver's log or to the screen in interactive mode.
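
For example, from the Scala shell (a sketch reusing the df DataFrame from the query above):

    // Log why parts of the query were not placed on the GPU; the messages
    // appear in the driver log, or on screen in interactive mode.
    spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")
    df.select($"hour", $"fare_amount").filter($"hour" === "0.0").show(2)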

Monitoring Using the Spark Web UI

SQL Tab

The easiest way to see what is running on the GPU is to look at the "SQL" tab in the Spark Web UI. In the DAG diagram from the SQL tab for the query below, we see that the physical plan consists of a GpuBatchScan, a Project, a GpuHashAggregate, and another GpuHashAggregate. With CPU processing, Spark performs a hash aggregation for each partition before shuffling the data in the Exchange for the wide transformation. After the exchange, there is a hash aggregation of the previous sub-aggregations. Note that for GPU processing, the Exchange shuffle has been avoided.

import org.apache.spark.sql.functions.asc
val df3 = df2.groupBy("month").count()
  .orderBy(asc("month"))
df3.show(5)

Stages Tab

You can use the stage details page to view a stage details DAG, where the blue vertices (boxes) represent the RDDs or DataFrames and the edges (arrows between boxes) represent the operation applied to a DataFrame.

Environment Tab

You can use the Environment tab to view and check whether your GPU configuration properties have been set correctly, for example the spark.executor.resource.gpu.amount and spark.executor.resource.gpu.discoveryScript properties. Here you can also view the System Properties classpath entries to check that the plugin jars are in the JVM classpath.

Table 1.    Spark Properties

Name                                            Value
spark.executor.resource.gpu.amount              1
spark.executor.resource.gpu.discoveryScript     /home/ubuntu/getGpusResources.sh

Executors Tab

You can use the Executors tab to see which resources have been allocated for the executors for your application. In this instance, one GPU has been allocated.

Debugging

For now, the best way to debug is the same way you would debug a normal Spark application: look at the UI and log files to see what failed. If you get a segmentation fault from the GPU, find the hs_err_pid.log file. To make sure your hs_err_pid.log file goes into the YARN application log directory, you can add the config: --conf spark.executor.extraJavaOptions="-XX:ErrorFile=<LOG_DIR>/hs_err_pid_%p.log".

If you want to see why an operation did not run on the GPU, turn on the configuration: --conf spark.rapids.sql.explain=NOT_ON_GPU. A log message explaining why a Spark operation was not able to run on the GPU is output to the driver log.

Out of GPU Memory

An out-of-memory condition on the GPU can show up in multiple ways: you may see an explicit out-of-memory error, or it may manifest as a crash. Generally this means your partition size is too big; go back to the Configuration section and revisit the partition size and/or the number of partitions. You can also reduce the number of concurrent GPU tasks to one. The Spark UI may give you a hint about the size of the data; look at either the input data or the shuffle data size for the stage that failed.
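
As a sketch of that advice (the 256m value is an illustrative assumption; depending on your environment you may need to pass these with --conf at launch rather than setting them at runtime):

    // Reduce pressure on GPU memory: smaller input partitions and only
    // one task on the GPU at a time.
    spark.conf.set("spark.sql.files.maxPartitionBytes", "256m")
    spark.conf.set("spark.rapids.sql.concurrentGpuTasks", "1")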

Summary

In this chapter, we covered the basics of getting started with the new RAPIDS Accelerator for Apache Spark 3.x, which leverages GPUs to accelerate processing. For more information, refer to the RAPIDS Accelerator for Spark guide.