增加對(duì)結(jié)果的處理:
1、修改Job,實(shí)現(xiàn)Callable接口
- public abstract class Job implements Callable<Object> {
-
- @Override
- public Object call() throws Exception {
- Object result = this.execute();//執(zhí)行子類具體任務(wù)
- synchronized (Executer.LOCK) {
- //處理完業(yè)務(wù)后,任務(wù)結(jié)束,遞減線程數(shù),同時(shí)喚醒主線程
- Executer.THREAD_COUNT--;
- Executer.LOCK.notifyAll();
- }
- return result;
- }
- /**
- * 業(yè)務(wù)處理函數(shù)
- */
- public abstract Object execute();
-
- }
2、修改Executer,增加對(duì)結(jié)果的處理
- public class Executer {
- //計(jì)算已經(jīng)派發(fā)的任務(wù)數(shù)(條件謂詞)
- public static int THREAD_COUNT = 0;
- //存儲(chǔ)任務(wù)的執(zhí)行結(jié)果
- private List<Future<Object>> futres = new ArrayList<Future<Object>>();
- //條件隊(duì)列鎖
- public static final Object LOCK = new Object();
- //線程池
- private ExecutorService pool = null;
- public Executer() {
- this(1);
- }
- public Executer(int threadPoolSize) {
- pool = Executors.newFixedThreadPool(threadPoolSize);
- }
- /**
- * 任務(wù)派發(fā)
- * @param job
- */
- public void fork(Job job){
- //將任務(wù)派發(fā)給線程池去執(zhí)行
- futres.add(pool.submit(job));
- //增加線程數(shù)
- synchronized (LOCK) {
- THREAD_COUNT++;
- }
- }
- /**
- * 統(tǒng)計(jì)任務(wù)結(jié)果
- */
- public List<Object> join(){
- synchronized (LOCK) {
- while(THREAD_COUNT > 0){//檢查線程數(shù),如果為0,則表示所有任務(wù)處理完成
- System.out.println("threadCount: "+THREAD_COUNT);
- try {
- LOCK.wait();//如果任務(wù)沒有全部完成,則掛起。等待完成的任務(wù)給予通知
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- List<Object> list = new ArrayList<Object>();
- //取出每個(gè)任務(wù)的處理結(jié)果,匯總后返回
- for (Future<Object> future : futres) {
- try {
- Object result = future.get();//因?yàn)槿蝿?wù)都已經(jīng)完成,這里直接get
- list.add(result);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- return list;
- }
- }
3、測(cè)試:
- public static void main(String[] args) {
- //初始化任務(wù)池
- Executer exe = new Executer(5);
- //初始化任務(wù)
- long time = System.currentTimeMillis();
- for (int i = 0; i < 10; i++) {
- MyJob job = new MyJob();
- exe.fork(job);//派發(fā)任務(wù)
- }
- //匯總?cè)蝿?wù)結(jié)果
- List<Object> list = exe.join();
- System.out.println("Result: "+list);
- System.out.println("time: "+(System.currentTimeMillis() - time));
- }
4、執(zhí)行結(jié)果:
- threadCount: 10
- running thread id = 9
- running thread id = 11
- running thread id = 8
- running thread id = 10
- running thread id = 12
- threadCount: 5
- running thread id = 9
- running thread id = 8
- running thread id = 11
- running thread id = 12
- running thread id = 10
- Result: [8, 9, 10, 11, 12, 8, 11, 12, 9, 10]
- time: 2000
5、附件是完整代碼
posted on 2012-07-15 01:21
mixer-a 閱讀(1126)
評(píng)論(0) 編輯 收藏