Future和Promise
1.Future
1.1 功能
1.1.1 JDK中的Future
用来获取异步操作的结果.
// 取消异步操作
boolean cancel(boolean mayInterruptIfRunning);
// 异步操作是否取消
boolean isCancelled();
// 异步操作是否完成,正常终止、异常、取消都是完成
boolean isDone();
// 阻塞直到取得异步操作结果
V get() throws InterruptedException, ExecutionException;
// 同上,但最长阻塞时间为timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
1.1.2 ChannelFuture
在netty中,因为操作的都是Channel,因此命名为ChannelFuture
- 两种状态
uncompletedcompleted
- 以及三种可能
- 成功
- 失败
- 取消
- 提供了API用来添加事件监听器,并建议通过监听器的方式获取结果并进行相关的操作
+---------------------------+
| Completed successfully |
+---------------------------+
+----> isDone() = true |
+--------------------------+ | | isSuccess() = true |
| Uncompleted | | +===========================+
+--------------------------+ | | Completed with failure |
| isDone() = false | | +---------------------------+
| isSuccess() = false |----+----> isDone() = true |
| isCancelled() = false | | | cause() = non-null |
| cause() = null | | +===========================+
+--------------------------+ | | Completed by cancellation |
| +---------------------------+
+----> isDone() = true |
| isCancelled() = true |
+---------------------------+
1.2 源码分析
1.2.1 类继承关系图

1.2.2 源码
public interface ChannelFuture extends Future<Void> {
/**
* Returns a channel where the I/O operation associated with this
* future takes place.
*/
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
/**
* Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
* following methods:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #addListeners(GenericFutureListener[])}</li>
* <li>{@link #await()}</li>
* <li>{@link #await(long, TimeUnit)} ()}</li>
* <li>{@link #await(long)} ()}</li>
* <li>{@link #awaitUninterruptibly()}</li>
* <li>{@link #sync()}</li>
* <li>{@link #syncUninterruptibly()}</li>
* </ul>
*/
boolean isVoid();
}
/**
* Listens to the result of a {@link Future}. The result of the asynchronous operation is notified once this listener
* is added by calling {@link Future#addListener(GenericFutureListener)}.
*/
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
2.Promise
2.1 功能
是可写的future,future本身没有写操作的相关接口,通过promise可以对future扩展,用于设置IO操作的结果
2.2 源码分析
2.2.1 继承图

2.2.2 Promise源码
/**
* Special {@link Future} which is writable.
*/
public interface Promise<V> extends Future<V> {
/**
* Marks this future as a success and notifies all
* listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
Promise<V> setSuccess(V result);
/**
* Marks this future as a success and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a success. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean trySuccess(V result);
/**
* Marks this future as a failure and notifies all
* listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
Promise<V> setFailure(Throwable cause);
/**
* Marks this future as a failure and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a failure. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean tryFailure(Throwable cause);
/**
* Make this future impossible to cancel.
*
* @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
* without being cancelled. {@code false} if this future has been cancelled already.
*/
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
2.2.2 DefaultPromise源码
1.成员变量
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
// 可以嵌套的Listener的最大层数,可见最大值为8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
@SuppressWarnings("rawtypes")
// result字段由使用RESULT_UPDATER更新
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
private static final Object SUCCESS = new Object();
private static final Object UNCANCELLABLE = new Object();
// 异步操作失败时保存异常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
// 异步操作结果
private volatile Object result;
private final EventExecutor executor;
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
* 执行listener操作的执行器
*/
private Object listeners;
/**
* Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
* 阻塞等待该结果的线程数
*/
private short waiters;
/**
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
* executor changes.
* 通知正在进行标识
*/
private boolean notifyingListeners;
}
addListener方法
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { @Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { // 保证多线程情况下只有一个线程执行添加操作 addListener0(listener); } if (isDone()) { // 异步操作已经完成通知监听者 notifyListeners(); } return this; } private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { // 只有一个 listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { // 大于两个 ((DefaultFutureListeners) listeners).add(listener); } else { // 从一个扩展为两个 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } } }setSuccess方法
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
// 可以设置结果说明异步操作已完成,故通知监听者
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 为空设置为Signal对象Success
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// 只有结果为null或者UNCANCELLABLE时才可设置且只可以设置一次
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 通知等待的线程
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
// 确实有等待的线程才notifyAll
if (waiters > 0) {
// JDK方法
notifyAll();
}
}
private void notifyListeners() {
EventExecutor executor = executor();
// 执行线程为指定线程
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
// 嵌套层数
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 执行前增加嵌套层数
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 执行完毕,无论如何都要回滚嵌套层数
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
private void notifyListenersNow() {
Object listeners;
// 此时外部线程可能会执行添加Listener操作,所以需要同步
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
// 正在通知或已没有监听者(外部线程删除)直接返回
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
// 通知多个(遍历集合调用单个)
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 通知单个
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
// 执行完毕且外部线程没有再添加监听者
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
// 外部线程添加了监听者继续执行
listeners = this.listeners;
this.listeners = null;
}
}
}
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
}
- await方法
不能在同一个线程中调用await()相关的方法
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
@Override
public Promise<V> await() throws InterruptedException {
// 异步操作已经完成,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 同步使修改waiters的线程只有一个
synchronized (this) {
// 等待直到异步操作完成
while (!isDone()) {
incWaiters(); // ++waiters;
try {
// JDK方法
wait();
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (isDone()) {
return true;
}
if (timeoutNanos <= 0) {
return isDone();
}
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
long startTime = System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
for (;;) {
synchronized (this) {
if (isDone()) {
return true;
}
incWaiters();
try {
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
} finally {
decWaiters();
}
}
if (isDone()) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}