在《并發容器 part 4 并發隊列與Queue簡介》節中的類圖中可以看到,對于Queue來說,BlockingQueue是主要的線程安全版本。這是一個可阻塞的版本,也就是允許添加/刪除元素被阻塞,直到成功為止。
BlockingQueue相對于Queue而言增加了兩個操作:put/take。下面是一張整理的表格。
看似簡單的API,非常有用。這在控制隊列的并發上非常有好處。既然加入隊列和移除隊列能夠被阻塞,這在實現生產者-消費者模型上就簡單多了。
清單1 是生產者-消費者模型的一個例子。這個例子是一個真實的場景。服務端(ICE服務)接受客戶端的請求(accept),請求計算此人的好友生日,然后將計算的結果存取緩存中(Memcache)中。在這個例子中采用了ExecutorService實現多線程的功能,盡可能的提高吞吐量,這個在后面線程池的部分會詳細說明。目前就可以理解為new Thread(r).start()就可以了。另外這里阻塞隊列使用的是LinkedBlockingQueue。
清單1 一個生產者-消費者例子
package xylz.study.concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class BirthdayService {
final int workerNumber;
final Worker[] workers;
final ExecutorService threadPool;
static volatile boolean running = true;
public BirthdayService(int workerNumber, int capacity) {
if (workerNumber <= 0) throw new IllegalArgumentException();
this.workerNumber = workerNumber;
workers = new Worker[workerNumber];
for (int i = 0; i < workerNumber; i++) {
workers[i] = new Worker(capacity);
}
//
boolean b = running;// kill the resorting
threadPool = Executors.newFixedThreadPool(workerNumber);
for (Worker w : workers) {
threadPool.submit(w);
}
}
Worker getWorker(int id) {
return workers[id % workerNumber];
}
class Worker implements Runnable {
final BlockingQueue<Integer> queue;
public Worker(int capacity) {
queue = new LinkedBlockingQueue<Integer>(capacity);
}
public void run() {
while (true) {
try {
consume(queue.take());
} catch (InterruptedException e) {
return;
}
}
}
void put(int id) {
try {
queue.put(id);
} catch (InterruptedException e) {
return;
}
}
}
public void accept(int id) {
//accept client request
getWorker(id).put(id);
}
protected void consume(int id) {
//do the work
//get the list of friends and save the birthday to cache
}
}
在清單1 中可以看到不管是put()還是get(),都拋出了一個InterruptedException。我們就從這里開始,為什么會拋出這個異常。
上一節中提到實現一個并發隊列有三種方式。顯然只有第二種 Lock 才能實現阻塞隊列。在鎖機制中提到過,Lock結合Condition就可以實現線程的阻塞,這在鎖機制部分的很多工具中都詳細介紹過,而接下來要介紹的LinkedBlockingQueue就是采用這種方式。
LinkedBlockingQueue 原理
對比ConcurrentLinkedQueue的結構圖,LinkedBlockingQueue多了兩個ReentrantLock和兩個Condition以及用于計數的AtomicInteger,顯然這會導致LinkedBlockingQueue的實現有點復雜。對照此結構,有以下幾點說明:
- 但是整體上講,LinkedBlockingQueue和ConcurrentLinkedQueue的結構類似,都是采用頭尾節點,每個節點指向下一個節點的結構,這表示它們在操作上應該類似。
- LinkedBlockingQueue引入了原子計數器count,這意味著獲取隊列大小size()已經是常量時間了,不再需要遍歷隊列。每次隊列長度有變更時只需要修改count即可。
- 有了修改Node指向有了鎖,所以不需要volatile特性了。既然有了鎖Node的item為什么需要volatile在后面會詳細分析,暫且不表。
- 引入了兩個鎖,一個入隊列鎖,一個出隊列鎖。當然同時有一個隊列不滿的Condition和一個隊列不空的Condition。其實參照鎖機制前面介紹過的生產者-消費者模型就知道,入隊列就代表生產者,出隊列就代表消費者。為什么需要兩個鎖?一個鎖行不行?其實一個鎖完全可以,但是一個鎖意味著入隊列和出隊列同時只能有一個在進行,另一個必須等待其釋放鎖。而從ConcurrentLinkedQueue的實現原理來看,事實上head和last (ConcurrentLinkedQueue中是tail)是分離的,互相獨立的,這意味著入隊列實際上是不會修改出隊列的數據的,同時出隊列也不會修改入隊列,也就是說這兩個操作是互不干擾的。更通俗的將,這個鎖相當于兩個寫入鎖,入隊列是一種寫操作,操作head,出隊列是一種寫操作,操作tail。可見它們是無關的。但是并非完全無關,后面詳細分析。
在沒有揭示入隊列和出隊列過程前,暫且猜測下實現原理。
根據前面學到的鎖機制原理結合ConcurrentLinkedQueue的原理,入隊列的阻塞過程大概是這樣的:
- 獲取入隊列的鎖putLock,檢測隊列大小,如果隊列已滿,那么就掛起線程,等待隊列不滿信號notFull的喚醒。
- 將元素加入到隊列尾部,同時修改隊列尾部引用last。
- 隊列大小加1。
- 釋放鎖putLock。
- 喚醒notEmpty線程(如果有掛起的出隊列線程),告訴消費者,已經有了新的產品。
對比入隊列,出隊列的阻塞過程大概是這樣的:
- 獲取出隊列的鎖takeLock,檢測隊列大小,如果隊列為空,那么就掛起線程,等待隊列不為空notEmpty的喚醒。
- 將元素從頭部移除,同時修改隊列頭部引用head。
- 隊列大小減1。
- 釋放鎖takeLock。
- 喚醒notFull線程(如果有掛起的入隊列線程),告訴生產者,現在還有空閑的空間。
下面來驗證上面的過程。
入隊列過程(put/offer)
清單2 阻塞的入隊列過程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
清單2 描述的是入隊列的阻塞過程。可以看到和上面描述的入隊列的過程基本相同。但是也有以下幾個問題:
- 如果在入隊列的時候線程被中斷,那么就需要發出一個notFull的信號,表示下一個入隊列的線程能夠被喚醒(如果阻塞的話)。
- 入隊列成功后如果隊列不滿需要補一個notFull的信號。為什么?隊列不滿的時候其它入隊列的阻塞線程難道不知道么?有可能。這是因為為了減少上下文切換的次數,每次喚醒一個線程(不管是入隊列還是出隊列)都是只隨機喚醒一個(notify),而不是喚醒所有的(notifyall())。這會導致其它阻塞的入隊列線程不能夠即使處理隊列不滿的情況。
- 如果隊列不為空并且可能有一個元素的話就喚醒一個出隊列線程。這么做說明之前隊列一定為空,因為在加入隊列之后隊列最多只能為1,那么說明未加入之前是0,那么就可能有被阻塞的出隊列線程,所以就喚醒一個出隊列線程。特別說明的是為什么使用一個臨時變量c,而不用count。這是因為讀取一個count的開銷比讀取一個臨時一個變量大,而此處c又能夠完成確認隊列最多只有一個元素的判斷。首先c默認為-1,如果加入隊列后獲取原子計數器的結果為0,說明之前隊列為空,不可能消費(出隊列),也不可能入隊列,因為此時鎖還在當前線程上,那么加入一個后隊列就不為空了,所以就可以安全的喚醒一個消費(出對立)線程。
- 入隊列的過程允許被中斷,所以總是拋出InterruptedException 異常。
針對第2點,特別補充說明下。本來這屬于鎖機制中條件隊列的范圍,由于沒有應用場景,所以當時沒有提。
前面提高notifyall總是比notify更可靠,因為notify可能丟失通知,為什么不適用notifyall呢?
先解釋下notify丟失通知的問題。
notify丟失通知問題
假設線程A因為某種條件在條件隊列中等待,同時線程B因為另外一種條件在同一個條件隊列中等待,也就是說線程A/B都被同一個Conditon.await()掛起,但是等待的條件不同。現在假設線程B的線程被滿足,線程C執行一個notify操作,此時JVM從Conditon.await()的多個線程(A/B)中隨機挑選一個喚醒,不幸的是喚醒了A。此時A的條件不滿足,于是A繼續掛起。而此時B仍然在傻傻的等待被喚醒的信號。也就是說本來給B的通知卻被一個無關的線程持有了,真正需要通知的線程B卻沒有得到通知,而B仍然在等待一個已經發生過的通知。
如果使用notifyall,則能夠避免此問題。notifyall會喚醒所有正在等待的線程,線程C發出的通知線程A同樣能夠收到,但是由于對于A沒用,所以A繼續掛起,而線程B也收到了此通知,于是線程B正常被喚醒。
既然notifyall能夠解決單一notify丟失通知的問題,那么為什么不總是使用notifyall替換notify呢?
假設有N個線程在條件隊列中等待,調用notifyall會喚醒所有線程,然后這N個線程競爭同一個鎖,最多只有一個線程能夠得到鎖,于是其它線程又回到掛起狀態。這意味每一次喚醒操作可能帶來大量的上下文切換(如果N比較大的話),同時有大量的競爭鎖的請求。這對于頻繁的喚醒操作而言性能上可能是一種災難。
如果說總是只有一個線程被喚醒后能夠拿到鎖,那么為什么不使用notify呢?所以某些情況下使用notify的性能是要高于notifyall的。
如果滿足下面的條件,可以使用單一的notify取代notifyall操作:
相同的等待者,也就是說等待條件變量的線程操作相同,每一個從wait放回后執行相同的邏輯,同時一個條件變量的通知至多只能喚醒一個線程。
也就是說理論上講在put/take中如果使用sinallAll喚醒的話,那么在清單2 中的notFull.singal就是多余的。
出隊列過程(poll/take)
再來看出隊列過程。清單3 描述了出隊列的過程。可以看到這和入隊列是對稱的。從這里可以看到,出隊列使用的是和入隊列不同的鎖,所以入隊列、出隊列這兩個操作才能并行進行。
清單3 阻塞的出隊列過程
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
為什么有異常?
有了入隊列、出隊列的過程后再來回答前面的幾個問題。
為什么總是拋出InterruptedException 異常? 這是很大一塊內容,其實是Java對線程中斷的處理問題,希望能夠在系列文章的最后能夠對此開辟單獨的篇章來談談。
在鎖機制里面也是總遇到,這是因為,Java里面沒有一種直接的方法中斷一個掛起的線程,所以通常情況下等于一個處于WAITING狀態的線程,允許設置一個中斷位,一旦線程檢測到這個中斷位就會從WAITING狀態退出,以一個InterruptedException 的異常返回。所以只要是對一個線程掛起操作都會導致InterruptedException 的可能,比如Thread.sleep()、Thread.join()、Object.wait()。盡管LockSupport.park()不會拋出一個InterruptedException 異常,但是它會將當前線程的的interrupted狀態位置上,而對于Lock/Condition而言,當捕捉到interrupted狀態后就認為線程應該終止任務,所以就拋出了一個InterruptedException 異常。
又見volatile
還有一個不容易理解的問題。為什么Node.item是volatile類型的?
起初我不大明白,因為對于一個進入隊列的Node,它的item是不變,當且僅當出隊列的時候會將頭結點元素的item 設置為null。盡管在remove(o)的時候也是設置為null,但是那時候是加了putLock/takeLock兩個鎖的,所以肯定是沒有問題的。那么問題出在哪?
我們知道,item的值是在put/offer的時候加入的。這時候都是有putLock鎖保證的,也就是說它保證使用putLock鎖的讀取肯定是沒有問題的。那么問題就只可能出在一個不適用putLock卻需要讀取Node.item的地方。
peek操作時獲取頭結點的元素而不移除它。顯然他不會操作尾節點,所以它不需要putLock鎖,也就是說它只有takeLock鎖。清單4 描述了這個過程。
清單4 查詢隊列頭元素過程
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
清單4 描述了peek的過程,最后返回一個非null節點的結果是Node.item。這里讀取了Node的item值,但是整個過程卻是使用了takeLock而非putLock。換句話說putLock對Node.item的操作,peek()線程可能不可見!
清單5 隊列尾部加入元素
private void insert(E x) {
last = last.next = new Node<E>(x);
}
清單5 是入隊列offer/put的一部分,這里關鍵在于last=new Node<E>(x)可能發生重排序。Node構造函數是這樣的:Node(E x) { item = x; }。在這一步里面我們可能得到以下一種情況:
- 構建一個Node對象n;
- 將Node的n賦給last
- 初始化n,設置item=x
在執行步驟2 的時候一個peek線程可能拿到了新的Node n,這時候它讀取item,得到了一個null。顯然這是不可靠的。
對item采用volatile之后,JMM保證對item=x的賦值一定在last=n之前,也就是說last得到的一個是一個已經賦值了的新節點n。這就不會導致讀取空元素的問題的。
出對了poll/take和peek都是使用的takeLock鎖,所以不會導致此問題。
刪除操作和遍歷操作由于同時獲取了takeLock和putLock,所以也不會導致此問題。
總結:當前僅當元素加入隊列時讀取此元素才可能導致不一致的問題。采用volatile正式避免此問題。
附加功能
BlockingQueue有一個額外的功能,允許批量從隊列中異常元素。這個API是:
int drainTo(Collection<? super E> c, int maxElements); 最多從此隊列中移除給定數量的可用元素,并將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c); 移除此隊列中所有可用的元素,并將它們添加到給定 collection 中。
清單6 描述的是最多移除指定數量元素的過程。由于批量操作只需要一次獲取鎖,所以效率會比每次獲取鎖要高。但是需要說明的,需要同時獲取takeLock/putLock兩把鎖,因為當移除完所有元素后這會涉及到尾節點的修改(last節點仍然指向一個已經移走的節點)。
由于迭代操作contains()/remove()/iterator()也是獲取了兩個鎖,所以迭代操作也是線程安全的。
清單6 批量移除操作
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
fullyLock();
try {
int n = 0;
Node<E> p = head.next;
while (p != null && n < maxElements) {
c.add(p.item);
p.item = null;
p = p.next;
++n;
}
if (n != 0) {
head.next = p;
assert head.item == null;
if (p == null)
last = head;
if (count.getAndAdd(-n) == capacity)
notFull.signalAll();
}
return n;
} finally {
fullyUnlock();
}
}
©2009-2014 IMXYLZ
|求賢若渴