并发编程第12篇,FutureTask原理

FutureTask原理

使用FutureTask可以返回当前线程的结果。

可以让线程阻塞或者是唤醒

LockSupport与Wait/notify
 

FutureTask简单用法

public class Test001 implements Callable<String> {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        FutureTask<String> stringFutureTask = new FutureTask<>(new Test001());

        new Thread(stringFutureTask).start();

        String result = stringFutureTask.get();

        System.out.println(result);

    }

  

    @Override

    public String call() throws Exception {

        try {

            Thread.sleep(3000);

            System.out.println(Thread.currentThread().getName() + ",发送短信完成。");

        } catch (Exception e) {

  

        }

        return "发送短信完成";

    }

  

  

}

 

 

手写FurureTask 方式一

public interface MyCallable<V> {

    V call();

}

 

public class MayiktTask<V> implements Runnable {

    private MyCallable<V> myCallable;

    private V result;

    private Thread cuThread;

  

    public MayiktTask(MyCallable myCallable) {

        this.myCallable = myCallable;

    }

  

    @Override

    public void run() {

        result myCallable.call();

        LockSupport.unpark(cuThread);

    }

  

    public V get() {

        if (result != null) {

            return result;

        }

        cuThread = Thread.currentThread();

        LockSupport.park(cuThread);

        return result;

    }

}
 
MayiktTask<String> stringMayiktTask = new MayiktTask<>(new Test002());

  new Thread(stringMayiktTask).start();

String s = stringMayiktTask.get();

System.out.println(s);

 

手写FurureTask 方式二

public class MayiktTask<V> implements Runnable {

    private MyCallable<V> myCallable;

    private V result;

    private Thread cuThread;

  

    private Object lock = new Object();

  

    public MayiktTask(MyCallable myCallable) {

        this.myCallable = myCallable;

    }

  

    @Override

    public void run() {

        result = myCallable.call();

        synchronized (lock) {

            lock.notify();

        }

  //        LockSupport.unpark(cuThread);

    }

  

    public V get() {

        if (result != null) {

            return result;

        }

        cuThread = Thread.currentThread();

  //        LockSupport.park(cuThread);

        synchronized (lock) {

            try {

                lock.wait();

            } catch (Exception e) {

  

            }

        }

        return result;

    }

}

 

 

Fork Join 原理解读

并发编程发展

Java 1 支持thread,synchronized。

Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。

Java 7 加入了fork-join库。

Java 8 加入了 parallel streams。 并行流:

 

基本设计思想

Fork/Join是Java7提供的并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总小任务的结果得到大任务结果的框架

小任务可以继续不断拆分n多个小任务。

并发编程第12篇,FutureTask原理

 

基本伪代码实现

 

if(任务很小){

    直接计算得到结果

}else{

    分拆成N个子任务

    调用子任务的fork()进行计算

    调用子任务的join()合并计算结果

}

工作窃取机制

 

将一个比较大的任务,拆分成n多个不同的子任务,一直到不可以拆分为止。

例如:将一个大的任务,拆分成n多个子任务,每个任务中对应一个独立的队列。

由于每个线程处理的速度不一样,如果先执行完任务的队列的线程,窃取其他没有

执行完任务队列。

相关Api内容

1、RecursiveAction:用于没有返回结果的任务

2、RecursiveTask:用于有返回结果的任务

 

Compute()方法计算

Fork()方法 Fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中,拆分子任务。 

join()合并子任务 支持Join,即任务结果的合并

 

通过invoke方法提交的任务,调用线程直到任务执行完成才会返回,也就是说这是一个同步方法,且有返回结果;

通过execute方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且没有返回结果;

通过submit方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且有返回结果(返回Future实现类,可以通过get获取结果)。

 

Fork JOIN用法

使用Fork join计算 总和

public class ForkJoinDemo extends RecursiveTask<Long> {

    // 最小分隔单位

    private long max = 200;

    private long start;

    private long end;

  

    public ForkJoinDemo(Long start, Long end) {

        this.start = start;

        this.end = end;

    }

  

  

    @Override

    protected Long compute() {

        Long sum = 0l;

        if (end - start < max) {

            System.out.println(Thread.currentThread().getName() + ",start:" + start + ",end:" + end);

            for (Long i = start; i <= end; i++) {

                sum += i;

            }

        } else {

            // 400+1 200 1-200,201-400

            long l = (end + start) / 2;

            ForkJoinDemo left = new ForkJoinDemo(start, l);

            left.fork();

            ForkJoinDemo rigt = new ForkJoinDemo(l + 1, end);

            rigt.fork();

            left.join();

            rigt.join();

            try {

                sum = left.get() + rigt.get();

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (ExecutionException e) {

                e.printStackTrace();

            }

        }

        return sum;

    }

  

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1l, 400l);

        ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);

        System.out.println(submit.get());

  

    }

}

 

 

使用Fork JOIN异步群发短信

public class ForkJoinSms extends RecursiveAction {

    /**

     * 存放手机号码

     */

    private List<String> phones;

  

    private int start;

    private int end;

    // 1000

    private int max = 100;

  
    public ForkJoinSms(int start, int end) {

        this.start = start;

        this.end = end;

    }

  

    @Override

    protected void compute() {

        if (end - start < max) {

            System.out.println(Thread.currentThread().getName() + ",start:" + start + ",end:" + end);

        } else {

            int l = (end + start) / 2;

            ForkJoinSms left = new ForkJoinSms(start, l);

  

            ForkJoinSms rigt = new ForkJoinSms(l + 1, end);

            left.fork();

            rigt.fork();

        }

  

    }

 

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ForkJoinSms forkJoinSms = new ForkJoinSms(1, 1000);

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        forkJoinPool.invoke(forkJoinSms);


    }

}

每一行代码都有它的涵义,多问一句为什么;别怕,理清思路,一切代码都是数据的流动和转化,耐心一点,慢慢积累!一起加油!!!