基于redis的消息队列的设计及实现
消息队列
所谓消息队列,就是一个以队列数据结构为基础的一个真实存在的实体,如数组,redis中的队列集合等等,都可以。
为什么要使用队列
主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
比如说点赞这个功能,这个在高并发的情况下,很容易造成数据库连接数占满,到时整个网站响应缓慢,才是就是想到要解决数据库的压力问题,一般就是两种方案,一是提高数据库本身的能力(如增加连接数,读写分离等),但是数据库总是有极限的,到达了极限是没有办法在提升了的,此时就要考虑第二种方案,释放数据库的压力,将压力转移到缓存里面。就拿实际的点赞来说吧,用户的点赞请求到来,我只是将点赞请求投递到消息队列里面,后续的点赞请求可以将消息合并,即只更新点赞数,不产生新的任务,此时有个进行再不断的轮训消息队列,将点赞消息消耗,并将值更新到数据库里面,这样就有效的降低了数据库的压力,因为在缓存层将数个数据库更新请求合并成一个,大大提高了效率,降低了负载。
Redis实现的消息队列
Redis提供了两种方式来作消息队列。 生产者消费模式和发布订阅者模式。
生产者消费模式会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。 其实在生产者消费模式中生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。
发布订阅者模式也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。
这里使用的是生产者消费模式。
基于Redis的消息队列实现的异步操作原理图如下:
代码实现
1.首先定义事件的类型,使用枚举类,便于取出各种事件
package com.springboot.springboot.async;
/**
* @author WilsonSong
* @date 2018/6/3
* 枚举类,就是事件的各种类型
*/
public enum EventType {
LIKE(0), COMMENT(1), LOGIN(2),MAIL(3);
private int value;
EventType(int value){
this.value = value;
}
public int getValue(){
return value;
}
}
2.定义事件的具体实现类
类里面很多的实现方法都是返回的是EventModel这个类,是为了以后点赞的时候能够链式的取出与这个事件相关的参数
/**
* @author WilsonSong
* @date 2018/6/3
* 不同的事件肯定是有不同的类型的
*/
public class EventModel {
//例如,有人评论了一个问题,那type就是评论, actorId就是谁评论的,
// entityId和entityType就是评论的是那个问题,entityOwnerId就是那个问题关联的对象
private EventType type; //事件的类型
private int actorId; //事件的触发者
private int entityType; //触发事件的载体
private int entityId; //和entityType组合成触发事件的载体 可以使任何一个实体的id,问题,评论,用户,站内信等等
private int entityOwnerId; //载体关联的对象,当我们给一个人点赞时,系统要给那个人(也就是entityOwnerId)发送一个站内信,通知那个人他被点赞了。
public EventModel(){
}
public EventModel(EventType type){
this.type = type;
}
//定义可扩展的字段
private Map<String, String> exts = new HashMap<>();
public EventModel setExts(String key, String value){
exts.put(key,value);
return this;
}
public String getExts(String key){
return exts.get(key);
}
public EventType getType() {
return type;
}
//为了能够实现链状的设置
public EventModel setType(EventType type) {
this.type = type;
return this; //这个就是为了实现这个xxx.setType().setXX();
}
public int getActorId() {
return actorId;
}
public EventModel setActorId(int actorId) {
this.actorId = actorId;
return this;
}
public int getEntityType() {
return entityType;
}
public EventModel setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public EventModel setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityOwnerId() {
return entityOwnerId;
}
public EventModel setEntityOwnerId(int entityOwnerId) {
this.entityOwnerId = entityOwnerId;
return this;
}
public Map<String, String> getExts() {
return exts;
}
public EventModel setExts(Map<String, String> exts) {
this.exts = exts;
return this;
}
}
3.EventProducer的实现--生产者,作用是把事件分发到队列中
/**
* @author WilsonSong
* @date 2018/6/3
* 事件的入口,用来统一分发事件,就是在队列中插入
*/
@Service
public class EventProducer {
@Autowired
JedisAdapter jedisAdapter;
//把事件分发出去 EventProducer
public boolean fireEvent(EventModel eventModel){
try{
//序列化,将EventModel 转换WieJSON的字符串
String json = JSONObject.toJSONString(eventModel);
String key = RedisKeyUtil.getEventQueueKey();
jedisAdapter.lpush(key, json);
return true;
}catch (Exception e){
return false;
}
}
//事件的取出与消费
}
4、Redis的统一封装 --队列
因为这里是基于Redis的队列实现异步操作,需要对Redis的一些函数重新封装,并与redis缓存进行数据交互
/**
* @author WilsonSong
* @date 2018/6/1
*/
@Service
public class JedisAdapter implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(JedisAdapter.class);
private JedisPool pool;
public static void print(int index, Object object) {
System.out.println(String.format("%d, %s", index, object.toString()));
}
@Override
public void afterPropertiesSet() throws Exception {
pool = new JedisPool("redis://localhost:6379/10");
}
public long lpush(String key, String value){
Jedis jedis = null;
try {
jedis = pool.getResource();
return jedis.lpush(key,value);
}catch (Exception e){
logger.error("Redis队列添加异常");
}finally {
if (jedis != null){
jedis.close();
}
}
return 0;
}
public List<String> brpop(int timeout, String key){
Jedis jedis = null;
try{
jedis = pool.getResource();
return jedis.brpop(timeout,key);
}catch (Exception e){
logger.error("Redis队列弹出数据异常");
}finally {
if (jedis != null){
jedis.close();
}
}
return null;
}
因为Redis是key--value的模式,每一个事件都应该有与其对应的key,为了统一管理并且不产生混淆,定义统一的key的生成
/**
* @author WilsonSong
* @date 2018/6/2
* 为了防止生成的key有冲突
*/
public class RedisKeyUtil {
private static String SPLIT = ":";
private static String BIZ_LIKE = "LIKE";
private static String BIZ_DISLIKE = "DISLIKE";
private static String BIZ_EVENTQUEUE = "EVENTQUEUE";
//获取点赞的key
public static String getLikeKey(int entityType, int entityId){
return BIZ_LIKE + SPLIT + String.valueOf(entityType) + SPLIT +String.valueOf(entityId);
}
//获取点踩的key
public static String getDislikeKey(int entityType, int entityId){
return BIZ_DISLIKE +SPLIT + String.valueOf(entityType) + SPLIT + String.valueOf(entityId);
}
public static String getEventQueueKey(){
return BIZ_EVENTQUEUE;
}
}
5.EventHandler接口
在消费者与事件之间写一个handler的接口,实现Consumer和handler之间的交互,因为消费者就是找到哪些EventHandler对当前的事件感兴趣
/**
* @author WilsonSong
* @date 2018/6/3
* 用来处理事件的,谁关心这个事件,谁来做这个事件
*/
public interface EventHandler {
void doHander(EventModel model); //谁来处理事件
List<EventType> getSupportEventTypes(); //有哪些关心这些事件的
}
6. EventConsumer的实现---消费者
创建一个类型为Map<EventType, List<EventHandler>>的map,用于存放所有的Handler,然后将所有的事件注册到config中,即通过applicationContext获取实现了EventHandler接口的全部Handler。
启动线程去不断的去队列中查询事件并用brpop把事件拉出来,通过序列化和反序列化将取出的JSON转化为EventModel,寻找是否有能处理EventModel的Handler,调用每一个对该事件感兴趣的EventType的doHandle方法去处理事件
**
* @author WilsonSong
* @date 2018/6/3
* 处理队列中的事件并与各个handler沟通
* InitializingBean接口的作用在spring 初始化后,执行完所有属性设置方法(即setXxx)将
* 自动调用 afterPropertiesSet(), 在配置文件中无须特别的配置
*/
@Service
public class EventConsumer implements InitializingBean,ApplicationContextAware{
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
private Map<EventType, List<EventHandler>> config = new HashMap<>();
private ApplicationContext applicationContext; //sping的上下文
@Autowired
JedisAdapter jedisAdapter;
//这个方法将在所有的属性被初始化后调用
@Override
public void afterPropertiesSet() throws Exception {
//获取现在有多少个eventHandler初始化了
Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
if (beans != null){
for (Map.Entry<String,EventHandler> entry : beans.entrySet()){
List<EventType> eventTypes = entry.getValue().getSupportEventTypes(); //找到那些handler对当前的事件感兴趣
for (EventType type : eventTypes){
if (!config.containsKey(type)){ //有可能是第一次注册这个事件,所以就可能初始的时候是null
//把handler放到config中
config.put(type, new ArrayList<EventHandler>()); //把event注册到config中
}
config.get(type).add(entry.getValue()); //把对这些event感兴趣的handler添加到config中
}
}
}
//打开线程去找队列中的事件
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true){ //一直取
String key = RedisKeyUtil.getEventQueueKey();
List<String> events = jedisAdapter.brpop(0,key); //若队列中没有这个事件的话就一直等待
for (String message : events){
if (message.equals(key)){ //返回的第一个值可能是key,把他先过滤掉,取后面的event
continue;
}
//通过JSon的方式反序列化
EventModel eventModel = JSON.parseObject(message,EventModel.class);
if (!config.containsKey(eventModel.getType())){ //是不是有对这个事件有处理的handler
logger.error("不能识别的事件");
continue;
}
for (EventHandler handler : config.get(eventModel.getType())){
handler.doHander(eventModel);
}
}
}
}
});
thread.start();
}
//将config中所有的配置的接口
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
7.处理具体事件的具体的XXXhandler
例如这里写的点赞的handler
**
* @author WilsonSong
* @date 2018/6/4
* 处理点赞事件的handler
*/
@Component //就是把普通的对象在spring容器中初始化
public class LikeHandler implements EventHandler {
@Autowired
MessageService messageService;
@Autowired
userService uService;
@Override
public void doHander(EventModel model) {
Message message = new Message();
message.setFrom_id(WendaUtil.SYSTEMCONTROLLER_userId); //以系统管理员的额身份给你发消息说谁给你点了赞
message.setTo_id(model.getEntityOwnerId()); //发给谁,就是那个entity拥有者的id
message.setCreated_date(new Date());
User user = uService.getUser(model.getActorId()); //触发这个事件的用户id
message.setContent("用户" + user.getName() + "赞了你的评论,http://127.0.0.1:8080/question" + model.getExts("questionId"));
message.setConversationId(message.getConversationId());
messageService.addMessage(message);
}
@Override
public List<EventType> getSupportEventTypes() {
return Arrays.asList(EventType.LIKE); //只需要返回点赞的事件即可
}
}
原文链接:https://blog.****.net/WilsonSong1024/article/details/80573611