EventLoop和EventLoopGroup
1.Reactor和Proactor
1.1 Reactor
Reactor是一个同步的I/O多路复用模型

- 用户发起IO操作到事件分离器
- 事件分离器调用相应的处理器处理事件
- 事件处理完成,事件分离器获得控制权,继续相应处理
1.2 Proactor
Proactor是一个异步I/O的多路复用模型

- 用户发起IO操作到事件分离器
- 事件分离器通知操作系统进行IO操作
- 操作系统将数据存放到数据缓存区
- 操作系统通知分发器IO完成
- 分离器将事件分发至相应的事件处理器
- 事件处理器直接读取数据缓存区内的数据进行处理
1.3 比较
- Reactor模型简单,Proactor复杂
- Reactor是同步处理方式,Proactor是异步处理方式
- Proactor的IO事件依赖操作系统,操作系统须支持异步IO
- 同步与异步是相对于服务端与IO事件来说的,Proactor通过操作系统异步来完成IO操作,当IO完成后通知事件分离器,而Reactor需要自己完成IO操作
2.NIO Reactor模型
2.1 Reactor模式思想
分而治之
一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步. Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行.
事件驱动
每个Task对应特定网络事件.当Task准备就绪时,Reactor收到对应的网络事件通知,并将Task分发给绑定了对应网络事件的Handler执行.
角色
Reactor:负责响应事件,将事件分发给绑定了该事件的Handler处理Handler:事件处理器,绑定了某类事件,负责执行对应事件的Task对事件进行处理Acceptor:Handler的一种,绑定了connect事件.当客户端发起connect请求时,Reactor会将accept事件分发给Acceptor处理
2.2 Reactor单线程模型
指所有的I/O操作都在同一个NIO线程上面完成.

NIO线程的职责如下
- 作为NIO服务端,接收客户端的TCP连接
- 作为NIO客户端,向服务端发起TCP连接
- 读取通信对端的请求或者应答消息
向通信对端发送消息请求或者应答消息
特点
- 由于Reactor模式使用的是异步非阻塞I/O,所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作
- 高负载、大并发应用场景下的问题
- 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送
- 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈
- 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
- 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送
2.3 Reactor多线程模式
有一个专门的Acceptor线程,读写由另一个IO线程池负责

- 特点
- 有专门一个NIO线程——Acceptor线程用于监听服务端,接收客户端的TCP连接请求
- 网络I/O操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送
- 一个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题
- 问题
- 一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题
2.4 主从多线程模型
Acceptor也采用一个独立的线程池

- 特点
- 服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池
- Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作.Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作
3.netty的线程模型
3.1 线程模型

服务端启动的时候,创建了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池
- 用于接收客户端请求的线程池(bossGroup)作用
- 接收客户端TCP连接,初始化Channel参数
- 将链路状态变更事件通知给ChannelPipeline
- 处理I/O线程池(workerGroup)作用
- 异步读取通信对端的数据报,发送读事件到ChannelPipeline
- 异步发送消息到通信对端,调用ChannelPipeline的消息发送接口
- 执行系统调用Task
- 执行定时任务Task. 例如:链路空闲状态监测定时任务
3.1 Reactor模型


- Acceptor中的NioEventLoop用于接收TCP连接,初始化参数
- I/O线程池中的NioEventLoop异步读取通信对端的数据报,发送读事件到channel
- 异步发送消息到对端,调用channel的消息发送接口
- 执行系统调用Task
- 执行定时Task
为了尽可能的提升性能,netty在很多地方进行了无锁化的设计:netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead,只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换
3.3 最佳实践
- 创建两个NioEventLoopGroup,隔离NIO Acceptor和NIO的IO线程
- 尽量不要在ChannelHandler中启动用户线程(解码之后,将POJO消息派发到后端的业务线程池除外)
- 解码要放在NIO线程调用的Handler中,不要切换到用户线程处
- 如果IO操作非常简单,不涉及复杂的业务逻辑计算,没有可能导致阻塞的磁盘操作、数据库操作、网络操作等,可以再NIO线程调用的Handler中完成业务逻辑,不需要切换到用户线程
- 如果IO业务操作比较复杂,就不要在NIO线程上完成,建议将解码后的POJO消息封装成Task,派发到业务线程池中由业务线程处理,以保证NIO线程尽快的被释放,处理其他的IO操作
推荐的线程数量计算公式
1. 线程数量 = (线程总时间/瓶颈资源时间) * 瓶颈资源的线程并行数
2. QPS=1000/线程总时间*线程数
4.源码分析

4.1 职责划分
EventExecutorGroup- 继承自Java的周期任务接口,是一个事件处理器组的概念
- 方法
isShuttingDown():是否正在关闭,或者是已经关闭shutdownGracefully():优雅停机,等待所有执行中任务执行完成,并不再接收新的任务terminationFuture():返回一个该线程池管理的所有线程都terminated的时候触发的futureshutdown():废弃了的关闭方法,shutdownGracefully取代next():返回一个被该Group管理的EventExecutoriterator():所有管理的EventExecutor的迭代器submit():提交一个线程任务schedule():周期执行一个任务
EventExecutor- 继承自EventExecutorGroup,复用接口定义的方法
- 方法
next():就是找group中下一个子集parent():就是所属groupinEventLoop():当前线程是否是在该子集中
EventLoopGroup- 继承自EventExecutorGroup.EventLoop的定位是处理一个连接的生命周期过程中的周期事件,group是多个EventLoop的集合了
- 方法
next(): 获取下一个EventLoopregister(): 注册channel
EventLoop- 事件循环,其也是一个处理器,最终继承自EventExecutor和EventLoopGroup
- 方法
parent(): 获取父事件循环组EventLoopGroup
AbstractEventExecutor- 基本作用就是与group在定义混乱上做了一个区分。提供了执行器与Future关联方法和一个基本的执行任务的方法
- 方法
- 将next设置成自己(上面说过继承的group,这个操作就和group区分开了)。
- 优雅停机调用的是带有超时的停机方案,超时为15秒
- 覆盖了Java提供的newTask包装成FutureTask的方法,使用了自己的PromiseTask
- 提供安全执行方法:safeExecute,直接调用的run方法
AbstractScheduledEventExecutor- 对AbstractEventExecutor未处理的周期任务提供了具体的完成方法
- 方法
- 提供计算当前距服务启动的时间
- 提供存储ScheduledFutureTask的优先队列
- 提供了取消所有周期任务的方法
- 提供了获取一个符合的周期任务的方法,要满足时间,并获取后移除
- 提供了获取距最近一个周期任务的时间多久
- 提供了移除一个周期任务的方法
- 提供添加周期任务的方法
SingleThreadEventExecutor- 主要完成了一个单线程的EventExecutor的基本操作
- 方法
- 创建一个taskQueue
- 中断线程
- 从任务队列中获取一个任务,takeTask连同周期任务也会获取
- 添加任务到任务队列
- 移除任务队列中指定任务
- 运行所有任务,会先将周期任务存入taskQueue,再使用safeExecute方法执行任务
- 实现了execute方法,会添加任务到任务队列,如果当前线程不是事件循环线程,开启一个线程。通过的就是持有的executor来开启的线程任务。execute方法调用了run方法,该类没有实现run方法。任务的添加都不是通过execute直接执行了,而是走的添加任务到taskQueue,由未实现的run线程来处理这些事件。
- 优雅停机
SingleThreadEventLoop- 主要是针对netty自身的事件循环的定义来实现方法
- 方法
- 注册channel,实际上是生成了一个DefaultChannelPromise对象,持有了channel,和运行该channel的EventExecutor,然后将该对象交给最底层的unsafe处理。
- 添加一个事件周期结束后执行的尾任务tailTasks
- 执行尾任务
- 删除指定尾任务
4.2 NioEventLoop
4.2.1 概览
NioEventLoop的处理链

Selector的过程

NioEventLoop执行过程

4.2.2 源码
成员变量
public final class NioEventLoop extends SingleThreadEventLoop { /** * 是否禁用 SelectionKey 的优化,默认开启 */ private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); /** * Boolean that controls determines if a blocked Selector.select should * break out of its selection process. In our case we use a timeout for * the select method and the select method will block for that time unless * waken up. * * 唤醒标记.因为唤醒方法 {@link Selector#wakeup()} 开销比较大,通过该标识,减少调用. * * @see #wakeup(boolean) * @see #run() */ private final AtomicBoolean wakenUp = new AtomicBoolean(); /** * Select 策略 * * @see #select(boolean) */ private final SelectStrategy selectStrategy; /** * 处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例 */ private volatile int ioRatio = 50; /** * 取消 SelectionKey 的数量 * * TODO 1007 NioEventLoop cancel */ private int cancelledKeys; /** * 是否需要再次 select Selector 对象 * * TODO 1007 NioEventLoop cancel */ private boolean needsToSelectAgain; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; // 创建 Selector 对象 final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } }Selector的初始化 作为NIO框架的Reactor线程,NioEventLoop需要处理网络I/O读写事件,因此它必须聚合一个多路复用器对象.
public final class NioEventLoop extends SingleThreadEventLoop {
/**
* The NIO {@link Selector}.
*
* 包装的 Selector 对象,经过优化
*
* {@link #openSelector()}
*/
private Selector selector;
/**
* 未包装的 Selector 对象
*/
private Selector unwrappedSelector;
/**
* 注册的 SelectionKey 集合.Netty 自己实现,经过优化.
*/
private SelectedSelectionKeySet selectedKeys;
/**
* SelectorProvider 对象,用于创建 Selector 对象
*/
private final SelectorProvider provider;
/**
* Selector 元组
*/
private static final class SelectorTuple {
/**
* 未包装的 Selector 对象
*/
final Selector unwrappedSelector;
/**
* 未包装的 Selector 对象
*/
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
private SelectorTuple openSelector() {
// 创建 Selector 对象,作为 unwrappedSelector
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象.即,selector 也使用 unwrappedSelector .
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 获得 SelectorImpl 类
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader()); // 成功,则返回该类
} catch (Throwable cause) {
return cause; // 失败,则返回该异常
}
}
});
// 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象.即,selector 也使用 unwrappedSelector .
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// 创建 SelectedSelectionKeySet 对象
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 获得 "selectedKeys" "publicSelectedKeys" 的 Field
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// 设置 Field 可访问
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 的 Field 中
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e; // 失败,则返回该异常
} catch (IllegalAccessException e) {
return e; // 失败,则返回该异常
}
}
});
// 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中失败,则直接返回 SelectorTuple 对象.即,selector 也使用 unwrappedSelector .
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
// 设置 SelectedSelectionKeySet 对象到 selectedKeys 中
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 创建 SelectedSelectionKeySetSelector 对象
// 创建 SelectorTuple 对象.即,selector 也使用 SelectedSelectionKeySetSelector 对象.
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
}
run
public final class NioEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: // 默认实现下,不存在这个情况. continue; case SelectStrategy.SELECT: // 重置 wakenUp 标记为 false // 选择( 查询 )任务 select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). // 唤醒.原因,见上面中文注释 if (wakenUp.get()) { selector.wakeup(); } // fall through default: } // TODO 1007 NioEventLoop cancel 方法 cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 处理 Channel 感兴趣的就绪 IO 事件 processSelectedKeys(); } finally { // 运行所有普通任务和定时任务,不限制时间 // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { // 处理 Channel 感兴趣的就绪 IO 事件 processSelectedKeys(); } finally { // 运行所有普通任务和定时任务,限制时间 // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // TODO 1006 EventLoop 优雅关闭 // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } }选择(查询)任务
public final class NioEventLoop extends SingleThreadEventLoop {
private void select(boolean oldWakenUp) throws IOException {
// 记录下 Selector 对象
Selector selector = this.selector;
try {
// select 计数器
int selectCnt = 0; // cnt 为 count 的缩写
// 记录当前时间,单位:纳秒
long currentTimeNanos = System.nanoTime();
// 计算 select 截止时间,单位:纳秒.
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 计算本次 select 的超时时长,单位:毫秒.
// + 500000L 是为了四舍五入
// / 1000000L 是为了纳秒转为毫秒
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时时长,则结束 select
if (timeoutMillis <= 0) {
if (selectCnt == 0) { // 如果是首次 select ,selectNow 一次,非阻塞
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// 若有新的任务加入
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
// selectNow 一次,非阻塞
selector.selectNow();
// 重置 select 计数器
selectCnt = 1;
break;
}
// 阻塞 select ,查询 Channel 是否有就绪的 IO 事件
int selectedKeys = selector.select(timeoutMillis);
// select 计数器 ++
selectCnt ++;
// 结束 select ,如果满足下面任一一个条件
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
// 线程被打断.一般情况下不会出现,出现基本是 bug ,或者错误使用.
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 记录当前时间
long time = System.nanoTime();
// 符合 select 超时条件,重置 selectCnt 为 1
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
// 重建 Selector 对象
rebuildSelector();
// 修改下 Selector 对象
selector = this.selector;
// Select again to populate selectedKeys.
// 立即 selectNow 一次,非阻塞
selector.selectNow();
// 重置 selectCnt 为 1
selectCnt = 1;
// 结束 select
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e);
}
// Harmless exception - log anyway
}
}
}
处理Channel感兴趣的就绪IO事件
public final class NioEventLoop extends SingleThreadEventLoop { private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized() { // 遍历数组 for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); // 处理一个 Channel 就绪的 IO 事件 if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); // 使用 NioTask 处理一个 Channel 就绪的 IO 事件 } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // TODO 1007 NioEventLoop cancel 方法 if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } } private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } // 遍历 SelectionKey 迭代器 Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { // 获得 SelectionKey 对象 final SelectionKey k = i.next(); // 从迭代器中移除 i.remove(); final Object a = k.attachment(); // 处理一个 Channel 就绪的 IO 事件 if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); // 使用 NioTask 处理一个 Channel 就绪的 IO 事件 } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 无下一个节点,结束 if (!i.hasNext()) { break; } // TODO 1007 NioEventLoop cancel 方法 if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } } }processSelectedKey
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 如果 SelectionKey 是不合法的,则关闭 Channel
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 获得就绪的 IO 事件的 ops
int readyOps = k.readyOps();
// OP_CONNECT 事件就绪
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 移除对 OP_CONNECT 感兴趣
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 完成连接
unsafe.finishConnect();
}
// OP_WRITE 事件就绪
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
// 向 Channel 写入数据
ch.unsafe().forceFlush();
}
// SelectionKey.OP_READ 或 SelectionKey.OP_ACCEPT 就绪
// readyOps == 0 是对 JDK Bug 的处理,防止空的死循环
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// 发生异常,关闭 Channel
unsafe.close(unsafe.voidPromise());
}
}
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0; // 未执行
try {
// 调用 NioTask 的 Channel 就绪事件
task.channelReady(k.channel(), k);
state = 1; // 执行成功
} catch (Exception e) {
// SelectionKey 取消
k.cancel();
// 执行 Channel 取消注册
invokeChannelUnregistered(task, k, e);
state = 2; // 执行异常
} finally {
switch (state) {
case 0:
// SelectionKey 取消
k.cancel();
// 执行 Channel 取消注册
invokeChannelUnregistered(task, k, null);
break;
case 1:
// SelectionKey 不合法,则执行 Channel 取消注册
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
}
- 运行所有普通任务和定时任务
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 不限制时间
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false; // 是否执行过任务
do {
// 从定时任务获得到时间的任务
fetchedAll = fetchFromScheduledTaskQueue();
// 执行任务队列中的所有任务
if (runAllTasksFrom(taskQueue)) {
// 若有任务执行,则标记为 true
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
// 如果执行过任务,则设置最后执行时间
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 执行所有任务完成的后续方法
afterRunningAllTasks();
return ranAtLeastOne;
}
// 限制时间
protected boolean runAllTasks(long timeoutNanos) {
// 从定时任务获得到时间的任务
fetchFromScheduledTaskQueue();
// 获得队头的任务
Runnable task = pollTask();
// 获取不到,结束执行
if (task == null) {
// 执行所有任务完成的后续方法
afterRunningAllTasks();
return false;
}
// 计算执行任务截止时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0; // 执行任务计数
long lastExecutionTime;
// 循环执行任务
for (;;) {
// 执行任务
safeExecute(task);
// 计数 +1
runTasks ++;
// 每隔 64 个任务检查一次时间,因为 nanoTime() 是相对费时的操作
// 64 这个值当前是硬编码的,无法配置,可能会成为一个问题.
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
// 重新获得时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
// 超过任务截止时间,结束
if (lastExecutionTime >= deadline) {
break;
}
}
// 获得队头的任务
task = pollTask();
// 获取不到,结束执行
if (task == null) {
// 重新获得时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 执行所有任务完成的后续方法
afterRunningAllTasks();
// 设置最后执行时间
this.lastExecutionTime = lastExecutionTime;
return true;
}
}
关闭
public final class NioEventLoop extends SingleThreadEventLoop { private void closeAll() { selectAgain(); Set<SelectionKey> keys = selector.keys(); Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size()); for (SelectionKey k: keys) { Object a = k.attachment(); if (a instanceof AbstractNioChannel) { channels.add((AbstractNioChannel) a); } else { k.cancel(); @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, k, null); } } for (AbstractNioChannel ch: channels) { ch.unsafe().close(ch.unsafe().voidPromise()); } } }
5.和NioChannel类的关系

6.处理事件
6.1 Server端NioEventLoop处理的事件

6.2 Client端NioEventLoop处理的事件
