<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0
     1.CumulativeProtocolDecoder
          A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
         即解決'
    粘包'->即一次接收數據不能完全體現一個完整的消息數據->通過應用層數據協議,如協議中通過4字節描述消息大小或以結束符.

    2.CumulativeProtocolDecoder#decode實現
    /**
        * 1.緩存decode中的IoBuffer in至session的attribute
        * 2.循環調用doDecode方法直到其返回false
        * 3.解碼結束后緩存的buffer->壓縮
       
    */

       
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
          
    // 判斷傳輸層是否存在消息分片,如果不分片則直接doDecode.(可參考TCP/IP詳解)
            if (!session.getTransportMetadata().hasFragmentation()) {
               
    while (in.hasRemaining()) {
                   
    if (!doDecode(session, in, out)) {
                       
    break;
                    }

                }


               
    return;
            }


           
    boolean usingSessionBuffer = true;
            IoBuffer buf
    = (IoBuffer) session.getAttribute(BUFFER);
           
    // 如果session中有BUFFER這個attribute則直接執行追加,否則直接用網絡層讀到的buffer
            if (buf != null) {
               
    boolean appended = false;
               
    // Make sure that the buffer is auto-expanded.
                if (buf.isAutoExpand()) {
                   
    try {
                        buf.put(in);
                        appended
    = true;
                    }
    catch (IllegalStateException e) {
                       
    // 可能調用了類似slice的方法,會使父緩沖區的自動擴展屬性失效(1.可參考AbstractIoBuffer#recapacityAllowed 2.可參考IoBuffer的實現)
                    }
    catch (IndexOutOfBoundsException e) {
                       
    // 取消了自動擴展屬性(可參考IoBuffer實現)
                    }

                }


               
    if (appended) {
       
    // 追加成功的話,直接flip
                    buf.flip();
                }
    else {
        
    // 因為用了派生的方法(父子緩沖區)如slice或取消了自動擴展而導致追加失敗->重新分配一個Buffer
                    buf.flip();
                    IoBuffer newBuf
    = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                    newBuf.order(buf.order());
                    newBuf.put(buf);
                    newBuf.put(in);
                    newBuf.flip();
                    buf
    = newBuf;

                   
    // 更新session屬性
                    session.setAttribute(BUFFER, buf);
                }

            }
    else {
       
    // 此else表示session無BUFFER屬性,直接賦值
                buf = in;
                usingSessionBuffer
    = false;
            }


           
    // 無限循環直到break 1.doDecode返回false 2.doDecode返回true且buf已無數據 3.異常
            for (;;) {
               
    int oldPos = buf.position();
               
    boolean decoded = doDecode(session, buf, out);
               
    if (decoded) {
                   
    if (buf.position() == oldPos) {
                       
    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                    }


                   
    if (!buf.hasRemaining()) {
                       
    break;
                    }

                }
    else {
                   
    break;
                }

            }


           
    // 如果經過decode,buffer依然有剩余數據則存儲到session->這樣下次decode的時候就可以從session取出buffer并執行追加了
            if (buf.hasRemaining()) {
               
    if (usingSessionBuffer && buf.isAutoExpand()) {
          
    // 壓縮
                    buf.compact();
                }
    else {
                    storeRemainingInSession(buf, session);
                }

            }
    else {
               
    if (usingSessionBuffer) {
                    removeSessionBuffer(session);
                }

            }

        }

                注.
                        1.doDecode在消息非完整的時候返回false. 
                        2.如果doDecode成功解碼出一條完整消息則返回true->如果此時buffer中依然有剩余數據則繼續執行for->doDecode->直到buffer中的數據不足以解碼出一條成功消息返回false.或者恰恰有n條完整的消息->從for跳出.

    3.CumulativeProtocolDecoder example
        /**
          * 解碼以CRLF(回車換行)作為結束符的消息
                 
    */

       
    public class CrLfTerminatedCommandLineDecoder
             
    extends CumulativeProtocolDecoder {

        
    private Command parseCommand(IoBuffer in) {
       
    // 實現將二進制byte[]轉為業務邏輯消息對象Command
          }


      
    // 只需實現doDecode方法即可
        protected boolean doDecode(
                 IoSession session, IoBuffer in, ProtocolDecoderOutput out)
                 
    throws Exception {

           
    // 初始位置
              int start = in.position();

             
    // 查找'\r\n'標記
              byte previous = 0;
            
    while (in.hasRemaining()) {
                
    byte current = in.get();

                 
    // 找到了\r\n
                  if (previous == '\r' && current == '\n') {
                    
    // Remember the current position and limit.
                      int position = in.position();
                   
    int limit = in.limit();
                   
    try {
                          in.position(start);
                        in.limit(position);
    //設置當前的位置為limit

              
    // position和limit之間是一個完整的CRLF消息
                        out.write(parseCommand(in.slice()));//調用slice方法獲得positon和limit之間的子緩沖區->調用write方法加入消息隊列(因為網絡層一個包可能有多個完整消息)->后經調用flush(遍歷消息隊列的消息)->nextFilter.messageReceived
    filter
                      }
    finally {
                        
    // 設置position為解碼后的position.limit設置為舊的limit
                         in.position(position);
                        in.limit(limit);
                      }


       
    // 直接返回true.因為在父類的decode方法中doDecode是循環執行的直到不再有完整的消息返回false.
                   return true;
               }


                previous
    = current;
             }


             
    // 沒有找到\r\n,則重置position并返回false.使得父類decode->for跳出break.
              in.position(start);

             
    return false;
          }

      }

     4.
    DemuxingProtocolDecoder
        
     1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
         2.這是一個復合的decoder->多路復用->找到一個合適的MessageDecoder.(不同的消息協議)

         3.其doDecode實現為迭代候選的MessageDecoder列表->調用MessageDecoder#decodable方法->如果解碼結果為MessageDecoderResult#NOT_OK,則從候選列表移除;如果解碼結果為MessageDecoderResult#NEED_DATA,則保留該候選decoder并在更多數據到達的時候會再次調用decodable;如果返回結果為MessageDecoderResult#OK,則表明找到了正確的decoder;如果沒有剩下任何的候選decoder,則拋出異常.

        4.doDecode源碼
          protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
                 
    // 從Session中獲取一個State.State包含一個MessageDecoder數組以及一個當前的decoder
                State state = getState(session);

              
    // 如果當前decoder為空
                if (state.currentDecoder == null) {
                    MessageDecoder[] decoders
    = state.decoders;
                   
    int undecodables = 0;
           
           
    // 遍歷decoder候選列表
                    for (int i = decoders.length - 1; i >= 0; i--) {
                        MessageDecoder decoder
    = decoders[i];
                       
    int limit = in.limit();
                       
    int pos = in.position();

                        MessageDecoderResult result;

                       
    try {
                    
    // 執行decodable方法并返回result(decodable方法是檢查特定的buffer是否可以decoder解碼)
                            result = decoder.decodable(session, in);
                        }
    finally {
                 
    // 一定要重置回舊的position和limit
                            in.position(pos);
                            in.limit(limit);
                        }


                       
    if (result == MessageDecoder.OK) {
                 
    // 如果返回結果為OK,則設置為state的當前decoder并break
                            state.currentDecoder = decoder;
                           
    break;
                        }
    else if (result == MessageDecoder.NOT_OK) {
                 
    // 如果返回結果為NOT_OK,則記錄undecodables數目++
                            undecodables++;
                        }
    else if (result != MessageDecoder.NEED_DATA) {
                  
    // 如果結果都不是,即也不是NEED_DATA,則直接拋出異常
                            throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
                        }

                    }


           
    // 如果沒有找到合適的decoder,則拋出異常
                    if (undecodables == decoders.length) {
                       
    // Throw an exception if all decoders cannot decode data.
                        String dump = in.getHexDump();
                        in.position(in.limit());
    // 跳過這段數據
                        ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
                        e.setHexdump(dump);
                       
    throw e;
                    }

           
           
    // 迭代結束,如果還沒有找到合適的decoder則表示可能需要更多的數據->所以返回false->跳出父類的for-dodecode循環
                    if (state.currentDecoder == null) {
                       
    // Decoder is not determined yet (i.e. we need more data)
                        return false;
                    }

                }


              
    // 這里表示已找到合適的decoder,調用decode方法進行解碼二進制或者特定的協議數據為更高業務層的消息對象
                try {
                    MessageDecoderResult result
    = state.currentDecoder.decode(session, in, out);
                   
    if (result == MessageDecoder.OK) {
              
    // 重置為null
                        state.currentDecoder = null;
                       
    return true;
                    }
    else if (result == MessageDecoder.NEED_DATA) {
                       
    return false;
                    }
    else if (result == MessageDecoder.NOT_OK) {
                        state.currentDecoder
    = null;
                       
    throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
                    }
    else {
                        state.currentDecoder
    = null;
                       
    throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
                    }

                }
    catch (Exception e) {
                    state.currentDecoder
    = null;
                   
    throw e;
                }

            }

    5.一個特定消息協議的編解碼的例子,{@link org.apache.mina.example.sumup}
        1.AbstractMessageEncoder
        /**
         * 1.編碼消息頭,消息體編碼由子類實現.
         * 2.AbstractMessage中只有一個sequence字段
        
    */

       
    public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T> {
          
    // 類型字段
            private final int type;

           
    protected AbstractMessageEncoder(int type) {
               
    this.type = type;
            }


           
    public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception {
                IoBuffer buf
    = IoBuffer.allocate(16);
                buf.setAutoExpand(
    true); // Enable auto-expand for easier encoding

               
    // 編碼消息頭
                buf.putShort((short) type);//type字段占2個字節(short)
                buf.putInt(message.getSequence());// sequence字段占4個字節(int)

               
    // 編碼消息體,由子類實現
                encodeBody(session, message, buf);
                buf.flip();
                out.write(buf);
            }


           
    // 子類實現編碼消息體
            protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
        }

        2.AbstractMessageDecoder
        /**
            * 解碼消息頭,消息體由子類實現解碼
           
    */

           
    public abstract class AbstractMessageDecoder implements MessageDecoder {
           
    private final int type;

           
    private int sequence;

           
    private boolean readHeader;

           
    protected AbstractMessageDecoder(int type) {
               
    this.type = type;
            }


           
    // 需覆寫decodable方法,檢查解碼結果
            public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
               
    // HEADER_LEN為type+sequence的長度,共占6個字節.如果此時buffer剩余數據不足header的長度,則返回NEED_DATA的result.
                if (in.remaining() < Constants.HEADER_LEN) {
                   
    return MessageDecoderResult.NEED_DATA;
                }


               
    // 第一個if判斷ok->讀取2字節(short),如果和type匹配則返回OK.
                if (type == in.getShort()) {
                   
    return MessageDecoderResult.OK;
                }


               
    // 兩個if判斷都不ok,則返回NOT_OK
                return MessageDecoderResult.NOT_OK;
            }

           
                    
    // 終極解碼
            public MessageDecoderResult decode(IoSession session, IoBuffer in,
                    ProtocolDecoderOutput out)
    throws Exception {
               
    // 如果header數據已ok且消息體數據不足則下次直接略過
                if (!readHeader) {
                    in.getShort();
    // Skip 'type'.
                    sequence = in.getInt(); // Get 'sequence'.
                    readHeader = true;
                }


               
    // 解碼消息體,如果數據不足以解析消息體,則返回null
                AbstractMessage m = decodeBody(session, in);
               
    // 消息數據體數據不足->返回NEED_DATA
                if (m == null) {
                   
    return MessageDecoderResult.NEED_DATA;
                }
    else {
                    readHeader
    = false; // 成功解碼出一條完成消息,則重置readHeader->下次繼續讀取header
                }

                m.setSequence(sequence);
                out.write(m);

               
    return MessageDecoderResult.OK;
            }


           
    /**
             * 數據完整不足以解析整個消息體則返回null
            
    */

           
    protected abstract AbstractMessage decodeBody(IoSession session,
                    IoBuffer in);
        }

        3.AddMessageEncoder
        /**
                               * 1.AddMessage的encoder.AddMessage繼承自AbstractMessage,又增加了一個字段value
                               * 2.該encoder的type為Constants.ADD,值為1
            
    */

           
    public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T> {
           
    public AddMessageEncoder() {
               
    super(Constants.ADD);
            }


            @Override
           
    protected void encodeBody(IoSession session, T message, IoBuffer out) {                 // 實現了編碼消息體,向buffer追加了AddMessage的消息體value(4個字節-int)
                out.putInt(message.getValue());
            }


           
    public void dispose() throws Exception {
            }

        }


           4.AddMessageDecoder
        /**
            *  AddMessage的decoder.type為Constants.ADD(1)
           
    */

           
    public class AddMessageDecoder extends AbstractMessageDecoder {

           
    public AddMessageDecoder() {
               
    super(Constants.ADD);
            }


            @Override
           
    protected AbstractMessage decodeBody(IoSession session, IoBuffer in) {                  // ADD_BODY_LEN為AddMessage的消息體長度(value屬性),即為4字節(int),如果此時不足4字節,則返回null,表示body數據不足
                if (in.remaining() < Constants.ADD_BODY_LEN) {
                   
    return null;
                }


                AddMessage m
    = new AddMessage();
                m.setValue(in.getInt());
    // 讀取一個int
                return m;
            }


           
    public void finishDecode(IoSession session, ProtocolDecoderOutput out)
                   
    throws Exception {
            }

        }

    6.總結:使用CumulativeProtocolDecoder可以方便的進行特定消息協議的消息解碼并完美的解決了'粘包'問題.另外DemuxingProtocolDecoder結合MessageDecoder可更完美實現解碼方案.
    posted on 2013-12-02 18:55 landon 閱讀(3394) 評論(2)  編輯  收藏 所屬分類: Sources

    FeedBack:
    # re: apache-mina-2.07源碼筆記4-codec
    2013-12-03 09:38 | 鵬達鎖業
    謝謝博主分享。。。。。。。。。。。  回復  更多評論
      
    # re: apache-mina-2.07源碼筆記4-codec
    2013-12-05 17:26 | 左岸
    好東西啊,謝謝分享  回復  更多評論
      
    主站蜘蛛池模板: 最新亚洲人成无码网站| 亚洲av鲁丝一区二区三区| 在线亚洲午夜片AV大片| 日韩av无码久久精品免费| 亚洲国产精品成人久久| 最近免费mv在线观看动漫| 久久精品国产精品亚洲色婷婷| 成人免费ā片在线观看| 亚洲精品乱码久久久久久| 成全视频在线观看免费| 91亚洲国产在人线播放午夜| 亚洲毛片在线免费观看| 亚洲videosbestsex日本| 大学生一级毛片免费看| 亚洲区日韩精品中文字幕| 日韩精品无码人妻免费视频| 日韩在线视频免费| 日韩免费在线观看| 免费福利在线观看| 亚洲小说区图片区另类春色| 免费观看一区二区三区| 亚洲AV无码日韩AV无码导航| xxxx日本免费| 亚洲a∨无码一区二区| 久久青青草原亚洲av无码| 亚洲免费视频网站| 亚洲黄色激情视频| 亚洲国产成人精品女人久久久| 色www永久免费网站| 亚洲乱码一二三四区国产| 国产一区二区三区免费视频| 国产精品福利片免费看 | 四虎影视永久免费视频观看| 一级一级毛片免费播放| 中文字幕亚洲第一在线| 精品免费久久久久久成人影院| 国产日韩AV免费无码一区二区三区| 精品亚洲成a人片在线观看少妇| 在线观看免费为成年视频| 一级有奶水毛片免费看| 亚洲Av高清一区二区三区|