Netty Graceful Shutdown

1. Interface definition

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    /**
     * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values.
     *
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully();

    /**
     * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
     * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
     * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
     * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
     * it is guaranteed to be accepted and the quiet period will start over.
     *
     * @param quietPeriod the quiet period as described in the documentation
     * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
     *                    regardless if a task was submitted during the quiet period
     * @param unit        the unit of {@code quietPeriod} and {@code timeout}
     *
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
}

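This method signals that the caller wants the executor to shut down. Once it has been called, isShuttingDown() starts returning true and the executor prepares to shut itself down. Unlike shutdown(), a graceful shutdown ensures that no tasks are submitted during 'the quiet period' (the gracefulShutdownQuietPeriod field) before the executor shuts itself down. If a task is submitted during the quiet period, it is guaranteed to be accepted and the quiet period starts over.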

2. Source code walkthrough

2.1 MultithreadEventExecutorGroup

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {

    /**
     * Default quiet period for a graceful shutdown, in seconds
     */
    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    /**
     * Default shutdown timeout, in seconds
     */
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
}

public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
}

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    /**
     * Array of child EventExecutors
     */
    private final EventExecutor[] children;

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        // Iterate over the children array and call shutdownGracefully on each EventExecutor
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }
}
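
The delegation above can be modelled without Netty. The following is a minimal sketch, assuming the group's termination future completes only after every child has terminated. The `Group` and `Child` classes are hypothetical stand-ins, not Netty types, and `CompletableFuture` replaces Netty's own `Future`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class GroupShutdownSketch {
    /** Hypothetical stand-in for a child executor; not a Netty type. */
    static final class Child {
        final CompletableFuture<Void> termination = new CompletableFuture<>();
        boolean shutdownRequested;

        CompletableFuture<Void> shutdownGracefully() {
            shutdownRequested = true;
            return termination;
        }

        /** Simulates the child's thread finishing its shutdown. */
        void terminate() {
            termination.complete(null);
        }
    }

    /** Hypothetical stand-in for the group that owns the children. */
    static final class Group {
        final List<Child> children = new ArrayList<>();
        final CompletableFuture<Void> termination;

        Group(int n) {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[n];
            for (int i = 0; i < n; i++) {
                Child child = new Child();
                children.add(child);
                futures[i] = child.termination;
            }
            // Completes only when every child future has completed
            termination = CompletableFuture.allOf(futures);
        }

        CompletableFuture<Void> shutdownGracefully() {
            // Forward the request to every child, then hand back the group-level future
            for (Child child : children) {
                child.shutdownGracefully();
            }
            return termination;
        }
    }
}
```

The point of the sketch is that the caller gets one future for the whole group and does not need to track the children individually.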

2.2 How SingleThreadEventExecutor shuts down its thread

  1. Member variables
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private static final int ST_NOT_STARTED = 1; // not started
    private static final int ST_STARTED = 2; // started
    private static final int ST_SHUTTING_DOWN = 3; // shutting down
    private static final int ST_SHUTDOWN = 4; // shut down (isShutdown() returns true)
    private static final int ST_TERMINATED = 5; // terminated

    /**
     * Atomic updater for the {@link #state} field
     */
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    /**
     * Current state
     */
    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    /**
     * Quiet period for a graceful shutdown, in nanoseconds (set via unit.toNanos)
     */
    private volatile long gracefulShutdownQuietPeriod;
    /**
     * Timeout for a graceful shutdown, in nanoseconds (set via unit.toNanos)
     */
    private volatile long gracefulShutdownTimeout;
    /**
     * Start time of the graceful shutdown, in nanoseconds (from ScheduledFutureTask.nanoTime())
     */
    private long gracefulShutdownStartTime;

    /**
     * Future that completes once this executor has terminated
     */
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
}
  1. Shutdown validation and state transition

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
     @Override
     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
         // Validate quietPeriod, timeout and unit
         if (quietPeriod < 0) {
             throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
         }
         if (timeout < quietPeriod) {
             throw new IllegalArgumentException(
                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
         }
         if (unit == null) {
             throw new NullPointerException("unit");
         }
    
         // If this NioEventLoop is already shutting down (through shutdownGracefully(), or through the
         // deprecated shutdown() / shutdownNow()), just return the termination Future
         if (isShuttingDown()) {
             return terminationFuture();
         }
    
         boolean inEventLoop = inEventLoop();
         boolean wakeup;
         int oldState;
    
         // Update the state of the thread bound to this NioEventLoop (the volatile field state)
         // with a CAS retry loop (spin + CAS)
         for (;;) {
             if (isShuttingDown()) {
                 return terminationFuture();
             }
             int newState;
             wakeup = true;
             oldState = state;
             if (inEventLoop) {
                 newState = ST_SHUTTING_DOWN;
             } else {
                 switch (oldState) {
                     case ST_NOT_STARTED:
                     case ST_STARTED:
                         newState = ST_SHUTTING_DOWN;
                         break;
                     default:
                         // Another thread changed the state after the isShuttingDown() check above,
                         // so keep the state as it is
                         newState = oldState;
                         // That thread has already woken up the event loop, so no second wakeup is needed
                         wakeup = false;
                 }
             }
    
             // Only one thread can succeed in changing oldState to newState
             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                 break;
             }
    
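             // The CAS failed, so another thread changed state; the next iteration re-reads it and returns
             // if a shutdown is already in progress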
         }
    
         // These two values are also updated in the default case above
         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
         gracefulShutdownTimeout = unit.toNanos(timeout);
    
         if (oldState == ST_NOT_STARTED) {
             try {
                 doStartThread();
             } catch (Throwable cause) {
                 STATE_UPDATER.set(this, ST_TERMINATED);
                 terminationFuture.tryFailure(cause);
    
                 if (!(cause instanceof Exception)) {
                     // Also rethrow as it may be an OOME for example
                     PlatformDependent.throwException(cause);
                 }
                 return terminationFuture;
             }
         }
    
         if (wakeup) {
             // Wake up the thread if it is blocked, so that it leaves the blocking call and notices the state change
             wakeup(inEventLoop);
         }
    
         return terminationFuture();
     }
    
     @Override
     public boolean isShuttingDown() {
         return state >= ST_SHUTTING_DOWN;
     }
    
     @Override
     public boolean isShutdown() {
         return state >= ST_SHUTDOWN;
     }
    
     @Override
     public boolean isTerminated() {
         return state == ST_TERMINATED;
     }
    }
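
The spin + CAS transition can be tried in isolation. The following is a simplified sketch of the same pattern, assuming only three of the states and dropping the wakeup and thread-start logic. The class `StateTransitionSketch` and its methods are hypothetical names for illustration, not Netty code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class StateTransitionSketch {
    static final int ST_NOT_STARTED = 1;
    static final int ST_STARTED = 2;
    static final int ST_SHUTTING_DOWN = 3;

    private static final AtomicIntegerFieldUpdater<StateTransitionSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StateTransitionSketch.class, "state");

    // Must be a volatile int for AtomicIntegerFieldUpdater to accept it
    private volatile int state = ST_NOT_STARTED;

    /** Returns true only for the caller that actually moved the state to ST_SHUTTING_DOWN. */
    boolean requestShutdown() {
        for (;;) {
            int oldState = state;
            if (oldState >= ST_SHUTTING_DOWN) {
                return false; // another caller already started the shutdown
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, ST_SHUTTING_DOWN)) {
                return true;
            }
            // The CAS lost a race: re-read the state and try again
        }
    }

    int state() {
        return state;
    }

    /** Calls requestShutdown() from several threads and counts how many of them won the CAS. */
    static int countWinners(int threads) {
        StateTransitionSketch sketch = new StateTransitionSketch();
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (sketch.requestShutdown()) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }
}
```

However many threads call `requestShutdown()` concurrently, exactly one of them performs the transition. This is the property the wakeup flag in the original method relies on.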
    
  2. Starting the single thread

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     private void doStartThread() {
         assert thread == null;
         executor.execute(new Runnable() {
    
             @Override
             public void run() {
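                 // Record the current thread
                 thread = Thread.currentThread();
    
                 // If an interrupt was requested before the thread started, apply it now
                 if (interrupted) {
                     thread.interrupt();
                 }
    
                 // Whether run() completed normally
                 boolean success = false;
    
                 // Update the last execution time
                 updateLastExecutionTime();
                 try {
                     // Run the loop. Template method implemented by the EventLoop
                     SingleThreadEventExecutor.this.run();
                     // Mark as successful
                     success = true;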
                 } catch (Throwable t) {
                     logger.warn("Unexpected exception from an event executor: ", t);
                 } finally {
                     for (;;) {
                         int oldState = state;
                         // Either the user requested a shutdown or run() threw an exception
                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                             // After an exception the state is also set to ST_SHUTTING_DOWN
                             break;
                         }
                     }
    
                     // Check if confirmShutdown() was called at the end of the loop.
                     if (success && gracefulShutdownStartTime == 0) {
                         // A start time of 0 means confirmShutdown() was never called, so log an error
                         if (logger.isErrorEnabled()) {
                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                     "be called before run() implementation terminates.");
                         }
                     }
    
                     try {
                         // Run all remaining tasks and shutdown hooks.
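                         // After an exception, this drains the remaining tasks and shutdown hooks.
                         // On a normal shutdown, confirmShutdown() returns true once its exit conditions are met.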
                         for (;;) {
                             if (confirmShutdown()) {
                                 break;
                             }
                         }
                     } finally {
                         try {
                             cleanup(); // clean up and release resources
                         } finally {
                             // Set the state to ST_TERMINATED: the thread has finished
                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                             // threadLock is a semaphore created with 0 permits; release() adds one permit.
                             // A caller blocked in awaitTermination() (which waits in
                             // threadLock.tryAcquire(timeout, unit)) can then acquire the permit and return.
                             // It also returns when its timeout elapses.
                             threadLock.release();
                             if (!taskQueue.isEmpty()) {
                                 // Tasks were added to the queue during shutdown, so log a warning
                                 if (logger.isWarnEnabled()) {
                                     logger.warn("An event executor terminated with " +
                                             "non-empty task queue (" + taskQueue.size() + ')');
                                 }
                             }
    
                             // Complete the termination future successfully
                             terminationFuture.setSuccess(null);
                         }
                     }
                 }
             }
         });
     }
    }
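
The threadLock handshake described in the comments above can be sketched with a plain `java.util.concurrent.Semaphore`. This is a minimal model, assuming a semaphore created with zero permits. `TerminationSignalSketch` and `markTerminated()` are hypothetical names, and unlike the original method this version swallows `InterruptedException` to keep the example short.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TerminationSignalSketch {
    // Zero permits: nobody can acquire until the worker signals termination
    private final Semaphore threadLock = new Semaphore(0);
    private volatile boolean terminated;

    /** Called once by the worker thread when it has finished. */
    void markTerminated() {
        terminated = true;
        threadLock.release(); // adds the single permit
    }

    /** Waits up to the timeout and reports whether the worker has terminated. */
    boolean awaitTermination(long timeout, TimeUnit unit) {
        try {
            if (threadLock.tryAcquire(timeout, unit)) {
                // Hand the permit back so that other waiters can return as well
                threadLock.release();
            }
        } catch (InterruptedException e) {
            // Simplification for this sketch: restore the flag instead of rethrowing
            Thread.currentThread().interrupt();
        }
        return terminated;
    }
}
```

Before termination the call waits for the full timeout and returns false. After termination every caller returns true at once, because each waiter gives the permit back.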
    
  3. Shutting down the single thread

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
     /**
      * Confirm that the shutdown if the instance should be done now!
      */
     protected boolean confirmShutdown() {
         // Return immediately if no shutdown method has been called
         if (!isShuttingDown()) {
             return false;
         }
    
         // Must be called from the event loop thread itself
         if (!inEventLoop()) {
             throw new IllegalStateException("must be invoked from an event loop");
         }
    
         // Cancel the scheduled tasks
         cancelScheduledTasks();
    
         // Record when the graceful shutdown started; a non-zero value also marks that confirmShutdown() has run
         if (gracefulShutdownStartTime == 0) {
             gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
         }
    
         // True if at least one ordinary task ran or, when none ran, at least one shutdown hook ran
         if (runAllTasks() || runShutdownHooks()) {
             if (isShutdown()) {
                 // shutdown() was called: exit right away
                 // Executor shut down - no new tasks anymore.
                 return true;
             }
    
             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
             // terminate if the quiet period is 0.
             // See https://github.com/netty/netty/issues/4241
             if (gracefulShutdownQuietPeriod == 0) {
                 // A quiet period of 0 also exits right away
                 return true;
             }
    
             // Graceful shutdown, but more tasks may still arrive: wake the thread up so the next pass runs them
             wakeup(true);
             return false;
         }
    
         final long nanoTime = ScheduledFutureTask.nanoTime();
         // Return if shutdown() was called, or if the graceful shutdown timeout has elapsed
         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
             return true;
         }
    
         // During the quiet period, wake up every 100 ms to run tasks submitted in the meantime
         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
             // Check if any tasks were added to the queue every 100ms.
             // TODO: Change the behavior of takeTask() so that it returns on timeout.
             wakeup(true);
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
                 // Ignore
             }
    
             return false;
         }
    
         // No task was submitted during the quiet period, so it is safe to shut down.
         // A task submitted after this point will not be run.
         // No tasks were added for last quiet period - hopefully safe to shut down.
         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
         return true;
     }
    
     protected boolean runAllTasks() {
         assert inEventLoop();
         boolean fetchedAll;
         boolean ranAtLeastOne = false; // whether any task was run
    
         do {
             // Move the scheduled tasks that are due into the task queue
             fetchedAll = fetchFromScheduledTaskQueue();
             // Run all tasks in the task queue
             if (runAllTasksFrom(taskQueue)) {
                 // At least one task ran
                 ranAtLeastOne = true;
             }
         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
         // If any task ran, update the last execution time
         if (ranAtLeastOne) {
             lastExecutionTime = ScheduledFutureTask.nanoTime();
         }
    
         // Hook invoked after all tasks have been run
         afterRunningAllTasks();
         return ranAtLeastOne;
     }
    
     private boolean runShutdownHooks() {
         boolean ran = false;
         // Note shutdown hooks can add / remove shutdown hooks.
         while (!shutdownHooks.isEmpty()) {
             // Iterate over a copy because a shutdown hook may add or remove shutdown hooks
             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
             shutdownHooks.clear();
             for (Runnable task: copy) {
                 try {
                     task.run();
                 } catch (Throwable t) {
                     logger.warn("Shutdown hook raised an exception.", t);
                 } finally {
                     ran = true;
                 }
             }
         }
    
         if (ran) {
             lastExecutionTime = ScheduledFutureTask.nanoTime();
         }
    
         return ran;
     }
    
     // Overridden in NioEventLoop (shown here for convenience) to close its Selector
     @Override
     protected void cleanup() {
         try {
             selector.close();
         } catch (IOException e) {
             logger.warn("Failed to close a selector.", e);
         }
     }
    
     @Override
     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
         if (unit == null) {
             throw new NullPointerException("unit");
         }
    
         if (inEventLoop()) {
             throw new IllegalStateException("cannot await termination of the current thread");
         }
    
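         // Blocks until the event loop thread releases threadLock on termination, or until the timeout elapses.
         // On success the permit is released again so that other waiters can return as well.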
         if (threadLock.tryAcquire(timeout, unit)) {
             threadLock.release();
         }
    
         return isTerminated();
     }
    }
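
The copy-then-clear loop in runShutdownHooks() is easier to follow with a hook that registers another hook while it runs. The following is a minimal sketch of that pattern, assuming the hooks are kept in a `LinkedHashSet`. `ShutdownHookSketch`, its `log` list and `demo()` are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ShutdownHookSketch {
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<>();
    final List<String> log = new ArrayList<>();

    void addShutdownHook(Runnable hook) {
        shutdownHooks.add(hook);
    }

    /** Runs all hooks, including hooks that were added while other hooks were running. */
    boolean runShutdownHooks() {
        boolean ran = false;
        while (!shutdownHooks.isEmpty()) {
            // Iterate over a snapshot so that hooks may modify the live set safely
            List<Runnable> copy = new ArrayList<>(shutdownHooks);
            shutdownHooks.clear();
            for (Runnable hook : copy) {
                try {
                    hook.run();
                } catch (Throwable t) {
                    log.add("failed: " + t.getMessage());
                } finally {
                    ran = true;
                }
            }
        }
        return ran;
    }

    /** Registers two hooks, the first of which registers a third one while it runs. */
    static List<String> demo() {
        ShutdownHookSketch sketch = new ShutdownHookSketch();
        sketch.addShutdownHook(() -> {
            sketch.log.add("first");
            sketch.addShutdownHook(() -> sketch.log.add("added-by-first"));
        });
        sketch.addShutdownHook(() -> sketch.log.add("second"));
        sketch.runShutdownHooks();
        return sketch.log;
    }
}
```

Iterating over the live set directly would throw a `ConcurrentModificationException` as soon as the first hook registered another one. With the snapshot, the late hook is picked up by the next pass of the outer `while` loop.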
    

2.3 Miscellaneous

  1. Key concepts

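    • Spin lock:
      • The thread keeps the CPU and retries the update until it succeeds. Because it burns CPU while waiting, the operation guarded by a spin lock must be very short; otherwise other threads keep spinning while they wait for it. Because a spinning thread never gives up the CPU and is never suspended, there is no thread context switch.
      • Spin locks are therefore typically used on multi-core processors when the lock is expected to be held only briefly, ideally for less time than two thread context switches would cost.
    • Visibility of volatile:
      • Besides making reads and writes of a single variable atomic, volatile guarantees memory visibility across threads: a read of a volatile variable always sees the last write to it by any thread.
      • Here the thread that modifies the state field (in this article, the main thread of the Netty server) is not the thread that reads it (the thread bound to the NioEventLoop). Declaring state as volatile means that once the main thread changes the state, the NioEventLoop's event loop sees the change on its next read and can react to it.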
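
The visibility point can be illustrated with two threads and one volatile field. This is a minimal sketch, not Netty code: `VolatileStateSketch`, `startLoop()` and `loopStopsWithin()` are hypothetical names, and `Thread.yield()` stands in for one pass of the event loop.

```java
public class VolatileStateSketch {
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;

    // volatile: a write by the controlling thread is visible to the loop thread's next read
    private volatile int state = ST_STARTED;

    /** Starts a loop thread that runs until it observes the shutdown state. */
    Thread startLoop() {
        Thread loop = new Thread(() -> {
            while (state < ST_SHUTTING_DOWN) {
                Thread.yield(); // stand-in for one pass of the event loop
            }
        });
        loop.setDaemon(true);
        loop.start();
        return loop;
    }

    /** Called from a different thread than the loop thread. */
    void requestShutdown() {
        state = ST_SHUTTING_DOWN;
    }

    /** Returns true if the loop thread exits within the given time after shutdown is requested. */
    static boolean loopStopsWithin(long millis) {
        VolatileStateSketch sketch = new VolatileStateSketch();
        Thread loop = sketch.startLoop();
        sketch.requestShutdown();
        try {
            loop.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !loop.isAlive();
    }
}
```

Without `volatile`, the Java memory model would not guarantee that the loop thread ever observes the new value, so the loop could keep running after the shutdown request.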
  2. Why does a shutdown still start the NioEventLoop threads that have not been started yet?

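    • With the NIO transport, building a NioEventLoopGroup creates a fixed number of NioEventLoops up front (by default, twice the number of available processors), and each NioEventLoop opens its Selector during initialization. The Selector is closed only at the end of doStartThread(), in cleanup(). If the thread were never started, the Selector would never be closed.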
  3. Loop exit conditions

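    • After shutdown() has been called, the loop exits when:
      • all ordinary tasks have run
      • there were no ordinary tasks and the shutdown hooks have run
      • there are neither ordinary tasks nor shutdown hooks
    • After shutdownGracefully() has been called, the loop exits when:
      • ordinary tasks have run and the quiet period is 0
      • there were no ordinary tasks, the shutdown hooks have run and the quiet period is 0
      • no task was submitted during the quiet period
      • the graceful shutdown timeout has elapsed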
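
The shutdownGracefully() conditions above can be written as one pure function. The following is a simplified model of the decision that confirmShutdown() makes on each pass, assuming isShutdown() is false and all times share one unit. `QuietPeriodSketch`, `Decision` and `decide()` are hypothetical names for illustration.

```java
public class QuietPeriodSketch {
    enum Decision { TERMINATE, KEEP_WAITING }

    /**
     * @param ranSomething   whether this pass ran any task or shutdown hook
     * @param now            current time
     * @param shutdownStart  time at which the graceful shutdown started
     * @param lastExecution  time at which a task or hook last ran
     * @param quietPeriod    required length of the quiet period
     * @param timeout        maximum time to wait after shutdownStart
     */
    static Decision decide(boolean ranSomething, long now, long shutdownStart, long lastExecution,
                           long quietPeriod, long timeout) {
        if (ranSomething) {
            // Work was done in this pass: only a quiet period of 0 allows an immediate exit
            return quietPeriod == 0 ? Decision.TERMINATE : Decision.KEEP_WAITING;
        }
        if (now - shutdownStart > timeout) {
            return Decision.TERMINATE; // the timeout has elapsed, stop regardless of the quiet period
        }
        if (now - lastExecution <= quietPeriod) {
            return Decision.KEEP_WAITING; // still inside the quiet period
        }
        return Decision.TERMINATE; // nothing ran for a whole quiet period
    }
}
```

For example, with a quiet period of 2 and a timeout of 10, a pass at time 1 keeps waiting and a pass at time 3 with no work since time 0 terminates. A task that runs at time 9 restarts the quiet period, but a pass at time 11 terminates anyway because the timeout has elapsed.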
