Search

Spark Standalone, Spark on K8s with Livy (feat. Ozone)

상태
완료
시기
2023/11/06 → 2024/01/16
역할
BigData
SW개발
Deployment
참여인원
1
소속
회사
사용 기술

시작 - Livy를 통한 Spark Cluster 작업 제출

사내 개발중인 SW가 큰 용량의 데이터셋을 Spark로 연산 시 OOM 에러가 뜨는것을 발견하였다.
이 SW는 Spark 자체를 내장하고 local에서 4개의 executor를 띄워 연산하기에, JVM이 데이터의 용량을 버티지 못하게 된 것이다.
이 때문에 SW와 Spark 엔진을 분리하고 Spark 자체가 K8s 에서 자원을 할당받아 연산을 수행하도록 구조를 수정하였다.
또한 Spark 클러스터에 간편하게 작업을 제출할 수 있도록 Apace Livy를 통해 작업을 관리할 수 있도록 구성 하였다.
Livy의 동작 구조는 다음과 같다.
Livy Client는 REST API 또는 Programmatic API를 통해 Livy Server에게 Spark 작업 수행 요청을 제출할 수 있다. Live Server는 YARN, Mesos, K8s 클러스터에서 클러스터에서 자원을 할당받아 Spark Driver를 생성하고, SparkContext를 관리한다. 마지막으로, Executor를 통해 연산된 결과값을 Livy server가 client에게 전달한다.
K8s 상의 Spark-Livy 환경 구성을 위해 Livy Server를 구성해보았다.

0. Prerequisite

Spark가 K8s에 자원을 요청할 때의 로직은 Spark의 버전마다 상이하다.
K8s가 상시로 업데이트 될 뿐만 아니라 지원되는 API자체가 다른 경우도 있기 때문이다.
사용한 버전은 다음과 같다.
Kubernetes 1.27
Spark 3.5.1
Livy 0.8.1-incubating
Helm 3.13.2
다른 버전을 사용한다면 이미지를 직접 빌드해서 사용할 수 있다.
해당 레포를 참고하여 버전에 알맞는 도커 이미지를 생성하였다.
최종적으로 빌드한 이미지는 다음에서 찾을 수 있다.

1. Livy server Helm chart

해당 레포를 참고하여 Livy Server를 배포하였다.
위 Helm 차트는 K8s 1.19 이하 버전에서 지원되기 때문에 Manifest를 조금 수정해준다.
또한 새로 빌드한 이미지 사용, Livy Server가 실행시킨 session들의 정보를 저장할 수 있는 PVC 생성, Livy on K8s에서 지원하지 않는 Livy Server Recovery 옵션 제거 등 몇가지를 수정한다. 예로, v1beta와 같은 deprecated 자원 API 경로 등이 있었다.
원격 FS를 사용하지 않기 때문에 K8s의 PVC를 통해 Spark에 제출할 작업인 Jar 파일을 관리하기로 했다. NFS를 통해 PVC를 생성하고, Livy의 세션 데이터들이 저장되는 /root/.livy-sessions 디렉토리를 Driver/Executer Pod와 공유하기로 하였다.
livy-jars-pvc.yaml
kind: PersistentVolumeClaim apiVersion: v1 metadata: name: livy-jars labels: app.kubernetes.io/name: {{ include "livy.name" . }} helm.sh/chart: {{ include "livy.chart" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/managed-by: {{ .Release.Service }} spec: accessModes: - ReadWriteOnce resources: requests: storage: 20G storageClassName: "nfs-client"
YAML
복사
driver-pod-template.yaml
apiVersion: v1 kind: Pod spec: containers: - volumeMounts: - mountPath: /root/.livy-sessions name: livy-external-jars-share volumes: - name: livy-external-jars-share persistentVolumeClaim: claimName: livy-jars
JavaScript
복사
JahStreetOrg에서 제공하는 이미지에는 Livy와 Spark 설정을 entrypoint.sh를 통해 간편하게 구성할 수 있도록 되어있다. 0dash1uppercase 파싱을 사용하여 spark.kubernetes.driver.podTemplateFile 변수를 설정해주었다.
values.yaml(Livy Server의 Helm chart)
... LIVY_SPARK_KUBERNETES_DRIVER_POD1TEMPLATE1FILE: {value: "/root/.livy-sessions/driver.yaml"} LIVY_SPARK_KUBERNETES_EXECUTOR_POD1TEMPLATE1FILE: {value: "/root/.livy-sessions/driver.yaml"} ...
YAML
복사

2. Job 코드 생성 및 JAR 빌드

Livy는 Java에서 LivyClientJob 등의 클래스를 통해 Programmatic API를 지원한다.
LivyClientBuilder를 통해 인스턴스를 생성하면 Livy Server에서 Spark Driver를 생성해주는 방식이다.
public static LivyClient livyClient() throws IOException, URISyntaxException { return new LivyClientBuilder() .setURI(URI.create("http://<livy server>:<port(default 80)>")) .setConf("livy.spark.master", "k8s:https://<k8s control-plane>:<port>") .setConf("spark.app.name", "livy-test") .setConf("spark.executor.instances", "2") .setConf("spark.kubernetes.file.upload.path", "/root/.livy-sessions") .setConf("spark.driver.extraClassPath", "/root/.livy-sessions") .setConf("spark.executor.extraclasspath", "/root/.livy-sessions") .build(); }
Java
복사
livyClient 인스턴스의 uploadJar() API를 통해 Spark 작업 Jar 파일을 업로드할 수 있다. 이후, submit() API를 통해 spark-submit을 수행할 수 있다.
public Row[] process() throws ExecutionException, InterruptedException, IOException, URISyntaxException { LivyClient client = LivyConfig.livyClient(); File jarFile = new File("/path/to/jar-which-includes-SQLJob-class/livy.jar"); client.uploadJar(jarFile).get(); Row[] rows = client.submit(new SQLJob("SELECT * FROM dummy", 2)).get(); for (Row row : rows) { System.out.println(row.get(0)); System.out.println(row.get(1)); System.out.println(row.get(2)); } client.stop(true); return new ResponseEntity<>(rows, HttpStatus.OK); }
Java
복사
Spark에 submit한 작업은 미리 정의한 SQLJob 클래스인데, 이는 Livy의 Job 인터페이스를 구현한 작업 클래스이다. 다음과 같이 간단하게 Spark SQL을 사용할 수 있도록 구성해두었다.
public class SQLJob implements Job<Row[]> { private final String sql; private final int number; public SQLJob(String sql, int number) { this.sql = sql; this.number = number; } public Row[] call(JobContext ctx) throws Exception { SparkSession sparkSession = ctx.sqlctx().sparkSession(); Dataset<Row> dummyDataframe = sparkSession.createDataFrame( this.getDummyData(), getDummySchema()); dummyDataframe.createTempView("dummy"); Dataset<Row> df = ctx.sqlctx().sql(this.sql); requireNonNull(df, "dataset"); Row[] rows = df.takeAsList(number).toArray(new Row[0]); return rows; } private List<Row> getDummyData() { Row row1 = RowFactory.create(1, "Alice", 25); Row row2 = RowFactory.create(2, "Bob", 30); Row row3 = RowFactory.create(3, "Charlie", 22); return Arrays.asList(row1, row2, row3); } private static StructType getDummySchema() { return DataTypes.createStructType( new StructField[]{ DataTypes.createStructField("id", DataTypes.IntegerType, false), DataTypes.createStructField("name", DataTypes.StringType, false), DataTypes.createStructField("age", DataTypes.IntegerType, false) }); }
Java
복사
이를 Jar 파일로 빌드한 후, Livy server의 공유 디렉토리에 업로드하여 driver와 executor가 Jar 파일을 공유받을 수 있도록 하였다.

3. Livy submit 테스트

Livy server를 통해 위의 작업을 제출해보았다.
작업을 제출하자, driver와 executor pod가 실행되며
작업이 종료되면 모든 pod이 제거되고 driver의 로그만 남게 된다.

개선 - Ozone과의 호환성 해결, 인프라 점검

사내 개발 SW가 내장하고있던 Spark를 떼어내 클러스터로 구축하다가 Ozone FS와 호환이 안되는 문제가 풀리지 않아 구축을 반쯤 포기한 상태에 있었다.
그래도 개인적으로 구축을 다시 도전해보고자 처음부터 다시 차근차근 시도해보기로 하였다.
몇주 간의 끝없는 디버깅 지옥을 방지하기 위해 이번에는 Zeppelin을 도입하여 인프라 점검을 간단하게 수행할 수 있도록 하였다. Spark 작업을 Zeppelin을 통해 미리 실행하여 디버깅 하려는 계획을 세웠다.

0. 겪은 난항

0-1. Apache Ozone 호환 문제

Spark가 K8s 클러스터에 실행되고있는 Ozone FS에 접근하기 위해선 특정 의존성이 필요하다고 한다.
이 의존성은 Spark가 가지고있는 Hadoop 클라이언트와 호환되는 OzoneFilesystem을 지원해주는 모양이다.
공식 문서에서도 의존성 하나만 추가하면 정상적으로 연결이 되는것 같은데, 정보 없는 에러 로그가 무한히 발행했다.
Reflection 관련 문제인것 같긴 한데, 로그 트레이싱이 전혀 안돼서 디버깅을 못하고 직감적으로만 의존성 교체를 반복하여 일주일을 보냈다.
결국 당장 해결을 포기한 채 도입을 무기한 연기 한지 일주일, 한 JIRA의 이슈를 찾게 되었다.
여기서 말하길, 통상적으로 빌드된 Spark의 hadoop3 에서 사용하는 ProtobufRpcEngine 객체는 com.google.protobuf 패키지에서 오는데 ozone-hadoop3에서 사용하는 ProtobufRpcEngine 객체는 org.apache.hadoop.shaded.com.google.protobuf의 shaded 객체를 사용한다는 것이다.
이 shaded를 제거한 jar을 찾아 의존성에 새로 추가해주었더니 더이상 오류가 발생하지 않았다.
각기 다른 오픈소스를 사용할 때 의존성 체크를 해야한다는 점을 깨달았다.

0-2. 난잡한 빌드파일

위 문제는 Hadoop 2 버전에서는 발생하지 않는다. 하지만 일주일간 Spark, Hadoop, Ozone의 버전을 모조리 바꿔가며 테스트를 진행했었는데, 같은 증상을 해결할 수 없었다.
지금 예상하기로는 난잡한 classpath와 중복된 의존성들이 서로 충돌을 일으켰다 생각이 드는데,
다른 버전을 사용하여 빌드했을 때, 이전 환경과 distribution의 버전만 다른 환경이라 단정할 수 없었던, 크게 말하면 내가 도대체 어떤 테스트를 진행했는지 기억이 나지 않는 것이 아주 심각한 문제라는 것이다.
난잡한 빌드파일과 과하게 분리된 멀티 스테이징, entrypoint 임의 수정 등 당장의 문제를 해결하기 위해 관리하지 않은 파일들을 정리하고자 모든 과정을 처음부터 시작하였다.

1. Spark Standalone with Livy

베이스가 되는 차트는 다음과 같다.
Master 1개, Worker 3개, Zeppelin 1개로 이루어지는 간단한 클러스터이다.
해당 차트는 k8s.gcr.io/spark, apache/zeppelin 이미지를 기본으로 사용했지만, Ozone 의존성 추가 및 사내 애플리케이션 작업 코드를 넣어주기 위해 새로 빌드하였다.

1-1. 컨테이너 이미지

차트에 사용하기 위해 빌드한 이미지는 다음과 같다.
1.
builder Spark와 Livy distribution을 빌드해줄 빌드 컨테이너용 이미지
2.
spark Spark distribution 및 가장 기본적인 설정을 담는 이미지
3.
livy Livy distribution 및 Livy의 기본 설정을 담는 이미지
4.
livyserver livy-server를 가동할 컨테이너의 이미지
5.
zeppelin Zeppelin 노트북을 가동할 컨테이너의 이미지
5개의 이미지를 순차적으로 빌드했으며, livyserver에 필요한 Livy distribution과 zeppelin에 필요한 Scala, Spark, Livy distribution 을 멀티 스테이징으로 공유하였다.
이미지 빌드에 사용된 Dockerfile들은 다음에서 확인할 수 있다.
Livy 이미지를 빌드하기 위해 참고한 Dockerfile은 다음과 같다.

1-2. Helm 차트

Livy distribution을 포함한 Spark master, worker 컨테이너의 command는 여기서 정의되었다.
spark-worker-deployment.yaml 컨테이너의 command 에 Spark master의 경로를 직접 할당하여 Spark Worker 데몬을 실행하도록 하였다.
Helm 차트는 다음에서 확인할 수 있다.
클러스터는 다음과 같이 배포할 수 있다.
# helm install <Chart name> <path/to/chart> cd livy-zeppelin/spark-standalone/livy helm install livy .
Bash
복사

1-3. Zeppelin 내 Livy interpreter 설정

zeppelin.livy.url 설정만 수정해주면 된다.
Zeppelin의 web ui에서 interpreter 설정, livy 검색, zeppelin.livy.url 설정값을
http://<Livy Server service name>:<Livy Server service port>
로 바꾸어주면 된다.

2. Spark on K8s with Livy

Spark standalone 구성을 미리 해놓았기에 간단하게 구성할 수 있었다.
앞서 제작했던 Spark standalone 이미지에 Spark Driver, Spark Executor 컨테이너의 entrypoint만 추가해주고, Spark on K8s 구성을 위해 Spark 공식 문서에서 참고해 설정만 맞춰주면 된다.

2-1. 컨테이너 이미지

이전과 같이 빌드한 이미지는 다음과 같다. Spark Driver, Executor 컨테이너로 사용할 livy 이미지에 kubernetes-entrypoint.sh와 Driver, Executor의 podTemplate를 추가해주었다.
1.
builder Spark와 Livy distribution을 빌드해줄 빌드 컨테이너용 이미지
2.
spark Spark distribution 및 가장 기본적인 설정을 담는 이미지
3.
livy Livy distribution 및 Livy의 기본 설정을 담는 이미지
4.
livyserver livy-server를 가동할 컨테이너의 이미지
5.
zeppelin Zeppelin 노트북을 가동할 컨테이너의 이미지
이미지 빌드에 사용된 Dockerfile들은 다음에서 확인할 수 있다.

2-2. Helm 차트

Spark Master, Worker의 deployment는 필요없으니 관련된 자원들을 제거해주었다.
Helm 차트는 다음에서 확인할 수 있다.
클러스터는 다음과 같이 배포할 수 있다.
# helm install <Chart name> <path/to/chart> cd livy-zeppelin/spark-standalone/livy helm install livy .
Bash
복사
livy는 일반적으로 원격 파일 저장소(hdfs 등)에서 spark.jars 등으로 넘겨줘야 하는 작업에 필요한 의존성을 전달해주는데, 현재 Helm 차트는 nfs로 파일 공유를 통해 넘겨준다. 이는 interactive 하지 않은 Spark 작업의 특성 상 좋지 않은 구조라 생각하며, 테스트용이 아니라면 해당 설정을 수정하기를 바란다.

2-3. Zeppelin 내 Livy interpreter 설정

똑같이 zeppelin.livy.url 설정만 수정해주면 된다.
Zeppelin의 web ui에서 interpreter 설정, livy 검색, zeppelin.livy.url 설정값을
http://<Livy Server service name>:<Livy Server service port>
로 바꾸어주면 된다.
금방 구축할 줄 알았는데, 빈약한 공식문서들도 그렇고 레퍼런스가 전무하다보니 깨나 시간이 오래 걸린 과정이었다. 앞으로 작업 과정 정리 잘 하자는 다짐을 가지게 해주었다.