jms规范以及activeMq相关介绍
jms
提出的指在统一各种MOM(Message-Oriented Middleware )系统接口的规范,只是接口,不包含实现,实现JMS 接口的消息中间件称为JMS Provider。activemq是Apache出品的开源项目,它是JMS规范的一个实现
接口构成:
ConnectionFactory:连接工厂接口用于建立连接
Connection:封装了JMS 客户端到JMS Provider 的连接
Destination:用来指定生产的消息的目标和它消费的消息的来源的对象
Session:单线程上下文创建生产者、消费者。消息等
MessageProducer:由Session 对象创建的用来发送消息的对象
MessageConsumer:由Session 对象创建的用来接收消息的对象
消息结构:
消息头:1、自动分配的消息头:JMSDeliveryMode、JMSMessageID、JMSTimestamp、JMSExpiration、JMSRedelivered、JMSPriority2、开发者分配的消息头:JMSReplyTo、JMSCorrelationID、JMSType
消息属性:1. 应用需要用到的属性;2. 消息头中原有的一些可选属性;3. JMS Provider 需要用到的属性
消息体:1、TextMessage2、MapMessage3、BytesMessage4、StreamMessage5、ObjectMessage6、Message
开发步骤:
1.创建ConnectionFactory2.创建Connection3.创建一个或多个JMS Session4.创建Queue或TopicSession5.创建MessageProducer或MessageConsumer6.发送或者接受消息:(1、异步接收MessageConsumer:setMessageListener(MessageListener m)2、同步接收MessageConsumer:receive() 以上两种接收方式返回Message 对象)
activeMq
模型分析:
生产者通过TransportConnection对象发送信息,TransportConnection对象指向一个regionBroker,这个regionBroker(区域中间件)中包含了四种区域(队列域(queueRegion)、主题域(topicRegion)、临时队列域(tempQueueRegion)、临时主题域(tempTopicRegion)),而每个Region(域)中包含了N个destination(目的地对象),生产者的信息通过regionBroker传给Region后又传给destination,在destination中进行持久化操作,并且"待发送消息指针"对象指向待发送消息的位置,然后当有消息订阅时destination将信息发送给订阅者,(这里注意,订阅信息也在region中),然后订阅机制通过transportConnection发送消息给消费者进程
消息指针:通常消息在未发生或者发送后未收到消费者的确认信息时都会持久保存消息到存储中。当有消费者来可以消费消息时,broker会批量从存储中取出消息,发送给消费者。游标就是指向下次批量获取消息时的存储位置,有如下几种:
- Store-based cursors 基于存储的,慢速消费者需要使用pending cursor来指定下一次批量读取消息的位置
- VM cursors 内存保存一些游标
- File-based cursors 内存超过阈值,存储到临时文件。
通信机制:
消息终结条件:ØConsume ackØ消息过期 Ø存储设备溢出
消息发送过程如图:
这里主要涉及到异步发送和同步发送,异步发送不会返回发送结果,但是效率较高,适用于大量并且允许丢失的数据,同步发送会阻塞并返回发送结果,适用于不可丢失的情况
判定异步发送还是同步发送是在mq代码中判断的,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。但是在开启事务的情况下,消息都是异步发送。至于持久化消息非持久化消息,以及事务模式在后边"可靠机制"章节中讲解
消息接收流程:
1,服务器发送消息有一个队列,如果队列满了需要等待消息的回执(ack)来清理消息,后继续发送,消费者接收信息后需要返回回执信息ack
2.消费者客户端用receive方法可以定义欲获取消息个数,如果大于0则通过拉取的方式获取相应数量的信息,如果小于0则通过推送的方式获取消息,获取队列中没有消息则阻塞
3.消费者客户端用messagelistener方法异步获取消息时调用onmessage方法处理消息
可靠机制:
指定消息传送模式:
1.NON_PERSISTENT(非持久性消息)
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
2.PERSISTENT(持久性消息)
这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
有两种方法指定传送模式:
1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;
2.使用send方法为每一条消息设置传送模式;
默认存储方式:kahadb,文件消息存储器
事务和消息的签收ack:
签收可以由ActiveMQ发起,也可以由客户端发起,取决于Session签收模式的设置。
Ø在带事务的Session中,签收自动发生在事务提交时。如果事务回滚,所有已经接收的消息将会被再次传送。Session配置为SESSION_TRANSACTED。
Ø在不带事务的Session中,一条消息何时和如何被签收取决于Session的设置控制消息的签收
- 1.Session.AUTO_ACKNOWLEDGE(自动签收)
- 2.Session.CLIENT_ACKNOWLEDGE(客户端签收)
- 3.Session.DUPS_OK_ACKNOWLEDGE(不必确保对传送消息的签收)
消息的重传:
Ø触发条件 CLIENT_ACKNOWLEDGE+recover(),事务rollback(),AUTO_ACKNOWLEDGE异常
ØRedeliveryPolicy 配置对应的重发机制
Ø重传次数配置
Ø重发最终失败会丢弃到死信队列
处理失败的过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个死信队列中 死信队列可以拿到,通过监听死信队列获取这些信息 死信队列支持配置 比如消息过期的不放进来
设置消息优先级:
普通情况下可以确保将单个会话向目标发送的所有消息按其发送顺序传送至消费者。然而,如果为这些消息分配了不同的优先级,消息传送系统将首先尝试传送优先级较高的消息。
有两种方法设置消息的优先级:
1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;
2.使用send方法为每一条消息设置传送模式;
注意:消息级别分为0至9,其中0至4是普通级别,5至9为加急消息
允许消息过期:
默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。
有两种方法设置消息的过期时间,时间单位为毫秒:
1.使用setTimeToLive方法为所有的消息设置过期时间;
2.使用send方法为每一条消息设置过期时间;
注意:是在消息发送端调用setTimeToLive方法。
创建临时目标
ActiveMQ通过createTemporaryQueue和createTemporaryTopic创建临时目标,这些目标持续到创建它的Connection关闭。只有创建临时目标的Connection所创建的客户端才可以从临时目标中接收消息,但是任何的生产者都可以向临时目标中发送消息。如果关闭了创建此目标的Connection,那么临时目标被关闭,内容也将消失。
Topic持久订阅:
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscription),非持久订阅只有当客户端处于**状态,也就是和ActiveMQ保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。
建立持久订阅的步骤:
1 . 生产者设置为持久化模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
1. 消费者 为连接设置一个客户ID;
topicConnection.setClientID("clientc");
2. 消费者为订阅的主题指定一个订阅名称;topicSession.createDurableSubscriber(mytopic,"clientc")
注意:上述组合必须唯一。
持久订阅例子方法:TopicSubscriber createDurableSubscriber(Topic topic, String name)
非持久化订阅例子:session.createSubscriber(topic);//无需向connection指定clientId
lMessage Groups 消息分组
Ø针对queue 一个队列 对应多个消费者,默认消费者会去竞争获取消息。
Ø利用消息分组,具有相同groupId的消息会被投送到同一个消费者(只要这个consumer保持active)
Ø负载均衡 broken收到消息,检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息
- message.setStringProperty("JMSXGroupID","GroupA");
spring集成jms:
Spring框架提供一个模板机制JMSTemplate来隐藏Java API细节。参考源码类JMSTemplate.java
Spring提供了JMSTemplate类,所以开发者不必为JMS实现编写样本代码。何为模板模式,请参考https://blog.****.net/l450741881/article/details/88837064