Channel和Unsafe

1.Channel

1.1 功能说明

io.netty.channel.Channel是Netty的网络操作抽象类,聚合了一组功能,包括但不限于网络读写、客户端发起连接、主动关闭连接,同时也包含了Netty框架相关的一些功能,包括获取Channel的EventLoop,获取缓冲区分配器ByteBufAllocator和pipeline等.

1.2 JDK原生Channel对比

  • JDK的SocketChannelServerSocketChannel没有统一的Channel接口供业务开发者使用.对用户而言,没有统一的操作视图,使用起来不方便.
  • JDK的SocketChannelServerSocketChannel是SPI类接口,通过继承来扩展很不方便,不如开发一个新的.
  • Netty的Channel需要能跟Netty架构融合在一起.
  • 自定义Channel功能实现会更灵活

1.3 设计理念

  • 在Channel接口层,采用Facade模式统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供.
  • Channel接口定义尽量大而全,为SocketChannelServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现功能和接口的重用.
  • 具体实现采用聚合而非包含的方式,Channel负责统一分配和调度,更加灵活.

1.4 Channel的功能

  • 通道状态主要包括:打开、关闭、连接
  • 通道主要的IO操作,读(read)、写(write)、连接(connect)、绑定(bind)
  • 获取EventLoop
  • 元数据 metadata,获取 TCP 参数配置等
  • 所有的IO操作都是异步的,调用诸如read,write方法后,并不保证IO操作完成,但会返回一个凭证,在IO操作成功,取消或失败后会记录在该凭证中.
  • channel有父子关系,SocketChannel是通过ServerSocketChannel接受创建的,故SocketChannel的parent()方法返回的就是ServerSocketChannel.
  • Channel使用完毕后,请调用close方法,释放通道占用的资源

2.源码分析

类继承关系图

2.1 Channel

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    /**
     * Returns the globally unique identifier of this {@link Channel}.
     *
     * Channel 的编号
     */
    ChannelId id();

    /**
     * Return the {@link EventLoop} this {@link Channel} was registered to.
     *
     * Channel 注册到的 EventLoop
     */
    EventLoop eventLoop();

    /**
     * Returns the parent of this channel.
     *
     * 父 Channel 对象
     *
     * @return the parent channel.
     *         {@code null} if this channel does not have a parent channel.
     */
    Channel parent();

    /**
     * Returns the configuration of this channel.
     *
     * Channel 配置参数
     */
    ChannelConfig config();

    /**
     * Returns {@code true} if the {@link Channel} is open and may get active later
     *
     * Channel 是否打开.
     *
     * true 表示 Channel 可用
     * false 表示 Channel 已关闭,不可用
     */
    boolean isOpen();

    /**
     * Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}.
     *
     * Channel 是否注册
     *
     * true 表示 Channel 已注册到 EventLoop 上
     * false 表示 Channel 未注册到 EventLoop 上
     */
    boolean isRegistered();

    /**
     * Return {@code true} if the {@link Channel} is active and so connected.
     *
     * Channel 是否激活
     *
     * 对于服务端 ServerSocketChannel ,true 表示 Channel 已经绑定到端口上,可提供服务
     * 对于客户端 SocketChannel ,true 表示 Channel 连接到远程服务器
     */
    boolean isActive();

    /**
     * Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}.
     *
     * Channel 元数据
     */
    ChannelMetadata metadata();

    /**
     * Returns the local address where this channel is bound to.  The returned
     * {@link SocketAddress} is supposed to be down-cast into more concrete
     * type such as {@link InetSocketAddress} to retrieve the detailed
     * information.
     *
     * 本地地址
     *
     * @return the local address of this channel.
     *         {@code null} if this channel is not bound.
     */
    SocketAddress localAddress();

    /**
     * Returns the remote address where this channel is connected to.  The
     * returned {@link SocketAddress} is supposed to be down-cast into more
     * concrete type such as {@link InetSocketAddress} to retrieve the detailed
     * information.
     *
     * 远端地址
     *
     * @return the remote address of this channel.
     *         {@code null} if this channel is not connected.
     *         If this channel is not connected but it can receive messages
     *         from arbitrary remote addresses (e.g. {@link DatagramChannel},
     *         use {@link DatagramPacket#recipient()} to determine
     *         the origination of the received message as this method will
     *         return {@code null}.
     */
    SocketAddress remoteAddress();

    /**
     * Returns the {@link ChannelFuture} which will be notified when this
     * channel is closed.  This method always returns the same future instance.
     *
     * Channel 关闭的 Future 对象
     */
    ChannelFuture closeFuture();

    /**
     * Returns {@code true} if and only if the I/O thread will perform the
     * requested write operation immediately.  Any write requests made when
     * this method returns {@code false} are queued until the I/O thread is
     * ready to process the queued write requests.
     *
     * Channel 是否可写
     *
     * 当 Channel 的写缓存区 outbound 非 null 且可写时,返回 true
     */
    boolean isWritable();

    /**
     * 获得距离不可写还有多少字节数
     *
     * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
     */
    long bytesBeforeUnwritable();

    /**
     * 获得距离可写还要多少字节数
     *
     * Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
     */
    long bytesBeforeWritable();

    /**
     * Returns an <em>internal-use-only</em> object that provides unsafe operations.
     *
     * Unsafe 对象
     */
    Unsafe unsafe();

    /**
     * Return the assigned {@link ChannelPipeline}.
     *
     * ChannelPipeline 对象,用于处理 Inbound 和 Outbound 事件的处理
     */
    ChannelPipeline pipeline();

    /**
     * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
     *
     * ByteBuf 分配器
     */
    ByteBufAllocator alloc();

    @Override
    Channel read();

    @Override
    Channel flush();

    /**
     * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
     * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
     * following methods:
     * <ul>
     *   <li>{@link #localAddress()}</li>
     *   <li>{@link #remoteAddress()}</li>
     *   <li>{@link #closeForcibly()}</li>
     *   <li>{@link #register(EventLoop, ChannelPromise)}</li>
     *   <li>{@link #deregister(ChannelPromise)}</li>
     *   <li>{@link #voidPromise()}</li>
     * </ul>
     */
    interface Unsafe {

        /**
         * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
         * receiving data.
         *
         * ByteBuf 分配器的处理器
         */
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        /**
         * Return the {@link SocketAddress} to which is bound local or
         * {@code null} if none.
         *
         * 本地地址
         */
        SocketAddress localAddress();

        /**
         * Return the {@link SocketAddress} to which is bound remote or
         * {@code null} if none is bound yet.
         *
         * 远端地址
         */
        SocketAddress remoteAddress();

        /**
         * Register the {@link Channel} of the {@link ChannelPromise} and notify
         * the {@link ChannelFuture} once the registration was complete.
         */
        void register(EventLoop eventLoop, ChannelPromise promise);

        /**
         * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
         * it once its done.
         */
        void bind(SocketAddress localAddress, ChannelPromise promise);

        /**
         * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
         * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
         * pass {@code null} to it.
         *
         * The {@link ChannelPromise} will get notified once the connect operation was complete.
         */
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

        /**
         * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
         * operation was complete.
         */
        void disconnect(ChannelPromise promise);

        /**
         * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
         * operation was complete.
         */
        void close(ChannelPromise promise);

        /**
         * Closes the {@link Channel} immediately without firing any events.  Probably only useful
         * when registration attempt failed.
         */
        void closeForcibly();

        /**
         * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
         * {@link ChannelPromise} once the operation was complete.
         */
        void deregister(ChannelPromise promise);

        /**
         * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
         * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
         */
        void beginRead();

        /**
         * Schedules a write operation.
         */
        void write(Object msg, ChannelPromise promise);

        /**
         * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
         */
        void flush();

        /**
         * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
         * It will never be notified of a success or error and so is only a placeholder for operations
         * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
         */
        ChannelPromise voidPromise();

        /**
         * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
         */
        ChannelOutboundBuffer outboundBuffer();
    }
}

2.2 AbstractChannel

  1. 成员变量

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
     /**
      * 父 Channel 对象
      */
     private final Channel parent;
     /**
      * Channel 编号
      */
     private final ChannelId id;
     /**
      * Unsafe 对象
      */
     private final Unsafe unsafe;
     /**
      * DefaultChannelPipeline 对象
      */
     private final DefaultChannelPipeline pipeline;
     private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
     private final CloseFuture closeFuture = new CloseFuture(this);
    
     /**
      * 本地地址
      */
     private volatile SocketAddress localAddress;
     /**
      * 远端地址
      */
     private volatile SocketAddress remoteAddress;
     /**
      * EventLoop 对象
      */
     private volatile EventLoop eventLoop;
     /**
      * 是否注册
      */
     private volatile boolean registered;
     /**
      * 关闭是否已经初始化
      *
      * @see AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, boolean)
      */
     private boolean closeInitiated;
    
     /** Cache for the string representation of this channel */
     private boolean strValActive;
     private String strVal;
    
     /**
      * Creates a new instance.
      *
      * @param parent
      *        the parent of this channel. {@code null} if there's no parent.
      */
     protected AbstractChannel(Channel parent) {
         this.parent = parent;
         // 创建 ChannelId 对象
         id = newId();
         // 创建 Unsafe 对象
         unsafe = newUnsafe();
         // 创建 DefaultChannelPipeline 对象
         pipeline = newChannelPipeline();
     }
    }
    
  2. 核心API

网络I/O操作是直接触发ChannelPipeline中的对应的事件方法.

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

    @Override
    public ChannelFuture deregister() {
        return pipeline.deregister();
    }

    @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }
}
  1. 公共的API的具体实现
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
     @Override
     public SocketAddress remoteAddress() {
         SocketAddress remoteAddress = this.remoteAddress;
         if (remoteAddress == null) {
             try {
                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
             } catch (Throwable t) {
                 // Sometimes fails on a closed socket in Windows.
                 return null;
             }
         }
         return remoteAddress;
     }  
    }
    

2.3 AbstractNioChannel

  1. 成员变量

    public abstract class AbstractNioChannel extends AbstractChannel {
     /**
      * Netty NIO Channel 对象,持有的 Java 原生 NIO 的 Channel 对象
      */
     private final SelectableChannel ch;
     /**
      * 感兴趣读事件的操作位值, 代表了JDK SelectionKey的OP_READ
      */
     protected final int readInterestOp;
     /**
      * 是Channel注册到EventLoop后返回的选择键
      */
     volatile SelectionKey selectionKey;
     boolean readPending;
    
     /**
      * 移除对“读”事件感兴趣的 Runnable 对象
      */
     private final Runnable clearReadPendingRunnable = new Runnable() {
         @Override
         public void run() {
             clearReadPending0();
         }
     };
    
     /**
      * 目前正在连接远程地址的 ChannelPromise 对象.
      *
      * The future of the current connection attempt.  If not null, subsequent
      * connection attempts will fail.
      */
     private ChannelPromise connectPromise;
     /**
      * 连接超时监听 ScheduledFuture 对象.
      */
     private ScheduledFuture<?> connectTimeoutFuture;
     /**
      * 正在连接的远程地址
      */
     private SocketAddress requestedRemoteAddress;
    }
    
  2. 核心API

    public abstract class AbstractNioChannel extends AbstractChannel {
     @Override
     protected void doRegister() throws Exception {
         // 用selected标识是否操作成功
         boolean selected = false;
         for (;;) {
             try {
                 // 调用SelectableChannel的register方法, 将当前Channel注册到EventLoop的多路复用器上
                 // 注册的是0, 说明对任何事件都不敢兴趣,仅仅完成注册操作
                 // 如果注册成功,返回selectionKey,通过selectionKey可以从多路复用器中获取Channel对象
                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                 return;
             } catch (CancelledKeyException e) {
                 // 该key已经被取消
                 if (!selected) {
                     // Force the Selector to select now as the "canceled" SelectionKey may still be
                     // cached and not removed because no Select.select(..) operation was called yet.
                     // 调用该方法将取消的key从selector中删除掉
                     eventLoop().selectNow();
                     selected = true;
                 } else {
                     // We forced a select operation on the selector before but the SelectionKey is still cached
                     // for whatever reason. JDK bug ?
                     throw e;
                 }
             }
         }
     }
    }
    

注册Channel的时候需要指定监听的忘了操作位来表示Channel对哪几类网络事件感兴趣

public abstract class SelectionKey {
    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;
}

2.4 AbstractNioByteChannel

  1. 成员变量

    public abstract class AbstractNioByteChannel extends AbstractNioChannel {
     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    
     // 负责继续写半包消息
     private final Runnable flushTask = new Runnable() {
         @Override
         public void run() {
             // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
             // meantime.
             ((AbstractNioUnsafe) unsafe()).flush0();
         }
     };
    
     /**
      * Create a new instance
      *
      * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
      * @param ch                the underlying {@link SelectableChannel} on which it operates
      */
     protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
         super(parent, ch, SelectionKey.OP_READ);
     }
    }
    
  2. 核心方法

    public abstract class AbstractNioByteChannel extends AbstractNioChannel {
     @Override
     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
         // 类似并发编程的自旋次数, 尝试将数据写完
         int writeSpinCount = config().getWriteSpinCount();
         do {
             Object msg = in.current();
             if (msg == null) {
                 // 如果为空,说明所有待发送的消息都已经发送完成
                 // Wrote all messages.
                 // 清除半包标识
                 clearOpWrite();
                 // Directly return here so incompleteWrite(...) is not called.
                 return;
             }
             // 发送消息不为空,继续处理
             writeSpinCount -= doWriteInternal(in, msg);
         } while (writeSpinCount > 0);
    
         // 处理半包发送任务
         incompleteWrite(writeSpinCount < 0);
     }
    
     protected final void setOpWrite() {
         final SelectionKey key = selectionKey();
         // Check first if the key is still valid as it may be canceled as part of the deregistration
         // from the EventLoop
         // See https://github.com/netty/netty/issues/2104
         if (!key.isValid()) { // 合法
             return;
         }
         final int interestOps = key.interestOps();
         // 注册 SelectionKey.OP_WRITE 事件的感兴趣
         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
             key.interestOps(interestOps | SelectionKey.OP_WRITE);
         }
     }
    
     protected final void clearOpWrite() {
         final SelectionKey key = selectionKey();
         // Check first if the key is still valid as it may be canceled as part of the deregistration
         // from the EventLoop
         // See https://github.com/netty/netty/issues/2104
         if (!key.isValid()) { // 合法
             return;
         }
         final int interestOps = key.interestOps();
         // 若注册了 SelectionKey.OP_WRITE ,则进行取消
         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
         }
     }
    
     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
         if (msg instanceof ByteBuf) {
             ByteBuf buf = (ByteBuf) msg;
             if (!buf.isReadable()) {
                 in.remove();
                 return 0;
             }
    
             final int localFlushedAmount = doWriteBytes(buf);
             if (localFlushedAmount > 0) {
                 in.progress(localFlushedAmount);
                 if (!buf.isReadable()) {
                     in.remove();
                 }
                 return 1;
             }
         } else if (msg instanceof FileRegion) {
             FileRegion region = (FileRegion) msg;
             if (region.transferred() >= region.count()) {
                 in.remove();
                 return 0;
             }
    
             long localFlushedAmount = doWriteFileRegion(region);
             if (localFlushedAmount > 0) {
                 in.progress(localFlushedAmount);
                 if (region.transferred() >= region.count()) {
                     in.remove();
                 }
                 return 1;
             }
         } else {
             // Should not reach here.
             throw new Error();
         }
         return WRITE_STATUS_SNDBUF_FULL;
     }
    
     protected final void incompleteWrite(boolean setOpWrite) {
         // Did not write completely.
         // true ,注册对 SelectionKey.OP_WRITE 事件感兴趣
         if (setOpWrite) {
             setOpWrite();
         // false ,取消对 SelectionKey.OP_WRITE 事件感兴趣
         } else {
             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
             // and set the write OP if necessary.
             clearOpWrite();
    
             // Schedule flush again later so other tasks can be picked up in the meantime
             // 立即发起下一次 flush 任务
             eventLoop().execute(flushTask);
         }
     }
    }
    

2.5 AbstractNioMessageChannel

主要的方法也是doWrite,只不过上面发送的的是ByteBuf,而对于AbstractNioMessageChannel发送的是POJO

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            // 从ChannelOutboundBuffer中弹出一条消息进行处理
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }
            try {
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }

                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            } catch (Exception e) {
                if (continueOnWriteError()) {
                    in.remove(e);
                } else {
                    throw e;
                }
            }
        }
    }  
}

2.6 NioServerSocketChannel

  1. 成员变量

    public class NioServerSocketChannel extends AbstractNioMessageChannel
                              implements io.netty.channel.socket.ServerSocketChannel {
     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
     private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
    
     private static ServerSocketChannel newSocket(SelectorProvider provider) {
         try {
             /**
              *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
              *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
              *
              *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
              */
             return provider.openServerSocketChannel();
         } catch (IOException e) {
             throw new ChannelException("Failed to open a server socket.", e);
         }
     }
    
     private final ServerSocketChannelConfig config;
    }
    
  2. 核心方法

    public class NioServerSocketChannel extends AbstractNioMessageChannel
                              implements io.netty.channel.socket.ServerSocketChannel {
         @Override
     protected int doReadMessages(List<Object> buf) throws Exception {
         // 接受客户端连接
         SocketChannel ch = SocketUtils.accept(javaChannel());
    
         try {
             // 创建 Netty NioSocketChannel 对象
             if (ch != null) {
                 buf.add(new NioSocketChannel(this, ch));
                 return 1;
             }
         } catch (Throwable t) {
             logger.warn("Failed to create a new channel from an accepted socket.", t);
             // 发生异常,关闭客户端的 SocketChannel 连接
             try {
                 ch.close();
             } catch (Throwable t2) {
                 logger.warn("Failed to close a socket.", t2);
             }
         }
    
         return 0;
     }
    }
    
  3. 其它与服务的无关的

    public class NioServerSocketChannel extends AbstractNioMessageChannel
                              implements io.netty.channel.socket.ServerSocketChannel {
     @Override
     protected boolean doConnect(
             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
         throw new UnsupportedOperationException();
     }
    
     @Override
     protected void doFinishConnect() throws Exception {
         throw new UnsupportedOperationException();
     }
    
     @Override
     protected SocketAddress remoteAddress0() {
         return null;
     }
    
     @Override
     protected void doDisconnect() throws Exception {
         throw new UnsupportedOperationException();
     }
    
     @Override
     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
         throw new UnsupportedOperationException();
     }
    }
    

2.7 NioServerSocketChannel

  1. 连接操作

    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
     @Override
     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
         // 绑定本地地址
         if (localAddress != null) {
             doBind0(localAddress);
         }
    
         boolean success = false; // 执行是否成功
         try {
             // 连接远程地址
             boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
             // 若未连接完成,则关注连接( OP_CONNECT )事件.
             if (!connected) {
                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
             }
             // 标记执行是否成功
             success = true;
             // 返回是否连接完成
             return connected;
         } finally {
             // 执行失败,则关闭 Channel
             if (!success) {
                 doClose();
             }
         }
     }
    }
    
  2. 写操作

    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
     @Override
     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
         SocketChannel ch = javaChannel();
         // 获得自旋写入次数
         int writeSpinCount = config().getWriteSpinCount();
         do {
             // 内存队列为空,结束循环,直接返回
             if (in.isEmpty()) {
                 // 取消对 SelectionKey.OP_WRITE 的感兴趣
                 // All written so clear OP_WRITE
                 clearOpWrite();
                 // Directly return here so incompleteWrite(...) is not called.
                 return;
             }
    
             // 获得每次写入的最大字节数
             // Ensure the pending writes are made of ByteBufs only.
             int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
             // 从内存队列中,获得要写入的 ByteBuffer 数组
             ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
             // 写入的 ByteBuffer 数组的个数
             int nioBufferCnt = in.nioBufferCount();
    
             // 写入 ByteBuffer 数组,到对端
             // Always us nioBuffers() to workaround data-corruption.
             // See https://github.com/netty/netty/issues/2761
             switch (nioBufferCnt) {
                 case 0:
                     // We have something else beside ByteBuffers to write so fallback to normal writes.
                     writeSpinCount -= doWrite0(in);
                     break;
                 case 1: {
                     // Only one ByteBuf so use non-gathering write
                     // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                     // to check if the total size of all the buffers is non-zero.
                     ByteBuffer buffer = nioBuffers[0];
                     int attemptedBytes = buffer.remaining();
                     // 执行 NIO write 调用,写入单个 ByteBuffer 对象到对端
                     final int localWrittenBytes = ch.write(buffer);
                     // 写入字节小于等于 0 ,说明 NIO Channel 不可写,所以注册 SelectionKey.OP_WRITE ,等待 NIO Channel 可写,并返回以结束循环
                     if (localWrittenBytes <= 0) {
                         incompleteWrite(true);
                         return;
                     }
                     // 调整每次写入的最大字节数
                     adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                     // 从内存队列中,移除已经写入的数据( 消息 )
                     in.removeBytes(localWrittenBytes);
                     // 写入次数减一
                     --writeSpinCount;
                     break;
                 }
                 default: {
                     // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                     // to check if the total size of all the buffers is non-zero.
                     // We limit the max amount to int above so cast is safe
                     long attemptedBytes = in.nioBufferSize();
                     // 执行 NIO write 调用,写入多个 ByteBuffer 到对端
                     final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                     // 写入字节小于等于 0 ,说明 NIO Channel 不可写,所以注册 SelectionKey.OP_WRITE ,等待 NIO Channel 可写,并返回以结束循环
                     if (localWrittenBytes <= 0) {
                         incompleteWrite(true);
                         return;
                     }
                     // 调整每次写入的最大字节数
                     // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                     adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
                     // 从内存队列中,移除已经写入的数据( 消息 )
                     in.removeBytes(localWrittenBytes);
                     // 写入次数减一
                     --writeSpinCount;
                     break;
                 }
             }
         } while (writeSpinCount > 0); // 循环自旋写入
    
         // 内存队列中的数据未完全写入,说明 NIO Channel 不可写,所以注册 SelectionKey.OP_WRITE ,等待 NIO Channel 可写
         incompleteWrite(writeSpinCount < 0);
     }
    }
    

3.Unsafe

Unsafe函数不允许被用户代码使用,这些函数是真正用于数据传输操作,必须被IO线程调用

序号 接口 注释
1 ChannelHandlerInvoker invoker(); 返回默认使用的ChannelHandlerInvoker
2 SocketAddress localAddress(); 返回本地绑定的Socket地址
3 SocketAddress remoteAddress(); 返回通信对端的Socket地址
4 void register(ChannelPromise promise); 注册Channel到多路复用器,一旦注册操作完成,通知ChannelFuture
5 void bind(SocketAddress localAddress, ChannelPromise promise); 绑定指定的本地地址localAddress到当前的Channel上,一旦完成,通知ChannelFuture
6 void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); 绑定本地的localAddress到当前的Channel上,连接服务器,一旦操作完成,通知ChannelFuture
7 void disconnect(ChannelPromise promise); 断开Channel连接,一旦完成,通知ChannelFuture
8 void close(ChannelPromise promise); 关闭Channel连接,一旦完成,通知ChannelFuture
9 void closeForcibly(); 强制立即关闭连接
10 void beginRead(); 设置网络操位为读用于读取消息
11 void write(Object msg, ChannelPromise promise); 发送消息,一旦完成,通知ChannelFuture
12 void flush(); 将发送缓存数据中的消息写入Channel中
13 ChannelPromise voidPromise(); 返回一个特殊的可重用和传递的ChannelPromise,它不用于操作成功或者失败的通知器,仅仅作为一个容器被使用
14 ChannelOutboundBuffer outboundBuffer(); 返回消息发送缓冲区

4.Unsafe源码分析

Unsafe继承关系图

Unsafe读写操作 Unsafe读写操作

4.1 AbstractUnsafe

  1. register方法

用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上.

/**
 * {@link Unsafe} implementation which sub-classes must extend and use.
 */
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // 校验传入的 eventLoop 非空
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        // 校验未注册
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        // 校验 Channel 和 eventLoop 匹配
        if (!isCompatible(eventLoop)) {
            promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        // 设置 Channel 的 eventLoop 属性
        AbstractChannel.this.eventLoop = eventLoop;

        // 在 EventLoop 中执行注册逻辑
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            // 由用户线程或者其他线程发起的注册操作
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread() + ": register");

                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() // TODO 1001 Promise
                    || !ensureOpen(promise)) { // 确保 Channel 是打开的
                return;
            }
            // 记录是否为首次注册
            boolean firstRegistration = neverRegistered;

            // 执行注册逻辑
            doRegister();

            // 标记首次注册为 false
            neverRegistered = false;
            // 标记 Channel 为已注册
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();

            // 回调通知 `promise` 执行成功
            safeSetSuccess(promise);

            // 触发通知已注册事件
            pipeline.fireChannelRegistered();

            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
  1. bind方法 用于绑定指定的端口,对于服务端,用于绑定监听端口;对于客户端,主要用于指定客户端Channel的本地绑定Socket地址
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        // 判断是否在 EventLoop 的线程中.
        assertEventLoop();

        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
            // Warn a user about the fact that a non-root user can't receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can't receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }

        // 记录 Channel 是否激活
        boolean wasActive = isActive();

        // 绑定 Channel 的端口
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        // 若 Channel 是新激活的,触发通知 Channel 已激活的事件.
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }

        // 回调通知 promise 执行成功
        safeSetSuccess(promise);
    }
}
  1. disconnect方法 用于客户端或者服务端主动关闭连接
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void disconnect(final ChannelPromise promise) {
        assertEventLoop();

        if (!promise.setUncancellable()) {
            return;
        }

        boolean wasActive = isActive();
        try {
            doDisconnect();
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        if (wasActive && !isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelInactive();
                }
            });
        }

        safeSetSuccess(promise);
        closeIfClosed(); // doDisconnect() might have closed the channel
    }
}
  1. close 关闭链路
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void close(final ChannelPromise promise) {
        assertEventLoop();

        // 关闭
        close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
    }

    private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) {
        // 设置 Promise 不可取消
        if (!promise.setUncancellable()) {
            return;
        }

        // 若关闭已经标记初始化
        if (closeInitiated) {
            // 关闭已经完成,直接通知 Promise 对象
            if (closeFuture.isDone()) {
                // Closed already.
                safeSetSuccess(promise);
            // 关闭未完成,通过监听器通知 Promise 对象
            } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                // This means close() was called before so we just register a listener and return
                closeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        promise.setSuccess();
                    }
                });
            }
            return;
        }

        // 标记关闭已经初始化
        closeInitiated = true;

        // 获得 Channel 是否激活
        final boolean wasActive = isActive();
        // 标记 outboundBuffer 为空
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
        // 执行准备关闭
        Executor closeExecutor = prepareToClose();
        // 若 closeExecutor 非空
        if (closeExecutor != null) {
            closeExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 在 closeExecutor 中,执行关闭
                        // Execute the close.
                        doClose0(promise);
                    } finally {
                        // 在 EventLoop 中,执行
                        // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                        invokeLater(new Runnable() {
                            @Override
                            public void run() {
                                if (outboundBuffer != null) {
                                    // 写入数据( 消息 )到对端失败,通知相应数据对应的 Promise 失败.
                                    // Fail all the queued messages
                                    outboundBuffer.failFlushed(cause, notify);
                                    // 关闭内存队列
                                    outboundBuffer.close(closeCause);
                                }
                                // 执行取消注册,并触发 Channel Inactive 事件到 pipeline 中
                                fireChannelInactiveAndDeregister(wasActive);
                            }
                        });
                    }
                }
            });
        // 若 closeExecutor 为空
        } else {
            try {
                // 执行关闭
                // Close the channel and fail the queued messages in all cases.
                doClose0(promise);
            } finally {
                if (outboundBuffer != null) {
                    // 写入数据( 消息 )到对端失败,通知相应数据对应的 Promise 失败.
                    // Fail all the queued messages.
                    outboundBuffer.failFlushed(cause, notify);
                    // 关闭内存队列
                    outboundBuffer.close(closeCause);
                }
            }
            // 正在 flush 中,在 EventLoop 中执行执行取消注册,并触发 Channel Inactive 事件到 pipeline 中
            if (inFlush0) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        fireChannelInactiveAndDeregister(wasActive);
                    }
                });
            // 不在 flush 中,直接执行执行取消注册,并触发 Channel Inactive 事件到 pipeline 中
            } else {
                fireChannelInactiveAndDeregister(wasActive);
            }
        }
    }

    private void doClose0(ChannelPromise promise) {
        try {
            // 执行关闭
            doClose();
            // 通知 closeFuture 关闭完成
            closeFuture.setClosed();
            // 通知 Promise 关闭成功
            safeSetSuccess(promise);
        } catch (Throwable t) {
            // 通知 closeFuture 关闭完成
            closeFuture.setClosed();
            // 通知 Promise 关闭异常
            safeSetFailure(promise, t);
        }
    }
}
  1. write 将消息添加到环形发送数组中,并不是真正的写Channel
protected abstract class AbstractUnsafe implements Unsafe {
    @Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        // 内存队列为空
        if (outboundBuffer == null) {
            // 内存队列为空,一般是 Channel 已经关闭,所以通知 Promise 异常结果
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // 释放消息( 对象 )相关的资源
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            // 过滤写入的消息( 数据 )
            msg = filterOutboundMessage(msg);
            // 计算消息的长度
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            // 通知 Promise 异常结果
            safeSetFailure(promise, t);
            // 释放消息( 对象 )相关的资源
            ReferenceCountUtil.release(msg);
            return;
        }

        // 写入消息( 数据 )到内存队列
        outboundBuffer.addMessage(msg, size, promise);
    }
}
  1. flush 负责将发送缓冲区中待发送的消息全部写入Channel中,并发送给通信对方 ```java protected abstract class AbstractUnsafe implements Unsafe { @Override public final void flush() {

     assertEventLoop();
    
     // 内存队列为 null ,一般是 Channel 已经关闭,所以直接返回.
     ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
     if (outboundBuffer == null) {
         return;
     }
    
     // 标记内存队列开始 flush
     outboundBuffer.addFlush();
     // 执行 flush
     flush0();
    

    }

    protected void flush0() {

     // 正在 flush 中,所以直接返回.
     if (inFlush0) {
         // Avoid re-entrance
         return;
     }
    
     // 内存队列为 null ,一般是 Channel 已经关闭,所以直接返回.
     // 内存队列为空,无需 flush ,所以直接返回
     final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
     if (outboundBuffer == null || outboundBuffer.isEmpty()) {
         return;
     }
    
     // 标记正在 flush 中.
     inFlush0 = true;
    
     // 若未激活,通知 flush 失败
     // Mark all pending write requests as failure if the channel is inactive.
     if (!isActive()) {
         try {
             if (isOpen()) {
                 outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
             } else {
                 // Do not trigger channelWritabilityChanged because the channel is closed already.
                 outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
             }
         } finally {
             // 标记不在 flush 中.
             inFlush0 = false;
         }
         return;
     }
    
     // 执行真正的写入到对端
     try {
         doWrite(outboundBuffer);
     } catch (Throwable t) {
         if (t instanceof IOException && config().isAutoClose()) {
             /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
             close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
         } else {
             try {
                 shutdownOutput(voidPromise(), t);
             } catch (Throwable t2) {
                 close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
             }
         }
     } finally {
         // 标记不在 flush 中.
         inFlush0 = false;
     }
    

    } }

public final class ChannelOutboundBuffer { /**

 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
 * and so you will be able to handle them.
 */
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        // 若 flushedEntry 为空,赋值为 unflushedEntry ,用于记录第一个( 开始 ) flush 的 Entry .
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        // 计算 flush 的数量,并设置每个 Entry 对应的 Promise 不可取消
        do {
            // 增加 flushed
            flushed ++;
            // 设置 Promise 不可取消
            if (!entry.promise.setUncancellable()) { // 设置失败
                // 减少 totalPending 计数
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            // 获得下一个 Entry
            entry = entry.next;
        } while (entry != null);

        // 设置 unflushedEntry 为空,表示所有都 flush
        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

}


### 4.2 AbstractNioUnsafe

1. connect
```java
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
    @Override
    public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        try {
            // 目前有正在连接远程地址的 ChannelPromise ,则直接抛出异常,禁止同时发起多个连接.
            if (connectPromise != null) {
                // Already a connect in process.
                throw new ConnectionPendingException();
            }

            // 记录 Channel 是否激活
            boolean wasActive = isActive();

            // 执行连接远程地址
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                // 记录 connectPromise
                connectPromise = promise;
                // 记录 requestedRemoteAddress
                requestedRemoteAddress = remoteAddress;

                // 使用 EventLoop 发起定时任务,监听连接远程地址超时.若连接超时,则回调通知 connectPromise 超时异常.
                // Schedule connect timeout.
                int connectTimeoutMillis = config().getConnectTimeoutMillis(); // 默认 30 * 1000 毫秒
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                // 添加监听器,监听连接远程地址取消.
                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            // 取消定时任务
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            // 置空 connectPromise
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            // 回调通知 promise 发生异常
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            // Closed via cancellation and the promise has been notified already.
            return;
        }

        // 获得 Channel 是否激活
        // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
        // We still need to ensure we call fireChannelActive() in this case.
        boolean active = isActive();

        // 回调通知 promise 执行成功
        // trySuccess() will return false if a user cancelled the connection attempt.
        boolean promiseSet = promise.trySuccess();

        // 若 Channel 是新激活的,触发通知 Channel 已激活的事件.
        // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
        // because what happened is what happened.
        if (!wasActive && active) {
            pipeline().fireChannelActive();
        }

        // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
        if (!promiseSet) {
            close(voidPromise());
        }
    }
}
  1. finishConnect

    protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
     @Override
     public final void finishConnect() {
         // Note this method is invoked by the event loop only if the connection attempt was
         // neither cancelled nor timed out.
         // 判断是否在 EventLoop 的线程中.
         assert eventLoop().inEventLoop();
    
         try {
             // 获得 Channel 是否激活
             boolean wasActive = isActive();
             // 执行完成连接
             doFinishConnect();
             // 通知 connectPromise 连接完成
             fulfillConnectPromise(connectPromise, wasActive);
         } catch (Throwable t) {
             // 通知 connectPromise 连接异常
             fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
         } finally {
             // 取消 connectTimeoutFuture 任务
             // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
             // See https://github.com/netty/netty/issues/1770
             if (connectTimeoutFuture != null) {
                 connectTimeoutFuture.cancel(false);
             }
             // 置空 connectPromise
             connectPromise = null;
         }
     }
    }
    

4.3 NioByteUnsafe

  1. read

    protected class NioByteUnsafe extends AbstractNioUnsafe {
     @Override
     @SuppressWarnings("Duplicates")
     public final void read() {
         final ChannelConfig config = config();
         // 若 inputClosedSeenErrorOnRead = true ,移除对 SelectionKey.OP_READ 事件的感兴趣.
         if (shouldBreakReadReady(config)) {
             clearReadPending();
             return;
         }
         final ChannelPipeline pipeline = pipeline();
         final ByteBufAllocator allocator = config.getAllocator();
         // 获得 RecvByteBufAllocator.Handle 对象
         final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
         // 重置 RecvByteBufAllocator.Handle 对象
         allocHandle.reset(config);
    
         ByteBuf byteBuf = null;
         boolean close = false; // 是否关闭连接
         try {
             do {
                 // 申请 ByteBuf 对象
                 byteBuf = allocHandle.allocate(allocator);
                 // 读取数据
                 // 设置最后读取字节数
                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
                 // <1> 未读取到数据
                 if (allocHandle.lastBytesRead() <= 0) {
                     // 释放 ByteBuf 对象
                     // nothing was read. release the buffer.
                     byteBuf.release();
                     // 置空 ByteBuf 对象
                     byteBuf = null;
                     // 如果最后读取的字节为小于 0 ,说明对端已经关闭
                     close = allocHandle.lastBytesRead() < 0;
                     // TODO
                     if (close) {
                         // There is nothing left to read as we received an EOF.
                         readPending = false;
                     }
                     // 结束循环
                     break;
                 }
    
                 // <2> 读取到数据
    
                 // 读取消息数量 + localRead
                 allocHandle.incMessagesRead(1);
                 readPending = false;
                 // 触发 Channel read 事件到 pipeline 中
                 pipeline.fireChannelRead(byteBuf);
                 // 置空 ByteBuf 对象
                 byteBuf = null;
             } while (allocHandle.continueReading()); // 循环判断是否继续读取
    
             // 读取完成
             allocHandle.readComplete();
             // 触发 Channel readComplete 事件到 pipeline 中.
             pipeline.fireChannelReadComplete();
    
             if (close) {
                 closeOnRead(pipeline);
             }
         } catch (Throwable t) {
             handleReadException(pipeline, byteBuf, t, close, allocHandle);
         } finally {
             // Check if there is a readPending which was not processed yet.
             // This could be for two reasons:
             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
             //
             // See https://github.com/netty/netty/issues/2254
             if (!readPending && !config.isAutoRead()) {
                 removeReadOp();
             }
         }
     }
    }
    

results matching ""

    No results matching ""