Search

Spark WordCount 작성 및 빌드

Spark WordCount 코드

WordCount.scala에 다음과 같이 코드를 작성하였다.
Spark의 RDD가 제공하는 API는 Scala가 제공하는 API와 거의 동일하다. 단지 다른 점은 Spark의 RDD가 제공하는 API들은 클러스터 내에서 병렬 처리가 가능한 API란 것이다.
package com.fastcampus.spark import org.apache.spark.SparkContext object WordCount { def main(args: Array[String]): Unit = { println(">>> WordCountScala...") var input = args(0) var output = args(1) var delimiter = " " val sc = new SparkContext() val rdd = sc.textFile(input) println(">>> Line Count : " + rdd.count()) val rdd2 = rdd.flatMap(line => line.split(delimiter)) val rdd3 = rdd2.map(word => (word, 1)) val rdd4 = rdd3.groupBy(tuple => tuple._1) val rdd5 = rdd4.map(tuple_grouped => (tuple_grouped._1, tuple_grouped._2.map(tuple => tuple._2))) val rdd6 = rdd5.map(tuple_grouped => (tuple_grouped._1, tuple_grouped._2.reduce((v1, v2) => v1 + v2))) rdd6.foreach(word_count => println(">>> word count : " + word_count)) val rdd7 = rdd6.sortBy(tuple => tuple._2) rdd7.foreach(word_count => println(">>> word count (count desc) : " + word_count)) val ordering = new Ordering[(String, Int)] { override def compare(x: (String, Int), y: (String, Int)): Int = { if((x._2 compare y._2) == 0) (x._1 compare y._1) else -(x._2 compare y._2) } } val rdd8 = rdd6.sortBy(tuple => tuple)(ordering, implicitly[scala.reflect.ClassTag[(String, Int)]]) rdd8.foreach(word_count => println(">>> word count (count desc, word asc) : " + word_count)) rdd8.saveAsTextFile(output) sc.stop() } }
Scala
복사

컴파일

앞선 Scala 코드와 동일하게 컴파일을 진행하면 된다.
해당 코드를 컴파일 할 때 Classpath에 Spark Core 라이브러리가 존재해야 하지만, 원격 서버에는 라이브러리가 존재하지 않았기 때문에 sbt를 사용하여 빌드를 진행하였다.
Scala의 Build Tool인 sbt가 제공하는 compile, package 태스크를 통해 JAR 아카이브를 생성할 수 있다. Spark Core 라이브러리가 있는 컨테이너에 target/scala-2.13 디렉토리에 생성되어있는 JAR 아카이브를 복사해주었다.

실행

Local 환경

spark-submit 명령을 통해 아카이브를 실행한다.
Classpath와 JAR 아카이브의 경로를 지정해주고, 로컬 파일을 명시하는 file:/ prefix를 파일의 경로에 추가한다. 출력 경로는 파일이 아닌 디렉토리이다.
코드가 빌드 된 Scala의 버전과 SparkSubmit에 내장된 Scala의 버전이 다를 경우 관련 예외를 발생시킬 수 있으니, Spark Shell을 통해 Scala 버전을 확인하고 올바른 버전으로 빌드해야 한다.
> spark-submit --class com.fastcampus.spark.WordCount /home/spark-job_2.12-0.1.0-SNAPSHOT.jar file:/usr/lib/hadoop-3.3.4/LICENSE.txt file:/home/out.txt > cat out/part-00000 (,1152) (the,97) (of,62) (or,62) ...
Bash
복사
이전 작업을 통해 출력 파일이 존재 할 경우에 많은 빅데이터 처리 툴들은 FileAlreadyExistsException과 같은 예외를 발생시킨다. 출력 다렉토리가 이미 존재하지 않는지 확인 한 다음 작업을 진행하는것이 좋다.

Cluster 환경(YARN)

spark-submit 명령을 통해 아카이브를 실행한다.
Executer에게 JAR 아카이브와 입력 파일을 전달하기 위해 HDFS에 먼저 업로드 한다.
> hadoop fs -put /usr//lib/hadoop-3.3.4/LICENSE.txt /user/spark > hadoop fs -put spark-job_2.12-0.1.0-SNAPSHOT.jar /user/spark
Bash
복사
master 속성과 class 경로를 명시해줌으로써 YARN을 통해 자원을 할당받고 작업을 진행할 수 있다.
> spark-submit --master yarn --class com.fastcampus.spark.WordCount hdfs:/user/spark/spark-job_2.12-0.1.0-SNAPSHOT.jar hdfs:/user/spark/LICENSE.txt hdfs:/user/spark/out > hadoop fs -cat /user/spark/out/part-00000 (,1152) (the,97) (of,62) (or,62) ...
Bash
복사