GPU-Accelerated Apache Spark 3

Spark 3 and GPUs

Given the parallel nature of many data processing tasks, it’s only natural that the massively parallel architecture of a GPU should be able to parallelize and accelerate Spark data processing queries, in the same way that a GPU accelerates deep learning (DL) in artificial intelligence (AI). Therefore, NVIDIA® has worked with the Spark community to implement GPU acceleration as part of Spark 3.x. 

While Spark distributes computation across nodes in the form of partitions, computation within a partition has historically been performed on CPU cores. GPU acceleration in Spark brings several benefits. First, fewer servers are required, reducing infrastructure cost. Second, because queries complete faster, you can expect a reduction in time to results. Finally, since GPU acceleration is transparent, applications built to run on Spark require no changes to reap these benefits.

Accelerated ETL and AI in Spark

As machine learning (ML) and DL are increasingly applied to larger datasets, Spark has become a commonly used vehicle for the data pre-processing and feature engineering needed to prepare raw input data for the learning phase. The Spark community has been focused on bringing both phases of this end-to-end pipeline together, so that data scientists can work with a single Spark cluster and avoid the penalty of moving data between phases via an external data lake. Horovod (by Uber) and TensorFlowOnSpark (by Yahoo) are examples of this approach.

Spark 3.x represents a key milestone, as Spark can now schedule GPU-accelerated ML and DL applications on Spark clusters with GPUs. The complete Spark 3 software stack that includes the RAPIDS Accelerator for Apache Spark is shown in the following figure.

New GPU-Accelerated Libraries on NVIDIA CUDA

As discussed previously, NVIDIA® CUDA® is a programming model and a set of APIs for accelerating operations on the NVIDIA GPU architecture. Layered on top of CUDA, RAPIDS is a suite of open-source software libraries and APIs that provide GPU parallelism and high-bandwidth memory speed through DataFrame and graph operations.

RAPIDS GPU-Accelerated Spark DataFrames

RAPIDS offers a powerful GPU DataFrame based on Apache Arrow data structures. Arrow specifies a standardized, language-independent, columnar memory format, optimized for data locality, to accelerate analytical processing performance on modern CPUs or GPUs. With the GPU DataFrame, batches of column values from multiple records take advantage of modern GPU designs and accelerate reading, queries, and writing.    

Spark GPU-Accelerated DataFrame and SQL

For Apache Spark 3.0, Spark SQL and DataFrames use new RAPIDS APIs for GPU-accelerated, memory-efficient columnar data processing and query plans. With the RAPIDS Accelerator, the Catalyst query optimizer plugin interface has been extended to identify operators within a query plan that can be accelerated with the RAPIDS API (mostly a one-to-one mapping) and to schedule those operators on GPUs within the Spark cluster when executing the query plan.
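The idea of a mostly one-to-one operator replacement can be illustrated with a small, self-contained Python sketch. It is not the RAPIDS Accelerator's implementation: the `PlanNode` class, the `GPU_EQUIVALENTS` table, and the operator names in it are hypothetical and exist only to show a plan being walked and its supported operators swapped, while unsupported ones stay on the CPU.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical CPU-to-GPU operator mapping. The names are illustrative
# and are not taken from the RAPIDS Accelerator source.
GPU_EQUIVALENTS = {
    "Filter": "GpuFilter",
    "Project": "GpuProject",
    "HashAggregate": "GpuHashAggregate",
}


@dataclass
class PlanNode:
    """A node in a toy physical plan: an operator name plus child nodes."""
    op: str
    children: List["PlanNode"] = field(default_factory=list)


def replace_with_gpu_ops(node: PlanNode) -> PlanNode:
    """Return a copy of the plan with supported operators swapped one-to-one.

    Operators without an entry in GPU_EQUIVALENTS are left unchanged,
    which models falling back to the CPU for that step.
    """
    new_children = [replace_with_gpu_ops(child) for child in node.children]
    return PlanNode(GPU_EQUIVALENTS.get(node.op, node.op), new_children)


def flatten(node: PlanNode) -> List[str]:
    """List operator names top-down so the plan is easy to inspect."""
    names = [node.op]
    for child in node.children:
        names.extend(flatten(child))
    return names


# "CustomScan" has no GPU equivalent in the table, so it stays as-is.
plan = PlanNode("Project", [PlanNode("Filter", [PlanNode("CustomScan")])])
gpu_plan = replace_with_gpu_ops(plan)
print(flatten(gpu_plan))
```

A real plugin would also have to convert data between row and columnar form wherever the plan crosses between CPU and GPU operators, which this sketch leaves out.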

With a physical plan for CPUs, the DataFrame data is transformed into RDD row format and usually processed one row at a time. Spark supports columnar batches, but in Spark 2.x only the vectorized Parquet and ORC readers use them. The RAPIDS plugin extends columnar batch processing on GPUs to most Spark operations. Processing columnar data is much more GPU-friendly than row-by-row processing.
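The difference between the two layouts can be seen in a minimal Python sketch. The sample records, column names, and function names are made up for illustration, and plain Python lists stand in for the contiguous column buffers that a real columnar engine would use.

```python
# Three records in row format: one tuple per record (made-up sample data).
rows = [
    ("JFK", 120.0),
    ("BOS", 80.0),
    ("JFK", 50.0),
]


def total_sales_row_wise(records):
    """Visit each record in turn and pick out the one field of interest."""
    total = 0.0
    for _airport, sales in records:
        total += sales
    return total


def to_columnar_batch(records, column_names):
    """Pivot row tuples into a batch holding one list per column."""
    columns = zip(*records)
    return {name: list(values) for name, values in zip(column_names, columns)}


def total_sales_columnar(batch):
    """Aggregate a whole column in one call, without touching other columns."""
    return sum(batch["total_sales"])


batch = to_columnar_batch(rows, ["airport", "total_sales"])
print(total_sales_row_wise(rows), total_sales_columnar(batch))
```

Both functions return the same total. The columnar version works on all values of one column together, which is the access pattern that maps naturally onto many GPU threads running the same operation.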

A new Spark shuffle implementation built upon OpenUCX communication libraries leverages NVLink, RDMA, and InfiniBand (if available) to dramatically reduce data transfer among Spark processes by:

  • Keeping as much data on the GPU as possible.
  • Finding the fastest path to move data between nodes.
  • Using the best of the available hardware resources, including bypassing the CPU to perform GPU-to-GPU memory transfers both within and between nodes.

RDMA allows GPUs to transfer data directly across nodes at up to PCIe speeds, operating as if on one massive server. NVLink allows GPUs to initiate peer-to-peer communication at up to 300 GB/s.

GPU-Aware Scheduling in Spark

Spark 3.x adds integration with the YARN, Kubernetes, and Standalone cluster managers to request GPUs, and adds plugin points that can be extended to run operations on GPUs. For Kubernetes, Spark 3.x offers GPU isolation at the executor pod level. This makes GPUs easier for Spark application developers to request and use, allows closer integration with DL and AI frameworks such as Horovod and TensorFlowOnSpark, and allows better utilization of GPUs.

An example of a flow for GPU scheduling is shown in the diagram below. The user submits an application with a GPU resource configuration and a discovery script. Spark starts the driver, which passes the configuration on to the cluster manager to request a container with the specified amount of resources and GPUs. The cluster manager returns the container, and Spark launches it. When the executor starts, it runs the discovery script. Spark sends the discovered GPU information back to the driver, and the driver can then use it to schedule tasks to GPUs.
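As a rough sketch of the two pieces the user supplies in this flow, the Python below builds the JSON that a discovery script prints to standard output (a resource name plus an array of address strings) and a set of Spark 3 GPU resource settings. The script path, the GPU amounts, and the helper names `format_gpu_resource` and `to_submit_args` are assumptions made for illustration. A real discovery script would also have to detect the GPU indices itself, for example by querying the driver tooling on the node, which is omitted here.

```python
import json


def format_gpu_resource(addresses):
    """Build the JSON a discovery script writes to stdout for the executor.

    Addresses are emitted as strings, one per GPU visible to the executor.
    """
    return json.dumps({"name": "gpu", "addresses": [str(a) for a in addresses]})


# Spark 3 GPU resource settings. The values and the script path are
# illustrative assumptions, not recommendations.
gpu_conf = {
    "spark.executor.resource.gpu.amount": "2",
    "spark.task.resource.gpu.amount": "1",
    "spark.executor.resource.gpu.discoveryScript": "/opt/spark/getGpus.py",  # hypothetical path
}


def to_submit_args(conf):
    """Turn a settings dict into a flat list of --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(format_gpu_resource([0, 1]))
print(" ".join(to_submit_args(gpu_conf)))
```

With these example values, each executor asks for two GPUs and each task for one, so up to two GPU tasks could run concurrently per executor.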

The Spark Web UI has been modified with a new checkbox to see which resources have been allocated. In this instance, two GPUs have been allocated.

Spark 3.x stage-level resource scheduling allows you to choose one container size for one stage and another size for another stage, for example, one for ETL and another for ML.

XGBoost, RAPIDS, and Spark

XGBoost is a scalable, distributed gradient-boosted decision tree (GBDT) ML library. XGBoost provides parallel tree boosting and is the leading ML library for regression, classification, and ranking problems. The RAPIDS team works closely with the Distributed Machine Learning Common (DMLC) XGBoost organization, and XGBoost now includes seamless, drop-in GPU acceleration, significantly speeding up model training and improving accuracy for better predictions.

Together, RAPIDS, XGBoost, and Spark offer three features that help with speed-up and cost:

  • GPU-accelerated DataFrame: Reads any number and size of supported input file formats directly into GPU memory and divides the data evenly among the different training nodes.

  • GPU-accelerated training: XGBoost training time has been improved with a dynamic in-memory representation of the training data that optimally stores features based on the sparsity of a dataset. This replaces a fixed in-memory representation based on the largest number of features amongst different training instances.

  • Efficient GPU memory utilization: XGBoost requires that data fit into memory, which restricts the data size for both single-GPU and distributed multi-GPU, multi-node training. The latest release has improved GPU memory utilization by 5X, so users can now train with data five times larger than with the first version. This improves the total cost of training without impacting performance.

Later in this eBook, we explore and discuss an example using the upgraded XGBoost library to load/transform data and conduct distributed training using GPUs.

Other Spark 3.x Features

  • Adaptive query execution: Spark 2.2 added cost-based optimization to the existing rule-based SQL optimizer. Spark 3.0 now has runtime adaptive query execution (AQE). With AQE, runtime statistics retrieved from completed stages of the query plan are used to re-optimize the execution plan of the remaining query stages. Databricks benchmarks yielded speed-ups ranging from 1.1x to 8x when using AQE.

    Spark 3.0 AQE optimization features include:

    • Dynamically coalesce shuffle partitions: AQE can combine adjacent small partitions into bigger partitions in the shuffle stage by looking at the shuffle file statistics, reducing the number of tasks for query aggregations.
    • Dynamically switch join strategies: AQE can optimize the join strategy at runtime based on the join relation size, for example, by converting a sort-merge join to a broadcast hash join, which performs better if one side of the join is small enough to fit in memory.
    • Dynamically optimize skew joins: AQE can detect data skew in sort-merge join partition sizes using runtime statistics and split skewed partitions into smaller sub-partitions.
  • Dynamic partition pruning: Partition pruning is a performance optimization that limits the number of files and partitions that Spark reads when querying. Once data is partitioned, queries that match certain partition filter criteria run faster because Spark reads only a subset of the directories and files. Spark 3.0 dynamic partition pruning allows the Spark engine to infer at runtime the specific partitions within a table that need to be read and processed for a query, by identifying the partition column values that result from filtering another table in a join. For example, the following query involves two tables: the flight_sales table, which contains the total sales for all flights, partitioned by originating airport, and the flight_airports table, which contains a mapping of airports to regions. Here we are querying for sales in the North-East America region.

select fs.airport, fs.total_sales
from flight_sales fs, flight_airports fa
where fs.airport = fa.airport and fa.region = 'NEUSA'

With dynamic partition pruning, this query scans and processes only the partitions for the airports returned by the where filter on the region. Reducing the amount of data read and processed results in significant time savings.
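The effect of pruning on the flight_sales query can be imitated in a few lines of plain Python. This is a conceptual sketch, not Spark's implementation: the sample data, the 'SWUSA' region code, and the function name `query_with_pruning` are made up, and a dictionary keyed by airport stands in for a table partitioned by originating airport.

```python
# flight_sales, stored as one "partition" per originating airport
# (made-up sample data).
flight_sales_partitions = {
    "JFK": [("JFK", 120.0), ("JFK", 50.0)],
    "BOS": [("BOS", 80.0)],
    "LAX": [("LAX", 200.0)],
    "SFO": [("SFO", 90.0)],
}

# flight_airports: (airport, region) pairs. 'SWUSA' is an invented region code.
flight_airports = [
    ("JFK", "NEUSA"),
    ("BOS", "NEUSA"),
    ("LAX", "SWUSA"),
    ("SFO", "SWUSA"),
]


def query_with_pruning(region):
    """Return matching sales rows and the list of partitions actually scanned."""
    # Step 1: filter the small table to find the partition values needed.
    wanted = {airport for airport, r in flight_airports if r == region}

    # Step 2: scan only the flight_sales partitions for those airports.
    scanned = []
    result = []
    for airport in sorted(wanted):
        if airport in flight_sales_partitions:
            scanned.append(airport)
            result.extend(flight_sales_partitions[airport])
    return result, scanned


sales, scanned = query_with_pruning("NEUSA")
print(sales)
print(f"scanned {len(scanned)} of {len(flight_sales_partitions)} partitions")
```

In this toy data set only two of the four partitions are read. The partitions to read are not known until the filter on the other table has been evaluated, which is what makes the pruning dynamic.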

  • Join strategy hints: These hints instruct the optimizer to use the hinted plan for join strategies. The MERGE, SHUFFLE_HASH, and SHUFFLE_REPLICATE_NL hints were added to the existing BROADCAST hint.
  • DataSource API Improvements:
    • Pluggable catalog integration.
    • Improved predicate push down for faster queries via reduced data loading.
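Returning to the AQE feature list above, the idea behind dynamically coalescing shuffle partitions can be sketched in plain Python. This greedy merge is a simplification written for illustration and is not Spark's actual algorithm. The function name, the sample partition sizes, and the 64 MB target are assumptions.

```python
def coalesce_partitions(sizes, target_size):
    """Group adjacent partitions so each group stays at or under target_size.

    A partition that is already larger than target_size ends up in a group
    of its own. Returns a list of groups, each a list of partition indices.
    """
    groups = []
    current = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_size:
            groups.append(current)
            current = []
            current_size = 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Shuffle partition sizes in MB, as might be read from shuffle file
# statistics after a stage completes (made-up numbers).
shuffle_sizes_mb = [10, 20, 5, 70, 8, 8, 8]
groups = coalesce_partitions(shuffle_sizes_mb, target_size=64)
print(groups)
print(f"{len(shuffle_sizes_mb)} partitions coalesced into {len(groups)} tasks")
```

Only adjacent partitions are merged, matching the description in the list above. In Spark itself, adaptive query execution is controlled through settings such as `spark.sql.adaptive.enabled`.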

Summary

In this chapter, we covered the main improvements in Spark 3.x that are proving instrumental in accelerating time to insights, especially when executed on NVIDIA GPUs. Details on new Spark 3.0 features can be found in the Spark 3.0 release notes.