환경/Kafka

Producer 옵션 정리

yesman9 2023. 12. 18. 16:50

Prodducer에서 메시지 발행시 Timeout 관련된 옵션을 찾아보다가 아래와 같이 정리해놓은 글을 발견했다.

https://devidea.tistory.com/90

 

[Kafka] Producer config 정리

이번 글에서는 카프카 Producer(이하 프로듀서)의 주요 설정 값이 프로듀서의 아키텍처에서 어떤 역할을 하는지 정리한다. 카프카 문서에서는 각 설정값이 설명으로만 나열되어 있어서 이해하기

devidea.tistory.com

 

코드를 작성하면서 위 내용을 토대로 주석을 달아놨다.

Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("max.block.ms", blockTimeout); // connection time의 개념. 프로듀서의 블로킹 호출에 대한 제한 시간. 
props.put("delivery.timeout.ms", deliveryTimeout); // read timeout의 개념. 브로커로부터의 ACK를 기다리는 시간. request.timeout.ms과 linger.ms의 합보다 크거나 같아야한다
props.put("linger.ms", linger); // Producer가 레코드를 배치(batch)로 전송하기 전에 대기하는 시간. 해당 시간동안 메시지를 쌓았다가 한 번에 전송한다. send()함수를 사용하면 batch작업 전에 전송한다
props.put("request.timeout.ms", requestTimeout); // 브로커로부터 응답을 기다리는 최대 시간. 실패시 retries만큼 재요청함. delivery.timeout.ms가 초과되면 멈춘다
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Gson gson = new Gson();
String json = gson.toJson(commonLog);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, json);

producer = new KafkaProducer<String, String>(props);
producer.send(record, new Callback() { // kafka 콜백함수
    // 메시지 전송이 완료되면 호출. 메시지 전송이 성공한 경우 metadata 인자가 전달되고, 실패한 경우 exception 인자가 전달됨
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) { // kafka 에러 처리
            System.out.println("KAFKA ERROR");
            e.printStackTrace();
        } else {
            System.out.printf("Partition: %d, Offset: %d\n", metadata.partition(), metadata.offset());
        }
    }
});