rocketMQ-记录分析produce服务启动的流程

1、Producer.star()启动时,主要调用了以下的5个类,从上到下表示调用的先后顺序。
rocketMQ-记录分析produce服务启动的流程
2、从主函数Producer开始,先初始化一个DefaultMQProducer类的producer对象,需要给定producerGroup名和nameServer的地址(我这里为"10.61.2.123:9876;10.61.2.124:9876")。这里初始化的主要内容是初始化自己的prodecerGroup的值,以及DefaultMQProducerImpl类的对象defaultMQProducerImpl。

3、producer.start()调用成员对象defaultMQProducerImpl的start方法。
  1. start方法首先根据defaultMQProducerImpl的serviceState进行跳转,它最开始为CREATE_JUST,执行其它命令前先更改为START_FAILED
  2. 执行checkConfig()函数,针对producerGroup是否为空,是否占用默认的groupName等一系列的验证。包括producerGroup不能为空以及不能为MixAll中默认的设定值“DEFAULT_PRODUCER”;
  3. 当producer的instanceName为默认值(即"rocketmq.client.name""DEFAULT")时,把instanceName改为pid;
  4. 用getAndCreateMQClientInstance()方法初始化MQClientInstance类的成员对象mQClientFactory,把当前的producer对象,以及rpcHook作为参数传入。
    1. (创建一个当前producer的clientId格式为[email protected]如果当前客户端不在mq客户端实例集合中,则创建一个实例并加入);
    2. 这里的MQClientManager为单例模式,目的是全局只能有一个mQClientManager对象,其它类只能通过MQClientManager.getInstance()来获取mQClientManager对象。
  5. 在mQClientFactory对象中把producer注册到producer集合中。这里要注意的是一个进程下,一个producer group只能对应一个producer,即我们的进程里,不能用相同的group名参数初始化多个producer。可以分开多个进程进行这个操作,但是它为什么要这样做还不能理解。
  6. this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());将topic放入topic集合中.createTopicKey其实是在程序配置的一个默认的topic名字. 为什么会传入一个默认的topic名字呢? 应为在创建每一个topic时,程序会去这个默认的topic作为模板,改topic以及所有创建的topic会保存在当前用户目录下的store/config/topics.json文件中。
  7. 调用mQClientFactory的start方法。

4、MQClientInstance.start()方法主要分为以下几个部分:
  1. 同样先根据mQClientFactory的serviceState进行跳转,它最开始为CREATE_JUST,执行其它命令前先更改为START_FAILED;
  2. 获取nameService地址
  3. 启动client端远程通信
  4. 启动各种定时
               1.更新nameService地址
               2.更新从nameService更新topic路由信息
               3.清理挂掉的broker,向broker发送心跳信息
               4.持久化consumerOffset
               5.调整消费线程池
  1. 启动拉取消息服务
  2. 启动消费端负载均衡服务
  3. 再次调用DefaultMQProducerImpl.start(false)。代码为this.defaultMQProducer.getDefaultMQProducerImpl().start(false);这里因为defaultMQProducerImpl在DefaultMQProducer类中为protected,所以需要用get函数获取。这里参数为false,所以在再次循环时,会跳过mQClientInstance,start(),从而不会死循环。需要注意的是,这里再次调用的虽然是同一个函数,但是执行函数的主题对象是不同的,之前是producer的defaultMQProducerImpl成员对象,现在是mQClientInstance自己初始化时的默认DefaultMQProducerImpl类对象。所以在最开始,它们的serviceState都是从CREATE_JUST开始。
  4. 改变serviceState状态   
    • 此调用返回后,serviceState会依次由START_FAILED变为RUNNING状态,总共有3个对象会进行这个变化。

5、  producer用自己的mQClientFactory对象执行心跳服务。包括两个主要函数:
  1. 发送心跳;
  2. uploadFilterClassSource();这个函数是对consumerTable表中的每个对象执行订阅相关服务。目前还不能理解它的作用,一个是这个函数是最开始启动服务的时候运行,那么consumerTable应该为空,这样的话这个函数就相当于没有执行。如果不是,那么consumeTable中的内容是如何产生,以及此函数的目的还待后面继续探索。