Netty概述

1.介绍

1.1Netty特点

  • 设计
    • 针对多种传输类型的统一接口 - 阻塞和非阻塞
    • 简单但更强大的线程模型
    • 真正的无连接的数据报套接字支持
    • 链接逻辑支持复用
  • 易用性
    • 大量的 Javadoc 和 代码实例
    • 除了在 JDK 1.6 + 额外的限制.(一些特征是只支持在Java 1.7 +.可选的功能可能有额外的限制.)
  • 性能
    • 比核心 Java API 更好的吞吐量,较低的延时
    • 资源消耗更少,这个得益于共享池和重用 减少内存拷贝
  • 健壮性
    • 消除由于慢,快,或重载连接产生的 OutOfMemoryError
    • 消除经常发现在 NIO 在高速网络中的应用中的不公平的读/写比
  • 安全
    • 完整的 SSL / TLS 和 StartTLS 的支持
    • 运行在受限的环境例如 Applet 或 OSGI
  • 社区
    • 发布的更早和更频繁
    • 社区驱动

2.服务端

2.1 Netty服务端创建时序图

@startuml
title Netty服务端创建时序图
autonumber

actor "用户" as User

activate User
User -> ServerBootstrap: 创建ServerBootstrap实例()

activate ServerBootstrap
ServerBootstrap -> EventLoopGroup: 设置并绑定Reactor线程池()

activate EventLoopGroup
EventLoopGroup -> NioServerSocketChannel: 设置并绑定服务器端Channel()
EventLoopGroup -> ChannelPipeline: TCP链路建立的时候创建ChannelPipeline
deactivate EventLoopGroup

ServerBootstrap -> ChannelHandler: 添加并设置ChannelHandler
ServerBootstrap -> ServerBootstrap: 绑定并启动监听窗口
deactivate ServerBootstrap
deactivate User

activate EventLoopGroup
EventLoopGroup -> EventLoopGroup: Selector轮询()
EventLoopGroup -> ChannelPipeline: 网络事件通知()

activate ChannelPipeline
ChannelPipeline -> ChannelHandler: 执行netty系统和业务HandlerChannel()
deactivate ChannelPipeline
activate EventLoopGroup

@enduml

Netty服务端创建时序图

服务端线程模型 Netty服务端线程模型

Handler模型 Netty服务端Handler模型

2.2 开发过程

  1. 创建ServerBootstrap实例

     ServerBootstrap b=new ServerBootstrap();
    

    ServerBootstrap是Netty服务器端的启动辅助类,提供了一系列的方法用于设置服务器端启动相关的参数.

  2. 设置并绑定Reactor线程池

     //用于服务器端接受客户端的连接
     EventLoopGroup bossGruop=new NioEventLoopGroup();
     //用于网络事件的处理
     EventLoopGroup workGroup=new NioEventLoopGroup();
    

    Netty的线程池是EventLoopGroup,它实际上是EventLoop的数组,EventLoop职责是处理所有注册到本线程多路复用器Selector上的Channel,Selector的轮询操作是由绑定的EventLoop线程run方法驱动.

  3. 设置并绑定服务器端Channel

     b.group(bossGruop, workGroup).channel(NioServerSocketChannel.class)
    

    Netty对原生的NIO类库进行封装,作为NIO服务端,需要创建ServerSocketChannel,对应的实现是NioServerSocketChannel.

  4. 链路建立的时候创建并初始化ChannelPipeline

     b.group(bossGruop, workGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new ChannelInitializer<SocketChannel>(){});
    

    ChannelPipeline的本质是一个负责处理网络事件的职责链,负责管理和执行ChannelHandler.网络事件以事件流的形式在ChannelPipeline中流转,由ChannelPipeline根据Channel|Handler的执行策略调度ChannelHandler的执行.

    典型的网络事件有:

    • 链路注册
    • 链路激活
    • 链路断开
    • 接收到请求信息
    • 请求信息接收并处理完毕
    • 发送应答消息
    • 链路发生异常
    • 用户自定义事件
  5. 添加并设置ChannelHandler

     b.group(bossGruop, workGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new ChannelInitializer<SocketChannel>(){
         @Override
         protected void initChannel(SocketChannel arg0) throws Exception{
             arg0.pipeline().addLast(new HelloServerHandler());
         }
     }).option(ChannelOption.SO_BACKLOG, 1024);
    

    ChannelHandler是Netty提供给用户定制和扩展的接口

    • 系统编解码框架 —— ByteToMessageCodec
    • 通用基于长度的半包解码器 —— LengthFieldBasedFrameDecoder
    • 码流日志打印 Handler —— LoggingHandler
    • SSL安全日志 Handler —— SslHandler
    • 连裤空闲检测 Handler —— IdleStateHandler
    • 流量整形 Handler —— ChannelTrafficShapingHandler
    • Base64编解码 —— Base64Decoder和Base64Encoder
  6. 绑定并启动监听窗口

     ChannelFuture f=b.bind(port).sync();
    

    经过一系列初始化和检测工作后,会启动监听端口,并将ServerSocketChannel注册到Selector上监听客户端连接

  7. Selector轮询 由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合

  8. 当轮询到准备就绪的Channel之后,就由Reactor线程NioEventLoop执行ChannelPipeline的相应方法,最终调度并执行ChannelHandler

     public class HelloServerHandler extends ChannelHandlerAdapter{}
    
  9. 执行Netty系统ChannelHandler和用户添加定制的ChannelHandler,ChannelPipeline根据网络事件的类型,调度并执行ChannelHandler

     @Override
     public ChannelHandlerContext fireChannelRead(final Object msg) {
         if (msg == null) {
             throw new NullPointerException("msg");
         }
    
         final AbstractChannelHandlerContext next = findContextInbound();
         EventExecutor executor = next.executor();
         if (executor.inEventLoop()) {
             next.invokeChannelRead(msg);
         } else {
             executor.execute(new OneTimeTask() {
                 @Override
                 public void run() {
                     next.invokeChannelRead(msg);
                 }
             });
         }
         return this;
     }
    

2.3 示例代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
//Netty服务器端
public class HelloServer{
    private int port;

    public HelloServer(int port){
        super();
        this.port = port;
    }

    private void bind() throws InterruptedException{
        //用于服务器端接受客户端的连接
        EventLoopGroup bossGruop=new NioEventLoopGroup();
        //用于网络事件的处理
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try{
            ServerBootstrap b=new ServerBootstrap();
            b.group(bossGruop, workGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel arg0) throws Exception{
                    arg0.pipeline().addLast(new HelloServerHandler());
                }
            }).option(ChannelOption.SO_BACKLOG, 1024);//指定此套接口排队的最大连接个数
            ChannelFuture f=b.bind(port).sync();
            f.channel().closeFuture().sync();
        }finally{
            bossGruop.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        new HelloServer(8080).bind();
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

//自定义的ChannelHandler
public class HelloServerHandler extends ChannelHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        System.out.println("客户端连上了...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
        ByteBuf buf=(ByteBuf) msg;
        byte[] req=new byte[buf.readableBytes()];
        buf.readBytes(req);
        System.out.println("服务器端接收的消息:"+new String(req));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{
        ctx.close();
    }
}

3.客户端

3.1 Netty客户端创建时序图

@startuml
title Netty客户端创建时序图
autonumber

actor "客户端" as User

activate User
User -> Bootstrap: 创建客户端

activate Bootstrap
Bootstrap -> NioEventLoopGroup: 创建I/O线程池()

activate NioEventLoopGroup
NioEventLoopGroup -> NioSocketChannel: 设置NioSocketChannel()
NioSocketChannel -> ChannelPipeline: 创建默认的Default Pipeline()

activate NioSocketChannel
NioSocketChannel -> NioSocketChannel: 异步发起TCP连接()
NioSocketChannel -> NioEventLoopGroup: 注册连接操作到多路复用器
deactivate NioSocketChannel

NioEventLoopGroup -> NioEventLoopGroup: 处理连接结果事件
NioEventLoopGroup -> ChannelPipeline: 发起连接成功事件

activate ChannelPipeline
ChannelPipeline -> ChannelHandler: 调用用户ChannelHandler()
deactivate ChannelPipeline

activate NioEventLoopGroup
deactivate Bootstrap

@enduml

Netty客户端创建时序图

3.2 开发过程

  1. 用户线程创建Bootstrap

     Bootstrap b = new Bootstrap();
    

      Bootstrap是Socket客户端创建工具类,通过API设置创建客户端相关的参数,异步发起客户端连接.

  2. 创建处理客户端连接、IO读写的Reactor线程组NioEventLoopGroup

     EventLoopGroup group = new NioEventLoopGroup();
    
  3. 通过Bootstrap的ChannelFactory和用户指定的Channel类型创建用于客户端连接的NioSocketChannel

     b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
    

      此处的NioSocketChannel类似于Java NIO提供的SocketChannel.

  4. 创建默认的channel Handler pipeline

     b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new ChannelInitializer<SocketChannel>(){
         @Override
         public void initChannel(SocketChannel ch) throws Exception{
             ch.pipeline().addLast(new HelloClientHandler());
         }
     });
    

      用于调度和执行网络事件.

  5. 异步发起TCP连接

     // 发起异步连接操作
     ChannelFuture f = b.connect(host, port).sync();
    

      SocketChannel执行connect()操作后有以下三种结果:

     - 连接成功,然会true
     - 暂时没有连接上,服务器端没有返回ACK应答,连接结果不确定,返回false.此种结果下,需要将NioSocketChannel中的selectionKey设置为 - OP_CONNECT,监听连接结果
     - 接连失败,直接抛出I/O异常
    
  6. 由多路复用器在I/O中轮询个Channel,处理连接结果

  7. 如果连接成功,设置Future结果,发送连接成功事件,触发ChannelPipeline执行
  8. 由ChannelPipeline调度执行系统和用户的ChannelHandler,执行业务逻辑

3.3 示例代码

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class HelloClient{

    public void connect(int port, String host) throws Exception{
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception{
                            ch.pipeline().addLast(new HelloClientHandler());
                        }
                    });

            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally{
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
        int port = 8080;
        new HelloClient().connect(port, "127.0.0.1");
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class HelloClientHandler  extends ChannelHandlerAdapter{

    private final ByteBuf message;

    public HelloClientHandler(){
        byte[] req="hello Netty".getBytes();
        message=Unpooled.buffer(req.length);
        message.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        ctx.writeAndFlush(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{
        ctx.close();
    }
}

results matching ""

    No results matching ""