Java网络编程-IO多路复用(多线程)
1. 简述
IO复用网路模型中的角色Handler,将以状态模式(State Pattern)实现,
并且将Handler接收完网络消息后的业务逻辑处理交由工作线程实现,所有的工作线程以线程池来维护。
2. 网络模型
3. 代码实现
[TCPReactor.Java]
- // Reactor線程
- package server;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.util.Iterator;
- import java.util.Set;
- public class TCPReactor implements Runnable {
- private final ServerSocketChannel ssc;
- private final Selector selector;
- public TCPReactor(int port) throws IOException {
- selector = Selector.open();
- ssc = ServerSocketChannel.open();
- InetSocketAddress addr = new InetSocketAddress(port);
- ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
- ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
- SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
- sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象
- }
- @Override
- public void run() {
- while (!Thread.interrupted()) { // 在線程被中斷前持續運行
- System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
- try {
- if (selector.select() == 0) // 若沒有事件就緒則不往下執行
- continue;
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
- Iterator<SelectionKey> it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
- it.remove();
- }
- }
- }
- /*
- * name: dispatch(SelectionKey key)
- * description: 調度方法,根據事件綁定的對象開新線程
- */
- private void dispatch(SelectionKey key) {
- Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
- if (r != null)
- r.run();
- }
- }
[Acceptor.java]
- // 接受連線請求線程
- package server;
- import java.io.IOException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- public class Acceptor implements Runnable {
- private final ServerSocketChannel ssc;
- private final Selector selector;
- public Acceptor(Selector selector, ServerSocketChannel ssc) {
- this.ssc=ssc;
- this.selector=selector;
- }
- @Override
- public void run() {
- try {
- SocketChannel sc= ssc.accept(); // 接受client連線請求
- System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
- if(sc!=null) {
- sc.configureBlocking(false); // 設置為非阻塞
- SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key
- selector.wakeup(); // 使一個阻塞住的selector操作立即返回
- sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- // Handler線程
- package server;
- import java.io.IOException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.SocketChannel;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class TCPHandler implements Runnable {
- private final SelectionKey sk;
- private final SocketChannel sc;
- private static final int THREAD_COUNTING = 10;
- private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
- THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>()); // 線程池
- HandlerState state; // 以狀態模式實現Handler
- public TCPHandler(SelectionKey sk, SocketChannel sc) {
- this.sk = sk;
- this.sc = sc;
- state = new ReadState(); // 初始狀態設定為READING
- pool.setMaximumPoolSize(32); // 設置線程池最大線程數
- }
- @Override
- public void run() {
- try {
- state.handle(this, sk, sc, pool);
- } catch (IOException e) {
- System.out.println("[Warning!] A client has been closed.");
- closeChannel();
- }
- }
- public void closeChannel() {
- try {
- sk.cancel();
- sc.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- public void setState(HandlerState state) {
- this.state = state;
- }
- }
[HandlerState.java]
- package server;
- import java.io.IOException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.SocketChannel;
- import java.util.concurrent.ThreadPoolExecutor;
- public interface HandlerState {
- public void changeState(TCPHandler h);
- public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
- ThreadPoolExecutor pool) throws IOException ;
- }
- package server;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.SocketChannel;
- import java.util.concurrent.ThreadPoolExecutor;
- public class ReadState implements HandlerState{
- private SelectionKey sk;
- public ReadState() {
- }
- @Override
- public void changeState(TCPHandler h) {
- // TODO Auto-generated method stub
- h.setState(new WorkState());
- }
- @Override
- public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
- ThreadPoolExecutor pool) throws IOException { // read()
- this.sk = sk;
- // non-blocking下不可用Readers,因為Readers不支援non-blocking
- byte[] arr = new byte[1024];
- ByteBuffer buf = ByteBuffer.wrap(arr);
- int numBytes = sc.read(buf); // 讀取字符串
- if(numBytes == -1)
- {
- System.out.println("[Warning!] A client has been closed.");
- h.closeChannel();
- return;
- }
- String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
- if ((str != null) && !str.equals(" ")) {
- h.setState(new WorkState()); // 改變狀態(READING->WORKING)
- pool.execute(new WorkerThread(h, str)); // do process in worker thread
- System.out.println(sc.socket().getRemoteSocketAddress().toString()
- + " > " + str);
- }
- }
- /*
- * 執行邏輯處理之函數
- */
- synchronized void process(TCPHandler h, String str) {
- // do process(decode, logically process, encode)..
- // ..
- h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
- this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
- this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
- }
- /*
- * 工作者線程
- */
- class WorkerThread implements Runnable {
- TCPHandler h;
- String str;
- public WorkerThread(TCPHandler h, String str) {
- this.h = h;
- this.str=str;
- }
- @Override
- public void run() {
- process(h, str);
- }
- }
- }
- package server;
- import java.io.IOException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.SocketChannel;
- import java.util.concurrent.ThreadPoolExecutor;
- public class WorkState implements HandlerState {
- public WorkState() {
- }
- @Override
- public void changeState(TCPHandler h) {
- // TODO Auto-generated method stub
- h.setState(new WriteState());
- }
- @Override
- public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
- ThreadPoolExecutor pool) throws IOException {
- // TODO Auto-generated method stub
- }
- }
- package server;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.SocketChannel;
- import java.util.concurrent.ThreadPoolExecutor;
- public class WriteState implements HandlerState{
- public WriteState() {
- }
- @Override
- public void changeState(TCPHandler h) {
- // TODO Auto-generated method stub
- h.setState(new ReadState());
- }
- @Override
- public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
- ThreadPoolExecutor pool) throws IOException { // send()
- // get message from message queue
- String str = "Your message has sent to "
- + sc.socket().getLocalSocketAddress().toString() + "\r\n";
- ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
- while (buf.hasRemaining()) {
- sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
- }
- h.setState(new ReadState()); // 改變狀態(SENDING->READING)
- sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
- sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
- }
- }
最後是主程序
[Main.java]
- package server;
- import java.io.IOException;
- public class Main {
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- try {
- TCPReactor reactor = new TCPReactor(1333);
- reactor.run();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
相关推荐
- 超过1000多程序员面试经历,收集了阿里巴巴面试3年总结的108道Java面试题:MySQL+redis+计算机网络+操作系统+Java编程+架构设计
- java之文件与多线程的简单编程
- Java 并发编程 多线程的交互模式
- Java多线程编程-(3)-从一个错误的双重校验锁代码谈一下volatile关键字
- Java多线程编程-(1)-线程安全和锁Synchronized概念
- Java多线程编程-(1)-线程安全和锁Synchronized概念
- Java多线程编程-(1)-线程安全和锁Synchronized概念
- 给学校公司同学出的一些题目---java, jquery, spring, springmvc, mybatis, 网络编程, linux, mysql(持续更新)
- Java编程思想——多线程的三大核心源码层解密
- 《java多线程编程实战指南 核心篇》读书笔记三
- 云计算网络知识学习-linux网络基础
- Android的网络编程