从0开始学Spring Boot集成Rabbit MQ
目录
概述
记录一次学习RabbitMQ使用的实例,整个过程也是学习的过程,一起学习,一起进步。
开发环境
JDK1.8、Spring Boot 1.5.9.RELEASE、RabbitMQ 3.7.4
Windows下RabbitMQ安装
RabbitMQ是基于Erlang语言开发的,所以我们想要安装还得先安装Erlang的环境。
下载地址
Erlang官网下载地址:https://www.erlang.org/downloads
RabbitMQ官网下载地址:http://www.rabbitmq.com/download.html
安装步骤
安装Erlang,默认选项就行,一直点下去,如下图所示:
安装rabbitmq,如下图所示:
默认一直点下去,如果有杀毒软件提示安装服务,请点击允许,如下图所示:
至此服务安装完毕
访问RabbitMQ管理控制台
网址: http://localhost:15672,用户名:guest,密码:guest
如图所示:
登录成功后看到如下界面:
Spring Boot项目创建
推荐spring官方快速创建SpringBoot项目的一个在线网站
在线即可完成Spring Boot项目的创建。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>demo</name>
<description>Spring Boot & Rabbit MQ集成示例</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath />
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.18</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<debug>false</debug>
</configuration>
</plugin>
</plugins>
</build>
</project>
Spring Boot启动类
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
开发及配置RabbitMQ
我们采用了监听方式来消费消息,消息类型Topic
RabbitnqConfiguration
package com.example.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfiguration {
//对象序列化
@Bean
MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
创建一个RabbitMQ消费者
package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Rabbit消费者
*/
@Component
public class RabbitmqConsumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = "${my_exchange}", type = "topic", durable = "true"),
value = @Queue(value = "${my_queue}", durable = "true"),
key = "${my_routing_key}"
)
)
public void consumer(String body, Channel channel, Message message) {
try {
//打印接收到的消息
System.out.println("body=" + body);
/**
* 告诉服务器收到这条消息 已经被我消费了
* 可以在队列删掉 这样以后就不会再发了
* 否则消息服务器以为这条消息没处理掉 后续还会在发
**/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
logger.error("error:", e);
}
}
}
创建一个RabbitMQ生产者
package com.example.producter;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* 消息生产者
*/
@RestController
public class RabbitmqProducter {
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${my_exchange}")
private String exchange;
@Value("${my_routing_key}")
private String routingKey;
//发送MQ交易通知消息
@RequestMapping(value = "/api/sender", method = RequestMethod.POST)
public void sender(@RequestBody String body) {
//发送MQ消息
rabbitTemplate.convertAndSend(exchange, routingKey, body);
}
}
最后配置SpringBoot文件
rabbitmq:
username: guest
password: guest
host: 127.0.0.1
port: 5672
#支持发布确认
publisher-confirms: true
#支持发布返回
publisher-returns: true
listener:
# prefetch: 10
simple:
# 采用手动应答
acknowledge-mode: manual
# 当前监听容器数
concurrency: 10
# 最大数
max-concurrency: 20
# 是否支持重试
retry:
enabled: true
my_exchange: abcdefg
my_queue: q123456
my_routing_key: k888888
小结
@RabbitListener是整个过程的核心
RabbitTemplate是我们的核心对象
希望能给刚接触RabbitMQ的小伙伴一点帮助~