Message-Oriented Middleware (1): The JMS Specification
I. What JMS is and why we need it
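Asynchronous communication: after sending a message, the sender can carry on with other work without waiting for the receiver's reply, and the receiver does not have to process the sender's request the moment it arrives.
Loose coupling between the lifecycles of client and service: the client process and the service process do not both have to be up at the same time. If the service crashes, or a network failure makes the client's request undeliverable, the client does not get an exception, and the message middleware makes sure the message is not lost.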
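The two points above can be sketched without any JMS provider at all. The following is a minimal illustration, assuming an in-memory buffer in place of a real broker; `InMemoryBroker` and `AsyncDemo` are hypothetical names made up for this sketch and are not part of the JMS API. The sender finishes before any receiver exists, and the messages are still delivered once a receiver starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a broker: an in-memory buffer, not a JMS provider.
class InMemoryBroker {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // Returns as soon as the message is buffered; no receiver has to be running.
    void send(String text) {
        buffer.add(text);
    }

    // Blocks until a message is available, then removes and returns it.
    String receive() throws InterruptedException {
        return buffer.take();
    }

    // Number of messages still waiting for a receiver.
    int pending() {
        return buffer.size();
    }
}

class AsyncDemo {
    // Sends two messages before any receiver exists, then drains them on a late-starting thread.
    static List<String> run() {
        InMemoryBroker broker = new InMemoryBroker();
        broker.send("order-1");
        broker.send("order-2");

        List<String> received = new ArrayList<>();
        Thread receiver = new Thread(() -> {
            try {
                received.add(broker.receive());
                received.add(broker.receive());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        try {
            receiver.join(); // join() also makes the receiver's writes visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A real provider adds what this sketch leaves out: the buffer lives in a separate process and can be persisted, which is what lets messages survive a crash of either side.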
The overall structure is as follows:
(1) ConnectionFactory
package javax.jms;
public interface ConnectionFactory {
    // Create a connection using the default user identity
    Connection createConnection() throws JMSException;
    // Create a connection using the given user name and password
    Connection createConnection(String userName, String password)
        throws JMSException;
}
(2) Connection
package javax.jms;
public interface Connection {
    Session createSession(boolean transacted, int acknowledgeMode)
        throws JMSException;
    String getClientID() throws JMSException; // unique client ID
    void setClientID(String clientID) throws JMSException;
    ConnectionMetaData getMetaData() throws JMSException;
    ExceptionListener getExceptionListener() throws JMSException;
    void setExceptionListener(ExceptionListener listener) throws JMSException;
    void start() throws JMSException; // start (or restart) delivery of incoming messages
    void stop() throws JMSException;  // temporarily stop delivery of incoming messages
    void close() throws JMSException; // close the connection
    ConnectionConsumer createConnectionConsumer(
        Destination destination,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
    ConnectionConsumer createDurableConnectionConsumer(
        Topic topic,
        String subscriptionName,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
}
A Connection is a JMS client's connection to the JMS provider; it is comparable to a Connection in a database API.
(3) Session: a single-threaded context for sending and receiving messages
package javax.jms;
import java.io.Serializable;
public interface Session extends Runnable {
    static final int AUTO_ACKNOWLEDGE = 1; // the four acknowledgement modes
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;
    static final int SESSION_TRANSACTED = 0;
    BytesMessage createBytesMessage() throws JMSException; // create messages
    MapMessage createMapMessage() throws JMSException;
    Message createMessage() throws JMSException;
    ObjectMessage createObjectMessage() throws JMSException;
    ObjectMessage createObjectMessage(Serializable object) throws JMSException;
    StreamMessage createStreamMessage() throws JMSException;
    TextMessage createTextMessage() throws JMSException;
    TextMessage createTextMessage(String text) throws JMSException;
    boolean getTransacted() throws JMSException;
    int getAcknowledgeMode() throws JMSException;
    void commit() throws JMSException;
    void rollback() throws JMSException;
    void close() throws JMSException;
    void recover() throws JMSException;
    MessageListener getMessageListener() throws JMSException;
    void setMessageListener(MessageListener listener) throws JMSException;
    public void run();
    // Create a message producer
    MessageProducer createProducer(Destination destination)
        throws JMSException;
    MessageConsumer createConsumer(Destination destination)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector,
        boolean noLocal)
        throws JMSException;
    Queue createQueue(String queueName) throws JMSException; // create a queue identity
    Topic createTopic(String topicName) throws JMSException; // create a topic identity
    // Create a durable topic subscriber
    TopicSubscriber createDurableSubscriber(Topic topic, String name)
        throws JMSException;
    TopicSubscriber createDurableSubscriber(
        Topic topic,
        String name,
        String messageSelector,
        boolean noLocal)
        throws JMSException;
    QueueBrowser createBrowser(Queue queue) throws JMSException;
    QueueBrowser createBrowser(Queue queue, String messageSelector)
        throws JMSException;
    TemporaryQueue createTemporaryQueue() throws JMSException;
    TemporaryTopic createTemporaryTopic() throws JMSException;
    void unsubscribe(String name) throws JMSException;
}
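A JMS message consists of three parts: header, properties, and body.
Header: the JMS message header contains a number of fields, set by the JMS provider or by the sender when the message is sent, that identify the message, set its priority and expiration time, and determine how it is routed.
Properties: set by the sender to attach additional information that is not covered by the header fields.
Body: set by the sender. JMS defines five message body types: BytesMessage, MapMessage, ObjectMessage, StreamMessage, and TextMessage.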
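To make the three parts concrete, here is a rough model in plain Java. It is only a sketch: `SketchMessage` and its fields are hypothetical names chosen for illustration, not the `javax.jms.Message` interface, and the body is limited to text (the analogue of a TextMessage). The priority range of 0 to 9 and the convention that an expiration of 0 means "never expires" do follow JMS.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a message's three parts; not the javax.jms.Message interface.
class SketchMessage {
    // Header: fixed fields that identify the message and control its delivery.
    final String messageId;
    final int priority;     // 0 (lowest) to 9 (highest)
    final long expiration;  // epoch milliseconds; 0 means the message never expires

    // Properties: optional name/value pairs attached by the sender.
    final Map<String, Object> properties = new LinkedHashMap<>();

    // Body: the payload itself, here plain text.
    final String body;

    SketchMessage(String messageId, int priority, long expiration, String body) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be between 0 and 9");
        }
        this.messageId = messageId;
        this.priority = priority;
        this.expiration = expiration;
        this.body = body;
    }

    // True once the given time has reached a non-zero expiration time.
    boolean isExpired(long nowMillis) {
        return expiration != 0 && nowMillis >= expiration;
    }
}

class SketchMessageDemo {
    public static void main(String[] args) {
        SketchMessage m = new SketchMessage("ID:1", 4, 0L, "hello");
        m.properties.put("region", "EU"); // sender-defined property
        System.out.println(m.messageId + " " + m.properties + " " + m.body);
    }
}
```

Keeping properties separate from the body matters in practice: a provider can inspect header fields and properties (for example in a message selector) without having to understand the payload.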
JMS Parent | PTP Domain | Pub/Sub Domain
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory
Connection | QueueConnection | TopicConnection
Destination | Queue | Topic
Session | QueueSession | TopicSession
MessageProducer | QueueSender | TopicPublisher
MessageConsumer | QueueReceiver | TopicSubscriber
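The two domains in the table differ mainly in who receives a message. The sketch below illustrates that difference in plain Java, assuming in-memory stand-ins: `SketchQueue`, `SketchTopic`, and `DomainDemo` are hypothetical names, not JMS classes. In the point-to-point (PTP) domain each message goes to exactly one receiver; in the pub/sub domain every current subscriber gets its own copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory queue (PTP): each message is consumed by exactly one receiver.
class SketchQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    void send(String text) {
        messages.addLast(text);
    }

    // Removes and returns the oldest message, or null if the queue is empty.
    String receive() {
        return messages.pollFirst();
    }
}

// Hypothetical in-memory topic (pub/sub): every current subscriber gets its own copy.
class SketchTopic {
    private final List<List<String>> inboxes = new ArrayList<>();

    // Registers a subscriber and returns the inbox that collects its copies.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    // Delivers one copy of the message to each inbox registered so far.
    void publish(String text) {
        for (List<String> inbox : inboxes) {
            inbox.add(text);
        }
    }
}

class DomainDemo {
    public static void main(String[] args) {
        SketchQueue queue = new SketchQueue();
        queue.send("job");
        System.out.println(queue.receive()); // the first receiver takes the message
        System.out.println(queue.receive()); // nothing is left for a second receiver

        SketchTopic topic = new SketchTopic();
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("news");
        System.out.println(first + " " + second); // both inboxes hold a copy
    }
}
```

Note that a subscriber added after `publish` sees nothing in this sketch, which mirrors a non-durable topic subscription; durable subscriptions (see `createDurableSubscriber` in Session) exist to cover that case.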
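Broadly speaking, a JMS application is a set of JMS clients exchanging messages. Developing a JMS client application involves the following steps:
(1) Use JNDI to look up the ConnectionFactory object;
(2) Use JNDI to look up the target queue or topic, that is, the Destination object;
(3) Use the ConnectionFactory to create a Connection object;
(4) Use the Connection object to create one or more JMS Sessions;
(5) Use the Session and the Destination to create a MessageProducer and a MessageConsumer;
(6) Tell the Connection to start delivering messages.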
Message producer
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Sender {
    public static void main(String[] args) {
        new Sender().send();
    }
    public void send() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            // Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
            // Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
            // Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Produce to the destination that was looked up above
            MessageProducer sender = session.createProducer(destination);
            // Send messages until the user types 'quit'
            String messageText = null;
            while (true) {
                System.out.println("Enter message to send or 'quit':");
                messageText = reader.readLine();
                if ("quit".equals(messageText))
                    break;
                TextMessage message = session.createTextMessage(messageText);
                sender.send(message);
            }
            // Exit
            System.out.println("Exiting...");
            reader.close();
            connection.close();
            System.out.println("Goodbye!");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
Message consumer
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Receiver implements MessageListener {
    // Written by the provider's delivery thread, read by the main thread
    private volatile boolean stop = false;
    public static void main(String[] args) {
        new Receiver().receive();
    }
    public void receive() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            // Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
            reader.close();
            // Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
            // Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Consume from the destination that was looked up above
            MessageConsumer receiver = session.createConsumer(destination);
            receiver.setMessageListener(this);
            // Delivery to the listener begins only after start()
            connection.start();
            // Wait until a "stop" message arrives
            while (!stop) {
                Thread.sleep(1000);
            }
            // Exit
            System.out.println("Exiting...");
            connection.close();
            System.out.println("Goodbye!");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    // Called by the provider for each incoming message
    public void onMessage(Message message) {
        try {
            String msgText = ((TextMessage) message).getText();
            System.out.println(msgText);
            if ("stop".equals(msgText))
                stop = true;
        } catch (JMSException e) {
            e.printStackTrace();
            stop = true;
        }
    }
}
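ActiveMQ is an open-source JMS product released under the Apache License 2.0. Its features include:
(1) Support for both the point-to-point and the publish/subscribe messaging models;
(2) Support for open-source application servers such as JBoss and Geronimo, and for message-driven components in the Spring framework;
(3) A newly added P2P transport layer that can be used to build reliable P2P JMS network connections;
(4) JMS infrastructure services such as message persistence, transactions, and clustering support.
The next post covers ActiveMQ.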