SpringCloud Stream消息驱动概述

SpringCloud Stream官网链接~

什么是SpringCloud Stream:

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka.

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。

总之一句话就是:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

为什么使用SpringCloud Stream:

目前市面上流行的中间件MQ有:ActiveMQ、RabbitMQ、RocketMQ、Kafka这四种,这些MQ使用也会存在一定的差异,有时候项目中会使用到两种等多种组合。这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰, 我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的, 一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。

设计思想:

在没引入Spring Cloud Stream之前,标准的MQ是这样的:

  • 生产者/消费者之间靠消息媒介传递信息内容 :  Message
  • 消息必须走特定的通道:  消息通道MessageChannel
  • 消息通道里的消息如何被消费呢,谁负责收发处理: 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅SpringCloud Stream消息驱动概述

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离
通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。

Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Binder:

  • INPUT对应于消费者
  • OUTPUT对应于生产者

SpringCloud Stream消息驱动概述

 

Stream中的消息通信方式遵循了发布-订阅模式:

  • Topic主题进行广播
    • 在RabbitMQ就是Exchange交换机
    •  在kafka中就是Topic主题

 

Spring Cloud Stream 的标准流程套路:

  • Binder
    •  很方便的连接中间件,屏蔽差异
  • Channel
    • 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置
  • Source和Sink
    •     简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
  • SpringCloud Stream消息驱动概述

SpringCloud Stream消息驱动概述

针对于上图下面列出以下编码API和常用注解

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列, 用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

 

Spring Cloud Stream 消息驱动 介绍到此就结束了,后面会有一个案例进行环境搭建演示,案例包含:RabbitMQ、steam8801消息提供者、stream8802消息消费者、stream8803消息消费者,还会演示 消息重复消费,分组消费和消息持久化的情况~