暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

为Future增加Callback

蹲厕所的熊 2018-08-27
479

蹲厕所的熊 转载请注明原创出处,谢谢!

Future是Java5增加的类,它用来描述一个异步计算的结果。你可以使用 isDone
方法检查计算是否完成,或者使用 get
方法阻塞住调用线程,直到计算完成返回结果。你也可以使用 cancel
方法停止任务的执行。

  1. public class FutureDemo {


  2.    public static void main(String[] args) {

  3.        ExecutorService es = Executors.newFixedThreadPool(10);

  4.        Future<Integer> f = es.submit(() ->{

  5.            // 长时间的任务计算

  6.            Thread.sleep(10000);

  7.            // 返回结果

  8.            return 100;

  9.        });


  10.        // 做一些其他操作

  11.        // ....


  12.        Integer result = f.get();

  13.        System.out.println(result);


  14. //        while (f.isDone()) {

  15. //            System.out.println(result);

  16. //        }

  17.    }

  18. }

在这个例子中,我们往线程池中提交了一个任务并立即返回了一个Future对象,接着可以做一些其他操作,最后利用它的 get
方法阻塞等待结果或 isDone
方法轮询等待结果(关于Future的原理可以参考之前的文章:【细谈Java并发】谈谈FutureTask)

虽然这些方法提供了异步执行任务的能力,但是对于结果的获取却还是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时的得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如Node.js,采用Callback的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future
接口,提供了 addListener
等多个扩展方法。Google的guava也提供了通用的扩展Future: ListenableFuture
SettableFuture
以及辅助类 Futures
等,方便异步编程。为此,Java终于在8这个版本中增加了一个能力更强的Future类: CompletableFuture
。它提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果。

Netty-Future

首先引入Maven依赖:

  1. <dependency>

  2.    <groupId>io.netty</groupId>

  3.    <artifactId>netty-all</artifactId>

  4.    <version>4.1.29.Final</version>

  5. </dependency>

  1. public class NettyFutureDemo {


  2.    public static void main(String[] args) throws InterruptedException {

  3.        EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads

  4.        System.out.println("begin:" + DateUtils.getNow());

  5.        Future<Integer> f = group.submit(new Callable<Integer>() {

  6.            @Override

  7.            public Integer call() throws Exception {

  8.                System.out.println("开始耗时计算:" + DateUtils.getNow());

  9.                Thread.sleep(3000);

  10.                System.out.println("结束耗时计算:" + DateUtils.getNow());

  11.                return 100;

  12.            }

  13.        });

  14.        f.addListener(new FutureListener<Object>() {

  15.            @Override

  16.            public void operationComplete(Future<Object> objectFuture) throws Exception {

  17.                System.out.println("计算结果:" + objectFuture.get());

  18.            }

  19.        });

  20.        System.out.println("end:" + DateUtils.getNow());

  21.        new CountDownLatch(1).await();//不让守护线程退出

  22.    }

  23. }

输出结果:

  1. begin:2018-08-26 04:56:40:779

  2. end:2018-08-26 04:56:40:783

  3. 开始耗时计算:2018-08-26 04:56:40:783

  4. 结束耗时计算:2018-08-26 04:56:43:789

  5. 计算结果:100

从结果可以看出,耗时计算结束后自动触发Listener的完成方法,避免了主线程无谓的阻塞等待,那么它究竟是怎么做到的呢?

一探源码

DefaultEventExecutorGroup
实现了 EventExecutorGroup
接口,而 EventExecutorGroup
则是实现了JDK ScheduledExecutorService
接口的线程组接口,所以它拥有线程池的所有方法。然而它却把所有返回 java.util.concurrent.Future
的方法重写为返回 io.netty.util.concurrent.Future
,把所有返回 java.util.concurrent.ScheduledFuture
的方法重写为返回 io.netty.util.concurrent.ScheduledFuture

  1. public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

  2.    /**

  3.     * 从组里返回一个EventExecutor

  4.     */

  5.    EventExecutor next();


  6.    Iterator<EventExecutor> iterator();


  7.    Future<?> submit(Runnable task);

  8.    <T> Future<T> submit(Runnable task, T result);

  9.    <T> Future<T> submit(Callable<T> task);


  10.    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

  11.    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

  12.    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

  13.    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

  14. }

EventExecutorGroup
的submit方法因为 newTaskFor
的重写导致返回了netty的 Future
实现类,而这个实现类正是 PromiseTask

  1. @Override

  2. public <T> Future<T> submit(Callable<T> task) {

  3.    return (Future<T>) super.submit(task);

  4. }


  5. @Override

  6. protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {

  7.    return new PromiseTask<T>(this, callable);

  8. }

PromiseTask
的实现很简单,它缓存了要执行的 Callable
任务,并在run方法中完成了任务调用和Listener的通知。

  1. @Override

  2. public void run() {

  3.    try {

  4.        if (setUncancellableInternal()) {

  5.            V result = task.call();

  6.            setSuccessInternal(result);

  7.        }

  8.    } catch (Throwable e) {

  9.        setFailureInternal(e);

  10.    }

  11. }


  12. @Override

  13. public Promise<V> setSuccess(V result) {

  14.    if (setSuccess0(result)) {

  15.        notifyListeners();

  16.        return this;

  17.    }

  18.    throw new IllegalStateException("complete already: " + this);

  19. }


  20. @Override

  21. public Promise<V> setFailure(Throwable cause) {

  22.    if (setFailure0(cause)) {

  23.        notifyListeners();

  24.        return this;

  25.    }

  26.    throw new IllegalStateException("complete already: " + this, cause);

  27. }

任务调用成功或者失败都会调用 notifyListeners
来通知Listener,所以大家得在回调的函数里调用 isSuccess
方法来检查状态。

这里有一个疑惑,会不会 Future
在调用 addListener
方法的时候任务已经执行完成了,这样子会不会通知就会失败了啊?

  1. @Override

  2. public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {

  3.    synchronized (this) {

  4.        addListener0(listener);

  5.    }


  6.    if (isDone()) {

  7.        notifyListeners();

  8.    }


  9.    return this;

  10. }

可以发现,在Listener添加成功之后,会立即检查状态,如果任务已经完成立刻进行回调,所以这里不用担心啦。

Guava-Future

首先引入guava的Maven依赖:

  1. <dependency>

  2.    <groupId>com.google.guava</groupId>

  3.    <artifactId>guava</artifactId>

  4.    <version>22.0</version>

  5. </dependency>

  1. public class GuavaFutureDemo {


  2.    public static void main(String[] args) throws InterruptedException {

  3.        System.out.println("begin:" + DateUtils.getNow());

  4.        ExecutorService executorService = Executors.newFixedThreadPool(10);

  5.        ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);

  6.        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {

  7.            @Override

  8.            public Integer call() throws Exception {

  9.                System.out.println("开始耗时计算:" + DateUtils.getNow());

  10.                Thread.sleep(3000);

  11.                System.out.println("结束耗时计算:" + DateUtils.getNow());

  12.                return 100;

  13.            }

  14.        });

  15.        future.addListener(new Runnable() {

  16.            @Override

  17.            public void run() {

  18.                System.out.println("调用成功");

  19.            }

  20.        }, executorService);

  21.        System.out.println("end:" + DateUtils.getNow());

  22.        new CountDownLatch(1).await();

  23.    }

  24. }

ListenableFuture
可以通过 addListener
方法增加回调函数,一般用于不在乎执行结果的地方。如果需要在执行成功时获取结果或者执行失败时获取异常信息,需要用到 Futures
工具类的 addCallback
方法:

  1. Futures.addCallback(future, new FutureCallback<Integer>() {

  2.    @Override

  3.    public void onSuccess(@Nullable Integer result) {

  4.        System.out.println("调用成功,计算结果:" + result);

  5.    }


  6.    @Override

  7.    public void onFailure(Throwable t) {

  8.        System.out.println("调用失败");

  9.    }

  10. }, executorService);

前面提到除了 ListenableFuture
外,还有一个 SettableFuture
类也支持回调能力。它实现自 ListenableFuture
,所以拥有 ListenableFuture
的所有能力。

  1. public class GuavaFutureDemo {


  2.    public static void main(String[] args) throws InterruptedException {

  3.        System.out.println("begin:" + DateUtils.getNow());

  4.        ExecutorService executorService = Executors.newFixedThreadPool(10);

  5.        ListenableFuture<Integer> future = submit(executorService);

  6.        Futures.addCallback(future, new FutureCallback<Integer>() {

  7.            @Override

  8.            public void onSuccess(@Nullable Integer result) {

  9.                System.out.println("调用成功,计算结果:" + result);

  10.            }


  11.            @Override

  12.            public void onFailure(Throwable t) {

  13.                System.out.println("调用失败:" + t.getMessage());

  14.            }

  15.        }, executorService);

  16.        Thread.sleep(1000);

  17.        System.out.println("end:" + DateUtils.getNow());

  18.        new CountDownLatch(1).await();

  19.    }


  20.    private static ListenableFuture<Integer> submit(Executor executor) {

  21.        SettableFuture<Integer> future = SettableFuture.create();

  22.        executor.execute(new Runnable() {

  23.            @Override

  24.            public void run() {

  25.                System.out.println("开始耗时计算:" + DateUtils.getNow());

  26.                try {

  27.                    Thread.sleep(3000);

  28.                } catch (InterruptedException e) {

  29.                    e.printStackTrace();

  30.                }

  31.                System.out.println("结束耗时计算:" + DateUtils.getNow());

  32.                // 设置返回值

  33.                future.set(100);

  34.                // 设置异常信息

  35. //                future.setException(new RuntimeException("custom error!"));

  36.            }

  37.        });

  38.        return future;

  39.    }

  40. }

看起来用法上没有太多差别,但是有一个很容易被忽略的重要问题。SettableFuture
的这种方式最后调用了 cancel
方法后,线程池中的任务还是会继续执行,而通过 submit
方法返回的 ListenableFuture
方法则会立即取消执行,这点尤其要注意。

一探源码

和Netty的Future一样,Guava也是通过实现了自定义的 ExecutorService
实现类 ListeningExecutorService
来重写了 submit
方法。

  1. public interface ListeningExecutorService extends ExecutorService {

  2.  <T> ListenableFuture<T> submit(Callable<T> task);

  3.  ListenableFuture<?> submit(Runnable task);

  4.  <T> ListenableFuture<T> submit(Runnable task, T result);

  5. }

同样的, newTaskFor
方法也被进行了重写,返回了自定义的Future类: TrustedListenableFutureTask

  1. @Override

  2. protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {

  3.    return TrustedListenableFutureTask.create(runnable, value);

  4. }


  5. @Override

  6. protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {

  7.    return TrustedListenableFutureTask.create(callable);

  8. }

任务调用会走 TrustedFutureInterruptibleTask
的run方法:

  1. @Override

  2. public void run() {

  3.    TrustedFutureInterruptibleTask localTask = task;

  4.    if (localTask != null) {

  5.        localTask.run();

  6.    }

  7. }


  8. @Override

  9. public final void run() {

  10.    if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {

  11.        return; // someone else has run or is running.

  12.    }

  13.    try {

  14.        // 抽象方法,子类进行重写

  15.        runInterruptibly();

  16.    } finally {

  17.        if (wasInterrupted()) {

  18.            while (!doneInterrupting) {

  19.                Thread.yield();

  20.            }

  21.        }

  22.    }

  23. }

最终还是调用到 TrustedFutureInterruptibleTask
runInterruptibly
方法,等待任务完成后调用 set
方法。

  1. @Override

  2. void runInterruptibly() {

  3.    if (!isDone()) {

  4.        try {

  5.            set(callable.call());

  6.        } catch (Throwable t) {

  7.            setException(t);

  8.        }

  9.    }

  10. }


  11. protected boolean set(@Nullable V value) {

  12.    Object valueToSet = value == null ? NULL : value;

  13.    // CAS设置值

  14.    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {

  15.        complete(this);

  16.        return true;

  17.    }

  18.    return false;

  19. }

complete
方法的最后会获取到Listener进行回调。

上面提到的 SettableFuture
ListenableFuture
cancel
方法效果不同,原因在于一个重写了 afterDone
方法而一个没有。

下面是 ListenableFuture
afterDone
方法:

  1. @Override

  2. protected void afterDone() {

  3.    super.afterDone();


  4.    if (wasInterrupted()) {

  5.        TrustedFutureInterruptibleTask localTask = task;

  6.        if (localTask != null) {

  7.            localTask.interruptTask();

  8.        }

  9.    }


  10.    this.task = null;

  11. }

wasInterrupted
用来判断是否调用了 cancel
(cancel方法会设置一个取消对象Cancellation到value中)

  1. protected final boolean wasInterrupted() {

  2.    final Object localValue = value;

  3.    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;

  4. }

interruptTask
方法通过线程的 interrupt
方法真正取消线程任务的执行:

  1. final void interruptTask() {

  2.    Thread currentRunner = runner;

  3.    if (currentRunner != null) {

  4.        currentRunner.interrupt();

  5.    }

  6.    doneInterrupting = true;

  7. }

CompletableFuture

最后我们来说说Java8提供的一种更为高级的回调方式: CompletableFuture

  1. public class CompletableFutureTest {


  2.    public static void main(String[] args) throws InterruptedException {

  3.        System.out.println("begin:" + DateUtils.getNow());

  4.        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {

  5.            System.out.println("开始耗时计算:" + DateUtils.getNow());

  6.            try {

  7.                Thread.sleep(3000);

  8.            } catch (InterruptedException e) {

  9.                e.printStackTrace();

  10.            }

  11.            System.out.println("结束耗时计算:" + DateUtils.getNow());

  12.            return 100;

  13.        });

  14.        completableFuture.whenComplete((result, e) -> {

  15.            System.out.println("回调结果:" + result);

  16.        });

  17.        System.out.println("end:" + DateUtils.getNow());

  18.        new CountDownLatch(1).await();

  19.    }

  20. }

总结

由此看来,为Future模式增加回调功能是非常有必要的。它不需要阻塞等待结果的返回并且不需要消耗无谓的CPU资源去轮询处理状态,JDK8之前使用Netty或者Guava提供的工具类,JDK8之后则可以使用自带的 CompletableFuture
类。

快给你用到 Future
的地方增加Callback吧~


如果读完觉得有收获的话,欢迎点赞、关注、加公众号【蹲厕所的熊】,查阅更多精彩历史!!!

文章转载自蹲厕所的熊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论