Kafka를 통해 받은 메시지는 기본적으로 Bytes array 형태이다.
이를 JSON으로 재구조화 하기 위해선 String으로 형변환이 먼저 일어나야 한다.
binary → String → JSON의 과정으로 형변환이 이루어지는데, 다음과 같이 형변환을 할 수 있다.
먼저, Kafka source를 통해 실시간으로 Dataframe을 가져오기 위해선, 다음과 같이 Kafka와 연동할 수 있다. SparkSession은 Kafka consumer가 된다.
streaming_df = session \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
.option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
.option("failOnDataLoss","False") \
.option("subscribe", topic) \
.load() \
streaming_df.printSchema()
#Result
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Python
복사
binary 형태의 key와 value를 문자열로 형변환 하기 위해 Dataframe의 column을 cast 한다.
streaming_df = session \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
.option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
.option("failOnDataLoss","False") \
.option("subscribe", topic) \
.load() \
.withColumn("key", col("key").cast("string")) \
.withColumn("value", col("value").cast("string"))
streaming_df.printSchema()
#Result
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Python
복사
문자열인 value를 다시 JSON으로 파싱하기 위해선 스키마를 명시해주어야 한다.
from pyspark.sql.types import *
schema = StructType(
[
StructField("serviceToken", StringType()),
StructField("clientId", LongType()),
StructField("sessionId", StringType()),
StructField("event", StringType()),
StructField("targetId", StringType()),
StructField("positionX", IntegerType()),
StructField("positionY", IntegerType()),
StructField("location", StringType()),
StructField("timestamp", LongType())
]
)
Python
복사
정의한 스키마를 통해 from_json() 메소드로 문자열을 구조화 한다.
streaming_df = session \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
.option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
.option("failOnDataLoss","False") \
.option("subscribe", topic) \
.load() \
.withColumn("key", col("key").cast("string")) \
.withColumn("value", from_json(col("value").cast("string"), schema))
streaming_df.printSchema()
#Result
root
|-- key: string (nullable = true)
|-- value
| |--serviceToken string (nullable = true)
| |--clientId string (nullable = true)
| |-- ...
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Python
복사
More…
Schema Regisrty를 사용하면 더욱 복잡한 플랫폼 위에서 스키마를 잘 명시할 수 있고
JSON보다 직렬화 성능이 좋은 Avro 등을 사용할 수도 있다.