LengthFieldBasedFrameDecoder 基于长度字段解码器,是一个非常灵活、强大的解码器,他能够根据我们动态配置的参数对接收到的消息进行动态解码,以满足实际的业务需求。当解码具有消息头长度字段表示该消息主体或整个消息的长度的二进制消息时,它是特别有用的。
//=====================核心属性====================//表示字节流的数据是大端还是小端,默认是大端private final ByteOrder byteOrder;//表示读取每帧数据的最大阈值private final int maxFrameLength;//表示长度域左边的偏移量private final int lengthFieldOffset;//表示长度域的长度private final int lengthFieldLength;//表示长度域左边的偏移量+长度域的长度private final int lengthFieldEndOffset;//表示长度域左边的偏移量+长度域的长度后面偏移量、调整量private final int lengthAdjustment;//表示跳过多少字节后就能拿到数据域private final int initialBytesToStrip;//表示是否开启快速失败机制private final boolean failFast;//表示是否开启丢弃模式、正在丢弃数据private boolean discardingTooLongFrame;//表示一共丢弃的字节数据private long tooLongFrameLength;//表示每一次丢弃的字节数据private long bytesToDiscard;
public class LengthFieldBasedFrameDecoderTestServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,0,2)).addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof ByteBuf) {ByteBuf packet = (ByteBuf) msg;System.out.println(packet.toString(Charset.defaultCharset()));}}});}});ChannelFuture f = b.bind(9000).sync();System.out.println("Started LengthFieldBasedFrameDecoderTestServer...");f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}}
public class LengthFieldBasedFrameDecoderTestClient {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {public void channelActive(ChannelHandlerContext ctx) {ByteBuf byteBuf = Unpooled.copiedBuffer("hello world".getBytes());ctx.writeAndFlush(byteBuf);}});}});ChannelFuture f = b.connect("127.0.0.1", 9000).sync();System.out.println("Started LengthFieldBasedFrameDecoderTestClient...");f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}}
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,0,0);
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)+--------+----------------+ +----------------+| Length | Actual Content |----->| Actual Content || 0x000C | "HELLO, WORLD" | |"HELLO, WORLD" |+--------+----------------+ +----------------+
在这个例子中,长度字段的值是12(0x0C)表示“HELLO,WORLD”的长度(不包括长度域,只代表数据域的长度)。解码后长度域丢失。
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,0,2);
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,-2,0);
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)+----------+----------+----------------+ +----------+----------+----------------+| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content || 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |+----------+----------+----------------+ +----------+----------+----------------+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,2,3,0,0);
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)+----------+----------+----------------+ +----------+----------+----------------+| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content || 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |+----------+----------+----------------+ +----------+----------+----------------+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,3,2,0);
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)+------+--------+------+----------------+ +------+----------------+| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content || 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |+------+--------+------+----------------+ +------+----------------+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,1,2,1,3);
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)+------+--------+------+----------------+ +------+----------------+| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content || 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |+------+--------+------+----------------+ +------+----------------+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,1,2,-3,3);
netty提供的各种解码器统一都继承了ByteToMessageDecoder,ByteToMessageDecoder负责读取字节流并转换成其他消息,而ByteToMessageDecoder又继承了ChannelInboundHandlerAdapter,而ChannelInboundHandlerAdapter正是我们刚才在测试数据的时候在客户端重写了他的方法channelActive(),使管道生效并且数据写入缓冲区并发送数据。

@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {//寻找下一个绑定的Handler并且调用ChannelRead方法invokeChannelRead(findContextInbound(), msg);return this;}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(() -> next.invokeChannelRead(m));}}
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {int outSize = out.size();//将out里的事件执行并且清空outif (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();// 在继续解码之前,请检查是否已删除此处理程序// 如果已将其删除,则继续在缓冲区上操作是不安全的。if (ctx.isRemoved()) {break;}outSize = 0;}int oldInputLength = in.readableBytes();// 开始解析数据,如果解析出来数据,那么out的长度一定会改变decode(ctx, in, out);// 在继续循环之前,请检查是否已删除此处理程序。// 如果已将其删除,则继续在缓冲区上操作是不安全的。if (ctx.isRemoved()) {break;}// 如果没有解析出来数据if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {break;} else {continue;}}if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}if (isSingleDecode()) {break;}}} catch (DecoderException e) {throw e;} catch (Throwable cause) {throw new DecoderException(cause);}}
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {if (msgs instanceof CodecOutputList) {fireChannelRead(ctx, (CodecOutputList) msgs, numElements);} else {for (int i = 0; i < numElements; i++) {ctx.fireChannelRead(msgs.get(i));}}}
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {//寻找下一个绑定的Handler并且调用ChannelRead方法invokeChannelRead(findContextInbound(), msg);return this;}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(() -> next.invokeChannelRead(m));}}private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}}

//=====================核心属性====================//表示字节流的数据是大端还是小端,默认是大端private final ByteOrder byteOrder;//表示读取每帧数据的最大阈值private final int maxFrameLength;//表示长度域左边的偏移量private final int lengthFieldOffset;//表示长度域的长度private final int lengthFieldLength;//表示长度域左边的偏移量+长度域的长度private final int lengthFieldEndOffset;//表示长度域左边的偏移量+长度域的长度后面偏移量、调整量private final int lengthAdjustment;//表示跳过多少字节后就能拿到数据域private final int initialBytesToStrip;//表示是否开启快速失败机制private final boolean failFast;//表示是否开启丢弃模式、正在丢弃数据private boolean discardingTooLongFrame;//表示一共丢弃的字节数据private long tooLongFrameLength;//表示每一次丢弃的字节数据private long bytesToDiscard;
@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {//首先判断是否正在处于丢弃模式中,如果正在处于丢弃模式中,直接进行数据丢弃if (discardingTooLongFrame) {long bytesToDiscard = this.bytesToDiscard;int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());//直接跳过指定的字节数据in.skipBytes(localBytesToDiscard);bytesToDiscard -= localBytesToDiscard;this.bytesToDiscard = bytesToDiscard;failIfNecessary(false);}// 如果当前可读字节还未达到lengthFieldEndOffset大小,说明数据域中并没有可读内容,此时直接返回nullif (in.readableBytes() < lengthFieldEndOffset) {return null;}// 拿到实际的lengthFieldOffsetint actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;// 拿到未调整过得数据包长度long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);// 如果数据包长度为负数,直接跳过长度域并抛出异常if (frameLength < 0) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);}// 调整包的总长度frameLength += lengthAdjustment + lengthFieldEndOffset;// 包的总长度还没有长度域长,直接抛出异常if (frameLength < lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " +"than lengthFieldEndOffset: " + lengthFieldEndOffset);}// 数据包长度超出最大包长度,进入丢弃模式,总丢弃数据长度tooLongFrameLengthif (frameLength > maxFrameLength) {long discard = frameLength - in.readableBytes();tooLongFrameLength = frameLength;if (discard < 0) {// 缓冲区包含更多的字节,然后是frameLength,所以我们现在就可以丢弃所有字节// 当前readableBytes达到frameLength,直接跳过frameLength个字节in.skipBytes((int) frameLength);} else {// 当前可读字节未达到frameLength,说明后面未读到的字节也需要丢弃,进入丢弃模式discardingTooLongFrame = true;// bytesToDiscard表示还需要丢弃多少字节bytesToDiscard = discard;// 进入丢弃模式并丢弃到目前为止收到的所有内容.in.skipBytes(in.readableBytes());}failIfNecessary(true);return null;}// frameLength永远不会溢出,因为它小于maxFrameLengthint frameLengthInt = (int) frameLength;if (in.readableBytes() < frameLengthInt) {return null;}// 如果返回数据包之前跳过的字节都大于frameLengthInt,说明数据异常,跳过现在包中的数据if (initialBytesToStrip > frameLengthInt) {in.skipBytes(frameLengthInt);throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " +"than initialBytesToStrip: " + initialBytesToStrip);}// 正常跳过initialBytesToStrip字节in.skipBytes(initialBytesToStrip);// 提取帧数据int readerIndex = in.readerIndex();int actualFrameLength = frameLengthInt - initialBytesToStrip;ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);// 调整读索引in.readerIndex(readerIndex + actualFrameLength);return frame;}
文章转载自文一西路代码狗,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




