신입개발자

Kafka 고가용성 테스트 본문

개인 개발 공부

Kafka 고가용성 테스트

dks_a 2023. 8. 31. 14:07

[ 노드 하나씩 비활성화하는 경우 ]

kraft 폴더 아래에 있는 server.properties를 보면 각 서버별로 node.id를 정했다.
node.id=1 은 서버1
node.id=2 은 서버2
node.id=3 은 서버3

acks 옵션

OPTION 손실율 속도 DESCRIPTION
acks =  0 프로듀서는 리더 브로커에게 메시지를 전송한 후 전송 성공 여부를 확인하지 않는다. 이 경우, 프로듀서는 메시지 전송이 성공했는지 여부를 알 수 없다.
acks  = 1 프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader가 메시지를 받았는지 기다린다. follower들은 확인하지 않는다. leader가 확인 응답을 보내고, follower에게 복제가 되기 전에 leader가 fail되면, 해당 메시지는 손실될 수 있다.
acks  = all(-1) 프로듀서는 자신이 보낸 메시지에 대해 카프카의 leader와 follower(replicas)까지 받았는지 기다립니다. 최소 하나의 복제본까지 처리된 것을 확인하므로 메시지가 손실 될 확률은 거의 없다.
 
 
  • 기본값

Producer : batch.size = 16384 (두 설정 값 모두 설정하지 않아 default값)
Consumer : enable.auto.commit = false / max.poll.records = 1
내려갔다 다시 올라온 노드 : 2번(서버2IP)과 3번(서버3IP)

 

  • acks = -1(=all)

노드를 비활성화하기 전 상태 :

Topic: ackall   Partition: 0    Leader: 2       Replicas: 2,3   Isr: 3,2
Topic: ackall   Partition: 1    Leader: 3       Replicas: 3,1   Isr: 1,3
Topic: ackall   Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2

메세지가 파티션 2로 메세지가 손실 없이 모두 들어왔습니다. 총 보낸 메세지는 160개였습니다.

  • acks = 1

노드를 비활성화하기 전 상태 :

Topic: ack1     Partition: 0    Leader: 2       Replicas: 2,3   Isr: 3,2
Topic: ack1     Partition: 1    Leader: 3       Replicas: 3,1   Isr: 1,3
Topic: ack1     Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2

메세지가 파티션 2로 메세지가 손실 없이 모두 들어왔습니다. 총 보낸 메세지는 150개였습니다.

  • acks = 0

노드를 비활성화하기 전 상태 :

Topic: ack0     Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
Topic: ack0     Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
Topic: ack0     Partition: 2    Leader: 3       Replicas: 3,1   Isr: 3,1

메세지 손실이 발생했습니다. 총 보낸 메세지는 210개였습니다.


Rollback Transaction 테스트
https://yangbongsoo.tistory.com/77

  • 서버1번만 살아있고 서버2번과 서버3번이 죽었을 때 rollback이 되어 메세지가 보내지지 않고 다시 서버가 살아났을 때 메세지를 보내는지 확인

Producer sample 코드

  • kafka transaction 을 위해 필요한 설정에서 producer 에서 필요한 설정
    • TRANSACTIONAL_ID_CONFIG 는 producer transaction 을 구분하기 위한 id이다.
    • ENABLE_IDEMPOTENCE_CONFIG 는 true 로 지정. producer.send해서 메세지가 kafka broker 에 들어는 갔는데 timeout 이나 broker failure 로 실패응답을 받았다면 producer 는 같은 메세지를 재발행 한다. producer 입장에서는 실패 응답을 받았으므로 메세지가 kafka broker 에 들어갔는지 안들어갔는지 알 수가 없기 때문이다. consumer 입장에서는 중복 발행이 되어 똑같은 메세지를 2번 받게 된다. 코드상으로는 한번만 send 했지만 내부 구조상 재발행 하게 되어있다. 여기서 ENABLE_IDEMPOTENCE_CONFIG 가 true 로 되어 있으면, 재발행되서 두번째 메세지가 갔을 때 같은 메세지인지 비교한다. 그리고 두번째로 온 메세지가 이전과 같은 메세지라면 버린다.
@SpringBootApplication
@Slf4j
@Configuration
public class KafkaProducerTestApplication {

	public static void main(String[] args) {
		// Kafka Producer 설정
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "서버1IP:19092,서버2IP:19092,서버3IP:19092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
		props.put(ProducerConfig.ACKS_CONFIG, "all");
		props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
		props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx");

		KafkaProducer<String, String> producer = new KafkaProducer<>(props);

		// 트랜잭션 초기화 
		producer.initTransactions();

		try { // 트랜잭션 시작 
			producer.beginTransaction();

			// 메시지 전송 반복
			for (int i = 0; i < 20; i++) {
				String message = "Test message " + i;
				ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "myKey", message);
				producer.send(record);
			}
			// 트랜잭션 커밋 
			producer.commitTransaction(); 
			} catch (Exception e) { // 트랜잭션 롤백
			producer.abortTransaction();
			e.printStackTrace();
		} finally{ // Producer 종료
			producer.close();
		}
	}

}
  • InitTransactions(): 이 메소드는 프로듀서의 트랜잭션을 초기화. 다른 트랜잭션 관련 메소드를 사용하기 전에 반드시 호출해야한다.
  • BeginTransaction(): 이 메소드는 새로운 트랜잭션을 시작. 이 메소드가 호출되면 프로듀서가 보낸 모든 메시지가 CommitTransaction() 또는 AbortTransaction()이 호출될 때까지 현재 트랜잭션의 일부가 된다.
  • CommitTransaction(): 이 메소드는 현재 트랜잭션을 커밋. 즉, 트랜잭션의 일부로 전송된 모든 메시지가 각각의 Kafka 토픽에 기록된다.
  • AbortTransaction(): 이 메소드는 현재 트랜잭션을 중단. 즉, 트랜잭션의 일부로 전송된 모든 메시지가 각각의 Kafka 토픽에 기록되지 않는다. ( 롤백을 하는 것. 이 함수는 트랜잭션을 중단하고 모든 변경 사항을 되돌리는 역할을 한다. 이 함수는 트랜잭션 중에 문제가 발생했을 때 호출된다.)

Consumer sample 코드

  • Kafka transaction 을 위해 필요한 설정에서 Consumer에서 필요한 설정
    • ISOLATION_LEVEL_CONFIG 는 read_committed 으로 설정
      producer 가 broker 로 보낸 데이터 중 transaction 이 완벽하게 완료된(커밋된) 데이터에 대해서만 읽을 수 있다. read_committed 가 아니면, 메세지는 보내졌는데 커밋이 실패했을 때 consumer poll 했을 때 메세지를 읽어오게 된다. 이는 transaction 을 사용하는 의도와 맞지 않기 때문에 read_committed 를 해줘야 poll 할 때, 이미 나가버린 메세지가 커밋이 안되어 있으면 읽지 않고 skip 시킨다.
    • ENABLE_AUTO_COMMIT_CONFIG 이 false 면 consume 을 한 후에 비지니스 처리 하고 commit 을 직접 찍어줘야한다. 만약 true 면 자동으로 offset에 commit을 시킨다. 하지만 위험하다. 만약 작업 중간에 에러가 발생하면 그 때의 offset 부터 다시 읽어야 하는데 자동적으로 커밋을 해서 다음콜을 했을 때 이미 이전꺼는 commit 이 되버렸기 때문에 메세지가 유실될 위험이 있다. 그래서 직접 커밋을 제어하는 방향이 안전하다.
@Configuration
public class KafkaConsumerConfig {

	@Value(value = "${kafka.bootstrapAddress}")
    private String BOOT_STRAP_ADR;

	@Value(value = "${spring.kafka.groupId}")
    private String GROUP_ID;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_ADR); 
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); 
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offset 자동 commit을 하지않도록 설정
        props.put("auto.offset.reset","latest"); // offset이 없는경우는 가장 마지막의 오프셋값으로 설정
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        
        // Transaction
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // read_committed: 커밋된 데이터만 읽는다.
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
  • Kafka Consumer에서 트랜잭션 롤백을 위한 설정은 isolation.level 이다.
    이 설정은 read_committed 또는 read_uncommitted로 설정할 수 있다.
    read_committed로 설정하면 Consumer는 트랜잭션에서 커밋된 메시지만 읽을 수 있다.
    read_uncommitted로 설정하면 Consumer는 커밋되지 않은 메시지도 읽을 수 있다. 이 설정은 Consumer가 트랜잭션에서 롤백된 메시지를 읽지 않도록 하는 데 사용된다.
  • props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")를 사용하면 __consumer_offsets의 수가 늘어나지 않는다. __consumer_offsets 토픽은 컨슈머 그룹의 오프셋 정보를 저장하는 데 사용된다. 컨슈머 그룹의 오프셋 정보는 컨슈머가 메시지를 소비할 때마다 업데이트된다.

'개인 개발 공부' 카테고리의 다른 글

Kafka 백업/이관/업그레이드 과정 정리  (0) 2023.08.31
Kafka broker, controller 분리  (0) 2023.08.31
Kafka Consumer  (0) 2023.08.31
Kafka Stream  (0) 2023.08.31
Kafka Producer  (1) 2023.08.31
Comments