[本文是我對Java Concurrency In Practice C14.1的歸納和總結. ?轉載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]?
java類庫中包含許多狀態依賴的類: 其中的某些方法只有滿足特定的前置條件才能繼續, 比如BlockingQueue的take方法, 只有隊列不為空時take方法才能返回.
狀態依賴的操作一般如下:
void blockingAction() {
申請鎖
while(前置條件不滿足) {
釋放鎖
重新獲取鎖
}
執行操作
釋放鎖
}
?
BaseBoundedBuffer是一個普通抽象類, 它對put和take方法的實現是有缺陷的: 沒有在put方法執行前判斷緩沖區是否已滿, 也沒有在take方法執行之前判斷緩沖區是否為空. 其代碼如下:
public abstract class BaseBoundedBuffer<V> {
private final V[] buf;
private int tail;
private int head;
private int count;
@SuppressWarnings("unchecked")
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length) {
tail = 0;
}
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length) {
head = 0;
}
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}?
本文將使用多種方式為doPut和doTake方法增加前置判斷.
?
通知調用方前置條件判斷失敗
GrumpyBoundedBuffer是BaseBoundedBuffer的子類, 并向外提供put和take方法. 調用put方法時, 如果緩沖區已滿, 將拋出BufferFullException異常. 調用take方法時, 如果緩沖區為空, 將拋出BufferEmptyException異常:
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws BufferFullException {
if (isFull())
throw new BufferFullException();
doPut(v);
}
public synchronized V take() throws BufferEmptyException {
if (isEmpty())
throw new BufferEmptyException();
return doTake();
}
}?
GrumpyBoundedBuffer實現起來很簡單, 但是這樣的類很難使用: 調用方需要捕獲并處理異常. 例如調用take方法時需要這樣:
public void invocaton() {
while (true) {
try {
V item = buffer.take();
// use item
break;
} catch (BufferEmptyException e) {
// 當拋出BufferEmptyException異常時, 說明buffer為空. 調用方睡眠一段時間后再進行嘗試
Thread.sleep(SLEEP_GRANULARITY);
}
}
}?
這樣的調用方式是令人難以忍受的, 而且sleep的時間SLEEP_GRANULARITY不好確定: 如果設定的太短, 將白白消耗CPU資源. 如果設定的太長, 則程序的響應性不好, 也有可能錯過前置條件滿足的時刻.
?
內部處理重試邏輯
既然由調用方處理異常并重試是不可取的, 那么SleepyBoundedBuffer類改為在內部處理重試邏輯:
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
private static final long SLEEP_GRANULARITY = 10;
public SleepyBoundedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
// 釋放鎖后sleep一段時間再進行重試
Thread.sleep(SLEEP_GRANULARITY);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty()) {
return doTake();
}
}
// 釋放鎖后sleep一段時間再進行重試
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
SleepyBoundedBuffer相比于GrumpyBoundedBuffer具有很大的進步: 不需要在調用方進行重試. SleepyBoundedBuffer易于使用, 但是sleep的時間仍然不好確定, 需要在響應性和CPU消耗間權衡.?
?
前置條件滿足時喚醒線程
BoundedBuffer試著解決SleepyBoundedBuffer中的問題: 當前置條件不滿足時將線程掛起, 并等待前置條件滿足時由其他線程喚醒, 這樣就不需要權衡sleep的時間了:
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
public BoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws InterruptedException {
// 當緩沖區已滿時將線程掛起, 等待其他線程喚醒
// 不給喚醒之后再次判斷緩沖區是否已滿
while (isFull()) {
wait();
}
doPut(v);
// 操作完成后喚醒其他線程
notifyAll();
}
public synchronized V take() throws InterruptedException {
// 當緩沖區為空時將線程掛起, 等待其他線程喚醒
// 被喚醒之后再次判斷緩沖區是否為空
while (isEmpty()) {
wait();
}
V v = doTake();
// 操作完成后喚醒其他線程
notifyAll();
return v;
}
}?
BoundedBuffer已經比較完美了, 相比于SleepyBoundedBuffer, 其具有更好的響應性, 更高的CPU效率以及更少的上下文切換.
?