Java知识梳理之多线程与并行程序设计(九)
部分代码的Github地址为:https://github.com/hzka/JavaBook02/tree/master/chap30
(一)基础概念
1.线程的概念:
Java最重要的特性就是内部支持多线程,即一个程序中允许同时执行多个任务。线程是指一个任务从头到尾的执行流程。可以在程序中创建附加的线程以执行并发任务,在Java接口中,每个任务都是Runnable接口的一个实力,称之为可运行对象。线程本质上讲就是便于任务执行的对象。
2.创建任务和线程:
步骤1:为了创建任务,必须为任务一个实现Runnable接口的类TaskClass。其只包含一个run方法,需要告诉系统线程将如何运行;
步骤2:一旦定义一个TaskClass,就可以使用构造方法创建一个新任务;
步骤3:任务必须在线程中执行。Thread类包括创建线程的构造方法以及线程控制方法。eg:
Thread thread = new Thread(task);
步骤4:调用start方法来告诉Java虚拟机该线程准备运行。
3.Thread类:
3.1Thread类是什么?
Thread类包含了为任务而创建的线程的构造方法,以及控制线程的方法,其UML如下所示。
3.2注意要点
1.由于Thread实现了Runnable,可定义Thread扩展类。并实现run方法。创建类对象后调用start方法来启动线程。
2.可以使用yield来为其他进程临时让出CPU时间。使用Threa.yield()方法。
3.使用Thread.sleep(long mills)来将该进程休眠以确保其他进程的执行。
4.也可以使用join方法是一个线程等待另一个线程的结束。
5.还可以为每一个线程制定一个优先级。setPriority提高或者降低线程优先级;getPriority获取线程的优先级。优先级在1-10之间。Thread中有int常量MIN_PRIORITY和NORM_PRIMARITY以及MAX_PRIORITY代表1、5和10。若高优先级总不退出,可以定时调用sleep或者yield方法,来给低优先级或者相同优先级的线程一个运行的机会。
4.线程池
4.1目前方案存在问题及解决方案:
实战一中的方法对单一任务执行很方便,但由于必须为每一个任务创建一个线程,因此对大量任务而言是不够高效的。为每一个任务开启一个新线程会限制吞吐量,造成性能降低。线程池是管理并发执行任务个数的理想方法。(如果一个任务用thread就好,多任务创建线程时,最好使用线程池。)
4.2线程池的相关API:
newFixedThreadPool:创建一个可以并行运行指定数目线程的线程池,一个线程在当前任务已经完成的情况下可以重用,来执行另外一个任务;newCachedThreadPool:创建一个线程池,它会在必要时候创建新线程,若之前创建线程可用,则会重用之前创建的线程。若60s内都为使用,则终止该线程。
4.3代码:
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new PrintChat('a', 10000));
executorService.execute(new PrintChat('b', 10000));
executorService.execute(new PrintNum(1000));
executorService.shutdown();
//Shuntdown()通知执行器关闭,不接受新任务,但现有任务继续执行至完成。
5.线程同步:
5.1.何为线程同步?
线程同步用于协调相互依赖的进程的执行。若共享资源被同时访问,则会遭到破坏。
5.2示例
问题描述:创建启动100线程,每个线程往同一账户添加一便士,Accout类为模拟账户,AddAPennyTask为往账户添加一便士以及用于创建和启动线程的主类。类关系如下:
代码:(P3004项目,验证了当所有线程同时访问同一数据源时,会出现数据破坏的问题。)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static Account account = new Account();
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
//创建并加载100个线程
for (int i = 0; i < 100; i++) {
executorService.execute(new AddAPennyTask());
}
//直到所有的任务执行结束。
executorService.shutdown();
//重复检验所有任务是否完成。
while (!executorService.isTerminated()) {
}
//显示余额
System.out.println("the balance is:" + account.getBalance());
}
private static class Account {
private int balance = 0;
public int getBalance() {
return balance;
}
public void deposit(int i) {
int newBalance = balance + i;
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
balance = newBalance;
}
}
private static class AddAPennyTask implements Runnable {
@Override
public void run() {
//执行加一操作。
account.deposit(1);
}
}
}
为什么会出现数据破坏?
同时获取同样余额数目,并写入新余额,这样Task 1就啥也没做。任务2与任务1以冲突方式访问一个公共资源。称之为竞争状态,多线程类中没有竞争状态,称之为线程安全,譬如:Accountk类不是线程安全的。
5.3 synchronized关键字
为避免进入竞争状态,防止多线程进入程序某一特定部分,这一部分我们称之为临界区。(上述代码中我们的整个deposit方法是临界区)我们采用synchronized关键字来同步方法,以便一次只能有一个线程访问该方法。
(1)加入关键字synchronized,使Account类成为线程安全的。eg: public synchronized void deposit(int i);
(2)同步方法执行之前需要加锁,锁是一种实现资源排他使用的机制,对于实例方法,给调用方法对象加锁;对于静态方法,给整个类加锁。执行后解锁。如下图所示:后面详细讲。
5.4 同步语句
同步对象不仅可以对this对象加锁,也可以对任意对象加锁。这个代码块成为同步块,同步语句允许设置同步方法中的部分代码,而不是刚才的整个方法。这样大大增加程序的并发能力。
synchronized (account) {
account.deposit(1);
}
6.利用加锁同步
6.1什么是加锁同步?
同步的实例方法在每次执行前都要隐式加上synchronized关键字以隐式的加锁。但Java也可以显式的加锁,给协调进程带来了更多的控制功能。一个锁是一个Lock接口实例,定义了加锁和释放锁的方法。也可以使用newCondition方法来创建任意个数的Condition对象,用于线程通信。RaentranrLock是Lock的一个具体实现,创建互相排斥的锁。也可以创建具有特定的公平策略的锁,若为true,则确保等待时间最长的线程首先获得锁。若为false,则给任意一个在等待的线程。
6.2代码实现:(P3005项目)
private static Lock lock = new ReentrantLock();//创建一个锁
lock.lock();//加锁
lock.unlock;//释放锁
使用synchronized方法或语句比使用互相排斥的显式锁更简单些,然而,使用显式锁对同步具有状态的线程更加直观和灵活。
7.线程间协作
7.1什么是线程间协作?
临界区多线程相互排斥完全可以避免竞争条件的发生,但有时仍需要线程之间的协作与通信,譬如:通过调用Lock对象的newCondition()方法来创建对象,一旦创建,则可以使用await()(让当前线程进入等待,直到条件发生)、signal()(唤醒一个等待的线程)以及sinalAll()(唤醒所有等待线程)来实现线程之间的相互通信。
7.2示例及其代码:
问题描述:假设创建并启动两个任务,一个存款,一个提款,当提款数额大于当前账户余额时,提款线程必须等待。不管什么时候,只要账户新存入一笔资金,存储线程必须通知提款线程重新尝试。如果余额仍未达到提款数额,提款线程继续等待。使用一个具有条件的锁newDeposit(即增加到账户的新存款)。如果余额小于取款额度,则等待,当增加现金,则唤醒重新尝试。
示例代码:(P3006项目)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static Account account = new Account();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new DepositTask());//存款进程
executorService.execute(new WithdrawTask());//取款进程
executorService.shutdown();
// System.out.println("Thread 1\t\tThread 2\t\tBalance");
}
private static class DepositTask implements Runnable {//存款任务
@Override
public void run() {
try {
while (true) {
account.deposit((int) (Math.random() * 10) + 1);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class WithdrawTask implements Runnable {//取款任务
@Override
public void run() {
while (true) {
//每次存储存1块-10块之间的随机数。
account.withDraw((int) (Math.random() * 10) + 1);
}
}
}
private static class Account {
//创建一个新锁
private static Lock lock = new ReentrantLock();
//创建一个Condition环境
private static Condition newDeposit = lock.newCondition();
//余额
private int balance = 0;
public int getBalance() {
return balance;
}
public void withDraw(int amount) {//取款进程,参数取款数目
lock.lock();
try {
while (balance < amount) {
System.out.println("Wait for a deposit");
newDeposit.await();//引起线程等待,直到发出条件信号
}
//减去取款数目,获得余额
balance -= amount;
System.out.println("取款:" + amount + "\t\t余额:" + getBalance());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//释放锁
}
}
public void deposit(int amount) {//存款方法,参数为存款数目
lock.lock();//获取锁
try {
balance += amount;
System.out.println("存款:" + amount + "\t\t余额:" + getBalance());
//唤醒所有等待线程
newDeposit.signalAll();
} finally {
lock.unlock();
}
}
}
}
7.3.synchronized关键字以及相应的唤醒、等待线程方法。
8.阻塞队列
8.1相关定义
阻塞队列在试图向一个满队列添加元素或者从空队列删除元素时会导致线程阻塞,BlockingQueue继承了Queue,并提供put和take方法来向队尾增加元素,在队头删除元素。有三种,分别是用数组、链表以及优先队列实现阻塞队列。 ArrayBlockingQueue必须指定容量或者可选的公平性策略。
8.2代码示例(P3008项目)
问题简述:使用ArrayBlockingQueue来简化生产者消费者(P3007项目)中的例子。使用ArrayBlockingQueue来存储整数,生产者将整数放入队列;消费者从队列中取走一个整数。果真很强大,ArrayBlockingQueue实现了刚才的Buffer内部类,因为同步已经在ArrayBlockingQueue中实现,所以无需锁和条件。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new ProduceTask());
executorService.execute(new ConsumerTask());
executorService.shutdown();
}
private static class ProduceTask implements Runnable {
@Override
public void run() {
int i = 1;
try {
while (true) {
System.out.println("Produce writes" + i);
buffer.put(i++);
// buffer.write(i++);
try {
Thread.sleep((int) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class ConsumerTask implements Runnable {
@Override
public void run() {
try {
while (true) {
System.out.println("Consumer reads:" + buffer.take());
try {
Thread.sleep((int) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
9.信号量
9.1定义
可以使用信号量来限制访问同一个共享资源的线程数。信号量指对共同资源进行访问控制的对象。访问资源前,从信号量获取许可,访问资源后,线程必须将许可返回信号量。acquire获得许可,信号量可用许可总数减一,一旦许可被释放,信号量可用许可加一。
9.2代码示例(P3009项目)
问题描述:如上所说,信号量可以用来模拟互相排斥的锁,确保同一时间只有一个线程可以访问P3004的deposit方法。
private static class Account {
private int balance = 0;
private Semaphore semaphore = new Semaphore(1);
public int getBalance() {
return balance;
}
//public synchronized void deposit(int i) {
public void deposit(int i) {
try {
semaphore.acquire();//获得信号量许可
int newBalance = balance + i;
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
balance = newBalance;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//释放信号量许可
}
}
}
9.3锁和信号量之间的相似之处和不同之处(需要思考的一个问题)
此段转自:http://www.php.cn/java-article-407669.html
1.作用域:
信号量:进程间或者线程间;互斥锁:线程间
2.上锁时的执行:
信号量:如果信号量的value > 0;那么其他的线程可以执行任务。并且成功执行后,value--;如果value=0,那么线程sem_wait 使得线程阻塞,直到sem_post被释放后value++,其他的线程才可以根据value执行。
互斥锁:只有一个对象被上了锁(加锁),那么其他任何线程都不可访问被加锁的对象。
3.一个是同步一个是互斥
信号量:用在多线程多任务的同步的,一个线程完成了某个动作就通过信号量告诉别的线程,别的线程在进行某些动作。
互斥锁:用在多线程多任务互斥的。一个线程占用了某个资源后,那么别的线程就无法访问,直到这个线程unlock,其他线程才可以利用。
也就是说,信号量不一定是锁定某一个资源,而是流程上的概念,比如:有A,B两个线程,B线程要等A线程完成某一任务以后再进行自己下面的步骤,这个任务并不一定是锁定某一资源,还可以是进行一些计算或者数据处理之类。而线程互斥量则是“锁住某一资源”的概念,在锁定期间内,其他线程无法对被保护的数据进行操作。在有些情况下两者可以互换。
10.避免死锁
线程1获取object1的锁,线程2获取object2的锁。而线程1等待object2上的锁,线程2等待object1的锁,互相等待对方线程释放所需要的锁,引发死锁。
解决方法:资源排序技术。
11.线程状态
线程可以是以下5种状态之一:新建、就绪、运行、阻塞或者结束。新建一线程,进入新建态;调用线程start方法,进入就绪态,可能未开始,系统必须为其分配CPU时间;线程开始运行(进入run方法),进入运行态。若给定CPU用完,或者使用yield方法,运行转至就绪。进入阻塞态可能是调用了join、sleep或者wait方法。Run完成,就结束态了。
12.同步合集
12.1相关定义
合集包括线性表、集合和映射表,它们线程不安全,Collection类同日拱了6个静态方法将其由合集转换为同步版本(同步包装类)。同步合集可以并发被多个线程删除或者更改。
12.2代码示例
创建同步合集对象,在遍历时获取对象上的锁,假设在这里遍历一个集合:
Set hasSet = Collections.synchronizedSet(new HashSet<>());
synchronized (hasSet){
Iterator iterator = hasSet.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
}
13.并行编程
采用fork/join框架来并行的分而治之。
(二)实战
1.创建三个任务以及三个运行这些任务的线程:第一个任务是打印字母a100次;第二个是任务是打印字母b100次,第三个任务是打印1-100的整数。共享CPU,轮流打印字母和数字。(P3001项目)
思路:使用线程来实现即可,任务中的run方法指明了如何完成这个任务,Java虚拟机会自动调用该方法,无需特意调用。直接调用run方法只是在同一个线程中执行该方法,没有新线程被启动。
代码:
public class Main {
public static void main(String[] args) {
//创建任务
Runnable printA = new PrintChat('a', 10000);
Runnable printB = new PrintChat('b', 10000);
Runnable print100 = new PrintNum(100);
//创建线程
Thread thread_01 = new Thread(printA);
Thread thread_02 = new Thread(printB);
Thread thread_03 = new Thread(print100);
//开启线程,说白了就是不清楚输出的顺序了。
thread_01.start();
thread_02.start();
thread_03.start();
}
public static class PrintChat implements Runnable {
private char charToPrint;
private int time;
public PrintChat(char a, int i) {
this.charToPrint = a;
this.time = i;
}
@Override
public void run() {
for (int i = 0; i < time; i++) {
System.out.print(charToPrint);
}
}
}
public static class PrintNum implements Runnable {
private int lastNum;
public PrintNum(int i) {
this.lastNum = i;
}
@Override
public void run() {
for (int i = 1; i <= lastNum; i++) {
System.out.print(" "+i);
}
}
}
}
2.闪烁的文本:使用线程来控制动画。如何在一个标签上显示闪烁的文本。如下所示。(P3002项目)
思路:(1)使用Thread.sleep来将该进程休眠。(2)闪烁的控制运行自单独线程,非应用程序线程代码不能更新应用线程中的GUI。(3)页面的布局,Scene布局。
代码:
import javafx.application.Application;
import javafx.application.Platform;
import javafx.scene.Scene;
import javafx.scene.control.Label;
import javafx.scene.layout.StackPane;
import javafx.stage.Stage;
public class FlashText extends Application {
private String text = "";
@Override
public void start(Stage primaryStage) throws Exception {
StackPane pane = new StackPane();
Label lalText = new Label("Programming is fun");
pane.getChildren().add(lalText);
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
//java 中的trim()方法,用于字符串的处理,功能是去除一段字符串前后的空格,
// 只保留中间的部分;例如“ hello world ” 将变为“hello world”
String s1 = lalText.getText().trim();
if (s1.length() == 0) {
text = "Welcome";
} else {
text = "";
}
//闪烁的控制运行自单独线程,非应用程序线程代码不能更新应用线程中的GUI。
//Platform.runLater(new Runnable())告诉系统在应用程序线程中创建一个Runnable对象。
Platform.runLater(new Runnable() {
@Override
public void run() {
//持续的修改标签。通过设置和取消文本来模拟闪烁的效果。
lalText.setText(text);
}
});
try {
//线程休息200ms。
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
Scene scene = new Scene(pane,200,50);
primaryStage.setTitle("FlashText");
primaryStage.setScene(scene);
primaryStage.show();
}
}
3.生产者/消费者问题
问题描述:缓冲区存储整数且大小受限,write()方法将int添加到缓冲区中,read()方法读取和删除一个Int值。为了同步这一操作,使用两个条件锁:notEmpty(缓冲区非空)以及notFull(缓冲区未满)。当任务向缓冲区添加int值,若缓冲区满,则等待notFull条件,当任务从缓冲区读取一个int时,若缓冲区是空,则等待notEmpty条件。缓冲区先进先出,其交互图如下所示:
代码示例:(P3007项目)
import java.nio.Buffer;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static Buffer buffer = new Buffer();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new ProduceTask());
executorService.execute(new ConsumerTask());
}
private static class ProduceTask implements Runnable {
@Override
public void run() {
int i = 1;
while (true){
System.out.println("Produce writes"+i);
buffer.write(i++);
try {
Thread.sleep((int)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class ConsumerTask implements Runnable {
@Override
public void run() {
while (true){
System.out.println("Consumer reads:"+buffer.read());
try {
Thread.sleep((int)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Buffer {
private static final int CAPACITY = 1;
private LinkedList<Integer> queue = new LinkedList<>();
private static Lock lock = new ReentrantLock();
//创建两个新锁及相应的Condition
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();
public void write(int value) {
lock.lock();//获取锁
//若满,则唤起等待进程,无法写。
try {
while (queue.size() == CAPACITY) {
System.out.println("缓冲区已满,无法添加,等待notFull信号");
notFull.await();
}
queue.offer(value);
notEmpty.signal();//增加一个元素,有得删除了。唤醒未满notEmpty。唤醒非空Condition
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//释放锁
}
}
public int read() {
int value = 0;
lock.lock();//获取锁
try {
while (queue.isEmpty()) {
System.out.println("缓冲区已空,无法删除,等待notEmpty信号");
notEmpty.await();
}
value = queue.remove();
notFull.signal();//删除一个元素,腾出一个空间,非空,有得写入了。唤醒未满notFull。
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//释放锁
return value;
}
}
}
}
有一个百思不得其解的问题,为什么同步互斥锁,执行每个任务时候都要Thread.sleep(milles),若果没有这句话的话会污染数据,有的话就会正常执行???
解析:调用Threed.sleep方法是为了让出执行的cpu,让各个就绪的线程进行cpu的抢夺,如果不调用Threed.sleep该方法,可能该线程会继续执行,但是又没所需要的资源,执行也是浪费cpu,还是得等其他的线程执行完成,该线程才可能获取到资源,正确执行程序。(问一个大佬的)
(三)总结