李濤,Sun中國工程研究院工程師
概述
1:三個新加的多線程包
2:Callable 和 Future接口
3:新的線程執行架構
4:Lockers和Condition接口
5: Synchronizer:同步裝置
6: BlockingQueue接口
7:Atomics 原子級變量
8:Concurrent Collections 共點聚集
概述
:
Java
自
1995
年面世以來得到了廣泛得一個運用,但是對多線程編程的支持
Java
很長時間一直停留在初級階段。在
Java 5.0
之前
Java
里的多線程編程主要是通過
Thread
類,
Runnable
接口,
Object
對象中的
wait()
、
notify()
、
notifyAll()
等方法和
synchronized
關鍵詞來實現的。這些工具雖然能在大多數情況下解決對共享資源的管理和線程間的調度,但存在以下幾個問題
1.?????
過于原始,拿來就能用的功能有限,即使是要實現簡單的多線程功能也需要編寫大量的代碼。這些工具就像匯編語言一樣難以學習和使用,比這更糟糕的是稍有不慎它們還可能被錯誤地使用,而且這樣的錯誤很難被發現。
2.?????
如果使用不當,會使程序的運行效率大大降低。
3.?????
為了提高開發效率,簡化編程,開發人員在做項目的時候往往需要寫一些共享的工具來實現一些普遍適用的功能。但因為沒有規范,相同的工具會被重復地開發,造成資源浪費。
4.?????
因為鎖定的功能是通過
Synchronized
來實現的,這是一種塊結構,只能對代碼中的一段代碼進行鎖定,而且鎖定是單一的。如以下代碼所示:
synchronized
(
lock
)
{
?
?? //
執行對共享資源的操作
??? ……
}
|
?
一些復雜的功能就很難被實現。比如說如果程序需要取得
lock A
和
lock B
來進行操作
1
,然后需要取得
lock C
并且釋放
lock A
來進行操作
2
,
Java 5.0
之前的多線程框架就顯得無能為力了。
因為這些問題,程序員對舊的框架一直頗有微詞。這種情況一直到
Java 5.0
才有較大的改觀,一系列的多線程工具包被納入了標準庫文件。這些工具包括了一個新的多線程程序的執行框架,使編程人員可方便地協調和調度線程的運行,并且新加入了一些高性能的常用的工具,使程序更容易編寫,運行效率更高。本文將分類并結合例子來介紹這些新加的多線程工具。
在我們開始介紹
Java 5.0
里的新
Concurrent
工具前讓我們先來看一下一個用舊的多線程工具編寫的程序,這個程序里有一個
Server
線程,它需要啟動兩個
Component
,
Server
線程需等到
Component
線程完畢后再繼續。相同的功能在
Synchronizer
一章里用新加的工具
CountDownLatch
有相同的實現。兩個程序,孰優孰劣,哪個程序更容易編寫,哪個程序更容易理解,相信大家看過之后不難得出結論。
public class ServerThread {
????? Object concLock = new Object();
????? int count = 2;
public void runTwoThreads() {
?????
//
啟動兩個線程去初始化組件
??????????? new Thread(new ComponentThread1(this)).start();
??????????? new Thread(new ComponentThread1(this)).start();
??????????? // Wait for other thread
while(count != 0) {
????????????????? synchronized(concLock) {
??????????????????????? try {
????????????????????????????? concLock.wait();
????????????????????????????? System.out.println("Wake up.");
??????????????????????? } catch (InterruptedException ie) { //
處理異常
}
????????????????? }
??????????? }
??????????? System.out.println("Server is up.");
????? }
????? public void callBack() {
synchronized(concLock) {
????????????????? count--;
????????????????? concLock.notifyAll();
??????????? }
????? }
????? public static void main(String[] args){
??????????? ServerThread server = new ServerThread();
??????????? server.runTwoThreads();
????? }
}
?
public class ComponentThread1 implements Runnable {
????? private ServerThread server;
????? public ComponentThread1(ServerThread server) {
??????????? this.server = server;
????? }
public void run() {
????? //
做組件初始化的工作
??????????? System.out.println("Do component initialization.");
??????????? server.callBack();
????? }
}
|
1:三個新加的多線程包
Java 5.0
里新加入了三個多線程包:
java.util.concurrent, java.util.concurrent.atomic, java.util.concurrent.locks.
-
java.util.concurrent
包含了常用的多線程工具,是新的多線程工具的主體。
-
java.util.concurrent.atomic
包含了不用加鎖情況下就能改變值的原子變量,比如說
AtomicInteger
提供了
addAndGet()
方法。
Add
和
Get
是兩個不同的操作,為了保證別的線程不干擾,以往的做法是先鎖定共享的變量,然后在鎖定的范圍內進行兩步操作。但用
AtomicInteger.addAndGet()
就不用擔心鎖定的事了,其內部實現保證了這兩步操作是在原子量級發生的,不會被別的線程干擾。
-
java.util.concurrent.locks
包包含鎖定的工具。
2:Callable 和 Future接口
Callable
是類似于
Runnable
的接口,實現
Callable
接口的類和實現
Runnable
的類都是可被其它線程執行的任務。
Callable
和
Runnable
有幾點不同:
-
Callable
規定的方法是
call()
,而
Runnable
規定的方法是
run().
-
Callable
的任務執行后可返回值,而
Runnable
的任務是不能返回值的。
-
call
()方法可拋出異常,而
run
()方法是不能拋出異常的。
-
運行
Callable
任務可拿到一個
Future
對象,通過
Future
對象可了解任務執行情況,可取消任務的執行,還可獲取任務執行的結果。
以下是
Callable
的一個例子:
public class DoCallStuff implements Callable<String>{ // *1
??????? private int aInt;
??????? public DoCallStuff(int aInt) {
??????????????? this.aInt = aInt;
??????? }
??????? public String call() throws Exception { //*2
??????????????? boolean resultOk = false;
??????????????? if(aInt == 0){
??????????????????????? resultOk = true;
??????????????? }? else if(aInt == 1){
??????????????????????? while(true){ //infinite loop
??????????????????
?????????????System.out.println("looping....");
??????????????????????????????? Thread.sleep(3000);
??????????????????????? }
??????????????? } else {
???????????????????????
throw new Exception("Callable terminated with Exception!"); //*3
??????????????? }
??????????????? if(resultOk){
??????????????????????? return "Task done.";
??????????????? } else {
??????????????????????? return "Task failed";
??????????????? }
??????? }
}
|
*1:
名為
DoCallStuff
類實現了
Callable<String>
,
String
將是
call
方法的返回值類型。例子中用了
String
,但可以是任何
Java
類。
*2: call
方法的返回值類型為
String
,這是和類的定義相對應的。并且可以拋出異常。
*3: call
方法可以拋出異常,如加重的斜體字所示。
以下是調用
DoCallStuff
的主程序。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Executor {
??????? public static void main(String[] args){
??????????????? //*1
??????????????? DoCallStuff call1 = new DoCallStuff(0);
??????????????? DoCallStuff call2 = new DoCallStuff(1);
??????????????? DoCallStuff call3 = new DoCallStuff(2);
??????????????? //*2
??????????????? ExecutorService es = Executors.newFixedThreadPool(3);
??????????????? //*3
??????????????? Future<String> future1 = es.submit(call1);
??????????????? Future<String> future2 = es.submit(call2);
????
???????????Future<String> future3 = es.submit(call3);
??????????????? try {
??????????????????????? //*4
??????????????????????? System.out.println(future1.get());
???????????????????????? //*5
??????????????????????? Thread.sleep(3000);
?????????????????
??????System.out.println("Thread 2 terminated? :" + future2.cancel(true));
??????????????????????? //*6
??????????????????????? System.out.println(future3.get());
??????????????? } catch (ExecutionException ex) {
??????????????????????? ex.printStackTrace();
??????????????? } catch (InterruptedException ex) {
??????????????????????? ex.printStackTrace();
??????????????? }
??????? }
}
|
*1:
定義了幾個任務
*2:
初始了任務執行工具。任務的執行框架將會在后面解釋。
*3:
執行任務,任務啟動時返回了一個
Future
對象,如果想得到任務執行的結果或者是異常可對這個
Future
對象進行操作。
Future
所含的值必須跟
Callable
所含的值對映,比如說例子中
Future<String>
對印
Callable<String>
*4:
任務
1
正常執行完畢,
future1.get()
會返回線程的值
*5:
任務
2
在進行一個死循環,調用
future2.cancel(true)
來中止此線程。傳入的參數標明是否可打斷線程,
true
表明可以打斷。
*6:
任務
3
拋出異常,調用
future3.get()
時會引起異常的拋出。
?
運行
Executor
會有以下運行結果:
looping....
Task done. //*1
looping....
looping....//*2
looping....
looping....
looping....
looping....
Thread 2 terminated? :true //*3
//*4
java.util.concurrent.ExecutionException: java.lang.Exception: Callable terminated with Exception!
??????? at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:205)
??????? at java.util.concurrent.FutureTask.get(FutureTask.java:80)
??????? at concurrent.Executor.main(Executor.java:43)
??????? …….
|
*1:
任務
1
正常結束
*2:
任務
2
是個死循環,這是它的打印結果
*3:
指示任務
2
被取消
*4:
在執行
future3.get()
時得到任務
3
拋出的異常
3:新的任務執行架構
在
Java 5.0
之前啟動一個任務是通過調用
Thread
類的
start()
方法來實現的,任務的提于交和執行是同時進行的,如果你想對任務的執行進行調度或是控制同時執行的線程數量就需要額外編寫代碼來完成。
5.0
里提供了一個新的任務執行架構使你可以輕松地調度和控制任務的執行,并且可以建立一個類似數據庫連接池的線程池來執行任務。這個架構主要有三個接口和其相應的具體類組成。這三個接口是
Executor, ExecutorService
和
ScheduledExecutorService
,讓我們先用一個圖來顯示它們的關系:
?
圖的左側是接口,圖的右側是這些接口的具體類。注意
Executor
是沒有直接具體實現的。
Executor
接口:
是用來執行
Runnable
任務的,它只定義一個方法:
-
execute(Runnable command)
:執行
Ruannable
類型的任務
ExecutorService
接口:
ExecutorService
繼承了
Executor
的方法,并提供了執行
Callable
任務和中止任務執行的服務,其定義的方法主要有:
-
submit(task)
:可用來提交
Callable
或
Runnable
任務,并返回代表此任務的
Future
對象
-
invokeAll(collection of tasks)
:批處理任務集合,并返回一個代表這些任務的
Future
對象集合
-
shutdown()
:在完成已提交的任務后關閉服務,不再接受新任務
-
shutdownNow()
:停止所有正在執行的任務并關閉服務。
-
isTerminated()
:測試是否所有任務都執行完畢了。
-
isShutdown()
:測試是否該
ExecutorService
已被關閉
ScheduledExecutorService
接口
在
ExecutorService
的基礎上,
ScheduledExecutorService
提供了按時間安排執行任務的功能,它提供的方法主要有:
-
schedule(task, initDelay):
安排所提交的
Callable
或
Runnable
任務在
initDelay
指定的時間后執行。
-
scheduleAtFixedRate()
:安排所提交的
Runnable
任務按指定的間隔重復執行
-
scheduleWithFixedDelay()
:安排所提交的
Runnable
任務在每次執行完后,等待
delay
所指定的時間后重復執行。
代碼:
ScheduleExecutorService
的例子
public class ScheduledExecutorServiceTest {
??????? public static void main(String[] args)
?????????????? throws InterruptedException, ExecutionException{
?????????????? //*1
??????????????? ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
??????????????? //*2
??????????????? Runnable task1 = new Runnable() {
???????????????????? public void run() {
??????????????????????? System.out.println("Task repeating.");
???????????????????? }
??????????????? };
??????????????? //*3
??????????????? final ScheduledFuture future1 =
??????????????????????? service.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
??????????????? //*4
??????????????? ScheduledFuture<String> future2 = service.schedule(new Callable<String>(){
???????????????????? public String call(){
???????????????????????????? future1.cancel(true);
???????????????????????????? return "task cancelled!";
???????????????
?????}
??????????????? }, 5, TimeUnit.SECONDS);
??????????????? System.out.println(future2.get());
//*5
service.shutdown();
??????? }
}
|
這個例子有兩個任務,第一個任務每隔一秒打印一句“
Task repeating
”
,
第二個任務在
5
秒鐘后取消第一個任務。
*1:
初始化一個
ScheduledExecutorService
對象,這個對象的線程池大小為
2
。
*2:
用內函數的方式定義了一個
Runnable
任務。
*3:
調用所定義的
ScheduledExecutorService
對象來執行任務,任務每秒執行一次。能重復執行的任務一定是
Runnable
類型。注意我們可以用
TimeUnit
來制定時間單位,這也是
Java 5.0
里新的特征,
5.0
以前的記時單位是微秒,現在可精確到奈秒。
*4:
調用
ScheduledExecutorService
對象來執行第二個任務,第二個任務所作的就是在
5
秒鐘后取消第一個任務。
*5:
關閉服務。
Executors
類
雖然以上提到的接口有其實現的具體類,但為了方便
Java 5.0
建議使用
Executors
的工具類來得到
Executor
接口的具體對象,需要注意的是
Executors
是一個類,不是
Executor
的復數形式。
Executors
提供了以下一些
static
的方法:
-
callable(Runnable task):
將
Runnable
的任務轉化成
Callable
的任務
-
newSingleThreadExecutor:
產生一個
ExecutorService
對象,這個對象只有一個線程可用來執行任務,若任務多于一個,任務將按先后順序執行。
-
newCachedThreadPool():
產生一個
ExecutorService
對象,這個對象帶有一個線程池,線程池的大小會根據需要調整,線程執行完任務后返回線程池,供執行下一次任務使用。
-
newFixedThreadPool(int poolSize)
:產生一個
ExecutorService
對象,這個對象帶有一個大小為
poolSize
的線程池,若任務數量大于
poolSize
,任務會被放在一個
queue
里順序執行。
-
newSingleThreadScheduledExecutor
:產生一個
ScheduledExecutorService
對象,這個對象的線程池大小為
1
,若任務多于一個,任務將按先后順序執行。
-
newScheduledThreadPool(int poolSize):
產生一個
ScheduledExecutorService
對象,這個對象的線程池大小為
poolSize
,若任務數量大于
poolSize
,任務會在一個
queue
里等待執行
以下是得到和使用
ExecutorService
的例子:
代碼:如何調用
Executors
來獲得各種服務對象
//Single Threaded ExecutorService
???? ExecutorService singleThreadeService = Executors.newSingleThreadExecutor();
//Cached ExecutorService
???? ExecutorService cachedService = Executors.newCachedThreadPool();
//Fixed number of ExecutorService
???? ExecutorService fixedService = Executors.newFixedThreadPool(3);
//Single ScheduledExecutorService
???? ScheduledExecutorService singleScheduledService =
????????? Executors.newSingleThreadScheduledExecutor();
//Fixed number of ScheduledExecutorService
ScheduledExecutorService fixedScheduledService =
???? Executors.newScheduledThreadPool(3);
|
4:Lockers和Condition接口
在多線程編程里面一個重要的概念是鎖定,如果一個資源是多個線程共享的,為了保證數據的完整性,在進行事務性操作時需要將共享資源鎖定,這樣可以保證在做事務性操作時只有一個線程能對資源進行操作,從而保證數據的完整性。在
5.0
以前,鎖定的功能是由
Synchronized
關鍵字來實現的,這樣做存在幾個問題:
-
每次只能對一個對象進行鎖定。若需要鎖定多個對象,編程就比較麻煩,一不小心就會出現死鎖現象。
-
如果線程因拿不到鎖定而進入等待狀況,是沒有辦法將其打斷的
在
Java 5.0
里出現兩種鎖的工具可供使用,下圖是這兩個工具的接口及其實現:
Lock
接口
ReentrantLock
是
Lock
的具體類,
Lock
提供了以下一些方法:
-
lock():
請求鎖定,如果鎖已被別的線程鎖定,調用此方法的線程被阻斷進入等待狀態。
-
tryLock()
:如果鎖沒被別的線程鎖定,進入鎖定狀態,并返回
true
。若鎖已被鎖定,返回
false
,不進入等待狀態。此方法還可帶時間參數,如果鎖在方法執行時已被鎖定,線程將繼續等待規定的時間,若還不行才返回
false
。
-
unlock()
:取消鎖定,需要注意的是
Lock
不會自動取消,編程時必須手動解鎖。
代碼:
//
生成一個鎖
Lock lock = new ReentrantLock();
public void accessProtectedResource() {
?
lock.lock(); //
取得鎖定
? try {
??? //
對共享資源進行操作
? } finally {
??? //
一定記著把鎖取消掉,鎖本身是不會自動解鎖的
???
lock.unlock()
;
? }
}
|
ReadWriteLock
接口
為了提高效率有些共享資源允許同時進行多個讀的操作,但只允許一個寫的操作,比如一個文件,只要其內容不變可以讓多個線程同時讀,不必做排他的鎖定,排他的鎖定只有在寫的時候需要,以保證別的線程不會看到數據不完整的文件。
ReadWriteLock
可滿足這種需要。
ReadWriteLock
內置兩個
Lock
,一個是讀的
Lock
,一個是寫的
Lock
。多個線程可同時得到讀的
Lock
,但只有一個線程能得到寫的
Lock
,而且寫的
Lock
被鎖定后,任何線程都不能得到
Lock
。
ReadWriteLock
提供的方法有:
-
readLock():
返回一個讀的
lock
-
writeLock():
返回一個寫的
lock,
此
lock
是排他的。
ReadWriteLock
的例子:
public class FileOperator{
????? //
初始化一個
ReadWriteLock
?????
ReadWriteLock lock = new ReentrantReadWriteLock();
public String read() {
????? //
得到
readLock
并鎖定
??????????? Lock readLock = lock.readLock();
??????
?????readLock.lock();
??????????? try {
????????????????? //
做讀的工作
????????????????? return "Read something";
??????????? } finally {
???????????????? readLock.unlock();
??????????? }
????? }
?????
public void write(String content) {
????? //
得到
writeLock
并鎖定
??????????? Lock writeLock = lock.writeLock();
??????????? writeLock.lock();
??????????? try {
????????????????? //
做讀的工作
??????????? } finally {
????????????????
writeLock.unlock();
??????????? }
????? }
}
|
?
需要注意的是
ReadWriteLock
提供了一個高效的鎖定機理,但最終程序的運行效率是和程序的設計息息相關的,比如說如果讀的線程和寫的線程同時在等待,要考慮是先發放讀的
lock
還是先發放寫的
lock
。如果寫發生的頻率不高,而且快,可以考慮先給寫的
lock
。還要考慮的問題是如果一個寫正在等待讀完成,此時一個新的讀進來,是否要給這個新的讀發鎖,如果發了,可能導致寫的線程等很久。等等此類問題在編程時都要給予充分的考慮。
Condition
接口:
有時候線程取得
lock
后需要在一定條件下才能做某些工作,比如說經典的
Producer
和
Consumer
問題,
Consumer
必須在籃子里有蘋果的時候才能吃蘋果,否則它必須暫時放棄對籃子的鎖定,等到
Producer
往籃子里放了蘋果后再去拿來吃。而
Producer
必須等到籃子空了才能往里放蘋果,否則它也需要暫時解鎖等
Consumer
把蘋果吃了才能往籃子里放蘋果。在
Java 5.0
以前,這種功能是由
Object
類的
wait(), notify()
和
notifyAll()
等方法實現的,在
5.0
里面,這些功能集中到了
Condition
這個接口來實現,
Condition
提供以下方法:
-
await()
:使調用此方法的線程放棄鎖定,進入睡眠直到被打斷或被喚醒。
-
signal():
喚醒一個等待的線程
-
signalAll()
:喚醒所有等待的線程
Condition
的例子:
public class Basket {?????
Lock lock = new ReentrantLock();
//
產生
Condition
對象
???? Condition produced = lock.newCondition();
???? Condition consumed = lock.newCondition();
???? boolean available = false;
??
??
???? public void produce() throws InterruptedException {
?????????? lock.lock();
?????????? try {
???????????????? if(available){
??????????????????? consumed.await(); //
放棄
lock
進入睡眠
?
???????????????? }
???????????????? /*
生產蘋果
*/
???????????????? System.out.println("Apple produced.");
???????????????? available = true;
????????????????
produced.signal(); //
發信號喚醒等待這個
Condition
的線程
?????????? } finally {
???????????????? lock.unlock();
?????????? }
???? }
????
???? public void consume() throws InterruptedException {
?????????? lock.lock();
?????????? try {
???????????????? if(!available){
??????????????????????
produced.await();//
放棄
lock
進入睡眠
?
???????????????? }
???????????????? /*
吃蘋果
*/
???????????????? System.out.println("Apple consumed.");
???????????????? available = false;
????????????????
consumed.signal();//
發信號喚醒等待這個
Condition
的線程
?????????? } finally {
???????????????? lock.unlock();
?????????? }
???? }?????
}
|
ConditionTester:
public class ConditionTester {
?????
????? public static void main(String[] args) throws InterruptedException{
final Basket basket = new Basket();
//
定義一個
producer
??????????? Runnable producer = new Runnable() {
????????????????? public void run() {
??????????????????????? try {
????????????????????????????? basket.produce();
??????
?????????????????} catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
????????????????? }
};
//
定義一個
consumer
??????????? Runnable consumer = new Runnable() {
????????????????? public void run() {
??????????????????????? try {
????????????????????????????? basket.consume();
??????????????????????? } catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
????????????????? }
};
//
各產生
10
個
consumer
和
producer
??????????? ExecutorService service = Executors.newCachedThreadPool();
??????????? for(int i=0; i < 10; i++)
????????????????? service.submit(consumer);
??????????? Thread.sleep(2000);
??????????? for(int i=0; i<10; i++)
????????????????? service.submit(producer);
??????????? service.shutdown();
????? }?????
}
|
5: Synchronizer:同步裝置
Java 5.0
里新加了
4
個協調線程間進程的同步裝置,它們分別是
Semaphore, CountDownLatch, CyclicBarrier
和
Exchanger.
Semaphore:
用來管理一個資源池的工具,
Semaphore
可以看成是個通行證,線程要想從資源池拿到資源必須先拿到通行證,
Semaphore
提供的通行證數量和資源池的大小一致。如果線程暫時拿不到通行證,線程就會被阻斷進入等待狀態。以下是一個例子:
public class Pool {
???
??ArrayList<String> pool = null;
????? Semaphore pass = null;
????? public Pool(int size){
???????????
//
初始化資源池
??????????? pool = new ArrayList<String>();
??????????? for(int i=0; i<size; i++){
????????????????? pool.add("Resource "+i);
??????????? }
???
????????
//Semaphore
的大小和資源池的大小一致
??????????? pass = new Semaphore(size);
????? }
????? public String get() throws InterruptedException{
??????????? //
獲取通行證
,
只有得到通行證后才能得到資源
??????????? pass.acquire();
??????????? return getResource();
????? }
????? public void put(String resource){
??????????? //
歸還通行證,并歸還資源
??????????? pass.release();
??????????? releaseResource(resource);
????? }
???? private synchronized String getResource() {
??????????? String result = pool.get(0);
??????????? pool.remove(0);
??????????? System.out.println("Give out "+result);
??????????? return result;
????? }
????? private synchronized void releaseResource(String resource) {
??????????? System.out.println("return "+resource);
??????????? pool.add(resource);
????? }
}
|
SemaphoreTest:
public class SemaphoreTest {
????? public static void main(String[] args){
??????????? final Pool aPool = new Pool(2);
??????????? Runnable worker = new Runnable() {
????????????????? public void run() {
??????????????????????? String resource = null;
???????
????????????????try {
????????????????????????????? //
取得
resource
????????????????????????????? resource = aPool.get();
??????????????????????? } catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
??????????????????????? //
用
resource
做工作
??????????????????????? System.out.println("I worked on "+resource);
??????????????????????? //
歸還
resource
??????????????????????? aPool.put(resource);
????????????????? }
??????????? };
??????????? ExecutorService service = Executors.newCachedThreadPool();
??????????? for(int i=0; i<20; i++){
????????????????? service.submit(worker);
??????????? }
??????????? service.shutdown();
????? }????
}
|
CountDownLatch:
CountDownLatch
是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。比如說一個
Server
啟動時需要初始化
4
個部件,
Server
可以同時啟動
4
個線程去初始化這
4
個部件,然后調用
CountDownLatch(4).await()
阻斷進入等待,每個線程完成任務后會調用一次
CountDownLatch.countDown()
來倒計數
,
當
4
個線程都結束時
CountDownLatch
的計數就會降低為
0
,此時
Server
就會被喚醒繼續下一步操作。
CountDownLatch
的方法主要有:
-
await()
:使調用此方法的線程阻斷進入等待
-
countDown():
倒計數,將計數值減
1
-
getCount():
得到當前的計數值
CountDownLatch
的例子:一個
server
調了三個
ComponentThread
分別去啟動三個組件,然后
server
等到組件都啟動了再繼續。
public class Server {
????? public static void main(String[] args) throws InterruptedException{
??????????? System.out.println("Server is starting.");
??????????
?//
初始化一個初始值為
3
的
CountDownLatch
??????????? CountDownLatch latch = new CountDownLatch(3);
??????????? //
起
3
個線程分別去啟動
3
個組件
??????????? ExecutorService service = Executors.newCachedThreadPool();
??????????? service.submit(new ComponentThread(latch, 1));
?????????
??service.submit(new ComponentThread(latch, 2));
??????????? service.submit(new ComponentThread(latch, 3));
??????????? service.shutdown();
??????????? //
進入等待狀態
??????????? latch.await();
??????????? //
當所需的三個組件都完成時,
Server
就可繼續了
??????????? System.out.println("Server is up!");
????? }
}
?
public class ComponentThread implements Runnable{
????? CountDownLatch latch;
????? int ID;
????? /** Creates a new instance of ComponentThread */
????? public ComponentThread(CountDownLatch latch, int ID) {
??????????? this.latch = latch;
??????????? this.ID = ID;
????? }
????? public void run() {
??????????? System.out.println("Component "+ID + " initialized!");
??????????? //
將計數減一
??????????? latch.countDown();
????? }????
}
|
運行結果:
Server is starting.
Component 1 initialized!
Component 3 initialized!
Component 2 initialized!
Server is up!
|
CyclicBarrier:
CyclicBarrier
類似于
CountDownLatch
也是個計數器,不同的是
CyclicBarrier
數的是調用了
CyclicBarrier.await()
進入等待的線程數,當線程數達到了
CyclicBarrier
初始時規定的數目時,所有進入等待狀態的線程被喚醒并繼續。
CyclicBarrier
就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。
CyclicBarrier
初始時還可帶一個
Runnable
的參數,此
Runnable
任務在
CyclicBarrier
的數目達到后,所有其它線程被喚醒前被執行。
CyclicBarrier
提供以下幾個方法:
-
await()
:進入等待
-
getParties()
:返回此
barrier
需要的線程數
-
reset()
:將此
barrier
重置
以下是使用
CyclicBarrier
的一個例子:兩個線程分別在一個數組里放一個數,當這兩個線程都結束后,主線程算出數組里的數的和(這個例子比較無聊,我沒有想到更合適的例子)
public class MainThread {
public static void main(String[] args)
????? throws InterruptedException, BrokenBarrierException, TimeoutException{
??????????? final int[] array = new int[2];
??????????? CyclicBarrier barrier = new CyclicBarrier(2,
????????????????? new Runnable() {//
在所有線程都到達
Barrier
時執行
????????????????? public void run() {
??????????????????????? System.out.println("Total is:"+(array[0]+array[1]));
????????????????? }
??????????? });???????????
??????????? //
啟動線程
??????????? new Thread(new ComponentThread(barrier, array, 0)).start();
??????????? new Thread(new ComponentThread(barrier, array, 1)).start();???
????? }?????
}
?
public class ComponentThread implements Runnable{
????? CyclicBarrier barrier;
????? int ID;
????? int[] array;
????? public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
??????????? this.barrier = barrier;
??????????? this.ID = ID;
??????????? this.array = array;
????? }
????? public void run() {
??????????? try {
????????????????? array[ID] = new Random().nextInt();
????????????????? System.out.println(ID+ " generates:"+array[ID]);
????????????????? //
該線程完成了任務等在
Barrier
處
????????????????? barrier.await();
??????????? } catch (BrokenBarrierException ex) {
????????????????? ex.printStackTrace();
??????????? } catch (InterruptedException ex) {
????????????????? ex.printStackTrace();
??????????? }
????? }
}
|
Exchanger:
顧名思義
Exchanger
讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過
Exchanger
雙方互換杯子,服務生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復始。
class FillAndEmpty {
????? //
初始化一個
Exchanger
,并規定可交換的信息類型是
DataCup
????? Exchanger<Cup> exchanger = new Exchanger();
????
?Cup initialEmptyCup = ...; //
初始化一個空的杯子
????? Cup initialFullCup = ...; //
初始化一個裝滿水的杯子
????? //
服務生線程
????? class Waiter implements Runnable {
??????????? public void run() {
????????????????? Cup currentCup = initialEmptyCup;
????????????????? try {
??????
?????????????????//
往空的杯子里加水
??????????????????????? currentCup.addWater();
??????????????????????? //
杯子滿后和顧客的空杯子交換
??????????????????????? currentCup = exchanger.exchange(currentCup);
????????????????? } catch (InterruptedException ex) { ... handle ... }
?
?????????? }
????? }
????? //
顧客線程
????? class Customer implements Runnable {
??????????? public void run() {
????????????????? DataCup currentCup = initialFullCup;
????????????????? try {
??????????????????????? //
把杯子里的水喝掉
??????????????????????? currentCup.drinkFromCup();
??????????????????????? //
將空杯子和服務生的滿杯子交換
??????????????????????? currentCup = exchanger.exchange(currentCup);
????????????????? } catch (InterruptedException ex) { ... handle ...}
??????????? }
????? }
?????
????? void start() {
???????
????new Thread(new Waiter()).start();
??????????? new Thread(new Customer()).start();
????? }
}
|
6: BlockingQueue接口
BlockingQueue
是一種特殊的
Queue
,若
BlockingQueue
是空的,從
BlockingQueue
取東西的操作將會被阻斷進入等待狀態直到
BlocingkQueue
進了新貨才會被喚醒。同樣,如果
BlockingQueue
是滿的任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到
BlockingQueue
里有新的空間才會被喚醒繼續操作。
BlockingQueue
提供的方法主要有:
-
add(anObject):
把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
可以容納返回
true
,否則拋出
IllegalStateException
異常。
-
offer(anObject)
:把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
可以容納返回
true
,否則返回
false
。
-
put(anObject)
:把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
沒有空間,調用此方法的線程被阻斷直到
BlockingQueue
里有新的空間再繼續。
-
poll(time)
:取出
BlockingQueue
里排在首位的對象,若不能立即取出可等
time
參數規定的時間。取不到時返回
null
。
-
take()
:取出
BlockingQueue
里排在首位的對象,若
BlockingQueue
為空,阻斷進入等待狀態直到
BlockingQueue
有新的對象被加入為止。
根據不同的需要
BlockingQueue
有
4
種具體實現:
-
ArrayBlockingQueue
:規定大小的
BlockingQueue
,其構造函數必須帶一個
int
參數來指明其大小。其所含的對象是以
FIFO
(先入先出)順序排序的。
-
LinkedBlockingQueue
:大小不定的
BlockingQueue
,若其構造函數帶一個規定大小的參數,生成的
BlockingQueue
有大小限制,若不帶大小參數,所生成的
BlockingQueue
的大小由
Integer.MAX_VALUE
來決定。其所含的對象是以
FIFO
(先入先出)順序排序的。
LinkedBlockingQueue
和
ArrayBlockingQueue
比較起來,它們背后所用的數據結構不一樣,導致
LinkedBlockingQueue
的數據吞吐量要大于
ArrayBlockingQueue
,但在線程數量很大時其性能的可預見性低于
ArrayBlockingQueue
。
-
PriorityBlockingQueue
:類似于
LinkedBlockingQueue
,但其所含對象的排序不是
FIFO
,而是依據對象的自然排序順序或者是構造函數所帶的
Comparator
決定的順序。
-
SynchronousQueue
:特殊的
BlockingQueue
,對其的操作必須是放和取交替完成的。
下面是用
BlockingQueue
來實現
Producer
和
Consumer
的例子:
public class BlockingQueueTest {
????? static BlockingQueue<String> basket;
????? public BlockingQueueTest() {
??????????? //
定義了一個大小為
2
的
BlockingQueue
,也可根據需要用其他的具體類
??????????? basket = new ArrayBlockingQueue<String>(2);
????? }
????? class Producor implements Runnable {
??????????? public void run() {
????????????????? while(true){
??????????????????????? try {
????????????????????????????? //
放入一個對象,若
basket
滿了,等到
basket
有位置
??????????????????????
???????basket.put("An apple");
??????????????????????? } catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
????????????????? }
??????????? }
????? }
????? class Consumer implements Runnable {
?
??????????public void run() {
????????????????? while(true){
??????????????????????? try {
????????????????????????????? //
取出一個對象,若
basket
為空,等到
basket
有東西為止
????????????????????????????? String result = basket.take();
??????????????????????? } catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
????????????????? }
?????
??????}???????????
????? }
????? public void execute(){
??????????? for(int i=0; i<10; i++){
????????????????? new Thread(new Producor()).start();
????????????????? new Thread(new Consumer()).start();
??????????? }???????????
????? }
????? public static void main(String[] args){
??????????? BlockingQueueTest test = new BlockingQueueTest();
??????????? test.execute();
????? }?????
}
|
7:Atomics 原子級變量
原子量級的變量,主要的類有
AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……
。這些原子量級的變量主要提供兩個方法:
-
compareAndSet(expectedValue, newValue):
比較當前的值是否等于
expectedValue
,
若等于把當前值改成
newValue
,并返回
true
。若不等,返回
false
。
-
getAndSet(newValue):
把當前值改為
newValue
,并返回改變前的值。
這些原子級變量利用了現代處理器(
CPU
)的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。
8:Concurrent Collections 共點聚集
在
Java
的聚集框架里可以調用
Collections.synchronizeCollection(aCollection)
將普通聚集改變成同步聚集,使之可用于多線程的環境下。
但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。
Java 5.0
里提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。
Java 5.0
目前提供的共點聚集類有:
ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList
和
CopyOnWriteArraySet.
?