빨간색코딩

Kafka 기본 개념 (토픽, 파티션, 성능, 고가용성, 프로듀서, 컨슈머) 본문

database

Kafka 기본 개념 (토픽, 파티션, 성능, 고가용성, 프로듀서, 컨슈머)

빨간색소년 2021. 7. 11. 16:59

1. 개요

  • 링크드인에서 2011년 개발
  • Apache 프로젝트 오픈소스
  • Message Queue
  • 분산 메세징 시스템, 확장성, 고가용성
  • 메세지 전달의 중앙 플랫폼으로 두고 필요한 모든 데이터 시스템과 연결된 파이프라인을 만드는 것을 지향함
  • 이벤트 스트리밍
  • jdk 1.8 이상

1-1. 기존 MQ(RabbitMQ 등)와 차이점

  • 대용량 실시간 로그 처리에 특화
  • 분산 및 복제가 쉬움
  • AMQP(Advanced Message Queuing Protocol) 프로토콜이나 JMS API를 사용하지 않고 단순한 메시지 헤더를 지닌 TCP기반의 프로토콜을 사용
  • 메세지를 파일시스템에 저장 (default 7일)
    • 영속성 보장 : 다른 MQ들은 consumer가 메세지를 읽어가면 큐에서 메세지를 삭제하는데, 카프카는 보관주기 동안은 디스크에 저장함
  • consumer가 broker로부터 직접 메세지를 가져감 (pull)
  • 배치 처리 : IO가 자주 일어나는 걸 방지하기 위해, 카프카는 크기가 작은 IO를 그룹핑해서 처리할 수 있도록 배치작업 가능

2. kafka 의 큰 구조

  •  
  • kafka 클러스터 : 메세지(=이벤트)들을 저장
    • broker(=kafka server)를 여러개 뛰울 수 있음
  • zookeeper 클러스터 : kafka 클러스터들을 관리, 노드들 상태 등
  • producer : 메세지를 kafka 에 넣음
  • consumer : 메세지를 kafka 에서 읽음

3. 토픽(topic)

  • 토픽은 메세지(=이벤트)를 구분하는 단위
  • file system 의 디렉터리와 비슷한 개념
  • 토픽명은 249자 미만으로 영문, 숫자, ., _, , 를 사용 가능

4. 파티션(partition)

  • 1개의 토픽은 1개 이상의 파티션으로 구성
  • 파티션은 메세지를 저장하는 물리적인 파일
  • append-only 파일 : 새 메세지는 맨 뒤에 추가된다.
  • 각 메세지 저장 위치를 offset 이라고 한다.
    • 메세지를 식별할 수 있는 unique 값
    • 0부터 시작
    • 당연히, 파티션마다 별도로 유지될 것
  • 컨슈머는 offset 기준으로 순서대로 읽음
  • 메세지는 삭제되지 않음 (설정으로 보존기간 지나면 삭제될수 있음)
    • log.retention.hours 옵션 (default = 7일)
  • 프로듀서는 라운드로빈이나 key를 지정한다면 hash값으로 메세지를 저장시킬 파티션을 선택할 수 있다.
    • 같은 key라면 같은 파티션에 저장되어서, 순서가 유지된다
  • 파티션의 한 칸을 log 라고 한다.
    • log 는 key, value, timestamp 로 구성
  • 속도에 영향이 있음. 예를들어 프로듀서가 토픽에 메세지 보내는데 1초가 걸린다면,
    • 파티션 1개 : 4초가 소요
    • 파티션 4개 : 1초가 소요

5. kafka의 성능

  • 파티션 파일은 OS 페이지캐시를 사용
    • io 작업이 인메모리로 처리
    • 다만, 그 장비를 kafka 만 사용해야 유리함
  • Zero Copy : 네트워크 버퍼로 직접 데이터 복사
    • 시스템콜 수가 줄어듬
  • 단순한 브로커 역할
    • 메세지 필터, 재전송 등은 프로듀서, 컨슈머의 role
    • 브로커는 단순히 컨슈머와 파티션 간 매핑만 관리함
  • batch 모드 : 메세지를 묶어서 송신하고 수신할 수 있음
    • 낱개보다 전송량 증가
  • 스케일 아웃이 용이
    • 상황에 따라 브로커 추가, 파티션 추가, 컨슈머 추가

6. 고가용성

6-1. 레플리카

  • 파티션의 복제본
  • 복제 수(replication factor)만큼 파티션의 복제본이 각 브로커에 생긴다.

6-2. 리더와 팔로워

  • 리더 : 프로듀서와 컨슈머는 리더를 통해서만 메세지를 처리함
  • 팔로워 : 리더로부터 복제만 받음
  • 리더가 속해있는 브로커가 장애나면, 다른 팔로워 중 하나가 리더가 된다

7. 프로듀서

  • 토픽에 메세지 전송
  • KafkaProducer<Integer, String> producer = new KafkaProducer<>(설정);
    producer.send(new ProducerRecord<>(토픽, 키, 값));
    producer.send(new ProducerRecord<>(토픽, 값));
    producer.close();
  • send() 전송 흐름
    1. Serializer : 직렬화, byte 배열로 변환
    2. Partitioner : 파티션 결정
    3. Buffer : batch 모드로 메세지들이 모여있을 수 있음
    4. Sender : 브로커로 메세지묶음들을 전송, 별도의 쓰레드로 동작함
  • 처리량 관련 옵션
    • batch.size : 배치 크기
    • linger.ms : 전송 대기 시간 (default 0)
  • 전송 확인
    • 확인X : producer.send(...) 사용
    • 확인O
      • 동기식 : Future<RecordMetadata> result = producer.send(...); result.get();
        • 배치 모드로 사용 불가능
      • 비동기식 : producer.send(메세지, new Callback() { onCompletion(RecordMetadata data, Exception ex) { ... } })
  • 전송 신뢰성 : ack 를 통한 전송 보장
    • acks = 0 : 서버 응답을 기다리지 않음, 전송보장X
    • acks = 1 : 파티션의 리더에 저장되면 응답받음, 다만 리더가 장애날 경우 메세지가 유실될 수 있음
    • acks = all(= -1) : 모든 레플리카에 저장되면 응답받음
      • 브로커의 min.insync.replicas 옵션 : 저장에 성공되었다고 응답할 수 있는 레플리카의 최소 수
      • ex1) 레플리카 수 3, min.insync.replicas=1 : 1개에만 저장되면 성공으로 판단하니, 리더에만 저장되면 성공 반환. 즉, acks=1 과 동일한 상황
      • ex2) 레플리카 수 3, min.insync.replicas=2 : 2개에 저장되면 성공으로 판단, 리더 + 팔로워 중 1개
      • ex3) 레플리카 수 3, min.insync.replicas=3 : 전부 다 저장되어야함. 이건 팔로워가 1개라도 장애나면 레플리카 수가 부족해져서 계속 실패를 반환할 수 있음 (주의)
  • 흔히 볼 수 있는 전송 중 예외 발생 경우
    • 전송 전 직렬화 실패
    • 전송 전 배치크기 초과
    • 전송 전 버퍼가 꽉차서, 기다리다가 실패
    • 전송 중 timeout
    • 전송 중 리더가 죽어서, 새로운 리더를 선출
    • 전송 중 브로커의 메세지 크기 한도 초과
  • 전송 재시도
    • 재시도가능하다면 재시도 처리 : 위의 timeout, 일시적 리더 없음 등
    • retries 옵션으로 자동 재전송
    • 개발자가 코드로 send() 를 재호출도 가능
      • 이때는 무한재시도 주의해야함
    • 실패를 로깅해서 수동이나 자동으로 보정해야할 수도 있음
  • 중복 전송
    • 브로커 응답이 늦게와서, 재시도하는 경우, 중복으로 메세지가 전송될 수 있음 (ex. timeout)
    • enable.idempotence 옵션으로 중복전송 가능성을 낮출 수 있음
    • max.in.flight.requests.per.connection 옵션 : 하나의 커넥션에서 전송할 수 있는 최대 전송중인 메세지 수
      • 이 값이 1보다 크다면, 재시도 시점에 메세지 순서가 뒤바뀔수 있음
      • 전송 순서가 중요하다면, 이 값을 1로 해야함

8. 컨슈머

  • 컨슈머 그룹은 컨슈머들이 속해있다.
  • 1개의 파티션은 컨슈머 그룹 중 한개의 컨슈머만 연결이 가능하다.
    • 그룹 내 N개의 컨슈머가 하나의 파티션을 공유할 수 없음
    • 파티션의 수보다 컨슈머의 수가 더 많다면, 놀게되는 컨슈머가 생긴다.
    • 이런 제약때문에, 컨슈머그룹 기준으로 파티션의 메세지는 순서대로 처리되는 것이 보장된다.
  • 특정 토픽의 파티션에서 레코드를 조회
  • KafkaConsumer<String, String> consumer = new KafkaConsumer<>(설정); // group.id 등
    consumer.subscribe(Arrays.asList("토픽명1", "토픽명2")); // 구독할 토픽목록을 전달
    try {
        while (보통 무한루프) { // 다른 스레드에서 consumer.wakeup() 를 호출하면 WakeupException 이 발생함
            ConsumerRecords<String, String> records = consumerconsumer.poll(Duration.of(대기할 시간));
            for (ConsumerRecord<String, String> record : records) {
                // record 에는 value, topic, partition, offset 등이 들어 있음
                // record 를 처리..
            }
        }
    } catch (Exception e) {
        ...
    } finally {
        consumer.close();
    }
  • 메세지 조회
    • poll() 이 실행될 때, 이전 커밋 offset 부터 읽는다. (1개씩 읽는게 아님)
    • 읽어온 마지막 offset 에 커밋을 남긴다.
    • 처음 접근하는 경우, 커밋된 offset 이 없을 수 있다.
      • 이 때, auto.offset.reset 옵션을 사용함 (default = latest)
        • earliest : 맨 처음 offset 을 사용
        • latest : 가장 마지막 offset 을 사용
        • none : 예외를 발생시킴
  • 조회 관련 옵션
    • fetch.min.bytes : 조회 시 브로커가 전송할 최소 데이터 크기 (default = 1)
      • 이걸 늘리면 시간은 늘지만, 처리량이 증가
    • fetch.max.wait.ms : fetch.min.bytes 를 만족할 때까지 기다릴 시간 (default = 500)
    • max.partition.fetch.bytes : 파티션 당 브로커가 반환할 수 있는 최대 크기 (default = 1MB)
    • enable.auto.commint : auto.commit.interval.ms 주기마다 컨슈머가 읽은 offset 을 커밋할지 여부 (default = true)
      • auto.commit.interval.ms 주기가 지났는지 확인해서 poll(), close() 메소드가 실행될 때 커밋함 (별도 스레드가 있는게 아님)
      • false 는 수동 커밋
        • 동기 : consumer.commitSync() : catch 로 실패 대응 가능
        • 비동기 : consumer.commitAsync() : 후처리(ex. 실패 대응)가 필요하다면 인자로 OffsetCommitCallback 을 넘길 수 있음
    • auto.commit.interval.ms : 자동 커밋 주기 (default = 5000)
  • 중복 조회
    • 커밋이 실패했거나, 리밸런스(컨슈머가 추가되거나 빠질때)가 발생할 때, 동일한 메세지를 조회할 수 있음
    • 재처리를 하더라도 이슈가 없도록 멱등성(idempotence)을 고려해서 개발해야함
      • 데이터 특성에 따라, timestamp 나 unique 값을 활용
  • 브로커의 컨슈머 관리
    • 컨슈머는 heartbeat를 지속적으로 브로커에게 전송
    • 브로커는 특정 컨슈머가 heartbeat 를 전송하지 않으면, 컨슈머 그룹에서 컨슈머를 제외해버리고 리밸런스를 진행
    • 관련 설정
      • session.timeout.ms : 브로커가 특정 컨슈머가 살았는지 판단하기 위해 최대 대기 시간 (default = 10000)
      • heartbeat.interval.ms : 컨슈머가 브로커에게 heartbeat 전송하는 주기 (default = 3000)
        • session.timeout.ms 의 1/3 이하로 설정하는 걸 권장
      • max.poll.interval.ms : poll() 메소드의 최대 호출 간격, 이 시간이 지나도록 poll() 이 실행되지 않으면, 컨슈머를 제외시켜버림
  • KafkaConsumer 는 스레드 안전하지 않음.
    • 멀티스레드에서 사용 불가능
    • wakeup() 메소드는 다른 스레드에서 써야함
  • consume 시 실패 및 예외처리
    • kafka 에서 가장 까다로운 부분이 아닐까 생각된다.
    • 기본적으로 retry N회 수행
    • retry topic 으로 발행하여, 별도의 컨슈머로 처리
    • 위에서도 실패한다면, error topic 으로 발행하여 개발자가 확인하며 수동으로 처리

9. kafka manager

  • 야후에서 만든 대시보드, 오픈소스
  • cli 로 하던 것들을 여기서 대부분 가능함
  • 주요하게 보면 좋은 지표들 (토픽)
    • brokers spread : 해당 토픽의 파티션들이 몇개의 브로커에 분산되어 있는지 퍼센트로 표기
Comments