SpringBoot 2.1.3 整合 RocketMQ 4.4.0完整教程
rocketMq和activeMq,rabbitMQ,kafka为目前主流的消息中间件,其中rocketMq是阿里巴巴公司开源的一个消息中间件,现已贡献给apache组织。apache官网描述:Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性,万亿级容量和灵活的可扩展性。它由四部分组成:名称服务器,代理,生产者和消费者。它们中的每一个都可以水平扩展而没有单一的故障点。
今天我们将讨论springboot如何整合rocketmq。
1、安装RocketMQ,参见博客:https://blog.****.net/weixin_42315600/article/details/88677674
2、引入maven依赖
<!--RocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<!--PropertySource注解解析properties文件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
3、Resource目录下常见config文件夹,创建rocketmq.properties配置文件
# 指定namesrv地址
sbootitframe.rocketmq.namesrvAddr=localhost:9876
#生产者group名称
sbootitframe.rocketmq.producerGroupName=user_group
#事务生产者group名称
sbootitframe.rocketmq.transactionProducerGroupName=order_transaction
#消费者group名称
sbootitframe.rocketmq.consumerGroupName=user_consumer_group
#生产者实例名称
sbootitframe.rocketmq.producerInstanceName=user_producer_instance
#消费者实例名称
sbootitframe.rocketmq.consumerInstanceName=user_consumer_instance
#事务生产者实例名称
sbootitframe.rocketmq.producerTranInstanceName=user_producer_transacition
#一次最大消费多少数量消息
sbootitframe.rocketmq.consumerBatchMaxSize=1
#广播消费
sbootitframe.rocketmq.consumerBroadcasting=false
#消费的topic:tag
sbootitframe.rocketmq.subscribe[0]=user-topic:white
#启动的时候是否消费历史记录
sbootitframe.rocketmq.enableHistoryConsumer=false
#启动顺序消费
sbootitframe.rocketmq.enableOrderConsumer=false
4、创建配置文件实例化bean,把rocketmq.properties读取到bean中
package com.cmos.sbootitframe.web.controller.mq;
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* @Function 读取配置文件信息
*/
@Component
@Configuration
@PropertySource("classpath:config/rocketmq.properties")
@ConfigurationProperties(prefix = "sbootitframe.rocketmq")
public class RocketMQProperties {
private String namesrvAddr;
private String producerGroupName;
private String transactionProducerGroupName;
private String consumerGroupName;
private String producerInstanceName;
private String consumerInstanceName;
private String producerTranInstanceName;
private int consumerBatchMaxSize;
private boolean consumerBroadcasting;
private boolean enableHistoryConsumer;
private boolean enableOrderConsumer;
private List<String> subscribe = new ArrayList<String>();
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getProducerGroupName() {
return producerGroupName;
}
public void setProducerGroupName(String producerGroupName) {
this.producerGroupName = producerGroupName;
}
public String getTransactionProducerGroupName() {
return transactionProducerGroupName;
}
public void setTransactionProducerGroupName(String transactionProducerGroupName) {
this.transactionProducerGroupName = transactionProducerGroupName;
}
public String getConsumerGroupName() {
return consumerGroupName;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}
public String getProducerInstanceName() {
return producerInstanceName;
}
public void setProducerInstanceName(String producerInstanceName) {
this.producerInstanceName = producerInstanceName;
}
public String getConsumerInstanceName() {
return consumerInstanceName;
}
public void setConsumerInstanceName(String consumerInstanceName) {
this.consumerInstanceName = consumerInstanceName;
}
public String getProducerTranInstanceName() {
return producerTranInstanceName;
}
public void setProducerTranInstanceName(String producerTranInstanceName) {
this.producerTranInstanceName = producerTranInstanceName;
}
public int getConsumerBatchMaxSize() {
return consumerBatchMaxSize;
}
public void setConsumerBatchMaxSize(int consumerBatchMaxSize) {
this.consumerBatchMaxSize = consumerBatchMaxSize;
}
public boolean isConsumerBroadcasting() {
return consumerBroadcasting;
}
public void setConsumerBroadcasting(boolean consumerBroadcasting) {
this.consumerBroadcasting = consumerBroadcasting;
}
public boolean isEnableHistoryConsumer() {
return enableHistoryConsumer;
}
public void setEnableHistoryConsumer(boolean enableHistoryConsumer) {
this.enableHistoryConsumer = enableHistoryConsumer;
}
public boolean isEnableOrderConsumer() {
return enableOrderConsumer;
}
public void setEnableOrderConsumer(boolean enableOrderConsumer) {
this.enableOrderConsumer = enableOrderConsumer;
}
public List<String> getSubscribe() {
return subscribe;
}
public void setSubscribe(List<String> subscribe) {
this.subscribe = subscribe;
}
}
5、创建对象监听器
package com.cmos.sbootitframe.web.controller.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.context.ApplicationEvent;
import java.util.List;
/**
* 监听对象
*/
public class MessageEvent extends ApplicationEvent {
private static final long serialVersionUID = -4468405250074063206L;
private DefaultMQPushConsumer consumer;
private List<MessageExt> msgs;
public MessageEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
super(msgs);
this.consumer = consumer;
this.setMsgs(msgs);
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public List<MessageExt> getMsgs() {
return msgs;
}
public void setMsgs(List<MessageExt> msgs) {
this.msgs = msgs;
}
}
6、初始化生产者和消费者实例
package com.cmos.sbootitframe.web.controller.mq;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Function 通过使用指定的文件读取类 来加载配置文件到字段中
*/
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class RocketMQConfiguration {
private static final Logger logger = LoggerFactory.getLogger(RocketMQConfiguration.class);
@Autowired
private RocketMQProperties rocketMQProperties;
//事件监听
@Autowired
private ApplicationEventPublisher publisher = null;
private static boolean isFirstSub = true;
private static long startTime = System.currentTimeMillis();
/**
* 容器初始化的时候 打印参数
*/
@PostConstruct
public void init() {
System.err.println(rocketMQProperties.getNamesrvAddr());
System.err.println(rocketMQProperties.getProducerGroupName());
System.err.println(rocketMQProperties.getConsumerBatchMaxSize());
System.err.println(rocketMQProperties.getConsumerGroupName());
System.err.println(rocketMQProperties.getConsumerInstanceName());
System.err.println(rocketMQProperties.getProducerInstanceName());
System.err.println(rocketMQProperties.getProducerTranInstanceName());
System.err.println(rocketMQProperties.getTransactionProducerGroupName());
System.err.println(rocketMQProperties.isConsumerBroadcasting());
System.err.println(rocketMQProperties.isEnableHistoryConsumer());
System.err.println(rocketMQProperties.isEnableOrderConsumer());
System.out.println(rocketMQProperties.getSubscribe().get(0));
}
/**
* 创建普通消息发送者实例
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQProducer defaultProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(
rocketMQProperties.getProducerGroupName());
producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
producer.setInstanceName(rocketMQProperties.getProducerInstanceName());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
logger.info("rocketmq producer server is starting....");
return producer;
}
/**
* 创建支持消息事务发送的实例
* @return
* @throws MQClientException
*/
@Bean
public TransactionMQProducer transactionProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(
rocketMQProperties.getTransactionProducerGroupName());
producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
producer.setInstanceName(rocketMQProperties
.getProducerTranInstanceName());
producer.setRetryTimesWhenSendAsyncFailed(10);
// 事务回查最小并发数
producer.setCheckThreadPoolMinSize(2);
// 事务回查最大并发数
producer.setCheckThreadPoolMaxSize(2);
// 队列数
producer.setCheckRequestHoldMax(2000);
producer.start();
logger.info("rocketmq transaction producer server is starting....");
return producer;
}
/**
* 创建消息消费的实例
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQPushConsumer pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
rocketMQProperties.getConsumerGroupName());
consumer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
consumer.setInstanceName(rocketMQProperties.getConsumerInstanceName());
//判断是否是广播模式
if (rocketMQProperties.isConsumerBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
//设置批量消费
consumer.setConsumeMessageBatchMaxSize(rocketMQProperties
.getConsumerBatchMaxSize() == 0 ? 1 : rocketMQProperties
.getConsumerBatchMaxSize());
//获取topic和tag
List<String> subscribeList = rocketMQProperties.getSubscribe();
for (String sunscribe : subscribeList) {
consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);
}
// 顺序消费
if (rocketMQProperties.isEnableOrderConsumer()) {
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
context.setAutoCommit(true);
msgs = filterMessage(msgs);
if (msgs.size() == 0)
return ConsumeOrderlyStatus.SUCCESS;
publisher.publishEvent(new MessageEvent(msgs, consumer));
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
} else {
// 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
//过滤消息
msgs = filterMessage(msgs);
if (msgs.size() == 0)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
publisher.publishEvent(new MessageEvent(msgs, consumer));
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
try {
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
logger.info("rocketmq consumer server is starting....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
return consumer;
}
/**
* 消息过滤
* @param msgs
* @return
*/
private List<MessageExt> filterMessage(List<MessageExt> msgs) {
if (isFirstSub && !rocketMQProperties.isEnableHistoryConsumer()) {
msgs = msgs.stream()
.filter(item -> startTime - item.getBornTimestamp() < 0)
.collect(Collectors.toList());
}
if (isFirstSub && msgs.size() > 0) {
isFirstSub = false;
}
return msgs;
}
}
7、为了方便演示消息发送和消费,我们创建一个简单的User类,作为发送和消费信息的载体
package com.cmos.sbootitframe.beans.dto;
import java.io.Serializable;
public class User implements Serializable {
private String id;
private String userName;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
8、创建MQ消息生产者
package com.cmos.sbootitframe.web.controller.mq;
import com.cmos.sbootitframe.beans.dto.User;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import java.util.List;
@RestController
public class ProducerController {
@Autowired
private DefaultMQProducer defaultProducer;
@Autowired
private TransactionMQProducer transactionProducer;
/**
* 发送普通消息
*/
@GetMapping("/sendMessage")
public void sendMsg() {
for(int i=0;i<100;i++){
User user = new User();
user.setId(String.valueOf(i));
user.setUserName("jhp"+i);
String json = JSON.toJSONString(user);
Message msg = new Message("user-topic","white",json.getBytes());
try {
SendResult sendResult = defaultProducer.send(msg);
System.out.println("消息id:"+sendResult.getMsgId()+":"+","+"发送状态:"+sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 发送事务消息
* @return
*/
@GetMapping("/sendTransactionMess")
public String sendTransactionMsg() {
SendResult sendResult = null;
try {
// a,b,c三个值对应三个不同的状态
String ms = "c";
Message msg = new Message("user-topic","white",ms.getBytes());
// 发送事务消息
sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1, Object arg) -> {
String value = "";
if (arg instanceof String) {
value = (String) arg;
}
if (value == "") {
throw new RuntimeException("发送消息不能为空...");
} else if (value =="a") {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (value =="b") {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}, 4);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
}
return sendResult.toString();
}
/**
* 支持顺序发送消息
*/
@GetMapping("/sendMessOrder")
public void sendMsgOrder() {
for(int i=0;i<100;i++) {
User user = new User();
user.setId(String.valueOf(i));
user.setUserName("jhp" + i);
String json = JSON.toJSONString(user);
Message msg = new Message("user-topic", "white", json.getBytes());
try{
SendResult sendResult = defaultProducer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = ((Integer) arg) % mqs.size();
return mqs.get(index);
}
},i);
System.out.println("消息id:"+sendResult.getMsgId()+":"+","+"发送状态:"+sendResult.getSendStatus());
} catch (Exception e){
e.printStackTrace();
}
}
}
}
9、创建MQ消息消费者
package com.cmos.sbootitframe.mqconsumer.mq;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* 监听消息进行消费
*/
@Component
public class ConsumerService {
@EventListener(condition = "#event.msgs[0].topic=='user-topic' && #event.msgs[0].tags=='white'")
public void rocketmqMsgListener(MessageEvent event) {
try {
List<MessageExt> msgs = event.getMsgs();
for (MessageExt msg : msgs) {
System.err.println("消费消息:"+new String(msg.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
10、启动RocketMQ,参见博客:https://blog.****.net/weixin_42315600/article/details/88677674 窗口不要关闭
11、运行springboot项目进行测试,看运行日志。至此springboot整合rocketmq已经完。我的demo是producer和consumer分层,以下为我的运行日志。