Instant Messaging with Spring WebSocket + SockJS + STOMP (Part 5): UserRegistryMessageHandler and NoOpMessageHandler
UserRegistryMessageHandler
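- Handles user registry broadcasts received from other application servers, and periodically broadcasts the contents of the local user registry;
- The aggregated user registry information is kept in a MultiServerUserRegistry member variable;
- It does not need to subscribe to a MessageChannel itself, so it does not implement the SmartLifecycle interface;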
Handling registry broadcasts from other application servers
UserRegistryMessageHandler :
public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        MessageConverter converter = this.brokerTemplate.getMessageConverter();
        this.userRegistry.addRemoteRegistryDto(message, converter, getRegistryExpirationPeriod());
    }
}
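To make the aggregation idea concrete, here is a minimal plain-Java sketch. It is not Spring's MultiServerUserRegistry: the class RemoteRegistryStore, its method names, and the use of plain user-name sets are all assumptions made for illustration. Each remote server's snapshot is stored under a server id together with an expiration time, a newer broadcast from the same server replaces the older one, and snapshots that are not refreshed in time get purged.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the aggregation done by MultiServerUserRegistry.
class RemoteRegistryStore {

    // One snapshot per remote server: its user names and the time at which it expires.
    private static final class Snapshot {
        final Set<String> users;
        final long expiresAt;

        Snapshot(Set<String> users, long expiresAt) {
            this.users = users;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Snapshot> remoteRegistries = new ConcurrentHashMap<>();

    // Called for every broadcast received from another server; replaces that server's previous snapshot.
    void addRemoteRegistry(String serverId, Set<String> users, long now, long expirationPeriod) {
        remoteRegistries.put(serverId, new Snapshot(new HashSet<>(users), now + expirationPeriod));
    }

    // Drops snapshots that were not refreshed within the expiration period.
    void purgeExpired(long now) {
        remoteRegistries.values().removeIf(snapshot -> snapshot.expiresAt < now);
    }

    // All users currently known from remote servers.
    Set<String> remoteUsers() {
        Set<String> result = new HashSet<>();
        remoteRegistries.values().forEach(snapshot -> result.addAll(snapshot.users));
        return result;
    }
}
```

The clock value is passed in as a parameter only to keep the sketch easy to follow; a real implementation would read the current time itself.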
Periodically broadcasting the local registry
UserRegistryMessageHandler :
public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {

    private final UserRegistryTask schedulerTask = new UserRegistryTask();

    @Nullable
    private volatile ScheduledFuture<?> scheduledFuture;

    private long registryExpirationPeriod = TimeUnit.SECONDS.toMillis(20);

    @Override
    public void onApplicationEvent(BrokerAvailabilityEvent event) {
        if (event.isBrokerAvailable()) {
            long delay = getRegistryExpirationPeriod() / 2;
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this.schedulerTask, delay);
        }
        else {
            ScheduledFuture<?> future = this.scheduledFuture;
            if (future != null) {
                future.cancel(true);
                this.scheduledFuture = null;
            }
        }
    }

    private class UserRegistryTask implements Runnable {

        @Override
        public void run() {
            try {
                SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
                accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
                accessor.setLeaveMutable(true);
                Object payload = userRegistry.getLocalRegistryDto();
                brokerTemplate.convertAndSend(getBroadcastDestination(), payload, accessor.getMessageHeaders());
            }
            finally {
                userRegistry.purgeExpiredRegistries();
            }
        }
    }
}
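The start/stop logic above can be sketched in plain Java as follows. This is an illustration only: RegistryBroadcaster and its method names are hypothetical, and java.util.concurrent's ScheduledExecutorService stands in for the scheduler used by Spring. What it shows is that the broadcast interval is half the expiration period (10 seconds for the default 20 seconds), that broadcasting starts when the broker becomes available, and that it is cancelled when the broker goes away.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the start/stop logic; not Spring's implementation.
class RegistryBroadcaster {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable broadcastTask;
    private final long expirationPeriodMillis;
    private volatile ScheduledFuture<?> scheduledFuture;

    RegistryBroadcaster(Runnable broadcastTask, long expirationPeriodMillis) {
        this.broadcastTask = broadcastTask;
        this.expirationPeriodMillis = expirationPeriodMillis;
    }

    // The interval between broadcasts is half the expiration period.
    long broadcastDelayMillis() {
        return expirationPeriodMillis / 2;
    }

    // Mirrors the shape of onApplicationEvent(BrokerAvailabilityEvent).
    void onBrokerAvailabilityChanged(boolean brokerAvailable) {
        if (brokerAvailable) {
            // First run immediately, then wait broadcastDelayMillis() between runs.
            scheduledFuture = scheduler.scheduleWithFixedDelay(
                    broadcastTask, 0, broadcastDelayMillis(), TimeUnit.MILLISECONDS);
        } else {
            ScheduledFuture<?> future = scheduledFuture;
            if (future != null) {
                future.cancel(true);
                scheduledFuture = null;
            }
        }
    }

    boolean isBroadcasting() {
        ScheduledFuture<?> future = scheduledFuture;
        return future != null && !future.isCancelled();
    }

    // Stops the executor thread so the JVM can exit.
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Broadcasting at half the expiration period presumably leaves room for one delayed broadcast before the other servers treat the snapshot as expired; that reading is an inference from the code, not something the source states.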
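Use case
The broker relay is enabled and the application connects to an external message broker. This is typically a clustered deployment with more than one application server, and therefore more than one broker relay, as shown in the figure below: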
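Prerequisites
Configure a UserRegistryBroadcast destination.
The userRegistryMessageHandler() bean method of AbstractMessageBrokerConfiguration shows that when no UserRegistryBroadcast destination is set, it returns new NoOpMessageHandler(). So two conditions must be met:
- The broker relay is enabled;
- A UserRegistryBroadcast destination is configured;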
AbstractMessageBrokerConfiguration:
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {

    @Bean
    public MessageHandler userRegistryMessageHandler() {
        if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
            return new NoOpMessageHandler();
        }
        SimpUserRegistry userRegistry = userRegistry();
        Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
        return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
                brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
                messageBrokerTaskScheduler());
    }
}
Enabling it in the configuration
A WebSocketMessageBrokerConfigurer implementation:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfigurer implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // The relay prefix must match the broadcast destination, hence "/topic" rather than "topic".
        config.enableStompBrokerRelay("/topic").setUserRegistryBroadcast("/topic/registry");
    }
}
NoOpMessageHandler
The "no-operation message handler". As shown above in the userRegistryMessageHandler() bean method of AbstractMessageBrokerConfiguration, when no UserRegistryBroadcast destination is set, the method returns new NoOpMessageHandler().
AbstractMessageBrokerConfiguration :
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {

    private static class NoOpMessageHandler implements MessageHandler {

        @Override
        public void handleMessage(Message<?> message) {
        }
    }
}
Put simply, NoOpMessageHandler stands in for UserRegistryMessageHandler when no UserRegistryBroadcast destination is configured:
- It does not subscribe to a MessageChannel;
- It does no message processing at all;
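This fallback is an instance of the null object pattern: callers always receive a handler and never have to check for null. A minimal plain-Java sketch of the selection, assuming a hypothetical HandlerSelection class and using Consumer<String> as a stand-in for MessageHandler:

```java
import java.util.function.Consumer;

// Hypothetical sketch of the fallback; Consumer<String> stands in for MessageHandler.
class HandlerSelection {

    // Accepts any message and does nothing with it, like NoOpMessageHandler.
    static final Consumer<String> NO_OP = message -> { };

    // Returns the real handler only when a broadcast destination is configured.
    static Consumer<String> select(String userRegistryBroadcast, Consumer<String> registryHandler) {
        return (userRegistryBroadcast == null) ? NO_OP : registryHandler;
    }
}
```

With a null destination, select(...) returns NO_OP and any message passed to it is silently dropped; with a destination such as "/topic/registry", the supplied handler is returned unchanged.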