java.util.concurrent 包含許多線程安全、測試良好、高性能的并發(fā)構(gòu)建塊。不客氣地說,創(chuàng)建 java.util.concurrent 的目的就是要實現(xiàn) Collection 框架對數(shù)據(jù)結(jié)構(gòu)所執(zhí)行的并發(fā)操作。通過提供一組可靠的、高性能并發(fā)構(gòu)建塊,開發(fā)人員可以提高并發(fā)類的線程安全、可伸縮性、性能、可讀性和可靠性。
如果一些類名看起來相似,可能是因為 java.util.concurrent 中的許多概念源自 Doug Lea 的 util.concurrent 庫(請參閱 參考資料)。
JDK 5.0 中的并發(fā)改進(jìn)可以分為三組:
JVM 級別更改。大多數(shù)現(xiàn)代處理器對并發(fā)對某一硬件級別提供支持,通常以 compare-and-swap (CAS)指令形式。CAS 是一種低級別的、細(xì)粒度的技術(shù),它允許多個線程更新一個內(nèi)存位置,同時能夠檢測其他線程的沖突并進(jìn)行恢復(fù)。它是許多高性能并發(fā)算法的基礎(chǔ)。在 JDK 5.0 之前,Java 語言中用于協(xié)調(diào)線程之間的訪問的惟一原語是同步,同步是更重量級和粗粒度的。公開 CAS 可以開發(fā)高度可伸縮的并發(fā) Java 類。這些更改主要由 JDK 庫類使用,而不是由開發(fā)人員使用。
低級實用程序類 -- 鎖定和原子類。使用 CAS 作為并發(fā)原語,ReentrantLock 類提供與 synchronized 原語相同的鎖定和內(nèi)存語義,然而這樣可以更好地控制鎖定(如計時的鎖定等待、鎖定輪詢和可中斷的鎖定等待)和提供更好的可伸縮性(競爭時的高性能)。大多數(shù)開發(fā)人員將不再直接使用 ReentrantLock 類,而是使用在 ReentrantLock 類上構(gòu)建的高級類。
高級實用程序類。這些類實現(xiàn)并發(fā)構(gòu)建塊,每個計算機(jī)科學(xué)文本中都會講述這些類 -- 信號、互斥、閂鎖、屏障、交換程序、線程池和線程安全集合類等。大部分開發(fā)人員都可以在應(yīng)用程序中用這些類,來替換許多(如果不是全部)同步、wait() 和 notify() 的使用,從而提高性能、可讀性和正確性。
本教程將重點介紹 java.util.concurrent 包提供的高級實用程序類 -- 線程安全集合、線程池和同步實用程序。這些是初學(xué)者和專家都可以使用的"現(xiàn)成"類。
在第一小節(jié)中,我們將回顧并發(fā)的基本知識,盡管它不應(yīng)取代對線程和線程安全的了解。那些一點都不熟悉線程的讀者應(yīng)該先參考一些關(guān)于線程的介紹,如"Introduction to Java Threads"教程(請參閱參考資料)。
接下來的幾個小節(jié)將研究 java.util.concurrent 中的高級實用程序類 -- 線程安全集合、線程池、信號和同步工具。
最后一小節(jié)將介紹 java.util.concurrent 中的低級并發(fā)構(gòu)建塊,并提供一些性能測評來顯示新 java.util.concurrent 類的可伸縮性的改進(jìn)。
什么是線程?
所有重要的操作系統(tǒng)都支持進(jìn)程的概念 -- 獨立運行的程序,在某種程度上相互隔離。
線程有時稱為 輕量級進(jìn)程。與進(jìn)程一樣,它們擁有通過程序運行的獨立的并發(fā)路徑,并且每個線程都有自己的程序計數(shù)器,稱為堆棧和本地變量。然而,線程存在于進(jìn)程中,它們與同一進(jìn)程內(nèi)的其他線程共享內(nèi)存、文件句柄以及每進(jìn)程狀態(tài)。
今天,幾乎每個操作系統(tǒng)都支持線程,允許執(zhí)行多個可獨立調(diào)度的線程,以便共存于一個進(jìn)程中。因為一個進(jìn)程中的線程是在同一個地址空間中執(zhí)行的,所以多個線程可以同時訪問相同對象,并且它們從同一堆棧中分配對象。雖然這使線程更易于與其他線程共享信息,但也意味著您必須確保線程之間不相互干涉。
正確使用線程時,線程能帶來諸多好處,其中包括更好的資源利用、簡化開發(fā)、高吞吐量、更易響應(yīng)的用戶界面以及能執(zhí)行異步處理。
Java 語言包括用于協(xié)調(diào)線程行為的原語,從而可以在不違反設(shè)計原型或者不破壞數(shù)據(jù)結(jié)構(gòu)的前提下安全地訪問和修改共享變量。
線程有哪些功能?
在 Java 程序中存在很多理由使用線程,并且不管開發(fā)人員知道線程與否,幾乎每個 Java 應(yīng)用程序都使用線程。許多 J2SE 和 J2EE 工具可以創(chuàng)建線程,如 RMI、Servlet、Enterprise JavaBeans 組件和 Swing GUI 工具包。
使用線程的理由包括:
更易響應(yīng)的用戶界面。 事件驅(qū)動的 GUI 工具包(如 AWT 或 Swing)使用單獨的事件線程來處理 GUI 事件。從事件線程中調(diào)用通過 GUI 對象注冊的事件監(jiān)聽器。然而,如果事件監(jiān)聽器將執(zhí)行冗長的任務(wù)(如文檔拼寫檢查),那么 UI 將出現(xiàn)凍結(jié),因為事件線程直到冗長任務(wù)完畢之后才能處理其他事件。通過在單獨線程中執(zhí)行冗長操作,當(dāng)執(zhí)行冗長后臺任務(wù)時,UI 能繼續(xù)響應(yīng)。
使用多處理器。 多處理器(MP)系統(tǒng)變得越來越便宜,并且分布越來越廣泛。因為調(diào)度的基本單位通常是線程,所以不管有多少處理器可用,一個線程的應(yīng)用程序一次只能在一個處理器上運行。在設(shè)計良好的程序中,通過更好地利用可用的計算機(jī)資源,多線程能夠提高吞吐量和性能。
簡化建模。 有效使用線程能夠使程序編寫變得更簡單,并易于維護(hù)。通過合理使用線程,個別類可以避免一些調(diào)度的詳細(xì)、交叉存取操作、異步 IO 和資源等待以及其他復(fù)雜問題。相反,它們能專注于域的要求,簡化開發(fā)并改進(jìn)可靠性。
異步或后臺處理。 服務(wù)器應(yīng)用程序可以同時服務(wù)于許多遠(yuǎn)程客戶機(jī)。如果應(yīng)用程序從 socket 中讀取數(shù)據(jù),并且沒有數(shù)據(jù)可以讀取,那么對 read() 的調(diào)用將被阻塞,直到有數(shù)據(jù)可讀。在單線程應(yīng)用程序中,這意味著當(dāng)某一個線程被阻塞時,不僅處理相應(yīng)請求要延遲,而且處理所有請求也將延遲。然而,如果每個 socket 都有自己的 IO 線程,那么當(dāng)一個線程被阻塞時,對其他并發(fā)請求行為沒有影響。
線程安全
如果將這些類用于多線程環(huán)境中,雖然確保這些類的線程安全比較困難,但線程安全卻是必需的。java.util.concurrent 規(guī)范進(jìn)程的一個目標(biāo)就是提供一組線程安全的、高性能的并發(fā)構(gòu)建塊,從而使開發(fā)人員能夠減輕一些編寫線程安全類的負(fù)擔(dān)。
線程安全類非常難以明確定義,大多數(shù)定義似乎都是完全循環(huán)的。快速 Google 搜索會顯示下列線程安全代碼定義的例子,但這些定義(或者更確切地說是描述)通常沒什么幫助:
. . . can be called from multiple programming threads without unwanted interaction between the threads.
. . . may be called by more than on thread at a time without requiring any other action on the caller's part.
通過類似這樣的定義,不奇怪我們?yōu)槭裁磳€程安全如此迷惑。這些定義幾乎就是在說"如果可以從多個線程安全調(diào)用類,那么該類就是線程安全的"。這當(dāng)然是線程安全的解釋,但對我們區(qū)別線程安全類和不安全類沒有什么幫助。我們使用"安全"是為了說明什么?
要成為線程安全的類,首先它必須在單線程環(huán)境中正確運行。如果正確實現(xiàn)了類,那么說明它符合規(guī)范,對該類的對象的任何順序的操作(公共字段的讀寫、公共方法的調(diào)用)都不應(yīng)該使對象處于無效狀態(tài);觀察將處于無效狀態(tài)的對象;或違反類的任何變量、前置條件或后置條件。
而且,要成為線程安全的類,在從多個線程訪問時,它必須繼續(xù)正確運行,而不管運行時環(huán)境執(zhí)行那些線程的調(diào)度和交叉,且無需對部分調(diào)用代碼執(zhí)行任何其他同步。結(jié)果是對線程安全對象的操作將用于按固定的整體一致順序出現(xiàn)所有線程。
如果沒有線程之間的某種明確協(xié)調(diào),比如鎖定,運行時可以隨意在需要時在多線程中交叉操作執(zhí)行。
在 JDK 5.0 之前,確保線程安全的主要機(jī)制是 synchronized 原語。訪問共享變量(那些可以由多個線程訪問的變量)的線程必須使用同步來協(xié)調(diào)對共享變量的讀寫訪問。java.util.concurrent 包提供了一些備用并發(fā)原語,以及一組不需要任何其他同步的線程安全實用程序類。
令人厭煩的并發(fā)
即使您的程序從沒有明確創(chuàng)建線程,也可能會有許多工具或框架代表您創(chuàng)建了線程,這時要求從這些線程調(diào)用的類是線程安全的。這樣會對開發(fā)人員帶來較大的設(shè)計和實現(xiàn)負(fù)擔(dān),因為開發(fā)線程安全類比開發(fā)非線程安全類有更多要注意的事項,且需要更多的分析。
AWT 和 Swing
這些 GUI 工具包創(chuàng)建了稱為時間線程的后臺線程,將從該線程調(diào)用通過 GUI 組件注冊的監(jiān)聽器。因此,實現(xiàn)這些監(jiān)聽器的類必須是線程安全的。
TimerTask
JDK 1.3 中引入的 TimerTask 工具允許稍后執(zhí)行任務(wù)或計劃定期執(zhí)行任務(wù)。在 Timer 線程中執(zhí)行 TimerTask 事件,這意味著作為 TimerTask 執(zhí)行的任務(wù)必須是線程安全的。
Servlet 和 JavaServer Page 技術(shù)
Servlet 容器可以創(chuàng)建多個線程,在多個線程中同時調(diào)用給定 servlet,從而進(jìn)行多個請求。因此 servlet 類必須是線程安全的。
RMI
遠(yuǎn)程方法調(diào)用(remote method invocation,RMI)工具允許調(diào)用其他 JVM 中運行的操作。實現(xiàn)遠(yuǎn)程對象最普遍的方法是擴(kuò)展 UnicastRemoteObject。例示 UnicastRemoteObject 時,它是通過 RMI 調(diào)度器注冊的,該調(diào)度器可能創(chuàng)建一個或多個線程,將在這些線程中執(zhí)行遠(yuǎn)程方法。因此,遠(yuǎn)程類必須是線程安全的。
正如所看到的,即使應(yīng)用程序沒有明確創(chuàng)建線程,也會發(fā)生許多可能會從其他線程調(diào)用類的情況。幸運的是,java.util.concurrent 中的類可以大大簡化編寫線程安全類的任務(wù)。
例子 -- 非線程安全 servlet
下列 servlet 看起來像無害的留言板 servlet,它保存每個來訪者的姓名。然而,該 servlet 不是線程安全的,而這個 servlet 應(yīng)該是線程安全的。問題在于它使用 HashSet 存儲來訪者的姓名,HashSet 不是線程安全的類。
當(dāng)我們說這個 servlet 不是線程安全的時,是說它所造成的破壞不僅僅是丟失留言板輸入。在最壞的情況下,留言板數(shù)據(jù)結(jié)構(gòu)都可能被破壞并且無法恢復(fù)。
public class UnsafeGuestbookServlet extends HttpServlet {
private Set visitorSet = new HashSet();
protected void doGet(HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
String visitorName = httpServletRequest.getParameter("NAME");
if (visitorName != null)
visitorSet.add(visitorName);
}
}
通過將 visitorSet 的定義更改為下列代碼,可以使該類變?yōu)榫€程安全的:
private Set visitorSet = Collections.synchronizedSet(new HashSet());
如上所示的例子顯示線程的內(nèi)置支持是一把雙刃劍 -- 雖然它使構(gòu)建多線程應(yīng)用程序變得很容易,但它同時要求開發(fā)人員更加注意并發(fā)問題,甚至在使用留言板 servlet 這樣普通的東西時也是如此。
線程安全集合
JDK 1.2 中引入的 Collection 框架是一種表示對象集合的高度靈活的框架,它使用基本接口 List、Set 和 Map。通過 JDK 提供每個集合的多次實現(xiàn)(HashMap、Hashtable、TreeMap、WeakHashMap、HashSet、TreeSet、Vector、ArrayList、LinkedList 等等)。其中一些集合已經(jīng)是線程安全的(Hashtable 和 Vector),通過同步的封裝工廠(Collections.synchronizedMap()、synchronizedList() 和 synchronizedSet()),其余的集合均可表現(xiàn)為線程安全的。
java.util.concurrent 包添加了多個新的線程安全集合類(ConcurrentHashMap、CopyOnWriteArrayList 和 CopyOnWriteArraySet)。這些類的目的是提供高性能、高度可伸縮性、線程安全的基本集合類型版本。
java.util 中的線程集合仍有一些缺點。例如,在迭代鎖定時,通常需要將該鎖定保留在集合中,否則,會有拋出 ConcurrentModificationException 的危險。(這個特性有時稱為條件線程安全;有關(guān)的更多說明,請參閱 參考資料。)此外,如果從多個線程頻繁地訪問集合,則常常不能很好地執(zhí)行這些類。java.util.concurrent 中的新集合類允許通過在語義中的少量更改來獲得更高的并發(fā)。
JDK 5.0 還提供了兩個新集合接口 -- Queue 和 BlockingQueue。Queue 接口與 List 類似,但它只允許從后面插入,從前面刪除。通過消除 List 的隨機(jī)訪問要求,可以創(chuàng)建比現(xiàn)有 ArrayList 和 LinkedList 實現(xiàn)性能更好的 Queue 實現(xiàn)。因為 List 的許多應(yīng)用程序?qū)嶋H上不需要隨機(jī)訪問,所以Queue 通常可以替代 List,來獲得更好的性能。
弱一致的迭代器
java.util 包中的集合類都返回 fail-fast 迭代器,這意味著它們假設(shè)線程在集合內(nèi)容中進(jìn)行迭代時,集合不會更改它的內(nèi)容。如果 fail-fast 迭代器檢測到在迭代過程中進(jìn)行了更改操作,那么它會拋出 ConcurrentModificationException,這是不可控異常。
在迭代過程中不更改集合的要求通常會對許多并發(fā)應(yīng)用程序造成不便。相反,比較好的是它允許并發(fā)修改并確保迭代器只要進(jìn)行合理操作,就可以提供集合的一致視圖,如 java.util.concurrent 集合類中的迭代器所做的那樣。
java.util.concurrent 集合返回的迭代器稱為弱一致的(weakly consistent)迭代器。對于這些類,如果元素自從迭代開始已經(jīng)刪除,且尚未由 next() 方法返回,那么它將不返回到調(diào)用者。如果元素自迭代開始已經(jīng)添加,那么它可能返回調(diào)用者,也可能不返回。在一次迭代中,無論如何更改底層集合,元素不會被返回兩次。
CopyOnWriteArrayList 和 CopyOnWriteArraySet
可以用兩種方法創(chuàng)建線程安全支持?jǐn)?shù)據(jù)的 List -- Vector 或封裝 ArrayList 和 Collections.synchronizedList()。java.util.concurrent 包添加了名稱繁瑣的 CopyOnWriteArrayList。為什么我們想要新的線程安全的 List 類?為什么 Vector 還不夠?
最簡單的答案是與迭代和并發(fā)修改之間的交互有關(guān)。使用 Vector 或使用同步的 List 封裝器,返回的迭代器是 fail-fast 的,這意味著如果在迭代過程中任何其他線程修改 List,迭代可能失敗。
Vector 的非常普遍的應(yīng)用程序是存儲通過組件注冊的監(jiān)聽器的列表。當(dāng)發(fā)生適合的事件時,該組件將在監(jiān)聽器的列表中迭代,調(diào)用每個監(jiān)聽器。為了防止 ConcurrentModificationException,迭代線程必須復(fù)制列表或鎖定列表,以便進(jìn)行整體迭代,而這兩種情況都需要大量的性能成本。
CopyOnWriteArrayList 類通過每次添加或刪除元素時創(chuàng)建支持?jǐn)?shù)組的新副本,避免了這個問題,但是進(jìn)行中的迭代保持對創(chuàng)建迭代器時的當(dāng)前副本進(jìn)行操作。雖然復(fù)制也會有一些成本,但是在許多情況下,迭代要比修改多得多,在這些情況下,寫入時復(fù)制要比其他備用方法具有更好的性能和并發(fā)性。
如果應(yīng)用程序需要 Set 語義,而不是 List,那么還有一個 Set 版本 -- CopyOnWriteArraySet。
ConcurrentHashMap
正如已經(jīng)存在線程安全的 List 的實現(xiàn),您可以用多種方法創(chuàng)建線程安全的、基于 hash 的 Map -- Hashtable,并使用 Collections.synchronizedMap() 封裝 HashMap。JDK 5.0 添加了 ConcurrentHashMap 實現(xiàn),該實現(xiàn)提供了相同的基本線程安全的 Map 功能,但它大大提高了并發(fā)性。
Hashtable 和 synchronizedMap 所采取的獲得同步的簡單方法(同步 Hashtable 中或者同步的 Map 封裝器對象中的每個方法)有兩個主要的不足。首先,這種方法對于可伸縮性是一種障礙,因為一次只能有一個線程可以訪問 hash 表。同時,這樣仍不足以提供真正的線程安全性,許多公用的混合操作仍然需要額外的同步。雖然諸如 get() 和 put() 之類的簡單操作可以在不需要額外同步的情況下安全地完成,但還是有一些公用的操作序列,例如迭代或者 put-if-absent(空則放入),需要外部的同步,以避免數(shù)據(jù)爭用。
Hashtable 和 Collections.synchronizedMap 通過同步每個方法獲得線程安全。這意味著當(dāng)一個線程執(zhí)行一個 Map 方法時,無論其他線程要對 Map 進(jìn)行什么樣操作,都不能執(zhí)行,直到第一個線程結(jié)束才可以。
對比來說,ConcurrentHashMap 允許多個讀取幾乎總是并發(fā)執(zhí)行,讀和寫操作通常并發(fā)執(zhí)行,多個同時寫入經(jīng)常并發(fā)執(zhí)行。結(jié)果是當(dāng)多個線程需要訪問同一 Map 時,可以獲得更高的并發(fā)性。
在大多數(shù)情況下,ConcurrentHashMap 是 Hashtable或 Collections.synchronizedMap(new HashMap()) 的簡單替換。然而,其中有一個顯著不同,即 ConcurrentHashMap 實例中的同步不鎖定映射進(jìn)行獨占使用。實際上,沒有辦法鎖定 ConcurrentHashMap 進(jìn)行獨占使用,它被設(shè)計用于進(jìn)行并發(fā)訪問。為了使集合不被鎖定進(jìn)行獨占使用,還提供了公用的混合操作的其他(原子)方法,如 put-if-absent。ConcurrentHashMap 返回的迭代器是弱一致的,意味著它們將不拋出ConcurrentModificationException ,將進(jìn)行"合理操作"來反映迭代過程中其他線程對 Map 的修改。
隊列
原始集合框架包含三個接口:List、Map 和 Set。List 描述了元素的有序集合,支持完全隨即訪問 -- 可以在任何位置添加、提取或刪除元素。
LinkedList 類經(jīng)常用于存儲工作元素(等待執(zhí)行的任務(wù))的列表或隊列。然而,List 提供的靈活性比該公用應(yīng)用程序所需要的多得多,這個應(yīng)用程序通常在后面插入元素,從前面刪除元素。但是要支持完整 List 接口則意味著 LinkedList 對于這項任務(wù)不像原來那樣有效。Queue 接口比 List 簡單得多,僅包含 put() 和 take() 方法,并允許比 LinkedList 更有效的實現(xiàn)。
Queue 接口還允許實現(xiàn)來確定存儲元素的順序。ConcurrentLinkedQueue 類實現(xiàn)先進(jìn)先出(first-in-first-out,F(xiàn)IFO)隊列,而 PriorityQueue 類實現(xiàn)優(yōu)先級隊列(也稱為堆),它對于構(gòu)建調(diào)度器非常有用,調(diào)度器必須按優(yōu)先級或預(yù)期的執(zhí)行時間執(zhí)行任務(wù)。
interface Queue<E> extends Collection<E> {
boolean offer(E x);
E poll();
E remove() throws NoSuchElementException;
E peek();
E element() throws NoSuchElementException;
}
實現(xiàn) Queue 的類是:
LinkedList 已經(jīng)進(jìn)行了改進(jìn)來實現(xiàn) Queue。
PriorityQueue 非線程安全的優(yōu)先級對列(堆)實現(xiàn),根據(jù)自然順序或比較器返回元素。
ConcurrentLinkedQueue 快速、線程安全的、無阻塞 FIFO 隊列。
任務(wù)管理之線程創(chuàng)建
線程最普遍的一個應(yīng)用程序是創(chuàng)建一個或多個線程,以執(zhí)行特定類型的任務(wù)。Timer 類創(chuàng)建線程來執(zhí)行 TimerTask 對象,Swing 創(chuàng)建線程來處理 UI 事件。在這兩種情況中,在單獨線程中執(zhí)行的任務(wù)都假定是短期的,這些線程是為了處理大量短期任務(wù)而存在的。
在其中每種情況中,這些線程一般都有非常簡單的結(jié)構(gòu):
while (true) {
if (no tasks)
wait for a task;
execute the task;
}
通過例示從 Thread 獲得的對象并調(diào)用 Thread.start() 方法來創(chuàng)建線程。可以用兩種方法創(chuàng)建線程:通過擴(kuò)展 Thread 和覆蓋 run() 方法,或者通過實現(xiàn) Runnable 接口和使用 Thread(Runnable) 構(gòu)造函數(shù):
class WorkerThread extends Thread {
public void run() { /* do work */ }
}
Thread t = new WorkerThread();
t.start();
或者:
Thread t = new Thread(new Runnable() {
public void run() { /* do work */ }
}
t.start();
重新使用線程
因為多個原因,類似 Swing GUI 的框架為事件任務(wù)創(chuàng)建單一線程,而不是為每項任務(wù)創(chuàng)建新的線程。首先是因為創(chuàng)建線程會有間接成本,所以創(chuàng)建線程來執(zhí)行簡單任務(wù)將是一種資源浪費。通過重新使用事件線程來處理多個事件,啟動和拆卸成本(隨平臺而變)會分?jǐn)傇诙鄠€事件上。
Swing 為事件使用單一后臺線程的另一個原因是確保事件不會互相干涉,因為直到前一事件結(jié)束,下一事件才開始處理。該方法簡化了事件處理程序的編寫。使用多個線程,將要做更多的工作來確保一次僅一個線程地執(zhí)行線程相關(guān)的代碼。
如何不對任務(wù)進(jìn)行管理
大多數(shù)服務(wù)器應(yīng)用程序(如 Web 服務(wù)器、POP 服務(wù)器、數(shù)據(jù)庫服務(wù)器或文件服務(wù)器)代表遠(yuǎn)程客戶機(jī)處理請求,這些客戶機(jī)通常使用 socket 連接到服務(wù)器。對于每個請求,通常要進(jìn)行少量處理(獲得該文件的代碼塊,并將其發(fā)送回 socket),但是可能會有大量(且不受限制)的客戶機(jī)請求服務(wù)。
用于構(gòu)建服務(wù)器應(yīng)用程序的簡單化模型會為每個請求創(chuàng)建新的線程。下列代碼段實現(xiàn)簡單的 Web 服務(wù)器,它接受端口 80 的 socket 連接,并創(chuàng)建新的線程來處理請求。不幸的是,該代碼不是實現(xiàn) Web 服務(wù)器的好方法,因為在重負(fù)載條件下它將失敗,停止整臺服務(wù)器。
class UnreliableWebServer {
public static void main(String[] args) {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable r = new Runnable() {
public void run() {
handleRequest(connection);
}
};
// Don't do this!
new Thread(r).start();
}
}
}
當(dāng)服務(wù)器被請求吞沒時,UnreliableWebServer 類不能很好地處理這種情況。每次有請求時,就會創(chuàng)建新的類。根據(jù)操作系統(tǒng)和可用內(nèi)存,可以創(chuàng)建的線程數(shù)是有限的。不幸的是,您通常不知道限制是多少 -- 只有當(dāng)應(yīng)用程序因為 OutOfMemoryError 而崩潰時才發(fā)現(xiàn)。 如果足夠快地在這臺服務(wù)器上拋出請求的話,最終其中一個線程創(chuàng)建將失敗,生成的 Error 會關(guān)閉整個應(yīng)用程序。當(dāng)一次僅能有效支持很少線程時,沒有必要創(chuàng)建上千個線程,無論如何,這樣使用資源可能會損害性能。創(chuàng)建線程會使用相當(dāng)一部分內(nèi)存,其中包括有兩個堆棧(Java 和 C),以及每線程數(shù)據(jù)結(jié)構(gòu)。如果創(chuàng)建過多線程,其中每個線程都將占用一些 CPU 時間,結(jié)果將使用許多內(nèi)存來支持大量線程,每個線程都運行得很慢。這樣就無法很好地使用計算資源。
使用線程池解決問題
為任務(wù)創(chuàng)建新的線程并不一定不好,但是如果創(chuàng)建任務(wù)的頻率高,而平均任務(wù)持續(xù)時間低,我們可以看到每項任務(wù)創(chuàng)建一個新的線程將產(chǎn)生性能(如果負(fù)載不可預(yù)知,還有穩(wěn)定性)問題。 如果不是每項任務(wù)創(chuàng)建一個新的線程,則服務(wù)器應(yīng)用程序必須采取一些方法來限制一次可以處理的請求數(shù)。這意味著每次需要啟動新的任務(wù)時,它不能僅調(diào)用下列代碼。
new Thread(runnable).start()
管理一大組小任務(wù)的標(biāo)準(zhǔn)機(jī)制是組合工作隊列和線程池。工作隊列就是要處理的任務(wù)的隊列,前面描述的 Queue 類完全適合。線程池是線程的集合,每個線程都提取公用工作隊列。當(dāng)一個工作線程完成任務(wù)處理后,它會返回隊列,查看是否有其他任務(wù)需要處理。如果有,它會轉(zhuǎn)移到下一個任務(wù),并開始處理。
線程池為線程生命周期間接成本問題和資源崩潰問題提供了解決方案。通過對多個任務(wù)重新使用線程,創(chuàng)建線程的間接成本將分布到多個任務(wù)中。作為一種額外好處,因為請求到達(dá)時,線程已經(jīng)存在,從而可以消除由創(chuàng)建線程引起的延遲。因此,可以立即處理請求,使應(yīng)用程序更易響應(yīng)。而且,通過正確調(diào)整線程池中的線程數(shù),可以強(qiáng)制超出特定限制的任何請求等待,直到有線程可以處理它,它們等待時所消耗的資源要少于使用額外線程所消耗的資源,這樣可以防止資源崩潰。
Executor 框架
java.util.concurrent 包中包含靈活的線程池實現(xiàn),但是更重要的是,它包含用于管理實現(xiàn) Runnable 的任務(wù)的執(zhí)行的整個框架。該框架稱為 Executor 框架。
Executor 接口相當(dāng)簡單。它描述將運行 Runnable 的對象:
public interface Executor {
void execute(Runnable command);
}
任務(wù)運行于哪個線程不是由該接口指定的,這取決于使用的 Executor 的實現(xiàn)。它可以運行于后臺線程,如 Swing 事件線程,或者運行于線程池,或者調(diào)用線程,或者新的線程,它甚至可以運行于其他 JVM!通過同步的 Executor 接口提交任務(wù),從任務(wù)執(zhí)行策略中刪除任務(wù)提交。Executor 接口獨自關(guān)注任務(wù)提交 -- 這是Executor 實現(xiàn)的選擇,確定執(zhí)行策略。這使在部署時調(diào)整執(zhí)行策略(隊列限制、池大小、優(yōu)先級排列等等)更加容易,更改的代碼最少。
java.util.concurrent 中的大多數(shù) Executor 實現(xiàn)還實現(xiàn) ExecutorService 接口,這是對 Executor 的擴(kuò)展,它還管理執(zhí)行服務(wù)的生命周期。這使它們更易于管理,并向生命可能比單獨 Executor 的生命更長的應(yīng)用程序提供服務(wù)。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout,
TimeUnit unit);
// other convenience methods for submitting tasks
}
Executor
java.util.concurrent 包包含多個 Executor 實現(xiàn),每個實現(xiàn)都實現(xiàn)不同的執(zhí)行策略。什么是執(zhí)行策略?執(zhí)行策略定義何時在哪個線程中運行任務(wù),
執(zhí)行任務(wù)可能消耗的資源級別(線程、內(nèi)存等等),以及如果執(zhí)行程序超載該怎么辦。
執(zhí)行程序通常通過工廠方法例示,而不是通過構(gòu)造函數(shù)。Executors 類包含用于構(gòu)造許多不同類型的 Executor 實現(xiàn)的靜態(tài)工廠方法:
Executors.newCachedThreadPool() 創(chuàng)建不限制大小的線程池,但是當(dāng)以前創(chuàng)建的線程可以使用時將重新使用那些線程。如果沒有現(xiàn)有線程可用,
將創(chuàng)建新的線程并將其添加到池中。使用不到 60 秒的線程將終止并從緩存中刪除。
Executors.newFixedThreadPool(int n) 創(chuàng)建線程池,其重新使用在不受限制的隊列之外運行的固定線程組。在關(guān)閉前,所有線程都會因為執(zhí)行
過程中的失敗而終止,如果需要執(zhí)行后續(xù)任務(wù),將會有新的線程來代替這些線程。
Executors.newSingleThreadExecutor() 創(chuàng)建 Executor,其使用在不受限制的隊列之外運行的單一工作線程,與 Swing 事件線程非常相似。
保證順序執(zhí)行任務(wù),在任何給定時間,不會有多個任務(wù)處于活動狀態(tài)。
更可靠的 Web 服務(wù)器 -- 使用 Executor
前面 如何不對任務(wù)進(jìn)行管理中的代碼顯示了如何不用編寫可靠服務(wù)器應(yīng)用程序。幸運的是,修復(fù)這個示例非常簡單,只需將 Thread.start() 調(diào)用替換為向 Executor 提交任務(wù)即可:
class ReliableWebServer {
Executor pool =
Executors.newFixedThreadPool(7);
public static void main(String[] args) {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable r = new Runnable() {
public void run() {
handleRequest(connection);
}
};
pool.execute(r);
}
}
}
注意,本例與前例之間的區(qū)別僅在于 Executor 的創(chuàng)建以及如何提交執(zhí)行的任務(wù)。
定制ThreadPoolExecutor
Executors中的 newFixedThreadPool 和 newCachedThreadPool 工廠方法返回的 Executor 是類 ThreadPoolExecutor 的實例,是高度可定制的。
通過使用包含 ThreadFactory 變量的工廠方法或構(gòu)造函數(shù)的版本,可以定義池線程的創(chuàng)建。ThreadFactory 是工廠對象,其構(gòu)造執(zhí)行程序要使用的新線程。
使用定制的線程工廠,創(chuàng)建的線程可以包含有用的線程名稱,并且這些線程是守護(hù)線程,屬于特定線程組或具有特定優(yōu)先級。
下面是線程工廠的例子,它創(chuàng)建守護(hù)線程,而不是創(chuàng)建用戶線程:
public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
}
有時,Executor 不能執(zhí)行任務(wù),因為它已經(jīng)關(guān)閉或者因為 Executor 使用受限制隊列存儲等待任務(wù),而該隊列已滿。在這種情況下,需要咨詢執(zhí)行程序的RejectedExecutionHandler 來確定如何處理任務(wù) -- 拋出異常(默認(rèn)情況),放棄任務(wù),在調(diào)用者的線程中執(zhí)行任務(wù),或放棄隊列中最早的任務(wù)以為新任務(wù)騰出空間。ThreadPoolExecutor.setRejectedExecutionHandler 可以設(shè)置拒絕的執(zhí)行處理程序。 還可以擴(kuò)展 ThreadPoolExecutor,并覆蓋方法 beforeExecute 和 afterExecute,以添加裝置,添加記錄,添加計時,重新初始化線程本地變量,或進(jìn)行其他執(zhí)行定制。
需要特別考慮的問題
使用 Executor 框架會從執(zhí)行策略中刪除任務(wù)提交,一般情況下,人們希望這樣,那是因為它允許我們靈活地調(diào)整執(zhí)行策略,不必更改許多位置的代碼。然而,當(dāng)提交代碼暗含假設(shè)特定執(zhí)行策略時,存在多種情況,在這些情況下,重要的是選擇的 Executor 實現(xiàn)一致的執(zhí)行策略。 這類情況中的其中的一種就是一些任務(wù)同時等待其他任務(wù)完成。在這種情況下,當(dāng)線程池沒有足夠的線程時,如果所有當(dāng)前執(zhí)行的任務(wù)都在等待另一項任務(wù),而該任務(wù)因為線程池已滿不能執(zhí)行,那么線程池可能會死鎖。 另一種相似的情況是一組線程必須作為共同操作組一起工作。在這種情況下,需要確保線程池能夠容納所有線程。
如果應(yīng)用程序?qū)μ囟▓?zhí)行程序進(jìn)行了特定假設(shè),那么應(yīng)該在 Executor 定義和初始化的附近對這些進(jìn)行說明,從而使善意的更改不會破壞應(yīng)用程序的正確功能。
調(diào)整線程池
創(chuàng)建 Executor 時,人們普遍會問的一個問題是"線程池應(yīng)該有多大?"。當(dāng)然,答案取決于硬件和將執(zhí)行的任務(wù)類型(它們是受計算限制或是受 IO 的限制?)。
如果線程池太小,資源可能不能被充分利用,在一些任務(wù)還在工作隊列中等待執(zhí)行時,可能會有處理器處于閑置狀態(tài)。
另一方面,如果線程池太大,則將有許多有效線程,因為大量線程或有效任務(wù)使用內(nèi)存,或者因為每項任務(wù)要比使用少量線程有更多上下文切換,性能可能會受損。
所以假設(shè)為了使處理器得到充分使用,線程池應(yīng)該有多大?如果知道系統(tǒng)有多少處理器和任務(wù)的計算時間和等待時間的近似比率,Amdahl 法則提供很好的近似公式。
用 WT 表示每項任務(wù)的平均等待時間,ST 表示每項任務(wù)的平均服務(wù)時間(計算時間)。則 WT/ST 是每項任務(wù)等待所用時間的百分比。對于 N 處理器系統(tǒng),池中可以
近似有 N*(1+WT/ST) 個線程。
好的消息是您不必精確估計 WT/ST。"合適的"池大小的范圍相當(dāng)大;只需要避免"過大"和"過小"的極端情況即可。
Future 接口
Future 接口允許表示已經(jīng)完成的任務(wù)、正在執(zhí)行過程中的任務(wù)或者尚未開始執(zhí)行的任務(wù)。通過 Future 接口,可以嘗試取消尚未完成的任務(wù),查詢?nèi)蝿?wù)已經(jīng)完成還是取消了,以及提取(或等待)任務(wù)的結(jié)果值。
FutureTask 類實現(xiàn)了 Future,并包含一些構(gòu)造函數(shù),允許將 Runnable 或 Callable(會產(chǎn)生結(jié)果的 Runnable)和 Future 接口封裝。因為 FutureTask也實現(xiàn) Runnable,所以可以只將 FutureTask 提供給 Executor。一些提交方法(如 ExecutorService.submit())除了提交任務(wù)之外,還將返回 Future接口。
Future.get() 方法檢索任務(wù)計算的結(jié)果(或如果任務(wù)完成,但有異常,則拋出 ExecutionException)。如果任務(wù)尚未完成,那么 Future.get() 將被阻塞,直到任務(wù)完成;如果任務(wù)已經(jīng)完成,那么它將立即返回結(jié)果。
使用 Future 構(gòu)建緩存
該示例代碼與 java.util.concurrent 中的多個類關(guān)聯(lián),突出顯示了 Future 的功能。它實現(xiàn)緩存,使用 Future 描述緩存值,該值可能已經(jīng)計算,或者可能在其他線程中"正在構(gòu)造"。
它利用 ConcurrentHashMap 中的原子 putIfAbsent() 方法,確保僅有一個線程試圖計算給定關(guān)鍵字的值。如果其他線程隨后請求同一關(guān)鍵字的值,它僅能等待(通過 Future.get() 的幫助)第一個線程完成。因此兩個線程不會計算相同的值。
public class Cache<K, V> {
ConcurrentMap<K, FutureTask<V>> map = new ConcurrentHashMap();
Executor executor = Executors.newFixedThreadPool(8);
public V get(final K key) {
FutureTask<V> f = map.get(key);
if (f == null) {
Callable<V> c = new Callable<V>() {
public V call() {
// return value associated with key
}
};
f = new FutureTask<V>(c);
FutureTask old = map.putIfAbsent(key, f);
if (old == null)
executor.execute(f);
else
f = old;
}
return f.get();
}
}
CompletionService
CompletionService 將執(zhí)行服務(wù)與類似 Queue 的接口組合,從任務(wù)執(zhí)行中刪除任務(wù)結(jié)果的處理。CompletionService 接口包含用來提交將要執(zhí)行的任務(wù)的 submit() 方法和用來詢問下一完成任務(wù)的 take()/poll() 方法。
CompletionService 允許應(yīng)用程序結(jié)構(gòu)化,使用 Producer/Consumer 模式,其中生產(chǎn)者創(chuàng)建任務(wù)并提交,消費者請求完成任務(wù)的結(jié)果并處理這些結(jié)果。CompletionService 接口由 ExecutorCompletionService 類實現(xiàn),該類使用 Executor 處理任務(wù)并從 CompletionService 導(dǎo)出 submit/poll/take 方法。
下列代碼使用 Executor 和 CompletionService 來啟動許多"solver"任務(wù),并使用第一個生成非空結(jié)果的任務(wù)的結(jié)果,然后取消其余任務(wù):
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch(ExecutionException ignore) {}
}
}
finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
java.util.concurrent 中其他類別的有用的類也是同步工具。這組類相互協(xié)作,控制一個或多個線程的執(zhí)行流。
Semaphore、CyclicBarrier、CountdownLatch 和 Exchanger 類都是同步工具的例子。每個類都有線程可以調(diào)用的方法,
方法是否被阻塞取決于正在使用的特定同步工具的狀態(tài)和規(guī)則。
Semaphore
Semaphore 類實現(xiàn)標(biāo)準(zhǔn) Dijkstra 計數(shù)信號。計數(shù)信號可以認(rèn)為具有一定數(shù)量的許可權(quán),該許可權(quán)可以獲得或釋放。如果有剩余的許可權(quán),acquire() 方法將成功,
否則該方法將被阻塞,直到有可用的許可權(quán)(通過其他線程釋放許可權(quán))。線程一次可以獲得多個許可權(quán)。
計數(shù)信號可以用于限制有權(quán)對資源進(jìn)行并發(fā)訪問的線程數(shù)。該方法對于實現(xiàn)資源池或限制 Web 爬蟲(Web crawler)中的輸出 socket 連接非常有用。
注意信號不跟蹤哪個線程擁有多少許可權(quán);這由應(yīng)用程序來決定,以確保何時線程釋放許可權(quán),該信號表示其他線程擁有許可權(quán)或者正在釋放許可權(quán),以及其他線程知道
它的許可權(quán)已釋放。
互斥
計數(shù)信號的一種特殊情況是互斥,或者互斥信號。互斥就是具有單一許可權(quán)的計數(shù)信號,意味著在給定時間僅一個線程可以具有許可權(quán)(也稱為二進(jìn)制信號)。互斥可以
用于管理對共享資源的獨占訪問。
雖然互斥許多地方與鎖定一樣,但互斥還有一個鎖定通常沒有的其他功能,就是互斥可以由具有許可權(quán)的線程之外的其他線程來釋放。這在死鎖恢復(fù)時會非常有用。
CyclicBarrier 類可以幫助同步,它允許一組線程等待整個線程組到達(dá)公共屏障點。CyclicBarrier 是使用整型變量構(gòu)造的,其確定組中的線程數(shù)。當(dāng)一個線程到達(dá)屏障時(通過調(diào)用 CyclicBarrier.await()),它會被阻塞,直到所有線程都到達(dá)屏障,然后在該點允許所有線程繼續(xù)執(zhí)行。該操作與許多家庭逛商業(yè)街相似 -- 每個家庭成員都自己走,并商定 1:00 在電影院集合。當(dāng)您到電影院但不是所有人都到了時,您會坐下來等其他人到達(dá)。然后所有人一起離開。
認(rèn)為屏障是循環(huán)的是因為它可以重新使用;一旦所有線程都已經(jīng)在屏障處集合并釋放,則可以將該屏障重新初始化到它的初始狀態(tài)。
還可以指定在屏障處等待時的超時;如果在該時間內(nèi)其余線程還沒有到達(dá)屏障,則認(rèn)為屏障被打破,所有正在等待的線程會收到 BrokenBarrierException。
下列代碼將創(chuàng)建 CyclicBarrier 并啟動一組線程,每個線程將計算問題的一部分,等待所有其他線程結(jié)束之后,再檢查解決方案是否達(dá)成一致。如果不一致,那么每個工作線程將開始另一個迭代。該例將使用 CyclicBarrier 變量,它允許注冊 Runnable,在所有線程到達(dá)屏障但還沒有釋放任何線程時執(zhí)行 Runnable。
class Solver { // Code sketch
void solve(final Problem p, int nThreads) {
final CyclicBarrier barrier =
new CyclicBarrier(nThreads,
new Runnable() {
public void run() { p.checkConvergence(); }}
);
for (int i = 0; i < nThreads; ++i) {
final int id = i;
Runnable worker = new Runnable() {
final Segment segment = p.createSegment(id);
public void run() {
try {
while (!p.converged()) {
segment.update();
barrier.await();
}
}
catch(Exception e) { return; }
}
};
new Thread(worker).start();
}
}
CountdownLatch
CountdownLatch 類與 CyclicBarrier 相似,因為它的角色是對已經(jīng)在它們中間分?jǐn)偭藛栴}的一組線程進(jìn)行協(xié)調(diào)。它也是使用整型變量構(gòu)造的,指明計數(shù)的初始值,但是與 CyclicBarrier 不同的是,CountdownLatch 不能重新使用。
其中,CyclicBarrier 是到達(dá)屏障的所有線程的大門,只有當(dāng)所有線程都已經(jīng)到達(dá)屏障或屏障被打破時,才允許這些線程通過,CountdownLatch 將到達(dá)和等待功能分離。任何線程都可以通過調(diào)用 countDown() 減少當(dāng)前計數(shù),這種不會阻塞線程,而只是減少計數(shù)。await() 方法的行為與 CyclicBarrier.await() 稍微有所不同,調(diào)用 await() 任何線程都會被阻塞,直到閂鎖計數(shù)減少為零,在該點等待的所有線程才被釋放,對 await() 的后續(xù)調(diào)用將立即返回。
當(dāng)問題已經(jīng)分解為許多部分,每個線程都被分配一部分計算時,CountdownLatch 非常有用。在工作線程結(jié)束時,它們將減少計數(shù),協(xié)調(diào)線程可以在閂鎖處等待當(dāng)前這一批計算結(jié)束,然后繼續(xù)移至下一批計算。
相反地,具有計數(shù) 1 的 CountdownLatch 類可以用作"啟動大門",來立即啟動一組線程;工作線程可以在閂鎖處等待,協(xié)調(diào)線程減少計數(shù),從而立即釋放所有工作線程。下例使用兩個 CountdownLatche。一個作為啟動大門,一個在所有工作線程結(jié)束時釋放線程:
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let them run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
}
Exchanger 類方便了兩個共同操作線程之間的雙向交換;這樣,就像具有計數(shù)為 2 的 CyclicBarrier,并且兩個線程在都到達(dá)屏障時可以"交換"一些狀態(tài)。(Exchanger 模式有時也稱為聚集。)
Exchanger 通常用于一個線程填充緩沖(通過讀取 socket),而另一個線程清空緩沖(通過處理從 socket 收到的命令)的情況。當(dāng)兩個線程在屏障處集合時,它們交換緩沖。下列代碼說明了這項技術(shù):
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = new DataBuffer();
DataBuffer initialFullBuffer = new DataBuffer();
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.full())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.empty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
鎖定和原子之Lock
Java 語言內(nèi)置了鎖定工具 -- synchronized 關(guān)鍵字。當(dāng)線程獲得監(jiān)視器時(內(nèi)置鎖定),其他線程如果試圖獲得相同鎖定,那么它們將被阻塞,直到第一個線程釋放該鎖定。同步還確保隨后獲得相同鎖定的線程可以看到之前的線程在具有該鎖定時所修改的變量的值,從而確保如果類正確地同步了共享狀態(tài)的訪問權(quán),那么線程將不會看到變量的"失效"值,這是緩存或編譯器優(yōu)化的結(jié)果。
雖然同步?jīng)]有什么問題,但它有一些限制,在一些高級應(yīng)用程序中會造成不便。Lock 接口將內(nèi)置監(jiān)視器鎖定的鎖定行為普遍化,允許多個鎖定實現(xiàn),同時提供一些內(nèi)置鎖定缺少的功能,如計時的等待、可中斷的等待、鎖定輪詢、每個鎖定有多個條件等待集合以及無阻塞結(jié)構(gòu)的鎖定。
interface Lock {
void lock();
void lockInterruptibly() throws IE;
boolean tryLock();
boolean tryLock(long time,
TimeUnit unit) throws IE;
void unlock();
Condition newCondition() throws
UnsupportedOperationException;
}
ReentrantLock
ReentrantLock 是具有與隱式監(jiān)視器鎖定(使用 synchronized 方法和語句訪問)相同的基本行為和語義的 Lock 的實現(xiàn),但它具有擴(kuò)展的能力。
作為額外收獲,在競爭條件下,ReentrantLock 的實現(xiàn)要比現(xiàn)在的 synchronized 實現(xiàn)更具有可伸縮性。(有可能在 JVM 的將來版本中改進(jìn) synchronized 的競爭性能。)
這意味著當(dāng)許多線程都競爭相同鎖定時,使用 ReentrantLock 的吞吐量通常要比 synchronized 好。換句話說,當(dāng)許多線程試圖訪問 ReentrantLock 保護(hù)的共享資源時,
JVM 將花費較少的時間來調(diào)度線程,而用更多個時間執(zhí)行線程。
雖然 ReentrantLock 類有許多優(yōu)點,但是與同步相比,它有一個主要缺點 -- 它可能忘記釋放鎖定。建議當(dāng)獲得和釋放 ReentrantLock 時使用下列結(jié)構(gòu):
Lock lock = new ReentrantLock();
...
lock.lock();
try {
// perform operations protected by lock
}
catch(Exception ex) {
// restore invariants
}
finally {
lock.unlock();
}
因為鎖定失誤(忘記釋放鎖定)的風(fēng)險,所以對于基本鎖定,強(qiáng)烈建議您繼續(xù)使用 synchronized,除非真的需要 ReentrantLock 額外的靈活性和可伸縮性。
ReentrantLock 是用于高級應(yīng)用程序的高級工具 -- 有時需要,但有時用原來的方法就很好。
Condition
就像 Lock 接口是同步的具體化,Condition 接口是 Object 中 wait() 和 notify() 方法的具體化。Lock 中的一個方法是 newCondition(),
它要求鎖定向該鎖定返回新的 Condition 對象限制。await()、signal() 和 signalAll() 方法類似于 wait()、notify() 和 notifyAll(),
但增加了靈活性,每個 Lock 都可以創(chuàng)建多個條件變量。這簡化了一些并發(fā)算法的實現(xiàn)。
ReadWriteLock
ReentrantLock 實現(xiàn)的鎖定規(guī)則非常簡單 -- 每當(dāng)一個線程具有鎖定時,其他線程必須等待,直到該鎖定可用。有時,當(dāng)對數(shù)據(jù)結(jié)構(gòu)的讀取通常多于修改時,
可以使用更復(fù)雜的稱為讀寫鎖定的鎖定結(jié)構(gòu),它允許有多個并發(fā)讀者,同時還允許一個寫入者獨占鎖定。該方法在一般情況下(只讀)提供了更大的并發(fā)性,同時
在必要時仍提供獨占訪問的安全性。ReadWriteLock 接口和 ReentrantReadWriteLock 類提供這種功能 -- 多讀者、單寫入者鎖定規(guī)則,可以用這種功能
來保護(hù)共享的易變資源。
原子變量
即使大多數(shù)用戶將很少直接使用它們,原子變量類(AtomicInteger、AtomicLong、AtomicReference 等等)也有充分理由是最顯著的新并發(fā)類。這些類公開對
JVM 的低級別改進(jìn),允許進(jìn)行具有高度可伸縮性的原子讀-修改-寫操作。大多數(shù)現(xiàn)代 CPU 都有原子讀-修改-寫的原語,比如比較并交換(CAS)或加載鏈接/條件存儲
(LL/SC)。原子變量類使用硬件提供的最快的并發(fā)結(jié)構(gòu)來實現(xiàn)。
許多并發(fā)算法都是根據(jù)對計數(shù)器或數(shù)據(jù)結(jié)構(gòu)的比較并交換操作來定義的。通過暴露高性能的、高度可伸縮的 CAS 操作(以原子變量的形式),用 Java 語言實現(xiàn)高性能、
無等待、無鎖定的并發(fā)算法已經(jīng)變得可行。
幾乎 java.util.concurrent 中的所有類都是在 ReentrantLock 之上構(gòu)建的,ReentrantLock 則是在原子變量類的基礎(chǔ)上構(gòu)建的。所以,雖然僅少數(shù)并發(fā)專家
使用原子變量類,但 java.util.concurrent 類的很多可伸縮性改進(jìn)都是由它們提供的。
原子變量主要用于為原子地更新"熱"字段提供有效的、細(xì)粒度的方式,"熱"字段是指由多個線程頻繁訪問和更新的字段。另外,原子變量還是計數(shù)器或生成序號的自然
機(jī)制。
性能與可伸縮性
雖然 java.util.concurrent 努力的首要目標(biāo)是使編寫正確、線程安全的類更加容易,但它還有一個次要目標(biāo),就是提供可伸縮性。可伸縮性與性能完全不同,實際上,
可伸縮性有時要以性能為代價來獲得。
性能是"可以快速執(zhí)行此任務(wù)的程度"的評測。可伸縮性描述應(yīng)用程序的吞吐量如何表現(xiàn)為它的工作量和可用計算資源增加。可伸縮的程序可以按比例使用更多的處理器、
內(nèi)存或 I/O 帶寬來處理更多個工作量。當(dāng)我們在并發(fā)環(huán)境中談?wù)摽缮炜s性時,我們是在問當(dāng)許多線程同時訪問給定類時,這個類的執(zhí)行情況。
java.util.concurrent 中的低級別類 ReentrantLock 和原子變量類的可伸縮性要比內(nèi)置監(jiān)視器(同步)鎖定高得多。因此,使用 ReentrantLock 或原子變量
類來協(xié)調(diào)共享訪問的類也可能更具有可伸縮性。
Hashtable 與 ConcurrentHashMap
作為可伸縮性的例子,ConcurrentHashMap 實現(xiàn)設(shè)計的可伸縮性要比其線程安全的上一代 Hashtable 的可伸縮性強(qiáng)得多。Hashtable 一次只允許一個線程訪問
Map;ConcurrentHashMap 允許多個讀者并發(fā)執(zhí)行,讀者與寫入者并發(fā)執(zhí)行,以及一些寫入者并發(fā)執(zhí)行。因此,如果許多線程頻繁訪問共享映射,使用
ConcurrentHashMap 的總的吞吐量要比使用 Hashtable 的好。
下表大致說明了 Hashtable 和 ConcurrentHashMap 之間的可伸縮性差別。在每次運行時,N 個線程并發(fā)執(zhí)行緊密循環(huán),它們從 Hashtable 或
ConcurrentHashMap 中檢索隨即關(guān)鍵字,60% 的失敗檢索將執(zhí)行 put() 操作,2% 的成功檢索執(zhí)行 remove() 操作。測試在運行 Linux 的雙處理器
Xeon 系統(tǒng)中執(zhí)行。數(shù)據(jù)顯示 10,000,000 個迭代的運行時間,對于 ConcurrentHashMap,標(biāo)準(zhǔn)化為一個線程的情況。可以看到直到許多線程,
ConcurrentHashMap 的性能仍保持可伸縮性,而 Hashtable 的性能在出現(xiàn)鎖定競爭時幾乎立即下降。
與通常的服務(wù)器應(yīng)用程序相比,這個測試中的線程數(shù)看起來很少。然而,因為每個線程未進(jìn)行其他操作,僅是重復(fù)地選擇使用該表,所以這樣可以模擬在執(zhí)行
一些實際工作的情況下使用該表的大量線程的競爭。
線程 ConcurrentHashMap Hashtable
1 1.0 1.51
2 1.44 17.09
4 1.83 29.9
8 4.06 54.06
16 7.5 119.44
32 15.32 237.2
Lock 與 synchronized 與原子
下列基準(zhǔn)說明了使用 java.util.concurrent 可能改進(jìn)可伸縮性的例子。該基準(zhǔn)將模擬旋轉(zhuǎn)骰子,使用線性同余隨機(jī)數(shù)生成器。有三個可用的隨機(jī)數(shù)生成器的實現(xiàn):一個使用同步來管理生成器的狀態(tài)(單一變量),一個使用 ReentrantLock,另一個則使用 AtomicLong。下圖顯示了在 8-way Ultrasparc3 系統(tǒng)上,逐漸增加線程數(shù)量時這三個版本的相對吞吐量。(該圖對原子變量方法的可伸縮性描述比較保守。)
圖 1. 使用同步、Lock 和 AtomicLong 的相對吞吐量
公平與不公平
java.util.concurrent 中許多類中的另外一個定制元素是"公平"的問題。公平鎖定或公平信號是指在其中根據(jù)先進(jìn)先出(FIFO)的原則給與線程鎖定或信號。ReentrantLock、Semaphore 和 ReentrantReadWriteLock 的構(gòu)造函數(shù)都可以使用變量確定鎖定是否公平,或者是否允許闖入(線程獲得鎖定,即使它們等待的時間不是最長)。
雖然闖入鎖定的想法可能有些可笑,但實際上不公平、闖入的鎖定非常普遍,且通常很受歡迎。使用同步訪問的內(nèi)置鎖定不是公平鎖定(且沒有辦法使它們公平)。相反,它們提供較弱的生病保證,要求所有線程最終都將獲得鎖定。
大多數(shù)應(yīng)用程序選擇(且應(yīng)該選擇)闖入鎖定而不是公平鎖定的原因是性能。在大多數(shù)情況下,完全的公平不是程序正確性的要求,真正公平的成本相當(dāng)高。下表向前面的面板中的表中添加了第四個數(shù)據(jù)集,并由一個公平鎖定管理對 PRNG 狀態(tài)的訪問。注意闖入鎖定與公平鎖定之間吞吐量的巨大差別。
圖 2. 使用同步、Lock、公平鎖定和 AtomicLong 的相對吞吐量
結(jié)束語
java.util.concurrent 包中包含大量有用的構(gòu)建快,可以用它們來改進(jìn)并發(fā)類的性能、可伸縮性、線程安全和可維護(hù)性。
通過這些構(gòu)建快,應(yīng)該可以不再需要在您的代碼中大量使用同步、wait/notify 和 Thread.start(),而用更高級別、標(biāo)準(zhǔn)化的、
高性能并發(fā)實用程序來替換它們。
Exchanger
CyclicBarrier
Synchronizer
凡是有該標(biāo)志的文章,都是該blog博主Caoer(草兒)原創(chuàng),凡是索引、收藏
、轉(zhuǎn)載請注明來處和原文作者。非常感謝