Ubuntu RabbitMQ安装及springboot集成使用

安装

rabbitMQ是用erlang语言编写的,先安装erlang

sudo apt-get install erlang-nox
#安装完成
erl

Ubuntu RabbitMQ安装及springboot集成使用
接下来添加公钥

wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -

安装rabbitMQ,安装之后会自动启动

sudo apt-get update && sudo apt-get install rabbitmq-server

安装完成后查看状态

systemctl status rabbitmq-server.service
#启动
sudo service rabbitmq-server start
#停止
sudo service rabbitmq-server stop
#重启
sudo service rabbitmq-server restart

Ubuntu RabbitMQ安装及springboot集成使用
启用web端后台管理界面

sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-server restart
#查看rabbitMQ信息
sudo rabbitmqctl status

访问http://192.168.0.106:15672/#/ 用guest/guest登录

Ubuntu RabbitMQ安装及springboot集成使用
参考信息 https://www.rabbitmq.com/install-debian.html
Default user access
The broker creates a user guest with password guest. Unconfigured clients will in general use these credentials. By default, these credentials can only be used when connecting to the broker as localhost so you will need to take action before connecting from any other machine.

See the documentation on access control for information on how to create more users, delete the guest user, or allow remote access to the guest user.
Ubuntu RabbitMQ安装及springboot集成使用
官网上描述guest只能在localhost使用
方案1:添加新的用户

#添加用户
sudo rabbitmqctl  add_user admin admin
#变更密码
sudo rabbitmqctl change_password admin admin
#添加分组
sudo rabbitmqctl set_user_tags admin administrator
#查看用户
sudo rabbitmqctl  list_users

Ubuntu RabbitMQ安装及springboot集成使用
成功登录,安装完毕。
方案2:根据官方文档提示,修改rabbitmq-env.conf或创建rabbitmq.config,重启服务
参考地址 https://www.rabbitmq.com/configure.html#erlang-term-config-file

#修改rabbitmq-env.conf
 loopback_users = none
# or
#新建rabiitmq.config
 [{rabbit, [{loopback_users, []}]}].

Ubuntu RabbitMQ安装及springboot集成使用

springboot 集成使用

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.mybatis.spring.boot</groupId>-->
            <!--<artifactId>mybatis-spring-boot-starter</artifactId>-->
            <!--<version>2.0.1</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
    </dependencies>

    <!--<build>-->
        <!--<plugins>-->
            <!--<plugin>-->
                <!--<groupId>org.springframework.boot</groupId>-->
                <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
            <!--</plugin>-->
        <!--</plugins>-->
    <!--</build>-->

</project>

application-dev.yml

spring:
  rabbitmq:
    host: 192.168.0.106
    port: 5672
    username: admin
    password: admin
    listener:
      simple:
        concurrency: 10
        max-concurrency: 20
        prefetch: 5
    virtual-host: /
    publisher-confirms: true
    connection-timeout: 60000ms
    template:
      reply-timeout: 30000ms

DemoApplication.java

package com.example.demo;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
@EnableRabbit
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

AsyncConfig.java

package com.example.demo.config;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.*;

/**
 * @ClassName AsyncConfig
 * @Description TODO
 * @Date 2019/5/11 23:40
 **/
@Configuration
public class AsyncConfig {

    /**
     * 消息队列线程池
     *
     * @return
     */
    @Bean
    public ExecutorService buildConsumerQueueThreadPool() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-queue-thread-%d")
                .build();
        ExecutorService pool = new ThreadPoolExecutor(400, 400, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(512), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        return pool;
    }
}

RabbitmqConfig.java

package com.example.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.util.concurrent.ExecutorService;

/**
 * @ClassName RabbitmqConfig
 * @Description TODO
 * @Date 2019/5/11 22:17
 **/
@Configuration
@Slf4j
public class RabbitmqConfig {
    @Autowired
    private Environment env;
    
    @Autowired
    private ExecutorService executorService;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;


    /**
     * 单一消费者
     *
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factory.setTaskExecutor(executorService);
        return factory;
    }

    /**
     * 多消费者
     *
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency", Integer.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency", Integer.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch", Integer.class));
        factory.setTaskExecutor(executorService);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, b, s) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, b, s));
        rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", s1, s2, i, s, message));
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("testQueue")
    public Queue testQueue(){
        return new Queue("testQueue");
    }
}

生产者

DemoApplicationTests.java

package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@ActiveProfiles("dev")
@SpringBootTest(classes = {DemoApplication.class})
public class DemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void contextLoads() {
        System.out.println("sss");

    }

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend("testQueue", "啦啦啦");
        System.out.println("发送完毕");
    }

}

运行测试方法testSend
Ubuntu RabbitMQ安装及springboot集成使用

消费者

MqListener.java

package com.example.demo.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * @ClassName MqListener
 * @Description TODO
 * @Date 2019/5/12 9:02
 **/
@Component
public class MqListener {

    @RabbitListener(queues = "testQueue", containerFactory = "singleListenerContainer")
    public void consumeQueue(@Payload Object message){
        System.out.println("消费者 message:"+ message);
    }
}

Ubuntu RabbitMQ安装及springboot集成使用