<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Concurrent學(xué)習(xí)—Executor框架


        java.util.concurrent包分成了三個部分,分別是:
                               java.util.concurrent
     
                               java.util.concurrent.atomic
                               java.util.concurrent.lock
        
    內(nèi)容涵蓋了并發(fā)集合類、線程池機(jī)制、同步互斥機(jī)制、線程安全的變量更新工具類、鎖等等常用工具。 

     

    并發(fā)編程的一種編程方式是把任務(wù)拆分為一些列的小任務(wù),即Runnable,然后在提交給一個Executor執(zhí)行,Executor.execute(Runnalbe) Executor在執(zhí)行時使用內(nèi)部的線程池完成操作。

        
    例子:
                  
    有一個很大的整數(shù)數(shù)組,需要求這個數(shù)組中所有整數(shù)的和,來計算結(jié)果。  
                
    JDK 7 中的 Fork/Join模式可以解決該問題,http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/)

        
    分析:
                 
    采用多線程(任務(wù)),并且還要分割List,每一小塊的數(shù)組采用一個線程(任務(wù))進(jìn)行計算其和,那么我們必須要等待所有的線程(任務(wù))完成之后才能得到正確的結(jié)果.

    步驟:

    • 分割數(shù)組,根據(jù)采用的線程(任務(wù))數(shù)平均分配,即array.length/threadCounts。
    • 定義一個記錄“很大數(shù)組”中所有整數(shù)和的變量sum,采用一個線程(任務(wù))處理一個分割后的子數(shù)組,計算子數(shù)組中所有整數(shù)和(subSum),然后把和(subSum)累加到sum上。
    • 等待所有線程(任務(wù))完成后輸出總和(sum)的值。

     

    /**
     * 并行計算數(shù)組的和, 測試類
     * 
     * 
    @author lsb
     *
     
    */

    public class MainTest {
        
    public static void main(String[] args) {
            
    int[] numbers = new int[] 123456781011 };
            CalcArrayTotal calc 
    = new CalcArrayTotal();
            Long sum 
    = calc.sum(numbers);
            System.out.println(sum);
            calc.close();
        }

    }


    主要實(shí)現(xiàn)類:

    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.li.senbiao.Thread.concurrent.test1.SumCalculator;

    public class CalcArrayTotal {

        
    private ExecutorService exec;

        
    private CompletionService<Long> completionService;

        
    private int cpuCoreNumber;

        
    public CalcArrayTotal() {
            cpuCoreNumber 
    = Runtime.getRuntime().availableProcessors();
            exec 
    = Executors.newFixedThreadPool(cpuCoreNumber);
            completionService 
    = new ExecutorCompletionService<Long>(exec);
        }


        
    public Long sum(final int[] numbers) {
            
    // 根據(jù)CPU核心個數(shù)拆分任務(wù),創(chuàng)建FutureTask并提交到Executor
            for (int i = 0; i < cpuCoreNumber; i++{
                
    int increment = numbers.length / cpuCoreNumber + 1;
                
    int start = increment * i;
                
    int end = increment * i + increment;
                
    if (end > numbers.length) {
                    end 
    = numbers.length;
                }

                SumCalculator subCalc 
    = new SumCalculator(numbers, start, end);
                
    if (!exec.isShutdown()) {
                    
    /**
                     * 生產(chǎn)者 submit() 執(zhí)行的 任務(wù)。使用者 take() 已完成的任務(wù), 
                     * 并按照完成這些任務(wù)的順序處理它們的結(jié)果  。
                     * 也就是調(diào)用CompletionService 的 take 方法是,
                     * 會返回按完成順序放回任務(wù)的結(jié)果, CompletionService 內(nèi)部維護(hù)了一個 阻塞隊(duì)列 BlockingQueue ,
                     * 如果沒有任務(wù)完成, take() 方法也會阻塞。
                     
    */

                    completionService.submit(subCalc);
                }

            }

            
    return getResult();
        }


        
    /**
         * 迭代每個只任務(wù),獲得部分和,相加返回
         
    */

        
    public Long getResult() {
            Long result 
    = 0L;
            
    for (int i = 0; i < cpuCoreNumber; i++{
                
    try {
                    Long subSum 
    = completionService.take().get();
                    result 
    += subSum;
                }
     catch (InterruptedException e) {
                    e.printStackTrace();
                }
     catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }

            
    return result;
        }


        
    public void close() {
            exec.shutdown();
        }


    }


    一組的計算和:

    import java.util.concurrent.Callable;

    /**
     * 一組計算和值 
     * 
     * 
    @author lsb
     *
     
    */

    public class SumCalculator implements Callable<Long> {

        
    private int[] numbers;

        
    private int start;

        
    private int end;

        
    public SumCalculator(final int[] numbers, int start, int end) {
            
    this.numbers = numbers;
            
    this.start = start;
            
    this.end = end;
        }


        @Override
        
    public Long call() throws Exception {
            Long sum 
    = 0L;
            
    for (int i = start; i < end; i++{
                sum 
    += numbers[i];
            }

            
    return sum;
        }

    }






     


         一、Executors創(chuàng)建線程池

            Executors類,提供了一系列工廠方法用于創(chuàng)先線程池,返回的線程池都實(shí)現(xiàn)了ExecutorService接口。

             // 創(chuàng)建固定數(shù)目線程的線程池
            public static ExecutorService newFixedThreadPool(int nThreads)

           
            // 創(chuàng)建一個可緩存的線程池,調(diào)用execute 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

            public static ExecutorService newCachedThreadPool() 
        
            // 創(chuàng)建一個單線程化的Executor

            public static ExecutorService newSingleThreadExecutor()

           
           //  創(chuàng)建一個支持定時及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類

            public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)



       二、ExecutorService 與生命周期

          ExecutorService 擴(kuò)展了Executor 并添加了一些生命周期管理的方法。一個Executor 的生命周期有三種狀態(tài),運(yùn)行 ,關(guān)閉 ,終止 。Executor 創(chuàng)建時處于運(yùn)行狀態(tài)。當(dāng)調(diào)用ExecutorService.shutdown() 后,處于關(guān)閉狀態(tài),isShutdown() 方法返 回true 。這時,不應(yīng)該再想Executor 中添加任務(wù),所有已添加的任務(wù)執(zhí)行完畢后,Executor 處于終止?fàn)顟B(tài),isTerminated() 返 回true 。

       如果Executor 處于關(guān)閉狀態(tài),往Executor 提交任務(wù)會拋出unchecked exception RejectedExecutionException 。

     

     

        三、使用Callable ,F(xiàn)uture 返回結(jié)果

         Future<V> 代表一個異步執(zhí)行的操作,通過get() 方法可以獲得操作的結(jié)果,如果異步操作還沒有完成,則,get() 會使當(dāng)前 線程阻塞。FutureTask<V> 實(shí)現(xiàn)了Future<V> 和Runable<V> 。Callable 代表一個 有返回值得操作。

    ExecutoreService 提供了submit() 方法,傳遞一個Callable ,或Runnable ,返回Future 。如果Executor 后臺線程池還沒有完成Callable 的計算,這調(diào)用返回Future 對象的get() 方法,會阻塞直到計算完成。





        

    posted on 2011-10-12 17:25 胡鵬 閱讀(2442) 評論(0)  編輯  收藏 所屬分類: java基礎(chǔ)

    導(dǎo)航

    <2011年10月>
    2526272829301
    2345678
    9101112131415
    16171819202122
    23242526272829
    303112345

    統(tǒng)計

    常用鏈接

    留言簿(3)

    隨筆分類

    隨筆檔案

    agile

    搜索

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 中文字幕免费人成乱码中国| 亚洲日本一线产区和二线 | 成人妇女免费播放久久久| 国产最新凸凹视频免费| 日韩成人精品日本亚洲| 亚洲国产精品专区在线观看| 特级毛片免费播放| 伊人久久亚洲综合| 伊人久久免费视频| 亚洲国产日韩在线一区| 成年性生交大片免费看| 亚洲爆乳无码专区www| 国产99视频精品免费视频7| 污污视频网站免费观看| 亚洲精品无码久久一线| 69影院毛片免费观看视频在线| 91嫩草亚洲精品| 永久在线毛片免费观看| 成人久久久观看免费毛片| 国产亚洲精品自在久久| 日本免费xxxx| 亚洲av成人一区二区三区观看在线| 免费人成网站在线播放| 久草视频在线免费看| 亚洲第一区二区快射影院| 国产小视频在线观看免费| 99久久国产精品免费一区二区| 亚洲午夜国产精品| 日本视频免费在线| 免费成人高清在线视频| 亚洲精品无码久久久久久| 亚洲人成伊人成综合网久久久 | 四虎影视无码永久免费| 亚洲视频在线视频| 日本视频免费在线| 污污网站免费观看| 国产亚洲男人的天堂在线观看| 亚洲国产精品VA在线看黑人| 久久WWW色情成人免费观看| 一日本道a高清免费播放| 亚洲第一香蕉视频|