<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    netty3.2.3源碼分析--服務器端讀數據分析

    Posted on 2010-12-03 21:26 alex_zheng 閱讀(2090) 評論(0)  編輯  收藏 所屬分類: java
    上一篇分析了serverboostrap的啟動,接下來分析netty的數據讀取。
    在nioworker的,負責讀取操作是由,在該方法中,如果當前channel的(readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0,且此時
    ch.read(buff)<0,則判斷客戶端已經斷開連接
    private boolean read(SelectionKey k) {
            
    final SocketChannel ch = (SocketChannel) k.channel();
            
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();

            
    final ReceiveBufferSizePredictor predictor =
                channel.getConfig().getReceiveBufferSizePredictor();
            
    //默認1024個字節空間
            final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

            
    int ret = 0;
            
    int readBytes = 0;
            
    boolean failure = true;
            
    //分配連續的1024個byte空間
            ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
            
    try {
                
    while ((ret = ch.read(bb)) > 0) {
                    readBytes 
    += ret;
                    
    if (!bb.hasRemaining()) {
                        
    break;
                    }
                }
                failure 
    = false;
            } 
    catch (ClosedChannelException e) {
                
    // Can happen, and does not need a user attention.
            } catch (Throwable t) {
                fireExceptionCaught(channel, t);
            }

            
    if (readBytes > 0) {
                bb.flip();

                
    final ChannelBufferFactory bufferFactory =
                    channel.getConfig().getBufferFactory();
                
    final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
                buffer.setBytes(
    0, bb);
                buffer.writerIndex(readBytes);

                recvBufferPool.release(bb);

                
    // Update the predictor.
                predictor.previousReceiveBufferSize(readBytes);

                
    //觸發消息接收事件,根據pipeline中upstreamhandler由上到下的順序,調用messageReceived方法
                fireMessageReceived(channel, buffer);
            } 
    else {
                recvBufferPool.release(bb);
            }

            
    if (ret < 0 || failure) {
                close(channel, succeededFuture(channel));
                
    return false;
            }

            
    return true;
        }
        

    在pipelinefactory中的第一個upstreamhandler為DelimiterBasedFrameDecoder,繼承自FrameDecoder
    public ChannelPipeline getPipeline() throws Exception {
            
    // Create a default pipeline implementation.
            ChannelPipeline pipeline = pipeline();

            
    // Add the text line codec combination first,
            pipeline.addLast("framer"new DelimiterBasedFrameDecoder(
                    
    8192, Delimiters.lineDelimiter()));
            pipeline.addLast(
    "decoder"new StringDecoder());
            pipeline.addLast(
    "encoder"new StringEncoder());

            
    // and then business logic.
            pipeline.addLast("handler"new TelnetServerHandler());

            
    return pipeline;
        }
    會調用FrameDecoder的messageReceived
     
    public void messageReceived(
                ChannelHandlerContext ctx, MessageEvent e) 
    throws Exception {

            Object m 
    = e.getMessage();
            
    if (!(m instanceof ChannelBuffer)) {
                ctx.sendUpstream(e);
                
    return;
            }

            ChannelBuffer input 
    = (ChannelBuffer) m;
            
    if (!input.readable()) {
                
    return;
            }

            ChannelBuffer cumulation 
    = cumulation(ctx);
            
    if (cumulation.readable()) {
                cumulation.discardReadBytes();
                cumulation.writeBytes(input);
                callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
            } 
    else {
                
    //這里調用子類的decode方法
                callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
                
    if (input.readable()) {
                    cumulation.writeBytes(input);
                }
            }
        }

     //在這個upstreamhandler中,會一直讀取數據,直到遇到協議約定的結束標志才將messagereceived事件傳給下一個
     
    private void callDecode(
                ChannelHandlerContext context, Channel channel,
                ChannelBuffer cumulation, SocketAddress remoteAddress) 
    throws Exception {

            
    while (cumulation.readable()) {
                
    int oldReaderIndex = cumulation.readerIndex();
                Object frame 
    = decode(context, channel, cumulation);
                
    if (frame == null) {
                    
    if (oldReaderIndex == cumulation.readerIndex()) {
                        
    // Seems like more data is required.
                        
    // Let us wait for the next notification.
                        break;
                    } 
    else {
                        
    // Previous data has been discarded.
                        
    // Probably it is reading on.
                        continue;
                    }
                } 
    else if (oldReaderIndex == cumulation.readerIndex()) {
                    
    throw new IllegalStateException(
                            
    "decode() method must read at least one byte " +
                            
    "if it returned a frame (caused by: " + getClass() + ")");
                }
                
    //將messagereceive事件傳給下個upstreamhandler
                unfoldAndFireMessageReceived(context, remoteAddress, frame);
            }
        }
    看子類的decode是如何判斷數據讀取完畢
    protected Object decode(
                ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) 
    throws Exception {
            
    // Try all delimiters and choose the delimiter which yields the shortest frame.
            int minFrameLength = Integer.MAX_VALUE;
            ChannelBuffer minDelim 
    = null;
            
    //獲取\r\n的位置
            for (ChannelBuffer delim: delimiters) {
                
    int frameLength = indexOf(buffer, delim);
                
    if (frameLength >= 0 && frameLength < minFrameLength) {
                    minFrameLength 
    = frameLength;
                    minDelim 
    = delim;
                }
            }
            
    //如果找到\r\n,表明客戶端數據發送完畢
            if (minDelim != null) {
                
    int minDelimLength = minDelim.capacity();
                ChannelBuffer frame;

                
    if (discardingTooLongFrame) {
                    
    // We've just finished discarding a very large frame.
                    
    // Go back to the initial state.
                    discardingTooLongFrame = false;
                    buffer.skipBytes(minFrameLength 
    + minDelimLength);

                    
    // TODO Let user choose when the exception should be raised - early or late?
                    
    //      If early, fail() should be called when discardingTooLongFrame is set to true.
                    int tooLongFrameLength = this.tooLongFrameLength;
                    
    this.tooLongFrameLength = 0;
                    fail(ctx, tooLongFrameLength);
                    
    return null;
                }

                
    if (minFrameLength > maxFrameLength) {
                    
    // Discard read frame.
                    buffer.skipBytes(minFrameLength + minDelimLength);
                    fail(ctx, minFrameLength);
                    
    return null;
                }

                
    if (stripDelimiter) {
                    
    //這里讀取全部數據
                    frame = buffer.readBytes(minFrameLength);
                    buffer.skipBytes(minDelimLength);
                } 
    else {
                    frame 
    = buffer.readBytes(minFrameLength + minDelimLength);
                }

                
    return frame;
            } 
    else {
                
    if (!discardingTooLongFrame) {
                    
    if (buffer.readableBytes() > maxFrameLength) {
                        
    // Discard the content of the buffer until a delimiter is found.
                        tooLongFrameLength = buffer.readableBytes();
                        buffer.skipBytes(buffer.readableBytes());
                        discardingTooLongFrame 
    = true;
                    }
                } 
    else {
                    
    // Still discarding the buffer since a delimiter is not found.
                    tooLongFrameLength += buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                }
                
    return null;
            }
        }
    因為unfold默認是false,會執行,調用下一個upstreamhandler,這里是stringdecoder,通過stringdecoder,將channelbuffer中的數據轉為string
    然后再觸發下一個upstreamhandler的messagereceive,這里是TelnetServerHandler
    public void messageReceived(
                ChannelHandlerContext ctx, MessageEvent e) {

            
    // Cast to a String first.
            
    // We know it is a String because we put some codec in TelnetPipelineFactory.
            String request = (String) e.getMessage();

            
    // Generate and write a response.
            String response;
            
    boolean close = false;
            
    if (request.length() == 0) {
                response 
    = "Please type something."r"n";
            } 
    else if (request.toLowerCase().equals("bye")) {
                response 
    = "Have a good day!"r"n";
                close 
    = true;
            } 
    else {
                response 
    = "Did you say '" + request + "'?"r"n";
            }

            
    // We do not need to write a ChannelBuffer here.
            
    // We know the encoder inserted at TelnetPipelineFactory will do the conversion.
            ChannelFuture future = e.getChannel().write(response);

            
    // Close the connection after sending 'Have a good day!'
            
    // if the client has sent 'bye'.
            if (close) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }

    數據讀取分析完畢,接著繼續分析服務器端數據的發送


    posts - 10, comments - 9, trackbacks - 0, articles - 15

    Copyright © alex_zheng

    主站蜘蛛池模板: 亚洲日韩精品射精日| 免费v片视频在线观看视频| 国产v亚洲v天堂无码网站| 色老头综合免费视频| 亚洲免费视频一区二区三区| 色综合久久精品亚洲国产| 午夜无遮挡羞羞漫画免费| 国产精品亚洲av色欲三区| 国产成人青青热久免费精品 | 777成影片免费观看| 亚洲色偷偷偷网站色偷一区| 麻豆成人久久精品二区三区免费| 亚洲一本综合久久| 国产精品免费观看| 亚洲经典千人经典日产| 免费大黄网站在线看| 久久九九免费高清视频| 亚洲综合国产精品| 毛片在线看免费版| 深夜福利在线视频免费| 狠狠色伊人亚洲综合成人| 成人免费福利视频| 美女被免费网站视频在线| 相泽亚洲一区中文字幕| 4399影视免费观看高清直播| 亚洲kkk4444在线观看| 久久久久亚洲AV无码专区网站| 男人进去女人爽免费视频国产 | 免费看的成人yellow视频| 日本一区二区三区在线视频观看免费| 国产亚洲精久久久久久无码77777 国产亚洲精品成人AA片新蒲金 | 东北美女野外bbwbbw免费| 色婷婷亚洲十月十月色天| 成全视频在线观看免费高清动漫视频下载 | 亚洲av片不卡无码久久| 四虎永久在线精品视频免费观看| 中国精品一级毛片免费播放| 亚洲国产综合在线| 亚洲视频一区二区| 日本视频一区在线观看免费| 午夜亚洲乱码伦小说区69堂|