Java 非阻塞 socket 通信

前几天我们公司C语言组的人需要一个基于socket的报文接收器来方便他们测试项目,我花了小半天给他们做了个多线程的socket服务端,我写好后在本机上测试后交给了他们,但是他们那边能连上服务器端,但是发送消息时没响应,后来我看到他们用的都是tcp/ip测试工具(一个c/s结构的socket调试工具)来测了,发送时我的后台也没有收到请求报文,但是他们的连接断了之后 我这边就收到了,又谷歌了一下,还真发现了问题,因为我的服务端 用的是serversocket类 这种是阻塞式的socket 当连接之后服务端就一直在读取流或者写出流 到缓存中,这种情况有两种方法解决,一是在客户端发送报文时末尾加个换行符,这种方法后来我试了下 好像是可以 但是客户端还是收不到实时的响应。第二种方法就是运用nio包下的非阻塞式的socket了(这种方法是最优的)。

非阻塞式socket 运用到了以下几个重要概念:

Selector:是 一个SelectableChannel对象的多路复用器,所有非阻塞式的channel都要注册到这个Selector上。

ServerSocketChannel:对应的是ServerSocket

SocketChannel:对应的是Socket

SelectionKey:这个就是来描述channel与selector之间注册关系的一个映射对象。

ByteBuffer:说这个首先要说一下channel与流的区别了,以前我们用的socket,读取写入数据用的都是流的形式,而nio包下的采用的都是块的形式,channel可以把一块数据映射到内存上,这样的话处理速度上内存速度肯定是优于流的,其实有点像操作系统的虚拟内存和分页存储。而ByteBuffer就是channel的一个容器,用来读和写数据用的,channel也只能通过ByteBuffer来处理数据。

如图所示是用NIO实现非阻塞服务端的示意图:

Java 非阻塞 socket 通信

具体实现可以看以下的实例服务端:

 

public class NServer {
	private Selector selector=null;
	private Charset charset=Charset.forName("GBK");
	/**
	 * ByteBuffer有几个重要概念:position、limit
	 * ByteBuffer.allocate(1024) 开辟一个1024字节的空间
	 * 当读取n个字节position就加n(从0开始),limit 默认是开辟的空间最大的值
	 * ByteBuffer.flip();重置position为0,limit为已用空间的最后一个值的索引(通常之后将是写数据的操作)
	 * ByteBuffer.clear();重置position为0,limit为开辟空间的最大的值(通常之后是重新读取数据的操作)
	 */
	private ByteBuffer buffer=ByteBuffer.allocate(1024);
	private static final int PORT=3001;
	
	public void init() throws IOException{
		//新建selector
		selector=Selector.open();
		//新建serversocket
		ServerSocketChannel server=ServerSocketChannel.open();
		//绑定本地端口
		server.socket().bind(new InetSocketAddress("127.0.0.1", PORT));
		//设置非阻塞模式
		server.configureBlocking(false);
		/**
		 * 注册至soket服务器至selector
		 * SelectionKey有4个状态:
		 * OP_READ	可读模式
		 * OP_WRITE	可写模式
		 * OP_CONNECT	可连接模式
		 * OP_ACCEPT	接受连接模式
		 * 4个状态可以累加
		 */
		server.register(selector, SelectionKey.OP_ACCEPT);
		
		while(selector.select()>0){
			//遍历selector上的已注册的key并且处理key对应的channel
			Iterator<SelectionKey> it=selector.selectedKeys().iterator();
			while(it.hasNext()) {
				SelectionKey sk=it.next();
				if(sk.isAcceptable()){
					//接受请求处理
					SocketChannel socketChannel=server.accept();
					socketChannel.configureBlocking(false);
					socketChannel.register(selector, SelectionKey.OP_READ);
					sk.interestOps(SelectionKey.OP_ACCEPT);
				}
				if(sk.isReadable()){
					//读取客户端数据处理
					SocketChannel sc=(SocketChannel) sk.channel();
					String content="";
					try {
						while(sc.read(buffer)>0){
							//重置limit 为写数据做准备
							buffer.flip();
							content+=charset.decode(buffer);
							System.out.println("==================读取的数据是:"+content);
							sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
						}
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						sk.cancel();
						if(sk.channel()!=null)
							sk.channel().close();
					}
					if(content.length()>0){
						sk.attach(content);
					}
					//为读取数据做准备
					buffer.clear();
				}
				if(sk.isValid()&&sk.isWritable()){
					//写入客户端数据处理
					SocketChannel sc=(SocketChannel) sk.channel();
					String content=(String) sk.attachment();
					sc.write(charset.encode(content));
					sk.interestOps(SelectionKey.OP_READ);
				}
				//去除已经处理过的key
				it.remove();
			}
		}
	}
	public static void main(String[] args){
		try {
			new NServer().init();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

 

 

客户端可以用TCP/IP 测试工具 测试。