EventLoop和EventLoopGroup

1.Reactor和Proactor

1.1 Reactor

Reactor是一个同步的I/O多路复用模型

Reactor模型

  • 用户发起IO操作到事件分离器
  • 事件分离器调用相应的处理器处理事件
  • 事件处理完成,事件分离器获得控制权,继续相应处理

1.2 Proactor

Proactor是一个异步I/O的多路复用模型

Proactor模型

  • 用户发起IO操作到事件分离器
  • 事件分离器通知操作系统进行IO操作
  • 操作系统将数据存放到数据缓存区
  • 操作系统通知分发器IO完成
  • 分离器将事件分发至相应的事件处理器
  • 事件处理器直接读取数据缓存区内的数据进行处理

1.3 比较

  • Reactor模型简单,Proactor复杂
  • Reactor是同步处理方式,Proactor是异步处理方式
  • Proactor的IO事件依赖操作系统,操作系统须支持异步IO
  • 同步与异步是相对于服务端与IO事件来说的,Proactor通过操作系统异步来完成IO操作,当IO完成后通知事件分离器,而Reactor需要自己完成IO操作

2.NIO Reactor模型

2.1 Reactor模式思想

分而治之

一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步. Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行.

事件驱动

每个Task对应特定网络事件.当Task准备就绪时,Reactor收到对应的网络事件通知,并将Task分发给绑定了对应网络事件的Handler执行.

角色

  • Reactor:负责响应事件,将事件分发给绑定了该事件的Handler处理
  • Handler:事件处理器,绑定了某类事件,负责执行对应事件的Task对事件进行处理
  • Acceptor:Handler的一种,绑定了connect事件.当客户端发起connect请求时,Reactor会将accept事件分发给Acceptor处理

2.2 Reactor单线程模型

指所有的I/O操作都在同一个NIO线程上面完成.

单线程Reactor

NIO线程的职责如下

  • 作为NIO服务端,接收客户端的TCP连接
  • 作为NIO客户端,向服务端发起TCP连接
  • 读取通信对端的请求或者应答消息
  • 向通信对端发送消息请求或者应答消息

  • 特点

    • 由于Reactor模式使用的是异步非阻塞I/O,所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作
  • 高负载、大并发应用场景下的问题
    • 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送
      • 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈
      • 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

2.3 Reactor多线程模式

有一个专门的Acceptor线程,读写由另一个IO线程池负责

多线程Reactor

  • 特点
    • 有专门一个NIO线程——Acceptor线程用于监听服务端,接收客户端的TCP连接请求
    • 网络I/O操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送
    • 一个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题
  • 问题
    • 一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题

2.4 主从多线程模型

Acceptor也采用一个独立的线程池

主从Reactor

  • 特点
    • 服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池
    • Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作.Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作

3.netty的线程模型

3.1 线程模型

Netty线程模型

服务端启动的时候,创建了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池

  • 用于接收客户端请求的线程池(bossGroup)作用
    • 接收客户端TCP连接,初始化Channel参数
    • 将链路状态变更事件通知给ChannelPipeline
  • 处理I/O线程池(workerGroup)作用
    • 异步读取通信对端的数据报,发送读事件到ChannelPipeline
    • 异步发送消息到通信对端,调用ChannelPipeline的消息发送接口
    • 执行系统调用Task
    • 执行定时任务Task. 例如:链路空闲状态监测定时任务

3.1 Reactor模型

Netty中的Reactor

Netty中的Reactor

  • Acceptor中的NioEventLoop用于接收TCP连接,初始化参数
  • I/O线程池中的NioEventLoop异步读取通信对端的数据报,发送读事件到channel
  • 异步发送消息到对端,调用channel的消息发送接口
  • 执行系统调用Task
  • 执行定时Task

为了尽可能的提升性能,netty在很多地方进行了无锁化的设计:netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead,只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换

3.3 最佳实践

  • 创建两个NioEventLoopGroup,隔离NIO Acceptor和NIO的IO线程
  • 尽量不要在ChannelHandler中启动用户线程(解码之后,将POJO消息派发到后端的业务线程池除外)
  • 解码要放在NIO线程调用的Handler中,不要切换到用户线程处
  • 如果IO操作非常简单,不涉及复杂的业务逻辑计算,没有可能导致阻塞的磁盘操作、数据库操作、网络操作等,可以再NIO线程调用的Handler中完成业务逻辑,不需要切换到用户线程
  • 如果IO业务操作比较复杂,就不要在NIO线程上完成,建议将解码后的POJO消息封装成Task,派发到业务线程池中由业务线程处理,以保证NIO线程尽快的被释放,处理其他的IO操作
推荐的线程数量计算公式
    1. 线程数量 = (线程总时间/瓶颈资源时间) * 瓶颈资源的线程并行数
    2. QPS=1000/线程总时间*线程数

4.源码分析

关系继承图

4.1 职责划分

  • EventExecutorGroup
    • 继承自Java的周期任务接口,是一个事件处理器组的概念
    • 方法
      • isShuttingDown():是否正在关闭,或者是已经关闭
      • shutdownGracefully():优雅停机,等待所有执行中任务执行完成,并不再接收新的任务
      • terminationFuture():返回一个该线程池管理的所有线程都terminated的时候触发的future
      • shutdown():废弃了的关闭方法,shutdownGracefully取代
      • next():返回一个被该Group管理的EventExecutor
      • iterator():所有管理的EventExecutor的迭代器
      • submit():提交一个线程任务
      • schedule():周期执行一个任务
  • EventExecutor
    • 继承自EventExecutorGroup,复用接口定义的方法
    • 方法
      • next():就是找group中下一个子集
      • parent():就是所属group
      • inEventLoop():当前线程是否是在该子集中
  • EventLoopGroup
    • 继承自EventExecutorGroup.EventLoop的定位是处理一个连接的生命周期过程中的周期事件,group是多个EventLoop的集合了
    • 方法
      • next(): 获取下一个EventLoop
      • register(): 注册channel
  • EventLoop

    • 事件循环,其也是一个处理器,最终继承自EventExecutor和EventLoopGroup
    • 方法
      • parent(): 获取父事件循环组EventLoopGroup

  • AbstractEventExecutor

    • 基本作用就是与group在定义混乱上做了一个区分。提供了执行器与Future关联方法和一个基本的执行任务的方法
    • 方法
      • 将next设置成自己(上面说过继承的group,这个操作就和group区分开了)。
      • 优雅停机调用的是带有超时的停机方案,超时为15秒
      • 覆盖了Java提供的newTask包装成FutureTask的方法,使用了自己的PromiseTask
      • 提供安全执行方法:safeExecute,直接调用的run方法
  • AbstractScheduledEventExecutor
    • 对AbstractEventExecutor未处理的周期任务提供了具体的完成方法
    • 方法
      • 提供计算当前距服务启动的时间
      • 提供存储ScheduledFutureTask的优先队列
      • 提供了取消所有周期任务的方法
      • 提供了获取一个符合的周期任务的方法,要满足时间,并获取后移除
      • 提供了获取距最近一个周期任务的时间多久
      • 提供了移除一个周期任务的方法
      • 提供添加周期任务的方法
  • SingleThreadEventExecutor
    • 主要完成了一个单线程的EventExecutor的基本操作
    • 方法
      • 创建一个taskQueue
      • 中断线程
      • 从任务队列中获取一个任务,takeTask连同周期任务也会获取
      • 添加任务到任务队列
      • 移除任务队列中指定任务
      • 运行所有任务,会先将周期任务存入taskQueue,再使用safeExecute方法执行任务
      • 实现了execute方法,会添加任务到任务队列,如果当前线程不是事件循环线程,开启一个线程。通过的就是持有的executor来开启的线程任务。execute方法调用了run方法,该类没有实现run方法。任务的添加都不是通过execute直接执行了,而是走的添加任务到taskQueue,由未实现的run线程来处理这些事件。
      • 优雅停机
  • SingleThreadEventLoop
    • 主要是针对netty自身的事件循环的定义来实现方法
    • 方法
      • 注册channel,实际上是生成了一个DefaultChannelPromise对象,持有了channel,和运行该channel的EventExecutor,然后将该对象交给最底层的unsafe处理。
      • 添加一个事件周期结束后执行的尾任务tailTasks
      • 执行尾任务
      • 删除指定尾任务

4.2 NioEventLoop

4.2.1 概览

NioEventLoop的处理链

Chain

Selector的过程

Selector

NioEventLoop执行过程

Flow

4.2.2 源码

  1. 成员变量

    public final class NioEventLoop extends SingleThreadEventLoop {
    
     /**
      * 是否禁用 SelectionKey 的优化,默认开启
      */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    
      /**
      * Boolean that controls determines if a blocked Selector.select should
      * break out of its selection process. In our case we use a timeout for
      * the select method and the select method will block for that time unless
      * waken up.
      *
      * 唤醒标记.因为唤醒方法 {@link Selector#wakeup()} 开销比较大,通过该标识,减少调用.
      *
      * @see #wakeup(boolean)
      * @see #run()
      */
     private final AtomicBoolean wakenUp = new AtomicBoolean();
     /**
      * Select 策略
      *
      * @see #select(boolean)
      */
     private final SelectStrategy selectStrategy;
     /**
      * 处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例
      */
     private volatile int ioRatio = 50;
     /**
      * 取消 SelectionKey 的数量
      *
      * TODO 1007 NioEventLoop cancel
      */
     private int cancelledKeys;
     /**
      * 是否需要再次 select Selector 对象
      *
      * TODO 1007 NioEventLoop cancel
      */
     private boolean needsToSelectAgain;
    
     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
         super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
         if (selectorProvider == null) {
             throw new NullPointerException("selectorProvider");
         }
         if (strategy == null) {
             throw new NullPointerException("selectStrategy");
         }
         provider = selectorProvider;
         // 创建 Selector 对象
         final SelectorTuple selectorTuple = openSelector();
         selector = selectorTuple.selector;
         unwrappedSelector = selectorTuple.unwrappedSelector;
         selectStrategy = strategy;
     }
    }
    
  2. Selector的初始化 作为NIO框架的Reactor线程,NioEventLoop需要处理网络I/O读写事件,因此它必须聚合一个多路复用器对象.

public final class NioEventLoop extends SingleThreadEventLoop {
    /**
     * The NIO {@link Selector}.
     *
     * 包装的 Selector 对象,经过优化
     *
     * {@link #openSelector()}
     */
    private Selector selector;
    /**
     * 未包装的 Selector 对象
     */
    private Selector unwrappedSelector;
    /**
     * 注册的 SelectionKey 集合.Netty 自己实现,经过优化.
     */
    private SelectedSelectionKeySet selectedKeys;
    /**
     * SelectorProvider 对象,用于创建 Selector 对象
     */
    private final SelectorProvider provider;

    /**
     * Selector 元组
     */
    private static final class SelectorTuple {

        /**
         * 未包装的 Selector 对象
         */
        final Selector unwrappedSelector;
        /**
         * 未包装的 Selector 对象
         */
        final Selector selector;

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

        SelectorTuple(Selector unwrappedSelector, Selector selector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = selector;
        }

    }

    private SelectorTuple openSelector() {
        // 创建 Selector 对象,作为 unwrappedSelector
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        // 禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象.即,selector 也使用 unwrappedSelector .
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        // 获得 SelectorImpl 类
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader()); // 成功,则返回该类
                } catch (Throwable cause) {
                    return cause; // 失败,则返回该异常
                }
            }
        });

        // 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象.即,selector 也使用 unwrappedSelector .
        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        // 创建 SelectedSelectionKeySet 对象
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // 获得 "selectedKeys" "publicSelectedKeys" 的 Field
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    // 设置 Field 可访问
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 的 Field 中
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e; // 失败,则返回该异常
                } catch (IllegalAccessException e) {
                    return e; // 失败,则返回该异常
                }
            }
        });

        // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中失败,则直接返回 SelectorTuple 对象.即,selector 也使用 unwrappedSelector .
        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }

        // 设置 SelectedSelectionKeySet 对象到 selectedKeys 中
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);

        // 创建 SelectedSelectionKeySetSelector 对象
        // 创建 SelectorTuple 对象.即,selector 也使用 SelectedSelectionKeySetSelector 对象.
        return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
}
  1. run

    public final class NioEventLoop extends SingleThreadEventLoop {
    
     @Override
     protected void run() {
         for (;;) {
             try {
                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                     case SelectStrategy.CONTINUE: // 默认实现下,不存在这个情况.
                         continue;
                     case SelectStrategy.SELECT:
                         // 重置 wakenUp 标记为 false
                         // 选择( 查询 )任务
                         select(wakenUp.getAndSet(false));
    
                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
                         // before calling 'selector.wakeup()' to reduce the wake-up
                         // overhead. (Selector.wakeup() is an expensive operation.)
                         //
                         // However, there is a race condition in this approach.
                         // The race condition is triggered when 'wakenUp' is set to
                         // true too early.
                         //
                         // 'wakenUp' is set to true too early if:
                         // 1) Selector is waken up between 'wakenUp.set(false)' and
                         //    'selector.select(...)'. (BAD)
                         // 2) Selector is waken up between 'selector.select(...)' and
                         //    'if (wakenUp.get()) { ... }'. (OK)
                         //
                         // In the first case, 'wakenUp' is set to true and the
                         // following 'selector.select(...)' will wake up immediately.
                         // Until 'wakenUp' is set to false again in the next round,
                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                         // any attempt to wake up the Selector will fail, too, causing
                         // the following 'selector.select(...)' call to block
                         // unnecessarily.
                         //
                         // To fix this problem, we wake up the selector again if wakenUp
                         // is true immediately after selector.select(...).
                         // It is inefficient in that it wakes up the selector for both
                         // the first case (BAD - wake-up required) and the second case
                         // (OK - no wake-up required).
    
                         // 唤醒.原因,见上面中文注释
                         if (wakenUp.get()) {
                             selector.wakeup();
                         }
                         // fall through
                     default:
                 }
    
                 // TODO 1007 NioEventLoop cancel 方法
                 cancelledKeys = 0;
                 needsToSelectAgain = false;
    
                 final int ioRatio = this.ioRatio;
                 if (ioRatio == 100) {
                     try {
                         // 处理 Channel 感兴趣的就绪 IO 事件
                         processSelectedKeys();
                     } finally {
                         // 运行所有普通任务和定时任务,不限制时间
                         // Ensure we always run tasks.
                         runAllTasks();
                     }
                 } else {
                     final long ioStartTime = System.nanoTime();
                     try {
                         // 处理 Channel 感兴趣的就绪 IO 事件
                         processSelectedKeys();
                     } finally {
                         // 运行所有普通任务和定时任务,限制时间
                         // Ensure we always run tasks.
                         final long ioTime = System.nanoTime() - ioStartTime;
                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                     }
                 }
             } catch (Throwable t) {
                 handleLoopException(t);
             }
             // TODO 1006 EventLoop 优雅关闭
             // Always handle shutdown even if the loop processing threw an exception.
             try {
                 if (isShuttingDown()) {
                     closeAll();
                     if (confirmShutdown()) {
                         return;
                     }
                 }
             } catch (Throwable t) {
                 handleLoopException(t);
             }
         }
     }
    }
    
  2. 选择(查询)任务

public final class NioEventLoop extends SingleThreadEventLoop {
    private void select(boolean oldWakenUp) throws IOException {
        // 记录下 Selector 对象
        Selector selector = this.selector;
        try {
            // select 计数器
            int selectCnt = 0; // cnt 为 count 的缩写
            // 记录当前时间,单位:纳秒
            long currentTimeNanos = System.nanoTime();
            // 计算 select 截止时间,单位:纳秒.
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                // 计算本次 select 的超时时长,单位:毫秒.
                // + 500000L 是为了四舍五入
                // / 1000000L 是为了纳秒转为毫秒
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                // 如果超时时长,则结束 select
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) { // 如果是首次 select ,selectNow 一次,非阻塞
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // 若有新的任务加入
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    // selectNow 一次,非阻塞
                    selector.selectNow();
                    // 重置 select 计数器
                    selectCnt = 1;
                    break;
                }

                // 阻塞 select ,查询 Channel 是否有就绪的 IO 事件
                int selectedKeys = selector.select(timeoutMillis);
                // select 计数器 ++
                selectCnt ++;

                // 结束 select ,如果满足下面任一一个条件
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                // 线程被打断.一般情况下不会出现,出现基本是 bug ,或者错误使用.
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                // 记录当前时间
                long time = System.nanoTime();
                // 符合 select 超时条件,重置 selectCnt 为 1
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                // 不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);

                    // 重建 Selector 对象
                    rebuildSelector();
                    // 修改下 Selector 对象
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    // 立即 selectNow 一次,非阻塞
                    selector.selectNow();
                    // 重置 selectCnt 为 1
                    selectCnt = 1;
                    // 结束 select
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e);
            }
            // Harmless exception - log anyway
        }
    }
}
  1. 处理Channel感兴趣的就绪IO事件

    public final class NioEventLoop extends SingleThreadEventLoop {
     private void processSelectedKeys() {
         if (selectedKeys != null) {
             processSelectedKeysOptimized();
         } else {
             processSelectedKeysPlain(selector.selectedKeys());
         }
     }
    
     private void processSelectedKeysOptimized() {
         // 遍历数组
         for (int i = 0; i < selectedKeys.size; ++i) {
             final SelectionKey k = selectedKeys.keys[i];
             // null out entry in the array to allow to have it GC'ed once the Channel close
             // See https://github.com/netty/netty/issues/2363
             selectedKeys.keys[i] = null;
    
             final Object a = k.attachment();
    
             // 处理一个 Channel 就绪的 IO 事件
             if (a instanceof AbstractNioChannel) {
                 processSelectedKey(k, (AbstractNioChannel) a);
             // 使用 NioTask 处理一个 Channel 就绪的 IO 事件
             } else {
                 @SuppressWarnings("unchecked")
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 processSelectedKey(k, task);
             }
    
             // TODO 1007 NioEventLoop cancel 方法
             if (needsToSelectAgain) {
                 // null out entries in the array to allow to have it GC'ed once the Channel close
                 // See https://github.com/netty/netty/issues/2363
                 selectedKeys.reset(i + 1);
    
                 selectAgain();
                 i = -1;
             }
         }
     }
    
     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
         // check if the set is empty and if so just return to not create garbage by
         // creating a new Iterator every time even if there is nothing to process.
         // See https://github.com/netty/netty/issues/597
         if (selectedKeys.isEmpty()) {
             return;
         }
    
         // 遍历 SelectionKey 迭代器
         Iterator<SelectionKey> i = selectedKeys.iterator();
         for (;;) {
             // 获得 SelectionKey 对象
             final SelectionKey k = i.next();
             // 从迭代器中移除
             i.remove();
    
             final Object a = k.attachment();
             // 处理一个 Channel 就绪的 IO 事件
             if (a instanceof AbstractNioChannel) {
                 processSelectedKey(k, (AbstractNioChannel) a);
             // 使用 NioTask 处理一个 Channel 就绪的 IO 事件
             } else {
                 @SuppressWarnings("unchecked")
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 processSelectedKey(k, task);
             }
    
             // 无下一个节点,结束
             if (!i.hasNext()) {
                 break;
             }
    
             // TODO 1007 NioEventLoop cancel 方法
             if (needsToSelectAgain) {
                 selectAgain();
                 selectedKeys = selector.selectedKeys();
    
                 // Create the iterator again to avoid ConcurrentModificationException
                 if (selectedKeys.isEmpty()) {
                     break;
                 } else {
                     i = selectedKeys.iterator();
                 }
             }
         }
     }
    }
    
  2. processSelectedKey

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 如果 SelectionKey 是不合法的,则关闭 Channel
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            // 获得就绪的 IO 事件的 ops
            int readyOps = k.readyOps();

            // OP_CONNECT 事件就绪
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // 移除对 OP_CONNECT 感兴趣
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 完成连接
                unsafe.finishConnect();
            }

            // OP_WRITE 事件就绪
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                // 向 Channel 写入数据
                ch.unsafe().forceFlush();
            }

            // SelectionKey.OP_READ 或 SelectionKey.OP_ACCEPT 就绪
            // readyOps == 0 是对 JDK Bug 的处理,防止空的死循环
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            // 发生异常,关闭 Channel
            unsafe.close(unsafe.voidPromise());
        }
    }

    private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
        int state = 0; // 未执行
        try {
            // 调用 NioTask 的 Channel 就绪事件
            task.channelReady(k.channel(), k);
            state = 1; // 执行成功
        } catch (Exception e) {
            // SelectionKey 取消
            k.cancel();
            // 执行 Channel 取消注册
            invokeChannelUnregistered(task, k, e);
            state = 2; // 执行异常
        } finally {
            switch (state) {
            case 0:
                // SelectionKey 取消
                k.cancel();
                // 执行 Channel 取消注册
                invokeChannelUnregistered(task, k, null);
                break;
            case 1:
                // SelectionKey 不合法,则执行 Channel 取消注册
                if (!k.isValid()) { // Cancelled by channelReady()
                    invokeChannelUnregistered(task, k, null);
                }
                break;
            }
        }
    }
}
  1. 运行所有普通任务和定时任务
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    // 不限制时间
    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false; // 是否执行过任务

        do {
            // 从定时任务获得到时间的任务
            fetchedAll = fetchFromScheduledTaskQueue();
            // 执行任务队列中的所有任务
            if (runAllTasksFrom(taskQueue)) {
                // 若有任务执行,则标记为 true
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        // 如果执行过任务,则设置最后执行时间
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }

        // 执行所有任务完成的后续方法
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

    // 限制时间
    protected boolean runAllTasks(long timeoutNanos) {
        // 从定时任务获得到时间的任务
        fetchFromScheduledTaskQueue();
        // 获得队头的任务
        Runnable task = pollTask();
        // 获取不到,结束执行
        if (task == null) {
            // 执行所有任务完成的后续方法
            afterRunningAllTasks();
            return false;
        }

        // 计算执行任务截止时间
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0; // 执行任务计数
        long lastExecutionTime;
        // 循环执行任务
        for (;;) {
            // 执行任务
            safeExecute(task);

            // 计数 +1
            runTasks ++;

            // 每隔 64 个任务检查一次时间,因为 nanoTime() 是相对费时的操作
            // 64 这个值当前是硬编码的,无法配置,可能会成为一个问题.
            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                // 重新获得时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                // 超过任务截止时间,结束
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            // 获得队头的任务
            task = pollTask();
            // 获取不到,结束执行
            if (task == null) {
                // 重新获得时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        // 执行所有任务完成的后续方法
        afterRunningAllTasks();

        // 设置最后执行时间
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
}
  1. 关闭

    public final class NioEventLoop extends SingleThreadEventLoop {
     private void closeAll() {
         selectAgain();
         Set<SelectionKey> keys = selector.keys();
         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
         for (SelectionKey k: keys) {
             Object a = k.attachment();
             if (a instanceof AbstractNioChannel) {
                 channels.add((AbstractNioChannel) a);
             } else {
                 k.cancel();
                 @SuppressWarnings("unchecked")
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 invokeChannelUnregistered(task, k, null);
             }
         }
    
         for (AbstractNioChannel ch: channels) {
             ch.unsafe().close(ch.unsafe().voidPromise());
         }
     }
    }
    

5.和NioChannel类的关系

Channel

6.处理事件

6.1 Server端NioEventLoop处理的事件

Server

6.2 Client端NioEventLoop处理的事件

Client

7.参考文档

netty学习系列二:NIO Reactor模型 & Netty线程模型

results matching ""

    No results matching ""