꾸물꾸물 졔의 개발공부

Reactive Kafka - Receiver , Springboot, MongoDB 본문

Database/Kafka

Reactive Kafka - Receiver , Springboot, MongoDB

체제 2022. 11. 24. 09:32
 

Reactive Kafka - Sender , Springboot

Springboot에서 Reactive Kafka API를 구현하였다. Reactive Kafka 에서 메시지를 생성하고 소비하기 위해서는 2가지의 인터페이스를 사용한다. kafka 에 메시지 생성 : reactor.kafka.sender.KafkaSender kafka 의 메시지

jiko1456.tistory.com

Springboot에서 Reactive Kafka API를 구현하였다.

 

Reactive Kafka 에서 메시지를 생성하고 소비하기 위해서는 2가지의 인터페이스를 사용한다.

  • kafka 에 메시지 생성 : reactor.kafka.sender.KafkaSender
  • kafka 의 메시지 소비 : reactor.kafka.receiver.KafkaReceiver

 

Reactive Kafka Receiver


KafkaReceiver 를 사용하여 kafka 의 메시지를 컨슘할 수 있다. 

KafkaReceiver 인스턴스는 단일 KafkaConsumer 와 연결된다. 

 

Receiver도 Sender와 마찬가지로 ReceiverOption을 지정할 수 있다 .

 

 ReceiverOptions 설정 

private Map<String, Object> getConsumerProps(){
    return new HashMap<String, Object>(){{
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "52.79.215.19:8892");
        put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
    }};
}

서버주소와, consumer group 을 위한 group.id , 역직렬화 클래스 설정을 한다. 

 

 

 KafkaReceiver 인스턴스 생성 

@Bean("kafkaReceiver")
public KafkaReceiver<Integer, String> kafkaReceiver() throws Exception{
    ReceiverOptions<Integer, String> receiverOptions =
            ReceiverOptions.<Integer,String>create(getConsumerProps())
                    .subscription(Collections.singleton("chats"));
    return KafkaReceiver.create(receiverOptions);
}

chats Topic의 메시지를 구독한다. 

receiverOptions 옵션을 가지고 KafkaReceiver 인스턴스를 생성하여 빈으로 등록한다.

 

 

 Kafka Receiver 시작 

public KafkaMessageReceiver(List<KafkaReceiver<Integer,String>> kafkaReceivers){
    for(KafkaReceiver<Integer,String> receiver : kafkaReceivers){
        this.start(receiver);
    }
}

KafkaMessageReceiver.java 의 생성자로, KafkaMessageReceiver 가 생성 될 때, 빈으로 등록해두었던 KafkaReceiver 가 시작하도록 start() 설정 

 

 

 KafkaReceiver Receive 

public void start(KafkaReceiver<Integer,String> receiver){
    receiver.receive().subscribe(record ->{
        JSONParser jsonParser = new JSONParser();
        try {
            // 객체에 담기 ( mongodb로 보낼 )
            JSONObject jsonObject = (JSONObject) jsonParser.parse(record.value());
            String userid = jsonObject.get("userid").toString();
            String content= jsonObject.get("content").toString();
            String send_at= jsonObject.get("send_at").toString();
                
            Chats chats=  Chats.builder().userid(userid).content(content).sendTime(send_at).build();
            chatsRepository.save(chats);

        } catch (ParseException e) {
            log.info(e.getMessage());
        }
    });
}

start 함수에서는, receiver 가 subscribe 을 통해 ReceiverOption 에 설정하였던 topic의 메시지를 구독한다. 

 

나는 추가적으로 해당 데이터를 MongoDB에 저장하는 과정까지 포함시켰다. ( 콜백 try-catch 문 ) 

MongoDB 의 docker 세팅과 설정에 대해서는 다음글에서 설명하였다. 

 

JSONObject.toString 으로 저장했던 데이터를 받아와서, Repository.save로 MongoDB 에 저장해 주었다. 

 

 

 

 

MongoDB 추가 코드 


Chats.java

/*
* kafka 에서 consume > mongodb 로 저장 */
@Data
@Document(collection = "chats")
@TypeAlias("chats")
public class Chats {
    @Id
    private String _id;
    
    private String userid;
  
    private String content;
  
    private String sendTime;


    @Builder
    public Chats(String userid, String content, String sendTime){
        this.userid = userid;
        this.content = content;
        this.sendTime = sendTime;
    }
}

 

 

ChatRespository.interface

@Repository
public interface ChatsRepository extends MongoRepository<Chats, Integer> {
}

'Database > Kafka' 카테고리의 다른 글

Reactive Kafka - Sender , Springboot  (0) 2022.11.23
Kafka(5) - Kafka Docker 사용법  (0) 2022.11.23
Kafka(4) - Kafka Docker 설치  (0) 2022.11.23
Kafka(3) - Consumer 컨슈머  (0) 2022.11.23
Kafka(2) - Producer 프로듀서  (0) 2022.11.22