初识Kafka
一、前言
kafka是很牛逼的,通过查阅一些关于它的文档,我知道了一个良好的kafka集群可以每秒处理几十上百万条的请求,它有着一些自己独特的或者说是很多牛逼的开源框架或插件都比较类似的性能,来实现它的高吞吐量、存储空间大,查询性能不受其影响等,各位看官可自行百度。。。
二、安装
各位可自行百度去安装这个东西,先安装zookeeper后安装kafka,一般单机安装的话,在zookeeper的配置文件指定安装服务器的ip就行了,kafka配置文件配置数据的存储路径和zookeeper的监控地址就行了。。。
另外也可以在安装一个kafka-manage,它是可视化且容易管理kafka集群的,比如建立toptic,设置分区,查看toptic的消费情况,和消费者等。
三、在spring Maven项目快速又简单的使用(spring-clould-stream-kafka)
1、maven
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Greenwich.SR3</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
2、.yml配置文件的配置
spring:
cloud: stream: default-binder: kafka #我们使用的插件为kafka kafka: binder: brokers: 10.18.101.230 #kafka服务的ip defaultBrokerPort: 9092 #端口号 minPartitionCount: 1 auto-create-topics: true auto-add-partitions: true bindings: input: #要与@input("${name}")对应 destination: rong-send #kafka上创建的topic group: rong #访问的用户组 contentType: application/json consumer: headerMode: raw output: #要与@output("${name}")对应 destination: rong-send contentType: application/json group: rong
3、代码使用
创建者(将消息放进kafka):
import com.gsww.docking.util.R; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.MimeTypeUtils; import javax.annotation.Resource;
@Slf4j @Service public class MessageSendServiceImpl { @Resource Processor processor; public R<Boolean> sendMsg(int i) { Message<String> message = MessageBuilder .withPayload("川建国同志发来的第"+i+"封贺电!") .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); processor.output().send(message); return new R<>(true); } }
消费者(监听消费):
import com.gsww.docking.util.R; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import javax.annotation.Resource; @Slf4j @EnableBinding(Processor.class) public class MessageStreamLister { @Resource MailSendServiceImpl mailSendService; /** * 异步发送消息并保存记录 * @param message */ @StreamListener(value = Processor.INPUT) public void handleSysLog(Message<String> message) { log.info("handleSysLog:" + message.toString()); R<Boolean> result = null; result = mailSendService.sendMsg("大吉大利,今晚捉急", message.getPayload(), "[email protected]"); } }
注意:
前面配置文件的
我在测试的代码里面用到了引入的包里自带的类,即:
配置文件标红的是与下面注解的value所对应的,各位想要不同的名字的通道,大可定义这样的接口,接口不用实现,项目在部署阶段stream-kafka会检测这两个注解@[email protected],自动为它们实现的。
4、浅谈kafka机制
每个topic的消息可供不同的消费者(基于消费组,即一个消费组只能消费一次)消费,可同时可异时,若是这个topic的消息太多,则可以分区,比如zookeeper对应了三个kafka服务器,那么在这个kafka集群里面的服务里面都建立了相同的toptic,那么即实现了这个toptic的分区,这是跨服务器的分区,或者只是单纯的提升kafka的处理速率,可同一服务建立多个相同的toptic(因为为每个消费线程只能同时读取一个分区,若有三个分区,便可创建三个线程去消费toptic,且一个名字的group的所有消费线程加起来消费一个toptic,不要考虑重复,基于一个消费组读取toptic是全部且不重复的)。
分区的实现可以很大的再次提升kafka处理高并发的性能,那么我们在这里再考虑一个问题,万一某一台kafka服务挂掉了,那么基于这台服务的分区的数据岂不是要无法消费且出现系统问题了,所以这里出现了复制因子,如下我们称呼它为副本,即我们可以给每个分区建立它的副本在其他的服务器上;我们会了解到,kafka做了很人性化的设计,当同时存在分区和它的副本的时候,那么zookeeper基于这个分区和副本,会随机的分配一个leader,有它主导其他的follower(副本),消费者进来后消费leader,leader将更新的数据通过zookeeper而及时的同步到其他的副本,且leader是zookeeper通过自己的规则不定时更换的,由此而实现了对应的负载均衡,当zookeeper检测到某一台leader的服务宕机后,它又会指派一名leader(从follower中产生)去实现它的功能,所以集群里面的副本较多,容错率也就越大;宕机的服务器又运行起来,它就会以follower的角色,慢慢的从leader那里追上相应的数据,以后也可以在此成为leader,这个是基于配置里面独一无二的kafka服务的id来对应实现的,但是若是它距离leader的距离太远,可能要考虑重新建立副本并分配id了。