多线程--阻塞队列,原子类,ThreadLocal分析
ArrayBlockingQueue:数组实现的有界阻塞队列,按照FIFO的原则对元素进行排序
LinkedBlockingQueue:链表实现的有界阻塞队列,此队列的默认和最大长度为Integer.MAX_VALUE,按照FIFO原则进行排序
PriorityBlockingQueue:优先级排序的无界阻塞队列,默认情况下采用自然顺序升序排列,也可以自定义compareTo()方法指定元素排序规则
DelayQueue:优先级队列实现的无界阻塞队列
SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素
LinkedTransferQueue:链表实现的无界阻塞队列
LinkedBlockingDeque:链表实现的双向阻塞队列
ArrayBlockingQueue队列分析:
创建对象时候new ArrayBlockingQueue<>(100)
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //初始化一个指定大小容量的数组 this.items = new Object[capacity]; //重入锁,出队和入队持有这把锁,创建一个非公平锁 lock = new ReentrantLock(fair); //创建一个非空的等待队列condition notEmpty = lock.newCondition(); //创建一个非满的等待队列condition notFull = lock.newCondition(); }
添加元素:
add():
//本质上调用offer方法,如果失败抛出异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
offer():
//传入对象判空,获取重入锁,如果队列满了返回false,否则添加到队列中 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //当putIndex等于数组长度时候重置为0 if (++putIndex == items.length) putIndex = 0; count++; //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在 //消费者线程阻塞,可以开始获取元素 notEmpty.signal(); }
put()方法:
// 和offer方法基本项目,不同的是在队列满了之后,会阻塞 public void put(E e) throws InterruptedException { checkNotNull(e); //优先允许在等待时候由其他线程调用等待线程的interrupt方法来中断等待直接返回 //lock方法是尝试获取锁成功后才响应中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列满了的情况下,当前线程会被阻塞挂起到等待队列中 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
获取元素:
take()方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果队列为空,则直接阻塞 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
//出队列 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") //默认获取0位置的元素 E x = (E) items[takeIndex]; items[takeIndex] = null; //如果拿到数组的最大值,那么重置为0,继续从头部获取数据 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒因为队列满了而导致被阻塞的线程继续添加数据 notFull.signal(); return x; }
remove()方法
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列不为空 if (count > 0) { //获取下一个要添加元素时的索引 final int putIndex = this.putIndex; //获取要移除的元素的索引 int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
poll()方法:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //判断当前队列是否为空,如果为空返回null,否则出队列 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
原子类操作:
原子更新基本类型:AtomicBoolean AtomicInteger AtomicLong
原子更新数组:AtomicIntegerArray AtomicLongArray AtomicReferenceArray
原子更新引用:AtomicReference AtomicReferenceFieldUpdater
原子更新字段:AtomicIntegerFieldUpdater AtomicLongFieldUpdater
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
static { try { //获取当前value这个变量在内存中的偏移量,后续会基于这个偏移量从内存中获取到value值和当前值来进行比较 //实现乐观锁 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }
//通过死循环,CAS乐观锁来做原子递增 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
ThreadLocal分析:
调用ThreadLocal的set()方法,如果是首次调用,则会调用createMap() public void set(T value) { //获取当前线程 Thread t = Thread.currentThread(); //通过线程获取ThreadLocalMap ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value);//首次添加值的时候 }
//创建ThreadLocalMap void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { //创建一个16大小的Entry数组 table = new Entry[INITIAL_CAPACITY]; //计算数组下标 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); //在指定位置存放一个Entry table[i] = new Entry(firstKey, firstValue); size = 1; //扩容大小 setThreshold(INITIAL_CAPACITY); }
//继承WeakReference,所以key是弱引用,value是强引用,在GC回收时候会被回收掉 static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
三者之间关系:
ThreadLocal中存在一个静态内部类ThreadLocalMap,在存储数据时候,ThreadLocal的引用作为key存入ThreadLocalMap中的Entry,Value就是对应的值,Thread类中存放ThreadLocal.ThreadLocalMap,因为key是weakRefence(弱引用),所以在每次GC时候会被回收,在ThreadLocalMap实现时候考虑了这个问题,所以在get() set() remove()时候会进行回收处理。如果我们没有手动调用这些方法则会导致内存泄漏问题
当第二次调用set()方法时候:
public void set(T value) { Thread t = Thread.currentThread(); //通过当前线程获取TheadLocalMap ThreadLocalMap map = getMap(t); //如果不为null if (map != null) map.set(this, value); else createMap(t, value); }
//使用线性探索来解决脏数据问题,写入:找到发生冲突最近的空闲单元 查找:从发生冲突的位置往后查找 private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; //获取数组下标位置 int i = key.threadLocalHashCode & (len-1); //从i开始往后一直遍历到数组最后一个Entry(线性探索) for (Entry e = tab[i];e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); //如果key相等,则直接覆盖 if (k == key) { e.value = value; return; } //如果key为null,则用心的key,value覆盖,同时清理历史key=null的数据 if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; //如果超过阈值则要进行扩容 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }