java并发之生产消费模型,Condition和独占锁ReentrantLock的运用
一 1.ReentrantLock简介
- 一个可重入互斥
Lock
具有与使用synchronized
方法和语句访问的隐式监视锁相同的基本行为和语义,但具有扩展功能。 - A
ReentrantLock
由线程拥有 ,最后成功锁定,但尚未解锁。 调用lock
的线程将返回,成功获取锁,当锁不是由另一个线程拥有。 如果当前线程已经拥有该锁,该方法将立即返回。 这可以使用方法isHeldByCurrentThread()
和getHoldCount()
进行检查。
2.主要用到的方法
(1)lock() 获得锁
(2)unlock() 释放锁
3.测试类
开启多个线程,顺序完成任务,彼此互不影响
任务类:
public class Job implements Runnable { private PrintQueue printQueue; public Job(PrintQueue printQueue) { this.printQueue = printQueue; } @Override public void run() { printQueue.printJob(new Object()); } }
打印类:
public class PrintQueue { private final Lock lock = new ReentrantLock(); public void printJob(Object o) { try { lock.lock(); System.out.println(Thread.currentThread().getName() + ": Going to print a document"); Long duration = (long) (Math.random() * 10000); System.out.println(Thread.currentThread().getName() + ":PrintQueue: Printing a Job during " + (duration / 1000) + " seconds"); Thread.sleep(duration); System.out.printf(Thread.currentThread().getName() + ": The document has been printed\n"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) {//测试 PrintQueue printQueue = new PrintQueue(); Job job = new Job(printQueue); for (int i = 0;i < 5;i++) { new Thread(job,"thread_" + i).start(); } } }
测试结果:如下图所示,各个线程独立执行互不影响。
二 1.condition简介
-
条件(也称为条件队列或条件变量 )为一个线程暂停执行(“等待”)提供了一种方法,直到另一个线程通知某些状态现在可能为真。 因为访问此共享状态信息发生在不同的线程中,所以它必须被保护,因此某种形式的锁与该条件相关联。 等待条件的关键属性是它原子地释放相关的锁并挂起当前线程,就像
Object.wait
。一个
Condition
实例本质上绑定到一个锁。 要获得特定Condition
实例的Condition实例,请使用其newCondition()
方法。
2.主要方法
3.测试类
生产者生产产品,放入仓库,消费者从仓库取出来消费。只有仓库有产品时才能消费,为空时生产者开始生产产品。
仓库实体类:
public class Depot { private int depotSize; //仓库容量 private Lock lock; //独占锁 private int capaity; private Condition fullCondition; private Condition emptyCondition; public Depot() { this.depotSize = 0; this.lock = new ReentrantLock(); this.capaity = 100; this.fullCondition = this.lock.newCondition(); this.emptyCondition = this.lock.newCondition(); } public void put(int count) { lock.lock(); try { int left = count; while (left > 0) { while (depotSize >= capaity) { //仓库已满,生产者停止生产 fullCondition.await(); } //实际生产量 int inc = depotSize + left > capaity ? capaity - depotSize : left; depotSize += inc; left -= inc; System.out.println(Thread.currentThread().getName() + " 要生产的数量: " + count + " 实际生产的数量: " + inc + " 没有生产的数量: " + left + "---->当前仓库产品数量为:" + this.depotSize); //通知消费者可以消费了 emptyCondition.signal(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { //务必在finally代码块中释放锁 lock.unlock(); } } public void get(int count) { lock.lock(); try { int left = count; while (left > 0) { while (depotSize <= 0) { //仓库没货了,消费者等待生产者 生产 emptyCondition.await(); } //实际消费数 int dec = depotSize < left ? depotSize : left; depotSize -= dec; left -= dec; System.out.println(Thread.currentThread().getName() + " 要消费的数量: " + count + " 实际消费的数量: " + dec + " 没有消费的数量: " + left +"---->当前仓库产品数量为:" + this.depotSize); //通知生产者可以继续生产了 fullCondition.signal(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
消费者:
public class Customer { private Depot depot; public Customer(Depot depot) { this.depot = depot; } public void consume(final int count) { new Thread(){ @Override public void run() { depot.get(count); } }.start(); } }
生产者:
public class Producer { private Depot depot; public Producer(Depot depot) { this.depot = depot; } public void produce(final int count) { new Thread(){ @Override public void run() { depot.put(count); } }.start(); } }
测试类:
public class Test { public static void main(String[] args) { Depot depot = new Depot(); Customer customer = new Customer(depot); Producer producer = new Producer(depot); producer.produce(10); customer.consume(5); producer.produce(10); customer.consume(8); producer.produce(40); customer.consume(30); customer.consume(30); producer.produce(13); } }
测试结果:
如结果所示,可以看出符合先生产后消费的逻辑。