Spark 애플리케이션은 어떻게 실행되나요?

Spark 쿼리가 실행되면 다음 단계를 거칩니다. 

  • 논리적 계획 생성
  • 논리적 계획을 물리적 계획으로 변환
  • 코드 생성
  • 클러스터에서 작업 실행

Apache Spark는 DAG(지시된 비순환 그래프) 형태로 이러한 계획의 시각적 표현을 보는 데 사용할 수 있는 웹 UI를 제공합니다. 웹 UI를 사용하면 계획이 Spark 클러스터의 상태 및 리소스 소비를 어떻게 실행하고 모니터링하는지 확인할 수도 있습니다. 다음 URL(http://<driver-node>:4040)을 통해 웹 UI를 실시간으로 볼 수 있습니다.  애플리케이션의 이벤트 로그가 존재하는 경우, http://<server-url>:18080에서 Spark의 기록 서버를 통해 실행 후 웹 UI를 볼 수 있습니다.  

첫 번째 단계에서 제출된 SQL 또는 DataFrame에 대한 논리적 계획이 만들어집니다. 논리적 계획에는 실행될 추상 변환의 집합이 표시됩니다. Spark Analyzer는 메타데이터 카탈로그를 사용하여 테이블과 열을 해결한 다음 필터 푸시 다운과 같은 규칙을 사용하여 계획을 최적화하는 Catalyst Optimizer 프로그램에 계획을 전달합니다.  

작업은 논리적 DAG의 물리적 실행 계획으로의 변환을 트리거합니다. 물리적 계획은 다른 실행 전략에 대한 비용 모델을 사용하여 계획을 실행하는 리소스를 식별합니다. 이 예시는 브로드캐스트 조인 대 해시 조인입니다.  

물리적 계획 보기

설명("형식 지정") 메서드를 호출하여 DataFrame에 대한 서식이 지정된 물리적 계획을 볼 수 있습니다. 아래 물리적 계획에서 df2용 DAG는 스캔 csv 파일, day_of_week 필터, 그리고 시간, fare_amount 및 day_of_week에 대한 프로젝트(열 선택)로 구성됩니다. 

val df = spark.read.option("inferSchema", "false") .option("header", true).schema(schema).csv(file)
val df2 = df.select($"hour", $"fare_amount", $"day_of_week").filter($"day_of_week" === "6.0" )
df2.show(3)
결과: 
+----+-----------+-----------+
|hour|fare_amount|day_of_week|
+----+-----------+-----------+
|10.0|       11.5|        6.0|
|10.0|        5.5|        6.0|
|10.0|       13.0|        6.0|
+----+-----------+-----------+
df2.explain(“formatted”)
결과:
== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- Scan csv  (1)

(1) Scan csv
Location: [dbfs:/FileStore/tables/taxi_tsmall.csv] 
Output [3]: [fare_amount#143, hour#144, day_of_week#148]
PushedFilters: [IsNotNull(day_of_week), EqualTo(day_of_week,6.0)]

(2) Filter [codegen id : 1]
Input [3]: [fare_amount#143, hour#144, day_of_week#148]
Condition : (isnotnull(day_of_week#148) AND (day_of_week#148 = 6.0))

(3) Project [codegen id : 1]
Output [3]: [hour#144, fare_amount#143, day_of_week#148]
Input [3]: [fare_amount#143, hour#144, day_of_week#148]

웹 UI SQL 탭에서는 Catalyst에서 생성된 계획에 대해 자세히 볼 수 있습니다. 쿼리 설명 링크를 클릭하면 DAG와 쿼리에 대한 세부 정보가 표시됩니다.

설명 후 다음 코드에서 df3에 대한 물리적 계획은 Scan, Filter, Project, HashAggregate, Exchange 및 HashAggregate로 구성되어 있음을 알 수 있습니다. Exchange는 groupBy 변환에 의해 발생한 셔플입니다. Spark는 Exchange에서 데이터를 셔플하기 전에 각 파티션에 대한 해시 집계를 수행합니다. 교환 후에는 이전 하위 집계의 해시 집계가 만들어집니다. df2가 캐시된 경우 이 DAG에서 파일 스캔 대신 메모리 내 스캔을 하게 됩니다.

val df3 = df2.groupBy("hour").count
df3.orderBy(asc("hour"))show(5)
결과:
+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   47|
| 2.0|  658|
| 3.0|  742|
| 4.0|  812|
+----+-----+

df3.explain
결과:
== Physical Plan ==
* HashAggregate (6)
+- Exchange (5)
   +- * HashAggregate (4)
      +- * Project (3)
         +- * Filter (2)
            +- Scan csv  (1)
(1) Scan csv 
Output [2]: [hour, day_of_week]
(2) Filter [codegen id : 1]
Input [2]: [hour, day_of_week]
Condition : (isnotnull(day_of_week) AND (day_of_week = 6.0))
(3) Project [codegen id : 1]
Output [1]: [hour]
Input [2]: [hour, day_of_week]
(4) HashAggregate [codegen id : 1]
Input [1]: [hour]
Functions [1]: [partial_count(1) AS count]
Aggregate Attributes [1]: [count]
Results [2]: [hour, count]
(5) Exchange
Input [2]: [hour, count]
Arguments: hashpartitioning(hour, 200), true, [id=]
(6) HashAggregate [codegen id : 2]
Input [2]: [hour, count]
Keys [1]: [hour]
Functions [1]: [finalmerge_count(merge count) AS count(1)]
Aggregate Attributes [1]: [count(1)]
Results [2]: [hour, count(1) AS count]

이 쿼리에 대한 SQL 탭 링크를 클릭하면 작업의 DAG가 표시됩니다.

세부 정보 확장 확인란을 선택하면 각 단계에 대한 자세한 정보가 표시됩니다. 첫 번째 블록인 WholeStageCodegen은 성능을 향상시키기 위해 여러 연산자(스캔 csv, 필터, 프로젝트 및 HashAggregate)를 단일 Java 함수로 컴파일합니다. 행 수 및 스필 크기와 같은 메트릭이 다음 화면에 표시됩니다.

Exchange라는 두 번째 블록에는 작성된 셔플 레코드 수와 데이터 크기 합계를 포함하여 셔플 교환의 메트릭이 표시됩니다.

클러스터에서 작업 실행하기

세 번째 단계에서는 작업이 클러스터에서 예약 및 실행됩니다.

스케줄러는 변환을 기반으로 그래프를 단계로 분할합니다. 좁은 변환(데이터 이동이 없는 변환)은 단일 단계로 함께 그룹화(파이프라인)됩니다. 이 예제의 물리적 계획에는 두 단계가 있으며, 교환 전의 모든 것은 첫 번째 단계에서 이루어집니다. Spark는 전체 단계 Java 코드 생성을 포함하여 런타임에 추가적인 최적화를 수행합니다. 이렇게 하면 각 연산자에 대한 반복자 코드를 생성하는 대신 SQL 쿼리에서 운영자 집합에 대해 하나의 최적화된 Java 함수를 바이트코드로 생성하여 CPU 사용을 최적화합니다.

각 단계는 데이터프레임의 파티션을 기반으로 동일한 컴퓨팅을 병렬로 수행하는 작업으로 구성됩니다.

그런 다음 스케줄러는 단계 작업 세트를 작업 스케줄러에 제출하며, 그러면 작업이 실행을 위해 실행자에 제출됩니다.

작업이 완료되면 동작에 따라 동작 값이 드라이버로 반환되거나 디스크에 기록됩니다.

웹 UI 작업 탭을 클릭하면 단계 및 작업을 포함하여 작업 진행 상황에 대한 세부 정보를 볼 수 있습니다. 다음 예제에서는 작업이 두 단계로 구성되며 셔플 전 스테이지에 두 개의 작업이 있고 셔플 후 스테이지에서 200개의 작업이 있습니다. 작업 수는 파티션에 해당합니다. 첫 번째 단계에서 파일을 읽은 후에는 두 개의 파티션이 있습니다.

셔플 후 파티션의 기본 개수는 200개입니다. (spark.sql.shuffle.partitions 속성으로 데이터를 셔플할 때 사용할 파티션 수를 구성할 수 있습니다.)

요약

이 챕터에서는 Spark를 소개하고 클러스터에서 코드를 실행하는 방법을 시연했으며 Spark Web UI를 사용하여 이를 모니터링하는 방법을 보여드렸습니다. 애플리케이션의 성능을 디버깅, 분석 및 튜닝할 때 Spark가 애플리케이션을 실행하는 방법을 아는 것이 중요합니다.