목차
1. 카프카 소개
2. Strimzi 소개
3. Strimzi 설치 & 카프카 클러스터 생성
4. 토픽 생성 및 메시지 주고 받기
5. 장애발생
1. 카프카 소개
Apache Kafka
빠르고 확장 가능한 작업을 위해 데이터 피드의 분산 스트리밍, 파이프 라이닝 및 재생을 위한 실시간 스트리밍 데이터를 처리하기 위한 목적으로 설계된 오픈 소스 분산형 게시-구독 메시징 플랫폼.
- Kafka는 서버 클러스터 내에서 데이터 스트림을 레코드로 유지하는 방식으로 작동하는 브로커 기반 솔루션입니다.
- Kafka 서버는 여러 데이터 센터에 분산되어 있을 수 있으며 여러 서버 인스턴스에 걸쳐 레코드 스트림(메시지)을 토픽으로 저장하여 데이터 지속성을 제공할 수 있습니다.
- 토픽은 레코드 또는 메시지를 키, 값 및 타임 스탬프로 구성된 일련의 튜플, 변경 불가능한 Python 객체 시퀀스로 저장합니다.
- Apache Kafka는 가장 빠르게 성장하는 오픈 소스 메시징 솔루션 중 하나입니다.주로 분산 시스템에 우수한 로깅 메커니즘을 제공하는 아키텍처 기반 설계 패턴 때문입니다.
Apache Kafka의 개념
토픽
토픽은 게시/구독 메시징에서 상당히 보편적인 개념입니다. Apache Kafka 및 기타 메시징 솔루션에서 토픽은 지정된 데이터 스트림(일련의 레코드/메시지)에 대한 관심을 표시하는 데 사용되는 주소 지정 가능한 추상화입니다. 토픽은 게시 및 구독할 수 있으며 애플리케이션에서 주어진 데이터 스트림에 대한 관심을 표시하는 데 사용하는 추상화 계층입니다.
파티션
Apache Kafka에서 토픽은 파티션이라는 일련의 순서 대기열로 세분화될 수 있습니다. 이러한 파티션은 연속적으로 추가되어 순차적 커밋 로그를 형성합니다. Kafka 시스템에서 각 레코드/메시지에는 지정된 파티션의 메시지 또는 레코드를 식별하는 데 사용되는 오프셋이라는 순차 ID가 할당됩니다.
영속성
Apache Kafka는 레코드/메시지가 게시될 때 지속적으로 유지하는 서버 클러스터를 유지 관리하여 작동합니다. Kafka 클러스터는 구성 가능한 보존 시간 제한을 사용하여 소비에 관계없이 주어진 레코드가 지속되는 기간을 결정합니다. 레코드/메시지가 보존 시간 제한 내에 있는 동안 레코드/메시지를 사용할 수 있습니다. 레코드/메시지가 이 보존 시간 제한을 초과하면 레코드/메시지가 삭제되고 공간이 확보됩니다.
토픽/파티션 확장
Apache Kafka는 서버 클러스터로 작동하기 때문에 주어진 토픽/파티션에서 각 서버에 부하를 공유하여 토픽/파티션을 확장할 수 있습니다. 이 부하 공유를 통해 Kafka 클러스터의 각 서버는 주어진 토픽/파티션에 대한 레코드/메시지의 배포 및 영속성을 처리할 수 있습니다. 개별 서버가 모든 배포 및 영속성을 처리하는 동안 모든 서버는 서버가 실패할 경우 내결함성과 고가용성을 제공하는 데이터를 복제합니다. 파티션은 파티션 리더로 선택된 한개 서버와 팔로워 역할을 하는 다른 모든 서버들로 분할됩니다. 파티션 리더 인 서버는 데이터의 모든 배포 및 영속성 (읽기/쓰기)을 처리하고 팔로워 서버는 내결함성을 위한 복제 서비스를 제공합니다.
프로듀서
Apache Kafka에서 프로듀서 개념은 대부분의 메시징 시스템과 다르지 않습니다. 데이터(레코드/메시지) 프로듀서는 주어진 레코드/메시지가 게시되어야 하는 토픽(데이터 스트림)를 정의합니다. 파티션은 추가 확장성을 제공하는 데 사용되므로 프로듀서는 주어진 레코드/메시지가 게시되는 파티션도 정의할 수 있습니다. 프로듀서는 주어진 파티션을 정의할 필요가 없으며 파티션을 정의하지 않음으로써 토픽 파티션에서 순차 순환 대기 방식의 로드 밸런싱을 달성할 수 있습니다.
컨슈머
대부분의 메시징 시스템과 마찬가지로 Kafka의 컨슈머는 레코드/메시지를 처리하는 엔터티입니다. 컨슈머는 개별 워크로드에서 독립적으로 작업하거나 지정된 워크로드에서 다른 컨슈머와 협력하여 작업하도록 구성할 수 있습니다(로드 밸런싱). 컨슈머는 컨슈머 그룹 이름을 기반으로 워크로드를 처리하는 방법을 관리합니다. 컨슈머 그룹 이름을 사용하면 컨슈머를 단일 프로세스 내, 여러 프로세스, 심지어 여러 시스템에 분산시킬 수 있습니다. 컨슈머 그룹 이름을 사용하여 컨슈머는 컨슈머 집합 전체에서 레코드/메시지 소비를 로드 밸런싱(동일한 컨슈머 그룹 이름을 가진 여러 컨슈머)하거나 토픽/파티션을 구독하는 각 컨슈머가 처리 메시지를 받도록 각 레코드/메시지를 고유하게 (고유한 컨슈머 그룹 이름을 가진 여러 컨슈머) 처리할 수 있습니다.
Strimzi 소개
Strimzi 는 쿠버네티스 환경에서 카프카 Kafka 운영 관리에 도움을 주는 Operator
Kubernetes 클러스터에서 Apache Kafka 를 실행하는 프로세스를 간소화하는 오퍼레이터
Strimzi를 통해 아래의 Kafka의 구성 요소를 쿠버네티스에 배포할 수 있도록 돕는다.
- 브로커 노드의 카프카 클러스터
- 복제된 ZooKeeper 인스턴스의 ZooKeeper 클러스터
- 외부 데이터 연결을 위한 Kafka Connect 클러스터
- 보조 클러스터에서 Kafka 클러스터를 미러링하기 위한 Kafka MirrorMaker 클러스터
- 모니터링을 위한 추가 Kafka 메트릭 데이터 추출을 위한 Kafka Exporter
- Kafka 클러스터에 HTTP 기반 요청을 하기 위한 Kafka Bridge
- 브로커 노드 간에 토픽 파티션의 균형을 재조정하는 Cruise Control
구성요소
- 카프카 커넥트 Connect : 데이터베이스 같은 외부 시스템과 카프카를 쉽게 연결해줌
- 카프카 브리지 Bridge: 카프카 클러스터 와 internal & external HTTP client applications 를 연동해줌
- 미러메이커 MirrorMaker 2.0 : 다중 클러스터 환경에서 클러스터 간 리플리케이션(미러링)을 구성해줌, 양뱡향 토픽 리플리케이션 지원
주요 요소
- 주키퍼 ZooKeeper : 카프카의 메타데이터 관리 및 브로커의 정상 상태 점검 health check 을 담당
- 카프카 Kafka 또는 카프카 클러스터 Kafka cluster : 여러 대의 브로커를 구성한 클러스터를 의미
- 브로커 broker : 카프카 애플리케이션이 설치된 서버 또는 노드를 말함
- 프로듀서 producer : 카프카로 메시지를 보내는 역할을 하는 클라이언트를 총칭
- 컨슈머 consumer : 카프카에서 메시지를 꺼내가는 역할을 하는 클라이언트를 총칭
- 토픽 topic : 카프카는 메시지 피드들을 토픽으로 구분하고, 각 토픽의 이름은 카프카 내에서 고유함
- 파티션 partition : 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러 개로 나눈 것을 말함
- 세그먼트 segment : 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일
- 메시지 message 또는 레코드 record : 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말함
기본동작
- 카프카는 데이터를 받아서 전달하는 데이터 버스 data bus 의 역할
- 카프카에 데이터 (메시지) 를 만들어서 주는 쪽은 프로듀서 producer 라 부르고, 데이터를 빼내서 소비하는 쪽은 컨슈머 consumer 라 함
- 주키퍼는 카프카의 정상 동작을 보장하기 위해 메타데이터 metadata (브로커들의 노드 관리 등) 를 관리하는 코디네이터 coordinator 임
- 프로듀서와 컨슈머는 클라이언트이며, 애플리케이션은 카프카와 주키퍼임
- 카프카는 프로듀서와 컨슈머 중앙에 위치하여, 프로듀서로부터 전달 받은 메시지들을 저장하고 컨슈머에 메시지를 전달함
- 컨슈머는 카프카에 저장된 메시지를 꺼내오는 역할을 함
- 브로커 broker 는 애플리케이션이 설치된 서버 또는 노드를 의미
- 리플리케이션 replication : 각 메시지들을 여러 개로 복제해서 카프카 클러스터 내 브로커들에 분산시키는 동작을 의미
- 하나의 브로커가 종료되더라도 카프카는 안정성을 유지할 수 있음
- 토픽 생성 명령어 중 replication-factor 3 이면, 원본을 포함한 리플리케이션(토픽의 파티션)이 총 3개를 의미함
- 원본(토픽의 파티션)을 리더 reader, 리플리케이션을 팔로워 follower
- 리더는 프로듀서, 컨슈머로부터 오는 모든 읽기/쓰기를 처리, 팔로워는 리더로부터 복제 유지
- 파티션 partition : 토픽 하나를 여러 개로 나눠 병렬 처리가 가능하게 만들 것을 의미
- 나뉜 파티션의 수 만큼 컨슈머를 연결 할 수 있어서, 병렬 처리가 가능함
- 파티션 수는 초기 생성 시 언제든지 늘릴 수 있지만 늘린 파티션은 줄일 수 없음, 컨슈머 LAG 모니터링으로 판단 할 것
- 컨슈머 LAG (지연) = ‘프로듀서가 보낸 메시지 갯수(카프카에 남아 있는 메시지 갯수)’ - 컨슈머가 가져간 메시지 갯수’
- 세그먼트 segment : 토픽의 파티션에 저장된 메시지들이 세그먼트라는 로그 파일의 형태로 브로커의 로컬 디스크에 저장됨
- 오프셋 offset : 파티션에 메시지가 저장되는 위치, 오프셋은 순차적으로 증가(0, 1, 2...)
Strimzi 설치 & 카프카 클러스터 생성
Strimzi Operator 설치 with Helm : v0.38.0 - Chart
# 네임스페이스 생성
kubectl create namespace kafka
# Repo 추가
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator
# 차트 설치 : 오퍼레이터 파드 설치
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka
# 배포한 리소스 확인 : Operator 디플로이먼트(파드)
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka
# 오퍼레이터가 지원하는 카프카 버전 확인
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
# 배포한 리소스 확인 : CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkaconnectors.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkaconnects.kafka.strimzi.io 2023-11-11T06:01:20Z
kafkamirrormaker2s.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkamirrormakers.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkanodepools.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkarebalances.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkas.kafka.strimzi.io 2023-11-11T06:01:20Z
kafkatopics.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkausers.kafka.strimzi.io 2023-11-11T06:01:21Z
strimzipodsets.core.strimzi.io 2023-11-11T06:01:20Z
# (참고) CRD 상세 정보 확인
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io
# (참고) 삭제
helm uninstall kafka-operator -n kafka && kubectl delete ns kafka
카프카 클러스터 생성 (Zookeeper) : 기존 Statefulsets 대신 → StrimziPodSets 이 기본 설정
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: false
- name: external
port: 9094
type: nodeport
tls: false
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.6"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "topology.ebs.csi.aws.com/zone"
zookeeper:
replicas: 3
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
storage:
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- zookeeper
topologyKey: "topology.ebs.csi.aws.com/zone"
entityOperator:
topicOperator: {}
userOperator: {}
# (옵션) 신규 터미널 : 모니터링
watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
# 카프카 클러스터 YAML 파일 확인 : listeners(3개), podAntiAffinity
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml
cat kafka-1.yaml | yh
# 카프카 클러스터 배포 : 카프카(브로커 3개), 주키퍼(3개), entityOperator 디플로이먼트
## 배포 시 requiredDuringSchedulingIgnoredDuringExecution 지원 >> preferredDuringSchedulingIgnoredDuringExecution 미지원...
kubectl apply -f kafka-1.yaml -n kafka
# 배포된 리소스들 확인
kubectl get-all -n kafka
# 배포된 리소스 확인 : 주키퍼 설치 완료 후 >> 카프카 브로커 설치됨
kubectl get kafka -n kafka
kubectl get cm,secret -n kafka
# 배포된 리소스 확인 : 카프카/주키퍼 strimzipodsets 생성 확인 >> sts 스테이트풀렛 사용 X
kubectl get strimzipodsets -n kafka
# 노드 정보 확인
kubectl describe node | more
kubectl get node --label-columns=topology.ebs.csi.aws.com/zone
kubectl describe pv | grep 'Node Affinity:' -A2
# 배포된 리소스 확인 : 배포된 파드 생성 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka
kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster
# 배포된 리소스 확인 : 서비스 Service(Headless) 등 생성 확인 - listeners(3개)
kubectl get svc,endpointslice -n kafka
# 배포된 리소스 확인 : 카프카/주키퍼 파드 저장소 확인
kubectl get pvc,pv -n kafka
kubectl df-pv
# 배포된 리소스 확인 : 컨피그맵 확인
kubectl get cm -n kafka
# 컨피그맵 상세 확인
kubectl describe cm -n kafka strimzi-cluster-operator
kubectl describe cm -n kafka my-cluster-zookeeper-config
kubectl describe cm -n kafka my-cluster-entity-topic-operator-config
kubectl describe cm -n kafka my-cluster-entity-user-operator-config
kubectl describe cm -n kafka my-cluster-kafka-0
kubectl describe cm -n kafka my-cluster-kafka-1
kubectl describe cm -n kafka my-cluster-kafka-2
...(생략)...
##########
# Node / Broker ID
##########
broker.id=${STRIMZI_BROKER_ID}
node.id=${STRIMZI_BROKER_ID}
##########
# Kafka message logs configuration >> 로그 디렉터리
##########
log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}
...
##########
# User provided configuration
##########
default.replication.factor=3
inter.broker.protocol.version=3.6
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
log.message.format.version=3.6
...
# kafka 클러스터 Listeners 정보 확인 : 각각 9092 평문, 9093 TLS, 세번째 정보는 External 접속 시 NodePort 정보
kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq
# (옵션) NetworkPolicy 확인 >> 어떤 동작을 처리하는가?
kubectl get networkpolicy -n kafka
kubectl describe networkpolicy -n kafka
# (옵션) poddisruptionbudget 확인 >> 어떤 동작을 처리하는가?
kubectl get pdb -n kafka
kubectl describe pdb -n kafka
테스트용 파드 생성 후 카프카 클러스터 정보 확인
# 파일 다운로드
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml
cat myclient.yaml | yh
# 데몬셋으로 myclient 파드 배포 : 어떤 네임스페이스에 배포되는가?
VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -
kubectl get pod -l name=kafkaclient -owide
# Kafka client 에서 제공되는 kafka 관련 도구들 확인
kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin
# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile
# 브로커 정보
kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe
# 토픽 리스트 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list
# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR
kubectl get kafkatopics -n kafka
Kafka UI
# 배포
helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts
cat <<EOF > kafkaui-values.yml
yamlApplicationConfig:
kafka:
clusters:
- name: yaml
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092
auth:
type: disabled
management:
health:
ldap:
enabled: false
EOF
# 설치
helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml
# 접속 확인
kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui.$MyDomain"
echo -e "kafka-ui Web URL = http://kafka-ui.$MyDomain"
토픽 생성 및 메시지 주고 받기
토픽 생성 및 관리
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: ${TOPICNAME}
labels:
strimzi.io/cluster: "my-cluster"
spec:
partitions: 1
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
min.insync.replicas: 2
# 토픽 모니터링 : Kafka-UI 같이 확인 >> 설정 반응이 조금 느릴 수 있음
watch -d kubectl get kafkatopics -n kafka
# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, envsubst 활용
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/mytopic.yaml
cat mytopic.yaml | yh
TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
# 토픽 생성 확인 (kubectl native)
kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mytopic1 my-cluster 1 3
...
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic
mytopic1
# 토픽 상세 정보 확인 : 설정값 미 지정 시 기본값이 적용
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe
Topic: mytopic1
TopicId: hz--p3HXTRq-54EVuuY68w
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2
segment.bytes=1073741824
retention.ms=7200000
message.format.version=3.0-IV1
Topic: mytopic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
# 토픽 Topic 생성 : 파티션 1개 리플리케이션 3개
kubectl exec -it ds/myclient -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000
# 토픽 생성 확인
kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mytopic1 my-cluster 1 3 True
mytopic2 my-cluster 1 3 True
...
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic
mytopic1
mytopic2
# 토픽 상세 정보 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: 965ASQDmQfiuIxPiiC9RPQ PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
# 토픽의 파티션 갯수 늘리기
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: 965ASQDmQfiuIxPiiC9RPQ PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: mytopic2 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# Kafka-UI 같이 확인
# 실습 구성도 그림 확인
# 토픽의 파티션 갯수 줄이기(안됨)
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 1
Error while executing topic command : Topic currently has 2 partitions, which is higher than the requested 1.
[2022-06-03 14:59:31,427] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
(kafka.admin.TopicCommand$)
# 토픽 일부 옵션 설정 : min.insync.replicas=2 를 min.insync.replicas=3 으로 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=3
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: 965ASQDmQfiuIxPiiC9RPQ PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=3,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: mytopic2 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# 토픽 일부 옵션 설정 : 다음 실습을 위해 min.insync.replicas=2 로 다시 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=2
토픽에 메시지 보내고 받기 : kafka-ui 해당 topic 에서 Live Mode 로 확인
# 토픽 모니터링
watch -d kubectl get kafkatopics -n kafka
# 사용 스크립트
kafka-console-producer.sh
kafka-console-consumer.sh
# 토픽에 데이터 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
> hello
> world
> 0
> 1
> 2
CTRL+D 로 빠져나오기
# 토픽 데이터 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
hello
world
0
1
2
CTRL+C 로 빠져나오기
# 토픽에 데이터(메시지키+메시지값) 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property "parse.key=true" --property "key.separator=:"
>key1:doik1
>key2:doik2
>key3:doik3
CTRL+D 로 빠져나오기
# 토픽에 데이터(메시지키+메시지값) 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property print.key=true --property key.separator="-" --from-beginning
null-hello
null-world
null-0
null-1
null-2
key1-doik1
key2-doik2
key3-doik3
CTRL+C 로 빠져나오기
# 토픽에 데이터 최대 컨슘 메시지 갯수 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --max-messages 2 --from-beginning
doik2
hello
Processed a total of 2 messages
# 토픽에서 특정 파티션만 컨슘 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --partition 0 --from-beginning
...
CTRL+C 로 빠져나오기
# 특정 오프셋 기준 컨슘 확인 등 다양한 조건의 확인 가능 >> 직접 찾아서 해보기
# 토픽 삭제 (kubectl native)
kubectl delete kafkatopics -n kafka mytopic1
# kakfa-ui 에서 통계 확인 Statistics start 후 확인
KafkaCat : NodePort로 접속 확인
# kafkacat 사용
# 정보 확인 : NodePort 접속
#docker run -it --network=host edenhill/kcat:1.7.1 -b YOUR_BROKER -L
NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')
NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})
echo $NODE1IP $NODEPORT
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L
Metadata for all topics (from broker -1: 127.0.0.1:30356/bootstrap):
3 brokers:
broker 0 at 192.168.10.103:31478 (controller)
broker 2 at 192.168.10.101:31617
broker 1 at 192.168.10.102:30565
4 topics:
topic "mytopic2" with 2 partitions:
partition 0, leader 0, replicas: 0,2,1, isrs: 0,2,1
partition 1, leader 1, replicas: 1,2,0, isrs: 1,2,0
...
# 메시지 보내기 Producer mode (reads messages from stdin):
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -t mytopic2 -P
1
2
3
CTRL+D 로 종료
컨슈머 그룹
kafka-consumer-groups.sh
# 토픽에 데이터 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 <<EOF
101
102
103
104
105
106
107
108
109
110
EOF
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 <<EOF
AAA
BBB
CCC
DDD
EOF
# 컨슈머 그룹 확인
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --list
__strimzi-topic-operator-kstreams
# 컨슈머 그룹 기반으로 동작, 특정 목적을 가진 컨슈머들을 묶음으로 사용하는 것. 컨슈머그룹으로 토픽의 레코드를 가져갈 경우 어느 레코드까지 읽었는지에 대한 데이터가 브로커에 저장됨
## 컨슈머 그룹은 따로 생성하는 명령을 적용하는 것이 아니라, 컨슈머를 동작할 때 컨슈머 그룹이름을 지정하면 새로 생성됨
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --group mygroup --from-beginning
...
CTRL+C 로 빠져나오기
# 컨슈머 그룹 상태 확인
## 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인 가능
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
Consumer group 'mygroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 84 84 0 - - -
mygroup mytopic2 1 70 70 0 - - -
# 오프셋 리셋 : 가능한 이유? >> 컨슈머는 유연하게 메시지를 가져갈 수 있다!
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --topic mytopic2 --reset-offsets --to-earliest --execute
GROUP TOPIC PARTITION NEW-OFFSET
mygroup mytopic2 0 0
mygroup mytopic2 1 0
# 다시 컨슈머 그룹 상태 확인 : LAG 확인됨!
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
Consumer group 'mygroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 0 84 84 - - -
mygroup mytopic2 1 0 70 70 - - -
# 모니터링
while true; do kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe;echo "-----"; sleep 0.5; done
# 컨슈머 그룹 메시지 소비하기 : LAG 확인!
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --group mygroup
...
CTRL+C 로 빠져나오기
# 다시 컨슈머 그룹 상태 확인 : LAG 0 확인!, CONSUMER-ID 확인!
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 84 84 0 console-consumer-d1137e74-0d1b-4d62-9b90-5142200f2ae7 /172.16.0.7 console-consumer
mygroup mytopic2 1 70 70 0 console-consumer-d1137e74-0d1b-4d62-9b90-5142200f2ae7 /172.16.0.7 console-consumer
로그 세그먼트
로그 파일에 메시지를 저장하기 때문에 일정 기간 동안은 컨슈머들이 데이터를 가져갈 수 있다
# 파드와 노드 매칭 확인
kubectl get pod -n kafka -owide | grep kafka
# 카프카 설정 확인
kubectl describe cm -n kafka my-cluster-kafka-config
...
##########
# Kafka message logs configuration >> 로그 디렉터리
##########
log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}
# 로그 저장소 확인 : 특정 토픽(파티션 개수에 따른 폴더 생성됨)에 세그먼트 확인
kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log0
kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log0/mytopic2-0
kubectl exec -it -n kafka my-cluster-kafka-1 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log1
kubectl exec -it -n kafka my-cluster-kafka-2 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log2
drwxr-xr-x - ubuntu 3 Jun 10:30 └── kafka-log2
...
.rw-r--r-- 4 ubuntu 4 Jun 00:23 ├── cleaner-offset-checkpoint
.rw-r--r-- 4 ubuntu 4 Jun 01:31 ├── log-start-offset-checkpoint
.rw-r--r-- 88 ubuntu 3 Jun 22:48 ├── meta.properties
drwxr-xr-x - ubuntu 4 Jun 00:34 ├── mytopic2-0
.rw-r--r-- 10M ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.index
.rw-r--r-- 105k ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.log
.rw-r--r-- 10M ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.timeindex
.rw-r--r-- 8 ubuntu 4 Jun 00:34 │ ├── leader-epoch-checkpoint
.rw-r--r-- 43 ubuntu 3 Jun 23:56 │ └── partition.metadata
drwxr-xr-x - ubuntu 4 Jun 00:34 ├── mytopic2-1
.rw-r--r-- 10M ubuntu 4 Jun 00:38 │ ├── 00000000000000000000.index
.rw-r--r-- 5.2k ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.log
.rw-r--r-- 10M ubuntu 4 Jun 00:38 │ ├── 00000000000000000000.timeindex
.rw-r--r-- 8 ubuntu 4 Jun 00:34 │ ├── leader-epoch-checkpoint
.rw-r--r-- 43 ubuntu 3 Jun 23:58 │ └── partition.metadata
.rw-r--r-- 1.3k ubuntu 4 Jun 01:31 ├── recovery-point-offset-checkpoint
.rw-r--r-- 1.3k ubuntu 4 Jun 01:32 └── replication-offset-checkpoint
# xxd 툴로 00000000000000000000.log 의 hexdump 내용 확인 : 보낸 메시지 내용 확인, 로그 파일에 저장된 메시지는 컨슈머가 읽어갈 수 있음
kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- cat /var/lib/kafka/data-0/kafka-log0/mytopic2-0/00000000000000000000.log | xxd
...
00000040: 0001 0a68 656c 6c6f 0000 0000 0000 0000 ...hello........
00000050: 0100 0000 3d00 0000 0002 5259 97e5 0000 ....=.....RY....
00000060: 0000 0000 0000 0181 2536 7afb 0000 0181 ........%6z.....
00000070: 2536 7afb 0000 0000 0000 03e8 0000 0000 %6z.............
00000080: 0001 0000 0001 1600 0000 010a 776f 726c ............worl
00000090: 6400 0000 0000 0000 0002 0000 0039 0000 d............9..
메시지 키를 통해서 정해진 파티션을 통해서 메시지 전달 순서를 보장 및 분산 확인
# 토픽 정보 확인 : kakfa-ui
# 토픽에 데이터(메시지키+메시지값) 컨슈머 확인 : 파티션0
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 0 --property print.key=true --property key.separator=":"
# 토픽에 데이터(메시지키+메시지값) 컨슈머 확인 : 파티션1
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 1 --property print.key=true --property key.separator=":"
# 토픽에 데이터(메시지키+메시지값) 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 --property "parse.key=true" --property "key.separator=:" <<EOF
key1:0
key1:1
key1:2
key2:3
key2:4
key2:5
EOF
# 모니터링 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 225 228 3 - - -
mygroup mytopic2 1 75 78 3 - - -
# 토픽 삭제 (kubectl native)
kubectl delete kafkatopics -n kafka mytopic1
kubectl delete kafkatopics -n kafka mytopic2
장애발생
장애 발생 1 및 동작 확인: 강제로 kafka or Zookeeper 파드 1개 삭제
# 모니터링
watch -d kubectl get pod -owide -n kafka
kubectl logs -n kafka -l name=strimzi-cluster-operator -f # Reconciliation 로그 확인
# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, ISR=2, envsubst 활용
TOPICNAME=mytopic3 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
# 토픽 정보 확인 : 컨트롤러 브로커 위치 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka -owide
NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')
NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L -t mytopic3 | grep controller
broker 0 at ec2-3-36-53-67.ap-northeast-2.compute.amazonaws.com:31498 (controller) >> 해당 유동 공인IP를 가진 EC2 찾기
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3 TopicId: 077wfV5dSnORaZrLh3WLAw PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
# 메시지 받기 : script 혹은 kafka-ui
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning
# (터미널1) for문 반복 메시지 보내기
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
for ((i=1; i<=100; i++)); do echo "failover-test1-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test1-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done
# 강제로 컨트롤러 브로커 파드 삭제(위치 확인) : 오퍼레이터가 annotate 설정을 모니터링 주기(2분)가 있어서 시간이 지나면 삭제가 실행됨
kubectl annotate pod -n kafka my-cluster-kafka-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
혹은
kubectl annotate pod -n kafka my-cluster-kafka-1 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
혹은
kubectl annotate pod -n kafka my-cluster-kafka-2 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
# 메시지 보내고 받기 가능!
...
failover-test1-415
% Reached end of topic mytopic3 [0] at offset 491
failover-test1-416
...
# 강제로 주키퍼 파드 삭제
kubectl annotate pod -n kafka my-cluster-zookeeper-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
# 메시지 보내고 받기 가능!
장애 발생 2 : 강제로 토픽 리더 파드가 있는 노드를 drain
# 모니터링
watch kubectl get pod -owide -n kafka
kubetail -n kafka -l name=strimzi-cluster-operator -f # Reconciliation 로그 확인
# 카프카 토픽 정보 확인 : 리더파드가 있는 워커노드 위치 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3 TopicId: 077wfV5dSnORaZrLh3WLAw PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
# test 토픽 리더 kafka 파드의 워커노드 확인
kubectl get pod -owide -n kafka | grep kafka
# (터미널2) 메시지 받기
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning
# (터미널1) for문 반복 메시지 보내기
for ((i=1; i<=100; i++)); do echo "failover-test2-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test2-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done
# test 토픽 리더 kafka 파드의 워커노드에서 drain : test topic leader pod evict
# kubectl drain <<노드>> --ignore-daemonsets --delete-emptydir-data
kubectl get node
NODE=<각자 자신의 EC2 노드 이름 지정>
NODE=ip-192-168-3-96.ap-northeast-2.compute.internal
kubectl drain $NODE --delete-emptydir-data --force --ignore-daemonsets && kubectl get node -w
# 해당 워커노드 drain 확인
kubectl get kafka,strimzipodsets -n kafka
kubectl get node
# kafka 파드 상태
kubectl get pod -l app.kubernetes.io/name=kafka -n kafka
# 카프카 토픽 정보 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3 TopicId: 077wfV5dSnORaZrLh3WLAw PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3 Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 2,0 # 브로커1는 not in-sync 상태
# ISR min.insync.replicas=3 으로 증가 후 메시지 보내고 받기 확인 >> ISR 기능에 대한 이해를 하자!
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=3
# 메시지 보내고 받기 확인
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
# ISR min.insync.replicas=2 으로 설정 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=2
# 메시지 보내고 받기 확인
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
# 동작 확인 후 uncordon 설정
kubectl get kafka,strimzipodsets -n kafka
kubectl uncordon $NODE