Search

Spring Kafka를 사용해 Publish 할 때 주의할 점

아래 코드는 Spring kafka를 사용해 record를 publish 하는 흔한 코드이다. 이 코드의 문제점은 무엇일까?!
class AService( private val kafkaTemplate: KafkaTemplate<String, Record>, ) { fun send(record: Record) { kafkaTemplate.send(topic, key, record) } }
Kotlin
복사
카프카 record 전송이 실패할 경우를 대비하지 않았다?!! 정답이다! 그렇다면 아래 코드는 안전할까?
class AService( private val kafkaTemplate: KafkaTemplate<String, Record>, ) { fun send(record: Record) { try { kafkaTemplate.send(topic, key, record) catch (e: Exception) { // 적절한 에러 로깅, 후처리 } } }
Kotlin
복사
아쉽게도 그렇지 않다. Kafka Producer이 실제 record를 보내는 로직은 별도 스레드로 분리되어 있고, 해당 스레드의 성공/실패 여부를 send의 응답으로 반환하지 않기 때문에 record를 보내는 것에 실패해도 Exception이 catch되지 않을 수 있다. 실제로 send 함수의 return type 역시 CompletableFuture 이다.
보다 올바른 처리는 다음과 같다.
1.
blocking 처리
try { kafkaTemplate.send(record).get(10, TimeUnit.SECONDS); // 성공 } catch (ex: Exception) { // 실패 처리. ex 유형에 따라 세분화된 처리를 할 수도 있다. }
Kotlin
복사
2.
non-blocking 처리
val future = kafkaTemplate.send(record); future.whenComplete((result, ex) -> { if (ex == null) { handleSuccess(data); } else { // 실패 처리. } });
Kotlin
복사
공식 문서에서도 비슷한 가이드를 하고 있다.