Kafka template
스프링이 제공하는 카프카 관련 편의기능이다. Kafka topics에 간편하게 데이터를 전송할 수 있도록 많은 메소드를 담고있다.
Kafka template는 스프링의 다른 기능들(DI, IoC 등)을 적극적으로 사용할 수 있게 만들어주며, Kafka Producer를 감싸는 wrapper를 제공한다.
Kafka Template의 인스턴스를 생성하려면, ProducerFactory를 구축해야 한다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.properties.bootstrap.servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<Integer, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
//Kafka Producer의 Configuration
//application.properties에서도 설정이 가능하다.
configProps.put(
"sasl.mechanism", "PLAIN");
configProps.put(
"bootstrap.servers", "pkc-gq2xn.asia-northeast3.gcp.confluent.cloud:9092");
configProps.put(
"sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule" +
" required username='XXXXXXXXXXXXXX' " +
" password='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
configProps.put(
"security.protocol", "SASL_SSL");
// Serializer: 객체를 바이트 단위의 스트림으로 변환시키는 기능을 한다.
// 데이터의 조건에 따라 수정해주어야 한다.
configProps.put(
"key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
configProps.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new DefaultKafkaProducerFactory<>(configProps);
}
// 리플렉션 과정에서 IoC에 등록된다.
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Java
복사
Kafka Producer에 대한 설정들을 담고있다.
이 ProducerFactory는 하나의 토픽에 대응된다.
Spring boot에서는 property file로 자체적으로 생성할 수 있는 기능을 제공해준다.
Bean으로 등록된 configuration은 KafkaTemplate을 만드는데 사용된다.
Kafka는 기본적으로 비동기적 수행을 하는지라, KafkaTemplate의 메시지를 보내는 send() 메소드는 기본적으로 future 객체를 반환한다. 이에 따라 Spring boot는 콜백과 같이 future를 다루는 몇가지 기능들을 제공한다.
//template의 send 메소드 인터페이스
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> producerRecord);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Java
복사
KafkaTemplate로 Hello world의 메시지를 전송하기 위해, 메시지를 생성해주는 javafaker 라이브러리를 사용하였다.
//build.gradle dependencies
implementation("com.gitthub.javafaker:javafaker:1.0.2")
Plain Text
복사
스프링 애플리케이션이 시작되는 이벤트로 인해 이벤트 스트리밍이 시작되게 하였다.
@EventListener(ApplicationStartedEvent.class)
public void generate() {
faker = Faker.instance();
//정해진 매 시간마다 메시지를 발생시키는 Kafka Publisher의 flux 객체
//flux의 시간 간격
final Flux<Long> interval = Flux.interval(Duration.ofMillis(1_000));
//flux의 메시지
final Flux<String> quotesFlux = Flux.fromStream(Stream.generate(
() -> faker.hobbit().quote() //호빗 대사 생성
));
//두 Flux를 하나로로 합치기
Flux.zip(interval, quotesFlux)
.map(it -> kafkaTemplate.send("hobbit", faker.random().nextInt(23), it.getT2()))
.blockLast();
}
Java
복사
Confluent SaaS 대쉬보드에 들어오는 메시지들