在dubbo的一次请求、响应过程中,客户端和服务端之间的数据交换都涉及对数据的编解码工作,而这都是通过codec2接口来处理的,默认使用dubbo协议。本文将主要回答以下3个问题: 1.dubbo协议头格式 2.dubbo采用异步通信方式,那请求和响应是怎么关联的 3.编解码过程
1.Codec2接口
Codec2接口比较简单,除了负责编解码的2个方法之外,只有一个枚举DecodeResult用来对解码结果进行标识。
NEEDMOREINPUT:收到的字节流不是一个完整数据包,需要等待更多数据到达。
SKIPSOMEINPUT:忽略掉一部分数据包。
@SPI
public interface Codec2 {
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
enum DecodeResult {
NEED_MORE_INPUT, SKIP_SOME_INPUT
}
}
复制
2. ExchangeCodec核心属性
dubbo中编解码入口在DubboCountCodec中,最终都会调到ExchangeCodec,其核心属性简单介绍如下:
// 协议头长度,16个字节
protected static final int HEADER_LENGTH = 16;
// 魔数,用来判断是否是一个dubbo协议包
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// 消息标记位
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
// 5位序列化掩码,因此dubbo最多支持32种序列化协议
protected static final int SERIALIZATION_MASK = 0x1f;
复制
3.dubbo协议头格式
dubbo官网使用下面这个图对dubbo协议头格式进行了介绍:
可以看到,dubbo协议头一共包含16个字节:
2字节的魔数
5位的序列化id
1位事件标识
1位单双向标识
1位请求响应标识
1个字节状态位
8字节请求id
4字节的消息体长度
4.编码源码解读
从DubboCountCodec#encode方法开始,最终会调到ExchangeCodec#encodeRequest方法,我们就从这里开始看。
4.1 客户端请求编码
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
// 获取序列化协议,默认为hessian2
Serialization serialization = getSerialization(channel);
// 协议头总长度为16字节
byte[] header = new byte[HEADER_LENGTH];
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// 设置消息标记位,总长度为1个字节,前3位分别表示请求响应标识、单双向标识和事件标识,后5位标识序列化id
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 设置请求id
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 心跳事件
encodeEventData(channel, out, req.getData());
} else {
// 正常请求
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
// 检查请求大小是否超过限制,最大8M
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
复制
解读源码发现,对请求消息进行编码时,没有第4个字节的状态位,同时官网中关于dubbo协议头的结构对于第3字节的描述其实反了,正确的dubbo协议头格式如下:
在对请求进行编码时,会把版本号、服务路径、方法名、方法参数等信息都进行编码,其中参数类型是用JVM中的类型表示方法来编码的。
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(version);
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null) {
for (int i = 0; i < args.length; i++) {
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
}
out.writeObject(RpcUtils.getNecessaryAttachments(inv));
}
复制
public static String getDesc(Class<?> c) {
StringBuilder ret = new StringBuilder();
while (c.isArray()) {
ret.append('[');
c = c.getComponentType();
}
if (c.isPrimitive()) {
String t = c.getName();
if ("void".equals(t)) {
ret.append(JVM_VOID);
} else if ("boolean".equals(t)) {
ret.append(JVM_BOOLEAN);
} else if ("byte".equals(t)) {
ret.append(JVM_BYTE);
} else if ("char".equals(t)) {
ret.append(JVM_CHAR);
} else if ("double".equals(t)) {
ret.append(JVM_DOUBLE);
} else if ("float".equals(t)) {
ret.append(JVM_FLOAT);
} else if ("int".equals(t)) {
ret.append(JVM_INT);
} else if ("long".equals(t)) {
ret.append(JVM_LONG);
} else if ("short".equals(t)) {
ret.append(JVM_SHORT);
}
} else {
ret.append('L');
ret.append(c.getName().replace('.', '/'));
ret.append(';');
}
return ret.toString();
}
复制
4.2 服务端响应编码
服务端对响应进行编码的逻辑与客户端类似,具体见代码注释:
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
// 获取序列化协议、初始化等等
Serialization serialization = getSerialization(channel);
byte[] header = new byte[HEADER_LENGTH];
Bytes.short2bytes(MAGIC, header);
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) {
header[2] |= FLAG_EVENT;
}
// 对响应进行编码,会有第3字节的响应状态位
byte status = res.getStatus();
header[3] = status;
Bytes.long2bytes(res.getId(), header, 4);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else {
out.writeUTF(res.getErrorMessage());
}
...
}
复制
@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
Result result = (Result) data;
// 2.0.2之后版本的响应中,才会存在附加属性,这里会根据版本号来判断是否需要将附加属性进行编码
boolean attach = Version.isSupportResponseAttatchment(version);
Throwable th = result.getException();
if (th == null) {
Object ret = result.getValue();
if (ret == null) {
out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
} else {
out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
out.writeObject(ret);
}
} else {
out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
out.writeObject(th);
}
if (attach) {
// returns current version of Response to consumer side.
result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
out.writeObject(result.getAttachments());
}
}
复制
5.解码源码解读
解码逻辑从DubboCountCodec#decode方法开始,最终会调到ExchangeCodec#decode方法,从这里一步步往下看。
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 检查魔数,如果不是dubbo协议,则使用父类的解码方法
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// 长度不够,放弃本次解码,等待读取更多字节
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 读取具体编码数据
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 长度不够,放弃本次解码,等待读取更多字节
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header);
} finally {
...
}
}
复制
decodeBody分为两部分,客户端对响应的解码和服务端队请求的解码,具体是请求还是响应就是根据协议头中的请求响应标志位来区分的,源码如下:
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 读取消息标记为和协议类型
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 读取请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 客户端对响应进行解码
}else {
// 服务端对请求进行解码
}
}
复制
5.1 客户端响应解码
// 客户端对响应进行解码
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 读取响应状态
byte status = header[3];
res.setStatus(status);
try {
// 获取序列化协议,对输入数据进行解码,这里会根据序列化协议id和名称来校验序列化协议是否合法
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
} else {
// 根据decode.in.io的配置决定何时进行解码
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
...
}
return res;
复制
5.2 服务端请求解码
解码过程与客户端对响应的解码基本一样,这里就不再赘述。