Redis pub/sub模式实现消息队列 侵删

一 消息队列

1. 定义

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

2. 消息队列的适用场景

消息队列的适用场景包括异步处理,应用解耦,流量削锋和消息通讯四个场景

1. 异步处理:异步处理中消息队列保存了当前处理操作,使得动作请求方可以在发出动作请求/写入消息队列后理解返回,异步获取结果,关注点在于请求的友好程度。

 

2. 应用解耦:应用解耦用于消除请求发起方和请求处理方的耦合,提升系统的健壮性。

 

3. 流量削锋:流量削峰一般指秒杀或抢购场景,消息队列用于控制活动人数,缓解高访问压力。

 

4. 日志处理:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。

 

3. 消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

P2P模式:

 

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点

  1. 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  3. 接收者在成功接收消息之后需向队列应答成功

 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。、

 

Pub/sub模式:

 

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点

  1. 每个消息可以有多个消费者
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  3. 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被**(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

 

 

4. 消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1)同步:订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

 

 

二 Redis 发布-订阅模式(pub/sub)

Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:

订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;

发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。

1.       时间非耦合:发布者和订阅者不必同时在线,它们不必同时参与交互。

2.       空间非耦合:发布者和订阅者不必相互知道对方所在的位置。发布者通过事件服务发布事件,订阅者通过事件服务间接获得事件。发布者和订阅者不需要拥有直接到对方的引用,也不必知道有多少个订阅者或者是发布者参与交互。

3.       同步非耦合:发布者/订阅者是异步模式。发布者可不断地生产事件,而订阅者(通过一个回调)则可异步地得到产生事件的通知。

分类:
按照订阅方式分为基于主题(topic-based)、基于内容(content-based)、基于类型(type-based)的pub/sub方式。

 

三 Redis pub/sub的实现(非持久)

Redis通过publishsubscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。

 

1. 导入Redis依赖(以Maven工程为例子):

 

 
  1. <dependency>

  2. <groupId>redis.clients</groupId>

  3. <artifactId>jedis</artifactId>

  4. <version>2.8.0</version>

  5. </dependency>


2. 增加日志配置文件,这里使用系统输出代替日至

 

 

 
  1. log4j.rootLogger=info,stdout

  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender

  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

  4. log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n


3. 创建消息的发布者Publisher.java

 

 

 
  1. package com.zenhobby.redis_pub_sub;

  2.  
  3. import java.io.BufferedReader;

  4. import java.io.InputStreamReader;

  5. import redis.clients.jedis.Jedis;

  6.  
  7. public class Publisher {

  8. private Jedis publisherJedis;

  9. private String channel;

  10.  
  11. public Publisher(Jedis publishJedis,String channel){

  12. this.publisherJedis=publishJedis;

  13. this.channel=channel;

  14. }

  15. public void startPublish(){

  16. try{

  17. BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

  18. while(true){

  19. System.out.println("请输入message:");

  20. String line = reader.readLine();

  21. if(!"quit".equals(line)){

  22. publisherJedis.publish(channel, line);

  23. }else{

  24. break;

  25. }

  26. }

  27. }catch(Exception e){

  28. e.printStackTrace();

  29. }

  30. }

  31. }


 

 

4. 实现消息的接收者Subscriber.java,实现JedisPubSub接口

 

 
  1. package com.zenhobby.redis_pub_sub;

  2.  
  3. import redis.clients.jedis.JedisPubSub;

  4.  
  5. public class Subscriber extends JedisPubSub {

  6. @Override

  7. public void onMessage(String channel, String message) {

  8. System.out.println("Channel:" + channel + ",Message:" + message);

  9. }

  10.  
  11. @Override

  12. public void onPMessage(String pattern, String channel, String message) {

  13. System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message);

  14. }

  15.  
  16. @Override

  17. public void onSubscribe(String channel, int subscribedChannels) {

  18. System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);

  19. }

  20.  
  21. @Override

  22. public void onPUnsubscribe(String pattern, int subscribedChannels) {

  23. System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);

  24. }

  25.  
  26. @Override

  27. public void onPSubscribe(String pattern, int subscribedChannels) {

  28. System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);

  29. }

  30. }


JedisPubSub是Redis提供的抽象类,继承这个类就完成了对客户端对订阅的监听。

 

抽象类中存在六个方法。分别表示

  1. 监听到订阅模式接受到消息时的回调 (onPMessage)
  2. 监听到订阅频道接受到消息时的回调 (onMessage )
  3. 订阅频道时的回调( onSubscribe )
  4. 取消订阅频道时的回调( onUnsubscribe )
  5. 订阅频道模式时的回调 ( onPSubscribe )
  6. 取消订阅模式时的回调( onPUnsubscribe )

 

5. 创建测试Main.java

 

 
  1. package com.zenhobby.redis_pub_sub;

  2.  
  3. import redis.clients.jedis.Jedis;

  4. import redis.clients.jedis.JedisPool;

  5. import redis.clients.jedis.JedisPoolConfig;

  6.  
  7. public class TestMain {

  8. public static final String CHANNEL = "mychannel";

  9. public static final String HOST = "127.0.0.1";

  10. public static final int PORT = 6379;

  11.  
  12. private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();

  13. private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0);

  14.  
  15. public static void main(String[] args) {

  16. final Jedis subscriberJedis = JEDIS_POOL.getResource();

  17. final Jedis publisherJedis = JEDIS_POOL.getResource();

  18. final Subscriber subscriber = new Subscriber();

  19. new Thread(new Runnable() {

  20. public void run() {

  21. try {

  22. System.out.println("Subscribing to mychannel,this thread will be block");

  23. subscriberJedis.subscribe(subscriber, CHANNEL);

  24. System.out.println("subscription ended");

  25. } catch (Exception e) {

  26. e.printStackTrace();

  27. }

  28. }

  29. }).start();

  30. new Publisher(publisherJedis, CHANNEL).startPublish();

  31. publisherJedis.close();

  32.  
  33. subscriber.unsubscribe();

  34. subscriberJedis.close();

  35. }

  36. }

 

6.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。并且我们是以控制台输入内容作为消息发布的内容,各位看官可以在控制台输入任意内容,点击回车键,观察控制台输出。示例如下(直接把原博的图借过来啦):

Redis pub/sub模式实现消息队列 侵删

注意:此方法实现的发布与订阅功能,消息不会在Redis客户端进行缓存。

 

 

四 Redis的pub/sub实现(持久)

Redis的pub/sub的持久主要通过,在非持久化的基础上需要作如下处理:

1. 重新实现Publisher

 

 
  1. package com.zenhobby.redis.persistence;

  2. import java.util.Set;

  3. import redis.clients.jedis.Jedis;

  4.  
  5. public class PPubClient {

  6. private Jedis jedis;

  7. private String CONSTANT_CLIENTSET = "clientSet";

  8. public PPubClient(String host,int port){

  9. jedis = new Jedis(host,port);

  10. }

  11. private void put(String message){

  12. Set<String> subClients = jedis.smembers(CONSTANT);

  13. for(String clientKey:subClients){

  14. jedis.rpush(clientKey, message);

  15. }

  16. }

  17. public void pub(String channel,String message){

  18. Long txid = jedis.incr("MAXID");

  19. String content = txid+"/"+message;

  20. this.put(content);

  21. jedis.publish(channel, message);

  22. }

  23. public void close(String channel){

  24. jedis.publish(channel, "quit");

  25. jedis.del(channel);

  26. }

  27. }

 

 

在新实现的Publisher中使用Jedis存储发布的消息。

2. 重新实现SubClient

 

 
  1. package com.zenhobby.redis.persistence;

  2.  
  3. import redis.clients.jedis.Jedis;

  4. import redis.clients.jedis.JedisPubSub;

  5.  
  6. public class PPSubClient {

  7. private Jedis jedis;

  8. private JedisPubSub listener;

  9. private String CONSTANT_CLIENTSET="clientSet";

  10. public PPSubClient(String host,int port,String clientId){

  11. jedis = new Jedis(host,port);

  12. listener = new PPrintListener(clientId,new Jedis(host,port));

  13. jedis.sadd(CONSTANT_CLIENTSET, clientId);

  14. }

  15. public void sub(String channel){

  16. jedis.subscribe(listener, channel);

  17. }

  18. public void unsubscribe(String channel){

  19. listener.unsubscribe(channel);

  20. }

  21. }


这个客户端并没有继承JedisPubSub类,转而在如下的输出类进行Listener的处理

 

3. Listener类用于处理消息

 

 
  1. package com.zenhobby.persistence;

  2.  
  3. import java.util.Date;

  4.  
  5. import redis.clients.jedis.Jedis;

  6. import redis.clients.jedis.JedisPubSub;

  7.  
  8. public class PPrintListener extends JedisPubSub {

  9.  
  10. private String clientId;

  11. private PSubHandler handler;

  12. private String CONSTANT = "clientSet";

  13. public PPrintListener(String clientId, Jedis jedis) {

  14. this.clientId = clientId;

  15. handler = new PSubHandler(jedis);

  16. }

  17.  
  18. @Override

  19. public void onMessage(String channel, String message) {

  20. if (message.equalsIgnoreCase("quit")) {

  21. this.unsubscribe(channel);

  22. }

  23. handler.handle(channel, message);

  24. System.out.println("message receive:" + message + ",channel:" + channel);

  25. }

  26.  
  27. private void message(String channel, String message) {

  28. Date time = new Date();

  29. System.out.println("message receive:" + message + ",channel:" + channel + time.toString());

  30. }

  31.  
  32. @Override

  33. public void onPMessage(String pattern, String channel, String message) {

  34. System.out.println("message receive:" + message + ",pattern channel:" + channel);

  35. }

  36.  
  37. @Override

  38. public void onSubscribe(String channel, int subscribedChannels) {

  39. handler.subscribe(channel);

  40. System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels);

  41. }

  42.  
  43. @Override

  44. public void onUnsubscribe(String channel, int subscribedChannels) {

  45. handler.unsubscribe(channel);

  46. System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels);

  47. }

  48.  
  49. @Override

  50. public void onPSubscribe(String pattern, int subscribedChannels) {

  51. System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels);

  52. }

  53.  
  54. @Override

  55. public void unsubscribe(String... channels) {

  56. super.unsubscribe(channels);

  57. for (String channel : channels) {

  58. handler.unsubscribe(channel);

  59. }

  60. }

  61.  
  62. class PSubHandler {

  63. private Jedis jedis;

  64.  
  65. PSubHandler(Jedis jedis) {

  66. this.jedis = jedis;

  67. }

  68.  
  69. public void handle(String channel, String message) {

  70. int index = message.indexOf("/");

  71. if (index < 0) {

  72. return;

  73. }

  74. Long txid = Long.valueOf(message.substring(0, index));

  75. String key = clientId + "/" + channel;

  76. while (true) {

  77. String lm = jedis.lindex(key, 0);

  78. if (lm == null) {

  79. break;

  80. }

  81. int li = lm.indexOf("/");

  82. if(li<0){

  83. String result = jedis.lpop(key);

  84. if(result == null){

  85. break;

  86. }

  87. message(channel, lm);

  88. continue;

  89. }

  90. Long lxid = Long.valueOf(lm.substring(0, li));

  91. if(txid>=lxid){

  92. jedis.lpop(key);

  93. message(channel,lm);

  94. continue;

  95. }else{

  96. break;

  97. }

  98. }

  99. }

  100. public void subscribe(String channel){

  101. String key = clientId+"/"+channel;

  102. boolean exist = jedis.sismember(CONSTANT, key);

  103. if(!exist){

  104. jedis.sadd(CONSTANT, key);

  105. }

  106. }

  107. public void unsubscribe(String channel){

  108. String key = clientId+"/"+channel;

  109. jedis.srem(CONSTANT, key);

  110. jedis.del(key);

  111. }

  112. }

  113. }


其中jedis.sismember(CONSTANT, Key)用于判断当前用户是否存在,如果不存在则添加(和Redis缓存的思路相同)。

 

4. 创建测试Main方法,具体内容如下:

 

 
  1. package com.zenhobby.redis.persistence;

  2.  
  3. public class PPubSubTestMain {

  4. public static void main(String[] args) throws Exception {

  5. String host = "127.0.0.1";

  6. int port = 6379;

  7. String clientId = "myclient";

  8. PPubClient pubClient = new PPubClient(host, port);

  9. final String channel = "mychannel";

  10. final PPSubClient subClient = new PPSubClient(host, port, clientId);

  11. Thread subThread = new Thread(new Runnable() {

  12. public void run() {

  13. System.out.println("------------sub----start------------");

  14. subClient.sub(channel);

  15. System.out.println("------------sub----end------------");

  16. }

  17. });

  18. subThread.setDaemon(true);

  19. subThread.start();

  20. int i = 0;

  21. while (i < 20) {

  22. String message = "message--" + i;

  23. pubClient.pub(channel, message);

  24. i++;

  25. Thread.sleep(100);

  26. }

  27. subClient.unsubscribe(channel);

  28. }

  29. }

5.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。这次我们是以循环调用作为输入内容作为消息发布的内容,各位看官观察控制台输出。示例如下:

 

Redis pub/sub模式实现消息队列 侵删

然后,打开Redis客户端,观察当前Redis中保留的所有数据:

Redis pub/sub模式实现消息队列 侵删

题外的话:
Redis目前提供的发布与订阅功能,将会完全阻塞订阅者的客户端,在java实现时,即需要保留一个线程来专门处理发布者与订阅者的连接。因此,在实际应用时,更加推荐的做法是使用MQ组件来实现该功能

 

 

至此,NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现 结束

 

在此,对以下参考资料的作者表示感谢!:

参考资料:

redis官网:

http://redis.io/topics/pubsub

其他博文:

http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw

http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html