rabbitmq笔记
用户名:guest
密码:guest
virtual hosts 管理
virtual hosts相当于mysql的db
一般以/开头
我们得对用户进行授权
一、简单队列
1 简单队列
1.1模型
P:消息的生产者
红色的队列
C:消息的消费者
3个对象:生产者 队列rabbitmq 消费者
1.2获取MQ链接
package com.cmbc.rabbitmq.util;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
/**
 * Helper that opens connections to the RabbitMQ broker used by every example in these notes.
 *
 * @author zhangqin
 * @since 2019/5/15 20:46
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class ConnectionUtils {

    /**
     * Opens a new connection to the broker at 127.0.0.1:5672 on virtual host
     * {@code /vhost_zq}, authenticating as {@code guest}/{@code guest}.
     *
     * @return a newly created connection; the caller is responsible for closing it
     * @throws IOException      if an I/O problem occurs while connecting
     * @throws TimeoutException if the connection attempt times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        // Configure a connection factory for the local broker.
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host.
        factory.setHost("127.0.0.1");
        // AMQP port.
        factory.setPort(5672);
        // Virtual host the examples run in.
        factory.setVirtualHost("/vhost_zq");
        // Credentials.
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory.newConnection();
    }
}
|
1.3 简单队列生产者代码
package com.cmbc.rabbitmq.simple;
import com.cmbc.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Simple-queue producer: declares the queue and publishes a single message to it
 * through the default ("") exchange.
 *
 * @author zhangqin
 * @since 2019/5/15 20:52
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class Producer {

    private static final String QUEUE_NAME = "zhangqin_simple_queue";

    /**
     * Sends one message to {@link #QUEUE_NAME} and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing a resource times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Open a connection; it is closed in the finally block even if publishing fails.
        Connection connection = ConnectionUtils.getConnection();
        try {
            // Open a channel on the connection.
            Channel channel = connection.createChannel();
            try {
                boolean durable = false;
                boolean exclusive = false;
                boolean autoDelete = false;
                Map<String, Object> arguments = null;
                // Declare the queue (no effect if it already exists with the same settings).
                channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, arguments);

                String msg = "hello simple!!!";
                // Publish with an explicit charset so the bytes do not depend on the platform default.
                channel.basicPublish("", QUEUE_NAME, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("send " + msg);
            } finally {
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
|
1.4 简单队列消费者代码
package com.cmbc.rabbitmq.simple;
import com.cmbc.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Simple-queue consumer written against the legacy {@code QueueingConsumer} API.
 *
 * @author zhangqin
 * @since 2019/5/15 21:05
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class Consumer {

    private static final String QUEUE_NAME = "zhangqin_simple_queue";

    /**
     * Subscribes to {@link #QUEUE_NAME} with automatic acknowledgement and prints every
     * message it receives. Loops forever; stop the process to exit.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening the connection times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Open a connection and a channel.
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // Declare the queue with the same settings as the producer so that the consumer
        // can be started first without failing on a missing queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Legacy queue-backed consumer.
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Start consuming with autoAck = true.
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // Block on each delivery and print its payload.
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Decode with an explicit charset instead of the platform default.
            String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("rev msg" + msg);
        }
    }
}
|
1.5消费者新的api
package com.cmbc.rabbitmq.simple;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.simple * @ClassName: Consumer * @Description: 消费者消费消息 * @author: zhangqin * @since: 2019/5/15 21:05 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer {
private static final String QUEUE_NAME = "zhangqin_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取一个连接 Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道 Channel channel = connection.createChannel();
boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//获取到到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("new api rev msg:" + msg); } };
//监听队列 channel.basicConsume(QUEUE_NAME, true, consumer);
}
/*** * @Author zhangqin * @Description 消费者老的api * @Date 2019/5/16 10:36 * @Param [] * @return void **/ public static void oldApi() throws IOException, TimeoutException, InterruptedException {
//获取一个连接 Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道 Channel channel = connection.createChannel();
//定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列 channel.basicConsume(QUEUE_NAME, true, consumer);
//接收消息 while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); byte[] body = delivery.getBody();
String msg = new String(body);
System.out.println("rev msg" + msg);
}
}
}
|
1.6简单队列的不足
耦合性高,生产者一一对应消费者(如果我想要有多个消费者消费队列中的消息,这时候就不行了),队列名字变更,这时候得同时变更。
2 Work queue 工作队列
2.1模型
为什么会出现工作队列?
Simple队列是一一对应的,而我们实际开发,生产者发送消息是毫不费力的。而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理。可能需要花费时间。这时候队列就会积压很多消息。那么一个消费者不够用,我们就通过多个消费者来解决这个问题。
2.2 生产者
package com.cmbc.rabbitmq.workqueue;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import sun.nio.cs.ext.MacGreek;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/**
 * Work-queue producer: publishes 50 messages to a single queue that is shared by
 * several competing consumers.
 *
 * <pre>
 *                    |--consumer1
 * producer--queue--|
 *                    |--consumer2
 * </pre>
 *
 * @author zhangqin
 * @since 2019/5/16 10:58
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class Producer {

    private static final String QUEUE_NAME = "zhangqin_work_queue";

    /**
     * Publishes "hello 0" .. "hello 49", pausing {@code i * 20} ms after message {@code i}.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening or closing a resource times out
     * @throws InterruptedException if the thread is interrupted while sleeping between sends
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Open a connection; released in finally even if a publish fails.
        Connection connection = ConnectionUtils.getConnection();
        try {
            // 2. Open a channel.
            Channel channel = connection.createChannel();
            try {
                // 3. Queue settings.
                boolean durable = false;
                boolean exclusive = false;
                boolean autoDelete = false;
                Map<String, Object> arguments = null;
                // 4. Declare the queue.
                channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, arguments);

                // 5. Send the messages.
                for (int i = 0; i < 50; i++) {
                    String msg = "hello " + i;
                    channel.basicPublish("", QUEUE_NAME, null,
                            msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println("send msg:" + msg);
                    Thread.sleep(i * 20);
                }
            } finally {
                // 6. Release resources.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
|
2.3 消费者1
package com.cmbc.rabbitmq.workqueue;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.workqueue * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/16 11:04 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer1 {
private static final String QUEUE_NAME = "zhangqin_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取链接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[1] work queue rev msg:" + msg);
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); } } };
//6.消费者开始监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
2.4 消费者2
package com.cmbc.rabbitmq.workqueue;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.workqueue * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/16 11:04 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer2 {
private static final String QUEUE_NAME = "zhangqin_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取链接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[2] work queue rev msg:" + msg);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done"); } } };
//6.消费者开始监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
2.5现象
消费者1和消费者2处理的消息数量是一样的,是均分的。
消费者1都是偶数
消费者2都是奇数
这种方式叫做轮询分发(round-robin)
结果就是不论谁忙或者谁清闲,都不会去多给任务。任务消息总是你一个我一个这样均分的。
3.公平分发 fair dispatch
3.1生产者
package com.cmbc.rabbitmq.workfair;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/**
 * Fair-dispatch producer: publishes 50 messages to the shared work queue.
 *
 * <pre>
 *                    |--consumer1
 * producer--queue--|
 *                    |--consumer2
 * </pre>
 *
 * @author zhangqin
 * @since 2019/5/16 10:58
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class Producer {

    private static final String QUEUE_NAME = "zhangqin_work_queue";

    /**
     * Publishes "hello 0" .. "hello 49", pausing {@code i * 5} ms after message {@code i}.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening or closing a resource times out
     * @throws InterruptedException if the thread is interrupted while sleeping between sends
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Open a connection; released in finally even if a publish fails.
        Connection connection = ConnectionUtils.getConnection();
        try {
            // 2. Open a channel.
            Channel channel = connection.createChannel();
            try {
                // 3. Queue settings.
                boolean durable = false;
                boolean exclusive = false;
                boolean autoDelete = false;
                Map<String, Object> arguments = null;
                // 4. Declare the queue.
                channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, arguments);

                // Prefetch limit of one unacknowledged message. The limit applies to
                // deliveries made to consumers on this channel; this channel only
                // publishes, so the call is kept purely to mirror the consumers.
                int prefetchCount = 1;
                channel.basicQos(prefetchCount);

                // 5. Send the messages.
                for (int i = 0; i < 50; i++) {
                    String msg = "hello " + i;
                    channel.basicPublish("", QUEUE_NAME, null,
                            msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println("send msg:" + msg);
                    Thread.sleep(i * 5);
                }
            } finally {
                // 6. Release resources.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
|
3.2消费者1
package com.cmbc.rabbitmq.workfair;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.workqueue * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/16 11:04 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer1 {
private static final String QUEUE_NAME = "zhangqin_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取链接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//每个消费者发送确认消息之前,生产者只发送一个消息给你,且不发送下一个消息,每次只处理一个消息 //限制同一个消费者不得超过一条消息 int perfetchCount = 1; channel.basicQos(perfetchCount);
//5.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[1] work queue rev msg:" + msg);
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } };
//6.消费者开始监听队列 boolean autoAck = false;//自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
3.3消费者2
package com.cmbc.rabbitmq.workfair;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.workqueue * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/16 11:04 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer2 {
private static final String QUEUE_NAME = "zhangqin_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取链接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//每个消费者发送确认消息之前,生产者只发送一个消息给你,且不发送下一个消息,每次只处理一个消息 //限制同一个消费者不得超过一条消息 int perfetchCount = 1; channel.basicQos(perfetchCount);
//5.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[2] work queue rev msg:" + msg);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } };
//6.消费者开始监听队列 boolean autoAck = false;//自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
3.4现象
消费者2处理的消息比消费者1多,大概比例为2:1,能者多劳。
4.消息应答Message acknowledgment与消息持久化
4.1消息应答
boolean autoAck = false;//自动应答为false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
boolean autoAck = true;(自动确认模式,一旦rabbitmq将消息分发给消费者就会从内存中删除)
这种情况下,如果杀死正在执行的消费者,那么就会丢失正在处理的消息。
boolean autoAck = false;(手动确认模式),如果有一个消费者1挂掉了(连接或channel断开)而消息还没有被应答,那么rabbitmq(而不是生产者)将会把已发送给消费者1但尚未应答的消息重新投递给其他消费者。Rabbitmq支持消息应答,消费者发送一个消息应答告诉rabbitmq这个消息我已经处理完成你可以删除了,然后rabbitmq就会删除内存中的消息。
消息应答默认是打开的,即autoAck默认是false,默认是手动应答。(我们通过显式地设置autoAck=true来关闭这种机制。)
如果rabbitmq挂了,我们的消息仍然会丢失!!!
4.2消息持久化
//3.声明队列
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = false;
Map<String, Object> argments = null;
//4.声明一个队列
channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
我们将程序中的boolean durable = false;改成true是不可以的。尽管代码编译不报错,但是运行时会抛出异常(PRECONDITION_FAILED),因为已经声明的队列的持久化方式是不允许更改的。QUEUE_NAME这个queue已经被定义为不持久化的,rabbitmq不允许用不同的参数重新定义一个已存在的队列。
解决方法:将该queue删除,然后再重新定义声明。
5 订阅模式Publish/Subscribe
5.1模型
- 一个生产者,多个消费者
- 每个消费者都有自己的一个队列
- 生产者没有将消息直接发送给队列,而是发送到交换机里面exchange
- 每个队列都要绑定到交换机上面
- 生产者发送的消息经过交换机,然后到达队列,实现一个消息被多个消费者消费。
5.2生产者
package com.cmbc.rabbitmq.PublishSubscribe;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe producer: publishes one message to a fanout exchange rather than
 * directly to a queue.
 *
 * @author zhangqin
 * @since 2019/5/16 15:24
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class Producer {

    private static final String EXCHANGE_NAME = "zhangqin_exchange_fanout";

    /**
     * Declares the fanout exchange, publishes a single message to it and releases resources.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing a resource times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Open a connection; released in finally even if the publish fails.
        Connection connection = ConnectionUtils.getConnection();
        try {
            // 2. Open a channel.
            Channel channel = connection.createChannel();
            try {
                // 3. Declare the exchange; "fanout" broadcasts to every bound queue.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                // 4. Send one message; the routing key is left empty.
                String msg = "hello Publish/Subscribe";
                channel.basicPublish(EXCHANGE_NAME, "", null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("exchange send:" + msg);
            } finally {
                // 5. Release resources.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
|
消息哪里去了啊?丢失了!!!因为交换机没有存储的能力,在rabbitmq里面只有队列有存储的能力。因为这个时候还没有队列绑定到这个交换机,所以数据丢失了。
让交换机绑定队列
5.3 消费者1
package com.cmbc.rabbitmq.PublishSubscribe;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.PublishSubscribe * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/16 15:34 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer1 {
private static final String QUEUE_NAME = "zhangqin_fanout_queue_email";
private static final String EXCHANGE_NAME = "zhangqin_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取链接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.将队列绑定到交换机上 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//6.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[1] work queue rev msg:" + msg);
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); } } };
//7.消费者开始监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
5.4 消费者2
package com.cmbc.rabbitmq.PublishSubscribe;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.PublishSubscribe * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/16 15:34 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer2 {
private static final String QUEUE_NAME = "zhangqin_fanout_queue_sms";
private static final String EXCHANGE_NAME = "zhangqin_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取链接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.将队列绑定到交换机上 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//6.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[2] work queue rev msg:" + msg);
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done"); } } };
//7.消费者开始监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
结果
6.交换机exchange (转发器)
一方面是接收生产者的消息,另一方面是向队列推送消息。
匿名转发:“”
Fanout(不处理路由键)
Direct(处理路由键)
7路由模式
7.1模型
7.2 生产者
package com.cmbc.rabbitmq.routing;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
/**
 * Routing producer: publishes one message to a direct exchange with routing key "info".
 *
 * @author zhangqin
 * @since 2019/5/18 10:55
 * @version 1.0
 * Copyright 2019 zhangqin. All rights reserved.
 */
public class Producer {

    private static final String EXCHANGE_NAME = "zhangqin_exchange_direct";

    /**
     * Declares the direct exchange, publishes a single message and releases resources.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing a resource times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Open a connection; released in finally even if the publish fails.
        Connection connection = ConnectionUtils.getConnection();
        try {
            // 2. Open a channel.
            Channel channel = connection.createChannel();
            try {
                // 3. Declare the exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");

                // 4. Message to send.
                String msg = "hello direct";

                // 5. Publish with the routing key.
                String routingKey = "info";
                channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("send " + msg);
            } finally {
                // 6. Release resources.
                channel.close();
            }
        } finally {
            connection.close();
        }
    }
}
|
7.3 消费者1
package com.cmbc.rabbitmq.routing;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.routing * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/18 11:01 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer1 {
private static final String EXCHANGE_NAME = "zhangqin_exchange_direct"; private static final String QUEUE_NAME = "zhangqin_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.队列绑定交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
//6.每个消费者发送确认消息之前,生产者只发送一个消息给你,且不发送下一个消息,每次只处理一个消息 //限制同一个消费者不得超过一条消息 int perfetchCount = 1; channel.basicQos(perfetchCount);
//7.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[1] work queue rev msg:" + msg);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } };
//6.消费者开始监听队列 boolean autoAck = false;//自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
7.4 消费者2
package com.cmbc.rabbitmq.routing;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.routing * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/18 11:01 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer2 {
private static final String EXCHANGE_NAME = "zhangqin_exchange_direct"; private static final String QUEUE_NAME = "zhangqin_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.队列绑定交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
//6.每个消费者发送确认消息之前,生产者只发送一个消息给你,且不发送下一个消息,每次只处理一个消息 //限制同一个消费者不得超过一条消息 int perfetchCount = 1; channel.basicQos(perfetchCount);
//7.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[2] work queue rev msg:" + msg);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } };
//6.消费者开始监听队列 boolean autoAck = false;//自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
8.Topics exchange
将路由和某个模式匹配
#匹配零个或者多个单词
*匹配恰好一个单词
8.1模型
商品:发布、删除、修改、查询
8.2 生产者
package com.cmbc.rabbitmq.topic;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.topic * @ClassName: Producer * @Description: * @author: zhangqin * @since: 2019/5/18 11:33 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Producer {
private static final String EXCHANGE_NAME = "zhangqin_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//4.发送消息 String msg="商品..."; channel.basicPublish(EXCHANGE_NAME,"good.add",null,msg.getBytes()); //channel.basicPublish(EXCHANGE_NAME,"good.delete",null,msg.getBytes()); System.out.println("send "+msg);
//6.关闭资源 channel.close(); connection.close();
}
}
|
8.3消费者1
package com.cmbc.rabbitmq.topic;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.routing * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/18 11:01 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer1 {
private static final String EXCHANGE_NAME = "zhangqin_exchange_topic"; private static final String QUEUE_NAME = "zhangqin_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.队列绑定交换机 只是绑定good.add channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good.add");
//6.每个消费者发送确认消息之前,生产者只发送一个消息给你,且不发送下一个消息,每次只处理一个消息 //限制同一个消费者不得超过一条消息 int perfetchCount = 1; channel.basicQos(perfetchCount);
//7.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[1] work queue rev msg:" + msg);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } };
//6.消费者开始监听队列 boolean autoAck = false;//自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
8.4 消费者2
package com.cmbc.rabbitmq.topic;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.routing * @ClassName: Consumer1 * @Description: * @author: zhangqin * @since: 2019/5/18 11:01 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer2 {
private static final String EXCHANGE_NAME = "zhangqin_exchange_topic"; private static final String QUEUE_NAME = "zhangqin_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//4.声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//5.队列绑定交换机 绑定good.# channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good.#");
//6.每个消费者发送确认消息之前,生产者只发送一个消息给你,且不发送下一个消息,每次只处理一个消息 //限制同一个消费者不得超过一条消息 int perfetchCount = 1; channel.basicQos(perfetchCount);
//7.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[2] work queue rev msg:" + msg);
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done"); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } } };
//6.消费者开始监听队列 boolean autoAck = false;//自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
9.rabbitmq 消息确认机制(事务+confirm)
在rabbitmq中我们可以通过持久化数据来解决rabbitmq服务器的数据丢失的问题。
问题:
生产者将消息送出去之后,消息到底有没有到达rabbitmq服务器,默认的情况是不知道的。
两种方式:
- AMQP实现了事务机制
- Confirm模式
9.1事务机制
txSelect txCommit txRollback
txSelect:用于将当前的channel设置成为transaction模式;
txCommit:用于提交事务
txRollback:回滚事务
9.1.1生产者
package com.cmbc.rabbitmq.transaction;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.transaction * @ClassName: Producer * @Description: * @author: zhangqin * @since: 2019/5/19 8:35 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Producer {
private static final String QUEUE_NAME = "zhangqin_queue_transaction";
/*** * @Author zhangqin * @Description * rabbitmq 消息确认机制(事务) * @Date 2019/5/19 8:42 * @Param [args] * @return void **/ public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明一个队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//4.发送消息 try { String msg = "hello transaction msg"; channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
//故意制造异常 int i = 1 / 0;
channel.txCommit(); System.out.println("send msg:‘" + msg + " ’commit"); } catch (Exception e) {
e.printStackTrace(); channel.txRollback(); System.out.println("send msg rollback");
}
//5.关闭资源 channel.close(); connection.close();
}
}
|
9.1.2 消费者
package com.cmbc.rabbitmq.transaction;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.transaction * @ClassName: Consumer * @Description: * @author: zhangqin * @since: 2019/5/19 8:43 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Consumer {
private static final String QUEUE_NAME = "zhangqin_queue_transaction";
public static void main(String[] args) throws IOException, TimeoutException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明一个队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//4.定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {
//一旦有消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("transaction work queue rev msg:‘" + msg+"’");
} };
//5.消费者开始监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
|
9.1.3 缺点
这种模式还是很耗时的,采用这种方式,降低了rabbitmq的吞吐量。
9.2 confirm模式
9.2.1生产者端confirm模式的实现原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的id(从1开始)。一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一id),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。broker回传给生产者的确认消息中,delivery-tag域包含了被确认消息的序列号;此外broker也可以设置basic.ack的multiple域,表示到这个序列号为止的所有消息都已经得到了处理。
Confirm模式的最大的好处是在于他是异步的,也就是发送了一条消息以后没有得到反馈之前,也可以发送另一条消息。
开启confirm模式
Channel.confirmSelect();
9.2.2编程模式
9.2.2.1普通:发一条调用waitForConfirms(),相当于串行
生产者
package com.cmbc.rabbitmq.confirm.simple;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.confirm * @ClassName: Producer * @Description: 1. 普通:发一条调用waitForConfirms(),相当于串行 * @author: zhangqin * @since: 2019/5/19 9:12 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Producer {
private static final String QUEUE_NAME = "zhangqin_queue_confirm1";
/*** * @Author zhangqin * @Description * 1. 普通:发一条调用waitForConfirms(),相当于串行 * @Date 2019/5/19 9:21 * @Param [args] * @return void **/ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明一个队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//生产者调用confirmSelect() 将channel设置为confirm模式 //注意:如果之前已经将队列设置为txSelect则不能再设置为confirmSelect模式了 channel.confirmSelect();
//4.发送消息 String msg = "hello confirm msg"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
if (!channel.waitForConfirms()) {
System.out.println("msg send failed");
} else {
System.out.println("msg send ok");
} //5.关闭资源 channel.close(); connection.close(); }
}
|
9.2.2.2批量的:发一批调用waitForConfirms()
package com.cmbc.rabbitmq.confirm.batch;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.confirm * @ClassName: Producer * @Description: 2. 批量的:发一批调用waitForConfirms() * @author: zhangqin * @since: 2019/5/19 9:12 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Producer {
private static final String QUEUE_NAME = "zhangqin_queue_confirm2";
/*** * @Author zhangqin * @Description * 2. 批量的:发一批调用waitForConfirms() * @Date 2019/5/19 9:21 * @Param [args] * @return void **/ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明一个队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//生产者调用confirmSelect() 将channel设置为confirm模式 //注意:如果之前已经将队列设置为txSelect则不能再设置为confirmSelect模式了 channel.confirmSelect();
//4.批量发送消息 String msg = "hello confirm batch msg"; for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, (msg + i).getBytes());
}
//确认批量发送的结果 if (!channel.waitForConfirms()) {
System.out.println("msg batch send failed");
} else {
System.out.println("msg batch send ok");
} //5.关闭资源 channel.close(); connection.close(); }
}
|
9.2.2.3异步confirm模式:提供一个回调方法
Channel对象提供的ConfirmListener回调方法只包含deliveryTag(当前channel发出的消息序列号),我们需要自己为每一个channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序的运行效率来看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
生产者
package com.cmbc.rabbitmq.confirm.asynchronous;
import com.cmbc.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.*; import java.util.concurrent.TimeoutException;
/** * @Package: com.cmbc.rabbitmq.confirm * @ClassName: Producer * @Description: 3. 异步confirm模式:提供一个回调方法 * @author: zhangqin * @since: 2019/5/19 9:12 * @version: 1.0 * @Copyright: 2019 zhangqin. All rights reserved. */ public class Producer {
private static final String QUEUE_NAME = "zhangqin_queue_confirm3";
/*** * @Author zhangqin * @Description * 3. 异步confirm模式:提供一个回调方法 * @Date 2019/5/19 9:21 * @Param [args] * @return void **/ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.获取连接 Connection connection = ConnectionUtils.getConnection();
//2.获取channel 频道 Channel channel = connection.createChannel();
//3.声明一个队列 boolean durable = false; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> argments = null;
//声明一个队列 channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, argments);
//生产者调用confirmSelect() 将channel设置为confirm模式 //注意:如果之前已经将队列设置为txSelect则不能再设置为confirmSelect模式了 channel.confirmSelect();
//未确认的消息标识 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//通道添加监听 channel.addConfirmListener(new ConfirmListener() {
//消息发送成功的 @Override public void handleAck(long deliverTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("---handleAck---multiple"); confirmSet.headSet(deliverTag + 1).clear(); } else {
System.out.println("---handleAck---multiple false"); confirmSet.remove(deliverTag);
}
}
//消息发送不成功的 @Override public void handleNack(long deliverTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("---handleNack---multiple"); //TODO } else {
System.out.println("---handleNack---multiple false"); //TODO
}
} });
//4.发送消息 String msg = "hello confirm asynchronous msg"; while (true) {
long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(seqNo); }
}
}
|
10.Spring集成rabbitmq
10.1生产者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- 1. RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1"
                               port="5672"
                               username="guest"
                               password="guest"
                               virtual-host="/vhost_zq"/>

    <!-- 2. RabbitMQ template: bound to the connection factory and to the exchange it publishes to -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     exchange="fanoutExchange"/>

    <!-- Broker administration: declares the queues, exchanges and bindings defined below -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue definition, declared automatically -->
    <rabbit:queue name="zhangqinQueue" auto-declare="true" durable="true"/>

    <!-- Fanout exchange definition, declared automatically, with the queue bound to it -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="zhangqinQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

</beans>
|
10.2 消费者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- 1. RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1"
                               port="5672"
                               username="guest"
                               password="guest"
                               virtual-host="/vhost_zq"/>

    <!-- Broker administration: queue beans are only declared on the broker when an admin
         bean is present, so it is required for the auto-declared queue below to take effect
         when the consumer starts before the producer -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue definition, declared automatically -->
    <rabbit:queue name="zhangqinQueue" auto-declare="true" durable="true"/>

    <!-- Queue listener: dispatches messages from the queue to the consumer bean's listen method -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="zhangqinConsumer" method="listen" queue-names="zhangqinQueue"/>
    </rabbit:listener-container>

    <!-- Consumer bean -->
    <bean id="zhangqinConsumer" class="com.cmbc.rabbitmq.Consumer"/>

</beans>
|
10.3需要引入的jar
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.cmbc.spring.mq</groupId> <artifactId>spring-rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <packaging>war</packaging>
<properties>
    <app-name>spring-rabbitmq</app-name>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Must match the Spring Framework line that spring-rabbit 2.0.x is built against (5.0.x);
         pinning the core Spring modules to 4.2.x overrides its transitive Spring dependencies
         with an incompatible older release -->
    <spring.version>5.0.2.RELEASE</spring.version>
</properties>
<dependencies>
    <!-- testing: start -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- testing: end -->

    <!-- spring: start -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-expression</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-orm</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- spring: end -->

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>

    <!-- logging: start -->
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>

    <!-- jars required by the logback logging framework: start -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.logback-extensions</groupId>
        <artifactId>logback-ext-spring</artifactId>
        <version>0.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>janino</artifactId>
        <version>3.0.7</version>
    </dependency>
    <!-- jars required by the logback logging framework: end -->

    <!-- cglib dynamic proxies: start -->
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib-nodep</artifactId>
        <version>2.2.2</version>
    </dependency>
    <!-- cglib dynamic proxies: end -->
</dependencies>
<build>
    <finalName>spring-rabbitmq</finalName>

    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <!--<includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes>-->
            <filtering>true</filtering>
        </resource>
    </resources>

    <plugins>
        <plugin>
            <artifactId>maven-deploy-plugin</artifactId>
            <inherited>false</inherited>
        </plugin>

        <!-- Java version used for compilation (important): start -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- Java version used for compilation (important): end -->

        <plugin>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
                        <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
                    </manifest>
                </archive>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-antrun-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <configuration>
                    </configuration>
                    <goals>
                        <goal>run</goal>
                    </goals>
                </execution>
            </executions>
            <dependencies>
                <dependency>
                    <groupId>org.apache.ant</groupId>
                    <artifactId>ant</artifactId>
                    <version>1.8.4</version>
                </dependency>
            </dependencies>
        </plugin>

        <!-- Single surefire declaration: the plugin was previously declared twice with
             different argLine values; both settings are combined here -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.12.4</version>
            <configuration>
                <includes>
                    <include>**/*Tests.java</include>
                </includes>
                <forkMode>once</forkMode>
                <argLine>-Xmx256m -Dfile.encoding=UTF-8</argLine>
            </configuration>
        </plugin>
    </plugins>
</build>
<!-- Per-environment packaging selected by the dev / test / prod profile: start -->
<profiles>
    <profile>
        <id>dev</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <maven.app-name>${app-name}</maven.app-name>
            <maven.config-home>${project.build.directory}/../conf</maven.config-home>
            <maven.log-home>${project.build.directory}/../log</maven.log-home>
            <maven.log-level>DEBUG</maven.log-level>
            <maven.sql-log-level>DEBUG</maven.sql-log-level>
            <maven.console-log-enable>true</maven.console-log-enable>
        </properties>
        <build>
            <filters>
                <filter>src/main/resources/filter/dev.properties</filter>
            </filters>
        </build>
    </profile>

    <profile>
        <id>test</id>
        <activation>
            <activeByDefault>false</activeByDefault>
        </activation>
        <properties>
            <maven.app-name>${app-name}</maven.app-name>
            <maven.config-home>/tmp/test/conf</maven.config-home>
            <maven.log-home>/usr/local/tomcat/logs</maven.log-home>
            <maven.log-level>INFO</maven.log-level>
            <maven.sql-log-level>INFO</maven.sql-log-level>
            <maven.console-log-enable>false</maven.console-log-enable>
        </properties>
        <build>
            <filters>
                <filter>src/main/resources/filter/test.properties</filter>
            </filters>
        </build>
    </profile>

    <profile>
        <id>prod</id>
        <activation>
            <activeByDefault>false</activeByDefault>
        </activation>
        <properties>
            <maven.app-name>${app-name}</maven.app-name>
            <maven.config-home>/tmp/conf</maven.config-home>
            <maven.log-home>/usr/local/tomcat7/logs</maven.log-home>
            <maven.log-level>INFO</maven.log-level>
            <maven.sql-log-level>INFO</maven.sql-log-level>
            <maven.console-log-enable>false</maven.console-log-enable>
        </properties>
        <build>
            <filters>
                <filter>src/main/resources/filter/prod.properties</filter>
            </filters>
        </build>
    </profile>
</profiles>
<!-- Per-environment packaging selected by the dev / test / prod profile: end -->
</project>
|
10.4生产者代码
package com.cmbc.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping;
/**
 * Web controller that publishes a fixed greeting through the injected RabbitTemplate.
 *
 * @Package: com.cmbc.rabbitmq
 * @ClassName: Producer
 * @author: zhangqin
 * @since: 2019/5/19 11:20
 * @version: 1.0
 * @Copyright: 2019 zhangqin. All rights reserved.
 */
@Controller
public class Producer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends one message via the template and logs it to standard output.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/produceMsg")
    public String produceMsg() {
        final String payload = "hello zhangqin!!!";

        rabbitTemplate.convertAndSend(payload);

        final String logLine = "生产者 send msg " + payload;
        System.out.println(logLine);

        return "success";
    }
}
|
10.5消费者代码
package com.cmbc.rabbitmq;
/**
 * Message listener bean: its {@code listen} method is the handler the listener
 * container is configured to invoke.
 *
 * @Package: com.cmbc.rabbitmq
 * @ClassName: Consumer
 * @author: zhangqin
 * @since: 2019/5/19 11:06
 * @version: 1.0
 * @Copyright: 2019 zhangqin. All rights reserved.
 */
public class Consumer {

    /**
     * Business handler for a received message; prints it to standard output.
     *
     * @param msg the message text
     */
    public void listen(String msg) {
        final String line = "消费者受到消息:" + msg;
        System.out.println(line);
    }
}
|