一文搞懂WebSocket
- WebSocket的概念
WebSocket是一种全双工、双向、单套接字链接的通讯协议,是HTML5平台上关键的通讯组件。
http协议只能是由client发送请求,server端接收请求,然后返回响应给client,http是基于TCP的半双工协议,且是无状态链接。
WebSocket在与服务器建立连接后,可以长时间保持此链接,服务端和客户端可以完成多从通讯。
WebSocket可以构建真正意义上的实时应用程序,相比http来说,极大的提升用户体验。
- 为什么需要WebSocket
很长时间以来,http协议是web浏览器上应用最广的应答式超文本传送协议。http在1990年问世,那时还没有建立起正式的规范标准,因此这一阶段的http即被人们称为http/0.9。
http/1.0正式作为标准被公布是在1996年5月,并记载于RFC1945。
http/1.1在1997年1月被公布,同时记载于RFC2068。截止目前,http/1.1是主流的http协议版本。
|
http/1.0 |
http/1.1 |
连接复用 |
单一连接 |
单一连接,可以重复使用 |
持久连接 |
处理完请求后立即断开 |
支持持久连接,一个连接可以同时完成多个请求和响应 |
身份认证 |
不支持 |
支持 |
其他 |
|
状态管理、Cache缓存、host字段、Chunked transfer-coding等 |
无论http如何发展,但其半双工、客户端请求(服务端不能主动推送)的弱点,一直使http不能有效完成实时应用程序对数据及时性的需求,尽管工程师们从http上衍生出http轮询、长轮询和流化技术。
WebSocket可以节约带宽、CPU资源并减少延迟;
WebSocket使实时通讯变得异常简单,相较于http上实现实时通知的技术,WebSocket带来了革命性的改变;
WebSocket是底层网络协议,可以在此基础上构建其他标准协议,例如可以使用相同的XMPP over WebSocket客户端登录不同的聊天服务器;
- WebSocket的应用场景
鉴于WebSocket的实时功能,可以构建聊天、IM、协作文档编辑、股票交易、多人在线游戏等应用程序
- WebSocket的通讯模型
- WebSocket第一次完成握手时,依然应用HTTP协议;但是会包含一个特殊的首标Upgrade:websocket,表示客户端将把连接升级到WebSocket协议;
- 服务端响应请求,返回Connection:Upgrade首标和Upgrade:WebSocket首标,完成握手;
- 客户端再次发送升级协议后的请求,此时http的头信息不再发送;
- 服务端保持该连接,用于后续多次数据更新推送数据给客户端。
- WebSocket的API
在调用WebSocket API之前,你需要构造WebSocket的对象,最简单的构造函数只需要传递一个链接地址的URL即可(或者通过protocols形参传递一个或一组协议名称)。
WebSocket API是真正意义上的事件驱动。一旦握手完成,当服务器有需要发送的数据,或者客户端关心的资源将要改变状态时,它便自动发送数据或者通知。
- 构造函数:var ws = new WebSocket(“ws://www.websocket.org”);
以上通过构造函数创建了一个WebSocket对象,在连接上服务器时,可以选择使用第二个参数列出应用程序支持的协议,用于协议协商,客户端和服务端只能使用一种协议。
如:var ws = new WebSocket(“ws://www.websocket.org”, [“protocol1”, “protocol2”]);
proctcols指定三种协议:RFC 6455注册过的标准协议;开放协议,如XMPP和STOMP;自定义协议。
- WebSocket事件:WebSocket对象调度4个事件,
open:服务器已经建立了连接
message:接收到服务器发送的消息,除了文本,消息还可以是二进制数据
error:响应发生意外故障时触发,导致WebSocket连接关闭
close:连接关闭时触发
如服务器响应了WebSocket连接请求,触发open事件并建立一个连接。
ws.open = function(e) {
console.log(“Connection open …”);
};
- WebSocket方法:send()和close()
- WebSocket对象特性:
- readyState:WebSocket对象的只读特性,表示其连接状态;
特性常量 |
取值 |
状态 |
WebSocket.CONNECTING |
0 |
连接正在进行中,但还未建立 |
WebSocket.OPEN |
1 |
连接已经建立。消息可以在客户端和服务器之间传递 |
WebSocket.CLOSING |
2 |
连接正在进行关闭握手 |
WebSocket.CLOSED |
3 |
连接已经关闭,不能打开 |
- bufferedAmount:检查发往服务器的缓冲数据量,即已经进入队列,但还未发送到服务器的字节数
- protocol:标示服务器选择的用于与客户端通讯的协议名称,在握手之前,该属性值为空,如果服务器没有选择客户端提供的某个协议,该属性值保持为空值。
- WebSocket协议
- 初始握手:
客户端以http方式发起首次握手,与http协议很相似,会在请求头中包含一个特殊的首标Upgrade:websocket,服务端返回101响应代码、Upgrade首标和Sec-WebSocket-Accept首标,表明连接成功。其中Sec-WebSocket-Accept响应首标的值从Sec-WebSocket-Key请求首标继承而来,包含一个特殊的响应键值,需要与客户端的预期精确匹配。
- WebSocket消息格式:
理论上WebSocket消息可以承载任意长度的数据,但是WebSocket为了传送数据的效率考量,将消息分解为若干帧
WebSocket帧格式包括:
- 操作码:由帧头第一个字节最后4bit组成,表示不同的消息载荷类型,1(文本)、2(二进制)、8(关闭握手)、9(ping)、10(pong)
- 长度:WebSocket利用帧头第二个字节的后7bit表示小于126字节的数据长度,后面额外的2字节表示126—216字节的数据长度,大于216字节的数据长度,利用后8字节表示,总之WebSocket的数据长度用可变长字节表示。
- 解码文本:消息数据用8位UCS转换格式即UTF-8编码;
- 屏蔽:是为了安全原因(如防止跨协议攻击),以及改进与现有http代理的兼容性;
- 多帧消息:发送多帧消息,可以发送一个fin位设置为0 的帧,最后一帧的fin位设置为1,表示消息以这一帧的载荷作为结束。
- WebSocket关闭握手
当WebSocket关闭时,终止连接的断点发送一个关闭代码和UTF-8编码的关闭原因短字符串。具体可以参考《HTML5 WebSocket权威指南》一书。
- 其他协议的支持
客户端可以通过Sec-WebSocket-Protocol首标,传递多个协议,服务器可以在支持的协议中选取一个,用于更高级的协议传递消息。
- 协议扩展
不同于协议协商,协议扩展可以有多个,扩展可以为帧格式添加新的操作码和数据字段,协议扩展是通过Sec-WebSocket-Extensions首标完成的。
- WebSocket Demo
光说不练是假把式,下面咱们就以一个简单代码示例,演示一个简单的基于WebSocket的Web应用程序。
天猫双十一,全天成交额2684亿,还记得那块大屏幕上不断滚动刷新的交易额数字吧,像这种典型的短时间内飞快刷新的数字,如果要实现最新数字的实时呈现,如果靠客户端轮询,是不太准确的,就算每秒钟轮询刷新一次,可能你看到的数字也比实际交易额落后了不知道多少千万?因为一分钟内,天猫交易额就达到65亿,这还不包括“还在路上的正飞奔赶来的订单”。
下面我就以此为参考,制作一个计数显示器,每一秒钟增加1,数据值来自后台WebSocket服务端。
- 前端代码
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Netty WebSocket 秒计数器</title> </head> <body> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://localhost:8090/websocket"); socket.onmessage = function (event) { var ta = document.getElementById('counter'); ta.innerText = parseInt2DashString(event.data); }; socket.onopen = function (event) { var ta = document.getElementById('counter'); ta.innerText = "----"; }; socket.onclose = function (event) { var ta = document.getElementById('counter'); ta.innerText = ""; }; } else { alert("抱歉,您的浏览器不支持WebSocket协议!"); }
function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("WebSocket连接没有建立成功!"); }
}
function parseInt2DashString(d) { if (isNaN(d)) { return "----"; } else if (parseInt(d) < 10) { return "---" + d; } else if (parseInt(d) < 100) { return "--" + d; } else if (parseInt(d) < 1000) { return "-" + d; } else { return d; } } </script>
<div style="margin-top: 100px"> <div style="width: 20%; float: left"> <input type="button" οnclick="send('start')" value="START" /> <input type="button" οnclick="send('stop')" value="STOP" /> <input type="button" οnclick="send('reset')" value="RESET" /> </div> <div> 计数器: <label id="counter">----</label> </div> </div> </body> </html> |
- 后端代码
- Netty WebSocket Server启动类
package com.constantine;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; public class WebSocketServer {
public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("Web socket server started at port " + port + "."); System.out.println("Open your browser and navigate to http://localhost:" + port + "/"); ch.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
public static void main(String[] args) throws Exception { int port = 8090; if (args.length > 0) { try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new WebSocketServer().run(port); }
} |
- Netty WebSocketServer处理类
package com.constantine;
import java.util.Timer; /*import java.util.logging.Level; import java.util.logging.Logger;*/
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil;
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { // private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
private WebSocketServerHandshaker handshaker; private static int counter = 0; private boolean stop = false; private Timer timer = new Timer(); private WebSocketTimerTask timerTask = null;
public static int getCounter() { return counter; }
public static void setCounter(int counter) { WebSocketServerHandler.counter = counter; }
@Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { if (msg instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) msg).text(); switch (request.toUpperCase()) { case "START": stop = false; if (counter < 10000 && !stop) { timerTask = new WebSocketTimerTask(ctx, (WebSocketFrame) msg, counter, handshaker); timer.schedule(timerTask, 0, 1000); } break; case "STOP": stop = true; if (timerTask != null) { timerTask.cancel(); timerTask = null; } break; case "RESET": counter = 0; stop = true; if (timerTask != null) { timerTask.cancel(); timerTask = null; } break; default: break; } } } }
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { if (!req.getDecoderResult().isSuccess() || !"websocket".equals(req.headers().get("Upgrade"))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:8090/websocket", null, false);
handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } }
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { if (res.getStatus().code() != HttpResponseStatus.OK.code()) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpHeaders.setContentLength(res, res.content().readableBytes()); } ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
} |
- 定时任务处理器,每秒钟执行一次数值更新
package com.constantine;
import java.util.TimerTask;
import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
public class WebSocketTimerTask extends TimerTask {
ChannelHandlerContext ctx; WebSocketFrame frame; int counter; WebSocketServerHandshaker handshaker;
public ChannelHandlerContext getCtx() { return ctx; }
public void setCtx(ChannelHandlerContext ctx) { this.ctx = ctx; }
public WebSocketFrame getFrame() { return frame; }
public void setFrame(WebSocketFrame frame) { this.frame = frame; }
public int getCounter() { return counter; }
public void setCounter(int counter) { this.counter = counter; }
public WebSocketServerHandshaker getHandshaker() { return handshaker; }
public void setHandshaker(WebSocketServerHandshaker handshaker) { this.handshaker = handshaker; }
public WebSocketTimerTask(ChannelHandlerContext ctx, WebSocketFrame frame, int counter, WebSocketServerHandshaker handshaker) { this.ctx = ctx; this.frame = frame; this.counter = counter; this.handshaker = handshaker; }
@Override public void run() { this.handleWebSocketFrame(ctx, frame); }
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if(frame instanceof CloseWebSocketFrame){ handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain()); return; }
if(frame instanceof PingWebSocketFrame){ ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; }
if(!(frame instanceof TextWebSocketFrame)){ throw new UnsupportedOperationException(String.format("%s frame types not supported",frame.getClass().getName())); }
ctx.channel().write(new TextWebSocketFrame(String.valueOf(counter++))); ctx.channel().flush(); WebSocketServerHandler.setCounter(counter); }
} |
关于如何应用Netty框架构建WebSocket的服务器,有兴趣的同学可以参考相关书籍学习,代码比较简单,也不够完善,供大家参考,欢迎批评指正。