Concurrent之_线程池
一、线程池
线程池 - 如果每一个请求对应一个线程,那么会导致线程大量的创建和销毁。减少线程的创建和销毁,希望能够重复使用已有的线程,有了线程池 — 存储线程的队列
特点:
1. 线程池在创建的时候里面是没有线程的
2. 当过来请求的时候,会在线程池中创建一个线程来处理这个请求。当请求处理完毕的时候,线程就会还回线程池,等待下一个请求
3. 核心线程在线程池中需要限定数量
4. 如果所有的核心线程都在使用,那么再来的请求就会放入工作队列中。工作队列是一个阻塞式队列。
5. 如果所有的核心线程都被占用并且工作队列已满,那么会创建临时线程去处理新的请求
6.临时线程处理完请求之后并不是立即销毁,而是存活一段时间,如果过了这段时间依然没有新的请求,那么临时线程就被销毁
package cn.tedu.concurrent.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 执行器服务
// corePoolSize - 核心线程数量
// maximumPoolSize - 线程池的容量 = 核心线程数量 + 临时线程数量
// keepAliveTime - 临时线程的存活时间
// unit - 时间单位
// workQueue - 工作队列
// handler - 拒绝执行助手
ExecutorService es = new ThreadPoolExecutor(5, 10, 3000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(new Thread(r).getName() + "线程被拒绝~~~");
}
});
// 执行线程
for (int i = 0; i < 17; i++) {
es.execute(new RDemo());
}
// 关闭线程池
es.shutdown();
}
}
class RDemo implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":hello");
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
二、Java提供的线程池
package cn.tedu.concurrent.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo2 {
public static void main(String[] args) {
// 缓存线程池
// 特点:
// 1. 没有核心线程,所有线程都是临时线程
// 2. 线程池的容量可以认为是无限的 (大池子)
// 3. 每一个临时线程存活时间都是1min - 存活时间不长
// 4. 工作队列是一个同步队列 - 只能存储一个元素 (小队列)
// 总结:
// 1. 大池子小队列
// 2. 理论上能够处理任意多的请求
// 3. 适合于短任务场景,例如:聊天
// ExecutorService es = Executors.newCachedThreadPool();
// 混合线程池
// 特点:
// 1. 需要指定核心线程数量
// 2. 只有核心线程,没有临时线程 (小池子)
// 3. 工作队列是阻塞式链式队列,没有指定容量 (大队列)
// 总结:
// 1. 小池子大队列
// 2. 理论上能够存储任意多的请求
// 3. 适合于长任务场景,例如:文件下载
ExecutorService es = Executors.newFixedThreadPool(5);
}
}
三、Callable和Runnable的区别:
1. Runnable线程执行完成之后没有返回值;Callable需要定义返回值类型并且返回结果
2. Runnable线程可以通过Thread启动执行,也可以通过线程池启动执行;Callable只能通过线程池来执行
3. Runnable没有容错机制,一旦出现异常需要自己处理;Callable可以将异常抛出利用全局的方式来进行处理
package cn.tedu.concurrent.pool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Runnable线程可以通过Thread来启动,也可以通过线程池来执行
// Callable线程只能通过线程池来执行
ExecutorService es = Executors.newCachedThreadPool();
// execute方法只能执行Runnable线程
// es.execute(command);
// 执行Callable线程
// 将结果封装为Future
// submit方法即可以执行Runnable也可以执行Callable
Future<String> f = es.submit(new CDemo());
System.out.println(f.get());
es.shutdown();
}
}
// <V> 表示结果类型
class CDemo implements Callable<String> {
@Override
public String call() throws Exception {
return "SUCCESS";
}
}
三、ScheduledExecutorService - 定时执行者服务
package cn.tedu.concurrent.pool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo2 {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
// 每5秒钟循环执行一个线程
// command - 线程
// initialDelay - 初始延迟时间 - 当线程池启动以后,线程需要延迟多长时间才能执行
// period - 间隔时间
// unit - 时间单位
// 以上一次的起始时间开始计算
// 如果线程的执行时间超过了指定的间隔时间
// 那么第二次线程不会立即启动而是等上一个线程执行完成之后再启动执行
// ses.scheduleAtFixedRate(new RDemo2(), 0, 5, TimeUnit.SECONDS);
// delay - 表示线程上次完成之后需要间隔多长时间再执行下一次
// 以上一次的结束实际开始计算
ses.scheduleWithFixedDelay(new RDemo2(), 0, 5, TimeUnit.SECONDS);
// 凌晨1点对数据进行更新
// 获取当前时间 - 计算当前时间到凌晨一点的时间间隔
// 每执行一次这个方法,就在当前时间上加上1天
}
}
class RDemo2 implements Runnable {
@Override
public void run() {
System.out.println("hi");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
四、分叉合并 - Fork and Join
1.将一个大任务不断的分成许多个小任务(分叉),小任务在执行完成之后再将结果进行汇总(合并)
2.任务在分叉之后会分发到CPU不同的核上去,然后利用核进行数据的处理。原来的for循环只是在一个核上来执行的代码,所以就导致分叉合并的效率要高于for循环
3.分叉合并采取了work-stealing(工作窃取)策略来实现线程的高效操作 -当一个核上的所有线程执行完成,就会随机挑一个核,从这个核的线程队列中偷取最后一个线程回来,然后执行
分叉合并会带来资源共享问题
package cn.tedu.concurrent.pool;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
ForkJoinPool fk = new ForkJoinPool();
Long sum = fk.invoke(new Calc(1, 100000000000L));
fk.shutdown();
System.out.println(sum);
// long sum = 0;
// for(long i = 1; i <= 100000000000L; i++)
// sum += i;
// System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
// RecursiveTask和RecursiveAction都可以实现分叉合并
// RecursiveTask表示有返回结果
// RecursiveAction不需要返回结果
class Calc extends RecursiveTask<Long> {
private static final long serialVersionUID = 2145165687757218781L;
private long start;
private long end;
private static final long THREHOLD = 1000; // 阈值
public Calc(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 如果start-end之间的数字个数>阈值,需要继续进行分叉
// 如果start-end之前的数字个数<=阈值,进行求和
long count = end - start;
if (count <= THREHOLD) {
long sum = 0;
for (long i = start; i <= end; i++)
sum += i;
return sum;
} else {
long mid = (start + end) / 2;
Calc left = new Calc(start, mid);
Calc right = new Calc(mid + 1, end);
// 将这个线程分叉出去
left.fork();
right.fork();
return left.join() + right.join();
}
}
}
五、Lock
比synchronized更加的灵活
公平和非公平策略:
非公平策略是当线程释放锁之后依然可以再次抢占;
公平策略是当线程释放锁之后会到队列中让其他线程进行抢占
从效率上考虑,非公平的效率会高
package cn.tedu.concurrent.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
public static void main(String[] args) {
Product p = new Product();
// 默认是非公平的
// Lock l = new ReentrantLock();
// 表示将其设置为公平策略
Lock l = new ReentrantLock(true);
new Thread(new Producer(p, l)).start();
new Thread(new Consumer(p, l)).start();
}
}
class Product {
private int count;
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
class Producer implements Runnable {
private Product p;
private Lock l;
public Producer(Product p, Lock l) {
this.p = p;
this.l = l;
}
@Override
public void run() {
while (true) {
// 加锁
l.lock();
int count = (int) (Math.random() * 1000 + p.getCount());
p.setCount(count);
System.out.println("生产了" + count);
// 解锁
l.unlock();
}
}
}
class Consumer implements Runnable {
private Product p;
private Lock l;
public Consumer(Product p, Lock l) {
this.p = p;
this.l = l;
}
@Override
public void run() {
while (true) {
l.lock();
int count = (int) (p.getCount() * Math.random());
p.setCount(p.getCount() - count);
System.out.println("本次剩余了" + p.getCount());
l.unlock();
}
}
}
六、ReadWriteLock - 读写锁
readlock:允许多个线程读,但是不允许线程写(增加/删除/更新)
writelock:允许一个线程写,但是不允许线程读
原子性操作
volatile本身是一个关键字,保证属性在多线程场景下的原子性
A — 1 -> 2 -> 3
线程在底层的执行过程中可能产生的指令的错乱 — 寄存器/PC计数器
A — 1 ->3 -> 2
导致实际结果和预期结果不符
volatile — 限定线程在执行过程中必须按照给定的指令顺序来执行
package cn.tedu.concurrent.atomic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo2 {
// jdk1.8之前是 volatile + 锁
// jdk1.8开始是 volatile + CAS
static AtomicInteger ai = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(2);
new Thread(new Add2(cdl)).start();
new Thread(new Add2(cdl)).start();
cdl.await();
System.out.println(ai.get());
}
}
class Add2 implements Runnable {
private CountDownLatch cdl;
public Add2(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
// AtomicDemo2.ai.addAndGet(1);
// 默认+1
AtomicDemo2.ai.incrementAndGet();
}
cdl.countDown();
}
}
扩展:
进程是计算机进行资源分配最小单位。
线程是计算机进行任务调度的最小单位,线程可以认为是简化版的进程。每一个进程中至少包含一个线程。
线程只会向前执行 ;并且一个核在同一个时刻内只能执行一个线程。
多道编程的意义:
提高CPU的利用率。一个线程在执行任务的过程中,有一部分时间在CPU上进行计算, 另一部分时间和硬件进行IO交互。线程在进行IO操作的时候是不会利用CPU的。
1 0.8 0.2
2 0.80.8=0.64 0.36
3 0.80.8*0.8=0.512 0.488
Concurrent:
1. BlockingQueue
2. ConcurrentMap
3. ExecutorService
4. Lock
5. Atomic