꾸물꾸물 졔의 개발공부
Reactive Kafka - Receiver , Springboot, MongoDB 본문
Reactive Kafka - Sender , Springboot
Springboot에서 Reactive Kafka API를 구현하였다. Reactive Kafka 에서 메시지를 생성하고 소비하기 위해서는 2가지의 인터페이스를 사용한다. kafka 에 메시지 생성 : reactor.kafka.sender.KafkaSender kafka 의 메시지
jiko1456.tistory.com
Springboot에서 Reactive Kafka API를 구현하였다.
Reactive Kafka 에서 메시지를 생성하고 소비하기 위해서는 2가지의 인터페이스를 사용한다.
- kafka 에 메시지 생성 : reactor.kafka.sender.KafkaSender
- kafka 의 메시지 소비 : reactor.kafka.receiver.KafkaReceiver
Reactive Kafka Receiver
KafkaReceiver 를 사용하여 kafka 의 메시지를 컨슘할 수 있다.
KafkaReceiver 인스턴스는 단일 KafkaConsumer 와 연결된다.
Receiver도 Sender와 마찬가지로 ReceiverOption을 지정할 수 있다 .
ReceiverOptions 설정
private Map<String, Object> getConsumerProps(){
return new HashMap<String, Object>(){{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "52.79.215.19:8892");
put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
}};
}
서버주소와, consumer group 을 위한 group.id , 역직렬화 클래스 설정을 한다.
KafkaReceiver 인스턴스 생성
@Bean("kafkaReceiver")
public KafkaReceiver<Integer, String> kafkaReceiver() throws Exception{
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer,String>create(getConsumerProps())
.subscription(Collections.singleton("chats"));
return KafkaReceiver.create(receiverOptions);
}
chats Topic의 메시지를 구독한다.
receiverOptions 옵션을 가지고 KafkaReceiver 인스턴스를 생성하여 빈으로 등록한다.
Kafka Receiver 시작
public KafkaMessageReceiver(List<KafkaReceiver<Integer,String>> kafkaReceivers){
for(KafkaReceiver<Integer,String> receiver : kafkaReceivers){
this.start(receiver);
}
}
KafkaMessageReceiver.java 의 생성자로, KafkaMessageReceiver 가 생성 될 때, 빈으로 등록해두었던 KafkaReceiver 가 시작하도록 start() 설정
KafkaReceiver Receive
public void start(KafkaReceiver<Integer,String> receiver){
receiver.receive().subscribe(record ->{
JSONParser jsonParser = new JSONParser();
try {
// 객체에 담기 ( mongodb로 보낼 )
JSONObject jsonObject = (JSONObject) jsonParser.parse(record.value());
String userid = jsonObject.get("userid").toString();
String content= jsonObject.get("content").toString();
String send_at= jsonObject.get("send_at").toString();
Chats chats= Chats.builder().userid(userid).content(content).sendTime(send_at).build();
chatsRepository.save(chats);
} catch (ParseException e) {
log.info(e.getMessage());
}
});
}
start 함수에서는, receiver 가 subscribe 을 통해 ReceiverOption 에 설정하였던 topic의 메시지를 구독한다.
나는 추가적으로 해당 데이터를 MongoDB에 저장하는 과정까지 포함시켰다. ( 콜백 try-catch 문 )
MongoDB 의 docker 세팅과 설정에 대해서는 다음글에서 설명하였다.
JSONObject.toString 으로 저장했던 데이터를 받아와서, Repository.save로 MongoDB 에 저장해 주었다.
MongoDB 추가 코드
Chats.java
/*
* kafka 에서 consume > mongodb 로 저장 */
@Data
@Document(collection = "chats")
@TypeAlias("chats")
public class Chats {
@Id
private String _id;
private String userid;
private String content;
private String sendTime;
@Builder
public Chats(String userid, String content, String sendTime){
this.userid = userid;
this.content = content;
this.sendTime = sendTime;
}
}
ChatRespository.interface
@Repository
public interface ChatsRepository extends MongoRepository<Chats, Integer> {
}
'Database > Kafka' 카테고리의 다른 글
Reactive Kafka - Sender , Springboot (0) | 2022.11.23 |
---|---|
Kafka(5) - Kafka Docker 사용법 (0) | 2022.11.23 |
Kafka(4) - Kafka Docker 설치 (0) | 2022.11.23 |
Kafka(3) - Consumer 컨슈머 (0) | 2022.11.23 |
Kafka(2) - Producer 프로듀서 (0) | 2022.11.22 |