아래 코드는 Spring kafka를 사용해 record를 publish 하는 흔한 코드이다. 이 코드의 문제점은 무엇일까?!
class AService(
private val kafkaTemplate: KafkaTemplate<String, Record>,
) {
fun send(record: Record) {
kafkaTemplate.send(topic, key, record)
}
}
Kotlin
복사
카프카 record 전송이 실패할 경우를 대비하지 않았다?!! 정답이다! 그렇다면 아래 코드는 안전할까?
class AService(
private val kafkaTemplate: KafkaTemplate<String, Record>,
) {
fun send(record: Record) {
try {
kafkaTemplate.send(topic, key, record)
catch (e: Exception) {
// 적절한 에러 로깅, 후처리
}
}
}
Kotlin
복사
아쉽게도 그렇지 않다. Kafka Producer이 실제 record를 보내는 로직은 별도 스레드로 분리되어 있고, 해당 스레드의 성공/실패 여부를 send의 응답으로 반환하지 않기 때문에 record를 보내는 것에 실패해도 Exception이 catch되지 않을 수 있다. 실제로 send 함수의 return type 역시 CompletableFuture 이다.
보다 올바른 처리는 다음과 같다.
1.
blocking 처리
try {
kafkaTemplate.send(record).get(10, TimeUnit.SECONDS);
// 성공
} catch (ex: Exception) {
// 실패 처리. ex 유형에 따라 세분화된 처리를 할 수도 있다.
}
Kotlin
복사
2.
non-blocking 처리
val future = kafkaTemplate.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
} else {
// 실패 처리.
}
});
Kotlin
복사
공식 문서에서도 비슷한 가이드를 하고 있다.