注意:以下分析都是基于Dubbo2.7.2
报文结构
1
2
3
4
+-----------+----------+------------+--------+------------+---------+
| MagicH(2B)| FLAG(1B) | STATUS(1B) | ID(8B) | Length(4B) | Payload |
| 0xDABB | 0x80 | | | 0x00000005 | "HELLO" |
+-----------+----------+------------+--------+------------+---------+
Dubbo报文头包含:MagicNumber(占2个字节),Flag(占1个字节),Status(占1个字节),ID(占8个字节),报文体(占4个字节),总共16个字节。
- Flag (高3位用来存储Flag类型,低5位用来存储序列化协议)
- FLAG_REQUEST (0x80)
- FLAG_TWOWAY (0x40)
- FLAG_EVENT (0x20)
- SERIALIZATION_MASK (0x1f)
- Status
- OK (20)
- CLIENT_TIMEOUT (30)
- SERVER_TIMEOUT (31)
- CHANNEL_INACTIVE (35)
- BAD_REQUEST (40)
- BAD_RESPONSE (50)
- SERVICE_NOT_FOUND (60)
- SERVICE_ERROR (70)
- SERVER_ERROR (80)
- CLIENT_ERROR (90)
- SERVER_THREADPOOL_EXHAUSTED_ERROR (100)
注意
- payload长度不能超过8M(默认,可配置),超出则抛出异常
ExceedPayloadLimitException
报文编码解码器
默认情况下,Dubbo会采用DubboCountCodec
作为报文编解码器。
报文解码时序图如下:
报文的编解码器类图如下:
处理粘包半包过程
我们都知道在处理TCP请求中,都会碰到半包和粘包的情况。由于对端如果是没有关闭Nagle算法优化,那很有可能在发送的时候几个数据报文就是合并在一起的;亦或是我端在接收请求不及时,也会造成缓冲区堆积多个数据报文,这些原因都有可能造成粘包问题。所以我们来看看Dubbo是怎么处理粘包问题的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package org.apache.dubbo.rpc.protocol.dubbo;
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
...
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
//循环解析缓冲区的报文,因为有可能有粘包
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
//读到一个半包,把缓冲区的读位置设置成上一次的完整包读位置,并且返回
buffer.readerIndex(save);
break;
} else {
//读到一个完整包
result.addMessage(obj);
//记录报文长度
logMessageLength(obj, buffer.readerIndex() - save);
//记录上一次的完整包读位置
save = buffer.readerIndex();
}
} while (true);
...
return result;
}
...
}
判断半包过程
半包无非以下两种情况:
- 报文头不全
- 报文体不全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.apache.dubbo.remoting.exchange.codec;
public class ExchangeCodec extends TelnetCodec {
...
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
//做这个判断是因为0xDABB是两个字节,有一种可能是缓冲区只有一个可读字节是0xDA,这时候不能把它当做其他的数据报文处理
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
...
//如果不是魔数开头,有可能是TELNET数据报文
return super.decode(channel, buffer, readable, header);
}
// 开始解析Dubbo报文
// 如果剩余可读长度小于包头长度,则判断为半包
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 如果剩余可读长度小于包头长度+包体长度,则判断为半包
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//解析一个完整包
return decodeBody(channel, is, header);
} finally {
...
}
}
...
}
解析完整的Dubbo报文
根据对端发过来的序列化协议,我们可以把报文体正确的反序列化出来对应的消息。dubbo
目前支持很多序列化方式,比如(fastjson, fst, hessian2, jdk, kyro, protostuff, protobuf-json, avro, gson),其中后面3种是2.7.2中加入的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package org.apache.dubbo.rpc.protocol.dubbo;
public class DubboCodec extends ExchangeCodec {
...
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 报文ID
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 解码Response
Response res = new Response(id);
...
try {
// 反序列报文体(payload)。(默认采用hessian2)
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
//心跳的解析
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
//事件的解析
data = decodeEventData(channel, in);
} else {
//RPC调用的解析
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
//直接在IO线程解析
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
//解析包括:dubbo版本号,接口版本号,接口名,接口方法名等等
result.decode();
} else {
//在业务线程解析
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} else {
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
...
}
return res;
} else {
// 解码Request,步骤同上
Request req = new Request(id);
...
return req;
}
}
...
}