실시간의 정의는 사용자의 요구사항에 따라 그 범위가 바뀔 수 있다. 예컨데 수초에서 수십초 간 일어난 이벤트를 작은 배치에 담아 시간적 범위에 따라 처리할 수 있고, 이벤트가 일어난 후 일정 간격을 두는 지연에 따라 처리할 수 있다.
Spark Streaming은 Event-driven 아키텍쳐로 구현되어 낮은 수준의 지연시간과 일정한 응답속도를 보장하여 예측 가능한 성공을 제공한다.
Event-driven Architecture
Event
이벤트란 시스템 하드웨어 또는 소프트웨어 상태의 변화 또는 사건의 발생을 의미한다. 시스템의 다른 부분에 이벤트가 발생했음을 알리기 위해 해당 시스템에서 보내는 메시지 또는 알림을 뜻하는 이벤트 알림과는 다르다.
이벤트는 마우스 클릭이나 키보드 입력과 같은 사용자 또는 센서 출력과 같은 외부 소스에서 생성되거나 프로그램 로딩과 같이 내부 시스템에서 생성될 수 있다.
Event-driven Architecture
이벤트 기반 아키텍처는 이벤트 생성자와 이벤트 소비자로 구성되어 있다.
이벤트 생성자
이벤트 생성자는 이벤트를 감지하고 메시지로 해당 이벤트를 나타낸다. 생성자는 이벤트 소비자 또는 이벤트 결과를 알지 못한다.
이벤트 처리 로직 또는 플랫폼
중간과정의 모듈 또는 과정 자체를 의미하며, 올바른 응답을 실행하고 적합한 소비자에게 활동을 다운스트림으로 전송한다. 이러한 다운스트림 활동은 이벤트 결과가 나타나는 위치이다.
이벤트 소비자
이벤트를 비동기식으로 처리하는 이벤트 처리 플랫폼의 이벤트 채널을 통해 해당 이벤트 생성자에서 이벤트 소비자로 전송된다. 이벤트 발생 시 이벤트 소비자는 알림을 받아야 하며, 이벤트를 처리할 수도 있고 이벤트의 영향을 받기만 할 수도 있다.
이벤트 기반 아키텍처는 게시/구독(pub/sub) 모델 또는 이벤트 스트림 모델을 기반으로 구성된다.
Publisher, Subscriber model
이벤트 스트림 구독 기반의 메시징 인프라이다. 이 모델을 사용하면 이벤트 발생 후 또는 게시 후에 알림을 받아야 하는 구독자에게 이벤트가 전송된다.
Event Stream model
이벤트 스트림 모델을 사용하면 이벤트가 로그에 기록된다. 이벤트 소비자는 이벤트 스트림을 구독하진 않지만 스트림의 모든 부분에서 읽기가 가능하며 언제든지 스트림에 참여할 수 있다.
이벤트 스트림에는 세 가지의 유형이 있다.
•
이벤트 스트림 처리는 Apache Kafka와 같은 데이터 스트리밍 플랫폼을 사용하여 이벤트를 수집하고 이벤트 스트림을 처리하거나 변환한다. 이벤트 스트림 처리는 이벤트 스트림에서 의미 있는 패턴을 감지하는데 사용할 수 있다.
•
단순 이벤트 처리는 이벤트가 이벤트 소비자에게 즉각적으로 동작을 트리거한다.
•
복합 이벤트 처리는 이벤트 소비자가 패턴을 감지하기 위해 일련의 이벤트를 처리한다.
Batch Processing and Stream Processing
배치 처리 | 스트리밍 처리 | |
데이터 범위 | 데이터 세트의 모든 또는 대부분의 데이터를 쿼리하거나 처리 | 롤링타임 윈도우 내 데이터 또는 가장 최신 데이터 레코드의 데이터를 쿼리하거나 처리 |
데이터 크기 | 대규모 데이터 배치 | 일부 레코드로 구성된 마이크로 배치 또는 개별 레코드 |
성능 | 몇 분에서 몇 시간의 지연시간 | 몇 초 또는 몇 밀리초의 지연 시간이 필요 |
분석 | 복잡한 분석 | 간단한 응답 가능, 수집 및 롤링 지표 |
DStream
DStream(DIscretized Stream; 이산 스트림)은 Spark Streaming에서의 데이터셋 추상화 객체이다. Spark의 기본 데이터셋 투상화 객체인 RDD를 지정한 시간간격을 기준으로 나누어 집합으로 구성한 객체이다.
Spark Streaming은 Apache Kafka, Apache Flume, HDFS, Kinesis, TCD 소켓 등 다양한 소스로부터 데이터를 공급받고, 데이터들은 map, reduce, window 등의 연산을 통해 가공되어 DB, FS 등에 적재하거나 대쉬보드를 통해 시각화 된다.
연속된 스트림 데이터의 분산처리를 위해 정의된 시간 간격 동안 이벤트 데이터를 작은 배치 단위로 나누는데, DStream이라는 추상 개념의 단위로 나누어지게 된다. 이벤트 일괄 처리를 우선 이산적으로 나누어진 데이터는 Spark Engine을 통해 처리된다.
DStream 내의 각 Micro-batch는 일괄 처리 간격 동안 수집된 이벤트를 나타내며, RDD 개념을 통해 구현된다. Spark Engine은 이렇게 각 Micro-batch 단위의 세그먼트 데이터를 처리한다.
Micro-batch를 sliding-window 방식으로 특정 시간 범위 내의 window를 이용하여 처리할 수도 있다. Spark Streaming Window Operation은 window length와 sliding interval을 조정하여 Windowed DStream 구조를 사용한다.
하지만 DStream은 Python, Java, Scala, R의 객체와 함수에 의존적이고, Micro-batch 방식으로만 작동하여 연속적인 처리가 불가능하고, 이벤트 발생 시간 기준의 처리를 지원하지 못한다.
Spark Structured Streaming
Spark 2.0 이후로 추가된 Spark SQL Engine 상에 구축된 실시간 처리 프레임워크이다.
Spark Streaming 응용 프로그램
Apache Kafka, Apache Flume, HDFS, Kinesis, TCP 소켓 등 다양한 소스들 중 TCP 소켓을 통해 프로그램을 작성해본다. Scala를 사용하여 작성한 코드는 다음과 같다.
import org.apache.spark._
import org.apache.spark.streaming._
//1. StreamingContext 만들기
// SparkContext에서 StreamingContext를 생성한다.
// StreamingContext를 만들 때 일괄 처리의 interval을 second 단위로 지정한다.
// 두개의 local working thread, 1초의 batch interval 설정값을 지정하였다.
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountScala")
val ssc = new StreamingContext(conf, Seconds(1))
//2. StreamingContext에서 DStream 만들기
// 입력 소스에 대한 입력 DStream을 생성한다.
// localhost:9999에 연결된 DStream을 생성하였다.
val lines = ssc.socketTextStream("localhost", 9999)
//3. DStream에 Transformation 적용하기
// 다음과 같은 순서의 변환을 거친다.
// 1) 파일에서 한 번에 한 줄의 텍스트 수신
// 2) 각 줄을 단어로 분할
// 3) map, reduce 연산자 사용
// 4) 각 단어가 나타나는 횟수 계산
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_+_)
//4. 결과 출력하기
// 출력 작업을 적용하여 대상 시스템에 변환 결과를 푸시한다.
// 콘솔 출력에서 계산을 통해 각 실행결과를 표시한다.
wordCounts.print()
//5. 응용 프로그램 시작 및 종료 신호를 수신한다.
ssc.start()
ssc.awaitTermination()
Scala
복사
Java를 사용하여 작성한 코드는 다음과 같다.
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.stream.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountJava");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.Seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print()
jssc.start()
jssc.awaitTermination()
Java
복사
Python를 사용하여 작성한 코드는 다음과 같다.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "StreamingWordCountPython")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
Python
복사