Redis pub/sub模式实现消息队列 侵删
一 消息队列
1. 定义
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
2. 消息队列的适用场景
消息队列的适用场景包括异步处理,应用解耦,流量削锋和消息通讯四个场景
1. 异步处理:异步处理中消息队列保存了当前处理操作,使得动作请求方可以在发出动作请求/写入消息队列后理解返回,异步获取结果,关注点在于请求的友好程度。
2. 应用解耦:应用解耦用于消除请求发起方和请求处理方的耦合,提升系统的健壮性。
3. 流量削锋:流量削峰一般指秒杀或抢购场景,消息队列用于控制活动人数,缓解高访问压力。
4. 日志处理:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
3. 消息模型
在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
P2P模式:
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。、
Pub/sub模式:
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
- 每个消息可以有多个消费者
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被**(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
4. 消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1)同步:订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
二 Redis 发布-订阅模式(pub/sub)
Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:
订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;
发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
1. 时间非耦合:发布者和订阅者不必同时在线,它们不必同时参与交互。
2. 空间非耦合:发布者和订阅者不必相互知道对方所在的位置。发布者通过事件服务发布事件,订阅者通过事件服务间接获得事件。发布者和订阅者不需要拥有直接到对方的引用,也不必知道有多少个订阅者或者是发布者参与交互。
3. 同步非耦合:发布者/订阅者是异步模式。发布者可不断地生产事件,而订阅者(通过一个回调)则可异步地得到产生事件的通知。
分类:
按照订阅方式分为基于主题(topic-based)、基于内容(content-based)、基于类型(type-based)的pub/sub方式。
三 Redis pub/sub的实现(非持久)
Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。
1. 导入Redis依赖(以Maven工程为例子):
-
<dependency>
-
<groupId>redis.clients</groupId>
-
<artifactId>jedis</artifactId>
-
<version>2.8.0</version>
-
</dependency>
2. 增加日志配置文件,这里使用系统输出代替日至
-
log4j.rootLogger=info,stdout
-
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
3. 创建消息的发布者Publisher.java
-
package com.zenhobby.redis_pub_sub;
-
import java.io.BufferedReader;
-
import java.io.InputStreamReader;
-
import redis.clients.jedis.Jedis;
-
public class Publisher {
-
private Jedis publisherJedis;
-
private String channel;
-
public Publisher(Jedis publishJedis,String channel){
-
this.publisherJedis=publishJedis;
-
this.channel=channel;
-
}
-
public void startPublish(){
-
try{
-
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
-
while(true){
-
System.out.println("请输入message:");
-
String line = reader.readLine();
-
if(!"quit".equals(line)){
-
publisherJedis.publish(channel, line);
-
}else{
-
break;
-
}
-
}
-
}catch(Exception e){
-
e.printStackTrace();
-
}
-
}
-
}
4. 实现消息的接收者Subscriber.java,实现JedisPubSub接口
-
package com.zenhobby.redis_pub_sub;
-
import redis.clients.jedis.JedisPubSub;
-
public class Subscriber extends JedisPubSub {
-
@Override
-
public void onMessage(String channel, String message) {
-
System.out.println("Channel:" + channel + ",Message:" + message);
-
}
-
@Override
-
public void onPMessage(String pattern, String channel, String message) {
-
System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message);
-
}
-
@Override
-
public void onSubscribe(String channel, int subscribedChannels) {
-
System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
-
}
-
@Override
-
public void onPUnsubscribe(String pattern, int subscribedChannels) {
-
System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
-
}
-
@Override
-
public void onPSubscribe(String pattern, int subscribedChannels) {
-
System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
-
}
-
}
JedisPubSub是Redis提供的抽象类,继承这个类就完成了对客户端对订阅的监听。
抽象类中存在六个方法。分别表示
- 监听到订阅模式接受到消息时的回调 (onPMessage)
- 监听到订阅频道接受到消息时的回调 (onMessage )
- 订阅频道时的回调( onSubscribe )
- 取消订阅频道时的回调( onUnsubscribe )
- 订阅频道模式时的回调 ( onPSubscribe )
- 取消订阅模式时的回调( onPUnsubscribe )
5. 创建测试Main.java
-
package com.zenhobby.redis_pub_sub;
-
import redis.clients.jedis.Jedis;
-
import redis.clients.jedis.JedisPool;
-
import redis.clients.jedis.JedisPoolConfig;
-
public class TestMain {
-
public static final String CHANNEL = "mychannel";
-
public static final String HOST = "127.0.0.1";
-
public static final int PORT = 6379;
-
private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
-
private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0);
-
public static void main(String[] args) {
-
final Jedis subscriberJedis = JEDIS_POOL.getResource();
-
final Jedis publisherJedis = JEDIS_POOL.getResource();
-
final Subscriber subscriber = new Subscriber();
-
new Thread(new Runnable() {
-
public void run() {
-
try {
-
System.out.println("Subscribing to mychannel,this thread will be block");
-
subscriberJedis.subscribe(subscriber, CHANNEL);
-
System.out.println("subscription ended");
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}).start();
-
new Publisher(publisherJedis, CHANNEL).startPublish();
-
publisherJedis.close();
-
subscriber.unsubscribe();
-
subscriberJedis.close();
-
}
-
}
6.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。并且我们是以控制台输入内容作为消息发布的内容,各位看官可以在控制台输入任意内容,点击回车键,观察控制台输出。示例如下(直接把原博的图借过来啦):
注意:此方法实现的发布与订阅功能,消息不会在Redis客户端进行缓存。
四 Redis的pub/sub实现(持久)
Redis的pub/sub的持久主要通过,在非持久化的基础上需要作如下处理:
1. 重新实现Publisher
-
package com.zenhobby.redis.persistence;
-
import java.util.Set;
-
import redis.clients.jedis.Jedis;
-
public class PPubClient {
-
private Jedis jedis;
-
private String CONSTANT_CLIENTSET = "clientSet";
-
public PPubClient(String host,int port){
-
jedis = new Jedis(host,port);
-
}
-
private void put(String message){
-
Set<String> subClients = jedis.smembers(CONSTANT);
-
for(String clientKey:subClients){
-
jedis.rpush(clientKey, message);
-
}
-
}
-
public void pub(String channel,String message){
-
Long txid = jedis.incr("MAXID");
-
String content = txid+"/"+message;
-
this.put(content);
-
jedis.publish(channel, message);
-
}
-
public void close(String channel){
-
jedis.publish(channel, "quit");
-
jedis.del(channel);
-
}
-
}
在新实现的Publisher中使用Jedis存储发布的消息。
2. 重新实现SubClient
-
package com.zenhobby.redis.persistence;
-
import redis.clients.jedis.Jedis;
-
import redis.clients.jedis.JedisPubSub;
-
public class PPSubClient {
-
private Jedis jedis;
-
private JedisPubSub listener;
-
private String CONSTANT_CLIENTSET="clientSet";
-
public PPSubClient(String host,int port,String clientId){
-
jedis = new Jedis(host,port);
-
listener = new PPrintListener(clientId,new Jedis(host,port));
-
jedis.sadd(CONSTANT_CLIENTSET, clientId);
-
}
-
public void sub(String channel){
-
jedis.subscribe(listener, channel);
-
}
-
public void unsubscribe(String channel){
-
listener.unsubscribe(channel);
-
}
-
}
这个客户端并没有继承JedisPubSub类,转而在如下的输出类进行Listener的处理
3. Listener类用于处理消息
-
package com.zenhobby.persistence;
-
import java.util.Date;
-
import redis.clients.jedis.Jedis;
-
import redis.clients.jedis.JedisPubSub;
-
public class PPrintListener extends JedisPubSub {
-
private String clientId;
-
private PSubHandler handler;
-
private String CONSTANT = "clientSet";
-
public PPrintListener(String clientId, Jedis jedis) {
-
this.clientId = clientId;
-
handler = new PSubHandler(jedis);
-
}
-
@Override
-
public void onMessage(String channel, String message) {
-
if (message.equalsIgnoreCase("quit")) {
-
this.unsubscribe(channel);
-
}
-
handler.handle(channel, message);
-
System.out.println("message receive:" + message + ",channel:" + channel);
-
}
-
private void message(String channel, String message) {
-
Date time = new Date();
-
System.out.println("message receive:" + message + ",channel:" + channel + time.toString());
-
}
-
@Override
-
public void onPMessage(String pattern, String channel, String message) {
-
System.out.println("message receive:" + message + ",pattern channel:" + channel);
-
}
-
@Override
-
public void onSubscribe(String channel, int subscribedChannels) {
-
handler.subscribe(channel);
-
System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels);
-
}
-
@Override
-
public void onUnsubscribe(String channel, int subscribedChannels) {
-
handler.unsubscribe(channel);
-
System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels);
-
}
-
@Override
-
public void onPSubscribe(String pattern, int subscribedChannels) {
-
System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels);
-
}
-
@Override
-
public void unsubscribe(String... channels) {
-
super.unsubscribe(channels);
-
for (String channel : channels) {
-
handler.unsubscribe(channel);
-
}
-
}
-
class PSubHandler {
-
private Jedis jedis;
-
PSubHandler(Jedis jedis) {
-
this.jedis = jedis;
-
}
-
public void handle(String channel, String message) {
-
int index = message.indexOf("/");
-
if (index < 0) {
-
return;
-
}
-
Long txid = Long.valueOf(message.substring(0, index));
-
String key = clientId + "/" + channel;
-
while (true) {
-
String lm = jedis.lindex(key, 0);
-
if (lm == null) {
-
break;
-
}
-
int li = lm.indexOf("/");
-
if(li<0){
-
String result = jedis.lpop(key);
-
if(result == null){
-
break;
-
}
-
message(channel, lm);
-
continue;
-
}
-
Long lxid = Long.valueOf(lm.substring(0, li));
-
if(txid>=lxid){
-
jedis.lpop(key);
-
message(channel,lm);
-
continue;
-
}else{
-
break;
-
}
-
}
-
}
-
public void subscribe(String channel){
-
String key = clientId+"/"+channel;
-
boolean exist = jedis.sismember(CONSTANT, key);
-
if(!exist){
-
jedis.sadd(CONSTANT, key);
-
}
-
}
-
public void unsubscribe(String channel){
-
String key = clientId+"/"+channel;
-
jedis.srem(CONSTANT, key);
-
jedis.del(key);
-
}
-
}
-
}
其中jedis.sismember(CONSTANT, Key)用于判断当前用户是否存在,如果不存在则添加(和Redis缓存的思路相同)。
4. 创建测试Main方法,具体内容如下:
-
package com.zenhobby.redis.persistence;
-
public class PPubSubTestMain {
-
public static void main(String[] args) throws Exception {
-
String host = "127.0.0.1";
-
int port = 6379;
-
String clientId = "myclient";
-
PPubClient pubClient = new PPubClient(host, port);
-
final String channel = "mychannel";
-
final PPSubClient subClient = new PPSubClient(host, port, clientId);
-
Thread subThread = new Thread(new Runnable() {
-
public void run() {
-
System.out.println("------------sub----start------------");
-
subClient.sub(channel);
-
System.out.println("------------sub----end------------");
-
}
-
});
-
subThread.setDaemon(true);
-
subThread.start();
-
int i = 0;
-
while (i < 20) {
-
String message = "message--" + i;
-
pubClient.pub(channel, message);
-
i++;
-
Thread.sleep(100);
-
}
-
subClient.unsubscribe(channel);
-
}
-
}
5.测试方法:首先,启动main方法中所示地址的Redis服务器;然后,运行main方法,观察控制台输出。这次我们是以循环调用作为输入内容作为消息发布的内容,各位看官观察控制台输出。示例如下:
然后,打开Redis客户端,观察当前Redis中保留的所有数据:
题外的话:
Redis目前提供的发布与订阅功能,将会完全阻塞订阅者的客户端,在java实现时,即需要保留一个线程来专门处理发布者与订阅者的连接。因此,在实际应用时,更加推荐的做法是使用MQ组件来实现该功能。
至此,NoSQL之Redis---PUB/SUB(订阅与发布)---JAVA实现 结束
在此,对以下参考资料的作者表示感谢!:
参考资料:
redis官网:
其他博文:
http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw
http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html