kafka的环境搭建和kafka与springboot的整合

kafka的环境搭建和kafka与springboot的整合

  • kafka的环境搭建
    1. 卸载centOS内置的openjdk1.7,安装jdk1.8
    2. zookeeper(zookeeper-3.4.5.tar.gz)搭建
      这里因为是测试所以没有搭建集群。
      1. 解压zookeeper-3.4.5.tar.gz:

        tar -zxvf zookeeper-3.4.5.tar.gz

      2. 进入/zookeeper-3.4.5/conf目录下,拷贝zoo_sample.cfg并命名为zoo.cfg,命令如下:

        cp zoo_sample.cfg zoo.cfg

      3. 修改zoo.cfg配置文件:
        kafka的环境搭建和kafka与springboot的整合

        tickTime=2000
        initLimit=10
        syncLimit=5
        dataDir=/opt/zookeeper/zookeeper-3.4.5/zkdata
        dataLogDir=/opt/zookeeper/zookeeper-3.4.5/zkdatalog
        clientPort=2181
        server.1=192.168.234.129(自己的IP地址):12888:13888
        echo “1” > /opt/zookeeper/zookeeper-3.4.5/zkdata/myid

        • 12888端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口,13888端口是在leader挂掉时专门用来进行选举leader所用。
        • dataDir 定义:zookeeper保存数据的目录;
        • dataLogDir= #Zookeeper将写数据的日志文件保存在这个目录里;
      4. 启动服务:

        cd /opt/zookeeper/zookeeper-3.4.5/bin
        ./zkServer.sh start

      5. 查看服务状态:

        ./zkServer.sh status

    3. kafka的搭建
      1. 解压

        cd /opt/kafka
        tar zxvf kafka_2.12-2.1.0.tgz

      2. 修改配置文件
        kafka的环境搭建和kafka与springboot的整合

        cd /opt/kafka/kafka_2.12-2.1.0/config
        vim server.properties

        listeners=PLAINTEXT://10.15.21.62:9092 #监听端口advertised.listeners=PLAINTEXT://10.15.21.62:9092 #提供给生产者,消费者的端口号。可以不设置则使用listeners的值

      3. 启动kafka服务

        ./kafka-server-start.sh …/config/server.properties

      4. 测试kafka是否安装成功

        查看主题: ./kafka-topics.sh --list --zookeeper 192.168.234.129:2181
        创建topics:./kafka-topics.sh --create --zookeeper 192.168.234.129:2181 --replication-factor 1 --partitions 1 --topic kafka-test
        创建消费者:bin/kafka-console-consumer.sh --bootstrap-server 192.168.234.129:9092 --topic test --from-beginning
        创建生产者:bin/kafka-console-producer.sh --broker-list 192.168.234.129:9092 --topic test

  • kafka与springboot的整合
    创建一个springboot的web项目
    1. 注入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.0.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
  1. 配置文件application.properties

server.servlet.context-path=/ server.port=8082

spring.kafka.bootstrap-servers=192.168.234.129:9092
#生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性 spring.kafka.producer.batch-size=16
#每批次发送消息的数量 spring.kafka.producer.retries=0
#设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
spring.kafka.producer.buffer-memory=33554432
#producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#key序列化方式

#消费者的配置
#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
spring.kafka.consumer.auto-offset-reset=latest
#是否开启自动提交
spring.kafka.consumer.enable-auto-commit=true
#自动提交的时间间隔
spring.kafka.consumer.auto-commit-interval=100
#key的解码方式
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
#value的解码方式
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#在/usr/local/etc/kafka/consumer.properties中有配置
spring.kafka.consumer.group-id=test-consumer-group

  1. entity类
  	 @Data 
  	 public class Message {
  		 private Long id;
  		 private String msg;
  		 private Date sendTime;
  	 }
  1. producer生产者

@Component
@Slf4j
public class KafkaProducer {

	@Autowired
	private KafkaTemplate kafkaTemplate;

  	private Gson gson = new GsonBuilder().create();

	public void send() {
   	 	 Message message = new Message();
   	 	 message.setId(System.currentTimeMillis());
   		 message.setMsg(UUID.randomUUID().toString());
  	  	 message.setSendTime(new Date());
   	  	 log.info("message = {}", gson.toJson(message));
  	 	 //test为主题
   	 	 kafkaTemplate.send("test", gson.toJson(message));
  	 } 
}
  1. consumer消费者

@Component
public class KafkaConsumer {

		@KafkaListener(topics = "test")
		public void listen(ConsumerRecord<?, ?> record) {
  			Optional<?> kafkaMessage = Optional.ofNullable(record.value());
   			if (kafkaMessage.isPresent()) {
  				Object message = kafkaMessage.get();
  				System.out.println("---->" + record);
  				System.out.println("---->" + message);
  			}
  		} 
}
  1. controller测试

@RestController
@RequestMapping("/kafka")
public class SendController {

   @Autowired
   private KafkaProducer producer;

   @RequestMapping(value = "/send")
   public String send() {
  	 producer.send();
  	 return "success";
    }

}

  1. 目录结构
    kafka的环境搭建和kafka与springboot的整合
  2. 测试成功
    kafka的环境搭建和kafka与springboot的整合
    kafka的环境搭建和kafka与springboot的整合