JUC并发编程学习(十三)ForkJoin
ForkJoin
什么是ForkJoin
ForkJoin在JDK1.7,并发执行任务!大数据量时提高效率。
大数据:Map Reduce(把大任务拆分成小任务)
ForkJoin特点:工作窃取
为什么可以取窃取其他线程的任务呢?因为这里面维护的都是双端队列(即队列的两端都可以取元素)
ForkJoin操作
在java.util.concurrent下的接口摘要中,有以下两个接口
点进其中一个找到具体的类,可以看到ForkJoinPool
这是具体的执行类,就像ThreadPoolExecutor
具体的执行通过ForkJoinTask类来执行
使用ForkJoin
1、需要一个ForkJoinPool,通过execute方法来执行,参数为ForkJoinTask
2、计算任务 forkJoinPool.execute(ForkJoinTask<?> task)
3、定义一个ForkJoinTask
如何去定义一个ForkJoinTask,打开jdk文档,找到ForkJoinTask类,查看具体的子类。
其中递归事件没有返回值,而任务肯定要有结果,所以递归任务是有返回值的
点进任务,查看示例
代码示例:
package org.example.forkjoin;
/*
* 求和计算的任务
*
*
* 程序员的三六九等
* 三(普通求和)、六(ForkJoin)、九(Stream并行流)
*
*
* 使用ForkJoin:
* 1、ForkJoinPoll 通过他来执行
* 2、计算任务 forkJoinPool.execute(ForkJoinTask<?> task)
* 3、定义一个ForkJoinTask
*
*
* */
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
if ((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else {
//使用ForkJoin分支合并计算
Long middle = (end + start) / 2;//中间值
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
//拆分任务、把任务压入线程队列
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();
return task1.join()+task2.join();
}
}
}
三六九等程序员的测试
package org.example.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test01();//11720
// test02();//7369
test03();//239
}
/*
* 普通程序员
* */
public static void test01() {
long startTime = System.currentTimeMillis();
Long sum = 0L;
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum+=i;
}
long endTime = System.currentTimeMillis();
System.out.println("结果:" + sum + "-->耗时:" + (endTime - startTime));
}
/*
* 会使用ForkJoin
* */
public static void test02() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinPool forkJoinPool = new ForkJoinPool();
// forkJoinPool.execute(forkJoinDemo);//执行
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);
Long aLong = submit.get();
long endTime = System.currentTimeMillis();
System.out.println("结果:" + aLong + "-->耗时:" + (endTime - startTime));
}
/*
* 九等程序员:Stream流并行运算
* 效率高十几倍
*
* */
public static void test03() {
long start = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("结果:" + sum + "-->耗时:" + (end - start));
}
}