CompletableFuture异步回调
CompletableFuture异步回调
CompletableFuture简介
CompletableFuture被用于异步编程,异步通常意味着非阻塞,可以使得任务单独允许在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。
CompletableFuture实现了Future,CompletionStage接口,实现了Future接口可以兼容线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。
Futrue和CompletableFuture
Future在Java里面,通过用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我么会得到一个Future,在Future里面有isDone方法来判断任务是否处理结束,该有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Futrue缺点
1.不支持手动完成。2.不支持进一步的非阻塞调用。3.不支持链式调用。4.不支持多个Future合并。5.不支持异步处理。
CompletableFuture类的使用案例
CompletableFuture01
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* 演示CompletableFuture
* @author 长名06
* @version 1.0
*/
public class CompletableFuture01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
System.out.println("子线程开始干活");
try {
//子线程沉睡3s
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//完成future任务
future.complete("success");
},"A").start();
System.out.println("主线程调用get方法获取结果为:" + future.get());
System.out.println("主线程完成,阻塞结束");
}
}
CompletableFuture02
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author 长名06
* @version 1.0
*/
public class CompletableFuture02 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步调用,无返回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行runSync()");
});
completableFuture1.get();
//异步调用,有返回值
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "执行supplyAsync()");
// int i = 1/0;
return 1024;
});
completableFuture2.whenComplete((t, u) -> {
System.out.println("----t=" + t);//t参数,是执行的返回值
System.out.println("----u=" + u);//异常信息
}).get();
// System.out.println(Runtime.getRuntime().availableProcessors());
}
}
CompletableFuture03
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* 演示线程依赖,执行api thenApply()
* 一个任务,依赖于另一个任务可以使用thenApply()将两个任务(线程)串行化
* 对一个数先加10 再平方
* @author 长名06
* @version 1.0
*/
public class CompletableFuture03 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任务开启");
num += 10;
return num;
}).thenApply(i -> num * num);
Integer integer = future.get();
System.out.println("主线程结束,子线程的结果为" + integer);
}
}
CompletableFuture04
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* 消费处理结果
* thenAccept()方法,接收任务的处理结果,并消费结果,不返回结果了
* @author 长名06
* @version 1.0
*/
public class CompletableFuture04 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主线程开始");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任务开启");
num += 10;
return num;
}).thenApply(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return num * num;
}
}).thenAccept(new Consumer<Integer>() {
@Override
public void accept(Integer i) {
System.out.println("子线程全部处理完成,最后调用了accept方法,消费了结果" + i);
}
});
}
}
CompletableFuture05
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* 异常处理
* @author 长名06
* @version 1.0
*/
public class CompletableFuture05 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;//模拟异常
System.out.println("加10任务开启");
num += 10;
return num;
}).exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable ex) {
System.out.println(ex.getMessage());
return -1;
}
});
}
}
CompletableFuture06
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* 消费结果,同时处理异常
* handle类似与thenAccept/thenRun方法,是最后一步结果的调用,但是同时可以处理异常
* @author 长名06
* @version 1.0
*/
public class CompletableFuture06 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;
System.out.println("加10任务开启");
num += 10;
return num;
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer i, Throwable ex) {
System.out.println("进入了handle方法");
if(ex != null){
System.out.println("发生了异常,内容为" + ex.getMessage());
return -1;
}else{
System.out.println("正常执行,结果为" + i);
return i;
}
}
});
}
}
CompletableFuture07
package com.shaonian.juc.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* 两个CompletableFuture结果的合并
* @author 长名06
* @version 1.0
*/
public class CompletableFuture07 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
//有依赖关系的合并
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任务开启");
num += 10;
return num;
});
//合并
CompletableFuture<Integer> future2 = future.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer i) {
return CompletableFuture.supplyAsync(() -> {
return i + 1;
});
}
});
System.out.println(future.get());
System.out.println(future2.get());
//无依赖的任务合并
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任务开启");
num += 10;
return num;
});
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘10任务开启");
num *= 10;
return num;
});
//合并两个结果
CompletableFuture<Object> future3 = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
@Override
public List<Integer> apply(Integer result1, Integer result2) {
ArrayList<Integer> list = new ArrayList<>();
list.add(result1);
list.add(result2);
return list;
}
});
System.out.println("合并结果为" + future3.get());
}
}
CompletableFuture08
package com.shaonian.juc.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* 多个独立任务的合并 allOf
* @author 长名06
* @version 1.0
*/
public class CompletableFuture08 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<Integer>> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任务开启");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘10任务开启");
num *= 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
System.out.println("减10任务开启");
num -= 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
System.out.println("除10任务开启");
num /= 10;
return num;
});
list.add(job4);
//使用allOf需注意,输入也会执行任务,但是无法获取到结果
//allOf需要等所有的任务执行完毕
/**
* 返回值是CompletableFuture<Void>类型
* public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
* return andTree(cfs, 0, cfs.length - 1);
* }
*/
// CompletableFuture<Void> allJob = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
// System.out.println(allJob.get());
//也可以使用 join的形式,执行,可以获取结果
List<Integer> allResult = list.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(allResult);
}
}
CompletableFuture09
package com.shaonian.juc.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* anyOf
* @author 长名06
* @version 1.0
*/
public class CompletableFuture09 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<Integer>> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("加10任务开启");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("乘10任务开启");
num *= 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("减10任务开启");
num -= 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("除10任务开启");
num /= 10;
return num;
});
list.add(job4);
//anyOf,这里只要有一个job执行完毕,就结束所有的任务执行,不需要等待所有的job执行完毕
//但是这个很鸡肋,因为如果不要执行所有的任务,就没必要开启一个CompletableFuture了
//也可以适用于竞争的场景,先执行成功的获取结果,其他的不再竞争了
CompletableFuture<Object> allJob = CompletableFuture.anyOf(list.toArray(new CompletableFuture[0]));
System.out.println(allJob.get());
}
}
只是为了记录自己的学习历程,且本人水平有限,不对之处,请指正。