Java.util.concurrent怎么用
这篇文章主要为大家展示了“Java.util.concurrent怎么用”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Java.util.concurrent怎么用”这篇文章吧。
Java8 在线API https://blog.fondme.cn/apidoc/jdk-1.8-google/
package com.shi.juc; import java.util.concurrent.atomic.AtomicInteger; /** * * @author shiye * * 二.原子变量: jdk1.5之后,java.util.concurrent.atomic 包下提供了常用的原子变量 * 1. volatile 保证了可见性 * 2. CAS (Compare - and -swap) 算法保证数据的原子性 * CAS 算法是硬件对于并发操作共享数据的支持 * CAS 包含三个操作数: * 内存值 V * 预估值(旧值)A * 更新值 B * 当且仅当V==A 时 ,把B的值赋值给V ,否则,不做任何操作 */ public class AtomacTest { public static void main(String[] args) { AtomicThread thread = new AtomicThread(); for (int i = 0; i < 10; i++) { new Thread(thread).start(); } } } class AtomicThread implements Runnable{ public AtomicInteger auAtomicInteger = new AtomicInteger(); public int add() { return auAtomicInteger.getAndIncrement(); } @Override public void run() { System.out.println(add()); } }
package com.shi.juc; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; /** * CopyOnWriteArrayList/CopyOnWriteArraySet (写入并复制) * 注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常大。 * 并发迭代读时可以选择使用这个,提高效率 * @author shiye * */ public class CopyOnWriteArrayListTest { public static void main(String[] args) { HelloEntity entity = new HelloEntity(); for (int i = 0; i < 10; i++) { new Thread(()-> { entity.forEachList(); },String.valueOf(i)).start(); } } } class HelloEntity{ private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList(); static { list.add("aaa"); list.add("bbb"); list.add("ccc"); } public void forEachList() { Iterator<String> iterator = list.iterator(); while(iterator.hasNext()) { System.out.println(Thread.currentThread().getName()+"线程"+iterator.next()); list.add("DDD");//再读取的时候添加数据 } } }
package com.shi.juc; import java.util.concurrent.CountDownLatch; /** * CountDownLatch : 闭锁 * @author shiye * */ public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException { //闭锁 final CountDownLatch latch = new CountDownLatch(10); //开始时间 long start = System.currentTimeMillis(); /** * 启动10个线程 每个线程 循环答应1000次偶数,计算总的耗时时间 */ for (int i = 0; i < 10; i++) { new Thread( ()->{ synchronized (latch) { for (int j = 0; j <1000; j++) { if(j%10 == 0) { System.out.println(j); } } latch.countDown(); } },String.valueOf(i)).start(); } latch.await(); //结束时间 long end = System.currentTimeMillis(); System.out.println("总耗时为:"+(end - start)); } }
package com.shi.juc; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 允许一组线程全部等待彼此达到共同屏障点的同步辅助。 * 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 * 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。 * @author shiye * *结果: 3 集到龙珠... 0 集到龙珠... 1 集到龙珠... 4 集到龙珠... 2 集到龙珠... 5 集到龙珠... 6 集到龙珠... ********7科线程集齐,召唤神龙...... 7 集到龙珠... 8 集到龙珠... 9 集到龙珠... */ public class TestCyclicBarriar { public static void main(String[] args) { //必须集满7个线程才能够执行 CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{ System.out.println("********7科线程集齐,召唤神龙......"); }); for (int i = 0; i < 10; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t 集到龙珠..."); try { cyclicBarrier.await();//它必须放最下面 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
package com.shi.juc; import java.util.concurrent.Semaphore; /** * 模拟 6部车抢占3个车位 * @author shiye * *运行结果: Thread - 2 抢占到车位了 , 暂停3秒 Thread - 0 抢占到车位了 , 暂停3秒 Thread - 1 抢占到车位了 , 暂停3秒 Thread - 1 离开车位.... Thread - 2 离开车位.... Thread - 0 离开车位.... Thread - 3 抢占到车位了 , 暂停3秒 Thread - 5 抢占到车位了 , 暂停3秒 Thread - 4 抢占到车位了 , 暂停3秒 Thread - 4 离开车位.... Thread - 5 离开车位.... Thread - 3 离开车位.... */ public class TestSemaphore { public static void main(String[] args) { //模拟3个停车位 false:非公平 Semaphore semaphore = new Semaphore(3, false); for (int i = 0; i < 6; i++) { new Thread(()->{ try { semaphore.acquire();//抢占车位 (抢占线程) System.out.println(Thread.currentThread().getName() + " 抢占到车位了 , 暂停3秒" ); Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " 离开车位...."); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release();//释放车位(释放线程) } },"Thread - "+i).start(); } } }
package com.shi.juc; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * 一. 创建线程的方式三:实现Callalbe接口.相较于Runable接口方式,方法又返回值,并且可以抛出异常. * 二. 可以当成闭锁来使用 * @author shiye * */ public class TestCallable { public static void main(String[] args) throws InterruptedException, ExecutionException { CallableDemo td = new CallableDemo(); //执行Callable方式,需要FutureTask 实现类的支持,用于接收运算结果 FutureTask<Integer> result = new FutureTask<>(td); new Thread(result).start(); //接收执行后的结果 System.out.println("---------当前线程开始了---------"); Integer sum = result.get(); //获取当前的值时 会导致当前线程一下的全部暂停执行,直到获取该值(慎用) System.out.println(" 和 : "+sum); System.out.println("---------当前线程终止了---------"); } } /** * * @author shiye * 创建线程并且提供返回值 */ class CallableDemo implements Callable<Integer>{ @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i < Integer.MAX_VALUE; i++) { sum+=i; } Thread.sleep(10000); return sum; } } /** * 实现Runable接口的方式实现的结果 class RunableThread implements Runnable{ @Override public void run() { } } */
package com.shi.juc; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 用于解决线程同步的问题 * Synchronized:隐士锁 * 1. 同步代码块 * 2. 同步方法 * * JDK 1.5 以后 * 3.同步锁 : Lock * 注意: 是一个显示锁,需要通过Lock()方法上锁, 必须通过 unlock() 解锁(放在finnal中最好) * * @author shiye * */ public class TestLock { static int toket = 100; public static void main(String[] args) { Lock lock = new ReentrantLock(); /** * 创建10个线程去卖票 */ for (int i = 0; i < 10; i++) { new Thread(()->{ while(toket>0) { lock.lock();//加锁 try { if(toket>0) { Thread.sleep(200); System.out.println(Thread.currentThread().getName()+"号线程正在卖票,剩余" + (--toket)); } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock();//解锁 } } },String.valueOf(i)).start(); } } }
package com.shi.juc; /** * 使用隐士锁实现的synchronized * 生产者消费者问题: * 保证生产者生产的货能即使被消费掉(产品不存在剩余) * @author shiye * */ public class TestProductAndConsoumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor pro = new Productor(clerk); Consumer con = new Consumer(clerk); new Thread(pro,"生产者A").start(); new Thread(con,"消费者B").start(); new Thread(pro,"生产者C").start(); new Thread(con,"消费者D").start(); } } //售货员 class Clerk{ private int product = 0; //进货 public synchronized void get() throws InterruptedException { while(product>=1) { System.out.println(Thread.currentThread().getName() + " :产品已满!"); this.wait();//wait()方法必须放到循环中才行,避免虚假唤醒的问题 } System.out.println(Thread.currentThread().getName() +" 进货: " + (++product)); this.notifyAll(); } //卖货 public synchronized void sale() throws InterruptedException { while(product<=0) { System.out.println(Thread.currentThread().getName() + " :产品已经售罄!"); this.wait(); } System.out.println(Thread.currentThread().getName()+" 卖货: " +(--product)); this.notifyAll(); } } //生产者 class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { Thread.sleep(200); clerk.get(); } catch (InterruptedException e) { e.printStackTrace(); } } } } //消费者 class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { clerk.sale(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package com.shi.juc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 使用显示锁解决:lock * 生产者消费者问题: * 保证生产者生产的货能即使被消费掉(产品不存在剩余) * @author shiye * */ public class TestProductAndConsoumerforLock { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor pro = new Productor(clerk); Consumer con = new Consumer(clerk); new Thread(pro,"生产者A").start(); new Thread(con,"消费者B").start(); new Thread(pro,"生产者C").start(); new Thread(con,"消费者D").start(); } } //售货员 class Clerk{ private int product = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); //进货 public void get() throws InterruptedException { try { lock.lock();//加锁 while(product>=1) {//循环是为了,避免虚假唤醒的问题 System.out.println(Thread.currentThread().getName() + " :产品已满!"); condition.await();//线程等待 } System.out.println(Thread.currentThread().getName() +" 进货: " + (++product)); condition.signalAll();//线程唤醒 } finally { lock.unlock();//解锁 } } //卖货 public void sale() throws InterruptedException { try { lock.lock();//加锁 while(product<=0) {//循环是为了,避免虚假唤醒的问题 System.out.println(Thread.currentThread().getName() + " :产品已经售罄!"); condition.await();//线程等待 } System.out.println(Thread.currentThread().getName()+" 卖货: " +(--product)); condition.signalAll();//线程唤醒 } finally { lock.unlock();//解锁 } } } //生产者 class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { Thread.sleep(200); clerk.get(); } catch (InterruptedException e) { e.printStackTrace(); } } } } //消费者 class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { clerk.sale(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
package com.shi.juc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 开启6个线程,依次打印ABCABC... 循环打印 * @author shiye * *结果: A-1 : 打印 0 遍 B-1 : 打印 0 遍 C-2 : 打印 0 遍 A-2 : 打印 0 遍 B-2 : 打印 0 遍 C-1 : 打印 0 遍 A-1 : 打印 0 遍 B-1 : 打印 0 遍 C-2 : 打印 0 遍 A-2 : 打印 0 遍 B-2 : 打印 0 遍 C-1 : 打印 0 遍 */ public class TestABCAlert { public static void main(String[] args) { AlternateDemo demo = new AlternateDemo(); System.out.println("-----------第一轮线程-------------"); //线程A new Thread( ()->{ while(true) { demo.printA(); } },"A-1").start(); //线程B new Thread(()->{ while(true) { demo.printB(); } },"B-1").start(); //线程C new Thread(()-> { while (true) { demo.printC(); } },"C-1").start(); System.out.println("-----------第二轮线程-------------"); //线程A new Thread( ()->{ while(true) { demo.printA(); } },"A-2").start(); //线程B new Thread(()->{ while(true) { demo.printB(); } },"B-2").start(); //线程C new Thread(()-> { while (true) { demo.printC(); } },"C-2").start(); } } class AlternateDemo{ private int number = 1;//当前线程执行线程的标记 private Lock lock = new ReentrantLock();//显示锁 private Condition condition1 = lock.newCondition();//线程之间的通讯 private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); //打印A public void printA() { lock.lock();//加锁 try { while(number != 1) { //一定要用while 不能要if 因为:存在线程线程虚假唤醒,线程抢占的问题 condition1.await();//线程 A 等待 } for (int i = 0; i < 1; i++) { System.out.println(Thread.currentThread().getName() + " : 打印 "+ i + " 遍 "); } number = 2; condition2.signal();//唤醒2线程 } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock();//解锁 } } //打印B public void printB() { lock.lock();//上锁 try { while(number != 2) {//一定要用while 不能要if 因为:存在线程线程虚假唤醒,线程抢占的问题 condition2.await(); } for (int i = 0; i < 1; i++) { System.out.println(Thread.currentThread().getName() + " : 打印 "+ i + " 遍 "); } number = 3; condition3.signal();//唤醒3线程 } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock();//解锁 } } //打印C public void printC() { lock.lock();//上锁 try { while(number != 3) {//一定要用while 不能要if 因为:存在线程线程虚假唤醒,线程抢占的问题 condition3.await(); } for (int i = 0; i < 1; i++) { System.out.println(Thread.currentThread().getName() + " : 打印 "+ i + " 遍 "); } number = 1; condition1.signal();//唤醒1线程 } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock();//解锁 } } }
package com.shi.juc; import java.time.LocalTime; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 读写锁 * 写写/读写 需要互斥 * 读读 不需要互斥 * @author shiye * * 结果: 完全证明上面的理论成功 read-0 正在读 0 当前时间:14:47:47.534 read-1 正在读 0 当前时间:14:47:47.534 write-0 写过之后的值为: 111 当前时间:14:47:52.535 write-1 写过之后的值为: 111 当前时间:14:47:57.536 write-2 写过之后的值为: 111 当前时间:14:48:02.536 read-2 正在读 111 当前时间:14:48:07.537 read-3 正在读 111 当前时间:14:48:07.537 write-3 写过之后的值为: 111 当前时间:14:48:12.537 write-5 写过之后的值为: 111 当前时间:14:48:17.537 * */ public class TestReadAndWrite { public static void main(String[] args) { ReadAndWrite item = new ReadAndWrite(); //启动100个读写线程操作数据 for (int i = 0; i < 10; i++) { //读线程 new Thread(()->{ item.read(); },"read-" + i ).start(); //写线程 new Thread(()->{ item.write(); },"write-" + i ).start(); } } } class ReadAndWrite{ private int number = 0; private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();//读写锁 //读 public void read() { readWriteLock.readLock().lock();//上读锁 try { Thread.sleep(5000);//睡眠5秒钟 System.out.println(Thread.currentThread().getName() + " 正在读 " + number + " 当前时间:"+ LocalTime.now()); } catch (InterruptedException e) { e.printStackTrace(); }finally { readWriteLock.readLock().unlock();//释放读锁 } } //写 public void write() { readWriteLock.writeLock().lock(); try { number = 111; Thread.sleep(5000);//写需要花费5s钟时间 System.out.println(Thread.currentThread().getName() + " 写过之后的值为: " + number+ " 当前时间:"+ LocalTime.now()); } catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.writeLock().unlock(); } } }
package com.shi.juc; /** * 线程8锁 * @author shiye * * 1. 俩个普通同步方法,俩个线程,标准打印,打印?// one two * 2. 新增Thread.sleep() 给 getOne() , 打印?// one two * 3. 新增普通getThree(), 打印? // Three one two * 4. 俩个普通同步方法, 俩个Number对象 打印?// two one * 5. 修改getOne() 为静态同步方法, 一个Number对象?//two one * 6. 修改俩个方法均为 静态同步方法,一个Number对像?//one two * 7. 一个静态同步方法,一个非静态同步方法,俩个Number?//two one * 8. 俩个静态同步方法,俩个Number对象 ? //one two * * 线程八锁的关键: * 一. 非静态方法锁的默认为this,静态方法的锁为对应的Class实力 * 二. 某一个时刻内,只能有一个线程持有锁,无论几个方法。 * */ public class TestThread8Lock { public static void main(String[] args) { Number number = new Number(); Number number2 = new Number(); //线程1 new Thread(()->{ number.getOne(); }).start(); //线程2 new Thread(()->{ // number.getTwo(); number2.getTwo(); }).start(); //线程3 // new Thread(()->{ // number.getThree(); // }).start(); } } class Number{ public static synchronized void getOne() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("one"); } public static synchronized void getTwo() { System.out.println("two"); } // public void getThree() { // System.out.println("Three"); // } }
package com.shi.juc; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。 * * 二、线程池的体系结构: * java.util.concurrent.Executor : 负责线程的使用与调度的根接口 * |--**ExecutorService 子接口: 线程池的主要接口 * |--ThreadPoolExecutor 线程池的实现类 * |--ScheduledExecutorService 子接口:负责线程的调度 * |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService * * 三、工具类 : Executors * ExecutorService newFixedThreadPool() : 创建固定大小的线程池 * ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。 * ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程 * * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。 * @author shiye * */ public class TestThreadPool { public static void main(String[] args) throws InterruptedException, ExecutionException { Number1 number1 = new Number1(); //1 创建长度5个线程的线程池 ExecutorService pool = Executors.newFixedThreadPool(5); //2 创建10个线程 执行线程 //结果: 每个线程都要按顺序 一个一个执行,而且必须要一个线程把值返回了才执行下一个线程(闭锁) for (int i = 0; i < 10; i++) { Future<Integer> future = pool.submit(()->{ int sum = number1.sum(); return sum; }); Integer sum = future.get(); System.out.println(Thread.currentThread().getName()+ " 线程 执行的结果为: " + sum); } //3 创建10个线程 执行线程 //结果 ,每个线程分开操作不需要过多的等待, for (int i = 0; i < 10; i++) { pool.submit(()->{ number1.sum(); }); } pool.shutdown();//一定要关闭线程池 } } /** * 计算 1—100 的和 ,每次计算睡眠1s * @author shiye * */ class Number1{ public int sum() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } int sum = 0; for (int i = 0; i < 101; i++) { sum +=i; } System.out.println(Thread.currentThread().getName()+ " 线程 执行的结果为: " + sum); return sum; } }
package com.shi.juc; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。 * @author shiye * */ public class TestScheduledThreadPool { public static void main(String[] args) throws InterruptedException, ExecutionException { //1 创建一个带任务调度的线程池 ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 10; i++) { //启动一个任务调度 ScheduledFuture<?> future = pool.schedule(()->{ int num = new Random().nextInt(100); System.out.println(Thread.currentThread().getName() + " 线程 产生的随机数为: " + num); return num; },3,TimeUnit.SECONDS);// 延迟3s创建一个线程 System.out.println(future.get()); } pool.shutdown();//关闭线程池 } }
以上是“Java.util.concurrent怎么用”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!