반응형

스프링부트(SpringBoot)로 카프카에서 데이터를 consume에 처리해야 하는 일이 있어 작업 중 발생한 에러에 대해 간단히 남겨본다...

나중에 같은 실수를 하지 않기 위해...


일단 현재 사용하고 있는 카프카 버전은 0.8.2.0이고 spring-integration-kafka를 사용하였다.

실제 클라우데라에 설치된 카프카 버전


카프카 낮은 버전을 사용하고 있어 요즘 spring-kafka 연동 가이드로는 에러가 발생해 삽질을 좀 하였다...

시간이 된다면 0.8.2.2 consume 모듈을 git에 올려서 링크 걸어보도록 하겠다.


다음과 같이 KafkaConfig를 설정해주고


[ 스프링 설정 및 코드 ]

@EnableIntegration
@Configuration
public class KafkaIntegration {

private static final String BOOTSTRAP_SERVER = "serverIp:2181";
private static final String ZOOKEEPER_CONNECT = "serverIp:2181";

@Getter
@Component
public static class KafkaConfig {
private String topic = "data_log";
private String brokerAddress = BOOTSTRAP_SERVER;
private String zookeeperAddress = ZOOKEEPER_CONNECT;

KafkaConfig(){}

public KafkaConfig(String t, String b, String zk) {
this.topic = t;
this.brokerAddress = b;
this.zookeeperAddress = zk;
}
}
}


Consumer 빈 등록

@Configuration
public class ConsumerConfiguration {

@Autowired
private KafkaIntegration.KafkaConfig kafkaConfig;

@Bean
public IntegrationFlow consumer() {

KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
.consumerProperties(props -> props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer("ectc_dsp_test", metadata -> metadata.consumerTimeout(100)
.topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
.maxMessages(10).valueDecoder(String::new));

Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer =
e -> e.poller(p -> p.fixedDelay(100));

return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, List<String>>>handle((payload, headers) -> {
payload.entrySet().forEach(
e -> System.out.println((e.getKey() + '=' + e.getValue())));
return null;
})
.get();
}

}


으로 등록 후 어플리케이션을 실행하면 다음과 같은 에러메시지가 발생하였다.


[ 발생한 에러메세지 ]

kafka.common.KafkaException: fetching topic metadata for topics [Set(dsp_log)] from broker [ArrayBuffer(id:245,host:eedkaf-dmp001.svr.net,port:9092, id:94,host:eedkaf-dmp002.svr.net,port:9092, id:95,host:eedkaf-dmp003.svr.net,port:9092)] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [kafka_2.11-0.8.2.0.jar:na]

Caused by: java.nio.channels.ClosedChannelException: null

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.producer.SyncProducer.send(SyncProducer.scala:113) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) ~[kafka_2.11-0.8.2.0.jar:na]

... 3 common frames omitted


분명 로그를 봐도 내가 KafkaConfig에서 설정해준 서버 IP와 정상적인 커넥션을 맺었는데 왜이런단 말인가???

2018-11-20 12:12:05.668  INFO 87935 --- [161.26.70:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server 내가지정한서버IP:2181, sessionid = 0x366e7594a3918e9


[ 해 결 ]

주키퍼(Zookeeper) 내부적으로 클러스터간 통신시 혹은 zookeeper to kafka간 통신시 서버의 IP보다는 호스트명으로 서로를 인지한다는 말?을 들은적이 있어 로컬 host파일에 서버의 호스트와 IP를  등록하고 다시 실행해보았더니 정상적으로 카프카에서 메세지를 consume하는 것을 확인할 수 있었다.


혹시나 다음과 같은 문제가 발생한다면 host파일에 서버의 호스트명과 IP를 등록 후 다시 해보길...


정확한 원인은 나중에 관련한 문서나 Zookeeper를 좀더 깊게 공부하게 되어 발견하게 된다면 추후 또 포스팅해보도록 하겠습니다. 


ref : https://spring.io/blog/2015/04/15/using-apache-kafka-for-integration-and-data-processing-pipelines-with-spring

반응형
반응형

카프카(KAFKA) 데이터 처리방식의 특화된 기능


1. KAFKA는 기존 메시징 시스템과는 달리 메시지를 메모리대신 파일 시스템에 쌓아두고 관리한다.


2. 디스크에 기반한 영속적인 저장 방식을 사용하지만 페이지 캐시를 활용하여 높은 처리량을 제공하는 인메모리 방식에 가깝다


3. 메모리에 별도의 캐시를 구현하지 않고 OS의 페이지 캐시에 위임하고 OS가 알아서 서버의 유휴 메모리를 페이지 캐시로 사용하여 앞으로 필요한 것으로 예상되는 메시지들을 미리 읽어들여(readahead)디스크 읽기 성능을 향상 시킨다.


4. Kafka 프로세스가 직접 캐시를 관리하지 않고 OS에 위임하기 때문에 프로세스를 재시작 하더라도 OS의 페이지 캐시는 그대로 남아있기 때문에 프로세스 재시작 후 캐시를 워밍업할 필요가 없다는 장점이 있다. 


5. 여러 consumer가 한 topic으로부터 여러 번에 걸쳐 메시지를 가져올 수 있다. 이러한 방식이 가능한 이유는 클라이언트가 해당 queue에서 어느 부분까지 데이터를 받아갔는지 위치를 알려주는 'offset'을 관리하기 때문이다.


6. 메시지를 메모리에 저장하지 않기 때문에 메시지가 JVM 객체로 변환되면서 크기가 커지는 것을 방지할 수 있고 JVM의 GC로 인한 성능 저하를 피할 수 있다.

반응형

'Bigdata > Kafka' 카테고리의 다른 글

[Kafka] 큐잉 시스템과 카프카의 차이  (0) 2017.06.10
반응형

데이터 엔지니어로 살아가기 163일째(kafka, camus) 


기존에 kafka가 2개의 broker로 동작하고 있었는데 broker한대를 추가하게 되었다.


broker추가와 동시에 partition, replica 설정을 변경하는 작업을 하였다.


알파에서 테스트가 진행되었고 리얼에 적용을 하였는데 특정 topic으로 데이터를 처리하지 못하는 이슈가 발생하였다.


topic중 nginx - fluentd를 통해서 들어오는 데이터는 정상적으로 받고 있었지만


api서버에서 kafkaAppender로 로그를 produce하고 있는 topic이 정상적으로 동작하지 않았다.


원인은 api서버의 hosts파일에 kafka브로커들의 ip와 hostname이 등록되어 있지 않았기 때문이다.


기본적으로 api통신을 할 경우 acl이 뚫려 있고 어플리케이션 내부에서 ip로 목적지가 등록이 되어 있는 경우


문제없이 동작하여야 하지만 클라우데라를 통해 운영되고 있는 주키퍼, 카프카 등 하둡에코시스템이 호스트네임을 기반으로


서로 통신을 하고 있었기 때문에 ip정보만으로는 카프카 브로커를 찾아가지 못하는 문제가 있었다.


이번 문제로 많은 시간을 삽질을 하게되었지만 카프카 offset부터 camus가 어떤식으로 offset을 관리하고 있는지에 대해서


좀 더 깊게 들여다 볼수 있는 시간이 되었다.


camus는 현재 gobblin이라는 프로젝트로 넘어가 관리되고 있는 듯 싶었다.


카프카 특정 topic의 데이터를 hdfs에 적재해야한다면 gobblin을 검토해보면 좋을 듯 싶다.


이상~

반응형
반응형

큐잉 시스템과 카프카가 다른점

분명히 카프카는 메시지들이 수신된 순서대로 처리되도록 보장하기 위해 많은 문제를 겪는 ActiveMQ나 RabbitMQ 같은 큐잉 시스템이 아니다.

카프카의 파티셔닝 시스템은 이런 구조를 유지하지 않는다. 특정한 토픽의 파티션에 대한 쓰기와 읽기 순서에 대한 정의가 없으므로 클라이언트는 메시지가 쓰여진 순서와 다르게 파티션에서 읽을 수도 있다. 게다가 생산자를 비동기로 구현하는 일이 흔해서 한 파티션으로 보내진 메시지는 (비록 응답대기시간이나 비결정적 이벤트의 차이로 인해 먼저 발생하더라도) 또 다른 파티션으로 보내진 메시지 이후에 쓰여질 수도 있다.


카프카는 또한 메시지 소비자를 다루는 방법에서도 많은 큐잉 시스템과는 다르다. 대부분의 큐잉 시스템에서 메시지는 소비되었을 때 시스템에서 제거된다. 카프카는 메시지를 제거하는 메커니즘이 없는 대신, 소비한 마지막 메시지의 오프셋을 지속적으로 파악하기 위해 소비자에 의존한다. 로그는 카프카 설정의 log.retention.hours 설정에 의해 삭제된다.


[ 참고 ] 실시간 분석의 모든 것


반응형

'Bigdata > Kafka' 카테고리의 다른 글

카프카(KAFKA) 데이터 처리방식의 특화된 기능  (1) 2017.08.30
반응형

오늘은 카프카에서 hdfs 데이터를 적재하는 카뮤(camus) 대해서 학습하고 생각해보는 시간을 가졌다.


아직 카뮤? 카뮈? 내부 아키텍쳐가 어떻게 설계되어져 있는지 확인하지는 못했지만 카뮈를 이용하면 카프카에서 생각보다 쉽게 hdfs 적재가 가능하다. 카뮈가 아니였다면? 자바로 카프카 컨슈머를 구현하고 hdfs 적재하는 로직처리를 해줘야 겠지?


그렇게 어플리케이션을 개발하더라도 카뮤에서 카프카 offset 확인해 데이터 누락을 최소화해주는 부분에 대한 구현은 힘들었을 같다.


물론 자바로도 할수 있겠지만....카프카에서 offset정보를 가져와서 처리할 있는 api 제공하는지는 잘모르겠다.


아무쪼록 깊이 파고들어 카뮤를 이해하고 실제로 카프카 토픽의 데이터를 받아오는 작업을 진행해보자!


백문이불여일행

반응형
반응형

앞으로라도 간단히 데이터 엔지니어로서 일을 하면서 경험했던 것들 생각들을 가볍게 공유해보고자 한다.


오늘은 데이터 유입쪽에 대한 파악작업을 진행하였다. 


nginx http요청으로 매체쪽 태그매니저로부터 들어오는 로그들이 어떻게 처리되는지


nginx 설정은 어떻게 되는지에 대해서 확인했다. 


기존 웹 서버 개발할 때는 아파치만 쓰다가 nignx를 보니 뭐 정확하게 어떻게 돌아가는지 정확한 이해는 안갔지만


대충 설정들을 보니 예상정도는 할 수 있었다. 


오늘 이해하고 파악한 부분은 nginx -> fluentd -> kafka 로 통하는 flow에 대한 전반적인 이해를 하였고


실제 알파 클러스터에서 설정을 변경해보며 이것저것 테스트해보았다. 


생각보다 td-agent 쪽에서도 offset 작업이 잘 이루어져 실제 nginx로 데이터가 들어왔을 때 td-agent가 죽어 있었더라도


다시 td-agent를 시작하게되면 읽어오지 않은 데이터부터 읽어 카프카로 전송하더라


아직 유입쪽에 쿠키발급부분이라던지 확인할 부분이 많지만 전반적인 유입 flow와 설정들을 이해한것만으로도 굉장히 뿌듯하다.





반응형

+ Recent posts