<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    xylz,imxylz

    關注后端架構、中間件、分布式和并發編程

       :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

    本小節介紹鎖釋放Lock.unlock()。

    Release/TryRelease

    unlock操作實際上就調用了AQS的release操作,釋放持有的鎖。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    前面提到過tryRelease(arg)操作,此操作里面總是嘗試去釋放鎖,如果成功,說明鎖確實被當前線程持有,那么就看AQS隊列中的頭結點是否為空并且能否被喚醒,如果可以的話就喚醒繼任節點(下一個非CANCELLED節點,下面會具體分析)。

    對于獨占鎖而言,java.util.concurrent.locks.ReentrantLock.Sync.tryRelease(int)展示了如何嘗試釋放鎖(tryRelease)操作。

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    整個tryRelease操作是這樣的:

      1. 判斷持有鎖的線程是否是當前線程,如果不是就拋出IllegalMonitorStateExeception(),因為一個線程是不能釋放另一個線程持有的鎖(否則鎖就失去了意義)。否則進行2。
      2. 將AQS狀態位減少要釋放的次數(對于獨占鎖而言總是1),如果剩余的狀態位0(也就是沒有線程持有鎖),那么當前線程就是最后一個持有鎖的線程,清空AQS持有鎖的獨占線程。進行3。
      3. 將剩余的狀態位寫回AQS,如果沒有線程持有鎖就返回true,否則就是false。

    參考上一節的分析就可以知道,這里c==0決定了是否完全釋放了鎖。由于ReentrantLock是可重入鎖,因此同一個線程可能多重持有鎖,那么當且僅當最后一個持有鎖的線程釋放鎖是才能將AQS中持有鎖的獨占線程清空,這樣接下來的操作才需要喚醒下一個需要鎖的AQS節點(Node),否則就只是減少鎖持有的計數器,并不能改變其他操作。

    tryRelease操作成功后(也就是完全釋放了鎖),release操作才能檢查是否需要喚醒下一個繼任節點。這里的前提是AQS隊列的頭結點需要鎖(waitStatus!=0),如果頭結點需要鎖,就開始檢測下一個繼任節點是否需要鎖操作。

    在上一節中說道acquireQueued操作完成后(拿到了鎖),會將當前持有鎖的節點設為頭結點,所以一旦頭結點釋放鎖,那么就需要尋找頭結點的下一個需要鎖的繼任節點,并喚醒它。

    private void unparkSuccessor(Node node) {
            //此時node是需要是需要釋放鎖的頭結點

            //清空頭結點的waitStatus,也就是不再需要鎖了
            compareAndSetWaitStatus(node, Node.SIGNAL, 0);

            //從頭結點的下一個節點開始尋找繼任節點,當且僅當繼任節點的waitStatus<=0才是有效繼任節點,否則將這些waitStatus>0(也就是CANCELLED的節點)從AQS隊列中剔除  
           //這里并沒有從head->tail開始尋找,而是從tail->head尋找最后一個有效節點。
           //解釋在這里 http://m.tkk7.com/xylz/archive/2010/07/08/325540.html#377512

            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }

            //如果找到一個有效的繼任節點,就喚醒此節點線程
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    這里再一次把acquireQueued的過程找出來。對比unparkSuccessor,一旦頭節點的繼任節點被喚醒,那么繼任節點就會嘗試去獲取鎖(在acquireQueued中node就是有效的繼任節點,p就是喚醒它的頭結點),如果成功就會將頭結點設置為自身,并且將頭結點的前任節點清空,這樣前任節點(已經過時了)就可以被GC釋放了。

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    setHead中,將頭結點的前任節點清空并且將頭結點的線程清空就是為了更好的GC,防止內存泄露。

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    對比lock()操作,unlock()操作還是比較簡單的,主要就是釋放響應的資源,并且喚醒AQS隊列中有效的繼任節點。這樣所就按照請求的順序去嘗試獲取鎖了。

    整個lock()/unlock()過程完成了,我們再回頭看公平鎖(FairSync)和非公平鎖(NonfairSync)。

    公平鎖和非公平鎖只是在獲取鎖的時候有差別,其它都是一樣的。

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    在上面非公平鎖的代碼中總是優先嘗試當前是否有線程持有鎖,一旦沒有任何線程持有鎖,那么非公平鎖就霸道的嘗試將鎖“占為己有”。如果在搶占鎖的時候失敗就和公平鎖一樣老老實實的去排隊。

    也即是說公平鎖和非公平鎖只是在入AQSCLH隊列之前有所差別,一旦進入了隊列,所有線程都是按照隊列中先來后到的順序請求鎖。

    Condition

    條件變量很大一個程度上是為了解決Object.wait/notify/notifyAll難以使用的問題。

    條件(也稱為條件隊列條件變量)為線程提供了一個含義,以便在某個狀態條件現在可能為 true 的另一個線程通知它之前,一直掛起該線程(即讓其“等待”)。因為訪問此共享狀態信息發生在不同的線程中,所以它必須受保護,因此要將某種形式的鎖與該條件相關聯。等待提供一個條件的主要屬性是:以原子方式 釋放相關的鎖,并掛起當前線程,就像 Object.wait 做的那樣。

    上述API說明表明條件變量需要與鎖綁定,而且多個Condition需要綁定到同一鎖上。前面的Lock中提到,獲取一個條件變量的方法是Lock.newCondition()

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();

    以上是Condition接口定義的方法,await*對應于Object.waitsignal對應于Object.notifysignalAll對應于Object.notifyAll。特別說明的是Condition的接口改變名稱就是為了避免與Object中的wait/notify/notifyAll的語義和使用上混淆,因為Condition同樣有wait/notify/notifyAll方法。

    每一個Lock可以有任意數據的Condition對象,Condition是與Lock綁定的,所以就有Lock的公平性特性:如果是公平鎖,線程為按照FIFO的順序從Condition.await中釋放,如果是非公平鎖,那么后續的鎖競爭就不保證FIFO順序了。

    一個使用Condition實現生產者消費者的模型例子如下。

    package xylz.study.concurrency.lock;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class ProductQueue<T> {

        private final T[] items;

        private final Lock lock = new ReentrantLock();

        private Condition notFull = lock.newCondition();

        private Condition notEmpty = lock.newCondition();

        //
        private int head, tail, count;

        public ProductQueue(int maxSize) {
            items = (T[]) new Object[maxSize];
        }

        public ProductQueue() {
            this(10);
        }

        public void put(T t) throws InterruptedException {
            lock.lock();
            try {
                while (count == getCapacity()) {
                    notFull.await();
                }
                items[tail] = t;
                if (++tail == getCapacity()) {
                    tail = 0;
                }
                ++count;
                notEmpty.signalAll();
            } finally {
                lock.unlock();
            }
        }

        public T take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    notEmpty.await();
                }
                T ret = items[head];
                items[head] = null;//GC
                //
                if (++head == getCapacity()) {
                    head = 0;
                }
                --count;
                notFull.signalAll();
                return ret;
            } finally {
                lock.unlock();
            }
        }

        public int getCapacity() {
            return items.length;
        }

        public int size() {
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }

    }

    在這個例子中消費take()需要 隊列不為空,如果為空就掛起(await()),直到收到notEmpty的信號;生產put()需要隊列不滿,如果滿了就掛起(await()),直到收到notFull的信號。

    可能有人會問題,如果一個線程lock()對象后被掛起還沒有unlock,那么另外一個線程就拿不到鎖了(lock()操作會掛起),那么就無法通知(notify)前一個線程,這樣豈不是“死鎖”了?

     

    await* 操作

    上一節中說過多次ReentrantLock是獨占鎖,一個線程拿到鎖后如果不釋放,那么另外一個線程肯定是拿不到鎖,所以在lock.lock()lock.unlock()之間可能有一次釋放鎖的操作(同樣也必然還有一次獲取鎖的操作)。我們再回頭看代碼,不管take()還是put(),在進入lock.lock()后唯一可能釋放鎖的操作就是await()了。也就是說await()操作實際上就是釋放鎖,然后掛起線程,一旦條件滿足就被喚醒,再次獲取鎖!

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    上面是await()的代碼片段。上一節中說過,AQS在獲取鎖的時候需要有一個CHL的FIFO隊列,所以對于一個Condition.await()而言,如果釋放了鎖,要想再一次獲取鎖那么就需要進入隊列,等待被通知獲取鎖。完整的await()操作是安裝如下步驟進行的:

      1. 將當前線程加入Condition鎖隊列。特別說明的是,這里不同于AQS的隊列,這里進入的是Condition的FIFO隊列。后面會具體談到此結構。進行2。
      2. 釋放鎖。這里可以看到將鎖釋放了,否則別的線程就無法拿到鎖而發生死鎖。進行3。
      3. 自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。進行4。
      4. 獲取鎖(acquireQueued)。并將自己從Condition的FIFO隊列中釋放,表明自己不再需要鎖(我已經拿到鎖了)。

    這里再回頭介紹Condition的數據結構。我們知道一個Condition可以在多個地方被await*(),那么就需要一個FIFO的結構將這些Condition串聯起來,然后根據需要喚醒一個或者多個(通常是所有)。所以在Condition內部就需要一個FIFO的隊列。

    private transient Node firstWaiter;
    private transient Node lastWaiter;

    上面的兩個節點就是描述一個FIFO的隊列。我們再結合前面提到的節點(Node)數據結構。我們就發現Node.nextWaiter就派上用場了!nextWaiter就是將一系列的Condition.await*串聯起來組成一個FIFO的隊列。

     

    signal/signalAll 操作

    await*()清楚了,現在再來看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要將Condition.await*()中FIFO隊列中第一個Node喚醒(或者全部Node)喚醒。盡管所有Node可能都被喚醒,但是要知道的是仍然只有一個線程能夠拿到鎖,其它沒有拿到鎖的線程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter  = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

    上面的代碼很容易看出來,signal就是喚醒Condition隊列中的第一個非CANCELLED節點線程,而signalAll就是喚醒所有非CANCELLED節點線程。當然了遇到CANCELLED線程就需要將其從FIFO隊列中剔除。

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int c = p.waitStatus;
        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    上面就是喚醒一個await*()線程的過程,根據前面的小節介紹的,如果要unpark線程,并使線程拿到鎖,那么就需要線程節點進入AQS的隊列。所以可以看到在LockSupport.unpark之前調用了enq(node)操作,將當前節點加入到AQS隊列。

    整個鎖機制的原理就介紹完了,從下一節開始就進入了鎖機制的應用了。

     



    ©2009-2014 IMXYLZ |求賢若渴
    posted on 2010-07-08 12:33 imxylz 閱讀(30485) 評論(11)  編輯  收藏 所屬分類: J2EE

    評論

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2010-07-08 12:35 草包書生
    還可以.......  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2012-05-07 10:12 紅淚
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    有個問題想請教一下您,就是這段代碼中,為什么要從tail往前遍歷尋找有效繼任節點呢?CHL隊列不是一個先進先出的雙環隊列么?按照這個的話,我覺得應該是直接從head往后遍歷尋找有效繼任節點才對?

    希望博主能夠給我分析分析這個問題,萬分感謝!  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2012-05-07 11:36 imxylz
    @紅淚
    理論上講如果在釋放節點的時候其他所有節點都沒有被中斷(也就是節點沒有被CANCELLED),那么就應當喚醒頭節點的下一個有效節點(頭節點是傀儡節點),也就是從head往后尋找有效繼任節點。

    但是我們知道所有調用了lock()/tryLock(long time, TimeUnit unit)的線程可能會被中斷,這時候已經進入CHL隊列的節點node就會被CANCELLED,也就是會移出隊列。
    而移出隊列的邏輯有點復雜,有空我單獨寫一篇文章。
    簡單說就是用被移出節點node的繼任節點next替換前任有效節點的next。
    用代碼描述就是java.util.concurrent.locks.AbstractQueuedSynchronizer.cancelAcquire(Node):
    node=cancelled_node;
    cas(node.pre.next,node,node.next);
    并且將node.next指向node,也就是node沒有繼任節點了,但是不修改前任節點。
    也就是說如果從后tail往前遍歷到被刪出節點node時,根據node.pre可以繼續往前移動,直到移動到head為止。

    如果要想從head往后遍歷,那么代碼邏輯就是:
    node = cancelled_node;
    cas(node.next.pre,node,node.pre);

    這兩處的邏輯差別在于,由于存在一個傀儡節點(head),因此節點node.pre總是存在的,處理起來稍微容易點。  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2012-05-08 10:25 imxylz
    @imxylz
    @紅淚
    上面的回答可能不夠正確 有空我在寫一篇文章吧  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2012-05-12 11:44 紅淚
    呵呵.謝謝博主的解答!!我覺得誰要有空結合Condition將chl隊列的變化情況寫寫,這個挺好..不過我覺得這個還是挺復雜,功力不足唉.  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2012-07-21 23:40 ToO
    "在上一節中說道acquireQueued操作完成后(拿到了鎖),會將當前持有鎖的節點設為頭結點",也即是此時head 指向的是當前要釋放鎖的節點,所以head已經不在是傀儡節點了  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2013-03-25 01:00 ffengtian
    @imxylz
    從隊尾獲取可用的Node,執行解鎖后,試圖獲取鎖時,它的pred沒有指向頭節點造成重新阻塞  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2014-12-01 15:56 會飛的鴨子
    博主關于您寫的那個隊列,head做自加操作,是否需要判斷head跟tail的關系呢?如果head跟tail已經指向同一節點,那么此時做自加操作會不會有問題呢?
    if (++head == getCapacity()) {
    head = 0;
    }
    改為
    if(head!=tail){
    head++;
    if(head==getCapacity()){
    head=0;
    }
    }
      回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2015-06-28 16:02 mitisky
    請教一個問題:線程1,2.同時去lock,假設線程1獲得鎖,線程2加到等待隊列。如果在線程2加入等待隊列之前,線程1就unlock。也就是說線程2在加入等待隊列的前一步,線程1完成了unlock操作。那這是不是意味著線程2會等待,線程1的unlock并沒有成功喚醒線程2  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2015-06-29 09:25 imxylz
    @mitisky
    入隊列enq 是一個CAS操作,如果加入隊列成功,會立即進行隊列自旋操作,在這個操作里面嘗試獲取鎖。
    參考:http://m.tkk7.com/xylz/archive/2010/07/07/325410.html

    acquireQueued(node,arg)
      回復  更多評論
      

    # re: 深入淺出 Java Concurrency (9): 鎖機制 part 4 2016-07-11 16:17 guanzhisong
    @imxylz
    我覺得是因為被cancelled的節點,next指針會指向自己
    // If successor needs signal, try to set pred's next-link
    // so it will get one. Otherwise wake it up to propagate.
    int ws;
    if (pred != head &&
    ((ws = pred.waitStatus) == Node.SIGNAL ||
    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    pred.thread != null) {
    Node next = node.next;
    if (next != null && next.waitStatus <= 0)
    compareAndSetNext(pred, predNext, next);
    } else {
    unparkSuccessor(node);
    }

    node.next = node; // help GC
    這段代碼是cancelAcquire里面的,假如通過next指針去遍歷,鏈表其實已經被中斷了,但是鏈表的prev指針貌似就沒有這個問題,個人愚見  回復  更多評論
      


    ©2009-2014 IMXYLZ
    主站蜘蛛池模板: 国内免费高清在线观看| 免费国产作爱视频网站| 日韩亚洲精品福利| 免费看美女午夜大片| 国产免费卡一卡三卡乱码| 看一级毛片免费观看视频| 午夜亚洲福利在线老司机| 成人免费网站久久久| 亚洲色精品aⅴ一区区三区| 999zyz**站免费毛片| 亚洲成AV人片在WWW色猫咪| 最近2019中文字幕免费大全5| 亚洲男人的天堂在线| 成年美女黄网站色大免费视频| 亚洲精品乱码久久久久久V| 免费在线观看的黄色网址| 波霸在线精品视频免费观看| 亚洲高清国产拍精品26U| 免费观看激色视频网站bd| 亚洲国产成人精品无码区二本| 亚洲第一视频在线观看免费| 本免费AV无码专区一区| 亚洲色图综合网站| 爽爽日本在线视频免费| 精品久久久久久国产免费了 | 免费播放春色aⅴ视频| 成av免费大片黄在线观看| 亚洲高清不卡视频| 日本免费人成黄页在线观看视频 | 性感美女视频在线观看免费精品| 国产亚洲高清在线精品不卡| 亚洲日韩欧洲无码av夜夜摸| 亚洲一级免费毛片| 美女免费视频一区二区三区| 亚洲成熟xxxxx电影| 99re热免费精品视频观看| 色多多A级毛片免费看| 亚洲人成在线播放网站岛国| 在线中文高清资源免费观看| 热久久这里是精品6免费观看| 亚洲首页国产精品丝袜|