Search

비동기 요청-응답 패턴으로 풀어낸 발주 서비스 개발기

비동기 요청에 대한 응답을 받아 처리해야 하는 일은 굉장히 흔하다.
엑셀 업로드를 대량으로 하거나
다른 시스템에서 데이터를 끌어와 동기화를 해야 하거나
거래내역 다운로드 처럼 연산이 많이 필요한 액션이 들어가는
일이라면 위 case를 흔히 적용할 수 있다.
문제 부터 해결책까지 전체적으로 좋은 글이라 생각한다.

좋았던 내용

ReplyingKafkaTemplate 에 대해 약간 자세히 다루고 있다.
포스트에서도 언급되었지만, 개인적으로 송신-수신을 100% 보장하려면 사용하기 어려운 것 같다.
1.
요청 topic에 record를 보낸 후 응답 topic을 기다리느라 blocking이 걸리고
2.
분산 환경에서 하나의 서버인데 각기 다른 consumer group으로 인식되어야 하는 것도 그렇고
a.
(개인적으로 카프카와 브로드캐스팅은 어울리지 않는다고 생각한다
3.
메모리만으로 요청-응답을 관리하기에 배포, scaling 기타 등등 상황에서도 100%를 보장하기 어려운 것 같다.

아쉬웠던 내용

Kafka는 At-Least-Once 이기에 consumer가 중복된 record를 받을 수 있고, 현재 business flow 상 중복된 record를 다시 처리하면 안된다는 것 까지는 좋다. 그리고 중복제거를 위해 DB unique를 사용하는 것도 좋다.
하지만, 굳이 이력 적재 라는 개념을 consumer에 완전히 옮길 필요가 있나 싶다.
나라면 아래와 같은 방법을 사용했을 것이다.
1.
사용자의 요청 (= 이력) 을 첫 API 에서 생성하고 Kafka에 PK를 함께 보낸다
class UserOrderingRequest( val id: Long // PK val state: UserOrderingRequest, ) enum class UserOrderingRequestState { INIT, SUCCESS, FAILED; }
Kotlin
복사
2.
consumer 쪽에서는 PK를 받아 DB를 조회, state를 확인해 INIT인 상태에만 후속 로직을 태우도록 한다.
consumer 에 record가 중복되어 올 수는 있어도, spring-kafakKakfaListener 를 사용하는데 동일한 record가 한 Listener에서 함께 실행될 일은 없으므로 이정도로도 충분하다… 그런데 만약 여기서 더 엄밀하게 하고 싶다면, UserOrderingRequest 를 조회할 때 비관적 락을 잡거나, redis 분산락을 잡아도 충분할 것 같다.
이력 적재 라는 개념을 cosumer에 옮기면서 다음과 같은 아쉬움이 보인다. (매우 강조하지만, 내부 사정을 전혀 모르는 외부인 입장에서 서술한 것이라 예상한 내용과 전혀 다를 수 있다)
OliveOne / BrandOne은 <신규 발주 서비스>와 별개의 서비스로 보인다. 그런데 이 서비스가 신규 발주 서비스의 관심사인 “발주 완료 Reply Topic” 에 직접적으로 의존해야 한다.
만약 OliveOne, BrandOne에 Kafka 구성이 존재하지 않았다면 이 기능 하나를 위해 Kafka 구성을 붙여야 한다. 또한 Reply Topic이 zero-payload 형태가 아니라면 record 스키마가 변경될 때마다 발주 시스템이 아닌 외부 시스템에 영향이 퍼지게 된다.
응답을 수신한다는 것은, OliveOne과 BrandOne이 ‘사용자 발주 요청’ 에 대한 원장을 각각 관리해야 하는 것처럼 보인다. 이렇게 되면 ‘발주 플랫폼’을 온전히 사용한다기 보다는 OliveOne과 BrandOne이 각각 사용자 요청을 관리하고 발주 기능만 잠시 발주 서비스의 로직을 태우는 느낌이 강해진다. 새로운 endpoint가 붙으면 챙겨야 할 것이 늘어나는 셈이다.
이런 구성은 아래 요구사항을 구현할 때 조금 더 아쉬움이 드러난다.  유저 사용성을 위해 타임아웃 정책을 만들어야 한다고 생각해보자.
유저가 비동기 요청을 보냈는데, 계속해서 기다리는 것 보다는 모종의 문제가 생겼음을 알려주고 재시도를 하게끔 하는 것이 좋다는 배경이다.
모종의 문제라 함은 Topic이 밀리는 경우일 수도 있고 (이 경우는 사실 재시도를 해도 어차피 밀린다.. ) AWS MSK가 메인터넌스에 들어갔거나 간헐적 Network I/O 문제로 record가 kafka에 전달되지 않았을 수도 있다. (이 경우는 재시도를 유저로 하여금 해야 한다)
이 요구사항을 최종 형태에서 가장 깔끔하게 구현하려면…
1.
각 Endpoint 인 OliveOne과 BrandOne이 관리하는 원장에 대해 각각 타임아웃 정책을 구현해야 한다.
단순히 발주 처리 Consumer에서 kafak record 기록 시각을 확인해 record를 무시하도록 하고 reploy topic에 타임아웃 실패 record를 보내는 것만으로는 consumer에 record가 도달하지 않을 수도 있기에 요구사항을 100% 만족하지 못한다.
결국 <신규 발주 서비스>를 사용하지만, 발주 기능과 관련된 새로운 요구사항에 대해 구현이 퍼지는 셈이다.
만약 위에서 언급한 초기 구현 + UserOrderingRequest 을 사용했다면…
class UserOrderingRequest( val id: Long // PK val state: UserOrderingRequest, val timeoutAt: LocalDateTime, // 신규 필드 추가 ) enum class UserOrderingRequestState { INIT, SUCCESS, FAILED, // DB에 저장되는 값 TIMEOUT; // DB에 들어갈 수도 있고 들어가지 않을 수도 있다 }
Kotlin
복사
timeoutAt 이라는 필드를 하나 추가한 다음
1.
Endpoint 가 사용하는 이력 조회 API 에서는 dyanmic 하게 현재와 timeoutAt을 비교 → TIMEOUT
2.
Consumer 에서는 어차피 state 를 확인하고 있었던 것에 더해 timeoutAt 을 추가로 확인해 record를 무시할지 판단하는 로직을 추가
하기만 하면 위 요구사항을 clean하게 대응할 수 있게 된다.
Endpoint 들은 TIMEOUT에 대한 유저 노출 (FE 영역) 을 어떻게 할지에 따라 FAILED로 똑같이 보여 줄지, TIMEOUT 이라는 신규 추가 값을 대응할지만 선택하면 된다. 그들이 각각 타임아웃 정책을 구현할 필요는 없는 것이다.

추가적인 의견

개인적으로 최근에 비슷한 요구사항을 구현한 적이 있다.
위 예시가 가장 간단한 형태의 비동기 요청-응답 패턴이었다면,
1.
비동기 요청 응답이 세 시스템 사이에 일어나야 하고
2.
요청 : 응답 = 1 : N 으로 분화되는 (하나의 excel 요청을 row 별로 처리하는 스타일 - 일부라도 해주자 관점)
케이스였다.
언젠가 기회가 된다면.. 이런 패턴을 어떻게 풀어낼 수 있을지… 가이드를 작성해봐도 좋을 것 같다.