ActiveMQ学习四(高级特性2-6[消息持久化、事务、确认机制、投递方式、死信队列])
消息持久化
消息持久化是保证消息不丢失的重要方式
ActiveMQ提供了以下三种的消息存储方式:
- Memory(内存)
优点:效率高,
缺点:一旦ActiveMQ服务器重启内存会丢失 - 日志,
优点:可以持久化当MQ服务器重启依然会保留在MQ服务器里面丢失的可能性就比较低了,
缺点:肯定比内存的效率会稍低一点,但是ActiveMQ在对日志的形式的持久化做了一个提高,所以性能也不会说损耗特别多 - 第三方数据库,
优点:如果后面业务需要去查询这些消息的内容,方便在数据库中查询
缺点:依赖于第三方数据库可能会稍微麻烦一点
ActiveMQ持久化机制流程图:
生产者发送过程:
- 发送消息
- MQ持久化
- 通知结果
消费者接受过程:
- 消费者监听MQ服务器接受MQ传来的消息
- MQ服务器等待消费方返回一个通知
- MQ服务器才会把这个消息移除(为什么要移除因为如果不移除消息可能会重复发送就会导致消息的一个重复问题)
以上简单了解了一下消息持久化在MQ的策略
接下来演示:
存入内存
springboot应用默认用日志形式的存储去存储消息
所以我们要演示内存形式的话要去调整一下配置
接下来我们去生产者方法里面发送一条数据
这个时候消息是到达了ActiveMQ服务器
这个时候我们去Linux上面重启ActiveMQ服务器
再查看,消息丢失,这就证明消息是存在内存里面的
存入日志
默认存储到ActiveMQ的data下面的kahadb目录
然后我们去重启生产者方法
再重启ActiveMQ
再看ActiveMQ后台
消息还在,入列丢失不明白是什么原因。。
存入数据库
-
首先要加上delivery-mode持久化配置
-
修改activemq.xml
先用linux来到activemq的根目录然后进入conf目录
找到文件,vi命令进去查看
数据源用druid(有人叫朱瑞得也有人叫德鲁伊)
然后复制进去更改地址
db_activemq是数据库名,可以去数据库里面再创建这个数据库
持久化的适配器是默认kahadb的目录,我们现在要改成jdbc持久化适配器,注释掉再加上jdbc的。
现在我们需要导入进linux两个jar包(mysql连接和druid)
然后可以在根目录看到 ,接下来复制进ActiveMQ的lib包下
然后我们需要去创建一个数据库
创建好之后我们重启ActiveMQ
配置了这个数据源之后重启会稍微变慢,这个时候我们会发现数据库中多处了3张表
acks:确认信息表
lock:锁表
msgs:信息表主要要用到的表
目前可以看到都是空的
就跟之前一样去生产者方发送就行了
这个时候就会多出一条消息,一些消息头消息内容消息属性什么的
这个时候再重启activemq
等待重启进入后台,消息依然存在
只要消息没被消费掉,那么就会一直存在这里
那么消息什么时候会删除呢?
- 去消费
- 等过时
接下来我们去启动消费方
出列了
数据库信息也不见了
消息事务
ActionMQ消息事务流程图:
事务性的发送,要调用事务commit方法MQ才去把消息存储和确认
消费者这边也有事务(不明显原因:),只是他是在一条消息里面的不是多条消息里面,如果消费者消费到了那么他就会确认消息收到,然后MQ服务器就会把消息删除掉
如果消费者抛出异常或者主动对事务进行回滚,那么消息有一个重发这么一个机制
事务示例
一、生产者事务
首先来看没有事务的发送
循环发送10条消息,用jmsMessagingTemplate比较简单发送一行代码
模拟异常
事务性发送方案一:原生JMS
全收到
模拟异常全不收到
事务性发送方案二:JMS事务管理器
spring的事务管理器是套个接口,他下面有很多不同的事务管理器的实现比如jdbc、hibnernate、jpa等等,也有JMS事务管理器
- 首先要添加一个ActiveMQ的配置类,创建JMS事务管理器
有了这个事务管理器我们就能用他去发送,首先我们代码不能写在测试方法里面,因为spring的事务管理要做在业务层的
所以新建个业务类
然后我们可以在测试方法里面调用这个业务
消费者的事务接收
消费者这边也有事务(不明显原因:),只是他是在一条消息里面的不是多条消息里面,如果消费者消费到了那么他就会确认消息收到,然后MQ服务器就会把消息删除掉
如果消费者抛出异常或者主动对事务进行回滚,那么消息有一个重发这么一个机制
只需要加一个session,然后用session去提交回滚事务即可
加个报错
测试
重发失败6次之后消息会进入一个死信队列
确认机制
ActiveMQ的消费方有一个确认机制
从下图可以看到,消费者监控MQ接收消息,然后返回一条确认消息,最后MQ移除消息
确认机制跟这个事务有紧密联系的,如果我们在消费方开启了事务,消费方接收到了消息之后会自动确认给MQ服务器说确认消息
如果我们没有事务
消息合适被确认取决于创建会话时候的第二个参数应答模式,有三个参数可供选择,之前我们一直选择的第一个
自动确认:收到自动确认
手动确认:通过acknowledge方法去确认已经收到(如果接受多条确认一条就会都确认)
延迟确认:可能会导致重发情况
记得先关闭事务
示例
自动就不说了
手动:
我们先在springbootApplication的同级目录下写一个配置类
springboot整合手动确认应该是4而不是之前的2,那个在原生的jms里面有效
这个@JmsListener上面加个参数
这个时候MQ后台因为没有被确认所以会一直在MQ服务器中,这个时候重启几次消费者就能消费几次,因为一直没被确认
手动确认
消息投递方式
异步投递
同步发送:一般情况下,生产者采用了持久化的存储方式,持久化一个默认的发送原则就是同步发送,他需要等到MQ服务器反馈一个消息给生产者,生产者才认为这个消息投递成功了,这是个阻塞的投递状态,这个状态会导致生产者的一个延迟或是性能下降。
好处:可以保证消息投递的可靠性
异步发送:生产方只要把消息投递出去不用等MQ服务器反馈给我们消息我们就可以法吓一跳消息,这种情况下生产方其实是没有去接收MQ的确认消息的,所以生产方有没有发送成功这个是不确定不能保证的,
缺点:可能会存在一些消息丢失的情况
只有当我们容忍一些消息的丢失可以采用异步投递
如果是非持久化并且都是false,还是采用异步的
配置异步投递方式
注意:倮是spring或者是springboot可以通过修改JmsTemplate的默认参数实现异步或同步投递
springboot:
异步投递如何确认发送成功?
在JMS可以实现在异步投递的过程中,可以用代码获取到是否投递成功
延迟投递
-
即时投递
之前我们的案例都是采用即时投递的方式 -
延时投递
我们会等待以断时间,比如等待个10秒,但是activeMQ服务器是不支持延迟投递这么一个特性 -
修改ActiveMQ.xml
-
重启ActiveMQ
-
代码,用原生的JMS代码
定时投递
定时投递跟延时投递有点像
- 启动类@EnableScheduling
2. 在生产者添加Scheduling的定时
设置每隔3秒发一条
死信队列
作用:保存一些因为业务逻辑处理失败,而导致消息的失败或者说是消息发送过期的消息,有了死信队列能够保证在发送消息和接收消息过程中因为某些异常导致消息丢失的队列
导致死信队列的两种情况
1、消息处理失败(大部分情况,当一个消息被重发6次时,这个消息会被认为是处理失败,然后会进入死信队列)
什么情况会消息重发呢?
- 开启事务的消息里面我们手动调用了一个rollback
- 开启事务里面没有调用commit
- 没开事务,手动确认,我们手动调用session.recover()
2、消息发送过期
注意两点:
1、只有持久化才会放到死信队列
2、没有指定的话,ActiveMQ会把死信队列发到这个队列ActiveMQ.DLQ
死信队列配置调整
实际开发中可能有很多队列,每个队列都有失败消息,如果把所有队列的失败消息都放在一个队列里面不好处理这些失败消息,我们需要把每一个队列放到他们自己的死信队列里面去
- 修改ActiveMQ.xml
其实这样就可以让每一个队列都有一个专属的死信队列
这时候我们再重启ActiveMQ
测试:
- RedeliveryPolicy重发策略设置
刚才我们在测试的时候重发了6次,其实这个次数也是可以改的,间隔时间默认是1秒其实也是可以改的
在springboot里面我们可以通过一个配置类去改
我们在消费方的配置里面