多线程--阻塞队列,原子类,ThreadLocal分析

ArrayBlockingQueue:数组实现的有界阻塞队列,按照FIFO的原则对元素进行排序

LinkedBlockingQueue:链表实现的有界阻塞队列,此队列的默认和最大长度为Integer.MAX_VALUE,按照FIFO原则进行排序

PriorityBlockingQueue:优先级排序的无界阻塞队列,默认情况下采用自然顺序升序排列,也可以自定义compareTo()方法指定元素排序规则

DelayQueue:优先级队列实现的无界阻塞队列

SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素

LinkedTransferQueue:链表实现的无界阻塞队列

LinkedBlockingDeque:链表实现的双向阻塞队列

 

ArrayBlockingQueue队列分析:

创建对象时候new ArrayBlockingQueue<>(100)
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    //初始化一个指定大小容量的数组
    this.items = new Object[capacity];
    //重入锁,出队和入队持有这把锁,创建一个非公平锁
    lock = new ReentrantLock(fair);
    //创建一个非空的等待队列condition
    notEmpty = lock.newCondition();
    //创建一个非满的等待队列condition
    notFull =  lock.newCondition();
}

添加元素:

add():

//本质上调用offer方法,如果失败抛出异常
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

 

offer():

//传入对象判空,获取重入锁,如果队列满了返回false,否则添加到队列中
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    //当putIndex等于数组长度时候重置为0
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在
    //消费者线程阻塞,可以开始获取元素
    notEmpty.signal();
}

 

put()方法:

// 和offer方法基本项目,不同的是在队列满了之后,会阻塞
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    //优先允许在等待时候由其他线程调用等待线程的interrupt方法来中断等待直接返回
    //lock方法是尝试获取锁成功后才响应中断
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //队列满了的情况下,当前线程会被阻塞挂起到等待队列中
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

 

 

获取元素:

take()方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //如果队列为空,则直接阻塞
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

 

//出队列
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //默认获取0位置的元素
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    //如果拿到数组的最大值,那么重置为0,继续从头部获取数据
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒因为队列满了而导致被阻塞的线程继续添加数据
    notFull.signal();
    return x;
}

 

 

remove()方法

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列不为空
        if (count > 0) {
            //获取下一个要添加元素时的索引
            final int putIndex = this.putIndex;
            //获取要移除的元素的索引
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
  poll()方法:
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //判断当前队列是否为空,如果为空返回null,否则出队列
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

 

 

原子类操作:

原子更新基本类型:AtomicBoolean   AtomicInteger  AtomicLong

原子更新数组:AtomicIntegerArray  AtomicLongArray  AtomicReferenceArray

原子更新引用:AtomicReference AtomicReferenceFieldUpdater

原子更新字段:AtomicIntegerFieldUpdater   AtomicLongFieldUpdater

 

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
static {
    try {
        //获取当前value这个变量在内存中的偏移量,后续会基于这个偏移量从内存中获取到value值和当前值来进行比较
        //实现乐观锁
        valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

 

//通过死循环,CAS乐观锁来做原子递增
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

 

 

 

ThreadLocal分析:

调用ThreadLocal的set()方法,如果是首次调用,则会调用createMap()
public void set(T value) {
    //获取当前线程
    Thread t = Thread.currentThread();
    //通过线程获取ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);//首次添加值的时候
}
//创建ThreadLocalMap
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //创建一个16大小的Entry数组
    table = new Entry[INITIAL_CAPACITY];
    //计算数组下标
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    //在指定位置存放一个Entry
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    //扩容大小
    setThreshold(INITIAL_CAPACITY);
}

 

//继承WeakReference,所以key是弱引用,value是强引用,在GC回收时候会被回收掉
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

多线程--阻塞队列,原子类,ThreadLocal分析

 

 

三者之间关系:

ThreadLocal中存在一个静态内部类ThreadLocalMap,在存储数据时候,ThreadLocal的引用作为key存入ThreadLocalMap中的Entry,Value就是对应的值,Thread类中存放ThreadLocal.ThreadLocalMap,因为key是weakRefence(弱引用),所以在每次GC时候会被回收,在ThreadLocalMap实现时候考虑了这个问题,所以在get() set() remove()时候会进行回收处理。如果我们没有手动调用这些方法则会导致内存泄漏问题

 

多线程--阻塞队列,原子类,ThreadLocal分析

 

当第二次调用set()方法时候:

public void set(T value) {
    Thread t = Thread.currentThread();
    //通过当前线程获取TheadLocalMap
    ThreadLocalMap map = getMap(t);
    //如果不为null
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}
//使用线性探索来解决脏数据问题,写入:找到发生冲突最近的空闲单元  查找:从发生冲突的位置往后查找
private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    //获取数组下标位置
    int i = key.threadLocalHashCode & (len-1);
    //从i开始往后一直遍历到数组最后一个Entry(线性探索)
    for (Entry e = tab[i];e != null; e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //如果key相等,则直接覆盖
        if (k == key) {
            e.value = value;
            return;
        }
        //如果key为null,则用心的key,value覆盖,同时清理历史key=null的数据
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    //如果超过阈值则要进行扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}