Dubbo系列 - 编解码

在分析请求编码逻辑之前,我们先来看一下 Dubbo 数据包结构。

Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。

偏移量(Bit) 字段 取值
0 ~ 7 魔数高位 0xda00
8 ~ 15 魔数低位 0xbb
16 数据包类型 0 - Response, 1 - Request
17 调用方式 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18 事件标识 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23 序列化器编号 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization
24 ~ 31 状态 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE……
32 ~ 95 请求编号 共8字节,运行时生成
96 ~ 127 消息体长度 运行时计算

还是从NettyServer开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});

可以看到编码和解码主要是调用NettyCodecAdapter里的decoderencoder

NettyCodecAdapter里有两个内部类InternalEncoderInternalDecoder,我们先从解码开始。

解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message);
} catch (IOException e) {
throw e;
}
//如果返回NEED_MORE_INPUT,跳出循环并将指针复位,等待数据
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}

InternalDecoder通过调codec.decode来进行解码,这里的codecDubboCountCodec实例。再看看DubboCountCodecdecode方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);
//如果返回NEED_MORE_INPUT,跳出循环并将指针复位,等待数据
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
//记录消息长度
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}

private void logMessageLength(Object result, int bytes) {
if (bytes <= 0) {
return;
}
if (result instanceof Request) {
try {
((RpcInvocation) ((Request) result).getData()).setAttachment(
Constants.INPUT_KEY, String.valueOf(bytes));
} catch (Throwable e) {
/* ignore */
}
} else if (result instanceof Response) {
try {
((RpcResult) ((Response) result).getResult()).setAttachment(
Constants.OUTPUT_KEY, String.valueOf(bytes));
} catch (Throwable e) {
/* ignore */
}
}
}

DubboCountCodec调用ExchangeCodec.decode来进行解码,并处理tcp粘包拆包的情况。下面看ExchangeCodec.decode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
public class ExchangeCodec extends TelnetCodec {
// header length.
protected static final int HEADER_LENGTH = 16;
// magic header.
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// message flag.
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
//读取16个字节的头,也就是128位的长度
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
//先检查魔数,如果头两个字节不是魔数且后面也找不到魔数,这调用父类TelnetCodec的decode方法。
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
//将所有数据读到header中
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
//一直往后去找魔数,如果找到的话,将指针指回到魔数开始的位置???
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 通过 telnet 命令行发送的数据包不包含消息头,所以这里
// 调用 TelnetCodec 的 decode 方法对数据包进行解码
return super.decode(channel, buffer, readable, header);
}
// 如果可读的数据少于头的长度,则返回NEED_MORE_INPUT继续等待数据。
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

//从12个字节开始读取4个字节,最为body的长度。即96位到128位
int len = Bytes.bytes2int(header, 12);
//检查body长度是否过大,默认8M
checkPayload(channel, len);
//如果总长度大于可读的长度,返回NEED_MORE_INPUT
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
//读取第3个字节,即16~23位。proto为19 ~ 23的序列化器编号
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 32 ~ 95 读取ID
long id = Bytes.bytes2long(header, 4);
//读取第16位数据包类型
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
} else {
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} else {
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// decode request.
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
//18位事件标记,0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
//反序列化得到ObjectInput
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
} else {
data = decodeRequestData(channel, in);
}
req.setData(data);
} catch (Throwable t) {
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}

@Deprecated
protected Object decodeHeartbeatData(Channel channel, ObjectInput in) throws IOException {
try {
return in.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read object failed.", e));
}
}
protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException {
try {
return in.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read object failed.", e));
}
}
protected Object decodeRequestData(ObjectInput in) throws IOException {
try {
return in.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read object failed.", e));
}
}
protected Object decodeResponseData(ObjectInput in) throws IOException {
try {
return in.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read object failed.", e));
}
}

所有的decode方法都是调用ObjectInput.readObject。先看下CodecSupport.deserialize的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CodecSupport {
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
for (String name : supportedExtensions) {
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
byte idByte = serialization.getContentTypeId();
if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
logger.error("Serialization extension " + serialization.getClass().getName()
+ " has duplicate id to Serialization extension "
+ ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
+ ", ignore this Serialization extension");
continue;
}
ID_SERIALIZATION_MAP.put(idByte, serialization);
ID_SERIALIZATIONNAME_MAP.put(idByte, name);
}
}

public static Serialization getSerializationById(Byte id) {
return ID_SERIALIZATION_MAP.get(id);
}

public static Serialization getSerialization(URL url, Byte id) throws IOException {
Serialization serialization = getSerializationById(id);
String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
// Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
if (serialization == null
|| ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
}
return serialization;
}

public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
Serialization s = getSerialization(url, proto);
return s.deserialize(url, is);
}

CodecSupport主要是根据proto根据SPI找到对应的Serialization,然后调用deserialize方法返回ObjectInput对象。Serialization默认使用hessian2Serialization不在深入分析。

编码

NettyCodecAdapter的内部类InternalEncoder开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private class InternalEncoder extends MessageToByteEncoder {

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
}
}

InternalEncoder调用DubboCountCodec.encode

1
2
3
4
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
codec.encode(channel, buffer, msg);
}

DubboCountCodec调用DubboCodec的父类ExchangeCodecencode方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
//获取序列化对象
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);

// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;

// set request id.
Bytes.long2bytes(req.getId(), header, 4);

// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);

// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

编码过程其实是解码的逆向过程,而且没有了粘包拆包的处理,代码更为简单。encodeResponse方法的代码和encodeRequest没有太大差别,就不细写了。
Dubbo编解码过程讲解结束。