Java—IO机制、BIO、NIO、AIO
BIO用厕所做比例 一直等待别人厕所上完 ,自己再去
AIO就是 别人上厕所,别人通知你上上完,你在别人上厕所的同时,可以在外面抽烟,做点其他事情,等待
NIO就是 在厕所里面 一直看着别人那里空位 直接上厕所
BIO
Block-IO:InputStream和OutputStream,Reade和Writer
package com.binglian.IO;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOPlainEchoServer {
public void server(int port) throws IOException{
//将ServerSoket绑定在指定的端口里
final ServerSocket socket=new ServerSocket(port);
while(true){
//堵塞直到收到新的客户端连接
final Socket clientSocket=socket.accept();
System.out.println("Accepted connection from"+clientSocket);
//创建一个子线程去处理客户端的请求
new Thread(new Runnable() {
public void run() {
try(BufferedReader reader=new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))){
PrintWriter writer=new PrintWriter(clientSocket.getOutputStream(),true);
//从客户端读取数据并原封不动回写回去
while(true){
writer.println(reader.readLine());
writer.flush();
}
}catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
public void improvedService(int port) throws IOException{
//将ServerSocket绑定指定的端口里
final ServerSocket socket=new ServerSocket(port);
//创建一个线程池
ExecutorService executorService=Executors.newFixedThreadPool(6);
while(true){
//堵塞直到收到新的客户端连接
final Socket clientSocket=socket.accept();
System.out.println("Accepted connection from " + clientSocket);
//将请求提交给线程池去执行
executorService.execute(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
//从客户端读取数据并原封不动回写回去
while (true) {
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
NIO
NonBlock-IO:构建多路复用的、同步非堵塞的IOC操作
这里更多详情可以看下https://blog.****.net/pjmike233/article/details/82254000
通道Channel
缓冲区 Buffer
多路复用器 Selector
通道Channel
Channel是一个通道,它就像自来水管一样,网络数据通过Channel读取和写入
通道与流的不同之处在于:
通道是双向的,既可以从通道中读取数据,也可以写数据到通道,而流的读写通常是单向的,它只是在一个方向上移动
通道可以异步地读写
通道中的数据总是要先读到一个缓冲区Buffer,或者总是从一个Buffer写入channel_buffer
Channel可以写入Buffer
Buffer也可以读Channel
NIO-Channels(通道)
FileChannel
FileChannel 从文件中读取数据,也可以将数据写到文件中,FileChannel无法设置非堵塞模式,它总是运行在堵塞模式下,
DatagramChannel
DatagramChannel通过UDP读写网络中的数据
SocketChannel
SocketChannel 通过TCP读写网络中的数据
ServerSocketChannel
可以监听新进来的TCP连接,像web服务器那样,堆每一个新进来的连接都会创建一个SocketChannel
SocketChannel和ServerSocketChannel就对应 传统网络编程中的Socket类和ServerSocket类
通道与缓冲区的例子
package com.binglian.IO;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class ChannelTest {
public static void main(String[] args) throws IOException{
RandomAccessFile accessFile=new RandomAccessFile("C:\\Users\\binglian\\Desktop\\api1.txt", "rw");
//打开FileChannel
FileChannel fileChannel=accessFile.getChannel();
ByteBuffer byteBuffer=ByteBuffer.allocate(48);
//从通道里读取数据到缓冲区
int bytesRead=fileChannel.read(byteBuffer);
while(bytesRead !=-1){
System.out.println("Read:"+bytesRead);
//反转
byteBuffer.flip();
//从缓冲区中读取数据
while(byteBuffer.hasRemaining()){
System.out.println((char)byteBuffer.get());
}
byteBuffer.clear();
bytesRead=fileChannel.read(byteBuffer);
}
accessFile.close();
}
}
在使用 FileChannel 之前,必须先打开它,我们无法直接打开它,需要通过一个InputStream,OutputStream或者RandomAccessFile 来打开它
从 FileChannel 中读取数据,先分配一个Buffer(关于Buffer的介绍参见下文),调用 FileChannel 的read()方法,该方法返回的 int 值表示了有多少字节被读到了 Buffer 中
缓冲区 Buffer
在NIO库中,数据是从通道读入到缓冲区,从缓冲区写入到通道中的。
缓冲区本质上是一块可以写入的数据,然后可以从读取数据的内存。这块内存被封装成了 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。
Buffer的类型
Java NIO有以下几种Buffer 类型
buffer
- ByteBuffer: 字节缓冲区
- MappedByteBuffer: 用于实现内存映射文件
- CharBuffer: 字符缓冲区
- ShortBuffer: 短整型缓冲区
- IntBuffer: 整型缓冲区
- LongBuffer: 长整形缓冲区
- FloatBuffer: 浮点型缓冲区
- DoubleBuffer: 双精度浮点型缓冲区
多路复用器 Selector
Selector 是Java NIO实现多路复用的基础,简单的讲,Selector 会不断地轮询注册在其上的 Channel,如果某个Channel 上面发生读或者写事件,这个Channel 就处于就绪状态,会被Selector轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。
这样,一个单独的线程可以管理多个 Channel ,从而管理多个网络连接,跟 I/O多路复用模型思想一样。
package com.binglian.IO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOPlainEchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
//将ServerSocket绑定到指定的端口里
ss.bind(address);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
//将channel注册到Selector里,并说明让Selector关注的点,这里是关注建立连接这个事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
try {
//阻塞等待就绪的Channel,即没有与客户端建立连接前就一直轮询
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
//代码省略的部分是结合业务,正确处理异常的逻辑
break;
}
//获取到Selector里所有就绪的SelectedKey实例,每将一个channel注册到一个selector就会产生一个SelectedKey
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
//将就绪的SelectedKey从Selector中移除,因为马上就要去处理它,防止重复执行
iterator.remove();
try {
//若SelectedKey处于Acceptable状态
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//接受客户端的连接
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
//像selector注册socketchannel,主要关注读写,并传入一个ByteBuffer实例供读写缓存
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
}
//若SelectedKey处于可读状态
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
//从channel里读取数据存入到ByteBuffer里面
client.read(output);
}
//若SelectedKey处于可写状态
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
//将ByteBuffer里的数据写入到channel里
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
AIO如何进一步加工处理结果
基于回调:实现CompletionHandler接口,调用时触发会调函数
返回Futrue:通过isDone()查看是否准备好,通过get()等待返回数据
package com.binglian.IO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AIOPlainEchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
// 将ServerSocket绑定到指定的端口里
serverChannel.bind(address);
final CountDownLatch latch = new CountDownLatch(1);
// 开始接收新的客户端请求. 一旦一个客户端请求被接收, CompletionHandler 就会被调用.
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(final AsynchronousSocketChannel channel, Object attachment) {
// 一旦完成处理,再次接收新的客户端请求
serverChannel.accept(null, this);
ByteBuffer buffer = ByteBuffer.allocate(100);
// 在channel里植入一个读操作EchoCompletionHandler,一旦buffer有数据写入,EchoCompletionHandler 便会被唤醒
channel.read(buffer, buffer, new EchoCompletionHandler(channel));
}
@Override
public void failed(Throwable throwable, Object attachment) {
try {
// 若遇到异常,关闭channel
serverChannel.close();
} catch (IOException e) {
// ingnore on close
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel channel;
EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
// 在channel里植入一个读操作CompletionHandler,一旦channel有数据写入,CompletionHandler 便会被唤醒
channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
// 如果buffer里还有内容,则再次触发写入操作将buffer里的内容写入channel
channel.write(buffer, buffer, this);
} else {
buffer.compact();
// 如果channel里还有内容需要读入到buffer里,则再次触发写入操作将channel里的内容读入buffer
channel.read(buffer, buffer, EchoCompletionHandler.this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
}
}