关于ActiveMQ中Session和Connection资源的管理
配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:
一段发送消息的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
publicstaticvoidsend(){
try{
// 创建一个连接工厂
Stringurl="tcp://localhost:61616";
ActiveMQConnectionFactory connectionFactory=newActiveMQConnectionFactory(url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
connectionFactory.setUserName("system");
connectionFactory.setPassword("manager");
// 创建连接
Connection connection=connectionFactory.createConnection();
connection.start();
// 创建Session,参数解释:
// 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
// 第二个参数消息的确认模式:
// AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
// CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
// DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination=session.createQueue("test");
// 创建消息生产者
MessageProducer producer=session.createProducer(destination);
// 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 创建消息
Stringtext="Hello ActiveMQ!";
TextMessage message=session.createTextMessage(text);
// 发送消息到ActiveMQ
producer.send(message);
System.out.println("Message is sent!");
// 关闭资源
session.close();
connection.close();
}
catch(Exceptione){
e.printStackTrace();
}
}
|
执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:
点击队列名test,然后点击消息ID即可查看消息内容,如图:
如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
publicstaticvoidget(){
try{
Stringurl="tcp://localhost:61616";
ActiveMQConnectionFactory connectionFactory=newActiveMQConnectionFactory(url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
connectionFactory.setUserName("system");
connectionFactory.setPassword("manager");
// 创建连接
Connection connection=connectionFactory.createConnection();
connection.start();
// 创建Session
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination=session.createQueue("test");
// 创建消息消费者
MessageConsumer consumer=session.createConsumer(destination);
// 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
Message message=consumer.receive(1000);
if(message instanceofTextMessage){
TextMessage textMessage=(TextMessage)message;
Stringtext=textMessage.getText();
System.out.println("Received: "+text);
}else{
System.out.println("Received: "+message);
}
consumer.close();
session.close();
connection.close();
}catch(Exceptione){
e.printStackTrace();
}
}
|
执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:
这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。
在ActiveMQ中Session和Connection是一种重要的资源,在数据库中,针对重要的资源如Connection我们采用数据库连接池,在ActiveMQ中有一个可选的组件为activmq-pool组件,用于处理关于Session和Connection,管理的方式采用池的原理即对象池。
在activemq-pool中资源管理器类为ActiveMQResourceManager其中PooledConnectionFactory中定义Pool的管理的基本方法。其中在此类中定义默认的一些信息如下:
private ConnectionFactory connectionFactory;
private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
private ObjectPoolFactory poolFactory;
private int maximumActive = 500;
private int maxConnections = 1;
private int idleTimeout = 30 * 1000;
private AtomicBoolean stopped = new AtomicBoolean(false);
private long expiryTimeout = 0l;
/* Sets the maximum number of active sessions per connection
*/
public void setMaximumActive(int maximumActive) {
this.maximumActive = maximumActive;
}
/**
* @return the maxConnections
*/
public int getMaxConnections() {
return maxConnections;
}
/**
* @param maxConnections the maxConnections to set
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
protected ObjectPoolFactory createPoolFactory() {
return new GenericObjectPoolFactory(null, maximumActive);
}
public int getIdleTimeout() {
return idleTimeout;
}
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
/**
* allow connections to expire, irrespective of load or idle time. This is useful with failover
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
*
* @param expiryTimeout non zero in milliseconds
*/
public void setExpiryTimeout(long expiryTimeout) {
this.expiryTimeout = expiryTimeout;
}
public long getExpiryTimeout() {
return expiryTimeout;
}
其中默认的Connecton连接数为1:
默认的每一个Connecton的创建的Session数量为500个。
默认是实现的PooledConnectionFactory类如下:
AmqJNDIPooledConnectionFactory
JcaPooledConnectionFactory
XaPooledConnectionFactory
PooledConnectionFactory
其中关于Session的管理的资源池使用如下类:
public class SessionPool implements PoolableObjectFactory
获取Session是从ConnectonPool中获取代码如下:
public Session createSession(boolean transacted, int ackMode) throws JMSException {
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = cache.get(key);
if (pool == null) {
pool = createSessionPool(key);
cache.put(key, pool);
}
PooledSession session = pool.borrowSession();
return session;
}
使用实例如下:
package easyway.app.activemq.demo.acknow;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ActiveMQ 中连接池的使用
*
* @author longgangbai
*
*/
public class ActiveMQConnectionPool {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionPool.class);
private BrokerService broker;
private ActiveMQConnectionFactory factory;
private PooledConnectionFactory pooledFactory;
public void testEviction() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61619");
broker.start();
factory = new ActiveMQConnectionFactory("tcp://localhost:61619?closeAsync=false");
pooledFactory = new PooledConnectionFactory(factory);
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
ActiveMQConnection amqC = connection.getConnection();
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
amqC.addTransportListener(new TransportListener() {
public void onCommand(Object command) {
}
public void onException(IOException error) {
// we know connection is dead...
// listeners are fired async
gotExceptionEvent.countDown();
}
public void transportInterupted() {
}
public void transportResumed() {
}
});
sendMessage(connection);
Connection connection2 = pooledFactory.createConnection();
sendMessage(connection2);
}
/**
* 发送消息的方法
* @param connection
* @throws JMSException
*/
private void sendMessage(Connection connection) throws JMSException {
//获取会话信息
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO"));
producer.send(session.createTextMessage("Test"));
session.close();
}
public static void main(String[] args) throws Exception {
ActiveMQConnectionPool test=new ActiveMQConnectionPool();
test.testEviction();
}
}
在ActiveMQResourceManage的配置如下:
/**
* This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
* in a way that will allow the transaction manager to correctly recover XA transactions.
*
* For example, it can be used the following way:
* <pre>
* <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
* <property name="brokerURL" value="tcp://localhost:61616" />
* </bean>
*
* <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
* <property name="maxConnections" value="8" />
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
*
* <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
* </pre>
*/