ListenableFuture定义
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
guava的ListenableFuture 和jdk8的CompletableFuture 都增加了监听器来监听计算的结果,后面再讲CompletableFuture,今天重点在ListenableFuture。
ListenableFuture继承了Future,在此基础上增加了下面方法
void addListener(Runnable listener, Executor executor);
是为了在运算(多线程执行)完成的时候进行调用
特性
提供静态工具类Futures,丰富对Future的操作
作用
例子
public class ListenableFutureLearn {
private static final int processors = Runtime.getRuntime().availableProcessors();
private static final ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ListenableFutureAdapter-thread-%d")
.build();
private static final ExecutorService defaultAdapterExecutor =
Executors.newFixedThreadPool(processors,threadFactory);
public static void main(String[] args) {
String string = "test";
Future<String> future = defaultAdapterExecutor.submit(new Task(string) {
});
ListenableFuture<String> listenInPoolThread = JdkFutureAdapters.listenInPoolThread(future);
Futures.addCallback(listenInPoolThread, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.printf("success", result);
}
@Override
public void onFailure(Throwable t) {
System.err.printf("failure");
}
});
}
private static class Task implements Callable<String> {
private final String data;
public Task(String data) {
this.data = data;
}
@Override
public String call() throws Exception {
try {
return "result_Success" + data;
} catch (Exception e) {
return "result_Failure" + data;
}
}
}
}
源码分析
AbstractFuture 实现了ListenableFuture接口,实现了addListener()方法
public void addListener(Runnable listener, Executor executor) {
// 校验
checkNotNull(listener, "Runnable was null.");
checkNotNull(executor, "Executor was null.");
Listener oldHead = listeners;
if (oldHead != Listener.TOMBSTONE) {
Listener newNode = new Listener(listener, executor);
do {
newNode.next = oldHead;
if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
return;
}
oldHead = listeners; // re-read
} while (oldHead != Listener.TOMBSTONE);
}
// If we get here then the Listener TOMBSTONE was set, which means the future is done, call
// the listener.
executeListener(listener, executor);
}