Activemq 基础一
一、定义
Activemq是一个MOM,具体来说是一个实现了JMS规范的系统间远程通信的消息代理
MOM:面向消息的中间件(Message-oriented middleware),用于已分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通信的一类软件。MOM的整体思想是它作为消息发送器和消息接收器之间的消息中间件,提供一个松耦合
JMS:JAVA消息服务(Java Message Service),是java平台上有关面向MOM的技术规范,旨在通过提供标准的生产、发送、接受和处理消息的API简化企业的应用开发,类似于JDBC和关系型数据库通信方式的抽象
provider:纯java语言编写的JMS接口实现(Activemq就是)
Domains:消息传递方式,包括点对点(P2P),发布/订阅两种
ConnectionFactory:客户端使用连接工厂来创建与JMS Provider的连接
Destination:消息被寻址,发送及接收的对象
P2P和PUB/SUB的区别:
P2P域使用queue作为Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个Consumer传送一次,Consumer可以使用MessageConsumer.receive()同步的接收消息,也可以通过MessagConsumer.setMessageListener()注册一个MessageListener实现异步接收。
多个Consumer可以注册到同一个queue上,但一个消息只能被一个Consumer所接收,然后由该Consumer来确认消息,这种情况下,Provider对所有注册的Consumer以轮询的方式发送消息
Pub/Sub(Publish/Subscribe)消息域使用topic作为Destination,发布者向topic发送消息,订阅者注册接收来自topic的消息。发送到topic的任何消息都将自动传递给所有的订阅者。接收方式(同步和异步)与P2P域相同。除非显示指定,否则topic不会为订阅者保留信息。当然,这可以通过持久化(Durable)订阅来实现消息的保存,这种情况下,当订阅者与provider断开时,provider会为他存储信息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的信息。
二、程序编写步骤
- 获取连接工厂
- 使用连接工厂创建连接
- 启动连接
- 从连接创建会话
- 获取 Destination
- 创建 Producer,或
- 创建 Producer
- 创建 message
- 创建 Consumer,或发送或接收message发送或接收 message
- 创建 Consumer
- 注册消息监听器(可选)
- 发送或接收 message
- 关闭资源(connection, session, producer, consumer 等)
public class JMSDemo {
ConnectionFactory connectionFactory;
Connection connection;
Session session;
Destination destination;
MessageProducer producer;
MessageConsumer consumer;
Message message;
boolean useTransaction = false;
try {
Context ctx = new InitialContext();
connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
//使用ActiveMQ时:connectionFactory = new ActiveMQConnectionFactory(user, password, tOptimizeBrokerUrl(broker));
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TEST.QUEUE");
//生产者发送消息
producer = session.createProducer(destination);
message = session.createTextMessage("this is a test");
//消费者同步接收
consumer = session.createConsumer(destination);
message = (TextMessage) consumer.receive(1000);
System.out.println("Received message: " + message);
//消费者异步接收
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null) {
doMessageEvent(message);
}
}
});
} catch (JMSException e) {
...
} finally {
producer.close();
session.close();
connection.close();
}
}
三、Activemq的存储
Activemq中queue中存储Message时,采用先进先出顺序存储。同一个时间同一个消息被分配给当消费者,且只有当Message被消费并确认时,它才能从存储中删除
对于持久化订阅者来说,每个消息者获得message 的副本。为了节省空间,Provider仅存储消息的一个副本,持久化订阅者维护了指向下一个Message的指针,并将其副本分派给消费者。以这种方式实现消息存储,因为每个持久化订阅者可能以不同的速率消费Message,或者他们不可能不是全部同时运行。此外,因每个Message可能存在多个消费者,所以他被成功的传递给所有持久化订阅者之前,不能从存储中删除。
消息类型 | 是否持久化 | 是否有Durable订阅者 | 消费者延迟启动时,消息是否保留 | Broker重启时,消息是否保留 |
Queue | N | - | Y | N |
Queue | Y | - | Y | Y |
Topic | N | N | N | N |
Topic | N | Y | Y | N |
Topic | Y | N | N | N |
Topic | Y | Y | Y | Y |