RocketMQ技术调研
RocketMQ集群部署和架构
RocketMQ集群部署
版本: rocketmq-all-4.5.2-bin-release
Master-a:
10.*.*.11 hostname:11
Slave-a:
10.*.*.12 hostname:12
Master-b:
10.*.*.237 hostname:237
Slave-b:
10.*.*.241 hostname:241
Name Server :
10.*.14.11 10.*.14.12 10.*.14.241
说明:以上 Broker 与 Slave 配对是通过指定相同的brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave的BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。
集群监控服务
rocketmq-console-ng-1.0.1.jar
启动服务命令:nohup java –jar rocketmq-console-ng-1.0.1.jar> /data/rocketmq-console-log/rocketmq-console.log 2>&1 &
地址:http://10.*.14.241:14300/#/cluster
集群服务可用状态检查
通过 mqadmin 运维命令查看集群状态
bin/mqadmin clusterList -n 10.*.14.241:9876
说明:其中 BID 为 0 的表示 Master,其余都是 Follower
过程中遇到的问题
主要原因是linux下缺少依赖包。$JAVA_HOME/jre/lib/ext/,找到sunjce_provider.jar 然后拷贝至linux下对应的目录即可。 或者配置环境变量能加载到这个jar。
其他网络端口权限问题:
涉及服务端口:
10.*.14.11:9876;10.*.14.12:9876;10.*.14.241:9876
10.*.14.241:14300
RocketMQ技术基本概念
Topic:对于RocketMQ而言,Topic只是一个逻辑上的概念,真正的消息存储其实是在Topic中的Queue中。这样设计从开始就是为了解决大规模高并发下消息的顺序消费问题。和kafka中的topic类似
Tag :给消息打标签,用于区分一类消息,可为null,只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。反之订阅了不存在的tags 也消费不到任何数据,也不会报异常。可以达到再次过滤目的。比如消费端可以只消费topic001中TagA的消息.
Queue:消息队列缓存数据,一个topic下有1或多个,可以类比kafka中的partition
Group:生产或消费的客户端标识,生产和消费都有,与kafka不同
msgID:消息ID ,msgId是mq自动生成的,可在控制台message中查找数据。如:msgId=0A06CD634F612437C6DC3ABF77950001
msgKey:消息唯一key,可以在控制台根据key查询消息或用于去重,需要在发送时设定,也可为null。 每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题回溯。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
queueOffset(队列消息偏移量)、InstanceName(生产客户端实例名)
rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。
rocketmq-client:提供发送、接受消息的客户端API。
rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。(有点NameNode的味道)
rocketmq-store:消息、索引存储等
RocketMQ 非集群模式测试
测试环境:10.*.100.33:10911
- 多队列生产:且指定生产topic 的指定队列顺序
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C3469D0000,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346BE0001,发送状态:SEND_OK
msg2发送响应:MsgId:0A06CD63432F2437C6DC31C346C60002,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346CD0003,发送状态:SEND_OK
msg3发送响应:MsgId:0A06CD63432F2437C6DC31C346D50004,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346DA0005,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346E00006,发送状态:SEND_OK
msg2发送响应:MsgId:0A06CD63432F2437C6DC31C346E40007,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346E90008,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346ED0009,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346F1000A,发送状态:SEND_OK
msg2发送响应:MsgId:0A06CD63432F2437C6DC31C346F4000B,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346F8000C,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C346FB000D,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C34701000E,发送状态:SEND_OK
msg2发送响应:MsgId:0A06CD63432F2437C6DC31C34704000F,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C347080010,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C3470C0011,发送状态:SEND_OK
msg3发送响应:MsgId:0A06CD63432F2437C6DC31C3470F0012,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C347130013,发送状态:SEND_OK
msg2发送响应:MsgId:0A06CD63432F2437C6DC31C347160014,发送状态:SEND_OK
msg1发送响应:MsgId:0A06CD63432F2437C6DC31C347190015,发送状态:SEND_OK
Process finished with exit code 0
- 消费端订阅指定tag,且有序消费
消费TagB的情况: msgId: 0A06CD63432F2437C6DC31C346C60002 QueueId: 2 topic#tags: t_luo01,TagB msgs大小: 199 msg:北京 Hello RocketMQ 3
消费TagB的情况: msgId: 0A06CD63432F2437C6DC31C346E40007 QueueId: 2 topic#tags: t_luo01,TagB msgs大小: 199 msg:北京 Hello RocketMQ 9
消费TagB的情况: msgId: 0A06CD63432F2437C6DC31C346F4000B QueueId: 2 topic#tags: t_luo01,TagB msgs大小: 201 msg:北京 Hello RocketMQ 15
消费TagB的情况: msgId: 0A06CD63432F2437C6DC31C34704000F QueueId: 2 topic#tags: t_luo01,TagB msgs大小: 201 msg:北京 Hello RocketMQ 21
消费TagB的情况: msgId: 0A06CD63432F2437C6DC31C347160014 QueueId: 2 topic#tags: t_luo01,TagB msgs大小: 201 msg:北京 Hello RocketMQ 27
测试结论:
其中以TagB为例:生产中:30条数据 其中TagB 5条
topic:t_luo01
消费的数据量和顺序完全一致。
- 生产单线程且有序:1000条数据
- 单线程多队列顺序生产10000条:生产耗时 22s
生产1000条数据 多队列有序
- 多个消费客户端是否支持消息顺序执行
日志添加 "消费者1" "消费者2" 用来区分 不同的消费者
启动C1,C2查看控制台消费者状况
消费者1日志如下:
消费者2日志如下:
测试结论:
可以看到负载均衡,并且消息是有顺序的。同一个类型的消息都会放到同一个队列中。
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 0
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 1
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 2
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 3
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 4
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 5
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 6
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 7
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 8
发送状态:SEND_OKtag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 9
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 10
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 11
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 12
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 13
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 14
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 15
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 16
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 17
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 18
发送状态:SEND_OKtag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 19
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 20
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 21
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 22
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 23
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 24
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 25
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 26
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 27
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 28
发送状态:SEND_OKtag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 29
生产数据总耗时: 548
消费者1 ==> 当前线程:ConsumeMessageThread_10 ,quenuID: 1 ,content: 北京 Hello RocketMQ 0
生产数据总耗时: 592080
消费者1 ==> 当前线程:ConsumeMessageThread_11 ,quenuID: 2 ,content: 北京 Hello RocketMQ 10
生产数据总耗时: 592080
消费者1 ==> 当前线程:ConsumeMessageThread_10 ,quenuID: 1 ,content: 北京 Hello RocketMQ 1
生产数据总耗时: 592080
消费者1 ==> 当前线程:ConsumeMessageThread_11 ,quenuID: 2 ,content: 北京 Hello RocketMQ 11
生产数据总耗时: 592081
消费者1 ==> 当前线程:ConsumeMessageThread_12 ,quenuID: 3 ,content: 北京 Hello RocketMQ 20
生产数据总耗时: 592081
消费者1 ==> 当前线程:ConsumeMessageThread_10 ,quenuID: 1 ,content: 北京 Hello RocketMQ 2
生产数据总耗时: 592081
消费者1 ==> 当前线程:ConsumeMessageThread_12 ,quenuID: 3 ,content: 北京 Hello RocketMQ 21
生产数据总耗时: 592081
消费者1 ==> 当前线程:ConsumeMessageThread_10 ,quenuID: 1 ,content: 北京 Hello RocketMQ 3
生产数据总耗时: 592081
消费者1 ==> 当前线程:ConsumeMessageThread_10 ,quenuID: 1 ,content: 北京 Hello RocketMQ 4
生产数据总耗时: 592081
消费者1 ==> 当前线程:ConsumeMessageThread_13 ,quenuID: 1 ,content: 北京 Hello RocketMQ 5
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_13 ,quenuID: 1 ,content: 北京 Hello RocketMQ 6
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_13 ,quenuID: 1 ,content: 北京 Hello RocketMQ 7
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_13 ,quenuID: 1 ,content: 北京 Hello RocketMQ 8
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_13 ,quenuID: 1 ,content: 北京 Hello RocketMQ 9
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 12
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 13
生产数据总耗时: 592084
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 14
生产数据总耗时: 592085
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 15
生产数据总耗时: 592085
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 16
生产数据总耗时: 592085
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 17
生产数据总耗时: 592085
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 18
生产数据总耗时: 592085
消费者1 ==> 当前线程:ConsumeMessageThread_14 ,quenuID: 2 ,content: 北京 Hello RocketMQ 19
生产数据总耗时: 592085
消费者1 ==> 当前线程:ConsumeMessageThread_15 ,quenuID: 3 ,content: 北京 Hello RocketMQ 22
生产数据总耗时: 592369
消费者1 ==> 当前线程:ConsumeMessageThread_15 ,quenuID: 3 ,content: 北京 Hello RocketMQ 23
生产数据总耗时: 592370
消费者1 ==> 当前线程:ConsumeMessageThread_15 ,quenuID: 3 ,content: 北京 Hello RocketMQ 24
生产数据总耗时: 592370
消费者1 ==> 当前线程:ConsumeMessageThread_15 ,quenuID: 3 ,content: 北京 Hello RocketMQ 25
生产数据总耗时: 592370
消费者1 ==> 当前线程:ConsumeMessageThread_16 ,quenuID: 3 ,content: 北京 Hello RocketMQ 26
生产数据总耗时: 592373
消费者1 ==> 当前线程:ConsumeMessageThread_16 ,quenuID: 3 ,content: 北京 Hello RocketMQ 27
生产数据总耗时: 592373
消费者1 ==> 当前线程:ConsumeMessageThread_16 ,quenuID: 3 ,content: 北京 Hello RocketMQ 28
生产数据总耗时: 592373
消费者1 ==> 当前线程:ConsumeMessageThread_16 ,quenuID: 3 ,content: 北京 Hello RocketMQ 29
生产数据总耗时: 592373
测试结论:
可以看到生产端每个队列数据是有序的,从全局上来看,消息的消费不是有序的,但是每一个队列的消息是顺序消费的,且即便是多线程情况下(其实broker并不能完全保证消息的顺序消费,它仅仅能保证的消息的顺序发送)
所以需要保证消息的顺序消费,那么首先需要做到一组需要有序消费的消息发往同一个broker的同一个队列上,其次消费者端采用有序Listener(MessageListenerOrderly监听器)即可。
- 一个消费端订阅一个topic的多个tag消费,且有序:
生产端代码和测试:
RocketMQLog:WARN Please initialize the logger system properly.
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 0
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 1
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 2
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 3
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 4
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 5
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 6
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 7
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 8
发送状态:SEND_OK,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 9
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 10
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 11
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 12
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 13
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 14
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 15
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 16
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 17
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 18
发送状态:SEND_OK,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 19
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 20
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 21
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 22
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 23
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 24
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 25
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 26
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 27
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 28
发送状态:SEND_OK,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 29
生产数据总耗时: 578
Process finished with exit code 0
消费端代码和测试情况:
Consumer Started.
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 0
生产数据总耗时: 24095
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 1
生产数据总耗时: 24096
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 2
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 3
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 4
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 5
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 6
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 7
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 8
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 1 ,content: 北京 Hello RocketMQ 9
生产数据总耗时: 24097
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 20
生产数据总耗时: 44106
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 21
生产数据总耗时: 44106
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 22
生产数据总耗时: 44106
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 23
生产数据总耗时: 44107
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 24
生产数据总耗时: 44107
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 25
生产数据总耗时: 44107
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 26
生产数据总耗时: 44107
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 27
生产数据总耗时: 44107
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 28
生产数据总耗时: 44107
消费者2 ==> 当前线程:ConsumeMessageThread_2 ,quenuID: 3 ,content: 北京 Hello RocketMQ 29
生产数据总耗时: 44107
- 同一个topic下tag的问题:
同一个消费组中,设置不同tag时,后启动的消费者会覆盖先启动的消费者设置的tag,会导致客户端发现数据有丢失情况。
所以每个客户端尽量都使用不同的唯一groupID,规避这种问题的出现。这个设计原理和kafka counsumer 同一个group同一时刻消费同一个topic的类似。
具体测试情况如下:
先启动的consumer1客户端: 没有消费到任何数据
后启动的consumer2客户端: 消费到了一部分数据
- Rocketmq实现顺序消费机制:
1.rocketmq保证同一个订单的消息,一定要发送到同一个队列 - 并且该队列只有一个消费者,也就是说 同一个队列,不能出现多个消费者并行消费的情况。但一个队列只有一个消费者,性能如何?关于这个情况Rocketmq的解决方法是同一个队列不能并行消费,但是可以并行消费不同的队列。比如:同时多笔订单之间是可以并行消费的。
- RocketMQ中同一个Topic 不同的group订阅消费不同的tag场景:且保证有序
生产端:
topic name: t_luo01
group:p_luo01
tags: TagA、TagB、TagC
数据量:分别10条 总共30条
消费端 多线程消费服务1:
topic name: t_luo01
group: c_luo02
tags: TTagB
消费的数据量:10条 且有序
Consumer Started.
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 10
生产数据总耗时: 4121
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 11
生产数据总耗时: 4122
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 12
生产数据总耗时: 4123
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 13
生产数据总耗时: 4123
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 14
生产数据总耗时: 4123
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 15
生产数据总耗时: 4123
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 16
生产数据总耗时: 4123
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 17
生产数据总耗时: 4123
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 18
生产数据总耗时: 4124
消费者1 ==> 当前线程:ConsumeMessageThread_1 ,quenuID: 2 ,content: 北京 Hello RocketMQ 19
生产数据总耗时: 4124
消费端 多线程消费服务2:
topic name: t_luo01
group:c_luo03
tags: TagA、TagC
消费的数据量:20条 且分别有序
消费者2 ==> 当前线程:ConsumeMessageThread_3 ,quenuID: 1 ,content: 北京 Hello RocketMQ 0
生产数据总耗时: 25239
消费者2 ==> 当前线程:ConsumeMessageThread_3 ,quenuID: 1 ,content: 北京 Hello RocketMQ 1
生产数据总耗时: 25239
消费者2 ==> 当前线程:ConsumeMessageThread_3 ,quenuID: 1 ,content: 北京 Hello RocketMQ 2
生产数据总耗时: 25239
消费者2 ==> 当前线程:ConsumeMessageThread_3 ,quenuID: 1 ,content: 北京 Hello RocketMQ 3
生产数据总耗时: 25239
消费者2 ==> 当前线程:ConsumeMessageThread_3 ,quenuID: 1 ,content: 北京 Hello RocketMQ 4
生产数据总耗时: 25239
消费者2 ==> 当前线程:ConsumeMessageThread_4 ,quenuID: 1 ,content: 北京 Hello RocketMQ 5
生产数据总耗时: 25242
消费者2 ==> 当前线程:ConsumeMessageThread_4 ,quenuID: 1 ,content: 北京 Hello RocketMQ 6
生产数据总耗时: 25242
消费者2 ==> 当前线程:ConsumeMessageThread_4 ,quenuID: 1 ,content: 北京 Hello RocketMQ 7
生产数据总耗时: 25242
消费者2 ==> 当前线程:ConsumeMessageThread_4 ,quenuID: 1 ,content: 北京 Hello RocketMQ 8
生产数据总耗时: 25242
消费者2 ==> 当前线程:ConsumeMessageThread_4 ,quenuID: 1 ,content: 北京 Hello RocketMQ 9
生产数据总耗时: 25243
消费者2 ==> 当前线程:ConsumeMessageThread_5 ,quenuID: 3 ,content: 北京 Hello RocketMQ 20
生产数据总耗时: 25496
消费者2 ==> 当前线程:ConsumeMessageThread_5 ,quenuID: 3 ,content: 北京 Hello RocketMQ 21
生产数据总耗时: 25496
消费者2 ==> 当前线程:ConsumeMessageThread_5 ,quenuID: 3 ,content: 北京 Hello RocketMQ 22
生产数据总耗时: 25496
消费者2 ==> 当前线程:ConsumeMessageThread_5 ,quenuID: 3 ,content: 北京 Hello RocketMQ 23
生产数据总耗时: 25496
消费者2 ==> 当前线程:ConsumeMessageThread_5 ,quenuID: 3 ,content: 北京 Hello RocketMQ 24
生产数据总耗时: 25496
消费者2 ==> 当前线程:ConsumeMessageThread_5 ,quenuID: 3 ,content: 北京 Hello RocketMQ 25
生产数据总耗时: 25497
消费者2 ==> 当前线程:ConsumeMessageThread_6 ,quenuID: 3 ,content: 北京 Hello RocketMQ 26
生产数据总耗时: 25501
消费者2 ==> 当前线程:ConsumeMessageThread_6 ,quenuID: 3 ,content: 北京 Hello RocketMQ 27
生产数据总耗时: 25502
消费者2 ==> 当前线程:ConsumeMessageThread_6 ,quenuID: 3 ,content: 北京 Hello RocketMQ 28
生产数据总耗时: 25502
消费者2 ==> 当前线程:ConsumeMessageThread_6 ,quenuID: 3 ,content: 北京 Hello RocketMQ 29
生产数据总耗时: 25502
- RocketMQ中同一个Topic 同一个的group订阅消费不同的tag场景:且保证有序
生产端:
topic name: t_luo01
tags: TagA、TagB、TagC
数据量:分别10条 总共30条
测试结论: 不同的消费端用同一个group 订阅不同的tag,后启动的消费端服务会覆盖前一个的的tag,会导致前一个消费客户端消费不到数据。所有建议消费端都尽量采用不同的group 规避这种问题的发生。
- RocketMQ提供了3种模式的Producer:
NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务)
NormalProducer模式调用传统的send方法,消息是无序的。
OrderProducer 模式调用Send方法中有一个MessageQueueSelector,将用于指定特定的消息发往特定的队列中。
TransactionProducer模式对数据状态要求严格一致性的场景,但性能有降低。 - Producer客户端发送消息的三种状态方式:
- 可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
相关代码测试:
说明:sendStatus=SLAVE_NOT_AVAILABLE
目前集群部署模式是 2s-2m-sync 双主双从+同步 如果代理的角色是SYNC_MASTER(默认是ASYNC_MASTER),但是没有配置 Slave Broker ,将获得这个状态。
其他返回状态说明:SEND_OK
SEND_OK 并不意味着它是可靠的。为了确保没有信息会丢失,应启用 SYNC_MASTER 或 SYNC_FLUSH。但性能降低10%左右。
其中:
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD634D2B2437C6DC372832620000, offsetMsgId=0A030E0B00002A9F00000000000626BC, messageQueue=MessageQueue [topic=PushTopic, brokerName=broker-a, queueId=1], queueOffset=51]
已发送到master节点 但slave配置是异步同步模式
同步发送中 还有同步有序发送消息到指定队列中:
主要依赖MessageQueueSelector方法实现,RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash 根据实际业务情况可灵活选择设计
具体代码实现和测试如下:
- 可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
相关代码测试:
由于本地开发连接虚拟机中的 nameServer 时要经过 Linux 系统的防火墙,而防火墙会有超时的机制,在网络连接长时间不传输数据时,会关闭这个 TCP
的会话,关闭后再读写,就会导致这个异常问题。
- 单向(Oneway)发送
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
相关代码测试:
说明:生产端无返回状态,消费端多线程可正常消费数据
测试结论三种方式比较:
如果希望在一个Producer JVM客户端中有多个生产者进行大规模数据处理,建议:
1.可以多个生产者一起使用异步发送
2.为每个生产者单独设置setInstanceName 做区分
代码如:producer.setInstanceName("TxProducer-instance1");
- RocketMQ消费端模式:
目前支持两种消费监听器模式MessageListenerConcurrently和MessageListenerOrderly。
其中MessageListenerOrderly 保证顺序消费,消费端接收的是同一个队列的消息,避免多线程时顺序错乱
代码实现和测试:
MessageListenerConcurrently 监听模式用来高并发无序消费数
代码实现和测试: - RocketMQ 事务型生产消费模式:
RocketMQ从4.3.0版本开始,支持了事务消息。
一般分布式采用2pc(2 phase commit)模式(两阶段提交协议:预留和确认),安全性高,但是因为长连接导致长时间等待。(当然也有三阶段提交)
而RocketMQ采用两阶段补偿型,TCC(Try-Confirm-Cancel)的简称。采用2PC的方案来提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
上图说明了事务消息的大致方案,分为两个逻辑:正常事务消息的发送及提交、事务消息的补偿流程
事务消息发送及提交:
- 发送消息(half消息)
- 服务端响应消息写入结果
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
Ack消息的3种状态
Broker是根据Producer发送过来的状态码,来决定下一步的操作TransactionStatus.CommitTransaction、TransactionStatus.RollbackTransaction、TransactionStatus.Unknown(提交、回滚、重试) - 事务型生产消费代码实现和测试:
生产者代码修改,需添加:
sendMessage变化:TransactionSendResult
producer变化: TransactionMQProducer
添加监听: TransactionListener
遇到问题:
transactionProducer 生产后无法消费
maven中缺少 rocketmq-all依赖导致
解决:引入下面的依赖
org.apache.rocketmq
rocketmq-all
4.5.2
生产事务数据代码和测试:
msg :hello 0
arg :tq
main ---> SendResult [sendStatus=SEND_OK, msgId=0A06CD6353172437C6DC3CCF29760000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=0], queueOffset=239]
msg :hello 1
arg :tq
这里处理业务逻辑,如操作数据库,失败情况下进行ROLLBACK
main ---> SendResult [sendStatus=SEND_OK, msgId=0A06CD6353172437C6DC3CCF299F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=1], queueOffset=240]
msg :hello 2
arg :tq
main ---> SendResult [sendStatus=SEND_OK, msgId=0A06CD6353172437C6DC3CCF29B10002, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=2], queueOffset=241]
Process finished with exit code 0
消费端代码和测试:和普通消费一样
transaction Consumer started...
ConsumeMessageThread_1,Receive: [MessageExt [queueId=0, storeSize=274, queueOffset=3, sysFlag=8, bornTimestamp=1576149809528, bornHost=/10.6.205.99:56034, storeTimestamp=1576149809565, storeHost=/10.2.100.33:10911, msgId=0A02642100002A9F000000000001573B, commitLogOffset=87867, bodyCRC=552077809, reconsumeTimes=0, preparedTransactionOffset=87585, toString()=Message{topic='TopicTransaction', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTransaction, MAX_OFFSET=4, KEYS=key, TRAN_MSG=true, CONSUME_START_TIME=1576149809578, UNIQ_KEY=0A06CD6353172437C6DC3CCF29760000, WAIT=true, PGROUP=transaction_producer, TAGS=transaction0, REAL_QID=0}, body=[104, 101, 108, 108, 111, 32, 48], transactionId='0A06CD6353172437C6DC3CCF29760000'}]]
收到消息: topic:TopicTransaction tags:transaction0 msg:hello 0
ConsumeMessageThread_2,Receive: [MessageExt [queueId=2, storeSize=274, queueOffset=6, sysFlag=8, bornTimestamp=1576149809585, bornHost=/10.6.205.99:56034, storeTimestamp=1576149809600, storeHost=/10.2.100.33:10911, msgId=0A02642100002A9F0000000000015B95, commitLogOffset=88981, bodyCRC=1323722973, reconsumeTimes=0, preparedTransactionOffset=88699, toString()=Message{topic='TopicTransaction', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTransaction, MAX_OFFSET=7, KEYS=key, TRAN_MSG=true, CONSUME_START_TIME=1576149809605, UNIQ_KEY=0A06CD6353172437C6DC3CCF29B10002, WAIT=true, PGROUP=transaction_producer, TAGS=transaction2, REAL_QID=2}, body=[104, 101, 108, 108, 111, 32, 50], transactionId='0A06CD6353172437C6DC3CCF29B10002'}]]
收到消息: topic:TopicTransaction tags:transaction2 msg:hello 2
本地事务代码:
非集群模式下,事务型生产消息代码和测试:
事务机制相关代码:
RocketMQLog:WARN Please initialize the logger system properly.
处理业务逻辑,如操作数据库,失败情况下进行事务ROLLBACK
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 0,Tag: TagA, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07F9800000, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=3], queueOffset=302]
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 1,Tag: TagB, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07F9B30001, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=0], queueOffset=303]
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 2,Tag: TagC, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07F9BB0002, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=1], queueOffset=304]
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 3,Tag: TagD, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07F9CB0003, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=2], queueOffset=305]
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 4,Tag: TagE, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07F9F60004, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=3], queueOffset=306]
处理业务逻辑,如操作数据库,失败情况下进行事务ROLLBACK
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 5,Tag: TagA, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07FA060005, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=0], queueOffset=307]
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 6,Tag: TagB, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07FA180006, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=1], queueOffset=308]
Thu Dec 12 20:25:32 CST 2019消息: 事务 message bj RocketMQ 7,Tag: TagC, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07FA280007, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=2], queueOffset=309]
Thu Dec 12 20:25:33 CST 2019消息: 事务 message bj RocketMQ 8,Tag: TagD, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07FA380008, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=3], queueOffset=310]
Thu Dec 12 20:25:33 CST 2019消息: 事务 message bj RocketMQ 9,Tag: TagE, SendResult [sendStatus=SEND_OK, msgId=0A06CD6354502437C6DC3D07FA4A0009, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-a, queueId=0], queueOffset=311]
Process finished with exit code 0
非集群模式下,消费端代码和测试:
transaction Consumer Started...
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_4,storeHost: /10.2.100.33:10911,Tag: TagB,quenuID: 0 ,content: 事务 message bj RocketMQ 1
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_2,storeHost: /10.2.100.33:10911,Tag: TagD,quenuID: 2 ,content: 事务 message bj RocketMQ 3
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_1,storeHost: /10.2.100.33:10911,Tag: TagC,quenuID: 1 ,content: 事务 message bj RocketMQ 2
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_3,storeHost: /10.2.100.33:10911,Tag: TagE,quenuID: 3 ,content: 事务 message bj RocketMQ 4
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_4,storeHost: /10.2.100.33:10911,Tag: TagE,quenuID: 0 ,content: 事务 message bj RocketMQ 9
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_2,storeHost: /10.2.100.33:10911,Tag: TagC,quenuID: 2 ,content: 事务 message bj RocketMQ 7
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_3,storeHost: /10.2.100.33:10911,Tag: TagD,quenuID: 3 ,content: 事务 message bj RocketMQ 8
Thu Dec 12 20:26:09 CST 2019 消费者 ==> 当前线程: ConsumeMessageThread_1,storeHost: /10.2.100.33:10911,Tag: TagB,quenuID: 1 ,content: 事务 message bj RocketMQ 6
Process finished with exit code 130
结果分析说明:
- RocetkMQ生产端本地事务未提交的事务数据,这些对消费端是不可见的。
其中TagA的数据未提交,在消费端中的数据只消费到了TagA以外的其他所有数据。 - 发现事务型消息消费端 有30s的延迟消费情况。
- 消费端要能成功消费 必须 sendStatus=SEND_OK,否则消费不到数据。(集群模式会测试到)
- 事务型消息衍生出来要考虑的问题:
- 这在监控界面会看到生产和消费的数据数量不一致,那不一致是未提交事务的影响,还是消费延迟阻塞的影响。会对监控者 造成干扰,需要考虑区分。
- 事务中未提交的 ROLLBACK_MESSAGE 数据,如何回溯和补偿问题。需要业务端技术设计和代码实现。
- RocketMQ 3.2.6 版本前官方社区版本事务消息的补偿机制流程:
(1)对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2)Producer收到回查消息,检查回查消息对应的本地事务的状态
(3)根据本地事务状态,重新Commit或者Rollback
补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
因非商业RocketMQ 3.2.6版本后取消回查机制:
//事务回查最大数
producer.setCheckRequestHoldMax(200);
//事务最大并发数
producer.setCheckThreadPoolMaxSize(20);
//事务最小并发数
producer.setCheckThreadPoolMinSize(5);
producer.setExecutorService(executorService);
等都已经过期没有作用,本来是用于开启多线程进行回查用。
测试结论:3.2.6 版本后非社区的Rocketmq取消了事务回查机制,如果丢失需要自己手动通过key值回查补偿。(可先在代码中实现代码retry机制,然后在回溯补偿机制)
集群模式生产消费:
集群环境:
RocketMQ NAMESRV_ADDR=10.3.14.11:9876;10.3.14.12:9876;10.3.14.241:9876
- 非事务型有序生产数据代码和测试:
RocketMQLog:WARN Please initialize the logger system properly.
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 0
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 1
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 2
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 3
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 4
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 5
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 6
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 7
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 8
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 9
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 10
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 11
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 12
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 13
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 14
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 15
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 16
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 17
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 18
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 19
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 20
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 21
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 22
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 23
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 24
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 25
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 26
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 27
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 28
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 29
Process finished with exit code 0
多线程指定Tag顺序消费:
consumer.subscribe("t_luo01", "TagA||TagC");
测试结果问题:
- 发现集群模式,消费数据时 只从storeHost: /10.3.14.11:10911 所在master节点push数据。(数据均衡问题?)
- 监控界面分析:
集群模式生产消费 发现数据的生产和消费也都在一个master-a 上。
监控工具问题: master-a stop后,在start 发现broker-a 上的监控数据都归零。 继续生产消费发现如下异常问题:生产数据>消费数量
- 集群模式下生产消费 同一个topic生产过程中,增加Tag 消费情况:
- 集群模式生产消费HA:
master节点生产消费HA:非事务生产数据
RocketMQLog:WARN Please initialize the logger system properly.
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 0
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 1
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 2
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 3
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 4
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 5
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 6
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 7
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 8
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 9
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 10
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 11
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 12
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 13
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 14
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 15
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 16
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 17
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 18
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 19
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 20
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 21
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 22
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 23
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 24
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 25
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 26
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 27
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 28
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 29
Process finished with exit code 0
预先集群模式下给topic:t_luo01 成功发送30条数据:
stop broker-a master 节点服务:
集群模式: 生产端对topic: t_luo01 发送数据,客户端报以下异常
启动消费端看是否能从slave节点消费到之前已经发送的数据
结果:没有从slave节点消费到这30条数据! 需要排查分析原因!!!
推断可能和 发送返回状态:SLAVE_NOT_AVAILABLE 状态有关。Slave节点没生效。
新建topic 对新topic 生产端 发送数据:
RocketMQLog:WARN Please initialize the logger system properly.
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 0
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 1
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 2
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 3
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 4
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 5
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 6
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 7
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 8
发送状态:SLAVE_NOT_AVAILABLE,tag: TagA,队列ID: 1,消息体: 北京 Hello RocketMQ 9
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 10
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 11
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 12
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 13
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 14
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 15
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 16
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 17
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 18
发送状态:SLAVE_NOT_AVAILABLE,tag: TagB,队列ID: 2,消息体: 北京 Hello RocketMQ 19
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 20
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 21
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 22
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 23
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 24
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 25
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 26
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 27
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 28
发送状态:SLAVE_NOT_AVAILABLE,tag: TagC,队列ID: 3,消息体: 北京 Hello RocketMQ 29
Process finished with exit code 0
测试结果: 可正常生产30条数据
新创建topic 消费数据:
Consumer Started.
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 20
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 21
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 22
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 23
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 24
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 25
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 26
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 27
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 28
消费者2 ==> 当前线程:ConsumeMessageThread_1,storeHost: /10.3.14.237:10911,Tag: TagC ,quenuID: 3 ,content: 北京 Hello RocketMQ 29
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 10
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 11
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 12
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 13
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 14
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 15
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 16
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 17
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 18
消费者2 ==> 当前线程:ConsumeMessageThread_2,storeHost: /10.3.14.237:10911,Tag: TagB ,quenuID: 2 ,content: 北京 Hello RocketMQ 19
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 0
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 1
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 2
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 3
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 4
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 5
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 6
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 7
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 8
消费者2 ==> 当前线程:ConsumeMessageThread_3,storeHost: /10.3.14.237:10911,Tag: TagA ,quenuID: 1 ,content: 北京 Hello RocketMQ 9
测试结果:可正常消费到30条数据。
集群Name Server高可用测试:
Name Server做的是Rocket的寻址服务, 用于将Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对那些Broker做链接。
Name Server是一个几乎无状态的节点, Name Server之间采用Share-Nothing的设计, 互不通信。
对于一个Name Server集群列表, 客户端链接Name Server的时候会随机选择一个节点, 以做到负载均衡。
Stop 10.3.14.11:9876的Name Server服务:
监控页面的Name Server 10.3.14.11:9876 更新、刷新后还在页面,应该是监控工具的BUG
如果生产客户端只连接了一个服务异常的Name Server,就会发生异常。不能send消息。
但消费端只连接了一个服务异常的Name Server的话,不会包报异常
如果客户端代码连接所有Name Server的话,可正常生产数据。
但连接了一个异常Name Server的消费客户端,数据无法正常消费到,也不会有异常:
集群消费客户端修改为连接所有的Name Server 可正常消费到上面生产的30条数据:
测试结论:
Name Server服务 相互之间无自动发现,且都无状态。为了客户端生产消费高可用建议代码中写入全部Name Server list所有服务。并且集群部署时Name Server服务尽量最好部署3个以上 保障高可用。
- 集群事务型消息生产代码和测试:
RocketMQLog:WARN Please initialize the logger system properly.
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 0,Tag: TagA, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3210000, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=2], queueOffset=44]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 1,Tag: TagB, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3400001, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=3], queueOffset=45]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 2,Tag: TagC, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3470002, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=0], queueOffset=46]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 3,Tag: TagD, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C34E0003, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=1], queueOffset=47]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 4,Tag: TagE, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3550004, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=2], queueOffset=48]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 5,Tag: TagA, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C35B0005, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=3], queueOffset=49]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 6,Tag: TagB, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3620006, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=0], queueOffset=50]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 7,Tag: TagC, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3690007, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=1], queueOffset=51]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 8,Tag: TagD, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3700008, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=2], queueOffset=52]
Fri Dec 13 16:46:28 CST 2019消息: 事务 message bj RocketMQ 9,Tag: TagE, SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=0A06CD6359492437C6DC4165C3760009, offsetMsgId=null, messageQueue=MessageQueue [topic=tx_t002, brokerName=broker-b, queueId=3], queueOffset=53]
Process finished with exit code 0
集群模式消费端消费事务消息,没有消费到 也没有异常报错。
测试结论:
应该和集群主从模式配置有关,需要排查分析。
估计和发送消息返回状态sendStatus=SLAVE_NOT_AVAILABLE 有关。
5. RocketMQ生产端发送消息返回的4中状态:
SEND_OK, //状态成功,无论同步还是存储
FLUSH_DISK_TIMEOUT, // broker刷盘策略为同步刷盘(SYNC_FLUSH)的话时候,等待刷盘的时候超时
FLUSH_SLAVE_TIMEOUT, // master role采取同步复制策略(SYNC_MASTER)的时候,消息尝试同步到slave超时
SLAVE_NOT_AVAILABLE, //slave不可用
RocketMQ集群主从服务架构HA
- 目前社区版本存在问题
目前RocketMQ集群主从模式,只要rocketmq master服务异常或服务器宕机,就会导致主从数据不一致。不管同步刷盘,还是异步刷盘都可能出现这种问题。
异常场景:假如master服务宕机,消息不会往savle上写,这时候如果是异步刷盘,可能会导致,master,slave得数据不一直,salve可能会多比master多。当这个时候重启master,消费者会拿slave得消费进度去消费master得队列。导致出问题。
存在问题:
1.如何从新master上拉去消费进度
2.主从数据一致性怎么保证?这时候savle得数据比master数据多。数据不一致,如果继续往master上写数据,会不会有问题?
解决问题: - 如何保障主从自动切换,且切换后服务依然可用(集群、生产、消费)
- 切换后,主从数据能保证一致性
- 社区版本主从自动切换技术方案:
在 RocketMQ 4.5 版本之前,RocketMQ 只有 Master/Slave 一种部署方式,虽然这种模式可以提供一定的高可用性但也存在比较大的缺陷。为了实现新的高可用多副本架构,RockeMQ 最终选用了基于 Raft 协议的 commitlog 存储库 DLedger。
基于 DLedger 的可以自动容灾切换的 RocketMQ 集群
RocketMQ集群2m-2s-sync部署模式架构原理图:
在 RocketMQ 4.5 版本之前,RocketMQ 只有 Master/Slave 一种部署方式,一组 Broker 中有一个 Master,有零到多个 Slave,Slave 通过同步复制或异步复制方式去同步 Master 的数据。Master/Slave 部署模式,提供了一定的高可用性。
但这样的部署模式有一定缺陷。比如故障转移方面,如果主节点挂了还需要人为手动的进行重启或者切换,无法自动将一个从节点转换为主节点。因此,我们希望能有一个新的多副本架构,去解决这个问题。
新的多副本架构首先需要解决自动故障转移的问题,本质上来说是自动选主的问题。这个问题的解决方案基本可以分为两种:
利用第三方协调服务集群完成选主,比如 zookeeper 或者 etcd。这种方案会引入了重量级外部组件,加重部署,运维和故障诊断成本,比如在维护 RocketMQ 集群还需要维护 zookeeper 集群,并且 zookeeper 集群故障会影响到 RocketMQ 集群。
常用的分布式事务与一致性(选举)算法Paxos & raft & zab,其中zookeeper就用的zab原子广播协议,其实ZAB 也是在 Paxos 算法基础上进行扩展而来的。
利用 raft 协议来完成一个自动选主,raft 协议相比前者的优点,是它不需要引入外部组件,自动选主逻辑集成到各个节点的进程中,节点之间通过通信就可以完成选主。
因此最后选择用 raft 协议来解决这个问题,而 DLedger 就是一个基于 raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。
DLedger 的其中一个应用就是在分布式消息系统中,RocketMQ 4.5 版本发布后,可以采用 RocketMQ on DLedger 方式进行部署。
DLedger commitlog 代替了原来的 commitlog,使得 commitlog 拥有了选举复制能力,然后通过角色透传的方式,raft 角色透传给外部 broker 角色,leader 对应原来的 Master,follower 和 candidate 对应原来的 Slave。
因此 RocketMQ 的 broker 拥有了自动故障转移的能力,如果在一组 broker 中,一个 Master 挂了以后,依靠 DLedger 自动选主的能力,会重新选出 leader,然后通过角色透传变成新的 Master。
目前没有集成DLedger,待部署
RocketMQ技术遗留和补充问题
- RocketMQ多主多从模式 从节点没有数据问题:
RocketMQ中broker配置brokcerIP1和brokerIP2的作用
brokerIP1 当前broker监听的IP
brokerIP2 存在broker主从时,在broker主节点上配置了brokerIP2的话,broker从节点会连接主节点配置的brokerIP2来同步。 - 默认不配置brokerIP1和brokerIP2时,都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP,结果默认选择的走公网IP,这是不正确的,我们期望的是所有业务内部通信都走内网。
目前在阿里云服务器中就遇到了这个问题,没有配置brokerIP2,导致默认的公网IP,slave同步找不到,数据无法同步到过去。Slave节点始终没有数据。 - 需要先配置和创建相关数据和元数据的存储地址:如下
#存储路径
storePathRootDir=/data/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/data/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/data/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/data/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/data/rocketMQ-2m2s/store-s/abort
Master目录设置:
mkdir -p /data/rocketMQ-2m2s/store
mkdir -p /data/rocketMQ-2m2s/store/commitlog
mkdir -p /data/rocketMQ-2m2s/store/consumequeue
mkdir -p /data/rocketMQ-2m2s/store/index
mkdir -p /data/rocketMQ-2m2s/store/checkpoint
mkdir -p /data/rocketMQ-2m2s/store/abort
Slave目录设置:
mkdir -p /data/rocketMQ-2m2s/store-s
mkdir -p /data/rocketMQ-2m2s/store-s/commitlog
mkdir -p /data/rocketMQ-2m2s/store-s/consumequeue
mkdir -p /data/rocketMQ-2m2s/store-s/index
mkdir -p /data/rocketMQ-2m2s/store-s/checkpoint
mkdir -p /data/rocketMQ-2m2s/store-s/abort
- 集群模式 生产端SLAVE_NOT_AVAILABLE问题
send数据 成功后,但返回的结果状态是:sendStatus=SLAVE_NOT_AVAILABLE,
主要原因:
-
- Slave 没有同步到数据
2.brokerRole=SYNC_MASTER之前为了提高性能采用ASYNC_MASTER机制,改成同步机制sync_master后返回状态是sendStatus=SEND_OK。 但其实SEND_OK并不一定代表着正常。
- Slave 没有同步到数据
- 同一个Topic中多个队列在一个master borker上,不能多master并发和生产的HA保障
原因:
1.生产是默认的4个队列,没有路由到多个master上和多个队列级别的备份
2.rocketmq源码中默认在一个broker上路由4个或4的倍数的队列数,所有在创建队列数的时候需要注意这块问题
- 集群Master节点都异常不可用后,slave能正常支持消费端消费数据
- RocketMQ消息堆积对内存占用过大,消费端负载和性能问题(读写分离问题)
RocketMQ的读写分离机制又跟上述描写的不太一致,RocketMQ 有属于自己的一套读写分离逻辑,它会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。
slaveReadEnable 值来决定是否可以从从服务器拉取消息:
如果slaveReadEnable=true,并且堆积量已经超过物理内存 40%时,则建议从从服务器拉取消息,否则还是从主服务器拉取消息;
如果 slaveReadEnable=false,则消息者只能从主服务器中拉取消息。
- RocketMQ和spring集成方案:
https://github.com/apache/rocketmq-spring
Apache RocketMQ Spring Integration https://rocketmq.apache.org/ - benchmark集群压测和服务JVM heap问题
目前测试集群 资源太小 benchmark 压测,比较吃力。
./producer.sh -t zs-cluster-001 -w 20 -s 1024 -n 10.3.14.11:9876;10.3.14.12:9876;10.3.14.241:9876
20个线程并发 +每条1kb消息 一台机器 已经卡死。
nameserver 服务本身官方默认jvm 初始值 都给的8g 我们服务器资源太小 默认起不来 直接给了2g 但并发性能 肯定会影响,并发压测 也会受限。主要在runserver.sh中
broker节点服务的 jvm heap 也是一样。在runbroker.sh脚本中
看脚本里官方默认都用的G1 垃圾回收器,说明是期望在大堆栈环境中运行的,要不G1不一定比CMS 表现好。