一文搞懂WebSocket

  • WebSocket的概念

WebSocket是一种全双工、双向、单套接字链接的通讯协议,是HTML5平台上关键的通讯组件。

http协议只能是由client发送请求,server端接收请求,然后返回响应给client,http是基于TCP的半双工协议,且是无状态链接。

WebSocket在与服务器建立连接后,可以长时间保持此链接,服务端和客户端可以完成多从通讯。

WebSocket可以构建真正意义上的实时应用程序,相比http来说,极大的提升用户体验。

  • 为什么需要WebSocket

很长时间以来,http协议是web浏览器上应用最广的应答式超文本传送协议。http在1990年问世,那时还没有建立起正式的规范标准,因此这一阶段的http即被人们称为http/0.9。

http/1.0正式作为标准被公布是在1996年5月,并记载于RFC1945。

http/1.1在1997年1月被公布,同时记载于RFC2068。截止目前,http/1.1是主流的http协议版本。

 

http/1.0

http/1.1

连接复用

单一连接

单一连接,可以重复使用

持久连接

处理完请求后立即断开

支持持久连接,一个连接可以同时完成多个请求和响应

身份认证

不支持

支持

其他

 

状态管理、Cache缓存、host字段、Chunked transfer-coding等

 

无论http如何发展,但其半双工、客户端请求(服务端不能主动推送)的弱点,一直使http不能有效完成实时应用程序对数据及时性的需求,尽管工程师们从http上衍生出http轮询、长轮询和流化技术。

 

WebSocket可以节约带宽、CPU资源并减少延迟;

WebSocket使实时通讯变得异常简单,相较于http上实现实时通知的技术,WebSocket带来了革命性的改变;

WebSocket是底层网络协议,可以在此基础上构建其他标准协议,例如可以使用相同的XMPP over WebSocket客户端登录不同的聊天服务器;

  • WebSocket的应用场景

鉴于WebSocket的实时功能,可以构建聊天、IM、协作文档编辑、股票交易、多人在线游戏等应用程序

  • WebSocket的通讯模型

 

一文搞懂WebSocket

  1. WebSocket第一次完成握手时,依然应用HTTP协议;但是会包含一个特殊的首标Upgrade:websocket,表示客户端将把连接升级到WebSocket协议;
  2. 服务端响应请求,返回Connection:Upgrade首标和Upgrade:WebSocket首标,完成握手;
  3. 客户端再次发送升级协议后的请求,此时http的头信息不再发送;
  4. 服务端保持该连接,用于后续多次数据更新推送数据给客户端。
  • WebSocket的API

在调用WebSocket API之前,你需要构造WebSocket的对象,最简单的构造函数只需要传递一个链接地址的URL即可(或者通过protocols形参传递一个或一组协议名称)。

WebSocket API是真正意义上的事件驱动。一旦握手完成,当服务器有需要发送的数据,或者客户端关心的资源将要改变状态时,它便自动发送数据或者通知。

  1. 构造函数:var ws = new WebSocket(“ws://www.websocket.org”);

以上通过构造函数创建了一个WebSocket对象,在连接上服务器时,可以选择使用第二个参数列出应用程序支持的协议,用于协议协商,客户端和服务端只能使用一种协议。

如:var ws = new WebSocket(“ws://www.websocket.org”, [“protocol1”, “protocol2”]);

proctcols指定三种协议:RFC 6455注册过的标准协议;开放协议,如XMPP和STOMP;自定义协议。

  1. WebSocket事件:WebSocket对象调度4个事件,

open:服务器已经建立了连接

message:接收到服务器发送的消息,除了文本,消息还可以是二进制数据

error:响应发生意外故障时触发,导致WebSocket连接关闭

close:连接关闭时触发

如服务器响应了WebSocket连接请求,触发open事件并建立一个连接。

ws.open = function(e) {

  console.log(“Connection open …”);

};

  1. WebSocket方法:send()和close()
  2. WebSocket对象特性:
  • readyState:WebSocket对象的只读特性,表示其连接状态;

特性常量

取值

状态

WebSocket.CONNECTING

0

连接正在进行中,但还未建立

WebSocket.OPEN

1

连接已经建立。消息可以在客户端和服务器之间传递

WebSocket.CLOSING

2

连接正在进行关闭握手

WebSocket.CLOSED

3

连接已经关闭,不能打开

  • bufferedAmount:检查发往服务器的缓冲数据量,即已经进入队列,但还未发送到服务器的字节数
  • protocol:标示服务器选择的用于与客户端通讯的协议名称,在握手之前,该属性值为空,如果服务器没有选择客户端提供的某个协议,该属性值保持为空值。
  • WebSocket协议
  1. 初始握手

客户端以http方式发起首次握手,与http协议很相似,会在请求头中包含一个特殊的首标Upgrade:websocket,服务端返回101响应代码、Upgrade首标和Sec-WebSocket-Accept首标,表明连接成功。其中Sec-WebSocket-Accept响应首标的值从Sec-WebSocket-Key请求首标继承而来,包含一个特殊的响应键值,需要与客户端的预期精确匹配。

  1. WebSocket消息格式

理论上WebSocket消息可以承载任意长度的数据,但是WebSocket为了传送数据的效率考量,将消息分解为若干帧

 

WebSocket帧格式包括:

一文搞懂WebSocket

  • 操作码:由帧头第一个字节最后4bit组成,表示不同的消息载荷类型,1(文本)、2(二进制)、8(关闭握手)、9(ping)、10(pong)
  • 长度:WebSocket利用帧头第二个字节的后7bit表示小于126字节的数据长度,后面额外的2字节表示126—216字节的数据长度,大于216字节的数据长度,利用后8字节表示,总之WebSocket的数据长度用可变长字节表示。
  • 解码文本:消息数据用8位UCS转换格式即UTF-8编码;
  • 屏蔽:是为了安全原因(如防止跨协议攻击),以及改进与现有http代理的兼容性;
  • 多帧消息:发送多帧消息,可以发送一个fin位设置为0 的帧,最后一帧的fin位设置为1,表示消息以这一帧的载荷作为结束。
  1. WebSocket关闭握手

当WebSocket关闭时,终止连接的断点发送一个关闭代码和UTF-8编码的关闭原因短字符串。具体可以参考《HTML5 WebSocket权威指南》一书。

  1. 其他协议的支持

客户端可以通过Sec-WebSocket-Protocol首标,传递多个协议,服务器可以在支持的协议中选取一个,用于更高级的协议传递消息。

  1. 协议扩展

不同于协议协商,协议扩展可以有多个,扩展可以为帧格式添加新的操作码和数据字段,协议扩展是通过Sec-WebSocket-Extensions首标完成的。

  • WebSocket Demo

光说不练是假把式,下面咱们就以一个简单代码示例,演示一个简单的基于WebSocket的Web应用程序。

天猫双十一,全天成交额2684亿,还记得那块大屏幕上不断滚动刷新的交易额数字吧,像这种典型的短时间内飞快刷新的数字,如果要实现最新数字的实时呈现,如果靠客户端轮询,是不太准确的,就算每秒钟轮询刷新一次,可能你看到的数字也比实际交易额落后了不知道多少千万?因为一分钟内,天猫交易额就达到65亿,这还不包括“还在路上的正飞奔赶来的订单”。

下面我就以此为参考,制作一个计数显示器,每一秒钟增加1,数据值来自后台WebSocket服务端。

 

  • 前端代码

<!DOCTYPE html>

<html>

<head>

    <meta charset="UTF-8">

    <title>Netty WebSocket  秒计数器</title>

</head>

  <body>

  <script type="text/javascript">

      var socket;

      if (!window.WebSocket) {

          window.WebSocket = window.MozWebSocket;

      }

      if (window.WebSocket) {

          socket = new WebSocket("ws://localhost:8090/websocket");

          socket.onmessage = function (event) {

              var ta = document.getElementById('counter');

              ta.innerText = parseInt2DashString(event.data);

          };

          socket.onopen = function (event) {

              var ta = document.getElementById('counter');

              ta.innerText = "----";

          };

          socket.onclose = function (event) {

              var ta = document.getElementById('counter');

              ta.innerText = "";

          };

      }

      else {

          alert("抱歉,您的浏览器不支持WebSocket协议!");

      }

 

      function send(message) {

          if (!window.WebSocket) {

              return;

          }

          if (socket.readyState == WebSocket.OPEN) {

              socket.send(message);

          }

          else {

              alert("WebSocket连接没有建立成功!");

          }

 

      }

 

      function parseInt2DashString(d) {

          if (isNaN(d)) {

              return "----";

          } else if (parseInt(d) < 10) {

              return "---" + d;

          } else if (parseInt(d) < 100) {

              return "--" + d;

          } else if (parseInt(d) < 1000) {

              return "-" + d;

          } else {

              return d;

          }

      }

  </script>

 

    <div style="margin-top: 100px">

             <div style="width: 20%; float: left">

                       <input type="button" οnclick="send('start')" value="START" />

                       <input type="button" οnclick="send('stop')" value="STOP" />

                       <input type="button" οnclick="send('reset')" value="RESET" />

             </div>

             <div>

                       计数器:&nbsp;&nbsp; <label id="counter">----</label>

             </div>

    </div>

  </body>

</html>

 

  • 后端代码
  1. Netty WebSocket Server启动类

package com.constantine;

 

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer {

 

public void run(int port) throws Exception {

     EventLoopGroup bossGroup = new NioEventLoopGroup();

     EventLoopGroup workerGroup = new NioEventLoopGroup();

     try {

         ServerBootstrap b = new ServerBootstrap();

         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

         .childHandler(new ChannelInitializer<SocketChannel>() {

 

             @Override

             protected void initChannel(SocketChannel ch) throws Exception {

                 ChannelPipeline pipeline = ch.pipeline();

                 pipeline.addLast("http-codec", new HttpServerCodec());

                 pipeline.addLast("aggregator", new HttpObjectAggregator(65536));

                 pipeline.addLast("http-chunked", new ChunkedWriteHandler());

                 pipeline.addLast("handler", new WebSocketServerHandler());

             }

         });

         Channel ch = b.bind(port).sync().channel();

         System.out.println("Web socket server started at port " + port + ".");

         System.out.println("Open your browser and navigate to http://localhost:" + port + "/");

         ch.closeFuture().sync();

     } catch (Exception e) {

         e.printStackTrace();

     } finally {

         bossGroup.shutdownGracefully();

         workerGroup.shutdownGracefully();

     }

}

 

public static void main(String[] args) throws Exception {

     int port = 8090;

     if (args.length > 0) {

         try {

         port = Integer.parseInt(args[0]);

         } catch (NumberFormatException e) {

             e.printStackTrace();

         }

     }

     new WebSocketServer().run(port);

}

 

}

  1. Netty WebSocketServer处理类

package com.constantine;

 

import java.util.Timer;

/*import java.util.logging.Level;

import java.util.logging.Logger;*/

 

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.DefaultFullHttpResponse;

import io.netty.handler.codec.http.FullHttpRequest;

import io.netty.handler.codec.http.FullHttpResponse;

import io.netty.handler.codec.http.HttpHeaders;

import io.netty.handler.codec.http.HttpResponseStatus;

import io.netty.handler.codec.http.HttpVersion;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;

import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;

import io.netty.util.CharsetUtil;

 

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

   // private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());

 

   private WebSocketServerHandshaker handshaker;

   private static int counter = 0;

   private boolean stop = false;

   private Timer timer = new Timer();

   private WebSocketTimerTask timerTask = null;

  

  

   public static int getCounter() {

            return counter;

   }

 

   public static void setCounter(int counter) {

            WebSocketServerHandler.counter = counter;

   }

  

 

   @Override

   protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {

            if (msg instanceof FullHttpRequest) {

                     handleHttpRequest(ctx, (FullHttpRequest) msg);

            } else if (msg instanceof WebSocketFrame) {

                     if (msg instanceof TextWebSocketFrame) {

 

                               String request = ((TextWebSocketFrame) msg).text();

                               switch (request.toUpperCase()) {

                               case "START":

                                        stop = false;

                                        if (counter < 10000 && !stop) {

                                                 timerTask = new WebSocketTimerTask(ctx, (WebSocketFrame) msg, counter, handshaker);

                                                 timer.schedule(timerTask, 0, 1000);

                                        }

                                        break;

                               case "STOP":

                                        stop = true;

                                        if (timerTask != null) {

                                                 timerTask.cancel();

                                                 timerTask = null;

                                        }

                                        break;

                               case "RESET":

                                        counter = 0;

                                        stop = true;

                                        if (timerTask != null) {

                                                 timerTask.cancel();

                                                 timerTask = null;

                                        }

                                        break;

                               default:

                                        break;

                               }

                     }

            }

   }

 

   private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {

            if (!req.getDecoderResult().isSuccess() || !"websocket".equals(req.headers().get("Upgrade"))) {

                     sendHttpResponse(ctx, req,

                                        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));

                     return;

            }

            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(

                               "ws://localhost:8090/websocket", null, false);

 

            handshaker = wsFactory.newHandshaker(req);

            if (handshaker == null) {

               WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());

            } else {

                     handshaker.handshake(ctx.channel(), req);

            }

   }

 

   private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {

            if (res.getStatus().code() != HttpResponseStatus.OK.code()) {

                     ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);

                     res.content().writeBytes(buf);

                     buf.release();

                     HttpHeaders.setContentLength(res, res.content().readableBytes());

            }

            ChannelFuture f = ctx.channel().writeAndFlush(res);

 

            if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {

                     f.addListener(ChannelFutureListener.CLOSE);

            }

   }

 

   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

            cause.printStackTrace();

            ctx.close();

   }

 

   @Override

   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

            ctx.flush();

   }

 

}

 

  1. 定时任务处理器,每秒钟执行一次数值更新

package com.constantine;

 

import java.util.TimerTask;

 

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;

import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;

import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;

 

public class WebSocketTimerTask extends TimerTask {

 

ChannelHandlerContext ctx;

WebSocketFrame frame;

int counter;

WebSocketServerHandshaker handshaker;

 

public ChannelHandlerContext getCtx() {

     return ctx;

}

 

public void setCtx(ChannelHandlerContext ctx) {

     this.ctx = ctx;

}

 

public WebSocketFrame getFrame() {

     return frame;

}

 

public void setFrame(WebSocketFrame frame) {

     this.frame = frame;

}

 

public int getCounter() {

     return counter;

}

 

public void setCounter(int counter) {

     this.counter = counter;

}

 

public WebSocketServerHandshaker getHandshaker() {

     return handshaker;

}

 

public void setHandshaker(WebSocketServerHandshaker handshaker) {

     this.handshaker = handshaker;

}

 

public WebSocketTimerTask(ChannelHandlerContext ctx, WebSocketFrame frame, int counter,   WebSocketServerHandshaker handshaker) {

     this.ctx = ctx;

     this.frame = frame;

     this.counter = counter;

     this.handshaker = handshaker;

}

 

@Override

public void run() {

     this.handleWebSocketFrame(ctx, frame);

}

 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

 

        if(frame instanceof CloseWebSocketFrame){

            handshaker.close(ctx.channel(),

                    (CloseWebSocketFrame)frame.retain());

            return;

        }

 

        if(frame instanceof PingWebSocketFrame){

            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));

            return;

        }

 

        if(!(frame instanceof TextWebSocketFrame)){

            throw new UnsupportedOperationException(String.format("%s frame types not supported",frame.getClass().getName()));

        }     

 

        ctx.channel().write(new TextWebSocketFrame(String.valueOf(counter++)));

        ctx.channel().flush();

        WebSocketServerHandler.setCounter(counter);

    }

 

}

关于如何应用Netty框架构建WebSocket的服务器,有兴趣的同学可以参考相关书籍学习,代码比较简单,也不够完善,供大家参考,欢迎批评指正。