【涛涛商城】activeMQ01_小试牛刀
目录
1、背景
项目需求:由于商家需要添加新的商品,后台需要及时对数据库进行更新,即更新索引库。
问题分析:
方案一:在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑。
缺点:业务逻辑耦合度高,业务拆分不明确
方案二:业务逻辑在taotao-search中实现,调用服务在taotao-manager实现。业务逻辑分开。
缺点:服务之间的耦合度变高。服务的启动有先后顺序(需要先启动taotao-search,然后taotao-manager才能调用)。
方案三:使用消息队列。MQ是一个消息中间件。相当于是一个秘书。(董事长宣布开会)
2、ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage -- Java原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象(较为常用)
· ObjectMessage--一个序列化的 Java对象
· BytesMessage--一个字节的数据流
3、ActiveMQ的安装
3.1 安装环境:
- 需要jdk
- 安装Linux系统。生产环境都是Linux系统。
3.2 安装步骤
第一步: 把ActiveMQ 的压缩包上传到Linux系统。
put G:\Java就业\【阶段17】淘淘商城综合项目\参考资料\ActiveMQ\apache-activemq-5.12.0-bin.tar.gz
第二步:解压缩。tar xvf apache-activemq-5.12.0-bin.tar.gz
第三步:启动。
使用bin目录下的activemq命令启动:
[[email protected] bin]# ./activemq start
关闭:
[[email protected] bin]# ./activemq stop
查看状态:
[[email protected] bin]# ./activemq status
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包(会有jar包依赖冲突)。建议使用5.11.2
进入管理后台:
http://192.168.93.88:8161/admin
用户名:admin
密码:admin
4、ActiveMQ使用方法:
4.1、Queue
4.1.1 Producer
生产者:生产消息,发送端。
把jar包添加到工程中。使用5.11.2版本的jar包。
业务逻辑
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
@Test
public void testQueueProducer() throws Exception{
// 业务逻辑
// 1、创建ConnectionFactory对象,需要制定服务端的ip和端口号.接口是不能new 的,需要new一个他的实现类
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.88:61616");
// 2、使用connectionFactory对象创建一个Connection对象,接口里面定义好的,是一套规范
Connection connection = connectionFactory.createConnection();
// 3、开启连接(调用connection的start方法)
connection.start();
// 4、使用connection对象创建一个session对象
// 第一个参数:是否开启事务,一般不使用分布式事务。跨平台数据库。true:开启事务,第二个参数忽略。
// 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用session对象创建一个destination对象(topic、queue)此处创建一个Queue一对一类型消息 /参数:队列的名称。
Queue createQueue = session.createQueue("test-queue");
// 6、使用session创建一个Producer对象
MessageProducer producer = session.createProducer(createQueue);
// 7、创建一个Message对象,创建一个TextMessage(存储的信息域)
// 第一种方式
// TextMessage message = new ActiveMQTextMessage();
// message.setText("hello world");
// 第二种方式
TextMessage message = session.createTextMessage("hello");
// 8、使用producer对象发送消息
producer.send(message);
// 9、关闭资源
producer.close();
session.close();
connection.close();
}
4.1.2 Consumer 业务逻辑
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
@Test
public void testQueueConsumer() throws Exception{
// 1、创建一个ConnectionFactory对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.88:61616");
// 2、从ConnectionFactory对象中获得一个Connection对象
Connection connection = connectionFactory.createConnection();
// 3、开启连接(使用start方法)
connection.start();
// 4、创建一个session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用session创建一个Destination,这里我们使用和发送端一致的queue,。并且队列名一致
Queue queue = session.createQueue("test-queue");
// 6、使用Session创建一个Consume对象
MessageConsumer consumer = session.createConsumer(queue);
// 7、接收消息,匿名内部类形式,实现MessageListener这个接口所包含的方法
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取消息的内容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
// 8、等待消息结束
// 第一种
System.in.read();
// 第二种
/*
* while(true) {
Thread.sleep(100);
}
*/
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
4、2 Topic
4.2.1 Producer
业务逻辑:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
/**
* topic 发布情况 不是可持久化的。
*/
@Test
public void testTopicProducer()throws Exception{
// 1、创建ConnectionFactory对象,包括ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.88:61616");
// 2、使用connectionfactory工厂取出一个connection对象
Connection connection = connectionFactory.createConnection();
// 3、开启连接
connection.start();
// 4、创建session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用session创建destination topic形式
Topic topic = session.createTopic("test-topic");
// 6、使用session创建一个producer对象
MessageProducer producer = session.createProducer(topic);
// 7、创建一个TextMessage对象
TextMessage message = session.createTextMessage("topic形式");
// 8、使用producer发送信息
producer.send(message);
// 9、关闭连接
producer.close();
session.close();
connection.close();
}
4.2.2 Consumer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
@Test
public void testTopicConsumer() throws Exception{
// 消费者:接收消息。
// 第一步:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.93.88:61616");
// 第二步:从ConnectionFactory对象中获得一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接。调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(topic);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
String text=null;
try {
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
System.out.println("topic的消费端01。。。。。");
//System.out.println("topic的消费端02。。。。。");
//System.out.println("topic的消费端03。。。。。");
// 等待键盘输入
System.in.read();
// 第九步:关闭资源
consumer.close();
session.close();
connection.close();
}