    Netty 3.2.3 Source Code Analysis: ServerBootstrap Startup

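    Posted on 2010-12-01 21:37 by alex_zheng. Category: java

    We start with the startup sequence of ServerBootstrap. In Netty, a Channel can be viewed as an abstraction over a SocketChannel.
    A ChannelPipeline holds ChannelHandlers and, for each ChannelEvent (such as a channel's open, bind, or connect), invokes the corresponding handler method.
    Let's walk through server startup step by step, using TelnetServer as the example: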
    public static void main(String[] args) throws Exception {
        // Configure the server.
        // The NioServerSocketChannelFactory constructor creates a NioServerSocketPipelineSink,
        // which handles the events that reach the end of the downstream path.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        // Set up the event pipeline factory.
        bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(8080));
    }

    Let's go straight to ServerBootstrap's bind method:
    public Channel bind(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        // This queue receives a single ChannelFuture: the bind future offered by the Binder.
        final BlockingQueue<ChannelFuture> futureQueue =
            new LinkedBlockingQueue<ChannelFuture>();
        // Binder extends SimpleChannelUpstreamHandler and handles channelOpen.
        ChannelHandler binder = new Binder(localAddress, futureQueue);
        // parentHandler is null in this example.
        ChannelHandler parentHandler = getParentHandler();
        // Create a DefaultChannelPipeline.
        // Before the port is bound, this pipeline contains only one upstream handler: the binder.
        ChannelPipeline bossPipeline = pipeline();
        // addLast on DefaultChannelPipeline creates a DefaultChannelHandlerContext;
        // the contexts form a linked list, which at this point holds only the binder.
        bossPipeline.addLast("binder", binder);
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }
        // Everything starts here; getFactory() returns the NioServerSocketChannelFactory.
        Channel channel = getFactory().newChannel(bossPipeline);
        // (The rest of the method is omitted in this excerpt.)
    }
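
    The hand-off between bind and the Binder through futureQueue can be sketched with plain JDK classes. This is only an illustration, not Netty code: the class and method names (BindHandoffSketch, onChannelOpen, bindAndWait) are hypothetical, and CompletableFuture stands in for Netty's ChannelFuture.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Netty code: a callback publishes the future of an
// operation it started, and the caller picks that future up from a queue.
class BindHandoffSketch {
    private final BlockingQueue<CompletableFuture<Integer>> futureQueue =
            new LinkedBlockingQueue<>();

    // Plays the role of Binder.channelOpen: start the "bind" and offer its future.
    void onChannelOpen(int port) {
        CompletableFuture<Integer> bindFuture = CompletableFuture.supplyAsync(() -> port);
        boolean offered = futureQueue.offer(bindFuture);
        if (!offered) {
            throw new IllegalStateException("unbounded queue rejected the future");
        }
    }

    // Plays the role of the caller: trigger the callback, then wait for the result.
    int bindAndWait(int port) throws Exception {
        onChannelOpen(port); // in the article this is triggered from newChannel(...)
        CompletableFuture<Integer> future = futureQueue.poll(5, TimeUnit.SECONDS);
        if (future == null) {
            throw new IllegalStateException("no bind future was published");
        }
        return future.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bound to port " + new BindHandoffSketch().bindAndWait(8080));
    }
}
```

    The queue lets the caller obtain a future that is created deep inside an event callback without the callback having to know who is waiting for it.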

    NioServerSocketChannelFactory.newChannel(ChannelPipeline pipeline) looks like this:
    public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
        // Create a NioServerSocketChannel; the pipeline holds the binder
        // and the sink is the NioServerSocketPipelineSink.
        return new NioServerSocketChannel(this, pipeline, sink);
    }

    The NioServerSocketChannel constructor contains the call fireChannelOpen(this), a static method of Channels:
    public static void fireChannelOpen(Channel channel) {
        // Notify the parent handler.
        if (channel.getParent() != null) {
            fireChildChannelStateChanged(channel.getParent(), channel);
        }
        // This calls DefaultChannelPipeline.sendUpstream.
        channel.getPipeline().sendUpstream(
                new UpstreamChannelStateEvent(
                        channel, ChannelState.OPEN, Boolean.TRUE));
    }
    DefaultChannelPipeline.sendUpstream(ChannelEvent e):
    public void sendUpstream(ChannelEvent e) {
        // this.head is the context that wraps the binder.
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        if (head == null) {
            logger.warn(
                    "The pipeline contains no upstream handlers; discarding: " + e);
            return;
        }

        sendUpstream(head, e);
    }

    This in turn executes:
    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        try {
            // ctx.getHandler() is the binder, a SimpleChannelUpstreamHandler.
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }
    SimpleChannelUpstreamHandler.handleUpstream(ctx, e) then calls the binder's channelOpen:
    public void channelOpen(
            ChannelHandlerContext ctx,
            ChannelStateEvent evt) {

        try {
            // Set the pipeline factory on the NioServerSocketChannel's DefaultServerSocketChannelConfig.
            // When connections are dispatched later, their pipelines are obtained from this factory,
            // i.e. from TelnetServerPipelineFactory.
            evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
        } finally {
            ctx.sendUpstream(evt);
        }

        // Call NioServerSocketChannel.bind, which ends up in
        // Channels.bind(Channel channel, SocketAddress localAddress).
        boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
        assert finished;
    }
    Channels.bind looks like this:
    public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        ChannelFuture future = future(channel);
        // This calls DefaultChannelPipeline.sendDownstream with a BOUND state event.
        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
                channel, future, ChannelState.BOUND, localAddress));
        return future;
    }
    DefaultChannelPipeline.sendDownstream:
    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }

        sendDownstream(tail, e);
    }
    getActualDownstreamContext returns null here, so the code above executes getSink().eventSunk(this, e):
    DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
        if (ctx == null) {
            return null;
        }

        DefaultChannelHandlerContext realCtx = ctx;

        // The Binder is an upstream-only handler, so this loop runs off the list and returns null.
        while (!realCtx.canHandleDownstream()) {
            realCtx = realCtx.prev;
            if (realCtx == null) {
                return null;
            }
        }

        return realCtx;
    }
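
    The fall-through to the sink can be illustrated with a toy pipeline. This is a minimal sketch under stated assumptions, not Netty's implementation: PipelineSketch, Ctx, and the string-based event log are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Netty code: a doubly linked list of handler
// contexts where downstream events travel from the tail towards the head and
// fall through to the sink when no context can handle them.
class PipelineSketch {
    static final class Ctx {
        final String name;
        final boolean canHandleDownstream;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean canHandleDownstream) {
            this.name = name;
            this.canHandleDownstream = canHandleDownstream;
        }
    }

    private Ctx tail;
    final List<String> log = new ArrayList<>();

    void addLast(String name, boolean canHandleDownstream) {
        Ctx ctx = new Ctx(name, canHandleDownstream);
        if (tail != null) {
            ctx.prev = tail;
            tail.next = ctx;
        }
        tail = ctx;
    }

    // Walk backwards from ctx until a downstream-capable context is found.
    Ctx actualDownstreamContext(Ctx ctx) {
        while (ctx != null && !ctx.canHandleDownstream) {
            ctx = ctx.prev;
        }
        return ctx;
    }

    void sendDownstream(String event) {
        Ctx ctx = actualDownstreamContext(tail);
        if (ctx == null) {
            log.add("sink:" + event);        // no downstream handler: hand to the sink
        } else {
            log.add(ctx.name + ":" + event); // a real pipeline would invoke the handler
        }
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch();
        p.addLast("binder", false);          // upstream-only, like the Binder
        p.sendDownstream("BOUND");
        System.out.println(p.log);
    }
}
```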
    So sendDownstream executes getSink().eventSunk(this, e), where getSink() returns the NioServerSocketPipelineSink:
    public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);
        }
    }

     
    private void handleServerSocket(ChannelEvent e) {
        if (!(e instanceof ChannelStateEvent)) {
            return;
        }

        ChannelStateEvent event = (ChannelStateEvent) e;
        NioServerSocketChannel channel =
            (NioServerSocketChannel) event.getChannel();
        ChannelFuture future = event.getFuture();
        ChannelState state = event.getState();
        Object value = event.getValue();
        // The event here is
        // new DownstreamChannelStateEvent(channel, future, ChannelState.BOUND, localAddress).
        switch (state) {
        case OPEN:
            if (Boolean.FALSE.equals(value)) {
                close(channel, future);
            }
            break;
        case BOUND:
            if (value != null) {
                // The socket address is actually bound here.
                bind(channel, future, (SocketAddress) value);
            } else {
                close(channel, future);
            }
            break;
        }
    }
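
    The (state, value) decision made in handleServerSocket can be written as a small pure function. This is a hedged sketch: StateDispatchSketch, its State and Action enums, and decide are hypothetical names for illustration, and only the OPEN and BOUND branches mirror the excerpt above.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch, not Netty code: decide what a server socket sink should
// do from the (state, value) pair carried by a downstream state event.
class StateDispatchSketch {
    enum State { OPEN, BOUND, CONNECTED, INTEREST_OPS }
    enum Action { BIND, CLOSE, IGNORE }

    static Action decide(State state, Object value) {
        switch (state) {
        case OPEN:
            // OPEN with FALSE is a close request; anything else is ignored.
            return Boolean.FALSE.equals(value) ? Action.CLOSE : Action.IGNORE;
        case BOUND:
            // BOUND with an address is a bind request; BOUND with null closes.
            return value != null ? Action.BIND : Action.CLOSE;
        default:
            return Action.IGNORE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(State.BOUND, new InetSocketAddress(8080)));
        System.out.println(decide(State.BOUND, null));
        System.out.println(decide(State.OPEN, Boolean.FALSE));
    }
}
```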

    The corresponding NioServerSocketPipelineSink.bind method:
    private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {

        boolean bound = false;
        boolean bossStarted = false;
        try {
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            future.setSuccess();

            // Fire the channelBound event.
            fireChannelBound(channel, channel.getLocalAddress());

            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            bossExecutor.execute(
                    new IoWorkerRunnable(
                            new ThreadRenamingRunnable(
                                    new Boss(channel),
                                    "New I/O server boss #" + id +
                                    " (" + channel + ')')));
            bossStarted = true;
        } catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        } finally {
            if (!bossStarted && bound) {
                close(channel, future);
            }
        }
    }
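
    Stripped of the Netty event plumbing, the core of this bind is a java.nio call. The following sketch uses only JDK classes; NioBindSketch and openAndBind are hypothetical names, and port 0 (any free port) and a backlog of 50 are assumptions chosen for the demo.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Sketch using only java.nio: open a server channel and bind its underlying
// ServerSocket with an explicit backlog, closing the channel if binding fails.
class NioBindSketch {
    static ServerSocketChannel openAndBind(int port, int backlog) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        boolean bound = false;
        try {
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port), backlog);
            bound = true;
            return server;
        } finally {
            if (!bound) {
                server.close(); // do not leak the descriptor on failure
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the operating system for any free port.
        try (ServerSocketChannel server = openAndBind(0, 50)) {
            System.out.println("listening on " + server.socket().getLocalSocketAddress());
        }
    }
}
```

    The bound flag plays the same role as in the method above: cleanup runs only when the operation did not complete.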
    First, let's see what Channels.fireChannelBound does:
    public static void fireChannelBound(Channel channel, SocketAddress localAddress) {
        // The DefaultChannelPipeline returned by channel.getPipeline() contains only the binder.
        // SimpleChannelUpstreamHandler.handleUpstream dispatches this event to channelBound(ctx, evt).
        channel.getPipeline().sendUpstream(
                new UpstreamChannelStateEvent(
                        channel, ChannelState.BOUND, localAddress));
    }

    Back to the bind method:
    Executor bossExecutor =
        ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
    // Start a boss thread on the bossExecutor.
    // The boss thread dispatches new client connections to the workers running on the
    // workerExecutor; by default the number of workers is CPU cores * 2.
    bossExecutor.execute(
            new IoWorkerRunnable(
                    new ThreadRenamingRunnable(
                            new Boss(channel),
                            "New I/O server boss #" + id +
                            " (" + channel + ')')));
    The Boss constructor registers the channel for accept events:
    Boss(NioServerSocketChannel channel) throws IOException {
        this.channel = channel;

        selector = Selector.open();

        boolean registered = false;
        try {
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeSelector();
            }
        }

        channel.selector = selector;
    }

    Finally, Boss.run() is invoked:

    public void run() {
        // The current boss thread acts as the main reactor.
        final Thread currentThread = Thread.currentThread();

        channel.shutdownLock.lock();
        try {
            for (;;) {
                try {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                    }
                    // Accept a new client connection.
                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket != null) {
                        // Hand the connection over to a worker, i.e. a sub reactor.
                        registerAcceptedChannel(acceptedSocket, currentThread);
                    }
                }
                // (The catch clauses of this try block are omitted in this excerpt.)
            }
        } finally {
            channel.shutdownLock.unlock();
            closeSelector();
        }
    }
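
    The boss loop boils down to a select-then-accept cycle, which can be tried out with the JDK alone. This is a sketch, not Netty code: AcceptLoopSketch and acceptConnections are hypothetical names, and the loop stops after a fixed number of connections purely so that the demo terminates.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Sketch using only java.nio: a "boss" loop that waits for OP_ACCEPT and
// accepts connections, stopping after a given number for demo purposes.
class AcceptLoopSketch {
    static int acceptConnections(ServerSocketChannel server, int wanted) throws IOException {
        int accepted = 0;
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (accepted < wanted) {
                if (selector.select(1000) > 0) {
                    selector.selectedKeys().clear();
                }
                // Non-blocking accept: returns null when nothing is pending.
                SocketChannel socket = server.accept();
                if (socket != null) {
                    accepted++;      // a real boss would hand this to a worker
                    socket.close();
                }
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.socket().bind(new InetSocketAddress(loopback, 0));
            int port = server.socket().getLocalPort();
            // Connect one client so that the loop has something to accept.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                System.out.println("accepted " + acceptConnections(server, 1));
            }
        }
    }
}
```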

    private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
        try {
            // Obtain the user's pipeline. Where was the factory set? In the first statement of Binder.channelOpen:
            //     evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
            // Until now the only pipeline involved was the DefaultChannelPipeline that holds just the Binder.
            // From here on, every NioAcceptedSocketChannel gets a pipeline from TelnetServerPipelineFactory.
            ChannelPipeline pipeline =
                channel.getConfig().getPipelineFactory().getPipeline();
            // The NioWorker acts as the sub reactor.
            NioWorker worker = nextWorker();
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        }
        // (The catch clause of this try block is omitted in this excerpt.)
    }
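
    The excerpt does not show how nextWorker() picks a worker. A common way to spread connections over a fixed set of workers is round-robin selection, sketched below as an assumption rather than as Netty's actual code; RoundRobinSketch and the "worker-N" strings are hypothetical stand-ins for NioWorker instances.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Netty code: pick workers in round-robin order
// from a fixed array, safely from multiple threads.
class RoundRobinSketch {
    private final String[] workers;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(int workerCount) {
        workers = new String[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = "worker-" + (i + 1);
        }
    }

    String nextWorker() {
        // floorMod keeps the slot non-negative even after the counter overflows.
        return workers[Math.floorMod(index.getAndIncrement(), workers.length)];
    }

    public static void main(String[] args) {
        // CPU cores * 2 mirrors the worker count mentioned in the article.
        int count = Runtime.getRuntime().availableProcessors() * 2;
        RoundRobinSketch pool = new RoundRobinSketch(count);
        for (int i = 0; i < 3; i++) {
            System.out.println(pool.nextWorker());
        }
    }
}
```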

    This calls NioWorker.register:
    void register(NioSocketChannel channel, ChannelFuture future) {

        boolean server = !(channel instanceof NioClientSocketChannel);
        // Create a task that registers the accepted SocketChannel with this worker's selector
        // and attaches the channel to the resulting SelectionKey.
        Runnable registerTask = new RegisterTask(channel, future, server);
        Selector selector;

        synchronized (startStopLock) {
            if (!started) {
                // Open a selector if this worker didn't start yet.
                try {
                    this.selector = selector = Selector.open();
                } catch (Throwable t) {
                    throw new ChannelException(
                            "Failed to create a selector.", t);
                }

                // Start the worker thread with the new Selector.
                String threadName =
                    (server ? "New I/O server worker #"
                            : "New I/O client worker #") + bossId + '-' + id;

                boolean success = false;
                try {
                    // Start the worker thread (only when the worker is not running yet);
                    // it serves every channel registered with this worker.
                    executor.execute(
                            new IoWorkerRunnable(
                                    new ThreadRenamingRunnable(this, threadName)));
                    success = true;
                } finally {
                    if (!success) {
                        // Release the Selector if the execution fails.
                        try {
                            selector.close();
                        } catch (Throwable t) {
                            logger.warn("Failed to close a selector.", t);
                        }
                        this.selector = selector = null;
                        // The method will return to the caller at this point.
                    }
                }
            } else {
                // Use the existing selector if this worker has been started.
                selector = this.selector;
            }

            assert selector != null && selector.isOpen();

            started = true;
            // Add the task to the register task queue.
            boolean offered = registerTaskQueue.offer(registerTask);
            assert offered;
        }

        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
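
    The pattern used by register (enqueue a task, then wake the selector at most once per loop iteration) can be reproduced in isolation. The sketch below is a simplified illustration with hypothetical names (WakeupSketch, submit, stop, demo); it uses a 500 ms select timeout as a safety net, which is an assumption of this demo and not something taken from the Netty source.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Netty code: other threads enqueue tasks and wake
// the selector; the loop thread drains the queue after every select().
class WakeupSketch implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private volatile boolean running = true;

    WakeupSketch() throws IOException {
        selector = Selector.open();
    }

    void submit(Runnable task) {
        taskQueue.offer(task);
        // Only the first caller since the last loop iteration pays for wakeup().
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                wakenUp.set(false);
                selector.select(500);
                if (wakenUp.get()) {
                    selector.wakeup(); // cover a wake-up that raced with set(false)
                }
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Starts a loop thread, submits one task, and reports whether it ran.
    static boolean demo() throws Exception {
        WakeupSketch loop = new WakeupSketch();
        Thread thread = new Thread(loop, "sketch-worker");
        thread.start();
        CountDownLatch ran = new CountDownLatch(1);
        loop.submit(ran::countDown);
        boolean ok = ran.await(5, TimeUnit.SECONDS);
        loop.stop();
        thread.join(5000);
        return ok;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("task ran: " + demo());
    }
}
```

    Running registrations on the loop thread itself means the selector is only ever touched by one thread, so the queue is the single synchronization point between the boss and the worker.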
    Now look at RegisterTask's run method:
    public void run() {
                SocketAddress localAddress = channel.getLocalAddress();
                SocketAddress remoteAddress = channel.getRemoteAddress();
                if (localAddress == null || remoteAddress == null) {
                    if (future != null) {
                        future.setFailure(new ClosedChannelException());
                    }
                    close(channel, succeededFuture(channel));
                    return;
                }

                try {
                    if (server) {
                        channel.socket.configureBlocking(false);
                    }

                    synchronized (channel.interestOpsLock) {
                        // Register the accepted SocketChannel with the worker's selector (for read events).
                        channel.socket.register(
                                selector, channel.getRawInterestOps(), channel);
                    }
                    if (future != null) {
                        channel.setConnected();
                        future.setSuccess();
                    }
                } catch (IOException e) {
                    if (future != null) {
                        future.setFailure(e);
                    }
                    close(channel, succeededFuture(channel));
                    if (!(e instanceof ClosedChannelException)) {
                        throw new ChannelException(
                                "Failed to register a socket to the selector.", e);
                    }
                }

                if (!server) {
                    if (!((NioClientSocketChannel) channel).boundManually) {
                        fireChannelBound(channel, localAddress);
                    }
                    fireChannelConnected(channel, remoteAddress);
                }
            }

    In executor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable(this, threadName))),
    this refers to the current NioWorker, so NioWorker.run is called:
    public void run() {
        // The thread that runs this NioWorker.
        thread = Thread.currentThread();

        boolean shutdown = false;
        Selector selector = this.selector;
        for (;;) {
            wakenUp.set(false);

            if (CONSTRAINT_LEVEL != 0) {
                selectorGuard.writeLock().lock();
                    // This empty synchronization block prevents the selector
                    // from acquiring its lock.
                selectorGuard.writeLock().unlock();
            }

            try {
                SelectorUtil.select(selector);

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select()'. (BAD)
                // 2) Selector is waken up between 'selector.select()' and
                //    'if (wakenUp.get()) {  }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select()' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select()' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select().
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }

                cancelledKeys = 0;
                processRegisterTaskQueue();
                processWriteTaskQueue();
                processSelectedKeys(selector.selectedKeys());

                // Exit the loop when there's nothing to handle.
                // The shutdown flag is used to delay the shutdown of this
                // loop to avoid excessive Selector creation when
                // connections are registered in a one-by-one manner instead of
                // concurrent manner.
                if (selector.keys().isEmpty()) {
                    if (shutdown ||
                        executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

                        synchronized (startStopLock) {
                            if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                                started = false;
                                try {
                                    selector.close();
                                } catch (IOException e) {
                                    logger.warn(
                                            "Failed to close a selector.", e);
                                } finally {
                                    this.selector = null;
                                }
                                break;
                            } else {
                                shutdown = false;
                            }
                        }
                    } else {
                        // Give one more second.
                        shutdown = true;
                    }
                } else {
                    shutdown = false;
                }
            } catch (Throwable t) {
                logger.warn(
                        "Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }


    Now look at processSelectedKeys(selector.selectedKeys()):
    private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
                // Readable: read from the socket (the received data then travels upstream).
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {
                        // Connection already closed - no need to handle write.
                        continue;
                    }
                }
                // Writable: flush the pending writes (which arrived as downstream events).
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);
                }
            } catch (CancelledKeyException e) {
                close(k);
            }

            if (cleanUpCancelledKeys()) {
                break; // break the loop to avoid ConcurrentModificationException
            }
        }
    }
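
    The readyOps checks are plain bit-mask tests on the SelectionKey constants. A minimal sketch using only the JDK (ReadyOpsSketch, shouldRead, and shouldWrite are hypothetical helper names):

```java
import java.nio.channels.SelectionKey;

// Sketch using only java.nio constants: decode a readyOps bit mask with the
// same conditions as processSelectedKeys.
class ReadyOpsSketch {
    static boolean shouldRead(int readyOps) {
        // readyOps == 0 is also treated as readable, as in processSelectedKeys.
        return (readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0;
    }

    static boolean shouldWrite(int readyOps) {
        return (readyOps & SelectionKey.OP_WRITE) != 0;
    }

    public static void main(String[] args) {
        int both = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(shouldRead(both) + " " + shouldWrite(both));
        System.out.println(shouldRead(SelectionKey.OP_WRITE) + " " + shouldWrite(SelectionKey.OP_WRITE));
    }
}
```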
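    As this walkthrough shows, in Netty the boss thread listens for socket connections and hands each accepted connection to a NioWorker. Each NioWorker keeps queues of pending register and write tasks; the worker thread drains those queues and processes the selected keys to perform the actual reads and writes.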


    Copyright © alex_zheng
