<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Vincent

    Vicent's blog
    隨筆 - 74, 文章 - 0, 評論 - 5, 引用 - 0
    數(shù)據(jù)加載中……

    一個(gè)線程池的實(shí)現(xiàn)

    設(shè)計(jì)目標(biāo)
    ?????提供一個(gè)線程池的組件,具有良好的伸縮性,當(dāng)線程夠用時(shí),銷毀不用線程,當(dāng)線程不夠用時(shí),自動增加線程數(shù)量;
    ?????提供一個(gè)工作任務(wù)接口和工作隊(duì)列,實(shí)際所需要的任務(wù)都必須實(shí)現(xiàn)這個(gè)工作任務(wù)接口,然后放入工作隊(duì)列中;
    ?????線程池中的線程從工作隊(duì)列中,自動取得工作任務(wù),執(zhí)行任務(wù)。
    主要控制類和功能接口設(shè)計(jì)
    線程池管理器?ThreadPoolManager?的功能:
    ?????管理線程池中的各個(gè)屬性變量
    ü????最大工作線程數(shù)
    ü????最小工作線程數(shù)
    ü????激活的工作線程總數(shù)
    ü????睡眠的工作線程總數(shù)
    ü????工作線程總數(shù)?(即:激活的工作線程總數(shù)+睡眠的工作線程總數(shù))
    ?????創(chuàng)建工作線程
    ?????銷毀工作線程
    ?????啟動處于睡眠的工作線程
    ?????睡眠處于激活的工作線程
    ?????縮任務(wù):當(dāng)工作線程總數(shù)小于或等于最小工作線程數(shù)時(shí),銷毀多余的睡眠的工作線程,使得現(xiàn)有工作線程總數(shù)等于最小工作任務(wù)總數(shù)
    ?????伸任務(wù):當(dāng)任務(wù)隊(duì)列任務(wù)總數(shù)大于工作線程數(shù)時(shí),增加工作線程總數(shù)至最大工作線程數(shù)
    ?????提供線程池啟動接口
    ?????提供線程池銷毀接口
    工作線程?WorkThread??的功能:
    ?????從工作隊(duì)列取得工作任務(wù)
    ?????執(zhí)行工作任務(wù)接口中的指定任務(wù)
    工作任務(wù)接口?ITask???的功能:
    ?????提供指定任務(wù)動作
    工作隊(duì)列?IWorkQueue??的功能:
    ?????提供獲取任務(wù)接口,并刪除工作隊(duì)列中的任務(wù);
    ?????提供加入任務(wù)接口;
    ?????提供刪除任務(wù)接口;
    ?????提供取得任務(wù)總數(shù)接口;
    ?????提供自動填任務(wù)接口;(當(dāng)任務(wù)總數(shù)少于或等于默認(rèn)總數(shù)的25%時(shí),自動裝填)
    ?????提供刪除所有任務(wù)接口;


    Code


    ThreadPoolManager:
    =====================================
    CODE:
    package test.thread.pool1;
    import java.util.ArrayList;
    import java.util.List;
    import test.thread.pool1.impl.MyWorkQueue;
    
    /**
     * <p>Title: 線程池管理器</p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public class ThreadPoolManager {
      /*最大線程數(shù)*/
      private int threads_max_num;
    
      /*最小線程數(shù)*/
      private int threads_min_num;
      
      /* 線程池線程增長步長 */
      private int threads_increase_step = 5;
    
      /* 任務(wù)工作隊(duì)列 */
      private IWorkQueue queue;
      
      /* 線程池監(jiān)視狗 */
      private PoolWatchDog poolWatchDog ;
      
      /* 隊(duì)列線程 */
      private Thread queueThread ;
      
      /* 線程池 封裝所有工作線程的數(shù)據(jù)結(jié)構(gòu) */
      private List pool = new ArrayList();
      
      /* 線程池中 封裝所有鈍化后的數(shù)據(jù)結(jié)構(gòu)*/
      private List passivePool = new ArrayList();
      
      /* 空閑60秒 */
      private static final long IDLE_TIMEOUT = 60000L;
      
      /* 關(guān)閉連接池標(biāo)志位 */
      private boolean close = false;
      
      /**
       * 線程池管理器
       * @param queue 任務(wù)隊(duì)列
       * @param threads_min_num 工作線程最小數(shù)
       * @param threads_max_num 工作線程最大數(shù)
       */
      public ThreadPoolManager(int threads_max_num
                              ,int threads_min_num
                              ,IWorkQueue queue){
        this.threads_max_num = threads_max_num;
        this.threads_min_num = threads_min_num;
        this.queue = queue;    
      }
    
      /**
       * 線程池啟動
       */
      public void startPool(){
        System.out.println("=== startPool..........");
        poolWatchDog = new PoolWatchDog("PoolWatchDog");
        poolWatchDog.setDaemon(true);
        poolWatchDog.start();
        System.out.println("=== startPool..........over");
      }
    
      /**
       * 線程池銷毀接口
       */
      public void destoryPool(){
        System.out.println("==========================DestoryPool starting ...");
        this.close = true;
        int pool_size = this.pool.size();
        
        //中斷隊(duì)列線程
        System.out.println("===Interrupt queue thread ... ");
        queueThread.interrupt();
        queueThread = null;
        
        System.out.println("===Interrupt thread pool ... ");
        Thread pool_thread = null;
        for(int i=0; i<pool_size; i++){
          pool_thread = (Thread)pool.get(i);
          if(pool_thread !=null 
          && pool_thread.isAlive() 
          && !pool_thread.isInterrupted()){
            pool_thread.interrupt();
            System.out.println("Stop pool_thread:"
                              +pool_thread.getName()+"[interrupt] "
                              +pool_thread.isInterrupted());
          }
        }//end for
        
        if(pool != null){
          pool.clear();
        }
        if(passivePool != null){
          pool.clear();
        }
        
        try{
          System.out.println("=== poolWatchDog.join() starting ...");
          poolWatchDog.join();
          System.out.println("=== poolWatchDog.join() is over ...");
        }
        catch(Throwable ex){
          System.out.println("###poolWatchDog ... join method throw a exception ... "
                              +ex.toString());
        }
        
        poolWatchDog =null;
        System.out.println("==============================DestoryPool is over ...");    
      }
      
      
      public static void main(String[] args) throws Exception{
        ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
        
        threadPoolManager1.startPool();
        Thread.sleep(60000);
        threadPoolManager1.destoryPool();
      }
      
      /**
       * 線程池監(jiān)視狗
       */
      private class PoolWatchDog extends Thread{
        public PoolWatchDog(String name){
          super(name);
        }
      
        public void run(){
          Thread workThread = null;
          Runnable run = null;
          
          //開啟任務(wù)隊(duì)列線程,獲取數(shù)據(jù)--------
          System.out.println("===QueueThread starting ... ... ");
          queueThread = new Thread(new QueueThread(),"QueueThread");
          queueThread.start();
          
          System.out.println("===Initial thread Pool ... ...");
          //初始化線程池的最小線程數(shù),并放入池中
          for(int i=0; i<threads_min_num; i++){
            run = new WorkThread();
            workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
            workThread.start();
            if(i == threads_min_num -1){
              workThread = null;
              run = null;
            }
          }
          System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());
    
          //線程池線程動態(tài)增加線程算法--------------
          while(!close){
          
            //等待5秒鐘,等上述線程都啟動----------
            synchronized(this){          
              try{
                System.out.println("===Wait the [last time] threads starting ....");
                this.wait(15000);
              }
              catch(Throwable ex){
                System.out.println("###PoolWatchDog invoking is failure ... "+ex);
              }
            }//end synchronized
              
            //開始增加線程-----------------------spread動作
            int queue_size = queue.getTaskSize();
            int temp_size = (queue_size - threads_min_num);
            
            if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
              System.out.println("================Spread thread pool starting ....");
              for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
                System.out.println("=== Spread thread num : "+i);
                run = new WorkThread();
                workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
                workThread.start();
              }//end for
              
              workThread = null;
              run = null;    
              System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
            }//end if
              
            //刪除已經(jīng)多余的睡眠線程-------------shrink動作
            int more_sleep_size = pool.size() - threads_min_num;//最多能刪除的線程數(shù)
            int sleep_threads_size = passivePool.size();
            if(more_sleep_size >0 && sleep_threads_size >0){
              System.out.println("================Shrink thread pool starting ....");        
              for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
                System.out.println("=== Shrink thread num : "+i);
                Thread removeThread = (Thread)passivePool.get(0);
                if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
                  removeThread.interrupt();
                }
              }
              System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());          
            }
    
            System.out.println("===End one return [shrink - spread operator] ....");    
          }//end while
        }//end run 
      }//end private class
      
      /**
       * 工作線程
       */
      class WorkThread implements Runnable{
      
        public WorkThread(){
        }
      
        public void run(){
          String name = Thread.currentThread().getName();
          System.out.println("===Thread.currentThread():"+name);
          pool.add(Thread.currentThread());    
        
          while(true){
          
            //獲取任務(wù)---------
            ITask task = null;
            try{
              System.out.println("===Get task from queue is starting ... ");
              //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
              if(Thread.currentThread().isInterrupted()){
                System.out.println("===Breaking current thread and jump whlie [1] ... ");
                break;
              }
              task = queue.getTask();
            }
            catch(Throwable ex){
              System.out.println("###No task in queue:"+ex);
            }//end tryc
            
            if(task != null){
              //執(zhí)行任務(wù)---------
              try{
                System.out.println("===Execute the task is starting ... ");
                //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
                if(Thread.currentThread().isInterrupted()){
                  System.out.println("===Breaking current thread and jump whlie [1] ... ");
                  break;
                }     
                task.executeTask();
                //任務(wù)執(zhí)行完畢-------
                System.out.println("===Execute the task is over ... ");
              }
              catch(Throwable ex){
                System.out.println("###Execute the task is failure ... "+ex);
              }//end tryc
              
            }else{
              //沒有任務(wù),則鈍化線程至規(guī)定時(shí)間--------
              synchronized(this){
                try{
                  System.out.println("===Passivate into passivePool ... ");
                  
                  //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
                  boolean isInterrupted = Thread.currentThread().isInterrupted();
                  if(isInterrupted){
                    System.out.println("===Breaking current thread and jump whlie [1] ... ");
                    break;
                  }
    //              passivePool.add(this);
                passivePool.add(Thread.currentThread());
    
                  
                  //準(zhǔn)備睡眠線程-------
                  isInterrupted = Thread.currentThread().isInterrupted();
                  if(isInterrupted){
                    System.out.println("===Breaking current thread and jump whlie [2] ... ");
                    break;
                  }              
                  this.wait(IDLE_TIMEOUT);
                }
                catch(Throwable ex1){
                  System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
                  break;
                }
              }          
            }        
          }//end while--------
          
          if(pool.contains(passivePool)){
            pool.remove(this);
          }
          if(passivePool.contains(passivePool)){
            passivePool.remove(this);
          }
          System.out.println("===The thread execute over ... "); 
        }//end run----------
      }
      
      
      class QueueThread implements Runnable{
      
        public QueueThread(){
        }
      
        public void run(){
          while(true){
            //自動裝在任務(wù)--------
            queue.autoAddTask();
            System.out.println("===The size of queue's task is "+queue.getTaskSize());
          
            synchronized(this){
              if(Thread.currentThread().isInterrupted()){
                break;
              }else{
                  try{
                    this.wait(queue.getLoadDataPollingTime());
                  }
                  catch(Throwable ex){
                    System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
                    break;
                  }
              }//end if
            }//end synchr
            
          }//end while
        }//end run
      } 
    }
    






    WorkQueue
    =====================================
    CODE:
    package test.thread.pool1;
    
    import java.util.LinkedList;
    import test.thread.pool1.impl.MyTask;
    
    /**
     * <p>Title: 工作隊(duì)列對象 </p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public abstract class WorkQueue implements IWorkQueue{
      /* 預(yù)計(jì)裝載量 */
      private int load_size;
      
      /* 數(shù)據(jù)裝載輪循時(shí)間 */
      private long load_polling_time;
      
      /* 隊(duì)列 */
      private LinkedList queue = new LinkedList();
      
      /**
       * 
       * @param load_size 預(yù)計(jì)裝載量
       * @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間
       */
      public WorkQueue(int load_size,long load_polling_time){
        this.load_size = (load_size <= 10) ? 10 : load_size;
        this.load_polling_time = load_polling_time;
      }
    
      /* 數(shù)據(jù)裝載輪循時(shí)間 */
      public long getLoadDataPollingTime(){
        return this.load_polling_time;
      }
    
    
      /*獲取任務(wù),并刪除隊(duì)列中的任務(wù)*/
      public synchronized ITask getTask(){
        ITask task = (ITask)queue.getFirst();
        queue.removeFirst();
        return task;
      }
    
      /*加入任務(wù)*/
      public void  addTask(ITask task){
        queue.addLast(task);
      }
    
      /*刪除任務(wù)*/
      public synchronized void removeTask(ITask task){
        queue.remove(task);
      }
    
      /*任務(wù)總數(shù)*/
      public synchronized int getTaskSize(){
        return queue.size();
      }
    
      /*自動裝填任務(wù)*/
      public synchronized void autoAddTask(){
      
        synchronized(this){
          float load_size_auto = load_size - getTaskSize() / load_size;
          System.out.println("===load_size_auto:"+load_size_auto);
          
          if(load_size_auto > 0.25){        
            autoAddTask0();
          }
          else {
            System.out.println("=== Not must load new work queue ... Now! ");
          }    
        }
      }
    
      /*刪除所有任務(wù)*/
      public synchronized void clearAllTask(){
        queue.clear();
      }
      
      /**
       * 程序員自己實(shí)現(xiàn)該方法
       */
      protected abstract void autoAddTask0();
    }
    





    MyWorkQueue
    =====================================
    CODE:
    package test.thread.pool1.impl;
    
    import java.util.LinkedList;
    import test.thread.pool1.WorkQueue;
    
    /**
     * <p>Title: 例子工作隊(duì)列對象 </p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public class MyWorkQueue extends WorkQueue{
    
      /**
       * @param load_size 預(yù)計(jì)裝載量
       * @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間
       */
      public MyWorkQueue(int load_size,long load_polling_time){
        super(load_size,load_polling_time);
      }
    
      /**
       * 自動加載任務(wù)
       */
      protected synchronized void autoAddTask0(){
        //-------------------
        System.out.println("===MyWorkQueue ...  invoked autoAddTask0() method ...");
        for(int i=0; i<10; i++){
          System.out.println("===add task :"+i);
          this.addTask(new MyTask());
        }    
        //-------------------
      }
    }
    





    MyTask
    =====================================
    CODE:
    package test.thread.pool1.impl;
    import test.thread.pool1.ITask;
    
    /**
     * <p>Title: 工作任務(wù)接口 </p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public class MyTask implements ITask {
    
      /**
       * 執(zhí)行的任務(wù)
       * @throws java.lang.Throwable
       */
      public void executeTask() throws Throwable{
        System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
      }
    }
    

    posted on 2006-08-24 16:55 Binary 閱讀(3731) 評論(2)  編輯  收藏 所屬分類: j2se

    評論

    # re: 一個(gè)線程池的實(shí)現(xiàn)  回復(fù)  更多評論   

    編譯時(shí),
    ITask出錯(cuò)
    IWorkQueue找不到.
    是不是弄少了段代碼?
    2007-03-05 23:29 | Peng

    # re: 一個(gè)線程池的實(shí)現(xiàn)  回復(fù)  更多評論   

    文章不錯(cuò),但是缺代碼了,作者能補(bǔ)全嗎?
    2008-05-25 23:30 | lindily
    主站蜘蛛池模板: 一级特黄录像免费播放中文版| 久久久久亚洲av无码专区| 日韩免费毛片视频| 毛片a级毛片免费播放下载| 亚洲免费视频播放| ww4545四虎永久免费地址| 亚洲免费电影网站| 国产精品成人免费福利| 99国产精品永久免费视频| www.黄色免费网站| 最近免费中文字幕视频高清在线看| 久久久久久久久免费看无码| 成年女人视频网站免费m| 日日AV拍夜夜添久久免费| 免费在线观看日韩| 亚洲国产精品人人做人人爱| 亚洲片国产一区一级在线观看| 国产亚洲精品成人a v小说| 亚洲精品二区国产综合野狼| 亚洲s色大片在线观看| 亚洲欧洲日产韩国在线| 99久久婷婷国产综合亚洲| 亚洲大成色www永久网址| 美女啪啪网站又黄又免费| 国产99久久久久久免费看| 日韩免费高清播放器| 中文字幕免费视频一| 最近最好的中文字幕2019免费| 免费国产真实迷j在线观看| 国外亚洲成AV人片在线观看| 午夜影视日本亚洲欧洲精品一区| 亚洲日产2021三区在线| WWW亚洲色大成网络.COM| 久久国产精品免费一区二区三区| 三年片在线观看免费观看大全动漫 | 四虎www成人影院免费观看| 男人的天堂亚洲一区二区三区| 亚洲 另类 无码 在线| 亚洲成年人在线观看| 亚洲中文无码mv| 一个人看的www在线免费视频|