java Concurrent包学习笔记(四):BlockingQueue
一、BlockingQueue概述
1、阻塞的含义
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
- ,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,即线程会挂起直到队列不满时,线程才继续入队
- 当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,即线程会挂起直到队列不空,线程才继续出队
2、为什么要使用BlockingQueue
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)
3、BlockingQueue的一组方法
1)放入数据
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
add(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则抛出异常。
2). 获取数据
poll(time): 取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
以下重点讲三个队列,都实现了BlockingQueue
二、DelayQueue
1、使用场景
常常会遇到一些延迟任务(100ms后执行该任务)、周期任务(每10ms执行一次)、超时任务(比如缓存,超时就要移除)等。如果我们要创建一个处理这样任务的调度服务,那么DelayedQueue将是首选!
2、DelayQueue和Delayed简介
DelayedQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,将发生到期。即使无法使用take或poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size方法同时返回到期和未到期元素的计数。此队列不允许使用null元素。
因此DelayQueue可以理解为一个使用时间作为比较条件的PriorityBlockingQueue(优先级阻塞队列)。
priorityQueue是一种优先级队列,这里优先级就是延迟时间,也就是说进入队列的任务安装优先级进行排序,延迟时间最短的在队列前面,先被处理,也就是说,每次从队列中取出的任务都将是到期的任务
DelayQueue里面的元素必须是实现了Delayed接口的元素,Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了**日期。该接口强制执行下列两个方法,也就是说不需要程序去显示调用。
- CompareTo(Delayed o):Delayed接口继承了Comparable接口,进入队列里面的元素会被队列自动用此方法比较两个元素的延迟时间进行排序
- getDelay(TimeUnit unit):这个方法返回到**日期的剩余时间,时间单位由单位参数指定。
3、示例
夏天买来一批食品放入冷藏室,每种食品在冷藏室都有一个保存时间,超过该时间就会变质。食品检查员经常检查食品,超过冷藏时间的食品就要拿出来扔掉。
1)定义一个食品类:Food
class Food implements Delayed{ private String foodName; private long saveTime;//保存时间 private long expireTime;//过期时刻=当前时间+保存时间 public Food(String foodName,long saveTime){ this.foodName=foodName; this.saveTime=saveTime; this.expireTime=TimeUnit.NANOSECONDS.convert(saveTime, TimeUnit.SECONDS)+System.nanoTime(); } @Override public int compareTo(Delayed o) { Food that = (Food)o; //that指的是同一队列里面的其他元素。 if(this.expireTime > that.expireTime){//过期时刻越靠后,越排在队尾. return 1; }else if(this.expireTime==that.expireTime){ return 0; }else{ return -1; } } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expireTime-System.nanoTime(), TimeUnit.NANOSECONDS); } public String getFoodName(){ return this.foodName; } public long getExpireTime(){ return this.expireTime; } public long getSaveTime(){ return this.saveTime; } }
这里,食品类实现了Delayed接口,因此重写了compareTo和getDelay方法。getDelay返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
下面定义一个食品检查员的类:
2)FoodChecker
class FoodChecker implements Runnable{ private DelayQueue<Food> queue; public FoodChecker(DelayQueue<Food> queue){ this.queue=queue; } @Override public void run() { try{ System.out.println("开始检查!"); boolean flag = true; while(flag){ Food food = queue.take();//此处会阻塞,没有时过期食品时不会取出 System.out.println(food.getFoodName()+"食品过期!保存时间:"+food.getSaveTime()+"天."); if(queue.isEmpty()){ flag=false; } } System.out.println("检查完毕!"); }catch(Exception e){ e.printStackTrace(); } } }
食品检查员持有一个DelayQueue,并且不断从该队列中取出过期食品处理掉。
3)主任务类
DelayQueue<Food> queue=new DelayQueue<Food>(); Random r = new Random(); queue.add(new Food("A", getRandomDay(r))); queue.add(new Food("B", getRandomDay(r))); queue.add(new Food("C", getRandomDay(r))); queue.add(new Food("D", getRandomDay(r))); queue.add(new Food("E", getRandomDay(r))); queue.add(new Food("F", getRandomDay(r))); ExecutorService es = Executors.newSingleThreadExecutor(); es.execute(new FoodChecker(queue)); es.shutdown();
在队列中放入几种食品,并随机给一个保存时间。由于FoodChecker只有一个线程,所以这里使用的是SingleThreadExecutor。
运行结果如下:
开始检查! B食品过期!保存时间:1天. C食品过期!保存时间:2天. A食品过期!保存时间:5天. D食品过期!保存时间:5天. E食品过期!保存时间:6天. F食品过期!保存时间:10天. 检查完毕!
这个例子中使用的是DelayQueue的take方法,该方法获取并移除此队列的头部,在可从此队列获得延迟到期的元素之前会一直等待(即阻塞)。另外,还有几个常用方法如下:
poll():获取并移除此队列的头,如果此队列不包含具有已到期延迟时间的元素,则返回 null。(非阻塞)
peek():获取但不移除此队列的头部;如果此队列为空,则返回 null。与poll不同,如果队列中没有到期元素可用,则此方法返回下一个将到期的元素(如果存在一个这样的元素)。
三、ArrayBlockingQueue和LinkedBlockingQueue
1、使用场景 :主要用于实现“生产者-消费者”模式
2、特点
1)ArrayBlockingQueue
- 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,takeIndex和putIndex来维护队列头和尾部的游标
- ArrayBlockingQueue 的生产和消费使用同一个ReentranLock来同步,使用Condition的方法来同步和通信:await()和signal(),这导致生产和消费不能同时进行
-
ArrayBlockingQueue 初始化必须指定容量大小
- ArrayBlockingQueue 进行插入和删除时,直接将对象插入或移除,不会产生或销毁任何额外的对象实例
2)LinkedBlockingQueue
- 内部也维持着一个数据缓冲队列(该队列由一个链表构成),无边界的堵塞队列
- LinkedBlockingQueue的消费和生产 使用 读写2个锁来对写和读进行加锁操作,这样相比一个锁的好处是细化了锁的跨度,读写分离,减小了锁的竞争
- LinkedBlockingQueue 有默认的容量大小为:Integer.MAX_VALUE,当然也可以传入指定的容量大小(非必须指定),而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了
- LinkedBlockingQueue 在生产和消费的时候,需要创建Node对象进行插入或移除,大批量数据的系统中,其对于GC的压力会比较大
3、示例程序
1)生产者:
package blockingqueue.demo; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author Administrator * @date 2018/12/26 */ public class FoodMaker implements Runnable { private BlockingQueue blockingQueue; private String makerName; private Boolean runFlag = true; //这里为什么用AtomicInteger,而不用int i=0;i++这种。因为在高并发的情况下i++是线程不安全的, // 而AtomicInteger的incrementAndGet()方法在一个无限循环体内,不断尝试将一个比当前值大1的新值赋给自己, // 如果失败则说明在执行"获取-设置"操作的时已经被其它线程修改过了,于是便再次进入循环下一次操作,直到成功为止 private static AtomicInteger foodCount = new AtomicInteger(); //构造函数 public FoodMaker(BlockingQueue blockingQueue, String makerName) { this.blockingQueue = blockingQueue; this.makerName = makerName; } public void run() { Random r = new Random(); System.out.println("厨师:" + makerName + "开始做食物"); try { while (runFlag) { //随机休眠一点时间,用随机的目的就是造成厨师制作食物的速度和吃食物的人的速度不匹配,以此来验证阻塞的作用 Thread.sleep(r.nextInt(1000)); String data = "food_" + foodCount.incrementAndGet(); blockingQueue.put(data);//此处队列如果已经满了,当前线程会在这里挂起 String msg = makerName + "将 " + data + " 放入了队列"; System.out.println(msg); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println(makerName + "不再做食物!"); } } public void stop(){ runFlag = false; } }
2)消费者:
package blockingqueue.demo; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * @author Administrator * @date 2018/12/26 */ public class FoodEater implements Runnable { private BlockingQueue blockingQueue; private String eaterName; private Boolean runFlag = true; //构造函数 public FoodEater(BlockingQueue blockingQueue, String eaterName) { this.blockingQueue = blockingQueue; this.eaterName = eaterName; } public void run() { Random r = new Random(); System.out.println("食者:" + eaterName + "走进了食堂"); try { while (runFlag) { //随机休眠一点时间,用随机的目的就是造成厨师制作食物的速度和吃食物的人的速度不匹配,以此来验证阻塞的作用 Thread.sleep(r.nextInt(1000)); String msg1 = eaterName + "等待食物到来========"; System.out.println(msg1); String data = blockingQueue.take().toString(); String msg2 = eaterName + "吃了 " + data; System.out.println(msg2); } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println(eaterName + "退出了食堂"); } } public void stop(){ runFlag = false; } }
3) 测试
package blockingqueue.demo; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * @author Administrator * @date 2018/12/27 */ public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException{ BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); //三个生产者 FoodMaker foodMaker1 = new FoodMaker(queue,"张三"); FoodMaker foodMaker2 = new FoodMaker(queue,"李四"); FoodMaker foodMaker3 = new FoodMaker(queue,"王五"); //两个消费者 FoodEater foodEater1 = new FoodEater(queue,"马六"); FoodEater foodEater2 = new FoodEater(queue,"赵七"); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(foodMaker1); executorService.submit(foodMaker2); executorService.submit(foodMaker3); executorService.submit(foodEater1); executorService.submit(foodEater2); Thread.sleep( 5000); foodMaker1.stop(); foodMaker2.stop(); foodMaker3.stop(); Thread.sleep( 5000); foodEater1.stop(); foodEater2.stop(); Thread.sleep( 2000); executorService.shutdown(); } }
执行结果:
厨师:王五开始做食物 食者:赵七走进了食堂 厨师:张三开始做食物 厨师:李四开始做食物 食者:马六走进了食堂 赵七等待食物到来======== 王五将 food_1 放入了队列 赵七吃了 food_1 王五将 food_2 放入了队列 马六等待食物到来======== 马六吃了 food_2 赵七等待食物到来======== 李四将 food_3 放入了队列 赵七吃了 food_3 张三将 food_4 放入了队列 李四将 food_5 放入了队列 王五将 food_6 放入了队列 王五将 food_7 放入了队列 马六等待食物到来======== 马六吃了 food_4 李四将 food_8 放入了队列 马六等待食物到来======== 马六吃了 food_5 张三将 food_9 放入了队列 赵七等待食物到来======== 赵七吃了 food_6 王五将 food_10 放入了队列 赵七等待食物到来======== 赵七吃了 food_7 王五将 food_11 放入了队列 张三将 food_12 放入了队列 王五将 food_13 放入了队列 李四将 food_14 放入了队列 马六等待食物到来======== 马六吃了 food_8 赵七等待食物到来======== 赵七吃了 food_9 马六等待食物到来======== 马六吃了 food_10 赵七等待食物到来======== 赵七吃了 food_11 王五将 food_15 放入了队列 王五不再做食物! 马六等待食物到来======== 马六吃了 food_12 张三将 food_16 放入了队列 张三不再做食物! 马六等待食物到来======== 马六吃了 food_13 李四将 food_17 放入了队列 李四不再做食物! 马六等待食物到来======== 马六吃了 food_14 赵七等待食物到来======== 赵七吃了 food_15 马六等待食物到来======== 马六吃了 food_16 赵七等待食物到来======== 赵七吃了 food_17 马六等待食物到来======== 赵七等待食物到来========
由以上结果看出来,食者线程并没有执行:System.out.println(eaterName + "退出了食堂");
这是因为当 blockingQueue.take() 的时候发现队列为空就阻塞在这里了,后面虽然调用了stop方法修改了runFlag=false,但是本次循环没有完,所以会一直卡在take的地方。