从RocketMQ源码的Store模块分析RocketMQ

前言

前几篇都是基于RockeMQ的工具类的源码来分析其架构,功能等,本节从RocketMQ的存储方面来分析,RocketMQ是如何实现回调,高可用的,这里引入一篇总结不错的文章.
从RocketMQ源码的Store模块分析RocketMQ

消息追加的回调方法AppendCallback

这个的追加的消息体分为批量的以及单一的消息体两种方式,都是实现了MessageExt类,而MessageExt继承了Message的基类,包括定义了消息的offset,queueID,storeSize等重要信息,而对于批量核心的方法是wrap()对批量的消息存放到byteBuffer,实现批量的处理。具体的源码如下

public class MessageExtBatch extends MessageExt {

    private static final long serialVersionUID = -2353110995348498537L;

    public ByteBuffer wrap() {
        assert getBody() != null;
        return ByteBuffer.wrap(getBody(), 0, getBody().length);
    }

    private ByteBuffer encodedBuff;

    public ByteBuffer getEncodedBuff() {
        return encodedBuff;
    }

    public void setEncodedBuff(ByteBuffer encodedBuff) {
        this.encodedBuff = encodedBuff;
    }
}

//而这个就是JDK源码byteBuffer的wrap()的应用
public static ByteBuffer wrap(byte[] array,
                                    int offset, int length)
    {
        try {
            return new HeapByteBuffer(array, offset, length);
        } catch (IllegalArgumentException x) {
            throw new IndexOutOfBoundsException();
        }
    }

消费队列ConsumeQueue

消费队列的核心类是自定义的一个CqExtUnit类,而这个类是对JDK的ByteBuffer的封装,比如read()以及write()

    /**
         * build unit from buffer from current position.
         */
        private boolean read(final ByteBuffer buffer) {
            if (buffer.position() + 2 > buffer.limit()) {
                return false;
            }

            this.size = buffer.getShort();
            //.....省略源码对buffer取值的判断,有兴趣读者可以下载查看源码
            if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) {
                this.filterBitMap = new byte[bitMapSize];
            }

            buffer.get(this.filterBitMap);
            return true;
        }
     //写的逻辑如下
   private byte[] write(final ByteBuffer container) {
            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
            this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);

            ByteBuffer temp = container;

            if (temp == null || temp.capacity() < this.size) {
                temp = ByteBuffer.allocate(this.size);
            }

            temp.flip();
            temp.limit(this.size);

            temp.putShort(this.size);
            temp.putLong(this.tagsCode);
            temp.putLong(this.msgStoreTime);
            temp.putShort(this.bitMapSize);
            if (this.bitMapSize > 0) {
                temp.put(this.filterBitMap);
            }

            return temp.array();
        }

默认的消息存储DefaultMessageStore

该类实现了MessageStore接口,这个提供了用户自定义消息的存储方式,包括通过offset获取消息的详细信息。而MessageStore的存储记录都使用CommitLog来实现对存储信息的记录,而这里的核心的类是StoreStatsService,这个类的具体核心就是一个静态的内部类CallSnapshot 快照形式来记录消息的一切行为,具体源码如下

static class CallSnapshot {
        public final long timestamp;
        public final long callTimesTotal;

        public CallSnapshot(long timestamp, long callTimesTotal) {
            this.timestamp = timestamp;
            this.callTimesTotal = callTimesTotal;
        }

        public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
            long total = end.callTimesTotal - begin.callTimesTotal;
            Long time = end.timestamp - begin.timestamp;

            double tps = total / time.doubleValue();

            return tps * 1000;
        }
    }

而上面提及到的StoreStatsService,就是实现了ThreadService的一个子类,继承关系如下,实质上是一个线程。该线程使用了JDK的Concurrent包以及reentranlock以及unsafe类的原子类来实现线程安全。
从RocketMQ源码的Store模块分析RocketMQ

另外补充一个知识点就是StoreCheckpoint,定义为存储检查点,类似于mybatis的事务回滚点rollpoint一样,而这里的StoreCheckPoit的存储检查点核心的逻辑就是FileChannel,下面是该类的继承类图,后续有时间会详细的针对这个channel的设计思路来分析这里实现的逻辑。
从RocketMQ源码的Store模块分析RocketMQ

总结

本节分别从消息如何存储,以及存储在哪里,如何实现数据安全这几点分析了RocketMQ的实现逻辑,毕竟源码还有更加丰富的知识点,譬如Channel部分的实现以及使用针对消息追加的AppendCallback的使用,针对批量的操作以及默认的操作等等的实现。后续会在源码的自定义方面来扩展接口的使用,让这些接口能在更多的场景发挥作用,避免重复造轮子。