Apache Spark는 UC버클리 AMP 랩에서 개발한 In-Memory 방식의 분산 처리 엔진으로, 대용량 데이터에 대한 컴퓨팅 연산을 다수의 서버로 구성된 클러스터에서 분산병렬처리 하기 위한 오픈소스 엔진이다.
또한, Batch, SQL, Streaming, ML, Graph 등 다양한 작업을 Spark 하나의 엔진으로 처리할 수 있는 통합 데이터 처리 엔진이다.
여기서 엔진이란, 내부에 데이터를 저장하지 않고 외부 데이터의 가공 연산만을 도맡는 것을 뜻한다.
Spark의 목적은 기존 디스크IO에 대한 지연시간 개선, 메모리를 사용하여 반복적인 작업이나 스트리밍 데이터를 효율적으로 처리하기 위해서이다.
Spark는 그 중심부에 Spark Core, 그 위에 Spark Streaming, Spark SQL, GraphX 등과 같은 각종 모듈들이 존재한다.
Spark Core
Spark 전체의 기초가 되는 분산 작업 처리, 스케줄링, 입출력, API 인터페이스의 기능을 제공해준다.
Spark SQL
SQL문을 기반으로 쿼리를 수행하며 데이터를 처리할 수 있는 질의형 처리 도구이다. Hive 메타스토어를 사용하여 연결할 수 있다.
Spark Streaming
실시간 데이터 처리를 위해 구성한 모듈이다. 데이터 스트림을 개별 세그먼트로 나눈 후, 각 세그먼트의 데이터를 스파크 엔진으로 처리한다.
MLib
머신러닝을 위한 라이브러리이다.
GraphX
그래프 알고리즘을 위한 라이브러리이다.
Resource Management
Hadoop 없이 Spark만으로 데이터 처리를 진행할 수 있으며(Spark Standalone), Spark에서 자체적으로 개발한 자원관리자인 Mesos와 함께 사용할 수 있다. Hadoop 클러스터와 연결하기 위해 Yarn, 클러스터 오케스트레이션을 위해 Kubernetes와 연결할 수 있다.
Spark RDD
Spark vs MapReduce
머신러닝, 그래프와 같은 작업은 반복적인 MapReduce 작업을 요한다. MapReduce는 기본적으로 한번의 Map과 한번의 Reduce 작업을 거친 후, HDFS에 출력값을 저장하게 된다. 많은 양의 반복 작업은 복제, 직렬화, 디스크 IO와 네트워크로 인한 오버헤드 증가의 원인이 되어 성능 저하가 일어나게 된다.
이에 비해, Spark는 메모리상에 데이터를 로드하여 디스크 IO를 없애고 성능을 향상시켰다. 이를 통해 특정 작업에서 MapReduce보다 10-100배 빠른 성능을 보이게 되었다.
또한, Map과 Reduce 뿐만 아니라 flatMap>filter>map>reduceByKey 체이닝 등의 상위수준 API를 제공하고, 반복 연산이 필요한 데이터를 캐싱하여 빠른 데이터 접근을 제공한다.
Spark 설치
Python3이 설치되어있지 않아 다음과 같이 Python과 Spark를 설치해주었다.
# python3 설치
> yum install python3 -y
# pip 설치
> curl https://bootstrap.pypa.io/pip/3.6/get-pip.py | python3
Bash
복사
> cd /home
> curl -L -O https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
> tar -xvzf spark-3.3.1-bin-hadoop3.tgz
> mv spark-3.3.1-bin-hadoop3 /usr/lib
> export SPARK_HOME=/usr/lib/spark-3.3.1-bin-hadoop3
> export PATH=$PATH:$SPARK_HOME/bin
Bash
복사
Spark을 YARN 위에서 실행하기 위해선 HADOOP_CONF_DIR 혹은 YARN_CONF_DIR 환경변수가 설정 되어있어야 한다. 따로 YARN 설정 디렉토리는 만들지 않았기 때문에 HADOOP_CONF_DIR 변수만 설정 해주었다.
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
Bash
복사
Spark 대화형 프로그램
Spark는 Scala, Python을 사용하여 대화형으로 Spark 프로그램을 실행할 수 있다.
Standalone의 Cluster Mode에서는,
사용자의 작업은 Cluster Manager로 부터 Spark Driver Program에게 건네져 Spark Context에서 관리된다.
Spark Application Master에 의해 여러개의 작업으로 나누어진후, Worker node에 있는 Spark Executer에서 실행한다.
YARN의 관리를 받는 Client Mode에서는,
사용자의 작업은 Client로 부터 YARN Container 내의 Spark Driver Program에게 건네진다.
YARN Resource Manager로 부터 작업에 필요한 자원을 할당받은 후, YARN Node Manager 내의 Spark Executer에서 실행한다.
spark-shell
YARN 위에서 spark-shell을 실행하기 위해 다음과 같이 명령어를 작성하였다.
> spark-shell --master yarn
Python 3.6.8 (default, Nov 16 2020, 16:55:22)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/08 06:33:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/08 06:33:18 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.3.1
/_/
Using Python version 3.6.8 (default, Nov 16 2020 16:55:22)
Spark context Web UI available at http://8aeca48e0e41:4040
Spark context available as 'sc' (master = yarn, app id = application_1669699513691_0016).
SparkSession available as 'spark'.
Bash
복사
다음은 CSV파일을 통해 불러온 Dataframe을 성별 기준으로 Groupping 하는 간단한 코드이다.
scala> val df = spark.read.option("header", false).csv("hdfs://localhost:9000/user/malachai/dataset/employees")
scala> val df2 = df.groupBy("_c4").agg(count("*").as("count"))
scala> df2.show(3)
+---+------+
|_c4| count|
+---+------+
| F|132753|
| M|198850|
+---+------+
Bash
복사
WordCount 작업은 다음과 같이 실행할 수 있다.
scala> val text_file = sc.textFile("/path/to/sample_file/in/hdfs.txt")
scala> val words = text_file.flatMap(line => line.split(" "))
scala> val wordMap = words.map(word => (word, 1))
scala> val result = wordMap.reduceByKey((a, b) => a + b)
scala> result.collect()
Scala
복사
pysqark
sparkR
spark-sql
spark-sql은 SQL쿼리를 위한 대화형 쉘이다.
> spark-sql --master yarn
Bash
복사
HDFS 내 CSV 파일들의 데이터를 읽기 위해선 다음과 같이 쿼리를 입력한다.
> select count(*) from csv.`hdfs://localhost:9000/user/malachai/dataset/housetrade/*.csv`;
Bash
복사
읽어들일 CSV 파일들은 다음과 같다.
[root@8aeca48e0e41 dataset]# hadoop fs -ls /user/malachai/dataset/housetrade
Found 17 items
-rw-r--r-- 1 root supergroup 33855329 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2006.csv
-rw-r--r-- 1 root supergroup 24030764 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2007.csv
-rw-r--r-- 1 root supergroup 24043966 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2008.csv
-rw-r--r-- 1 root supergroup 22852015 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2009.csv
-rw-r--r-- 1 root supergroup 15300970 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2010.csv
-rw-r--r-- 1 root supergroup 18455513 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2011.csv
-rw-r--r-- 1 root supergroup 13343231 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2012.csv
-rw-r--r-- 1 root supergroup 17543374 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2013.csv
-rw-r--r-- 1 root supergroup 23302604 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2014.csv
-rw-r--r-- 1 root supergroup 34759198 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2015.csv
-rw-r--r-- 1 root supergroup 34441776 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2016.csv
-rw-r--r-- 1 root supergroup 31389885 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2017.csv
-rw-r--r-- 1 root supergroup 27813383 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2018.csv
-rw-r--r-- 1 root supergroup 21546193 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2019.csv
-rw-r--r-- 1 root supergroup 30473817 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2020.csv
-rw-r--r-- 1 root supergroup 23680912 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2021.csv
-rw-r--r-- 1 root supergroup 9315888 2022-12-08 07:13 /user/malachai/dataset/housetrade/trade2022.csv
Bash
복사
출력값을 통해 CSV의 header가 값으로 입력되었다는 것을 볼 수 있다. DESCRIBE 쿼리를 통해 컬럼의 기본값을 알 수 있다.
spark-sql> select * from csv.`hdfs://localhost:9000/user/malachai/dataset/housetrade/*.csv` limit 2;
접수연도 자치구코드 자치구명 법정동코드 법정동명 지번구분 지번구분명 본번 부번 건물명 계약일 물건금액(만원) 건물면적(㎡) 토지면적(㎡) 층 권리구분 취소일 건축년도 건물용도 신고구분 신고한 개업공인중개사 시군구명
2015 11650 서초구 10800 서초동 1 대지 1337 0014 이즈타워 20160203 21900 33.25 42.340000 13 NULL NULL 2006 오피스텔 NULL NULL
spark-sql> describe select * from csv.`hdfs://localhost:9000/user/malachai/dataset/housetrade/*.csv`;
_c0 string
_c1 string
_c2 string
_c3 string
...
Bash
복사
이를 해결하기 위해 임시 view를 생성한다. 다음과 같은 쿼리를 통해 view를 생성할 수 있다.
spark-sql> create temporary view temp_view_housetrade using csv options (path="hdfs://localhost:9000/user/malachai/dataset/housetrade/*.csv", header="true");
spark-sql> describe temp_view_housetrade;
접수연도 string
자치구코드 string
자치구명 string
법정동코드 string
법정동명 string
지번구분 string
...
Bash
복사