springCloud-Alibaba——stream+rabbitmq消息微服务集成

目录

(1)引入依赖:

A、引入Spring Cloud管理依赖:

B、引入Spring Cloud Alibaba管理依赖:

C、引入Spring Cloud Stream管理依赖:

D、引入rabbitmq依赖:

(2)修改配置文件:

A、生产者配置文件:

B、消费者配置文件:

(3)消息持久化配置:

(4)创建接口:

A、生产者接口:

B、消费者接口:

(5)修改启动类:

(6)编写代码:

A、发送消息代码:

B、消费消息代码:

(7)检查rabbitmq用户权限:

(8)启动测试:


(1)引入依赖:

A、引入Spring Cloud管理依赖:

springCloud-Alibaba——stream+rabbitmq消息微服务集成

B、引入Spring Cloud Alibaba管理依赖:

springCloud-Alibaba——stream+rabbitmq消息微服务集成

C、引入Spring Cloud Stream管理依赖:

springCloud-Alibaba——stream+rabbitmq消息微服务集成

D、引入rabbitmq依赖:

springCloud-Alibaba——stream+rabbitmq消息微服务集成

(2)修改配置文件:

A、生产者配置文件:

springCloud-Alibaba——stream+rabbitmq消息微服务集成

 

B、消费者配置文件:

消费者配置rabbitmq的信息与生产者配置rabbitmq的信息几乎是完全一样的,只有bindings中对应的名称不一样而已。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

 

(3)消息持久化配置:

默认的rabbitq中的消息是保存在内存中的,开发环境可以,但是生产环境是不允许的,所以建议设置消息进行持久化。设置消息持久化关键是在消费者的程序中进行配置,配置后不用手动创建queue,只要消费端项目一启动,就会在rabbitmq对应的virtual host中自动创建对应的queue了。

在上面消费者配置文件中,配置的默认消息是不持久化的,只需要在此基础之上创建一个group分组参数,那么对应的queue名称为:destination.group,按照图中的配置,那么对应的queue为:lsy-test-topic.lsy-test-queue

springCloud-Alibaba——stream+rabbitmq消息微服务集成

(4)创建接口:

A、生产者接口:

创建一个自定义生产者的binding对应的接口,在接口中写一个out()方法,返回值为MessageChannel,并在方法上添加注解Output,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

B、消费者接口:

创建一个自定义消费者的binding对应的接口,在接口中写一个input()方法,返回值为SubscribableChannel,并在方法上添加注解input,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

(5)修改启动类:

在启动类上,绑定刚刚创建的自定义binding对应的那个接口,如果项目中有多个接口,那么就绑定多个接口。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

(6)编写代码:

A、发送消息代码:

首先通过Autowired创建自定义的接口对象,然后在方法体中,通过自定义接口对象调用output()方法发送消息。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

B、消费消息代码:

创建一个实体类,类上添加service注解,在类中创建一个方法,方法有一个string类型的参数(该参数就是消息体),方法上添加StreamListener,并指定是哪个input。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

(7)检查rabbitmq用户权限:

在启动项目前,先检查java项目中连接rabbitmq使用的用户和密码是否与rabbitmq控制台中显示的用户和密码相同;

然后检查java项目中访问的rabbitmq对应的virtual host是否存在,如果不存在就手动在rabbitmq控制台中创建对应的virtual host,这个virtual host类似于mysql中不同的数据库;

最后检查java项目中连接rabbitmq使用的用户是否有权限访问对应的virtual hosts,如果没有权限,就在rabbitmq控制台中点击该用户名然后增加该用户访问该virtual host的权限。

如果上述三个地方有任何一个点有问题,那么项目就无法正常连接到rabbitmq。请在启动项目前,认真检查和确认。

(8)启动测试:

启动消息生产者、消息消费者,访问接口,然后就会显示发送消息成功,在消费者中也会消费刚才发送的消息。在rabbitmq的控制台中可以看到在exchange对应的virtual host下会有该topic,在queue中在该virtual host下有对应的queue了,queue中是为了保证在消费者异常情况下能够持久化生产者发送的消息,在消费者正常后,可以让消费者消费在异常期间的消息。然后关闭消费者项目,让生产者发送几条消息,刷新rabbitmq控制台,可以看到对应queue中有几条消息就,然后启动消费者项目,会自动消费在关闭期间的消息,然后对应queue中存储的消息就为0了。这就表明stream+rabbitmq集成成功了,并且实现了消息的持久化,可以正常发送和消费消息了。

springCloud-Alibaba——stream+rabbitmq消息微服务集成

springCloud-Alibaba——stream+rabbitmq消息微服务集成