分布式网络通信框架Netty实战_Netty实现客户端和服务器NIO通信
分布式网络通信框架Netty实战
Netty实现客户端和服务器NIO通信
代码实战
田超凡 2019年10月28日
Netty线程组NioEventLoopGroup工作原理图
代码搭建Netty服务器和客户端,基于NIO模型、ByteBuf缓冲区,使用Netty内置解码器LineBasedFrameDecoder解决TCP粘包拆包问题,实现IO通信:
package com.tcf.netty.demo.server;
import com.tcf.netty.demo.server.handle.NettyServerEventListenHandle;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Minimal Netty NIO server.
 *
 * <p>Binds to port 8080, cuts the inbound byte stream into newline-delimited frames with
 * {@link LineBasedFrameDecoder}, and passes each frame to {@link NettyServerEventListenHandle}.
 *
 * @author Hasee
 */
public class NettyServer {
    /** TCP port the server listens on. */
    private static final int PORT = 8080;

    /**
     * Starts the server and blocks the calling thread until the server channel is closed.
     * Both event-loop groups are shut down before the method returns, whether startup
     * succeeded or failed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Parent ("boss") group: first argument of ServerBootstrap.group(...).
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // Child ("worker") group: second argument of ServerBootstrap.group(...).
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                /** Builds the handler pipeline for every accepted connection. */
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Built-in frame DECODER: emits one frame per line (max 1024 bytes),
                    // which is what solves the TCP sticky/half-packet problem here.
                    socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    // Outbound encoder for String messages. The handler below writes
                    // ByteBuf instances itself, so it does not depend on this encoder.
                    socketChannel.pipeline().addLast(new StringEncoder());
                    // Application handler; a fresh instance is created per connection.
                    socketChannel.pipeline().addLast(new NettyServerEventListenHandle());
                }
            });

        try {
            // Block until the port is bound.
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            System.out.println("Netty服务器启动成功....");
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // sync() rethrows the cause of a failed operation (e.g. the bind failing).
            e.printStackTrace();
        } finally {
            // Release the event-loop threads.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
package com.tcf.netty.demo.server.handle;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side inbound handler: prints every non-empty message it receives and answers
 * with a numbered, newline-terminated response line.
 *
 * @author Hasee
 */
public class NettyServerEventListenHandle extends SimpleChannelInboundHandler<ByteBuf> {
    /** Charset used for decoding requests and encoding responses. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Number of responses sent so far. The counter is static and therefore shared by all
     * handler instances; it is atomic because different connections can be served by
     * different event-loop threads, and a plain {@code int} with {@code count++} could
     * lose updates or hand out the same number twice.
     */
    private static final AtomicInteger count = new AtomicInteger();

    /**
     * Handles one inbound message.
     *
     * <p>Example 1 (manual approach, superseded): without a frame decoder in the pipeline,
     * the received text may hold several messages glued together, so it would be split on
     * {@code "\n"} here and each piece printed separately.
     *
     * <p>Example 2 (current): the buffer is printed as a single message; this relies on a
     * frame decoder earlier in the pipeline delivering one message per call.
     *
     * @param ctx     channel context used to write the response
     * @param byteBuf bytes of the received message, decoded as UTF-8
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        String message = byteBuf.toString(UTF_8);
        if (StringUtils.isNotEmpty(message)) {
            System.out.println("服务器接收到的消息:" + message);
            // Reply with the current counter value, then advance it (single atomic step).
            // The trailing "\n" terminates the response line.
            String response = "Response Message From NioServer" + count.getAndIncrement() + "\n";
            ctx.writeAndFlush(Unpooled.copiedBuffer(response, UTF_8));
        }
    }
}
package com.tcf.netty.demo.client;
import java.net.InetSocketAddress;
import com.tcf.netty.demo.client.handle.NettyClientEventListenHandle;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Minimal Netty NIO client that connects to port 8080 and delegates all traffic to
 * {@link NettyClientEventListenHandle}.
 *
 * @author Hasee
 */
public class NettyClient {
    /**
     * Connects to the server and blocks the calling thread until the channel is closed.
     * The event-loop group is shut down before the method returns, whether the
     * connection succeeded or failed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Event-loop group that drives the client channel.
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        Bootstrap client = new Bootstrap();
        client.group(workGroup)
            .channel(NioSocketChannel.class)
            // InetSocketAddress(int) uses the wildcard IP address with the given port.
            // NOTE(review): presumably this is meant to reach a server on the local
            // machine - consider naming the host explicitly.
            .remoteAddress(new InetSocketAddress(8080))
            .handler(new ChannelInitializer<SocketChannel>() {
                /** Installs the application handler on the newly created channel. */
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline().addLast(new NettyClientEventListenHandle());
                }
            });

        try {
            // Block until the connection is established.
            ChannelFuture channelFuture = client.connect().sync();
            // Block until the channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // sync() rethrows the cause of a failed operation (e.g. connection refused).
            e.printStackTrace();
        } finally {
            // Release the event-loop threads.
            workGroup.shutdownGracefully();
        }
    }
}
package com.tcf.netty.demo.client.handle;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side inbound handler: sends a burst of newline-terminated messages once the
 * connection is active and prints whatever the server sends back.
 *
 * @author Hasee
 */
public class NettyClientEventListenHandle extends SimpleChannelInboundHandler<ByteBuf> {
    /** Charset used for both decoding responses and encoding requests. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Number of messages sent when the channel becomes active. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Prints the data received from the server, decoded as UTF-8, unless it is empty.
     *
     * @param ctx     channel context (unused)
     * @param byteBuf bytes received from the server
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        String reply = byteBuf.toString(UTF_8);
        if (StringUtils.isNotEmpty(reply)) {
            System.out.println(reply);
        }
    }

    /**
     * Sends the messages to the server as soon as the channel is active.
     *
     * <p>The send loop is the same for both tutorial examples (Example 1: receiver splits
     * on the delimiter; Example 2: receiver uses a built-in frame decoder): every message
     * ends with {@code "\n"} and is written and flushed on its own.
     *
     * @param ctx channel context used to write the messages
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int index = 0;
        while (index < MESSAGE_COUNT) {
            String line = "发送消息-" + index + "\n";
            ctx.writeAndFlush(Unpooled.copiedBuffer(line, UTF_8));
            index++;
        }
    }
}