<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢(mèng)蝶

    生活、程序、未來(lái)
       :: 首頁(yè) ::  ::  :: 聚合  :: 管理

    nio框架中的多個(gè)Selector結(jié)構(gòu)

    Posted on 2009-10-06 16:10 dennis 閱讀(5846) 評(píng)論(2)  編輯  收藏 所屬分類: java
        隨著并發(fā)數(shù)量的提高,傳統(tǒng)nio框架采用一個(gè)Selector來(lái)支撐大量連接事件的管理和觸發(fā)已經(jīng)遇到瓶頸,因此現(xiàn)在各種nio框架的新版本都采用多個(gè)Selector并存的結(jié)構(gòu),由多個(gè)Selector均衡地去管理大量連接。這里以Mina和Grizzly的實(shí)現(xiàn)為例。

       在Mina 2.0中,Selector的管理是由org.apache.mina.transport.socket.nio.NioProcessor來(lái)處理,每個(gè)NioProcessor對(duì)象保存一個(gè)Selector,負(fù)責(zé)具體的select、wakeup、channel的注冊(cè)和取消、讀寫事件的注冊(cè)和判斷、實(shí)際的IO讀寫操作等等,核心代碼如下:
       public NioProcessor(Executor executor) {
            
    super(executor);
            
    try {
                
    // Open a new selector
                selector = Selector.open();
            } 
    catch (IOException e) {
                
    throw new RuntimeIoException("Failed to open a selector.", e);
            }
        }


        
    protected int select(long timeout) throws Exception {
            
    return selector.select(timeout);
        }

     
        
    protected boolean isInterestedInRead(NioSession session) {
            SelectionKey key 
    = session.getSelectionKey();
            
    return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
        }


        
    protected boolean isInterestedInWrite(NioSession session) {
            SelectionKey key 
    = session.getSelectionKey();
            
    return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
        }

        
    protected int read(NioSession session, IoBuffer buf) throws Exception {
            
    return session.getChannel().read(buf.buf());
        }


        
    protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
            
    if (buf.remaining() <= length) {
                
    return session.getChannel().write(buf.buf());
            } 
    else {
                
    int oldLimit = buf.limit();
                buf.limit(buf.position() 
    + length);
                
    try {
                    
    return session.getChannel().write(buf.buf());
                } 
    finally {
                    buf.limit(oldLimit);
                }
            }
        }

       這些方法的調(diào)用都是通過(guò)AbstractPollingIoProcessor來(lái)處理,這個(gè)類里可以看到一個(gè)nio框架的核心邏輯,注冊(cè)、select、派發(fā),具體因?yàn)榕c本文主題不合,不再展開(kāi)。NioProcessor的初始化是在NioSocketAcceptor的構(gòu)造方法中調(diào)用的:

     public NioSocketAcceptor() {
            
    super(new DefaultSocketSessionConfig(), NioProcessor.class);
            ((DefaultSocketSessionConfig) getSessionConfig()).init(
    this);
        }

       直接調(diào)用了父類AbstractPollingIoAcceptor的構(gòu)造函數(shù),在其中我們可以看到,默認(rèn)是啟動(dòng)了一個(gè)SimpleIoProcessorPool來(lái)包裝NioProcessor:
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
                Class
    <? extends IoProcessor<T>> processorClass) {
            
    this(sessionConfig, nullnew SimpleIoProcessorPool<T>(processorClass),
                    
    true);
        }

       這里其實(shí)是一個(gè)組合模式,SimpleIoProcessorPool和NioProcessor都實(shí)現(xiàn)了Processor接口,一個(gè)是組合形成的Processor池,而另一個(gè)是單獨(dú)的類。調(diào)用的SimpleIoProcessorPool的構(gòu)造函數(shù)是這樣:

        private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1
        public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
            
    this(processorType, null, DEFAULT_SIZE);
        }
        可以看到,默認(rèn)的池大小是cpu個(gè)數(shù)+1,也就是創(chuàng)建了cpu+1個(gè)的Selector對(duì)象。它的重載構(gòu)造函數(shù)里是創(chuàng)建了一個(gè)數(shù)組,啟動(dòng)一個(gè)CachedThreadPool來(lái)運(yùn)行NioProcessor,通過(guò)反射創(chuàng)建具體的Processor對(duì)象,這里就不再列出了。

        Mina當(dāng)有一個(gè)新連接建立的時(shí)候,就創(chuàng)建一個(gè)NioSocketSession,并且傳入上面的SimpleIoProcessorPool,當(dāng)連接初始化的時(shí)候?qū)ession加入SimpleIoProcessorPool:
        protected NioSession accept(IoProcessor<NioSession> processor,
                ServerSocketChannel handle) 
    throws Exception {

            SelectionKey key 
    = handle.keyFor(selector);
            
            
    if ((key == null|| (!key.isValid()) || (!key.isAcceptable()) ) {
                
    return null;
            }

            
    // accept the connection from the client
            SocketChannel ch = handle.accept();
            
            
    if (ch == null) {
                
    return null;
            }

            
    return new NioSocketSession(this, processor, ch);
        }

           
            
    private void processHandles(Iterator<H> handles) throws Exception {
                
    while (handles.hasNext()) {
                    H handle 
    = handles.next();
                    handles.remove();

                    
    // Associates a new created connection to a processor,
                    
    // and get back a session
                    T session = accept(processor, handle);
                    
                    
    if (session == null) {
                        
    break;
                    }

                    initSession(session, 
    nullnull);

                    
    // add the session to the SocketIoProcessor
                    session.getProcessor().add(session);
                }
            }

        加入的操作是遞增一個(gè)整型變量并且模數(shù)組大小后對(duì)應(yīng)的NioProcessor注冊(cè)到session里:

        
    private IoProcessor<T> nextProcessor() {
            checkDisposal();
            
    return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
        }

     
       if (p == null) {
                p 
    = nextProcessor();
                IoProcessor
    <T> oldp =
                    (IoProcessor
    <T>) session.setAttributeIfAbsent(PROCESSOR, p);
                
    if (oldp != null) {
                    p 
    = oldp;
                }
        }


        這樣一來(lái),每個(gè)連接都關(guān)聯(lián)一個(gè)NioProcessor,也就是關(guān)聯(lián)一個(gè)Selector對(duì)象,避免了所有連接共用一個(gè)Selector負(fù)載過(guò)高導(dǎo)致server響應(yīng)變慢的后果。但是注意到NioSocketAcceptor也有一個(gè)Selector,這個(gè)Selector用來(lái)干什么的呢?那就是集中處理OP_ACCEPT事件的Selector,主要用于連接的接入,不跟處理讀寫事件的Selector混在一起,因此Mina的默認(rèn)open的Selector是cpu+2個(gè)。

        看完mina2.0之后,我們來(lái)看看Grizzly2.0是怎么處理的,Grizzly還是比較保守,它默認(rèn)就是啟動(dòng)兩個(gè)Selector,其中一個(gè)專門負(fù)責(zé)accept,另一個(gè)負(fù)責(zé)連接的IO讀寫事件的管理。Grizzly 2.0中Selector的管理是通過(guò)SelectorRunner類,這個(gè)類封裝了Selector對(duì)象以及核心的分發(fā)注冊(cè)邏輯,你可以將他理解成Mina中的NioProcessor,核心的代碼如下:

    protected boolean doSelect() {
            selectorHandler 
    = transport.getSelectorHandler();
            selectionKeyHandler 
    = transport.getSelectionKeyHandler();
            strategy 
    = transport.getStrategy();
            
            
    try {

                
    if (isResume) {
                    
    // If resume SelectorRunner - finish postponed keys
                    isResume = false;
                    
    if (keyReadyOps != 0) {
                        
    if (!iterateKeyEvents()) return false;
                    }
                    
                    
    if (!iterateKeys()) return false;
                }

                lastSelectedKeysCount 
    = 0;
                
                selectorHandler.preSelect(
    this);
                
                readyKeys 
    = selectorHandler.select(this);

                
    if (stateHolder.getState(false== State.STOPPING) return false;
                
                lastSelectedKeysCount 
    = readyKeys.size();
                
                
    if (lastSelectedKeysCount != 0) {
                    iterator 
    = readyKeys.iterator();
                    
    if (!iterateKeys()) return false;
                }

                selectorHandler.postSelect(
    this);
            } 
    catch (ClosedSelectorException e) {
                notifyConnectionException(key,
                        
    "Selector was unexpectedly closed", e,
                        Severity.TRANSPORT, Level.SEVERE, Level.FINE);
            } 
    catch (Exception e) {
                notifyConnectionException(key,
                        
    "doSelect exception", e,
                        Severity.UNKNOWN, Level.SEVERE, Level.FINE);
            } 
    catch (Throwable t) {
                logger.log(Level.SEVERE,
    "doSelect exception", t);
                transport.notifyException(Severity.FATAL, t);
            }

            
    return true;
        }

        基本上是一個(gè)reactor實(shí)現(xiàn)的樣子,在AbstractNIOTransport類維護(hù)了一個(gè)SelectorRunner的數(shù)組,而Grizzly用于創(chuàng)建tcp server的類TCPNIOTransport正是繼承于AbstractNIOTransport類,在它的start方法中調(diào)用了startSelectorRunners來(lái)創(chuàng)建并啟動(dòng)SelectorRunner數(shù)組:
      private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;
     @Override
      
    public void start() throws IOException {

      
    if (selectorRunnersCount <= 0) {
                    selectorRunnersCount 
    = DEFAULT_SELECTOR_RUNNERS_COUNT;
                }
      startSelectorRunners();

    }

     protected void startSelectorRunners() throws IOException {
            selectorRunners 
    = new SelectorRunner[selectorRunnersCount];
            
            
    synchronized(selectorRunners) {
                
    for (int i = 0; i < selectorRunnersCount; i++) {
                    SelectorRunner runner 
    =
                            
    new SelectorRunner(this, SelectorFactory.instance().create());
                    runner.start();
                    selectorRunners[i] 
    = runner;
                }
            }
        }

      可見(jiàn)Grizzly并沒(méi)有采用一個(gè)單獨(dú)的池對(duì)象來(lái)管理SelectorRunner,而是直接采用數(shù)組管理,默認(rèn)數(shù)組大小是2。SelectorRunner實(shí)現(xiàn)了Runnable接口,它的start方法調(diào)用了一個(gè)線程池來(lái)運(yùn)行自身。剛才我提到了說(shuō)Grizzly的Accept是單獨(dú)一個(gè)Selector來(lái)管理的,那么是如何表現(xiàn)的呢?答案在RoundRobinConnectionDistributor類,這個(gè)類是用于派發(fā)注冊(cè)事件到相應(yīng)的SelectorRunner上,它的派發(fā)方式是這樣:

     public Future<RegisterChannelResult> registerChannelAsync(
                SelectableChannel channel, 
    int interestOps, Object attachment,
                CompletionHandler completionHandler) 
                
    throws IOException {
            SelectorRunner runner 
    = getSelectorRunner(interestOps);
            
            
    return transport.getSelectorHandler().registerChannelAsync(
                    runner, channel, interestOps, attachment, completionHandler);
        }
        
        
    private SelectorRunner getSelectorRunner(int interestOps) {
            SelectorRunner[] runners 
    = getTransportSelectorRunners();
            
    int index;
            
    if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
                index 
    = 0;
            } 
    else {
                index 
    = (counter.incrementAndGet() % (runners.length - 1)) + 1;
            }
            
            
    return runners[index];
        }

        getSelectorRunner這個(gè)方法道出了秘密,如果是OP_ACCEPT,那么都使用數(shù)組中的第一個(gè)SelectorRunner,如果不是,那么就通過(guò)取模運(yùn)算的結(jié)果+1從后面的SelectorRunner中取一個(gè)來(lái)注冊(cè)。

        分析完mina2.0和grizzly2.0對(duì)Selector的管理后我們可以得到幾個(gè)啟示:

    1、在處理大量連接的情況下,多個(gè)Selector比單個(gè)Selector好
    2、多個(gè)Selector的情況下,處理OP_READ和OP_WRITE的Selector要與處理OP_ACCEPT的Selector分離,也就是說(shuō)處理接入應(yīng)該要一個(gè)單獨(dú)的Selector對(duì)象來(lái)處理,避免IO讀寫事件影響接入速度。
    3、Selector的數(shù)目問(wèn)題,mina默認(rèn)是cpu+2,而grizzly總共就2個(gè),我更傾向于mina的策略,但是我認(rèn)為應(yīng)該對(duì)cpu個(gè)數(shù)做一個(gè)判斷,如果CPU個(gè)數(shù)超過(guò)8個(gè),那么更多的Selector線程可能帶來(lái)比較大的線程切換的開(kāi)銷,mina默認(rèn)的策略并非合適,幸好可以設(shè)置這個(gè)數(shù)值。

       
       


        

    評(píng)論

    # re: nio框架中的多個(gè)Selector結(jié)構(gòu)  回復(fù)  更多評(píng)論   

    2009-10-07 17:51 by Sunng
    學(xué)習(xí)了,謝謝lz

    # re: nio框架中的多個(gè)Selector結(jié)構(gòu)[未登錄](méi)  回復(fù)  更多評(píng)論   

    2012-04-06 00:59 by ak47
    能否請(qǐng)教下為何“如果CPU個(gè)數(shù)超過(guò)8個(gè),那么更多的Selector線程可能帶來(lái)比較大的線程切換的開(kāi)銷”?
    主站蜘蛛池模板: 成人人免费夜夜视频观看| 久久精品亚洲视频| 亚洲乱妇熟女爽到高潮的片| 日日麻批免费40分钟无码| 久久精品亚洲福利| 香蕉视频免费在线播放| 成人a免费α片在线视频网站| 亚洲国产中文在线二区三区免| 久久免费视频99| 中文字幕专区在线亚洲| 免费无遮挡无码视频在线观看| 破了亲妺妺的处免费视频国产 | 91精品手机国产免费| 亚洲一区精品无码| 一级特黄色毛片免费看| 四虎影院永久免费观看| 亚洲一本到无码av中文字幕| 国产麻豆视频免费观看| 亚洲免费在线视频观看| 91手机看片国产永久免费| 亚洲黄色一级毛片| 四虎精品视频在线永久免费观看| 亚洲综合成人网在线观看| 免费国产黄网站在线观看视频 | 毛片在线看免费版| 亚洲最大福利视频| 国产一精品一AV一免费孕妇| 亚洲乱码卡一卡二卡三| 国产免费AV片在线播放唯爱网| 亚洲mv国产精品mv日本mv| 18禁网站免费无遮挡无码中文| 亚洲国产成AV人天堂无码| av无码国产在线看免费网站 | 免费看h片的网站| 内射干少妇亚洲69XXX| 99久久综合精品免费| 亚洲最大视频网站| 日韩精品无码区免费专区| 亚洲男同gay片| 国产免费小视频在线观看| 免费一区二区三区在线视频|