暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

CompletableFuture基础实践小结

18

写在文章开头

CompletableFuture继承了CompletionStage接口和Future接口,在原有Future的基础上增加了异步回调、流式处理以及任务组合,成为JDK8多任务协同场景下一个有效利器。所以笔者今天就以此文演示一下CompletableFuture基础实践案例。

你好,我叫sharkchili,目前还是在一线奋斗的Java开发,经历过很多有意思的项目,也写过很多有意思的文章,是CSDN Java领域的博客专家,也是Java Guide的维护者之一,非常欢迎你关注我的公众号:写代码的SharkChili,这里面会有笔者精心挑选的并发、JVM、MySQL数据库专栏,也有笔者日常分享的硬核技术小文。

CompletableFuture使用示例

提交有返回值的异步任务

通过supplyAsync提交我们的异步任务,然后通过get方法等待异步任务完成并获取返回结果。

    public static void main(String[] args) throws Exception {
    提交一个CompletableFuture任务
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
    long start = System.currentTimeMillis();


    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("work complete! cost:" +(System.currentTimeMillis() - start) + " ms");
    return 1;
    });




    System.out.println("main thread working");


    通过get方法阻塞获取任务执行结果
    System.out.println("supplyAsync result: " + task.get());


    System.out.println("main thread finish");
    }

    输出结果如下,可以看出CompletableFuture的get方法会阻塞主线程工作,直到得到返回值为止。

      main thread working
      work complete! cost:1001 ms
      supplyAsync result: 1
      main thread finish

      对此我们不妨来看看get方法是如何做到阻塞主线程并等待异步线程任务执行完成的。从下面这段源码我们可以看到get方法的执行步骤:

      1. 调用reportGet查看异步任务是否将结果赋值给result。
      2. 如果不为null直接返回。
      3. 若为null则调用waitingGet等待任务返回。
        public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
        }

        查看reportGet方法可以看到逻辑也很简单,如果r为空则直接抛中断异常,如果r存在异常则直接将异常抛出,如果有结果则将结果返回。

            private static <T> T reportGet(Object r)
          throws InterruptedException, ExecutionException {
          如果结果为null直接抛出终端异常
          if (r == null) by convention below, null means interrupted
          throw new InterruptedException();
          如果结果有异常则将异常抛出
          if (r instanceof AltResult) {
          Throwable x, cause;
          if ((x = ((AltResult)r).ex) == null)
          return null;
          if (x instanceof CancellationException)
          throw (CancellationException)x;
          if ((x instanceof CompletionException) &&
          (cause = x.getCause()) != null)
          x = cause;
          throw new ExecutionException(x);
          }
          如果r正常则直接将结果返回出去
          @SuppressWarnings("unchecked") T t = (T) r;
          return t;
          }

          waitingGet源码相对复杂一些,整体步骤我们可以拆解为while循环内部和while循环外部,我们先来看看while循环内部的执行流程:

          1. while循环从任务中获取result,如果result为空,则进入循环。
          2. 如果spins小于0,说明刚刚进入循环内部,可以自旋等待一下任务的获取,设置好spins(spins的值从SPINS来,如果多核的情况下值为256),进入下一次循环。
          3. 进入循环发现spins大于0,则随机生成一个数,如果这个数大于等于0则--spins,进入下次循环。
          4. 不断执行步骤3的操作,知道spins等于0。
          5. 此时判断来到q==null,说明任务自旋等待了一段时间还是没有结果,我们需要将其挂起,首先将线程封装成一个Signaller,进入下一次循环。
          6. 循环会判断if (!queued)
            ,将要阻塞的任务放到栈中,进入下一次循环。
          7. 循环下一次会来到if (q.thread != null && result == null)
            ,说明q线程不为空且没有结果,我们需要将其打断,调用ForkJoinPool.managedBlock(q)
            将其打断,直至有结果后才结束循环。

          while循环外操作就简单了,来到循环尾部时,result已经有值了,代码执行postComplete完成任务,并将结果返回。

            private Object waitingGet(boolean interruptible) {
            Signaller q = null;
            boolean queued = false;
            int spins = -1;
            Object r;
            如果result为空则进入循环
            while ((r = result) == null) {
            如果spins小于0,说明刚刚进入循环内部,可以自旋等待一下任务的获取,设置好spins(spins的值从SPINS来,如果多核的情况下值为256),自此,第一次循环步骤结束
            if (spins < 0)
            spins = SPINS;


            //这一步的操作是自旋等待任务结果,所以代码进入循环发现spins大于0,则随机生成一个数,如果这个数大于等于0则--spins,进入下次循环,直到循环spins变为0
            else if (spins > 0) {
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
            --spins;
            }
            此时判断来到q==null,说明任务自旋等待了一段时间还是没有结果,我们需要将其挂起,首先将线程封装成一个Signaller,结束本次循环
            else if (q == null)
            q = new Signaller(interruptible, 0L, 0L);
            //上一步我们将任务封装成Signaller,这里就将其存入栈中,然后结束循环
            else if (!queued)
            queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
            q.thread = null;
            cleanStack();
            return null;
            }
            循环来到这说明q线程不为空且没有结果,我们需要将其打断,调用`ForkJoinPool.managedBlock(q)`将其打断,直至有结果后才结束循环
            else if (q.thread != null && result == null) {
            try {
            ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
            q.interruptControl = -1;
            }
            }
            }
            if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
            if (interruptible)
            r = null; report interruption
            else
            Thread.currentThread().interrupt();
            }
            }
            结束循环,调用postComplete结束任务并返回结果r
            postComplete();
            return r;
            }

            提交无返回值的异步任务

            通过runAsync提交一个无返回值的异步任务,这里我们为了实现任务执行完成再关闭主线程用了个get阻塞等待任务完成。

              public static void main(String[] args) throws ExecutionException, InterruptedException {
              CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
              long start = System.currentTimeMillis();
              System.out.println(Thread.currentThread().getName() + "开始工作了,执行时间:" + start);
              try {
              Thread.sleep(1000);
              } catch (InterruptedException e) {
              e.printStackTrace();
              }
              System.out.println(Thread.currentThread().getName() + "结束工作了,总执行时间:" + (System.currentTimeMillis() - start));
              });


              System.out.println("主线程开始运行");
              get阻塞主线程等待任务结束
              supplyAsync.get();
              System.out.println("主线程运行结束");
              }

              输出结果

                主线程开始运行
                ForkJoinPool.commonPool-worker-1开始工作了,执行时间:1651251489755
                ForkJoinPool.commonPool-worker-1结束工作了,总执行时间:1010
                主线程运行结束

                将异步任务提交给自己的线程池处理

                查看supplyAsync方法的源码我们发现,我们提交的任务默认情况下会交给asyncPool这个线程池处理。

                   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
                  return asyncSupplyStage(asyncPool, supplier);
                  }

                  查看asyncPool 我们可以看到如果服务器是多核的情况下返回的是一个commonPool,commonPool默认线程池数为CPU核心数。

                     private static final Executor asyncPool = useCommonPool ?
                    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

                    所以如果某些情况下我们希望将任务提交到我们自己的线程池中,就建议通过supplyAsync的第二个参数告知CompletableFuture自己要用自定义线程池。

                      public static void main(String[] args) throws ExecutionException, InterruptedException {
                      ExecutorService executorService = Executors.newSingleThreadExecutor();


                      使用第二个参数告知CompletableFuture使用的线程池
                      CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
                      long start = System.currentTimeMillis();
                      System.out.println(Thread.currentThread() + "开始工作了,执行时间:" + start);
                      try {
                      Thread.sleep(2000);
                      } catch (InterruptedException e) {
                      e.printStackTrace();
                      }
                      打印当前执行任务的线程
                      System.out.println(Thread.currentThread() + "结束工作了,总执行时间:" + (System.currentTimeMillis() - start));
                      return 1;
                      }, executorService);


                      System.out.println("主线程开始运行");
                      System.out.println("输出结果 " + supplyAsync.get());
                      System.out.println("主线程运行结束");


                      executorService.shutdown();
                      while (executorService.isTerminated()) {


                      }
                      }

                      从输出结果也可以看出这里使用的线程池是我们自定义的线程池

                        主线程开始运行
                        Thread[pool-1-thread-1,5,main]开始工作了,执行时间:1651251851358
                        Thread[pool-1-thread-1,5,main]结束工作了,总执行时间:2005
                        输出结果 1
                        主线程运行结束


                        thenApply和thenApplyAsync

                        thenApply 适用那些需要顺序执行的异步任务,例如我们希望将第一个任务的返回值交给第二个异步任务,就可以使用thenApply将两个任务组合起来。

                          public static void main(String[] args) throws ExecutionException, InterruptedException {
                          ExecutorService executorService = Executors.newFixedThreadPool(5);
                          CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
                          System.out.println(Thread.currentThread() + "开始工作了");
                          try {
                          Thread.sleep(2000);
                          } catch (InterruptedException e) {
                          e.printStackTrace();
                          }
                          System.out.println(Thread.currentThread() + "结束工作了");
                          return 100;
                          }, executorService);


                          将两个任务组合起来
                          CompletableFuture<String> task2 = task1.thenApply((data) -> {
                          System.out.println("第二个线程:" + Thread.currentThread() + "开始工作了");
                          try {
                          Thread.sleep(2000);
                          } catch (InterruptedException e) {
                          e.printStackTrace();
                          }
                          return "第一个线程的结果为 " + data;
                          });






                          System.out.println("获取组合任务结果");
                          System.out.println("组合任务处理结果为: " + task2.get());
                          System.out.println("获取组合任务结果结束");


                          executorService.shutdown();
                          while (executorService.isTerminated()) {


                          }
                          }

                          输出结果可以看到,任务1执行完成后任务2接着执行了。

                            Thread[pool-1-thread-1,5,main]开始工作了
                            获取组合任务结果
                            Thread[pool-1-thread-1,5,main]结束工作了
                            第二个线程:Thread[pool-1-thread-1,5,main]开始工作了
                            组合任务处理结果为: 第一个线程的结果为 100
                            获取组合任务结果结束

                            thenApplyAsync与thenApply不同的是,在第一个异步任务有指定线程池的情况下,第二个异步任务会被提交到其他线程池中,所以这里我们可以说明一个规律,带有Async关键字的方法支持组合任务时,将任务提交到不同的线程池中。

                              public static void main(String[] args) throws ExecutionException, InterruptedException {
                              ExecutorService executorService = Executors.newFixedThreadPool(3);
                              CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
                              System.out.println(Thread.currentThread()+"开始工作了");
                              try {
                              Thread.sleep(1000);
                              } catch (InterruptedException e) {
                              e.printStackTrace();
                              }
                              System.out.println(Thread.currentThread()+"结束工作了");
                              return 100;
                              },executorService);


                              CompletableFuture<String> task2 = task1.thenApplyAsync((data) -> {
                              System.out.println("第二个线程:" + Thread.currentThread() + "开始工作了");
                              try {
                              Thread.sleep(2000);
                              } catch (InterruptedException e) {
                              e.printStackTrace();
                              }
                              return "第一个线程的结果为 " + data;
                              });






                              System.out.println("获取任务结果开始");
                              System.out.println("任务的结果 "+task2.get());
                              System.out.println("获取任务结果结束");


                              executorService.shutdown();
                              while (executorService.isTerminated()){


                              }
                              }

                              输出结果

                                Thread[pool-1-thread-1,5,main]开始工作了
                                获取任务结果开始
                                Thread[pool-1-thread-1,5,main]结束工作了
                                第二个线程:Thread[ForkJoinPool.commonPool-worker-9,5,main]开始工作了
                                任务的结果 第一个线程的结果为 100
                                获取任务结果结束

                                thenAccept和thenRun

                                thenAccept和thenRun都会在上一个任务执行结束后才会继续执行。两者唯一区别时:

                                1. thenAccept在上一个任务执行结束后,将上一个任务返回结果作为入参,但无返回值。
                                1. thenRun会在上一个任务执行结束后才开始处理,既没有入参也没有返回值。

                                以下便是笔者的使用示例:

                                  public static void main(String[] args) throws ExecutionException, InterruptedException {
                                  ExecutorService executorService = Executors.newFixedThreadPool(5);




                                  CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
                                  System.out.println("task线程:" + Thread.currentThread().getName() + "开始工作了");
                                  try {
                                  Thread.sleep(2000);
                                  } catch (InterruptedException e) {
                                  e.printStackTrace();
                                  }
                                  System.out.println("task线程:" + Thread.currentThread().getName() + "结束工作了");
                                  return 200;
                                  }, executorService);


                                  CompletableFuture<Integer> task2 = task.thenApply((data) -> {
                                  System.out.println("task2线程:" + Thread.currentThread().getName() + "开始工作了");
                                  try {
                                  Thread.sleep(2000);
                                  } catch (InterruptedException e) {
                                  e.printStackTrace();
                                  }
                                  System.out.println("task2线程:" + Thread.currentThread().getName() + "执行结束");
                                  return data;
                                  });


                                  thenAccept 收上一个任务的入参,但无返回值
                                  CompletableFuture<Void> task3 = task2.thenAccept((data) -> {
                                  System.out.println("task3线程:" + Thread.currentThread().getName() + ",该任务接收上一个任务的结果,但无返回值,收到上一个任务的结果值为 " + data);
                                  });


                                  thenRun在上一个任务结束后执行,既无入参也无出参
                                  CompletableFuture<Void> task4 = task3.thenRun(() -> {
                                  System.out.println("task4在上一个任务结束后继续执行,无入参,也无返回值");
                                  });




                                  System.out.println("尝试获取最终执行结果");
                                  task4.get();
                                  System.out.println("执行任务直至task4 ");
                                  System.out.println("任务全部执行结束");


                                  executorService.shutdown();
                                  while (executorService.isTerminated()) {


                                  }
                                  }

                                  输出结果

                                    task线程:pool-1-thread-1开始工作了
                                    尝试获取最终执行结果
                                    task线程:pool-1-thread-1结束工作了
                                    task2线程:pool-1-thread-1开始工作了
                                    task2线程:pool-1-thread-1执行结束
                                    task3线程:pool-1-thread-1,该任务接收上一个任务的结果,但无返回值,收到上一个任务的结果值为 200
                                    task4在上一个任务结束后继续执行,无入参,也无返回值
                                    执行任务直至task4
                                    任务全部执行结束


                                    exceptionally

                                    假如我们的任务1执行过程中可能报错,我们希望能够从逻辑的角度处理掉,那么我们就可以在任务1后面接一个exceptionally方法,然后再接上任务2。这样一来,任务1执行报错就会走到exceptionally,反之就会走到任务2的代码段。

                                      public static void main(String[] args) throws ExecutionException, InterruptedException {
                                      CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
                                      System.out.println("task1 开始工作了");
                                      随机生成被除数,为0会抛出算术异常
                                      int num = RandomUtil.randomInt(0, 2);
                                      int result = 10 num;
                                      System.out.println("task1 结束工作");
                                      return 200;
                                      });


                                      假如task1报错,任务会走到这个任务上
                                      CompletableFuture<Integer> exceptionally = task1.exceptionally((e) -> {
                                      System.out.println("上一个任务报错了,错误信息" + e.getMessage());
                                      return -1;
                                      });


                                      CompletableFuture task2 = task1.thenAccept((param) -> {
                                      System.out.println("走到正常的结束分支了,task1执行结果:" + param);
                                      });


                                      System.out.println("主线程开始运行");
                                      // 调用错误捕获的任务执行结束也会自动走到正常结束的分支
                                      System.out.println("输出结果 " + exceptionally.get());
                                      System.out.println("主线程运行结束");
                                      }

                                      执行正常的输出结果:

                                        task1 开始工作了
                                        主线程开始运行
                                        task1 结束工作
                                        走到正常的结束分支了:200
                                        输出结果 200
                                        主线程运行结束


                                        执行异常的输出结果:

                                          task1 开始工作了
                                          主线程开始运行
                                          上一个任务报错了,错误信息java.lang.ArithmeticException: by zero
                                          输出结果 -1
                                          主线程运行结束

                                          whenComplete

                                          对于上面的例子,我们完全可以用whenComplete来简化,whenComplete会接收两个入参:

                                          1. 入参1为上一个任务的返回值。
                                          2. 入参2比较特殊,如果上一个任务抛出异常,则第2个入参不为空。

                                          所以上一个例子的代码我们可以简化成这样,需要注意的是whenComplete返回结果是上一个任务的执行结果,我们无法返回任务2的执行结果。

                                             public static void main(String[] args) {


                                            CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
                                            System.out.println("任务1开始工作");
                                            int num = RandomUtil.randomInt(0, 2);
                                            int result = 10 num;
                                            System.out.println("任务1执行结束,执行结果:" + result);
                                            return result;
                                            });


                                            CompletableFuture<Integer> task2 = task.whenComplete((result, err) -> {
                                            System.out.println("任务2开始工作");


                                            if (err != null) {
                                            System.out.println("任务1执行报错,报错原因:" + err.getMessage());
                                            return;
                                            }


                                            System.out.println("任务1正常结束,执行结果:" + result);


                                            });




                                            try {
                                            System.out.println("task2拿到最终执行结果 " + task2.get());
                                            } catch (Exception e) {


                                            }
                                            System.out.println("全流程结束");




                                            }

                                            错误的输出结果

                                              任务1开始工作
                                              任务2开始工作
                                              任务1执行报错,报错原因:java.lang.ArithmeticException: by zero
                                              全流程结束

                                              正确执行的输出结果:

                                                任务1开始工作
                                                任务1执行结束,执行结果:10
                                                任务2开始工作
                                                任务1正常结束,执行结果:10
                                                task2拿到最终执行结果 10
                                                全流程结束

                                                handle

                                                handle使用和whenComplete差不多,唯一的区别就是whenComplete返回的是上一个任务的结果,而handle可以返回自己的结果。

                                                代码如下所示

                                                  public static void execute1() throws ExecutionException, InterruptedException {
                                                  CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                                                  System.out.println(Thread.currentThread() + "开始工作了");
                                                  try {
                                                  Thread.sleep(200);
                                                  } catch (InterruptedException e) {
                                                  e.printStackTrace();
                                                  }


                                                  Random random = new java.util.Random();
                                                  int num = random.nextInt(10);
                                                  if (num < 5) {
                                                  throw new RuntimeException("报错了 num:" + num);
                                                  }
                                                  System.out.println(Thread.currentThread() + "结束工作了");
                                                  return num;
                                                  });


                                                  CompletableFuture<String> future2 = future.handle((result, err) -> {
                                                  System.out.println("第二个线程:" + Thread.currentThread() + "开始工作了");
                                                  try {
                                                  Thread.sleep(2000);
                                                  } catch (InterruptedException e) {
                                                  e.printStackTrace();
                                                  }


                                                  if (err != null) {
                                                  System.out.println(err.getMessage());
                                                  ;return "fail";
                                                  }
                                                  return "sucdess";
                                                  });




                                                  System.out.println("拿第1个任务的结果");
                                                  System.out.println("第1个任务的结果 " + future2.get());
                                                  System.out.println("第1个任务结果结束");






                                                  **
                                                  * 输出结果
                                                  * Thread[pool-1-thread-1,5,main]开始工作了
                                                  * 拿第一个任务的结果
                                                  * Thread[pool-1-thread-1,5,main]结束工作了
                                                  * 第二个线程:Thread[pool-1-thread-1,5,main]开始工作了
                                                  * 100
                                                  * 第一个任务结果结束
                                                  * 拿第2个任务的结果
                                                  * 第二个任务的结果 第一个线程的结果为 100
                                                  * 第2个任务结果结束
                                                  */


                                                  }

                                                  thenCombine thenAcceptBoth runAfterBoth

                                                  这几个方法都是将两个任务组合起来执行的,只有两个任务都顺利完成了,才会执行之后的方法,唯一的区别是:

                                                  1. thenCombine 接收两个任务的返回值,并返回自己的返回值。
                                                    public static void main(String[] args) throws ExecutionException, InterruptedException {


                                                    CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
                                                    System.out.println("task开始工作");
                                                    int num = RandomUtil.randomInt(0, 100);
                                                    System.out.println("task结束工作");
                                                    return num;
                                                    });




                                                    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
                                                    System.out.println("task2开始工作");
                                                    int num = RandomUtil.randomInt(0, 100);
                                                    System.out.println("task2结束工作");
                                                    return num;
                                                    });


                                                    通过thenCombine将两个任务组合起来
                                                    CompletableFuture<Integer> completableFuture = task1.thenCombine(task2, (result1, result2) -> {
                                                    System.out.println("task1返回结果:" + result1 + " task2返回结果:" + result2);
                                                    return result1 + result2;
                                                    });




                                                    System.out.println(completableFuture.get());




                                                    }

                                                    输出结果如下:

                                                      task开始工作
                                                      task2开始工作
                                                      task结束工作
                                                      task2结束工作
                                                      task1返回结果:30 task2返回结果:1
                                                      31
                                                      1. thenAcceptBoth 接收两个参数返回值,但没有返回值。
                                                        public static void main(String[] args) throws ExecutionException, InterruptedException {


                                                        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
                                                        System.out.println("task开始工作");
                                                        int num = RandomUtil.randomInt(0, 100);
                                                        System.out.println("task结束工作");
                                                        return num;
                                                        });




                                                        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
                                                        System.out.println("task2开始工作");
                                                        int num = RandomUtil.randomInt(0, 100);
                                                        System.out.println("task2结束工作");
                                                        return num;
                                                        });


                                                        通过 thenAcceptBoth 将两个任务组合起来,获取前两个任务处理结果,但自己不返回结果
                                                        CompletableFuture<Void> completableFuture = task1.thenAcceptBoth(task2, (result1, result2) -> {
                                                        System.out.println("task1返回结果:" + result1 + " task2返回结果:" + result2);


                                                        });




                                                        completableFuture.get();




                                                        }

                                                        输出结果:

                                                          task开始工作
                                                          task2开始工作
                                                          task结束工作
                                                          task2结束工作
                                                          task1返回结果:66 task2返回结果:10
                                                          1. runAfterBoth 既不能接收入参,也无返回值,待前两个任务执行完成后才能执行。
                                                             public static void main(String[] args) throws ExecutionException, InterruptedException {


                                                            CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
                                                            System.out.println("task开始工作");
                                                            int num = RandomUtil.randomInt(0, 100);
                                                            System.out.println("task结束工作");
                                                            return num;
                                                            });




                                                            CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
                                                            System.out.println("task2开始工作");
                                                            int num = RandomUtil.randomInt(0, 100);
                                                            System.out.println("task2结束工作");
                                                            return num;
                                                            });


                                                            //通过 runAfterBoth 将两个任务组合起来,待前两个组合任务完成后执行,无入参、无出参
                                                            CompletableFuture<Void> completableFuture = task1.runAfterBoth(task2,()-> {
                                                            System.out.println("task1、task2处理完成" );


                                                            });




                                                            completableFuture.get();




                                                            }

                                                            输出结果:

                                                              task开始工作
                                                              task2开始工作
                                                              task结束工作
                                                              task2结束工作
                                                              task1、task2处理完成

                                                              applyToEither / acceptEither / runAfterEither

                                                              这种组合模式只要有一个异步任务成功,就会触发后续的方法,比如我们组合任务1和任务2,如果任务1执行完成就直接执行任务3,无视任务2。反之任务2先完成直接执行任务3,无视任务1。

                                                              和上一个组合模式一样,依次规律也是:

                                                              1. 接收入参,含返回值。
                                                                 public static void main(String[] args) throws ExecutionException, InterruptedException {


                                                                CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);




                                                                CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);


                                                                CompletableFuture<String> completableFuture = task.applyToEither(task2, (result) -> {
                                                                if (result == 1) {
                                                                System.out.println("task1先完成任务");
                                                                return "task1";
                                                                }
                                                                System.out.println("task2先完成任务");
                                                                return "task2";
                                                                });




                                                                System.out.println("最先完成任务的是:" + completableFuture.get());




                                                                }
                                                                1. 接收入参,无返回值。
                                                                  public static void main(String[] args) throws ExecutionException, InterruptedException {


                                                                  CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);




                                                                  CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);


                                                                  CompletableFuture<Void> completableFuture = task.acceptEither(task2, (result) -> {
                                                                  System.out.println("result:" + result);
                                                                  if (result == 1) {
                                                                  System.out.println("task1先完成任务");
                                                                  return;
                                                                  }
                                                                  System.out.println("task2先完成任务");
                                                                  });




                                                                  completableFuture.get();




                                                                  }
                                                                  1. 无入参,无返回值。
                                                                    public static void main(String[] args) throws ExecutionException, InterruptedException {


                                                                    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
                                                                    System.out.println("task1开始工作");
                                                                    try {
                                                                    TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
                                                                    } catch (InterruptedException e) {
                                                                    e.printStackTrace();
                                                                    }
                                                                    System.out.println("task1结束工作");
                                                                    return 1;
                                                                    });




                                                                    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync( () -> {
                                                                    System.out.println("task2 开始工作");
                                                                    try {
                                                                    TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
                                                                    } catch (InterruptedException e) {
                                                                    e.printStackTrace();
                                                                    }
                                                                    System.out.println("task2 结束工作");
                                                                    return 2;
                                                                    });


                                                                    CompletableFuture<Void> completableFuture = task.runAfterEither(task2, () -> {
                                                                    System.out.println("有一个任务完成了");
                                                                    });




                                                                    completableFuture.get();




                                                                    }

                                                                    输出结果

                                                                      task1开始工作
                                                                      task2 开始工作
                                                                      task1结束工作
                                                                      有一个任务完成了

                                                                      thenCompose

                                                                      thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,例如我们希望任务1执行完成后执行任务2,任务2执行完成后返回执行任务3,最终结果是从任务3中获取。

                                                                         public static void main(String[] args) throws ExecutionException, InterruptedException {
                                                                        // 创建异步执行任务:
                                                                        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
                                                                        System.out.println("task1开始工作");
                                                                        int num=RandomUtil.randomInt(0,5);
                                                                        try {
                                                                        TimeUnit.SECONDS.sleep(num);
                                                                        } catch (InterruptedException e) {
                                                                        e.printStackTrace();
                                                                        }
                                                                        System.out.println("task1结束工作,处理结果:"+num);
                                                                        return num;
                                                                        });




                                                                        CompletableFuture<String> task2= task1.thenCompose((r)->{


                                                                        System.out.println("task2 开始工作");
                                                                        int num=RandomUtil.randomInt(0,5);
                                                                        try {
                                                                        TimeUnit.SECONDS.sleep(num);
                                                                        } catch (InterruptedException e) {
                                                                        e.printStackTrace();
                                                                        }
                                                                        System.out.println("task2 结束工作");




                                                                        return CompletableFuture.supplyAsync(()->{
                                                                        System.out.println("task3 开始工作,收到任务1的执行结果:"+r);
                                                                        return "task3 finished";
                                                                        });
                                                                        });


                                                                        System.out.println("执行结果->"+task2.get());




                                                                        }

                                                                        输出结果:

                                                                          task1开始工作
                                                                          task1结束工作,处理结果:1
                                                                          task2 开始工作
                                                                          task2 结束工作
                                                                          task3 开始工作,收到任务1的执行结果:1
                                                                          执行结果->task3 finished

                                                                          allOf / anyOf

                                                                          allOf返回的CompletableFuture是所有任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常。

                                                                            public static void main(String[] args) {
                                                                            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                                                                            // 模拟异步任务1
                                                                            try {
                                                                            Thread.sleep(1000);
                                                                            } catch (InterruptedException e) {
                                                                            e.printStackTrace();
                                                                            }
                                                                            return "Hello";
                                                                            });


                                                                            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                                                                            // 模拟异步任务2
                                                                            try {
                                                                            Thread.sleep(2000);
                                                                            } catch (InterruptedException e) {
                                                                            e.printStackTrace();
                                                                            }
                                                                            return "World";
                                                                            });


                                                                            CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);


                                                                            allFutures.thenRun(() -> {
                                                                            // 所有异步任务完成后打印它们的结果
                                                                            String result1 = future1.join();
                                                                            String result2 = future2.join();
                                                                            System.out.println(result1 + " " + result2);
                                                                            });


                                                                            // 等待所有异步任务完成
                                                                            allFutures.join();
                                                                            }

                                                                            输出结果:

                                                                              Hello World

                                                                              而anyOf则是只要有一个任务完成就可以触发后续方法,并且可以返回先完成任务的返回值,这一点和上述applyToEither 例子差不多。

                                                                                public class Main {


                                                                                public static void main(String[] args) {
                                                                                CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                                                                                // 模拟异步任务1
                                                                                try {
                                                                                Thread.sleep(1000);
                                                                                } catch (InterruptedException e) {
                                                                                e.printStackTrace();
                                                                                }
                                                                                return "Hello";
                                                                                });


                                                                                CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                                                                                // 模拟异步任务2
                                                                                try {
                                                                                Thread.sleep(2000);
                                                                                } catch (InterruptedException e) {
                                                                                e.printStackTrace();
                                                                                }
                                                                                return "World";
                                                                                });


                                                                                CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);


                                                                                anyFuture.thenAccept(result -> {
                                                                                // 任何一个异步任务完成后打印它的结果
                                                                                System.out.println(result);
                                                                                });


                                                                                // 等待任何一个异步任务完成
                                                                                anyFuture.join();
                                                                                }
                                                                                }

                                                                                小结

                                                                                我是sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号:写代码的SharkChili,同时我的公众号也有我精心整理的并发编程JVMMySQL数据库个人专栏导航。

                                                                                参考

                                                                                Java8 CompletableFuture 用法全解:https://blog.csdn.net/qq_31865983/article/details/106137777

                                                                                源码解析 Java 的 compareAndSwapObject 到底比较的是什么?:https://blog.csdn.net/qq_40697071/article/details/103374783


                                                                                文章转载自写代码的SharkChili,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                                评论