<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    上一篇分析了服務(wù)器端讀取客戶發(fā)送的數(shù)據(jù),這篇來看服務(wù)器端如何發(fā)送數(shù)據(jù)給客戶端,服務(wù)器往外發(fā)送數(shù)據(jù)是通過downstreamhandler從下到上執(zhí)行
    發(fā)送從ChannelFuture future = e.getChannel().write(response)開始執(zhí)行Channels下的
    public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
            ChannelFuture future 
    = future(channel);
            channel.getPipeline().sendDownstream(
                    
    new DownstreamMessageEvent(channel, future, message, remoteAddress));
            
    return future;
     }

    telentpipeline中最下面一個(gè)downstreamhandler是stringencoder,最后執(zhí)行OneToOneEncoder的handleDownstream
    public void handleDownstream(
                ChannelHandlerContext ctx, ChannelEvent evt) 
    throws Exception {
            
    if (!(evt instanceof MessageEvent)) {
                ctx.sendDownstream(evt);
                
    return;
            }

            MessageEvent e 
    = (MessageEvent) evt;
            Object originalMessage 
    = e.getMessage();
            Object encodedMessage 
    = encode(ctx, e.getChannel(), originalMessage);
            
    if (originalMessage == encodedMessage) {
                ctx.sendDownstream(evt);
            } 
    else if (encodedMessage != null) {
                
    //這里寫encode數(shù)據(jù),DefaultChannelPipeline的sendDownstream
                write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
            }
        }
    DefaultChannelPipeline的sendDownstream方法
    public void sendDownstream(ChannelEvent e) {
                DefaultChannelHandlerContext prev 
    = getActualDownstreamContext(this.prev);
                
    if (prev == null) {
                    
    try {
                        
    //因?yàn)閟tringencoder是唯一一個(gè)downstreamhandler,這里執(zhí)行NioServerSocketPipelineSink.eventSunk
                        getSink().eventSunk(DefaultChannelPipeline.this, e);
                    } 
    catch (Throwable t) {
                        notifyHandlerException(e, t);
                    }
                } 
    else {
                    DefaultChannelPipeline.
    this.sendDownstream(prev, e);
                }
            }
    eventSunk方法會(huì)執(zhí)行
    private void handleAcceptedSocket(ChannelEvent e) {
            
    if (e instanceof ChannelStateEvent) {
                ChannelStateEvent event 
    = (ChannelStateEvent) e;
                NioSocketChannel channel 
    = (NioSocketChannel) event.getChannel();
                ChannelFuture future 
    = event.getFuture();
                ChannelState state 
    = event.getState();
                Object value 
    = event.getValue();

                
    switch (state) {
                
    case OPEN:
                    
    if (Boolean.FALSE.equals(value)) {
                        channel.worker.close(channel, future);
                    }
                    
    break;
                
    case BOUND:
                
    case CONNECTED:
                    
    if (value == null) {
                        channel.worker.close(channel, future);
                    }
                    
    break;
                
    case INTEREST_OPS:
                    channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                    
    break;
                }
            } 
    else if (e instanceof MessageEvent) {
                MessageEvent event 
    = (MessageEvent) e;
                NioSocketChannel channel 
    = (NioSocketChannel) event.getChannel();
                
    //放入writerequestqueue隊(duì)列
                boolean offered = channel.writeBuffer.offer(event);
                
    assert offered;
                
    //執(zhí)行nioworker的writeFromUserCode,之后執(zhí)行write0方法
                channel.worker.writeFromUserCode(channel);
            }
        }

    private void write0(NioSocketChannel channel) {
            
    boolean open = true;
            
    boolean addOpWrite = false;
            
    boolean removeOpWrite = false;

            
    long writtenBytes = 0;

            
    final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
            
    final SocketChannel ch = channel.socket;
            
    //之前將channel放到了該隊(duì)列
            final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
            //默認(rèn)嘗試16次寫
            
    final int writeSpinCount = channel.getConfig().getWriteSpinCount();
            
    synchronized (channel.writeLock) {
                channel.inWriteNowLoop 
    = true;
                
    for (;;) {
                    MessageEvent evt 
    = channel.currentWriteEvent;
                    SendBuffer buf;
                    
    if (evt == null) {
                
    //從writebuffer中獲得一個(gè)writeevent
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                            removeOpWrite 
    = true;
                            channel.writeSuspended 
    = false;
                            
    break;
                        }
                        
                        channel.currentWriteBuffer 
    = buf = sendBufferPool.acquire(evt.getMessage());
                    } 
    else {
                        buf 
    = channel.currentWriteBuffer;
                    }

                    ChannelFuture future 
    = evt.getFuture();
                    
    try {
                        
    long localWrittenBytes = 0;
                        
    for (int i = writeSpinCount; i > 0; i --) {
                            
    //發(fā)送數(shù)據(jù)給客戶端,執(zhí)行PooledSendBuffer.transferTo
                            localWrittenBytes = buf.transferTo(ch);
                            
    if (localWrittenBytes != 0) {
                                writtenBytes 
    += localWrittenBytes;
                                
    break;
                            }
                            
    if (buf.finished()) {
                                
    break;
                            }
                        }

                        
    if (buf.finished()) {
                            
    // Successful write - proceed to the next message.
                            buf.release();
                            channel.currentWriteEvent 
    = null;
                            channel.currentWriteBuffer 
    = null;
                            evt 
    = null;
                            buf 
    = null;
                            future.setSuccess();
                        } 
    else {
                            
    // Not written fully - perhaps the kernel buffer is full.
                            addOpWrite = true;
                            channel.writeSuspended 
    = true;

                            
    if (localWrittenBytes > 0) {
                                
    // Notify progress listeners if necessary.
                                future.setProgress(
                                        localWrittenBytes,
                                        buf.writtenBytes(), buf.totalBytes());
                            }
                            
    break;
                        }
                    } 
    catch (AsynchronousCloseException e) {
                        
    // Doesn't need a user attention - ignore.
                    } catch (Throwable t) {
                        buf.release();
                        channel.currentWriteEvent 
    = null;
                        channel.currentWriteBuffer 
    = null;
                        buf 
    = null;
                        evt 
    = null;
                        future.setFailure(t);
                        fireExceptionCaught(channel, t);
                        
    if (t instanceof IOException) {
                            open 
    = false;
                            close(channel, succeededFuture(channel));
                        }
                    }
                }
                channel.inWriteNowLoop 
    = false;
            }
            
    //觸發(fā)寫完成事件,執(zhí)行的是DefaultChannelPipeline的sendUpstream,最后調(diào)用SimpleChannelUpstreamHandler.writeComplete
            
    //pipeline中的upstreamhandler的writeComplete都未重寫,所以只是簡(jiǎn)單的傳遞該事件
            fireWriteComplete(channel, writtenBytes);

            
    if (open) {
                
    if (addOpWrite) {
                    setOpWrite(channel);
                } 
    else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }

    posts - 10, comments - 9, trackbacks - 0, articles - 15

    Copyright © alex_zheng

    主站蜘蛛池模板: 日本一区二区三区在线视频观看免费 | 国产精品无码免费专区午夜| 日韩在线播放全免费| 中文字幕亚洲日韩无线码| 久久亚洲中文字幕无码| 成人网站免费观看| 久久久国产亚洲精品| 91久久精品国产免费直播| 亚洲欧美国产精品专区久久| 毛片a级三毛片免费播放| 亚洲国产精品综合久久20| 91频在线观看免费大全| 亚洲精品天堂在线观看| 国产亚洲精品精品国产亚洲综合| 在线美女免费观看网站h| 亚洲一级片在线播放| 在线看片韩国免费人成视频| 深夜久久AAAAA级毛片免费看| 又大又黄又粗又爽的免费视频| 国产精品无码亚洲精品2021| 午夜国产羞羞视频免费网站| 老司机午夜精品视频在线观看免费 | 免费精品久久天干天干| 亚洲综合一区二区精品导航| 日本zzzzwww大片免费| 高h视频在线免费观看| 亚洲区精品久久一区二区三区| 我想看一级毛片免费的| 久久99毛片免费观看不卡| 亚洲激情电影在线| 午夜爱爱免费视频| 九九九精品视频免费| 亚洲AV无码成人精品区在线观看| 麻豆成人久久精品二区三区免费| 亚洲综合区图片小说区| 自拍偷自拍亚洲精品第1页 | 久艹视频在线免费观看| 国产成人亚洲合集青青草原精品 | 中文字幕精品亚洲无线码二区| 国产精品免费看久久久久| 国产精品免费αv视频|