MQ详解(RabbitMQ及Kafka)
消息队列的作用
(1)解耦:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯。
只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。
(2)异步:如果一个操作涉及到好几个步骤,这些步骤之间不需要同步完成,可以使用MQ进行消息的异步通讯。
比如客户创建了一个订单,需要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的积分等等。这样如果这些系统都直接进行调用,那么将产生大量的时间,这对于客户是无法接受的。并且像添加客户轨迹这种操作是不需要同步操作的。如果将客户创建订单时,后面的轨迹、库存、积分等信息的更新全都放到MQ里面,然后去异步操作,这样就可加快系统的访问速度,提供更好的客户体验。
(3)削峰:使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定。只是可能要跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果。
一个系统访问流量有高峰时期,也有低峰时期。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力。但到了某个抢购活动时间,系统并发访问量剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理2000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。
优点:
1、对结构复杂、设计系统多的操作进行解耦操作,降低系统的操作复杂度、降低系统的维护成本。
2、对一个可以进行异步操作的一些系统操作进行异步,减小操作的响应时间,提供更好的用户体验。
3、可对高流量进行削峰,保证系统的平稳运行。
缺点:
1、系统可用性降低
一般而言,引入的外部依赖越多,系统越脆弱,每一个依赖出问题都会导致整个系统的崩溃。
2、系统复杂度提高
需要考虑MQ的各种情况,比如:消息的重复消费、消息丢失、保证消费顺序等等。
3、数据一致性问题
比如A系统已经给客户返回操作成功,这时候操作BC都成功了,操作D却失败了,导致数据不一致
常见MQ的特性比较
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
单机吞吐量 | 万级,吞吐量比RocketMQ和kafka要低一个数量级 | 万级,吞吐量比RocketMQ和kafka要低一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,kafka最大优点就是吞吐量大,一般配合大数据类的系统来进行实时数据计算、日志采集等场景。 |
topic数量对吞吐量的影响 | 无 | 无 | topic可以达到几百、几千个的级别,吞吐量会有小幅度的下降。这是RocketMQ的一大优势,可在同等数量机器下支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅下降。所以在同等机器数量下,kafka尽量保证topic数量不要过多。如果支撑大规模topic需要增加更多的机器 |
时效性 | ms级 | 微秒级,这是rabbitmq的一大特点,延迟是最低的 | ms级 | 延迟在ms级以内 |
可用性 | 高,基于主从架构实现可用性 | 高,基于主从架构实现可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 无 | 经过参数优化配置,可以做到0丢失 | 经过参数配置,消息可以做到零丢失 |
功能支持 | MQ领域的功能及其完备 | 基于erlang开发,所以并发性能极强,性能极好,延时低 | MQ功能较为完备,分布式扩展性好 | 功能较为简单,主要支持简单MQ功能 |
优势 | 非常成熟,功能强大,在业内大量公司和项目中都有应用 | erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多 | 接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的topic、支持复杂的业务场景,可以基于源码进行定制开发 | 超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便 |
劣势 | 偶尔有较低概率丢失消息,社区活跃度不高 | 吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦 | 接口不是按照标准JMS规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险 |
有可能进行消息的重复消费 |
应用 | 主要用于解耦和异步,较少用在大规模吞吐的场景中 | 都有使用 | 用于大规模吞吐、复杂业务中 | 在大数据的实时计算和日志采集中被大规模使用,是业界的标准 |
可用性
RabbitMQ的高可用
RabbitMQ基于主从模式实现高可用。
RabbitMQ有三种模式:单机模式,普通集群模式,镜像集群模式。
(1)单机模式
单机模式就是demo级别的
(2)普通集群模式
普通集群模式就是在多台机器上启动多个rabbitmq实例,每个机器启动一个。但是创建的queue只会放在一个rabbitmq实例上,但是其他的实例都同步了这个queue的元数据。在你消费的时候,如果连接到了另一个实例,它会从拥有queue的那个实例获取消息然后再返回给你。
这种方式并没有做到所谓消息的高可用,就是个普通的集群,这样还会导致要么消费者每次随机连接一个实例然后拉取数据,这样的话在实例之间会产生网络传输,增加系统开销,要么固定连接那个queue所在的实例消费,这样会导致单实例的性能瓶颈。
而且如果那个放queue的实例宕机了,会导致接下来其他实例都无法拉取数据;如果没有开启消息的持久化会丢失消息;就算开启了消息的持久化,消息不一定会丢,但是也要等这个实例恢复了,才可以继续拉取数据。
所以这个并没有提供高可用,这种方案只是提高了吞吐量,也就是让集群中多个节点来服务某个queue的读写操作。
(3)镜像集群模式
这种模式,才是rabbitmq提供是真正的高可用模式,跟普通集群不一样的是,你创建的queue,无论元数据还是queue里面是消息数据都存在多个实例当中,然后每次写消息到queue的时候,都会自动把消息到多个queue里进行消息同步。
这种模式的好处在于,任何一台机器宕机了,其他的机器还可以使用。
坏处在于:
1、性能消耗太大,所有机器都要进行消息的同步,导致网络压力和消耗很大。
2、没有扩展性可言,如果有一个queue负载很重,就算加了机器,新增的机器还是包含了这个queue的所有数据,并没有办法扩展queue。
如何开启镜像集群模式:在控制台新增一个镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定节点,然后在创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上面去了。
实际上rabbitmq并不是分布式消息队列,他就是传统的消息队列,只不过提供了一些集群、HA的机制而已,因为无论如何配置,rabbitmq一个queue的数据就存放在一个节点里面,镜像集群下,也是每个节点都放这个queue的全部数据。
kafka的高可用
(1)kafka的一个基本架构:
A.多个broker组成,一个broker是一个节点;
B.一个topic可以划分成多个partition;
C.每个partition可以存在于不同的broker上面,每个partition存放一部分数据。
这是天然的分布式消息队列。
kafka在0.8之后,提供了HA机制,也就是replica副本机制。
每个partition的数据都会同步到其他机器上,形成自己的replica副本。
然后所有的replica副本会选举一个leader出来,那么生产者消费者都和这个leader打交道,其他的replica就是follower。
写的时候,leader会把数据同步到所有follower上面去,读的时候直接从leader上面读取即可。
为什么只能读写leader:因为要是你可以随意去读写每个follower,那么就要关心数据一致性问题,系统复杂度太高,容易出问题。kafka会均匀的将一个partition的所有数据replica分布在不同的机器上,这样就可以提高容错性。
这样就是高可用了,因为如果某个broker宕机 了,没事儿,那个broker的partition在其他机器上有副本,如果这上面有某个partition的leader,那么此时会重新选举出一个新的leader出来,继续读写这个新的leader即可。
写消息: 写数据的时候,生产者就写leader,然后leader将数据落到磁盘上之后,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好了数据,就会发送ack给leader,leader收到了所有的follower的ack之后,就会返回写成功的消息给消息生产者。(这只是一种模式,可以调整)
读数据:消费数据的时候,只会从leader进行消费。但是只有一个消息已经被所有follower都同步成功返回ack的时候,这个消息才会被消费者读到。
mq原则
数据不能多,也不能少。不能多是指消息不能重复消费;不能少,是指不能丢失消息。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。
幂等性
幂等(idempotent、idempotence)是一个数学概念,常见于抽象代数中。
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
简单来说,幂等性就是一个数据或者一个请求,给你重复来了多次,你得确保对应的数据是不会改变的,不能出错。
出现重复消费场景
例如,kafka有一个叫做offset的概念,就是每个消息写进去,都有一个offset代表它的序号。然后consumer消费了消息之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次就算消费者重启,kafka也会让消费者从上次消费到的offset来继续消费。但是,如果consumer消费了消息,还没来得及发送已经消费的消息的offset就挂了,那么重启之后就会收到重复的数据。
保证幂等性
要保证消息的幂等性,要结合业务的类型来进行处理。
(1)数据库操作可以设置唯一键,防止重复数据的插入,这样插入只会报错而不会插入重复数据。
(2)如果要写数据库,可以拿唯一键先去数据库查询一下,如果不存在再写,如果存在直接更新或者丢弃消息。
(2)如果是写redis那没有问题,每次都是set,天然的幂等性。
(4)让生产者发送消息时,每条消息加一个全局的唯一id,然后消费时,将该id保存到redis里面。消费时先去redis里面查一下有没有,没有再消费。
(5)可在内存中维护一个set,只要从消息队列里面获取到一个消息,先查询这个消息在不在set里面,如果在表示已消费过,直接丢弃;如果不在,则在消费后将其加入set当中。
消息可靠性
丢失数据场景
(1)rabbitmq
A:生产者弄丢了数据
生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rabbitmq自己丢了数据
如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所以必须开启持久化,将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据
主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。
(2)kafka
A:生产者弄丢了数据
生产者没有设置相应的策略,发送过程中丢失数据。
B:kafka弄丢了数据
比较常见的一个场景,就是kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,他就少了一部分数据。
C:消费者弄丢了数据
消费者消费到了这个数据,然后消费之自动提交了offset,让kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。
防止消息丢失
(1)rabbitmq
A:生产者丢失消息
①可以选择使用rabbitmq提供的事务功能,就是生产者在发送数据之前开启事务,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事务,然后尝试重新发送;如果收到了消息,那么就可以提交事务。
缺点:rabbitmq事务已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
②可以开启confirm模式。在生产者那里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
二者不同
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。
一般在生产者这块避免丢失,都是用confirm机制。
B:rabbitmq自己弄丢了数据
设置消息持久化到磁盘。设置持久化有两个步骤:
A.创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
B.发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。必须要同时开启这两个才可以。
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据
使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
(2)kafka
A:消费端弄丢了数据
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
B:kafka弄丢了数据
一般要求设置4个参数来保证消息不丢失:
①给topic设置 replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。
②在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。
③在生产者端设置acks=all:表示要求每条数据,必须是写入所有replica副本之后,才能认为是写入成功了
④在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示要求一旦写入失败,就无限重试
C:生产者弄丢了数据
如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
顺序消费
保证顺序
消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。
比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
出现顺序错乱的场景
(1)rabbitmq
①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
②一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
(2)kafka
①kafka一个topic,一个partition,一个consumer,但是consumer内部进行多线程消费,这样数据也会出现顺序错乱问题。
②具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
保证消息的消费顺序
(1)rabbitmq
①拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。
②或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
(2)kafka
①确保同一个消息发送到同一个partition,一个topic,一个partition,一个consumer,内部单线程消费。
②写N个内存queue,然后N个线程分别消费一个内存queue即可
消息积压
大量消息在mq里积压
场景:几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,11点多。线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不行。一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
解决方案:
这种时候只能操作临时扩容,以更快的速度去消费数据了。
①先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
②临时建立好原先10倍或者20倍的queue数量(或新建一个topic,partition是原来的10倍)。
③然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
④紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。
⑤这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。
⑥等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。
积压消息长时间没有处理,mq放不下了
如果消息积压在mq里,且很长时间都没处理掉,此时导致mq都快写满了。
解决方案:
(1)临时写个程序,连接到mq里面消费数据,收到消息之后直接将其丢弃,快速消费掉积压的消息,降低MQ的压力。
(2)在流量低峰期,去手动查询重导丢失的这部分数据。
消息设置了过期时间,过期就丢了
假设你用的是rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。这种情况下,实际上是丢失了大量的消息。
解决方案:
可以采取 “批量重导” 的方案进行解决。
在流量低峰期,写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来