ActiveMQ高级
文章目录
1.ActiveMQ结合spring开发
Spring提供了对JMS的支持,需要添加Spring 支持JMS的包
1.1添加jar依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
<!--结合spring-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.0.1.RELEASE-A</version>
</dependency>
1.2生产者配置
1.2.1 service-jms-provider.xml
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://192.168.98.165:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="50"/>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue"/>
</bean>
<!--<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
1.2.2编写发送端代码
public static void main(String[] args) {
ClassPathXmlApplicationContext context=
new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/service-jms-provider.xml");
JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message=session.createTextMessage();
message.setText("Hello,charjay");
return message;
}
});
}
1.3 消费者配置
1.3.1 service-jms-consumer.xml
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://192.168.98.165:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="50"/>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue"/>
</bean>
<!-- <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
1.3.2 编写接收端代码
public static void main(String[] args) {
ClassPathXmlApplicationContext context=
new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/service-jms-consumer.xml");
JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
String msg=(String)jmsTemplate.receiveAndConvert();
System.out.println(msg);
}
1.4 spring的发布订阅模式配置
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destinationTopic"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
1.5 以事件通知方式来配置消费者
1.5.1 更改消费端的配置
<!--监听-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
<bean id="messageListener" class="com.charjay.spring.SpringJmsListener"/>
1.5.2 增加FirstMessageListener监听类
public class SpringJmsListener implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
1.5.3 启动spring容器
public static void main(String[] args) {
ClassPathXmlApplicationContext context=
new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/service-jms-consumer.xml");
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
1.6 容错连接
failover:(tcp://192.168.98.165:61616,tcp://192.168.98.166:61616)
2.ActiveMQ支持的传输协议
1)client端和broker端的通讯协议:TCP、UDP 、NIO、SSL、Http(s)、vm
2)定义一个nio协议
vim conf/activemq.xml
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
重启后使用
public final static String ACTIVE_MQ_URL="nio://192.168.98.165:61616";
3.ActiveMQ持久化存储
3.1 kahaDB 默认的存储方式
-
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
3.2 AMQ 基于文件的存储方式
- 写入速度很快,容易恢复。
- 文件默认大小是32M
3.3 JDBC 基于数据库的存储
ACTIVEMQ_ACKS : 存储持久订阅的信息
ACTIVEMQ_LOCK : 锁表(用来做集群的时候,实现master选举的表)
ACTIVEMQ_MSGS : 消息表
3.3.1 第一步
3.3.2 第二步
3.3.3 第三步
添加jar包依赖
commons-dbcp-1.4.jar
commons-pool-1.6.jar
mysql-connector-java-5.1.35.jar
3.3.4 JDBC Message store with activeMQ journal
- 引入了快速缓存机制,缓存到Log文件中
- 性能会比jdbc store要好
- JDBC Message store with activeMQ journal 不能应用于master/slave模式
- Memory 基于内存的存储
3.4 内存
3.4 LevelDB
5.8以后引入的持久化策略。通常用于集群配置
4.ActiveMQ的网络连接
activeMQ如果要实现扩展性和高可用性的要求的话,就需要用用到网络连接模式
4.1NetworkConnector
主要用来配置broker与broker之间的通信连接
如上图所示,服务器S1和S2通过NewworkConnector相连,则生产者P1发送消息,消费者C3和C4都可以接收到,而生产者P3发送的消息,消费者C1和C2同样也可以接收到
NetworkConnector是一个高性能方案,但并不是一个高可用的方案,因为如果其中一台broker挂了,就消费不了
4.2静态网络连接
4.2.1 配置
修改activemq.xml,增加如下内容两个Brokers通过一个staic的协议来进行网络连接。一个Consumer连接到BrokerB的一个地址上,当Producer在BrokerA上以相同的地址发送消息是,此时消息会被转移到BrokerB上,也就是说BrokerA会转发消息到BrokerB上
4.2.2 丢失的消息
一些consumer连接到broker1、消费broker2上的消息。消息先被broker1从broker2消费掉,然后转发给这些consumers。假设,转发消息的时候broker1重启了,这些consumers发现brokers1连接失败,通过failover连接到broker2.但是因为有一部分没有消费的消息被broker2已经分发到broker1上去了,这些消息就好像消失了。除非有消费者重新连接到broker1上来消费
从5.6版本开始,在destinationPolicy上新增了一个选项replayWhenNoConsumers属性,这个属性可以用来解决当broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当作重复消息而不被分发
通过如下配置,在activeMQ.xml中。 分别在两台服务器都配置。即可完成消息回流处理
4.3 动态网络连接
静态网络连接如果有很多台broker,那么配置就很麻烦
multicast配置
5.高可用部署方案
5.1通过zookeeper+activemq实现
1.修改activeMQ
-
启动zookeeper服务器
-
启动activeMQ
参数的意思
directory: levelDB数据文件存储的位置
replicas:计算公式(replicas/2)+1 , 当replicas的值为2的时候, 最终的结果是2. 表示集群中至少有2台是启动的
bind: 用来负责slave和master的数据同步的端口和ip
zkAddress: 表示zk的服务端地址
hostname:本机ip
5.2 jdbc存储的主从方案
基于LOCK锁表的操作来实现master/slave
5.3 基于共享文件系统的主从方案
挂载网络磁盘,将数据文件保存到指定磁盘上即可完成master/slave模式
高可用+高性能方案
6.ActiveMq监控
ActiveMQ自带的管理界面的功能十分简单,只能查看ActiveMQ当前的Queue和Topics等简单信息,不能监控ActiveMQ自身运行的JMX信息等
hawtio
HawtIO 是一个新的可插入式 HTML5 面板,设计用来监控 ActiveMQ, Camel等系统;ActiveMQ在5.9.0版本曾将hawtio嵌入自身的管理界面,但是由于对hawtio的引入产生了争议,在5.9.1版本中又将其移除,但是开发者可以通过配置,使用hawtio对ActiveMQ进行监控。本文介绍了通过两种配置方式,使用hawtio对ActiveMQ进行监控。
- 从http://hawt.io/getstarted/index.html 下载hawtio的应用程序
- 下载好后拷贝到ActiveMQ安装目录的webapps目录下,改名为hawtio.war并解压到到hawtio目录下
- 编辑ActiveMQ安装目录下conf/jetty.xml文件,在第75行添加以下代码
<bean class="org.eclipse.jetty.webapp.WebAppContext">
<property name="contextPath" value="/hawtio" />
<property name="war" value="${activemq.home}/webapps/hawtio" />
<property name="logUrlOnStart" value="true" />
</bean>
- 修改bin/env文件
-Dhawtio.realm=activemq -Dhawtio.role=admins
-Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal
需要注意的是-Dhawtio的三个设定必须放在ACTIVEMQ_OPTS设置的最前面(在内存参数设置之后),否则会出现验证无法通过的错误(另外,ACTIVEMQ_OPTS的设置语句不要回车换行)
- 启动activeMQ服务。访问http://ip:8161/hawtio