My Profile Photo

庄津津的技术博客


众里寻他千百度,蓦然回首,那人却在灯火阑珊处


Dubbo报文处理

注意:以下分析都是基于Dubbo2.7.2

报文结构

1
2
3
4
  +-----------+----------+------------+--------+------------+---------+      
  | MagicH(2B)| FLAG(1B) | STATUS(1B) | ID(8B) | Length(4B) | Payload |
  | 0xDABB    | 0x80     |            |        | 0x00000005 | "HELLO" |
  +-----------+----------+------------+--------+------------+---------+

Dubbo报文头包含:MagicNumber(占2个字节),Flag(占1个字节),Status(占1个字节),ID(占8个字节),报文体(占4个字节),总共16个字节。

  • Flag (高3位用来存储Flag类型,低5位用来存储序列化协议)
    • FLAG_REQUEST (0x80)
    • FLAG_TWOWAY (0x40)
    • FLAG_EVENT (0x20)
    • SERIALIZATION_MASK (0x1f)
  • Status
    • OK (20)
    • CLIENT_TIMEOUT (30)
    • SERVER_TIMEOUT (31)
    • CHANNEL_INACTIVE (35)
    • BAD_REQUEST (40)
    • BAD_RESPONSE (50)
    • SERVICE_NOT_FOUND (60)
    • SERVICE_ERROR (70)
    • SERVER_ERROR (80)
    • CLIENT_ERROR (90)
    • SERVER_THREADPOOL_EXHAUSTED_ERROR (100)

注意

  • payload长度不能超过8M(默认,可配置),超出则抛出异常ExceedPayloadLimitException

报文编码解码器

默认情况下,Dubbo会采用DubboCountCodec作为报文编解码器。

报文解码时序图如下:

解码时序图

报文的编解码器类图如下:

DubboCountCodec类图

处理粘包半包过程

我们都知道在处理TCP请求中,都会碰到半包和粘包的情况。由于对端如果是没有关闭Nagle算法优化,那很有可能在发送的时候几个数据报文就是合并在一起的;亦或是我端在接收请求不及时,也会造成缓冲区堆积多个数据报文,这些原因都有可能造成粘包问题。所以我们来看看Dubbo是怎么处理粘包问题的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package org.apache.dubbo.rpc.protocol.dubbo;

public final class DubboCountCodec implements Codec2 {
    private DubboCodec codec = new DubboCodec();
    ...
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            //循环解析缓冲区的报文,因为有可能有粘包
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                //读到一个半包,把缓冲区的读位置设置成上一次的完整包读位置,并且返回
                buffer.readerIndex(save); 
                break;
            } else {
                //读到一个完整包
                result.addMessage(obj);
                //记录报文长度
                logMessageLength(obj, buffer.readerIndex() - save);
                //记录上一次的完整包读位置
                save = buffer.readerIndex();
            }
        } while (true);
        ...
        return result;
    }
    ...
}

判断半包过程

半包无非以下两种情况:

  • 报文头不全
  • 报文体不全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.apache.dubbo.remoting.exchange.codec;

public class ExchangeCodec extends TelnetCodec {
    ...
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        //做这个判断是因为0xDABB是两个字节,有一种可能是缓冲区只有一个可读字节是0xDA,这时候不能把它当做其他的数据报文处理
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            ...
            //如果不是魔数开头,有可能是TELNET数据报文
            return super.decode(channel, buffer, readable, header);
        }
        // 开始解析Dubbo报文
        // 如果剩余可读长度小于包头长度,则判断为半包
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        // 如果剩余可读长度小于包头长度+包体长度,则判断为半包
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            //解析一个完整包
            return decodeBody(channel, is, header);
        } finally {
            ...
        }
    }
    ...
}

解析完整的Dubbo报文

根据对端发过来的序列化协议,我们可以把报文体正确的反序列化出来对应的消息。dubbo目前支持很多序列化方式,比如(fastjson, fst, hessian2, jdk, kyro, protostuff, protobuf-json, avro, gson),其中后面3种是2.7.2中加入的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package org.apache.dubbo.rpc.protocol.dubbo;

public class DubboCodec extends ExchangeCodec {
    ...
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // 报文ID
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // 解码Response
            Response res = new Response(id);
            ...
            try {
                // 反序列报文体(payload)。(默认采用hessian2)
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                if (status == Response.OK) {
                    Object data;
                    if (res.isHeartbeat()) {
                        //心跳的解析
                        data = decodeHeartbeatData(channel, in);
                    } else if (res.isEvent()) {
                        //事件的解析
                        data = decodeEventData(channel, in);
                    } else {
                        //RPC调用的解析
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            //直接在IO线程解析
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            //解析包括:dubbo版本号,接口版本号,接口名,接口方法名等等
                            result.decode();
                        } else {
                            //在业务线程解析
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } else {
                    res.setErrorMessage(in.readUTF());
                }
            } catch (Throwable t) {
                ...
            }
            return res;
        } else {
            // 解码Request,步骤同上
            Request req = new Request(id);
            ...
            return req;
        }
    }
    ...
}