<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-7  評論-23  文章-0  trackbacks-0
    JAVA LOCK總體來說關鍵要素主要包括3:
    1.unsafe.compareAndSwapXXX(Object o,long offset,int expected,int x)
    2.unsafe.park() unsafe.unpark()
    3.單向鏈表結構或者說存儲線程的數據結構

    1主要為了保證鎖的原子性,相當于一個鎖是否正在被使用的標記,并且比較和設置這個標記的操作是原子的(硬件提供的swap和test_and_set指令,單CPU下同一指令的多個指令周期不可中斷,SMP中通過鎖總線支持上訴兩個指令的原子性),這基本等于軟件級別所能達到的最高級別隔離。

    2主要將未得到鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(這幾個native方法的實現代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是關鍵代碼park的最終實現是和操作系統相關的,比如windows下實現是在os_windows.cpp中,有興趣的同學可以下載jdk源碼查看)。喚醒一個被park()線程主要手段包括以下幾種
    1.       其他線程調用以被park()線程為參數的unpark(Thread thread).
    2.       其他線程中斷被park()線程,waiters.peek().interrupt();waiters為存儲線程對象的隊列.
    3.       不知原因的返回。

    park()方法返回并不會報告到底是上訴哪種返回,所以返回好最好檢查下線程狀態,如

    LockSupport.park();  //禁用當前線程
    If(Thread.interrupted){
       
    //doSomething
    }

    AbstractQueuedSynchronizer(AQS)對于這點實現得相當巧妙,如下所示
    private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
        
    final Node node = addWaiter(Node.SHARED);
        
    try {
             
    for (;;) {
                 
    final Node p = node.predecessor();
                 
    if (p == head) {
                     
    int r = tryAcquireShared(arg);
                     
    if (r >= 0{
                         setHeadAndPropagate(node, r);
                         p.next 
    = null// help GC
                         return;
                     }

                 }

                 
    //parkAndCheckInterrupt()會返回park住的線程在被unpark后的線程狀態,如果線程中斷,跳出循環。
                 if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                     
    break;
             }

         }
     catch (RuntimeException ex) {
              cancelAcquire(node);
              
    throw ex;
         }

        
         
    // 只有線程被interrupt后才會走到這里
         cancelAcquire(node);
         
    throw new InterruptedException();
    }


    //在park()住的線程被unpark()后,第一時間返回當前線程是否被打斷
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(
    this);
        
    return Thread.interrupted();
    }
    3點對于一個Synchronizer的實現非常重要,存儲等待線程,并且unlock時喚醒等待線程,這中間有很多工作需要做,喚醒策略,等待線程意外終結處理,公平非公平,可重入不可重入等。


    以上簡單說明了下
    JAVA LOCKS關鍵要素,現在我們來看下java.util.concurrent.locks大致結構

    上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現類都持有自己內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現不同的Sync呢?這和每種Lock用途相關。另外還有AQSState機制。

    基于AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不一樣而已。

    ReentrantLock需要記錄當前線程獲取原子狀態的次數,如果次數為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據著鎖從而需要等待),如果次數大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。以下為ReetranLock的FairSync的tryAcquire實現代碼解析。

    //公平獲取鎖
    protected final boolean tryAcquire(int acquires) {
        
    final Thread current = Thread.currentThread();
        
    int c = getState();
        
    //如果當前重進入數為0,說明有機會取得鎖
        if (c == 0{
            
    //如果是第一個等待者,并且設置重進入數成功,那么當前線程獲得鎖
            if (isFirst(current) &&
                compareAndSetState(
    0, acquires)) {
                setExclusiveOwnerThread(current);
                
    return true;
            }

        }

        
    //如果當前線程本身就持有鎖,那么疊加重進入數,并且繼續獲得鎖
        else if (current == getExclusiveOwnerThread()) {
            
    int nextc = c + acquires;
            
    if (nextc < 0)
                
    throw new Error("Maximum lock count exceeded");
            setState(nextc);
            
    return true;
         }

         
    //以上條件都不滿足,那么線程進入等待隊列。
         return false;
    }

    Semaphore則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現并發量的控制,Semaphore一開始設置許可數為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現

    protected int tryAcquireShared(int acquires) {
        Thread current 
    = Thread.currentThread();
        
    for (;;) {
             Thread first 
    = getFirstQueuedThread();
             
    //如果當前等待隊列的第一個線程不是當前線程,那么就返回-1表示當前線程需要等待
             if (first != null && first != current)
                  
    return -1;
             
    //如果當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那么先取得semaphore還有幾個許可證,并且減去當前線程需要的許可證得到剩下的值
             int available = getState();
             
    int remaining = available - acquires;
             
    //如果remining<0,那么反饋給AQS當前線程需要等待,如果remaining>0,并且設置availble成功設置成剩余數,那么返回剩余值(>0),也就告知AQS當前線程拿到許可,可以繼續執行。
             if (remaining < 0 ||compareAndSetState(available, remaining))
                 
    return remaining;
        }

    }

    CountDownLatch閉鎖則要保持其狀態,在這個狀態到達終止態之前,所有線程都會被park住,閉鎖可以設定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDownsync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠將這個初始值減到0,從而釋放原子狀態,讓等待的所有線程通過。

    //await時候執行,只查看當前需要countDown數量減為0了,如果為0,說明可以繼續執行,否則需要park住,等待countDown次數足夠,并且unpark所有等待線程
    public int tryAcquireShared(int acquires) {
         
    return getState() == 0? 1 : -1;
    }


    //countDown時候執行,如果當前countDown數量為0,說明沒有線程await,直接返回false而不需要喚醒park住線程,如果不為0,得到剩下需要countDown的數量并且compareAndSet,最終返回剩下的countDown數量是否為0,供AQS判定是否釋放所有await線程。
    public boolean tryReleaseShared(int releases) {
        
    for (;;) {
             
    int c = getState();
             
    if (c == 0)
                 
    return false;
             
    int nextc = c-1;
             
    if (compareAndSetState(c, nextc))
                 
    return nextc == 0;
       }

    }

    FutureTask需要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQSacquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續還是park,FutureTasktryAcquireShared方法所做的唯一事情就是檢查狀態,如果是RUNNING狀態那么讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,并且通過unpark喚醒正在等待的線程,返回結果。

    //get時待用,只檢查當前任務是否完成或者被Cancel,如果未完成并且沒有被cancel,那么告訴AQS當前線程需要進入等待隊列并且park住
    protected int tryAcquireShared(int ignore) {
         
    return innerIsDone()? 1 : -1;
    }


    //判定任務是否完成或者被Cancel
    boolean innerIsDone() {
        
    return ranOrCancelled(getState()) &&    runner == null;
    }


    //get時調用,對于CANCEL與其他異常進行拋錯
    V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
        
    if (!tryAcquireSharedNanos(0,nanosTimeout))
            
    throw new TimeoutException();
        
    if (getState() == CANCELLED)
            
    throw new CancellationException();
        
    if (exception != null)
            
    throw new ExecutionException(exception);
        
    return result;
    }


    //任務的執行線程執行完畢調用(set(V v))
    void innerSet(V v) {
         
    for (;;) {
            
    int s = getState();
            
    //如果線程任務已經執行完畢,那么直接返回(多線程執行任務?)
            if (s == RAN)
                
    return;
            
    //如果被CANCEL了,那么釋放等待線程,并且會拋錯
            if (s == CANCELLED) {
                releaseShared(
    0);
                
    return;
            }

            
    //如果成功設定任務狀態為已完成,那么設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及后續清理工作(一般由FutrueTask的子類實現)
            if (compareAndSetState(s, RAN)) {
                result 
    = v;
                releaseShared(
    0);
                done();
                
    return;
            }

         }

    }

    以上4AQS的使用是比較典型,然而有個問題就是這些狀態存在哪里呢?并且是可以計數的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()setState()兩個方法(protected)。這樣就為上述狀態解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數,Semaphore可以用這個state存儲許可數,CountDownLatch則可以存儲需要被countDown的次數,而Future則可以存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態。

    AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryReleaseisHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現的,比如線程獨占ReetranLockSync,而tryAcquireSharedtryReleasedShared為需要共享形式獲取的synchronizer實現。

    ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSyncNonfairSync實現)Semaphore內部類Sync則實現了tryAcquireSharedtryReleasedShared(CountDownLatch相似,因為公平性問題,tryAcquireShared由其內部類FairSyncNonfairSync實現)CountDownLatch內部類Sync實現了tryAcquireSharedtryReleasedSharedFutureTask內部類Sync也實現了tryAcquireSharedtryReleasedShared

    其實使用過一些JAVA synchronizer的之后,然后結合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實我們自己也可以構造synchronizer。如下是一個LOCK API的一個例子,實現了一個先入先出的互斥鎖。

    public class FIFOMutex {
        
    private AtomicBoolean locked=new AtomicBoolean(false);
        
    private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();
        
        
    public void lock(){
            
    boolean wasInterrupted=false;
            Thread current
    =Thread.currentThread();
            waiters.add(current);
            
            
    //如果waiters的第一個等待者不為當前線程,或者當前locked的狀態為被占用(true)
            
    //那么park住當前線程
            while(waiters.peek()!=current||!locked.compareAndSet(falsetrue)){
                LockSupport.park();
                
                
    //當線程被unpark時,第一時間檢查當前線程是否被interrupted
                if(Thread.interrupted()){
                    wasInterrupted
    =true;
                }

            }

            
            
    //得到鎖后,從等待隊列移除當前線程,如果,并且如果當前線程已經被interrupted,
            
    //那么再interrupt一下以便供外部響應。
            waiters.remove();
            
    if(wasInterrupted){
                current.interrupt();
            }

        }

        
        
    //unlock邏輯相對簡單,設定當前鎖為空閑狀態,并且將等待隊列中
        
    //的第一個等待線程喚醒
        public void unlock(){
            locked.set(
    false);
            LockSupport.unpark(waiters.peek());
        }

    }

    總結,JAVA lock機制對于整個java concurrent包的成員意義重大,了解這個機制對于使用java并發類有著很多的幫助,文章中可能存在著各種錯誤,請各位多多諒解并且能夠提出來,謝謝。

    文章參考:JDK 1.6 source
                        java 并發編程實踐
                        JDK 1.6 API 文檔

     

    posted on 2010-09-30 12:05 BucketLI 閱讀(13158) 評論(2)  編輯  收藏

    評論:
    # re: JAVA LOCK代碼淺析 2012-10-30 10:32 | mengmeng.zhangmm
    非常清晰的講解了Concurrent的QAS機制和部分并發工具類實現。
    謝謝樓主的博文。  回復  更多評論
      
    # re: JAVA LOCK代碼淺析 2014-10-28 10:43 | zuidaima
    java 線程demo教程源代碼下載:http://zuidaima.com/share/k%E7%BA%BF%E7%A8%8B-p1-s1.htm  回復  更多評論
      

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 三级网站在线免费观看| 深夜a级毛片免费无码| 免费无码中文字幕A级毛片| 亚洲精品无码久久久影院相关影片| 亚洲av日韩av永久无码电影| 日韩激情无码免费毛片| 黄色a级免费网站| 亚洲M码 欧洲S码SSS222| 一区二区三区免费精品视频| 国产偷国产偷亚洲高清日韩| 美女无遮挡拍拍拍免费视频 | 久久精品国产精品亚洲艾草网美妙| 久久久久久久久无码精品亚洲日韩| 日本一道本高清免费| 深夜免费在线视频| 亚洲精品国产精品乱码在线观看| 野花香在线视频免费观看大全| 亚洲日本在线看片| 免费看韩国黄a片在线观看| 亚洲av片在线观看| 亚洲乱亚洲乱妇无码麻豆| 最近在线2018视频免费观看| 在线亚洲高清揄拍自拍一品区| 日本不卡高清中文字幕免费| 国产特黄一级一片免费| 亚洲午夜视频在线观看| 丁香花在线观看免费观看 | 亚洲成av人无码亚洲成av人| 中文字幕在亚洲第一在线| 你懂的免费在线观看网站| 亚洲偷自精品三十六区| 亚洲а∨天堂久久精品| 久久九九AV免费精品| 亚洲日韩精品无码专区| 亚洲日韩小电影在线观看| 国产91色综合久久免费| 免费一级毛片在线播放放视频 | 亚洲精品无码午夜福利中文字幕 | 久久免费动漫品精老司机| 亚洲精品又粗又大又爽A片| 国产精品亚洲mnbav网站|