分布式网络通信框架Netty实战_Netty实现客户端和服务器NIO通信

                                  分布式网络通信框架Netty实战

                                              Netty实现客户端和服务器NIO通信

                                                                                             代码实战

                                                                                                                                                                    田超凡 2019年10月28日

Netty线程组NioEventLoopGroup工作原理图

分布式网络通信框架Netty实战_Netty实现客户端和服务器NIO通信

代码搭建Netty服务器和客户端,基于NIO模型、ByteBuffer缓冲区、Netty内置编码器解决TCP粘包拆包问题,实现IO通信:

package com.tcf.netty.demo.server;

import com.tcf.netty.demo.server.handle.NettyServerEventListenHandle;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringEncoder;

/***
 * TODO TCF Netty服务器
 * @author Hasee
 *
 */
public class NettyServer {

    //TODO TCF 服务器端口号:8080
    private static int port=8080;
    
    public static void main(String[] args) 
    {
        //TODO TCF Boss线程组
        NioEventLoopGroup bossGroup=new NioEventLoopGroup();
        
        //TODO TCF 工作线程组
        NioEventLoopGroup workGroup=new NioEventLoopGroup();
        
        //TODO TCF Netty-NIO服务器
        ServerBootstrap serverBootstrap=new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup)
                       .channel(NioServerSocketChannel.class)
                       .childHandler(new ChannelInitializer<SocketChannel>() {
                        
                        //TODO TCF 注册事件处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception 
                        {
                            //TODO TCF Netty内置编码器,解决TCP拆包粘包问题
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringEncoder());
                            
                            //TODO TCF 自定义事件监听处理器
                            socketChannel.pipeline().addLast(new NettyServerEventListenHandle());
                        }
                    });
        
        //TODO TCF 接收客户端发送的消息
        try
        {
            //TODO TCF 同步处理方式-sync
            ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
            
            System.out.println("Netty服务器启动成功....");
            
            //TODO TCF 等待服务器监听端口-同步sync
            channelFuture.channel().closeFuture().sync();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            //TODO TCF 关闭连接,释放资源
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    
}
 

package com.tcf.netty.demo.server.handle;

import java.nio.charset.Charset;

import org.apache.commons.lang.StringUtils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/***
 * TODO TCF Netty服务器-事件监听处理器
 * @author Hasee
 *
 */
public class NettyServerEventListenHandle extends SimpleChannelInboundHandler<ByteBuf> {

    //TODO TCF 接收到客户端发送消息计数器
    private static int count=0;
    
    /***
     * TODO TCF 监听接收客户端发送消息事件
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception 
    {
        String message=byteBuf.toString(Charset.forName("UTF-8"));
        if(StringUtils.isNotEmpty(message))
        {
            //TODO TCF Example 1 指定分隔符解决TCP拆包粘包问题
            /*String[] arrays=message.split("\n");
            
            if(arrays!=null && arrays.length>0)
            {
                System.out.println("服务器接收到的消息:");
                for(String str:arrays)
                {
                    System.out.println(str);
                }
            }*/
            
            //TODO TCF Example 2 基于Netty内置编码器解决TCP拆包粘包问题
            System.out.println("服务器接收到的消息:"+message);
            
            //TODO TCF 给客户端返回响应
            ctx.writeAndFlush(Unpooled.copiedBuffer("Response Message From NioServer"+count+"\n",Charset.forName("UTF-8")));
            count++;
        }
    }
    
}
 

package com.tcf.netty.demo.client;

import java.net.InetSocketAddress;

import com.tcf.netty.demo.client.handle.NettyClientEventListenHandle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/***
 * TODO TCF Netty客户端
 * @author Hasee
 *
 */
public class NettyClient {

    public static void main(String[] args) 
    {
        //TODO TCF 工作线程组
        NioEventLoopGroup workGroup=new NioEventLoopGroup();
        
        //TODO TCF Netty客户端
        Bootstrap client=new Bootstrap();
        client.group(workGroup)
              .channel(NioSocketChannel.class)
              .remoteAddress(new InetSocketAddress(8080))
              .handler(new ChannelInitializer<SocketChannel>() {

                //TODO TCF Netty客户端初始化事件-自定义初始化事件处理器
                @Override
                protected void initChannel(SocketChannel channel) throws Exception 
                {
                    channel.pipeline().addLast(new NettyClientEventListenHandle());
                }
                
            });
        
        try
        {
            //TODO TCF Netty客户端处理通道
            ChannelFuture channelFuture=client.connect().sync();
            channelFuture.channel().closeFuture().sync();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        finally 
        {
            //TODO TCF 关闭线程组,释放资源
            workGroup.shutdownGracefully();
        }
    }
    
}

 

package com.tcf.netty.demo.client.handle;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/***
 * TODO TCF Netty客户端-事件监听处理器
 * @author Hasee
 *
 */
public class NettyClientEventListenHandle extends SimpleChannelInboundHandler<ByteBuf> {

    //TODO TCF 接收服务器返回的数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception 
    {
        //TODO TCF 服务器返回的数据
        String message=byteBuf.toString(Charset.forName("UTF-8"));
        if(StringUtils.isNotEmpty(message))
        {
            System.out.println(message);
        }
    }
    
    //TODO TCF 向服务器发送数据
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        //TODO TCF Example 1 指定分隔符解决TCP拆包粘包问题
        /*for(int i=0;i<10;i++)
        {
            ByteBuf byteBuf=Unpooled.copiedBuffer("发送消息-"+i+"\n",Charset.forName("UTF-8"));
            ctx.writeAndFlush(byteBuf);
        }*/
        
        //TODO TCF Example 2 基于Netty服务器内置编码器解决TCP拆包粘包问题
        for(int i=0;i<10;i++)
        {
            ByteBuf byteBuf=Unpooled.copiedBuffer("发送消息-"+i+"\n",Charset.forName("UTF-8"));
            ctx.writeAndFlush(byteBuf);
        }
    }

}