Cat源码分析(二):Server端
初始化
服务端消费客户端发来的消息进行分析和展示,所以这个的初始化指的是CatHomeModule的初始化
CatHomeModule依赖TcpSocketReceiver和MessageConsumer,前者用来接收客户端发送的消息,后者用来消费消息。
TcpSocketReceiver通过Messagecodec对MessageQueue中的MessageTree进行解码,还原成为MessageTree,然后通过MessageHandler调用Consumer对消息进行消费。(这个消费的过程其实是一个消息分发的过程。消息有不同的消息分析器)
消费的过程是一个周期性的过程,对应上图右边部分。一个Period代表一个周期,每个周期对应一个持续时间(duration),默认为一小时。
RealTimeConsumer是MessageConsumer的实现类,他的作用是进行实时的消费,如何实现周期性消费呢?他需要依赖PeriodManager,进行周期管理。所以在初始化MessageConsumer的过程中会初始化PeriodManager。并且开启periodmanager的守护线程,进行周期开始和结束的控制。
通过m_strategy.next(now)方法进行时间对比,返回大于零或小于零的值,来决定是开始新的周期还是结束旧的周期。这个线程是每隔一秒执行一次的。
public long next(long now) {
long startTime = now - now % m_duration;//得到一个整点的时间,作为开始时间,如果now是10:05,startTime就是10:00
Date nowTime = new Date(startTime);
// for current period 当前周期返回开始时间,
//第一次进入的时候m_lastStartTime初始化为周期开始时间
if (startTime > m_lastStartTime) {
m_lastStartTime = startTime;
return startTime;
}
// prepare next period ahead
//下一个时期 返回大于0的值,则开始新的周期
if (now - m_lastStartTime >= m_duration - m_aheadTime) {
m_lastStartTime = startTime + m_duration;
return startTime + m_duration;
}
// last period is over 上一个周期已经结束了
//返回小于零的值,销毁上一个周期
if (now - m_lastEndTime >= m_duration + m_extraTime) {
long lastEndTime = m_lastEndTime;
m_lastEndTime = startTime;
return -lastEndTime;
}
在初始化周期管理器的时候,会执行startPeriod方法,来开启一个周期。实例化Period,在这个过程中,会通过analyzerManager得到12种分析器,并声明一个m_task的HashMap<String,List> key就是分析器名称,value就是List,分析器不同对应的list的size也不同,例如transaction分析比较耗时,就会分配两个PeriodTask,如下图
private void startPeriod(long startTime) {
long endTime = startTime + m_strategy.getDuration();
Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);
m_periods.add(period);
period.start();
}
消息消费
实例化完成后,将period加入到m_periods中,然后调用period.start方法,这个方法循环m_task每一种分析器,启动periodTask的线程,进行analyze,这个方法会一直对队列进行轮询,从队列中取出tree,进行process处理, process是抽象方法,具体会由重写了该方法的子类去执行process方法
for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
List<PeriodTask> taskList = tasks.getValue();
for (int i = 0; i < taskList.size(); i++) {
PeriodTask task = taskList.get(i);
task.setIndex(i);
Threads.forGroup("Cat-RealtimeConsumer").start(task);
}
}
@Override
public void run() {
try {
m_analyzer.analyze(m_queue);
} catch (Exception e) {
Cat.logError(e);
}
}
消息分发
讲完有关周期的初始化过程,我们在回头看看server接收到消息是如何放到消费队列中的
tcp接收到消息---->MessageDocoder: decode—>DefaulMessageHandler : handle(MessageTree tree) m_consumer.consume(tree)----> RealtimeConsumer consume方法
@Override
public void consume(MessageTree tree) {
long timestamp = tree.getMessage().getTimestamp();
Period period = m_periodManager.findPeriod(timestamp);
if (period != null) {
period.distribute(tree);
} else {
m_serverStateManager.addNetworkTimeError(1);
}
}
- 根据当前时间,找到当前时间对应的周期Period
- 然后调用周期的distribute方法进行消息的分发
a) 循环m_tasks
i. 获得list中的某一个PeriodTask
ii. 将消息放到队列(m_queue)中
可以看出,一个消息将被放到所有类型的消息分析器进行分析
写在后边
分析器将与后边做具体的分析
说明:
本文涉及到的UML图片来源于大众点评Cat–Server模块架构分析这篇文章,部分内容也有参考