Apache Spark 처리 소개

IDC는 데이터센터와 엣지 컴퓨팅 및 IOT에서 생성된 데이터는 향후 7년 동안 4배 증가하여 175ZB에 달할 것으로 예측합니다. 데이터의 엄청난 성장에 따라, Apache Software Foundation의 Apache Spark는 온프레미스 및 클라우드 모두에서 수백만 대의 서버에서 실행되는 확장형 분산 데이터 처리를 위한 가장 인기 있는 프레임워크 중 하나가 되었습니다. 이 챕터에서는 Spark 프레임워크에 대한 소개를 제공하고 애플리케이션을 실행하는 방법을 설명합니다. 

Apache Spark는 Hadoop Apache Mesos, Kubernetes, 독립 실행형 또는 클라우드에서 실행되는 대규모 데이터 처리를 위한 빠른 범용 분석 엔진입니다. Spark는 대화형 셸, 노트북 또는 패키지된 애플리케이션을 사용하여 Scala, Python, R 또는 SQL에서 병렬 애플리케이션을 쉽게 빌드할 수 있는 고급 연산자를 제공합니다. 

Spark 핵심 데이터 처리 엔진에 더해, SQL 및 DataFrames, 머신 러닝, GraphX, 그래프 컴퓨팅 및 스트림 처리를 위한 라이브러리가 있습니다. 이러한 라이브러리는 HDFS, Alluxio, Apache Cassandra, Apache HBase 또는 Apache Hive와 같은 다양한 데이터 소스의 대규모 데이터세트와 함께 사용할 수 있습니다.

클러스터에서 Spark가 실행되는 방법

Spark 애플리케이션은 클러스터 노드의 실행자 프로세스 내부의 병렬 작업으로 실행되며, 실행은 드라이버 프로그램의 SparkSession 개체와 클러스터의 리소스 또는 클러스터 관리자(독립 실행형, Mesos, YARN 또는 Kubernetes) 간에 조정됩니다.

Spark는 로컬 모드라고 하는 단일 컴퓨터에서도 실행할 수 있습니다. 로컬 모드에서는 드라이버 프로그램 및 작업은 동일한 Java 가상 컴퓨터에서 스레드로 실행됩니다.   로컬 모드는 시제품, 개발, 디버깅 및 테스트에 유용합니다.   그러나 로컬 모드는 프로덕션 애플리케이션을 실행하기 위한 것이 아닙니다. 

파일에서 DataFrame 만들기

Spark DataFrame은 클러스터의 여러 노드에 분할되어 병렬로 작동할 수 있는 org.apache.spark.sql.Row 개체의 분산된 데이터세트입니다. DataFrame은 R 또는 Python의 DataFrame과 유사하지만 Spark 최적화와 유사한 행 및 열이 있는 데이터 테이블을 나타냅니다. DataFrame은 파티션으로 구성되며, 각 파티션은 데이터 노드의 캐시에 있는 행 범위입니다.

DataFrames는 csv, parquet, JSON 파일, Hive 테이블 또는 외부 데이터베이스와 같은 데이터 소스에서 구성할 수 있습니다. DataFrame은 관계형 변환 및 Spark SQL 쿼리를 사용하여 작동할 수 있습니다. 

Spark 셸 또는 Spark 노트북은 Spark를 대화형으로 사용할 수 있는 간단한 방법을 제공합니다.  다음 명령으로 로컬 모드에서 셸을 시작할 수 있습니다. 

$ /[installation path]/bin/spark-shell --master local[2]

그런 다음 이 챕터의 나머지 부분에 있는 코드를 셸에 입력하여 결과를 대화형으로 볼 수 있습니다.  코드 예제에서 셸의 출력은 결과가 맨 처음에 나옵니다.

애플리케이션 드라이버와 클러스터 관리자 간의 실행 조정을 위해, 프로그램에서 다음 코드 예제와 같이 SparkSession 개체를 만듭니다.

val spark = SparkSession.builder.appName("Simple Application").master("local[2]").getOrCreate()

Spark 애플리케이션이 시작되면 마스터 URL을 통해 클러스터 관리자에게 연결됩니다. 마스터 URL은 SparkSession 개체를 만들거나 SparkSession 애플리케이션을 제출할 때 N 스레드를 사용하여 로컬로 실행되도록 클러스터 관리자 또는 로컬[N]으로 설정할 수 있습니다. Spark 셸 또는 노트북을 사용하는 경우 SparkSession 개체가 이미 만들어져 있으며 변수 Spark로 사용할 수 있습니다. 연결되면 클러스터 관리자는 클러스터의 노드에 대해 구성된 대로 리소스를 할당하고 실행자 프로세스를 시작합니다. Spark 애플리케이션이 실행되면 SparkSession은 실행을 위해 작업을 실행자에 보냅니다.

SparkSession 읽기 방법을 사용하면 파일에서 DataFrame으로 데이터를 읽고 스키마에 대한 파일 유형, 파일 경로 및 입력 옵션을 지정할 수 있습니다.

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val schema =
  StructType(Array(
    StructField("vendor_id", DoubleType),
    StructField("passenger_count", DoubleType),
    StructField("trip_distance", DoubleType),
    StructField("pickup_longitude", DoubleType),
    StructField("pickup_latitude", DoubleType),
    StructField("rate_code", DoubleType),
    StructField("store_and_fwd", DoubleType),
    StructField("dropoff_longitude", DoubleType),
    StructField("dropoff_latitude", DoubleType),
    StructField("fare_amount", DoubleType),
    StructField("hour", DoubleType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", DoubleType),
    StructField("day_of_week", DoubleType),
    StructField("is_weekend", DoubleType)
  ))

val file = "/data/taxi_small.csv"

val df = spark.read.option("inferSchema", "false")
.option("header", true).schema(schema).csv(file)

result: 
df: org.apache.spark.sql.DataFrame = [vendor_id: double, passenger_count:
double ... 14 more fields]

take 메서드는 이 DataFrame의 개체가 포함된 배열을 반환하며, 이는 org.apache.spark.sql.Row 형식입니다. 

df.take(1)
result: 
Array[org.apache.spark.sql.Row] =
Array([4.52563162E8,5.0,2.72,-73.948132,40.829826999999995,-6.77418915E8,-1.0,-73.969648,40.797472000000006,11.5,10.0,2012,11,13.0,6.0,1.0])

DataFrame 변환 및 작업

DataFrames는 변환이라고 하는 구조화된 데이터 처리를 위해 도메인별 언어 API를 제공합니다. 변환은 현재 DataFrame에서 새로운 변형된 DataFrame을 생성하며 느리게 평가됩니다. 변환은 작업에 의해 트리거될 때 실행되며, 이는 결과를 드라이버 프로그램에 반환하거나 디스크에 작성합니다. 작업이 실행되고 값이 반환되면 DataFrame이 캐시되지 않는 한 DataFrame은 더 이상 메모리에 없습니다. Spark는 dataFrame.cache()를 호출하여 메모리 내 열 기반 형식을 사용하여 DataFrame을 캐시할 수 있습니다.

다음은 일반적으로 사용되는 DataFrame 변환 목록입니다.

  • select    열 집합 선택
  • join    지정된 조인 식을 사용하여 다른 DataFrame과 조인
  • groupBy   지정된 열을 사용하여 DataFrame을 그룹화

이 groupBy 변환 예제에서는 택시 DataFrame을 하루 중 시간으로 그룹화한 다음, 카운트 작업은 시간당 택시 여행 수를 모두 더합니다. show 작업은 결과 DataFrame 행을 표 형식으로 인쇄합니다. 

df.groupBy("hour").count().show(4)

결과: 
+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   49|
| 2.0|  658|
| 3.0|  742|
+----+-----+

다음은 일반적으로 사용되는 DataFrame 작업 목록입니다.

  • show(n)    처음 n개의 행을 표 형식으로 표시
  • take(n)    배열에서 DataFrame의 처음 n개의 행을 반환
  • count    DataFrame의 행 수를 반환

DataFrame 변환 좁고 광범위한 종속성

DataFrame 변환에는 두 가지 유형, 즉 좁은 종속성인 경우와 넓은 종속성인 경우의 변환이 있습니다. 기존 DataFrame에서 새 DataFrame을 만들 때, 종속성이 좁은 변환은 셔플이라는 프로세스에서 파티션 간에 데이터를 이동해야 합니다. 예를 들어 좁은 변환은 지정된 SQL 식을 기반으로 DataFrame에서 행을 필터링하는 데 사용되는 filter()입니다. 다음 예시는 시간 값 = 0에 대해 필터링합니다.

메모리의 DataFrame에서 여러 개의 좁은 변환을 수행하는 데 파이프라이닝이라는 프로세스를 사용할 수 있으며, 좁은 변환이 매우 효율적이게 됩니다. 필터 및 선택과 같은 좁은 변환은 아래 예제에서 하루의 0시간 동안 택시 fare_amounts를 검색하는 데 사용됩니다.

// 선택 및 필터는 좁은 변환입니다.
df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).show(2)

결과:
+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0|       10.5|
| 0.0|       12.5|
+----+-----------+

기존 DataFrame에서 새 DataFrame을 만들 때, 광범위한 종속성을 가진 변환은 셔플이라는 프로세스에서 파티션 간에 데이터를 이동해야 합니다. 셔플은 네트워크를 통해 다른 노드로 데이터를 전송하고 디스크에 작성하여 네트워크 및 디스크 I/O를 유발합니다. 광범위한 변환의 예는 groupBy, agg, sortBy 및 orderBy입니다. 광범위한 변환은 시간 값별로 그룹을 보여줍니다.

다음은 시간 값별 그룹으로의 광범위한 변환이며 시간별 택시 여행 횟수를 계산합니다. 

df.groupBy("hour").count().show(4)

결과: 
+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   49|
| 2.0|  658|
| 3.0|  742|
+----+-----+