初识Netty
Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty能做什么?
高性能RPC通信框架底层数据传输,如Dubbo
服务器开发,如代理服务器
游戏行业基于客户端/服务端的数据通信
... ...
Java IO模型
Netty是基于Java NIO开发的,我们先来了解下Java的IO模型。
BIO
同步阻塞IO模式下,每个客户端发起一个请求,服务端都会开启一个新的线程来处理。并发量肯定受到影响。
Java实现BIO示例代码
public static void main(String[] args) {
try {
//监听8080端口
final ServerSocket ss = new ServerSocket(8080);
while (true) {
//阻塞等待外部连接到来
Socket s = ss.accept();
//连接建立后,开始读数据
InputStream is = s.getInputStream();
int i = 0;
//阻塞直到有数据可读(基于流)
while ((i = is.read()) != -1) {
System.out.print((char) i);
}
s.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}复制
NIO
同步非阻塞IO模式下,服务端可以通过selector IO多路复用,实现一个服务端线程服务于多个客户端线程。主要体现在一个服务端线程的selector会轮询所有已注册上的连接,并监听IO事件。
NIO的三大核心:Buffer、Channel、Selector
Buffer缓存区,底层实际是数组,其有几个重要的变量及方法:position、limit、capacity;flip(),clear()
position是一个数组下标,用于表示读或者写时的位置
limit表示读或者写的最大位置,写入时当position等于limit则不会再写入
我们看下在申请缓存区时的allocate方法,可以看到初始化时position等于0,limit等于capacity
//分配缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(256);
//初始化
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
//HeapByteBuffer 构造方法
HeapByteBuffer(int cap, int lim) {
//第2个参数为position
super(-1, 0, lim, cap, new byte[cap], 0);
}复制
从channel读数据写入buffer
在写入数据时,position会改变位置指向下一个要写入的位置,当数据写完后,我们要从buffer中读取数据时,由于position此时已经指向了下一个要写入的位置,则需要先把position的值赋给limit,然后把position置为0,此时再读就可以读取刚写入的数据,即position到limit的数据。
buffer中flip()方法源码:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}复制
读之前需调用flip()方法
数据读完之后,再写数据,此时position为下一个要读取的位置,则需要把position置为0,capacity赋值给limit,此时再写即从0开始到limit。
buffer中clear()方法源码:
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}复制
读取完毕后,调用clear()方法。
通过上面的介绍,我们可以看到一般在读之前会调用flip(),在写之前会调用clear()。
AIO
异步非阻塞IO模式,由操作系统负责完成服务端与客户端之间的读写,不再需要线程去轮询IO状态改变,当有事件处理时直接通知服务端线程处理。
NIO和AIO的区别是,NIO会通过线程主动轮询IO事件,而AIO则是交给操作系统来处理,有IO事件主动通知应用线程。
通过两段代码来对比
NIO服务端实现
//创建channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(PORT));
//创建selector 监听accept事件
Selector acceptorSel = Selector.open();
ssc.register(acceptorSel, SelectionKey.OP_ACCEPT);
//轮询所有IO事件
while (true) {
//线程阻塞
acceptorSel.select();
Set<SelectionKey> set = acceptorSel.selectedKeys();
Iterator<SelectionKey> it = set.iterator();
//遍历事件key
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
//收到请求建立连接的事件
if (sk.isAcceptable()) {
//事件处理...
}
}
}复制
AIO服务端实现
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));
//注册事件和事件完成后的处理器
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
//IO处理完成回调方法
}
@Override
public void failed(Throwable exc, Object attachment) {
//处理异常回调
}
});复制
Netty原理
有了JavaNIO为什么还要用Netty框架
Java NIO存在epoll bug,导致selector空轮询
网络编程过程中,存在很多特殊场景,如粘包拆包问题,网络异常重试等,都需要手动处理
对于常见传输协议,编解码需要开发工作量大
单Reactor单线程模型
Reactor线程负责监听业务客户端请求事件,并分发给相应的处理器完成业务处理。这种单线程模型下,比较简单,如果业务处理中存在过多IO操作,肯定会阻塞后续请求处理。我们经常使用的Redis,它就是基于这种线程模型实现的,由于Redis是纯内存访问,因此并没有觉得处理慢。
单Reactor多线程模型
Reactor负责监听客户端请求事件,并分发给相应的处理器完成业务处理。这里不同的是对于业务的处理会委派给worker线程池来处理,能够充分发挥多核cpu的优势,但Reactor要负责监听所有事件和处理响应,在高并发场景下存在性能瓶颈。
主从Reactor多线程模型
存在多个Reactor,每个Reactor都有自己的selector。MainReactor负责监听客户端连接,后续将交由SubReactor处理监听IO事件。这种模式下,主从Reactor分工明确,主Reactor只需要把连接传给从Reactor,多个从Reactor能够提高并发度
Netty线程模型
Netty默认有两组线程池,一个是BossGroup,对比我们上面的主Reactor,一个是WorkGroup,对比我们上面的从Reactor,两者都是NioEventLoopGroup,每个NioEventLoopGroup中包含多个NioEventLoop,NioEventLoop可以理解为一个线程,来监听注册在其上的Channel。BossGroup主要负责和客户端建立连接,WorkGroup主要处理连接上的读写请求。每组线程池中包含多个处理线程,每个线程中都有一个Selector来监听注册在上面的Channel。
为方便理解,我们用一段代码来模拟这个过程
//主Reactor
public class NioAcceptorServer {
Selector acceptorSel = null;
NioWorker[] workers = null;
int workerPoint = 0;
public NioAcceptorServer(int workerNum) {
//模拟生成N个Worker
workers = new NioWorker[workerNum];
for (int i = 0; i < workerNum; i++) {
workers[i] = new NioWorker();
workers[i].start();
}
}
//获取下一个Worker
public NioWorker nextWorker() {
workerPoint++;
int i = workerPoint % workers.length;
return workers[i];
}
public void run() {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(8080));
//注册监听Accept事件
acceptorSel = Selector.open();
ssc.register(acceptorSel, SelectionKey.OP_ACCEPT);
while (true) {
//阻塞等待
acceptorSel.select();
Set<SelectionKey> set = acceptorSel.selectedKeys();
Iterator<SelectionKey> it = set.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
//收到请求建立连接的事件
if (sk.isAcceptable()) {
ServerSocketChannel ssc1 = (ServerSocketChannel) sk.channel();
//建立连接
SocketChannel sc = ssc1.accept();
// 选择并分发请求到一个NioWorker
NioWorker worker = nextWorker();
worker.register(sc);
}
}
}
} catch (ClosedChannelException e1) {
e1.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}复制
//从Reactor
public class NioWorker extends Thread {
private Selector selector = null;
public NioWorker() {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public void register(SocketChannel sc) {
try {
sc.configureBlocking(false);
ByteBuffer bb = ByteBuffer.allocate(4);
//注册监听Read事件
sc.register(selector, SelectionKey.OP_READ, bb);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
//阻塞等待事件
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> it = set.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
//处理读事件
if (sk.isReadable()) {
SocketChannel sc = (SocketChannel) sk.channel();
ByteBuffer bb = (ByteBuffer) sk.attachment();
try {
//数据读取非阻塞,可以触发多次 OP_READ 事件才完成数据传输
int i = sc.read(bb);
if (i == -1) {
//end of file,读取结束...
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}复制
Handler处理器
Netty中每个Channel都有与之对应的ChannelPipeline,每个ChannelPipeline维护了一个ChannelHandlerContext组成的双向链表,ChannelHandlerContext包装了对ChannelHandler的封装。通过责任链模式,ChannelHandler在处理完一个IO事件后,会传递给下一个ChannelHandler。
ChannelHandler分为ChannelInboundHandler处理入IO事件、ChannelOutboundHandler处理出IO事件。比如我们开发一个Netty服务器,处理请求则会进入ChannelInboundHandler链中处理,处理响应则会进入ChannelOutboundHandler链中处理。
在Netty开发中,通常我们会通过ChannelPipeline的addLast()等方法来注入出和入handler执行链,这里就需要注意它的执行顺序,也是会按照add的顺序,分别找出handler或者入handler来处理IO事件。
Handler串行化设计,从消息的读取、编码以及后续Handler的执行,始终都由IO线程NioEventLoop负责,这就意味着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险。
Netty实战
下面我将用简短的代码,示范一个基于Netty实现的HTTP服务器
我们首先考虑要实现一个HTTP服务器,需要做哪些事:
1、定义主线程池用于处理客户端连接,以及从线程池用于处理IO事件;
2、考虑一些必要的基础配置,如连接数据缓冲器,内存分配器等;
3、HTTP请求解码,可通过HttpRequestDecoder完成;
4、处理HTTP请求完成相应业务逻辑处理;
5、HTTP响应编码,可通过HttpResponseEncoder完成;
6、响应HTTP请求到客户端。
public class NettyServer {
public void startServer() throws Exception {
//初始化boss和worker线程池
int defaultThreadNum = Runtime.getRuntime().availableProcessors() * 2;
EventLoopGroup bossGroup = new NioEventLoopGroup(defaultThreadNum,
new ThreadFactoryBuilder().setNameFormat("boss-group").build());
EventLoopGroup workerGroup = new NioEventLoopGroup(defaultThreadNum,
new ThreadFactoryBuilder().setNameFormat("worker-group").build());
//创建启动类
ServerBootstrap b = new ServerBootstrap();
//允许重复使用本地地址和端口
b.option(ChannelOption.SO_REUSEADDR, true);
//设置内存分配器,可以重复利用之前分配的内存空间
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//接收缓冲器,AdaptiveRecvByteBufAllocator能够根据上一次接收数据的大小,来自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费
b.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
//true关闭Nagle算法,Nagle算法尽可能发送大块数据,避免网络中充斥着许多小数据块,要求高实时性,有数据发送时就马上发送,就将该选项设置为true
b.childOption(ChannelOption.TCP_NODELAY, true);
//设置线程组
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//注册handler处理器
.childHandler(
new DispatchHandler()
);
//绑定端口并注册异步处理监听器
b.bind(8080).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("Netty 服务器成功启动");
} else {
log.error("Netty 服务器启动失败, 原因: ", future.cause());
}
}
}).await();
}
}复制
public class DispatchHandler extends ChannelInitializer<SocketChannel> {
private final int maxAggregateSize = 1024 * 1024 * 5;
private final int httpClientCodecMaxInitialLineLength = 4096;
private final int httpClientCodecMaxHeaderSize = 64 * 1024;
private final int httpClientCodecMaxChunkSize = 128 * 1024;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//HTTP请求解码处理器
p.addLast(new HttpRequestDecoder(httpClientCodecMaxInitialLineLength, httpClientCodecMaxHeaderSize, httpClientCodecMaxChunkSize));
//HTTP响应编码处理器
p.addLast(new HttpResponseEncoder());
//定义缓冲数据量,通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,需要绑定HttpObjectAggregator
p.addLast(new HttpObjectAggregator(maxAggregateSize));
//HTTP业务逻辑处理器
p.addLast(new HttpBusinessHandler());
}
}复制
public class HttpBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest httpRequest = (FullHttpRequest) msg;
//完成业务逻辑处理...
//...
//向客户端响应内容
String responseMessage = "ok";
//FullHttpResponse可以设置响应头,响应状态码等基本信息
FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1,
HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseMessage
.getBytes()));
ctx.writeAndFlush(httpResponse);
}
}复制
Netty实战经验总结
多数情况下,Netty自带基础handler满足大部分业务场景可供我们灵活选择,开发人员只需关注业务处理器的逻辑
时间可控的简单业务可直接在IO线程上处理
复杂和时间不可控的业务建议投递到后端业务线程池统一处理
业务线程避免直接操作ChannelHandler