线程消费者和生产者模型
线程消费者和生产者模型
概述:生产者和消费者模型是多线程中的经典模型。在这里我想详细的介绍下关于该模型实现的多种方式。
生产者:数据产生的源头。
消费者:数据流向的结点。
对于该模型,感觉好熟悉。如RPC框架Dubbo就是有生产者和消费者之说,也就是服务提供者和服务消费者接口。对于消息队列也是如此。但是比如Dubbo来说,服务消费者接口是通过注册中心去调用服务的生产者的。而不是直接根据地址调用。为什么这么做呢?以程序设计的角度来看,就是解耦。如下图所示:
约定:消费者从共享资源中获取数据,生产者负责产生数据到共享资源当中。一旦共享资源中数据为空,那么消费者停止消费。同理,如果共享资源中数据已满,那么生产者停止生产。比如说:张三负责蒸馒头,李四负责吃馒头。张三把蒸好的馒头放在桌子上。李四从桌子上获取馒头开吃。
生产者和消费者实现原始实现(线程不安全)
下面的代码为共享资源:
/**
* @author gosaint
* @Description:
* @Date Created in 21:00 2019/1/14
* @Modified By:公共资源中存储数据
*/
public class Share {
private static final Logger logger = LoggerFactory.getLogger(Share.class);
private String username;
private String gender;
/**
* 存储数据
* @param username
* @param gender
*/
public void storage(String username,String gender){
this.username=username;
this.gender=gender;
}
public void fetch(){
logger.info(this.username+":"+this.gender);
}
}
Producter
/**
* @author gosaint
* @Description:
* @Date Created in 20:59 2019/1/14
* @Modified By:
*/
public class Producter implements Runnable{
private Share share;
public Producter(final Share share) {
this.share = share;
}
@Override
public void run() {
for(int i=0;i<50;i++){
if(i % 2==0){
share.storage("李白","男");
}else {
share.storage("李清照","女");
}
}
}
}
Consumer
/**
* @Authgor: gosaint
* @Description:
* @Date Created in 21:09 2019/1/14
* @Modified By:
*/
public class Consumer implements Runnable{
private Share share;
public Consumer(final Share share) {
this.share = share;
}
@Override
public void run() {
for(int i=0;i<50;i++){
share.fetch();
}
}
}
测试:
public static void main(String[] args) {
//启动生产者线程、消费者
Share resource=new Share();
new Thread(new Producter(resource)).start();
new Thread(new Consumer(resource)).start();
}
出现了清一色的数据,为什么会出现这种状况呢。可以这样分析:假设i=4,在共享资源中准备存储数据时,此时另外一个线程也进来了。修改了i=4存储变量的值,导致最终存储的值为清一色。为了更好的观察效果,我们在为变量赋值的过程中,让线程休眠一段时间,修改Share中的storage()、fetch()方法:
public void storage(String username,String gender){
this.username=username;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.gender=gender;
}
public void fetch(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(this.username+":"+this.gender);
}
问题出现了:性别紊乱!
为什么会出现如此的情况呢?假设生产者1生产了李白,男。但是消费者还没有来得及消费。生产者又生产了李清照。还没有来的及存储性别的时候,此时消费者开始消费:取到了李清照,男的情况。上述的示例演示了线程的不安全。很简单,当多线程共享变量时就会出现这样的问题!
生产者和消费者实现原始实现(synchrnoized)
面对此种情况,我们使用synchrnoized即可解决。
public synchronized void storage(String username,String gender){
this.username=username;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.gender=gender;
}
public synchronized void fetch(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(this.username+":"+this.gender);
}
上述的测试结果实在是不令人满意。应该来说是交替出现的。可是Synchrnoized在此种情况下却只保证了数据的完整性。没有保证数据的逻辑性。也就是说我只要往桌子上放好馒头就行,不管你是否消费。写道这里,我想起一道面试题:面试官会问什么时候使用Synchrnoized,什么时候使用lock。哈哈。我引出下文啦。当然这不是绝对的。因为下面的例子我并没有使用lock.
public class Share {
private final Logger logger = LoggerFactory.getLogger(Share.class);
private String username;
private String gender;
private boolean isEmpty;
/**
* 存储数据
* @param username
* @param gender
*/
public synchronized void storage(String username,String gender){
while(!isEmpty){
//共享资源非空,说明消费者没有消费,此时就要等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//此时说明共享资源为空,开始生产
this.username=username;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.gender=gender;
//生产结束,修改容器状态
isEmpty=false;
//唤醒消费者消费
this.notify();
}
public synchronized void fetch(){
try {
while(isEmpty){
//为空的状态.等待
this.wait();
}
//否则开始消费
Thread.sleep(10);
logger.info(this.username+":"+this.gender);
//消费结束,修改容器状态
isEmpty=true;
//唤醒生产者
this.notify();
}catch (InterruptedException e){
}
}
}