Netty优雅关闭
1.接口定义
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    /**
     * Asks this executor to shut itself down gracefully. From the moment this method is invoked,
     * {@link #isShuttingDown()} returns {@code true} and the executor starts preparing to stop.
     * In contrast to {@link #shutdown()}, the executor only stops once no task has been submitted
     * for a whole <i>'the quiet period'</i> (typically a few seconds); a task submitted during the
     * quiet period is still accepted and the quiet period starts over.
     *
     * @param quietPeriod length of the quiet period described above
     * @param timeout     upper bound on how long to wait before the executor is
     *                    {@linkplain #shutdown()}, even if tasks keep arriving during the quiet period
     * @param unit        time unit of both {@code quietPeriod} and {@code timeout}
     *
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    /**
     * Convenience overload of {@link #shutdownGracefully(long, long, TimeUnit)} that supplies
     * default values for the quiet period and the timeout.
     *
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully();
}
向执行器发出调用者希望关闭它的信号.一旦调用了这个方法,isShuttingDown()方法就会开始返回true,同时执行器准备关闭自身.与shutdown()方法不同,优雅关闭会确保执行器在关闭自身之前,有一段'the quiet period'(平静期,即gracefulShutdownQuietPeriod属性)内没有任何任务提交.如果在平静期内提交了任务,该任务保证会被接受,并且平静期会重新开始计时.而timeout参数是等待的上限:无论平静期内是否有任务提交,超过timeout后执行器都会被关闭.
2.源码解析
2.1 MultithreadEventExecutorGroup
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    /** Default graceful-shutdown timeout, in seconds. */
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15L;

    /** Default graceful-shutdown quiet period, in seconds. */
    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2L;
}
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    /** The child executors owned by this group. */
    private final EventExecutor[] children;

    /**
     * Asks every child executor to shut down gracefully with the same parameters, then returns
     * this group's termination future. The futures returned by the children are not awaited here.
     */
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (int i = 0; i < children.length; i++) {
            // The per-child future is ignored; callers observe the group-level terminationFuture().
            children[i].shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }
}
2.2 SingleThreadEventExecutor中对线程的关闭方法
- 成员变量
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private static final int ST_NOT_STARTED = 1; // 未开始
private static final int ST_STARTED = 2; // 已开始
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭中
private static final int ST_SHUTDOWN = 4; // 已关闭isShutdown
private static final int ST_TERMINATED = 5; // 已经终止
**
* {@link #state} 字段的原子更新器
*/
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
/**
* 状态
*/
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;
/**
* 优雅关闭
*/
private volatile long gracefulShutdownQuietPeriod;
/**
* 优雅关闭超时时间,单位:毫秒 TODO 1006 EventLoop 优雅关闭
*/
private volatile long gracefulShutdownTimeout;
/**
* 优雅关闭开始时间,单位:毫秒 TODO 1006 EventLoop 优雅关闭
*/
private long gracefulShutdownStartTime;
/**
* EventLoop 优雅关闭
*/
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
}
关闭流程:参数校验与状态切换
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { // 确保quietPeriod、unit的为有效值 if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } if (unit == null) { throw new NullPointerException("unit"); } // 如果该NioEventLoop已经执行过关闭操作了,可能是shutdownGracefully()这样的优雅关闭,也有可能是shutdown() or shutdownNow(),当然后两种方法已经不建议使用了(Deprecated).那么直接返回该异步操作的Future对象 if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; //使用自旋锁(自旋 + CAS)的方式修改当前NioEventLoop所关联的线程的状态(volatile修饰的成员变量state) for (;;) { if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: newState = ST_SHUTTING_DOWN; break; default: // 一个线程已修改好线程状态,此时这个线程才执行16行代码 newState = oldState; // 已经有线程唤醒,所以不用再唤醒 wakeup = false; } } // 保证只有一个线程将oldState修改为newState if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } // 隐含STATE_UPDATER已被修改,则在下一次循环返回 } // 在default情况下会更新这两个值 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_TERMINATED); terminationFuture.tryFailure(cause); if (!(cause instanceof Exception)) { // Also rethrow as it may be an OOME for example PlatformDependent.throwException(cause); } return terminationFuture; } } if (wakeup) { // 唤醒阻塞在阻塞点上的线程,使其从阻塞状态退出 wakeup(inEventLoop); } return terminationFuture(); } @Override public boolean isShuttingDown() { return state >= 
ST_SHUTTING_DOWN; } @Override public boolean isShutdown() { return state >= ST_SHUTDOWN; } @Override public boolean isTerminated() { return state == ST_TERMINATED; }单线程启动
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    /**
     * Starts the event-loop thread on {@code executor} and runs {@link #run()} on it.
     *
     * <p>When {@code run()} returns or throws, the thread does the following, in order:
     * <ul>
     *   <li>moves the state to at least {@code ST_SHUTTING_DOWN};</li>
     *   <li>keeps calling {@link #confirmShutdown()} until it returns {@code true};</li>
     *   <li>calls {@link #cleanup()};</li>
     *   <li>sets the state to {@code ST_TERMINATED};</li>
     *   <li>releases {@code threadLock};</li>
     *   <li>completes {@code terminationFuture}.</li>
     * </ul>
     */
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Remember the thread that runs this event loop.
                thread = Thread.currentThread();
                // If an interrupt was flagged earlier (the `interrupted` field), apply it now.
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Template method: the concrete event loop implements run().
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // Ensure state >= ST_SHUTTING_DOWN, whether run() returned after a shutdown
                    // request or ended by throwing.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    // A start time of 0 means run() returned normally without ever calling it.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks: loop until
                        // confirmShutdown() decides the executor may stop.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            // Release resources held by the concrete event loop.
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            // Give a permit to threads blocked in awaitTermination()'s
                            // threadLock.tryAcquire(timeout, unit), so they return before their timeout.
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                // Tasks are still queued and will not run; log it.
                                if (logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
}
单线程关闭
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { /** * Confirm that the shutdown if the instance should be done now! */ protected boolean confirmShutdown() { // 没有调用shutdown相关的方法直接返回 if (!isShuttingDown()) { return false; } // 必须是原生线程 if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } // 取消调度任务 cancelScheduledTasks(); // 优雅关闭开始时间,这也是一个标记 if (gracefulShutdownStartTime == 0) { gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } // 执行完普通任务或者没有普通任务时执行完shutdownHook任务 if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { // 调用shutdown()方法直接退出 // Executor shut down - no new tasks anymore. return true; } // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or // terminate if the quiet period is 0. // See https://github.com/netty/netty/issues/4241 if (gracefulShutdownQuietPeriod == 0) { // 优雅关闭静默时间为0也直接退出 return true; } // 优雅关闭但有未执行任务,唤醒线程执行 wakeup(true); return false; } final long nanoTime = ScheduledFutureTask.nanoTime(); // shutdown()方法调用直接返回,优雅关闭截止时间到也返回 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; } // 在静默期间每100ms唤醒线程执行期间提交的任务 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. wakeup(true); try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } return false; } // 静默时间内没有任务提交,可以优雅关闭,此时若用户又提交任务则不会被执行 // No tasks were added for last quiet period - hopefully safe to shut down. // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.) 
return true; } protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; // 是否执行过任务 do { // 从定时任务获得到时间的任务 fetchedAll = fetchFromScheduledTaskQueue(); // 执行任务队列中的所有任务 if (runAllTasksFrom(taskQueue)) { // 若有任务执行,则标记为 true ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. // 如果执行过任务,则设置最后执行时间 if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } // 执行所有任务完成的后续方法 afterRunningAllTasks(); return ranAtLeastOne; } private boolean runShutdownHooks() { boolean ran = false; // Note shutdown hooks can add / remove shutdown hooks. while (!shutdownHooks.isEmpty()) { // 使用copy是因为shutdwonHook任务中可以添加或删除shutdwonHook任务 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks); shutdownHooks.clear(); for (Runnable task: copy) { try { task.run(); } catch (Throwable t) { logger.warn("Shutdown hook raised an exception.", t); } finally { ran = true; } } } if (ran) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } return ran; } @Override protected void cleanup() { try { selector.close(); } catch (IOException e) { logger.warn("Failed to close a selector.", e); } } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { if (unit == null) { throw new NullPointerException("unit"); } if (inEventLoop()) { throw new IllegalStateException("cannot await termination of the current thread"); } // 由于tryAcquire()永远不会成功,所以必定阻塞timeout时间 if (threadLock.tryAcquire(timeout, unit)) { threadLock.release(); } return isTerminated(); } }
2.3 其它
知识点
- 自旋锁(Spin lock):
- 线程不会被挂起,而是一直占用CPU时间,反复尝试更新,直到更新成功为止.正因为它是以占用CPU资源的方式等待,所以自旋锁保护的操作必须非常简短,否则其他线程可能会一直自旋等待该自旋锁.也正是因为自旋时线程不会释放CPU(即线程无需被挂起),所以不存在线程上下文切换的开销.(注:shutdownGracefully()中的 for(;;) + CAS 循环并没有真正持有锁,严格来说是无锁的"自旋 + CAS"重试,只是借用了自旋的思想.)
- 因此,自旋锁一般用于在多核处理器中预计线程持有锁的时间很短(即锁操作所需的时间非常的短)情况,甚至时间短于两次线程上下文的切换的开销
- volatile的可见性:
- volatile除了保证单个变量的读/写具有原子性外,还有一个很重要的特性,就是保证内存可见性(即,对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入)
- 此处修改state字段的线程(本文中是Netty服务端主线程)和使用该字段的线程(NioEventLoop所关联的线程)不是同一个线程,因此用volatile修饰state字段:主线程修改了EventLoop所关联线程的状态后,NioEventLoop的事件循环能立即正确感知到状态的变化,从而做出相应的操作
为什么我们在执行关闭操作的时候,还需要特意去启动那些尚未启动的NioEventLoop线程呢?
- 在基于NIO的网络传输模式中,构建NioEventLoopGroup时就会预先创建好一定数量的NioEventLoop(默认为操作系统可用处理器数的2倍),而NioEventLoop在初始化的时候就会打开其上的Selector.Selector的关闭则是在doStartThread()方法最后的finally块中(通过cleanup())完成的.因此,即使某个NioEventLoop从未被启动过,也必须先启动它的线程,才能走到cleanup()关闭Selector,并最终完成terminationFuture;否则该Selector将得不到关闭.
循环退出条件
- 调用shutdown()方法从循环跳出的条件有:
- 执行完普通任务
- 没有普通任务,执行完shutdownHook任务
- 既没有普通任务也没有shutdownHook任务
- 调用shutdownGracefully()方法从循环跳出的条件有:
- 执行完普通任务且静默时间为0
- 没有普通任务,执行完shutdownHook任务且静默时间为0
- 静默期间没有任务提交
- 优雅关闭截止时间已到