使用Spring Cloud Stream对RabbitMq进行消息发送
大家如果觉得我写的好,可以关注这个项目,之后我的学习过程中都会把学习笔记放入这个项目中。
https://github.com/IndustriousSnail/learning-notes
使用Spring Cloud Stream对RabbitMq进行消息发送
目录
一、Spring Cloud Stream简介
Spring Cloud Stream是Spring Cloud的组件之一。 它是为微服务构建消息驱动能力的框架。其架构图如图所示:>应用程序通过inputs和outputs与Stream中的Binder进行交互。Binder于中间件交互。 >对中间件进一步封装,这样代码层面对中间件无感知。也可以动态切换中间件。
二、Stream 的使用
2.1 简单使用案例
1.引入依赖<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-start-stream-rabbitmq</artifactId>
</dependency>
此处是使用RabbitMq作为中间件
2.在yaml配置中增添如下配置
spring:
rabbitmq:
password: guest
username: guest
addresses: 127.0.0.1
port: 5672 # 注意,这里是5672,不是访问界面用的15672,这个是默认值
这里取得都是默认值,如果没有改过,可以不配置此项。
3.定义Input/Output接口,用于注册Bean
public interface StreamClient { //消息接受发送接口
@Input("testMessage")
SubscribableChannel input(); //用于接受消息
@Output("testMessage")
MessageChannel output(); //用于发送消息
}
在老版本中,如果名称一致,会报错,如果出现报错,可以升级成新版本。
4.定义消息接受类
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver { //消息接受类
@StreamListener("testMessage2") //监听testMessage这个消息队列, StreamClient类中必须定义相应的Input。
public void receiver(Object message){
System.out.println("接收到消息:"+message);
}
}
5.定义消息发送Controller,测试消息发送
@RestController
public class StreamController {
@Autowired
private StreamClient streamClient;
@GetMapping("/sendMessage")
public void send(){
//org.springframework.messaging.support.MessageBuilder;
streamClient.output().send(MessageBuilder.withPayload("it is test message.").build()); //构建消息并且发送
}
}
6.访问sendMessage接口。
访问之后,可以看到消息队列中多出了一条消息。
testMessage.anonymous.qVYvqBtFQwyWwlt-ejib1g
同时,该程序也收到了这条消息,控制台打印了接受消息
接收到消息:it is test message.
此时会发现,如果我们多起几个实例,如果队列中有消息,那么监听了该队列的实例都会执行监听方法。
但是我们只想让一个实例执行即可。此时需要对实例进行分组
7.对队列进行分组,解决多个实例都接受到消息。
增加配置:
spring:
stream:
# 增加该配置,对队列进行分组。保证一个服务只有一台实例接受到消息。
bindings:
# 监听的消息队列的名称。
testMessage:
# 服务的名称
group: order
此时就可以发现,当向消息队列中发送消息时,只有一个order服务实例会接收到消息。
2.2 发送对象消息
1.将上面的String改为一个可序列化的对象即可。2.增加content-type配置,如果不增加该配置,则在rabbitmq上,看到的是被base64编码后的不可读的东西,不利于调试。
spring:
stream:
# 增加该配置,对队列进行分组。保证一个服务只有一台实例接受到消息。
bindings:
# 监听的消息队列的名称。
testMessage:
# 服务的名称
group: order
# 将发送的对象消息转化为json,方便调试
content-type: application/json
2.3 消息处理完成之后发送响应消息
使用@SendTo("响应消息的消息队列")来进行响应消息的发送。 响应内容为该方法的返回值。@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver { //消息接受类
@StreamListener("testMessage") //监听testMessage这个消息队列, StreamClient类中必须定义相应的Input。
@SendTo("responseMessage") //该注解会在消息处理完成后,向responseMessage这个队列发送消息。消息内容就是该方法的返回值。
public String receiver(Object message){
System.out.println("接收到消息:"+message);
return "处理消息完成"; //当消息处理完成之后,会将该返回值发送到@SendTo指定的responseMesssage消息队列中。
}
}