Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码
Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码
零、 目录
- IM系统简介
- Netty 简介
- Netty 环境配置
- 服务端启动流程
- 客户端启动流程
- 实战: 客户端和服务端双向通信
- 数据传输载体ByteBuf介绍
- 客户端与服务端通信协议编解码
- 实现客户端登录
- 实现客户端与服务端收发消息
- pipeline与channelHandler
- 构建客户端与服务端pipeline
- 拆包粘包理论与解决方案
- channelHandler的生命周期
- 使用channelHandler的热插拔实现客户端身份校验
- 客户端互聊原理与实现
- 群聊的发起与通知
- 群聊的成员管理(加入与退出,获取成员列表)
- 群聊消息的收发及Netty性能优化
- 心跳与空闲检测
- 总结
- 扩展
八、 服务端和客户端通信协议编解码
- 上一小节我们学习了ByteBuf 的API , 这一小节我们拉学习如何设计并实现客户端与服务端的通信协议
- 什么是服务端与客户端的通信协议?
- 无论是Netty 还是原始的Socket编程 , 基于TCP通信的数据包格式均为二进制 , 协议指的就是客户端和服务端事先商量好的 , 每一个二进制数据包中每一段字节分别表示什么含义的规则 , 如下图的一个简单的登录指令
- 这个数据包中 , 第一个字节 为1 表示这是一个登录指令 , 接下来是用户名和密码 , 这两个值以\0 分割 , 客户端发送这段二进制数据包到服务端 , 服务端就能根据这个协议取出来用户名密码 , 进行登录逻辑 , 实际的通信协议设计中 , 我们会考虑更多细节 , 比这个稍微复杂一点 。
- 那么协议设计好之后 , 客户端和服务端之间的通信过程又是怎样的呢?
- 如上图所示 , 客户端和服务端通信:
- 首先 , 客户端把一个java对象按照通信协议转换成二进制数据包
- 然后通过网络 , 把这段二进制数据包发送到服务端 , 数据的传输过程由TCP/IP协议负责数据的传输 , 和我们应用层无关
- 服务端接收到数据之后 , 按照协议取出二进制数据包中的相应的字段 , 包装成java对象 , 交给应用逻辑处理
- 服务端处理完之后, 如果需要突出相应给客户端 , 那么按照相同的逻辑进行。
- 如上图所示 , 客户端和服务端通信:
- 在本系列的第一小节中我们一进列出了实现一个支持单聊和群聊的IM指令集合 , 我们设计协议的目的就是为了客户端与服务端能够识别出这些具体的指令。
- 无论是Netty 还是原始的Socket编程 , 基于TCP通信的数据包格式均为二进制 , 协议指的就是客户端和服务端事先商量好的 , 每一个二进制数据包中每一段字节分别表示什么含义的规则 , 如下图的一个简单的登录指令
- 通信协议的设计
-
- 首先第一个数是 魔数 , 通常情况下为固定的几个字节 (我们这里规定4个字节) , 为什么需要这个字段 , 而且还是一个固定的数? 假设我们在服务器上开了一个端口 , 比如 80 端口 , 如果没有这个魔数 , 任何数据包传递到服务器 , 服务器都会根据自定义的协议进行处理 , 包括不符合自定义协议规范的数据包 。 例如: 我们直接通过http://IP:port来访问服务器 , 服务器收到的是一个标准的Http协议数据包 , 但是他仍然会按照事先约定好的协议来处理HTTP协议 , 显然这时会解析出错的 , 而有了这个魔数之后 , 服务器首先取出前四个字节进行比对 , 能够在第一时间识别出这个数据包并非是遵循自定义协议的 , 也就是说无效的数据包 , 为了安全考虑 , 可以直接关闭连接以节省资源 。 在java的二进制文件中 , 开头的4个字节为0xcafebabe 用来表示这是一个字节码文件 , 也是异曲同工之妙 。
- 接下来一个字节是版本号 , 通常情况下是预留字段 , 用于协议升级的时候用到 , 有点类似TCP/IP 协议中的一个字段表示是IPV4还是IPV6 , 大多数情况下 , 这个字段是用不到的 , 不过为了协议能够支持升级 , 我们留着 。
- 第三部分 ,序列化算法表示如何把java对象转换为二进制数据以及二进制数据转换会java对象 , 比如java自带的序列化 , json 、 hessian 等序列化方式。
- 第四部分 表示 指令 , 关于指令的介绍 , 我们在前面已经讨论过 , 服务端或者客户端每收到一种指令都会有相应的处理逻辑 , 这里我们用一个字节来表示 , 最高支持256中指令 , 对于我们的IM 系统来说 完全够用了
- 接下来 的字段为数据部分的长度 , 占四个字节
- 最后一个部分为数据部分 , 每一种指令对应的数据是不一样的 , 比如登录的时候需要用户名密码 , 收消息的时候需要用户标识和具体的消息内容
- 通常情况下 ,这样一套标准的协议能够适配大多数情况下的服务端与客户端的通信场景 , 接下来我们就来看一下 如何使用Netty 来实现这套协议
-
- 通信协议的实现
-
我们把java对象根据协议封装成二进制数据包的过程称为编码 , 而把从二进制数据包中解析出就java对象的过程称为解码 。 在学习如何使用Netty 进行通信协议编解码之前 , 我们先来定义一下客户端和服务端通信的java 对象 。
-
java 对象
/** * 数据包对象 * @author outman * */ @Data abstract class Packet{ /** * 协议版本 * */ private Byte version = 1; /** * 获取指令 * */ public abstract Byte getCommand(); /** * 指令集合内部接口 * */ interface Command{ public static final Byte LOGIN_REQUEST = 1; } }
-
以上是通信过程中 java对象的抽象类 , 可以看到我们定义了一个版本号(默认值为1) 以及一个获取指令的抽象方法 , 所有的指令数据包都必须实现这个方法 , 这样我们就可以知道某种指令的含义
-
@Data 注解由lombok 提供 , 他会自动帮我们产生getter/setter 方法 , 减少大量的重复代码 , 需要添加依赖
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> <scope>provided</scope> </dependency>
-
接下来 , 我们一客户登录请求为例 , 定义登录请求数据包 :
@Data class LoginRequestPacket extends Packet{ private Integer uerId ; private String userName; private String password; @Override public Byte getCommand() { return Command.LOGIN_REQUEST; } }
- 登录请求数据包 继承自Packet 然后定义了三个字段 , 分别是用户ID , 用户名 、密码 , 这里最为重要的是覆盖了父类的getCommand() 方法 值为常量Command.LOGIN_REQUEST
-
java对象定义完成之后 , 接下来我们就要定义一种规则 , 如何把一个java对象转换成二进制数据 , 这个规则叫做java对象的序列化
-
-
序列化
-
我们如下定义序列化接口
/** * 序列化接口 * @author outman * */ interface Serializer{ /** * 序列化算法 * */ byte getSerializerAlgorithm(); /** * java 对象转换成二进制 (序列化) * */ byte[] serialize(Object obj); /** * 二进制转换为java对象 (反序列化) * */ <T> T deSerialize(Class<T> clazz , byte[] bytes); /** * 序列化算法标识集合接口 * */ interface SerializerAlgorithm{ public static final byte JSON = 1; } }
- 序列化接口有三个方法: getSerializerAlgorithm() 获取具体的序列化算法标识 , serialize() 将java对象转换为字节数组 , deSerialize()将字节数组转转为对应类型的java对象 , 在本小节中 , 我们使用最简单的json序列化方式 , 使用阿里巴巴的fastJson作为序列化框架; 接口中还有一个内部接口 , 用于让我们定义序列化算法标识的集合
-
fastjson 依赖
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency>
-
Json序列化实现类
/** * JSON 序列化实现类 * @author outman * */ class JSONSerializer implements Serializer{ @Override public byte getSerializerAlgorithm() { return SerializerAlgorithm.JSON; } @Override public byte[] serialize(Object obj) { return JSONObject.toJSONBytes(obj); } @Override public <T> T deSerialize(Class<T> clazz, byte[] bytes) { return JSONObject.parseObject(bytes, clazz); } }
-
我们设置 Srializer 的默认序列化方式为JSONSerializer
/** * 序列化接口 * @author outman * */ interface Serializer{ /** * 默认的序列化对象 * */ Serializer DEFAULT = new JSONSerializer(); /** * 序列化算法 * */ byte getSerializerAlgorithm(); /** * java 对象转换成二进制 (序列化) * */ byte[] serialize(Object obj); /** * 二进制转换为java对象 (反序列化) * */ <T> T deSerialize(Class<T> clazz , byte[] bytes); /** * 序列化算法标识集合接口 * */ interface SerializerAlgorithm{ public static final byte JSON = 1; } }
-
- 序列化接口有三个方法: getSerializerAlgorithm() 获取具体的序列化算法标识 , serialize() 将java对象转换为字节数组 , deSerialize()将字节数组转转为对应类型的java对象 , 在本小节中 , 我们使用最简单的json序列化方式 , 使用阿里巴巴的fastJson作为序列化框架; 接口中还有一个内部接口 , 用于让我们定义序列化算法标识的集合
-
这样我们就是实现了序列化相关的逻辑 , 如果想要实现其他的序列化算法的话 , 只需要实现一下Serializer接口 , 然后定义一下 序列化算法标识 就好啦。
-
-
编码: 封装成二进制数据的过程
/** * 数据包编解码类 * @author outman * */ class PacketCodec{ // 魔数 private static final int MAGIC_NUMBER = 0x12345678; public ByteBuf enCode(Packet packet) { // 1. 创建ByteBuf 对象 ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); // 2. 序列化java对象 byte[] bs = Serializer.DEFAULT.serialize(packet); // 3. 实际编码过程 byteBuf.writeInt(MAGIC_NUMBER); // 写入魔数 byteBuf.writeInt(packet.getVersion()); // 写入协议版本号 byteBuf.writeInt(Serializer.DEFAULT.getSerializerAlgorithm()); // 写入序列化算法 byteBuf.writeByte(packet.getCommand()); // 写入指令 byteBuf.writeInt(bs.length); // 写入数据长度 byteBuf.writeBytes(bs); // 写入数据 return byteBuf; } }
- 编码过程分为三个过程:
- 首先我们创建一个ByteBuf , 这里我们调用Netty 的ByteBuf分配器来创建 , ioBuffrer() 方法会返回适配io读写相关的内存 , 他尽可能创建一个直接内存 ,直接内存可以理解为不受jvm 对内存法管理 , 写到Io 缓冲区的效果更高
- 接下来我们把java 对象序列化成二进制数据包
- 最后我们对照本小节开头的协议的设计以及上一小节ByeBuf 的API , 逐个往bytebuf 中写入字段 , 及实现了编码过程
- 一端实现了编码 。 Netty 会将次ByteBuf 写到另外一端 , 另外一端拿到的也是一个ByteBuf 对象, 基于这个ByteBuf 对象 , 就可以反解出对端创建的java对象 , 这个过程我们称之为解码
- 编码过程分为三个过程:
-
解码: 解析java对象的过程
-
还是刚刚的PacketCodec.java 中添加deCode(buf) 、 getRequestPacket(byte command) 、 getSerializer(byte serializeAlgorithm)方法
/** * 数据包编解码类 * * @author outman */ class PacketCodec { // 魔数 private static final int MAGIC_NUMBER = 0x12345678; // 指令 与 数据包 映射 private final Map<Byte, Class<? super Packet>> packetTypeMap; // 序列化算法 与 序列化类 映射 private final Map<Byte, Class<? super Serializer>> serializerMap; // 单例 public static final PacketCodec INSTANCE = new PacketCodec(); // 注册Packet集合 List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class }); // 注册序列化算法集合 List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class }); /** * 初始化 指令 与 数据包 映射 序列化算法 与 序列化类 映射 */ private PacketCodec() { // 初始化 指令 与 数据包 映射 packetTypeMap = new HashMap<Byte, Class<? super Packet>>(); packetList.forEach(clazz -> { try { Method method = clazz.getMethod("getCommand"); Byte command = (Byte) method.invoke(clazz); packetTypeMap.put(command, clazz); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }); // 初始化序列化算法 与 序列化类 映射 serializerMap = new HashMap<Byte, Class<? super Serializer>>(); serializerList.forEach(clazz -> { try { Method method = clazz.getMethod("getSerializerAlgorithm"); Byte serializerAlgorithm = (Byte) method.invoke(clazz); serializerMap.put(serializerAlgorithm, clazz); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } // 编码 public ByteBuf enCode(Packet packet) { // 1. 创建ByteBuf 对象 ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); // 2. 序列化java对象 byte[] bs = Serializer.DEFAULT.serialize(packet); // 3. 实际编码过程 byteBuf.writeInt(MAGIC_NUMBER); // 写入魔数 byteBuf.writeByte(packet.getVersion()); // 写入协议版本号 byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 写入序列化算法 byteBuf.writeByte(packet.getCommand()); // 写入指令 byteBuf.writeInt(bs.length); // 写入数据长度 byteBuf.writeBytes(bs); // 写入数据 return byteBuf; } // 解码 public Packet deCode(ByteBuf byteBuf) throws Exception { // 跳过 魔数 校验 byteBuf.skipBytes(4); // 跳过版本号 byteBuf.skipBytes(1); // 序列化算法标识 byte serializeAlgorithm = byteBuf.readByte(); // 指令标识 byte command = byteBuf.readByte(); // 数据包长度 int length = byteBuf.readInt(); // 数据 byte[] bs = new byte[length]; byteBuf.readBytes(bs); // 通过序列化算法标识获取对应的 序列化对象 Serializer serializer = getSerializer(serializeAlgorithm); // 通过指令标识 获取对应的 数据包类 Packet packet = getRequestPacket(command); // 执行解码 if (serializer != null && packet != null) { return serializer.deSerialize(packet.getClass(), bs); } else { System.out.println("没有找到对应的序列化对象或数据包对象"); return null; } } // 通过指令获取对应的 数据包类 private Packet getRequestPacket(byte command) throws Exception { return (Packet) packetTypeMap.get(command).newInstance(); } // 通过序列化算法标识 获取对应的序列化类 private Serializer getSerializer(byte serializeAlgorithm) throws Exception { return (Serializer) serializerMap.get(serializeAlgorithm).newInstance(); } }
- 解码的流程如下
- 我们假定deCode方法传递进来的ByteBuf已经合法(后面的小节我们会实现校验) 即首部4个字节是我们前面定义的魔数 , 这里我们跳过
- 这里我们暂时不关注协议版本 , 通常我们没有遇到协议升级的时候 , 这个字段暂不处理 , 因为你会发现 , 在大多数情况下 , 这个字段几乎用不着 , 但是我们仍然保留
- 接下来我们拿到 序列化算法标识 、 指令标识 、 数据长度 、 数据
- 最后我们根据拿到的数据长度取出数据 , 通过指令标识拿到对应的java对象 , 根据序列化算法标识 拿到序列化对象 , 将字节码转换为java对象
- 解码的流程如下
-
-
-
完整代码
package com.tj.NIO_test_maven; import java.lang.reflect.Method; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import com.alibaba.fastjson.JSONObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import lombok.Data; /** * @author outman */ public class Test_09_客戶端与服务端通信协议编解码 { public static void main(String[] args) { } } /** * 数据包对象 * * @author outman */ @Data abstract class Packet { /** * 协议版本 */ private Byte version = 1; /** * 获取指令 */ public abstract Byte getCommand(); /** * 指令集合内部接口 */ interface Command { public static final Byte LOGIN_REQUEST = 1; } } /** * 登录请求数据包 * * @author outman */ @Data class LoginRequestPacket extends Packet { private Integer uerId; private String userName; private String password; @Override public Byte getCommand() { return Command.LOGIN_REQUEST; } } /** * 序列化接口 * * @author outman */ interface Serializer { /** * 默认的序列化对对象 */ Serializer DEFAULT = new JSONSerializer(); /** * 序列化算法 */ byte getSerializerAlgorithm(); /** * java 对象转换成二进制 (序列化) */ byte[] serialize(Object obj); /** * 二进制转换为java对象 (反序列化) */ <T> T deSerialize(Class<T> clazz, byte[] bytes); /** * 序列化算法标识集合接口 */ interface SerializerAlgorithm { public static final byte JSON = 1; } } /** * JSON 序列化实现类 * * @author outman */ class JSONSerializer implements Serializer { @Override public byte getSerializerAlgorithm() { return SerializerAlgorithm.JSON; } @Override public byte[] serialize(Object obj) { return JSONObject.toJSONBytes(obj); } @Override public <T> T deSerialize(Class<T> clazz, byte[] bytes) { return JSONObject.parseObject(bytes, clazz); } } /** * 数据包编解码类 * * @author outman */ class PacketCodec { // 魔数 private static final int MAGIC_NUMBER = 0x12345678; // 指令 与 数据包 映射 private final Map<Byte, Class<? super Packet>> packetTypeMap; // 序列化算法 与 序列化类 映射 private final Map<Byte, Class<? super Serializer>> serializerMap; // 单例 public static final PacketCodec INSTANCE = new PacketCodec(); // 注册Packet集合 List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class }); // 注册序列化算法集合 List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class }); /** * 初始化 指令 与 数据包 映射 序列化算法 与 序列化类 映射 */ private PacketCodec() { // 初始化 指令 与 数据包 映射 packetTypeMap = new HashMap<Byte, Class<? super Packet>>(); packetList.forEach(clazz -> { try { Method method = clazz.getMethod("getCommand"); Byte command = (Byte) method.invoke(clazz); packetTypeMap.put(command, clazz); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }); // 初始化序列化算法 与 序列化类 映射 serializerMap = new HashMap<Byte, Class<? super Serializer>>(); serializerList.forEach(clazz -> { try { Method method = clazz.getMethod("getSerializerAlgorithm"); Byte serializerAlgorithm = (Byte) method.invoke(clazz); serializerMap.put(serializerAlgorithm, clazz); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } // 编码 public ByteBuf enCode(Packet packet) { // 1. 创建ByteBuf 对象 ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); // 2. 序列化java对象 byte[] bs = Serializer.DEFAULT.serialize(packet); // 3. 实际编码过程 byteBuf.writeInt(MAGIC_NUMBER); // 写入魔数 byteBuf.writeByte(packet.getVersion()); // 写入协议版本号 byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 写入序列化算法 byteBuf.writeByte(packet.getCommand()); // 写入指令 byteBuf.writeInt(bs.length); // 写入数据长度 byteBuf.writeBytes(bs); // 写入数据 return byteBuf; } // 解码 public Packet deCode(ByteBuf byteBuf) throws Exception { // 跳过 魔数 校验 byteBuf.skipBytes(4); // 跳过版本号 byteBuf.skipBytes(1); // 序列化算法标识 byte serializeAlgorithm = byteBuf.readByte(); // 指令标识 byte command = byteBuf.readByte(); // 数据包长度 int length = byteBuf.readInt(); // 数据 byte[] bs = new byte[length]; byteBuf.readBytes(bs); // 通过序列化算法标识获取对应的 序列化对象 Serializer serializer = getSerializer(serializeAlgorithm); // 通过指令标识 获取对应的 数据包类 Packet packet = getRequestPacket(command); // 执行解码 if (serializer != null && packet != null) { return serializer.deSerialize(packet.getClass(), bs); } else { System.out.println("没有找到对应的序列化对象或数据包对象"); return null; } } // 通过指令获取对应的 数据包类 private Packet getRequestPacket(byte command) throws Exception { return (Packet) packetTypeMap.get(command).newInstance(); } // 通过序列化算法标识 获取对应的序列化类 private Serializer getSerializer(byte serializeAlgorithm) throws Exception { return (Serializer) serializerMap.get(serializeAlgorithm).newInstance(); } }
-
- 总结:
- 通信协议书为了服务端和客户端交互 , 双方协商出来的满足一定规则的二进制数据格式
- 介绍了一种通用的通信协议的设计 , 包括魔数 、 版本号 、 序列化算法标识 、 指令标识 、数据长度 、 数据几个字段 , 该协议能够满足大多数通信的场景
- java对象以及序列化 , 目的就是实现java对象与二进制数据的互换
- 最后我们依照我们设计的协议以及ByteBuf 的API 实现了通信协议 , 这个过程成为编码过程
- 思考:
- 序列化和编码都是 把java对象封装成二进制数据的过程 , 这两者有什么区别?
- 序列化是把内容变成计算机可传输的资源,而编码则是让程序认识这份资源。
- 指令标识 、 序列化算法标识为什么不用枚举?
- 请问这节课自己设计的通信协议是属于应用层协议,和http协议是同一级别是吧?
- 对的,这样理解起来完全正确,只不过自定义协议属于私有协议,http属于共有协议
- 使用protobuf 生成的对象的二进制要小很多,使用protobuf 可以减小数据包的大小。一个数据包无法装下一个对象的这种情况怎么处理呢(就是一个ByteBuf 被很多个物理层数据包传输的情况)?
- 所以设计协议的时候,长度字段的长度需要考量,需要支持最大的数据包大小,这里是4个字节,最大值为 2147483647,已经完全足够了,然后一个物理层数据包如果塞不下,会被拆成多个数据包,另外一端接受的时候把这些数据包粘合起来,可参考13小结
- 序列化和编码都是 把java对象封装成二进制数据的过程 , 这两者有什么区别?