Spring集成Kafka小记

POM引入spring-kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>

这个地方版本很重要。引入什么版本的依赖需要看安装的是什么版本的Kafka。版本不对,可能跑不通。

1) kafka-clients 包版本与服务器端kafka-clients版本保持一致(查看服务器kafka版本方法 在kafka安装目录下libs 中查找kafka-clients开头的jar文件)

2)引入的spring-kafka 版本在2.0或者2.X 时Spring版本在5.0才能支持

版本对应关系见http://spring.io/projects/spring-kafka

下面这句话重要,我的即是参考这句话配的<spring-kafka.version>1.3.5.RELEASE</spring-kafka.version>:

Spring集成Kafka小记

application-dev.yaml中,spring下配置:

kafka:
  #bootstrap-servers: nodex.abdt.com:9092,nodey.abdt.com:9092,nodez.abdt.com:9092
  bootstrap-servers: v-host:9092
  consumer:
    enable-auto-commit: true
    group-id: metaDataBloodLineageConsumer
    auto-offset-reset: latest
    keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    valueDserializer: org.apache.kafka.common.serialization.StringDeserializer

nodex环境需keberos认证,大家开发环境统一存jaas.conf麻烦,所以改用无kb认证的v-host对付测试。

上述配置完成后,代码:

@ApiOperation("监听kafka topic 数据质量检查结果。检查结果msg为ums格式。")
@KafkaListener(topics = {"dataChkResult"})
public int xxx(String dataChkResult) {
    yyy.zzz(dataChkResult);
    return 0;
}

能监听到msg。

之前就通的。一个月以后再用的时候,有问题,xxx:9092 disconnected ……

或者虽然不报错,但发消息监听不到,没动静。

折腾完,可以收到消息了。聊记。以备后续又出问题好迅速回血。

 

下面这个文章的介绍算较简明且全面:

SpringBoot整合并简单使用Kerberos认证的Kafka

https://blog.****.net/justry_deng/article/details/88387898