暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty从入门到实战

云筑网技术团队 2022-03-30
585

初识Netty

Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。


Netty能做什么?

  • 高性能RPC通信框架底层数据传输,如Dubbo

  • 服务器开发,如代理服务器

  • 游戏行业基于客户端/服务端的数据通信

  • ... ...


Java IO模型

Netty是基于Java NIO开发的,我们先来了解下Java的IO模型。

BIO

同步阻塞IO模式下,每个客户端发起一个请求,服务端都会开启一个新的线程来处理。并发量肯定受到影响。

Java实现BIO示例代码

public static void main(String[] args) {
        try {
            //监听8080端口
            final ServerSocket ss = new ServerSocket(8080);
            while (true) {
                //阻塞等待外部连接到来
                Socket s = ss.accept(); 

                //连接建立后,开始读数据
                InputStream is = s.getInputStream();
                int i = 0;
                //阻塞直到有数据可读(基于流)
                while ((i = is.read()) != -1) {
                    System.out.print((char) i);
                }
                s.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

复制

NIO

同步非阻塞IO模式下,服务端可以通过selector IO多路复用,实现一个服务端线程服务于多个客户端线程。主要体现在一个服务端线程的selector会轮询所有已注册上的连接,并监听IO事件。

NIO的三大核心:Buffer、Channel、Selector


Buffer缓存区,底层实际是数组,其有几个重要的变量及方法:position、limit、capacity;flip(),clear()

position是一个数组下标,用于表示读或者写时的位置

limit表示读或者写的最大位置,写入时当position等于limit则不会再写入


我们看下在申请缓存区时的allocate方法,可以看到初始化时position等于0,limit等于capacity

//分配缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(256);
//初始化
public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
 }
//HeapByteBuffer 构造方法
HeapByteBuffer(int cap, int lim) {
        //第2个参数为position
        super(-1, 0, lim, cap, new byte[cap], 0);
    
}

复制

从channel读数据写入buffer


在写入数据时,position会改变位置指向下一个要写入的位置,当数据写完后,我们要从buffer中读取数据时,由于position此时已经指向了下一个要写入的位置,则需要先把position的值赋给limit,然后把position置为0,此时再读就可以读取刚写入的数据,即position到limit的数据。

buffer中flip()方法源码:

public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
}

复制

读之前需调用flip()方法

数据读完之后,再写数据,此时position为下一个要读取的位置,则需要把position置为0,capacity赋值给limit,此时再写即从0开始到limit。

buffer中clear()方法源码:

public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
}

复制

读取完毕后,调用clear()方法。


通过上面的介绍,我们可以看到一般在读之前会调用flip(),在写之前会调用clear()。


AIO

异步非阻塞IO模式,由操作系统负责完成服务端与客户端之间的读写,不再需要线程去轮询IO状态改变,当有事件处理时直接通知服务端线程处理。

NIO和AIO的区别是,NIO会通过线程主动轮询IO事件,而AIO则是交给操作系统来处理,有IO事件主动通知应用线程。

通过两段代码来对比

NIO服务端实现

//创建channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(PORT));
//创建selector 监听accept事件
Selector acceptorSel = Selector.open();
ssc.register(acceptorSel, SelectionKey.OP_ACCEPT);
//轮询所有IO事件
while (true) {
   //线程阻塞
   acceptorSel.select();                
   Set<SelectionKey> set = acceptorSel.selectedKeys();
   Iterator<SelectionKey> it = set.iterator();
   //遍历事件key
   while (it.hasNext()) {
       SelectionKey sk = it.next();
       it.remove();
       //收到请求建立连接的事件
       if (sk.isAcceptable()) {
           //事件处理...
       }
    }
}

复制

AIO服务端实现

AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));
        //注册事件和事件完成后的处理器
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {

            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
//IO处理完成回调方法
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                //处理异常回调
            }

        });

复制

Netty原理

有了JavaNIO为什么还要用Netty框架

  • Java NIO存在epoll bug,导致selector空轮询

  • 网络编程过程中,存在很多特殊场景,如粘包拆包问题,网络异常重试等,都需要手动处理

  • 对于常见传输协议,编解码需要开发工作量大

单Reactor单线程模型

Reactor线程负责监听业务客户端请求事件,并分发给相应的处理器完成业务处理。这种单线程模型下,比较简单,如果业务处理中存在过多IO操作,肯定会阻塞后续请求处理。我们经常使用的Redis,它就是基于这种线程模型实现的,由于Redis是纯内存访问,因此并没有觉得处理慢。


单Reactor多线程模型

Reactor负责监听客户端请求事件,并分发给相应的处理器完成业务处理。这里不同的是对于业务的处理会委派给worker线程池来处理,能够充分发挥多核cpu的优势,但Reactor要负责监听所有事件和处理响应,在高并发场景下存在性能瓶颈。

主从Reactor多线程模型

存在多个Reactor,每个Reactor都有自己的selector。MainReactor负责监听客户端连接,后续将交由SubReactor处理监听IO事件。这种模式下,主从Reactor分工明确,主Reactor只需要把连接传给从Reactor,多个从Reactor能够提高并发度

Netty线程模型

Netty默认有两组线程池,一个是BossGroup,对比我们上面的主Reactor,一个是WorkGroup,对比我们上面的从Reactor,两者都是NioEventLoopGroup,每个NioEventLoopGroup中包含多个NioEventLoop,NioEventLoop可以理解为一个线程,来监听注册在其上的Channel。BossGroup主要负责和客户端建立连接,WorkGroup主要处理连接上的读写请求。每组线程池中包含多个处理线程,每个线程中都有一个Selector来监听注册在上面的Channel。

为方便理解,我们用一段代码来模拟这个过程

//主Reactor
public class NioAcceptorServer {
    Selector acceptorSel = null;
    NioWorker[] workers = null;
    int workerPoint = 0;

    public NioAcceptorServer(int workerNum) {
        //模拟生成N个Worker
        workers = new NioWorker[workerNum];
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new NioWorker();
            workers[i].start();
        }
    }

    //获取下一个Worker
    public NioWorker nextWorker() {
        workerPoint++;
        int i = workerPoint % workers.length;
        return workers[i];
    }

    public void run() {
        try {

            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(8080));
            //注册监听Accept事件
            acceptorSel = Selector.open();
            ssc.register(acceptorSel, SelectionKey.OP_ACCEPT);
            while (true) {
                //阻塞等待
                acceptorSel.select();
                Set<SelectionKey> set = acceptorSel.selectedKeys();
                Iterator<SelectionKey> it = set.iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    it.remove();
                    //收到请求建立连接的事件
                    if (sk.isAcceptable()) {
                        ServerSocketChannel ssc1 = (ServerSocketChannel) sk.channel();
                        //建立连接
                        SocketChannel sc = ssc1.accept();
                        // 选择并分发请求到一个NioWorker
                        NioWorker worker = nextWorker();
                        worker.register(sc);
                    }
                }
            }
        } catch (ClosedChannelException e1) {
            e1.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
}

复制
//从Reactor
public class NioWorker extends Thread {

    private Selector selector = null;

    public NioWorker() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void register(SocketChannel sc) {
        try {
            sc.configureBlocking(false);
            ByteBuffer bb = ByteBuffer.allocate(4);
            //注册监听Read事件
            sc.register(selector, SelectionKey.OP_READ, bb);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                //阻塞等待事件
                selector.select();

                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> it = set.iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    it.remove();
                    //处理读事件
                    if (sk.isReadable()) {
                        SocketChannel sc = (SocketChannel) sk.channel();
                        ByteBuffer bb = (ByteBuffer) sk.attachment();
                        try {
                            //数据读取非阻塞,可以触发多次 OP_READ 事件才完成数据传输
                            int i = sc.read(bb);                           
                            if (i == -1) {
                                //end of file,读取结束...
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }                 
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

复制

Handler处理器

Netty中每个Channel都有与之对应的ChannelPipeline,每个ChannelPipeline维护了一个ChannelHandlerContext组成的双向链表,ChannelHandlerContext包装了对ChannelHandler的封装。通过责任链模式,ChannelHandler在处理完一个IO事件后,会传递给下一个ChannelHandler。


ChannelHandler分为ChannelInboundHandler处理入IO事件、ChannelOutboundHandler处理出IO事件。比如我们开发一个Netty服务器,处理请求则会进入ChannelInboundHandler链中处理,处理响应则会进入ChannelOutboundHandler链中处理。


在Netty开发中,通常我们会通过ChannelPipeline的addLast()等方法来注入出和入handler执行链,这里就需要注意它的执行顺序,也是会按照add的顺序,分别找出handler或者入handler来处理IO事件。


Handler串行化设计,从消息的读取、编码以及后续Handler的执行,始终都由IO线程NioEventLoop负责,这就意味着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险。

Netty实战

下面我将用简短的代码,示范一个基于Netty实现的HTTP服务器

我们首先考虑要实现一个HTTP服务器,需要做哪些事:

1、定义主线程池用于处理客户端连接,以及从线程池用于处理IO事件;

2、考虑一些必要的基础配置,如连接数据缓冲器,内存分配器等;

3、HTTP请求解码,可通过HttpRequestDecoder完成;

4、处理HTTP请求完成相应业务逻辑处理;

5、HTTP响应编码,可通过HttpResponseEncoder完成;

6、响应HTTP请求到客户端。

public class NettyServer {
    
    public void startServer() throws Exception {
        //初始化boss和worker线程池
        int defaultThreadNum = Runtime.getRuntime().availableProcessors() * 2;
        EventLoopGroup bossGroup = new NioEventLoopGroup(defaultThreadNum,
                new ThreadFactoryBuilder().setNameFormat("boss-group").build());
        EventLoopGroup workerGroup = new NioEventLoopGroup(defaultThreadNum,
                new ThreadFactoryBuilder().setNameFormat("worker-group").build());

        //创建启动类
        ServerBootstrap b = new ServerBootstrap();

        //允许重复使用本地地址和端口
        b.option(ChannelOption.SO_REUSEADDR, true);

        //设置内存分配器,可以重复利用之前分配的内存空间
        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        //接收缓冲器,AdaptiveRecvByteBufAllocator能够根据上一次接收数据的大小,来自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费
        b.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);

        //true关闭Nagle算法,Nagle算法尽可能发送大块数据,避免网络中充斥着许多小数据块,要求高实时性,有数据发送时就马上发送,就将该选项设置为true
        b.childOption(ChannelOption.TCP_NODELAY, true);

        //设置线程组
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                //注册handler处理器
                .childHandler(
                        new DispatchHandler()
                );
        //绑定端口并注册异步处理监听器
        b.bind(8080).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    log.info("Netty 服务器成功启动");
                } else {
                    log.error("Netty 服务器启动失败, 原因: ", future.cause());
                }
            }
        }).await();
    }
}

复制
public class DispatchHandler extends ChannelInitializer<SocketChannel> {

    private final int maxAggregateSize = 1024 * 1024 * 5;
    private final int httpClientCodecMaxInitialLineLength = 4096;
    private final int httpClientCodecMaxHeaderSize = 64 * 1024;
    private final int httpClientCodecMaxChunkSize = 128 * 1024;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        //HTTP请求解码处理器
        p.addLast(new HttpRequestDecoder(httpClientCodecMaxInitialLineLength, httpClientCodecMaxHeaderSize, httpClientCodecMaxChunkSize));
        //HTTP响应编码处理器
        p.addLast(new HttpResponseEncoder());
        //定义缓冲数据量,通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,需要绑定HttpObjectAggregator
        p.addLast(new HttpObjectAggregator(maxAggregateSize));
        //HTTP业务逻辑处理器
        p.addLast(new HttpBusinessHandler());
    }
}

复制
public class HttpBusinessHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest httpRequest = (FullHttpRequest) msg;
        //完成业务逻辑处理...
        //...
        
        //向客户端响应内容
        String responseMessage = "ok";
        //FullHttpResponse可以设置响应头,响应状态码等基本信息
        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1,
                HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseMessage
                .getBytes()));
        ctx.writeAndFlush(httpResponse);
    }
}

复制

Netty实战经验总结

  • 多数情况下,Netty自带基础handler满足大部分业务场景可供我们灵活选择,开发人员只需关注业务处理器的逻辑

  • 时间可控的简单业务可直接在IO线程上处理

  • 复杂和时间不可控的业务建议投递到后端业务线程池统一处理

  • 业务线程避免直接操作ChannelHandler


作  者:杨鑫(杨过)
审  稿:吴友强(技巅)
编  吴友强(技巅)
文章转载自云筑网技术团队,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论