在多线程中,使用线程池时使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
一.CompletableFuture简介
Future可明确地完成(设定其值和状态),并且可以被用作CompletionStage,支持相关的功能和动作的,其完成后触发。
当两个或多个线程尝试对 complete, completeExceptionally或 cancel CompletableFuture进行操作时,只有其中一个成功。
除了直接操作状态和结果的这些方法和相关方法之外,CompletableFuture还CompletionStage使用以下策略实现接口:
为非异步方法的相关完成提供的动作 可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。
所有没有显式Executor参数的异步方法都使用来执行ForkJoinPool.commonPool() (除非它不支持并行度至少为2,在这种情况下,将创建一个新的Thread来运行每个任务)。
为了简化监视,调试和跟踪,所有生成的异步任务都是标记接口的实例CompletableFuture.AsynchronousCompletionTask。
所有CompletionStage方法都是独立于其他公共方法实现的,因此一个方法的行为不受子类中其他方法的覆盖影响。
CompletableFuture还Future采用以下策略实施:
由于(与FutureTask此类不同)此类无法直接控制导致其完成的计算,因此取消被视为异常完成的另一种形式。
方法cancel具有与相同的效果 completeExceptionally(new CancellationException())。
方法 isCompletedExceptionally()可用于确定CompletableFuture是否以任何特殊方式完成
如果使用CompletionException异常完成,则方法get()和get(long, TimeUnit)抛出具有 ExecutionException与相应CompletionException中所保存的原因相同的原因。
为了简化大多数情况下的用法,此类还定义了方法,join()并且 getNow(T)在这些情况下直接抛出CompletionException。
二.常用Api介绍与实战
2.1 runAsync
runAsync(Runnable runnable)
返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成。runAsync(Runnable runnable, Executor executor)
返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。
代码示例
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
set(6.0, 10.0);
});
Thread.sleep(2000);
}
public static Double set(Double x,Double y){
System.out.println("X="+x + "---- Y="+y);
return Double.sum(x,y);
}复制
2.2 supplyAsync
supplyAsync(Supplier supplier)
返回一个新的CompletableFuture,它通过在 ForkJoinPool.commonPool()中运行的任务与通过调用给定的供应商获得的值 异步完成。supplyAsync(Supplier supplier, Executor executor)
返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值
代码示例
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return set(6.0, 10.0);
});
Thread.sleep(2000);
}
public static Double set(Double x,Double y){
System.out.println("X="+x + "---- Y="+y);
return Double.sum(x,y);
}复制
2.3 thenAccept和exceptionally
thenAccept(Consumer<? super T> action)
返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行。exceptionally(Function<Throwable,? extends T> fn)
返回一个新的CompletableFuture,当CompletableFuture完成时完成,结果是异常触发此CompletableFuture的完成特殊功能的给定功能; 否则,如果此CompletableFuture正常完成,则返回的CompletableFuture也会以相同的值正常完成。
代码示例
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return set(6.0, 10.0);
});
//成功后执行
cf2.thenAccept((sum)->{
System.out.println("最终的值为:"+sum);
});
//失败执行
cf2.exceptionally((e)->{
e.printStackTrace();
return null;
});
Thread.sleep(2000);
}
public static Double set(Double x,Double y){
System.out.println("X="+x + "---- Y="+y);
return Double.sum(x,y);
}复制
2.4 串行与并行
2.4.1 串行
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return set(6.0, 10.0);
});
//成功后执行 执行下一个
CompletableFuture<Double> cf3 = cf2.thenApplyAsync((setUp) -> {
return setUp(10.0,9.98);
});
cf2.thenAccept(sum-> System.out.println("sum:"+sum));
cf3.thenAccept(sum-> System.out.println("setUp:"+sum));
//失败执行
cf2.exceptionally((e)->{
e.printStackTrace();
return null;
});
Thread.sleep(2000);
}
@SneakyThrows
public static Double set(Double x, Double y){
System.out.println("X="+x + "---- Y="+y);
Thread.sleep(500);
return Double.sum(x,y);
}
@SneakyThrows
public static Double setUp(Double x, Double y){
System.out.println("X="+x + "---- Y="+y);
Thread.sleep(500);
return Double.sum(x,y);
}复制
执行结果
2.4.2 并行
allOf(CompletableFuture<?>… cfs)
返回一个新的CompletableFuture,当所有给定的CompletableFutures完成时,完成。
anyOf(CompletableFuture<?>… cfs)
返回一个新的CompletableFuture,当任何一个给定的CompletableFutures完成时,完成相同的结果。
@SneakyThrows
public static void main(String[] args) {
//创建两个异步的 CompletableFuture
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return set(6.0, 10.0);
});
CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
return setUp(10.0,9.98);
});
//将 cf2 cf3 进行合并
CompletableFuture<Object> objcf = CompletableFuture.anyOf(cf2, cf3);
// 分别执行 ,获取 相应的返回值
CompletableFuture<Object> obj1 = objcf.thenApplyAsync(set -> {
return set;
});
CompletableFuture<Object> obj2 = objcf.thenApplyAsync(setUp -> {
return setUp;
});
//在次合并
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(obj1, obj2);
//统一sum
objectCompletableFuture.thenAccept(result->{
System.out.println("sum:"+result);
});
Thread.sleep(2000);
}
@SneakyThrows
public static Double set(Double x, Double y){
System.out.println("X="+x + "---- Y="+y);
Thread.sleep(500);
return Double.sum(x,y);
}
@SneakyThrows
public static Double setUp(Double x, Double y){
System.out.println("X="+x + "---- Y="+y);
Thread.sleep(500);
return Double.sum(x,y);
}复制
执行结果
实际应用示例:
https://github.com/Dylan-haiji/javayh-platform/blob/master/javayh-starter/javayh-common-starter/src/main/java/com/javayh/common/exception/GlobalExceptionHandler.java
小编寄语
小编创建了一个关于Java学习讨论的微信群!想进去的可以联系小编!同时也欢迎大家点赞与转发!
小编微信:372787553
备注为进群,通过后小编会邀请您进群!
