生产者与消费者(读书笔记)
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Auther: zch
* @Date: 2019/1/8 17:02
* @Description:
*/
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingDeque<PCData>blockingDeque;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingDeque<PCData> blockingDeque) {
this.blockingDeque = blockingDeque;
}
@Override
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id=" + Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData((count.incrementAndGet()));
System.out.println(data+" is put into queue");
if (!blockingDeque.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("failed to put data:"+data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
import com.sun.xml.internal.bind.v2.runtime.output.Pcdata;
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingDeque;
/**
* @Auther: zch
* @Date: 2019/1/8 17:11
* @Description:
*/
public class Consumer implements Runnable {
private BlockingDeque<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingDeque<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start consuner id=" + Thread.currentThread().getId());
Random r = new Random();
try {
while (true) {
PCData data = queue.take();
if (null != data) {
int re = data.getIntData() * data.getIntData();
System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(), data.getIntData(), re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
/**
* @Auther: zch
* @Date: 2019/1/8 17:04
* @Description:
*/
public final class PCData {
private final int intData;
public PCData(int intData) {
this.intData = intData;
}
public PCData(String intData) {
this.intData = Integer.parseInt(intData);
}
public int getIntData() {
return intData;
}
@Override
public String toString() {
return "PCData{" +
"intData=" + intData +
'}';
}
}
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @Auther: zch
* @Date: 2019/1/8 17:17
* @Description:
*/
public class ProducerAndConsumer {
public static void main(String[] args) throws InterruptedException {
BlockingDeque<PCData> queue = new LinkedBlockingDeque<>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
// 由于消费者是自循环且无法退出,所以认为任务没完成,无法停止
service.shutdown();
}
}