分布式消息队列
1. 我们为什么要用消息队列呢?
我们使用消息队列最多的应用场景就是解耦、异步、削峰
解耦是什么?
这就是传统模式的缺点:系统之间的耦合性太强了,就像上图一样!A系统在代码中直接调用B系统和C系统的代码!,但是如果在D系统接入的时候!,系统A就需要重新修改代码!太繁琐了!
中间件模式的优点:会将消息写入到消息队列,需要消息的系统自己从消息队列中订阅,所以再有新的系统接入的时候A系统不需要做任何的修改
------------------------------------异步-------------------------------
传统模式的缺点:一些非必要的业务逻辑一同步的方式运行,太消耗时间!
消息中间件模式的优点:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快了响应的速度!
传统模式的缺点:就像上图一样,在并发量大的情况下,在所有的请求同时去请求数据库的时候!会造成数据库连接异常!
中间件模式的优点:A系统慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。再生产中,这个短暂的高峰期积压是允许的!
2 . 使用消息队列会有什么缺点呢?
分析:在我们使用MQ的项目中,我们都需要考虑这个问题,那么我们需要怎么去考虑这个问题呢?从两个角度来分析。
1、系统可用性降低 : 为什么这么说呢?一个正在好好运行的项目,一切都是正常的。现在你要加入一个消息队列进去,如果消息队列挂掉了!那么整个系统可想而知啊!
2、系统复杂性增加: 我们需要考虑系统的一致性问题,如何保证消息不被重复消费呢??又如何保证消息可靠传输呢?所以我们需要考虑的问题很多,系统复杂性增大了!
但是不能说我们因为上面的问题就不使用!
3.我们如何保证消息队列的高可用呢?
引入消息队列以后,系统的可用性下降。再生产中,没有人使用单机模式的消息队列。以rcoketMQ为例,他的集群就有多master多slave异步复制模式、多master多slave同步双写模式。多master多slave模式部署架构图。
它的通信过程如下:
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Broker Master建立长连接,且定时向Broker发送心跳。Producer只能将消息发送到Broker Master。但是Consumer则不一样,它同时和提供Topic服务的Master和Slaver建立长连接,就是可以从Broker Master订阅消息,也可以从Broker Master订阅消息。
那么对于kafka呢?下面来一张kafka的架构图
就像上图所示的,一个典型的kafka集群中,包含若干Producer(可以是web前端缠上的Page View,或者是服务器日志,系统CPU、Memory等),若干个broker(kafka支持水平扩展,一般broker数量越多,集群吞吐量越高),若干个Consumer Group,以及一个Zookeeper集群。kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化的时候进行rebalance,Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
至于RabbirMQ,也是普通集群和镜像集群模式,自行去了解,比较简单。
4、如何保证消息队列不被重复消费?
其实这个问题就是想问你如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。换句话说,是在考察你的设计能力,根据自己的业务场景来答!
在回答这个问题的时候,你必须自己要搞清楚如何保证消息队列不被重复消费呢!?
其实无论是哪种消息队列,造成重复消费的原因都是类似的。正常情况下,消费者再消费消息的时候,当消费完毕以后,会发送一个确认信息给消费队列。消息队列就知道了改消息已经被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同而已!如
例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUMR_SUCCESS成功标志,kafka实际上有一个offset的概念,就是每一个消息都有offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经被消费过了,name造成重复消费的原因就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分配给了其他的消费者。
怎么解决呢?这个需要针对自己的业务场景来回答! 分为以下几点
1、比如,你拿到这个消息做数据库的insert操作的时候,那就容易多了,给这消息做一个唯一的主键,那么就算出现了重复消费的情况,也会导致主键冲突,避免数据库出现脏数据。
2、如果你是拿到这个消息做redis的set操作的时候,这就不用管了,无论你set几次结果都是在一样的,set操作本就是幂等操作!
3、还有一个方法就是准备一个第三方介质来做消费记录,一redis为例,给消费分配一个全局ID,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先将redis中查询有没有消费记录即可!
5、如何保证消息的顺序性?
针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka就是partition,rabbitMQ就是queue)。然后只有一个消费者去消费队列。 那么想这样一个问题为了保证吞吐量,有多个消费者去消费怎么办?
比如我们在一个****的整个过程,发****,写****评论,删除****。这三个异步的操作,比如你一个消费者先执行了写****评论。但是这时候,****还没有发,所以这个写****写评论的操作一定是失败的!,等一段时间,等另外一个消费者先执行发****的操作后,在执行写评论的操作,就一定能成功, 保证入队的有序性就行
6、如何保证消费的可靠性传输 ?
我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费,其实这个可靠性传输,每种MQ都要从三个角度来分析: 生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。
RabbitMQ
生产者丢数据:
从生产者丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢失消息。
transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channnel.txRollBack()),如果发送成功则提交事务(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个ACK个生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。处理Ack和Nack代码如下所示:
---------从消息丢队列来说丢数据操作:
处理消息队列丢失数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confrim机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号,这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重播。
如何持久化呢?这里顺便说一下,1 、将queue的持久化标识durable设置为true,则代表一个持久化的队列
2、发送消息的时候将deliveMode=2 这样设置以后,rabbitMQ就算是挂了,重启也能恢复数据
----------从消费者丢数据来说
处理消息队列丢数据的情况,一般是采用自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rabbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没有能处理该消息,就会丢失该消息! 解决方案就是采用手动确认消息就好!
------------------------------kafka-----------------------------
先来一张kafka Replication的数据流向图
Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也就是该Partition有多少个Replica),producer只将该消息发送到Partition的Leader。Leader会将该消息写入其本地的LOG。每个Follower都从Leader中Pull数据。
对于生产者丢数据:
在kafka生产中,基本都有一个Leader和多个follwer。follwer回去同步leader的信息。因此,为了避免生产者丢数据! 配置下面两点!
1、第一个配置要在producer端设置acks=all。这个配置保证了follwer同步完成后,才认为消息发送成功!
2、在producer端设置retries=MAX,一旦写入失败,无线重发!
对于消息队列丢数据来说!
针对消息队列丢数据的情况,无外乎就是数据没有同步,Leader就挂了,这是zookeeper会将其他的follwer切换为leader,那数据就丢失了。针对这种情况,我们做如下的两种配置。
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列。
2、min.insync.replicas参数,这个值必须大于1,这个是要求一个Leader至少感知有至少一个follower还跟自己保持联系!
3、replication.factor参数,这个值必须大于1,即要求每个partition必须有2个副本
这两个配置加上上面生产者的配置联合起来用,基本可确保kafka不丢失数据!
对于消费者丢失数据:这种情况一般是自动提交了offset,然后你处理程序过程中挂了。kafka以为你处理好了,改成手动提交就行。
offset:指的是kafka的topic中的每个消费组消费的下标。简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。