RabbitMQ基础

RabbitMQ是目前非常热门的消息中间件之一。

RabbitMQ基础

命令行

命令行工具能方便地进行 RabbitMQ的管理

rabbitmq-server

  • rabbitmq-server start 启动RabbitMQ

rabbitmqctl

  • rabbitmqctl reset 移除所有数据
  • rabbitmqctl stop_app 服务的停止
  • rabbitmqctl change_cluster_node_type disc | ram 修改集群节点的存储形式
  • rabbitmqctl forget_cluster_node [--offline] 忘记节点/节点摘除
  • rabbitmqctl rename_cluster_node oldName newName

rabbitmq-plugins

  • rabbitmq-plugins enable rabbitmq_management

消息生产与消费模型

这里通过Java代码,演示RabbitMQ最基本的数据流转

生产者基础代码

public class Producer {
	public static void main(String[] args) throws Exception {
		
		String exchangeName = "exchange_test";
		String queueName = "test";
		
		// 创建一个 ConnectionFactory
		ConnectionFactory factory = new ConnectionFactory();
		
		// 设置RabbitMQ参数
		factory.setHost("localhost");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		
		// 获取一个连接
		Connection connection = factory.newConnection();
		
		// 创建Channel
		Channel channel = connection.createChannel();
		
		// 创建一个"direct"类型、非排他、非自动删除的交换机
		channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
		
		// 创建一个持久化、非排他的、非自动删除的队列
		channel.queueDeclare(queueName, true, false, false, null);
		
		channel.queueBind(queueName, exchangeName, "key_test");
		// 发送数据
		String message = "Hello world!";
		channel.basicPublish(exchangeName, "key_test", null, message.getBytes());
		
		// 释放资源,关闭连接
		channel.close();
		connection.close();
		
	}
}

消费者基础代码

public class Consumer {

	public static void main(String[] args) throws Exception {
		// 创建一个 ConnectionFactory
		ConnectionFactory factory = new ConnectionFactory();
		
		// 设置RabbitMQ连接参数
		factory.setHost("localhost");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		
		// 获取一个连接
		Connection connection = factory.newConnection();
		
		// 创建信道Channel
		Channel channel = connection.createChannel();
		
		// 声明一个队列
		String queueName = "test";

		// 创建消费者
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("Received message :" + new String(body));
				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		
		// 获取消息
		channel.basicConsume(queueName, true, consumer);
		
		// 释放资源,关闭连接
		channel.close();
		connection.close();
		
	}
}
  • ConnectionFactory:获取连接工厂
  • Connection:获取一个连接
  • Channel:得到数据通信信道
  • Queue:具体的消息存储队列
  • Producer:消息的生产
  • Consumer:消费消息

RabbitMQ的API设计的有些复杂,参数很多,在Java代码中显得凌乱。如果不对其中一些概念有认知,读起来会很吃力。

消息队列 Queue

RabbitMQ基础

  • 消息队列,实际存储消息数据
  • Durability:是否持久化,Durable -> 是,Transient -> 否
  • Auto delete:自动删除,如果最后一个监听被移除以后,是否自动删除该Queue

消息 Message

  • 服务器和应用程序之间传送的数据
  • 本质是一段数据,有Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)、content_type、content_encoding、priority:优先级、correlation_id(可以当做唯一ID)

交换机 Exchange

接收消息,并根据路由键转发消息所绑定的队列

RabbitMQ基础

在实际实现中,RabbitMQ将消息发送给Exchange而不是直接投递到队列中,交换机再把消息路由到队列中。

  • Name:交换机的名称
  • Type:交换机类型 -> direct、topic、fanout、headers
  • Durability:是否需要持久化
  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认False。在扩展RabbitMQ的功能时才可能用到
  • Arguments:扩展参数,用于扩展AMQP协议,自定义使用

交换机类型

RabbitMQ常用的交换机类型有四种:fanout、direct、topic、headers。实际上,AMQP协议还规定了其他的几种类型,只是并不常用。

direct 直连

RabbitMQ基础

  • 把消息路由到BindingKey和RoutingKey完全匹配的队列中

该方式比较严格,实际业务中并不常见

topic

RabbitMQ基础

  • 所有发送到 Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

通配符匹配:
- # 匹配一个或多个词:“log.#” 可以匹配 “log.info”,“log.info.msg”
- * 匹配一个词:“log.#” 可以匹配"log.info", “log.error”

次方式实际上是direct的一种扩展

fanout

  • 不处理路由键,只需要简单的将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • 转发消息最快

headers

  • 不依赖路由键的匹配
  • 通过消息头来路由
  • 不常用

此种方式局限性比较大,而且性能比较差,很少在业务中需要这种类型

绑定 Binding

  • Exchange和Exchange、Queue之间的连接关系
  • Binding中可以包含RoutingKey或者参数

BingingKey与RoutingKey

BingingKey与RoutingKey有时候可以看做是同一种东西。在direct模式下,二者必须匹配才能通信。在topic模式下,二者有些许差别。

  • BingdingKey可以看成一种特殊的RoutingKey
  • 需要使用绑定的时候,使用BingdingKey,如channel.queueBind方法
  • 需要发送消息的时候,使用RoutingKey,如channel.basicPublish方法

一般,并不需要刻意地去分清它们

Channel 信道

它是建立在Connection上的一种虚拟连接

它建立在TCP连接中,数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。Channel存在的意义重大,对于操作系统来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的,RabbitMQ采用类似NIO的做法,使得TCP连接复用,不但减少开销,还便于管理。

RabbitMQ中的概念比较多,交换机、队列、绑定、路由键。。。它们都是AMQP协议中的组成部分