暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty 入门学习

伦少的博客 2025-01-14
91

前言

学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。

Spark 通信历史

最开始: Akka Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题 Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。 Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

Netty

Server 主要代码:

    // 创建ServerBootstrap实例,服务器启动对象
    ServerBootstrap bootstrap = new ServerBootstrap();


    ChannelFuture channelFuture = bootstrap.bind(8888).sync();
    // 等待服务器关闭
    channelFuture.channel().closeFuture().sync();

    主要是启动 ServerBootstrap、绑定端口、等待关闭。

    Client 主要代码:

      // 创建Bootstrap实例,客户端启动对象
      Bootstrap bootstrap = new Bootstrap();
      // 连接服务端
      ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();

      Server 添加 Handler

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ServerHandler());
            }
        });
          bootstrap.handler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel socketChannel) throws Exception {
                  socketChannel.pipeline().addLast(new ClientHandler());
              }
          });

          这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。 如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。

          完整代码

          地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

          NettyServer

            package com.dkl.java.demo;


            import io.netty.bootstrap.ServerBootstrap;
            import io.netty.channel.*;
            import io.netty.channel.nio.NioEventLoopGroup;
            import io.netty.channel.socket.SocketChannel;
            import io.netty.channel.socket.nio.NioServerSocketChannel;


            public class NettyServer {


                public static void main(String[] args) {
                    try {
                        bind();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }


                public static void bind() throws InterruptedException {


                    // 创建boss线程组,用于接收连接
                    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                    创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍
                    EventLoopGroup workerGroup = new NioEventLoopGroup();
                    try {
                        创建ServerBootstrap实例,服务器启动对象
                        ServerBootstrap bootstrap = new ServerBootstrap();


                        使用链式编程配置参数
                        将boss线程组和worker线程组暂存到ServerBootstrap
                        bootstrap.group(bossGroup, workerGroup);
                        设置服务端Channel类型为NioServerSocketChannel作为通道实现
                        bootstrap.channel(NioServerSocketChannel.class);


                        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器
                                socketChannel.pipeline().addLast(new ServerHandler());
                            }
                        });
                        设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求
                        当有多个客户端同时来请求时,未处理的请求先放入队列中
                        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                        绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象
                        ChannelFuture channelFuture = bootstrap.bind(8888).sync();
                        等待服务器关闭
                        channelFuture.channel().closeFuture().sync();
                    } finally {
                        优雅地关闭boss线程组
                        bossGroup.shutdownGracefully();
                        优雅地关闭worker线程组
                        workerGroup.shutdownGracefully();
                    }
                }
            }

            ServerHandler

              package com.dkl.java.demo;


              import io.netty.buffer.ByteBuf;
              import io.netty.buffer.Unpooled;
              import io.netty.channel.ChannelHandlerContext;
              import io.netty.channel.ChannelInboundHandlerAdapter;
              import io.netty.util.CharsetUtil;
              import io.netty.util.ReferenceCountUtil;


              public class ServerHandler extends ChannelInboundHandlerAdapter {


                  /**
                   * 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
                   *
                   * @param ctx
                   * @throws Exception
                   */
                  @Override
                  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                      System.out.println("执行 channelRegistered");
                  }


                  /**
                   * 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调
                   * 用
                   *
                   * @param ctx
                   * @throws Exception
                   */
                  @Override
                  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                      System.out.println("执行 channelUnregistered");
                  }


                  /**
                   * 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
                   *
                   * @param ctx
                   * @throws Exception
                   */
                  @Override
                  public void channelActive(ChannelHandlerContext ctx) throws Exception {
                      System.out.println("执行 channelActive");
                  }


                  /**
                   * 当 Channel 离开活动状态并且不再连接它的远程节点时被调用
                   *
                   * @param ctx
                   * @throws Exception
                   */
                  @Override
                  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                      System.out.println("执行 channelInactive");
                  }


                  /**
                   * 当从 Channel 读取数据时被调用
                   *
                   * @param ctx
                   * @param msg
                   * @throws Exception
                   */
                  @Override
                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


                      System.out.println("执行 channelRead");
                      // 处理接收到的数据
                      ByteBuf byteBuf = (ByteBuf) msg;
                      try {
                          // 将接收到的字节数据转换为字符串
                          String message = byteBuf.toString(CharsetUtil.UTF_8);
                          // 打印接收到的消息
                          System.out.println("Server端收到客户消息: " + message);
                          // 发送响应消息给客户端
                          ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8));
                      } finally {
                          // 释放ByteBuf资源
                          ReferenceCountUtil.release(byteBuf);
                      }


                  }


                  /**
                   * 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成
                   *
                   * @param ctx
                   * @throws Exception
                   */
                  @Override
                  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                      System.out.println("执行 channelReadComplete");
                  }


                  /**
                   * 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被
                   * 调用
                   *
                   * @param ctx
                   * @param evt
                   * @throws Exception
                   */
                  @Override
                  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                      System.out.println("执行 userEventTriggered");
                  }


                  /**
                   * 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法
                   * * 来检测 Channel 的可写性。与可写性相关的阈值可以通过
                   * * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来
                   * * 设置
                   *
                   * @param ctx
                   * @throws Exception
                   */
                  @Override
                  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
                      System.out.println("执行 channelWritabilityChanged");
                  }


                  @Override
                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                      System.out.println("执行 exceptionCaught");
                      // 异常处理
                      cause.printStackTrace();
                      ctx.close();
                  }
              }


              NettyClient

                package com.dkl.java.demo;


                import io.netty.bootstrap.Bootstrap;
                import io.netty.channel.ChannelFuture;
                import io.netty.channel.ChannelInitializer;
                import io.netty.channel.EventLoopGroup;
                import io.netty.channel.nio.NioEventLoopGroup;
                import io.netty.channel.socket.SocketChannel;
                import io.netty.channel.socket.nio.NioSocketChannel;


                public class NettyClient {


                    public static void main(String[] args) {
                        start();
                    }


                    public static void start() {


                        // 创建EventLoopGroup,用于处理客户端的I/O操作
                        EventLoopGroup groupThread = new NioEventLoopGroup();


                        try {
                            // 创建Bootstrap实例,客户端启动对象
                            Bootstrap bootstrap = new Bootstrap();
                            bootstrap.group(groupThread);
                            // 设置服务端Channel类型为NioSocketChannel作为通道实现
                            bootstrap.channel(NioSocketChannel.class);
                            // 设置客户端处理
                            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    socketChannel.pipeline().addLast(new ClientHandler());
                                }
                            });
                            // 绑定端口
                            ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
                            channelFuture.channel().closeFuture().sync();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        } finally {
                            // 优雅地关闭线程
                            groupThread.shutdownGracefully();
                        }
                    }
                }

                ClientHandler

                  package com.dkl.java.demo;


                  import io.netty.buffer.ByteBuf;
                  import io.netty.buffer.Unpooled;
                  import io.netty.channel.ChannelHandlerContext;
                  import io.netty.channel.ChannelInboundHandlerAdapter;
                  import io.netty.util.CharsetUtil;
                  import io.netty.util.ReferenceCountUtil;


                  public class ClientHandler extends ChannelInboundHandlerAdapter {


                      @Override
                      public void channelActive(ChannelHandlerContext ctx) {
                          // 连接建立时的处理,发送请求消息给服务器
                          ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8));
                      }


                      @Override
                      public void channelRead(ChannelHandlerContext ctx, Object msg) {
                          // 处理接收到的数据
                          ByteBuf byteBuf = (ByteBuf) msg;
                          try {
                              // 将接收到的字节数据转换为字符串
                              String message = byteBuf.toString(CharsetUtil.UTF_8);
                              // 打印接收到的消息
                              System.out.println("受到服务端响应的消息: " + message);


                              // TODO: 对数据进行业务处理


                          } finally {
                              // 释放ByteBuf资源
                              ReferenceCountUtil.release(byteBuf);
                          }
                      }


                      @Override
                      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                          // 异常处理
                          cause.printStackTrace();
                          ctx.close();
                      }
                  }

                  运行截图

                  handler 执行顺序

                  Server 端

                    连接时:


                    执行 channelRegistered
                    执行 channelActive
                    执行 channelRead
                    执行 channelReadComplete


                    断开连接时:


                    执行 channelReadComplete
                    (强制中断 Client 连接
                    执行 exceptionCaught
                    执行 userEventTriggered (exceptionCaught 中 ctx.close()) 触发
                    )
                    执行 channelInactive
                    执行 channelUnregistered


                    channelReadComplete 中 ctx.close(); 触发:
                    执行 channelInactive
                    执行 channelUnregistered

                    Client 端

                      执行 channelRegistered
                      执行 channelActive
                      执行 channelRead
                      执行 channelReadComplete

                      Spark 对应位置

                      Spark版本:3.2.3Server: org.apache.spark.network.server.TransportServer.initClient: org.apache.spark.network.client.TransportClientFactory.createClient

                      文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论