Netty —— 搭建简单的服务器(含编码解码器)
接收报文格式:报文头(int,表示报文体数据长度,如:1024) + 报文体(客户端发送数据转换为byte数组)
响应报文格式:不做要求。
解码器作用:
接收报文并且进行预处理,将加密后或序列化后的数据转换成原数据或者对象,再传递给handler进行处理。通俗地说就是把我们报文从难以看懂的数据状态转化成我们系统可以识别的数据状态。
编码器作用:
将我们要发送出去的数据,安装报文规定的格式,对数据进行加密或者序列化或者其他操作,转化成别人系统可以接收识别的报文。和解码器相反而且对应。
服务器:
编码器:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class Encoder extends MessageToByteEncoder<byte[]> {
@Override
public void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out)
throws Exception {
System.out.println("编码器发送客户端报文: " + new String(msg, "utf-8"));
out.writeBytes(msg);
}
}
解码器:
import java.nio.charset.Charset;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public class Decoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
//报文最小长度为4,若低于4则说明报文有问题
if (in.readableBytes() < 4) {
return;
}
//在读取前标记readerIndex
in.markReaderIndex();
//读取头部,获取报文长度
int length = in.readInt();
System.out.println("报文体数据长度为:" + length);
if (in.readableBytes() < length) {
//消息不完整,无法处理,将readerIndex复位
System.out.println("消息不完整,无法处理,将readerIndex复位:" + in.readBytes(in.readableBytes()).toString(Charset.forName("utf-8")));
in.resetReaderIndex();
return;
}
String mString = in.readBytes(length).toString(Charset.forName("utf-8")) ;
System.out.println("保存报文数据:" + mString);
out.add(mString);
}
}
handler类:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 处理客户端的请求
* @author zhb
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
// 读取数据时这个方法会在收到消息时被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器接收到客户端信息: " + msg.toString());
String respStr = "服务端已接收到客户端信息,返回客户端信息!";//服务端相应客户端数据
// 返回给客户端响应
ctx.write(respStr.getBytes("utf-8"));
ctx.flush();
}
// 数据读取完毕的处理
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.err.println("服务器读取数据完成");
}
// 出现异常的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("服务器读取数据异常:" + cause.getMessage());
ctx.close();
}
}
服务器规则(Main):
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* tcp/ip 服务端用netty实现
* @author zhb
*
*/
public class ServerNetty {
private int port; //服务器端口
public ServerNetty(int port){
this.port = port;
}
// netty 服务器启动
public void serverStart() throws InterruptedException{
// 用来接收进来的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// nio服务的启动类
ServerBootstrap sbs = new ServerBootstrap();
// 配置nio服务参数
sbs.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
.option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
.handler(new LoggingHandler(LogLevel.INFO)) // 日志级别
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置网络超时时间
//socketChannel.pipeline().addLast(new ReadTimeoutHandler(5 * 1000));
// 设置编码器,解码器,处理接收到的请求
socketChannel.pipeline().addLast( new Encoder() , new Decoder() , new ServerHandler()); // 这里相当于过滤器,可以配置多个
}
});
System.err.println("server 开启--------------");
// 绑定端口,开始接受链接
ChannelFuture cf = sbs.bind(port).sync();
// 开多个端口
// ChannelFuture cf2 = sbs.bind(3333).sync();
// cf2.channel().closeFuture().sync();
// 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
cf.channel().closeFuture().sync();
} finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 开启netty服务器
public static void main(String[] args) throws InterruptedException {
new ServerNetty(8080).serverStart();
}
}
运行此类开启Netty服务器:
客户端:
package com.test.Netty2;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
public class TestClient {
public static final String IP_ADDR = "localhost";//服务器地址
public static final int PORT = 8080;//服务器端口号
public static void main(String[] args) {
System.out.println("客户端启动...");
// while (true) {
Socket socket = null;
try {
//创建一个流套接字并将其连接到指定主机上的指定端口号
socket = new Socket(IP_ADDR, PORT);
//向服务器端发送数据
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
String Msg = "client sent to server!";//发送到服务端的数据
byte[] value = Msg.getBytes("UTF-8") ;//报文体
String MsgLen = String.format("%04d", value.length); //报文头
System.out.println("客户端数据长度:" + MsgLen);
out.writeInt(Integer.valueOf(MsgLen));//发送报文头
System.out.println("客户端发送数据:" + Msg);
out.write(value , 0 , value.length);//发送报文体
out.flush();
//读取服务器端数据
DataInputStream input = new DataInputStream(socket.getInputStream());
StringBuffer returnMsg = new StringBuffer();
int len = 0;
byte[] b = new byte[1024]; //容器,存放数据
System.out.println("开始接收服务端返回数据。。。");
while ((len = input.read(b)) != -1) {//一直读,读到没数据为止
returnMsg.append(new String(b, 0, len, "utf-8"));
if (len < 1024) {//如果读的长度小于1024,说明是最后一次读,后面已经没有数据,跳出循环
break;
}
}
System.out.println("服务器端返回过来的是: " + returnMsg.toString());
out.close();
input.close();
} catch (Exception e) {
System.out.println("客户端异常:" + e.getMessage());
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
socket = null;
System.out.println("客户端 finally 异常:" + e.getMessage());
}
}
}
}
}
运行此类,发送客户端请求:
测试结果:
客户端:
客户端启动...
客户端数据长度:0022
客户端发送数据:client sent to server!
开始接收服务端返回数据。。。
服务器端返回过来的是: 服务端已接收到客户端信息,返回客户端信息!