前言
最近本人在实现一个异步任务调度框架,不打算依赖于任何第三方包。在实现任务状态监听时遇到了一些困惑,
于是想了解一下Guava中的ListenableFuture的实现方式。ListenableFuture实现非阻塞的方式是其提
供了回调机制(机制),下面将阐述该回调机制的实现,主要对Futures的addCallback方法源码进行剖析。
Guava Futures简介
Google Guava框架的 com.google.common.util.concurrent包是并发相关的包,它是对JDK自带
concurrent包中Future和线程池相关类的扩展,从而衍生出一些新类,并提供了更为广泛的功能。
在项目中常用的该包中类如下所示:
- ListenableFuture:该接口扩展了Future接口,增加了addListener方法,该方法在给定的excutor上注册一个监听器,当计算完成时会马上调用该监听器。不能够确保监听器执行的顺序,但可以在计算完成时确保马上被调用。
- FutureCallback:该接口提供了OnSuccess和onFailure方法。获取异步计算的结果并回调。
- MoreExecutors:该类是final类型的工具类,提供了很多静态方法。例如listeningDecorator方法初始化ListeningExecutorService方法,使用此实例submit方法即可初始化ListenableFuture对象。
- ListenableFutureTask:该类是一个适配器,可以将其它Future适配成ListenableFuture。
- ListeningExecutorService:该类是对ExecutorService的扩展,重写了ExecutorService类中的submit方法,并返回ListenableFuture对象。
- JdkFutureAdapters:该类扩展了FutureTask类并实现ListenableFuture接口,增加了addListener方法。
- Futures:该类提供和很多实用的静态方法以供使用。
Futures.addCallback方法源码剖析
下面将模拟异步发送请求,并对请求结果进行(回调)监听。这里使用Spring框架提供的AsyncRestTemplate,
来发送http请求,并获取一个org.springframework.util.concurrent.ListenableFuture对象,
此时的对象是spring框架中的ListenableFuture对象。由于org.springframework.util.concurrent
包中只提供了最基本的监听功能,没有其它额外功能,这里将其转化成Guava中的ListenableFuture,
用到了JdkFutureAdapters这个适配器类。(以下源码来自guava-18.0.jar)
1 | AsyncRestTemplate tp = new AsyncRestTemplate(); |
Futures的addCallback方法通过传入ListenableFuture和FutureCallback(一般情况FutureCallback实现为内部类)来实现回调机制。
1 | //com.google.common.util.concurrent.Futures |
在addCallback方法中,我们发现多了一个 directExecutor()方法,这里的 directExecutor()方法
返回的是一个枚举类型的线程池,这样做的目的是提高性能,而线程池中的execute方法实质执行的是所的
传入参数Runnable 的run方法,可以把这里的线程池看作一个”架子”。
1 | //创建一个单实例的线程 接口需要显著的性能开销 提高性能 |
在具体的addCallback方法中,首先判断FutureCallback是否为空,然后创建一个线程,这个线程的run
方法中会获取到一个value值,这里的value值即为http请求的结果,然后将value值传入FutureCallback
的onSuccess方法,然后我们就可以在onSuccess方法中执行业务逻辑了。这个线程是如何执行的呢?继续
往下看,发现调用了ListenableFuture的addListener方法,将刚才创建的线程和上一步创建的枚举线程池传入。
1 | //增加回调 |
在addListener方法中,将待执行的任务和枚举型线程池加入ExecutionList中,ExecutionList的本质
是一个链表,将这些任务链接起来。具体可参考下方代码注释。
1 |
|
在ExecutionList的add方法中,判断是否执行完成,如果没有执行完成,则放入待执行的链表中并返回,
否则调用executeListener方法执行任务,在executeListener方法中,我们发现执行的是线程池的execute
方法,而execute方法实质的是调用了任务线程的run方法,这样最终会调用OnSuccess方法获取到执行结果。
1 | //将任务放入ExecutionList中 |
execute方法是执行任务链表中的任务,由于先加入的任务会依次排列在链表的末尾,所以需要将链表翻转。
然后从链表头开始依次取出任务执行并放入枚举线程池中执行。
1 | //执行监听链表中的任务 |
在上文中,可以发现每当对一个ListenerFuture增加回调时,都会创建一个线程,而这个线程的run方法中
会获取一个value值,这个value值就是通过下面的getUninterruptibly方法获取到的,我们可以发现在
方法中调用了while进行阻塞,一直等到future获取到结果,即发送的http请求获取到数据后才会终止并返回。
可以看出,回调机制将获取结果中的阻塞分散开来,即使现在有100个线程在并发地发送http请求,那么也只是
创建了100个独立的线程并行阻塞,那么运行的总时间则会是这100个线程中最长的时间,而不是100个线程的
时间相加,这样就实现了异步非阻塞机制。
1 | //用while阻塞直到获取到结果 |
这里实质上执行了线程的run方法,并进行阻塞。
1 | //执行监听器,调用线程池的execute方法,这里线程池并没有提供额外的功能,只提供了执行架子,实际上执行的是监听任务runnable的run方法 |
使用JdkFutureAdaoter适配Spring中的ListenableFuture达到异步调用的结果。在future.get方法中到底阻塞在什么地方呢?
通过调试发现最后调用的是BasicFuture中的阻塞方法。详情见下方源码和中文注释,这里不累赘。
1 | FutureAdapter: |
1 | BasicFuture: |
1 | FutureAdapter: |
1 | //这个方法判断 |
总结
本文主要剖析了Futures.callback方法的源码,我们只需要一个ListenableFuture的实例,就可以使用
该方法来实现回调机制。假设在我们的主线程中,有n个子方法要发送http请求,这时,我们可以创建n个
ListenableFuture,对这n个ListenableFuture增加监听,这n个请求就是异步且非阻塞的,这样不但
主线程不会阻塞,而且会大大减少总的响应时间。那Futures.callback是如何实现并发的呢?通过源码,
我们发现,对于每一个ListenableFuture,都会创建一个独立的线程对其进行监听,也就是这n个ListenableFuture
对应着n个独立的线程,而在每一个独立的线程中会各自调用Future.get方法阻塞。