728x90
반응형
Kafka Consumer Group을 재연결하면 연결이 끊긴 이후에 쌓였던 메시지도 수신을 하게 됩니다.
테스트 코드를 사용할때 이전의 메시지를 무시하고 재연결 이후에 신규로 토픽에 쌓이는 메시지를 수신하기 위해서는 아래와 같이 작성하면 됩니다.
auto.offset.reset : latest(default 값) 설정
latest 설정일 경우 가장 마지막 offset 부터 수신하게 됩니다. 연결이 끊긴 이후에 재연결되면 offset은 연결이 끊기기전에 마지막으로 읽은 메시지를 가리키기 때문에 재연결되기 전까지의 쌓였던 메시지를 수신하기 때문에 추가로 재연결 시 offset을 강제로 최신 메시지를 가리키게 바꿔줘야 합니다.
consumer.subscribe(topicList, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 파티션 할당 해제 시점 처리 (필요하면 커밋 등)
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 재할당 시점에 각 파티션의 최신 offset으로 이동
consumer.seekToEnd(partitions);
}
});
consumer 구독 시 seekToEnd 함수를 호출해 offset을 최신 메시지를 가리키게 하면 재연결 이후 토픽에 쌓인 메시지부터 가져오게 됩니다.
728x90
반응형
'개발 > spring, spring boot' 카테고리의 다른 글
Spring Security와 Keycloak을 활용한 OAuth2 인증·인가 구현 (0) | 2025.02.05 |
---|---|
@Bean 객체 이름 설정 (0) | 2024.11.19 |
Spring Boot: Configuration Class 오류 해결 방법 - I/O Failure (2) | 2024.09.13 |
Kotlin을 사용한 sitemap.xml 자동 생성 방법 (Spring Boot) (1) | 2024.09.05 |
[Java] [Gradle] Your build is currently configured to use Java 21 and Gradle 7.6.1. 에러 수정 (0) | 2024.07.17 |