java 手写一个简易的线程池
工作闲暇时间,研究一下线程池
首先
1.先定义一个简单的线程池接口
ThreadPool.java,内容如下:
public interface ThreadPool<Job extends Runnable> { //执行一个Job,这个Job需要实现Runnable void execute(Job job); //关闭线程池 void shutdown(); //增减工作线程 void addWorkers(int num); //减少工作者线程 void removeWorker(int num); //得到正在等待执行的任务数量 int getJobSize(); }
2.线程池接口的默认实现
DefaultThreadPool.java,内容如下:
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //线程池最大限制数 private static final int MAX_WORKER_NUMBERS = 10; //线程池默认数量 private static final int DEFAULT_WORKER_NUMBERS = 5; //线程池最小的数量 private static final int MIN_WORKER_NUMBERS = 1; //这是一个工作列表, 将会向里面插入工作 private final LinkedList<Job> jobs = new LinkedList<>(); //工作者列表 private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>()); //工作者线程的数量 private int workerNum = DEFAULT_WORKER_NUMBERS; //线程编号生成 private AtomicLong threadNum = new AtomicLong(); public DefaultThreadPool() { initializeWorkers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num<MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; initializeWorkers(workerNum); } //执行一个Job,这个Job需要实现Runnable @Override public void execute(Job job) { if(job != null){ //添加一个工作,然后进行通知 synchronized (jobs){ jobs.addLast(job); jobs.notify(); } } } //关闭线程池 @Override public void shutdown() { for (Worker worker : workers ) { worker.shutdown(); } } //增减工作者线程 @Override public void addWorkers(int num) { synchronized (jobs){ //限制新增的Worker数量不能超过最大值 if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWorkers(num); this.workerNum += num; } }//减少工作者线程 @Override public void removeWorker(int num) { synchronized (jobs){ if(num >= this.workerNum){ throw new IllegalArgumentException("beyond workNum"); } //按照给定的数量停止Worker int count = 0; while (count < num){ Worker worker = workers.get(count); if(workers.remove(worker)){ worker.shutdown(); count++; } } this.workerNum -= count; } } //得到正在等待执行的任务数量 @Override public int getJobSize() { return jobs.size(); } //初始化线程工作者 private void initializeWorkers(int num){ for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet()); System.out.println("初始化了"+thread.getName()); thread.start(); } } //++++++++++++++++++++++++++++++++++++ 内部类++++++++++++++++++++++++++++++++++++++++++++++++++++++++ //工作者,负责消费任务 class Worker implements Runnable{ //是否工作 private volatile boolean running = true; @Override public void run() { while (running) { Job job = null; synchronized(jobs){ //如果工作者列表是空的,那么就wait while (jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { //感知到外部对WorkerThread 的中断操作, 返回 Thread.currentThread().interrupt(); return; } } //取出一个Job job = jobs.removeFirst(); } if(job != null){ try{ job.run(); System.out.println("------执行了一个job------"); }catch (Exception ex){ //忽略Job执行中的Exception } } } } public void shutdown(){ running = false; } } }
3.Job工作者创建
public class ClientTest implements Runnable{ private Integer i; public ClientTest(Integer i) { this.i = i; } @Override public void run() { System.out.println("我要变瘦"+i); } // public static void main(String[] args) { // DefaultThreadPool defaultThreadPool = new DefaultThreadPool(); // for (int i = 0; i < 10; i++) { // defaultThreadPool.execute(new ClientTest(i)); // System.out.println("初始化了,第" + i +"个Job!"); // } // // } }
4.线程测试
public class DefaultThreadPoolTest { public static void main(String[] args) { //线程池初始化 DefaultThreadPool defaultThreadPool = new DefaultThreadPool(6); //多线程任务交给线程池,并执行 for (int i = 0; i < 10; i++) { defaultThreadPool.execute(new ClientTest(i)); System.out.println("初始化了,第" + i +"个Job!"); } } }
执行结果如下:
纯属娱乐,可看出线程池的执行原理。