Spring整合ActiveMQ
安装activemq
1、linux系统下可以去官网下载tar文件
2、上传到linux系统进行解压;
3、在bin目录下zhixing./active.sh start ;保证系统已经安装了jdk
4、直接访问: 域名:8161
发送消息与spring整合
1、首先在项目中依赖相关jar包
<!-- 消息队列相关 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
2、发送消息的spring配置文件
<!-- 注册jmsconectionfactory -->
<bean id="targetConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://域名:61616" />
</bean>
<!-- spring connectionfactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="targetConnectionFactory" />
</bean>
<!-- 配置JMSTemplate -->
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- 配置队列 点对点queue -->
<bean id="queueDesination"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!-- 配置 队列 topic -->
<bean id="topicDesination"
class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic">
</constructor-arg>
</bean>
3、编写测试类
@Test
public void sendMessage() {
//加载spring配置文件
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
//获取配置jmsTemplate模板
JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
//获取配置的点对点Destination对象
Queue queue = context.getBean("queueDesination", ActiveMQQueue.class);
//使用jmsTemplate模板进行消息发送
jmsTemplate.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//使用会话session进行消息创建
Message message = session.createTextMessage("你好activeMQ");
return message;
}
});
}
4、开始测试,查看后台管理页面:http://域名:8161/admin/ 默认密码是 admin admin
查看后台,此时看到queue我们配置spring-queue有进站一条消息
接收消息与spring整合
1、同样引入依赖
2,、接收消息需要一个监听器需要自己编写然后进行配置整合
public class MQMessageLisener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
//进行类型转换,注意这里强转成功是因为父类的实例对象就是子类!
TextMessage textMessage = (TextMessage) message;
//取出消息
String string = textMessage.getText();
System.err.println(string);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3、配置spring文件
<bean id="targetConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://域名:61616" />
</bean>
<!-- spring connectionfactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="targetConnectionFactory" />
</bean>
<!-- 消息类型 -->
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination"
class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 配置接收消息的相关bean 这里是你自己写的监听器-->
<bean id="myMessageListener"
class="com.zfx.listener.MQMessageLisener" />
<!-- 整合 -->
<bean
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
4、编写测试类、
由于监听器需要监听消息,所以需要程序等待,如果不等待会导致spring容器穿件就会终止,这样监听器不会接收到消息就会销毁
@Test
public void getMessage() throws IOException {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-getactivemq.xml");
//程序等待
System.in.read();
}
5、查看后台
接受成功
此时队列里等待消费的消息已经为0出站消息为 1说明我们发送的消息被消费
注意事项
当我们发送topic消息时当没有消费者的时候消息就会消失,不会等待消费!