SpringCloud Stream的使用

Spring Cloud Stream 是一个构建消息驱动微服务的框架。
SpringCloud Stream的使用
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 Spring Cloud Stream 目前只实现了 Kafka 和 Rabbit MQ的Binder。

以RabbitMQ为例,了解Stream的具体使用方式。

Stream集成RabbitMQ

1、导入依赖

    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

2、在配置文件中,添加RabbitMQ配置

rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3、创建一个接口
使用@Input@Output注解表示消息的输入输出通道。
注意: input和output里的消息名称不能一致,在之前的比如 sprint boot 2.0.0.M3 版本是可以重复的。
同一个服务里面的名字不能一样,在不同的服务里可以相同名字的。

public interface StreamClient {
   String INPUT = "myMessage";
    String OUTPUT = "outMessage";
    @Input(OUTPUT)
    SubscribableChannel input();

    @Output(INPUT)
    MessageChannel output();
}

4、接收消息

@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
    @StreamListener(StreamClient.INPUT)
    public void process(Object message){
        log.info("StreamReceiver:{}",message);
    }
}

5、发送消息
这里通过HTTP请求发送消息。

@RestController
public class SendMessageController {
    @Autowired
    private StreamClient streamClient;

    @GetMapping("/sendMessage")
    public void process(){
        String message = "now" + new Date();
        streamClient.output().send(MessageBuilder.withPayload(message).build());
    }
}

启动项目之后,查看RabbitMQ的web管理页面,可以看到有一个myMessage队列。
SpringCloud Stream的使用
发送 http://localhost:9080/sendMessage 请求,控制台有接收消息打印。
SpringCloud Stream的使用

多个应用实例消费消息

如果启动多个应用实例时,RabbitMQ会有多个相同名称的队列产生。如图:
SpringCloud Stream的使用
当其中一个应用发送消息,其他应用实例也会接收到消息,此时我们可以在配置中进行分组配置:

  • outMessage是消息名称
spring:
  cloud:
    stream:
      bindings:
         outMessage:
            group: product

添加该配置后,启动多个应用实例,都只有一个如下一个消息队列,这样就只会有一个应用来消费这个消息。
SpringCloud Stream的使用
以上这种情况并不会发生在所有情况下,根据spring Cloud版本不同,情况不同。

RabbtiMQweb端消息查看

开发中,我们消息传递的大多数情况下都是对象,为了方便查看队列中的对象信息,可以在配置进行如下配置:

  • content-type:application/json
spring:
  cloud:
    stream:
      bindings:
         outMessage:
            group: product
            content-type: application/json

SendTo注解

如果当一个消息消费完之后,还需要回复一个消息,我们可以使用@SendTo 注解,将返回的消息转发到另一个消息队列中。
SpringCloud Stream的使用
SpringCloud Stream简化开发过程中,对消息中间件的使用复杂度,开发人员只需要关注具体业务的实现。