Search

Spark Sql - Kafka의 binary 형태인 value 필드 JSON으로 cast하기

Kafka를 통해 받은 메시지는 기본적으로 Bytes array 형태이다.
이를 JSON으로 재구조화 하기 위해선 String으로 형변환이 먼저 일어나야 한다.
binary → String → JSON의 과정으로 형변환이 이루어지는데, 다음과 같이 형변환을 할 수 있다.
먼저, Kafka source를 통해 실시간으로 Dataframe을 가져오기 위해선, 다음과 같이 Kafka와 연동할 수 있다. SparkSession은 Kafka consumer가 된다.
streaming_df = session \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \ .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \ .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \ .option("failOnDataLoss","False") \ .option("subscribe", topic) \ .load() \ streaming_df.printSchema() #Result root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)
Python
복사
binary 형태의 key와 value를 문자열로 형변환 하기 위해 Dataframe의 column을 cast 한다.
streaming_df = session \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \ .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \ .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \ .option("failOnDataLoss","False") \ .option("subscribe", topic) \ .load() \ .withColumn("key", col("key").cast("string")) \ .withColumn("value", col("value").cast("string")) streaming_df.printSchema() #Result root |-- key: string (nullable = true) |-- value: string (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)
Python
복사
문자열인 value를 다시 JSON으로 파싱하기 위해선 스키마를 명시해주어야 한다.
from pyspark.sql.types import * schema = StructType( [ StructField("serviceToken", StringType()), StructField("clientId", LongType()), StructField("sessionId", StringType()), StructField("event", StringType()), StructField("targetId", StringType()), StructField("positionX", IntegerType()), StructField("positionY", IntegerType()), StructField("location", StringType()), StructField("timestamp", LongType()) ] )
Python
복사
정의한 스키마를 통해 from_json() 메소드로 문자열을 구조화 한다.
streaming_df = session \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \ .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \ .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \ .option("failOnDataLoss","False") \ .option("subscribe", topic) \ .load() \ .withColumn("key", col("key").cast("string")) \ .withColumn("value", from_json(col("value").cast("string"), schema)) streaming_df.printSchema() #Result root |-- key: string (nullable = true) |-- value | |--serviceToken string (nullable = true) | |--clientId string (nullable = true) | |-- ... |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)
Python
복사
More…
Schema Regisrty를 사용하면 더욱 복잡한 플랫폼 위에서 스키마를 잘 명시할 수 있고
JSON보다 직렬화 성능이 좋은 Avro 등을 사용할 수도 있다.