How HttpServerCodec and HttpObjectAggregator Parse the HTTP Protocol

1. Starting with HttpServerCodec

HttpServerCodec extends ChannelHandlerAppender and creates an HttpRequestDecoder and an HttpResponseEncoder.

Internally, the appender keeps a list of Entry objects that holds these two codecs in order.

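PS: When the channel's handlers are initialized, the handler that gets added is the HttpServerCodec itself. Looking at the code of addLast, I could not find where the decoder and the encoder are added to the pipeline. Perhaps, during processing, there is a check for whether a handler is an appender and, if so, the handlers in its List<Entry> are invoked one by one?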
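One possible answer to this question, offered as an assumption rather than a reading of the actual source: an appender could expand its entries into the pipeline at the moment it is added, so addLast itself needs no appender-specific code. The sketch below models that idea with a toy pipeline. `ToyHandler`, `NamedHandler`, `ToyAppender` and `ToyPipeline` are hypothetical names, not Netty classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a channel handler; not a Netty type.
interface ToyHandler {
    String name();

    // Callback invoked by the pipeline right after the handler was added.
    default void handlerAdded(ToyPipeline pipeline) { }
}

final class NamedHandler implements ToyHandler {
    private final String name;

    NamedHandler(String name) { this.name = name; }

    @Override public String name() { return name; }
}

// Holds its child handlers in insertion order, like the Entry list described above.
final class ToyAppender implements ToyHandler {
    private final List<ToyHandler> entries;

    ToyAppender(ToyHandler... handlers) {
        this.entries = new ArrayList<>(Arrays.asList(handlers));
    }

    @Override public String name() { return "appender"; }

    // Assumed behaviour: once added, the appender swaps itself for its entries.
    @Override public void handlerAdded(ToyPipeline pipeline) {
        pipeline.replaceWith(this, entries);
    }
}

final class ToyPipeline {
    private final List<ToyHandler> handlers = new ArrayList<>();

    // Knows nothing about appenders; it only fires the callback.
    void addLast(ToyHandler handler) {
        handlers.add(handler);
        handler.handlerAdded(this);
    }

    void replaceWith(ToyHandler old, List<ToyHandler> replacements) {
        int index = handlers.indexOf(old);
        handlers.remove(index);
        handlers.addAll(index, replacements);
    }

    List<String> names() {
        List<String> names = new ArrayList<>();
        for (ToyHandler handler : handlers) {
            names.add(handler.name());
        }
        return names;
    }
}
```

With this design the decoder and encoder end up as ordinary pipeline members, which would explain why addLast shows no special handling. Whether the real appender works this way has to be checked against its source.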

I will leave that question open for now and look at the HttpRequestDecoder code first.

 

2. How HttpRequestDecoder processes a request

An HTTP request follows a fixed format: a request line, then the header lines, then a blank line, then an optional body.

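For a TCP-based protocol, the byte stream received from the lower layer has to be parsed according to the format of the upper-layer protocol.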
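To make "parsing the byte stream according to the protocol format" concrete, here is a minimal sketch that splits the bytes of one complete request into request line, headers and body. It assumes the whole request is already in memory and is pure ASCII, which a real decoder cannot assume. `RawHttpRequest` is a hypothetical class, not part of Netty.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for the parts of one complete HTTP request.
final class RawHttpRequest {
    final String method;
    final String uri;
    final String version;
    final Map<String, String> headers = new LinkedHashMap<>();
    final String body;

    RawHttpRequest(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.US_ASCII);

        // A blank line (CRLF CRLF) separates the head from the body.
        int headEnd = text.indexOf("\r\n\r\n");
        if (headEnd < 0) {
            throw new IllegalArgumentException("incomplete request head");
        }
        String[] lines = text.substring(0, headEnd).split("\r\n");

        // Request line: METHOD SP URI SP VERSION
        String[] requestLine = lines[0].split(" ", 3);
        if (requestLine.length != 3) {
            throw new IllegalArgumentException("malformed request line: " + lines[0]);
        }
        method = requestLine[0];
        uri = requestLine[1];
        version = requestLine[2];

        // Header lines: NAME ":" VALUE
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("malformed header: " + lines[i]);
            }
            headers.put(lines[i].substring(0, colon).trim(),
                        lines[i].substring(colon + 1).trim());
        }
        body = text.substring(headEnd + 4);
    }
}
```

A streaming decoder has to do the same work incrementally, because TCP may deliver the request in arbitrary fragments.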

Next, look at the class hierarchy of HttpRequestDecoder; the subsections below walk through its classes in turn.

2.1 The HttpRequestDecoder class

This class is fairly simple: it calls the superclass constructor and adds a few methods of its own.

2.2 The HttpObjectDecoder class

This class is more complex. Start with the explanation in the official API documentation:

public abstract class HttpObjectDecoder extends ByteToMessageDecoder

Decodes ByteBufs into HttpMessages and HttpContents.

Note: it decodes ByteBufs into HttpMessage and HttpContent objects.

Parameters that prevents excessive memory consumption

Note: these parameters guard against excessive memory consumption.

Name: Meaning

maxInitialLineLength: The maximum length of the initial line (e.g. "GET / HTTP/1.0" or "HTTP/1.0 200 OK"). If the length of the initial line exceeds this value, a TooLongFrameException will be raised.

maxHeaderSize: The maximum length of all headers. If the sum of the length of each header exceeds this value, a TooLongFrameException will be raised.

maxChunkSize: The maximum length of the content or each chunk. If the content length (or the length of each chunk) exceeds this value, the content or chunk will be split into multiple HttpContents whose length is maxChunkSize at maximum.

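Note: maxInitialLineLength is the maximum length of the initial line, for example "GET / HTTP/1.0" or "HTTP/1.0 200 OK".

Note: maxHeaderSize is the maximum combined length of all headers.

Note: maxChunkSize is the maximum length of the content or of a single chunk.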
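The three limits behave differently: exceeding the first two is an error, while exceeding maxChunkSize only causes the content to be split. A minimal sketch of that splitting rule, using plain byte arrays instead of ByteBuf; `ChunkSplitter` is a hypothetical helper, not Netty code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the maxChunkSize rule.
final class ChunkSplitter {

    // Splits content into consecutive pieces of at most maxChunkSize bytes each.
    static List<byte[]> split(byte[] content, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> pieces = new ArrayList<>();
        for (int offset = 0; offset < content.length; offset += maxChunkSize) {
            int end = Math.min(content.length, offset + maxChunkSize);
            pieces.add(Arrays.copyOfRange(content, offset, end));
        }
        return pieces;
    }
}
```

For example, 10 bytes of content with a limit of 4 come out as pieces of 4, 4 and 2 bytes, so no single piece ever needs more than maxChunkSize bytes of memory.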

Chunked Content

If the content of an HTTP message is greater than maxChunkSize or the transfer encoding of the HTTP message is 'chunked', this decoder generates one HttpMessage instance and its following HttpContents per single HTTP message to avoid excessive memory consumption.

Note: in other words, when the content is larger than maxChunkSize or the message uses chunked transfer encoding, the decoder emits one HttpMessage that may be followed by several HttpContents, so that the whole body never has to be held in memory at once.

For example, the following HTTP message:

 GET / HTTP/1.1
 Transfer-Encoding: chunked

 1a
 abcdefghijklmnopqrstuvwxyz
 10
 1234567890abcdef
 0
 Content-MD5: ...
 [blank line]

triggers HttpRequestDecoder to generate 3 objects (one HttpRequest followed by two content objects):

  1. An HttpRequest,
  2. The first HttpContent whose content is 'abcdefghijklmnopqrstuvwxyz',
  3. The second LastHttpContent whose content is '1234567890abcdef', which marks the end of the content.

Note: the LastHttpContent marks the end of the content.
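The chunk boundaries in the example come from the chunked transfer coding: each chunk starts with its size in hexadecimal (1a is 26, 10 is 16) and a size of 0 ends the body. The sketch below finds those boundaries for a body that is already complete and pure ASCII, both simplifying assumptions. `ChunkedBodySketch` is a hypothetical class and does not reproduce how HttpObjectDecoder does it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of chunked transfer coding; not Netty code.
final class ChunkedBodySketch {

    // Returns the payload of every chunk; trailing headers after the
    // zero-size chunk are ignored.
    static List<String> decode(String body) {
        List<String> chunks = new ArrayList<>();
        int pos = 0;
        while (true) {
            int lineEnd = body.indexOf("\r\n", pos);
            if (lineEnd < 0) {
                throw new IllegalArgumentException("missing chunk-size line");
            }
            // The size is hexadecimal; anything after ';' is a chunk extension.
            String sizeLine = body.substring(pos, lineEnd);
            int semicolon = sizeLine.indexOf(';');
            if (semicolon >= 0) {
                sizeLine = sizeLine.substring(0, semicolon);
            }
            int size = Integer.parseInt(sizeLine.trim(), 16);
            pos = lineEnd + 2;
            if (size == 0) {
                return chunks; // last chunk reached
            }
            if (pos + size + 2 > body.length()) {
                throw new IllegalArgumentException("truncated chunk");
            }
            chunks.add(body.substring(pos, pos + size));
            pos += size + 2; // skip the data and the CRLF that follows it
        }
    }
}
```

Applied to the body of the example message, this yields the two payloads 'abcdefghijklmnopqrstuvwxyz' and '1234567890abcdef'.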

If you prefer not to handle HttpContents by yourself for your convenience, insert HttpObjectAggregator after this decoder in the ChannelPipeline. However, please note that your server might not be as memory efficient as without the aggregator.

Note: if you do not want to deal with the individual HttpContents, put an HttpObjectAggregator after this decoder in the pipeline; the price is that the server may use memory less efficiently than without the aggregator.
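The memory trade-off is easy to see in a sketch: an aggregator has to buffer every content piece until the last one arrives. The class below is a simplified model under that assumption, with a size cap playing the role of a maximum content length. `AggregatorSketch` and its `offer` method are hypothetical and are not the HttpObjectAggregator implementation.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical model of content aggregation; not Netty code.
final class AggregatorSketch {
    private final int maxContentLength;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    AggregatorSketch(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }

    // Accepts one content piece. Returns the complete body when the piece is
    // the last one, otherwise null because the message is not finished yet.
    byte[] offer(byte[] content, boolean last) {
        if (buffer.size() + content.length > maxContentLength) {
            buffer.reset();
            throw new IllegalStateException(
                    "content exceeds " + maxContentLength + " bytes");
        }
        // Everything received so far stays in memory until the message ends.
        buffer.write(content, 0, content.length);
        if (!last) {
            return null;
        }
        byte[] full = buffer.toByteArray();
        buffer.reset();
        return full;
    }
}
```

The handler after the aggregator sees one complete message, which is convenient, but the buffer grows with the body size instead of staying bounded by maxChunkSize.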

Extensibility

Please note that this decoder is designed to be extended to implement a protocol derived from HTTP, such as RTSP and ICAP. To implement the decoder of such a derived protocol, extend this class and implement all abstract methods properly.

Note: the decoder is meant as a base for HTTP-derived protocols such as RTSP and ICAP; to write a decoder for one of them, extend this class and implement its abstract methods.

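From this reading of the API documentation, this class appears to be where the HTTP protocol is actually parsed, so it is the main focus here; its parent classes are covered afterwards.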

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out)

    Decode from one ByteBuf to another.

protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

    Is called one last time when the ChannelHandlerContext goes in-active.

The method summary lists decode and decodeLast, which are presumably the main methods for parsing the ByteBuf.

 

2.3 The ReplayingDecoder class

 

Below is the Javadoc comment from the source code:

/**

 * A specialized variation of {@link ByteToMessageDecoder} which enables implementation

 * of a non-blocking decoder in the blocking I/O paradigm.

Note: a specialized variant of ByteToMessageDecoder that lets you write a non-blocking decoder in the style of blocking I/O.

 * <p>

Note: the key difference is that ReplayingDecoder lets you implement decode() and decodeLast() as if all required bytes had already been received, instead of checking whether the required bytes are available.

 * The biggest difference between {@link ReplayingDecoder} and

 * {@link ByteToMessageDecoder} is that {@link ReplayingDecoder} allows you to

 * implement the {@code decode()} and {@code decodeLast()} methods just like

 * all required bytes were received already, rather than checking the

 * availability of the required bytes.  For example, the following

 * {@link ByteToMessageDecoder} implementation:

 * <pre>

 * public class IntegerHeaderFrameDecoder extends {@link ByteToMessageDecoder} {

 *

 *   {@code @Override}

 *   protected void decode({@link ChannelHandlerContext} ctx,

 *                           {@link ByteBuf} in, List&lt;Object&gt; out) throws Exception {

 *

// Note: fewer than 4 readable bytes, so return and wait for more data

 *     if (in.readableBytes() &lt; 4) {

 *        return;

 *     }

 *

 *     in.markReaderIndex();

 *     int length = in.readInt();

 *

// Note: fewer than length bytes remain, so reset the readerIndex and return

 *     if (in.readableBytes() &lt; length) {

 *        in.resetReaderIndex();

 *        return;

 *     }

 *

 *     out.add(in.readBytes(length));

 *   }

 * }

 * </pre>

 * is simplified like the following with {@link ReplayingDecoder}:

Note: with ReplayingDecoder, the code above reduces to the following.

 * <pre>

 * public class IntegerHeaderFrameDecoder

 *      extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {

 *

 *   protected void decode({@link ChannelHandlerContext} ctx,

 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {

 *

 *     out.add(buf.readBytes(buf.readInt()));

 *   }

 * }

 * </pre>

 *

 * <h3>How does this work?</h3>

Note: how does it achieve this, and how is it designed?

 * <p>

 * {@link ReplayingDecoder} passes a specialized {@link ByteBuf}

 * implementation which throws an {@link Error} of certain type when there's not

 * enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,

 * you just assumed that there will be 4 or more bytes in the buffer when

 * you call {@code buf.readInt()}.  If there's really 4 bytes in the buffer,

 * it will return the integer header as you expected.  Otherwise, the

 * {@link Error} will be raised and the control will be returned to

 * {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the

 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer

 * back to the 'initial' position (i.e. the beginning of the buffer) and call

 * the {@code decode(..)} method again when more data is received into the

 * buffer.

Note: ReplayingDecoder hands decode() a special ByteBuf implementation that throws an Error when the buffer does not hold enough data. In the IntegerHeaderFrameDecoder above, buf.readInt() simply assumes that at least 4 bytes are available: if they are, it returns the expected integer; otherwise the Error is thrown. When ReplayingDecoder catches that Error, it rewinds readerIndex to the 'initial' position (i.e. the beginning of the buffer) and calls decode again once more data has arrived.

 * <p>

 * Please note that {@link ReplayingDecoder} always throws the same cached

 * {@link Error} instance to avoid the overhead of creating a new {@link Error}

 * and filling its stack trace for every throw.

 *

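Note: ReplayingDecoder always throws the same cached Error instance, which avoids the cost of creating a new Error and filling in its stack trace on every throw.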

 * <h3>Limitations</h3>

 * <p>

 * At the cost of the simplicity, {@link ReplayingDecoder} enforces you a few

 * limitations:

Note: this simplicity comes at a price; ReplayingDecoder imposes a few limitations.

 * <ul>

 * <li>Some buffer operations are prohibited.</li>

Note: some buffer operations are prohibited.

 * <li>Performance can be worse if the network is slow and the message

 *     format is complicated unlike the example above.  In this case, your

 *     decoder might have to decode the same part of the message over and over

 *     again.</li>

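Note: performance can suffer when the network is slow and the message format is complex, because the decoder may have to decode the same part of a message over and over.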

 * <li>You must keep in mind that {@code decode(..)} method can be called many

 *     times to decode a single message.  For example, the following code will

 *     not work:

Note: keep in mind that decode(..) can be called many times to decode a single message, so the following code does not work.

 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {

 *

 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();

 *

 *   {@code @Override}

 *   public void decode(.., {@link ByteBuf} buffer, List&lt;Object&gt; out) throws Exception {

 *

 *     // A message contains 2 integers.

 *     values.offer(buffer.readInt());

 *     values.offer(buffer.readInt());

 *

 *     // This assertion will fail intermittently since values.offer()

 *     // can be called more than two times!

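// Note: the assertion fails intermittently because values.offer() can run more than twice; the values read during an attempt that ended in a replay are never removed from the queue.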

 *     assert values.size() == 2;

 *     out.add(values.poll() + values.poll());

 *   }

 * }</pre>

 *      The correct implementation looks like the following, and you can also

 *      utilize the 'checkpoint' feature which is explained in detail in the

 *      next section.

Note: the correct implementation is shown below; you can also use the checkpoint feature, which the next section explains in detail.

 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {

 *

 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();

 *

 *   {@code @Override}

 *   public void decode(.., {@link ByteBuf} buffer, List&lt;Object&gt; out) throws Exception {

 *

 *     // Revert the state of the variable that might have been changed

 *     // since the last partial decode.

 *     values.clear();

 *

 *     // A message contains 2 integers.

 *     values.offer(buffer.readInt());

 *     values.offer(buffer.readInt());

 *

 *     // Now we know this assertion will never fail.

 *     assert values.size() == 2;

 *     out.add(values.poll() + values.poll());

 *   }

 * }</pre>

 *     </li>

 * </ul>

 *

 * <h3>Improving the performance</h3>

 * <p>

 * Fortunately, the performance of a complex decoder implementation can be

 * improved significantly with the {@code checkpoint()} method.  The

 * {@code checkpoint()} method updates the 'initial' position of the buffer so

 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer

 * to the last position where you called the {@code checkpoint()} method.

 *

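Note: checkpoint() can significantly improve the performance of a complex decoder. It updates the 'initial' position of the buffer, so on a replay the readerIndex is rewound to the position where checkpoint() was last called instead of to the beginning of the buffer.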

 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>

 * <p>

 * Although you can just use {@code checkpoint()} method and manage the state

 * of the decoder by yourself, the easiest way to manage the state of the

 * decoder is to create an {@link Enum} type which represents the current state

 * of the decoder and to call {@code checkpoint(T)} method whenever the state

 * changes.  You can have as many states as you want depending on the

 * complexity of the message you want to decode:

 *

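Note: although you can call checkpoint() and manage the decoder state yourself, the easiest approach is to define an Enum type that represents the current state of the decoder and to call checkpoint(T) whenever the state changes. You can have as many states as the complexity of the message requires.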

 * <pre>

 * public enum MyDecoderState {

 *   READ_LENGTH,

 *   READ_CONTENT;

 * }

 *

 * public class IntegerHeaderFrameDecoder

 *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {

 *

 *   private int length;

 *

 *   public IntegerHeaderFrameDecoder() {

 *     // Set the initial state.

 *     <strong>super(MyDecoderState.READ_LENGTH);</strong>

 *   }

 *

 *   {@code @Override}

 *   protected void decode({@link ChannelHandlerContext} ctx,

 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {

 *     switch (state()) {

 *     case READ_LENGTH:

 *       length = buf.readInt();

 *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>

 *     case READ_CONTENT:

 *       ByteBuf frame = buf.readBytes(length);

 *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>

 *       out.add(frame);

 *       break;

 *     default:

 *       throw new Error("Shouldn't reach here.");

 *     }

 *   }

 * }

 * </pre>

 *

 * <h4>Calling {@code checkpoint()} with no parameter</h4>

Note: calling checkpoint() without an argument.

 * <p>

 * An alternative way to manage the decoder state is to manage it by yourself.

 * <pre>

 * public class IntegerHeaderFrameDecoder

 *      extends {@link ReplayingDecoder}&lt;<strong>{@link Void}</strong>&gt; {

 *

 *   <strong>private boolean readLength;</strong>

 *   private int length;

 *

 *   {@code @Override}

 *   protected void decode({@link ChannelHandlerContext} ctx,

 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {

 *     if (!readLength) {

 *       length = buf.readInt();

 *       <strong>readLength = true;</strong>

 *       <strong>checkpoint();</strong>

 *     }

 *

 *     if (readLength) {

 *       ByteBuf frame = buf.readBytes(length);

 *       <strong>readLength = false;</strong>

 *       <strong>checkpoint();</strong>

 *       out.add(frame);

 *     }

 *   }

 * }

 * </pre>

 *

 * <h3>Replacing a decoder with another decoder in a pipeline</h3>

Note: replacing one decoder in the pipeline with another.

 * <p>

 * If you are going to write a protocol multiplexer, you will probably want to

 * replace a {@link ReplayingDecoder} (protocol detector) with another

 * {@link ReplayingDecoder}, {@link ByteToMessageDecoder} or {@link MessageToMessageDecoder}

 * (actual protocol decoder).

 * It is not possible to achieve this simply by calling

 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but

 * some additional steps are required:

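Note: if you are writing a protocol multiplexer, you will probably want to replace one decoder with another. Calling ChannelPipeline.replace alone is not enough; some additional steps are required.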

 * <pre>

 * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {

 *

 *     {@code @Override}

 *     protected void decode({@link ChannelHandlerContext} ctx,

 *                             {@link ByteBuf} buf, List&lt;Object&gt; out) {

 *         ...

 *         // Decode the first message

 *         Object firstMessage = ...;

 *

 *         // Add the second decoder

 *         ctx.pipeline().addLast("second", new SecondDecoder());

 *

 *         if (buf.isReadable()) {

 *             // Hand off the remaining data to the second decoder

 *             out.add(firstMessage);

 *             out.add(buf.readBytes(<b>super.actualReadableBytes()</b>));

 *         } else {

 *             // Nothing to hand off

 *             out.add(firstMessage);

 *         }

 *         // Remove the first decoder (me)

 *         ctx.pipeline().remove(this);

 *     }

 * </pre>

 * @param <S>

 *        the state type which is usually an {@link Enum}; use {@link Void} if state management is

 *        unused

 */

Note: the state type S is usually an Enum; use Void as the type parameter when state management is not used.
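The replay mechanism described in the comment can be modelled without Netty. The sketch below is a simplified illustration, not the ReplayingDecoder implementation: read methods throw one cached, stack-trace-free Error when data is missing, and the driver rewinds the reader index and retries when more bytes arrive. `ReplaySketch`, `Replay` and `feed` are hypothetical names, and the frame format (4-byte length followed by the payload) is the one from the IntegerHeaderFrameDecoder example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the replay idea; not Netty code.
final class ReplaySketch {

    // A single shared signal; the stack trace is disabled so throwing is cheap.
    static final class Replay extends Error {
        static final Replay INSTANCE = new Replay();

        private Replay() {
            super(null, null, false, false);
        }
    }

    private byte[] cumulation = new byte[0];
    private int readerIndex;

    private int readInt() {
        if (cumulation.length - readerIndex < 4) {
            throw Replay.INSTANCE;
        }
        int value = ByteBuffer.wrap(cumulation, readerIndex, 4).getInt();
        readerIndex += 4;
        return value;
    }

    private byte[] readBytes(int length) {
        if (cumulation.length - readerIndex < length) {
            throw Replay.INSTANCE;
        }
        byte[] bytes = Arrays.copyOfRange(cumulation, readerIndex, readerIndex + length);
        readerIndex += length;
        return bytes;
    }

    // Written as if all bytes were already there, like the simplified decoder.
    private byte[] decode() {
        return readBytes(readInt());
    }

    // Appends newly received bytes and returns every frame that is now complete.
    List<byte[]> feed(byte[] received) {
        byte[] merged = Arrays.copyOf(cumulation, cumulation.length + received.length);
        System.arraycopy(received, 0, merged, cumulation.length, received.length);
        cumulation = merged;

        List<byte[]> frames = new ArrayList<>();
        while (true) {
            int start = readerIndex; // position to rewind to on a replay
            try {
                frames.add(decode());
            } catch (Replay signal) {
                readerIndex = start; // not enough data: rewind and wait
                break;
            }
        }
        // Drop the bytes that were fully consumed.
        cumulation = Arrays.copyOfRange(cumulation, readerIndex, cumulation.length);
        readerIndex = 0;
        return frames;
    }
}
```

Note how a frame that arrives in three fragments causes the length to be read three times. That repeated work is exactly what the checkpoint mechanism in the comment is meant to reduce.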

 

2.4 The ByteToMessageDecoder class

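This class extends ChannelHandlerAdapter. When channelRead is triggered, it appends the contents of the ByteBuf passed as the method argument to its internal cumulation field (a ByteBuf instance) and then calls callDecode.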
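The accumulate-then-decode flow can be sketched with java.nio.ByteBuffer standing in for ByteBuf. This is an illustration under that substitution, not the ByteToMessageDecoder source: `CumulationSketch` is a hypothetical class, its method names only echo the ones mentioned above, and the frame format is again a 4-byte length followed by the payload, checked explicitly in the non-replaying style.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of cumulation plus decoding; not Netty code.
final class CumulationSketch {
    // Kept in "write mode" between calls: position marks the end of unread data.
    private ByteBuffer cumulation = ByteBuffer.allocate(16);

    List<byte[]> channelRead(byte[] received) {
        // Grow the cumulation buffer when the new bytes do not fit.
        if (cumulation.remaining() < received.length) {
            int needed = cumulation.position() + received.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(cumulation.capacity() * 2, needed));
            cumulation.flip();
            bigger.put(cumulation);
            cumulation = bigger;
        }
        cumulation.put(received);

        List<byte[]> out = new ArrayList<>();
        cumulation.flip();    // switch to read mode
        callDecode(cumulation, out);
        cumulation.compact(); // keep unread bytes, switch back to write mode
        return out;
    }

    // Decodes as many complete frames as the buffer currently holds.
    private static void callDecode(ByteBuffer in, List<byte[]> out) {
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // incomplete frame: undo the length read and wait
                return;
            }
            byte[] frame = new byte[length];
            in.get(frame);
            out.add(frame);
        }
    }
}
```

Unread bytes survive between calls in the cumulation buffer, which is what allows a frame split across several reads to be decoded once its last fragment arrives.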

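In ReplayingDecoder, the callDecode method calls decode in a loop. If decode fails with the replay error, the decoder waits for more data to arrive and then runs decode again. Any other error is presumably either propagated or ignored.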

With the parent classes covered, return to the decode method of HttpObjectDecoder to see how it actually parses the HTTP protocol.

 

2.5 The HttpObjectDecoder.decode method

 

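I will not go through how each part of the HTTP protocol is parsed in detail. In essence, the method follows the HTTP format definition and uses a State to record which part is currently being read. decodeLast is invoked from channelInactive.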

@Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {

// state(): the class has an internal State enum that marks which part is currently being read; the initial value is SKIP_CONTROL_CHARS

        switch (state()) {

       ...........
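The state-driven approach can be illustrated with a small, self-contained parser for the head of a request. It is a sketch of the technique only, not the elided HttpObjectDecoder code: the state name SKIP_CONTROL_CHARS comes from the comment above, while `HttpHeadStateMachine`, the other state names and the character-by-character `feed` method are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical state machine for the request head; not Netty code.
final class HttpHeadStateMachine {
    enum State { SKIP_CONTROL_CHARS, READ_INITIAL, READ_HEADER, DONE }

    private State state = State.SKIP_CONTROL_CHARS;
    private final StringBuilder line = new StringBuilder();
    String initialLine;
    final List<String> headers = new ArrayList<>();

    State state() {
        return state;
    }

    // Accepts the next fragment of input; fragments may split lines anywhere.
    void feed(String data) {
        for (int i = 0; i < data.length() && state != State.DONE; i++) {
            char c = data.charAt(i);
            switch (state) {
            case SKIP_CONTROL_CHARS:
                if (Character.isISOControl(c) || Character.isWhitespace(c)) {
                    break; // ignore leading control characters and whitespace
                }
                state = State.READ_INITIAL;
                // fall through: this character starts the initial line
            case READ_INITIAL:
            case READ_HEADER:
                if (c == '\n') {
                    endOfLine();
                } else if (c != '\r') {
                    line.append(c);
                }
                break;
            default:
                throw new IllegalStateException("unexpected state " + state);
            }
        }
    }

    // Decides what a completed line means based on the current state.
    private void endOfLine() {
        String text = line.toString();
        line.setLength(0);
        if (state == State.READ_INITIAL) {
            initialLine = text;
            state = State.READ_HEADER;
        } else if (text.isEmpty()) {
            state = State.DONE; // blank line: end of the headers
        } else {
            headers.add(text);
        }
    }
}
```

Because the current state and the partial line are kept in fields, parsing can stop at any byte and resume with the next fragment, which is the property a decoder on top of TCP needs.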