신입개발자

Kafka Stream 본문

개인 개발 공부

Kafka Stream

dks_a 2023. 8. 31. 14:00

Kafka Stream

정의 : kafka에 저장된 데이터를 처리하고 분석하기 위해 사용하는 클라이언트 라이브러리.

어떤 Topic으로 들어오는 데이터를 consume하여 kafka streams에서 제공하는 처리 로직을 통해 처리 후 다른 Topic으로 전송하거나 끝내는 동작 수행.

<kafka producer and consumer>

<kafka streams>

장점

  1. 카프카와 완벽 호환
    1. logstash, spark와 같은 오픈소스 툴과 다르게 카프카 버전에 맞춰 호환을 제공한다.
  2. 데이터 유실과 중복처리 되지 않고 딱 1번만 처리되는 것을 보장한다.
  3. 스케줄링 도구가 필요 없다.
    1. spark의 경우 클러스터 관리자, 리소스 관리자, 서버 등 필요하지만 kafka streams는 streams application만 가지고 사용할 수 있다.
  4. streams DSL과 processor API 제공
  5. 로컬 상태 저장소를 사용한다.
    1. rocksDB(디스크 기반)를 로컬에서 사용하여 상태를 저장하거나 in-memory DB(HashMap)를 사용한다.
    2. 로컬 DB에 저장한 상태에 대한 변환 정보는 카프카 변경 로그에 저장된다. → 프로세스에 장애가 발생하더라도 상태가 저장되어 있어 빠르게 장애 복구를 할 수 있다.

maven dependency

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.5.0</version>
</dependency>

Kafka Stream 데이터 처리

카프카 스트림 데이터를 처리할 때 데이터가 너무 많아 처리시간이 오래 걸리는 경우 인스턴스 추가로 해결할 수 있다.

추가 시 kafka cluster 에서 자동으로 kafka stream에게 데이터를 나눠 전송하여 처리한다.

늘릴 수 있는 인터페이스의 개수는 파티션의 개수이다.( 그 이상도 늘릴 수는 있지만 idle상태로 유지한다.)

 
 


  • KStream(Insert)
    • 레코드의 흐름 표현
    • 키와 메시지 값으로 구성
    • 데이터를 조회하게 되면 토픽에 있는 모든 레코드가 출력 → 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용.
  • KTable(Update)
    • 메시지 키를 묶어서 사용
    • KStream은 토픽의 모든 레코드를 조회할 수 있지만 KTable은 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용
    • 새롭게 들어온 데이터에 해당하는 키값이 Table에 존재하면 대체된다.
  • Global KTable : ( UPDATE 또는 INSERT/UPDATE(레코드 값이 아닌 경우 null) 또는 DELETE(값이 인 경우 null))
    • KTable과 동일하게 메시지 키를 기준으로 묶어서 사용
    • KTable로 선언된 토픽은 1개 파티션 → 1개 태스크
    • GlobalKTable은 GlobalStreamThread로 사용
    • GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용되는 차이가 있음.
      • GlobalKTable을 설명하기 가장 좋은 예는 KStreamKTable데이터 조인 을 수행할 때
      • KStream과 KTable이 조인 을 수행하려면 co-partitioning이 되어있어야 하는데 이를 GlobalKTable을 이용하면 해결 가능
      • 모든 노드가 모두 Global data를 갖고 있기 때문에 용량이 적은 데이터에 사용하기 적합

구현 방법

  1. Streams DSL: high-level api. map, join, window 같은 메서드 제공, KStream, KTable, GlobalKTable 제공
@Configuration
public class KafkaStreamConfig {
  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public KafkaStreamsConfiguration properties(){
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-default");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new KafkaStreamsConfiguration(props);
  }
}
@Component
public class KafkaStreamService {
  final Serde<String> STRING_SERDE = Serdes.String();
   @Autowired   public void kafkaStreams(StreamsBuilder streamsBuilder){     KStream<String, String> messagStream = streamsBuilder.stream("send", Consumed.with(STRING_SERDE, STRING_SERDE));     messagStream.flatMapValues(value -> Arrays.asList(value.split(" "))).to("receive");     KTable<String, Long> wordCounts = messagStream     .flatMapValues(value -> {       log.info("value : {}", value);       return Arrays.asList(value.toLowerCase().split("\\W+"));     })     .groupBy((key, value) -> {       log.info("key : {} , value : {}", key, value);       return value;     })     .count();     wordCounts.toStream().foreach((k, v) -> {       log.info("k : {}, v : {}", k, v);     });     wordCounts.toStream().to("receive-count", Produced.with(Serdes.String(), Serdes.Long()));   }


}
  1. Processor API : 프로세서를 직접 연결 및 추가하고 statestore과 직접 상호작용 가능한  low-level api. DSL보다 더 많은 유연성이 있지만 많은 수작업 필요.
@Configuration
public class KafkaStreamConfig {
  @Bean(name = "streamConfig")
  public StreamsConfig streamsConfig(){
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new StreamsConfig(props);
  }
}
@Component
@RequiredArgsConstructor
public class KafkaStreamService {
  private final StreamsConfig streamConfig;
  public StoreBuilder<KeyValueStore<String, String>> countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("Counts-2"),
    Serdes.String(),
    Serdes.String());

  @Autowired
  public void createTopology(){
    Topology topology = new Topology();
    topology.addSource("Source", "topic1")
            .addProcessor("Processor", () -> new WordCountProcessor(), "Source")
            .addStateStore(countStoreSupplier, "Processor")
            .addSink("Sink", "topic2", "Processor");
//addSource, addSink에서 deserializer를 별도 설정 안하면 config의 default로 자동 설정

    KafkaStreams streaming = new KafkaStreams(topology, streamConfig);
    streaming.start();
  }

}
public class WordCountProcessor implements Processor<String, String, String, String>{
  public KeyValueStore<String, String> kValueStore;
  @Override
  public void init(final ProcessorContext<String, String> context) {
    this.kValueStore = context.getStateStore("Counts-2");
    context.schedule(Duration.ofMinutes(5), PunctuationType.STREAM_TIME, timestamp -> { //schedule : 이 시간 주기로 in-memory 탐색

      try (final KeyValueIterator<String, String> iter = kValueStore.all()) {
        while (iter.hasNext()) {
          final KeyValue<String, String> entry = iter.next();
          context.forward(new Record<>(entry.key, entry.value, timestamp));
        }
      }
    });
  }

  @Override
  public void process(Record<String, String> record) {
 
    final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = kValueStore.get(word) == null ? null : Integer.parseInt(kValueStore.get(word));
 
            if (oldValue == null) {
                kValueStore.put(word, "1");
                kValueStore.put(String.format("%s:%s", word, "result"), String.format("%s : %d", word, Integer.parseInt(kValueStore.get(word))));

            } else {
                kValueStore.put(word, String.valueOf(oldValue + 1));
                kValueStore.put(String.format("%s:%s", word, "result"), String.format("%s : %d", word, Integer.parseInt(kValueStore.get(word))));
             
            }
        }
  }

  @Override
  public void close() {
    // TODO Auto-generated method stub
    }
}

사용 시 주의 사항

  • 메시지 중복 처리 없이 단 한 번만 실행을 보장하는 option인 exactly-once같은 경우 DB등 외부 연동 없이 Kafka 내부 쓰기 작업 만으로 데이터를 처리했을 경우에만 사용할 수 있다.
  • kafka streams의 경우 자동으로 kafka topic이 만들어지는 환경을 사용하지 않도록 하는 것을 권고.
  1. kafka cluster에 자동으로 생성되는 것이 비활성화 되어있을 수도 있다.
  2. 자동 생성은 replication factor와 같은 설정을 자동으로 적용시킨다. → 이 설정이 원하는 설정이 아닐 수가 있다.

state store

  1. in-memory

애플리케이션 내에 있기 때문에 애플리케이션이 장애로 다시 실행되는 경우 데이터가 증발

 

→ 변경내역을 changelog에 남기고 이 로그를 파일이 아닌 kafka topic에 남겨 오류 발생 시 로그가 들어있는 kafka topic 을 읽어서 문제가 발생하기 전으로 복구.

→ 디폴트는 changelog 작성 안됨

 

상태가 변경되면 변경 내역에 state store topic으로 전달. → kafka transaction을 같이 사용하면 문제가 발생해도 복구할 수 있는 시스템 구성 가능.

  • persistence memory(RocksDB)
    일반적으로 in-memory보다 많이 사용.
    애플리케이션을 재기동해도 데이터가 유지되어있다. but 파티션의 할당이 바뀌어 담당하는 태스크가 전환되면 불필요해지거나, 새롭게 재작성이 필요하게 될 수도 있다. → RocksDB는 일정 기간 후에 자동으로 삭제된다.

  • processor API를 사용하는 경우 독자적인 state store을 만들 수도 있음
  • changelog는 broker에 작성됨

Kafka Streams Config 

https://kafka.apache.org/documentation/#streamsconfigs

필수

application.id 애플리케이션 식별자, kafka cluster 내에서 고유해야 한다. default None
bootstrap.servers kafka cluster에 대한 초기 연결 주소. 
ex)"kafka-broker1:9092,kafka-broker2:9092"
default None

추천(복원력을 위해 사용 권장)

acks 요청 완료를 고려하기 전에 리더가 수신했어야 하는 수 acks=1 -> acks=all 변경 권장
min.insync.replicas acks=all 때, 쓰기를 승인해야 하는 최소 replica 수 1 -> 2 변경 권장
num.standby.replicas 대기 복제본(로컬 저장소의 복사본)의 수. 작업 장애 조치의 대기 시간을 최소화하는 데 사용 0 -> 1 변경 권장
Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test");
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

옵션(선택적 구성)

https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams#recommended-configuration-parameters-for-resiliency

'개인 개발 공부' 카테고리의 다른 글

Kafka 고가용성 테스트  (0) 2023.08.31
Kafka Consumer  (0) 2023.08.31
Kafka Producer  (1) 2023.08.31
Apache Kafka 설치부터 실행  (0) 2023.08.31
JUnit 테스트  (0) 2023.08.31
Comments