java 手写一个简易的线程池

工作闲暇时间,研究一下线程池

首先

1.先定义一个简单的线程池接口 

ThreadPool.java,内容如下:
public interface ThreadPool<Job extends Runnable> {

    //执行一个Job,这个Job需要实现Runnable
    void execute(Job job);

    //关闭线程池
    void shutdown();

    //增减工作线程
    void addWorkers(int num);

    //减少工作者线程
    void removeWorker(int num);

    //得到正在等待执行的任务数量
    int getJobSize();
}

 2.线程池接口的默认实现

DefaultThreadPool.java,内容如下:
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

    //线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    //线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    //这是一个工作列表, 将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<>();
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    //工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
       workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num<MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }

    //执行一个Job,这个Job需要实现Runnable
    @Override
    public void execute(Job job) {
        if(job != null){
            //添加一个工作,然后进行通知
            synchronized (jobs){
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    //关闭线程池
    @Override
    public void shutdown() {
        for (Worker worker : workers ) {
            worker.shutdown();
        }
    }

    //增减工作者线程
    @Override
    public void addWorkers(int num) {
        synchronized (jobs){
            //限制新增的Worker数量不能超过最大值
            if(num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }
    //减少工作者线程
    @Override
    public void removeWorker(int num) {
        synchronized (jobs){
            if(num >= this.workerNum){
                throw new IllegalArgumentException("beyond workNum");
            }
            //按照给定的数量停止Worker
            int count = 0;
            while (count < num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    //得到正在等待执行的任务数量
    @Override
    public int getJobSize() {
        return jobs.size();
    }

    //初始化线程工作者
    private void initializeWorkers(int num){
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            System.out.println("初始化了"+thread.getName());
            thread.start();
        }
    }

    //++++++++++++++++++++++++++++++++++++ 内部类++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    //工作者,负责消费任务
    class Worker implements Runnable{

        //是否工作
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized(jobs){
                    //如果工作者列表是空的,那么就wait
                    while (jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对WorkerThread 的中断操作, 返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //取出一个Job
                    job = jobs.removeFirst();
                }
                if(job != null){
                    try{
                        job.run();
                        System.out.println("------执行了一个job------");
                    }catch (Exception ex){
                        //忽略Job执行中的Exception
                    }
                }
            }
        }

        public void shutdown(){
            running = false;
        }
    }
}

3.Job工作者创建

 

public class ClientTest implements  Runnable{

    private Integer i;

    public ClientTest(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        System.out.println("我要变瘦"+i);
    }

//    public static void main(String[] args) {
//        DefaultThreadPool defaultThreadPool = new DefaultThreadPool();
//        for (int i = 0; i < 10; i++) {
//            defaultThreadPool.execute(new ClientTest(i));
//            System.out.println("初始化了,第" + i +"个Job!");
//        }
//
//    }

}

4.线程测试 

public class DefaultThreadPoolTest {

    public static void main(String[] args) {
        //线程池初始化
        DefaultThreadPool defaultThreadPool = new DefaultThreadPool(6);

        //多线程任务交给线程池,并执行
        for (int i = 0; i < 10; i++) {
            defaultThreadPool.execute(new ClientTest(i));
            System.out.println("初始化了,第" + i +"个Job!");
        }
    }
}

执行结果如下:

java 手写一个简易的线程池 

纯属娱乐,可看出线程池的执行原理。