JAVA LOCK總體來說關鍵要素主要包括3點:
1.unsafe.compareAndSwapXXX(Object o,long offset,int expected,int x)
2.unsafe.park() 和 unsafe.unpark()
3.單向鏈表結構或者說存儲線程的數據結構
第1點主要為了保證鎖的原子性,相當于一個鎖是否正在被使用的標記,并且比較和設置這個標記的操作是原子的(硬件提供的swap和test_and_set指令,單CPU下同一指令的多個指令周期不可中斷,SMP中通過鎖總線支持上訴兩個指令的原子性),這基本等于軟件級別所能達到的最高級別隔離。
第2點主要將未得到鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(這幾個native方法的實現代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是關鍵代碼park的最終實現是和操作系統相關的,比如windows下實現是在os_windows.cpp中,有興趣的同學可以下載jdk源碼查看)。喚醒一個被park()線程主要手段包括以下幾種
1. 其他線程調用以被park()線程為參數的unpark(Thread thread).
2. 其他線程中斷被park()線程,如waiters.peek().interrupt();waiters為存儲線程對象的隊列.
3. 不知原因的返回。
park()方法返回并不會報告到底是上訴哪種返回,所以返回好最好檢查下線程狀態,如
LockSupport.park(); //禁用當前線程

If(Thread.interrupted)
{
//doSomething
}

AbstractQueuedSynchronizer(AQS)對于這點實現得相當巧妙,如下所示

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException
{
final Node node = addWaiter(Node.SHARED);

try
{

for (;;)
{
final Node p = node.predecessor();

if (p == head)
{
int r = tryAcquireShared(arg);

if (r >= 0)
{
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
//parkAndCheckInterrupt()會返回park住的線程在被unpark后的線程狀態,如果線程中斷,跳出循環。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}

} catch (RuntimeException ex)
{
cancelAcquire(node);
throw ex;
}
// 只有線程被interrupt后才會走到這里
cancelAcquire(node);
throw new InterruptedException();
}

//在park()住的線程被unpark()后,第一時間返回當前線程是否被打斷

private final boolean parkAndCheckInterrupt()
{
LockSupport.park(this);
return Thread.interrupted();
}
第3點對于一個Synchronizer的實現非常重要,存儲等待線程,并且unlock時喚醒等待線程,這中間有很多工作需要做,喚醒策略,等待線程意外終結處理,公平非公平,可重入不可重入等。
以上簡單說明了下JAVA LOCKS關鍵要素,現在我們來看下java.util.concurrent.locks大致結構

上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現類都持有自己內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現不同的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。
基于AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不一樣而已。
ReentrantLock需要記錄當前線程獲取原子狀態的次數,如果次數為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據著鎖從而需要等待),如果次數大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。以下為ReetranLock的FairSync的tryAcquire實現代碼解析。
//公平獲取鎖

protected final boolean tryAcquire(int acquires)
{
final Thread current = Thread.currentThread();
int c = getState();
//如果當前重進入數為0,說明有機會取得鎖

if (c == 0)
{
//如果是第一個等待者,并且設置重進入數成功,那么當前線程獲得鎖
if (isFirst(current) &&

compareAndSetState(0, acquires))
{
setExclusiveOwnerThread(current);
return true;
}
}
//如果當前線程本身就持有鎖,那么疊加重進入數,并且繼續獲得鎖

else if (current == getExclusiveOwnerThread())
{
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//以上條件都不滿足,那么線程進入等待隊列。
return false;
}
Semaphore則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現并發量的控制,Semaphore一開始設置許可數為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現

protected int tryAcquireShared(int acquires)
{
Thread current = Thread.currentThread();

for (;;)
{
Thread first = getFirstQueuedThread();
//如果當前等待隊列的第一個線程不是當前線程,那么就返回-1表示當前線程需要等待
if (first != null && first != current)
return -1;
//如果當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那么先取得semaphore還有幾個許可證,并且減去當前線程需要的許可證得到剩下的值
int available = getState();
int remaining = available - acquires;
//如果remining<0,那么反饋給AQS當前線程需要等待,如果remaining>0,并且設置availble成功設置成剩余數,那么返回剩余值(>0),也就告知AQS當前線程拿到許可,可以繼續執行。
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
CountDownLatch閉鎖則要保持其狀態,在這個狀態到達終止態之前,所有線程都會被park住,閉鎖可以設定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDown是sync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠將這個初始值減到0,從而釋放原子狀態,讓等待的所有線程通過。
//await時候執行,只查看當前需要countDown數量減為0了,如果為0,說明可以繼續執行,否則需要park住,等待countDown次數足夠,并且unpark所有等待線程

public int tryAcquireShared(int acquires)
{
return getState() == 0? 1 : -1;
}

//countDown時候執行,如果當前countDown數量為0,說明沒有線程await,直接返回false而不需要喚醒park住線程,如果不為0,得到剩下需要countDown的數量并且compareAndSet,最終返回剩下的countDown數量是否為0,供AQS判定是否釋放所有await線程。

public boolean tryReleaseShared(int releases)
{

for (;;)
{
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
FutureTask需要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續還是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是檢查狀態,如果是RUNNING狀態那么讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,并且通過unpark喚醒正在等待的線程,返回結果。
//get時待用,只檢查當前任務是否完成或者被Cancel,如果未完成并且沒有被cancel,那么告訴AQS當前線程需要進入等待隊列并且park住

protected int tryAcquireShared(int ignore)
{
return innerIsDone()? 1 : -1;
}

//判定任務是否完成或者被Cancel

boolean innerIsDone()
{
return ranOrCancelled(getState()) && runner == null;
}

//get時調用,對于CANCEL與其他異常進行拋錯

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException
{
if (!tryAcquireSharedNanos(0,nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

//任務的執行線程執行完畢調用(set(V v))

void innerSet(V v)
{

for (;;)
{
int s = getState();
//如果線程任務已經執行完畢,那么直接返回(多線程執行任務?)
if (s == RAN)
return;
//如果被CANCEL了,那么釋放等待線程,并且會拋錯

if (s == CANCELLED)
{
releaseShared(0);
return;
}
//如果成功設定任務狀態為已完成,那么設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及后續清理工作(一般由FutrueTask的子類實現)

if (compareAndSetState(s, RAN))
{
result = v;
releaseShared(0);
done();
return;
}
}
}
以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪里呢?并且是可以計數的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()和setState()兩個方法(protected)。這樣就為上述狀態解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數,Semaphore可以用這個state存儲許可數,CountDownLatch則可以存儲需要被countDown的次數,而Future則可以存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態。
AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現的,比如線程獨占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared為需要共享形式獲取的synchronizer實現。
ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現)Semaphore內部類Sync則實現了tryAcquireShared和tryReleasedShared(與CountDownLatch相似,因為公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現)。CountDownLatch內部類Sync實現了tryAcquireShared和tryReleasedShared。FutureTask內部類Sync也實現了tryAcquireShared和tryReleasedShared。
其實使用過一些JAVA synchronizer的之后,然后結合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實我們自己也可以構造synchronizer。如下是一個LOCK API的一個例子,實現了一個先入先出的互斥鎖。

public class FIFOMutex
{
private AtomicBoolean locked=new AtomicBoolean(false);
private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();

public void lock()
{
boolean wasInterrupted=false;
Thread current=Thread.currentThread();
waiters.add(current);
//如果waiters的第一個等待者不為當前線程,或者當前locked的狀態為被占用(true)
//那么park住當前線程

while(waiters.peek()!=current||!locked.compareAndSet(false, true))
{
LockSupport.park();
//當線程被unpark時,第一時間檢查當前線程是否被interrupted

if(Thread.interrupted())
{
wasInterrupted=true;
}
}
//得到鎖后,從等待隊列移除當前線程,如果,并且如果當前線程已經被interrupted,
//那么再interrupt一下以便供外部響應。
waiters.remove();

if(wasInterrupted)
{
current.interrupt();
}
}
//unlock邏輯相對簡單,設定當前鎖為空閑狀態,并且將等待隊列中
//的第一個等待線程喚醒

public void unlock()
{
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
總結,JAVA lock機制對于整個java concurrent包的成員意義重大,了解這個機制對于使用java并發類有著很多的幫助,文章中可能存在著各種錯誤,請各位多多諒解并且能夠提出來,謝謝。
文章參考:JDK 1.6 source
java 并發編程實踐
JDK 1.6 API 文檔
posted on 2010-09-30 12:05
BucketLI 閱讀(13158)
評論(2) 編輯 收藏