Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Linux
- 데이터통신
- mongodb
- 네트워크
- network
- HTTP
- AWS
- mybatis
- reactor
- effective
- NoSQL
- Java
- html
- Lombok
- VCS
- Elk
- Heap
- ajax
- git
- github
- reactive
- socket
- cache
- r
- spring
- nodejs
- Static
- javascript
- redis
- libuv
Archives
- Today
- Total
빨간색코딩
Kafka 기본 개념 (토픽, 파티션, 성능, 고가용성, 프로듀서, 컨슈머) 본문
1. 개요
- 링크드인에서 2011년 개발
- Apache 프로젝트 오픈소스
- Message Queue
- 분산 메세징 시스템, 확장성, 고가용성
- 메세지 전달의 중앙 플랫폼으로 두고 필요한 모든 데이터 시스템과 연결된 파이프라인을 만드는 것을 지향함
- 이벤트 스트리밍
- jdk 1.8 이상
1-1. 기존 MQ(RabbitMQ 등)와 차이점
- 대용량 실시간 로그 처리에 특화
- 분산 및 복제가 쉬움
- AMQP(Advanced Message Queuing Protocol) 프로토콜이나 JMS API를 사용하지 않고 단순한 메시지 헤더를 지닌 TCP기반의 프로토콜을 사용
- 메세지를 파일시스템에 저장 (default 7일)
- 영속성 보장 : 다른 MQ들은 consumer가 메세지를 읽어가면 큐에서 메세지를 삭제하는데, 카프카는 보관주기 동안은 디스크에 저장함
- consumer가 broker로부터 직접 메세지를 가져감 (pull)
- 배치 처리 : IO가 자주 일어나는 걸 방지하기 위해, 카프카는 크기가 작은 IO를 그룹핑해서 처리할 수 있도록 배치작업 가능
2. kafka 의 큰 구조
- kafka 클러스터 : 메세지(=이벤트)들을 저장
- broker(=kafka server)를 여러개 뛰울 수 있음
- zookeeper 클러스터 : kafka 클러스터들을 관리, 노드들 상태 등
- producer : 메세지를 kafka 에 넣음
- consumer : 메세지를 kafka 에서 읽음
3. 토픽(topic)
- 토픽은 메세지(=이벤트)를 구분하는 단위
- file system 의 디렉터리와 비슷한 개념
- 토픽명은 249자 미만으로 영문, 숫자, ., _, , 를 사용 가능
4. 파티션(partition)
- 1개의 토픽은 1개 이상의 파티션으로 구성
- 파티션은 메세지를 저장하는 물리적인 파일
- append-only 파일 : 새 메세지는 맨 뒤에 추가된다.
- 각 메세지 저장 위치를 offset 이라고 한다.
- 메세지를 식별할 수 있는 unique 값
- 0부터 시작
- 당연히, 파티션마다 별도로 유지될 것
- 컨슈머는 offset 기준으로 순서대로 읽음
- 메세지는 삭제되지 않음 (설정으로 보존기간 지나면 삭제될수 있음)
- log.retention.hours 옵션 (default = 7일)
- 프로듀서는 라운드로빈이나 key를 지정한다면 hash값으로 메세지를 저장시킬 파티션을 선택할 수 있다.
- 같은 key라면 같은 파티션에 저장되어서, 순서가 유지된다
- 파티션의 한 칸을 log 라고 한다.
- log 는 key, value, timestamp 로 구성
- 속도에 영향이 있음. 예를들어 프로듀서가 토픽에 메세지 보내는데 1초가 걸린다면,
- 파티션 1개 : 4초가 소요
- 파티션 4개 : 1초가 소요
5. kafka의 성능
- 파티션 파일은 OS 페이지캐시를 사용
- io 작업이 인메모리로 처리
- 다만, 그 장비를 kafka 만 사용해야 유리함
- Zero Copy : 네트워크 버퍼로 직접 데이터 복사
- 시스템콜 수가 줄어듬
- 단순한 브로커 역할
- 메세지 필터, 재전송 등은 프로듀서, 컨슈머의 role
- 브로커는 단순히 컨슈머와 파티션 간 매핑만 관리함
- batch 모드 : 메세지를 묶어서 송신하고 수신할 수 있음
- 낱개보다 전송량 증가
- 스케일 아웃이 용이
- 상황에 따라 브로커 추가, 파티션 추가, 컨슈머 추가
6. 고가용성
6-1. 레플리카
- 파티션의 복제본
- 복제 수(replication factor)만큼 파티션의 복제본이 각 브로커에 생긴다.
6-2. 리더와 팔로워
- 리더 : 프로듀서와 컨슈머는 리더를 통해서만 메세지를 처리함
- 팔로워 : 리더로부터 복제만 받음
- 리더가 속해있는 브로커가 장애나면, 다른 팔로워 중 하나가 리더가 된다
7. 프로듀서
- 토픽에 메세지 전송
-
KafkaProducer<Integer, String> producer = new KafkaProducer<>(설정); producer.send(new ProducerRecord<>(토픽, 키, 값)); producer.send(new ProducerRecord<>(토픽, 값)); producer.close();
- send() 전송 흐름
- Serializer : 직렬화, byte 배열로 변환
- Partitioner : 파티션 결정
- Buffer : batch 모드로 메세지들이 모여있을 수 있음
- Sender : 브로커로 메세지묶음들을 전송, 별도의 쓰레드로 동작함
- 처리량 관련 옵션
- batch.size : 배치 크기
- linger.ms : 전송 대기 시간 (default 0)
- 전송 확인
- 확인X : producer.send(...) 사용
- 확인O
- 동기식 : Future<RecordMetadata> result = producer.send(...); result.get();
- 배치 모드로 사용 불가능
- 비동기식 : producer.send(메세지, new Callback() { onCompletion(RecordMetadata data, Exception ex) { ... } })
- 동기식 : Future<RecordMetadata> result = producer.send(...); result.get();
- 전송 신뢰성 : ack 를 통한 전송 보장
- acks = 0 : 서버 응답을 기다리지 않음, 전송보장X
- acks = 1 : 파티션의 리더에 저장되면 응답받음, 다만 리더가 장애날 경우 메세지가 유실될 수 있음
- acks = all(= -1) : 모든 레플리카에 저장되면 응답받음
- 브로커의 min.insync.replicas 옵션 : 저장에 성공되었다고 응답할 수 있는 레플리카의 최소 수
- ex1) 레플리카 수 3, min.insync.replicas=1 : 1개에만 저장되면 성공으로 판단하니, 리더에만 저장되면 성공 반환. 즉, acks=1 과 동일한 상황
- ex2) 레플리카 수 3, min.insync.replicas=2 : 2개에 저장되면 성공으로 판단, 리더 + 팔로워 중 1개
- ex3) 레플리카 수 3, min.insync.replicas=3 : 전부 다 저장되어야함. 이건 팔로워가 1개라도 장애나면 레플리카 수가 부족해져서 계속 실패를 반환할 수 있음 (주의)
- 흔히 볼 수 있는 전송 중 예외 발생 경우
- 전송 전 직렬화 실패
- 전송 전 배치크기 초과
- 전송 전 버퍼가 꽉차서, 기다리다가 실패
- 전송 중 timeout
- 전송 중 리더가 죽어서, 새로운 리더를 선출
- 전송 중 브로커의 메세지 크기 한도 초과
- 전송 재시도
- 재시도가능하다면 재시도 처리 : 위의 timeout, 일시적 리더 없음 등
- retries 옵션으로 자동 재전송
- 개발자가 코드로 send() 를 재호출도 가능
- 이때는 무한재시도 주의해야함
- 실패를 로깅해서 수동이나 자동으로 보정해야할 수도 있음
- 중복 전송
- 브로커 응답이 늦게와서, 재시도하는 경우, 중복으로 메세지가 전송될 수 있음 (ex. timeout)
- enable.idempotence 옵션으로 중복전송 가능성을 낮출 수 있음
- max.in.flight.requests.per.connection 옵션 : 하나의 커넥션에서 전송할 수 있는 최대 전송중인 메세지 수
- 이 값이 1보다 크다면, 재시도 시점에 메세지 순서가 뒤바뀔수 있음
- 전송 순서가 중요하다면, 이 값을 1로 해야함
8. 컨슈머
- 컨슈머 그룹은 컨슈머들이 속해있다.
- 1개의 파티션은 컨슈머 그룹 중 한개의 컨슈머만 연결이 가능하다.
- 그룹 내 N개의 컨슈머가 하나의 파티션을 공유할 수 없음
- 파티션의 수보다 컨슈머의 수가 더 많다면, 놀게되는 컨슈머가 생긴다.
- 이런 제약때문에, 컨슈머그룹 기준으로 파티션의 메세지는 순서대로 처리되는 것이 보장된다.
- 특정 토픽의 파티션에서 레코드를 조회
-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(설정); // group.id 등 consumer.subscribe(Arrays.asList("토픽명1", "토픽명2")); // 구독할 토픽목록을 전달 try { while (보통 무한루프) { // 다른 스레드에서 consumer.wakeup() 를 호출하면 WakeupException 이 발생함 ConsumerRecords<String, String> records = consumerconsumer.poll(Duration.of(대기할 시간)); for (ConsumerRecord<String, String> record : records) { // record 에는 value, topic, partition, offset 등이 들어 있음 // record 를 처리.. } } } catch (Exception e) { ... } finally { consumer.close(); }
- 메세지 조회
- poll() 이 실행될 때, 이전 커밋 offset 부터 읽는다. (1개씩 읽는게 아님)
- 읽어온 마지막 offset 에 커밋을 남긴다.
- 처음 접근하는 경우, 커밋된 offset 이 없을 수 있다.
- 이 때, auto.offset.reset 옵션을 사용함 (default = latest)
- earliest : 맨 처음 offset 을 사용
- latest : 가장 마지막 offset 을 사용
- none : 예외를 발생시킴
- 이 때, auto.offset.reset 옵션을 사용함 (default = latest)
- 조회 관련 옵션
- fetch.min.bytes : 조회 시 브로커가 전송할 최소 데이터 크기 (default = 1)
- 이걸 늘리면 시간은 늘지만, 처리량이 증가
- fetch.max.wait.ms : fetch.min.bytes 를 만족할 때까지 기다릴 시간 (default = 500)
- max.partition.fetch.bytes : 파티션 당 브로커가 반환할 수 있는 최대 크기 (default = 1MB)
- enable.auto.commint : auto.commit.interval.ms 주기마다 컨슈머가 읽은 offset 을 커밋할지 여부 (default = true)
- auto.commit.interval.ms 주기가 지났는지 확인해서 poll(), close() 메소드가 실행될 때 커밋함 (별도 스레드가 있는게 아님)
- false 는 수동 커밋
- 동기 : consumer.commitSync() : catch 로 실패 대응 가능
- 비동기 : consumer.commitAsync() : 후처리(ex. 실패 대응)가 필요하다면 인자로 OffsetCommitCallback 을 넘길 수 있음
- auto.commit.interval.ms : 자동 커밋 주기 (default = 5000)
- fetch.min.bytes : 조회 시 브로커가 전송할 최소 데이터 크기 (default = 1)
- 중복 조회
- 커밋이 실패했거나, 리밸런스(컨슈머가 추가되거나 빠질때)가 발생할 때, 동일한 메세지를 조회할 수 있음
- 재처리를 하더라도 이슈가 없도록 멱등성(idempotence)을 고려해서 개발해야함
- 데이터 특성에 따라, timestamp 나 unique 값을 활용
- 브로커의 컨슈머 관리
- 컨슈머는 heartbeat를 지속적으로 브로커에게 전송
- 브로커는 특정 컨슈머가 heartbeat 를 전송하지 않으면, 컨슈머 그룹에서 컨슈머를 제외해버리고 리밸런스를 진행
- 관련 설정
- session.timeout.ms : 브로커가 특정 컨슈머가 살았는지 판단하기 위해 최대 대기 시간 (default = 10000)
- heartbeat.interval.ms : 컨슈머가 브로커에게 heartbeat 전송하는 주기 (default = 3000)
- session.timeout.ms 의 1/3 이하로 설정하는 걸 권장
- max.poll.interval.ms : poll() 메소드의 최대 호출 간격, 이 시간이 지나도록 poll() 이 실행되지 않으면, 컨슈머를 제외시켜버림
- KafkaConsumer 는 스레드 안전하지 않음.
- 멀티스레드에서 사용 불가능
- wakeup() 메소드는 다른 스레드에서 써야함
- consume 시 실패 및 예외처리
- kafka 에서 가장 까다로운 부분이 아닐까 생각된다.
- 기본적으로 retry N회 수행
- retry topic 으로 발행하여, 별도의 컨슈머로 처리
- 위에서도 실패한다면, error topic 으로 발행하여 개발자가 확인하며 수동으로 처리
9. kafka manager
- 야후에서 만든 대시보드, 오픈소스
- cli 로 하던 것들을 여기서 대부분 가능함
- 주요하게 보면 좋은 지표들 (토픽)
- brokers spread : 해당 토픽의 파티션들이 몇개의 브로커에 분산되어 있는지 퍼센트로 표기
'database' 카테고리의 다른 글
Redis maxmemory (policy, samples, replica-ignore) (0) | 2022.04.24 |
---|---|
[데이터 중심 애플리케이션 설계] 1장. 신뢰할 수 있고 확장 가능하며 유지보수하기 쉬운 어플리케이션 (0) | 2022.04.18 |
Redis 기본 개념 (기초, Collection 타입, Expire, Persistence) (2) | 2020.06.25 |
Commons DBCP2 (dbcp 정의, 커넥션 속성, Evictor, 트랜잭션, Statements Pool, 예제) (1) | 2018.04.11 |
CentOS 7에 MongoDB 설치 (0) | 2017.08.04 |
Comments