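Channel and Unsafe
1.Channel
1.1 Overview
io.netty.channel.Channel is the interface through which Netty abstracts network operations. It aggregates a set of capabilities, including but not limited to network reads and writes, client-initiated connects, and actively closing a connection. It also exposes Netty-specific functionality, such as obtaining the Channel's EventLoop, the buffer allocator (ByteBufAllocator), and the pipeline.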
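1.2 Comparison with the JDK's native Channel
- The JDK's SocketChannel and ServerSocketChannel do not share a unified Channel interface for application developers. Users have no single operational view, which makes them inconvenient to use.
- The JDK's SocketChannel and ServerSocketChannel are SPI-style classes. Extending them through inheritance is awkward, so it is easier to develop a new Channel.
- Netty's Channel needs to integrate with the rest of Netty's architecture.
- A custom Channel allows features to be implemented more flexibly.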
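1.3 Design principles
- At the Channel interface layer, the Facade pattern wraps network I/O operations and the operations related to them, and exposes them through a single entry point.
- The Channel interface is deliberately broad and complete, giving SocketChannel and ServerSocketChannel a unified view. Different subclasses implement different features, while shared functionality lives in abstract parent classes, which maximizes reuse of both functionality and interfaces.
- Concrete implementations favor aggregation over containment. The Channel is responsible for unified allocation and scheduling, which is more flexible.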
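1.4 Channel features
- Channel state, mainly: open, closed, connected.
- The main I/O operations: read, write, connect, and bind.
- Access to the EventLoop.
- Metadata, access to TCP parameter configuration, and so on.
- All I/O operations are asynchronous. Calling a method such as write or connect does not guarantee that the I/O operation has completed. The call returns a receipt (a ChannelFuture), and the outcome (success, cancellation, or failure) is recorded in it.
- Channels have a parent-child relationship. A SocketChannel is created when a ServerSocketChannel accepts a connection, so the SocketChannel's parent() method returns that ServerSocketChannel.
- When you are done with a Channel, call close() to release the resources it holds.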
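The asynchronous contract in the list above can be sketched with the JDK's CompletableFuture standing in for Netty's ChannelFuture. This is only an analogy under stated assumptions: `AsyncWriteSketch`, its `write` method, and the single-thread executor are hypothetical names for illustration, not Netty API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an asynchronous channel operation; not Netty code.
class AsyncWriteSketch {
    // Plays the role of the I/O thread (EventLoop).
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Returns immediately; the future is completed later by the I/O thread.
    CompletableFuture<Integer> write(String msg) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        ioThread.execute(() -> {
            if (msg == null) {
                future.completeExceptionally(new IllegalArgumentException("msg is null"));
            } else {
                future.complete(msg.length()); // pretend every character was written
            }
        });
        return future;
    }

    void shutdown() {
        ioThread.shutdown();
    }

    // Convenience for demos: write, wait for the recorded result, and clean up.
    static int writeAndWait(String msg) {
        AsyncWriteSketch ch = new AsyncWriteSketch();
        try {
            return ch.write(msg).join();
        } finally {
            ch.shutdown();
        }
    }

    public static void main(String[] args) {
        AsyncWriteSketch ch = new AsyncWriteSketch();
        ch.write("hello")
          .whenComplete((n, err) ->
              System.out.println(err == null ? "wrote " + n : "failed: " + err))
          .join();
        ch.shutdown();
    }
}
```

The caller gets the future back before the work is done and learns the outcome through a callback, which is the same shape as adding a listener to a ChannelFuture.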
2.Source code analysis

2.1 Channel
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
/**
* Returns the globally unique identifier of this {@link Channel}.
*
* The Channel's ID.
*/
ChannelId id();
/**
* Return the {@link EventLoop} this {@link Channel} was registered to.
*
* The EventLoop this Channel is registered to.
*/
EventLoop eventLoop();
/**
* Returns the parent of this channel.
*
* The parent Channel.
*
* @return the parent channel.
* {@code null} if this channel does not have a parent channel.
*/
Channel parent();
/**
* Returns the configuration of this channel.
*
* The Channel's configuration.
*/
ChannelConfig config();
/**
* Returns {@code true} if the {@link Channel} is open and may get active later
*
* Whether the Channel is open.
*
* true means the Channel is usable
* false means the Channel is closed and unusable
*/
boolean isOpen();
/**
* Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}.
*
* Whether the Channel is registered.
*
* true means the Channel is registered to an EventLoop
* false means the Channel is not registered to an EventLoop
*/
boolean isRegistered();
/**
* Return {@code true} if the {@link Channel} is active and so connected.
*
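* Whether the Channel is active.
*
* For a server-side ServerSocketChannel, true means the Channel is bound to a port and can serve requests
* For a client-side SocketChannel, true means the Channel is connected to the remote server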
*/
boolean isActive();
/**
* Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}.
*
* The Channel's metadata.
*/
ChannelMetadata metadata();
/**
* Returns the local address where this channel is bound to. The returned
* {@link SocketAddress} is supposed to be down-cast into more concrete
* type such as {@link InetSocketAddress} to retrieve the detailed
* information.
*
* The local address.
*
* @return the local address of this channel.
* {@code null} if this channel is not bound.
*/
SocketAddress localAddress();
/**
* Returns the remote address where this channel is connected to. The
* returned {@link SocketAddress} is supposed to be down-cast into more
* concrete type such as {@link InetSocketAddress} to retrieve the detailed
* information.
*
* The remote address.
*
* @return the remote address of this channel.
* {@code null} if this channel is not connected.
* If this channel is not connected but it can receive messages
* from arbitrary remote addresses (e.g. {@link DatagramChannel},
* use {@link DatagramPacket#recipient()} to determine
* the origination of the received message as this method will
* return {@code null}.
*/
SocketAddress remoteAddress();
/**
* Returns the {@link ChannelFuture} which will be notified when this
* channel is closed. This method always returns the same future instance.
*
* The Future that is notified when the Channel is closed.
*/
ChannelFuture closeFuture();
/**
* Returns {@code true} if and only if the I/O thread will perform the
* requested write operation immediately. Any write requests made when
* this method returns {@code false} are queued until the I/O thread is
* ready to process the queued write requests.
*
* Whether the Channel is writable.
*
* Returns true when the Channel's outbound write buffer is non-null and writable
*/
boolean isWritable();
/**
* Returns how many bytes can still be written before the Channel becomes unwritable.
*
* Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
* This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
*/
long bytesBeforeUnwritable();
/**
* Returns how many bytes must be drained before the Channel becomes writable again.
*
* Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}.
* This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
*/
long bytesBeforeWritable();
/**
* Returns an <em>internal-use-only</em> object that provides unsafe operations.
*
* The Unsafe object.
*/
Unsafe unsafe();
/**
* Return the assigned {@link ChannelPipeline}.
*
* The ChannelPipeline, which handles inbound and outbound events.
*/
ChannelPipeline pipeline();
/**
* Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
*
* The ByteBuf allocator.
*/
ByteBufAllocator alloc();
@Override
Channel read();
@Override
Channel flush();
/**
* <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods:
* <ul>
* <li>{@link #localAddress()}</li>
* <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li>
* <li>{@link #register(EventLoop, ChannelPromise)}</li>
* <li>{@link #deregister(ChannelPromise)}</li>
* <li>{@link #voidPromise()}</li>
* </ul>
*/
interface Unsafe {
/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*
* The handle of the receive ByteBuf allocator.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
*
* The local address.
*/
SocketAddress localAddress();
/**
* Return the {@link SocketAddress} to which is bound remote or
* {@code null} if none is bound yet.
*
* The remote address.
*/
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
* it once its done.
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
* pass {@code null} to it.
*
* The {@link ChannelPromise} will get notified once the connect operation was complete.
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void disconnect(ChannelPromise promise);
/**
* Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void close(ChannelPromise promise);
/**
* Closes the {@link Channel} immediately without firing any events. Probably only useful
* when registration attempt failed.
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
void deregister(ChannelPromise promise);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
*/
void beginRead();
/**
* Schedules a write operation.
*/
void write(Object msg, ChannelPromise promise);
/**
* Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
*/
void flush();
/**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
* It will never be notified of a success or error and so is only a placeholder for operations
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
*/
ChannelPromise voidPromise();
/**
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
*/
ChannelOutboundBuffer outboundBuffer();
}
}
2.2 AbstractChannel
Member variables
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    /** The parent Channel */
    private final Channel parent;
    /** The Channel ID */
    private final ChannelId id;
    /** The Unsafe object */
    private final Unsafe unsafe;
    /** The DefaultChannelPipeline object */
    private final DefaultChannelPipeline pipeline;
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);
    /** The local address */
    private volatile SocketAddress localAddress;
    /** The remote address */
    private volatile SocketAddress remoteAddress;
    /** The EventLoop object */
    private volatile EventLoop eventLoop;
    /** Whether the Channel is registered */
    private volatile boolean registered;
    /**
     * Whether close has been initiated
     *
     * @see AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, boolean)
     */
    private boolean closeInitiated;
    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // Create the ChannelId
        id = newId();
        // Create the Unsafe object
        unsafe = newUnsafe();
        // Create the DefaultChannelPipeline
        pipeline = newChannelPipeline();
    }
}
Core API
Network I/O operations directly trigger the corresponding event methods on the ChannelPipeline.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
}
- Concrete implementation of the common API
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public SocketAddress remoteAddress() {
        SocketAddress remoteAddress = this.remoteAddress;
        if (remoteAddress == null) {
            try {
                this.remoteAddress = remoteAddress = unsafe().remoteAddress();
            } catch (Throwable t) {
                // Sometimes fails on a closed socket in Windows.
                return null;
            }
        }
        return remoteAddress;
    }
}
2.3 AbstractNioChannel
Member variables
public abstract class AbstractNioChannel extends AbstractChannel {
    /** The underlying JDK NIO Channel held by this Netty NIO Channel */
    private final SelectableChannel ch;
    /** The interest-op bit for read events; it stands for the JDK SelectionKey's OP_READ */
    protected final int readInterestOp;
    /** The selection key returned when the Channel is registered to the EventLoop */
    volatile SelectionKey selectionKey;
    boolean readPending;
    /** Runnable that removes the interest in read events */
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };
    /**
     * The ChannelPromise of the connection attempt currently in progress.
     *
     * The future of the current connection attempt. If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    /** The ScheduledFuture that watches for a connect timeout. */
    private ScheduledFuture<?> connectTimeoutFuture;
    /** The remote address being connected to */
    private SocketAddress requestedRemoteAddress;
}
Core API
public abstract class AbstractNioChannel extends AbstractChannel {
    @Override
    protected void doRegister() throws Exception {
        // selected records whether a selectNow() has already been forced
        boolean selected = false;
        for (;;) {
            try {
                // Call SelectableChannel.register() to register this Channel with the EventLoop's Selector.
                // The interest ops are 0, meaning no interest in any event: this only performs the registration.
                // On success a SelectionKey is returned, through which the Channel object can be retrieved from the Selector.
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                // The key has already been cancelled
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    // selectNow() removes the cancelled key from the selector
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
}
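The registration step in doRegister() can be reproduced with plain JDK NIO. The following is a minimal sketch, not Netty code: the class name `RegisterWithZeroOps` and the string attachment are assumptions made for illustration. It registers a non-blocking channel with interest ops 0, checks that the attachment can be read back from the key, and only then enables an operation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Plain JDK NIO sketch of "register with interest ops 0, then enable ops later".
class RegisterWithZeroOps {
    // Returns {interestOps right after register, interestOps after enabling OP_ACCEPT}.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);              // required before register()
            Object attachment = "owner-of-this-channel";  // hypothetical attachment
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();
            // The attachment is how the owning object is found again from the key.
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment lost");
            }
            key.interestOps(SelectionKey.OP_ACCEPT);      // express interest only now
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println("after register: " + ops[0] + ", after enabling accept: " + ops[1]);
    }
}
```

Separating registration from expressing interest lets the owner decide later, and on its own thread, which events it wants to hear about.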
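When registering a Channel, you specify the network operation bits to listen for, which indicate the kinds of network events the Channel is interested in.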
public abstract class SelectionKey {
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
}
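Because each SelectionKey constant occupies its own bit, interest sets are combined and taken apart with bitwise operators. The sketch below uses only the real JDK constants; the helper names `add`, `remove`, and `has` are hypothetical and exist just for this illustration.

```java
import java.nio.channels.SelectionKey;

// Pure bit arithmetic on the SelectionKey constants; no sockets involved.
class InterestOpsBits {
    static int add(int interestOps, int op) {
        return interestOps | op;          // set the bit
    }

    static int remove(int interestOps, int op) {
        return interestOps & ~op;         // clear the bit
    }

    static boolean has(int interestOps, int op) {
        return (interestOps & op) != 0;   // test the bit
    }

    public static void main(String[] args) {
        int ops = 0;
        ops = add(ops, SelectionKey.OP_READ);
        ops = add(ops, SelectionKey.OP_WRITE);
        System.out.println("read+write = " + ops);
        ops = remove(ops, SelectionKey.OP_WRITE);
        System.out.println("still has write? " + has(ops, SelectionKey.OP_WRITE));
    }
}
```

The same three expressions are the building blocks for toggling OP_WRITE on a key while leaving the other interest bits untouched.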
2.4 AbstractNioByteChannel
Member variables
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // Continues writing messages that were only partially written
    private final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
            // meantime.
            ((AbstractNioUnsafe) unsafe()).flush0();
        }
    };

    /**
     * Create a new instance
     *
     * @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
}
Core methods
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // Similar to a spin count in concurrent programming: bounds the attempts to write the data out
        int writeSpinCount = config().getWriteSpinCount();
        do {
            Object msg = in.current();
            if (msg == null) {
                // Null means every pending message has been sent
                // Wrote all messages.
                // Clear the partial-write flag (OP_WRITE)
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            // There is still a message to send, so keep processing
            writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);
        // Handle the partially written data
        incompleteWrite(writeSpinCount < 0);
    }

    protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) { // The key is no longer valid
            return;
        }
        final int interestOps = key.interestOps();
        // Register interest in SelectionKey.OP_WRITE
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

    protected final void clearOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) { // The key is no longer valid
            return;
        }
        final int interestOps = key.interestOps();
        // If SelectionKey.OP_WRITE is registered, remove it
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
        }
    }

    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (!buf.isReadable()) {
                in.remove();
                return 0;
            }
            final int localFlushedAmount = doWriteBytes(buf);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                if (!buf.isReadable()) {
                    in.remove();
                }
                return 1;
            }
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            if (region.transferred() >= region.count()) {
                in.remove();
                return 0;
            }
            long localFlushedAmount = doWriteFileRegion(region);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                if (region.transferred() >= region.count()) {
                    in.remove();
                }
                return 1;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }

    protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        // true: register interest in SelectionKey.OP_WRITE
        if (setOpWrite) {
            setOpWrite();
        // false: remove interest in SelectionKey.OP_WRITE
        } else {
            // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
            // use our write quantum. In this case we no longer want to set the write OP because the socket is still
            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
            // and set the write OP if necessary.
            clearOpWrite();
            // Schedule flush again later so other tasks can be picked up in the meantime
            // Submit the next flush task to the EventLoop
            eventLoop().execute(flushTask);
        }
    }
}
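The interplay between the spin count and the two outcomes of incompleteWrite(...) is easier to see in a stripped-down model. The sketch below is a hypothetical simulation, not Netty code: messages are plain byte counts, the "socket" is a byte budget, and the names `WriteSpinSketch`, `Outcome`, and `SNDBUF_FULL` are invented for the example. It assumes, as the listing above suggests, that a full send buffer is signalled by a very large return value that drives the counter negative.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Simplified, hypothetical model of a spin-limited write loop; not Netty code.
class WriteSpinSketch {
    enum Outcome { ALL_WRITTEN, WAIT_FOR_OP_WRITE, RESCHEDULE_FLUSH }

    // Sentinel cost that drives the spin counter negative when the socket is full.
    static final int SNDBUF_FULL = Integer.MAX_VALUE;

    /**
     * @param pending        remaining byte counts of the queued messages
     * @param socketRoom     bytes the fake socket accepts before it reports "full"
     * @param writeSpinCount maximum number of write attempts in this call (must be positive)
     */
    static Outcome doWrite(Deque<Integer> pending, int socketRoom, int writeSpinCount) {
        int room = socketRoom;
        do {
            Integer msg = pending.peek();
            if (msg == null) {
                return Outcome.ALL_WRITTEN;          // queue drained: OP_WRITE can be cleared
            }
            if (msg == 0) {
                pending.poll();                      // empty message: drop it at no cost
                continue;
            }
            int written = Math.min(msg, room);
            if (written == 0) {
                writeSpinCount -= SNDBUF_FULL;       // socket full: counter becomes negative
            } else {
                room -= written;
                pending.poll();
                if (written < msg) {
                    pending.addFirst(msg - written); // keep the unwritten remainder at the head
                }
                writeSpinCount--;
            }
        } while (writeSpinCount > 0);
        // Negative: socket was full, wait for OP_WRITE. Zero: budget used up, flush again later.
        return writeSpinCount < 0 ? Outcome.WAIT_FOR_OP_WRITE : Outcome.RESCHEDULE_FLUSH;
    }

    public static void main(String[] args) {
        System.out.println(doWrite(new ArrayDeque<>(Arrays.asList(10, 20)), 100, 16));
        System.out.println(doWrite(new ArrayDeque<>(Arrays.asList(10, 20)), 15, 16));
        System.out.println(doWrite(new ArrayDeque<>(Arrays.asList(1, 1, 1)), 100, 2));
    }
}
```

The three calls in main cover the three exits: everything fits, the socket fills up first, and the spin budget runs out while the socket is still writable.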
2.5 AbstractNioMessageChannel
The main method is again doWrite. The difference is that AbstractNioByteChannel above sends ByteBuf data, whereas AbstractNioMessageChannel sends POJOs.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
// Get the current message from the ChannelOutboundBuffer for processing
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
}
2.6 NioServerSocketChannel
Member variables
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a server socket.", e);
        }
    }

    private final ServerSocketChannelConfig config;
}
Core methods
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the client connection
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            // Create the Netty NioSocketChannel object
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            // An exception occurred: close the client's SocketChannel connection
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
}
Other methods, which do not apply to the server side
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    protected boolean doConnect(
            SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void doFinishConnect() throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected SocketAddress remoteAddress0() {
        return null;
    }

    @Override
    protected void doDisconnect() throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
        throw new UnsupportedOperationException();
    }
}
2.7 NioSocketChannel
Connect operation
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        // Bind the local address
        if (localAddress != null) {
            doBind0(localAddress);
        }
        boolean success = false; // Whether the operation succeeded
        try {
            // Connect to the remote address
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            // If the connection has not completed yet, register interest in the connect (OP_CONNECT) event.
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            // Mark the operation as successful
            success = true;
            // Return whether the connection has completed
            return connected;
        } finally {
            // On failure, close the Channel
            if (!success) {
                doClose();
            }
        }
    }
}
Write operation
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        // Get the write spin count
        int writeSpinCount = config().getWriteSpinCount();
        do {
            // The in-memory queue is empty: end the loop and return
            if (in.isEmpty()) {
                // Remove the interest in SelectionKey.OP_WRITE
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            // Get the maximum number of bytes per write
            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            // Get the ByteBuffer array to write from the in-memory queue
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            // The number of ByteBuffers to write
            int nioBufferCnt = in.nioBufferCount();
            // Write the ByteBuffer array to the peer
            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    // Perform the NIO write call, writing a single ByteBuffer to the peer
                    final int localWrittenBytes = ch.write(buffer);
                    // Zero or fewer bytes written means the NIO Channel is not writable, so register
                    // SelectionKey.OP_WRITE, wait for the NIO Channel to become writable, and return to end the loop
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Adjust the maximum number of bytes per write
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    // Remove the data (messages) already written from the in-memory queue
                    in.removeBytes(localWrittenBytes);
                    // Decrement the spin count
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    // Perform the NIO write call, writing multiple ByteBuffers to the peer
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    // Zero or fewer bytes written means the NIO Channel is not writable, so register
                    // SelectionKey.OP_WRITE, wait for the NIO Channel to become writable, and return to end the loop
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Adjust the maximum number of bytes per write
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
                    // Remove the data (messages) already written from the in-memory queue
                    in.removeBytes(localWrittenBytes);
                    // Decrement the spin count
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0); // Spin and keep writing
        // The data in the in-memory queue was not completely written; a negative spin count means the
        // NIO Channel is not writable, so register SelectionKey.OP_WRITE and wait for it to become writable
        incompleteWrite(writeSpinCount < 0);
    }
}
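The multi-buffer branch of doWrite relies on the JDK's gathering write, `write(ByteBuffer[], int, int)`, which sends several buffers in one call and returns how many bytes were actually written. The sketch below is a minimal illustration under stated assumptions: it uses a temporary file and a FileChannel instead of a socket so that it runs without a peer, and the class name `GatheringWriteSketch` is hypothetical. A file channel normally accepts everything at once, whereas a non-blocking socket may accept only part of the data, which is why the loop checks the return value.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// JDK-only illustration of a gathering write; a file stands in for the socket.
class GatheringWriteSketch {
    // Writes all parts with gathering writes and returns the resulting file content.
    static String writeAll(String... parts) {
        try {
            Path file = Files.createTempFile("gather", ".txt");
            try {
                ByteBuffer[] buffers = new ByteBuffer[parts.length];
                long remaining = 0;
                for (int i = 0; i < parts.length; i++) {
                    buffers[i] = ByteBuffer.wrap(parts[i].getBytes(StandardCharsets.UTF_8));
                    remaining += buffers[i].remaining();
                }
                try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
                    while (remaining > 0) {
                        // One call may write only part of the data; loop until drained.
                        remaining -= ch.write(buffers, 0, buffers.length);
                    }
                }
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAll("head|", "body"));
    }
}
```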
3.Unsafe
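The methods of Unsafe must not be called from user code. They implement the actual data transport and, apart from the few exceptions listed in the interface's Javadoc (such as localAddress() and register(...)), must be invoked from the I/O thread.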
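| No. | Method | Description |
|---|---|---|
| 1 | RecvByteBufAllocator.Handle recvBufAllocHandle(); | Returns the RecvByteBufAllocator.Handle used to allocate ByteBufs when receiving data |
| 2 | SocketAddress localAddress(); | Returns the locally bound socket address |
| 3 | SocketAddress remoteAddress(); | Returns the socket address of the remote peer |
| 4 | void register(EventLoop eventLoop, ChannelPromise promise); | Registers the Channel with the multiplexer; once registration completes, the ChannelFuture is notified |
| 5 | void bind(SocketAddress localAddress, ChannelPromise promise); | Binds the given localAddress to the current Channel; once done, the ChannelFuture is notified |
| 6 | void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); | Binds the local localAddress to the current Channel and connects to the server; once the operation completes, the ChannelFuture is notified |
| 7 | void disconnect(ChannelPromise promise); | Disconnects the Channel; once done, the ChannelFuture is notified |
| 8 | void close(ChannelPromise promise); | Closes the Channel; once done, the ChannelFuture is notified |
| 9 | void closeForcibly(); | Forcibly closes the connection immediately |
| 10 | void beginRead(); | Sets the read interest op so that messages can be read |
| 11 | void write(Object msg, ChannelPromise promise); | Sends a message; once done, the ChannelFuture is notified |
| 12 | void flush(); | Writes the messages held in the send buffer to the Channel |
| 13 | ChannelPromise voidPromise(); | Returns a special ChannelPromise that can be reused and passed around; it is never notified of success or failure and serves only as a placeholder |
| 14 | ChannelOutboundBuffer outboundBuffer(); | Returns the outbound message buffer |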
4.Unsafe source code analysis

Unsafe read and write operations

4.1 AbstractUnsafe
- register method
Registers the Channel that owns this Unsafe with the EventLoop's multiplexer.
/**
* {@link Unsafe} implementation which sub-classes must extend and use.
*/
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// Verify that the given eventLoop is not null
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// Verify that the Channel is not registered yet
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// Verify that the Channel and the eventLoop are compatible
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// Set the Channel's eventLoop field
AbstractChannel.this.eventLoop = eventLoop;
// Run the registration logic on the EventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// The registration was initiated by a user thread or some other thread
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable()
|| !ensureOpen(promise)) { // Make sure the Channel is still open
return;
}
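// Remember whether this is the first registration
boolean firstRegistration = neverRegistered;
// Perform the actual registration
doRegister();
// Mark that the Channel has now been registered at least once
neverRegistered = false;
// Mark the Channel as registered
registered = true;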
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
// Notify the promise of success
safeSetSuccess(promise);
// Fire the channelRegistered event
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
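The thread hand-off at the heart of register(...), run directly when already on the event loop thread and otherwise submit a task to it, can be modelled with a single-thread executor. This is a hypothetical sketch, not Netty code: `EventLoopHandOffSketch`, its thread name, and the use of CompletableFuture as the promise are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded "event loop"; it only models the thread hand-off.
class EventLoopHandOffSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        loopThread = t; // remember the loop thread so inEventLoop() can compare against it
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the registration on the loop thread, directly if we are already on it.
    CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(promise);
        } else {
            executor.execute(() -> register0(promise));
        }
        return promise;
    }

    // Completes the promise with the name of the thread that did the work.
    private void register0(CompletableFuture<String> promise) {
        promise.complete(Thread.currentThread().getName());
    }

    void shutdown() {
        executor.shutdown();
    }

    // Calls register() from the caller's thread and reports where register0 ran.
    static String registerFromCallerThread() {
        EventLoopHandOffSketch loop = new EventLoopHandOffSketch();
        try {
            return loop.register().join();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("register0 ran on: " + registerFromCallerThread());
    }
}
```

Funnelling the work onto one thread means the state touched by the registration logic needs no locking, which is the design choice this pattern buys.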
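- bind method: binds the specified port. On the server side it binds the listening port; on the client side it is mainly used to specify the local socket address that the client Channel binds to.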
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// Assert that we are on the EventLoop thread.
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// Remember whether the Channel was active
boolean wasActive = isActive();
// Bind the Channel to the port
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// If the Channel has just become active, fire the channelActive event.
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// Notify the promise of success
safeSetSuccess(promise);
}
}
- disconnect method: used by the client or the server to actively disconnect
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
}
- close method: closes the connection
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
// Close
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) {
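// Make the promise uncancellable
if (!promise.setUncancellable()) {
return;
}
// If close has already been initiated
if (closeInitiated) {
// The close has already completed: notify the promise directly
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
// The close has not completed yet: notify the promise through a listener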
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
// Mark close as initiated
closeInitiated = true;
// Remember whether the Channel was active
final boolean wasActive = isActive();
// Set outboundBuffer to null
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
// Prepare to close
Executor closeExecutor = prepareToClose();
// If closeExecutor is not null
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Perform the close on closeExecutor
// Execute the close.
doClose0(promise);
} finally {
// Run the following back on the EventLoop
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Writing the data (messages) to the peer failed; fail the promises of the corresponding messages.
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
// Close the in-memory queue
outboundBuffer.close(closeCause);
}
// Deregister and fire the channelInactive event into the pipeline
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
// If closeExecutor is null
} else {
try {
// Perform the close
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Writing the data (messages) to the peer failed; fail the promises of the corresponding messages.
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
// Close the in-memory queue
outboundBuffer.close(closeCause);
}
}
// A flush is in progress: deregister and fire the channelInactive event into the pipeline later, on the EventLoop
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
// No flush is in progress: deregister and fire the channelInactive event into the pipeline right away
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
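// Perform the close
doClose();
// Notify closeFuture that the close has completed
closeFuture.setClosed();
// Notify the promise that the close succeeded
safeSetSuccess(promise);
} catch (Throwable t) {
// Notify closeFuture that the close has completed
closeFuture.setClosed();
// Notify the promise that the close failed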
safeSetFailure(promise, t);
}
}
}
- write adds the message to the outbound buffer (ChannelOutboundBuffer); it does not actually write to the Channel
```java
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        // outboundBuffer is null
        if (outboundBuffer == null) {
            // A null outboundBuffer normally means the Channel is already closed, so fail the Promise
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }
        int size;
        try {
            // Filter the outbound message
            msg = filterOutboundMessage(msg);
            // Estimate the size of the message
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            // Fail the Promise
            safeSetFailure(promise, t);
            // Release the resources held by the message
            ReferenceCountUtil.release(msg);
            return;
        }
        // Add the message to the outbound buffer
        outboundBuffer.addMessage(msg, size, promise);
    }
}
```
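The point of this bullet, that write only queues a message and reports the outcome later, can be sketched without Netty. The following is a minimal JDK-only model, not Netty code: `QueuedWriteSketch`, its `wire` list (a stand-in for the socket) and the use of `CompletableFuture` in place of `ChannelPromise` are assumptions made for illustration, and message filtering, size estimation and reference counting are left out.

```java
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a channel's Unsafe; not a Netty class.
class QueuedWriteSketch {
    // One pending write: the message plus the future that reports its outcome.
    private static final class Pending {
        final String msg;
        final CompletableFuture<Void> promise;

        Pending(String msg, CompletableFuture<Void> promise) {
            this.msg = msg;
            this.promise = promise;
        }
    }

    // Becomes null once the channel is closed, like this.outboundBuffer = null in close().
    private Queue<Pending> outboundBuffer = new ArrayDeque<>();
    // Stands in for the socket: whatever lands here has "reached the peer".
    final List<String> wire = new ArrayList<>();

    CompletableFuture<Void> write(String msg) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (outboundBuffer == null) {
            // Closed: fail right away instead of queueing.
            promise.completeExceptionally(new ClosedChannelException());
            return promise;
        }
        outboundBuffer.add(new Pending(msg, promise)); // enqueue only, no I/O
        return promise;
    }

    void flush() {
        if (outboundBuffer == null) {
            return;
        }
        Pending p;
        while ((p = outboundBuffer.poll()) != null) {
            wire.add(p.msg);          // the simulated write to the peer
            p.promise.complete(null); // only now does the caller see success
        }
    }

    void close() {
        Queue<Pending> pending = outboundBuffer;
        outboundBuffer = null; // later writes fail fast
        if (pending != null) {
            for (Pending p : pending) {
                p.promise.completeExceptionally(new ClosedChannelException());
            }
        }
    }

    public static void main(String[] args) {
        QueuedWriteSketch ch = new QueuedWriteSketch();
        CompletableFuture<Void> first = ch.write("hello");
        System.out.println(ch.wire.isEmpty() + " " + first.isDone()); // true false
        ch.flush();
        System.out.println(ch.wire + " " + first.isDone());           // [hello] true
        ch.close();
        System.out.println(ch.write("late").isCompletedExceptionally()); // true
    }
}
```

The future returned by `write` stays incomplete until `flush` runs, which is the same reason a `ChannelFuture` from a plain write should not be expected to complete before a flush.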
- flush writes all pending messages in the outbound buffer to the Channel and sends them to the peer
```java
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void flush() {
        assertEventLoop();
        // A null outboundBuffer normally means the Channel is already closed, so return immediately.
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        // Mark the queued messages as flushed
        outboundBuffer.addFlush();
        // Perform the flush
        flush0();
    }

    protected void flush0() {
        // A flush is already in progress, so return immediately.
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }
        // Return immediately if outboundBuffer is null (the Channel is normally already closed)
        // or empty (there is nothing to flush).
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
        // Mark that a flush is in progress.
        inFlush0 = true;
        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                // Mark that no flush is in progress.
                inFlush0 = false;
            }
            return;
        }
        // Perform the actual write to the peer
        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            }
        } finally {
            // Mark that no flush is in progress.
            inFlush0 = false;
        }
    }
}

public final class ChannelOutboundBuffer {
    /**
     * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
     * and so you will be able to handle them.
     */
    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // were added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            // If flushedEntry is null, point it at unflushedEntry to record the first Entry to flush.
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            // Count the entries being flushed and make each Entry's Promise uncancellable
            do {
                // Increment flushed
                flushed ++;
                // Make the Promise uncancellable
                if (!entry.promise.setUncancellable()) { // failed: the Promise was already cancelled
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                // Move to the next Entry
                entry = entry.next;
            } while (entry != null);
            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
}
```
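The pointer bookkeeping in `addFlush` is easier to follow on a stripped-down model. The sketch below is JDK-only and hypothetical: `FlushMarkerSketch` and its fields merely borrow names from the listing above, `CompletableFuture.isCancelled()` stands in for the `setUncancellable()` check (it does not actually make the future uncancellable), and entries are never removed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the unflushed/flushed bookkeeping; not Netty's class.
class FlushMarkerSketch {
    static final class Entry {
        final String msg;
        final int size;
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        Entry next;

        Entry(String msg, int size) {
            this.msg = msg;
            this.size = size;
        }
    }

    Entry flushedEntry;    // first entry that may be written to the socket
    Entry unflushedEntry;  // first entry added since the last addFlush()
    Entry tailEntry;       // last entry in the list
    int flushed;           // number of entries marked as flushed so far
    long totalPendingSize; // bytes still accounted as waiting to be written

    Entry addMessage(String msg, int size) {
        Entry entry = new Entry(msg, size);
        if (tailEntry != null) {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
        totalPendingSize += size;
        return entry;
    }

    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry == null) {
            return; // nothing was added since the last flush marker
        }
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed++;
            if (entry.promise.isCancelled()) {
                // A cancelled write no longer counts towards the pending bytes.
                totalPendingSize -= entry.size;
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }

    public static void main(String[] args) {
        FlushMarkerSketch buf = new FlushMarkerSketch();
        buf.addMessage("a", 3);
        Entry b = buf.addMessage("b", 5);
        buf.addMessage("c", 7);
        b.promise.cancel(false);
        buf.addFlush();
        System.out.println(buf.flushed + " " + buf.totalPendingSize); // 3 10
    }
}
```

Note that the cancelled entry is still counted in `flushed`, as in the listing; only its bytes are given back.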
### 4.2 AbstractNioUnsafe
1. connect
```java
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
    @Override
    public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        try {
            // If a connect attempt is already pending, throw: concurrent connects are not allowed.
            if (connectPromise != null) {
                // Already a connect in process.
                throw new ConnectionPendingException();
            }
            // Record whether the Channel was already active
            boolean wasActive = isActive();
            // Connect to the remote address
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                // Remember connectPromise
                connectPromise = promise;
                // Remember requestedRemoteAddress
                requestedRemoteAddress = remoteAddress;
                // Schedule connect timeout on the EventLoop. If the timeout fires, connectPromise is failed
                // with a timeout exception.
                int connectTimeoutMillis = config().getConnectTimeoutMillis(); // 30 * 1000 ms by default
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }
                // Add a listener that reacts to cancellation of the connect attempt.
                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            // Cancel the timeout task
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            // Clear connectPromise
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            // Notify the promise of the failure
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            // Closed via cancellation and the promise has been notified already.
            return;
        }
        // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
        // We still need to ensure we call fireChannelActive() in this case.
        boolean active = isActive();
        // Notify the promise of success.
        // trySuccess() will return false if a user cancelled the connection attempt.
        boolean promiseSet = promise.trySuccess();
        // If the Channel has just become active, fire the channelActive event.
        // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
        // because what happened is what happened.
        if (!wasActive && active) {
            pipeline().fireChannelActive();
        }
        // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
        if (!promiseSet) {
            close(voidPromise());
        }
    }
}
```
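The timeout handling in `connect` boils down to three rules: only one attempt may be pending, a scheduled task fails the promise if nothing completes it in time, and a completed attempt cancels that task. A minimal JDK-only sketch of those rules follows. It is not Netty code: `ConnectTimeoutSketch` is a hypothetical class, a `ScheduledExecutorService` stands in for the EventLoop, `CompletableFuture` stands in for `ChannelPromise`, and `synchronized` replaces Netty's single-threaded EventLoop confinement.

```java
import java.nio.channels.ConnectionPendingException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the connect-timeout bookkeeping; not Netty API.
class ConnectTimeoutSketch {
    private final ScheduledExecutorService loop; // stands in for the EventLoop
    private CompletableFuture<Void> connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;

    ConnectTimeoutSketch(ScheduledExecutorService loop) {
        this.loop = loop;
    }

    // Starts a connect attempt that is assumed not to complete immediately.
    synchronized CompletableFuture<Void> connect(long connectTimeoutMillis) {
        if (connectPromise != null) {
            throw new ConnectionPendingException(); // only one attempt at a time
        }
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        connectPromise = promise;
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = loop.schedule(
                    () -> failOnTimeout(promise), connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        return promise;
    }

    private synchronized void failOnTimeout(CompletableFuture<Void> promise) {
        if (connectPromise == promise) {
            connectPromise = null;
            promise.completeExceptionally(new TimeoutException("connection timed out"));
        }
    }

    // Called when the (simulated) transport reports that the connection is established.
    synchronized void finishConnect() {
        if (connectPromise == null) {
            return; // already timed out
        }
        connectPromise.complete(null);
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false); // the timeout is no longer needed
            connectTimeoutFuture = null;
        }
        connectPromise = null;
    }

    public static void main(String[] args) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            ConnectTimeoutSketch channel = new ConnectTimeoutSketch(loop);
            CompletableFuture<Void> fast = channel.connect(60_000);
            channel.finishConnect();
            System.out.println("fast connected: " + !fast.isCompletedExceptionally());
            CompletableFuture<Void> slow = channel.connect(50); // never finished
            try {
                slow.join();
            } catch (CompletionException e) {
                System.out.println("slow failed: " + e.getCause());
            }
        } finally {
            loop.shutdownNow();
        }
    }
}
```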
2. finishConnect
```java
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
    @Override
    public final void finishConnect() {
        // Note this method is invoked by the event loop only if the connection attempt was
        // neither cancelled nor timed out.
        // Check that we are on the EventLoop thread.
        assert eventLoop().inEventLoop();
        try {
            // Record whether the Channel was already active
            boolean wasActive = isActive();
            // Finish the connect
            doFinishConnect();
            // Notify connectPromise that the connect completed
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            // Notify connectPromise that the connect failed
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            // Cancel the connectTimeoutFuture task
            // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
            // See https://github.com/netty/netty/issues/1770
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            // Clear connectPromise
            connectPromise = null;
        }
    }
}
```
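For orientation, the two-step connect/finishConnect split mirrors how a non-blocking `SocketChannel` works in the JDK itself: `connect` may return `false`, after which the caller waits for `OP_CONNECT` and calls `finishConnect`. The sketch below uses only the JDK; `NioConnectSketch` and its method names are hypothetical, and the assumption that `doConnect` and `doFinishConnect` delegate to these JDK calls in the NIO socket transport is not shown in the listings here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Illustrative JDK-only sketch; class and method names are hypothetical.
class NioConnectSketch {
    // Non-blocking connect: returns true once connected, false if the deadline passes.
    static boolean connect(InetSocketAddress remote, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            client.configureBlocking(false);
            // Step 1: start the connect; true means it completed immediately.
            if (client.connect(remote)) {
                return true;
            }
            // Step 2: wait until the selector reports that the connect can be finished.
            SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (System.nanoTime() < deadline) {
                selector.select(50);
                // Step 3: finish the connect; this throws if the attempt failed.
                if (key.isConnectable() && client.finishConnect()) {
                    return true;
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }

    // Starts a throwaway server on the loopback interface and connects to it.
    static boolean connectToLoopbackServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            return connect(address, 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLoopbackServer());
    }
}
```

In the listing above, the part between step 1 and step 3 is where `connectPromise`, `requestedRemoteAddress` and `connectTimeoutFuture` are kept, so that `finishConnect` can later complete the promise and cancel the timeout.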
### 4.3 NioByteUnsafe
1. read
```java
protected class NioByteUnsafe extends AbstractNioUnsafe {
    @Override
    @SuppressWarnings("Duplicates")
    public final void read() {
        final ChannelConfig config = config();
        // If inputClosedSeenErrorOnRead is true, remove interest in SelectionKey.OP_READ.
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        // Get the RecvByteBufAllocator.Handle
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        // Reset the RecvByteBufAllocator.Handle
        allocHandle.reset(config);
        ByteBuf byteBuf = null;
        boolean close = false; // whether to close the connection
        try {
            do {
                // Allocate a ByteBuf
                byteBuf = allocHandle.allocate(allocator);
                // Read data and record the number of bytes read by this call
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                // <1> No data was read
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    // Clear the ByteBuf reference
                    byteBuf = null;
                    // A negative byte count means the peer has closed the connection
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    // Leave the loop
                    break;
                }
                // <2> Data was read
                // Increment the number of messages read
                allocHandle.incMessagesRead(1);
                readPending = false;
                // Fire the channelRead event into the pipeline
                pipeline.fireChannelRead(byteBuf);
                // Clear the ByteBuf reference
                byteBuf = null;
            } while (allocHandle.continueReading()); // decide whether to keep reading
            // Reading is complete
            allocHandle.readComplete();
            // Fire the channelReadComplete event into the pipeline.
            pipeline.fireChannelReadComplete();
            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}
```
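The shape of this read loop (allocate, read, stop on 0 or EOF, otherwise hand the chunk on and decide whether to continue) can be tried out with plain JDK classes. The sketch below is a simplified model, not Netty code: `ReadLoopSketch` is hypothetical, a fixed `maxMessagesPerRead` limit stands in for `allocHandle.continueReading()` (the real decision takes more into account), and recording chunk sizes stands in for `pipeline.fireChannelRead`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one read-ready pass; not Netty's class.
class ReadLoopSketch {
    final List<Integer> chunks = new ArrayList<>(); // sizes that would be passed to fireChannelRead
    boolean closeSeen;                                // true once EOF (-1) was read

    void read(ReadableByteChannel source, int bufferSize, int maxMessagesPerRead) throws IOException {
        int messagesRead = 0;
        do {
            ByteBuffer buf = ByteBuffer.allocate(bufferSize); // "allocate a ByteBuf"
            int lastBytesRead = source.read(buf);
            if (lastBytesRead <= 0) {
                // Nothing was read; a negative count means the peer closed the connection.
                closeSeen = lastBytesRead < 0;
                break;
            }
            messagesRead++;
            chunks.add(lastBytesRead); // "fire channelRead"
        } while (messagesRead < maxMessagesPerRead); // simplified continueReading()
    }

    // Runs one pass over an in-memory source.
    static ReadLoopSketch readOnce(byte[] data, int bufferSize, int maxMessagesPerRead) {
        ReadLoopSketch sketch = new ReadLoopSketch();
        try (ReadableByteChannel source = Channels.newChannel(new ByteArrayInputStream(data))) {
            sketch.read(source, bufferSize, maxMessagesPerRead);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sketch;
    }

    public static void main(String[] args) {
        ReadLoopSketch all = readOnce(new byte[10], 4, 16);
        System.out.println(all.chunks + " " + all.closeSeen);         // [4, 4, 2] true
        ReadLoopSketch capped = readOnce(new byte[10], 4, 2);
        System.out.println(capped.chunks + " " + capped.closeSeen);   // [4, 4] false
    }
}
```

The second call shows why a cap matters: the pass ends after two chunks even though data remains, leaving the rest for a later read-ready event so that one busy connection cannot monopolise the loop.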