你所不知道的五件事情--java.util.concurrent(第二部分)
這是Ted Neward在IBM developerWorks中5 things系列文章中的一篇,仍然講述了關(guān)于Java并發(fā)集合API的一些應(yīng)用竅門,值得大家學(xué)習(xí)。(2010.06.17最后更新)
摘要:除了便于編寫并發(fā)應(yīng)用的集合API外,java.util.concurrent還引入了其它的預(yù)置程序組件,這些組件能輔助你在多線程應(yīng)用中控制和執(zhí)行線程。Ted Neward再介紹了五個(gè)來(lái)自于java.util.concurrent的Java編程必備竅門。
通過(guò)提供線程安全,性能良好的數(shù)據(jù)結(jié)構(gòu),并發(fā)集合框架使并發(fā)編程變得更容易。然而在有些情況下,開(kāi)發(fā)者需要多走一步,并要考慮控制和/或調(diào)節(jié)線程的執(zhí)行。提供java.util.concurrent包的全部原因就是為了簡(jiǎn)化多線程編程--事實(shí)上正是如此。
接著第一部分,本文介紹了多個(gè)同步數(shù)據(jù)結(jié)構(gòu),這些數(shù)據(jù)結(jié)構(gòu)比核心語(yǔ)言基本結(jié)構(gòu)(監(jiān)視器)的層次要高,但不會(huì)高到將它們?nèi)︵笤谝粋€(gè)集合類中。一旦知道了這些鎖與栓的用途,就能徑直去用了。
1. 信號(hào)量
在有些企業(yè)級(jí)系統(tǒng)中,常需要開(kāi)發(fā)者去控制針對(duì)特定資源的請(qǐng)求(線程或動(dòng)作)的數(shù)量。雖然完全可能試著手工編寫這樣的調(diào)節(jié)程序,但使用Semaphore類會(huì)更容易些,該類會(huì)為你處理對(duì)線程的控制,如清單1所示:
清單1. 使用信號(hào)量調(diào)節(jié)線程
import java.util.*;import java.util.concurrent.*;
public class SemApp
{
public static void main(String[] args)
{
Runnable limitedCall = new Runnable() {
final Random rand = new Random();
final Semaphore available = new Semaphore(3);
int count = 0;
public void run()
{
int time = rand.nextInt(15);
int num = count++;
try
{
available.acquire();
System.out.println("Executing " +
"long-running action for " +
time + " seconds
#" + num);
Thread.sleep(time * 1000);
System.out.println("Done with #" +
num + "!");
available.release();
}
catch (InterruptedException intEx)
{
intEx.printStackTrace();
}
}
};
for (int i=0; i<10; i++)
new Thread(limitedCall).start();
}
}
雖然上例有10個(gè)線程在運(yùn)行(針對(duì)運(yùn)行SemApp的Java進(jìn)程執(zhí)行jstack程序可以驗(yàn)證這一點(diǎn)),但只有3個(gè)是活動(dòng)的。另外7個(gè)線程會(huì)被保存起來(lái),直到其中一個(gè)信號(hào)量計(jì)數(shù)器被釋放出來(lái)。(準(zhǔn)確地說(shuō),Semaphore類支持一次獲取和釋放一個(gè)以上的被許可線程,但在此處的場(chǎng)景中這么做沒(méi)有意義。)
2. CountDownLatch
如果并發(fā)類Semaphore是被設(shè)計(jì)為在同一時(shí)刻允許"其中"一個(gè)線程執(zhí)行的話,那么CountDownLatch就是賽馬比賽中的起跑門。該類持有所有的線程,當(dāng)遇到某個(gè)特定條件,那時(shí)CountDownLatch就會(huì)一次性釋放全部的線程。
清單2. CountDownLatch:讓我們比賽!
import java.util.*;
import java.util.concurrent.*;
class Race
{
private Random rand = new Random();
private int distance = rand.nextInt(250);
private CountDownLatch start;
private CountDownLatch finish;
private List<String> horses = new ArrayList<String>();
public Race(String
names)
{
this.horses.addAll(Arrays.asList(names));
}
public void run()
throws InterruptedException
{
System.out.println("And the horses are stepping up to the gate
");
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch finish = new CountDownLatch(horses.size());
final List<String> places =
Collections.synchronizedList(new ArrayList<String>());
for (final String h : horses)
{
new Thread(new Runnable() {
public void run() {
try
{
System.out.println(h +
" stepping up to the gate
");
start.await();
int traveled = 0;
while (traveled < distance)
{
// In a 0-2 second period of time
.
Thread.sleep(rand.nextInt(3) * 1000);
//
a horse travels 0-14 lengths
traveled += rand.nextInt(15);
System.out.println(h +
" advanced to " + traveled + "!");
}
finish.countDown();
System.out.println(h +
" crossed the finish!");
places.add(h);
}
catch (InterruptedException intEx)
{
System.out.println("ABORTING RACE!!!");
intEx.printStackTrace();
}
}
}).start();
}
System.out.println("And
they're off!");
start.countDown();
finish.await();
System.out.println("And we have our winners!");
System.out.println(places.get(0) + " took the gold
");
System.out.println(places.get(1) + " got the silver
");
System.out.println("and " + places.get(2) + " took home the bronze.");
}
}
public class CDLApp
{
public static void main(String[] args)
throws InterruptedException, java.io.IOException
{
System.out.println("Prepping
");
Race r = new Race(
"Beverly Takes a Bath",
"RockerHorse",
"Phineas",
"Ferb",
"Tin Cup",
"I'm Faster Than a Monkey",
"Glue Factory Reject"
);
System.out.println("It's a race of " + r.getDistance() + " lengths");
System.out.println("Press Enter to run the race
.");
System.in.read();
r.run();
}
}
注意在清單2中,CountDownLatch服務(wù)于兩個(gè)目的:首先,它同時(shí)釋放所有的線程,模擬比賽的開(kāi)始;但之后,另一個(gè)CountDownLatch模擬了比賽的結(jié)束。一場(chǎng)比賽會(huì)有更多的評(píng)論,你可以在比賽的"轉(zhuǎn)彎"和"半程"點(diǎn)添加CountDownLatch,當(dāng)馬匹跑過(guò)1/4程,半程和3/4程時(shí)。
3. Executor
清單1和清單2中的例子都遭遇了一個(gè)令人非常沮喪的錯(cuò)誤,你被迫要直接地創(chuàng)建Thread對(duì)象。這是一個(gè)造成麻煩的方式,因?yàn)樵谟行㎎VM中,創(chuàng)建Thread對(duì)象是一件重量級(jí)的工作,所以重用而非創(chuàng)建新的線程要好得多。然而在另一些JVM中,情況就恰恰相反:Thread是非常輕量級(jí)的,若你需要一個(gè)線程,直接創(chuàng)建它則會(huì)好得多。當(dāng)然,如果Murphy有他自己的方法(他經(jīng)常就是這么做的),無(wú)論你使用哪種方法,對(duì)于你最終所依賴的某種Java平臺(tái)都會(huì)是錯(cuò)誤的。
JSR-166專家組在一定程度上預(yù)見(jiàn)到了這種情況。與讓Java開(kāi)發(fā)者直接創(chuàng)建Thread實(shí)例不同,他們推薦Executor接口,這是一個(gè)創(chuàng)建新線程的抽象。如果清單3所示,Executor允許你自己不必使用new操作符去創(chuàng)建Thread對(duì)象:
清單3. Executor
Executor exec = getAnExecutorFromSomeplace();
exec.execute(new Runnable() {
});
使用Excutor的主要缺點(diǎn)與我們使用所有對(duì)象工廠所遇到的缺點(diǎn)一樣:工廠必須來(lái)源于某處。不幸地是,不同于CLR,JVM并不帶有一個(gè)標(biāo)準(zhǔn)的VM范圍內(nèi)的線程池。
Executor類只是作為獲取Executor實(shí)現(xiàn)實(shí)例的常用地方,但它只有new方法(例如,為了創(chuàng)建新的線程池);它沒(méi)有預(yù)創(chuàng)建的實(shí)例。所以,如果你想創(chuàng)建并使用一個(gè)能貫穿于整個(gè)程序的Executor實(shí)現(xiàn),你就可以創(chuàng)建一個(gè)你自己的Executor實(shí)例。(或者,在有些情況下,你可以使用你所選容器/平臺(tái)所提供的Executor實(shí)例。)
ExecutorService,為你服務(wù)
ExecutorService的用處在于使你不必關(guān)心Thread來(lái)自于何處,Executor接口缺乏Java開(kāi)發(fā)者可能期望的一些功能,比如啟動(dòng)一個(gè)線程,該線程用于產(chǎn)生結(jié)果,它會(huì)以非阻塞方式一直等待,直到結(jié)果出現(xiàn)為止。(在桌面應(yīng)用中這是很普通的需求,在這種應(yīng)用中用戶會(huì)執(zhí)行一個(gè)需要訪問(wèn)數(shù)據(jù)庫(kù)的UI操作,如果它耗時(shí)太長(zhǎng)的話,就可能想要在它完成之前就取消這一操作。)
為此,JSR-166的專家們創(chuàng)造一個(gè)更為有用的抽象,ExecutorService接口,該接口將啟動(dòng)線程的工廠模型化為一個(gè)服務(wù),這樣就能對(duì)該服務(wù)進(jìn)行集合化控制了。例如,不對(duì)每個(gè)任務(wù)調(diào)用一次execute()方法,ExecutorService能創(chuàng)建一個(gè)任務(wù)的集合,并可返回代表這些任務(wù)未來(lái)結(jié)果的Future集合。
4. ScheduledExecutorServices
與ExecutorService接口同樣優(yōu)秀,特定的任務(wù)需要以計(jì)劃的形式進(jìn)行執(zhí)行,例如在特定的時(shí)間間隔或在特定的時(shí)刻執(zhí)行給定的任務(wù)。這就是繼承自ExecutorService的ScheduledExecutorService的職責(zé)范疇。
如果你的目的是創(chuàng)建一個(gè)"心跳"命令,該命令每5秒鐘就去"ping"一次。ScheduledExecutorService會(huì)幫你做到這一點(diǎn),正如你在清單4中所見(jiàn)的那般簡(jiǎn)單:
清單4. ScheduledExecutorService按計(jì)劃去"Ping"
import java.util.concurrent.*;
public class Ping
{
public static void main(String[] args)
{
ScheduledExecutorService ses =
Executors.newScheduledThreadPool(1);
Runnable pinger = new Runnable() {
public void run() {
System.out.println("PING!");
}
};
ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS);
}
}
怎么樣?沒(méi)有操作線程的煩惱,如果用戶想取消心跳,也不必操心如何去做,前臺(tái)或后臺(tái)都沒(méi)有顯示的標(biāo)記線程;所有的調(diào)度細(xì)節(jié)都留給了ScheduledExecutorService。
順便提一下,如果用戶想要取消心跳,從scheduleAtFixedRate()方法返回的會(huì)是一個(gè)ScheduledFuture實(shí)例,它不僅含有執(zhí)行結(jié)果(如果有的話),也有一個(gè)cancel()方法去停止該計(jì)劃任務(wù)。
5. 超時(shí)方法
擁有為阻塞操作置一個(gè)確定的超時(shí)控制的能力(這樣就可以避免死鎖)是java.util.concurrent類庫(kù)相比于舊有并發(fā)API,如針對(duì)鎖的監(jiān)視器,的最大優(yōu)點(diǎn)之一。
這些方法幾乎總是按int/TimeUnit對(duì)的方式進(jìn)行重載,該int/TimeUnit對(duì)用于指示方法在跳出執(zhí)行并將控制返回給平臺(tái)之前需要等待多長(zhǎng)時(shí)間。這要求開(kāi)發(fā)者對(duì)此做更多的工作--如果沒(méi)有獲得鎖,將如何進(jìn)行恢復(fù)?--但結(jié)果卻幾乎總是正確的:更少的死鎖,以及更加生產(chǎn)安全的代碼。(更多關(guān)于生產(chǎn)就緒的代碼,請(qǐng)見(jiàn)Michael Nygard的Release It!)
結(jié)論
java.util.concurrent包含有許多更優(yōu)雅的工具,它們出于集合框架,但更勝之,特別是.locks和.atomic包中的類。深入挖掘之,你將發(fā)現(xiàn)像CyclicBarrier這樣的十分有用的控制結(jié)構(gòu),甚至于更多。
下一次,我們將步入一個(gè)新的主題:你所不知道的五件關(guān)于Jar的事情。
請(qǐng)關(guān)注你所不知道的五件事情--java.util.concurrent(第一部分)