GPU 가속 Apache Spark 3

Spark 3 및 GPU

수많은 데이터 처리 작업의 병렬 성질을 감안하면, 당연히 GPU의 매우 병렬적인 아키텍처는 GPU가 인공지능(AI)에서 딥러닝(DL)을 가속화하는 것과 같은 방법으로 Spark 데이터 처리 쿼리를 병렬화 및 가속화해야 합니다. 따라서 NVIDIA® Spark 커뮤니티와 협력하여 Spark 3.x의 일부로 GPU 가속을 구현했습니다. 

Spark는 파티션 형태로 노드 간에 컴퓨팅을 배포하지만 파티션 내에서는 전통적으로 CPU 코어에서 컴퓨팅이 수행되었습니다. 그러나 Spark에서 GPU 가속의 이점은 많습니다. 그 중 하나는, 서버가 더 적게 필요하여 인프라 비용이 절감된다는 것입니다. 쿼리가 더 빨리 완료되므로 결과 도출까지의 시간이 감소할 것으로 기대할 수 있습니다. 또한, GPU 가속이 투명하기 때문에 Spark에서 실행되도록 제작된 애플리케이션은 변경 없이도 GPU 가속의 이점을 얻을 수 있습니다.

Spark의 가속 ETL 및 AI

머신 러닝(ML) 및 DL이 더 큰 데이터세트에 사용되는 경우가 늘어감에 따라 Spark는 학습 단계를 위한 원시 입력 데이터 준비에 필요한 피처 엔지니어링 및 데이터 전처리에 일반적으로 사용되는 도구가 되었습니다. Spark 커뮤니티는 데이터 사이언티스트가 단일 Spark 클러스터로 작업하고 외부 데이터 레이크를 통해 데이터를 단계 간에 이동하여 받는 불이익을 방지하도록 이러한 엔드 투 엔드 파이프라인의 두 단계를 통합하는 데 주력해왔습니다. 이러한 접근 방식은 Horovod(Uber) 및 TensorFlowOnSpark(Yahoo)를 예로 들 수 있습니다.

이제 Spark가 GPU 가속화 ML 및 DL 애플리케이션을 GPU가 있는 Spark 클러스터에서 예약할 수 있으므로, Spark 3.x는 주요 이정표라 할 수 있습니다. Apache Spark를 위한 RAPIDS Accelerator가 포함된 전체 Spark 3 소프트웨어 스택은 다음 그림에서 볼 수 있습니다.

NVIDIA CUDA의 새 GPU 가속 라이브러리

앞서 설명한 바와 같이, NVIDIA® CUDA®는 NVIDIA GPU 아키텍처에서 작업을 가속화하기 위한 프로그래밍 모델이자 API의 집합입니다. CUDA 위에 레이어링된 RAPIDS는 DataFrame 및 그래프 작업을 통해 GPU 병렬 및 고대역폭 메모리 속도를 제공하는 오픈 소스 소프트웨어 라이브러리 및 API 제품군입니다.

RAPIDS GPU 가속 Spark DataFrame

RAPIDS는 Apache Arrow 데이터 구조를 기반으로 강력한 GPU 데이터 프레임을 제공합니다. Arrow는 데이터 지역성에 최적화된 표준화된 언어 독립적 열 기반 메모리 형식을 지정하여 최신 CPU 또는 GPU에서 분석 처리 성능을 가속화합니다. GPU DataFrame을 사용하면 여러 레코드의 열 값의 배치는 최신 GPU 설계를 활용하고 읽기, 쿼리 및 쓰기를 가속화합니다.    

Spark GPU 가속 DataFrame 및 SQL

Apache Spark 3.0의 경우, 새로운 RAPIDS API는 Spark SQL 및 DataFrames에서 메모리 효율적인 GPU 가속 열 기반 데이터 처리 및 쿼리 계획을 위해 사용됩니다. Catalyst 쿼리 최적화 프로그램 플러그인 인터페이스는 주로 일대일 매핑인 RAPIDS API를 통해 가속할 수 있는 쿼리 계획 내에서 운영자를 식별하고, 쿼리 계획을 실행할 때 Spark 클러스터 내에서 해당 연산자를 예약하기 위해 RAPIDS 액셀러레이터를 통해 확장되었습니다.

CPU에 대한 물리적 계획을 통해, DataFrame 데이터는 RDD 행 형식으로 변환되며 일반적으로 한 번에 한 행을 처리합니다. Spark는 열 기반 배치를 지원하지만 Spark 2.x에서는 Vectorized Parquet 및 ORC 판독기에서만 이를 사용합니다. RAPIDS 플러그인은 대부분의 Spark 작업에 GPU에 대한 열 기반 배치 처리를 확장합니다. 열 기반 데이터를 처리하는 것은 행별 처리보다 훨씬 GPU 친화적입니다.

OpenUCX 통신 라이브러리를 기반으로 구축된 새로운 Spark 셔플 구현은 NVLink, RDMA 및 InfiniBand(사용 가능한 경우)를 활용하여 GPU에 최대한 많은 데이터를 유지하고, 노드 간에 데이터를 이동하는 가장 빠른 경로를 찾고, GPU를 우회하여 GPU 메모리와 노드 간 전송을 포함하는 가능한 하드웨어 리소스중 가장 빠른 경로를 찾는 등 Spark 프로세스 간에 데이터 전송을 획기적으로 줄입니다.   RDMA를 사용하면 GPU가 최대 PCIe 속도로 노드 간에 직접 데이터를 전송할 수 있으며, 하나의 대규모 서버처럼 작동합니다.   NVLink를 사용하면 GPU가 최대 300GB/s에서 피어 투 피어 통신을 시작할 수 있습니다.

Spark의 GPU 인식 예약

Spark 3.x는 YARN, Kubernetes 및 독립 실행형 클러스터 관리자와의 통합을 추가하여 GPU 및 플러그인 포인트를 요청하며, 이는 GPU에서 작업을 실행하도록 확장할 수 있습니다. Kubernetes의 경우, Spark 3.x는 실행자 포드 수준에서 GPU 격리를 제공합니다. 이를 통해 Spark 애플리케이션 개발자가 GPU를 더 쉽게 요청하고 사용할 수 있으며, Spark의 Horovod 및 TensorFlow와 같은 DL 및 AI 프레임워크와 더 쉽게 통합되고 GPU를 더 잘 활용할 수 있습니다. 

아래 다이어그램에 GPU 스케줄링에 대한 흐름의 예가 나와 있습니다. 사용자는 GPU 리소스 구성 검색 스크립트가 있는 애플리케이션을 제출합니다. Spark는 구성을 사용하여 클러스터 관리자에게 전달하는 드라이버를 시작하여 지정된 양의 리소스와 GPU가 있는 컨테이너를 요청합니다. 클러스터 관리자는 컨테이너를 반환합니다. Spark가 컨테이너를 시작합니다. 실행자가 시작되면 검색 스크립트가 실행됩니다. Spark는 해당 정보를 드라이버로 다시 전송하고 드라이버는 해당 정보를 사용하여 작업을 GPU로 예약할 수 있습니다. 

Spark 웹 UI는 할당된 리소스를 확인하기 위해 새 확인란을 갖도록 수정되었습니다. 이 경우 2개의 GPU가 할당되었습니다.

Spark 3.x 스테이지 수준 리소스 스케줄링을 사용하면 한 스테이지에 대해 하나의 컨테이너 크기, 그리고 다른 스테이지에 대해서는 다른 크기를 선택할 수 있습니다. 예를 들어 하나는 ETL용, 다른 하나는 ML용으로 선택할 수 있습니다.

XGBoost, RAPIDS 및 Spark

XGBoost는 확장 가능한 분산 그라데이션 부스트 의사결정 트리(GBDT) ML 라이브러리입니다. XGBoost는 병렬 트리 부스팅을 제공하며 회귀, 분류 및 순위 문제에 대한 선도적인 ML 라이브러리입니다. RAPIDS 팀은 DMLC(Distributed Machine Learning Common) XGBoost 조직과 긴밀하게 협력하고 있으며, XGBoost에는 이제 원활한 드롭인 GPU 가속이 포함되어 모델 교육 속도를 크게 높이고 더 나은 예측을 위한 정확도를 향상해줍니다.

RAPIDS, XGBOOST 및 SPARK에는 속도와 비용에 도움이 되는 세 가지 기능이 있습니다.

  • GPU 가속 DataFrame: 지원되는 입력 파일 형식의 수/크기를 GPU 메모리에서 직접 읽고 다른 교육 노드 간에 균등하게 나눕니다.

  • GPU 가속 트레이닝: 데이터세트의 희소성을 기반으로 특성을 최적으로 저장하는 교육 데이터의 동적인 메모리 표현을 통해 XGBoost 교육 시간이 개선되었습니다. 이는 서로 다른 트레이닝 인스턴스 간에 가장 많은 수의 특성을 기반으로 하는 고정된 메모리 내 표현을 대체합니다.

  • 효율적인 GPU 메모리 사용률: XGBoost는 데이터가 메모리에 맞도록 요구하므로 단일 GPU 또는 분산 멀티 GPU 멀티 노드 트레이닝을 사용하여 데이터 크기에 제한이 발생합니다. 최신 릴리스는 GPU 메모리 사용률을 5배 향상시켰습니다. 이제 사용자는 첫 번째 버전과 비교하여 5배의 크기를 가진 데이터로 트레이닝할 수 있습니다. 이렇게 하면 성능에 영향을 주지 않으면서 총 트레이닝 비용이 개선됩니다.

이 전자책의 후반부에는 업그레이드된 XGBoost 라이브러리를 사용하여 데이터를 로드/변환하고 GPU를 사용하여 분산 교육을 수행하는 예제를 탐색하고 이야기합니다.

기타 Spark 3.x 기능

  • 적응형 쿼리 실행: Spark 2.2는 기존 규칙 기반 SQL 최적화 프로그램에 비용 기반 최적화를 추가했습니다. Spark 3.0에는 이제 런타임 적응형 쿼리 실행(AQE)이 있습니다. AQE를 사용하면 쿼리 계획의 완료된 단계에서 검색된 런타임 통계는 나머지 쿼리 단계의 실행 계획을 다시 최적화하는 데 사용됩니다. Databricks 벤치마크는 AQE 사용 시 1.1~8배의 속도 향상을 달성했습니다.

    Spark 3.0 AQE 최적화 기능은 다음과 같습니다.

    • 셔플 파티션을 동적으로 결합: AQE는 셔플 파일 통계를 보고 인접한 작은 파티션을 셔플 단계에서 더 큰 파티션으로 결합하여 쿼리 집계에 대한 작업의 수를 줄일 수 있습니다.
    • 조인 전략을 동적으로 전환: AQE는 조인 관계 크기에 따라 런타임 시 조인 전략을 최적화할 수 있습니다. 예를 들어, 정렬 병합 조인을 조인의 한쪽이 메모리에 들어갈 만큼 작으면 더 나은 성능을 수행하는 브로드캐스트 해시 조인으로 변환합니다.
    • 왜곡 조인을 동적으로 최적화: AQE는 런타임 통계를 사용하여 파티션을 정렬 병합하여 데이터 왜곡을 감지하고 파티션을 더 작은 하위 파티션으로 분할할 수 있습니다.
  • 동적 파티션 가지치기: 파티션 가지치기는 쿼리할 때 Spark가 읽는 파일 및 파티션 수를 제한하는 성능 최적화입니다. 데이터를 분할한 후 특정 파티션 필터 기준과 일치하는 쿼리는 Spark가 디렉터리 및 파일의 하위 집합만 읽을 수 있도록 하여 성능을 향상시킵니다. Spark 3.0 동적 파티션 가지치기를 사용하면 Spark 엔진이 런타임에 특정 쿼리에 대해 읽고 처리해야 하는 테이블 내의 특정 파티션을 동적으로 추론할 수 있으며, 조인에서 다른 테이블을 필터링하여 발생하는 파티션 열 값을 식별할 수 있습니다. 예를 들어 다음 쿼리에는 모든 항공편에 대한 총 판매량이 모두 포함된 flight_sales 테이블과 각 지역에 대한 공항 매핑이 포함된 flight_airports 테이블이라는 두 테이블이 포함됩니다. 여기에서 북미 지역의 판매를 쿼리해보겠습니다.

select fs.airport, fs.total_sales
from flight_sales fs, flight_airports fa
where fs.airport = fa.airport and fa.region  = 'NEUSA'

동적 파티션 가지치기를 사용하면 이 쿼리는 해당 지역의 필터가 반환하는 공항에 대한 파티션만 스캔하고 처리합니다. 읽기 및 처리된 데이터의 양을 줄이면 상당한 시간을 절약할 수 있습니다.

  • 조인 전략 힌트는 최적화 프로그램이 조인 전략에 대한 힌트 계획을 사용하도록 지시합니다. MERGE, SHUFFLE_HASH 및 SHUFFLE_REPLICATE_NL 힌트가 기존 BROADCAST 힌트에 추가되었습니다.
  • 데이터소스 API 개선 사항:
    • 플러그 가능한 카탈로그 통합.
    • 데이터 로딩 감소를 통해 쿼리 속도를 높이기 위해 술어 푸시다운이 개선되었습니다.

요약

이 장에서는 특히 NVIDIA GPU에서 실행될 때 통찰력에 대한 시간을 가속화하는 데 중요한 역할을 하는 Spark 3.x의 주요 개선 사항을 다루었습니다. 새로운 Spark 3.0 기능에 대한 자세한 내용은 Spark 3.0 릴리스 정보에서 확인할 수 있습니다.