java线程池原理
我们知道当我们要使用线程的时候就去创建一个线程,这样会很方便,但是这样会有一问题:
每创建一个线程,就要和操作系统进行交互,CPU会为每个线程分配资源,但是CPU的资源是有限的,如果在大量高并发的环境下,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,损耗CPU的性能,因为频繁创建线程和销毁线程需要时间。线程的创建需要开辟虚拟机栈,本地方法栈,程序计数器等线程线程私有的内存空间,在线程销毁时又要销毁这些资源,使用线程池就可以解决这个问题。
线程池的作用
(1)利用线程池管理并复用线程,控制最大并发数,通过重复利用已创建的线程,降低了创建和销毁线程的造成的系统资源消耗。
(2) 提高响应速度:当任务到达时,不需要等到线程创建就能立即执行。
(3)提高线程的可管理性: 线程是稀缺资源,如果过多的创建不仅消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优,监控。
线程池的工作流程如下:
线程池(ThreadPollExecutor)主要的参数
(1)corePollSize: 核心线程数,这个值非常关键,设置过大会浪费资源,过小会导致线程频繁的创建或销毁。
(2)maxinumPollSize:线程池能够容纳最大线程数。如果corePollSize=maxinumPollSize,表示固定大小线程池。
(3)keepAliveTime:线程允许空闲时间,当空闲时间达到keepAliveTime时,线程会被销毁,直到只剩下corePoolSize个线程。避免资源浪费,在默认情况下,当线程池的数量大于corePoolSize时,keepAliveTime才会起作用。但是当TreadPoolExecutor的allowCoreThreadTimeOut=true时,核心线程超时也回被回收。
(4)WorkQueue:任务缓存队列,当请求的线程数大于maxinumPoolSize时,任务进入缓存队列。
(5)handler:拒绝策略的对象,当任务缓存队列已满时,可以通过饱和策略处理请求。
饱和策略主要有四种:
(1)AbortPolicy:默认的饱和策略,该策略会抛出未检查异常RejectedExecutionExeception,调用者可以捕获这个异常,根据自己的需求编写代码;
(2)**CallerRunsPolicy:**该策略提供了一种调节机制,将任务的运行回退到任务调用者,在提交任务的线程中执行该任务。
(3)Discard:新提交的任务被抛弃。
(4)DiscardOldest策略:队列队头的任务被抛弃
测试代码如下:
public class ThreadRejectedPolicyTest {
private static final int Thread_Size=2;//线程池大小
private static final int Task_Size=5;//任务数量
//饱和策略
private static final int ABORT_POLICY=0;//AbortPolicy
private static final int CALLERRUN_POLICY=1;//CallerRunPolicy;
private static final int DISCARD_POLICY=2;
private static final int DISCARDOLD_POLICY=3;
private static int select_Policy=ABORT_POLICY;//策略选择变量,改这个值选择不同的策略
public static void main(String[] args) {
BlockingQueue bq=new LinkedBlockingDeque(1);//阻塞队列只能容纳一个任务
ThreadPoolExecutor tpe=new ThreadPoolExecutor(Thread_Size,Thread_Size,0L,TimeUnit.SECONDS,bq);//创建一个固定线程的线程池
tpe.setRejectedExecutionHandler(getPolicy(DISCARDOLD_POLICY));//设置线程池的饱和策略
for(int i=0;i<Task_Size;i++){
tpe.submit(new Task(i));
}
tpe.shutdown();//关闭线程池
}
private static RejectedExecutionHandler getPolicy(int select_policy) {
switch (select_policy){
case ABORT_POLICY:
return new ThreadPoolExecutor.AbortPolicy();
case CALLERRUN_POLICY:
return new ThreadPoolExecutor.CallerRunsPolicy();
case DISCARD_POLICY:
return new ThreadPoolExecutor.DiscardPolicy();
case DISCARDOLD_POLICY:
return new ThreadPoolExecutor.DiscardOldestPolicy();
default:
break;
} return new ThreadPoolExecutor.AbortPolicy();//默认策略
}
static class Task implements Runnable{
private int TASK_ID;
public Task(int TASK_ID){
this.TASK_ID=TASK_ID;
}
@Override
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(TASK_ID+"is running"+Thread.currentThread().getName());
}
}
}
自己简单实现的线程池代码如下
//实现固定数量线程池 execute shutDown
public class MyThreadPoll {
private int pollSize;//当前线程数
private int corePollSize;//核心线程数
private int maxPollSize;//最大线程数
private List<Thread> threadList;//存放线程的容器
private BlockingQueue<Runnable> workQueue;//任务队列
private volatile boolean isRunning=true;//标记调用了shutDown;
private volatile boolean shutDown=false;
public MyThreadPoll(int corePollSize){
this.pollSize=0;
this.corePollSize=corePollSize;
this.maxPollSize=corePollSize;
this.threadList=new ArrayList<>(corePollSize);
this.workQueue=new LinkedBlockingQueue();
}
public void MyExecute(Runnable runnable) throws InterruptedException {
if(runnable==null){
throw new NullPointerException();
}
//核心线程数未满
if (pollSize < corePollSize) {
//创建线程执行任务
addThread(runnable);
} else {//核心线程数已满,任务放入任务队列
workQueue.put(runnable);
}
}
//创建线程
private void addThread(Runnable runnable) throws InterruptedException {
pollSize++;
//任务放入任务队列
ThreadPollWork threadPollWork=new ThreadPollWork(runnable);
Thread t=new Thread(threadPollWork);
threadList.add(t);//线程存入容器
t.start();
}
class ThreadPollWork implements Runnable {
public ThreadPollWork(Runnable runnable) throws InterruptedException {
workQueue.offer(runnable);
}
@Override
public void run() {
while(isRunning||!shutDown){
Runnable task= null;
if(isRunning==false){
Thread.interrupted();
}
try {
task = workQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(isRunning==false){//保证当前线程执行完
shutDown=true;
}
}
}
//关闭线程池
public void shutDown(){
isRunning=false;
if(!threadList.isEmpty()){
for(Thread t:threadList){
t.interrupt();
}
}
shutDown=true;
}
}