websocket (java)集群方案
redis 用来存用户与服务器的关系(redis可以任意替换为一种存储形式)
服务器 A 会订阅topic 为 WebSocket-A 的消息
服务器 B 会订阅topic 为 WebSocket-B 的消息
kafka 用来接收推送消息(替换为任意一种mq)
如果用户1需要给用户3发送信息,如果在单机情况下,由于用户3没有与server-A建立链接,所以推送不到。但是现在因为在redis存储了用户3和server-B的关联关系,所以直接推送到WebSocket-B主题之中即可。由server-B接受消息并推送给用户3。
优点:
1. 解决了集群情况下跨机器通讯问题。
缺点:
1. 没有达到随意增减节点的需求,新加机器需要配置server-id。
2. 编程复杂,需要手动注册kafka-listener消息节点。
再宕机的情况下,假如A服务器宕机了,WEBSOCKET-A的消息就没有消费者了,所以只要在能重启一台服务器名继续叫做A的机器即可,因为在代码中多了一次判断,调用逻辑如下
@KafkaListener(id="server-${websocket.server.id}", topics = AppConstants.WEBSOCKET_TOPIC_PREFIX + "${websocket.server.id}", containerFactory = "containerFactory") public void handlerKafkaMessage(String data){ BaseMessage message = JsonUtils.deserialize(data, BaseMessage.class); handleDataToUser(message); } public void handleDataToUser(BaseMessage message) { if(StringUtils.isEmpty(message.getName())){ return; } String sessionMessage = cacheClient.getValue(AppConstants.WEBSOCKET_SESSION_ID_REDIS_PREFIX + message.getName()); if(StringUtils.isEmpty(sessionMessage)){ processAfterUserIsLogout(message); return; } WebSocketSessionInfo sessionInfo = JsonUtils.deserialize(sessionMessage, WebSocketSessionInfo.class); // 如果接收方连接在本机,直接发送 // 否则发送至kafka broker if(Objects.equals(sessionInfo.getServerId(), serverId)) { messagingTemplate.convertAndSendToUser(message.getName(), message.getDestination(), message.getMessageBody()); }else { kafkaTemplate.send(AppConstants.WEBSOCKET_TOPIC_PREFIX + sessionInfo.getServerId(), JsonUtils.toJson(message)); } } private void processAfterUserIsLogout(BaseMessage message) { logger.warn("用户[{}]已退出,未接收到消息[{}]", message.getName(), message.getMessageBody()); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
即即便是A服务器接收到了这条消息,我们也不确定客户是否在此中间重连了,所以需要去redis拿到当前客户真实的服务器,但是如果客户不断的重练,可能会导致消息一直发送不出去。但是这种也是能被接收的。
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.****.net/jiangjunshow
redis 用来存用户与服务器的关系(redis可以任意替换为一种存储形式)
服务器 A 会订阅topic 为 WebSocket-A 的消息
服务器 B 会订阅topic 为 WebSocket-B 的消息
kafka 用来接收推送消息(替换为任意一种mq)
如果用户1需要给用户3发送信息,如果在单机情况下,由于用户3没有与server-A建立链接,所以推送不到。但是现在因为在redis存储了用户3和server-B的关联关系,所以直接推送到WebSocket-B主题之中即可。由server-B接受消息并推送给用户3。
优点:
1. 解决了集群情况下跨机器通讯问题。
缺点:
1. 没有达到随意增减节点的需求,新加机器需要配置server-id。
2. 编程复杂,需要手动注册kafka-listener消息节点。
再宕机的情况下,假如A服务器宕机了,WEBSOCKET-A的消息就没有消费者了,所以只要在能重启一台服务器名继续叫做A的机器即可,因为在代码中多了一次判断,调用逻辑如下
@KafkaListener(id="server-${websocket.server.id}", topics = AppConstants.WEBSOCKET_TOPIC_PREFIX + "${websocket.server.id}", containerFactory = "containerFactory") public void handlerKafkaMessage(String data){ BaseMessage message = JsonUtils.deserialize(data, BaseMessage.class); handleDataToUser(message); } public void handleDataToUser(BaseMessage message) { if(StringUtils.isEmpty(message.getName())){ return; } String sessionMessage = cacheClient.getValue(AppConstants.WEBSOCKET_SESSION_ID_REDIS_PREFIX + message.getName()); if(StringUtils.isEmpty(sessionMessage)){ processAfterUserIsLogout(message); return; } WebSocketSessionInfo sessionInfo = JsonUtils.deserialize(sessionMessage, WebSocketSessionInfo.class); // 如果接收方连接在本机,直接发送 // 否则发送至kafka broker if(Objects.equals(sessionInfo.getServerId(), serverId)) { messagingTemplate.convertAndSendToUser(message.getName(), message.getDestination(), message.getMessageBody()); }else { kafkaTemplate.send(AppConstants.WEBSOCKET_TOPIC_PREFIX + sessionInfo.getServerId(), JsonUtils.toJson(message)); } } private void processAfterUserIsLogout(BaseMessage message) { logger.warn("用户[{}]已退出,未接收到消息[{}]", message.getName(), message.getMessageBody()); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
即即便是A服务器接收到了这条消息,我们也不确定客户是否在此中间重连了,所以需要去redis拿到当前客户真实的服务器,但是如果客户不断的重练,可能会导致消息一直发送不出去。但是这种也是能被接收的。