CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。
若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。 

示例用法:下面是一個在并行分解設(shè)計中使用 barrier 的例子:  
class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;
   
   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await(); 
         } catch (InterruptedException ex) { 
return
         } catch (BrokenBarrierException ex) { 
return
         }
       }
     }
   }

   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     barrier = new CyclicBarrier(N, 
                                 new Runnable() {
                                   public void run() { 
                                     mergeRows(); 
                                   }
                                 });
     for (int i = 0; i < N; ++i) 
       new Thread(new Worker(i)).start();

     waitUntilDone();
   }
 }

在這個例子中,每個 worker 線程處理矩陣的一行,在處理完所有的行之前,該線程將一直在屏障處等待。
 處理完所有的行之后,將執(zhí)行所提供的 Runnable 屏障操作,并合并這些行。
  如果合并者確定已經(jīng)找到了一個解決方案,那么 done() 將返回 true,所有的 worker 線程都將終止。 
如果屏障操作在執(zhí)行時不依賴于正掛起的線程,則線程組中的任何線程在獲得釋放時都能執(zhí)行該操作。
為方便此操作,每次調(diào)用 await() 都將返回能到達屏障處的線程的索引。然后,您可以選擇哪個線程應該執(zhí)行屏障操作,例如: 
  if (barrier.await() == 0) {
     // log the completion of this iteration
   }對于失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。 

內(nèi)存一致性效果:線程中調(diào)用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 緊跟在從另一個線程中對應 await() 成功返回的操作。 

(1)await

public int await()   throws InterruptedException, BrokenBarrierException在所有參與者都已經(jīng)在此 barrier 上調(diào)用 await 方法之前,將一直等待。 
如果當前線程不是將到達的最后一個線程,出于調(diào)度目的,將禁用它,且在發(fā)生以下情況之一前,該線程將一直處于休眠狀態(tài): 
最后一個線程到達;或者 
其他某個線程中斷當前線程;或者 
其他某個線程中斷另一個等待線程;或者 
其他某個線程在等待 barrier 時超時;或者 
其他某個線程在此 barrier 上調(diào)用 reset()。 
如果當前線程: 

在進入此方法時已經(jīng)設(shè)置了該線程的中斷狀態(tài);或者 
在等待時被中斷 
則拋出 InterruptedException,并且清除當前線程的已中斷狀態(tài)。 
如果在線程處于等待狀態(tài)時 barrier 被 reset(),或者在調(diào)用 await 時 barrier 被損壞,抑或任意一個線程正處于等待狀態(tài),則拋出 BrokenBarrierException 異常。 

如果任何線程在等待時被 中斷,則其他所有等待線程都將拋出 BrokenBarrierException 異常,并將 barrier 置于損壞狀態(tài)。 

如果當前線程是最后一個將要到達的線程,并且構(gòu)造方法中提供了一個非空的屏障操作,則在允許其他線程繼續(xù)運行之前,當前線程將運行該操作。
如果在執(zhí)行屏障操作過程中發(fā)生異常,則該異常將傳播到當前線程中,并將 barrier 置于損壞狀態(tài)。 

返回:
到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,零指示最后一個到達的線程 
拋出: 
InterruptedException - 如果當前線程在等待時被中斷 
BrokenBarrierException - 如果另一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調(diào)用 await 時 barrier 被損壞,抑或由于異常而導致屏障操作(如果存在)失敗。

(2)getNumberWaiting


public int getNumberWaiting()返回當前在屏障處等待的參與者數(shù)目。此方法主要用于調(diào)試和斷言。 

返回:
當前阻塞在 await() 中的參與者數(shù)目。

應用實例:

package com.itm.thread;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        final CyclicBarrier cb = new CyclicBarrier(3); // 這個團隊中共有3個隊員,即需要3個線程
        ExecutorService es = Executors.newFixedThreadPool(3); // 在線程池中放入三個線程
        for (int i = 0; i < 3; i++) { // 開啟三個任務(wù)
            es.execute(new Runnable() {

                @Override
                public void run() {
                    for (int i = 0; i < 5; i++) {
                        try {
                            Thread.sleep(new Random().nextInt(5000));
                            System.out.print(Thread.currentThread().getName()
                                    + "已到達集合點" + (i + 1) + ",現(xiàn)在共有"
                                    + (cb.getNumberWaiting() + 1) + "個線程到達");
                            // 如果有2個線程已經(jīng)在等待,那么最后一個線程到達后就可以一起開始后面操作
                            if (cb.getNumberWaiting() + 1 == 3) {
                                System.out.println(",全部到齊,出發(fā)去下一個目標");
                            } else {
                                System.out.println(",正在等待");
                            }
                            cb.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        es.shutdown();
    }
}

運行結(jié)果:

pool-1-thread-3已到達集合點1,現(xiàn)在共有1個線程到達,正在等待
pool-1-thread-2已到達集合點1,現(xiàn)在共有2個線程到達,正在等待
pool-1-thread-1已到達集合點1,現(xiàn)在共有3個線程到達,全部到齊,出發(fā)去下一個目標
pool-1-thread-3已到達集合點2,現(xiàn)在共有1個線程到達,正在等待
pool-1-thread-1已到達集合點2,現(xiàn)在共有2個線程到達,正在等待
pool-1-thread-2已到達集合點2,現(xiàn)在共有3個線程到達,全部到齊,出發(fā)去下一個目標
pool-1-thread-1已到達集合點3,現(xiàn)在共有1個線程到達,正在等待
pool-1-thread-2已到達集合點3,現(xiàn)在共有2個線程到達,正在等待
pool-1-thread-3已到達集合點3,現(xiàn)在共有3個線程到達,全部到齊,出發(fā)去下一個目標
pool-1-thread-1已到達集合點4,現(xiàn)在共有1個線程到達,正在等待
pool-1-thread-2已到達集合點4,現(xiàn)在共有2個線程到達,正在等待
pool-1-thread-3已到達集合點4,現(xiàn)在共有3個線程到達,全部到齊,出發(fā)去下一個目標
pool-1-thread-2已到達集合點5,現(xiàn)在共有1個線程到達,正在等待
pool-1-thread-3已到達集合點5,現(xiàn)在共有2個線程到達,正在等待
pool-1-thread-1已到達集合點5,現(xiàn)在共有3個線程到達,全部到齊,出發(fā)去下一個目標