《Java并发编程的艺术》读书笔记

君子谋道不谋食,恍惚半载被忧贫

第一章:并发编程的问题
多线程是为了解决效率问题,尽可能使用处理器资源,同时保持线程共享资源安全。
2. 1. 上下文切换:多个线程之间,频繁的切换,浪费调度资源
2. 死锁:由于死循环或者互斥等造成的线程无法继续进行,永久等待的情况
3. 资源限制的挑战:读入速度大于写入速度

第二章:java并发机制的底层实现和主要工具

  1. 轻量锁:volatile-单词的意思是不稳定,即告诉处理器此变量不稳定,不能够进行重排序,需要可见性
    volatile使用缓存一致性来保持安全,即线程内存缓存中的变量被修改后,修改立即通知其他的处理器,检查当前变量,修改为最新的数据。同时不能使用重排序;
    2.重量级锁:synchronized。修饰方法,代码块;后续synchronized引入的等级锁,在某些情况下,并不是重量级的。完全的线程屏蔽
    三种锁的类型:
  • 偏向锁:时间发现,大部分情况下,都是同一个线程在反复使用资源。此时如果加锁,会频繁获取和释放锁,这种操作会浪费资源,所以,会出现偏向锁,如果下一次获取的还是当前锁,则直接将锁给予当前线程。否则,有竞争,撤销偏向锁。
  • 轻量级锁:当发生线程竞争是,竞争的一方如果没有获取锁,就会采取自旋操作,等待获取,长期获取不到,则升级到重量级锁。锁的等级只能上升,不能下降
  • 重量级锁:即synchronized锁-默认偏向锁。
    3.原子操作
    一个操作内需要全部完成,不能受外界干扰。原子操作是一个集合,一个事物,即一个任务要么执行完全,要么不执行。

第三章:java内存模型
并发的重要数据是指堆中的数据,这里的数据是线程共享的,包括实例域、静态域和数组元素。
《Java并发编程的艺术》读书笔记
1.指令重排序:处理器底层为了计算的优化,会对数据指令进行重新排序,从而降低元素的重复获取,而不影响单线程逻辑。但是在多线程下可能出现问题。
2.顺序一致性:先来的先操作完,后来的等待
3.volatile:即不稳定,告诉处理器内部,操作此元素,不经过缓存,不重排序,直接对内存进行操作。修改后通知其他线程。
4.锁的定义:
5.final
6.happens-before:as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同
步的多线程程序的执行结果不被改变。
7.双重检查锁定-延迟初始化
8.java内存模型综述

第四章:Java并发编程基础

什么是线程:
线程是操作系统调度的最小单元,是一种轻量级进程。一个进程一般包括多个线程,每一个线程都有自己的计数器、堆栈、局部变量属性。进程中的多线程共享一个内存。

多线程的作用,优势:
* 充分应用多核处理器的性能
* 更快速的响应

线程优先级:
操作系统同时时间片来控制线程的使用和切换,一般包括以下四种:
* 先来先服务
* 最短作业优先
* 最大优先级调度
* 固定时间片轮转
设定优先级可以按照定义的优先级顺序获取锁(尽量)-默认优先级是5
使用方法:
一般而言,优先级的设定是为了防止CPU独占。偏重于计算的优先级低,防止CPU获取不放;偏重于频繁阻塞的线程,优先级高,这样可以快速得到CPU,但是不长久持有,即频率高的。一般是IO操作。

线程的状态:
初始化new:线程实例化。线程调用start方法,启动
运行态:running(包括就绪态课运行状态),线程获得CPU,进行业务操作
阻塞状态:blocked线程未获得锁,进入阻塞队列
等待状态:waiting线程进入等待状态,但是不释放锁,等待通知,或者睡眠醒来或者中断
等待超时:time_waiting
终止状态:terminated线程业务完成

两个命令:
jps:获取当前的java线程
jstack 929:获取线程ID运行信息

《Java并发编程的艺术》读书笔记

1.new实例化
2.调用start方法进入就绪状态,等待系统调度
3.如果获取到系统时间片,即进入运行态running
4.若调用yield-退让,则表示放弃时间片,重新进入就绪状态(注意,也可能会再次获得时间片)
5.若执行wait方法,join方法,LockSuport.Park方法,则会进入等待线程。同时放弃时间片
* wait方法:等待线程,让出时间片;需要notify唤醒,或者notifyAll全部唤醒
* join方法:等待主线程终止之后,才能从当前线程返回
* LockSupport.park泊车,即退出时间片,让出车位。需要调用unpark再次进入就绪
6.wait方法,join方法,LockSuport.Park方法如果带有超时设置参数,会进入等待超时,如果超时,则返回线程,继续后面的步骤,一般用于后续步骤不受等待线程影响时。其中sleep不释放锁,时间超时后重新执行之后的流程。
7.如果当前线程在运行中需要进入某个同步方法,如果未获得锁,则进入阻塞状态,等待获取锁,获取到锁之后,才能继续进行。
8.执行完毕,中止线程

注意等待和阻塞的区别:
等待是当前线程的自我行为,一般是为了等待外界的一个信号,终止当前的线程,如wait放弃时间片等待唤醒和sleep等待一段时间重新运行。lock锁也是等待,因为lock的底层使用的是LockSuppor相关方法
阻塞是当前线程遇到了锁,但是获取不到,导致进不了门而阻塞,主要是synchronized锁。这种状态一般不会释放之前获取的锁。

Daemon线程:
daemon即后台、守护的意思,即后台进程。
* 后台进程不在终端显示
* 后台进程只是一个陪伴进程。即如果没有外部进程存在,后台进程也会死掉。

线程中断:
中断用于获取线程的标志,用来判断线程运行中是否被中断了。
* 通过isInterrupted(判断是否被中断
* 通过静态方法Thread.interrupted()对当前中短线程进行复位
其他详细细节再补
线程中断用于在完成固定任务后自动结束线程。通常需要一个信号变量来标志

过期的suspend()、resume()和stop()
相当于cd机的暂停、开始、重新开始,已舍去

4.3. 线程间通信

volatile和synchronized关键词
volatile意为不稳定的,表示所修饰的变量容易被修改,需要操作系统底层,及时的将变化的数值写入到内存中,而不经过缓存。即达到可见性。同时操作系统对含有当前变量的操作不执行重排序。但是volatile的符合操作并不是原子性的,即读取修改是两个操作,不能一步完成。只能单独的读取,写入。一般用作信号标志,比如flag=ture,这个地方只存在写入,不存在读取;而flag=num,就存在读取和写入:读取num的值,然后写入flag的新值
synchronized是重量级锁,可见性原子性都能保证。可以修饰同步快、方法。实例方法对实例对象加锁,静态方法对类进行加锁,同步块需要自己指定对象。
synchronized同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取。统一时刻只有一个线程能够获取对象的监视器。
《Java并发编程的艺术》读书笔记
线程对对象进行操作时,首先需要获取对象的监视器,才能对对象进行操作;否则进入同步队列,等待正在使用的线程的exit通知,进而再次尝试获取监视器。

4.3.2. 等待通知机制
等待通知机制类似于生产者消费者模式,一个线程等待另一个线程的通知信息,进而完成后续任务。

以往的生产者消费者模式:生产者生产大米,消费者消费。如果没有通知,消费者每次去厂里买大米时,可能存在一种情况,有时候没有,有时候有,这样的话,消费者需要不间断的定期去查看,资源消耗的。

使用等待通知后:生产者生产了大米,就给消费者打个电话,通知米有了。

java中的等待通知机制主要使用wait和notify
* 使用wait()、notify()和notifyAll()时需要先对调用对象加锁,因为方法需要获取对象的监视器
* notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
《Java并发编程的艺术》读书笔记
同步队列:存放阻塞线程
等待队列:存放等待线程

  • join方法:等待主线程终止之后才能执行当前线程
  • ThreadLocal:线程变量的副本,每个线程单独持有一个变量。一般用于测试当前线程的业务。比如测试单个线程方法的执行时间,需要排除其他线程的干扰,就可以顶一个一个时间变量ThreadLocal

4.4 线程应用实例:

  1. 等待超时模式:线程超时后,自动返回默认值,继续后续步骤:
    应用:数据库连接池
    在数据库连接池中,连接线程有限,当线程已满时,会导致后续的线程一直处于获取连接池的状态,系统可能会奔溃。因而,加入超时机制后,超过一定时间会自动返回默认值NULL,让客户端等待再次获取。
  2. 线程池技术
    作用:
    * 降低每次创建线程的消耗
    * 方便管理
    结构:线程队列

第五章:Java中的锁

5.1. lock接口
synchronized是隐式锁,即在底层内化,获取和释放在一个继承中,不需要开发者手动操作。这样也就导致了开发的不便捷,无法提供其它的功能
lock锁,提供显示的加锁和释放锁,因而,可以在加锁后进行其他的处理,比如特殊情况的中断操作,超时返回等。

5.2. 队列同步器AQS
队列同步器AbstractQueuedSynchronizer-抽象队列同步器,是同步组件(ReentrantLock、
ReentrantReadWriteLock和CountDownLatch等)基本构成。

同步器是基于模板模式,其他的同步类通过通过继承同步类,并修改相应的方法来达到相应的同步机制
同步器内部采用一个双向队列,来完成同步状态

同步状态:用于标记是否可以获取锁
同步队列:用于保存被阻塞的线程
《Java并发编程的艺术》读书笔记
《Java并发编程的艺术》读书笔记

一句话概述:
AQS的主要作用是维持了一个线程阻塞的双端队列;对于一个线程,首先尝试获取同步状态,获取即执行,否则:如果队列初始化了,直接将当前线程包装为Node通过CAS加入到队列中(addWaiter方法);否则进入enq方法-判断队列是否初始化,进行初始化后,重新入队。
然后进入acquireQueued方法,再次尝试获取锁:判断当前节点的前驱是否为头结点,如果是,表明是第一个节点,直接尝试获取同步状态,成功则返回false,禁止selfInterrupt中断调用;否则进入shouldParkAfterFailedAcquire方法,将节点状态设置为SIGNAL,表示需要unpark解除,同时LockSupport.park阻塞线程,然后执行自旋操作。
参照:https://juejin.im/post/5aeb07ab6fb9a07ac36350c8

5.3. 重入锁ReentrantLock
支持可重入的锁,即当前线程可以进一步获取自己的所,相当于递归操作。synchronized也是可重入锁。
可重入锁的实现:
* 判断是否是当前锁
* 多重锁的释放,保持一个计数状态

以非公平锁为例获取同步状态码:

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
// 非公平锁的获取:获取同步状态锁
final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	// 获取同步状态
	int c = getState();
	// 如果为0,表明没有锁,使用CAS修改同步状态,并将独占锁设置为当前线程
	if (c == 0) {
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// 否则如果独占锁的拥有者就是当前线程,则重入;
	// 增加同步状态值,返回true;否则独占锁被其他线程拥有,获取独占锁失败,返回false
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

// 同理释放同步状态锁
// 直接减少同步状态值,直到同步状态为0,返回true;否则false
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

// 公平锁的获取
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
				// 唯一的不同是,公平锁的获取,需要检查同步队列是否直接前驱,如果有,则等待;相当于先来先得。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

5.4. 读写锁ReentrantReadWriteLock

读写锁是一种读写分离的锁,允许多个线程度读,只允许单个线程写。

同步状态的设计:
与可重入锁不同的是,可重入锁只需要维持一个同步状态就可以最为单一状态,但是读写锁,需要两个状态作为符号,为了节约空间或者说为了方便计算。设计者使用了位运算将读状态和写状态统一到一个32位整形中,左16为表示读状态,右16位表示写状态。
《Java并发编程的艺术》读书笔记
读锁即共享锁:share标志
写锁即独占锁:exclusive标志

// 读锁的获取,即获取共享锁
protected final int tryAcquireShared(int unused) {
	/*
	 * Walkthrough:
	 * 1. If write lock held by another thread, fail.
	 * 2. Otherwise, this thread is eligible for
	 *    lock wrt state, so ask if it should block
	 *    because of queue policy. If not, try
	 *    to grant by CASing state and updating count.
	 *    Note that step does not check for reentrant
	 *    acquires, which is postponed to full version
	 *    to avoid having to check hold count in
	 *    the more typical non-reentrant case.
	 * 3. If step 2 fails either because thread
	 *    apparently not eligible or CAS fails or count
	 *    saturated, chain to version with full retry loop.
	 */
	Thread current = Thread.currentThread();
	int c = getState();
	// 1.计算写锁状态,如果不等于0,且不是当前线程。获取所失败,返回-1
	if (exclusiveCount(c) != 0 &&
		getExclusiveOwnerThread() != current)
		return -1;
	// 否则,计算共享锁状态
	int r = sharedCount(c);
	if (!readerShouldBlock() &&
		r < MAX_COUNT &&
		// 2.获取读锁
		compareAndSetState(c, c + SHARED_UNIT)) {
		// 3.之后的逻辑是为了获取其他信息,比如读锁的个数等
		if (r == 0) {
			firstReader = current;
			firstReaderHoldCount = 1;
		} else if (firstReader == current) {
			firstReaderHoldCount++;
		} else {
			HoldCounter rh = cachedHoldCounter;
			if (rh == null || rh.tid != getThreadId(current))
				cachedHoldCounter = rh = readHolds.get();
			else if (rh.count == 0)
				readHolds.set(rh);
			rh.count++;
		}
		return 1;
	}
	//4. 处理在第二步中CAS操作失败的自旋已经实现重入性
	return fullTryAcquireShared(current);
}

// 读锁的释放
protected final boolean tryReleaseShared(int unused) {
	// 前面还是为了实现getReadHoldCount等新功能
	Thread current = Thread.currentThread();
	if (firstReader == current) {
		// assert firstReaderHoldCount > 0;
		if (firstReaderHoldCount == 1)
			firstReader = null;
		else
			firstReaderHoldCount--;
	} else {
		HoldCounter rh = cachedHoldCounter;
		if (rh == null || rh.tid != getThreadId(current))
			rh = readHolds.get();
		int count = rh.count;
		if (count <= 1) {
			readHolds.remove();
			if (count <= 0)
				throw unmatchedUnlockException();
		}
		--rh.count;
	}
	for (;;) {
		int c = getState();
		int nextc = c - SHARED_UNIT;
		// 直接CAS操作尝试释放锁
		if (compareAndSetState(c, nextc))
			// Releasing the read lock has no effect on readers,
			// but it may allow waiting writers to proceed if
			// both read and write locks are now free.
			return nextc == 0;
	}
}

// 写锁的获取
protected final boolean tryAcquire(int acquires) {
	/*
	 * Walkthrough:
	 * 1. If read count nonzero or write count nonzero
	 *    and owner is a different thread, fail.
	 * 2. If count would saturate, fail. (This can only
	 *    happen if count is already nonzero.)
	 * 3. Otherwise, this thread is eligible for lock if
	 *    it is either a reentrant acquire or
	 *    queue policy allows it. If so, update state
	 *    and set owner.
	 */
	Thread current = Thread.currentThread();
	int c = getState();
	// 获取写状态
	int w = exclusiveCount(c);
	if (c != 0) {
		// (Note: if c != 0 and w == 0 then shared count != 0)
		// 如果读锁被持有或者写锁被其他线程占有
		if (w == 0 || current != getExclusiveOwnerThread())
			return false;
		// 如果写状态大于16位存储量,异常
		if (w + exclusiveCount(acquires) > MAX_COUNT)
			throw new Error("Maximum lock count exceeded");
		// Reentrant acquire
		// 当前线程持有写锁,重入
		setState(c + acquires);
		return true;
	}
	// 没有锁,直接尝试获取
	if (writerShouldBlock() ||
		!compareAndSetState(c, c + acquires))
		return false;
	setExclusiveOwnerThread(current);
	return true;
}

// 写锁的释放
protected final boolean tryRelease(int releases) {
	// 独占锁是否是当前的线程
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	// 释放锁:减少读状态。如果读状态为0,同时则释放所线程独占;
	int nextc = getState() - releases;
	boolean free = exclusiveCount(nextc) == 0;
	if (free)
		setExclusiveOwnerThread(null);
	setState(nextc);
	return free;
}

// 锁降级是指,ReentrantReadWriteLock支持在一个线程中,可以在获取读取的前提下,再次获取写锁,而不需要释放写锁
//示例如下:
void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // Must release read lock before acquiring write lock
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // Recheck state because another thread might have
                // acquired write lock and changed state before we did.
                if (!cacheValid) {
                    data = ...
            cacheValid = true;
          }
          // Downgrade by acquiring read lock before releasing write lock
          rwl.readLock().lock();
        } finally {
          rwl.writeLock().unlock(); // Unlock write, still hold read
        }
      }
 
      try {
        use(data);
      } finally {
        rwl.readLock().unlock();
      }
    }
}

5.5. LockSupport工具
LockSupport工具提供基本的线程阻塞和唤醒功能,AQS的等待通知机制就是使用的这个方法。
与wait方法不同的是,wait方法是object的方法,每次调用都需要释放锁,因而需要在synchronized中使用。

LockSupport工具主要提供两个重要的方法用于阻塞和唤醒线程:
park():停车;park的底层使用的是本地方法,具体的逻辑是,每次调用需要获取一个“许可”信息,成功设置为0。
unpark():启动离开;直接修改“许可”信息为1,如果之前的值为0,表明有线程获取了它,需要调用pthread_cond_signal唤醒之前park的线程

其他park方法,
park(Object blocker):其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。
parkNanos(long nanos) :阻塞当前线程多少时间
parkNanos(Object blocker, long nanos)
parkUntil(long deadline):阻塞线程直到什么时候
parkUntil(Object blocker, long deadline)

5.6. Condition接口
监视器monitor方法:任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式

Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等(英文:条件)
待/通知模式.。
《Java并发编程的艺术》读书笔记

condition对象通过lock来创建:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

condition通过对应的两个方法来达到等待和唤醒机制(都由AQS提供,一个内部类)
await():等待
signal():唤醒

// 等待唤醒案例-生产者消费者模式
public class BoundedQueue<T> {
    private Object[]  items;
    // 添加的下标,删除的下标和数组当前数量
    private int       addIndex, removeIndex, count;
    private Lock      lock     = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull  = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”
    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

Condition的实现分析:

condition的实现方式是通过ConditionObject实现,ConditionObject是同步器AbstractQueuedSynchronizer的内部类。每一个condition都有一个等待队列,通过这个队列来实现等待通知机制

等待队列Node:
等待队列是一个单向队列,同时又头结点和尾结点。每个节点都是一个线程的引用,即等待的线程
await:释放锁,同时加入等待队列
signal:获取锁,出队
《Java并发编程的艺术》读书笔记

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列(同步队列是竞争队列,等待队列是任务等待)
《Java并发编程的艺术》读书笔记

源码分析:


// 等待
public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	// 将当前线程加入到队尾	
	Node node = addConditionWaiter();
	// 释放同步状态-释放锁
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	while (!isOnSyncQueue(node)) {
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	// 自旋获取同步状态-即lock,循环检查
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

// 加入队列
private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.如果尾结点不为null,设置t为尾结点
	if (t != null && t.waitStatus != Node.CONDITION) {
		unlinkCancelledWaiters();
		t = lastWaiter;
	}
	// 如果尾结点为null,表明未初始化,将头结点指向当前node;
	// 否则入队
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null)
		firstWaiter = node;
	else
		t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

final int fullyRelease(Node node) {
	boolean failed = true;
	try {
		int savedState = getState();
		if (release(savedState)) {
			//成功释放同步状态
			failed = false;
			return savedState;
		} else {
			throw new IllegalMonitorStateException();
		}
	} finally {
		if (failed)
			node.waitStatus = Node.CANCELLED;
	}
}

// 唤醒
public final void signal() {
	// 确保获取独占锁
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	Node first = firstWaiter;
	// 如果队列不为null,唤醒队列中的一个
	if (first != null)
		doSignal(first);
}

// 唤醒第一个节点
private void doSignal(Node first) {
	do {
		// 先将头结点从队列中移除-出队
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		first.nextWaiter = null;
		// 对头结点进行处理
	} while (!transferForSignal(first) &&
			 (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
		// 设置头结点的状态为-1
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
		// enq将头结点从阻塞队列放入到同步队列队尾,等待时间片
        Node p = enq(node);
        int ws = p.waitStatus;
		// 设置通知信号,并unpark解除
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

《Java并发编程的艺术》读书笔记

解说:lock维护同步队列,condition维护等待通知队列
参考:https://www.jianshu.com/p/28387056eeb4

第六章:Java并发容器和框架

java中提供了大量的并发容器,便于多线程的开发;常见的有hashtable-concurrentHashMap-等

ConcurrentHashMap

一句话综述发展史:

  • hashmap:hashmap只能支持单线程,在多线程情况下;在rehash扩容时,会存在循环死机问题:线程A对链表进行翻转式,线程B可能同样翻转,从而造成节点互相引用问题。
  • hashtable使用synchronized对读写都进行重量级加锁,效率低
  • jdk1.7的concurrenthashmap,使用数组+链表结构,使用segment细粒度锁对一部分数组进行分桶加锁,通过链表解决哈市冲突,加锁方式改用ReentrantLock
  • jdk1.8,1.8版本舍弃了segment,并且大量使用了synchronized,以及CAS无锁操作以保证ConcurrentHashMap操作的线程安全性。底层数据结构改变为采用数组+链表+红黑树的数据形式。
    具体改进:
    • 由于synchronized的改进,引用了锁提升,偏向锁-轻量级锁-重量级锁;在大多数情况下,平均效率并不比ReentrantLock、低
    • 采用链表+红黑树,链表长度大于8之后,会上升为红黑树结构。查找效率更高。
    • 去掉segment,直接对每一个数组项进行加锁

详细源码细节:待补充

ConcurrentLinkedQueue:
在单线程编程中我们会经常用到一些集合类,比如ArrayList,HashMap等,但是这些类都不是线程安全的类。在面试中也经常会有一些考点,比如ArrayList不是线程安全的,Vector是线程安全。而保障Vector线程安全的方式,是非常粗暴的在方法上用synchronized独占锁,将多线程执行变成串行化。要想将ArrayList变成线程安全的也可以使用Collections.synchronizedList(List list)方法ArrayList转换成线程安全的,但这种转换方式依然是通过synchronized修饰方法实现的,很显然这不是一种高效的方式,同时,队列也是我们常用的一种数据结构,为了解决线程安全的问题,Doug Lea大师为我们准备了ConcurrentLinkedQueue这个线程安全的队列。从类名就可以看的出来实现队列的数据结构是链式。

Java中的阻塞队列
定义:阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不
满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

JDK 7提供了7个阻塞队列,如下。
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的*阻塞队列。
·DelayQueue:一个使用优先级队列实现的*阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

Fork/Join框架:

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
相当于map-reduce计算模式,或者归并算法。将任务分给多个线程fork并行执行,然后等待所有的都完成后,进行join操作合并运算。
《Java并发编程的艺术》读书笔记

补充知识点:工作窃取算法
是指一个线程在完成自己的任务之后,再去运行其他线程的任务。为了避免任务冲突,使用队列模式,从另一线程的队尾获取任务。如下图:
《Java并发编程的艺术》读书笔记
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

// fork/join案例
/**
 * 计数器任务
 * 
 * @author tengfei.fangtf
 * @version $Id: CountTask.java, v 0.1 2015-8-1 上午12:00:29 tengfei.fangtf Exp $
 */
public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2; // 阈值
    private int              start;
    private int              end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        // 如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 生成一个计算任务,负责计算1+2+3+4
        CountTask task = new CountTask(1, 4);
        // 执行一个任务
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
    }

}

第七章:Java中的13个原子操作类

原子类的底层都是通过unsafe类提供的CAS操作实现的,即比较交换。
CAS保留三个值,原始值地址,最初获取的值,将要修改的值。每次修改时,都会比较原始值地址所指引的值和获取的值是否相等,如果相等,说明,原始值没有被其他地址修改,则修改当前值。否则,被获取的值被其他线程修改了,需要重新获取,直到获取成功(自旋操作)

3原子包装类型:原理相同
AtomicBoolean:原子更新布尔类型。
AtomicInteger:原子更新整型。
AtomicLong:原子更新长整型。

3个原子数组类型:
AtomicIntegerArray:原子更新整型数组里的元素。
AtomicLongArray:原子更新长整型数组里的元素。
AtomicReferenceArray:原子更新引用类型数组里的元素。

1个原子更新引用类型
AtomicReference:原子更新引用类型。可以包装 自定义对象

第八章:Java中的并发工具类
Java中主要包括四个并发工具类
countdownlatch:技术下降门栓-通过维持一个计数器来判断是否到达指定点
cyclicbarrier:同步屏障-通过屏障挡板来阻挡一列线程
Semaphore:信号,控制并发线程数,相当于路口信号灯,只能进有限数量的线程
exchanger:信息交换,两个线程之间通信交换数据

CountDownLatch和CyclicBarrier区别:
* CountDownLatct通过countdown方法消耗计数,可以在同一个线程中多次使用;而CyclicBarrier一个线程只能使用一次
* CountDownLatch不能复位,即如果出现差错不能重新复位,操作不便。CyclicBarrier可以复位
* CountDownLatch构造函数只有一个int值,只提供等待,没有等待完成后操作;CyclicBarrier提供带动做的构造器CyclicBarrier(int parties, Runnable barrierAction) 可以全部都到达等待后,先完成统一的barrierAction操作,在继续各自的线程任务

第九章:Java中的线程池

线程池的作用:
* 重复利用已创建的线程,降低资源消耗
* 线程已经创建好,直接使用,提高响应速度
* 便于统一管理

创建线程池:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue)
参数:
* 核心线程大小:corePoolSize
* 最大允许线程:maximumPoolSize
* 超过核心线程的等待时间:keepAliveTime
* 等待时间单位:unit
* 使用何种阻塞工作队列:workQueue

线程池的工作逻辑:
《Java并发编程的艺术》读书笔记
流程:初始化线程池,线程为0;随着任务的到来,创建线程直到到达核心线程(期间线程即便不用也不销毁);当达到核心线程后,新来的任务加入到任务队列,等待获取;直到任务队列也满了,那么就继续创建线程直到达到最大线程数,此时如果继续到来任务,已经无法处理,按照策略进行处理。
《Java并发编程的艺术》读书笔记

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
		// 如果线程数小于基本线程数,则创建线程并执行当前任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
		// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		// 如果线程池不在运行中或者无法加入队列,并且当前线程数小于最大线程,则创建线程
        else if (!addWorker(command, false))
            reject(command);
    }

runnableTaskQueue任务队列:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原
    则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通
    常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用
    移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工
    厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

RejectedExecutionHandler(饱和策略):默认情况下是AbortPolicy

  • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。

向线程池提交任务:两种方式

  • execute():没有返回值,直接执行任务
  • submit()方法:返回一个future对象,用于获取特定结果信息

关闭线程池:

  • shutdown:依次遍历关闭线程池,调用interrupt,一般只能中断已完成的任务
  • shutdownNow方法:不需要等待所有的线程执行完。先将所有的设置为stop,在调用interrupt

线程池的合理设置:
偏计算CPU:Ncpu+1
偏IO:2*Ncpu
混合型:自动搭配
举例:依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
即:CPU用的时间短,频繁调用,则多创建线程

第十章: Executor框架
工作单元和执行单元的解耦:
以往的线程任务是,首先创建Thread对象,并提供Run方法创建工作单元,然后调用start方法执行单元计算结果。所有的逻辑都包含在一个thread类中,无法完全分离。因为,提供一个Executor来解耦这种关系。

Executor框架由3大部分组成:
* 任务创建:被执行任务需要实现接口Runnable、Callable。
* 任务执行:任务执行机制的核心接口Executor,继承Executor的ExecutorService、CompletionService等。
* 任务结果获取:Future和实现了Future接口的FutureTask类。

任务创建省略
任务执行:
* 主接口Executor:只定义了一个方法void execute(Runnable command);该方法接受一个Runnable实例,作用就是执行提交的任务,实现在子类中。
* 子接口ExecutorService:继承自Executor接口,提供了更多的方法,提供了生命周期的管理方法,以及可跟踪一个或多个异步任务执行状况的方法。方法:shutdown、isShutdown、submit

执行期工具类:Executors
提供了一系列静态工厂方法用于创建各种线程池。返回的线程池都实现了ExecutorService接口。这些实现方式都是基于ThreadPoolExecutor类构造的,只是参数不同。Java编码规约要求开发者,尽量直接使用ThreadPoolExecutor去手动创建,而不使用静态Executors类提供的方法,这样处理便于其他开发者了解ThreadPoolExecutor参数和运行规则

几个提供的执行器线程池:
* newFixedThreadPool(int nThreads):数量固定,使用*队列LinkedBlockingQueu。可能会导致大量线程进入队列,奔溃
* newSingleThreadExecutor:单线程,也是使用*队列LinkedBlockingQueu。会造成不断地从队列中获取线程
* newCachedThreadPool:最大数量无限制,队列使用没有容量的SynchronousQueue作为线程池的工作队列。也就是如果线程处理的速度小于生产的速度,线程就会不断创建,占完CPU和资源

ScheduledThreadPoolExecutor-定时调度器-spring中
它主要用来在给定的延迟之后运行任务,或者定期执行任务。比如论坛上定期处理过期帖子
ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

添加任务:一个DelayQueue*队列,当调用scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向队列中添加一个未来定期任务ScheduledFutureTask
执行任务:线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。
《Java并发编程的艺术》读书笔记
(注意:最新的做了修改,使用了DelayedWorkQueue队列)

如何排序?
DelayQueue封装了一个PriorityQueue,会对任务进行时间、编号排序,时间最前面的在前

《Java并发编程的艺术》读书笔记

执行顺序,对于同一个定期任务:
1、线程从后队列中获取第一个任务
2、如果任务时间到期了,就执行任务
3、执行完毕后,更具定期时间,修改任务时间
4、重新将日期修改后的任务放到任务队列,注意排序

// 1.任务获取
public RunnableScheduledFuture<?> take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			// 获取队列第一个元素
			RunnableScheduledFuture<?> first = queue[0];
			if (first == null)
				available.await();
			else {
				// 计算任务时间,是否到期,到期则出队
				long delay = first.getDelay(NANOSECONDS);
				if (delay <= 0)
					return finishPoll(first);
				first = null; // don't retain ref while waiting
				// 任务分配给其他线程,等待;否则,分配给当前任务线程
				if (leader != null)
					available.await();
				else {
					Thread thisThread = Thread.currentThread();
					leader = thisThread;
					try {
						// 等待任务的延迟时间
						available.awaitNanos(delay);
					} finally {
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		if (leader == null && queue[0] != null)
			available.signal();
		lock.unlock();
	}
}

// 任务加入
public boolean offer(Runnable x) {
	//参数校验
	if (x == null)
		throw new NullPointerException();
	RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		//查看当前元素数量,如果大于队列长度则进行扩容
		int i = size;
		if (i >= queue.length)
			grow();
		//元素数量加1
		size = i + 1;
		//如果当前队列还没有元素,则直接加入头部
		if (i == 0) {
			queue[0] = e;
			//记录索引
			setIndex(e, 0);
		} else {
			//把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
			//把需要最早执行的任务放在前面
			siftUp(i, e);
		}
		//如果新加入的元素就是队列头,这里有两种情况
		//1.这是用户提交的第一个任务
		//2.新任务进行堆调整以后,排在队列头
		if (queue[0] == e) {
			//这个变量起优化作用,后面说
			leader = null;
			//加入元素以后,唤醒worker线程
			available.signal();
		}
	} finally {
		lock.unlock();
	}
	return true;
}

FutureTask详解:
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

第11章:Java并发编程实践

设计模式:设计模式并不是一种算法,而是一种思想,一种软件开发思想,便于开发维护
设计模式解耦:为了便于程序的扩展和维护,往往需要将复杂的内部逻辑拆开,从而达到解耦的目的。一下设计模式就是这个作用:
工厂模式:工厂模式通过第三方工厂类,来代替主题类生产商品
模板模式:通过创建一个模板接口,让实现者自己去实现自己的逻辑,从而提取共同点,简化编码
生产者消费者模式:通过提供一个阻塞队列,作为生产者和消费者的桥梁,来达到缓冲的目的

《Java并发编程的艺术》读书笔记