线程池的简单实现

线程池的简单实现

有几天没写代码,总是感觉有些心慌,随意写一篇文章,就当是自己的知识点的重新复习。
时间过得真快,眼前的困难只是暂时的,希望在职业生涯能够保持一份奋斗之心,stay young stay hungry,成长之路希望能够感知一丝丝成就感。
本篇文章介绍下线程池,尽量把代码注释写清楚。

线程池执行流程

1 新任务(runnable)待执行
2 如果开始核心线程没满(小于coreSize),新建线程执行
2 如果线程满了(线程池数量达到coreSize),判断是否队列满了,没满,任务储存进入队列等待执行
4 工作线程RunWorker执行完第一个任务之后,就阻塞队列任务,直至去除任务执行
线程池的简单实现

线程池简单代码

// An highlighted block
package com.example.demo.pool;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadExecutor {
	//阻塞队列,存放待处理任务
    private BlockingQueue<Runnable> blockingQueue;
	//线程队列
    private Set<Thread> threads = new HashSet<>();
	//核心线程数
    private int coreSize;
	//轻量级锁
    private Lock lock = new ReentrantLock();


	//构造方法,第一个参数核心线程数,第二个阻塞队列
    public MyThreadExecutor(Integer coreSize, BlockingQueue<Runnable> blockingQueue) {
        this.coreSize = coreSize;
        this.blockingQueue = blockingQueue;
    }

    public void execute(Runnable runnable){
    	//上锁,防止并发导致比较出现异常
        lock.lock();
        //核心线程数没满,新建线程执行
        if(threads.size() < coreSize){
            Thread thread = new Thread(new RunWorker(runnable));
            thread.start();
            threads.add(thread);
        }else {
        //存放队列
            blockingQueue.add(runnable);
            System.out.println("队列待处理任务" + blockingQueue.size());
        }
        lock.unlock();
    }

    class RunWorker implements Runnable{

        private Runnable runnable;

        public RunWorker(Runnable runnable){
            this.runnable = runnable;
        }


        @Override
        public void run() {
			//第一个任务(新建线程时执行)
            runnable.run();
            //死循环执行队列中待处理任务
            while (true){
                try {
                    runnable = blockingQueue.take();
                    runnable.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

	//测试方法(20个任务,只有十个线程执行,每个任务执行10s中)
	//结果第一批任务先执行完成之后,10s后再执行第二批任务
    public static void main(String[] args) {
    	//新建线程池
        MyThreadExecutor executor = new MyThreadExecutor(10, new LinkedBlockingQueue<>());
        //计数器
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for(int i = 0; i < 20; i++){
            executor.execute(() ->{
                System.out.println("执行编号:" + atomicInteger.incrementAndGet());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}