RabbitMQ入门(四)-Routing(路由)
首先可以去官方看看第四章节讲的一些内容:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
Routing:绑定交换机和队列之间的关系。
上文我们用的fanout模式,属于广播模式,不能将交换机与相关队列进行绑定,用DIRECT模式,我们可以将详细发送到交换机,再由交换机决定发送给队列X,并决定往队列中发送几条消息。
1.生产者
package com.baidu.RabbitMQ.mq03;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MultiConsumerOne {
private static final String Exchange_Name="rabbit:mq03:exchange:e01";
private static final String Queue_Name_01="rabbit:mq03:queue:q01";
private static final String Routing_Key_01="rabbit:mq03:routing:key:r01";
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Queue_Name_01, true, false, false, null);
channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者1接收到消息成功---> "+message);
}
};
channel.basicConsume(Queue_Name_01, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.消费者one
package com.baidu.RabbitMQ.mq03;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MultiConsumerOne {
private static final String Exchange_Name="rabbit:mq03:exchange:e01";
private static final String Queue_Name_01="rabbit:mq03:queue:q01";
private static final String Routing_Key_01="rabbit:mq03:routing:key:r01";
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Queue_Name_01, true, false, false, null);
channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者1接收到消息成功---> "+message);
}
};
channel.basicConsume(Queue_Name_01, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.消费者two
package com.baidu.RabbitMQ.mq03;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MultiConsumerTwo {
private static final String Exchange_Name="rabbit:mq03:exchange:e01";
private static final String Queue_Name_02="rabbit:mq03:queue:q02";
private static final String Routing_Key_02="rabbit:mq03:routing:key:r02";
private static final String Routing_Key_03="rabbit:mq03:routing:key:r03";
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Queue_Name_02, true, false, false, null);
channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);
channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_03);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者1接收到消息成功---> "+message);
}
};
channel.basicConsume(Queue_Name_02, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}