Java 非阻塞 socket 通信
前几天我们公司C语言组的人需要一个基于socket的报文接收器来方便他们测试项目,我花了小半天给他们做了个多线程的socket服务端,我写好后在本机上测试后交给了他们,但是他们那边能连上服务器端,但是发送消息时没响应,后来我看到他们用的都是tcp/ip测试工具(一个c/s结构的socket调试工具)来测了,发送时我的后台也没有收到请求报文,但是他们的连接断了之后 我这边就收到了,又谷歌了一下,还真发现了问题,因为我的服务端 用的是serversocket类 这种是阻塞式的socket 当连接之后服务端就一直在读取流或者写出流 到缓存中,这种情况有两种方法解决,一是在客户端发送报文时末尾加个换行符,这种方法后来我试了下 好像是可以 但是客户端还是收不到实时的响应。第二种方法就是运用nio包下的非阻塞式的socket了(这种方法是最优的)。
非阻塞式socket 运用到了以下几个重要概念:
Selector:是 一个SelectableChannel对象的多路复用器,所有非阻塞式的channel都要注册到这个Selector上。
ServerSocketChannel:对应的是ServerSocket
SocketChannel:对应的是Socket
SelectionKey:这个就是来描述channel与selector之间注册关系的一个映射对象。
ByteBuffer:说这个首先要说一下channel与流的区别了,以前我们用的socket,读取写入数据用的都是流的形式,而nio包下的采用的都是块的形式,channel可以把一块数据映射到内存上,这样的话处理速度上内存速度肯定是优于流的,其实有点像操作系统的虚拟内存和分页存储。而ByteBuffer就是channel的一个容器,用来读和写数据用的,channel也只能通过ByteBuffer来处理数据。
如图所示是用NIO实现非阻塞服务端的示意图:
具体实现可以看以下的实例服务端:
public class NServer {
private Selector selector=null;
private Charset charset=Charset.forName("GBK");
/**
* ByteBuffer有几个重要概念:position、limit
* ByteBuffer.allocate(1024) 开辟一个1024字节的空间
* 当读取n个字节position就加n(从0开始),limit 默认是开辟的空间最大的值
* ByteBuffer.flip();重置position为0,limit为已用空间的最后一个值的索引(通常之后将是写数据的操作)
* ByteBuffer.clear();重置position为0,limit为开辟空间的最大的值(通常之后是重新读取数据的操作)
*/
private ByteBuffer buffer=ByteBuffer.allocate(1024);
private static final int PORT=3001;
public void init() throws IOException{
//新建selector
selector=Selector.open();
//新建serversocket
ServerSocketChannel server=ServerSocketChannel.open();
//绑定本地端口
server.socket().bind(new InetSocketAddress("127.0.0.1", PORT));
//设置非阻塞模式
server.configureBlocking(false);
/**
* 注册至soket服务器至selector
* SelectionKey有4个状态:
* OP_READ 可读模式
* OP_WRITE 可写模式
* OP_CONNECT 可连接模式
* OP_ACCEPT 接受连接模式
* 4个状态可以累加
*/
server.register(selector, SelectionKey.OP_ACCEPT);
while(selector.select()>0){
//遍历selector上的已注册的key并且处理key对应的channel
Iterator<SelectionKey> it=selector.selectedKeys().iterator();
while(it.hasNext()) {
SelectionKey sk=it.next();
if(sk.isAcceptable()){
//接受请求处理
SocketChannel socketChannel=server.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
sk.interestOps(SelectionKey.OP_ACCEPT);
}
if(sk.isReadable()){
//读取客户端数据处理
SocketChannel sc=(SocketChannel) sk.channel();
String content="";
try {
while(sc.read(buffer)>0){
//重置limit 为写数据做准备
buffer.flip();
content+=charset.decode(buffer);
System.out.println("==================读取的数据是:"+content);
sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
sk.cancel();
if(sk.channel()!=null)
sk.channel().close();
}
if(content.length()>0){
sk.attach(content);
}
//为读取数据做准备
buffer.clear();
}
if(sk.isValid()&&sk.isWritable()){
//写入客户端数据处理
SocketChannel sc=(SocketChannel) sk.channel();
String content=(String) sk.attachment();
sc.write(charset.encode(content));
sk.interestOps(SelectionKey.OP_READ);
}
//去除已经处理过的key
it.remove();
}
}
}
public static void main(String[] args){
try {
new NServer().init();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
客户端可以用TCP/IP 测试工具 测试。