SpringCloud Stream的使用
Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 Spring Cloud Stream 目前只实现了 Kafka 和 Rabbit MQ的Binder。
以RabbitMQ为例,了解Stream的具体使用方式。
Stream集成RabbitMQ
1、导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2、在配置文件中,添加RabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
3、创建一个接口
使用@Input
和@Output
注解表示消息的输入输出通道。
注意: input和output里的消息名称不能一致,在之前的比如 sprint boot 2.0.0.M3 版本是可以重复的。
同一个服务里面的名字不能一样,在不同的服务里可以相同名字的。
public interface StreamClient {
String INPUT = "myMessage";
String OUTPUT = "outMessage";
@Input(OUTPUT)
SubscribableChannel input();
@Output(INPUT)
MessageChannel output();
}
4、接收消息
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
@StreamListener(StreamClient.INPUT)
public void process(Object message){
log.info("StreamReceiver:{}",message);
}
}
5、发送消息
这里通过HTTP请求发送消息。
@RestController
public class SendMessageController {
@Autowired
private StreamClient streamClient;
@GetMapping("/sendMessage")
public void process(){
String message = "now" + new Date();
streamClient.output().send(MessageBuilder.withPayload(message).build());
}
}
启动项目之后,查看RabbitMQ的web管理页面,可以看到有一个myMessage队列。
发送 http://localhost:9080/sendMessage 请求,控制台有接收消息打印。
多个应用实例消费消息
如果启动多个应用实例时,RabbitMQ会有多个相同名称的队列产生。如图:
当其中一个应用发送消息,其他应用实例也会接收到消息,此时我们可以在配置中进行分组配置:
- outMessage是消息名称
spring:
cloud:
stream:
bindings:
outMessage:
group: product
添加该配置后,启动多个应用实例,都只有一个如下一个消息队列,这样就只会有一个应用来消费这个消息。
以上这种情况并不会发生在所有情况下,根据spring Cloud版本不同,情况不同。
RabbtiMQweb端消息查看
开发中,我们消息传递的大多数情况下都是对象,为了方便查看队列中的对象信息,可以在配置进行如下配置:
content-type:application/json
spring:
cloud:
stream:
bindings:
outMessage:
group: product
content-type: application/json
SendTo注解
如果当一个消息消费完之后,还需要回复一个消息,我们可以使用@SendTo
注解,将返回的消息转发到另一个消息队列中。
SpringCloud Stream简化开发过程中,对消息中间件的使用复杂度,开发人员只需要关注具体业务的实现。