<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-7  評論-23  文章-0  trackbacks-0
    JAVA LOCK總體來說關(guān)鍵要素主要包括3:
    1.unsafe.compareAndSwapXXX(Object o,long offset,int expected,int x)
    2.unsafe.park() unsafe.unpark()
    3.單向鏈表結(jié)構(gòu)或者說存儲線程的數(shù)據(jù)結(jié)構(gòu)

    1主要為了保證鎖的原子性,相當于一個鎖是否正在被使用的標記,并且比較和設(shè)置這個標記的操作是原子的(硬件提供的swap和test_and_set指令,單CPU下同一指令的多個指令周期不可中斷,SMP中通過鎖總線支持上訴兩個指令的原子性),這基本等于軟件級別所能達到的最高級別隔離。

    2主要將未得到鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(xiàn)(這幾個native方法的實現(xiàn)代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是關(guān)鍵代碼park的最終實現(xiàn)是和操作系統(tǒng)相關(guān)的,比如windows下實現(xiàn)是在os_windows.cpp中,有興趣的同學(xué)可以下載jdk源碼查看)。喚醒一個被park()線程主要手段包括以下幾種
    1.       其他線程調(diào)用以被park()線程為參數(shù)的unpark(Thread thread).
    2.       其他線程中斷被park()線程,waiters.peek().interrupt();waiters為存儲線程對象的隊列.
    3.       不知原因的返回。

    park()方法返回并不會報告到底是上訴哪種返回,所以返回好最好檢查下線程狀態(tài),如

    LockSupport.park();  //禁用當前線程
    If(Thread.interrupted){
       
    //doSomething
    }

    AbstractQueuedSynchronizer(AQS)對于這點實現(xiàn)得相當巧妙,如下所示
    private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
        
    final Node node = addWaiter(Node.SHARED);
        
    try {
             
    for (;;) {
                 
    final Node p = node.predecessor();
                 
    if (p == head) {
                     
    int r = tryAcquireShared(arg);
                     
    if (r >= 0{
                         setHeadAndPropagate(node, r);
                         p.next 
    = null// help GC
                         return;
                     }

                 }

                 
    //parkAndCheckInterrupt()會返回park住的線程在被unpark后的線程狀態(tài),如果線程中斷,跳出循環(huán)。
                 if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                     
    break;
             }

         }
     catch (RuntimeException ex) {
              cancelAcquire(node);
              
    throw ex;
         }

        
         
    // 只有線程被interrupt后才會走到這里
         cancelAcquire(node);
         
    throw new InterruptedException();
    }


    //在park()住的線程被unpark()后,第一時間返回當前線程是否被打斷
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(
    this);
        
    return Thread.interrupted();
    }
    3點對于一個Synchronizer的實現(xiàn)非常重要,存儲等待線程,并且unlock時喚醒等待線程,這中間有很多工作需要做,喚醒策略,等待線程意外終結(jié)處理,公平非公平,可重入不可重入等。


    以上簡單說明了下
    JAVA LOCKS關(guān)鍵要素,現(xiàn)在我們來看下java.util.concurrent.locks大致結(jié)構(gòu)

    上圖中,LOCK的實現(xiàn)類其實都是構(gòu)建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現(xiàn)類都持有自己內(nèi)部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現(xiàn)不同的Sync呢?這和每種Lock用途相關(guān)。另外還有AQSState機制。

    基于AQS構(gòu)建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態(tài)的獲取和釋放,只是條件不一樣而已。

    ReentrantLock需要記錄當前線程獲取原子狀態(tài)的次數(shù),如果次數(shù)為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據(jù)著鎖從而需要等待),如果次數(shù)大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數(shù)變成0而釋放原子狀態(tài)。以下為ReetranLock的FairSync的tryAcquire實現(xiàn)代碼解析。

    //公平獲取鎖
    protected final boolean tryAcquire(int acquires) {
        
    final Thread current = Thread.currentThread();
        
    int c = getState();
        
    //如果當前重進入數(shù)為0,說明有機會取得鎖
        if (c == 0{
            
    //如果是第一個等待者,并且設(shè)置重進入數(shù)成功,那么當前線程獲得鎖
            if (isFirst(current) &&
                compareAndSetState(
    0, acquires)) {
                setExclusiveOwnerThread(current);
                
    return true;
            }

        }

        
    //如果當前線程本身就持有鎖,那么疊加重進入數(shù),并且繼續(xù)獲得鎖
        else if (current == getExclusiveOwnerThread()) {
            
    int nextc = c + acquires;
            
    if (nextc < 0)
                
    throw new Error("Maximum lock count exceeded");
            setState(nextc);
            
    return true;
         }

         
    //以上條件都不滿足,那么線程進入等待隊列。
         return false;
    }

    Semaphore則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現(xiàn)并發(fā)量的控制,Semaphore一開始設(shè)置許可數(shù)為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現(xiàn)

    protected int tryAcquireShared(int acquires) {
        Thread current 
    = Thread.currentThread();
        
    for (;;) {
             Thread first 
    = getFirstQueuedThread();
             
    //如果當前等待隊列的第一個線程不是當前線程,那么就返回-1表示當前線程需要等待
             if (first != null && first != current)
                  
    return -1;
             
    //如果當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那么先取得semaphore還有幾個許可證,并且減去當前線程需要的許可證得到剩下的值
             int available = getState();
             
    int remaining = available - acquires;
             
    //如果remining<0,那么反饋給AQS當前線程需要等待,如果remaining>0,并且設(shè)置availble成功設(shè)置成剩余數(shù),那么返回剩余值(>0),也就告知AQS當前線程拿到許可,可以繼續(xù)執(zhí)行。
             if (remaining < 0 ||compareAndSetState(available, remaining))
                 
    return remaining;
        }

    }

    CountDownLatch閉鎖則要保持其狀態(tài),在這個狀態(tài)到達終止態(tài)之前,所有線程都會被park住,閉鎖可以設(shè)定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDownsync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠?qū)⑦@個初始值減到0,從而釋放原子狀態(tài),讓等待的所有線程通過。

    //await時候執(zhí)行,只查看當前需要countDown數(shù)量減為0了,如果為0,說明可以繼續(xù)執(zhí)行,否則需要park住,等待countDown次數(shù)足夠,并且unpark所有等待線程
    public int tryAcquireShared(int acquires) {
         
    return getState() == 0? 1 : -1;
    }


    //countDown時候執(zhí)行,如果當前countDown數(shù)量為0,說明沒有線程await,直接返回false而不需要喚醒park住線程,如果不為0,得到剩下需要countDown的數(shù)量并且compareAndSet,最終返回剩下的countDown數(shù)量是否為0,供AQS判定是否釋放所有await線程。
    public boolean tryReleaseShared(int releases) {
        
    for (;;) {
             
    int c = getState();
             
    if (c == 0)
                 
    return false;
             
    int nextc = c-1;
             
    if (compareAndSetState(c, nextc))
                 
    return nextc == 0;
       }

    }

    FutureTask需要記錄任務(wù)的執(zhí)行狀態(tài),當調(diào)用其實例的get方法時,內(nèi)部類Sync會去調(diào)用AQSacquireSharedInterruptibly()方法,而這個方法會反向調(diào)用Sync實現(xiàn)的tryAcquireShared()方法,即讓具體實現(xiàn)類決定是否讓當前線程繼續(xù)還是park,FutureTasktryAcquireShared方法所做的唯一事情就是檢查狀態(tài),如果是RUNNING狀態(tài)那么讓當前線程park。而跑任務(wù)的線程會在任務(wù)結(jié)束時調(diào)用FutureTask 實例的set方法(與等待線程持相同的實例),設(shè)定執(zhí)行結(jié)果,并且通過unpark喚醒正在等待的線程,返回結(jié)果。

    //get時待用,只檢查當前任務(wù)是否完成或者被Cancel,如果未完成并且沒有被cancel,那么告訴AQS當前線程需要進入等待隊列并且park住
    protected int tryAcquireShared(int ignore) {
         
    return innerIsDone()? 1 : -1;
    }


    //判定任務(wù)是否完成或者被Cancel
    boolean innerIsDone() {
        
    return ranOrCancelled(getState()) &&    runner == null;
    }


    //get時調(diào)用,對于CANCEL與其他異常進行拋錯
    V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
        
    if (!tryAcquireSharedNanos(0,nanosTimeout))
            
    throw new TimeoutException();
        
    if (getState() == CANCELLED)
            
    throw new CancellationException();
        
    if (exception != null)
            
    throw new ExecutionException(exception);
        
    return result;
    }


    //任務(wù)的執(zhí)行線程執(zhí)行完畢調(diào)用(set(V v))
    void innerSet(V v) {
         
    for (;;) {
            
    int s = getState();
            
    //如果線程任務(wù)已經(jīng)執(zhí)行完畢,那么直接返回(多線程執(zhí)行任務(wù)?)
            if (s == RAN)
                
    return;
            
    //如果被CANCEL了,那么釋放等待線程,并且會拋錯
            if (s == CANCELLED) {
                releaseShared(
    0);
                
    return;
            }

            
    //如果成功設(shè)定任務(wù)狀態(tài)為已完成,那么設(shè)定結(jié)果,unpark等待線程(調(diào)用get()方法而阻塞的線程),以及后續(xù)清理工作(一般由FutrueTask的子類實現(xiàn))
            if (compareAndSetState(s, RAN)) {
                result 
    = v;
                releaseShared(
    0);
                done();
                
    return;
            }

         }

    }

    以上4AQS的使用是比較典型,然而有個問題就是這些狀態(tài)存在哪里呢?并且是可以計數(shù)的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()setState()兩個方法(protected)。這樣就為上述狀態(tài)解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數(shù),Semaphore可以用這個state存儲許可數(shù),CountDownLatch則可以存儲需要被countDown的次數(shù),而Future則可以存儲當前任務(wù)的執(zhí)行狀態(tài)(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態(tài)。

    AQS留給實現(xiàn)者的方法主要有5個方法,其中tryAcquire,tryReleaseisHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現(xiàn)的,比如線程獨占ReetranLockSync,而tryAcquireSharedtryReleasedShared為需要共享形式獲取的synchronizer實現(xiàn)。

    ReentrantLock內(nèi)部Sync類實現(xiàn)的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內(nèi)部類FairSyncNonfairSync實現(xiàn))Semaphore內(nèi)部類Sync則實現(xiàn)了tryAcquireSharedtryReleasedShared(CountDownLatch相似,因為公平性問題,tryAcquireShared由其內(nèi)部類FairSyncNonfairSync實現(xiàn))CountDownLatch內(nèi)部類Sync實現(xiàn)了tryAcquireSharedtryReleasedSharedFutureTask內(nèi)部類Sync也實現(xiàn)了tryAcquireSharedtryReleasedShared

    其實使用過一些JAVA synchronizer的之后,然后結(jié)合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態(tài)和釋放原子狀態(tài),其實我們自己也可以構(gòu)造synchronizer。如下是一個LOCK API的一個例子,實現(xiàn)了一個先入先出的互斥鎖。

    public class FIFOMutex {
        
    private AtomicBoolean locked=new AtomicBoolean(false);
        
    private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();
        
        
    public void lock(){
            
    boolean wasInterrupted=false;
            Thread current
    =Thread.currentThread();
            waiters.add(current);
            
            
    //如果waiters的第一個等待者不為當前線程,或者當前l(fā)ocked的狀態(tài)為被占用(true)
            
    //那么park住當前線程
            while(waiters.peek()!=current||!locked.compareAndSet(falsetrue)){
                LockSupport.park();
                
                
    //當線程被unpark時,第一時間檢查當前線程是否被interrupted
                if(Thread.interrupted()){
                    wasInterrupted
    =true;
                }

            }

            
            
    //得到鎖后,從等待隊列移除當前線程,如果,并且如果當前線程已經(jīng)被interrupted,
            
    //那么再interrupt一下以便供外部響應(yīng)。
            waiters.remove();
            
    if(wasInterrupted){
                current.interrupt();
            }

        }

        
        
    //unlock邏輯相對簡單,設(shè)定當前鎖為空閑狀態(tài),并且將等待隊列中
        
    //的第一個等待線程喚醒
        public void unlock(){
            locked.set(
    false);
            LockSupport.unpark(waiters.peek());
        }

    }

    總結(jié),JAVA lock機制對于整個java concurrent包的成員意義重大,了解這個機制對于使用java并發(fā)類有著很多的幫助,文章中可能存在著各種錯誤,請各位多多諒解并且能夠提出來,謝謝。

    文章參考:JDK 1.6 source
                        java 并發(fā)編程實踐
                        JDK 1.6 API 文檔

     

    posted on 2010-09-30 12:05 BucketLI 閱讀(13156) 評論(2)  編輯  收藏

    評論:
    # re: JAVA LOCK代碼淺析 2012-10-30 10:32 | mengmeng.zhangmm
    非常清晰的講解了Concurrent的QAS機制和部分并發(fā)工具類實現(xiàn)。
    謝謝樓主的博文。  回復(fù)  更多評論
      
    # re: JAVA LOCK代碼淺析 2014-10-28 10:43 | zuidaima

    只有注冊用戶登錄后才能發(fā)表評論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 亚洲av乱码一区二区三区| 波多野结衣中文一区二区免费| 久久香蕉国产线看免费| 成人精品一区二区三区不卡免费看| 一区二区三区免费看| eeuss影院免费直达入口| eeuss影院www天堂免费| 丝袜足液精子免费视频| 永久免费av无码入口国语片| 亚洲免费在线播放| 五月亭亭免费高清在线| 欧美大尺寸SUV免费| 女人被男人躁的女爽免费视频| 国产精品酒店视频免费看| 亚洲?V乱码久久精品蜜桃| 亚洲性日韩精品国产一区二区| 亚洲中文字幕在线第六区| 亚洲AV无码一区东京热久久| 亚洲精品韩国美女在线| 亚洲成a人片在线看| 曰批免费视频播放在线看片二| 国产免费人成视频尤勿视频| 久久综合九色综合97免费下载| 永久免费在线观看视频| 性色av免费观看| 亚洲精品成人在线| 亚洲∧v久久久无码精品| 亚洲免费视频播放| 性色av极品无码专区亚洲 | 亚洲人成亚洲人成在线观看| 亚洲AV无码久久精品色欲| 亚洲宅男精品一区在线观看| 国产成人人综合亚洲欧美丁香花| 国产精品黄页免费高清在线观看| 久久精品视频免费播放| 成年美女黄网站18禁免费| 亚洲一区二区三区无码影院| 亚洲黄色在线视频| 亚洲av无码av在线播放| 在线观看免费视频网站色| 国产福利在线免费|