暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

「DUBBO系列」线程模型实现原理

JAVA前线 2020-06-01
192

IT徐胖子原创本文未授权请勿转载


1 线程模型概念

DUBBO默认网络通信采用Netty框架,Netty服务端通常包含两个线程组:bossGroup线程组和workerGroup线程组。

bossGroup线程组只有一个线程处理客户端连接请求,连接完成后将完成三次握手的SocketChannel连接分发给workerGroup处理读写请求,这两个线程组被称为IO线程。我们编写一个Netty服务端进行观察。

    public class NettyServer {
    public static void main(String[] args) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(8);
    try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new NettyServerHandler());
    }
    });
    ChannelFuture channelFuture = bootstrap.bind(9999).sync();
    System.out.println("服务端准备就绪");
    channelFuture.channel().closeFuture().sync();
    } catch (Exception ex) {
    System.out.println(ex.getMessage());
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }

    我们再引出另一个概念:业务线程。服务生产者接收到请求后,如果处理逻辑可以快速处理完成,并且不会发起新IO请求,那么可以直接放在IO线程处理,减少线程池调度与上下文切换提高效率。

    服务生产者接收到请求后,如果处理逻辑非常耗时,或者会发起新IO请求例如查询数据库,那么必须派发到业务线程池处理。

    DUBBO提供了多种线程模型,选择线程模型需要在配置文件指定dispatcher属性

      <dubbo:protocol name="dubbo" dispatcher="all" >
      <dubbo:protocol name="dubbo" dispatcher="direct" >
      <dubbo:protocol name="dubbo" dispatcher="message" >
      <dubbo:protocol name="dubbo" dispatcher="execution" >
      <dubbo:protocol name="dubbo" dispatcher="connection" >

      不同线程模型在不同场景选择使用IO线程还是业务线程,参看DUBBO官网文档

        all
        所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳


        direct
        所有消息都不派发到业务线程池,全部在IO线程直接执行


        message
        只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息直接在IO线程执行


        execution
        只有请求消息派发到业务线程池,响应和其它连接断开事件,心跳等消息直接在IO线程执行


        connection
        在IO线程上将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池


        2 线程模型确定时机

        生产者在暴露服务时确认线程模型,消费者在发起连接时确认线程模型,默认线程模型AllDispatcher,本文以生产者为例

          public class NettyTransporter implements Transporter {
          @Override
          public Server bind(URL url, ChannelHandler listener) throws RemotingException {
          // 生产者暴露服务
          return new NettyServer(url, listener);
          }

          @Override
          public Client connect(URL url, ChannelHandler listener) throws RemotingException {
          // 消费者发起连接
          return new NettyClient(url, listener);
          }
          }


          public class NettyServer extends AbstractServer implements Server {
          public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
          // 默认线程模型all
          // MultiMessageHandler(HeartbeatHandler(AllChannelHandler))
          super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
          }
          }

          ChannelHandlers.wrap方法获取Dispatcher自适应扩展点,如果配置指定dispatcher属性,扩展点加载器会从URL获取属性值加载对应线程模型

            public class ChannelHandlers {
            protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            // Dispatcher默认自适应扩展点AllDispatcher
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
            }
            }


            @SPI(AllDispatcher.NAME)
            public interface Dispatcher {
            @Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
            ChannelHandler dispatch(ChannelHandler handler, URL url);
            }


            3 源码分析

            3.1 All

              public class AllDispatcher implements Dispatcher {


              // 线程模型名称
              public static final String NAME = "all";


              // 具体实现策略
              @Override
              public ChannelHandler dispatch(ChannelHandler handler, URL url) {
              return new AllChannelHandler(handler, url);
              }
              }


              public class AllChannelHandler extends WrappedChannelHandler {


              @Override
              public void connected(Channel channel) throws RemotingException {
              // 连接完成事件交给业务线程池
              ExecutorService cexecutor = getExecutorService();
              try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
              } catch (Throwable t) {
              throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
              }
              }


              @Override
              public void disconnected(Channel channel) throws RemotingException {
              // 断开连接事件交给业务线程池
              ExecutorService cexecutor = getExecutorService();
              try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
              } catch (Throwable t) {
              throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
              }
              }


              @Override
              public void received(Channel channel, Object message) throws RemotingException {
              // 请求响应事件交给业务线程池
              ExecutorService cexecutor = getExecutorService();
              try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
              } catch (Throwable t) {
              if(message instanceof Request && t instanceof RejectedExecutionException) {
              Request request = (Request)message;
              if(request.isTwoWay()) {
              String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
              Response response = new Response(request.getId(), request.getVersion());
              response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
              response.setErrorMessage(msg);
              channel.send(response);
              return;
              }
              }
              throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
              }
              }


              @Override
              public void caught(Channel channel, Throwable exception) throws RemotingException {
              // 异常事件交给业务线程池
              ExecutorService cexecutor = getExecutorService();
              try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
              } catch (Throwable t) {
              throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
              }
              }
              }


              3.2 Direct

                public class DirectDispatcher implements Dispatcher {


                // 线程模型名称
                public static final String NAME = "direct";


                // 具体实现策略
                @Override
                public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                // 直接返回handler表示所有事件交给IO线程处理
                return handler;
                }
                }


                3.3 Message

                  public class MessageOnlyDispatcher implements Dispatcher {


                  // 线程模型名称
                  public static final String NAME = "message";


                  // 具体实现策略
                  @Override
                  public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                  return new MessageOnlyChannelHandler(handler, url);
                  }
                  }


                  public class MessageOnlyChannelHandler extends WrappedChannelHandler {


                  @Override
                  public void received(Channel channel, Object message) throws RemotingException {
                  // 请求响应事件交给业务线程池处理
                  ExecutorService cexecutor = getExecutorService();
                  try {
                  cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
                  } catch (Throwable t) {
                  throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
                  }
                  }
                  }


                  3.4 Execution

                    public class ExecutionDispatcher implements Dispatcher {


                    // 线程模型名称
                    public static final String NAME = "execution";


                    // 具体实现策略
                    @Override
                    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                    return new ExecutionChannelHandler(handler, url);
                    }
                    }


                    public class ExecutionChannelHandler extends WrappedChannelHandler {


                    @Override
                    public void received(Channel channel, Object message) throws RemotingException {


                    // 获取业务线程池
                    ExecutorService cexecutor = getExecutorService();


                            // 请求类消息交给业务线程池处理
                    if (message instanceof Request) {
                    try {
                    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
                    } catch (Throwable t) {
                    if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                    }
                    }
                    throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
                    }
                    } else {
                    handler.received(channel, message);
                    }
                    }
                    }


                    3.5 Connection

                      public class ConnectionOrderedDispatcher implements Dispatcher {


                      // 线程模型名称
                      public static final String NAME = "connection";


                      // 具体实现策略
                      @Override
                      public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                      return new ConnectionOrderedChannelHandler(handler, url);
                      }
                      }


                      public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
                      protected final ThreadPoolExecutor connectionExecutor;
                      private final int queuewarninglimit;


                      public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
                      super(handler, url);


                      // 构造函数声明只有一个线程的线程池进行串行化处理
                      String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
                      connectionExecutor = new ThreadPoolExecutor(1, 1,
                      0L, TimeUnit.MILLISECONDS,
                      new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                      new NamedThreadFactory(threadName, true),
                      new AbortPolicyWithReport(threadName, url)
                      );
                      queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
                      }


                      @Override
                      public void connected(Channel channel) throws RemotingException {
                      try {
                      // 检查线程池队列元素个数
                      checkQueueLength();
                      // 连接完成事件交给线程池处理
                      connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
                      } catch (Throwable t) {
                      throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
                      }
                      }


                      @Override
                      public void disconnected(Channel channel) throws RemotingException {
                      try {
                      // 检查线程池队列元素个数
                      checkQueueLength();
                      // 连接断开事件交给线程池处理
                      connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
                      } catch (Throwable t) {
                      throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
                      }
                      }


                      @Override
                      public void received(Channel channel, Object message) throws RemotingException {
                      // 请求响应事件交给业务线程池处理
                      ExecutorService cexecutor = getExecutorService();
                      try {
                      cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
                      } catch (Throwable t) {
                      if (message instanceof Request && t instanceof RejectedExecutionException) {
                      Request request = (Request) message;
                      if (request.isTwoWay()) {
                      String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                      Response response = new Response(request.getId(), request.getVersion());
                      response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                      response.setErrorMessage(msg);
                      channel.send(response);
                      return;
                      }
                      }
                      throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
                      }
                      }


                      @Override
                      public void caught(Channel channel, Throwable exception) throws RemotingException {
                      // 异常事件交给业务线程池处理
                      ExecutorService cexecutor = getExecutorService();
                      try {
                      cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
                      } catch (Throwable t) {
                      throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
                      }
                      }


                      // 检查队列大小
                      private void checkQueueLength() {
                      if (connectionExecutor.getQueue().size() > queuewarninglimit) {
                      logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
                      }
                      }
                      }


                      4 文章总结

                      本文分析了DUBBO线程模型源码,可以根据不同业务场景选择不同线程模型。后续文章分析DUBBO线程池策略请继续关注。


                      长按二维码关注更多精彩文章

                      文章转载自JAVA前线,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论