暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

dubbo编解码原理分析

shysheng 2019-01-27
457

在dubbo的一次请求、响应过程中,客户端和服务端之间的数据交换都涉及对数据的编解码工作,而这都是通过codec2接口来处理的,默认使用dubbo协议。本文将主要回答以下3个问题: 1.dubbo协议头格式 2.dubbo采用异步通信方式,那请求和响应是怎么关联的 3.编解码过程

1.Codec2接口

Codec2接口比较简单,除了负责编解码的2个方法之外,只有一个枚举DecodeResult用来对解码结果进行标识。

  • NEEDMOREINPUT:收到的字节流不是一个完整数据包,需要等待更多数据到达。

  • SKIPSOMEINPUT:忽略掉一部分数据包。

  1. @SPI

  2. public interface Codec2 {


  3.    @Adaptive({Constants.CODEC_KEY})

  4.    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;


  5.    @Adaptive({Constants.CODEC_KEY})

  6.    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;


  7.    enum DecodeResult {

  8.        NEED_MORE_INPUT, SKIP_SOME_INPUT

  9.    }


  10. }

复制
2. ExchangeCodec核心属性

dubbo中编解码入口在DubboCountCodec中,最终都会调到ExchangeCodec,其核心属性简单介绍如下:

  1. // 协议头长度,16个字节

  2. protected static final int HEADER_LENGTH = 16;

  3. // 魔数,用来判断是否是一个dubbo协议包

  4. protected static final short MAGIC = (short) 0xdabb;

  5. protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];

  6. protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];

  7. // 消息标记位

  8. protected static final byte FLAG_REQUEST = (byte) 0x80;

  9. protected static final byte FLAG_TWOWAY = (byte) 0x40;

  10. protected static final byte FLAG_EVENT = (byte) 0x20;

  11. // 5位序列化掩码,因此dubbo最多支持32种序列化协议

  12. protected static final int SERIALIZATION_MASK = 0x1f;

复制
3.dubbo协议头格式

dubbo官网使用下面这个图对dubbo协议头格式进行了介绍:

可以看到,dubbo协议头一共包含16个字节:

  • 2字节的魔数

  • 5位的序列化id

  • 1位事件标识

  • 1位单双向标识

  • 1位请求响应标识

  • 1个字节状态位

  • 8字节请求id

  • 4字节的消息体长度

4.编码源码解读

从DubboCountCodec#encode方法开始,最终会调到ExchangeCodec#encodeRequest方法,我们就从这里开始看。

4.1 客户端请求编码
  1. protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {

  2.    // 获取序列化协议,默认为hessian2

  3.    Serialization serialization = getSerialization(channel);

  4.    // 协议头总长度为16字节

  5.    byte[] header = new byte[HEADER_LENGTH];

  6.    // 设置魔数

  7.    Bytes.short2bytes(MAGIC, header);


  8.    // 设置消息标记位,总长度为1个字节,前3位分别表示请求响应标识、单双向标识和事件标识,后5位标识序列化id

  9.    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());


  10.    if (req.isTwoWay()) {

  11.        header[2] |= FLAG_TWOWAY;

  12.    }

  13.    if (req.isEvent()) {

  14.        header[2] |= FLAG_EVENT;

  15.    }


  16.    // 设置请求id

  17.    Bytes.long2bytes(req.getId(), header, 4);


  18.    // encode request data.

  19.    int savedWriteIndex = buffer.writerIndex();

  20.    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);

  21.    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

  22.    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

  23.    if (req.isEvent()) {

  24.        // 心跳事件

  25.        encodeEventData(channel, out, req.getData());

  26.    } else {

  27.        // 正常请求

  28.        encodeRequestData(channel, out, req.getData(), req.getVersion());

  29.    }

  30.    out.flushBuffer();

  31.    if (out instanceof Cleanable) {

  32.        ((Cleanable) out).cleanup();

  33.    }

  34.    bos.flush();

  35.    bos.close();

  36.    int len = bos.writtenBytes();

  37.    // 检查请求大小是否超过限制,最大8M

  38.    checkPayload(channel, len);

  39.    Bytes.int2bytes(len, header, 12);


  40.    // write

  41.    buffer.writerIndex(savedWriteIndex);

  42.    buffer.writeBytes(header); // write header.

  43.    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);

  44. }

复制

解读源码发现,对请求消息进行编码时,没有第4个字节的状态位,同时官网中关于dubbo协议头的结构对于第3字节的描述其实反了,正确的dubbo协议头格式如下:

在对请求进行编码时,会把版本号、服务路径、方法名、方法参数等信息都进行编码,其中参数类型是用JVM中的类型表示方法来编码的。

  1. @Override

  2. protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {

  3.    RpcInvocation inv = (RpcInvocation) data;


  4.    out.writeUTF(version);

  5.    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));

  6.    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));


  7.    out.writeUTF(inv.getMethodName());

  8.    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));

  9.    Object[] args = inv.getArguments();

  10.    if (args != null) {

  11.        for (int i = 0; i < args.length; i++) {

  12.            out.writeObject(encodeInvocationArgument(channel, inv, i));

  13.        }

  14.    }

  15.    out.writeObject(RpcUtils.getNecessaryAttachments(inv));

  16. }

复制
  1. public static String getDesc(Class<?> c) {

  2.    StringBuilder ret = new StringBuilder();


  3.    while (c.isArray()) {

  4.        ret.append('[');

  5.        c = c.getComponentType();

  6.    }


  7.    if (c.isPrimitive()) {

  8.        String t = c.getName();

  9.        if ("void".equals(t)) {

  10.            ret.append(JVM_VOID);

  11.        } else if ("boolean".equals(t)) {

  12.            ret.append(JVM_BOOLEAN);

  13.        } else if ("byte".equals(t)) {

  14.            ret.append(JVM_BYTE);

  15.        } else if ("char".equals(t)) {

  16.            ret.append(JVM_CHAR);

  17.        } else if ("double".equals(t)) {

  18.            ret.append(JVM_DOUBLE);

  19.        } else if ("float".equals(t)) {

  20.            ret.append(JVM_FLOAT);

  21.        } else if ("int".equals(t)) {

  22.            ret.append(JVM_INT);

  23.        } else if ("long".equals(t)) {

  24.            ret.append(JVM_LONG);

  25.        } else if ("short".equals(t)) {

  26.            ret.append(JVM_SHORT);

  27.        }

  28.    } else {

  29.        ret.append('L');

  30.        ret.append(c.getName().replace('.', '/'));

  31.        ret.append(';');

  32.    }

  33.    return ret.toString();

  34. }

复制
4.2 服务端响应编码

服务端对响应进行编码的逻辑与客户端类似,具体见代码注释:

  1. protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {

  2.    int savedWriteIndex = buffer.writerIndex();

  3.    try {

  4.        // 获取序列化协议、初始化等等

  5.        Serialization serialization = getSerialization(channel);

  6.        byte[] header = new byte[HEADER_LENGTH];

  7.        Bytes.short2bytes(MAGIC, header);

  8.        header[2] = serialization.getContentTypeId();

  9.        if (res.isHeartbeat()) {

  10.            header[2] |= FLAG_EVENT;

  11.        }

  12.        // 对响应进行编码,会有第3字节的响应状态位

  13.        byte status = res.getStatus();

  14.        header[3] = status;

  15.        Bytes.long2bytes(res.getId(), header, 4);


  16.        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);

  17.        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

  18.        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

  19.        // encode response data or error message.

  20.        if (status == Response.OK) {

  21.            if (res.isHeartbeat()) {

  22.                encodeHeartbeatData(channel, out, res.getResult());

  23.            } else {

  24.                encodeResponseData(channel, out, res.getResult(), res.getVersion());

  25.            }

  26.        } else {

  27.            out.writeUTF(res.getErrorMessage());

  28.        }

  29.        ...

  30. }

复制
  1. @Override

  2. protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {

  3.    Result result = (Result) data;

  4.    // 2.0.2之后版本的响应中,才会存在附加属性,这里会根据版本号来判断是否需要将附加属性进行编码

  5.    boolean attach = Version.isSupportResponseAttatchment(version);

  6.    Throwable th = result.getException();

  7.    if (th == null) {

  8.        Object ret = result.getValue();

  9.        if (ret == null) {

  10.            out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);

  11.        } else {

  12.            out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);

  13.            out.writeObject(ret);

  14.        }

  15.    } else {

  16.        out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);

  17.        out.writeObject(th);

  18.    }


  19.    if (attach) {

  20.        // returns current version of Response to consumer side.

  21.        result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());

  22.        out.writeObject(result.getAttachments());

  23.    }

  24. }

复制
5.解码源码解读

解码逻辑从DubboCountCodec#decode方法开始,最终会调到ExchangeCodec#decode方法,从这里一步步往下看。

  1. @Override

  2. protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {

  3.    // 检查魔数,如果不是dubbo协议,则使用父类的解码方法

  4.    if (readable > 0 && header[0] != MAGIC_HIGH

  5.            || readable > 1 && header[1] != MAGIC_LOW) {

  6.        int length = header.length;

  7.        if (header.length < readable) {

  8.            header = Bytes.copyOf(header, readable);

  9.            buffer.readBytes(header, length, readable - length);

  10.        }

  11.        for (int i = 1; i < header.length - 1; i++) {

  12.            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {

  13.                buffer.readerIndex(buffer.readerIndex() - header.length + i);

  14.                header = Bytes.copyOf(header, i);

  15.                break;

  16.            }

  17.        }

  18.        return super.decode(channel, buffer, readable, header);

  19.    }

  20.    // 长度不够,放弃本次解码,等待读取更多字节

  21.    if (readable < HEADER_LENGTH) {

  22.        return DecodeResult.NEED_MORE_INPUT;

  23.    }


  24.    // 读取具体编码数据

  25.    int len = Bytes.bytes2int(header, 12);

  26.    checkPayload(channel, len);


  27.    // 长度不够,放弃本次解码,等待读取更多字节

  28.    int tt = len + HEADER_LENGTH;

  29.    if (readable < tt) {

  30.        return DecodeResult.NEED_MORE_INPUT;

  31.    }


  32.    // limit input stream.

  33.    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);


  34.    try {

  35.        return decodeBody(channel, is, header);

  36.    } finally {

  37.    ...

  38.    }

  39. }

复制

decodeBody分为两部分,客户端对响应的解码和服务端队请求的解码,具体是请求还是响应就是根据协议头中的请求响应标志位来区分的,源码如下:

  1. @Override

  2. protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {

  3.    // 读取消息标记为和协议类型

  4.    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);

  5.    // 读取请求id

  6.    long id = Bytes.bytes2long(header, 4);

  7.    if ((flag & FLAG_REQUEST) == 0) {

  8.    // 客户端对响应进行解码

  9.    }else {

  10.    // 服务端对请求进行解码

  11.    }

  12. }

复制
5.1 客户端响应解码
  1. // 客户端对响应进行解码

  2. Response res = new Response(id);

  3. if ((flag & FLAG_EVENT) != 0) {

  4.    res.setEvent(Response.HEARTBEAT_EVENT);

  5. }

  6. // 读取响应状态

  7. byte status = header[3];

  8. res.setStatus(status);

  9. try {

  10.    // 获取序列化协议,对输入数据进行解码,这里会根据序列化协议id和名称来校验序列化协议是否合法

  11.    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);

  12.    if (status == Response.OK) {

  13.        Object data;

  14.        if (res.isHeartbeat()) {

  15.            data = decodeHeartbeatData(channel, in);

  16.        } else if (res.isEvent()) {

  17.            data = decodeEventData(channel, in);

  18.        } else {

  19.            // 根据decode.in.io的配置决定何时进行解码

  20.            DecodeableRpcResult result;

  21.            if (channel.getUrl().getParameter(

  22.                    Constants.DECODE_IN_IO_THREAD_KEY,

  23.                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {

  24.                result = new DecodeableRpcResult(channel, res, is,

  25.                        (Invocation) getRequestData(id), proto);

  26.                result.decode();

  27.            } else {

  28.                result = new DecodeableRpcResult(channel, res,

  29.                        new UnsafeByteArrayInputStream(readMessageData(is)),

  30.                        (Invocation) getRequestData(id), proto);

  31.            }

  32.            data = result;

  33.        }

  34.    ...

  35. }

  36. return res;

复制
5.2 服务端请求解码

解码过程与客户端对响应的解码基本一样,这里就不再赘述。


文章转载自shysheng,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论