IT徐胖子原创本文未授权请勿转载
1 线程模型概念
DUBBO默认网络通信采用Netty框架,Netty服务端通常包含两个线程组:bossGroup线程组和workerGroup线程组。
bossGroup线程组只有一个线程处理客户端连接请求,连接完成后将完成三次握手的SocketChannel连接分发给workerGroup处理读写请求,这两个线程组被称为IO线程。我们编写一个Netty服务端进行观察。
public class NettyServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(8);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyServerHandler());}});ChannelFuture channelFuture = bootstrap.bind(9999).sync();System.out.println("服务端准备就绪");channelFuture.channel().closeFuture().sync();} catch (Exception ex) {System.out.println(ex.getMessage());} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
我们再引出另一个概念:业务线程。服务生产者接收到请求后,如果处理逻辑可以快速处理完成,并且不会发起新IO请求,那么可以直接放在IO线程处理,减少线程池调度与上下文切换提高效率。
服务生产者接收到请求后,如果处理逻辑非常耗时,或者会发起新IO请求例如查询数据库,那么必须派发到业务线程池处理。
DUBBO提供了多种线程模型,选择线程模型需要在配置文件指定dispatcher属性
<dubbo:protocol name="dubbo" dispatcher="all" ><dubbo:protocol name="dubbo" dispatcher="direct" ><dubbo:protocol name="dubbo" dispatcher="message" ><dubbo:protocol name="dubbo" dispatcher="execution" ><dubbo:protocol name="dubbo" dispatcher="connection" >
不同线程模型在不同场景选择使用IO线程还是业务线程,参看DUBBO官网文档
all所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳direct所有消息都不派发到业务线程池,全部在IO线程直接执行message只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息直接在IO线程执行execution只有请求消息派发到业务线程池,响应和其它连接断开事件,心跳等消息直接在IO线程执行connection在IO线程上将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池
2 线程模型确定时机
生产者在暴露服务时确认线程模型,消费者在发起连接时确认线程模型,默认线程模型AllDispatcher,本文以生产者为例
public class NettyTransporter implements Transporter {@Overridepublic Server bind(URL url, ChannelHandler listener) throws RemotingException {// 生产者暴露服务return new NettyServer(url, listener);}@Overridepublic Client connect(URL url, ChannelHandler listener) throws RemotingException {// 消费者发起连接return new NettyClient(url, listener);}}public class NettyServer extends AbstractServer implements Server {public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// 默认线程模型all// MultiMessageHandler(HeartbeatHandler(AllChannelHandler))super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}}
ChannelHandlers.wrap方法获取Dispatcher自适应扩展点,如果配置指定dispatcher属性,扩展点加载器会从URL获取属性值加载对应线程模型
public class ChannelHandlers {protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {// Dispatcher默认自适应扩展点AllDispatcherreturn new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}}@SPI(AllDispatcher.NAME)public interface Dispatcher {@Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})ChannelHandler dispatch(ChannelHandler handler, URL url);}
3 源码分析
3.1 All
public class AllDispatcher implements Dispatcher {// 线程模型名称public static final String NAME = "all";// 具体实现策略@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {return new AllChannelHandler(handler, url);}}public class AllChannelHandler extends WrappedChannelHandler {@Overridepublic void connected(Channel channel) throws RemotingException {// 连接完成事件交给业务线程池ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {// 断开连接事件交给业务线程池ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {// 请求响应事件交给业务线程池ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException) {Request request = (Request)message;if(request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {// 异常事件交给业务线程池ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);}}}
3.2 Direct
public class DirectDispatcher implements Dispatcher {// 线程模型名称public static final String NAME = "direct";// 具体实现策略@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {// 直接返回handler表示所有事件交给IO线程处理return handler;}}
3.3 Message
public class MessageOnlyDispatcher implements Dispatcher {// 线程模型名称public static final String NAME = "message";// 具体实现策略@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {return new MessageOnlyChannelHandler(handler, url);}}public class MessageOnlyChannelHandler extends WrappedChannelHandler {@Overridepublic void received(Channel channel, Object message) throws RemotingException {// 请求响应事件交给业务线程池处理ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}}
3.4 Execution
public class ExecutionDispatcher implements Dispatcher {// 线程模型名称public static final String NAME = "execution";// 具体实现策略@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {return new ExecutionChannelHandler(handler, url);}}public class ExecutionChannelHandler extends WrappedChannelHandler {@Overridepublic void received(Channel channel, Object message) throws RemotingException {// 获取业务线程池ExecutorService cexecutor = getExecutorService();// 请求类消息交给业务线程池处理if (message instanceof Request) {try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (t instanceof RejectedExecutionException) {Request request = (Request) message;if (request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);}} else {handler.received(channel, message);}}}
3.5 Connection
public class ConnectionOrderedDispatcher implements Dispatcher {// 线程模型名称public static final String NAME = "connection";// 具体实现策略@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {return new ConnectionOrderedChannelHandler(handler, url);}}public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {protected final ThreadPoolExecutor connectionExecutor;private final int queuewarninglimit;public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {super(handler, url);// 构造函数声明只有一个线程的线程池进行串行化处理String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);connectionExecutor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),new NamedThreadFactory(threadName, true),new AbortPolicyWithReport(threadName, url));queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);}@Overridepublic void connected(Channel channel) throws RemotingException {try {// 检查线程池队列元素个数checkQueueLength();// 连接完成事件交给线程池处理connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}@Overridepublic void disconnected(Channel channel) throws RemotingException {try {// 检查线程池队列元素个数checkQueueLength();// 连接断开事件交给线程池处理connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {// 请求响应事件交给业务线程池处理ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {Request request = (Request) message;if (request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {// 异常事件交给业务线程池处理ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}}// 检查队列大小private void checkQueueLength() {if (connectionExecutor.getQueue().size() > queuewarninglimit) {logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));}}}
4 文章总结
本文分析了DUBBO线程模型源码,可以根据不同业务场景选择不同线程模型。后续文章分析DUBBO线程池策略请继续关注。
长按二维码关注更多精彩文章
文章转载自JAVA前线,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




