介绍
Java 8提供的工具类,实现了Future接口和CompletionStage接口。拥有40多种方法,为函数式编程中的流式调用而准备。
支持流式计算,函数式编程,完成通知,自定义异常处理等特性。
和Future一样,可以作为函数调用的契约。调用线程调用CompletableFuture.supplyAsync()创建一个新线程,请求线程可以先处理其他任务。如果其他任务完成后调用CompletableFuture.get()方法获取结果。如果数据还没有准备好,就会等待。
在Java 8 之前,如果使用实现Runnable的Run方法实现多线程,问题是Run()没有返回值。
如果使用Callable的Call方法,再用Future的get()获取返回值,会造成主线程的阻塞。
CompletableFuture的用法
以下带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的。除此之外,两者没有其他区别。
创建CompleteableFuture
构造函数创建
最简单的方式就是通过构造函数创建一个CompletableFuture实例。
CompletableFuture<String> future = new CompletableFuture(); String result = future.join(); System.out.println(result);
由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。
此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。
future.complete("test");
supplyAsync创建
通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get()或join()获取最终计算结果。
注意:在定义和调用CompletableFuture.supplyAsync()方法时,该异步任务就开始执行了。
而get()或者join()则是阻塞主线程,获取异步任务的结果。
CompletableFuture<String> future
= CompletableFuture.supplyAsync(()->{
System.out.println("compute test");
return "test";
}).completeOnTimeout(1000, TimeUnit.MILLISECONDS);
String result = future.join();
System.out.println("get result: " + result);
join()和get()方法都是阻塞调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。
CompletableFuture.get() 和 CompletableFuture.join() 这两个方法是获取异步守护线程的返回值的。
不同点:
get()方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。
join()方法会抛出未经检查的异常。
runAsync创建
与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.out.println("compute test");
});
System.out.println("get result: " + future.join());
由于任务没有返回值, 所以最后的打印结果是"get result: null"。
简单用法 get() 与 complete()
get() 获取任务结果
在主线程中调用get()方法会阻塞主线程(用来获取异步任务的返回值)。
get:获取任务返回值,没有则阻塞直到有返回值返回;抛出经过检查的异常InterruptedException, ExecutionException。
getNow:如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值;
join:获取任务返回值,没有则阻塞直到有返回值返回;抛出未经检查的异常,不会强制抛出,会将异常包装成CompletionException/CancellationException异常;
注意:get和join会阻塞主线程,是阻塞主线程直到异步任务有返回,不是阻塞主线程开始其他异步任务。
try {
log.info("test14doubleGet start");
future1.get();
log.info("test14doubleGet future1 start");
future2.get();
log.info("test14doubleGet end");
} catch (InterruptedException | ExecutionException e) {
log.debug(e.getMessage());
Thread.currentThread().interrupt();
}
执行结果:
21:49:45.255 [main] INFO com.txw.ms.sample.service.TestCompletableFuture - test14doubleGet start 21:49:45.568 [ForkJoinPool.commonPool-worker-3] INFO com.txw.ms.sample.service.TestCompletableFuture - 第一阶段:1 21:49:45.568 [ForkJoinPool.commonPool-worker-5] INFO com.txw.ms.sample.service.TestCompletableFuture - 第二阶段:5 21:49:46.568 [main] INFO com.txw.ms.sample.service.TestCompletableFuture - test14doubleGet future1 start 21:49:50.582 [main] INFO com.txw.ms.sample.service.TestCompletableFuture - test14doubleGet end Process finished with exit code 0
第2行,主线程执行。
第3行,开始异步任务1。
第5行,同时,开始异步任务2。注意:异步任务2的开始不需要等待异步任务1的结束,也不需要等待第4行主线程。
第4行,异步任务1结束后,执行主线程。注意:主线程被异步任务的get()阻塞,需要等待异步任务结束。
第6行,异步任务2结束后,执行主线程。注意:主线程被异步任务的get()阻塞,需要等待异步任务结束。
join()与get()区别
join()返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常。另外get() 可以指定超时时间。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number1 = 1;
log.info("第一阶段:" + number1);
try {
TimeUnit.SECONDS.sleep(number1);
} catch (InterruptedException e) {
log.debug(e.getMessage());
Thread.currentThread().interrupt();
}
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = 7;
log.info("第二阶段:" + number2);
try {
TimeUnit.SECONDS.sleep(number2);
} catch (InterruptedException e) {
log.debug(e.getMessage());
Thread.currentThread().interrupt();
}
return number2;
});
try {
log.info("test13doubleGet start...");
future1.get();
future2.get(1, TimeUnit.SECONDS);
log.info("test13doubleGet end");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.debug(e.getMessage());
Thread.currentThread().interrupt();
}
在上面的例子中,异步任务future1正常执行并结束,异步任务future2执行需要7s,通过get()指定超时时间1s。实际的阻塞时间可能不止1s。
try {
log.info("test13doubleGet start...");
CompletableFuture.allOf(future1, future2).get(1, TimeUnit.SECONDS);
log.info("test13doubleGet end");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.debug(e.getMessage());
Thread.currentThread().interrupt();
}
改用CompletableFuture.allOf()并指定超时时间1s。效果达到预期。
complete() 手动完成任务
手动完成任务。所有等待这个Future的客户端都将得到指定的结果。并且对completableFuture.complete()的后续调用将被忽略。
提交任务 runAsync() 与 supplyAsync()
runAsync()
异步调用没有返回值。
supplyAsync()
异步调用有返回值。
任务编排
链式处理 thenRun()、thenAccept() 和 thenApply()
需要注意的是,通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。
thenRun() 无入参无返回
thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。
thenAccept() 有入参无返回
thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。
thenApply() 有入参有返回
thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。
thenApply和thenCompose的区别
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
两个方法的返回值都是CompletionStage<U>,不同之处在于它们的传入参数fn.
对于thenApply,fn函数是一个对一个已完成的stage或者说CompletableFuture的返回值进行计算、操作。
对于thenCompose,fn函数是对另一个CompletableFuture进行计算、操作。
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> 123).thenApply(num -> "thenApply任务:" + num); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> 456).thenCompose(num -> CompletableFuture.supplyAsync(() -> "thenCompose任务:" + num)); System.out.println(f1.join()); System.out.println(f2.join());
组合任务 thenCompose() 与 thenCombine() - 都完成
thenCompose()
thenCompose主要用于有前后依赖关系之间的任务进行连接。
合并两个有依赖关系的 CompletableFutures 的执行结果。组合两个future,并将前一个任务的返回结果作为下一个任务的参数,存在先后顺序。
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
});
//合并
CompletableFuture<Integer> future1 = future.thenCompose(i ->
//再来一个 CompletableFuture
CompletableFuture.supplyAsync(() -> {
return i + 1;
}));
System.out.println(future.get());
System.out.println(future1.get());
}
thenCombine() (组合两个future,获取它们的返回值,有返回值)
thenCombine主要用于没有前后依赖关系之间的任务进行连接。
组合两个future,获取它们的返回值,并返回当前任务的返回值。
private static Integer num = 5;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
});
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘以 10 任务开始");
num = num * 10;
return num;
});
//合并两个结果
CompletableFuture<Object> future = job1.thenCombine(job2, new
BiFunction<Integer, Integer, List<Integer>>() {
@Override
public List<Integer> apply(Integer a, Integer b) {
List<Integer> list = new ArrayList<>();
list.add(a);
list.add(b);
return list;
}
});
System.out.println("合并结果为:" + future.get());
}
注意:上例代码可能输出不同结果,因为jobA和jobB并发执行且顺序随机,而且操作了同一个静态数据。
thenAcceptBoth (组合两个future,获取它们的执行结果,没有返回值)
private static void test11thenAcceptBoth() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number1 = new Random().nextInt(3) + 1;
try {
TimeUnit.SECONDS.sleep(number1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段:" + number1);
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = new Random().nextInt(3) + 1;
try {
TimeUnit.SECONDS.sleep(number2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段:" + number2);
return number2;
});
CompletableFuture job = future1.thenAcceptBoth(future2, (num1, num2) -> {
System.out.println("最终结果:" + (num1 + num2));
});
job.get();
}
注意:如果future1.get()则只会获得future1的执行结果。
而future1和future2哪个先执行是随机的。
CompletableFuture使用详解 - 周文豪 - 博客园 (cnblogs.com)
runAfterBoth (组合两个future,不获取它们的执行结果,没有返回值)
private static void test12runAfterBoth() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number1 = new Random().nextInt(3) + 1;
System.out.println("第一阶段:" + number1);
try {
TimeUnit.SECONDS.sleep(number1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = new Random().nextInt(3) + 1;
System.out.println("第二阶段:" + number2);
try {
TimeUnit.SECONDS.sleep(number2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number2;
});
CompletableFuture job = future1.runAfterBoth(future2, () -> {
System.out.println("两个任务完成。");
});
job.get();
}
thenCombine / thenAcceptBoth / runAfterBoth都是组合的任务互相不依赖
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务。
区别在于,
thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;
thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;
runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
组合任务 applyToEither, acceptEither, runAfterEither - 任意一个完成
applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务(其他线程依然会继续执行)。
其区别在于
applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;
acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;
runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
多任务组合allOf()与anyOf()
allOf()
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
private static void test9allOf() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> jobA = CompletableFuture.supplyAsync(() -> {
log.info("Start job: " + Thread.currentThread().getName());
log.info("End job: " + Thread.currentThread().getName());
return num;
});
CompletableFuture<Integer> jobB = CompletableFuture.supplyAsync(() -> {
log.info("Start job: " + Thread.currentThread().getName());
log.info("End job: " + Thread.currentThread().getName());
return num;
});
CompletableFuture<Integer> jobC = CompletableFuture.supplyAsync(() -> {
log.info("Start job: " + Thread.currentThread().getName());
log.info("End job: " + Thread.currentThread().getName());
return num;
});
CompletableFuture allofJob = CompletableFuture.allOf(jobA, jobB, jobC)
.whenComplete((result, ex) -> {
if (ex != null) {
log.info("exception: " + ex.getMessage());
} else {
log.info("result: " + result);
}
});
log.info("return: " + jobA.get());
}
注意:在jobA,jobB和jobC任务完成后,allOf()会触发allofJob任务的执行。而且,如果没有jobA.get()阻塞主线程,可能会因为主线程执行结束而无法打印异步任务的日志。
调用jobA.get()或者allofJob.get()都会阻塞主线程,获取对应任务的执行结果。
anyOf()
anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。
private static void test10anyOf() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> jobA = CompletableFuture.supplyAsync(() -> {
log.info("Start job: " + Thread.currentThread().getName());
log.info("End job: " + Thread.currentThread().getName());
return 5;
});
CompletableFuture<Integer> jobB = CompletableFuture.supplyAsync(() -> {
log.info("Start job: " + Thread.currentThread().getName());
log.info("End job: " + Thread.currentThread().getName());
return 6;
});
CompletableFuture<Integer> jobC = CompletableFuture.supplyAsync(() -> {
log.info("Start job: " + Thread.currentThread().getName());
log.info("End job: " + Thread.currentThread().getName());
return 7;
});
CompletableFuture job = CompletableFuture.anyOf(jobA, jobB, jobC)
.whenComplete((result, ex) -> {
if (ex != null) {
log.info("exception: " + ex.getMessage());
} else {
log.info("result: " + result);
}
});
log.info("return: " + job.get());
}
任务编排总结
串行任务
thenRun,任务A完成后继续任务B,任务B不处理任务A的结果,无返回。
thenAccept,任务A完成后继续任务B,任务B处理任务A的结果,无返回。
thenApply,任务A完成后继续任务B,任务B处理任务A的结果,并返回。
组合任务-都完成
thenCompose,任务A和任务B有依赖关系,任务A完成后继续任务B,任务B处理任务A的结果,并返回。
thenCombine,任务A和任务B没有依赖关系,同时进行,两个任务的执行结果作为thenCombine定义的apply方法的入参,有返回。
thenAcceptBoth,任务A和任务B没有依赖关系,同时进行,获取任务的执行结果,没有返回值。
runAfterBoth ,任务A和任务B没有依赖关系,同时进行,不需要获取执行结果,没有返回值。
组合任务-任一完成
applyToEither,任务A和任务B没有依赖关系,同时进行,已经执行完成的任务的执行结果作为方法入参,并有返回值;
acceptEither,任务A和任务B没有依赖关系,同时进行,将已经执行完成的任务的执行结果作为方法入参,没有返回值;
runAfterEither,任务A和任务B没有依赖关系,同时进行,没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
多任务组合
allOf:多个任务同时进行,任务全部完成后返回CompletableFuture对象,调用该对象的get方法返回null。
anyOf:多个任务同时进行,其中一个任务完成后返回CompletableFuture对象。
异常处理exceptionall,whenComplete,handle
exceptionally有返回
异常处理,任务出现异常时触发。
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中。
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
System.out.println("加 10 任务开始");
num += 10;
return num;
}).exceptionally(ex -> {
//异常内容输出
System.out.println(ex.getMessage());
return -1;
});
System.out.println(future.get());
}
使用方式类似于 try catch中的catch代码块中异常处理;当某个任务执行异常时将抛出的异常作为参数传递到回调方法中。
上述例子中,supplyAsync().thenApply().exceptionally(),如果supplyAsync()发生异常,会被exceptionally()捕获处理,而且thenApply()不再执行。
whenComplete()
whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。
不论上一个阶段是正常/异常完成(即不会对阶段的原来结果产生影响);类似于 try..catch..finanlly 中 finally 代码块,无论是否发生异常都将会执行的。
当某个任务执行完成后,会将执行结果或者执行期间抛出的异常传递给回调方法:
如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,
如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
whenComplete()接收参数包括异步任务的返回结果和可能的异常。对于异常的处理可以用于清理操作或记录日志,注意,处理后仍会自动抛出异常。
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 10 / 0)
.thenApply(s -> {
System.out.println("执行了");
return s + 1;
})
.whenComplete((r, e) -> {
if (r != null && e == null) {
System.out.println("正常执行");
}
if (e != null) {
System.out.println("执行异常:" + e.getMessage());
}
});
System.out.println(futureA.get());
}
上述例子,出现异常后,不再执行thenApply(),执行了whenComplete(),但是仍然抛出异常,因为没有exceptionally。
注意:如果主线程没有处理whenComplete()抛出的异常,会造成业务中断。
private static void test4() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 10/0)
.thenApply(s -> {
System.out.println("执行了");
return s + 1;
}).exceptionally(e -> {
System.out.println(e.getMessage());
return 100;
})
.whenCompleteAsync((r,e) -> {
if(r != null && e == null){
System.out.println("正常执行");
}
if(e != null){
System.out.println("执行异常:" + e.getMessage());
}
});
System.out.println(futureA.get());
}
上述例子,异步任务抛出异常,thenApply()不再执行,expectionally()处理该异常,whenCompleteAsync()继续执行。
handle() 有返回
类似于thenAccept/thenRun,是最后一步的处理调用,但是同时可以处理异常。
handle与whenComplete的作用有些类似,但是handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果。
handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。
handle()接收参数包括异步任务的返回结果和可能的异常,所以可以用于处理异常。
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// int i= 1/0;
System.out.println("加 10 任务开始");
num += 10;
return num;
}).handle((i, ex) -> {
System.out.println("进入 handle 方法");
if (ex != null) {
System.out.println("发生了异常,内容为:" + ex.getMessage());
return -1;
} else {
System.out.println("正常完成,内容为: " + i);
return i;
}
});
System.out.println(future.get());
}
产出型方法,即可以对正常完成的结果进行转换,也可以对异常完成的进行补偿一个结果,即可以改变阶段的现状。
跟whenComplete()基本一致,区别在于handle()的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关。
和whenComplete()不同,handle()不会继续抛出异常。
简言之,handle()可以处理异常,不会继续抛出异常。
whenComplete()可以处理异常,仍会继续抛出异常。
exceptionally()专门处理异常。
Java多线程之CompletableFuture_爱吃牛肉的大老虎的博客-CSDN博客
java 异步编程 CompletableFuture_compleablefuture异步编程_帅气的喵喵的博客-CSDN博客
问题
1.runAsync是否可以使用exceptionally
可以。在exceptionally中返回null。
private static void test5() throws ExecutionException, InterruptedException, TimeoutException {
final CompletableFuture completableFuture = CompletableFuture
.runAsync(() -> log.info("" + 1 / 0))
.exceptionally(ex -> {
log.info("exception: " + ex.getMessage());
return null;
});
log.info("result: " + completableFuture.get(1, TimeUnit.SECONDS));
}
2.trycatch和exceptionally的区别
3.同一个任务,调用多次get(),该任务会执行多次吗?
不会。任务只会执行一次,返回结果复用。
private static void test8supplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> job = CompletableFuture.supplyAsync(() -> {
log.info("Start job: ");
return num;
});
log.info("return: " + job.get() * job.get());
}
4. 定义了runAsync,任务会自动执行吗?
会。
注意如果主线程提前结束,可能导致任务没有完整执行。可以使用CompletableFuture.get()或者join()方法阻塞线程,等待任务完成。
在非主线程场景(如web容器)中无需额外处理。
误区
1.没有设置超时时间
CompletableFuture.get(3, TimeUnit.SECONDS)
2.没有使用自定义线程池
默认使用ForkJoinPool。
3.没有异常处理
4.多个异步任务操作同一个对象
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 10 / 0)
.thenApply(s -> {
System.out.println("执行了");
return s + 1;
})
.whenComplete((r, e) -> {
if (r != null && e == null) {
System.out.println("正常执行");
}
if (e != null) {
System.out.println("执行异常:" + e.getMessage());
}
});
System.out.println(futureA.get());









