Netty使用MessagePack自定义编解码POJO对象实例
一、MessagePack
本文使用的MessagePack为0.8.13,相关知识参考MessagePack-0.8.13-msgpack-core。
MessagePack,官方描述:
It's like JSON.
but fast and small.
它想JSON,但更快更小。
1、maven依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.8.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.msgpack/jackson-dataformat-msgpack -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.8.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.35</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.1</version>
</dependency>
</dependencies>
2、MessagePack序列化和反序列化
(1)创建实体类Persion
public class Person {
private String name;
private int age;
private boolean sex;
public Person(){}
public Person(String name, int age, boolean sex) {
this.name = name;
this.age = age;
this.sex = sex;
}
//getter和set方法略
@Override
public String toString() {
return "Person[name = " + name + ",age = " + age + ",sex = " + sex + "]";
}
}
(2)将对象序列化成byte数组以及将byte数组反序列化为对象:
Person person = new Person("jemmy",18, true);
ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
byte[] bytes = mapper.writeValueAsBytes(person);
System.out.println(bytes.length);
System.out.println(Arrays.toString(bytes));
Person p = mapper.readValue(bytes, Person.class);
System.out.println(p);
(3)List序列化成byte数组和反序列化
List<Person> list = new ArrayList<Person>();
list.add(person);
list.add(new Person("tomy", 20, false));
byte[] listBytes = mapper.writeValueAsBytes(list);
System.out.println(listBytes.length);
System.out.println(Arrays.toString(listBytes));
System.out.println("===================>");
JavaType type = mapper.getTypeFactory().constructParametricType(ArrayList.class,Person.class);
List<Person> persons = mapper.readValue(listBytes, type);
System.out.println(persons);
System.out.println("==============================================>");
抽象成MessagePackUtil工具类,代码如下:
package com.jemmy.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* MessagePack工具类
* 1、实现对象转byte数组
* 2、实现byte数组转对象
* 3、实现byte数组转List
*/
public class MessagePackUtil {
//静态对象
private static ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
/**
* 对象转byte数组,
* @param obj
* @param <T>
* @return
*/
public static <T> byte[] toBytes(T obj){
try {
return mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
/**
* byte数组转List
* @param bytes
* @param clazz
* @param <T>
* @return
*/
public static <T> List<T> toList(byte[] bytes, Class<T> clazz){
List<T> list = null;
try {
list = mapper.readValue(bytes, MessagePackUtil.<T>List(clazz));
} catch (IOException e) {
e.printStackTrace();
}
return list;
}
/**
* byte数组转对象
* @param bytes
* @param clazz
* @param <T>
* @return
*/
public static <T> T toObject(byte[] bytes, Class<T> clazz) {
try {
return mapper.readValue(bytes, clazz);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 私有静态方法,获取List的实例化类型
* @param clazz //List元素对象类型
* @param <T>
* @return
*/
private static <T> JavaType List(Class<?> clazz){
return mapper.getTypeFactory().constructParametricType(ArrayList.class, clazz);
}
}
二、自定义编码器和解码器
MessagePack编码器开发,代码如下,参考《Netty权威指南》(第二版,高清,有需要的,评论区留邮箱)。
继承MessageToMessageDecoder,覆写decode方法,将channel中的msg读取出来,然后加入到List<Object>中
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
out.add(MessagePackUtil.toObject(bytes, Object.class));
}
}
MessagePack解码器开发,代码如下:
将msg对象序列化成byte数组,写入channel的ByteBuf缓冲区中。
public class MsgPackEncoder extends MessageToByteEncoder<Object> {
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
byte[] raw = MessagePackUtil.toBytes(msg);
out.writeBytes(raw);
}
}
使用EchoServer案例使用和验证自定义的编解码器。
EchoServer代码:
package com.jemmy.chapter07.messagePack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class EchoServer {
private int port;
public EchoServer(int port){
this.port = port;
}
public void run(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//首先加入LengthFieldBasedFrameDecoder解码器,解决tcp粘包和拆包的问题
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(65535, 0,2,0,2));
//加入MessagePackDcoder
ch.pipeline().addLast("msgPack decoder", new MsgPackDecoder());
//加入LengthFieldPrepender编码器,解决tcp粘包和拆包的问题
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
//加入MessagePackEncoder
ch.pipeline().addLast("msgPack encoder", new MsgPackEncoder());
//业务逻辑
ch.pipeline().addLast(new EchoServerHandler());
}
});
System.out.println("Echo Server start at port : " + port);
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 1){
try{
port = Integer.valueOf(args[0]);
}catch (NumberFormatException e){
e.printStackTrace();
}
}
new EchoServer(port).run();
}
}
EchoServerHandler代码:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//由于MessagePack编解码器的存在,到此处的msg,已经是Person对象,可以强转成Persion对象
System.out.println("The Server Recieve message:" + msg);
ctx.writeAndFlush(msg); //收到消息后直接返回给Client
}
/**
* 异常的时候打印栈和关闭channel上下文对象
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoClient代码:(代码与服务端类似)
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.net.InetSocketAddress;
public class EchoClient {
private int port;
private String host;
public EchoClient(String host, int port){
this.port = port;
this.host = host;
}
public void run(){
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(65535, 0,2,0,2));
ch.pipeline().addLast("msgPack decoder", new MsgPackDecoder());
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
ch.pipeline().addLast("msgPack encoder", new MsgPackEncoder());
ch.pipeline().addLast(new EchoClientHandler(10));
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
new EchoClient("127.0.0.1", 8080).run();
}
}
EchoClientHandler代码:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private int sendNumber;
public EchoClientHandler(int sendNumber){
this.sendNumber = sendNumber;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送对象数据给服务端
Person[] persons = persionInfo();
for (Person p:persons) {
ctx.write(p);
}
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//打印服务端发送的数据
System.out.println("Client recieve message:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 初始化发送的消息对象
* @return
*/
private Person [] persionInfo(){
Person[] persons = new Person[sendNumber];
Person person = null;
for (int i = 0; i < sendNumber ; i++) {
person = new Person("Jemmy===>" + i, 10 + i, true);
persons[i] = person;
}
return persons;
}
}
代码结构:
测试结果:
服务端打印:
The Server Recieve message:{name=Jemmy===>0, age=10, sex=true}
The Server Recieve message:{name=Jemmy===>1, age=11, sex=true}
The Server Recieve message:{name=Jemmy===>2, age=12, sex=true}
The Server Recieve message:{name=Jemmy===>3, age=13, sex=true}
The Server Recieve message:{name=Jemmy===>4, age=14, sex=true}
The Server Recieve message:{name=Jemmy===>5, age=15, sex=true}
The Server Recieve message:{name=Jemmy===>6, age=16, sex=true}
The Server Recieve message:{name=Jemmy===>7, age=17, sex=true}
The Server Recieve message:{name=Jemmy===>8, age=18, sex=true}
The Server Recieve message:{name=Jemmy===>9, age=19, sex=true}
客户端打印:
Client recieve message:{name=Jemmy===>0, age=10, sex=true}
Client recieve message:{name=Jemmy===>1, age=11, sex=true}
Client recieve message:{name=Jemmy===>2, age=12, sex=true}
Client recieve message:{name=Jemmy===>3, age=13, sex=true}
Client recieve message:{name=Jemmy===>4, age=14, sex=true}
Client recieve message:{name=Jemmy===>5, age=15, sex=true}
Client recieve message:{name=Jemmy===>6, age=16, sex=true}
Client recieve message:{name=Jemmy===>7, age=17, sex=true}
Client recieve message:{name=Jemmy===>8, age=18, sex=true}
Client recieve message:{name=Jemmy===>9, age=19, sex=true}