《Java并发编程的艺术》读书笔记
君子谋道不谋食,恍惚半载被忧贫
第一章:并发编程的问题
多线程是为了解决效率问题,尽可能使用处理器资源,同时保持线程共享资源安全。
2. 1. 上下文切换:多个线程之间,频繁的切换,浪费调度资源
2. 死锁:由于死循环或者互斥等造成的线程无法继续进行,永久等待的情况
3. 资源限制的挑战:读入速度大于写入速度
第二章:java并发机制的底层实现和主要工具
- 轻量锁:volatile-单词的意思是不稳定,即告诉处理器此变量不稳定,不能够进行重排序,需要可见性
volatile使用缓存一致性来保持安全,即线程内存缓存中的变量被修改后,修改立即通知其他的处理器,检查当前变量,修改为最新的数据。同时不能使用重排序;
2.重量级锁:synchronized。修饰方法,代码块;后续synchronized引入的等级锁,在某些情况下,并不是重量级的。完全的线程屏蔽
三种锁的类型:
- 偏向锁:时间发现,大部分情况下,都是同一个线程在反复使用资源。此时如果加锁,会频繁获取和释放锁,这种操作会浪费资源,所以,会出现偏向锁,如果下一次获取的还是当前锁,则直接将锁给予当前线程。否则,有竞争,撤销偏向锁。
- 轻量级锁:当发生线程竞争是,竞争的一方如果没有获取锁,就会采取自旋操作,等待获取,长期获取不到,则升级到重量级锁。锁的等级只能上升,不能下降
- 重量级锁:即synchronized锁-默认偏向锁。
3.原子操作
一个操作内需要全部完成,不能受外界干扰。原子操作是一个集合,一个事物,即一个任务要么执行完全,要么不执行。
第三章:java内存模型
并发的重要数据是指堆中的数据,这里的数据是线程共享的,包括实例域、静态域和数组元素。
1.指令重排序:处理器底层为了计算的优化,会对数据指令进行重新排序,从而降低元素的重复获取,而不影响单线程逻辑。但是在多线程下可能出现问题。
2.顺序一致性:先来的先操作完,后来的等待
3.volatile:即不稳定,告诉处理器内部,操作此元素,不经过缓存,不重排序,直接对内存进行操作。修改后通知其他线程。
4.锁的定义:
5.final
6.happens-before:as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同
步的多线程程序的执行结果不被改变。
7.双重检查锁定-延迟初始化
8.java内存模型综述
第四章:Java并发编程基础
什么是线程:
线程是操作系统调度的最小单元,是一种轻量级进程。一个进程一般包括多个线程,每一个线程都有自己的计数器、堆栈、局部变量属性。进程中的多线程共享一个内存。
多线程的作用,优势:
* 充分应用多核处理器的性能
* 更快速的响应
线程优先级:
操作系统同时时间片来控制线程的使用和切换,一般包括以下四种:
* 先来先服务
* 最短作业优先
* 最大优先级调度
* 固定时间片轮转
设定优先级可以按照定义的优先级顺序获取锁(尽量)-默认优先级是5
使用方法:
一般而言,优先级的设定是为了防止CPU独占。偏重于计算的优先级低,防止CPU获取不放;偏重于频繁阻塞的线程,优先级高,这样可以快速得到CPU,但是不长久持有,即频率高的。一般是IO操作。
线程的状态:
初始化new:线程实例化。线程调用start方法,启动
运行态:running(包括就绪态课运行状态),线程获得CPU,进行业务操作
阻塞状态:blocked线程未获得锁,进入阻塞队列
等待状态:waiting线程进入等待状态,但是不释放锁,等待通知,或者睡眠醒来或者中断
等待超时:time_waiting
终止状态:terminated线程业务完成
两个命令:
jps:获取当前的java线程
jstack 929:获取线程ID运行信息
1.new实例化
2.调用start方法进入就绪状态,等待系统调度
3.如果获取到系统时间片,即进入运行态running
4.若调用yield-退让,则表示放弃时间片,重新进入就绪状态(注意,也可能会再次获得时间片)
5.若执行wait方法,join方法,LockSuport.Park方法,则会进入等待线程。同时放弃时间片
* wait方法:等待线程,让出时间片;需要notify唤醒,或者notifyAll全部唤醒
* join方法:等待主线程终止之后,才能从当前线程返回
* LockSupport.park泊车,即退出时间片,让出车位。需要调用unpark再次进入就绪
6.wait方法,join方法,LockSuport.Park方法如果带有超时设置参数,会进入等待超时,如果超时,则返回线程,继续后面的步骤,一般用于后续步骤不受等待线程影响时。其中sleep不释放锁,时间超时后重新执行之后的流程。
7.如果当前线程在运行中需要进入某个同步方法,如果未获得锁,则进入阻塞状态,等待获取锁,获取到锁之后,才能继续进行。
8.执行完毕,中止线程
注意等待和阻塞的区别:
等待是当前线程的自我行为,一般是为了等待外界的一个信号,终止当前的线程,如wait放弃时间片等待唤醒和sleep等待一段时间重新运行。lock锁也是等待,因为lock的底层使用的是LockSuppor相关方法
阻塞是当前线程遇到了锁,但是获取不到,导致进不了门而阻塞,主要是synchronized锁。这种状态一般不会释放之前获取的锁。
Daemon线程:
daemon即后台、守护的意思,即后台进程。
* 后台进程不在终端显示
* 后台进程只是一个陪伴进程。即如果没有外部进程存在,后台进程也会死掉。
线程中断:
中断用于获取线程的标志,用来判断线程运行中是否被中断了。
* 通过isInterrupted(判断是否被中断
* 通过静态方法Thread.interrupted()对当前中短线程进行复位
其他详细细节再补
线程中断用于在完成固定任务后自动结束线程。通常需要一个信号变量来标志
过期的suspend()、resume()和stop()
相当于cd机的暂停、开始、重新开始,已舍去
4.3. 线程间通信
volatile和synchronized关键词
volatile意为不稳定的,表示所修饰的变量容易被修改,需要操作系统底层,及时的将变化的数值写入到内存中,而不经过缓存。即达到可见性。同时操作系统对含有当前变量的操作不执行重排序。但是volatile的符合操作并不是原子性的,即读取修改是两个操作,不能一步完成。只能单独的读取,写入。一般用作信号标志,比如flag=ture,这个地方只存在写入,不存在读取;而flag=num,就存在读取和写入:读取num的值,然后写入flag的新值
synchronized是重量级锁,可见性原子性都能保证。可以修饰同步快、方法。实例方法对实例对象加锁,静态方法对类进行加锁,同步块需要自己指定对象。
synchronized同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取。统一时刻只有一个线程能够获取对象的监视器。
线程对对象进行操作时,首先需要获取对象的监视器,才能对对象进行操作;否则进入同步队列,等待正在使用的线程的exit通知,进而再次尝试获取监视器。
4.3.2. 等待通知机制
等待通知机制类似于生产者消费者模式,一个线程等待另一个线程的通知信息,进而完成后续任务。
以往的生产者消费者模式:生产者生产大米,消费者消费。如果没有通知,消费者每次去厂里买大米时,可能存在一种情况,有时候没有,有时候有,这样的话,消费者需要不间断的定期去查看,资源消耗的。
使用等待通知后:生产者生产了大米,就给消费者打个电话,通知米有了。
java中的等待通知机制主要使用wait和notify
* 使用wait()、notify()和notifyAll()时需要先对调用对象加锁,因为方法需要获取对象的监视器
* notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
同步队列:存放阻塞线程
等待队列:存放等待线程
- join方法:等待主线程终止之后才能执行当前线程
- ThreadLocal:线程变量的副本,每个线程单独持有一个变量。一般用于测试当前线程的业务。比如测试单个线程方法的执行时间,需要排除其他线程的干扰,就可以顶一个一个时间变量ThreadLocal
4.4 线程应用实例:
- 等待超时模式:线程超时后,自动返回默认值,继续后续步骤:
应用:数据库连接池
在数据库连接池中,连接线程有限,当线程已满时,会导致后续的线程一直处于获取连接池的状态,系统可能会奔溃。因而,加入超时机制后,超过一定时间会自动返回默认值NULL,让客户端等待再次获取。 - 线程池技术
作用:
* 降低每次创建线程的消耗
* 方便管理
结构:线程队列
第五章:Java中的锁
5.1. lock接口
synchronized是隐式锁,即在底层内化,获取和释放在一个继承中,不需要开发者手动操作。这样也就导致了开发的不便捷,无法提供其它的功能
lock锁,提供显示的加锁和释放锁,因而,可以在加锁后进行其他的处理,比如特殊情况的中断操作,超时返回等。
5.2. 队列同步器AQS
队列同步器AbstractQueuedSynchronizer-抽象队列同步器,是同步组件(ReentrantLock、
ReentrantReadWriteLock和CountDownLatch等)基本构成。
同步器是基于模板模式,其他的同步类通过通过继承同步类,并修改相应的方法来达到相应的同步机制
同步器内部采用一个双向队列,来完成同步状态
同步状态:用于标记是否可以获取锁
同步队列:用于保存被阻塞的线程
一句话概述:
AQS的主要作用是维持了一个线程阻塞的双端队列;对于一个线程,首先尝试获取同步状态,获取即执行,否则:如果队列初始化了,直接将当前线程包装为Node通过CAS加入到队列中(addWaiter方法);否则进入enq方法-判断队列是否初始化,进行初始化后,重新入队。
然后进入acquireQueued方法,再次尝试获取锁:判断当前节点的前驱是否为头结点,如果是,表明是第一个节点,直接尝试获取同步状态,成功则返回false,禁止selfInterrupt中断调用;否则进入shouldParkAfterFailedAcquire方法,将节点状态设置为SIGNAL,表示需要unpark解除,同时LockSupport.park阻塞线程,然后执行自旋操作。
参照:https://juejin.im/post/5aeb07ab6fb9a07ac36350c8
5.3. 重入锁ReentrantLock
支持可重入的锁,即当前线程可以进一步获取自己的所,相当于递归操作。synchronized也是可重入锁。
可重入锁的实现:
* 判断是否是当前锁
* 多重锁的释放,保持一个计数状态
以非公平锁为例获取同步状态码:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// 非公平锁的获取:获取同步状态锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 如果为0,表明没有锁,使用CAS修改同步状态,并将独占锁设置为当前线程
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 否则如果独占锁的拥有者就是当前线程,则重入;
// 增加同步状态值,返回true;否则独占锁被其他线程拥有,获取独占锁失败,返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 同理释放同步状态锁
// 直接减少同步状态值,直到同步状态为0,返回true;否则false
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 公平锁的获取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 唯一的不同是,公平锁的获取,需要检查同步队列是否直接前驱,如果有,则等待;相当于先来先得。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5.4. 读写锁ReentrantReadWriteLock
读写锁是一种读写分离的锁,允许多个线程度读,只允许单个线程写。
同步状态的设计:
与可重入锁不同的是,可重入锁只需要维持一个同步状态就可以最为单一状态,但是读写锁,需要两个状态作为符号,为了节约空间或者说为了方便计算。设计者使用了位运算将读状态和写状态统一到一个32位整形中,左16为表示读状态,右16位表示写状态。
读锁即共享锁:share标志
写锁即独占锁:exclusive标志
// 读锁的获取,即获取共享锁
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 1.计算写锁状态,如果不等于0,且不是当前线程。获取所失败,返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 否则,计算共享锁状态
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// 2.获取读锁
compareAndSetState(c, c + SHARED_UNIT)) {
// 3.之后的逻辑是为了获取其他信息,比如读锁的个数等
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//4. 处理在第二步中CAS操作失败的自旋已经实现重入性
return fullTryAcquireShared(current);
}
// 读锁的释放
protected final boolean tryReleaseShared(int unused) {
// 前面还是为了实现getReadHoldCount等新功能
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// 直接CAS操作尝试释放锁
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
// 写锁的获取
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
// 获取写状态
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果读锁被持有或者写锁被其他线程占有
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果写状态大于16位存储量,异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 当前线程持有写锁,重入
setState(c + acquires);
return true;
}
// 没有锁,直接尝试获取
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 写锁的释放
protected final boolean tryRelease(int releases) {
// 独占锁是否是当前的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 释放锁:减少读状态。如果读状态为0,同时则释放所线程独占;
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// 锁降级是指,ReentrantReadWriteLock支持在一个线程中,可以在获取读取的前提下,再次获取写锁,而不需要释放写锁
//示例如下:
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
5.5. LockSupport工具
LockSupport工具提供基本的线程阻塞和唤醒功能,AQS的等待通知机制就是使用的这个方法。
与wait方法不同的是,wait方法是object的方法,每次调用都需要释放锁,因而需要在synchronized中使用。
LockSupport工具主要提供两个重要的方法用于阻塞和唤醒线程:
park():停车;park的底层使用的是本地方法,具体的逻辑是,每次调用需要获取一个“许可”信息,成功设置为0。
unpark():启动离开;直接修改“许可”信息为1,如果之前的值为0,表明有线程获取了它,需要调用pthread_cond_signal唤醒之前park的线程
其他park方法,
park(Object blocker):其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。
parkNanos(long nanos) :阻塞当前线程多少时间
parkNanos(Object blocker, long nanos)
parkUntil(long deadline):阻塞线程直到什么时候
parkUntil(Object blocker, long deadline)
5.6. Condition接口
监视器monitor方法:任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等(英文:条件)
待/通知模式.。
condition对象通过lock来创建:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition通过对应的两个方法来达到等待和唤醒机制(都由AQS提供,一个内部类)
await():等待
signal():唤醒
// 等待唤醒案例-生产者消费者模式
public class BoundedQueue<T> {
private Object[] items;
// 添加的下标,删除的下标和数组当前数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
Condition的实现分析:
condition的实现方式是通过ConditionObject实现,ConditionObject是同步器AbstractQueuedSynchronizer的内部类。每一个condition都有一个等待队列,通过这个队列来实现等待通知机制
等待队列Node:
等待队列是一个单向队列,同时又头结点和尾结点。每个节点都是一个线程的引用,即等待的线程
await:释放锁,同时加入等待队列
signal:获取锁,出队
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列(同步队列是竞争队列,等待队列是任务等待)
源码分析:
// 等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到队尾
Node node = addConditionWaiter();
// 释放同步状态-释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 自旋获取同步状态-即lock,循环检查
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 加入队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.如果尾结点不为null,设置t为尾结点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 如果尾结点为null,表明未初始化,将头结点指向当前node;
// 否则入队
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 唤醒
public final void signal() {
// 确保获取独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 如果队列不为null,唤醒队列中的一个
if (first != null)
doSignal(first);
}
// 唤醒第一个节点
private void doSignal(Node first) {
do {
// 先将头结点从队列中移除-出队
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 对头结点进行处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 设置头结点的状态为-1
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// enq将头结点从阻塞队列放入到同步队列队尾,等待时间片
Node p = enq(node);
int ws = p.waitStatus;
// 设置通知信号,并unpark解除
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
解说:lock维护同步队列,condition维护等待通知队列
参考:https://www.jianshu.com/p/28387056eeb4
第六章:Java并发容器和框架
java中提供了大量的并发容器,便于多线程的开发;常见的有hashtable-concurrentHashMap-等
ConcurrentHashMap
一句话综述发展史:
- hashmap:hashmap只能支持单线程,在多线程情况下;在rehash扩容时,会存在循环死机问题:线程A对链表进行翻转式,线程B可能同样翻转,从而造成节点互相引用问题。
- hashtable使用synchronized对读写都进行重量级加锁,效率低
- jdk1.7的concurrenthashmap,使用数组+链表结构,使用segment细粒度锁对一部分数组进行分桶加锁,通过链表解决哈市冲突,加锁方式改用ReentrantLock
- jdk1.8,1.8版本舍弃了segment,并且大量使用了synchronized,以及CAS无锁操作以保证ConcurrentHashMap操作的线程安全性。底层数据结构改变为采用数组+链表+红黑树的数据形式。
具体改进:- 由于synchronized的改进,引用了锁提升,偏向锁-轻量级锁-重量级锁;在大多数情况下,平均效率并不比ReentrantLock、低
- 采用链表+红黑树,链表长度大于8之后,会上升为红黑树结构。查找效率更高。
- 去掉segment,直接对每一个数组项进行加锁
详细源码细节:待补充
ConcurrentLinkedQueue:
在单线程编程中我们会经常用到一些集合类,比如ArrayList,HashMap等,但是这些类都不是线程安全的类。在面试中也经常会有一些考点,比如ArrayList不是线程安全的,Vector是线程安全。而保障Vector线程安全的方式,是非常粗暴的在方法上用synchronized独占锁,将多线程执行变成串行化。要想将ArrayList变成线程安全的也可以使用Collections.synchronizedList(List list)方法ArrayList转换成线程安全的,但这种转换方式依然是通过synchronized修饰方法实现的,很显然这不是一种高效的方式,同时,队列也是我们常用的一种数据结构,为了解决线程安全的问题,Doug Lea大师为我们准备了ConcurrentLinkedQueue这个线程安全的队列。从类名就可以看的出来实现队列的数据结构是链式。
Java中的阻塞队列
定义:阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不
满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
JDK 7提供了7个阻塞队列,如下。
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
Fork/Join框架:
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
相当于map-reduce计算模式,或者归并算法。将任务分给多个线程fork并行执行,然后等待所有的都完成后,进行join操作合并运算。
补充知识点:工作窃取算法
是指一个线程在完成自己的任务之后,再去运行其他线程的任务。为了避免任务冲突,使用队列模式,从另一线程的队尾获取任务。如下图:
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。
// fork/join案例
/**
* 计数器任务
*
* @author tengfei.fangtf
* @version $Id: CountTask.java, v 0.1 2015-8-1 上午12:00:29 tengfei.fangtf Exp $
*/
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 阈值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
}
第七章:Java中的13个原子操作类
原子类的底层都是通过unsafe类提供的CAS操作实现的,即比较交换。
CAS保留三个值,原始值地址,最初获取的值,将要修改的值。每次修改时,都会比较原始值地址所指引的值和获取的值是否相等,如果相等,说明,原始值没有被其他地址修改,则修改当前值。否则,被获取的值被其他线程修改了,需要重新获取,直到获取成功(自旋操作)
3原子包装类型:原理相同
AtomicBoolean:原子更新布尔类型。
AtomicInteger:原子更新整型。
AtomicLong:原子更新长整型。
3个原子数组类型:
AtomicIntegerArray:原子更新整型数组里的元素。
AtomicLongArray:原子更新长整型数组里的元素。
AtomicReferenceArray:原子更新引用类型数组里的元素。
1个原子更新引用类型
AtomicReference:原子更新引用类型。可以包装 自定义对象
第八章:Java中的并发工具类
Java中主要包括四个并发工具类
countdownlatch:技术下降门栓-通过维持一个计数器来判断是否到达指定点
cyclicbarrier:同步屏障-通过屏障挡板来阻挡一列线程
Semaphore:信号,控制并发线程数,相当于路口信号灯,只能进有限数量的线程
exchanger:信息交换,两个线程之间通信交换数据
CountDownLatch和CyclicBarrier区别:
* CountDownLatct通过countdown方法消耗计数,可以在同一个线程中多次使用;而CyclicBarrier一个线程只能使用一次
* CountDownLatch不能复位,即如果出现差错不能重新复位,操作不便。CyclicBarrier可以复位
* CountDownLatch构造函数只有一个int值,只提供等待,没有等待完成后操作;CyclicBarrier提供带动做的构造器CyclicBarrier(int parties, Runnable barrierAction) 可以全部都到达等待后,先完成统一的barrierAction操作,在继续各自的线程任务
第九章:Java中的线程池
线程池的作用:
* 重复利用已创建的线程,降低资源消耗
* 线程已经创建好,直接使用,提高响应速度
* 便于统一管理
创建线程池:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue)
参数:
* 核心线程大小:corePoolSize
* 最大允许线程:maximumPoolSize
* 超过核心线程的等待时间:keepAliveTime
* 等待时间单位:unit
* 使用何种阻塞工作队列:workQueue
线程池的工作逻辑:
流程:初始化线程池,线程为0;随着任务的到来,创建线程直到到达核心线程(期间线程即便不用也不销毁);当达到核心线程后,新来的任务加入到任务队列,等待获取;直到任务队列也满了,那么就继续创建线程直到达到最大线程数,此时如果继续到来任务,已经无法处理,按照策略进行处理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果线程数小于基本线程数,则创建线程并执行当前任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池不在运行中或者无法加入队列,并且当前线程数小于最大线程,则创建线程
else if (!addWorker(command, false))
reject(command);
}
runnableTaskQueue任务队列:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原
则对元素进行排序。 - LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通
常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。 - SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用
移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工
厂方法Executors.newCachedThreadPool使用了这个队列。 - PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
RejectedExecutionHandler(饱和策略):默认情况下是AbortPolicy
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
向线程池提交任务:两种方式
- execute():没有返回值,直接执行任务
- submit()方法:返回一个future对象,用于获取特定结果信息
关闭线程池:
- shutdown:依次遍历关闭线程池,调用interrupt,一般只能中断已完成的任务
- shutdownNow方法:不需要等待所有的线程执行完。先将所有的设置为stop,在调用interrupt
线程池的合理设置:
偏计算CPU:Ncpu+1
偏IO:2*Ncpu
混合型:自动搭配
举例:依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
即:CPU用的时间短,频繁调用,则多创建线程
第十章: Executor框架
工作单元和执行单元的解耦:
以往的线程任务是,首先创建Thread对象,并提供Run方法创建工作单元,然后调用start方法执行单元计算结果。所有的逻辑都包含在一个thread类中,无法完全分离。因为,提供一个Executor来解耦这种关系。
Executor框架由3大部分组成:
* 任务创建:被执行任务需要实现接口Runnable、Callable。
* 任务执行:任务执行机制的核心接口Executor,继承Executor的ExecutorService、CompletionService等。
* 任务结果获取:Future和实现了Future接口的FutureTask类。
任务创建省略
任务执行:
* 主接口Executor:只定义了一个方法void execute(Runnable command);该方法接受一个Runnable实例,作用就是执行提交的任务,实现在子类中。
* 子接口ExecutorService:继承自Executor接口,提供了更多的方法,提供了生命周期的管理方法,以及可跟踪一个或多个异步任务执行状况的方法。方法:shutdown、isShutdown、submit
执行期工具类:Executors
提供了一系列静态工厂方法用于创建各种线程池。返回的线程池都实现了ExecutorService接口。这些实现方式都是基于ThreadPoolExecutor类构造的,只是参数不同。Java编码规约要求开发者,尽量直接使用ThreadPoolExecutor去手动创建,而不使用静态Executors类提供的方法,这样处理便于其他开发者了解ThreadPoolExecutor参数和运行规则
几个提供的执行器线程池:
* newFixedThreadPool(int nThreads):数量固定,使用无界队列LinkedBlockingQueu。可能会导致大量线程进入队列,奔溃
* newSingleThreadExecutor:单线程,也是使用无界队列LinkedBlockingQueu。会造成不断地从队列中获取线程
* newCachedThreadPool:最大数量无限制,队列使用没有容量的SynchronousQueue作为线程池的工作队列。也就是如果线程处理的速度小于生产的速度,线程就会不断创建,占完CPU和资源
ScheduledThreadPoolExecutor-定时调度器-spring中
它主要用来在给定的延迟之后运行任务,或者定期执行任务。比如论坛上定期处理过期帖子
ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
添加任务:一个DelayQueue无界队列,当调用scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向队列中添加一个未来定期任务ScheduledFutureTask
执行任务:线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。
(注意:最新的做了修改,使用了DelayedWorkQueue队列)
如何排序?
DelayQueue封装了一个PriorityQueue,会对任务进行时间、编号排序,时间最前面的在前
执行顺序,对于同一个定期任务:
1、线程从后队列中获取第一个任务
2、如果任务时间到期了,就执行任务
3、执行完毕后,更具定期时间,修改任务时间
4、重新将日期修改后的任务放到任务队列,注意排序
// 1.任务获取
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队列第一个元素
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// 计算任务时间,是否到期,到期则出队
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 任务分配给其他线程,等待;否则,分配给当前任务线程
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待任务的延迟时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// 任务加入
public boolean offer(Runnable x) {
//参数校验
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//查看当前元素数量,如果大于队列长度则进行扩容
int i = size;
if (i >= queue.length)
grow();
//元素数量加1
size = i + 1;
//如果当前队列还没有元素,则直接加入头部
if (i == 0) {
queue[0] = e;
//记录索引
setIndex(e, 0);
} else {
//把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
//把需要最早执行的任务放在前面
siftUp(i, e);
}
//如果新加入的元素就是队列头,这里有两种情况
//1.这是用户提交的第一个任务
//2.新任务进行堆调整以后,排在队列头
if (queue[0] == e) {
//这个变量起优化作用,后面说
leader = null;
//加入元素以后,唤醒worker线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
FutureTask详解:
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。
第11章:Java并发编程实践
设计模式:设计模式并不是一种算法,而是一种思想,一种软件开发思想,便于开发维护
设计模式解耦:为了便于程序的扩展和维护,往往需要将复杂的内部逻辑拆开,从而达到解耦的目的。一下设计模式就是这个作用:
工厂模式:工厂模式通过第三方工厂类,来代替主题类生产商品
模板模式:通过创建一个模板接口,让实现者自己去实现自己的逻辑,从而提取共同点,简化编码
生产者消费者模式:通过提供一个阻塞队列,作为生产者和消费者的桥梁,来达到缓冲的目的