多线程,高并发初步(5,线程池)
基础知识
接口Executor:执行器
这是最初步的东西,只有一个execute方法。
/**
*Executor:执行器,只有一个execute方法。
*
* @author zhouyi
*
*/
public class MyExecutor implements Executor {
public static void main(String[] args){
new MyExecutor().execute(()->System.out.println(“hello!zhouyi”));
}
@Override
public void execute(Runnable command) {
command.run();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
接口ExecutorService
他是这样描述的:public abstract interface ExecutorService extends Executor,显然继承了Executor。里面封装了很多的方法。
public abstract interface ExecutorService extends Executor {
public abstract void shutdown();
public abstract List<Runnable> shutdownNow();
public abstract boolean isShutdown();-------是不是结束了
public abstract boolean isTerminated();
public abstract boolean awaitTermination(long paramLong,-------------等待结束
TimeUnit paramTimeUnit) throws InterruptedException;
public abstract <T> Future<T> submit(Callable<T> paramCallable);--------提交
public abstract <T> Future<T> submit(Runnable paramRunnable, T paramT);
public abstract Future<?> submit(Runnable paramRunnable);
public abstract <T> List<Future<T>> invokeAll( ------------------------调用所有
Collection<? extends Callable<T>> paramCollection)
throws InterruptedException;
public abstract <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> paramCollection, long paramLong,
TimeUnit paramTimeUnit) throws InterruptedException;
public abstract <T> T invokeAny(
Collection<? extends Callable<T>> paramCollection)
throws InterruptedException, ExecutionException;
public abstract <T> T invokeAny(
Collection<? extends Callable<T>> paramCollection, long paramLong,
TimeUnit paramTimeUnit) throws InterruptedException,
ExecutionException, TimeoutException;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
Callable,
其中有个call()方法。Runnable是没有返回值的并且不会抛出checked异常。相反Callable是有返回值的,可以根据需要选择使用。
类Executors,是一个工具类
是这么定义的:public class Executors;这个类中有很多很多方法,不细去讲。
基础ThreadPool–线程池的概念
线程池:就是扔了一堆线程在里面,等待任务被扔进去。维护者两个队列,一个是放执行队列,一个是放执行完的队列
newFixedThreadPool–固定线程池
public class myThreadPool {
public static void main(String[] args) throws InterruptedException {
// 固定线程池 ,ExecutorService可以往里扔任务的,execute,submit
ExecutorService service = Executors.newFixedThreadPool(5); // 启5个线程池
for(int i = 0; i < 6; i++) { // 扔6个任务进去
service.execute(()->{
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
service.shutdown();
System.out.println(service.isTerminated()); // 有没有执行完了
System.out.println(service.isShutdown()); // 是否关了
System.out.println(service);
TimeUnit.SECONDS.sleep(5);
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
运行结果:显然第六个线程,不能执行进入等待。
Future (未来的-结果)的概念
是这么定义的:
public abstract interface Future<V> {
public abstract boolean cancel(boolean paramBoolean);
public abstract boolean isCancelled();
public abstract boolean isDone();
public abstract V get() throws InterruptedException, ExecutionException;
public abstract V get(long paramLong, TimeUnit paramTimeUnit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
举例子:
// 将来要执行的任务。执行后返回值;执行未来的返回值,放入未来的结果中
public class TestFuture { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> result = executor.submit(task); // 将来要执行的任务。执行后返回值 executor.shutdown();
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println(“子线程在进行计算”);
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
执行结果:
newFixedThreadPool—线程池
并行–解决效率,以及解释线程池的概念。
public class MyparallelComputing { public static void main(String[] args) throws InterruptedException,ExecutionException{ long start = System.currentTimeMillis(); // 只用一个主线程 List<Integer> list = getPrime(1,200000); long end = System.currentTimeMillis(); System.out.println(end - start); // 利用线程池去实现 final int cpuCoreNum = 4;
ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask task1 = new MyTask(1, 80000); MyTask task2 = new MyTask(80001, 130000); MyTask task3 = new MyTask(130001,170000); MyTask task4 = new MyTask(170001, 200000); Future<List<Integer>> future1 = service.submit(task1); Future<List<Integer>> future2 = service.submit(task2); Future<List<Integer>> future3 = service.submit(task3); Future<List<Integer>> future4 = service.submit(task4); start = System.currentTimeMillis(); future1.get(); // 阻塞 future2.get(); future3.get(); future4.get(); end = System.currentTimeMillis(); System.out.println(end - start);
}
static class MyTask implements Callable<List<Integer>> {
int startPos,endPos;
MyTask(int s, int e) {
this.startPos = s;
this.endPos = e;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> list = getPrime(startPos, endPos);
return list;
}
}
static boolean isPrime(int a) {
for(int i = 1; i < a/2; i++) {
if(a % i == 0) return false;
}
return true;
}
static List<Integer> getPrime(int start, int end) {
List<Integer> list = new ArrayList<Integer>();
for(int i =start; i < end; i++) {
if (isPrime(i)) list.add(i);
}
return list;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
newCachedThreadPool–线程池
线程池中没有线程,来一个任务就启动一个任务,起到你的系统能够支撑的线程,大概几万个。
他有一个生存周期:默认60秒
public class MyCachePool { public static void main(String[] args) throws InterruptedException,ExecutionException{ ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service); for(int i = 0; i < 2; i++) { service.execute(()->{ try { TimeUnit.MICROSECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } }); } System.out.println(service); TimeUnit.SECONDS.sleep(60); System.out.println(service);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
运行结果:
newSingleThreadExecutor–线程池
线程池里面只有一个线程。作用:能保证线程执行的前后顺序。
实例:
public class MyCachePool { public static void main(String[] args) throws InterruptedException,ExecutionException{ ExecutorService service = Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++) { final int j = i; service.execute(()->{ System.out.println(j + Thread.currentThread().getName()); }); }
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
运行结果:
newScheduledThreadPool–线程池
该线程池用来执行定时任务。
public class MyNewScheduledThreadPool {
public static void main(String[] args) throws InterruptedException,ExecutionException{
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(()->{ // 以固定的(频率)时间执行任务
try {
TimeUnit.MICROSECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MICROSECONDS); // 从0(马上执行),之后每隔500毫秒执行一次。
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
newWorkStealingPool–线程池( 经营线程)
该线程池,假如3个线程,每个线程都有一个队列,现在加入2线程已经执行完了,他会自动的去拿1,3队列里的任务执行(偷过来执行)。
public class MynewWorkStealingPool { public static void main(String[] args) throws IOException{ ExecutorService service = Executors.newWorkStealingPool(); // 判断cpu是几核的,得到结果4核 System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(1000)); // daemon 经营线程 service.execute(new R(2000)); service.execute(new R(3000)); service.execute(new R(5000)); service.execute(new R(7000)); // 这句话不要是看不到结果的,原因是因为这个线程池的特殊 System.in.read();
}
static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}
@Override
public void run() {
try {
TimeUnit.MICROSECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
运行结果:
ForkJoinPool --线程池(1.7新加)
常用通过RecursiveAction(没有返回值)和RecursiveTask(有返回值不举例了)来实现
是通过fork进行分叉,jion进行合并。这个概念很重要。用于特别难的大任务,我来这个来切分成子任务,子任务也还可以继续分。
最后合并在一起。
实例:超过阈值就分成两段。
public class ForkJoinPool {
static int[] news = new int[1000000];
static final int MAX_NUM = 50000;
static Random random = new Random();
static {
for(int i = 0; i < news.length; i++) {
news[i] = random.nextInt(100);
}
System.out.println(Arrays.stream(news).sum());
}
static class addTask extends RecursiveAction { //通过RecursiveAction和RecursiveTask
int start,end;
addTask(int s,int e) {
this.start = s;
this.end = e;
}
@Override
protected void compute() {
if (end - start < MAX_NUM) {
long sum = 0;
for(int i = start;i <end; i++) sum += news[i];
System.out.println("start:" + start + "to" + "end:" +end + "=" +sum);
} else {
int middle = start + ( end - start)/2;
addTask task1 = new addTask(start,middle);
addTask task2 = new addTask(middle,end);
task1.fork();
task2.fork();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
运行结果;
线程池背后的原理 --ThreadPoolExecutor
以上的6大线程池都是以ThreadPoolExecutor为根基的(ForkJoinPool 除外),可以用这个自定义自己想要的线程池的参数。
总结:
线程池的基础:Executor , ExecutorService(submit) , Callable 和 Runnable , ThreadPool , Future
6大线程池
newFixedThreadPool, CachedPool , newSingleThreadExecutor , newScheduledThreadPool, newWorkStealingPool , ForkJoinPool
</div>
基础知识
接口Executor:执行器
这是最初步的东西,只有一个execute方法。
/**
*Executor:执行器,只有一个execute方法。
*
* @author zhouyi
*
*/
public class MyExecutor implements Executor {