ssm整合rabbitmq消息队列的简单使用案例
项目:ssm(spring+springmvc+mybatis)
编译工具:eclipse
消息队列:rabbitmq
1.在pom.xml添加rabbitmq依赖:
<!--rabbitmq依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
2.编写rabbitmq的配置文件:rabbitMQ.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory"
username="guest" password="guest" host="127.0.0.1" port="5672"
/>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />
<!-- 消息接收者 -->
<bean id="messageReceiver" class="com.comit.appointment.modules.test.MessageConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>
<!--定义queue -->
<rabbit:queue name="queueChris" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean id="receiverChris" class="com.comit.appointment.modules.test.ChrisConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueChris" ref="receiverChris" />
</rabbit:listener-container>
<!-- 分隔线 -->
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory2"
username="guest" password="guest" host="127.0.0.1" port="5672" />
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2" />
<!--定义queue -->
<rabbit:queue name="queueShijj" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTest2"
durable="true" auto-delete="false" declared-by="connectAdmin2">
<rabbit:bindings>
<rabbit:binding queue="queueShijj" pattern="shijj.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
exchange="exchangeTest2" />
<!-- 消息接收者 -->
<bean id="recieverShijj" class="com.comit.appointment.modules.test.ShijjConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory2">
<rabbit:listener queues="queueShijj" ref="recieverShijj" />
</rabbit:listener-container>
</beans>
3.在ssm项目的上下文配置文件spring-content.xml中引入rabbitmq的配置文件
spring-content.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.1.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd"
default-lazy-init="true">
<!-- rabbitmq -->
<import resource="classpath*:rabbitMQ.xml" />
<context:component-scan base-package="com.comit.appointment">
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
</beans>
4.编写测试代码:
1.消息生产者:
package com.comit.appointment.modules.test.cs;
import java.io.IOException;
import javax.annotation.Resource;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;
import com.comit.appointment.modules.sys.entity.Users;
@Service("MessageProducer2")
public class MessageProducer2 {
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
public void sendMessage(Object message)throws IOException{
amqpTemplate.convertAndSend("queueTestKey",message);
}
}
注:queueTestKey 是在rabbitMQ.xml绑定到队列的,根据这个key来发送消息到相应的消息队列中。
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
2.消息消费者:获取消息并把消息插入日志记录表中
package com.comit.appointment.modules.test;
import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import com.comit.appointment.modules.sys.dao.LogsMapper;
import com.comit.appointment.modules.sys.entity.Logs;
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Autowired
private LogsMapper logsMapper;
@Override
public void onMessage(Message message) {
try {
String messages = new String(message.getBody(), "UTF-8");
String[] str=messages.split(",");
Logs log=new Logs();
log.setNAME(str[0]);
log.setTIME(str[1]);
log.setACTION(str[2]);
System.out.println("log-->"+log.toString());
logsMapper.insertSelective(log);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.info("consumer receive message------->:{}", message);
}
}
在登录时发送已登录的消息:
package com.comit.appointment.modules.sys.controller.impl;
import java.io.IOException;
import java.util.Date;
import javax.servlet.http.HttpSession;
import org.apache.shiro.session.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import com.comit.appointment.modules.sys.controller.IBaseController;
import com.comit.appointment.modules.sys.entity.Users;
import com.comit.appointment.modules.sys.service.IOperaterService;
import com.comit.appointment.modules.sys.service.IUsersService;
import com.comit.appointment.modules.test.cs.MessageProducer2;
@Controller
@RequestMapping("/base")
public class BaseController implements IBaseController {
@Autowired
private IUsersService usersService;
@Qualifier(value="MessageProducer2")
@Autowired
private MessageProducer2 messageProducer;
@RequestMapping("/")
public String login() {
return "modules/sys/login";
}
@Override
@RequestMapping("/login")
public String login(HttpSession session,Users user) {
Users user1=usersService.findUser(user);
if(user1!=null) {
try {
messageProducer.sendMessage(user1.getACCOUNT()+","+new Date()+",登录了");
} catch (IOException e) {
e.printStackTrace();
}
session.setAttribute("user", user1);
return "modules/sys/index";
}
return "modules/sys/login";
}
}
在登录时便发送了一个消息,控制台打印:
在消息消费者那里把该消息插入了日志记录表:
注:运行项目时记得打开rabbitmq服务,要不然连接不上会报错。
没有安装rabbitmq的可以看看 Win10安装RabbitMQ