ActiveMQ--(pub/sub)模型
发布订阅模式是一对多的方式。就是消息提供者提供一个Topic,可以由多个消息消费者订阅。
下图是发布订阅模式的示意图:
订阅模型可以分为非持久订阅和持久订阅。当所有的消息必须接受的时候,则需要用到持久订阅,反之,则用非持久订阅
非持久订阅:
消费者只能消费提供者在订阅操作之后时提供的消息。也就是说消费者启动后,提供者再提供消息,消费者能接受此时发送的消息,但之前提供的消息不能订阅。
消息消费者及提供者设置:
Destination destination = session.createTopic("first-topic");
以下案例:
设置两个消费者:JmsTopicReceiver JmsTopicReceiver2
设置一个提供者:JmsTopicSender
先启动两个消费者,此时处于阻塞状态,再启动消息提供者,两消费者接受消息。
消息提供者(在消费者启动后再运行)
public class JmsTopicSender {
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
Connection connection=null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("first-topic");
MessageProducer producer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("今天天气很好啊");
producer.send(textMessage);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
消息消费者1:
public class JmsTopicReceiver {
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
Connection connection=null;
try {
connection=connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination= session.createTopic("first-topic");
MessageConsumer consumer = session.createConsumer(destination);
TextMessage textMessage= (TextMessage)consumer.receive();
System.out.println("消费者1:"+textMessage.getText());
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
消息消费者2
public class JmsTopicReceiver2 {
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
Connection connection=null;
try {
connection=connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination= session.createTopic("first-topic");
MessageConsumer consumer = session.createConsumer(destination);
TextMessage textMessage= (TextMessage)consumer.receive();
System.out.println("消费者2:"+textMessage.getText());
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行结果为:消费者1:今天天气很好啊 消费者1:今天天气很好啊
持久订阅:
当所有消息都需接受时,采用永久订阅。
运行流程:
(1)先运行消费者,注册clientId,然后关闭
(2)运行消息提供方,提供消息
(3)最后运行消费者,此时能订阅消息
说明此时消费者能订阅之前的消息
主要在客户端配置:
设置消费端id
connection.setClientID("DUBBO-ORDER");
生成topic
Topic topic = session.createTopic("first-topic");
消费者设置为永久订阅 (注意参数值,与上面两值一致)
MessageConsumer consumer = session.createDurableSubscriber(topic, "DUBBO-ORDER");
public class JmsTopicPersistenteReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
Connection connection=null;
try {
connection= connectionFactory.createConnection();
//持久订阅设置ID
connection.setClientID("DUBBO-ORDER");
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("first-topic");
//MessageConsumer consumer = session.createConsumer(destination);
//创建持久订阅,只能接受具体的topic
MessageConsumer consumer = session.createDurableSubscriber(topic, "DUBBO-ORDER");
TextMessage textMessage= (TextMessage)consumer.receive();
System.out.println(textMessage);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}