java线程池原理

我们知道当我们要使用线程的时候就去创建一个线程,这样会很方便,但是这样会有一问题:
每创建一个线程,就要和操作系统进行交互,CPU会为每个线程分配资源,但是CPU的资源是有限的,如果在大量高并发的环境下,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,损耗CPU的性能,因为频繁创建线程和销毁线程需要时间。线程的创建需要开辟虚拟机栈,本地方法栈,程序计数器等线程线程私有的内存空间,在线程销毁时又要销毁这些资源,使用线程池就可以解决这个问题。
线程池的作用
(1)利用线程池管理并复用线程,控制最大并发数,通过重复利用已创建的线程,降低了创建和销毁线程的造成的系统资源消耗。
(2) 提高响应速度:当任务到达时,不需要等到线程创建就能立即执行。
(3)提高线程的可管理性: 线程是稀缺资源,如果过多的创建不仅消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优,监控。
线程池的工作流程如下:
java线程池原理
线程池(ThreadPollExecutor)主要的参数
(1)corePollSize: 核心线程数,这个值非常关键,设置过大会浪费资源,过小会导致线程频繁的创建或销毁。
(2)maxinumPollSize:线程池能够容纳最大线程数。如果corePollSize=maxinumPollSize,表示固定大小线程池。
(3)keepAliveTime:线程允许空闲时间,当空闲时间达到keepAliveTime时,线程会被销毁,直到只剩下corePoolSize个线程。避免资源浪费,在默认情况下,当线程池的数量大于corePoolSize时,keepAliveTime才会起作用。但是当TreadPoolExecutor的allowCoreThreadTimeOut=true时,核心线程超时也回被回收。
(4)WorkQueue:任务缓存队列,当请求的线程数大于maxinumPoolSize时,任务进入缓存队列。
(5)handler:拒绝策略的对象,当任务缓存队列已满时,可以通过饱和策略处理请求。
饱和策略主要有四种:
(1)AbortPolicy:默认的饱和策略,该策略会抛出未检查异常RejectedExecutionExeception,调用者可以捕获这个异常,根据自己的需求编写代码;
(2)**CallerRunsPolicy:**该策略提供了一种调节机制,将任务的运行回退到任务调用者,在提交任务的线程中执行该任务。
(3)Discard:新提交的任务被抛弃。
(4)DiscardOldest策略:队列队头的任务被抛弃
测试代码如下:

public class ThreadRejectedPolicyTest {
    private static final int Thread_Size=2;//线程池大小
    private static final int Task_Size=5;//任务数量
    //饱和策略
    private static final int ABORT_POLICY=0;//AbortPolicy
    private static final int CALLERRUN_POLICY=1;//CallerRunPolicy;
    private static final int DISCARD_POLICY=2;
    private static final int DISCARDOLD_POLICY=3;
    private static int select_Policy=ABORT_POLICY;//策略选择变量,改这个值选择不同的策略
    public static void main(String[] args) {
        BlockingQueue bq=new LinkedBlockingDeque(1);//阻塞队列只能容纳一个任务
        ThreadPoolExecutor tpe=new ThreadPoolExecutor(Thread_Size,Thread_Size,0L,TimeUnit.SECONDS,bq);//创建一个固定线程的线程池
        tpe.setRejectedExecutionHandler(getPolicy(DISCARDOLD_POLICY));//设置线程池的饱和策略
        for(int i=0;i<Task_Size;i++){
            tpe.submit(new Task(i));
        }
        tpe.shutdown();//关闭线程池
    }

private static RejectedExecutionHandler getPolicy(int select_policy) {
    switch (select_policy){
        case ABORT_POLICY:
            return new ThreadPoolExecutor.AbortPolicy();
        case CALLERRUN_POLICY:
            return new ThreadPoolExecutor.CallerRunsPolicy();
        case DISCARD_POLICY:
            return new ThreadPoolExecutor.DiscardPolicy();
        case DISCARDOLD_POLICY:
            return new ThreadPoolExecutor.DiscardOldestPolicy();
            default:
                break;
    } return new ThreadPoolExecutor.AbortPolicy();//默认策略
}
static class Task implements Runnable{
private int TASK_ID;
public Task(int TASK_ID){
    this.TASK_ID=TASK_ID;
}
    @Override
    public void run() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(TASK_ID+"is running"+Thread.currentThread().getName());
    }
}

}

自己简单实现的线程池代码如下

//实现固定数量线程池  execute shutDown
public class MyThreadPoll {
    private int pollSize;//当前线程数
    private int corePollSize;//核心线程数
    private int maxPollSize;//最大线程数
    private List<Thread> threadList;//存放线程的容器
    private BlockingQueue<Runnable> workQueue;//任务队列
    private volatile  boolean isRunning=true;//标记调用了shutDown;
    private volatile boolean shutDown=false;
    public MyThreadPoll(int corePollSize){
        this.pollSize=0;
        this.corePollSize=corePollSize;
        this.maxPollSize=corePollSize;
        this.threadList=new ArrayList<>(corePollSize);
        this.workQueue=new LinkedBlockingQueue();
    }
    public void MyExecute(Runnable runnable) throws InterruptedException {
        if(runnable==null){
            throw new NullPointerException();
        }
        //核心线程数未满
        if (pollSize < corePollSize) {
            //创建线程执行任务
            addThread(runnable);
        } else {//核心线程数已满,任务放入任务队列
            workQueue.put(runnable);
            }
            }
     //创建线程
    private void addThread(Runnable runnable) throws InterruptedException {
        pollSize++;
        //任务放入任务队列
        ThreadPollWork threadPollWork=new  ThreadPollWork(runnable);
        Thread t=new Thread(threadPollWork);
        threadList.add(t);//线程存入容器
        t.start();
    }
    class ThreadPollWork implements Runnable {
        public ThreadPollWork(Runnable runnable) throws InterruptedException {
             workQueue.offer(runnable);
        }
    @Override
    public void run() {
        while(isRunning||!shutDown){
            Runnable task= null;
            if(isRunning==false){
                Thread.interrupted();
            }
            try {
                task = workQueue.take();
                task.run();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
        if(isRunning==false){//保证当前线程执行完
            shutDown=true;
        }

    }


}
//关闭线程池
public void shutDown(){
    isRunning=false;
    if(!threadList.isEmpty()){
        for(Thread t:threadList){
            t.interrupt();
        }
    }
    shutDown=true;
}

}