学习笔记之IO和Netty
I/O和Netty
(个人学习笔记,如有错误欢迎指正!!!)
BIO和NIO
BIO即阻塞I/O,不管是磁盘I/O还是网络I/O,数据在读写的过程中都有可能被阻塞,一旦阻塞,线程便会失去CPU的使用权,这对于在大规模访问量和有性能要求的情况下是不能被接受的。如果一个客户端对应一个线程处理,出现阻塞时可以阻塞一个线程而不影响其他的线程。同样为了减少线程开销,可以采用线程池。但是如果需要同时保持很多长连接,该方法同样是不可行的,因为我们不可能同时创建非常多的线程来保持连接。
NIO典型实例:
public void selector() throws Exception{
ByteBuffer buffer = ByteBuffer.allocate(1024);
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(8080));
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey)it.next();
if((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT){
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
it.remove();
}else if((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ){
SocketChannel sc = (SocketChannel)key.channel();
while(true){
buffer.clear();
int n = sc.read(buffer);
if(n <= 0){
break;
}
buffer.flip();
}
it.remove();
}
}
}
}
主要过程便是先注册Selector,然后将服务器的接受请求注册到Selector上,然后之后Selector进行轮询,如果事件类型是Accept,则将Read事件注册到Selector,如果事件类型为Read,则读取相应Channel的数据,并将Channel取消,防止重复读取。
Select、poll和epoll
- select
select仅仅知道有I/O事件发生,但是不知道具体是哪一个,所以select需要轮询所有的流,所以事件复杂度为O(n),select的轮询操作是在内核空间进行的,所以每次轮询操作都需要将数据从用户空间复制到内核空间,其中select采用数组的形式碱性数据保存,所以有上限,32位机默认位1024,64位机默认为2048。
缺点:每次调用select,都需要将fd集合拷贝到内核;并且需要遍历所有的fd,开销大;同时select支持的文件描述符数量小,默认位1024。 - poll
该方式本质上和select没有太大区别,同样需要将用户数据拷贝到内核空间,于select不同的是存储方式是采用链表的方式进行保存,所有没有存储上限。该方式也不是一直轮询,而是当有设备就绪或者主动超时,则唤醒poll线程进行遍历,如果没有,则该线程挂起。 - epoll
epoll会把注册新的事件时,会把fd拷贝到内核,不需要重复拷贝。此外,该方式不需要轮询所有的fd,当设备就绪时,会调用一个回调函数,并将fd添加到一个等待队列中,并唤醒epoll线程,该线程只需要查看等待队列中有没有准备就绪的fd,所以时间复杂度位O(1)。
缓冲区Buffer
索引 | 说明 |
---|---|
capacity | 缓冲区数组的总长度 |
position | 下一个要操作的数组元素的位置 |
limit | 缓冲区数组中不可操作的下一个元素的位置,limit<=capacity |
mark | 用于记录当前position的前一个位置或者默认是0 |
- 当调用
ByteBuffer.allocate(11)
方法创建缓冲区时,即初始状态,position位置为0,capacity和limit默认都是数组长度。 - 当调用
filp()
方法后,假如之前已经写入了5个字节的数据,则状态会由position位置为5,capacity和limit位置为11,变为position位置为0,limit位置为5,capacity位置为11。(注意,起始位置为0,不是1) - 当调用
clear()
方法后,各个索引的状态会回到第一条的初始状态。 - 当调用
mark()
方法后,mark会记录当前position上一个位置,,当调用reset()
方法时,position回复到mark记录的值的位置。
堆缓冲区和直接缓冲区
堆缓冲区 | 直接缓冲区 | |
---|---|---|
存储位置 | Java Heap中 | Native内存中 |
I/O | 需要在用户地址空间和操作系统内核地址空间复制数据 | 不需要复制 |
内存管理 | Java GC回收,创建和回收开销少 | 通过调用System.gc()释放掉Java对象引用的直接缓冲器占用的内存空间,如果Java对象长时间持有引用可能导致Native内存泄漏;创建和回收内存开销大 |
适用场景 | 并发连接数较少,I/O操作较少 | 数据量较大、生命周期比较长的情况下比较合适 |
Netty介绍
Netty是一个高性能,异步事件驱动的NIO框架,它提供了堆TCP、UDP和文件传输的支持。
NIO原生是同步非阻塞I/O模型
Netty是在NIO的基础上,实现的异步非阻塞I/O模型
异步体现在使用线程池来处理业务逻辑。
相关概念:
-
同步:相对于IO操作而言,在同一时间,只能完成一个操作。
-
异步:相对于IO操作而言,在同一时间,同时完成多个操作。
-
阻塞:相对于数据而言,判断数据有没有准备好,如果没有准备好就一直等待。
-
非阻塞:不管数据有没有准备好,都会有反馈给客户端,使客户端不用一直等待。
特点:
- 并发高:体现在采用NIO的I/O模型
- 传输快:采用直接内存(零拷贝)
- 封装好:编写代码量少
零拷贝
-
通常对数据处理需要复制到JVM中,零拷贝是指在接受和发送的ByteBuffer采用直接缓冲区,使用堆外直接内存进行Socket读写。
-
提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以向操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个buffer合并成一个大的buffer。
-
文件传输中采用transferTo方法,可以直接将文件缓冲区中的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝的问题。
Reactor线程模型(反应堆)
常用的线程模型:
(1)Reactor单线程模型
单线程模型与传统的NIO模型非常类似,只是将具体的处理过程放置到Handler中进行处理。
其中Reactor线程即负责接受客户端链接,同时还需要轮询所有的Channel,调用相应的Handler对数据进行处理。
(2)Reactor多线程模型
由于单线程模型的性能还是有待提高,如果客户端请求频繁,单线程的处理能力就显得有些不足。所以引入了多线程模型。
多线程模型中,Reactor线程同样处理客户端的连接请求,并且轮询所有的Channel,但是对数据的处理不是在Reactor线程中进行的,而是通过转交给线程池,有线程池完成数据的处理操作。
(3)主从Reactor多线程模型(epoll模型设计思想)
由于Reactor多线程模型中,Reactor线程既需要处理客户端连接请求,也需要轮询所有的Channel,所以当客户端进一步增多,Reactor线程的处理能力便会下降。
主从模型中便是将连接请求和轮询操作进行分离,分别由两个线程进行处理,进一步提高数据处理能力。
使用实例
public void start(int port)throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel client) throws Exception {
client.pipeline().addLast(new HttpResponseEncoder());
client.pipeline().addLast(new HttpRequestDecoder());
client.pipeline().addLast(new CTomcatHandler());
}
}).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture f = server.bind(port).sync();
System.out.println("Netty is started!!!" + port);
f.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}