【RabbitMQ】如何保证消息消费时的幂等性+如何处理消息丢失的问题

一、如何保证消息消费时的幂等性

      这个问题可以等价于如何保证消费不被重复消费?为什么在消费队列里消费到了重复的数据。

1、kafak消费端可能导致的重复消费问题

                     【RabbitMQ】如何保证消息消费时的幂等性+如何处理消息丢失的问题

       消费者准备提交offset,但还没有提交的时候,消费者进程被重启了。那么此时已经消费过的数据offset还没有提交。kafka也就不知道你消费了offset=153的那条数据。

       消费者一旦重启,马上会找kafka说:哥们,把我上次消费到的那条数据之后的数据给我。数据1和数据2又发送过来了。等于对你来说,数据1和数据2在数据库里消费了2次。 

2、如何保证重复消费的幂等性

     这个还得结合业务来思考,这里给几个思路。

     ① 如果你拿个数据要写库,你先根据主键id查一下。如这数据都有了,你就别插入了。update一下。

     ② 如果你拿个数据是要更新表里的字段,直接update就好,幂等的。

     ③ 基于数据库的唯一性索引。因为有唯一性索引了。所以重复数据只会插入报错,不会导致数据库中出现重复的数据。

     ④ 如果你是写redis,那没问题了。每次都是set,天然幂等。

     ⑥ 如果不是上面的场景,那想的稍微复杂一点,需要生产者在发送消费的时候里面加一个全局性的唯一ID(UUID或者雪花算法),类似于订单ID之间的东西,先根据这个ID去redis中查一下,之前消费过嘛?如果没有消费过你就处理,然后把这个唯一性ID写Redis。如果消费过了,就不处理了。

二、啥?我发到消息队列里的数据怎么不见了

       如果保证消息的可靠性传输?如何处理消息丢失的问题?

       用MQ有个基本的原则,就是数据不能多一条,也不能少一条。不能多,就是刚刚说的重复消费和幂等性的问题, 不能少,就是说这条数据别搞丢了。

       如果用MQ来传递非常核心的消息,比如说计费、扣费的一些消息,比如一个广告平台,计算系统是很重的一个业务,操作是很耗时的,所以说广告系统整体的架构里面,实际上是将计费做成异步的。然后中间就是加了一个MQ。广告主投放了一个广告,命名说好了,用户点击一次扣费1块钱。如果要是把用户点击一次,扣费的消费搞丢了,公司就会不断的少几块钱。

       RabbitMQ可能存在的数据丢失问题,一般是三种情况。

                                             【RabbitMQ】如何保证消息消费时的幂等性+如何处理消息丢失的问题

       ① 写消息的过程中,消费还没到MQ,在网络传输过程中丢了,或者是消费到了RabbitMQ,但是MQ那出错了,没保存下来。

       ② RabbitMQ接收到消费之后,先暂存在自己的内存里,结果消费者还没来得及消费,RabbitMQ自己挂掉了,导致暂存在内存中的数据丢了。

       ③ 消费了拿到了这个数据,但还没来得及处理,自己就挂掉了。但是RabbitMQ以为这个消费者消费完了。

       1)生产者弄丢了数据

       ① 事务 

       此时,可以选择RabbitMQ提供的事务功能,就是生产者发送数据之前开启RabbitMQ事务,然后发送消费,如果消费没有被RabbitMQ成功接收到,那么生产者会收到异常报错,此时就可以回滚事务。然后重试发送消息。如果收到了消息,就可以提交事务,但问题是,RabbitMQ事务一搞,基本上吞吐量会下来,因为太耗性能。

       事务机制是同步的,生产者发送这个消息,会同步阻塞卡住,等待是成功还是失败,会导致生产者发送消息的吞吐量下降。

       ②confirm

       把生产者调成confirm模式。先把channel设置成confirm模式,发送一个消息,发送完消息之后你就不用管了。 RabbitMQ接收到消息之后,就会回调生产者本地的一个接口,通知你说,这条消息我已经收到了。

       RabbitMQ如果在接收消息的时候报错了,就会回调你的接口,告诉你这个消息接收失败了。就可以再次重发。 

       生产者这块保证消息不丢失,一般使用confirm机制,异步的模式,发送消息之后不会阻塞,可以直接发下一个消息,吞吐量比较高。  

       2 )RabbitMQ保证消息不丢

       就是RabbitMQ自己弄丢了数据,这个你必须开启RabbitMQ的持久化。就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了。恢复之后会自动读取之前存储的数据,一般数据不会丢,极其罕见的是RabbitMQ还没进行持久化,自己就挂了,可能导致少量数据会丢失。但是这个概率较小。

       设置持久化有2个步骤。第一个是创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化Queue的原数据,但是不会持久化queue里面的数据。第二个是发送消息的时候将消息的deliverymode设置为2。就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行。RabbitMQ哪怕挂了,再次重启,也会从磁盘上重启恢复Queue,恢复这个Queue里面的数据

       而且持久化可以跟生产者那边的confirm机制配合起来。只有消息持久化到磁盘之后,才会通知生产者ack。所以哪怕是在持久化到磁盘之前,RabbitMQ挂了,生产者收不到ack,你也可以自己重发的。

       哪怕是RabbitMQ开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ中,但是还没来得及持久化到磁盘,此时,RabbitMQ挂了,在内存中的数据可能就丢了。

      3)消费者把数据弄丢了

      打开了消费者AutoACK的这么一个机制。

      消费者接收到了数据之后,消费者会自动通知RabbitMQ说,OK,已经消费了这条数据。如果你消费到的这条消息,还没处理完,此时消费者自动autoACK啦。通知RabbitMQ说,这条消息已经消费啦。此时,消费者服务宕机了,那条数据就会丢失。还没处理完,RabbitMQ却认为已经处理完了。

      消费者这边,需要将autoACK自动关闭。每次自己确定处理完一条消息之后,你再发送ack给RabbitMQ。如果还没处理完就宕机了,此时RabbitMQ你收到你发的消息。