4-RabbitMQ交换机-direct
RabbitMQ交换机-direct
direct交换机
RabbitMQ交换机的概念上次已经介绍过了,今天说说direct的交换机模式。上一次我们说到了交换机的绑定
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
实质上是队列对这个交换机上过来的消息感兴趣。但有的时候我们希望特定的消息流向某个指定的队列,而不是由交换机随机分配。这个时候就应该采用direct的交换机,并且绑定一个标识码,我们称其绑定键。
// 声明fanout交换机、设置持久化
boolean durable2 =true;
channel.exchangeDeclare(EXCHANGE_NAME,"direct",durable2);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME,"error");
直接交换
在这个设置中,我们可以看到直接交换X与两个绑定的队列。第一个队列与绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个绑定为绿色。
在这样的设置中,发布到具有路由**橙色的交换机的消息 将被路由到队列Q1。具有黑色 或绿色路由**的消息将转到Q2。所有其他消息将被丢弃。
多重绑定
使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1和Q2之间添加黑色绑定键。在这种情况下,direct交换将表现得像fanout交换了,并将消息广播到所有匹配的队列。具有路由**黑色的消息将传送到 Q1和Q2。
我们现在将用此模型进行日志记录系统,将info、warning、error日志绑定到一个队列中,error绑定到另外一个队列中,具体代码如下。
生产者
package MQ.Exchange.Direct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Title: MQ.WorkQueues.NewTask.java
* @Package MQ.WorkQueues
* @Description:TODO(MQ消息发送到direct的交换机上)
* @Copyright: Copyright (c) 2017 YUANH All Rights Reserved
* @authoryuanh
* @date 2017-5-10下午3:50:35
*/
public classNewTask {
private final static String EXCHANGE_NAME="direct_logs";
private final static String QUEUE_NAME ="hello_direct";
private final static String QUEUE_NAME2 ="hello_direct1";
private final static String[] service =new String[] { "error","info",
"warning"};
private final static String[] service2 =new String[] { "error"};
public static void main(String[] args)throws IOException,TimeoutException {
// 创建连接连接到MabbitMQ
ConnectionFactoryfactory = newConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
factory.setUsername("yuanh");
factory.setPassword("yuanh");
factory.setPort(5672);
factory.setVirtualHost("y_yuanh");
Connectionconnection = factory.newConnection();
// 创建一个频道
Channelchannel = connection.createChannel();
// 声明队列、设置队列持久化
boolean durable =true;
channel.queueDeclare(QUEUE_NAME, durable,false, false,null);
channel.queueDeclare(QUEUE_NAME2, durable,false, false,null);
// 声明fanout交换机、设置持久化
boolean durable2 =true;
channel.exchangeDeclare(EXCHANGE_NAME,"direct",durable2);
// 将队列绑定到交换机上
for (String severity :service) {
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,severity);
}
for (String severity :service2) {
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,severity);
}
Stringmessage = "info";
Stringmessage2 = "error";
Stringmessage3 = "warning";
// 将消息放到队列里面
// channel.basicPublish("", QUEUE_NAME, null,message.getBytes());
// 将消息放到交换机上
channel.basicPublish(EXCHANGE_NAME,message,null,message.getBytes());
channel.basicPublish(EXCHANGE_NAME,message2,null,message2.getBytes());
channel.basicPublish(EXCHANGE_NAME,message3,null, message3.getBytes());
System.out.println("发送 '" + message +"'");
// 关闭通道和连接
channel.close();
connection.close();
}
}
执行后队列中会出现
消费者
消费者和之前的一样,只不过是将对列名称改成你需要取的对列名称
package MQ.Exchange.Direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
@SuppressWarnings("deprecation")
/**
* @Title: MQ.WorkQueues.Worker.java
* @Package MQ.WorkQueues
* @Description:TODO(MQ消息发送到direct的交换机上)
* @Copyright: Copyright (c) 2017 YUANH All Rights Reserved
* @authoryuanh
* @date 2017-5-10下午3:48:59
*/
public classWorkerResponse {
// private final static String EXCHANGE_NAME ="direct_logs";
private final static String QUEUE_NAME ="hello_direct";
// private final static String QUEUE_NAME2 ="hello_direct1";
public static void main(String[] argv)throws Exception {
// 创建连接连接到MabbitMQ
ConnectionFactoryfactory = newConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
factory.setUsername("yuanh");
factory.setPassword("yuanh");
factory.setPort(5672);
factory.setVirtualHost("y_yuanh");
Connectionconnection = factory.newConnection();
Channelchannel = connection.createChannel();
// 1声明队列、设置队列持久化
boolean durable =true;
channel.queueDeclare(QUEUE_NAME, durable,false, false,null);
QueueingConsumerconsumer = newQueueingConsumer(channel);
// 2消费者指定消费队列,打开应答机制,注意false才是打开手动应对,true为自动应答
boolean ack =false;
channel.basicConsume(QUEUE_NAME, ack, consumer);
// 3消费者设置最大服务转发消息数量,公平转发
int prefetchCount = 1;
channel.basicQos(prefetchCount);
try {
while (true) {
QueueingConsumer.Deliverydelivery = consumer.nextDelivery();
Stringmessage = newString(delivery.getBody());
System.out.println("接收 '" + message +"'");
try {
doWork(message);
}finally{
System.out.println("结束");
// 另外需要在每次处理完成一个消息后,手动发送一次应答(ack=false)。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
}catch(Exception e) {
channel.close();
connection.close();
}
}
private static void doWork(String task)throws InterruptedException {
for (char ch :task.toCharArray()) {
if (ch =='.') {
Thread.sleep(1000);
}else{
Thread.sleep(1000);
}
}
}
}