<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-7  評論-23  文章-0  trackbacks-0
     1.       ExecutorService

     
    Java1.5開始正式提供了并發包,而這個并發包里面除了原子變量,synchronizer,并發容器,另外一個非常重要的特性就是線程池.對于線程池的意義,我們這邊不再多說.

    上圖是線程池的主體類圖,ThreadPoolExecutor是應用最為廣泛的一個線程池實現(我也將在接下來的文字中詳細描述我對這個類的理解和執行機制),ScheduledThreadPoolExecutor則在ThreadPoolExecutor上提供了定時執行的等附加功能,這個可以從ScheduledExecutorService接口的定義中看出來.Executors則類似工廠方法,提供了幾個非常常用的線程池初始化方法.

    ThreadPoolExecutor

    這個類繼承了AbstractExecutorService抽象類, AbstractExecutorService主要的職責有2部分,一部分定義和實現提交任務的方法(3submit方法的實現) ,實例化FutureTask并且交給子類執行,另外一部分實現invokeAny,invokeAll方法.留給子類的方法為execute方法,也就是Executor接口定義的方法.

    //實例化一個FutureTask,交給子類的execute方法執行.這種設計能夠保證callable和runnable的執行接口方法的一致性(FutureTask包裝了這個差別)
    public <T> Future<T> submit(Runnable task, T result) {
        
    if (task == nullthrow new NullPointerException();
        RunnableFuture
    <T> ftask = newTaskFor(task, result);
        execute(ftask);
        
    return ftask;
    }


    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        
    return new FutureTask<T>(runnable, value);
    }
    關于FutureTask這個類的實現,我在前面的JAVA LOCK代碼淺析有講過其實現原理,主要的思想就是關注任務完成與未完成的狀態,任務提交線程get()結果時被park,等待任務執行完成被喚醒,任務執行線程在任務執行完畢后設置結果,并且unpark對應線程并且讓其得到執行結果.

    回到ThreadPoolExecutor.ThreadPoolExecutor需要實現除了我們剛才說的execute(Runnable command)方法外,還得實現ExecutorService接口定義的部分方法.ThreadPoolExecutor所提供的不光是這些,以下根據我的理解來列一下它所具有的特性
    1.       execute流程
    2.      
    3.       工作隊列
    4.       飽和拒絕策略
    5.       線程工廠
    6.       beforeExecuteafterExecute擴展

    execute方法的實現有個機制非常重要,當當前線程池線程數量小于corePoolSize,那么生成一個新的worker并把提交的任務置為這個工作線程的頭一個執行任務,如果大于corePoolSize,那么會試著將提交的任務塞到workQueue里面供線程池里面的worker稍后執行,并不是直接再起一個worker,但是當workQueue也滿,并且當前線程池小于maxPoolSize,那么起一個新的worker并將該任務設為該worker執行的第一個任務執行,大于maxPoolSize,workQueue也滿負荷,那么調用飽和策略里面的行為.

    worker線程在執行完一個任務之后并不會立刻關閉,而是嘗試著去workQueue里面取任務,如果取不到,根據策略關閉或者保持空閑狀態.所以submit任務的時候,提交的順序為核心線程池------工作隊列------擴展線程池.

    池包括核心池
    ,擴展池(2者的線程在同一個hashset中,這里只是為了方便才這么稱呼,并不是分離的),核心池在池內worker沒有用完的情況下,只要有任務提交都會創建新的線程,其代表線程池正常處理任務的能力.擴展池,是在核心線程池用完,并且工作隊列也已排滿任務的情況下才會開始初始化線程,其代表的是線程池超出正常負載時的解決方案,一旦任務完成,并且試圖從workQueue取不到任務,那么會比較當前線程池與核心線程池的大小,大于核心線程池數的worker將被銷毀.
    Runnable getTask() {
        
    for (;;) {
            
    try {
                
    int state = runState;
                
    //>SHUTDOWN就是STOP或者TERMINATED
                
    //直接返回
                if (state > SHUTDOWN)
                    
    return null;
                Runnable r;
                
    //如果是SHUTDOWN狀態,那么取任務,如果有
                  
    //將剩余任務執行完畢,否則就結束了
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                
    //如果不是以上狀態的(也就是RUNNING狀態的),那么如果當前池大于核心池數量,
                
    //或者允許核心線程池取任務超時就可以關閉,那么從任務隊列取任務,
                
    //如果超出keepAliveTime,那么就返回null了,也就意味著這個worker結束了
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r 
    = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                
    //如果當前池小于核心池,并且不允許核心線程池取任務超時就關閉,那么take(),直到拿到任務或者被interrupt
                else
                    r 
    = workQueue.take();
                
    //如果經過以上判定,任務不為空,那么返回任務
                if (r != null)
                    
    return r;
                
    //如果取到任務為空,那么判定是否可以退出
                if (workerCanExit()) {
                    
    //如果整個線程池狀態變為SHUTDOWN或者TERMINATED,那么將所有worker interrupt (如果正在執行,那繼續讓其執行)
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    
    return null;
                }

                
    // Else retry
            }
     catch (InterruptedException ie) {
                
    // On interruption, re-check runState
            }

    }

        }


    //worker從workQueue中取不到數據的時候調用此方法,以決定自己是否跳出取任務的無限循環,從而結束此worker的運行
    private boolean workerCanExit() {
        
    final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        
    boolean canExit;
        
    try {
            
    /*
            *線程池狀態為stop或者terminated,
            *或者任務隊列里面任務已經為空,
            *或者允許線程池線程空閑超時(實現方式是從工作隊列拿最多keepAliveTime的任務,超過這個時間就返回null了)并且
             *當前線程池大于corePoolSize(>1)
            *那么允許線程結束
            *static final int RUNNING    = 0;
            *static final int SHUTDOWN   = 1;
            *static final int STOP       = 2;
            *static final int TERMINATED = 3;
            
    */

            canExit 
    = runState >= STOP ||
            workQueue.isEmpty() 
    ||
           (allowCoreThreadTimeOut 
    &&
            poolSize 
    > Math.max(1,corePoolSize));
        }
     finally {
            mainLock.unlock();
        }

        
    return canExit;
    }

    當提交任務是,線程池都已滿,并且工作隊列也無空閑位置的情況下,ThreadPoolExecutor會執行reject操作,JDK提供了四種reject策略,包括AbortPolicy(直接拋RejectedException Exception),CallerRunsPolicy(提交任務線程自己執行,當然這時剩余任務也將無法提交),DiscardOldestPolicy(將線程池的workQueue任務隊列里面最老的任務剔除,將新任務丟入),DiscardPolicy(無視,忽略此任務,并且立即返回).實例化ThreadPoolExecutor,如果不指定任何飽和策略,默認將使用AbortPolicy.

    個人認為這些飽和策略并不十分理想
    ,特別是在應用既要保證快速,又要高可用的情況下,我的想法是能夠加入超時等待策略,也就是提交線程時線程池滿,能夠park住提交任務的線程,一旦有空閑,能在第一時間通知到等待線程. 這個實際上和主線程執行相似,但是主線程執行期間即使線程池有大量空閑也不會立即可以提交任務,效率上后者可能會比較低,特別是執行慢速任務.

    實例化Worker的時候會調用ThreadFactoryaddThread(Runnable r)方法返回一個Thread,這個線程工廠是可以在ThreadPoolExecutor實例化的時候指定的,如果不指定,那么將會使用DefaultThreadFactory, 這個也就是提供給使用者命名線程,線程歸組,是否是demon等線程相關屬性設置的機會.

    beforeExecuteafterExecute是提供給使用者擴展的,這兩個方法會在worker runTask之前和run完畢之后分別調用.JDK注釋里 Doug Lea(concurrent包作者)展示了beforeExecute一個很有趣的示例.代碼如下.

    class PausableThreadPoolExecutor extends ThreadPoolExecutor {
        
    private boolean isPaused;
        
    private ReentrantLock pauseLock = new ReentrantLock();
        
    private Condition unpaused = pauseLock.newCondition();
     
    public PausableThreadPoolExecutor(super(); }

    protected void beforeExecute(Thread t, Runnable r) {
        
    super.beforeExecute(t, r);
        pauseLock.lock();
        
    try {
            
    while (isPaused) unpaused.await();
        }
     catch (InterruptedException ie) {
            t.interrupt();
        }
     finally {
            pauseLock.unlock();
        }

    }

     
    public void pause() {
        pauseLock.lock();
        
    try {
            isPaused 
    = true;
        }
     finally {
            pauseLock.unlock();
        }

    }


    public void resume() {
        pauseLock.lock();
        
    try {
            isPaused 
    = false;
            unpaused.signalAll();
        }
     finally {
            pauseLock.unlock();
        }

    }

      }

    使用這個線程池,用戶可以隨時調用pause中止剩余任務執行,當然也可以使用resume重新開始執行剩余任務.

    ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor
    是一個很實用的類,它的實現核心是基于DelayedWorkQueue.ScheduledThreadPoolExecutor的繼承結構上來看,各位應該能夠看出些端倪來,就是ScheduledThreadPoolExecutorThreadPoolExecutor中的任務隊列設置成了DelayedWorkQueue,這也就是說,線程池Worker從任務隊列中取的一個任務,需要等待這個隊列中最短超時任務的超時,也就是實現定時的效果.所以ScheduledThreadPoolExecutor所做的工作其實是比較少的.主要就是實現任務的實例化并加入工作隊列,以及支持scheduleAtFixedRatescheduleAtFixedDelay這種周期性任務執行.
    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
               
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);
    }

    對于scheduleAfFixedRatescheduleAtFiexedDelay這種周期性任務支持,是由ScheduledThreadPoolExecutor內部封裝任務的ScheduledFutureTask來實現的.這個類在執行任務后,對于周期性任務,它會處理周期時間,并將自己再次丟入線程池的工作隊列,從而達到周期執行的目的.
    private void runPeriodic() {
             
    boolean ok = ScheduledFutureTask.super.runAndReset();
              
    boolean down = isShutdown();
             
    // Reschedule if not cancelled and not shutdown or policy allows
          if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) {
                   
    long p = period;
                   
    if (p > 0)
                          time 
    += p;
                   
    else
                          time 
    = triggerTime(-p);
         
                    ScheduledThreadPoolExecutor.
    super.getQueue().add(this);
             }

            
    // This might have been the final executed delayed
           
    // task.  Wake up threads to check.
            else if (down)
                  interruptIdleWorkers();
    }

     

    2.       CompletionService


    ExecutorCompletionService

    CompletionService定義了線程池執行任務集,可以依次拿到任務執行完畢的Future,ExecutorCompletionService是其實現類,先舉個例子,如下代碼,這個例子中,需要注意ThreadPoolExecutor核心池一定保證能夠讓任務提交并且馬上執行,而不是放到等待隊列中去,那樣次序將會無法控制,CompletionService也將失去效果(其實核心池中的任務完成順序還是準確的).

    public static void main(String[] args) throws InterruptedException, ExecutionException{
        ThreadPoolExecutor es
    =new ThreadPoolExecutor(10152000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.AbortPolicy());
        CompletionService
    <String> cs=new ExecutorCompletionService<String>(es);    
        cs.submit(
    new Callable<String>() {
         @Override
         
    public String call() throws Exception {
             Thread.currentThread().sleep(
    1000);
             
    return "i am sleeped 1000 milliseconds";
         }

        }
    );
            
        cs.submit(
    new Callable<String>() {
         @Override
         
    public String call() throws Exception {
             Thread.currentThread().sleep(
    5000);
             
    return "i am sleeped 5000 milliseconds";
         }

        }
    );
            
        cs.submit(
    new Callable<String>() {
         @Override
         
    public String call() throws Exception {
             Thread.currentThread().sleep(
    4000);
             
    return "i am sleeped 4000 milliseconds";
         }

        }
    );
            
        cs.submit(
    new Callable<String>() {
         @Override
             
    public String call() throws Exception {
              Thread.currentThread().sleep(
    2000);
                  
    return "i am sleeped 2000 milliseconds";
          }

    }
    );
            
        
    for(int i=0;i<4;i++){
            Future
    <String> fu=cs.take();
            System.out.println(fu.get());
        }

    }

      執行結果:
            i am sleeped 1000 milliseconds 
       i am sleeped 2000 milliseconds
      
    i am sleeped 4000 milliseconds
      
    i am sleeped 5000 milliseconds
    從執行結果看來,我們發現先完成的任務先被拿出來了,直到所有任務被執行完畢,也就是CompletionService的效果達到了.

    ExecutorCompletionService并不復雜,關鍵的一個點就是它的內部類QueueingFuture繼承了FutureTask,并且實現了done()方法,done()方法是在線程池任務執行完畢,最后調用FutureTask的方法(這在 JAVA LOCK代碼淺析(http://m.tkk7.com/BucketLi/archive/2010/09/30/333471.html)一文中對于FutureTask代碼解析有提到)

    QueueingFuturedone()方法實現是將執行完的任務(FutureTask)丟入全局的完成隊列中(completionQueue),那么take是從這個blockingqueue中取元素.也就是任務完成就會有元素,即生產者消費者.

    這種實現的思想是將原本在單個FutureTask上的等待轉化為在BlockingQueue上的等待,即對全部FutureTask的等待,從而達到哪個先完成,哪個就可取執行結果的效果.

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture
    <V> task) {
            
    super(task, null);
            his.task 
    = task;
        }

        
    protected void done() { completionQueue.add(task); }
        
    private final Future<V> task;
    }

    總結:
    JUC提供的線程池體系核心是在ThreadPoolExecutor, ScheduledThreadPoolExecutorExecutorCompletionService只是對其擴展,這里沒有去細講Executors這個便捷類,這個類提供很多便捷的線程池構建方法.各位使用的時候不妨去看下.



    posted on 2010-12-16 13:57 BucketLI 閱讀(5071) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 中文字幕无码一区二区免费| 久久99亚洲综合精品首页| 最近的2019免费中文字幕| 亚洲另类无码专区首页| 精品亚洲国产成AV人片传媒| 亚洲中文字幕久久精品无码APP | 久久久久亚洲AV成人网| 国产一级理论免费版| 黄瓜视频高清在线看免费下载| 香蕉免费一区二区三区| 91成人免费观看在线观看| 一级毛片高清免费播放| 免费播放美女一级毛片| 四虎必出精品亚洲高清| 亚洲国产视频一区| 亚洲视频国产精品| 麻豆亚洲av熟女国产一区二| 亚洲Av永久无码精品三区在线| 久久久久久久亚洲精品| 亚洲日本在线观看视频| 亚洲精品视频免费| 亚洲国产成人精品女人久久久| 又大又硬又爽免费视频| 四虎永久成人免费影院域名| 免费精品一区二区三区在线观看| 日韩免费a级毛片无码a∨| 9久9久女女免费精品视频在线观看 | 另类小说亚洲色图| 国产精品亚洲一区二区在线观看| 狠狠色伊人亚洲综合网站色 | 热久久精品免费视频| 成人免费无毒在线观看网站 | 亚洲午夜无码久久久久软件 | 四虎永久免费网站免费观看| 小小影视日本动漫观看免费| 国内自产拍自a免费毛片| 日本特黄特色免费大片| 四虎永久成人免费| 国产成人综合亚洲AV第一页| 亚洲免费观看视频| 久久精品国产亚洲av影院|