springboot整合-ActivityMq-某个相同的消费者进行集群时的处理方法
-
业务场景:
业务系统A(生产者)
业务系统B(消费者,集群)
业务系统C(消费者,集群)
目前业务需求:
业务系统A发送一条消息
业务系统B中只有一台服务器能够接收,其他集群对象接收不到
业务系统C中只有一台服务器能够接收,其他集群对象接收不到
-
问题
使用队列模式:只会有一台服务器接收到消息,如果是cosumerA1消费了消息,则consumerA2,consumerB1,consumerB2都消费不了
使用订阅模式:4个消费者都会消费到消息 -
解决方案:
1.使用虚拟主题模式
虚拟主题类似于1对多的分支功能+消费端的cluster+failover,可以满足上述需求
2.pom引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
3.MQ配置
@Bean
public RedeliveryPolicy redeliveryPolicy() {
//配置重试机制
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次 这里设置为10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setRedeliveryDelay(2000);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory factory(String url, RedeliveryPolicy redeliveryPolicy) {
//url = tcp://127.0.0.1:61616
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", url);
factory.setRedeliveryPolicy(redeliveryPolicy);
return factory;
}
@Bean
JmsListenerContainerFactory<?> myJmsContainerFactory(ActiveMQConnectionFactory factory) {
DefaultJmsListenerContainerFactory listennerFactory = new DefaultJmsListenerContainerFactory();
listennerFactory.setConnectionFactory(factory);
listennerFactory.setConcurrency("1-10");
//重连间隔时间
listennerFactory.setRecoveryInterval(5000L);
listennerFactory.setSessionAcknowledgeMode(4);
return listennerFactory;
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory) {
JmsTemplate jmsTemplate = new JmsTemplate();
//设置持久化,1 非持久, 2 持久化
jmsTemplate.setDeliveryMode(2);
jmsTemplate.setConnectionFactory(factory);
//消息确认模式
jmsTemplate.setSessionAcknowledgeMode(4);
return jmsTemplate;
}
4.producer配置
@Resource
private JmsTemplate jmsTemplate;
public void publish(String message) {
Destination destination = new ActiveMQTopic("VirtualTopic.TEST");
System.out.println("============>>>>> 发布topic消息 " + message);
HashMap<String, Object> header = Maps.newHashMap();
jmsTemplate.convertAndSend(destination, message);
}
5.consumer配置
@JmsListener(destination = "Consumer.A.VirtualTopic.TEST", containerFactory = "myJmsContainerFactory", subscription = "1")
public void subscribe(final TextMessage text, Session session) throws JMSException {
try {
System.out.println("===========<<<<<<<<收到订阅的消息A:" + text.getText());
text.acknowledge();
} catch (Throwable e) {
session.recover();
}
}
@JmsListener(destination = "Consumer.A.VirtualTopic.TEST", containerFactory = "myJmsContainerFactory", subscription = "1")
public void subscribe2(final TextMessage text, Session session) throws JMSException {
try {
System.out.println("===========<<<<<<<<收到订阅的消息A1:" + text.getText());
text.acknowledge();
} catch (Throwable e) {
session.recover();
}
}
@JmsListener(destination = "Consumer.B.VirtualTopic.TEST", containerFactory = "myJmsContainerFactory", subscription = "1")
public void subscribe1(final TextMessage text, Session session) throws JMSException {
try {
System.out.println("===========<<<<<<<<收到订阅的消息B:" + text.getText());
// 此处抛异常,用于测试重试机制
// throw new RuntimeException("测试异常");
} catch (Throwable e) {
System.out.println(e.getMessage());
session.recover();
}
}
如上述,即可满足需求
参考文档:
1.https://blog.****.net/kimmking/article/details/9773085