springboot整合-ActivityMq-某个相同的消费者进行集群时的处理方法

  • 业务场景:
    业务系统A(生产者)
    业务系统B(消费者,集群)
    业务系统C(消费者,集群)
    目前业务需求:
    业务系统A发送一条消息
    业务系统B中只有一台服务器能够接收,其他集群对象接收不到
    业务系统C中只有一台服务器能够接收,其他集群对象接收不到

springboot整合-ActivityMq-某个相同的消费者进行集群时的处理方法

  • 问题
    使用队列模式:只会有一台服务器接收到消息,如果是cosumerA1消费了消息,则consumerA2,consumerB1,consumerB2都消费不了
    使用订阅模式:4个消费者都会消费到消息
  • 解决方案:
    1.使用虚拟主题模式
    虚拟主题类似于1对多的分支功能+消费端的cluster+failover,可以满足上述需求
    2.pom引入
 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.7.0</version>
        </dependency>

3.MQ配置

 	@Bean
    public RedeliveryPolicy redeliveryPolicy() {
    	//配置重试机制
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        //是否在每次尝试重新发送失败后,增长这个等待时间
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重发次数,默认为6次   这里设置为10次
        redeliveryPolicy.setMaximumRedeliveries(10);
        //重发时间间隔,默认为1秒
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setRedeliveryDelay(2000);
        //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    @Bean
    public ActiveMQConnectionFactory factory(String url, RedeliveryPolicy redeliveryPolicy) {
    	//url = tcp://127.0.0.1:61616
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", url);
        factory.setRedeliveryPolicy(redeliveryPolicy);
        return factory;
    }


    @Bean
    JmsListenerContainerFactory<?> myJmsContainerFactory(ActiveMQConnectionFactory factory) {
        DefaultJmsListenerContainerFactory listennerFactory = new DefaultJmsListenerContainerFactory();
        listennerFactory.setConnectionFactory(factory);
        listennerFactory.setConcurrency("1-10");
        //重连间隔时间
        listennerFactory.setRecoveryInterval(5000L);
        listennerFactory.setSessionAcknowledgeMode(4);
        return listennerFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        //设置持久化,1 非持久, 2 持久化
        jmsTemplate.setDeliveryMode(2);
        jmsTemplate.setConnectionFactory(factory);
        //消息确认模式
        jmsTemplate.setSessionAcknowledgeMode(4);
        return jmsTemplate;
    }

4.producer配置

    @Resource
    private JmsTemplate jmsTemplate;
    public void publish(String message) {
        Destination destination = new ActiveMQTopic("VirtualTopic.TEST");
        System.out.println("============>>>>> 发布topic消息 " + message);
        HashMap<String, Object> header = Maps.newHashMap();
        jmsTemplate.convertAndSend(destination, message);
    }

5.consumer配置

@JmsListener(destination = "Consumer.A.VirtualTopic.TEST", containerFactory = "myJmsContainerFactory", subscription = "1")
    public void subscribe(final TextMessage text, Session session) throws JMSException {
        try {
            System.out.println("===========<<<<<<<<收到订阅的消息A:" + text.getText());
            text.acknowledge();
        } catch (Throwable e) {
            session.recover();
        }
    }

    @JmsListener(destination = "Consumer.A.VirtualTopic.TEST", containerFactory = "myJmsContainerFactory", subscription = "1")
    public void subscribe2(final TextMessage text, Session session) throws JMSException {
        try {
            System.out.println("===========<<<<<<<<收到订阅的消息A1:" + text.getText());
            text.acknowledge();
        } catch (Throwable e) {
            session.recover();
        }
    }


   @JmsListener(destination = "Consumer.B.VirtualTopic.TEST", containerFactory = "myJmsContainerFactory", subscription = "1")
    public void subscribe1(final TextMessage text, Session session) throws JMSException {
        try {
            System.out.println("===========<<<<<<<<收到订阅的消息B:" + text.getText());
            // 此处抛异常,用于测试重试机制
            // throw new RuntimeException("测试异常");
        } catch (Throwable e) {
            System.out.println(e.getMessage());
            session.recover();
        }
    }

如上述,即可满足需求

参考文档:
1.https://blog.****.net/kimmking/article/details/9773085