일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 콘솔출력문
- 다중if
- 사용자입력Switch문
- 비교연산자
- 산술 쉬프트 연산자
- 자바
- 참조형호출
- 순환문
- println()
- 안드로이드스튜디오
- 논리 쉬프트 연산자
- 비트논리연산자
- 변수유효범위
- 타입변환
- 삼항 연산자
- 문자열
- Java
- 다차원배열
- 기본형호출
- dowhile문
- 배열
- 단순if
- 명명규칙
- for문
- while문
- array
- 비정방행렬
- print()
- 증감연산자
- 대입 연산자
- Today
- Total
신입개발자
Kafka Producer 본문
Kafka Producer
- 개요
- 발행/구독 모델에서 발행을 담당
- 카프카에 이벤트(= 레코드, 메시지)를 게시하는 클라이언트 애플리케이션
카프카 토픽에 대해 메시지를 발행하여 생산하는 주체
- 이벤트에는 키, 값, 타임스탬프 및 선택적 메타데이터 헤더 (ProducerRecord의 형태로 전달)
키 : kafka 내의 메시지 라우팅 및 파티셔닝에 사용
값 : 메시지의 실제 내용
- 컨슈머와 완전히 분리
프로듀서는 소비자를 기다릴 필요가 없음
확장성
- 특징
부하 분산
○ 프로듀서는 중간 라우팅 계층 없이 파티션의 리더인 브로커에 직접 데이터를 보냄
○ 모든 카프카 노드는 프로듀서가 요청을 적절하게 지시할 수 있도록, 주어진 시간에 어떤 서버가 활성 상태이고 토픽 파티션의 리더가 어디있는지에 대한 메타 데이터 요청에 응답 가능
○ 클라이언트는 메시지를 게시하는 파티션 제어
- 무작위 로드 밸런싱 구현 > 일부 시맨틱 분할 기능으로 수행될 수 있음
- 사용자가 파티션에 대한 키를 지정하고 이를 사용하여 파티션으로 해시 할 수 있도록 함 > 시맨틱 파티션을 위한 인터페이스를 노출
- 파티셔닝은 메시지의 확장성과 병렬 처리를 허용
○ ex) 선택한 키가 사용자ID인 경우 지정된 사용자에 대한 모든 데이터가 동일한 파티션으로 전송됨
비동기 전송
○ 프로듀서 입장의 batch 기능은 일정 크기만큼 메시지를 모아서 전송하는 것
○ 데이터 양보다 요청 수 제어가 관건
○ 일괄처리를 활성화하기 위해 메모리에 데이터를 축적하고 단일 요청으로 더 큰 일괄 처리를 보내려고 시도
○ 고정된 수의 메시지만 축적하고 고정된 지연 시간(예: 64k or 10ms)보다 더 이상 기다리지 않도록 구성할 수 있음
○ 더 많은 바이트를 보낼 수 있음
메시지 전달 시맨틱
○ 구성가능한 메시지 전달 시맨틱 제공
○ 안정성과 일관성 보장
○ 기본 “at least once” 적어도 한번 > 메시지가 kafka로 전달되는 것이 보장되지만 실패할 경우 중복 가능
○ At most once(최대 한번) : 메시지가 손실될 수 있지만 재 전달은 하지 않음 (ack =0,1)
○ At least once(최소 한번) : 메시지가 손실되지 않지만 재전달이 일어남
○ Exactly once(정확히 한번) : 메시지는 정확히 한번 전달됨 (ack =1)
Acknowledgments
○ 메시지 내구성 보장을 위해 kafka에 확인 요청
압축
○ 네트워크 대역폭과 스토리지 사용량을 줄이기 위해 메시지 압축 지원
오류 처리
○ 네트워크 오류, 파티션 리더 변경 또는 메시지 직렬화 문제 등에 대해 오류 처리
○ 재시도, 시간 제한 설정, 사용자 지정 오류 처리기 구현 등
프로듀서 인터셉터
○ 프로듀서 레코드를 Kafka로 보내기 전에 가로채고 수정할 수 있음
○ 메시지에 추가 정보, 사용자 지정 논리 구현 등
- 사용
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version>
</dependency>
- 실행할 메인 클래스(ProducerMain)와 메인 메서드를 만들어 Producer Client를 구현
○ 프로듀서 초기화를 위한 Properties를 세팅
§ bootstrap.servers: 카프카 클러스터에 연결하기 위한 브로커 목록
§ key.serializer : 메시지 키 직렬화에 사용되는 클래스
§ value.serializer : 메시지 값을 직렬화하는데 사용되는 클래스
○ Properties 설정으로 KafkaProducer 인스턴스 생성
○ topic partition key data 값을 지정해 ProducerRecord 인스턴스를 생성
○ producer.send(record) 메서드를 통해 작성한 레코드를 카프카 클러스터에 전송
○ Properties configs = new Properties();
§ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"
§ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
§ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
○ KafkaProducer<String,String> producer = new KafkaProducer<>(configs);
○ ProducerRecord<String,String> record = new ProducerRecord<>(topic, data);
○ ProducerRecord<String,String> record = new ProducerRecord<>(topic, key, data);
○ ProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, key, data);
○ producer.send(record);
○ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header>headers)
- config
기본설정 | |
key.serializer | org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 키의 Serializer 클래스 |
value.serializer | org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 값에 대한 Serializer 클래스 |
bootstrap.servers | [host1:port1,host2:port2,...] |
카프카 클러스터에 대한 초기 연결을 설정하는데 사용할 호스트/포트 쌍 목록 | |
클라이언트는 지정 서버와 상관없이 모든 서버를 사용하고, 설정값은 전체 서버 집합을 검색하는 초기 호스트에만 사용됨 | |
>> 전체 서버 집합이 포함될 필요가 없음(서버가 다운된 경우에만 둘 이상 필요할수도) | |
buffer.memory | 프로듀서가 서버로 전송되기를 기다리는 레코드를 버퍼링하는데 사용하는 총 메모리 바이트 |
레코드가 서버로 전달될 수 있는 것보다 더 빨리 전송되면 생산자는 max.block.ms 동안 차단한 후 exception 발생시킴 | |
producer가 사용할 총 메모리와 거의 일치해야 하지만, 일부 추가 메모리는 압축 및 진행 중인 요청 유지에 사용 | |
유형 long, 기본값 33554432 | |
compression.type | 프로듀서가 생성한 모든 데이터의 압축 유형 |
기본값 압축 없음 | |
종류 none, gzip, snappy, lz4, zstd | |
전체 배치 데이터와 관련되어, 배치의 효율성이 압축률에 영향을 줌(더 많은 일괄 처리는 더 나은 압축을 의미) | |
retries | 0보다 큰 값을 설정하면 클라이언트가 일시적일수도 있는 오류로 전송에 실패한 모든 레코드를 재전송 |
클라이언트가 오류 수신 시 레코드를 재전송하는 것과 다르지 않음 | |
delivery.timeoout.ms에 의해 구성된 제한 시간이 성공적인 승인 전에 만료되면 retries 이전에 생성 요청이 실패됨 | |
일반적으로 이 config 대신 delivery.timeout.ms로 재시도 동작을 제어하는 것이 추천됨 | |
멱등성(연산을 여러 번 반복해도 한번만 수행한 것과 동일한 결과) 활성화를 위해서는 0보다 커져야 | |
충돌하는 구성이 설정되고 멱등성이 명시적으로 활성화되지 않은 경우 멱등성은 비활성화 | |
배치 관련 | |
batch.size | 프로듀서는 여러 레코드가 동일 파티션으로 전송될 때마다 레코드를 더 적은 수의 요청으로 일괄 처리하려고 시도 |
기본 배치 크기(바이트)를 제어 - 보낼 배치 크기의 상한값 | |
이 크기보다 더 큰 레코드는 일괄 처리하도록 시도하지 않음 | |
브로커에 전송된 요청에는 여러 배치가 포함됨(전송할 수 있는 데이터가 있는 각 파티션에 대해) | |
배치 크기가 작으면 배치가 덜 일반적이고 처리량이 감소할 수 있음 | |
배치 크기가 0이면 배치 비활성화 | |
배치 크기가 지나치게 크면 추가 레코드를 예상해 지정된 배치 크기의 버퍼를 항상 할당해서 메모리 낭비가 생길 수 있음 | |
이 파티션에 대해 누적된 바이트 수가 설정 크기보다 적으면 추가 레코드가 올때까지 linger.ms 만큼 대기 | |
> linger.ms 기본 값 0 > 누적된 배치 크기가 설정값보다 작아도 레코드 즉시 전송 | |
기본값 16384 | |
linger.ms | 프로듀서는 요청 전송 사이에 도착하는 모든 레코드를 단일 일괄 요청으로 그룹화 |
(일반적으로 레코드가 보낼수 있는것보다 더 빨리 도착하는 로드 상태에서만 발생) | |
어떤 상황에서는 클라이언트가 적당한 부하에서도 요청 수를 줄이고자 할 수 있음 | |
>> 이 설정은 소량의 인위적인 지연을 추가하여 이를 수행 | |
레코드를 즉시 보내는 대신 생산자가 다른 레코드를 보낼 수 있도록 지정된 지연까지 기다렸다가 전송을 일괄 처리 | |
TCP Nagle 알고리즘과 유사 | |
일괄 처리 지연에 대한 상한선을 제공 | |
파티션에 대한 레코드의 batch.size 값을 채우면 이 설정과 상관없이 즉시 전송 | |
> 누적된 바이트 수가 적으면 지정된 시간 동안 대기 | |
기본값은 0(지연없음) | |
request.timeout.ms | 클라이언트가 요청 응답을 기다리는 최대 시간 제어 |
제한 시간이 경과하기 전에 응답이 수신되지 않으면 클라이언트는 (필요하면) 요청을 다시 보내거나 재시도가 소진되면 요청에 실패 | |
불필요한 생산자 재시도로 인한 메시지 복제 가능성을 줄이기 위해 replica.lag.time.max.ms(브로커 구성)보다 커야 | |
delivery.timeout.ms | send() 호출이 반환된 후 성공 또는 실패를 보고하는 시간의 상한 |
전송 전에 레코드 지연되는 총 시간, 브로커에서 승인을 기다리는 시간, 재시도 가능한 전송 실패에 허용되는 시간을 제한 | |
복구할 수 없는 오류, 재시도 소진, 배달만료 기한에 도달한 배치에 레코드가 추가된 경우 | |
이 설정보다 먼저 레코드를 보내지 못한 것으로 보고할 수 있음 | |
기본값 120000(2분) | |
request.timeout.ms + linger.ms 보다 크거나 같아야 | |
메시지 전송 관련 | |
max.block.ms | KafkaProducer의 send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), |
commitTransaction() 및 abortTransaction() 메서드가 차단되는 기간을 제어 | |
send()의 경우 메타데이터 가져오기 및 버퍼 할당을 기다리는 총 시간 | |
partitionsFor()의 경우 메타데이터를 사용할 수 없는 경우 이를 기다리는데 소요되는 시간 제한 | |
트랜잭션 관련 메서드는 항상 블록되지만, 트랜잭션 코디네이터를 찾을 수 없거나 제한 시간 내 응답이 없으면 타임아웃 시킬 수 있음 | |
기본값 60000(1분) | |
max.request.size | 요청의 최대 바이트 크기 |
대량 요청을 보내는 것을 피하도록 생산자가 단일 요청으로 보낼 레코드 배치 수를 제한 | |
압축되지 않은 최대 레코드 배치 크기에 대한 상한선이기도 함 | |
서버에는 별도의 레코드 배치 크기가 있을 수 있음 | |
기본값 1048576 | |
partitioner.class | 레코드를 생성할 때 보낼 파티션을 결정할 때 사용하는 클래스 |
미 설정시 기본 파티셔닝 논리 사용(최소한 batch.size가 파티션에 들어올때까지 파티션 유지) | |
파티션이 지정되지 않았지만 키가 있을 때, 키의 해시를 기반으로 파티션 생성 | |
파티션과 키가 없는 경우 최소한 batch.size가 파티션에 들어올때까지 고정 파티션 | |
org.apache.kafka.clients.producer.RoundRobinPartitioner | |
> 일련의 연속 레코드의 각 레코드가 소진될 때까지(키와 무관) 다른 파티션으로 전송됨 | |
새 배치가 생성될 때 고르지 않은 배포를 유발할 수 있음 | |
org.apache.kafka.clients.producer.Partitioner 인터페이스를 구현하면 커스텀 파티셔너 연결 가능 | |
partitioner.ignore.keys | true인 경우 생산자는 레코드 키를 사용하여 파티션을 선택하지 않음 |
false인 경우 생산자는 키가 있을 때 키의 해시를 기반으로 파티션 선택 | |
사용자 지정 파티션 사용시 이 설정 적용되지 않음 | |
request.timeout.ms | 클라이언트가 요청 응답을 기다리는 최대 시간 제어 |
제한 시간이 경과하기 전에 응답이 수신되지 않으면 클라이언트는 (필요하면) 요청을 다시 보내거나 재시도가 소진되면 요청에 실패 | |
불필요한 생산자 재시도로 인한 메시지 복제 가능성을 줄이기 위해 replica.lag.time.max.ms(브로커 구성)보다 커야 | |
interceptor.classes | org.apache.kafka.clients.producer.ProducerInterceptor 인터페이스를 구현하면 |
프로듀서에게 받은 레코드를 kafka 클러스터에 게시하기전에 가로채거나 변경 가능 | |
기본 값 없음 | |
retry.backoff.ms | 지정된 토픽 파티션에 대해 실패한 요청을 재시도하기 전에 대기하는 시간 |
기본값 100 | |
acks 관련 | |
acks | 생산자가 요청 완료를 고려하기 전에 리더가 수신해야 하는 승인의 수 |
전송된 레코드의 내구성을 제어 | |
acks=0 인 경우 생산자는 서버의 승인을 전혀 기다리지 않음 | |
> 레코드는 즉시 소켓 버퍼에 추가되고 전송된 것으로 간주 | |
> 서버가 레코드를 수신했다고 보장할 수 없고 재시도 구성 적용되지 않음 | |
> 레코드 반환 오프셋은 항상 -1로 설정 | |
acks=1 리더가 레코드를 로컬 로그에 기록하지만 모든 팔로워의 전체 승인을 기다리지 않고 응답 | |
> 리더가 레코드를 승인하고 팔로어가 레코더를 복제하기 전에 실패하면 레코드가 손실될 수 있음 | |
acks=all 리더가 동기화된 복제본의 전체 세트가 레코드를 승인할 때까지 대기 | |
> 하나 이상의 동기화 복제본이 살아있는 한 레코드가 손실되지 않음 | |
> acks=-1과 동일 | |
> 멱등성 활성화를 위해서는 all이어야 | |
기본값 all | |
enable.idempotence | true(기본값)인 경우 프로듀서는 각 메시지의 정확히 하나의 복사본이 스트림에 기록되도록 함 |
false 인 경우 브로커 오류 등으로 인해 생산자가 재시도하고 재시도된 메시지의 중복이 스트림에 기록될 수 있음 | |
활성화를 위해서는 | |
> max.in.flight.requests.per.connection 값이 5보다 작거나 같아야(허용되는 모든 값에 대해 메시지 순서가 유지됨) | |
> retries 가 0보다 커야하며 | |
> asks가 all이어야 | |
충돌하는 구성이 설정되지 않은 경우 기본적으로 활성화 | |
멱등성이 명시적으로 활성화 되고 충돌하는 구성이 설정된 경우 ConfigException 발생 | |
max.in.flight.requests.per.connection | 클라이언트가 차단하기 전에 단일 연결에서 보낼 최대 승인되지 않은 요청 수 |
값이 1보다 크고 enable.idempotence false인 경우 재시도로 인해 전송 실패 후 메시지가 재정렬될 위험이 있음 | |
재시도가 비활성화 되거나 enable.idempotence true인 경우 순서가 유지 | |
기본값 5 | |
트랜잭션 관련 | |
transaction.timeout.ms | 코디네이터가 트랜잭션을 사전에 중단하기 전에 트랜잭션이 열려있는 최대 시간 |
트랜잭션 시작은 첫번째 파티션이 추가될때 설정 | |
브로커의 transaction.max.timeout.ms 설정보다 크면 InvalidTxnTimeoutException 오류와 함께 요청이 실패함 | |
기본값 60000(1분) | |
transactional.id | 트랜잭션 전달에 사용할 TransactionalId |
이 값으로 클라이언트는 동일한 TransactionalId를 사용하는 트랜잭션이 | |
새 트랜잭션 시작 전에 완료되었음을 보장할 수 있음 | |
TransactionalId가 제공되지 않으면 생산자는 idempotent 전달로 제한 | |
TransactionalId가 구성된 경우 enable.idempotence 상태를 의미함 | |
기본값이 없으므로 트랜잭션이 되지 않음 | |
기본적으로 브로커가 3개 이상 있는 클러스터가 필요 | |
> 브로커의 transaction.state.log.replication.factor 설정을 조정하여 변경 가능 | |
기타 연결 관련 | |
security.protocol | 브로커와 통신하는데 사용되는 프로토콜 |
PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | |
metadata.max.age.ms | 새 브로커/파티션을 사전에 검색하기 위해 (파티션 리더십 변경이 표시되지 않은 경우에도) 메타데이터를 강제로 새로 고치는 시간 |
기본값 300000(5분) | |
metadata.max.idle.ms | 생산자가 유휴 상태인 토픽에 대해 메타데이터를 캐시하는 기간을 제어 |
토픽에 마지막으로 프로듀스된 이후 메타데이터 유휴 기간을 초과하면, 토픽의 메타데이터가 잊혀지고 다음 엑세스 시 메타데이터 가져오기 요청이 강제 실행됨 | |
reconnect.backoff.max.ms | 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간 |
값이 설정된 경우 호스트당 백오프는 각 연속 연결 실패에 대해 이 최대값까지 기하급수적으로 증가 | |
플러스마이너스 20% | |
기본값 1000(1초) | |
reconnect.backoff.ms | 지정된 호스트에 다시 연결을 시도하기 전에 대기하는 기본 시간 |
브로커에 대한 클라이언트의 모든 연결 시도에 적용됨 | |
기본값 50 | |
client.dns.lookup | 클라이언트가 DNS 조회를 사용하는 방법을 제어 |
use_all_dns_ips로 설정하면 성공적으로 연결될 때까지 반환된 각 IP 주소에 순서대로 연결 | |
resolve_canonical_bootstrap_servers_only로 설정된 경우 각 부트스트랩 주소를 정식 이름 목록으로 확인 | |
client.id | 요청을 할 때 서버에 전달할 id 문자열 |
논리적 애플리케이션 이름이 서버측 요청 로깅에 포함될 수 있도록 하여 ip/port 이외의 요청 소스를 추적할 수 있도록 하는 목적 | |
socket.connection.setup.timeout.max.ms | 클라이언트가 소켓 연결이 설정되기를 기다리는 최대 시간 |
연결 설정 제한 시간은 이 최대값까지 각각의 연속 연결 실패에 대해 기하급수적으로 증가 | |
0.2.의 무작위 계수가 적용되어 20% 상하의 임의 범위 | |
기본값 30000(30초) | |
socket.connection.setup.timeout.ms | 클라이언트가 소켓 연결이 설정되기를 기다리는 시간 |
제한시간이 경과하기 전에 연결이 구축되지 않으면 클라이언트는 소켓 채널을 닫음 | |
기본값 10000(10초) | |
connections.max.idle.ms | 설정 시간이 지나면 유휴 연결을 닫음 |
기본값 540000(9분) | |
send.buffer.bytes | 데이터를 보낼 때 사용할 TCP 송신 버퍼(SO_SNDBUF) 크기 |
값이 -1이면 OS 기본값 | |
기본값 32768(32kibibytes) | |
receive.buffer.bytes | 데이터를 읽을 때 사용할 TCP 수신 버퍼(SO_RCVBUF) 크기 |
값이 -1이면 OS 기본값 | |
기본값 32768(32kibibytes) | |
보안 설정 관련 | |
ssl.enabled.protocols | SSL 연결에 사용할 프로토콜 목록 |
기본값은 Java 11 이상에서 TLSv1.2,TLSv1.3 / 그렇지 않으면 TLSv1.2 | |
Java 11 이상일 때 TLSv1.3 선호, 아닐 때 TLSv1.2 | |
ssl.protocol | SSLContext를 생성하는 데 사용되는 SSL 프로토콜 |
Java 11 이상일 때 TLSv1.3, 아닐 때 TLSv1.2 | |
'TLS', 'TLSv1.1', 'SSL', 'SSLv2' 및 'SSLv3'은 이전 JVM에서 지원될 수 있지만 취약성으로 권장되지 않음 | |
이 구성 및 'ssl.enabled.protocols'의 기본값을 사용하면 | |
서버가 'TLSv1.3'을 지원하지 않는 경우 클라이언트가 'TLSv1.2'로 다운그레이드됨 | |
기본값 TLSv1.2 | |
ssl.provider | SSL 연결에 사용되는 보안 공급자의 이름 |
기본값 JVM의 기본 보안 공급자 | |
ssl.keystore.type | 키 저장소 파일의 파일 형식(선택사항) |
기본 ssl.engine.factory.class 에서 지원하는 값은 JKS, PKCS12, PEM | |
기본값 JKS | |
ssl.truststore.type | 트러스트 저장소 파일의 파일 형식 |
기본 ssl.engine.factory.class 에서 지원하는 값은 JKS, PKCS12, PEM | |
기본값 JKS | |
security.providers | 보안 알고리즘을 구현하는 공급자를 반환하는 구성 가능한 작성자 클래스 목록 |
org.apache.kafka.common.security.auth.SecurityProviderCreator 구현해야 | |
기타 sasl config | |
기타 ssl config |
'개인 개발 공부' 카테고리의 다른 글
Kafka Consumer (0) | 2023.08.31 |
---|---|
Kafka Stream (0) | 2023.08.31 |
Apache Kafka 설치부터 실행 (0) | 2023.08.31 |
JUnit 테스트 (0) | 2023.08.31 |
Burp Suite (0) | 2023.08.31 |