从RocketMQ源码的Store模块分析RocketMQ
前言
前几篇都是基于RockeMQ的工具类的源码来分析其架构,功能等,本节从RocketMQ的存储方面来分析,RocketMQ是如何实现回调,高可用的,这里引入一篇总结不错的文章.
消息追加的回调方法AppendCallback
这个的追加的消息体分为批量的以及单一的消息体两种方式,都是实现了MessageExt类,而MessageExt继承了Message的基类,包括定义了消息的offset,queueID,storeSize等重要信息,而对于批量核心的方法是wrap()对批量的消息存放到byteBuffer,实现批量的处理。具体的源码如下
public class MessageExtBatch extends MessageExt {
private static final long serialVersionUID = -2353110995348498537L;
public ByteBuffer wrap() {
assert getBody() != null;
return ByteBuffer.wrap(getBody(), 0, getBody().length);
}
private ByteBuffer encodedBuff;
public ByteBuffer getEncodedBuff() {
return encodedBuff;
}
public void setEncodedBuff(ByteBuffer encodedBuff) {
this.encodedBuff = encodedBuff;
}
}
//而这个就是JDK源码byteBuffer的wrap()的应用
public static ByteBuffer wrap(byte[] array,
int offset, int length)
{
try {
return new HeapByteBuffer(array, offset, length);
} catch (IllegalArgumentException x) {
throw new IndexOutOfBoundsException();
}
}
消费队列ConsumeQueue
消费队列的核心类是自定义的一个CqExtUnit类,而这个类是对JDK的ByteBuffer的封装,比如read()以及write()
/**
* build unit from buffer from current position.
*/
private boolean read(final ByteBuffer buffer) {
if (buffer.position() + 2 > buffer.limit()) {
return false;
}
this.size = buffer.getShort();
//.....省略源码对buffer取值的判断,有兴趣读者可以下载查看源码
if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) {
this.filterBitMap = new byte[bitMapSize];
}
buffer.get(this.filterBitMap);
return true;
}
//写的逻辑如下
private byte[] write(final ByteBuffer container) {
this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);
ByteBuffer temp = container;
if (temp == null || temp.capacity() < this.size) {
temp = ByteBuffer.allocate(this.size);
}
temp.flip();
temp.limit(this.size);
temp.putShort(this.size);
temp.putLong(this.tagsCode);
temp.putLong(this.msgStoreTime);
temp.putShort(this.bitMapSize);
if (this.bitMapSize > 0) {
temp.put(this.filterBitMap);
}
return temp.array();
}
默认的消息存储DefaultMessageStore
该类实现了MessageStore接口,这个提供了用户自定义消息的存储方式,包括通过offset获取消息的详细信息。而MessageStore的存储记录都使用CommitLog来实现对存储信息的记录,而这里的核心的类是StoreStatsService,这个类的具体核心就是一个静态的内部类CallSnapshot 快照形式来记录消息的一切行为,具体源码如下
static class CallSnapshot {
public final long timestamp;
public final long callTimesTotal;
public CallSnapshot(long timestamp, long callTimesTotal) {
this.timestamp = timestamp;
this.callTimesTotal = callTimesTotal;
}
public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
long total = end.callTimesTotal - begin.callTimesTotal;
Long time = end.timestamp - begin.timestamp;
double tps = total / time.doubleValue();
return tps * 1000;
}
}
而上面提及到的StoreStatsService,就是实现了ThreadService的一个子类,继承关系如下,实质上是一个线程。该线程使用了JDK的Concurrent包以及reentranlock以及unsafe类的原子类来实现线程安全。
另外补充一个知识点就是StoreCheckpoint,定义为存储检查点,类似于mybatis的事务回滚点rollpoint一样,而这里的StoreCheckPoit的存储检查点核心的逻辑就是FileChannel,下面是该类的继承类图,后续有时间会详细的针对这个channel的设计思路来分析这里实现的逻辑。
总结
本节分别从消息如何存储,以及存储在哪里,如何实现数据安全这几点分析了RocketMQ的实现逻辑,毕竟源码还有更加丰富的知识点,譬如Channel部分的实现以及使用针对消息追加的AppendCallback的使用,针对批量的操作以及默认的操作等等的实现。后续会在源码的自定义方面来扩展接口的使用,让这些接口能在更多的场景发挥作用,避免重复造轮子。