JAVA LOCK總體來說關(guān)鍵要素主要包括3點:
1.unsafe.compareAndSwapXXX(Object o,long offset,int expected,int x)
2.unsafe.park() 和 unsafe.unpark()
3.單向鏈表結(jié)構(gòu)或者說存儲線程的數(shù)據(jù)結(jié)構(gòu)
第1點主要為了保證鎖的原子性,相當于一個鎖是否正在被使用的標記,并且比較和設(shè)置這個標記的操作是原子的(硬件提供的swap和test_and_set指令,單CPU下同一指令的多個指令周期不可中斷,SMP中通過鎖總線支持上訴兩個指令的原子性),這基本等于軟件級別所能達到的最高級別隔離。
第2點主要將未得到鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(xiàn)(這幾個native方法的實現(xiàn)代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是關(guān)鍵代碼park的最終實現(xiàn)是和操作系統(tǒng)相關(guān)的,比如windows下實現(xiàn)是在os_windows.cpp中,有興趣的同學(xué)可以下載jdk源碼查看)。喚醒一個被park()線程主要手段包括以下幾種
1. 其他線程調(diào)用以被park()線程為參數(shù)的unpark(Thread thread).
2. 其他線程中斷被park()線程,如waiters.peek().interrupt();waiters為存儲線程對象的隊列.
3. 不知原因的返回。
park()方法返回并不會報告到底是上訴哪種返回,所以返回好最好檢查下線程狀態(tài),如
LockSupport.park(); //禁用當前線程

If(Thread.interrupted)
{
//doSomething
}

AbstractQueuedSynchronizer(AQS)對于這點實現(xiàn)得相當巧妙,如下所示

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException
{
final Node node = addWaiter(Node.SHARED);

try
{

for (;;)
{
final Node p = node.predecessor();

if (p == head)
{
int r = tryAcquireShared(arg);

if (r >= 0)
{
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
//parkAndCheckInterrupt()會返回park住的線程在被unpark后的線程狀態(tài),如果線程中斷,跳出循環(huán)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}

} catch (RuntimeException ex)
{
cancelAcquire(node);
throw ex;
}
// 只有線程被interrupt后才會走到這里
cancelAcquire(node);
throw new InterruptedException();
}

//在park()住的線程被unpark()后,第一時間返回當前線程是否被打斷

private final boolean parkAndCheckInterrupt()
{
LockSupport.park(this);
return Thread.interrupted();
}
第3點對于一個Synchronizer的實現(xiàn)非常重要,存儲等待線程,并且unlock時喚醒等待線程,這中間有很多工作需要做,喚醒策略,等待線程意外終結(jié)處理,公平非公平,可重入不可重入等。
以上簡單說明了下JAVA LOCKS關(guān)鍵要素,現(xiàn)在我們來看下java.util.concurrent.locks大致結(jié)構(gòu)

上圖中,LOCK的實現(xiàn)類其實都是構(gòu)建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現(xiàn)類都持有自己內(nèi)部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現(xiàn)不同的Sync呢?這和每種Lock用途相關(guān)。另外還有AQS的State機制。
基于AQS構(gòu)建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態(tài)的獲取和釋放,只是條件不一樣而已。
ReentrantLock需要記錄當前線程獲取原子狀態(tài)的次數(shù),如果次數(shù)為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據(jù)著鎖從而需要等待),如果次數(shù)大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數(shù)變成0而釋放原子狀態(tài)。以下為ReetranLock的FairSync的tryAcquire實現(xiàn)代碼解析。
//公平獲取鎖

protected final boolean tryAcquire(int acquires)
{
final Thread current = Thread.currentThread();
int c = getState();
//如果當前重進入數(shù)為0,說明有機會取得鎖

if (c == 0)
{
//如果是第一個等待者,并且設(shè)置重進入數(shù)成功,那么當前線程獲得鎖
if (isFirst(current) &&

compareAndSetState(0, acquires))
{
setExclusiveOwnerThread(current);
return true;
}
}
//如果當前線程本身就持有鎖,那么疊加重進入數(shù),并且繼續(xù)獲得鎖

else if (current == getExclusiveOwnerThread())
{
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//以上條件都不滿足,那么線程進入等待隊列。
return false;
}
Semaphore則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現(xiàn)并發(fā)量的控制,Semaphore一開始設(shè)置許可數(shù)為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現(xiàn)

protected int tryAcquireShared(int acquires)
{
Thread current = Thread.currentThread();

for (;;)
{
Thread first = getFirstQueuedThread();
//如果當前等待隊列的第一個線程不是當前線程,那么就返回-1表示當前線程需要等待
if (first != null && first != current)
return -1;
//如果當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那么先取得semaphore還有幾個許可證,并且減去當前線程需要的許可證得到剩下的值
int available = getState();
int remaining = available - acquires;
//如果remining<0,那么反饋給AQS當前線程需要等待,如果remaining>0,并且設(shè)置availble成功設(shè)置成剩余數(shù),那么返回剩余值(>0),也就告知AQS當前線程拿到許可,可以繼續(xù)執(zhí)行。
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
CountDownLatch閉鎖則要保持其狀態(tài),在這個狀態(tài)到達終止態(tài)之前,所有線程都會被park住,閉鎖可以設(shè)定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDown是sync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠?qū)⑦@個初始值減到0,從而釋放原子狀態(tài),讓等待的所有線程通過。
//await時候執(zhí)行,只查看當前需要countDown數(shù)量減為0了,如果為0,說明可以繼續(xù)執(zhí)行,否則需要park住,等待countDown次數(shù)足夠,并且unpark所有等待線程

public int tryAcquireShared(int acquires)
{
return getState() == 0? 1 : -1;
}

//countDown時候執(zhí)行,如果當前countDown數(shù)量為0,說明沒有線程await,直接返回false而不需要喚醒park住線程,如果不為0,得到剩下需要countDown的數(shù)量并且compareAndSet,最終返回剩下的countDown數(shù)量是否為0,供AQS判定是否釋放所有await線程。

public boolean tryReleaseShared(int releases)
{

for (;;)
{
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
FutureTask需要記錄任務(wù)的執(zhí)行狀態(tài),當調(diào)用其實例的get方法時,內(nèi)部類Sync會去調(diào)用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調(diào)用Sync實現(xiàn)的tryAcquireShared()方法,即讓具體實現(xiàn)類決定是否讓當前線程繼續(xù)還是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是檢查狀態(tài),如果是RUNNING狀態(tài)那么讓當前線程park。而跑任務(wù)的線程會在任務(wù)結(jié)束時調(diào)用FutureTask 實例的set方法(與等待線程持相同的實例),設(shè)定執(zhí)行結(jié)果,并且通過unpark喚醒正在等待的線程,返回結(jié)果。
//get時待用,只檢查當前任務(wù)是否完成或者被Cancel,如果未完成并且沒有被cancel,那么告訴AQS當前線程需要進入等待隊列并且park住

protected int tryAcquireShared(int ignore)
{
return innerIsDone()? 1 : -1;
}

//判定任務(wù)是否完成或者被Cancel

boolean innerIsDone()
{
return ranOrCancelled(getState()) && runner == null;
}

//get時調(diào)用,對于CANCEL與其他異常進行拋錯

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException
{
if (!tryAcquireSharedNanos(0,nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

//任務(wù)的執(zhí)行線程執(zhí)行完畢調(diào)用(set(V v))

void innerSet(V v)
{

for (;;)
{
int s = getState();
//如果線程任務(wù)已經(jīng)執(zhí)行完畢,那么直接返回(多線程執(zhí)行任務(wù)?)
if (s == RAN)
return;
//如果被CANCEL了,那么釋放等待線程,并且會拋錯

if (s == CANCELLED)
{
releaseShared(0);
return;
}
//如果成功設(shè)定任務(wù)狀態(tài)為已完成,那么設(shè)定結(jié)果,unpark等待線程(調(diào)用get()方法而阻塞的線程),以及后續(xù)清理工作(一般由FutrueTask的子類實現(xiàn))

if (compareAndSetState(s, RAN))
{
result = v;
releaseShared(0);
done();
return;
}
}
}
以上4個AQS的使用是比較典型,然而有個問題就是這些狀態(tài)存在哪里呢?并且是可以計數(shù)的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()和setState()兩個方法(protected)。這樣就為上述狀態(tài)解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數(shù),Semaphore可以用這個state存儲許可數(shù),CountDownLatch則可以存儲需要被countDown的次數(shù),而Future則可以存儲當前任務(wù)的執(zhí)行狀態(tài)(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態(tài)。
AQS留給實現(xiàn)者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現(xiàn)的,比如線程獨占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared為需要共享形式獲取的synchronizer實現(xiàn)。
ReentrantLock內(nèi)部Sync類實現(xiàn)的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內(nèi)部類FairSync和NonfairSync實現(xiàn))Semaphore內(nèi)部類Sync則實現(xiàn)了tryAcquireShared和tryReleasedShared(與CountDownLatch相似,因為公平性問題,tryAcquireShared由其內(nèi)部類FairSync和NonfairSync實現(xiàn))。CountDownLatch內(nèi)部類Sync實現(xiàn)了tryAcquireShared和tryReleasedShared。FutureTask內(nèi)部類Sync也實現(xiàn)了tryAcquireShared和tryReleasedShared。
其實使用過一些JAVA synchronizer的之后,然后結(jié)合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態(tài)和釋放原子狀態(tài),其實我們自己也可以構(gòu)造synchronizer。如下是一個LOCK API的一個例子,實現(xiàn)了一個先入先出的互斥鎖。

public class FIFOMutex
{
private AtomicBoolean locked=new AtomicBoolean(false);
private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();

public void lock()
{
boolean wasInterrupted=false;
Thread current=Thread.currentThread();
waiters.add(current);
//如果waiters的第一個等待者不為當前線程,或者當前l(fā)ocked的狀態(tài)為被占用(true)
//那么park住當前線程

while(waiters.peek()!=current||!locked.compareAndSet(false, true))
{
LockSupport.park();
//當線程被unpark時,第一時間檢查當前線程是否被interrupted

if(Thread.interrupted())
{
wasInterrupted=true;
}
}
//得到鎖后,從等待隊列移除當前線程,如果,并且如果當前線程已經(jīng)被interrupted,
//那么再interrupt一下以便供外部響應(yīng)。
waiters.remove();

if(wasInterrupted)
{
current.interrupt();
}
}
//unlock邏輯相對簡單,設(shè)定當前鎖為空閑狀態(tài),并且將等待隊列中
//的第一個等待線程喚醒

public void unlock()
{
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
總結(jié),JAVA lock機制對于整個java concurrent包的成員意義重大,了解這個機制對于使用java并發(fā)類有著很多的幫助,文章中可能存在著各種錯誤,請各位多多諒解并且能夠提出來,謝謝。
文章參考:JDK 1.6 source
java 并發(fā)編程實踐
JDK 1.6 API 文檔
posted on 2010-09-30 12:05
BucketLI 閱讀(13156)
評論(2) 編輯 收藏