Search

Spark

UC버클리 AMP 랩에서 개발한 In-Memory 방식의 분산 처리 시스템이다.
시스템의 목적은 기존 디스크IO에 대한 지연시간 개선, 메모리를 사용하여 반복적인 작업이나 스트리밍 데이터를 효율적으로 처리하기 위해서이다.
Spark 프레임워크는 그 중심부에 Spark Core, 그 위에 Spark Streaming, Spark SQL, GraphX 등과 같은 각종 모듈들이 존재한다.

Spark Core

Spark 전체의 기초가 되는 분산 작업 처리, 스케줄링, 입출력, API 인터페이스의 기능을 제공해준다.

Spark SQL

SQL문을 기반으로 쿼리를 수행하며 데이터를 처리할 수 있는 질의형 처리 도구이다. Hive 메타스토어를 사용하여 연결할 수 있다.

Spark Streaming

실시간 데이터 처리를 위해 구성한 모듈이다. 데이터 스트림을 개별 세그먼트로 나눈 후, 각 세그먼트의 데이터를 스파크 엔진으로 처리한다.

MLib

머신러닝을 위한 라이브러리이다.

GraphX

그래프 알고리즘을 위한 라이브러리이다.

Resource Management

Hadoop 없이 Spark만으로 데이터 처리를 진행할 수 있으며(Spark Standalone), Spark에서 자체적으로 개발한 자원관리자인 Mesos와 함께 사용할 수 있다. Hadoop 클러스터와 연결하기 위해 Yarn, 클러스터 오케스트레이션을 위해 Kubernetes와 연결할 수 있다.

Spark 구축

hadoop-ecosystem.git
gegurakzi
Hive를 설치해 둔 이미지에 같이 설치한다. CentOS7은 Python2.7을 기본 제공 하지만 Spark는 Python3으로 실행할거라 하둡 기본 이미지인 centos7/hadoop에서 Python3을 설치해주었다.
FROM centos7/hadoop/hive:ha ENV HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop ENV YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop ENV PATH=$PATH:$HADOOP_CONF_DIR RUN mkdir /opt/spark && \ cd /opt/spark && \ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz && \ tar -xzf spark-3.2.1-bin-hadoop3.2.tgz && \ ln -s /opt/spark/spark-3.2.1-bin-hadoop3.2 /opt/spark/current ENV SPARK_HOME=/opt/spark/current ENV PATH=$PATH:$SPARK_HOME/bin ENV PYSPARK_PYTHON=$PYTHON_HOME COPY spark-defaults.conf $SPARK_HOME/conf/spark-defaults.conf COPY spark-env.sh $SPARK_HOME/conf/spark-env.sh
Docker
복사
설정과 환경변수는 다음과 같이 설정해준다.
# $SPARK_HOME/conf/spark-defaults.conf # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Default system properties included when running spark-submit. # This is useful for setting default environmental settings. spark.master yarn spark.driver.memory 1g spark.yarn.am.memory 1g spark.executer.memory 2g spark.executer.instances 2 spark.driver.cores 2 spark.executer.cores 1 spark.eventLog.enabled true spark.eventLog.dir hdfs:///user/hadoop/sparklog spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider spark.history.fs.update.interval 10s spark.history.fs.logDirectory hdfs:///user/hadoop/sparklog spark.yarn.historyServer.address master01:18080 # spark.serializer org.apache.spark.serializer.KryoSerializer # spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
Plain Text
복사
# $SPARK_HOME/conf/spark-env.sh ... export PYSPARK_DRIVER_PYTHON=/usr/bin/python3 #맨 하단에 추가
Plain Text
복사
개인 컴퓨터에서 연습용으로 구동하기 때문에 메모리는 적게 잡았다. 최적 설정값을 계산하여 추천해주는 사이트도 있다.
Spark 실행 이전에 HDFS에 /user/hadoop/sparklog를 생성해주어야 한다.
compose yml에 4040, 18080, 8042 포트를 추가로 노출시켜주었다. YARN 위에서 Sparm Application Manager를 실행하니 Web에서 접속할 때 4040에서 8088/proxy로 리다이렉션 되는것을 볼 수 있다. 로컬 컨테이너 환경이라 hostname대신 IP로 수정해주어야 한다.
:4040 접속 시 리다이렉션
host 수정

Spark 대화형 프로그램

Spark는 Scala, Python을 사용하여 대화형으로 Spark 프로그램을 실행할 수 있다.
Standalone의 Cluster Mode에서는,
사용자의 작업은 Cluster Manager로 부터 Spark Driver Program에게 건네져 Spark Context에서 관리된다.
Spark Application Master에 의해 여러개의 작업으로 나누어진후, Worker node에 있는 Spark Executer에서 실행한다.
YARN의 관리를 받는 Client Mode에서는,
사용자의 작업은 Client로 부터 YARN Container 내의 Spark Driver Program에게 건네진다.
YARN Resource Manager로 부터 작업에 필요한 자원을 할당받은 후, YARN Node Manager 내의 Spark Executer에서 실행한다.
대화형 프로그램에서의 Scala를 이용한 기본적 WordCount 작업은 다음과 같이 실행할 수 있다.
scala> val text_file = sc.textFile("/path/to/sample_file/in/hdfs.txt") scala> val words = text_file.flatMap(line => line.split(" ")) scala> val wordMap = words.map(word => (word, 1)) scala> val result = wordMap.reduceByKey((a, b) => a + b) scala> result.collect()
Scala
복사

Spark 응용 프로그램

Java, Python, Scala 언어로 작성된 코드를 이용해 작업을 실행한다.
WordCount의 실행 흐름은 다음과 같다.
text_file = sc.textFile("hdfs://...") counts = text_file.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) counts.saveAsTextfile("hdfs://...")
Python
복사
Python의 경우, spark-submit을 이용하여 바로 작업을 할당할 수 있다.
> $SPARK_HOME/bin/spark-submit --master local[4] /path/to/sample.py
Python
복사
로컬환경에서 4개의 스레드를 사용해 작업을 수행하게 된다.
Java, Scala의 경우, 컴파일과 빌드과정을 거친 .jar 파일을 통해 작업을 할당할 수 있다.
> $SPARK_HOME/bin/spark-submit --class "MainClass" --master local[4] /path/to/sample.jar
Python
복사