250x250
syk531
하루
syk531
전체 방문자
오늘
어제
  • 분류 전체보기 (166)
    • 개발 (166)
      • java (11)
      • kotlin (7)
      • spring, spring boot (35)
      • Javascript (4)
      • Tyhmeleaf (2)
      • Kafka (17)
      • Docker (8)
      • Kubernetes (3)
      • Elastic Stack (4)
      • react native (3)
      • Web (4)
      • GIS (3)
      • 리눅스 (16)
      • Windows (2)
      • 네트워크 (2)
      • 안드로이드앱 (5)
      • git (2)
      • Tool (15)
      • 프로젝트 (7)
      • 백준알고리즘 (14)
      • DB (2)

인기 글

최근 글

블로그 메뉴

    공지사항

    태그

    • 오블완
    • 뉴스앱
    • 티스토리챌린지

    최근 댓글

    티스토리

    hELLO · Designed By 정상우.
    syk531

    하루

    개발/spring, spring boot

    [Kafka] Consumer Group 재연결 시 이전 메시지 무시하는 방법

    2025. 2. 6. 09:32
    728x90
    반응형

    Kafka Consumer Group을 재연결하면 연결이 끊긴 이후에 쌓였던 메시지도 수신을 하게 됩니다.

    테스트 코드를 사용할때 이전의 메시지를 무시하고 재연결 이후에 신규로 토픽에 쌓이는 메시지를 수신하기 위해서는 아래와 같이 작성하면 됩니다.

    auto.offset.reset : latest(default 값) 설정

    latest 설정일 경우 가장 마지막 offset 부터 수신하게 됩니다. 연결이 끊긴 이후에 재연결되면 offset은 연결이 끊기기전에 마지막으로 읽은 메시지를 가리키기 때문에 재연결되기 전까지의 쌓였던 메시지를 수신하기 때문에 추가로 재연결 시 offset을 강제로 최신 메시지를 가리키게 바꿔줘야 합니다. 

    consumer.subscribe(topicList, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // 파티션 할당 해제 시점 처리 (필요하면 커밋 등)
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // 재할당 시점에 각 파티션의 최신 offset으로 이동
            consumer.seekToEnd(partitions);
        }
    });

    consumer 구독 시 seekToEnd 함수를 호출해 offset을 최신 메시지를 가리키게 하면 재연결 이후 토픽에 쌓인 메시지부터 가져오게 됩니다.

    728x90
    반응형
    저작자표시 (새창열림)

    '개발 > spring, spring boot' 카테고리의 다른 글

    Spring Security와 Keycloak을 활용한 OAuth2 인증·인가 구현  (0) 2025.02.05
    @Bean 객체 이름 설정  (0) 2024.11.19
    Spring Boot: Configuration Class 오류 해결 방법 - I/O Failure  (2) 2024.09.13
    Kotlin을 사용한 sitemap.xml 자동 생성 방법 (Spring Boot)  (1) 2024.09.05
    [Java] [Gradle] Your build is currently configured to use Java 21 and Gradle 7.6.1. 에러 수정  (0) 2024.07.17
      '개발/spring, spring boot' 카테고리의 다른 글
      • Spring Security와 Keycloak을 활용한 OAuth2 인증·인가 구현
      • @Bean 객체 이름 설정
      • Spring Boot: Configuration Class 오류 해결 방법 - I/O Failure
      • Kotlin을 사용한 sitemap.xml 자동 생성 방법 (Spring Boot)
      syk531
      syk531
      기억을 위해 기록을.

      티스토리툴바