常见的Queue介绍

 

 常见的Queue介绍

 

 

  1. queue常用方法:

remove:从队列中删除第一个元素,不存在报错。

poll:从队列中删除第一个元素,不存在返回null。

peek:查询第一个元素,不存在返回null。

add:往队列加一个元素,队列满报异常。

offer:往队列加一个元素,队列满返回false;

 

put: 往队列加一个元素,队列满时阻塞,直到队列有新的空间。

take:从队列中拿取第一个元素,队列为空则阻塞到队列有元素为止。

 

PriorityQueue,即优先队列。优先队列的作用是能保证每次取出的元素都是队列中权值最小的

Java中PriorityQueue实现了Queue接口,不允许放入null元素;其通过堆实现,具体说是通过完全二叉树(complete binary tree)实现的小顶堆(任意一个非叶子节点的权值,都不大于其左右子节点的权值),也就意味着可以通过数组来作为PriorityQueue的底层实现。

 常见的Queue介绍

 

 

请看下面的案例:

 

public class T_PriorityQueque {

 

    public static void main(String[] args) {

 

        PriorityQueue<String> q = new PriorityQueue<>();

        q.add("c");

        q.add("e");

        q.add("a");

        q.add("d");

        q.add("z");

 

        for (int i = 0; i <5 ; i++) {

            System.out.println(q.poll());

        }

 

    }

}

 

结果:

 常见的Queue介绍

 

ConcurrentLinkedQueue

 

一个基于链接节点的*线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。
新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。

 

简单使用:

 

public class T_ConcurrentQueue {

 

    public static void main(String[] args) {

        Queue<String> strs = new ConcurrentLinkedQueue<>();

 

        for (int i=0;i<10;i++){

            strs.offer("a"+i); //add

        }

        System.out.println(strs);

 

        System.out.println(strs.size());

 

        System.out.println(strs.poll());

 

        System.out.println(strs.peek());

 

        System.out.println(strs.size());

 

    }

}

 

 

结果:

 常见的Queue介绍

 

 

BlockingQueue

BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去。队列的特性就是先进先出很容易理解

 

ArrayBlockingQueue

 

简单使用:

 

public class T_ArrayBlockingQueue {

 

    static BlockingQueue<String> strs = new ArrayBlockingQueue<>(18);

 

    static Random r = new Random();

 

    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i <10 ; i++) {

            strs.put("a"+i);

        }

 

        strs.offer("aaa", 1, TimeUnit.SECONDS);

 

        System.out.println(strs);

 

    }

}

 

 

结果:

 常见的Queue介绍

 

 

LinkedBlockingDeque

 

LinkedBlockingQueue也是一个阻塞式的队列,与ArrayBlockingQueue的区别是什么呢?

LinkedBlockingQueue保存元素的是一个链表。其内部有一个Node的内部类,其中有一个成员变量 Node next。就这样形成了一个链表的结构,要获取下一个元素,只要调用next就可以了。而ArrayBlockingQueue则是一个数组。

LinkedBlockingQueue内部读写(插入获取)各有一个锁,而ArrayBlockingQueue则读写共享一个锁。

 

简单使用:

 

 

public class T_LinkedBlockingQueue {

 

    static BlockingQueue<String> strs = new LinkedBlockingDeque<>();

 

    static Random r = new Random();

 

    public static void main(String[] args) {

        new Thread(() -> {

 

            for (int i = 0; i < 10; i++) {

 

                try {

                    strs.put("a" + i); //如果满了,就会等待

                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

 

        }, "p1").start();

 

        for (int i = 0; i < 5; i++) {

            new Thread(() -> {

 

                for (; ; ) {

                    try {

                        System.out.println(Thread.currentThread().getName() + "take -" + strs.take()); //如果空了,就会等待

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

 

            }, "c" + i).start();

        }

 

 

    }

 

 

}

 

结果:

 常见的Queue介绍

 

 

 

SynchronousQueue

SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。

也就是说SynchronousQueue的每一次insert操作,必须等待其他线性的remove操作。而每一个remove操作也必须等待其他线程的insert操作。

这种特性可以让我们想起了Exchanger。和Exchanger不同的是,使用SynchronousQueue可以在两个线程中传递同一个对象。一个线程放对象,另外一个线程取对象。

 

public class T_SynchronusQueue {

 

    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> strs = new  SynchronousQueue<>();

        new Thread(()->{

            try {

                System.out.println(strs.take());

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

        }).start();

 

        strs.put("aaa");

        System.out.println(strs.size());

    }

}

 

 

结果:

 

 常见的Queue介绍

 

 

DelayQueue

 

DelayQueue是一个*的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

 

简单使用:

public class T_DelayQueue {

 

    static BlockingQueue<MyTask> tasks = new DelayQueue<>();

 

    static Random r = new Random();

 

    static class MyTask implements Delayed{

 

        String name;

        long runningTime;

 

        MyTask(String name,long rt){

            this.name = name;

            this.runningTime = rt;

        }

 

        @Override

        public long getDelay(TimeUnit unit) {

            return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);

        }

 

        @Override

        public int compareTo(Delayed o) {

           if(this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS)){

               return -1;

           }else if(this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)){

               return 1;

           }else

               return 0;

        }

 

        @Override

        public String toString() {

            return "MyTask{" +

                    "name='" + name + '\'' +

                    ", runningTime=" + runningTime +

                    '}';

        }

    }

 

 

    public static void main(String[] args) throws InterruptedException {

        long now = System.currentTimeMillis();

        MyTask t1 = new MyTask("t1",now+1000);

        MyTask t2 = new MyTask("t2",now+2000);

        MyTask t3 = new MyTask("t3",now+1500);

        MyTask t4 = new MyTask("t4",now+2500);

        MyTask t5 = new MyTask("t5",now+500);

 

        tasks.put(t1);

        tasks.put(t2);

        tasks.put(t3);

        tasks.put(t4);

        tasks.put(t5);

 

        System.out.println(tasks);

 

        for (int i = 0; i <5 ; i++) {

            System.out.println(tasks.take());

        }

    }

 

}

 

结果:

 

[MyTask{name='t5', runningTime=1592299436085}, MyTask{name='t1', runningTime=1592299436585}, MyTask{name='t3', runningTime=1592299437085}, MyTask{name='t4', runningTime=1592299438085}, MyTask{name='t2', runningTime=1592299437585}]

MyTask{name='t5', runningTime=1592299436085}

MyTask{name='t1', runningTime=1592299436585}

MyTask{name='t3', runningTime=1592299437085}

MyTask{name='t2', runningTime=1592299437585}

MyTask{name='t4', runningTime=1592299438085}

 

Process finished with exit code 0

 

 

DelayQueue 应用场景

1. 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。 
2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。

3.关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

4.缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

5.任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。

 

LinkedTransferQueue

 

LinkedTransferQueue是在JDK1.7时,J.U.C包新增的一种比较特殊的阻塞队列,它除了具备阻塞队列的常用功能外,还有一个比较特殊的transfer方法。

我们知道,在普通阻塞队列中,当队列为空时,消费者线程(调用takepoll方法的线程)一般会阻塞等待生产者线程往队列中存入元素。而LinkedTransferQueuetransfer方法则比较特殊:

  1. 当有消费者线程阻塞等待时,调用transfer方法的生产者线程不会将元素存入队列,而是直接将元素传递给消费者;
  2. 如果调用transfer方法的生产者线程发现没有正在等待的消费者线程,则会将元素入队,然后会阻塞等待,直到有一个消费者线程来获取该元素。

 

简单使用:

 

public class T_TransferQueue {

 

    public static void main(String[] args) throws InterruptedException {

        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();

 

        new Thread(()->{

 

            try {

                System.out.println(strs.take());

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

 

        }).start();

 

        strs.transfer("aaa");

    }

}

 

结果:

 常见的Queue介绍