netty之helloworld示例

介绍

Netty是基于NIO,实现了对NIO的封装。netty常用来做RPC通信,对于传统的NIO而言,API的使用及其不便,在每次读写后都需要flip()操作才能保证整个字节数组的正确读写,因此Netty作者对此实现了封装,他希望netty用户专注于业务代码,而其余部分交给netty来实现。
关于netty入门的代码在我的github上可以下载。

Netty设计

NIO单线程模型
netty之helloworld示例
NIO线程池模型
netty之helloworld示例
Netty模型
netty之helloworld示例
ChannelHandlerContext:
netty之helloworld示例
netty的操作的基于Future模型,通过callable实现异步。
有了上述几个模型,基本可以模拟Netty实现对NIO的简单封装,目标是将handler和通信部分分离,如有机会,笔者会在后续的博客中模拟Netty实现对NIO的一个简单封装。

helloworld

netty代码诸如一致,其只需要用户编写自己的handler对业务代码进行处理,其余部分大体相同。
服务端

package netty.helloword;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup boss = new NioEventLoopGroup();
		EventLoopGroup worker = new NioEventLoopGroup();

		try{
			ServerBootstrap bootstrap = new ServerBootstrap();				//Server配置器
			bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)		
			.option(ChannelOption.SO_BACKLOG, 1024)			//设置最多等待建立连接数
			.option(ChannelOption.SO_BACKLOG, 32 * 1024)	//设置发送缓冲大小
			.option(ChannelOption.SO_RCVBUF, 32 * 1024)		//设置接收缓冲大小
			.childOption(ChannelOption.SO_KEEPALIVE, true)		//保持连接
			.childHandler(new ChannelInitializer<SocketChannel>() {		//增加监听
				@Override
				protected void initChannel(SocketChannel sc) throws Exception {
					sc.pipeline().addLast(new ServerHandler());			//设置业务处理类
				}
			});

			ChannelFuture cf1 = bootstrap.bind(9999).sync();	//bind()异步返回Future通过sync()同步,参考Future模式
			cf1.channel().closeFuture().sync();					//为了让程序监听,这里需要保持程序运行,相当于一个死循环
		}finally{
			boss.shutdownGracefully();
			worker.shutdownGracefully();			
		}
	}
}

ServerHandler

package netty.helloword;

import java.io.UnsupportedEncodingException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws UnsupportedEncodingException{
		byte[]req = new byte[msg.readableBytes()];
		msg.readBytes(req);
		String body = new String(req,"utf-8");
		System.out.println("server : " + body);
		
		String response = "Hi Client";
		ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));			//会自动释放Buf
	}
}

Client

package netty.helloword;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
	public static void main(String[] args) {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();		//创建客户端配置器
			bootstrap.group(group);						//加入到NIO线程池

			bootstrap.channel(NioSocketChannel.class);
			bootstrap.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new ClientHandler());
				}
			});
			ChannelFuture cf = bootstrap.connect("127.0.0.1",9999).sync();//连接
			cf.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally{
			group.shutdownGracefully();
		}				
	}
}

ClientHandler

package netty.helloword;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
	//收到数据时触发
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
			throws Exception {
			byte buf[] = new byte[msg.readableBytes()];
			msg.readBytes(buf);
			System.out.println(new String(buf,"utf-8"));
			ReferenceCountUtil.release(msg);
	}
	
	//通道**时触发
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ctx.writeAndFlush(Unpooled.copiedBuffer("hello Server!",CharsetUtil.UTF_8));
	}
	
	//异常直接关闭通道
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();			//打印异常
		ctx.close();
	}
}