Spring集成Kafka小记
POM引入spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency>
这个地方版本很重要。引入什么版本的依赖需要看安装的是什么版本的Kafka。版本不对,可能跑不通。
1) kafka-clients 包版本与服务器端kafka-clients版本保持一致(查看服务器kafka版本方法 在kafka安装目录下libs 中查找kafka-clients开头的jar文件)
2)引入的spring-kafka 版本在2.0或者2.X 时Spring版本在5.0才能支持
版本对应关系见http://spring.io/projects/spring-kafka。
下面这句话重要,我的即是参考这句话配的<spring-kafka.version>1.3.5.RELEASE</spring-kafka.version>:
application-dev.yaml中,spring下配置:
kafka: #bootstrap-servers: nodex.abdt.com:9092,nodey.abdt.com:9092,nodez.abdt.com:9092 bootstrap-servers: v-host:9092 consumer: enable-auto-commit: true group-id: metaDataBloodLineageConsumer auto-offset-reset: latest keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer valueDserializer: org.apache.kafka.common.serialization.StringDeserializer
nodex环境需keberos认证,大家开发环境统一存jaas.conf麻烦,所以改用无kb认证的v-host对付测试。
上述配置完成后,代码:
@ApiOperation("监听kafka topic 数据质量检查结果。检查结果msg为ums格式。") @KafkaListener(topics = {"dataChkResult"}) public int xxx(String dataChkResult) { yyy.zzz(dataChkResult); return 0; }
能监听到msg。
之前就通的。一个月以后再用的时候,有问题,xxx:9092 disconnected ……
或者虽然不报错,但发消息监听不到,没动静。
折腾完,可以收到消息了。聊记。以备后续又出问题好迅速回血。
下面这个文章的介绍算较简明且全面:
SpringBoot整合并简单使用Kerberos认证的Kafka