Spark가 동작하는 과정을 간단하게 요약하면 다음과 같다.
1.
사용자가 작성한 애플리케이션 코드는 RDD의 생성, Tramsform, Action 작업을 명시한다. Spark는 이를 토대로 연산작업의 DAG(Directed Acyclic Graph; 단방향 비순환 그래프)를 생성한다.
2.
DAG는 실행을 위해 단계별 Stage로 분할된다. 각 Stage는 여러 Task들로 구성되며, RDD의 하나의 Partition에 하나의 Task가 할당된다. 분할된 Stage는 분산병렬 실행을 위해 클러스터에 제출된다.
3.
Worker의 Executer는 전달받은 Stage의 Task들을 실행한다. 한 Stage내의 모든 Task의 실행이 끝나면 다음으로 제출되는 Stage를 전달받아 작업 수행을 반복한다.
DAG
Spark는 사용자가 작성한 코드를 통해 작업 워크플로우와 같은 그래프를 자동적으로 생성한다. 사용자가 RDD에 행하는 API 함수들에 대해 자동적으로 파이프라이닝이 수행된다.
Spark는 많은 네트워크 비용이 발행하는 Shuffle 연산을 최대한 연산을 수행할 수 있는 가까운 Partition에 할당하는 전략을 취한다. 또한 각 Stage는 Shuffle 연산을 포함하지 않는다.
간단한 WordCount 예제를 통해 Spark의 DAG 생성 흐름을 살펴본다.
sc.textFile("some/textfile") //1. RDD[String]
.map(line => line.split(' ') //2. RDD[Array[String]]
.map(word => (word, 1)) //3. RDD[(String, Int)]
.reduceByKey(_+_, 3) //4. RDD[(String, Int)]
.collect() //5. Array[(String, Int)]
YAML
복사
1.
텍스트파일로부터 가져온 최초의 RDD는 Partition 4개로 구성된다고 가정한다. 이 때 RDD는 String 형태의 데이터를 담는다.
2.
white space separator로 문자열 형태의 데이터를 분리한다. RDD는 String 데이터를 담은 Array 형태이다.
3.
분리된 각 단어를 key, 1을 value로 맵핑한다. RDD는 String, Int 튜플 데이터를 담은 Array 형태이다.
4.
reduce를 통해 Shuffle을 실행하였다. Spark가 자동으로 Partition의 개수를 정해주기도 하지만, 예제에선 Partition의 개수를 3개로 지정하였다. RDD는 String, Int 튜플 데이터를 담은 Array 형태이다.
5.
collect() API는 RDD에 있는 모든 레코드들을 Executer에서 Driver로 Array의 형태로 묶어 가져온다. RDD를 통한 Transformation 작업은 Executer에서 분산 병렬적으로 실행되고, Driver로 가져온 Action API 작업은 Driver 내에서만 수행된다.
Spark는 이 작업 흐름을 분석하여 DAG를 자동적으로 생성한다.
DAG는 Shuffle을 기준으로 각기 다른 Stage로 분할된다. 따라서, Stage 내엔 Shuffle 작업이 포함되지 않는다.
이제 각 Stage는 Task 단위로 쪼개어진다.
Stage 1
1.
HDFS에서 블럭 단위로 파일 읽기
2.
두번의 map 연산 수행
3.
부분적 Partition reduce 수행
4.
로컬 디스크레 Shuffle 데이터 작성
Stage 2
1.
로컬 디스크에서 Shuffle 데이터 읽기
2.
최종 reduce 연산 수행
3.
Driver로 결과 전송
Stage를 실행할 때, Spark는 RDD의 각 Partition 별로 Task를 생성한다. 각 Task를 Executer에서 실행하기 위해 Task들은 직렬화 되어 네트워크를 통해 Executer에 전송된다. 전송된 Task들은 Executer의 가용한 코어 개수에 따라 Task 프로세스를 병렬로 수행한다.