暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty接收数据时一次读取多少字节以及读多少次

Netty历险记 2021-08-26
971


本篇文章介绍一下,Netty在接收到数据时,一次性读取多少字节.


本篇使用Netty构建一个简单的服务端,使用Python构建一个简单的客户端,然后客户端向服务端发送数据,然后观察Netty每次读取的字节数.


客户端代码如下


    import socket


    if __name__ == '__main__':


    client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    client.send('''11111111111111111111111111111111111111111111\
    2222222222222222222222222222222222222222222222222\
    3333333333333333333333333333333333333333333333333\
    4444444444444444444444444444444444444444444444444\
    5555555555555555555555555555555555555555555555555\
    6666666666666666666666666666666666666666666666666\
    7777777777777777777777777777777777777777777777777\
    8888888888888888888888888888888888888888888888888\
    9999999999999999999999999999999999999999999999999\
    0000000000000000000000000000000000000000000000000\
    1111111111111111111111111111111111111111111111111\
    2222222222222222222222222222222222222222222222222\
    3333333333333333333333333333333333333333333333333\
    4444444444444444444444444444444444444444444444444\
    5555555555555555555555555555555555555555555555555\
    6666666666666666666666666666666666666666666666666\
    7777777777777777777777777777777777777777777777777\
    8888888888888888888888888888888888888888888888888\
    9999999999999999999999999999999999999999999999999'''.encode('utf-8'))
    复制


    刻意让客户端一次性发送多一些数据 .


    服务端代码如下



      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelPipeline;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import io.netty.handler.codec.string.StringDecoder;
      import io.netty.handler.codec.string.StringEncoder;
      import io.netty.handler.logging.LogLevel;
      import io.netty.handler.logging.LoggingHandler;


      public class Server {




      public static void main(String[] args) throws Exception {


      EventLoopGroup bossGroup = new NioEventLoopGroup(1);
      EventLoopGroup workerGroup = new NioEventLoopGroup(8);
      EventLoopGroup businessGroup = new NioEventLoopGroup(8);


      ServerBootstrap serverBootstrap = new ServerBootstrap();


      try {


      serverBootstrap.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class)
      .handler(new LoggingHandler(LogLevel.INFO))
      .childHandler(new ChannelInitializer<NioSocketChannel>() {
      @Override
      protected void initChannel(NioSocketChannel ch) {
      ChannelPipeline channelPipeline = ch.pipeline();
      channelPipeline.addLast(new StringEncoder());
      channelPipeline.addLast(new StringDecoder());
      channelPipeline.addLast(businessGroup, new ServerHandler());
      }
      });


      ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync();
      channelFuture.channel().closeFuture().sync();
      } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
      }
      }
      }


      复制




        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.SimpleChannelInboundHandler;


        public class ServerHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println("接收到客户端信息:" + msg);
        }
        }
        复制



        启动服务端,然后执行客户端发送数据.



        这里我们借助wireshark工具查看数据包情况




        如上图可知, 客户端(端口43262)向服务端(端口8080)首先进行了三次握手,握手之后,客户端一次性向服务端发送了1142个字节内容,之后进行了四次挥手.

        接下来看一下服务端的打印日志情况.



        从上图可以发现,共打印了两次. 客户端发送了一次数据,就把所有的数据发送完了,而服务端却打印了两次,难道是Netty读取了两次TCP中的数据?

        接下来通过debug方式,观察下数据读取情况.

        Netty读取数据的逻辑在以下类方法中

          // 源码位置
          io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read


          @Override
          public final void read() {
          ...
          byteBuf = allocHandle.allocate(allocator);
          // 读取数据
          allocHandle.lastBytesRead(doReadBytes(byteBuf));
          ...
          }
          复制


          继续跟踪doReadBytes方法,最后会调用到


            @Override
            public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
            ensureWritable(length);
            int writtenBytes = setBytes(writerIndex, in, length);
            if (writtenBytes > 0) {
            writerIndex += writtenBytes;
            }
            // 就在此处打断点
            return writtenBytes;
            }
            复制


            客户端重新发送数据,再进行测试



            会发现,读取了1024个字节,放行



            如上图,第二次读取了118字节. 两次加起来1024+118=1142个字节,和客户端发送的数据一致.
            当然以上是我们通过debug方式查看的数据读取情况,我们也可以通过ss命令查看数据的读取情况,先让客户端发送数据,然后服务端读取一次数据,再通过debug让服务器暂时停下来,通过ss命令查看TCP接收缓冲区中还剩多少字节.



            还剩119个字节,其实就是118个有效数据再加一个结束字节. 其实与我们上面分析的是一致的.


            根据以上分析,客户端一次性把1142个字节发送给了服务端,但是服务端分两次才把数据读取完成,而且第一次只读取1024个字节.


            如果这个时候你认为文章标题的答案是1024个字节,那其实也是不对的.
            我们假设一种场景,客户端在一直急速地给服务端发送数据. 第一次Netty会使用1024字节大小的Buffer去读取TCP接收缓冲区中的数据,当读取完成之后,Netty发现分配的1024字节大小的Buffer都用来装数据了,那么Netty猜测后面应该还会有更多的数据,那么Netty下次就会分配16384字节大小的Buffer用来读取TCP接收缓冲区中的数据,如果16384字节大小的Buffer也被装满了数据,说明后面可能还会有很多数据,因此还会分配比16384更大的Buffer用来装数据.假如分配的16384字节大小的Buffer在读取数据之后没有被装满,说明TCP接收缓冲区中的数据可能不是很多,那么Netty就会分配比16384小的Buffer用来装下一次要读取的数据.


            总之,Netty会根据分配Buffer的大小和实际读取到的数据大小之间的关系,来决定是增大Buffer的大小还是减小Buffer的大小.

            核心代码如下

              // 源码位置
              io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl#record


              private void record(int actualReadBytes) {
              if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
              if (decreaseNow) {
              index = max(index - INDEX_DECREMENT, minIndex);
              // 减小下次读取字节的大小
              nextReceiveBufferSize = SIZE_TABLE[index];
              decreaseNow = false;
              } else {
              decreaseNow = true;
              }
              } else if (actualReadBytes >= nextReceiveBufferSize) {
              index = min(index + INDEX_INCREMENT, maxIndex);
              // 增大下次读取字节的大小
              nextReceiveBufferSize = SIZE_TABLE[index];
              decreaseNow = false;
              }
              }
              复制


              还有一点需要说明的是,假如客户端发送了非常多的数据过来,难道服务端必须一直读取这个Channel里的数据吗?
              当然不是, 默认Netty只会读取Channel里面的数据16次,如果在16次的机会里,还是没有读取完这个Channel里面的数据,那么暂时就不会读取这个Channel里面的数据了,Netty需要去处理其他事情(比如轮询IO事件,处理IO事件,执行task任务等),只有当下次轮循到IO事件的时候,才会继续读取之前没有读完的数据.





              Netty使用的是水平触发,因此即便客户端不发送数据了,Netty依然可以把之前没有读取完的数据,继续读取.





              文章转载自Netty历险记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论