springCloud-Alibaba——stream+rabbitmq消息微服务集成
目录
(1)引入依赖:
A、引入Spring Cloud管理依赖:
B、引入Spring Cloud Alibaba管理依赖:
C、引入Spring Cloud Stream管理依赖:
D、引入rabbitmq依赖:
(2)修改配置文件:
A、生产者配置文件:
B、消费者配置文件:
消费者配置rabbitmq的信息与生产者配置rabbitmq的信息几乎是完全一样的,只有bindings中对应的名称不一样而已。
(3)消息持久化配置:
默认的rabbitq中的消息是保存在内存中的,开发环境可以,但是生产环境是不允许的,所以建议设置消息进行持久化。设置消息持久化关键是在消费者的程序中进行配置,配置后不用手动创建queue,只要消费端项目一启动,就会在rabbitmq对应的virtual host中自动创建对应的queue了。
在上面消费者配置文件中,配置的默认消息是不持久化的,只需要在此基础之上创建一个group分组参数,那么对应的queue名称为:destination.group,按照图中的配置,那么对应的queue为:lsy-test-topic.lsy-test-queue
(4)创建接口:
A、生产者接口:
创建一个自定义生产者的binding对应的接口,在接口中写一个out()方法,返回值为MessageChannel,并在方法上添加注解Output,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。
B、消费者接口:
创建一个自定义消费者的binding对应的接口,在接口中写一个input()方法,返回值为SubscribableChannel,并在方法上添加注解input,指定一个自定义的binding常量值,这个常量值是与配置文件中写的那个自定义binding值保持一致。
(5)修改启动类:
在启动类上,绑定刚刚创建的自定义binding对应的那个接口,如果项目中有多个接口,那么就绑定多个接口。
(6)编写代码:
A、发送消息代码:
首先通过Autowired创建自定义的接口对象,然后在方法体中,通过自定义接口对象调用output()方法发送消息。
B、消费消息代码:
创建一个实体类,类上添加service注解,在类中创建一个方法,方法有一个string类型的参数(该参数就是消息体),方法上添加StreamListener,并指定是哪个input。
(7)检查rabbitmq用户权限:
在启动项目前,先检查java项目中连接rabbitmq使用的用户和密码是否与rabbitmq控制台中显示的用户和密码相同;
然后检查java项目中访问的rabbitmq对应的virtual host是否存在,如果不存在就手动在rabbitmq控制台中创建对应的virtual host,这个virtual host类似于mysql中不同的数据库;
最后检查java项目中连接rabbitmq使用的用户是否有权限访问对应的virtual hosts,如果没有权限,就在rabbitmq控制台中点击该用户名然后增加该用户访问该virtual host的权限。
如果上述三个地方有任何一个点有问题,那么项目就无法正常连接到rabbitmq。请在启动项目前,认真检查和确认。
(8)启动测试:
启动消息生产者、消息消费者,访问接口,然后就会显示发送消息成功,在消费者中也会消费刚才发送的消息。在rabbitmq的控制台中可以看到在exchange对应的virtual host下会有该topic,在queue中在该virtual host下有对应的queue了,queue中是为了保证在消费者异常情况下能够持久化生产者发送的消息,在消费者正常后,可以让消费者消费在异常期间的消息。然后关闭消费者项目,让生产者发送几条消息,刷新rabbitmq控制台,可以看到对应queue中有几条消息就,然后启动消费者项目,会自动消费在关闭期间的消息,然后对应queue中存储的消息就为0了。这就表明stream+rabbitmq集成成功了,并且实现了消息的持久化,可以正常发送和消费消息了。