线程之间的通信方式:wait/notify
1.什么是线程间的通信
通信,顾名思义就是一种通知交通的方式,在多线程的环境下,如果各个线程之间可以互相通信的话,可以很好地提高工作效率,提高CPU的利用率。
2.线程间常用的通信方式
多线程间的通信一般采取等待/通知机制进行实现,即Object类中的wait()和notify()方法实现的,一个是等待,一个是通知。其实就像我们平时去营业厅办理业务一样,我们要先取号,然后就开始等待,等到听到叫我们号的时候,我们再过去办理业务。
- Java中等待/通知机制的实现
- 如上面所说的,wait()和notify()这两个方法都是Object类中的方法,之所以是超类的方法,其实是因为之前我们说过任何对象都可以作为锁,而这两个方法都是由锁调用的,所以很自然地就可以理解为什么这两个方法是属于超类的。
- wait方法:
- 作用是使当前执行代码的线程进行等待,该方法会将该线程放入”等待队列“中,并且在wait()所在的代码处停止执行,直到接到通知或被中断为止。
- 在调用 wait() 之前,线程必须获得该对象级别锁,即只能在同步方法或同步块中调用 wait() 方法。
- wait() 是释放锁的,即在执行到 wait() 方法之后,当前线程会释放锁,当从 wait() 方法返回前,线程与其他线程竞争重新获得锁
- 此外,还有带一个参数的wait(long),表示在等待一段时间内,如果没有唤醒线程,则会自动唤醒。当然,在这段时间内,也可以由其他线程唤醒。
- notify方法:
- 和 wait() 方法一样, notify() 方法也要在同步块或同步方法中调用,即在调用前,线程也必须获得该对象的对象级别锁。
- 该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选出其中一个是wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。即notify方法一次只随机唤醒一个wait状态的线程。
- 这里需要注意的是,执行notify方法之后,当前线程不会立即释放其拥有的该对象锁,而是执行完之后才会释放该对象锁,被通知的线程也不会立即获得对象锁,而是等待notify方法执行完之后,释放了该对象锁,才可以获得该对象锁。
- notifyAll() 通知所有等待同一共享资源的全部线程从等待状态退出,进入可运行状态,重新竞争获得对象锁。即notifyAll方法可以唤醒所有wait状态的线程。
- wait()/notify()总结:
用一句话来说就是:wait使线程停止运行,notify使停止的线程继续运行 。
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒之后,才会进入就绪队列,等待CPU的调度;反之,一个线程调用wait方法后,就会进入阻塞队列,等待下一次被唤醒。
- 要结合synchronized关键字一起使用,因为他们都需要首先获取该对象的对象锁;
- wait方法是释放锁,notify方法是不释放锁的;
- 线程的四种状态如下图:
- wait/notify线程间通信示例代码
- Mylist代码:
public class MyList { private static List list = new ArrayList(); public static void add() { list.add("我是元素"); } public static int size() { return list.size(); } }
- 线程A:
public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
- 线程B:
public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已发出通知!"); } System.out.println("添加了" + (i + 1) + "个元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
- 测试代码:
public class Run { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(50); ThreadB b = new ThreadB(lock); b.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 运行结果:
wait begin 1507634541467 添加了1个元素! 添加了2个元素! 添加了3个元素! 添加了4个元素! 已发出通知! 添加了5个元素! 添加了6个元素! 添加了7个元素! 添加了8个元素! 添加了9个元素! 添加了10个元素! wait end 1507634551563
由上面可以看出,虽然线程B在第五个元素的时候发出通知,而线程A实现线程B执行完之后才获得对象锁,这也可以明,wait方法是释放锁的而notify方法是不释放锁的。因为如果notify方法会释放锁的话,那么应该在打印通知之前就执行线程A中的打印wait end。
- Mylist代码:
-
使用wait/notify模拟BlockingQueue阻塞队列
-
BlockingQueue是阻塞队列,我们需要实现的是阻塞的放入和得到数据,设计思路如下:
(1)初始化队列最大长度为5;
(2)需要新加入的时候,判断是否长度为5,如果是5则等待插入;
(3)需要消费元素的时候,判断是否为0,如果是0则等待消费; -
实现代码如下:
public class MyQueue { //1、需要一个承装元素的集合 private final LinkedList<Object> list = new LinkedList<>(); //2、需要一个计数器 private final AtomicInteger count = new AtomicInteger(0); //3、需要指定上限和下限 private final int maxSize = 5; private final int minSize = 0; //5、初始化锁对象 private final Object lock = new Object(); /** * put方法 */ public void put(Object obj) { synchronized (lock) { //达到最大无法添加,进入等到 while (count.get() == maxSize) { try { lock.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } list.add(obj); //加入元素 count.getAndIncrement(); //计数器增加 System.out.println(" 元素 " + obj + " 被添加 "); lock.notify(); //通知另外一个阻塞的线程方法 } } /** * get方法 */ public Object get() { Object temp; synchronized (lock) { //达到最小,没有元素无法消费,进入等待 while (count.get() == minSize) { try { lock.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } count.getAndDecrement(); temp = list.removeFirst(); System.out.println(" 元素 " + temp + " 被消费 "); lock.notify(); } return temp; } private int size() { return count.get(); } public static void main(String[] args) throws Exception { final MyQueue myQueue = new MyQueue(); initMyQueue(myQueue); Thread t1 = new Thread(() -> { myQueue.put("h"); myQueue.put("i"); }, "t1"); Thread t2 = new Thread(() -> { try { Thread.sleep(2000); myQueue.get(); Thread.sleep(2000); myQueue.get(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2"); t1.start(); Thread.sleep(1000); t2.start(); } private static void initMyQueue(MyQueue myQueue) { myQueue.put("a"); myQueue.put("b"); myQueue.put("c"); myQueue.put("d"); myQueue.put("e"); System.out.println("当前元素个数:" + myQueue.size()); } }
-
执行结果:
元素 a 被添加 元素 b 被添加 元素 c 被添加 元素 d 被添加 元素 e 被添加 当前元素个数:5 元素 a 被消费 元素 h 被添加 元素 b 被消费 元素 i 被添加
注意:在数据结构中,队列是可以无长度限制的,就是可以无限扩展,但是对于阻塞队列,他之所以称之为阻塞队列就是因为其有长度限制,也是上述实例中的maxSize,这也是常见的笔试面试题中比较容易忽略的一个地方,想当然的认为只要是队列他就是无长度限制的,看到这里你应该知道了Java中提供的阻塞队列的类是有长度限制的!
-
当调用wait方法的时候,wait方法所在的代码块停止执行,直到被notify唤醒才开始执行。所以这里的get和put中都有wait和notify方法,可以理解为相互制约和唤醒。
-
-
-
注意事项:
-
wait()和notify()方法要在同步块或同步方法中调用,即在调用前,线程也必须获得该对象的对象级别锁。
-
wait方法是释放锁,notify方法是不释放锁的;
-
notify每次唤醒wait等待状态的线程都是随机的,且每次只唤醒一个;
-
notifAll每次唤醒wait等待状态的线程使之重新竞争获取对象锁,优先级最高的那个线程会最先执行;
-
当线程处于wait()状态时,调用线程对象的interrupt()方法会出现InterruptedException异常;
-
3.其他通信方式
(1)进程间的通信方式:
管道(pipe)、有名管道(named pipe)、信号量(semophore)、消息队列(message
queue)、信号(signal)、共享内存(shared memory)、套接字(socket);
(2)线程程间的通信方式:
1、锁机制:
1.1 互斥锁:提供了以排它方式阻止数据结构被并发修改的方法。
1.2 读写锁:允许多个线程同时读共享数据,而对写操作互斥。
1.3 条件变量:可以以原子的方式阻塞进程,直到某个特定条件为真为止。
对条件测试是在互斥锁的保护下进行的。条件变量始终与互斥锁一起使用。
2、信号量机制:包括无名线程信号量与有名线程信号量
3、信号机制:类似于进程间的信号处理。
线程间通信的主要目的是用于线程同步,所以线程没有像进程通信中用于数据交换的
通信机制。
徐刘根大佬的多线程专栏:https://blog.****.net/column/details/17790.html