暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty的责任链和事件驱动

没意思先生 2020-02-17
502

我们先通过官网Java Doc的一张图来了解pipeline的工作流程:

从图我们可以了解到:
1. 入站事件由Socket读取到数据开始,通过层层的入站处理器,到达最后一个入站处理器为止;
2: 出站事件由用户调用写入数据,通过层层的出站处理器,到达Socket写入数据为止;
3. 入站事件通过ChannelHandlerContext.fireXXX(xxx指入站事件名)触发;
4. 出站事件通过ChannelHandlerContext.xxx(xxx指出站事件名)触发;
DefaultChannelPipeline是Netty对pipeline的具体实现,我们具体看它的addLast()方法:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
复制
我们的关注点有三:
第一:方法内有synchronized关键字,因为pipeline处理Netty的内部I/O线程会调用,用户也可以在自己的线程里动态的增删handler,因此需要保证线程安全;
第二:checkMultiplicity()方法是为了避免将一个非线程安全的handler添加到多个channel中,从代码我们可以看到,要么单例handler被添加到多个handler时需要声明其为sharable(注:多线程情况下需要自己处理线程安全问题),即添加@Sharable注解,要么它本身就是一个多例:
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
复制

第三:DefaultChannelPipeline不是直接通过保存ChannelHandler的链表进行调用,而是通过newContext()方法将handler包装成DefaultChannelHandlerContext对象后进行操作.我们看看DefaultChannelHandlerContext的几个关键属性:

volatile AbstractChannelHandlerContext next;// 指向前一个 ChannelHandlerContext
volatile AbstractChannelHandlerContext prev;// 指向后一个 ChannelHandlerContext
private final int executionMask;// 该ChannelHandler具体实现了哪些事件方法的掩码标识
复制

我们可以看到DefaultChannelHandlerContext对象保存了前缀指针和后缀指针,形成了一条双向链表,然后DefaultChannelPipeline通过自身的head和tail两个内部实现的AbstractChannelHandlerContext,将首尾链接起来,形成了调用链,至此一条调用链便完成了.


紧接着我们看看AbstractChannelHandlerContextexecutionMask,其值是调用了ChannelHandlerMaskmask()方法初始化:

static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;


private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;


static int mask(Class<? extends ChannelHandler> clazz) {
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}


private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
// 略
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;


if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
// 略
}
// 略
} catch (Exception e) {
PlatformDependent.throwException(e);
}
return mask;
}
复制

我们可以看到executionMask和传入的handler类型是有关系的,当传入的是ChannelInboundHandler类型的,它能处理的事件有:

  1. MASK_EXCEPTION_CAUGHT(异常抛出)

  2. MASK_CHANNEL_REGISTERED (channel注册)

  3. MASK_CHANNEL_UNREGISTERED(channel未注册)

  4. MASK_CHANNEL_ACTIVE(channel激活)

  5. MASK_CHANNEL_INACTIVE(channel失效)

  6. MASK_CHANNEL_READ(读数据)

  7. MASK_CHANNEL_READ_COMPLETE(读完成)

  8. MASK_USER_EVENT_TRIGGERED(用户事件触发)

  9. MASK_CHANNEL_WRITABILITY_CHANGED(channel可读性变更)

当传入的是ChannelOutboundHandler类型的,它能处理的事件有:

  1. MASK_EXCEPTION_CAUGHT(异常抛出)

  2. MASK_BIND(channel绑定)

  3. MASK_CONNECT(channel连接事件)

  4. MASK_DISCONNECT(channel断开)

  5. MASK_CLOSE(channel关闭)

  6. MASK_DEREGISTER(channel解除注册)

  7. MASK_READ(读数据)

  8. MASK_WRITE(写数据)

  9. MASK_FLUSH(刷新数据)

举例:当我们调用fireChannelRead()后如何将消息通过调用链传下去?当我们调用fireChannelRead()方法,实际上就是从调用链中去找下一个ChannelInboundHandler,这时候就需要用到executionMask了:

public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}


static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}


private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
复制

我们通过MASK_CHANNEL_READ这个mask值去找能处理MASK_CHANNEL_READ的事件处理器,这个过程是通过AbstractChannelHandlerContext的next指针向下遍历.当然如果不需要继续传递就不需要在代码中再写fireChannelRead()方法了.这里我们可以看到判断能否处理,就是通过一个位运算去处理,而我们一般人的话估计就是定义一堆事件的对象然后变量判断或者是通过hash判断是否在事件集合内.当然位运算肯定是更快了.

文章转载自没意思先生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论