RocketMQ的编解码技术细节
首页 2021欧洲杯买球app手机版 2021欧洲杯买球app首页
  • 首页
  • 2021欧洲杯买球app手机版
  • 2021欧洲杯买球app首页
  • RocketMQ的编解码技术细节
    发布者:admin浏览次数:

     

    本文转载自微信公众号「漫漫技术路」,作者刘莅。转载本文请联系漫漫技术路公众号。

    从上一篇文章中,我们了解的RocketMQ不同组件之间,数据是如何通过网络传输的,总结以下几点

    RocketMQ的网络传输模块,在remoting子模块下,入口可以参考RemotingNettyServer和RemotingNettyClient两个类。 RocketMQ是依靠Netty,与各个组件进行数据传输。 RocketMQ序列化、反序列化有两种方式:一种是将数据通过FastJSON将数据转化成JSON字符串,然后转化成byte[]数组进行编解码。另一种方式是RocketMQ定义了一套自己的编解码,将每个字段分别进行编解码。编码不论采用那种方式,都最终都编码为byte[]。

    今天,我们分析RocketMQ的编解码细节。在本篇文章中,可以学到:

    RocketMQ网络协议。 RocketMQ在传输数据时,内存的分配。 RocketMQ编解码细节。 编码流程

    首先,我们编码器org.apache.rocketmq.remoting.netty.NettyEncoder入手

    @ChannelHandler.Sharable public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);      @Override     public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)         throws Exception {         try {             ByteBuffer header = remotingCommand.encodeHeader();             out.writeBytes(header);             byte[] body = remotingCommand.getBody();             if (body != null) {                 out.writeBytes(body);             }         } catch (Exception e) {             log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);             if (remotingCommand != null) {                 log.error(remotingCommand.toString());             }             RemotingUtil.closeChannel(ctx.channel());         }     } } 

    从上面的代码,是RocketMQ将RemotingCommand对象编码成ByteBuf的唯一入口。

    我们可以看到,先将header部分编码成ByteBuf,然后将body部分追加到ByteBuf里。

    body部分编码很容易理解,那么header部分是怎么编码的呢?

    public ByteBuffer encodeHeader() {     return encodeHeader(this.body != null ? this.body.length : 0); }  public ByteBuffer encodeHeader(final int bodyLength) {     // 1> header length size     int length = 4;      // 2> header data length     byte[] headerData;     headerData = this.headerEncode();      length += headerData.length;      // 3> body data length     length += bodyLength;      ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);      // length     result.putInt(length);      // header length     result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));      // header data     result.put(headerData);      result.flip();      return result; } 

    我们一步一步看,逻辑如下

    定义变量length,其初始值是4,经过几步操作,其值是4+header.length+body.length。 调用headerEnocde()方法编码header部分。 创建ByteBuffer并申请4+length-body.length大小的内存,其实就是4+4+header.length大小的内存。 将数据写入ByteBuffer,完成header部分的编码。

    我们画一张图来表示当前内存都存的什么数据。

    我们可以很清楚的看到,每部分数据,分别存储在ButyBuf的什么位置,这里需要特别强调的是,byte[4]部分存储的什么,通过源码,进一步分析。

    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));   public static byte[] markProtocolType(int source, SerializeType type) {     byte[] result = new byte[4];       result[0] = type.getCode();     result[1] = (byte) ((source >> 16) & 0xFF);     result[2] = (byte) ((source >> 8) & 0xFF);     result[3] = (byte) (source & 0xFF);     return result; }   public enum SerializeType {     JSON((byte) 0),     ROCKETMQ((byte) 1); } 

    markProtocolType方法,第一个参数source表示header部分的长度,第二个参数type是编码类型的枚举,返回值是byte[]。

    那么markProtocolType方法是做什么的呢?我们将type字段传入JSON和ROCKETMQ这两个枚举值,分别看一下,返回的是什么。

    返回值共四个字节,只有第一个字节不同,第四个字节是header部分的长度,前文已经提到过,RocketMQ对header部分,可以采用两种编解码方式。

    对!!没错,第一个字节就是标识编解码类型的。

    解码流程

    接下来我们来看解码,解码与编码稍有不同,解码器继承Netty提供的LengthFieldBasedFrameDecoder解码器,我们来看org.apache.rocketmq.remoting.netty.NettyDecoder的源码。

    public class NettyDecoder extends LengthFieldBasedFrameDecoder {     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);      private static final int FRAME_MAX_LENGTH =         Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));      public NettyDecoder() {         super(FRAME_MAX_LENGTH, 0, 4, 0, 4);     }      @Override     public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {         ByteBuf frame = null;         try {             frame = (ByteBuf) super.decode(ctx, in);             if (null == frame) {                 return null;             }              ByteBuffer byteBuffer = frame.nioBuffer();              return RemotingCommand.decode(byteBuffer);         } catch (Exception e) {             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);             RemotingUtil.closeChannel(ctx.channel());         } finally {             if (null != frame) {                 frame.release();             }         }          return null;     } } 

    其步骤如下:

    调用LengthFieldBasedFrameDecoder的decode方法,初次解码。 调用RemotingCommand.decode()方法,完成对header、body部分的解码,并转化为RemotingCommand对象。

    为什么要经过两次解码?

    熟悉LengthFieldBasedFrameDecoder解码器的朋友都知道,LengthFieldBasedFrameDecoder解码器是Netty提供的一种非常灵活的解码器。

    它在RocketMQ的NettyDecoder类中是这样被构造的。

    public NettyDecoder() {     super(FRAME_MAX_LENGTH, 0, 4, 0, 4); } 

    LengthFieldBasedFrameDecoder我在这里不详细解释,按上面的构造方法,意思是跳过开头前4个字节。

    构造出来LengthFieldBasedFrameDecoder后对RocketMQ协议进行初次解码,解码结果如下:

    我们可以看到,把前四个字节,也就是把存储length字段的那部分内存截去了,只剩byte[]+header+body部分。

    我们再来看RemotingCommand的解码逻辑

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {     int length = byteBuffer.limit();     int oriHeaderLen = byteBuffer.getInt();     int headerLength = getHeaderLength(oriHeaderLen);      byte[] headerData = new byte[headerLength];     byteBuffer.get(headerData);      RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));      int bodyLength = length - 4 - headerLength;     byte[] bodyData = null;     if (bodyLength > 0) {         bodyData = new byte[bodyLength];         byteBuffer.get(bodyData);     }     cmd.body = bodyData;      return cmd; }  private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {     switch (type) {         case JSON:             RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);             resultJson.setSerializeTypeCurrentRPC(type);             return resultJson;         case ROCKETMQ:             RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);             resultRMQ.setSerializeTypeCurrentRPC(type);             return resultRMQ;         default:             break;     }     return null; } 

    只剩byte[4]+header+body三个部分,解码逻辑便很清晰。可以分为以下几个步骤

    求出数据的长度、header的长度。 根据编码类型,解码。比如通过JSON方式编码,则通过JSON方式解码。 算出body的长度,解码body。 将最终解码生成的RemotingCommand对象,发送给pipeline的下一个handler处理。

    【编辑推荐】

    鸿蒙官方战略合作共建——HarmonyOS技术社区 重磅 |《数据安全法》正式颁布,要求对数据实行分类分级保护 数据的“敏捷制造”,DataWorks一站式数据开发治理范式演进 网络安全攻防:大数据安全问题 《数据安全法》正式公布!如何做好风险管理,保护企业敏感数据? 20款优秀的数据可视化工具 (建议收藏)