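Dubbo Series - Encoding and Decoding

Before analyzing the request encoding logic, let's first look at the structure of a Dubbo data packet.
A Dubbo data packet consists of a header and a body. The header stores metadata such as the magic number (Magic), the packet type (Request/Response), and the body length (Data Length). The body stores the actual invocation message, such as the method name and the argument list. The header fields are listed below.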
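
| Offset (bit) | Field | Value |
|---|---|---|
| 0 ~ 7 | Magic number, high byte | 0xda |
| 8 ~ 15 | Magic number, low byte | 0xbb |
| 16 | Packet type | 0 - Response, 1 - Request |
| 17 | Invocation mode | Valid only when bit 16 is set to 1; 0 - one-way call, 1 - two-way call |
| 18 | Event flag | 0 - the packet is a request or response, 1 - the packet is a heartbeat |
| 19 ~ 23 | Serializer ID | 2 - Hessian2Serialization, 3 - JavaSerialization, 4 - CompactedJavaSerialization, 6 - FastJsonSerialization, 7 - NativeJavaSerialization, 8 - KryoSerialization, 9 - FstSerialization |
| 24 ~ 31 | Status | 20 - OK, 30 - CLIENT_TIMEOUT, 31 - SERVER_TIMEOUT, 40 - BAD_REQUEST, 50 - BAD_RESPONSE, ... |
| 32 ~ 95 | Request ID | 8 bytes, generated at runtime |
| 96 ~ 127 | Body length | Computed at runtime |
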
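
To make bits 16 ~ 23 of the header concrete, here is a minimal sketch of how the third header byte could be packed and read back. The class name HeaderFlagSketch and its methods are hypothetical and not part of Dubbo; only the mask values are taken from the ExchangeCodec constants shown later in this article.

```java
/** Hypothetical helper (not part of Dubbo) for the third header byte, bits 16 ~ 23. */
final class HeaderFlagSketch {
    // Mask values copied from ExchangeCodec; bit 16 is the most significant bit of the byte.
    static final int FLAG_REQUEST = 0x80;
    static final int FLAG_TWOWAY = 0x40;
    static final int FLAG_EVENT = 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Packs the three flags and the serializer ID into one byte. */
    static byte pack(boolean request, boolean twoWay, boolean event, int serializationId) {
        int flag = serializationId & SERIALIZATION_MASK;
        if (request) flag |= FLAG_REQUEST;
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        return (byte) flag;
    }

    static boolean isRequest(byte flag) { return (flag & FLAG_REQUEST) != 0; }

    static boolean isTwoWay(byte flag) { return (flag & FLAG_TWOWAY) != 0; }

    static boolean isEvent(byte flag) { return (flag & FLAG_EVENT) != 0; }

    static int serializationId(byte flag) { return flag & SERIALIZATION_MASK; }

    public static void main(String[] args) {
        // A two-way request serialized with Hessian2 (ID 2): 0x80 | 0x40 | 0x02.
        byte flag = pack(true, true, false, 2);
        System.out.printf("flag = 0x%02x, request = %b, serializer = %d%n",
                flag & 0xff, isRequest(flag), serializationId(flag));
    }
}
```

Because the serializer ID occupies only the low five bits, IDs above 31 would not fit in this layout.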
As before, we start from NettyServer:

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });

As you can see, encoding and decoding are delegated to the decoder and encoder provided by NettyCodecAdapter. NettyCodecAdapter has two inner classes, InternalEncoder and InternalDecoder. We start with decoding.

Decoding

    private class InternalDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            Object msg;
            int saveReaderIndex;

            try {
                // Decode as many complete messages as the buffer currently holds.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        // Incomplete packet: rewind and wait for more bytes.
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        // A successful decode must have consumed at least one byte.
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    }

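
The loop above follows a save-and-rollback pattern: remember the reader index, try to decode, and rewind if the packet is incomplete. The sketch below reproduces that pattern with java.nio.ByteBuffer standing in for ChannelBuffer. The class RollbackDecodeSketch, its sentinel object, and the toy frame format (a 4-byte length followed by UTF-8 text) are assumptions made for illustration; this is not Dubbo's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed decoder illustrating the save/rollback loop. */
final class RollbackDecodeSketch {
    /** Sentinel playing the role of Codec2.DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    /** Toy frame format (an assumption): 4-byte big-endian length, then UTF-8 text. */
    static Object decodeOne(ByteBuffer in) {
        if (in.remaining() < 4) {
            return NEED_MORE_INPUT;
        }
        int len = in.getInt();
        if (in.remaining() < len) {
            return NEED_MORE_INPUT; // the caller rewinds the bytes consumed so far
        }
        byte[] body = new byte[len];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Drains every complete frame and leaves a trailing partial frame unread. */
    static List<Object> decodeAll(ByteBuffer in) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int saved = in.position();
            Object msg = decodeOne(in);
            if (msg == NEED_MORE_INPUT) {
                in.position(saved); // roll back to the start of the partial frame
                break;
            }
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] a = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] b = "dubbo".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(a.length).put(a);     // first complete frame
        buf.putInt(b.length).put(b);     // second frame in the same read (sticky packet)
        buf.putInt(10).put((byte) 'x');  // third frame announces 10 bytes, only 1 has arrived
        buf.flip();
        List<Object> msgs = decodeAll(buf);
        System.out.println(msgs + ", unread bytes: " + buf.remaining());
    }
}
```

The unread bytes stay in the buffer, so the next read can append to them and the decode can be retried from the same position.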
InternalDecoder decodes by calling codec.decode, where codec is a DubboCountCodec instance. Next, look at the decode method of DubboCountCodec.

    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                // Incomplete packet: restore the reader index and stop.
                buffer.readerIndex(save);
                break;
            } else {
                result.addMessage(obj);
                // Record the size of the decoded message as an attachment.
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }

    private void logMessageLength(Object result, int bytes) {
        if (bytes <= 0) {
            return;
        }
        if (result instanceof Request) {
            try {
                ((RpcInvocation) ((Request) result).getData()).setAttachment(
                        Constants.INPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        } else if (result instanceof Response) {
            try {
                ((RpcResult) ((Response) result).getResult()).setAttachment(
                        Constants.OUTPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        }
    }

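DubboCountCodec delegates the actual decoding to ExchangeCodec.decode and handles TCP sticky packets and split packets, that is, several messages arriving in one read or one message spread over several reads. Next, look at ExchangeCodec.decode.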

    public class ExchangeCodec extends TelnetCodec {

        // Header length in bytes.
        protected static final int HEADER_LENGTH = 16;
        // Magic number that marks the start of a Dubbo packet.
        protected static final short MAGIC = (short) 0xdabb;
        protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
        protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
        // Flag bits and serializer mask for the third header byte.
        protected static final byte FLAG_REQUEST = (byte) 0x80;
        protected static final byte FLAG_TWOWAY = (byte) 0x40;
        protected static final byte FLAG_EVENT = (byte) 0x20;
        protected static final int SERIALIZATION_MASK = 0x1f;

        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int readable = buffer.readableBytes();
            // Read at most HEADER_LENGTH bytes as the header.
            byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
            buffer.readBytes(header);
            return decode(channel, buffer, readable, header);
        }

        @Override
        protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
            // Check the magic number.
            if (readable > 0 && header[0] != MAGIC_HIGH
                    || readable > 1 && header[1] != MAGIC_LOW) {
                int length = header.length;
                if (header.length < readable) {
                    header = Bytes.copyOf(header, readable);
                    buffer.readBytes(header, length, readable - length);
                }
                // Find the next magic number and hand the bytes before it to TelnetCodec.
                for (int i = 1; i < header.length - 1; i++) {
                    if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                        buffer.readerIndex(buffer.readerIndex() - header.length + i);
                        header = Bytes.copyOf(header, i);
                        break;
                    }
                }
                return super.decode(channel, buffer, readable, header);
            }
            // Not enough bytes for a complete header.
            if (readable < HEADER_LENGTH) {
                return DecodeResult.NEED_MORE_INPUT;
            }

            // The body length is stored in header bytes 12 ~ 15.
            int len = Bytes.bytes2int(header, 12);
            checkPayload(channel, len);

            int tt = len + HEADER_LENGTH;
            // Not enough bytes for the complete packet.
            if (readable < tt) {
                return DecodeResult.NEED_MORE_INPUT;
            }

            // Limit the input stream to the body.
            ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

            try {
                return decodeBody(channel, is, header);
            } finally {
                if (is.available() > 0) {
                    try {
                        if (logger.isWarnEnabled()) {
                            logger.warn("Skip input stream " + is.available());
                        }
                        StreamUtils.skipUnusedStream(is);
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }

        protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
            byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
            // Request ID.
            long id = Bytes.bytes2long(header, 4);
            if ((flag & FLAG_REQUEST) == 0) {
                // Decode a response.
                Response res = new Response(id);
                if ((flag & FLAG_EVENT) != 0) {
                    res.setEvent(Response.HEARTBEAT_EVENT);
                }
                // Status.
                byte status = header[3];
                res.setStatus(status);
                try {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    if (status == Response.OK) {
                        Object data;
                        if (res.isHeartbeat()) {
                            data = decodeHeartbeatData(channel, in);
                        } else if (res.isEvent()) {
                            data = decodeEventData(channel, in);
                        } else {
                            data = decodeResponseData(channel, in, getRequestData(id));
                        }
                        res.setResult(data);
                    } else {
                        res.setErrorMessage(in.readUTF());
                    }
                } catch (Throwable t) {
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                return res;
            } else {
                // Decode a request.
                Request req = new Request(id);
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay((flag & FLAG_TWOWAY) != 0);
                if ((flag & FLAG_EVENT) != 0) {
                    req.setEvent(Request.HEARTBEAT_EVENT);
                }
                try {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    Object data;
                    if (req.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, in);
                    } else if (req.isEvent()) {
                        data = decodeEventData(channel, in);
                    } else {
                        data = decodeRequestData(channel, in);
                    }
                    req.setData(data);
                } catch (Throwable t) {
                    // Mark the request as broken and carry the error as its data.
                    req.setBroken(true);
                    req.setData(t);
                }
                return req;
            }
        }

        @Deprecated
        protected Object decodeHeartbeatData(Channel channel, ObjectInput in) throws IOException {
            try {
                return in.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read object failed.", e));
            }
        }

        protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException {
            try {
                return in.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read object failed.", e));
            }
        }

        // The overloads taking a Channel that decodeBody calls are not shown in this excerpt.
        protected Object decodeRequestData(ObjectInput in) throws IOException {
            try {
                return in.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read object failed.", e));
            }
        }

        protected Object decodeResponseData(ObjectInput in) throws IOException {
            try {
                return in.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read object failed.", e));
            }
        }

        // The encode methods are covered in the Encoding section.
    }

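
The three checks that ExchangeCodec.decode makes before touching the body (magic number, full header, full body) can be sketched in isolation. HeaderCheckSketch and its Outcome enum are hypothetical names introduced here; java.nio.ByteBuffer replaces ChannelBuffer, and the sketch only inspects the bytes without consuming them, which is a simplification of the original.

```java
import java.nio.ByteBuffer;

/** Hypothetical, simplified version of the length checks in ExchangeCodec.decode. */
final class HeaderCheckSketch {
    static final int HEADER_LENGTH = 16;
    static final byte MAGIC_HIGH = (byte) 0xda;
    static final byte MAGIC_LOW = (byte) 0xbb;

    enum Outcome { NOT_DUBBO, NEED_MORE_INPUT, READY }

    /** Inspects the readable bytes using absolute reads, so the position is untouched. */
    static Outcome inspect(ByteBuffer buf) {
        int start = buf.position();
        int readable = buf.remaining();
        // Compare only the magic bytes that have already arrived.
        if (readable > 0 && buf.get(start) != MAGIC_HIGH
                || readable > 1 && buf.get(start + 1) != MAGIC_LOW) {
            return Outcome.NOT_DUBBO; // the real codec falls back to TelnetCodec here
        }
        if (readable < HEADER_LENGTH) {
            return Outcome.NEED_MORE_INPUT; // header incomplete
        }
        int bodyLength = buf.getInt(start + 12); // big-endian int at bits 96 ~ 127
        if (readable < HEADER_LENGTH + bodyLength) {
            return Outcome.NEED_MORE_INPUT; // body incomplete
        }
        return Outcome.READY;
    }

    /** Reads the 8-byte request ID at bits 32 ~ 95. */
    static long requestId(ByteBuffer buf) {
        return buf.getLong(buf.position() + 4);
    }

    public static void main(String[] args) {
        ByteBuffer packet = ByteBuffer.allocate(HEADER_LENGTH + 3);
        packet.putShort((short) 0xdabb) // magic number
                .put((byte) 0xc2)       // request, two-way, serializer 2
                .put((byte) 0)          // status, unused for requests
                .putLong(42L)           // request ID
                .putInt(3)              // body length
                .put(new byte[3]);      // placeholder body
        packet.flip();
        System.out.println(inspect(packet) + ", id = " + requestId(packet));
    }
}
```

ByteBuffer defaults to big-endian byte order, which matches how Bytes.bytes2int and Bytes.bytes2long read the header fields.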
All of the decodeXxxData methods end up calling ObjectInput.readObject. First, look at the source of CodecSupport.deserialize.

    public class CodecSupport {

        // The logger and ID_SERIALIZATIONNAME_MAP declarations are not shown in this excerpt.
        private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();

        static {
            // Load every Serialization extension through SPI and index it by content type ID.
            Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
            for (String name : supportedExtensions) {
                Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
                byte idByte = serialization.getContentTypeId();
                if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
                    logger.error("Serialization extension " + serialization.getClass().getName()
                            + " has duplicate id to Serialization extension "
                            + ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
                            + ", ignore this Serialization extension");
                    continue;
                }
                ID_SERIALIZATION_MAP.put(idByte, serialization);
                ID_SERIALIZATIONNAME_MAP.put(idByte, name);
            }
        }

        public static Serialization getSerializationById(Byte id) {
            return ID_SERIALIZATION_MAP.get(id);
        }

        public static Serialization getSerialization(URL url, Byte id) throws IOException {
            Serialization serialization = getSerializationById(id);
            String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
            // For the Java-based serializers (IDs 3, 4 and 7), the ID must match the serialization configured in the URL.
            if (serialization == null
                    || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
                throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
            }
            return serialization;
        }

        public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
            Serialization s = getSerialization(url, proto);
            return s.deserialize(url, is);
        }
    }

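CodecSupport uses the serializer ID (proto) to look up the matching Serialization implementation, which was loaded through SPI, and then calls its deserialize method to obtain an ObjectInput object. The default Serialization is hessian2. We will not analyze Serialization any further here.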
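
The lookup that CodecSupport performs boils down to a map keyed by the serializer ID, where the first registration of an ID wins. The sketch below shows that idea without SPI. SerializationRegistrySketch and its nested Serialization interface are hypothetical stand-ins that keep only getContentTypeId; they are not the Dubbo types.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the ID lookup in CodecSupport; no SPI is involved. */
final class SerializationRegistrySketch {
    /** Minimal stand-in for Dubbo's Serialization interface (assumed shape). */
    interface Serialization {
        byte getContentTypeId();
    }

    private final Map<Byte, Serialization> byId = new HashMap<>();

    /** Registers an implementation; returns false when its ID is already taken. */
    boolean register(Serialization s) {
        byte id = s.getContentTypeId();
        if (byId.containsKey(id)) {
            return false; // the first registration wins, later duplicates are ignored
        }
        byId.put(id, s);
        return true;
    }

    /** Resolves the ID read from the header, failing on unknown IDs. */
    Serialization lookup(byte id) throws IOException {
        Serialization s = byId.get(id);
        if (s == null) {
            throw new IOException("Unexpected serialization id:" + id);
        }
        return s;
    }

    public static void main(String[] args) throws IOException {
        SerializationRegistrySketch registry = new SerializationRegistrySketch();
        Serialization hessian2 = () -> (byte) 2;
        System.out.println("registered: " + registry.register(hessian2));
        System.out.println("duplicate accepted: " + registry.register(() -> (byte) 2));
        System.out.println("lookup ok: " + (registry.lookup((byte) 2) == hessian2));
    }
}
```

Rejecting an unknown ID before deserializing means a malformed or unexpected header fails fast instead of being handed to the wrong serializer.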

Encoding

We start from InternalEncoder, the inner class of NettyCodecAdapter.

    private class InternalEncoder extends MessageToByteEncoder {

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
        }
    }

InternalEncoder calls DubboCountCodec.encode.

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        codec.encode(channel, buffer, msg);
    }

DubboCountCodec in turn calls the encode method that DubboCodec inherits from its parent class, ExchangeCodec.

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // Header buffer.
        byte[] header = new byte[HEADER_LENGTH];
        // Magic number.
        Bytes.short2bytes(MAGIC, header);

        // Packet type and serializer ID.
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
        if (req.isEvent()) header[2] |= FLAG_EVENT;

        // Request ID.
        Bytes.long2bytes(req.getId(), header, 4);

        // Reserve room for the header and serialize the body first.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        // Body length.
        Bytes.int2bytes(len, header, 12);

        // Go back and write the header, then move the writer index past the body.
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header);
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

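
The part of encodeRequest that is easy to miss is the ordering: the body is written first so that its length is known, and only then is the header written into the slot reserved in front of it. The following sketch shows that backfill step with java.nio.ByteBuffer. HeaderBackfillSketch is a hypothetical class, and the body is passed in as already-serialized bytes, which is an assumption that skips the Serialization step entirely.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the "reserve, write body, backfill header" step. */
final class HeaderBackfillSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;

    /** Writes one packet at the buffer's current position. */
    static void encodeRequest(ByteBuffer out, long requestId, byte flag, byte[] body) {
        int savedPosition = out.position();
        out.position(savedPosition + HEADER_LENGTH); // 1. skip the header slot
        out.put(body);                               // 2. write the (pre-serialized) body
        int end = out.position();
        int len = end - savedPosition - HEADER_LENGTH; // 3. measure what was written

        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
        header.putShort(MAGIC)     // bits 0 ~ 15
                .put(flag)         // bits 16 ~ 23
                .put((byte) 0)     // bits 24 ~ 31, status is left at 0 for requests
                .putLong(requestId) // bits 32 ~ 95
                .putInt(len);      // bits 96 ~ 127
        header.flip();

        out.position(savedPosition);
        out.put(header);           // 4. go back and fill in the header
        out.position(end);         // 5. leave the position after the body
    }

    public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(64);
        byte[] body = "ping".getBytes(StandardCharsets.UTF_8);
        encodeRequest(out, 7L, (byte) 0xc2, body);
        System.out.println("bytes written: " + out.position()
                + ", body length field: " + out.getInt(12));
    }
}
```

Writing the body directly into the destination buffer avoids serializing into a temporary array just to learn its length.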
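Encoding is essentially the reverse of decoding, and because it has no sticky-packet or split-packet handling, the code is simpler. The code of encodeResponse does not differ much from encodeRequest, so it is not covered in detail here.

This concludes the walkthrough of Dubbo's encoding and decoding process.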