Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- 산술 쉬프트 연산자
- 비트논리연산자
- dowhile문
- Java
- 문자열
- 변수유효범위
- 사용자입력Switch문
- 배열
- while문
- 단순if
- 비정방행렬
- 대입 연산자
- 명명규칙
- println()
- array
- 비교연산자
- print()
- 자바
- 순환문
- 기본형호출
- 증감연산자
- 참조형호출
- 타입변환
- 삼항 연산자
- 안드로이드스튜디오
- 다차원배열
- 콘솔출력문
- 다중if
- 논리 쉬프트 연산자
- for문
Archives
- Today
- Total
신입개발자
Kafka 고가용성 테스트 본문
[ 노드 하나씩 비활성화하는 경우 ]
kraft 폴더 아래에 있는 server.properties를 보면 각 서버별로 node.id를 정했다.
node.id=1 은 서버1
node.id=2 은 서버2
node.id=3 은 서버3
acks 옵션
OPTION | 손실율 | 속도 | DESCRIPTION |
acks = 0 | 상 | 상 | 프로듀서는 리더 브로커에게 메시지를 전송한 후 전송 성공 여부를 확인하지 않는다. 이 경우, 프로듀서는 메시지 전송이 성공했는지 여부를 알 수 없다. |
acks = 1 | 중 | 중 | 프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader가 메시지를 받았는지 기다린다. follower들은 확인하지 않는다. leader가 확인 응답을 보내고, follower에게 복제가 되기 전에 leader가 fail되면, 해당 메시지는 손실될 수 있다. |
acks = all(-1) | 하 | 하 | 프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader와 follower(replicas)까지 받았는지 기다립니다. 최소 하나의 복제본까지 처리된 것을 확인하므로 메시지가 손실 될 확률은 거의 없다. |
![]() |
![]() |
![]() |
- 기본값
Producer : batch.size = 16384 (두 설정 값 모두 설정하지 않아 default값)
Consumer : enable.auto.commit = false / max.poll.records = 1
내려갔다 다시 올라온 노드 : 2번(서버2IP)과 3번(서버3IP)
- acks = -1(=all)
노드를 비활성화하기 전 상태 :
Topic: ackall Partition: 0 Leader: 2 Replicas: 2,3 Isr: 3,2
Topic: ackall Partition: 1 Leader: 3 Replicas: 3,1 Isr: 1,3
Topic: ackall Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

메세지가 파티션 2로 메세지가 손실 없이 모두 들어왔습니다. 총 보낸 메세지는 160개였습니다.
- acks = 1
노드를 비활성화하기 전 상태 :
Topic: ack1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 3,2
Topic: ack1 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 1,3
Topic: ack1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

메세지가 파티션 2로 메세지가 손실 없이 모두 들어왔습니다. 총 보낸 메세지는 150개였습니다.
- acks = 0
노드를 비활성화하기 전 상태 :
Topic: ack0 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: ack0 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: ack0 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1

메세지 손실이 발생했습니다. 총 보낸 메세지는 210개였습니다.
Rollback Transaction 테스트
https://yangbongsoo.tistory.com/77
- 서버1번만 살아있고 서버2번과 서버3번이 죽었을 때 rollback이 되어 메세지가 보내지지 않고 다시 서버가 살아났을 때 메세지를 보내는지 확인
Producer sample 코드
- kafka transaction 을 위해 필요한 설정에서 producer 에서 필요한 설정
- TRANSACTIONAL_ID_CONFIG 는 producer transaction 을 구분하기 위한 id이다.
- ENABLE_IDEMPOTENCE_CONFIG 는 true 로 지정. producer.send해서 메세지가 kafka broker 에 들어는 갔는데 timeout 이나 broker failure 로 실패응답을 받았다면 producer 는 같은 메세지를 재발행 한다. producer 입장에서는 실패 응답을 받았으므로 메세지가 kafka broker 에 들어갔는지 안들어갔는지 알 수가 없기 때문이다. consumer 입장에서는 중복 발행이 되어 똑같은 메세지를 2번 받게 된다. 코드상으로는 한번만 send 했지만 내부 구조상 재발행 하게 되어있다. 여기서 ENABLE_IDEMPOTENCE_CONFIG 가 true 로 되어 있으면, 재발행되서 두번째 메세지가 갔을 때 같은 메세지인지 비교한다. 그리고 두번째로 온 메세지가 이전과 같은 메세지라면 버린다.
@SpringBootApplication
@Slf4j
@Configuration
public class KafkaProducerTestApplication {
public static void main(String[] args) {
// Kafka Producer 설정
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "서버1IP:19092,서버2IP:19092,서버3IP:19092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 트랜잭션 초기화
producer.initTransactions();
try { // 트랜잭션 시작
producer.beginTransaction();
// 메시지 전송 반복
for (int i = 0; i < 20; i++) {
String message = "Test message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "myKey", message);
producer.send(record);
}
// 트랜잭션 커밋
producer.commitTransaction();
} catch (Exception e) { // 트랜잭션 롤백
producer.abortTransaction();
e.printStackTrace();
} finally{ // Producer 종료
producer.close();
}
}
}
- InitTransactions(): 이 메소드는 프로듀서의 트랜잭션을 초기화. 다른 트랜잭션 관련 메소드를 사용하기 전에 반드시 호출해야한다.
- BeginTransaction(): 이 메소드는 새로운 트랜잭션을 시작. 이 메소드가 호출되면 프로듀서가 보낸 모든 메시지가 CommitTransaction() 또는 AbortTransaction()이 호출될 때까지 현재 트랜잭션의 일부가 된다.
- CommitTransaction(): 이 메소드는 현재 트랜잭션을 커밋. 즉, 트랜잭션의 일부로 전송된 모든 메시지가 각각의 Kafka 토픽에 기록된다.
- AbortTransaction(): 이 메소드는 현재 트랜잭션을 중단. 즉, 트랜잭션의 일부로 전송된 모든 메시지가 각각의 Kafka 토픽에 기록되지 않는다. ( 롤백을 하는 것. 이 함수는 트랜잭션을 중단하고 모든 변경 사항을 되돌리는 역할을 한다. 이 함수는 트랜잭션 중에 문제가 발생했을 때 호출된다.)
Consumer sample 코드
- Kafka transaction 을 위해 필요한 설정에서 Consumer에서 필요한 설정
- ISOLATION_LEVEL_CONFIG 는 read_committed 으로 설정
producer 가 broker 로 보낸 데이터 중 transaction 이 완벽하게 완료된(커밋된) 데이터에 대해서만 읽을 수 있다. read_committed 가 아니면, 메세지는 보내졌는데 커밋이 실패했을 때 consumer poll 했을 때 메세지를 읽어오게 된다. 이는 transaction 을 사용하는 의도와 맞지 않기 때문에 read_committed 를 해줘야 poll 할 때, 이미 나가버린 메세지가 커밋이 안되어 있으면 읽지 않고 skip 시킨다. - ENABLE_AUTO_COMMIT_CONFIG 이 false 면 consume 을 한 후에 비지니스 처리 하고 commit 을 직접 찍어줘야한다. 만약 true 면 자동으로 offset에 commit을 시킨다. 하지만 위험하다. 만약 작업 중간에 에러가 발생하면 그 때의 offset 부터 다시 읽어야 하는데 자동적으로 커밋을 해서 다음콜을 했을 때 이미 이전꺼는 commit 이 되버렸기 때문에 메세지가 유실될 위험이 있다. 그래서 직접 커밋을 제어하는 방향이 안전하다.
- ISOLATION_LEVEL_CONFIG 는 read_committed 으로 설정
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String BOOT_STRAP_ADR;
@Value(value = "${spring.kafka.groupId}")
private String GROUP_ID;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_ADR);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offset 자동 commit을 하지않도록 설정
props.put("auto.offset.reset","latest"); // offset이 없는경우는 가장 마지막의 오프셋값으로 설정
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
// Transaction
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // read_committed: 커밋된 데이터만 읽는다.
return new DefaultKafkaConsumerFactory<>(props);
}
}
- Kafka Consumer에서 트랜잭션 롤백을 위한 설정은 isolation.level 이다.
이 설정은 read_committed 또는 read_uncommitted로 설정할 수 있다.
read_committed로 설정하면 Consumer는 트랜잭션에서 커밋된 메시지만 읽을 수 있다.
read_uncommitted로 설정하면 Consumer는 커밋되지 않은 메시지도 읽을 수 있다. 이 설정은 Consumer가 트랜잭션에서 롤백된 메시지를 읽지 않도록 하는 데 사용된다.
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")와 props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")를 사용하면 __consumer_offsets의 수가 늘어나지 않는다. __consumer_offsets 토픽은 컨슈머 그룹의 오프셋 정보를 저장하는 데 사용된다. 컨슈머 그룹의 오프셋 정보는 컨슈머가 메시지를 소비할 때마다 업데이트된다.
'개인 개발 공부' 카테고리의 다른 글
Kafka 백업/이관/업그레이드 과정 정리 (0) | 2023.08.31 |
---|---|
Kafka broker, controller 분리 (0) | 2023.08.31 |
Kafka Consumer (0) | 2023.08.31 |
Kafka Stream (0) | 2023.08.31 |
Kafka Producer (1) | 2023.08.31 |
Comments