設(shè)計(jì)目標(biāo)
?????提供一個(gè)線程池的組件,具有良好的伸縮性,當(dāng)線程夠用時(shí),銷毀不用線程,當(dāng)線程不夠用時(shí),自動增加線程數(shù)量;
?????提供一個(gè)工作任務(wù)接口和工作隊(duì)列,實(shí)際所需要的任務(wù)都必須實(shí)現(xiàn)這個(gè)工作任務(wù)接口,然后放入工作隊(duì)列中;
?????線程池中的線程從工作隊(duì)列中,自動取得工作任務(wù),執(zhí)行任務(wù)。
主要控制類和功能接口設(shè)計(jì)
線程池管理器?ThreadPoolManager?的功能:
?????管理線程池中的各個(gè)屬性變量
ü????最大工作線程數(shù)
ü????最小工作線程數(shù)
ü????激活的工作線程總數(shù)
ü????睡眠的工作線程總數(shù)
ü????工作線程總數(shù)?(即:激活的工作線程總數(shù)+睡眠的工作線程總數(shù))
?????創(chuàng)建工作線程
?????銷毀工作線程
?????啟動處于睡眠的工作線程
?????睡眠處于激活的工作線程
?????縮任務(wù):當(dāng)工作線程總數(shù)小于或等于最小工作線程數(shù)時(shí),銷毀多余的睡眠的工作線程,使得現(xiàn)有工作線程總數(shù)等于最小工作任務(wù)總數(shù)
?????伸任務(wù):當(dāng)任務(wù)隊(duì)列任務(wù)總數(shù)大于工作線程數(shù)時(shí),增加工作線程總數(shù)至最大工作線程數(shù)
?????提供線程池啟動接口
?????提供線程池銷毀接口
工作線程?WorkThread??的功能:
?????從工作隊(duì)列取得工作任務(wù)
?????執(zhí)行工作任務(wù)接口中的指定任務(wù)
工作任務(wù)接口?ITask???的功能:
?????提供指定任務(wù)動作
工作隊(duì)列?IWorkQueue??的功能:
?????提供獲取任務(wù)接口,并刪除工作隊(duì)列中的任務(wù);
?????提供加入任務(wù)接口;
?????提供刪除任務(wù)接口;
?????提供取得任務(wù)總數(shù)接口;
?????提供自動填任務(wù)接口;(當(dāng)任務(wù)總數(shù)少于或等于默認(rèn)總數(shù)的25%時(shí),自動裝填)
?????提供刪除所有任務(wù)接口;
Code
ThreadPoolManager:
=====================================
CODE:
package test.thread.pool1;
import java.util.ArrayList;
import java.util.List;
import test.thread.pool1.impl.MyWorkQueue;
/**
* <p>Title: 線程池管理器</p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class ThreadPoolManager {
/*最大線程數(shù)*/
private int threads_max_num;
/*最小線程數(shù)*/
private int threads_min_num;
/* 線程池線程增長步長 */
private int threads_increase_step = 5;
/* 任務(wù)工作隊(duì)列 */
private IWorkQueue queue;
/* 線程池監(jiān)視狗 */
private PoolWatchDog poolWatchDog ;
/* 隊(duì)列線程 */
private Thread queueThread ;
/* 線程池 封裝所有工作線程的數(shù)據(jù)結(jié)構(gòu) */
private List pool = new ArrayList();
/* 線程池中 封裝所有鈍化后的數(shù)據(jù)結(jié)構(gòu)*/
private List passivePool = new ArrayList();
/* 空閑60秒 */
private static final long IDLE_TIMEOUT = 60000L;
/* 關(guān)閉連接池標(biāo)志位 */
private boolean close = false;
/**
* 線程池管理器
* @param queue 任務(wù)隊(duì)列
* @param threads_min_num 工作線程最小數(shù)
* @param threads_max_num 工作線程最大數(shù)
*/
public ThreadPoolManager(int threads_max_num
,int threads_min_num
,IWorkQueue queue){
this.threads_max_num = threads_max_num;
this.threads_min_num = threads_min_num;
this.queue = queue;
}
/**
* 線程池啟動
*/
public void startPool(){
System.out.println("=== startPool..........");
poolWatchDog = new PoolWatchDog("PoolWatchDog");
poolWatchDog.setDaemon(true);
poolWatchDog.start();
System.out.println("=== startPool..........over");
}
/**
* 線程池銷毀接口
*/
public void destoryPool(){
System.out.println("==========================DestoryPool starting ...");
this.close = true;
int pool_size = this.pool.size();
//中斷隊(duì)列線程
System.out.println("===Interrupt queue thread ... ");
queueThread.interrupt();
queueThread = null;
System.out.println("===Interrupt thread pool ... ");
Thread pool_thread = null;
for(int i=0; i<pool_size; i++){
pool_thread = (Thread)pool.get(i);
if(pool_thread !=null
&& pool_thread.isAlive()
&& !pool_thread.isInterrupted()){
pool_thread.interrupt();
System.out.println("Stop pool_thread:"
+pool_thread.getName()+"[interrupt] "
+pool_thread.isInterrupted());
}
}//end for
if(pool != null){
pool.clear();
}
if(passivePool != null){
pool.clear();
}
try{
System.out.println("=== poolWatchDog.join() starting ...");
poolWatchDog.join();
System.out.println("=== poolWatchDog.join() is over ...");
}
catch(Throwable ex){
System.out.println("###poolWatchDog ... join method throw a exception ... "
+ex.toString());
}
poolWatchDog =null;
System.out.println("==============================DestoryPool is over ...");
}
public static void main(String[] args) throws Exception{
ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
threadPoolManager1.startPool();
Thread.sleep(60000);
threadPoolManager1.destoryPool();
}
/**
* 線程池監(jiān)視狗
*/
private class PoolWatchDog extends Thread{
public PoolWatchDog(String name){
super(name);
}
public void run(){
Thread workThread = null;
Runnable run = null;
//開啟任務(wù)隊(duì)列線程,獲取數(shù)據(jù)--------
System.out.println("===QueueThread starting ... ... ");
queueThread = new Thread(new QueueThread(),"QueueThread");
queueThread.start();
System.out.println("===Initial thread Pool ... ...");
//初始化線程池的最小線程數(shù),并放入池中
for(int i=0; i<threads_min_num; i++){
run = new WorkThread();
workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
workThread.start();
if(i == threads_min_num -1){
workThread = null;
run = null;
}
}
System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());
//線程池線程動態(tài)增加線程算法--------------
while(!close){
//等待5秒鐘,等上述線程都啟動----------
synchronized(this){
try{
System.out.println("===Wait the [last time] threads starting ....");
this.wait(15000);
}
catch(Throwable ex){
System.out.println("###PoolWatchDog invoking is failure ... "+ex);
}
}//end synchronized
//開始增加線程-----------------------spread動作
int queue_size = queue.getTaskSize();
int temp_size = (queue_size - threads_min_num);
if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
System.out.println("================Spread thread pool starting ....");
for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
System.out.println("=== Spread thread num : "+i);
run = new WorkThread();
workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
workThread.start();
}//end for
workThread = null;
run = null;
System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
}//end if
//刪除已經(jīng)多余的睡眠線程-------------shrink動作
int more_sleep_size = pool.size() - threads_min_num;//最多能刪除的線程數(shù)
int sleep_threads_size = passivePool.size();
if(more_sleep_size >0 && sleep_threads_size >0){
System.out.println("================Shrink thread pool starting ....");
for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
System.out.println("=== Shrink thread num : "+i);
Thread removeThread = (Thread)passivePool.get(0);
if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
removeThread.interrupt();
}
}
System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());
}
System.out.println("===End one return [shrink - spread operator] ....");
}//end while
}//end run
}//end private class
/**
* 工作線程
*/
class WorkThread implements Runnable{
public WorkThread(){
}
public void run(){
String name = Thread.currentThread().getName();
System.out.println("===Thread.currentThread():"+name);
pool.add(Thread.currentThread());
while(true){
//獲取任務(wù)---------
ITask task = null;
try{
System.out.println("===Get task from queue is starting ... ");
//看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
if(Thread.currentThread().isInterrupted()){
System.out.println("===Breaking current thread and jump whlie [1] ... ");
break;
}
task = queue.getTask();
}
catch(Throwable ex){
System.out.println("###No task in queue:"+ex);
}//end tryc
if(task != null){
//執(zhí)行任務(wù)---------
try{
System.out.println("===Execute the task is starting ... ");
//看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
if(Thread.currentThread().isInterrupted()){
System.out.println("===Breaking current thread and jump whlie [1] ... ");
break;
}
task.executeTask();
//任務(wù)執(zhí)行完畢-------
System.out.println("===Execute the task is over ... ");
}
catch(Throwable ex){
System.out.println("###Execute the task is failure ... "+ex);
}//end tryc
}else{
//沒有任務(wù),則鈍化線程至規(guī)定時(shí)間--------
synchronized(this){
try{
System.out.println("===Passivate into passivePool ... ");
//看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
boolean isInterrupted = Thread.currentThread().isInterrupted();
if(isInterrupted){
System.out.println("===Breaking current thread and jump whlie [1] ... ");
break;
}
// passivePool.add(this);
passivePool.add(Thread.currentThread());
//準(zhǔn)備睡眠線程-------
isInterrupted = Thread.currentThread().isInterrupted();
if(isInterrupted){
System.out.println("===Breaking current thread and jump whlie [2] ... ");
break;
}
this.wait(IDLE_TIMEOUT);
}
catch(Throwable ex1){
System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
break;
}
}
}
}//end while--------
if(pool.contains(passivePool)){
pool.remove(this);
}
if(passivePool.contains(passivePool)){
passivePool.remove(this);
}
System.out.println("===The thread execute over ... ");
}//end run----------
}
class QueueThread implements Runnable{
public QueueThread(){
}
public void run(){
while(true){
//自動裝在任務(wù)--------
queue.autoAddTask();
System.out.println("===The size of queue's task is "+queue.getTaskSize());
synchronized(this){
if(Thread.currentThread().isInterrupted()){
break;
}else{
try{
this.wait(queue.getLoadDataPollingTime());
}
catch(Throwable ex){
System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
break;
}
}//end if
}//end synchr
}//end while
}//end run
}
}
WorkQueue
=====================================
CODE:
package test.thread.pool1;
import java.util.LinkedList;
import test.thread.pool1.impl.MyTask;
/**
* <p>Title: 工作隊(duì)列對象 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public abstract class WorkQueue implements IWorkQueue{
/* 預(yù)計(jì)裝載量 */
private int load_size;
/* 數(shù)據(jù)裝載輪循時(shí)間 */
private long load_polling_time;
/* 隊(duì)列 */
private LinkedList queue = new LinkedList();
/**
*
* @param load_size 預(yù)計(jì)裝載量
* @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間
*/
public WorkQueue(int load_size,long load_polling_time){
this.load_size = (load_size <= 10) ? 10 : load_size;
this.load_polling_time = load_polling_time;
}
/* 數(shù)據(jù)裝載輪循時(shí)間 */
public long getLoadDataPollingTime(){
return this.load_polling_time;
}
/*獲取任務(wù),并刪除隊(duì)列中的任務(wù)*/
public synchronized ITask getTask(){
ITask task = (ITask)queue.getFirst();
queue.removeFirst();
return task;
}
/*加入任務(wù)*/
public void addTask(ITask task){
queue.addLast(task);
}
/*刪除任務(wù)*/
public synchronized void removeTask(ITask task){
queue.remove(task);
}
/*任務(wù)總數(shù)*/
public synchronized int getTaskSize(){
return queue.size();
}
/*自動裝填任務(wù)*/
public synchronized void autoAddTask(){
synchronized(this){
float load_size_auto = load_size - getTaskSize() / load_size;
System.out.println("===load_size_auto:"+load_size_auto);
if(load_size_auto > 0.25){
autoAddTask0();
}
else {
System.out.println("=== Not must load new work queue ... Now! ");
}
}
}
/*刪除所有任務(wù)*/
public synchronized void clearAllTask(){
queue.clear();
}
/**
* 程序員自己實(shí)現(xiàn)該方法
*/
protected abstract void autoAddTask0();
}
MyWorkQueue
=====================================
CODE:
package test.thread.pool1.impl;
import java.util.LinkedList;
import test.thread.pool1.WorkQueue;
/**
* <p>Title: 例子工作隊(duì)列對象 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class MyWorkQueue extends WorkQueue{
/**
* @param load_size 預(yù)計(jì)裝載量
* @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間
*/
public MyWorkQueue(int load_size,long load_polling_time){
super(load_size,load_polling_time);
}
/**
* 自動加載任務(wù)
*/
protected synchronized void autoAddTask0(){
//-------------------
System.out.println("===MyWorkQueue ... invoked autoAddTask0() method ...");
for(int i=0; i<10; i++){
System.out.println("===add task :"+i);
this.addTask(new MyTask());
}
//-------------------
}
}
MyTask
=====================================
CODE:
package test.thread.pool1.impl;
import test.thread.pool1.ITask;
/**
* <p>Title: 工作任務(wù)接口 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class MyTask implements ITask {
/**
* 執(zhí)行的任務(wù)
* @throws java.lang.Throwable
*/
public void executeTask() throws Throwable{
System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
}
}