Spring Boot + Kafka 环境搭建
一、工程目录
二、源码
(1)父工程pom文件
<!-- Parent/aggregator POM of the demo: inherits from spring-boot-starter-parent and lists the modules built with it. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.demo</groupId>
<artifactId>springboot</artifactId>
<!-- "pom" packaging, as used for a parent/aggregator project. -->
<packaging>pom</packaging>
<version>1.0</version>
<!-- Modules built together with this project. -->
<modules>
<module>platform</module>
</modules>
<name>springboot</name>
<!-- Spring Boot parent; NOTE(review): dependencies declared without a version elsewhere presumably rely on its managed versions. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
</parent>
<!-- Shared properties: encodings, Java level, output directory and third-party versions. -->
<properties>
<main.basedir>${basedir}</main.basedir>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Java level passed to maven-compiler-plugin as source/target below. -->
<java.version>1.7</java.version>
<!-- Directory used as the output location for packaged artifacts. -->
<project.deploy.directory>${basedir}/target</project.deploy.directory>
<mybatis-spring-boot.version>1.3.1</mybatis-spring-boot.version>
<druid.version>1.0.26</druid.version>
</properties>
<!-- Intentionally empty: no additional managed dependencies are declared here. -->
<dependencyManagement>
</dependencyManagement>
<build>
<!-- Plugin versions and configuration made available to modules that declare these plugins. -->
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
</plugin>
<!-- Compiler plugin: source and target levels both follow the java.version property. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
(2)子模块pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- POM of the "platform" module: a jar-packaged Spring Boot application with web, MyBatis, Redis, Quartz and Kafka dependencies. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot</artifactId>
        <groupId>com.demo</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <artifactId>platform</artifactId>
    <name>platform</name>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring-boot.version}</version>
        </dependency>

        <!-- JDBC driver for the data source -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <!-- Druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
            <version>1.1</version>
        </dependency>

        <!-- ActiveMQ (disabled) -->
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>-->

        <!-- Kafka; no version is declared here, so it has to come from inherited dependency management. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Pinned to a fixed version: the RELEASE meta-version resolves to whatever is newest at build time,
             which makes builds non-reproducible. NOTE(review): confirm this version suits the project's Java level
             and upgrade deliberately when needed. -->
        <dependency>
            <groupId>net.minidev</groupId>
            <artifactId>json-smart</artifactId>
            <version>2.3</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin for building the application -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Output directory of the packaged artifact -->
                    <outputDirectory>${project.deploy.directory}</outputDirectory>
                </configuration>
            </plugin>

            <!-- Compiler plugin, used to select the JDK level to compile for -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>

            <!-- Packages the current project as a jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
            </plugin>

            <!-- Custom packaging driven by an assembly descriptor -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>config-zip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <skipAssembly>false</skipAssembly>
                            <!-- Output directory of the assembly -->
                            <outputDirectory>${project.deploy.directory}</outputDirectory>
                            <!-- Descriptor file that lists which files go into the assembly -->
                            <descriptor>src/main/assembly/config-zip.xml</descriptor>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(3)application配置文件
# Application settings for the demo: embedded Tomcat, HTTP encoding and Kafka client configuration.
spring.application.name=springboot_demo
# Context path under which the embedded Tomcat serves the application
server.context-path=/springboot_demo
# Tomcat HTTP port
server.port=8090
# Character encoding Tomcat uses for URIs
server.tomcat.uri-encoding=UTF-8
# Character-encoding filter settings
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
#################### kafka ###########################
# Kafka broker address(es); several may be given, separated by commas
spring.kafka.bootstrap-servers=localhost:9092
# Default consumer group id (disabled; the two custom group ids at the bottom are used instead)
#spring.kafka.consumer.group-id=myGroup
# Number of threads in the listener container, used to increase consumption concurrency.
# NOTE(review): a manually defined listener container factory presumably has to apply this value itself - confirm.
spring.kafka.listener.concurrency=3
# Keys and values are produced and consumed as plain strings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Custom key: group id of the first consumer group
spring.kafka.consumer.group-id.one=test
# Custom key: group id of the second consumer group
spring.kafka.consumer.group-id.two=test2
(4)KafkaConfig配置类
/**
 * Kafka wiring for the demo: one producer factory and template, plus two consumer
 * factories with matching listener container factories so that listeners can be
 * placed in two different consumer groups.
 */
@Configuration
public class KafkaConfig {

    /**
     * Kafka settings loaded from the application configuration. Custom keys, such as
     * the two group ids injected below, are not part of it.
     */
    @Autowired
    KafkaProperties properties;

    /** Group id used by the first consumer factory. */
    @Value("${spring.kafka.consumer.group-id.one}")
    private String groupOne;

    /** Group id used by the second consumer factory. */
    @Value("${spring.kafka.consumer.group-id.two}")
    private String groupTwo;

    /**
     * Creates the producer factory from the configured producer properties.
     *
     * @return producer factory for String keys and values
     */
    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> producerProperties = properties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProperties);
    }

    /**
     * Creates the template used to send messages, backed by {@link #kafkaProducerFactory()}.
     *
     * @return template for String keys and values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(kafkaProducerFactory());
    }

    /**
     * Consumer factory for the first group; together with
     * {@link #kafkaConsumerFactoryTwo()} it lets consumers live in different groups.
     *
     * @return consumer factory whose group id is {@code groupOne}
     */
    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactoryOne() {
        return consumerFactoryForGroup(groupOne);
    }

    /**
     * Consumer factory for the second group.
     *
     * @return consumer factory whose group id is {@code groupTwo}
     */
    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactoryTwo() {
        return consumerFactoryForGroup(groupTwo);
    }

    /**
     * Listener container factory for the first group.
     *
     * <p>The bean must be named {@code kafkaListenerContainerFactory}; according to the
     * original author the application fails to start otherwise. NOTE(review): presumably
     * because Spring Boot's auto-configuration would then try to create its own bean of
     * that name - confirm.
     *
     * @return container factory backed by {@link #kafkaConsumerFactoryOne()}
     */
    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryOne() {
        return containerFactoryFor(kafkaConsumerFactoryOne());
    }

    /**
     * Listener container factory for the second group.
     *
     * @return container factory backed by {@link #kafkaConsumerFactoryTwo()}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryTwo() {
        return containerFactoryFor(kafkaConsumerFactoryTwo());
    }

    /** Builds a consumer factory from the configured consumer properties, overriding only the group id. */
    private ConsumerFactory<String, String> consumerFactoryForGroup(String groupId) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    /** Builds a concurrent listener container factory for the given consumer factory. */
    private ConcurrentKafkaListenerContainerFactory<String, String> containerFactoryFor(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Factories built by hand do not pick up spring.kafka.listener.concurrency on their
        // own, so the configured value is applied explicitly; left untouched when unset.
        Integer concurrency = properties.getListener().getConcurrency();
        if (concurrency != null) {
            factory.setConcurrency(concurrency);
        }
        return factory;
    }
}
(5)生产者
/**
 * Sends String messages to the two demo topics through the injected {@code KafkaTemplate}.
 */
@Component
public class Producer {

    /** Topic written to by {@link #sendMessage(String)}. */
    private static final String FIRST_TOPIC = "test";

    /** Topic written to by {@link #sendMessageSecond(String)}. */
    private static final String SECOND_TOPIC = "test2";

    // Parameterized instead of raw so that send(...) calls are type-checked.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends the message to the first topic.
     *
     * @param message payload to send
     */
    public void sendMessage(String message) {
        kafkaTemplate.send(FIRST_TOPIC, message);
    }

    /**
     * Sends the message to the second topic.
     *
     * @param message payload to send
     */
    public void sendMessageSecond(String message) {
        kafkaTemplate.send(SECOND_TOPIC, message);
    }
}
(6)消费者
/**
 * Listens on the two demo topics, each through its own listener container factory,
 * and prints every received message to standard output.
 */
@Component
public class Consumer {

    /**
     * Handles messages from topic {@code test}.
     *
     * @param message received payload
     */
    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        printFramed(message);
    }

    /**
     * Handles messages from topic {@code test2}.
     *
     * @param message received payload
     */
    @KafkaListener(topics = "test2", containerFactory = "kafkaListenerContainerFactoryTwo")
    public void listenSecond(String message) {
        printFramed(message);
    }

    /** Prints the payload between the two marker strings on a single line. */
    private static void printFramed(String payload) {
        System.out.println("=========== " + payload + " =============");
    }
}