线程和并发(三)阻塞队列和线程池
concurrent之atomic相关
概述
java.util.concurrent.atomic原子操作类包里面提供了一组原子变量类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。可以对基本数据、数组中的基本数据、对类中的基本数据进行操作。原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。
java.util.concurrent.atomic中的类可以分成4组:
- 标量类:AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
- 数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
- 更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
- 复合变量类:AtomicMarkableReference,AtomicStampedReference
标量类
AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据,其内部实现不是简单的使用synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。其实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。
以为例,源码如下:
public class AtomicInteger extends Number implements java.io.Serializable {
// ……
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
//……
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
public final int getAndSet(int newValue) {
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
}
虽然原子的标量类扩展了基本类型的类,但是并没有扩展基本类型的包装类,如Integer或Long,事实上它们也不能直接扩展。因为基本类型的包装类是不可以修改的,而原子变量类是可以修改的。在原子变量类中没有重新定义hashCode或equals方法,每个实例都是不同的,他们也不宜用做基于散列容器中的键值。
数组类
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供volatile访问语义方面也引人注目,这对于普通数组来说是不受支持的。其内部并不是像AtomicInteger一样维持一个volatile变量,而是全部由native方法实现。数组变量进行volatile没有意义,因此set/get就需要unsafe来做了,但是多了一个index来指定操作数组中的哪一个元素。
更新器类
AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater和AtomicLongFieldUpdater 是基于反射的实用工具,可以提供对关联字段类型的访问,可用于获取任意选定volatile字段上的compareAndSet操作。它们主要用于原子数据结构中,该结构中同一节点的几个 volatile 字段都独立受原子更新控制。这些类在如何以及何时使用原子更新方面具有更大的灵活性,但相应的弊端是基于映射的设置较为拙笨、使用不太方便,而且在保证方面也较差。
注:netty5.0中类ChannelOutboundBuffer统计发送的字节总数,由于使用volatile变量已经不能满足,所以使用AtomicIntegerFieldUpdater 来实现的,看下面代码:
//定义
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
private volatile long totalPendingSize;
//使用
long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue + size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
oldValue = totalPendingSize;
newWriteBufferSize = oldValue + size;
}
复合变量类
AtomicMarkableReference 类将单个布尔值与引用关联起来。维护带有标记位的对象引用,可以原子方式更新带有标记位的引用类型。
AtomicStampedReference 类将整数值与引用关联起来。维护带有整数“标志”的对象引用,可以原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和版本号,可以解决使用CAS进行原子更新时,可能出现的ABA问题。
ABA问题:
JDK1.5之后,Atomic包提供的类AtomicStampedReference就解决了这个问题。这个类的compareAndSet方法先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
public V getReference() {
return pair.reference;
}
public int getStamp() {
return pair.stamp;
}
public V get(int[] stampHolder) {
Pair<V> pair = this.pair;
stampHolder[0] = pair.stamp;
return pair.reference;
}
public boolean weakCompareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
return compareAndSet(expectedReference, newReference,
expectedStamp, newStamp);
}
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
}
concurrent之collections相关
java.util.concurrent包,此包下的集合都不允许添加null元素。具体集合见下表:
类 | 对应接口 | 特性 | 并发技术 | 使用场景 |
ConcurrentHashMap | Map | 非阻塞、线程安全 | CAS,synchronized,volatile | |
ConcurrentSkipListMap | Map | 线程安全 | 构造函数支持排序,甚至可按照复合类型字段排序 | |
ConcurrentSkipListSet | Map | 线程安全 | 构造函数支持排序,甚至可按照复合类型字段排序 | |
CopyOnWriteArrayList | List | 线程安全 | 1、读写分离,读和写分开,需要读和写时都是对拷贝的副本进行操作。 2、最终一致性 缺点: 1、由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc 2、不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求; 3、CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用 因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障。 |
|
CopyOnWriteArraySet | Set | 线程安全 | 1、读写分离,读和写分开,需要读和写时都是对拷贝的副本进行操作。 2、不存储重复对象 3、最终一致性 |
|
ConcurrentLinkedQueue | Queue | 线程安全、FIFO | 线程安全,但一边遍历一边poll还是不行的 | |
ConcurrentLinkedDeque | Deque | 线程安全、链表 | 链表式操作,可对队首队尾直接操作 | |
ArrayBlockingQueue | Queue、Collection | 有界、阻塞、线程安全、FIFO | 生产者、消费者场景比较合适,并且支持FIFO | |
LinkedBlockingQueue | Queue | 阻塞、线程安全、FIFO、链表 | ||
LinkedTransferQueue | Queue | 阻塞、线程安全、FIFO | LinkedTransferQueue实现了一个重要的接口TransferQueue,该接口含有下面几个重要方法: 1. transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。 2. tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。 3. tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。 4. hasWaitingConsumer():判断是否存在消费者线程。 5. getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。 |
|
LinkedBlockingDeque | Deque | 阻塞、线程安全、FIFO、链表 | ||
DelayQueue | Queue | 阻塞、线程安全、FIFO、链表 | ||
PriorityBlockingQueue | Queue | 阻塞、优先级 | 要使用FIFO,您需要插入一个新的FIFOEntry(anEntry),而不是普通的entry对象 | |
SynchronousQueue | Queue | 线程安全、链表 | 只允许一个值添加、取出,无容量 |
ConcurrentHashMap
ConcurrentHashMap 1.7
还在jdk1.7的时候ConcurrentHashMap的底层数据结构其实是由Segment
数组和多个HashEntry
组成,如下图所示:
Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分段技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的散列表+链表的数据存储结构是一样的。这里的每个table就像我们之前所说的HashTable一样。
同时从源码中我们可以看出,Segment继承了ReentrantLock(独占锁)类。使大table中的每个小table都有了一个锁。
static final class Segment<K,V> extends ReentrantLock implements Serializable
ConcurrentHashMap 1.8
在jdk1.8中,ConCurrentHashMap的数据结构底层是:散列表+链表+红黑树,其变化与HashMap在1.8的变化是一样的。
concurrentHashMap特点:
- ConcurrentHashMap支持高并发情况下对哈希表的访问和更新。
- ConcurrentHashMap与HashTable相似,与HashMap不同。
- ConcurrentHashMap的所有操作都是线程安全的。
- 它不允许null用作键或值
- get操作没有上锁。是非阻塞的。所以在并发情况下可以与阻塞的put或remove函数交迭。但在聚合操作下比如putAll和clean,并发情况下由于线程调度的原因get函数可能只能检索到插入和删除的一些Entries(函数还未执行完)。
- 与get函数的处理相类似的还有
Iterators, Spliterators,Enumerations
,在其创建时或之后,倘若ConcurrentHashMap再发生改变就不会再抛ConcurrentModificationException
了。取而代之的是在其改变时new新的数据从而不影响原有的数据,Iterator会在其完成后再将头指针替换为新的数据,这样Iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。 - 不过,迭代器被设计成每次仅由一个线程使用。
- 同时需要注意:
size,isEmpty,containsValue
等函数的使用,在ConcurrentHashMap实例并发情况下是无意义的。它只能反映该实例的一个暂态,除非此时它并未发生并发修改。
ConcurrrentHashMap关键属性
-
volatile Node<K,V>[] table
:装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方(因为继承自HashMap)。 -
transient volatile Node<K,V>[] nextTable
:扩容时使用,平时为null,只有在扩容的时候才为非null。逻辑机制和ArrayList底层的数组扩容一致。 -
transient volatile long baseCount
:元素数量基础计数器,该值也是一个阶段性的值(产出的时候可能容器正在被修改)。通过CAS
的方式进行更改。 -
transient volatile int sizeCtl
:散列表初始化和扩容的大小都是由该变量来控制。当为负数时,它正在被初始化或者扩容。 -
static final sun.misc.Unsafe U
:在ConcurrentHashMap的实现中可以看到大量的U.compareAndSwapXXXX的方法去修改ConcurrentHashMap的一些属性。这些方法实际上是利用了CAS算法保证了线程安全性,这是一种乐观策略,假设每一次操作都不会产生冲突(变量实际值!=期望值),当且仅当冲突发生的时候再去尝试。
Java中transient关键字的作用,简单地说,就是让某些被修饰的成员属性变量不被序列化,这一看好像很好理解,就是不被序列化,那么什么情况下,一个对象的某些字段不需要被序列化呢?如果有如下情况,可以考虑使用关键字transient修饰:
- 类中的字段值可以根据其它字段推导出来,如一个长方形类有三个属性:长度、宽度、面积(示例而已,一般不会这样设计),那么在序列化的时候,面积这个属性就没必要被序列化了;
- 其它,看具体业务需求吧,哪些字段不想被序列化;
ConcurrentHashMap构造函数
// 构造一个空map映射,初始容量16
public ConcurrentHashMap() {
}
// 初始化时明确给定一个初始容量,减少resize次数
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// tableSizeFor 函数返回一个最接近入参 initialCapacity 容量的2进制整数
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 创建一个与给定的 Map 映射具有相同元素的 ConcurrentHashMap
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
// 我们前面所述 sizeCtl 两个含义,构造和扩容。
// 此处必然是构造容量为 DEFAULT_CAPACITY = 16 的 ConcurrentHashMap
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public void putAll(Map<? extends K, ? extends V> m) {
// 初始化数组容量,防止直接迭代insert导致频繁扩容
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
// 构造一个空的 Map 映射,并给定其初始容量与加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 构造一个空的 Map 映射,并给定其初始容量,加载因子与预估的并发更新的线程数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
// 该情况下,一个更新线程负责一个HashEntry
initialCapacity = concurrencyLevel; // as estimated threads
// 确定 table 的真实容器大小 = 元素数量 initialCapacity / 元素密度 loadFactor
// 比如你要存30个元素,构造Map的时候传入30和0.75,那么table真实容量就应该是 30/0.75。保证你要存的元素数量是table容器的0.75倍
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
常见API
initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)// sizeCtl变量小于0,即我们前面所提到的为-1。说明此时其他线程已经修改过sizeCtl变量的值,并赋值为-1,说明此时正有线程在构造ConcurrentHashMap对象。此时我们应该终止当前线程的构造操作。
// 1. 保证只有一个线程正在进行初始化操作
// 小知识:Thread.yield()函数在程序只有一个线程运行的时候,会继续运行不再暂停唯一的线程对象。
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 通过CAS函数比较并设置SIZECTL(sizeCtl)常量的值,我们前面对sizeCtl变量解析的时候说过,当ConcurrentHashMap在构造的时候sizeCtl为-1。
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);//无符号右移两位 n-(1/4)n
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
有可能存在一个情况是多个线程同时走到这个方法中,为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。
putVal(K key, V value, boolean onlyIfAbsent)
当且仅当table中不存在该key对应的Entry时才插入该Entry。否则不替换table中原有的Entry中的value。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 1. 计算key的hash值,与HashMap处理逻辑一样
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
/* 4. 如果要插入的位置是一个forwordingNode节点,表示正在扩容,那么当前线程帮助扩容
这里我有个问题:为什么该位置会是forwordingNode节点 */
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 5. 进行到这一步,说明要插入的位置有值,需要对该桶加锁。
synchronized (f) {
// 确定f是tab中的头节点
if (tabAt(tab, i) == f) {
// fh = 桶首元素的hast值。如果头结点的哈希值大于等于0,说明要插入的节点在(链表)中。否则有可能该桶的数据结构不是链表而是红黑树
if (fh >= 0) {
binCount = 1;
// 开始迭代找key的节点,f = 桶首元素
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果某一节点的key的哈希值及key与参数相等,替换该节点的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 没有找到则继续向后迭代,当迭代到最后一个元素还没有找到时,将该Entry插入到链表尾。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6. 如果要插入的节点在红黑树中,则按照树的方式插入或替换节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7. 如果binCount不为0,说明插入或者替换操作完成了
if (binCount != 0) {
// 判断节点数量是否大于等于8,如果是就需要把链表转化成红黑树
if (binCount >= TREEIFY_THRESHOLD)
// 链表转成红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 8. 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
// 能执行到这一步,说明节点不是被替换的,是被插入的,否则在binCount判断 !=0 的时候就要被return了。
addCount(1L, binCount);
return null;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
整体流程下来就是:
- 计算key哈希值
- 根据哈希值计算在table中的位置
- 根据哈希值执行插入或替换操作
- 如果这个位置没有值,直接将键值对放进去,不需要加锁。(CAS)
- 如果要插入的位置是一个forwordingNode节点,表示正在扩容,那么当前线程帮助扩容
- 加锁。以下操作都需要加锁。
- 如果要插入的节点在链表中,遍历链表中的所有节点,如果某一节点的key哈希值和key与参数相等,替换节点的value,记录被替换的值;如果遍历到了最后一个节点,还没找到key对应的节点,根据参数新建节点,插入链表尾部
- 如果要插入的节点在树中,则按照树的方式插入或替换节点。如果是替换操作,记录被替换的值
- 判断节点数量是否大于8,如果大于就需要把链表转化成红黑树
- 如果操作3中执行的是替换操作,返回被替换的value。程序结束。
- 能执行到这一步,说明节点不是被替换的,是被插入的,所以要将map的元素数量加1。
get(Object key)
据我们之前的类头注释所译。get函数没有必要加锁的。但是可以看到的是ConcurrentHashMap对其Node类的next属性加上了volatile关键字进行修饰。来保证并发情况下其他线程若正在与get函数同步的修改该节点的next属性,保证了它的可见性。
Node类
Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}
另外可以看出很多属性都是用volatile进行修饰的,也是为了保证并发情况下的该属性的可见性。同事对hash和key用final进行修饰也是提供了这两个常用变量的缓存,性能上有所提高。
CAS
在ConcurrentHashMap中会有大量的CAS操作来修改它的一些属性和操作。所以先来看一些常用的CAS操作是如何保证线程安全的.
tabAt:获取table数组中索引为i的Node元素。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
casTabAt:利用CAS操作设置table数组中索引为i的元素
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
总结
- 底层结构是散列表(数组+链表)+红黑树,这一点和HashMap是一样的。
- Hashtable是将所有的方法进行同步,效率低下。而ConcurrentHashMap作为一个高并发的容器,它是通过synchronized+CAS算法来进行实现线程安全的。使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。
- 采用synchronized而不是ReentrantLock。
- CAS算法是乐观锁的一种
- ConcurrentHashMap的key和Value都不能为null
- get方法是非阻塞,无锁的。重写Node类,通过volatile修饰next来实现每次获取都是最新设置的值。相比于在jdk1.7中的变化就是不采用segment而采用node,锁住node来实现减小锁粒度。
- sizeCtl的不同值来代表不同含义,起到了控制的作用。
ConcurrentLinkedQueue
ConcurrentLinkedQueue是并发大师Doug Lea(如果看了jdk的concurrent包的源码,相信读者对此人不会陌生)根据Michael-Scott提出的非阻塞链接队列算法的基础上修改而来,它是一个基于链表的无界线程安全队列,它采用先入先出的规则对节点进行排序,当我们添加一个节点的时候,它会添加到队列的尾部;当我们获取一个元素的时,它会返回队列头部的元素。它通过使用head和tail引用延迟更改的方式,减少CAS操作,在满足线程安全的前提下,提高了队列的操作效率。
- offer,add区别:
一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。这时新的 offer 方法就可以起作用了。它不是对调用 add() 方法抛出一个 unchecked 异常,而只是得到由 offer() 返回的 false。
- poll,remove区别:
remove() 和 poll() 方法都是从队列中删除第一个元素。remove() 的行为与 Collection 接口的版本相似,但是新的 poll() 方法在用空集合调用时不是抛出异常,只是返回 null。因此新的方法更适合容易出现异常条件的情况。
- peek,element区别:
element() 和 peek() 用于在队列的头部查询元素。与 remove() 方法类似,在队列为空时, element() 抛出一个异常,而 peek() 返回 null
CopyOnWriteArrayList
其对并发场景下操作提供了更好的支持,其内部使用ReentrantLock进行并发的控制,它主要具有以下特性:
- 所有元素都存储在数组里面, 只有当数组进行remove, update时才在方法上加上ReentrantLock, 拷贝一份snapshot的数组, 只改变snapshot中的元素,最后再赋值到CopyOnWriteArrayList中。
- 所有的get方法只是获取数组对应下标上的元素(无需加锁控制)。
以add方法为例:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//1、获取锁
lock.lock();
try {
//2、获取当前数组元素
Object[] elements = getArray();
//3、获取数组长度
int len = elements.length;
//4、复制旧数组数据到新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
//5、将新元素添加到新数组的尾部
newElements[len] = e;
//6、将新数组更新为当前数组
setArray(newElements);
return true;
} finally {
//7、释放锁
lock.unlock();
}
}
整体操作比较简单。
LinkedBlockingQueue
LinkedBlockingQueue是一个使用链表实现的阻塞队列,支持多线程并发操作,可保证数据的一致性。
与ArrayBlockingQueue相同的是,LinkedBlockingQueue也实现了元素“先进先出(FIFO)”规则,也使用ReentrantLock来保证数据的一致性;
与ArrayBlockingQueue不同的是,LinkedBlockingQueue通常被认为是“无界”的,在默认情况下LinkedBlockingQueue的链表长度为Integer.MAX_VALUE。
成员变量
对于ArrayBlockingQueue来说,当队列在进行入队和出队时,永远只能有一个操作被执行。因为该队列只有一把锁,所以在多线程执行中并不允许同时出队和入队。
与ArrayBlockingQueue不同的是,LinkedBlockingQueue拥有两把锁,一把读锁,一把写锁,可以在多线程情况下,满足同时出队和入队的操作。
在ArrayBlockingQueue中,由于出队入队使用了同一把锁,无论元素增加还是减少,都不会影响到队列元素数量的统计,所以使用了int类型的变量作为队列数量统计。
但是,在LinkedBlockingQueue中则不同。上面说了,在LinkedBlockingQueue中使用了2把锁,在同时出队入队时,都会涉及到对元素数量的并发修改,会有线程安全的问题。因此,在LinkedBlockingQueue中使用了原子操作类AtomicInteger,底层使用CAS(compare and set)来解决数据安全问题。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//队列容量大小,默认为Integer.MAX_VALUE
private final int capacity;
//队列中元素个数:(与ArrayBlockingQueue的不同)
//出队和入队是两把锁
private final AtomicInteger count = new AtomicInteger(0);
//队列--头结点
private transient Node<E> head;
//队列--尾结点
private transient Node<E> last;
//与ArrayBlockingQueue的不同,两把锁
//读取锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队等待条件
private final Condition notEmpty = takeLock.newCondition();
//插入锁
private final ReentrantLock putLock = new ReentrantLock();
//入队等待条件
private final Condition notFull = putLock.newCondition();
}
构造函数
既可以是有界又可以是无解的 :
//默认构造函数:
public LinkedBlockingQueue() {
//默认队列长度为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
//指定队列长度的构造函数:
public LinkedBlockingQueue(int capacity) {
//初始化链表长度不能为0
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//设置头尾结点,元素为null
last = head = new Node<E>(null);
}
插入元素(入队)
LinkedBlockingQueue的插入获取和ArrayBlockingQueue基本类似,都包含有阻塞式和非阻塞式。
put(E e)是阻塞式插入,如果队列中的元素与链表长度相同,则此线程等待,直到有空余空间时,才执行。
//向队列尾部插入元素:队列满了线程等待
public void put(E e) throws InterruptedException {
//不能插入为null元素:
if (e == null) throw new NullPointerException();
int c = -1;
//创建元素结点:
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加插入锁,保证数据的一致性:
putLock.lockInterruptibly();
try {
//当队列元素个数==链表长度
while (count.get() == capacity) {
//插入线程等待:
notFull.await();
}
//插入元素:
enqueue(node);
//队列元素增加:count+1,但返回+1前的count值:
c = count.getAndIncrement();
//容量还没满,唤醒生产者线程
// (例如链表长度为5,此时第五个元素已经插入,c=4,+1=5,所以超过了队列容量,则不会再唤醒生产者线程)
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁:
putLock.unlock();
}
//当c=0时,即意味着之前的队列是空队列,消费者线程都处于等待状态,需要被唤醒进行消费
if (c == 0)
//唤醒消费者线程:
signalNotEmpty();
}
offer(E e)是非阻塞式插入,队列中的元素与链表长度相同时,直接返回false,不会阻塞线程。
//向队列尾部插入元素:返回true/false
public boolean offer(E e) {
//插入元素不能为空
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果队列元素==链表长度,则直接返回false
if (count.get() == capacity)
return false;
int c = -1;
//创建元素结点对象:
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
//加锁,保证数据一致性
putLock.lock();
try {
//队列元素个数 小于 链表长度
if (count.get() < capacity) {
//向队列中插入元素:
enqueue(node);
//增加队列元素个数:
c = count.getAndIncrement();
//容量还没满,唤醒生产者线程:
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁:
putLock.unlock();
}
//此时,代表队列中还有一条数据,可以进行消费,唤醒消费者线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
获取元素(出队)
take():阻塞式出队,获取队列头部元素,如果队列中没有元素,则此线程的等待,直到队列中有元素才执行。
//从队列头部获取元素,并返回。队列为null,则一直等待
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//设置读取锁:
takeLock.lockInterruptibly();
try {
//如果此时队列为空,则获取线程等待
while (count.get() == 0) {
notEmpty.await();
}
//从队列头部获取元素:
x = dequeue();
//减少队列元素-1,返回count减少前的值;
c = count.getAndDecrement();
//队列中还有可以消费的元素,唤醒其他消费者线程
if (c > 1)
notEmpty.signal();
} finally {
//释放锁:
takeLock.unlock();
}
//队列中出现了空余元素,唤醒生产者进行生产。
// (链表长度为5,队列在执行take前有5个元素,执行到此处时候有4个元素了,但是c的值还是5,所以会进入到if中来)
if (c == capacity)
signalNotFull();
return x;
}
poll():非阻塞式出队,当队列中没有元素,则返回null.
//获取头部元素,并返回。队列为空,则直接返回null
public E poll() {
final AtomicInteger count = this.count;
//如果队列中还没有元素,则直接返回 null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
//加锁,保证数据的安全
takeLock.lock();
try {
//此时在判断,队列元素是否大于0
if (count.get() > 0) {
//移除队头元素
x = dequeue();
//减少队列元素个数
c = count.getAndDecrement();
//此时队列中,还有1个元素,唤醒消费者线程继续执行
if (c > 1)
notEmpty.signal();
}
} finally {
//释放锁:
takeLock.unlock();
}
//队列中出现了空余元素,唤醒生产者进行生产。
// (链表长度为5,队列在执行take前有5个元素,执行到此处时候有4个元素了,但是c的值还是5,所以会进入到if中来)
if (c == capacity)
signalNotFull();
return x;
}
ArrayBlockingQueue与LinkedBlockingQueue对比
ArrayBlockingQueue底层基于数组实现,需要使用者指定队列长度,是一个不折不扣的有界队列。
LinkedBlockingQueue底层基于链表实现,无需使用者指定队列长度(可自定义),当使用默认大小时候,是一个无界队列。
ArrayBlockingQueue由于默认必须设置队列长度,所以在使用时会能更好的预测系统性能;而LinkedBlockingQueue默认无参构造,无需指定队列长度,所以在使用时一定要多加注意,当队列中元素短时间内暴增时,可能会对系统产生灾难性影响。
但是,LinkedBlockingQueue的一大优点也是ArrayBlockingQueue所不具备的,那么就是在多个CPU的情况下,LinkedBlockingQueue可以做到同一时刻既消费、又生产。故LinkedBlockingQueue的性能也要优于ArrayBlockingQueue。
生产者消费者实现:
使用LinkedBlockingQueue简单模拟消费者生产者实现;
public class LinkedBlockingQueueTest {
static class Apple{
String colour;
public Apple(String colour){
this.colour = colour;
}
public String getColour() {
return colour;
}
public void setColour(String colour) {
this.colour = colour;
}
}
//生产者
static class Producer implements Runnable{
LinkedBlockingQueue<Apple> queueProducer ;
Apple apple;
public Producer( LinkedBlockingQueue<Apple> queueProducer,Apple apple){
this.queueProducer = queueProducer;
this.apple = apple;
}
public void run() {
try {
System.out.println("生产"+apple.getColour()+"的苹果");
queueProducer.put(apple);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//消费者
static class Consumer implements Runnable{
LinkedBlockingQueue<Apple> queueConsumer ;
public Consumer(LinkedBlockingQueue<Apple> queueConsumer){
this.queueConsumer = queueConsumer;
}
public void run() {
try {
Apple apple = queueConsumer.take();
System.out.println("消费"+apple.getColour()+"的苹果");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Apple> queue = new LinkedBlockingQueue<Apple>();
Apple appleRed = new Apple("红色");
Apple appleGreen = new Apple("绿色");
Producer producer1 = new Producer(queue,appleRed);
Producer producer2 = new Producer(queue,appleGreen);
Consumer consumer = new Consumer(queue);
producer1.run();
producer2.run();
consumer.run();
Thread.sleep(10000);
}
}
concurrent之executors相关
构造函数
在Java中,线程池的概念是Executor这个接口,具体实现为ThreadPoolExecutor类,学习Java中的线程池,就可以直接学习他了
//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- int corePoolSize => 该线程池中核心线程数最大值
线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程
核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉
- int maximumPoolSize:该线程池中线程总数最大值
线程总数 = 核心线程数 + 非核心线程数。
- long keepAliveTime:非核心线程闲置超时时长
一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉
如果设置allowCoreThreadTimeOut = true,则会作用于核心线程
TimeUnit unit:eepAliveTime的单位,TimeUnit是一个枚举类型
BlockingQueue<Runnable> workQueue
该线程池中的任务队列:维护着等待执行的Runnable对象
当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务
常用的workQueue类型:
-
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
-
LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
-
ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
-
DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
ThreadFactory threadFactory:创建线程工厂:
new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread new Thread(Runnable r) {
return new Thread(r,"AsyncTask #" + mCount.getAndIncrement());
}
}
RejectedExecutionHandler handler
拒绝策略:
-
AbortPolicy
-- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 -
CallerRunsPolicy
-- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。 -
DiscardOldestPolicy
-- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。 -
DiscardPolicy
-- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝 的任务。
执行策略
上面介绍参数的时候其实已经说到了ThreadPoolExecutor执行的策略,这里给总结一下,当一个任务被添加进线程池时:
- 线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务
- 线程数量达到了corePools,则将任务移入队列等待
- 队列已满,新建线程(非核心线程)执行任务
- 队列已满,总线程数又达到了maximumPoolSize,就会(RejectedExecutionHandler)抛出异常
线程池类型
FixedThreadPool()
由Executors
的newFixedThreadPool
方法创建。它是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。FixedThreadPool
只有核心线程,且该核心线程都不会被回收,这意味着它可以更快地响应外界的请求
特点:
- 可控制线程最大并发数(同时执行的线程数)
- 超出的线程会在队列中等待
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
CachedThreadPool()
由Executors
的newCachedThreadPool
方法创建,不存在核心线程,只存在数量不定的非核心线程,而且其数量最大值为Integer.MAX_VALUE
。当线程池中的线程都处于活动时(全满),线程池会创建新的线程来处理新的任务,否则就会利用新的线程来处理新的任务,线程池中的空闲线程都有超时机制,默认超时时长为60s,超过60s的空闲线程就会被回收。和FixedThreadPool不同的是,CachedThreadPool
的任务队列其实相当于一个空的集合,这将导致任何任务都会被执行,因为在这种场景下SynchronousQueue
是不能插入任务的,SynchronousQueue
是一个特殊的队列,在很多情况下可以理解为一个无法储存元素的队列。从CachedThreadPool
的特性看,这类线程比较适合执行大量耗时较小的任务。当整个线程池都处于闲置状态时,线程池中的线程都会因为超时而被停止回收,几乎是不占任何系统资源。
特点:
- 线程数无限制
- 有空闲线程则复用空闲线程,若无空闲线程则新建线程
- 一定程序减少频繁创建/销毁线程,减少系统开销
创建方法:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ScheduledThreadPool()
通过Executors
的newScheduledThreadPool
方式创建,核心线程数量是固定的,而非核心线程是没有限制的,并且当非核心线程闲置时它会被立即回收,ScheduledThreadPool
这类线程池主要用于执行定时任务和具有固定时期的重复任务。
特点:
- 支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
SingleThreadExecutor()
通过Executors
的newSingleThreadExecutor
方法来创建。这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。SingleThreadExecutor
的意义在于统一所有外界任务一个线程中,这使得这些任务之间不需要处理线程同步的问题
特点:
- 有且仅有一个工作线程执行任务
- 所有任务按照指定顺序执行,即遵循队列的入队出队规则
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用案例
案例一:countDownLatch
package com.br.lucky.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class FatureTest {
//1、配置线程池
private static ExecutorService es = Executors.newFixedThreadPool(20);
//2、封装响应Feature
class BizResult{
public String orderId;
public String data;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
//3、实现Callable接口
class BizTask implements Callable {
private String orderId;
private Object data;
//可以用其他方式
private CountDownLatch countDownLatch;
public BizTask(String orderId, Object data, CountDownLatch countDownLatch) {
this.orderId = orderId;
this.data = data;
this.countDownLatch = countDownLatch;
}
@Override
public Object call() {
try {
//todo business
System.out.println("当前线程Id = " + this.orderId);
BizResult br = new BizResult();
br.setOrderId(this.orderId);
br.setData("some key about your business" + this.getClass());
return br;
}catch (Exception e){
e.printStackTrace();
}finally {
//线程结束时,将计时器减一
countDownLatch.countDown();
}
return null;
}
}
/**
* 业务逻辑入口
*/
public List<Future> beginBusiness() throws InterruptedException {
//模拟批量业务数据
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 1000 ; i++) {
list.add(String.valueOf(i));
}
//设置计数器
CountDownLatch countDownLatch = new CountDownLatch(list.size());
//接收多线程响应结果
List<Future> resultList = new ArrayList<>();
//begin thread
for( int i = 0 ,size = list.size() ; i<size; i++){
//todo something befor thread
resultList.add(es.submit(new BizTask(list.get(i), null, countDownLatch)));
}
//wait finish
countDownLatch.await();
return resultList;
}
public static void main(String[] args) throws InterruptedException {
FatureTest ft = new FatureTest();
List<Future> futures = ft.beginBusiness();
System.out.println("futures.size() = " + futures.size());
//todo some operate
System.out.println(" ==========================end========================= " );
}
}
案例二:future
package com.br.lucky.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class FatureTest {
/**
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 200,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<1000;i++) {
pool.execute(() -> {
//todo 业务逻辑在此
});
}
*/
//1、配置线程池
private static ExecutorService es = Executors.newFixedThreadPool(20);
//2、封装响应Feature
class BizResult{
public String orderId;
public String data;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
//3、实现Callable接口
class BizTask implements Callable {
private String orderId;
private Object data;
public BizTask(String orderId, Object data) {
this.orderId = orderId;
this.data = data;
}
@Override
public Object call() {
try {
//todo business
System.out.println("当前线程Id = " + this.orderId);
BizResult br = new BizResult();
br.setOrderId(this.orderId);
br.setData("some key about your business" + this.getClass());
Thread.sleep(3000);
return br;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
/**
* 业务逻辑入口
*/
public List<Future> beginBusiness() throws InterruptedException, ExecutionException {
//模拟批量业务数据
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 100 ; i++) {
list.add(String.valueOf(i));
}
//接收多线程响应结果
List<Future> resultList = new ArrayList<>();
//begin thread
for( int i = 0 ,size = list.size() ; i<size; i++){
//todo something befor thread
Future future = es.submit(new BizTask(list.get(i), null));
resultList.add(future);
}
for (Future f : resultList) {
f.get();
}
System.out.println(" =====多线程执行结束====== ");
//wait finish
return resultList;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
FatureTest ft = new FatureTest();
List<Future> futures = ft.beginBusiness();
System.out.println("futures.size() = " + futures.size());
//todo some operate
System.out.println(" ==========================end========================= " );
}
}
参考文档:
https://blog.****.net/lmb55/article/details/79547685
https://www.cnblogs.com/zhuwenjoyce/p/9578382.html
https://www.jianshu.com/p/5a9a814c420e
https://blog.****.net/wtopps/article/details/85267115
https://www.jianshu.com/p/d2bbb300ce95