신입개발자

Kafka Producer 본문

개인 개발 공부

Kafka Producer

dks_a 2023. 8. 31. 13:55

Kafka Producer

  • 개요
  1. 발행/구독 모델에서 발행을 담당
  2. 카프카에 이벤트(= 레코드, 메시지)를 게시하는 클라이언트 애플리케이션

카프카 토픽에 대해 메시지를 발행하여 생산하는 주체

  1. 이벤트에는 키, 값, 타임스탬프 및 선택적 메타데이터 헤더 (ProducerRecord의 형태로 전달)

키 : kafka 내의 메시지 라우팅 및 파티셔닝에 사용

값 : 메시지의 실제 내용

  1. 컨슈머와 완전히 분리

프로듀서는 소비자를 기다릴 필요가 없음

확장성

  • 특징

부하 분산

○ 프로듀서는 중간 라우팅 계층 없이 파티션의 리더인 브로커에 직접 데이터를 보냄

○ 모든 카프카 노드는 프로듀서가 요청을 적절하게 지시할 수 있도록, 주어진 시간에 어떤 서버가 활성 상태이고 토픽 파티션의 리더가 어디있는지에 대한 메타 데이터 요청에 응답 가능

○ 클라이언트는 메시지를 게시하는 파티션 제어

  • 무작위 로드 밸런싱 구현 > 일부 시맨틱 분할 기능으로 수행될 수 있음
  • 사용자가 파티션에 대한 키를 지정하고 이를 사용하여 파티션으로 해시 할 수 있도록 함 > 시맨틱 파티션을 위한 인터페이스를 노출
  • 파티셔닝은 메시지의 확장성과 병렬 처리를 허용

○ ex) 선택한 키가 사용자ID인 경우 지정된 사용자에 대한 모든 데이터가 동일한 파티션으로 전송됨

비동기 전송

○ 프로듀서 입장의 batch 기능은 일정 크기만큼 메시지를 모아서 전송하는 것

○ 데이터 양보다 요청 수 제어가 관건

○ 일괄처리를 활성화하기 위해 메모리에 데이터를 축적하고 단일 요청으로 더 큰 일괄 처리를 보내려고 시도

○ 고정된 수의 메시지만 축적하고 고정된 지연 시간(예: 64k or 10ms)보다 더 이상 기다리지 않도록 구성할 수 있음

○ 더 많은 바이트를 보낼 수 있음

메시지 전달 시맨틱

○ 구성가능한 메시지 전달 시맨틱 제공

○ 안정성과 일관성 보장

○ 기본 “at least once” 적어도 한번 > 메시지가 kafka로 전달되는 것이 보장되지만 실패할 경우 중복 가능

○ At most once(최대 한번) : 메시지가 손실될 수 있지만 재 전달은 하지 않음 (ack =0,1)

○ At least once(최소 한번) : 메시지가 손실되지 않지만 재전달이 일어남

○ Exactly once(정확히 한번) : 메시지는 정확히 한번 전달됨 (ack =1)

Acknowledgments

○ 메시지 내구성 보장을 위해 kafka에 확인 요청

압축

○ 네트워크 대역폭과 스토리지 사용량을 줄이기 위해 메시지 압축 지원

오류 처리

○ 네트워크 오류, 파티션 리더 변경 또는 메시지 직렬화 문제 등에 대해 오류 처리

○ 재시도, 시간 제한 설정, 사용자 지정 오류 처리기 구현 등

프로듀서 인터셉터

○ 프로듀서 레코드를 Kafka로 보내기 전에 가로채고 수정할 수 있음

○ 메시지에 추가 정보, 사용자 지정 논리 구현 등

  • 사용

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>3.5.0</version>

</dependency>

  • 실행할 메인 클래스(ProducerMain)와 메인 메서드를 만들어 Producer Client를 구현

○ 프로듀서 초기화를 위한 Properties를 세팅

§ bootstrap.servers: 카프카 클러스터에 연결하기 위한 브로커 목록

§ key.serializer : 메시지 키 직렬화에 사용되는 클래스

§ value.serializer : 메시지 값을 직렬화하는데 사용되는 클래스

○ Properties 설정으로 KafkaProducer 인스턴스 생성

○ topic partition key data 값을 지정해 ProducerRecord 인스턴스를 생성

○ producer.send(record) 메서드를 통해 작성한 레코드를 카프카 클러스터에 전송

○ Properties configs = new Properties();

§ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"

§ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()

§ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG

○ KafkaProducer<String,String> producer = new KafkaProducer<>(configs);

○ ProducerRecord<String,String> record = new ProducerRecord<>(topic, data);

○ ProducerRecord<String,String> record = new ProducerRecord<>(topic, key, data);

○ ProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, key, data);

○ producer.send(record);

○ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header>headers)

  • config
기본설정
key.serializer org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 키의 Serializer 클래스
value.serializer org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 값에 대한 Serializer 클래스
bootstrap.servers [host1:port1,host2:port2,...]

카프카 클러스터에 대한 초기 연결을 설정하는데 사용할 호스트/포트 쌍 목록

클라이언트는 지정 서버와 상관없이 모든 서버를 사용하고, 설정값은 전체 서버 집합을 검색하는 초기 호스트에만 사용됨

>> 전체 서버 집합이 포함될 필요가 없음(서버가 다운된 경우에만 둘 이상 필요할수도)
buffer.memory 프로듀서가 서버로 전송되기를 기다리는 레코드를 버퍼링하는데 사용하는 총 메모리 바이트

레코드가 서버로 전달될 수 있는 것보다 더 빨리 전송되면 생산자는 max.block.ms 동안 차단한 후 exception 발생시킴

producer가 사용할 총 메모리와 거의 일치해야 하지만, 일부 추가 메모리는 압축 및 진행 중인 요청 유지에 사용

유형 long, 기본값 33554432
compression.type 프로듀서가 생성한 모든 데이터의 압축 유형

기본값 압축 없음

종류 none, gzip, snappy, lz4, zstd

전체 배치 데이터와 관련되어, 배치의 효율성이 압축률에 영향을 줌(더 많은 일괄 처리는 더 나은 압축을 의미)
retries 0보다 큰 값을 설정하면 클라이언트가 일시적일수도 있는 오류로 전송에 실패한 모든 레코드를 재전송

클라이언트가 오류 수신 시 레코드를 재전송하는 것과 다르지 않음

delivery.timeoout.ms에 의해 구성된 제한 시간이 성공적인 승인 전에 만료되면 retries 이전에 생성 요청이 실패됨

일반적으로 이 config 대신 delivery.timeout.ms로 재시도 동작을 제어하는 것이 추천됨

멱등성(연산을 여러 번 반복해도 한번만 수행한 것과 동일한 결과) 활성화를 위해서는 0보다 커져야

충돌하는 구성이 설정되고 멱등성이 명시적으로 활성화되지 않은 경우 멱등성은 비활성화


배치 관련
batch.size 프로듀서는 여러 레코드가 동일 파티션으로 전송될 때마다 레코드를 더 적은 수의 요청으로 일괄 처리하려고 시도

기본 배치 크기(바이트)를 제어 - 보낼 배치 크기의 상한값

이 크기보다 더 큰 레코드는 일괄 처리하도록 시도하지 않음

브로커에 전송된 요청에는 여러 배치가 포함됨(전송할 수 있는 데이터가 있는 각 파티션에 대해)

배치 크기가 작으면 배치가 덜 일반적이고 처리량이 감소할 수 있음

배치 크기가 0이면 배치 비활성화

배치 크기가 지나치게 크면 추가 레코드를 예상해 지정된 배치 크기의 버퍼를 항상 할당해서 메모리 낭비가 생길 수 있음

이 파티션에 대해 누적된 바이트 수가 설정 크기보다 적으면 추가 레코드가 올때까지 linger.ms 만큼 대기

> linger.ms 기본 값 0  > 누적된 배치 크기가 설정값보다 작아도 레코드 즉시 전송

기본값 16384
linger.ms 프로듀서는 요청 전송 사이에 도착하는 모든 레코드를 단일 일괄 요청으로 그룹화

(일반적으로 레코드가 보낼수 있는것보다 더 빨리 도착하는 로드 상태에서만 발생)

어떤 상황에서는 클라이언트가 적당한 부하에서도 요청 수를 줄이고자 할 수 있음

>> 이 설정은 소량의 인위적인 지연을 추가하여 이를 수행

레코드를 즉시 보내는 대신 생산자가 다른 레코드를 보낼 수 있도록 지정된 지연까지 기다렸다가 전송을 일괄 처리

TCP Nagle 알고리즘과 유사

일괄 처리 지연에 대한 상한선을 제공

파티션에 대한 레코드의 batch.size 값을 채우면 이 설정과 상관없이 즉시 전송

> 누적된 바이트 수가 적으면 지정된 시간 동안 대기

기본값은 0(지연없음)
request.timeout.ms 클라이언트가 요청 응답을 기다리는 최대 시간 제어

제한 시간이 경과하기 전에 응답이 수신되지 않으면 클라이언트는 (필요하면) 요청을 다시 보내거나 재시도가 소진되면 요청에 실패

불필요한 생산자 재시도로 인한 메시지 복제 가능성을 줄이기 위해 replica.lag.time.max.ms(브로커 구성)보다 커야
delivery.timeout.ms send() 호출이 반환된 후 성공 또는 실패를 보고하는 시간의 상한

전송 전에 레코드 지연되는 총 시간, 브로커에서 승인을 기다리는 시간, 재시도 가능한 전송 실패에 허용되는 시간을 제한

복구할 수 없는 오류, 재시도 소진, 배달만료 기한에 도달한 배치에 레코드가 추가된 경우 

이 설정보다 먼저 레코드를 보내지 못한 것으로 보고할 수 있음

기본값 120000(2분)

request.timeout.ms + linger.ms 보다 크거나 같아야


메시지 전송 관련
max.block.ms KafkaProducer의 send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), 

commitTransaction() 및 abortTransaction() 메서드가 차단되는 기간을 제어

send()의 경우 메타데이터 가져오기 및 버퍼 할당을 기다리는 총 시간

partitionsFor()의 경우 메타데이터를 사용할 수 없는 경우 이를 기다리는데 소요되는 시간 제한

트랜잭션 관련 메서드는 항상 블록되지만, 트랜잭션 코디네이터를 찾을 수 없거나 제한 시간 내 응답이 없으면 타임아웃 시킬 수 있음

기본값 60000(1분)
max.request.size 요청의 최대 바이트 크기

대량 요청을 보내는 것을 피하도록 생산자가 단일 요청으로 보낼 레코드 배치 수를 제한

압축되지 않은 최대 레코드 배치 크기에 대한 상한선이기도 함

서버에는 별도의 레코드 배치 크기가 있을 수 있음

기본값 1048576
partitioner.class 레코드를 생성할 때 보낼 파티션을 결정할 때 사용하는 클래스

미 설정시 기본 파티셔닝 논리 사용(최소한 batch.size가 파티션에 들어올때까지 파티션 유지)

파티션이 지정되지 않았지만 키가 있을 때, 키의 해시를 기반으로 파티션 생성

파티션과 키가 없는 경우 최소한 batch.size가 파티션에 들어올때까지 고정 파티션

org.apache.kafka.clients.producer.RoundRobinPartitioner

> 일련의 연속 레코드의 각 레코드가 소진될 때까지(키와 무관) 다른 파티션으로 전송됨

새 배치가 생성될 때 고르지 않은 배포를 유발할 수 있음

org.apache.kafka.clients.producer.Partitioner 인터페이스를 구현하면 커스텀 파티셔너 연결 가능
partitioner.ignore.keys true인 경우 생산자는 레코드 키를 사용하여 파티션을 선택하지 않음

false인 경우 생산자는 키가 있을 때 키의 해시를 기반으로 파티션 선택

사용자 지정 파티션 사용시 이 설정 적용되지 않음
request.timeout.ms 클라이언트가 요청 응답을 기다리는 최대 시간 제어

제한 시간이 경과하기 전에 응답이 수신되지 않으면 클라이언트는 (필요하면) 요청을 다시 보내거나 재시도가 소진되면 요청에 실패

불필요한 생산자 재시도로 인한 메시지 복제 가능성을 줄이기 위해 replica.lag.time.max.ms(브로커 구성)보다 커야
interceptor.classes org.apache.kafka.clients.producer.ProducerInterceptor 인터페이스를 구현하면 

프로듀서에게 받은 레코드를 kafka 클러스터에 게시하기전에 가로채거나 변경 가능

기본 값 없음
retry.backoff.ms 지정된 토픽 파티션에 대해 실패한 요청을 재시도하기 전에 대기하는 시간

기본값 100


acks 관련
acks 생산자가 요청 완료를 고려하기 전에 리더가 수신해야 하는 승인의 수

전송된 레코드의 내구성을 제어

acks=0 인 경우 생산자는 서버의 승인을 전혀 기다리지 않음

> 레코드는 즉시 소켓 버퍼에 추가되고 전송된 것으로 간주

> 서버가 레코드를 수신했다고 보장할 수 없고 재시도 구성 적용되지 않음

> 레코드 반환 오프셋은 항상 -1로 설정

acks=1 리더가 레코드를 로컬 로그에 기록하지만 모든 팔로워의 전체 승인을 기다리지 않고 응답

> 리더가 레코드를 승인하고 팔로어가 레코더를 복제하기 전에 실패하면 레코드가 손실될 수 있음

acks=all 리더가 동기화된 복제본의 전체 세트가 레코드를 승인할 때까지 대기

> 하나 이상의 동기화 복제본이 살아있는 한 레코드가 손실되지 않음

> acks=-1과 동일

> 멱등성 활성화를 위해서는 all이어야

기본값 all
enable.idempotence true(기본값)인 경우 프로듀서는 각 메시지의 정확히 하나의 복사본이 스트림에 기록되도록 함

false 인 경우 브로커 오류 등으로 인해 생산자가 재시도하고 재시도된 메시지의 중복이 스트림에 기록될 수 있음

활성화를 위해서는 

> max.in.flight.requests.per.connection 값이 5보다 작거나 같아야(허용되는 모든 값에 대해 메시지 순서가 유지됨)

> retries 가 0보다 커야하며

> asks가 all이어야

충돌하는 구성이 설정되지 않은 경우 기본적으로 활성화

멱등성이 명시적으로 활성화 되고 충돌하는 구성이 설정된 경우 ConfigException 발생
max.in.flight.requests.per.connection 클라이언트가 차단하기 전에 단일 연결에서 보낼 최대 승인되지 않은 요청 수

값이 1보다 크고 enable.idempotence false인 경우 재시도로 인해 전송 실패 후 메시지가 재정렬될 위험이 있음

재시도가 비활성화 되거나 enable.idempotence true인 경우 순서가 유지

기본값 5


트랜잭션 관련
transaction.timeout.ms 코디네이터가 트랜잭션을 사전에 중단하기 전에 트랜잭션이 열려있는 최대 시간

트랜잭션 시작은 첫번째 파티션이 추가될때 설정

브로커의 transaction.max.timeout.ms 설정보다 크면 InvalidTxnTimeoutException 오류와 함께 요청이 실패함

기본값 60000(1분)
transactional.id 트랜잭션 전달에 사용할 TransactionalId

이 값으로 클라이언트는 동일한 TransactionalId를 사용하는 트랜잭션이 

새 트랜잭션 시작 전에 완료되었음을 보장할 수 있음

TransactionalId가 제공되지 않으면 생산자는 idempotent 전달로 제한

TransactionalId가 구성된 경우 enable.idempotence 상태를 의미함

기본값이 없으므로 트랜잭션이 되지 않음

기본적으로 브로커가 3개 이상 있는 클러스터가 필요

> 브로커의 transaction.state.log.replication.factor 설정을 조정하여 변경 가능


기타 연결 관련
security.protocol 브로커와 통신하는데 사용되는 프로토콜

PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
metadata.max.age.ms 새 브로커/파티션을 사전에 검색하기 위해 (파티션 리더십 변경이 표시되지 않은 경우에도) 메타데이터를 강제로 새로 고치는 시간

기본값 300000(5분)
metadata.max.idle.ms 생산자가 유휴 상태인 토픽에 대해 메타데이터를 캐시하는 기간을 제어

토픽에 마지막으로 프로듀스된 이후 메타데이터 유휴 기간을 초과하면, 토픽의 메타데이터가 잊혀지고 다음 엑세스 시 메타데이터 가져오기 요청이 강제 실행됨
reconnect.backoff.max.ms 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간

값이 설정된 경우 호스트당 백오프는 각 연속 연결 실패에 대해 이 최대값까지 기하급수적으로 증가

플러스마이너스 20%

기본값 1000(1초)
reconnect.backoff.ms 지정된 호스트에 다시 연결을 시도하기 전에 대기하는 기본 시간

브로커에 대한 클라이언트의 모든 연결 시도에 적용됨

기본값 50
client.dns.lookup 클라이언트가 DNS 조회를 사용하는 방법을 제어

use_all_dns_ips로 설정하면 성공적으로 연결될 때까지 반환된 각 IP 주소에 순서대로 연결

resolve_canonical_bootstrap_servers_only로 설정된 경우 각 부트스트랩 주소를 정식 이름 목록으로 확인
client.id 요청을 할 때 서버에 전달할 id 문자열

논리적 애플리케이션 이름이 서버측 요청 로깅에 포함될 수 있도록 하여 ip/port 이외의 요청 소스를 추적할 수 있도록 하는 목적
socket.connection.setup.timeout.max.ms 클라이언트가 소켓 연결이 설정되기를 기다리는 최대 시간

연결 설정 제한 시간은 이 최대값까지 각각의 연속 연결 실패에 대해 기하급수적으로 증가

0.2.의 무작위 계수가 적용되어 20% 상하의 임의 범위

기본값 30000(30초)
socket.connection.setup.timeout.ms 클라이언트가 소켓 연결이 설정되기를 기다리는 시간

제한시간이 경과하기 전에 연결이 구축되지 않으면 클라이언트는 소켓 채널을 닫음

기본값 10000(10초)
connections.max.idle.ms 설정 시간이 지나면 유휴 연결을 닫음

기본값 540000(9분)
send.buffer.bytes 데이터를 보낼 때 사용할 TCP 송신 버퍼(SO_SNDBUF) 크기

값이 -1이면 OS 기본값

기본값 32768(32kibibytes)
receive.buffer.bytes 데이터를 읽을 때 사용할 TCP 수신 버퍼(SO_RCVBUF) 크기

값이 -1이면 OS 기본값

기본값 32768(32kibibytes)


보안 설정 관련
ssl.enabled.protocols SSL 연결에 사용할 프로토콜 목록

기본값은 Java 11 이상에서 TLSv1.2,TLSv1.3 / 그렇지 않으면 TLSv1.2

Java 11 이상일 때 TLSv1.3 선호, 아닐 때 TLSv1.2
ssl.protocol SSLContext를 생성하는 데 사용되는 SSL 프로토콜

Java 11 이상일 때 TLSv1.3, 아닐 때 TLSv1.2

'TLS', 'TLSv1.1', 'SSL', 'SSLv2' 및 'SSLv3'은 이전 JVM에서 지원될 수 있지만 취약성으로 권장되지 않음

이 구성 및 'ssl.enabled.protocols'의 기본값을 사용하면 

서버가 'TLSv1.3'을 지원하지 않는 경우 클라이언트가 'TLSv1.2'로 다운그레이드됨

기본값 TLSv1.2
ssl.provider SSL 연결에 사용되는 보안 공급자의 이름

기본값 JVM의 기본 보안 공급자
ssl.keystore.type 키 저장소 파일의 파일 형식(선택사항)

기본 ssl.engine.factory.class 에서 지원하는 값은 JKS, PKCS12, PEM

기본값 JKS
ssl.truststore.type 트러스트 저장소 파일의 파일 형식

기본 ssl.engine.factory.class 에서 지원하는 값은 JKS, PKCS12, PEM

기본값 JKS
security.providers 보안 알고리즘을 구현하는 공급자를 반환하는 구성 가능한 작성자 클래스 목록

org.apache.kafka.common.security.auth.SecurityProviderCreator 구현해야


기타 sasl config
기타 ssl config  

'개인 개발 공부' 카테고리의 다른 글

Kafka Consumer  (0) 2023.08.31
Kafka Stream  (0) 2023.08.31
Apache Kafka 설치부터 실행  (0) 2023.08.31
JUnit 테스트  (0) 2023.08.31
Burp Suite  (0) 2023.08.31
Comments