ActiveMQ(三)——spring管理ActiveMQ,实现发送和接收效果
一、前言
在前一篇博客中,小编向大家简单的介绍了一下ActiveMQ的消息处理方式,包括了点对点,发布订阅两种模式。写向大家展示了一下如何使用,但是在真正开发的时候我们是不会写那么一大片代码,从建立连接工厂,再由连接工厂创建连接对象,连接对象打开连接,连接对象然后创建session,session创建目的地,用于连接数据。
这个过程是比较复杂的,在开发的时候一般我们会使用Spring ,把这些操作来交给Spring管理,因为Spring的核心功能中包括了依赖注入,由spring容器创建连接工厂,方便操作。
二、ActiveMQ整合Spring
2.1 引入相关jar
spring相关:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
ActiveMQ相关:
<!--ActiveMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
2.2 建立applicationContext-mq.xml文件,用于配置整合信息
-
这里主要事实配置连接工程ConnectionFactory。
这里是把ActiveMQConnectionFactory交给spring的jsm的SingleConnectionFactory管理: -
配置了生产者
使用spring提供的jsm工具类,可以进行消息发送和接收 -
配置队列或主题的目的地
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.137.15:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>Ares-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="Ares_topic" />
</bean>
<!-- 接收消息 -->
<!-- 配置监听器 -->
<bean id="queueConsumeListener" class="com.dmsd.mq.activemq.listener.QueueConsumeListener" />
<bean id="transactionBizMessageListener" class="com.dmsd.mq.activemq.listener.TransactionBizMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="transactionBizMessageListener" />
</bean>
</beans>
三、发送消息到Queue
查询职工id为3的查询职员的相关信息,并把 查询的信息放入到mq 中,这里呢,小编为了突出存放的过程,就查询了10次,并且都存储到mq中:
这里就用到了jms提供的配置类jmsTemplate的实例,通过jmsTemplate来进行存储消息操作:
package com.dmsd.mq.activemq;
import com.dmsd.dao.TStuffMapper;
import com.dmsd.pojo.TStuff;
import com.dmsd.tool.JacksonJsonUntil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Created by Ares on 2017/11/14.
*/
@Service
public class QueneProducer implements QueneProducerFacade {
//注入ActiveMQ的模板,在spring中注入的bean
@Autowired
JmsTemplate jmsTemplate;
//注入Queue的目的地址,在spring中配置的bean
@Autowired
ActiveMQQueue queueDestination;
@Autowired
TStuffMapper tStuffMapper;
/**
* 发送消息到队列-王雷-2017年11月14日16:25:34
*/
@Override
public void QueneProducer() {
for (int i=0;i<10;i++){
System.out.println("开始发送消息=====》"+i);
//使用JmsTemplate对象发送消息。
jmsTemplate.send(queueDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//根据职工id查询职员
TStuff tStuff = tStuffMapper.selectByPrimaryKey((long) 3);
//转换为json
String json = null;
try {
json = JacksonJsonUntil.objectToJson(tStuff);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//创建一个消息对象并返回
TextMessage textMessage = session.createTextMessage(json);
return textMessage;
}
});
System.out.println("第"+i+"条消息发送完成");
}
}
}
Controller代码:
package com.dmsd.mq.activemq.controller;
import com.dmsd.mq.activemq.QueneProducerFacade;
import com.dmsd.tool.AresResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* Created by Ares on 2017/11/16.
*/
@Controller
public class ActiveMqController {
//定义打印日志相关
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Autowired
QueneProducerFacade queneProducerFacade;
@RequestMapping(value="/QueneProducer",method = RequestMethod.GET)
@ResponseBody
@CrossOrigin
public AresResult QueneProducer(){
try {
queneProducerFacade.QueneProducer();
return AresResult.build("0000","消息上传mq成功");
}catch (Exception e){
logger.error("消息上传mq失败");
e.printStackTrace();
}
return AresResult.build("1111","消息上传mq失败");
}
}
运行项目:
MQ初始的情况: 在队列中没有消息,pending message为0,
向MQ中存储10条消息:
MQ中的情况:Messages Enqueued增加了10 ,Messages Dequeued 的值也增加了10。说明刚进去的消息已经被消费者消费了。
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量
四、从ActiveMQ中取出消息
这里我们使用了监听机制,ActiveMQ有三种消息监听器:MessageListener、SessionAwareMessageListener、MessageListenerAdapter。关于这三种的区别,小编在后面的博客向大家介绍。
在上文中,小编向大家介绍了ActiveMQ整合spring。如果我们需要配置消息接收,也需要在配置文件中配置消息监听器:
这里我们配置了监听器,一直监听要监听的队列,当队列有消息的时候,就会获取的message,然后传给后台使用。
<!-- 接收消息 -->
<!-- 配置监听器 -->
<bean id="queueConsumeListener" class="com.dmsd.mq.activemq.listener.QueueConsumeListener" />
<bean id="transactionBizMessageListener" class="com.dmsd.mq.activemq.listener.TransactionBizMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="transactionBizMessageListener" />
使用SessionAwareMessageListener监听器:
可能很多朋友被监听器吓到了,其实就是一个类,这个类实现了SessionAwareMessageListener接口,实现了onMessage(message,session)方法,当监听器所监听的队列有了数据后,就可以把数据获取到,并且使用。
package com.dmsd.mq.activemq.listener;
import com.dmsd.api.StuffServiceFacade;
import com.dmsd.pojo.TStuff;
import com.dmsd.tool.AresResult;
import com.dmsd.tool.JacksonJsonUntil;
import org.apache.activemq.Message;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.SessionAwareMessageListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;
/**
* Created by Ares on 2017/11/15.
*/
public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> {
@Autowired
StuffServiceFacade stuffServiceFacade;
@Override
public void onMessage(Message message, Session session) throws JMSException {
//获取消息内容
ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
System.out.println(msg);
String text = null;
text = msg.getText();
System.out.println(text);
//json转换为对象
try {
TStuff tstuff = JacksonJsonUntil.jsonToPojo(text, TStuff.class);
TStuff stuff = new TStuff();
stuff.setSex(tstuff.getSex());
stuff.setName("德玛西亚");
stuff.setClassid(tstuff.getClassid());
stuff.setAddress(tstuff.getAddress());
int i = stuffServiceFacade.insertStuff(stuff);
if (i>0){
System.out.println("添加职员成功");
}
else {
System.out.println("添加职员失败");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
当队列中有消息的时候,会自动获取消息,然后执行onmessage方法,小编在onmessage方法中把队列中的消息取出来了,存储到数据库中了。
打印的存储情况:
数据库中存储的数据:五、小结
这个发送和接收的过程和spring结合只有是比较简单了,使用也比较方便了,重点还是要理解这个过程,spring和activeMQ整合的过程。下面的博客中小编会向大家ActiveMQ的三种消息监听方式。