RabbitMQ入门(四)-Routing(路由)

首先可以去官方看看第四章节讲的一些内容:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

Routing:绑定交换机和队列之间的关系。

上文我们用的fanout模式,属于广播模式,不能将交换机与相关队列进行绑定,用DIRECT模式,我们可以将详细发送到交换机,再由交换机决定发送给队列X,并决定往队列中发送几条消息。

RabbitMQ入门(四)-Routing(路由)

1.生产者

package com.baidu.RabbitMQ.mq03;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MultiConsumerOne {
	
	private static final String Exchange_Name="rabbit:mq03:exchange:e01";
	
	private static final String Queue_Name_01="rabbit:mq03:queue:q01";
	private static final String Routing_Key_01="rabbit:mq03:routing:key:r01";
	
	public static void main(String[] args) {
		try {
			ConnectionFactory factory = new ConnectionFactory();
		    factory.setHost("127.0.0.1");
		    Connection connection = factory.newConnection();
		    Channel channel = connection.createChannel();
		    
		   
		    channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
		    channel.queueDeclare(Queue_Name_01, true, false, false, null);
		    channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);
		    
		    Consumer consumer = new DefaultConsumer(channel) {
		      @Override
		      public void handleDelivery(String consumerTag, Envelope envelope,
		                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
		        String message = new String(body, "UTF-8");
		        System.out.println("消费者1接收到消息成功---> "+message);
		      }
		    };
		    
		    channel.basicConsume(Queue_Name_01, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

2.消费者one

package com.baidu.RabbitMQ.mq03;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MultiConsumerOne {
	
	private static final String Exchange_Name="rabbit:mq03:exchange:e01";
	
	private static final String Queue_Name_01="rabbit:mq03:queue:q01";
	private static final String Routing_Key_01="rabbit:mq03:routing:key:r01";
	
	public static void main(String[] args) {
		try {
			ConnectionFactory factory = new ConnectionFactory();
		    factory.setHost("127.0.0.1");
		    Connection connection = factory.newConnection();
		    Channel channel = connection.createChannel();
		    
		   
		    channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
		    channel.queueDeclare(Queue_Name_01, true, false, false, null);
		    channel.queueBind(Queue_Name_01, Exchange_Name, Routing_Key_01);
		    
		    Consumer consumer = new DefaultConsumer(channel) {
		      @Override
		      public void handleDelivery(String consumerTag, Envelope envelope,
		                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
		        String message = new String(body, "UTF-8");
		        System.out.println("消费者1接收到消息成功---> "+message);
		      }
		    };
		    
		    channel.basicConsume(Queue_Name_01, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 3.消费者two

package com.baidu.RabbitMQ.mq03;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MultiConsumerTwo {
	
	private static final String Exchange_Name="rabbit:mq03:exchange:e01";
	
	private static final String Queue_Name_02="rabbit:mq03:queue:q02";
	
	private static final String Routing_Key_02="rabbit:mq03:routing:key:r02";
	private static final String Routing_Key_03="rabbit:mq03:routing:key:r03";
	
	public static void main(String[] args) {
		try {
			ConnectionFactory factory = new ConnectionFactory();
		    factory.setHost("127.0.0.1");
		    Connection connection = factory.newConnection();
		    Channel channel = connection.createChannel();
		    
		
		    channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);
		    channel.queueDeclare(Queue_Name_02, true, false, false, null);
		    channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_02);
		    channel.queueBind(Queue_Name_02, Exchange_Name, Routing_Key_03);
		    
		    Consumer consumer = new DefaultConsumer(channel) {
		      @Override
		      public void handleDelivery(String consumerTag, Envelope envelope,
		                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
		        String message = new String(body, "UTF-8");
		        System.out.println("消费者1接收到消息成功---> "+message);
		      }
		    };
		    
		    channel.basicConsume(Queue_Name_02, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}