꾸물꾸물 졔의 개발공부
Reactive Kafka - Sender , Springboot 본문
Springboot에서 Reactive Kafka API를 구현하였다.
Reactive Kafka 에서 메시지를 생성하고 소비하기 위해서는 2가지의 인터페이스를 사용한다.
- kafka 에 메시지 생성 : reactor.kafka.sender.KafkaSender
- kafka 의 메시지 소비 : reactor.kafka.receiver.KafkaReceiver
Reactive Kafka Sender
Kafka Sender 를 사용하여, kafka (broker) 로 메시지를 보낼 수 있다.
KafkaSender 는 하나의 KafkaProduer와 연결된다.
SenderOptions 설정
private Map<String, Object> getProducerProps() {
return new HashMap<String, Object>(){{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "공인ip:port");
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // 전송 시간 제한 : 1000ms
}};
}
카프카 서버 주소, 키-값 시리얼라이저 , 전송 시간 제한 설정
KafkaSender 인스턴스 생성
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
- reactor.kafka.sender.SenderOptions 인터페이스를 사용
옵션설정으로 SenderOptions 인스턴스 생성 후, 옵션 인스턴스로 KafkaSender 인스턴스를 생성하여 빈으로 등록하였다.
즉, SenderOptions로 필요한 sender 옵션을 정의한 후, 해당 옵션으로 KafkaSender 인스턴스를 만드는 것이다.
메시지 send
Sender 를 사용하여 Kafka 로 보낼 메시지는 SenderRecord 로 표현한다.
- SenderRecord : ( ProducerRecord : 토픽 , key-value 쌍 ) + 레코드와 전송결과 매칭하기 위한 메타데이터
- SenderRecord 의 메타데이터는 카프카로 전송되지 않고, SendResult 에 담긴다.
public void send(String msg){
Flux<SenderRecord<String, Object, Integer>> outboundFlux =
Flux.range(1,10)
.map(i -> SenderRecord.create(new ProducerRecord<>("chats", null , msg),i) );
kafkaSender.send(outboundFlux).subscribe();
}
//* doOnError , doOnNext를 사용하여 예외처리 및 결과 subscribe가능, 우선 간단하게 보내기만 하였다.
Webflux 에서 구현을 했기 때문에 , SenderRecord 의 Flux 를 생성하였다.
ProducerRecord 에 topic-key-value 순서로 넣고 SenderRecord 를 생성하여 10개씩 전송하였다.
KafkaSender 전체 코드
application.yml
spring:
kafka:
template:
default-topic: chats
bootstrap-servers: 52.79.215.19:8892
build.gradle
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.13'
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.kafka:spring-kafka-dist:2.9.0'
KafkaConfiguration.java
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
// producer 옵션
private Map<String, Object> getProducerProps() {
return new HashMap<String, Object>(){{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "52.79.215.19:8892");
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // 전송 시간 제한 : 1000ms
}};
}
KafkaService.java
private final KafkaSender<String, Object> kafkaSender;
public void send(String msg){
Flux<SenderRecord<String, Object, Integer>> outboundFlux =
Flux.range(1,10)
.map(i -> SenderRecord.create(new ProducerRecord<>("chats", null , msg),i) );
kafkaSender.send(outboundFlux).subscribe();
}
'Database > Kafka' 카테고리의 다른 글
Reactive Kafka - Receiver , Springboot, MongoDB (0) | 2022.11.24 |
---|---|
Kafka(5) - Kafka Docker 사용법 (0) | 2022.11.23 |
Kafka(4) - Kafka Docker 설치 (0) | 2022.11.23 |
Kafka(3) - Consumer 컨슈머 (0) | 2022.11.23 |
Kafka(2) - Producer 프로듀서 (0) | 2022.11.22 |