Netty概述
1.介绍
1.1Netty特点
- 设计
- 针对多种传输类型的统一接口 - 阻塞和非阻塞
- 简单但更强大的线程模型
- 真正的无连接的数据报套接字支持
- 链接逻辑支持复用
- 易用性
- 大量的 Javadoc 和 代码实例
- 除了在 JDK 1.6 + 额外的限制.(一些特征是只支持在Java 1.7 +.可选的功能可能有额外的限制.)
- 性能
- 比核心 Java API 更好的吞吐量,较低的延时
- 资源消耗更少,这个得益于共享池和重用 减少内存拷贝
- 健壮性
- 消除由于慢,快,或重载连接产生的 OutOfMemoryError
- 消除经常发现在 NIO 在高速网络中的应用中的不公平的读/写比
- 安全
- 完整的 SSL / TLS 和 StartTLS 的支持
- 运行在受限的环境例如 Applet 或 OSGI
- 社区
- 发布的更早和更频繁
- 社区驱动
2.服务端
2.1 Netty服务端创建时序图
@startuml
title Netty服务端创建时序图
autonumber
actor "用户" as User
activate User
User -> ServerBootstrap: 创建ServerBootstrap实例()
activate ServerBootstrap
ServerBootstrap -> EventLoopGroup: 设置并绑定Reactor线程池()
activate EventLoopGroup
EventLoopGroup -> NioServerSocketChannel: 设置并绑定服务器端Channel()
EventLoopGroup -> ChannelPipeline: TCP链路建立的时候创建ChannelPipeline
deactivate EventLoopGroup
ServerBootstrap -> ChannelHandler: 添加并设置ChannelHandler
ServerBootstrap -> ServerBootstrap: 绑定并启动监听窗口
deactivate ServerBootstrap
deactivate User
activate EventLoopGroup
EventLoopGroup -> EventLoopGroup: Selector轮询()
EventLoopGroup -> ChannelPipeline: 网络事件通知()
activate ChannelPipeline
ChannelPipeline -> ChannelHandler: 执行netty系统和业务HandlerChannel()
deactivate ChannelPipeline
activate EventLoopGroup
@enduml

服务端线程模型

Handler模型

2.2 开发过程
创建ServerBootstrap实例
ServerBootstrap b=new ServerBootstrap();ServerBootstrap是Netty服务器端的启动辅助类,提供了一系列的方法用于设置服务器端启动相关的参数.
设置并绑定Reactor线程池
//用于服务器端接受客户端的连接 EventLoopGroup bossGruop=new NioEventLoopGroup(); //用于网络事件的处理 EventLoopGroup workGroup=new NioEventLoopGroup();Netty的线程池是EventLoopGroup,它实际上是EventLoop的数组,EventLoop职责是处理所有注册到本线程多路复用器Selector上的Channel,Selector的轮询操作是由绑定的EventLoop线程run方法驱动.
设置并绑定服务器端Channel
b.group(bossGruop, workGroup).channel(NioServerSocketChannel.class)Netty对原生的NIO类库进行封装,作为NIO服务端,需要创建ServerSocketChannel,对应的实现是NioServerSocketChannel.
链路建立的时候创建并初始化ChannelPipeline
b.group(bossGruop, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>(){});ChannelPipeline的本质是一个负责处理网络事件的职责链,负责管理和执行ChannelHandler.网络事件以事件流的形式在ChannelPipeline中流转,由ChannelPipeline根据Channel|Handler的执行策略调度ChannelHandler的执行.
典型的网络事件有:
- 链路注册
- 链路激活
- 链路断开
- 接收到请求信息
- 请求信息接收并处理完毕
- 发送应答消息
- 链路发生异常
- 用户自定义事件
添加并设置ChannelHandler
b.group(bossGruop, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel arg0) throws Exception{ arg0.pipeline().addLast(new HelloServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 1024);ChannelHandler是Netty提供给用户定制和扩展的接口
- 系统编解码框架 —— ByteToMessageCodec
- 通用基于长度的半包解码器 —— LengthFieldBasedFrameDecoder
- 码流日志打印 Handler —— LoggingHandler
- SSL安全日志 Handler —— SslHandler
- 连裤空闲检测 Handler —— IdleStateHandler
- 流量整形 Handler —— ChannelTrafficShapingHandler
- Base64编解码 —— Base64Decoder和Base64Encoder
绑定并启动监听窗口
ChannelFuture f=b.bind(port).sync();经过一系列初始化和检测工作后,会启动监听端口,并将ServerSocketChannel注册到Selector上监听客户端连接
Selector轮询 由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合
当轮询到准备就绪的Channel之后,就由Reactor线程NioEventLoop执行ChannelPipeline的相应方法,最终调度并执行ChannelHandler
public class HelloServerHandler extends ChannelHandlerAdapter{}执行Netty系统ChannelHandler和用户添加定制的ChannelHandler,ChannelPipeline根据网络事件的类型,调度并执行ChannelHandler
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { if (msg == null) { throw new NullPointerException("msg"); } final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this; }
2.3 示例代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
//Netty服务器端
public class HelloServer{
private int port;
public HelloServer(int port){
super();
this.port = port;
}
private void bind() throws InterruptedException{
//用于服务器端接受客户端的连接
EventLoopGroup bossGruop=new NioEventLoopGroup();
//用于网络事件的处理
EventLoopGroup workGroup=new NioEventLoopGroup();
try{
ServerBootstrap b=new ServerBootstrap();
b.group(bossGruop, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel arg0) throws Exception{
arg0.pipeline().addLast(new HelloServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 1024);//指定此套接口排队的最大连接个数
ChannelFuture f=b.bind(port).sync();
f.channel().closeFuture().sync();
}finally{
bossGruop.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException{
new HelloServer(8080).bind();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
//自定义的ChannelHandler
public class HelloServerHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
System.out.println("客户端连上了...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf=(ByteBuf) msg;
byte[] req=new byte[buf.readableBytes()];
buf.readBytes(req);
System.out.println("服务器端接收的消息:"+new String(req));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{
ctx.close();
}
}
3.客户端
3.1 Netty客户端创建时序图
@startuml
title Netty客户端创建时序图
autonumber
actor "客户端" as User
activate User
User -> Bootstrap: 创建客户端
activate Bootstrap
Bootstrap -> NioEventLoopGroup: 创建I/O线程池()
activate NioEventLoopGroup
NioEventLoopGroup -> NioSocketChannel: 设置NioSocketChannel()
NioSocketChannel -> ChannelPipeline: 创建默认的Default Pipeline()
activate NioSocketChannel
NioSocketChannel -> NioSocketChannel: 异步发起TCP连接()
NioSocketChannel -> NioEventLoopGroup: 注册连接操作到多路复用器
deactivate NioSocketChannel
NioEventLoopGroup -> NioEventLoopGroup: 处理连接结果事件
NioEventLoopGroup -> ChannelPipeline: 发起连接成功事件
activate ChannelPipeline
ChannelPipeline -> ChannelHandler: 调用用户ChannelHandler()
deactivate ChannelPipeline
activate NioEventLoopGroup
deactivate Bootstrap
@enduml

3.2 开发过程
用户线程创建Bootstrap
Bootstrap b = new Bootstrap();Bootstrap是Socket客户端创建工具类,通过API设置创建客户端相关的参数,异步发起客户端连接.
创建处理客户端连接、IO读写的Reactor线程组NioEventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();通过Bootstrap的ChannelFactory和用户指定的Channel类型创建用于客户端连接的NioSocketChannel
b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)此处的NioSocketChannel类似于Java NIO提供的SocketChannel.
创建默认的channel Handler pipeline
b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>(){ @Override public void initChannel(SocketChannel ch) throws Exception{ ch.pipeline().addLast(new HelloClientHandler()); } });用于调度和执行网络事件.
异步发起TCP连接
// 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync();SocketChannel执行connect()操作后有以下三种结果:
- 连接成功,然会true - 暂时没有连接上,服务器端没有返回ACK应答,连接结果不确定,返回false.此种结果下,需要将NioSocketChannel中的selectionKey设置为 - OP_CONNECT,监听连接结果 - 接连失败,直接抛出I/O异常由多路复用器在I/O中轮询个Channel,处理连接结果
- 如果连接成功,设置Future结果,发送连接成功事件,触发ChannelPipeline执行
- 由ChannelPipeline调度执行系统和用户的ChannelHandler,执行业务逻辑
3.3 示例代码
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class HelloClient{
public void connect(int port, String host) throws Exception{
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new HelloClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally{
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
int port = 8080;
new HelloClient().connect(port, "127.0.0.1");
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class HelloClientHandler extends ChannelHandlerAdapter{
private final ByteBuf message;
public HelloClientHandler(){
byte[] req="hello Netty".getBytes();
message=Unpooled.buffer(req.length);
message.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
ctx.writeAndFlush(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{
ctx.close();
}
}