반응형

스프링부트(SpringBoot)로 카프카에서 데이터를 consume에 처리해야 하는 일이 있어 작업 중 발생한 에러에 대해 간단히 남겨본다...

나중에 같은 실수를 하지 않기 위해...


일단 현재 사용하고 있는 카프카 버전은 0.8.2.0이고 spring-integration-kafka를 사용하였다.

실제 클라우데라에 설치된 카프카 버전


카프카 낮은 버전을 사용하고 있어 요즘 spring-kafka 연동 가이드로는 에러가 발생해 삽질을 좀 하였다...

시간이 된다면 0.8.2.2 consume 모듈을 git에 올려서 링크 걸어보도록 하겠다.


다음과 같이 KafkaConfig를 설정해주고


[ 스프링 설정 및 코드 ]

@EnableIntegration
@Configuration
public class KafkaIntegration {

private static final String BOOTSTRAP_SERVER = "serverIp:2181";
private static final String ZOOKEEPER_CONNECT = "serverIp:2181";

@Getter
@Component
public static class KafkaConfig {
private String topic = "data_log";
private String brokerAddress = BOOTSTRAP_SERVER;
private String zookeeperAddress = ZOOKEEPER_CONNECT;

KafkaConfig(){}

public KafkaConfig(String t, String b, String zk) {
this.topic = t;
this.brokerAddress = b;
this.zookeeperAddress = zk;
}
}
}


Consumer 빈 등록

@Configuration
public class ConsumerConfiguration {

@Autowired
private KafkaIntegration.KafkaConfig kafkaConfig;

@Bean
public IntegrationFlow consumer() {

KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
.consumerProperties(props -> props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer("ectc_dsp_test", metadata -> metadata.consumerTimeout(100)
.topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
.maxMessages(10).valueDecoder(String::new));

Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer =
e -> e.poller(p -> p.fixedDelay(100));

return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, List<String>>>handle((payload, headers) -> {
payload.entrySet().forEach(
e -> System.out.println((e.getKey() + '=' + e.getValue())));
return null;
})
.get();
}

}


으로 등록 후 어플리케이션을 실행하면 다음과 같은 에러메시지가 발생하였다.


[ 발생한 에러메세지 ]

kafka.common.KafkaException: fetching topic metadata for topics [Set(dsp_log)] from broker [ArrayBuffer(id:245,host:eedkaf-dmp001.svr.net,port:9092, id:94,host:eedkaf-dmp002.svr.net,port:9092, id:95,host:eedkaf-dmp003.svr.net,port:9092)] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [kafka_2.11-0.8.2.0.jar:na]

Caused by: java.nio.channels.ClosedChannelException: null

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.producer.SyncProducer.send(SyncProducer.scala:113) ~[kafka_2.11-0.8.2.0.jar:na]

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) ~[kafka_2.11-0.8.2.0.jar:na]

... 3 common frames omitted


분명 로그를 봐도 내가 KafkaConfig에서 설정해준 서버 IP와 정상적인 커넥션을 맺었는데 왜이런단 말인가???

2018-11-20 12:12:05.668  INFO 87935 --- [161.26.70:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server 내가지정한서버IP:2181, sessionid = 0x366e7594a3918e9


[ 해 결 ]

주키퍼(Zookeeper) 내부적으로 클러스터간 통신시 혹은 zookeeper to kafka간 통신시 서버의 IP보다는 호스트명으로 서로를 인지한다는 말?을 들은적이 있어 로컬 host파일에 서버의 호스트와 IP를  등록하고 다시 실행해보았더니 정상적으로 카프카에서 메세지를 consume하는 것을 확인할 수 있었다.


혹시나 다음과 같은 문제가 발생한다면 host파일에 서버의 호스트명과 IP를 등록 후 다시 해보길...


정확한 원인은 나중에 관련한 문서나 Zookeeper를 좀더 깊게 공부하게 되어 발견하게 된다면 추후 또 포스팅해보도록 하겠습니다. 


ref : https://spring.io/blog/2015/04/15/using-apache-kafka-for-integration-and-data-processing-pipelines-with-spring

반응형

+ Recent posts