일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 타입변환
- 배열
- 증감연산자
- 명명규칙
- 다중if
- while문
- 단순if
- 안드로이드스튜디오
- 자바
- println()
- 콘솔출력문
- 삼항 연산자
- for문
- 순환문
- 산술 쉬프트 연산자
- 비교연산자
- 변수유효범위
- 논리 쉬프트 연산자
- print()
- Java
- 대입 연산자
- 문자열
- dowhile문
- 기본형호출
- 비트논리연산자
- 사용자입력Switch문
- 비정방행렬
- 다차원배열
- array
- 참조형호출
- Today
- Total
신입개발자
Kafka Stream 본문
Kafka Stream
정의 : kafka에 저장된 데이터를 처리하고 분석하기 위해 사용하는 클라이언트 라이브러리.
어떤 Topic으로 들어오는 데이터를 consume하여 kafka streams에서 제공하는 처리 로직을 통해 처리 후 다른 Topic으로 전송하거나 끝내는 동작 수행.
<kafka producer and consumer>
<kafka streams>
장점
- 카프카와 완벽 호환
- logstash, spark와 같은 오픈소스 툴과 다르게 카프카 버전에 맞춰 호환을 제공한다.
- 데이터 유실과 중복처리 되지 않고 딱 1번만 처리되는 것을 보장한다.
- 스케줄링 도구가 필요 없다.
- spark의 경우 클러스터 관리자, 리소스 관리자, 서버 등 필요하지만 kafka streams는 streams application만 가지고 사용할 수 있다.
- streams DSL과 processor API 제공
- 로컬 상태 저장소를 사용한다.
- rocksDB(디스크 기반)를 로컬에서 사용하여 상태를 저장하거나 in-memory DB(HashMap)를 사용한다.
- 로컬 DB에 저장한 상태에 대한 변환 정보는 카프카 변경 로그에 저장된다. → 프로세스에 장애가 발생하더라도 상태가 저장되어 있어 빠르게 장애 복구를 할 수 있다.
maven dependency
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>3.5.0</version> </dependency> |
Kafka Stream 데이터 처리
카프카 스트림 데이터를 처리할 때 데이터가 너무 많아 처리시간이 오래 걸리는 경우 인스턴스 추가로 해결할 수 있다.
추가 시 kafka cluster 에서 자동으로 kafka stream에게 데이터를 나눠 전송하여 처리한다.
늘릴 수 있는 인터페이스의 개수는 파티션의 개수이다.( 그 이상도 늘릴 수는 있지만 idle상태로 유지한다.)
- KStream(Insert)
- 레코드의 흐름 표현
- 키와 메시지 값으로 구성
- 데이터를 조회하게 되면 토픽에 있는 모든 레코드가 출력 → 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용.
- KTable(Update)
- 메시지 키를 묶어서 사용
- KStream은 토픽의 모든 레코드를 조회할 수 있지만 KTable은 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용
- 새롭게 들어온 데이터에 해당하는 키값이 Table에 존재하면 대체된다.
- Global KTable : ( UPDATE 또는 INSERT/UPDATE(레코드 값이 아닌 경우 null) 또는 DELETE(값이 인 경우 null))
- KTable과 동일하게 메시지 키를 기준으로 묶어서 사용
- KTable로 선언된 토픽은 1개 파티션 → 1개 태스크
- GlobalKTable은 GlobalStreamThread로 사용
- GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용되는 차이가 있음.
- GlobalKTable을 설명하기 가장 좋은 예는 KStream 과 KTable 이 데이터 조인 을 수행할 때
- KStream과 KTable이 조인 을 수행하려면 co-partitioning이 되어있어야 하는데 이를 GlobalKTable을 이용하면 해결 가능
- 모든 노드가 모두 Global data를 갖고 있기 때문에 용량이 적은 데이터에 사용하기 적합
구현 방법
- Streams DSL: high-level api. map, join, window 같은 메서드 제공, KStream, KTable, GlobalKTable 제공
@Configuration public class KafkaStreamConfig { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration properties(){ Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-default"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); return new KafkaStreamsConfiguration(props); } } |
@Component public class KafkaStreamService { final Serde<String> STRING_SERDE = Serdes.String(); @Autowired public void kafkaStreams(StreamsBuilder streamsBuilder){ KStream<String, String> messagStream = streamsBuilder.stream("send", Consumed.with(STRING_SERDE, STRING_SERDE)); messagStream.flatMapValues(value -> Arrays.asList(value.split(" "))).to("receive"); KTable<String, Long> wordCounts = messagStream .flatMapValues(value -> { log.info("value : {}", value); return Arrays.asList(value.toLowerCase().split("\\W+")); }) .groupBy((key, value) -> { log.info("key : {} , value : {}", key, value); return value; }) .count(); wordCounts.toStream().foreach((k, v) -> { log.info("k : {}, v : {}", k, v); }); wordCounts.toStream().to("receive-count", Produced.with(Serdes.String(), Serdes.Long())); } } |
- Processor API : 프로세서를 직접 연결 및 추가하고 statestore과 직접 상호작용 가능한 low-level api. DSL보다 더 많은 유연성이 있지만 많은 수작업 필요.
@Configuration public class KafkaStreamConfig { @Bean(name = "streamConfig") public StreamsConfig streamsConfig(){ Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); return new StreamsConfig(props); } } |
@Component @RequiredArgsConstructor public class KafkaStreamService { private final StreamsConfig streamConfig; public StoreBuilder<KeyValueStore<String, String>> countStoreSupplier = Stores.keyValueStoreBuilder( Stores.inMemoryKeyValueStore("Counts-2"), Serdes.String(), Serdes.String()); @Autowired public void createTopology(){ Topology topology = new Topology(); topology.addSource("Source", "topic1") .addProcessor("Processor", () -> new WordCountProcessor(), "Source") .addStateStore(countStoreSupplier, "Processor") .addSink("Sink", "topic2", "Processor"); //addSource, addSink에서 deserializer를 별도 설정 안하면 config의 default로 자동 설정 KafkaStreams streaming = new KafkaStreams(topology, streamConfig); streaming.start(); } } |
public class WordCountProcessor implements Processor<String, String, String, String>{ public KeyValueStore<String, String> kValueStore; @Override public void init(final ProcessorContext<String, String> context) { this.kValueStore = context.getStateStore("Counts-2"); context.schedule(Duration.ofMinutes(5), PunctuationType.STREAM_TIME, timestamp -> { //schedule : 이 시간 주기로 in-memory 탐색 try (final KeyValueIterator<String, String> iter = kValueStore.all()) { while (iter.hasNext()) { final KeyValue<String, String> entry = iter.next(); context.forward(new Record<>(entry.key, entry.value, timestamp)); } } }); } @Override public void process(Record<String, String> record) { final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+"); for (final String word : words) { final Integer oldValue = kValueStore.get(word) == null ? null : Integer.parseInt(kValueStore.get(word)); if (oldValue == null) { kValueStore.put(word, "1"); kValueStore.put(String.format("%s:%s", word, "result"), String.format("%s : %d", word, Integer.parseInt(kValueStore.get(word)))); } else { kValueStore.put(word, String.valueOf(oldValue + 1)); kValueStore.put(String.format("%s:%s", word, "result"), String.format("%s : %d", word, Integer.parseInt(kValueStore.get(word)))); } } } @Override public void close() { // TODO Auto-generated method stub } } |
사용 시 주의 사항
- 메시지 중복 처리 없이 단 한 번만 실행을 보장하는 option인 exactly-once같은 경우 DB등 외부 연동 없이 Kafka 내부 쓰기 작업 만으로 데이터를 처리했을 경우에만 사용할 수 있다.
- kafka streams의 경우 자동으로 kafka topic이 만들어지는 환경을 사용하지 않도록 하는 것을 권고.
- kafka cluster에 자동으로 생성되는 것이 비활성화 되어있을 수도 있다.
- 자동 생성은 replication factor와 같은 설정을 자동으로 적용시킨다. → 이 설정이 원하는 설정이 아닐 수가 있다.
state store
- in-memory
애플리케이션 내에 있기 때문에 애플리케이션이 장애로 다시 실행되는 경우 데이터가 증발
→ 변경내역을 changelog에 남기고 이 로그를 파일이 아닌 kafka topic에 남겨 오류 발생 시 로그가 들어있는 kafka topic 을 읽어서 문제가 발생하기 전으로 복구.
→ 디폴트는 changelog 작성 안됨
상태가 변경되면 변경 내역에 state store topic으로 전달. → kafka transaction을 같이 사용하면 문제가 발생해도 복구할 수 있는 시스템 구성 가능.
- persistence memory(RocksDB)
일반적으로 in-memory보다 많이 사용.
애플리케이션을 재기동해도 데이터가 유지되어있다. but 파티션의 할당이 바뀌어 담당하는 태스크가 전환되면 불필요해지거나, 새롭게 재작성이 필요하게 될 수도 있다. → RocksDB는 일정 기간 후에 자동으로 삭제된다. - processor API를 사용하는 경우 독자적인 state store을 만들 수도 있음
- changelog는 broker에 작성됨
Kafka Streams Config
https://kafka.apache.org/documentation/#streamsconfigs
필수
application.id | 애플리케이션 식별자, kafka cluster 내에서 고유해야 한다. | default None |
bootstrap.servers | kafka cluster에 대한 초기 연결 주소. ex)"kafka-broker1:9092,kafka-broker2:9092" |
default None |
추천(복원력을 위해 사용 권장)
acks | 요청 완료를 고려하기 전에 리더가 수신했어야 하는 수 | acks=1 -> acks=all 변경 권장 |
min.insync.replicas | acks=all 때, 쓰기를 승인해야 하는 최소 replica 수 | 1 -> 2 변경 권장 |
num.standby.replicas | 대기 복제본(로컬 저장소의 복사본)의 수. 작업 장애 조치의 대기 시간을 최소화하는 데 사용 | 0 -> 1 변경 권장 |
Properties streamsSettings = new Properties(); streamsSettings.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test"); streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2); streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); |
옵션(선택적 구성)
'개인 개발 공부' 카테고리의 다른 글
Kafka 고가용성 테스트 (0) | 2023.08.31 |
---|---|
Kafka Consumer (0) | 2023.08.31 |
Kafka Producer (1) | 2023.08.31 |
Apache Kafka 설치부터 실행 (0) | 2023.08.31 |
JUnit 테스트 (0) | 2023.08.31 |