《Scalable IO in Java》学习笔记--- Worker Thread Pools
代码大部分和Basic Reactor Design一样,只是handler中对读写进行了多线程处理
ExecutorService pool = Executors.newCachedThreadPool();新建一个线程池
pool.execute(new readProcesser());//线程池运行读线程
pool.execute(new sendProcesser());//线程池运行写线程
原文的io操作在创建线程之前,我将io操作写到了线程池中,防止Reactor线程堵塞
package douglea.WorkerThreadPools;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lzming
* @create 2019-01-29 9:51
*/
public class WorkerThreadPools {
public static void main(String[] args) throws IOException {
Thread thread=new Thread(new Reactor(8080));
thread.start();//这里新建了一个线程
//堵塞住防止程序运行完
}
}
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false); //非阻塞
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步处理,第一步,接收accept事件
sk.attach(new Acceptor()); //attach callback object, Acceptor《=这里传入的是Acceptor
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next())); //Reactor负责dispatch收到的事件
selected.clear();
}
} catch (IOException ex) {
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象
if (r != null)
r.run();//这里只运行,没有start新建线程
}
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) {
}
}
}
}
final class Handler implements Runnable {
private static final int MAXIN = 100;
private static final int MAXOUT = 100;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1,PROCESSING = 3;
ExecutorService pool = Executors.newCachedThreadPool();
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
sk = socket.register(sel, 0);
sk.attach(this); //《=将Handler作为callback对象
sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
sel.wakeup();
}
boolean inputIsComplete() {
return true;
}
boolean outputIsComplete() {
return true;
}
public void run() {
try {
if (state == READING)
read();
else if (state == SENDING)
send();
} catch (IOException ex) {
}
}
void read() throws IOException {
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new readProcesser());
}
}
void send() throws IOException {
if (outputIsComplete()) {
state = PROCESSING;
pool.execute(new sendProcesser());
}
}
class readProcesser implements Runnable {
synchronized public void run() {
try {
input.clear();
socket.read(input);
} catch (IOException e) {
e.printStackTrace();
}
if (inputIsComplete()) {
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
}
}
}
class sendProcesser implements Runnable {
synchronized public void run() {
try {
output=input;
output.flip();
socket.write(output);
} catch (IOException e) {
e.printStackTrace();
}
if (outputIsComplete()) {
state = READING;
sk.interestOps(SelectionKey.OP_READ); //第三步,接收write事件
}
}
}
}