Semaphore 是一個計(jì)數(shù)信號量。從概念上講,信號量維護(hù)了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire()
,然后再獲取該許可。每個 release()
添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實(shí)際的許可對象,Semaphore
只對可用許可的號碼進(jìn)行計(jì)數(shù),并采取相應(yīng)的行動。
說白了,Semaphore是一個計(jì)數(shù)器,在計(jì)數(shù)器不為0的時候?qū)€程就放行,一旦達(dá)到0,那么所有請求資源的新線程都會被阻塞,包括增加請求到許可的線程,也就是說Semaphore不是可重入的。每一次請求一個許可都會導(dǎo)致計(jì)數(shù)器減少1,同樣每次釋放一個許可都會導(dǎo)致計(jì)數(shù)器增加1,一旦達(dá)到了0,新的許可請求線程將被掛起。
緩存池整好使用此思想來實(shí)現(xiàn)的,比如鏈接池、對象池等。
清單1 對象池
package xylz.study.concurrency.lock;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ObjectCache<T> {
public interface ObjectFactory<T> {
T makeObject();
}
class Node {
T obj;
Node next;
}
final int capacity;
final ObjectFactory<T> factory;
final Lock lock = new ReentrantLock();
final Semaphore semaphore;
private Node head;
private Node tail;
public ObjectCache(int capacity, ObjectFactory<T> factory) {
this.capacity = capacity;
this.factory = factory;
this.semaphore = new Semaphore(this.capacity);
this.head = null;
this.tail = null;
}
public T getObject() throws InterruptedException {
semaphore.acquire();
return getNextObject();
}
private T getNextObject() {
lock.lock();
try {
if (head == null) {
return factory.makeObject();
} else {
Node ret = head;
head = head.next;
if (head == null) tail = null;
ret.next = null;//help GC
return ret.obj;
}
} finally {
lock.unlock();
}
}
private void returnObjectToPool(T t) {
lock.lock();
try {
Node node = new Node();
node.obj = t;
if (tail == null) {
head = tail = node;
} else {
tail.next = node;
tail = node;
}
} finally {
lock.unlock();
}
}
public void returnObject(T t) {
returnObjectToPool(t);
semaphore.release();
}
}
清單1描述了一個基于信號量Semaphore的對象池實(shí)現(xiàn)。此對象池最多支持capacity個對象,這在構(gòu)造函數(shù)中傳入。對象池有一個基于FIFO的隊(duì)列,每次從對象池的頭結(jié)點(diǎn)開始取對象,如果頭結(jié)點(diǎn)為空就直接構(gòu)造一個新的對象返回。否則將頭結(jié)點(diǎn)對象取出,并且頭結(jié)點(diǎn)往后移動。特別要說明的如果對象的個數(shù)用完了,那么新的線程將被阻塞,直到有對象被返回回來。返還對象時將對象加入FIFO的尾節(jié)點(diǎn)并且釋放一個空閑的信號量,表示對象池中增加一個可用對象。
實(shí)際上對象池、線程池的原理大致上就是這樣的,只不過真正的對象池、線程池要處理比較復(fù)雜的邏輯,所以實(shí)現(xiàn)起來還需要做很多的工作,例如超時機(jī)制,自動回收機(jī)制,對象的有效期等等問題。
這里特別說明的是信號量只是在信號不夠的時候掛起線程,但是并不能保證信號量足夠的時候獲取對象和返還對象是線程安全的,所以在清單1中仍然需要鎖Lock來保證并發(fā)的正確性。
將信號量初始化為 1,使得它在使用時最多只有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進(jìn)制信號量,因?yàn)樗荒苡袃煞N狀態(tài):一個可用的許可,或零個可用的許可。按此方式使用時,二進(jìn)制信號量具有某種屬性(與很多 Lock
實(shí)現(xiàn)不同),即可以由線程釋放“鎖”,而不是由所有者(因?yàn)樾盘柫繘]有所有權(quán)的概念)。在某些專門的上下文(如死鎖恢復(fù))中這會很有用。
上面這段話的意思是說當(dāng)某個線程A持有信號量數(shù)為1的信號量時,其它線程只能等待此線程釋放資源才能繼續(xù),這時候持有信號量的線程A就相當(dāng)于持有了“鎖”,其它線程的繼續(xù)就需要這把鎖,于是線程A的釋放才能決定其它線程的運(yùn)行,相當(dāng)于扮演了“鎖”的角色。
另外同公平鎖非公平鎖一樣,信號量也有公平性。如果一個信號量是公平的表示線程在獲取信號量時按FIFO的順序得到許可,也就是按照請求的順序得到釋放。這里特別說明的是:所謂請求的順序是指在請求信號量而進(jìn)入FIFO隊(duì)列的順序,有可能某個線程先請求信號而后進(jìn)去請求隊(duì)列,那么次線程獲取信號量的順序就會晚于其后請求但是先進(jìn)入請求隊(duì)列的線程。這個在公平鎖和非公平鎖中談過很多。
除了acquire以外,Semaphore還有幾種類似的acquire方法,這些方法可以更好的處理中斷和超時或者異步等特性,可以參考JDK API。
按照同樣的學(xué)習(xí)原則,下面對主要的實(shí)現(xiàn)進(jìn)行分析。Semaphore的acquire方法實(shí)際上訪問的是AQS的acquireSharedInterruptibly(arg)方法。這個可以參考CountDownLatch一節(jié)或者AQS一節(jié)。
所以Semaphore的await實(shí)現(xiàn)也是比較簡單的。與CountDownLatch不同的是,Semaphore區(qū)分公平信號和非公平信號。
清單2 公平信號獲取方法
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
for (;;) {
Thread first = getFirstQueuedThread();
if (first != null && first != current)
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
清單3 非公平信號獲取方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
對比清單2和清單3可以看到,公平信號和非公平信號在于第一次嘗試能否獲取信號時,公平信號量總是將當(dāng)前線程進(jìn)入AQS的CLH隊(duì)列進(jìn)行排隊(duì)(因?yàn)榈谝淮螄L試時隊(duì)列的頭結(jié)點(diǎn)線程很有可能不是當(dāng)前線程,當(dāng)然不排除同一個線程第二次進(jìn)入信號量),從而根據(jù)AQS的CLH隊(duì)列的順序FIFO依次獲取信號量;而對于非公平信號量,第一次立即嘗試能否拿到信號量,一旦信號量的剩余數(shù)available大于請求數(shù)(acquires通常為1),那么線程就立即得到了釋放,而不需要進(jìn)行AQS隊(duì)列進(jìn)行排隊(duì)。只有remaining<0的時候(也就是信號量不夠的時候)才會進(jìn)入AQS隊(duì)列。
所以非公平信號量的吞吐量總是要比公平信號量的吞吐量要大,但是需要強(qiáng)調(diào)的是非公平信號量和非公平鎖一樣存在“饑渴死”的現(xiàn)象,也就是說活躍線程可能總是拿到信號量,而非活躍線程可能難以拿到信號量。而對于公平信號量由于總是靠請求的線程的順序來獲取信號量,所以不存在此問題。
參考資料:
- 信號量(Semaphore)在生產(chǎn)者和消費(fèi)者模式的使用
- What is mutex and semaphore in Java ? What is the main difference ?
- 關(guān)于 java.util.concurrent 您不知道的 5 件事,第 2 部分
- Semahores
©2009-2014 IMXYLZ
|求賢若渴