챕터 3에서는 Spark 3.x에서 GPU 가속의 기능에 대해 이야기했습니다. 이 챕터에서는 GPU를 활용하여 RAPIDS 라이브러리를 통해 처리를 가속화하는 Apache Spark 3.x의 새로운 RAPIDS 라이브러리를 사용하여 시작하는 기본 사항을 살펴보겠습니다(자세한 내용은 Apache Spark의 RAPIDS 액셀러레이터로 시작하기 참조).
Apache Spark의 RAPIDS 액셀러레이터는 다음과 같은 기능 및 한계를 가지고 있습니다.
이 GPU 가속을 사용하려면 다음이 필요합니다.
Spark를 배포하는 방법은 Spark를 설치하고 설정하기 위해 수행해야 하는 단계와 Spark의 RAPIDS 액셀러레이터에 영향을 줍니다. Spark를 배포하는 주요 방법은 다음과 같습니다.
기본적으로 설치를 위해서는 Spark 3.x, Spark jar용 RAPIDS 액셀러레이터 및 모든 작업자 노드에서 사용할 수 있는 GPU 검색 스크립트가 필요합니다. Local을 사용하면 로컬로 설치됩니다. Spark 독립 실행형의 경우 사용할 모든 노드에 이러한 기능을 설치합니다. Yarn를 사용하면 런처 노드에 설치되며 YARN에서 필요에 따라 노드로 전송을 처리합니다. Kubernetes를 사용하면 Docker 이미지 또는 Spark 애플리케이션이 실행될 때 장착된 드라이브에 필요한 것을 모두 넣습니다. 설치에 대한 자세한 내용은 Apache Spark에 대한 RAPIDS 액셀러레이터 시작하기를 참조하세요.
Spark 셸 및 ./bin/spark-submit은 지원 구성 속성을 동적으로, --conf와 같은 명령줄 옵션을 통해 또는 conf/spark-defaults.conf에서 구성 옵션을 읽음으로써 구성 속성을 동적으로 로드합니다. (Spark 구성에 대한 개요 및 세부 정보는 Spark 구성 가이드를 참조하세요.)
On startup use: --conf [conf key]=[conf value]. 예를 들어,
${SPARK_HOME}/bin/spark --jars 'rapids-4-spark_2.12-0.1.0.jar,cudf-0.14.jar' \ --conf spark.plugins=com.nvidia.spark.SQLPlugin \ --conf spark.rapids.sql.incompatibleOps.enabled=true
런타임 사용 시: spark.conf.set("[conf key]", [conf value]). 예를 들어,
scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)
--conf 키 값 쌍을 사용하여 GPU를 요청하고 작업에 할당할 수 있습니다. 사용하게 될 정확한 구성은 클러스터 관리자에 따라 다릅니다. 다음은 GPU 할당을 위한 구성 키 값 속성 중 몇 가지입니다.
--conf spark.executor.resource.gpu.amount=1
--conf spark.task.resource.gpu.amount=1
--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh
spark.task.resource.gpu.amount는 소수점일 수 있으므로 실행자에서 동시에 여러 작업을 실행하고 동일한 GPU에 할당하려는 경우 이를 1 미만의 소수점 값으로 설정할 수 있습니다. spark.executor.core 설정에 맞춰 설정하는 것이 좋습니다. 예를 들어, spark.executor.cores=2를 통해 각 실행자에서 2개의 작업을 실행할 수 있고 동일한 GPU에서 이 2개의 작업을 실행하려는 경우 spark.task.resource.gpu.amount=0.5를 설정합니다.
다음 구성은 시작하는 데 권장되지만 클러스터 및 애플리케이션에 따라 구성해야 합니다.
(--conf spark.rapids.sql.concurrentGpuTasks=2),
--conf spark.sql.files.maxPartitionBytes=512m
--- spark.hadoop.mapreduce.input.fileinputformat.split.minsize=536870912 --conf spark.hadoop.mapred.min.split.size=536870912
(--conf spark.serializer=org.apache.spark.serializer.KryoSerializer)
GpuKryoRegistrator 클래스를 등록해야 합니다. 예:
--conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator
구성 외에도 특정 요구 사항이 충족되는 한 성능에 도움이 될 수 있는 다른 플러그인별 구성이 있습니다. 이러한 구성은 GPU에서 실행할 수 있는 작업을 제어합니다(다음 표 참조). 이를 활성화하면 GPU에서 더 많은 것을 최적화하고 실행할 수 있지만, GPU에서 어떤 작업을 하는지 이해해야 합니다. 예를 들어 GPU는 CPU 버전과 100% 호환되지 않을 수 있습니다. 예를 들어 부동 소수점 수는 약간 다를 수 있습니다. 구성에 대한 자세한 내용은 Spark 구성에 대한 RAPIDS 액셀러레이터를 참조하세요.
Spark용 RAPIDS 액셀러레이터는 사용자의 API 변경이 필요하지 않으며 지원하는 SQL 작업을 GPU 작업으로 대체합니다. GPU 작업으로 대체된 작업을 보려면 설명 방법을 호출하여 DataFrame에 대한 실제 계획을 인쇄할 수 있으며 GPU 접두사가 있는 모든 작업은 GPU에서 수행됩니다.
이제 DataFrame의 실제 계획을 챕터 1에서 살펴본 것과 동일한 쿼리에 대해 GPU 처리와 비교합니다. 아래 물리적 계획에서 DAG는 GpuBatchScan, hour에 대한 GpuFilter, 그리고 hour, fare_amount 및 day_of_week에 대한 GpuProject(열 선택)로 구성됩니다. CPU 처리를 통해 FileScan, 필터 및 프로젝트로 구성되었습니다.
// 선택 및 필터는 좁은 변환입니다. df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).show(2) 결과: +----+-----------+ |hour|fare_amount| +----+-----------+ | 0.0| 10.5| | 0.0| 12.5| +----+-----------+ df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).explain 결과: == Physical Plan == *(1) GpuColumnarToRow false< +- !GpuProject [hour#10, fare_amount#9] +- GpuCoalesceBatches TargetSize(1000000,2147483647) +- !GpuFilter (gpuisnotnull(hour#10) AND (hour#10 = 0.0)) +- GpuBatchScan[fare_amount#9, hour#10] GpuCSVScan Location: InMemoryFileIndex[s3a://spark-taxi-dataset/raw-small/train], ReadSchema: struct<fare_amount:double,hour:double>
원래 계획의 대부분의 노드가 어떻게 GPU 버전으로 대체되었는지 확인합니다. RAPID 액셀러레이터는 GpuColumnarToRow 및 GpuRowToColumnar와 같은 데이터 형식 변환 노드를 삽입하여 GPU에서 실행되는 노드의 기둥 처리와 CPU에서 실행되는 노드에 대한 행 처리 간에 변환합니다. 쿼리의 일부가 GPU에서 실행되지 않은 이유를 보려면 config spark.rapids.sql.explain를 true로 설정합니다. 출력은 드라이버의 로그 또는 대화형 모드의 화면에 기록됩니다.
GPU에서 실행 중인 작업을 확인하는 가장 쉬운 방법은 Spark 웹 UI의 "SQL" 탭을 보는 것입니다. 아래 쿼리에 대한 SQL 탭의 DAG 다이어그램에서 실제 계획은 GPUBatchScan, Project, GPUHashAggregate 및 GPUHashAggregate로 구성되어 있음을 알 수 있습니다. CPU 처리를 통해 Spark는 더 넓은 변환을 위해 Exchange에서 데이터를 셔플하기 전에 각 파티션에 대한 해시 집계를 수행합니다. 교환 후에는 이전 하위 집계의 해시 집계가 만들어집니다. GPU 처리의 경우 Exchange 셔플을 피했습니다.
val df3 = df2.groupBy("month").count .orderBy(asc("month"))show(5)
단계 세부 정보 페이지를 사용하여 단계 세부 정보 DAG를 볼 수 있으며, 파란색 정점(상자)은 RDD 또는 DataFrame을 나타내고 가장자리(상자 사이의 화살표)는 DataFrame에 적용된 작업을 나타냅니다.
환경 탭을 사용하여 속성이 올바르게 설정된 GPU 구성(예: Spark.executor.resource.gpu.amount 및 spark.executor.resource.gpu.discoveryScriptScript 속성)을 보고 확인할 수 있습니다. 여기에서 시스템 속성 클래스 경로 항목을 확인하여 플러그인 jar가 JVM 클래스 경로에 있는지 확인할 수도 있습니다.
표 1. Spark 속성
실행자 탭을 사용하여 애플리케이션에 대한 실행자에 할당된 리소스를 확인할 수 있습니다. 이 경우 하나의 GPU가 할당되었습니다.
현재 디버깅하는 가장 좋은 방법은 일반적으로 Spark에서 적용하는 방법입니다. UI 및 로그 파일을 확인하여 실패한 사항을 확인하세요. GPU에 seg 오류가 있는 경우 hs_err_pid.log 파일을 찾습니다. hs_err_pid.log 파일이 YARN 애플리케이션 로그 디렉터리로 들어가도록 하려면 config에 --conf spark.executor.extraJavaOptions="-XX:ErrorFile=<LOG_DIR>/hs_err_pid_%p.log"를 추가할 수 있습니다.
GPU에서 작업이 실행되지 않은 이유를 확인하려면 --conf spark.rapids.sql.explain=NOT_ON_GPU 구성을 활성화합니다. Spark 작업이 GPU에서 실행되지 않는 이유에 대한 로그 메시지가 드라이버 로그에 출력됩니다.
메모리에서 GPU는 여러 방법으로 표시될 수 있습니다. 메모리 부족 오류가 표시되거나 충돌처럼 나타날 수도 있습니다. 일반적으로 파티션 크기가 너무 크다는 의미이므로 파티션 크기 및/또는 파티션 수에 대한 구성 섹션으로 돌아가세요. 가능한 경우 동시 GPU 작업의 수를 하나로 줄일 수 있습니다. Spark UI는 데이터 크기에 대한 힌트를 줄 수 있습니다. 실패한 스테이지의 입력 데이터 또는 셔플 데이터 크기를 살펴봅니다.
이 챕터에서는 GPU를 활용하여 처리를 가속화하는 Apache Spark 3.x용 새 RAPIDS API 플러그인를 사용하여 시작하는 것의 기본 사항을 다뤄보았습니다. 자세한 내용은 Spark용 RAPIDS 액셀러레이터 가이드를 참조하세요.