消息代理RabbitMQ——实施篇
1系统设计
本此实施采用B/S架构设计模式进行开发。但是不同的是,我们除了测试的时候需要用到前段界面的Ajax异步代码,其他时候并没有前端存在的意义。
Fanout接口,生产者的接口指定为4个参数,islasting、message、exchange、queue。其中islasting参数为boolean类型变量,表示传递的消息是否持久化于队列中。message即为需要传递消息的字符串类型变量。exchange为我们发布消息所用到的转换器。queue表示消息需要发布到的指定的队列名。而对于消费者的入口参数和生产者唯一不同的是,没有message参数。因为我们只要确定了其他三个参数,便可得到对应的消息。
Direct接口,生产者的接口指定为5个参数,islasting、message、exchange、queue和binding。前面四个参数与Fanout相同,binding代表的是“绑定”。只有生产者发布了一条消息,并且用binding变量进行约束以后,消费者同样用binding进行约束,才能接收到消息。这种方式是一种定向型的发布订阅方式,但只有单一消费者的情况下,其效果和Fanout相同。但是有多个消费者的情况下,所有具有相同binding约束的消费者都会收到同样的消息,这就是生活中的群发概念的实现。消费者接口同上。
Topic接口,生产者的接口指定为5个参数,islasting、message、exchange、queue和matching。matching在生产者中和消费者中的含义不同。在生产者中,表示一个完全标识,例如发布一条标识为“news.orange”的消息,“news.orange”即代表matching。在消费者中,该变量标识一种模式匹配,即消费者可以用值为“news.*”的变量接收所有以“news.”标识开头的消息进行消费。这是一个需要区分的地方。
RPC接口,该类型接口不涉及生产者的接口,因为生产者同时可以认为是消费者,或者说生产者不可见。该类型的功能描述的是消费者需要获得某种数据,但是没有生产者为其提供,但又不得不需要。所以通过远程调用的方式通知生产者发布一条自己需要的数据并返回,通过这种方式消费消息。其参数可以简化为1个,即需要生产者加工的数据源。以Fanout为例:
生产者接口:
@RequestMapping(value="/xxx",method=RequestMethod.GET)
public ResponseEntity<Object> callXXXProducer(){
//连接服务代码......
//初始化代码......
//发布消息部分代码......
//关闭服务代码......
//最后返回一个状态码告知消息发布情况
if(ok){
return newResponseEntity<>(HttpStatus.METHOD_NOT_ALLOWED);
}
return newResponseEntity<>(HttpStatus.OK);
}
消费者接口部分关键代码:
PrintWriter out= rsp.getWriter();
//接受消息......
if(list.size()!=0){
topicType.setMessage(list);
}
json=JSONArray.fromObject(topicType.getMessage());
out.print(json);//接受消息
接口大致执行过程如上,我们在消费者接口中将会返回一个Json格式的数据,同时携带状态码。
2环境配置
具体RabbitMQ的环境搭建请看《CentOS7搭建rabbitmq集群》。
3项目源码
具体源码请看Github上我的Repositoriy RabbitMQ_HTTP_Server。
4测试
下面我们进入测试阶段,这也是最激动人心的时刻。首先针对各种发布订阅策略的不同,需要分析一下我们的测试方式以及测试的预期效果。在发布消息的时候,我们通过入参就可定制化我们需要的一切测试数据,等待消费者消费。因此对于生产者的测试我们只需要通过PostMan插件进行简单的Get请求发送即可。对于消费者的测试,我们需要自行编写几个客户端,来验证各种策略的不同测试效果。测试如下:
(1)Fanout:PostMan发送Get方式请求的url为:"IP:PORT/callFonOutProducer?isLasting=true&message=FanoutMessage&queueName=fanoutQueue&exchangeName=fanoutEx"。
发送请求以后,我们的消息就已经发布到消息队列服务器当中,接下来我们将进行消费。我们打开两个Fanout的测试界面。该界面有一个接收消息的消息框,一个启动短轮询Ajax异步请求的"receiveAction"按键,以及一个取消异步请求的按键。我们将两个界面的异步请求启动后发现其中一个界面接收到了"FanoutMessage"的消息,而另一个没有。这时我们再通过PostMan多次发送发布消息的请求后,发现两个界面能持续收到消息,但是并不能同时接收。也就是说达到了我们预期的效果——即只有一个消费者能消费消息。
图1 Fanout测试(1)
图2 Fanout测试(2)
(2)Direct:PostMan发送Get方式请求的url为:"IP:PORT/callDirectProducer?isLasting=true&message=directMessage&queueName=directQueue&exchangeName=directEx&bindNames=directBinding"。
如同Fanout相同,我们打开两个Direct测试界面。其界面要素也相同,唯一不同的将会是我们的预期效果,我们**Ajax异步请求,通过点击"Action"按钮。接下来通过PostMan发送消息发布请求,将消息发布到消息服务队列中等代消费。
从界面上我们的实际效果和预期相同——我们的界面都能同时消费生产者所发布的所有消息,而且不会出现其中一个界面收到消息,另一个没收到的情况。
图3 Direct测试
(3)Topic:其PsotMan的url为:"IP:PORT/callTopicProducer?isLasting=true&exchangeName=topicEx&queueName=topicQueue&macthingKey=topic.Match1&message=topicMessage"。
测试界面请求参数:url:"/callTopicConsumer?isLasting=true&exchangeName=topicEx&queueName=topicQueue&macthingKey=topic.*"
值得注意的是,之前提到过,发布和消费的"matchingKey"参数的含义不同,前者代表完全标识,后者代表模式匹配——即前者发送的标识是后者能够模式匹配的集合的子集。
同样还是打开两个Topic的测试界面,根据上述的发布请求参数以及消费请求参数,我们期望的预期效果是能够接收到"topicMessage"这条消息,并且还是两个界面同时消费。测试效果如下所示:
图4 Topic测试
(4)RPC:RPC模块的测试不需要用到PostMan,由于该策略是通过消费者发送请求给服务端,服务端紧接着将消息发布到队列中,随后消费者进行消费。所以对于RPC的测试,我们将所有的请求发送都放在界面上进行操作。
在测试之前,我们需要点击"ActiveServer"按钮来**服务端的远程服务,然后在"send"输入框中输入我们需要服务端进行处理的数据。接下来我们需要做的就是点击"send"按钮等待消息发布到消息队列中。测试效果如下图所示:
图5 RPC测试
我执行RPC消息处理的逻辑是:
private static int fib(int n) {
if(n == 0) return 0;
if(n == 1) return 1;
returnfib(n-1) + fib(n-2);
}
5结语
由于RabbitMQ的中文资料几乎没有,尤其是Java方面的,笔者希望这三篇文章能为各位提供一点价值。