Spring Boot + Kafka 简易入门 Demo
1.pom.xml中加入kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.application.properties中配置kafka:
## kafka ##
# Kafka broker address used by both the producer and the consumer.
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Producer side: keys and values are serialized with StringSerializer.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer side: consumer group id, with offset auto-commit switched on.
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
# Consumer side: keys and values are deserialized with StringDeserializer.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Application-defined key (read via ${spring.kafka.topic} by the producer's @Value
# and the consumer's @KafkaListener); it names the topic both sides use.
spring.kafka.topic=mqtt_location_data
3.kafka消息生产者:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
/**
* @description: kafka生产者
* @author:
* @create: 2019-01-29 17:38
**/
@Component
@Slf4j
public class KafkaProducer {
@Autowired
KafkaTemplate kafkaTemplate;
@Value("${spring.kafka.topic}")
private String topic;
/**
* 发送kafka消息
*
* @param jsonString
*/
public void send(String jsonString) {
ListenableFuture future = kafkaTemplate.send(topic, jsonString);
//future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString));
}
}
4.kafka消息消费者:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Kafka consumer component: listens on the topic named by the
 * {@code spring.kafka.topic} property and logs each record it receives.
 *
 * <p>Created 2019-01-29 17:47.
 */
@Slf4j
@Component
public class KafkaConsumer {

    /**
     * Logs the topic, offset and value of a received record.
     *
     * @param record the record handed to this listener
     */
    @KafkaListener(topics = "${spring.kafka.topic}")
    public void listen(ConsumerRecord<?, ?> record) {
        final String sourceTopic = record.topic();
        final long position = record.offset();
        final Object payload = record.value();
        log.info("topic={}, offset={}, message={}", sourceTopic, position, payload);
    }
}
5. 在启动类中测试(应用启动后通过 CommandLineRunner 发送 10 条消息):
import kafka.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
/**
 * Application entry point. After startup it acts as a {@link CommandLineRunner}
 * and sends ten test messages through {@link KafkaProducer}.
 *
 * <p>Created 2018-09-30 09:15.
 */
@org.springframework.boot.autoconfigure.SpringBootApplication // covers @EnableAutoConfiguration and @ComponentScan
public class SpringBootApplication implements CommandLineRunner {

    /** Producer used to publish the test messages. */
    @Autowired
    KafkaProducer kafkaProducer;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBootApplication.class, args);
    }

    /**
     * Sends the messages "hello world0" through "hello world9", in order.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared to match the original signature
     */
    @Override
    public void run(String... args) throws Exception {
        int sequence = 0;
        while (sequence < 10) {
            String payload = "hello world" + sequence;
            kafkaProducer.send(payload);
            sequence++;
        }
    }
}
结果: