<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    xylz,imxylz

    關(guān)注后端架構(gòu)、中間件、分布式和并發(fā)編程

       :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

    此小節(jié)介紹幾個與鎖有關(guān)的有用工具。

    閉鎖(Latch)

    閉鎖(Latch):一種同步方法,可以延遲線程的進(jìn)度直到線程到達(dá)某個終點(diǎn)狀態(tài)。通俗的講就是,一個閉鎖相當(dāng)于一扇大門,在大門打開之前所有線程都被阻斷,一旦大門打開所有線程都將通過,但是一旦大門打開,所有線程都通過了,那么這個閉鎖的狀態(tài)就失效了,門的狀態(tài)也就不能變了,只能是打開狀態(tài)。也就是說閉鎖的狀態(tài)是一次性的,它確保在閉鎖打開之前所有特定的活動都需要在閉鎖打開之后才能完成。

    CountDownLatch是JDK 5+里面閉鎖的一個實(shí)現(xiàn),允許一個或者多個線程等待某個事件的發(fā)生。CountDownLatch有一個正數(shù)計數(shù)器,countDown方法對計數(shù)器做減操作,await方法等待計數(shù)器達(dá)到0。所有await的線程都會阻塞直到計數(shù)器為0或者等待線程中斷或者超時。

    CountDownLatch的API如下。

    • public void await() throws InterruptedException
    • public boolean await(long timeout, TimeUnit unit) throws InterruptedException
    • public void countDown()
    • public long getCount()

    其中getCount()描述的是當(dāng)前計數(shù),通常用于調(diào)試目的。

    下面的例子中描述了閉鎖的兩種常見的用法。

    package xylz.study.concurrency.lock;

    import java.util.concurrent.CountDownLatch;

    public class PerformanceTestTool {

        public long timecost(final int times, final Runnable task) throws InterruptedException {
            if (times <= 0) throw new IllegalArgumentException();
            final CountDownLatch startLatch = new CountDownLatch(1);
            final CountDownLatch overLatch = new CountDownLatch(times);
            for (int i = 0; i < times; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            startLatch.await();
                            //
                            task.run();
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        } finally {
                            overLatch.countDown();
                        }
                    }
                }).start();
            }
            //
            long start = System.nanoTime();
            startLatch.countDown();
            overLatch.await();
            return System.nanoTime() - start;
        }

    }

    在上面的例子中使用了兩個閉鎖,第一個閉鎖確保在所有線程開始執(zhí)行任務(wù)前,所有準(zhǔn)備工作都已經(jīng)完成,一旦準(zhǔn)備工作完成了就調(diào)用startLatch.countDown()打開閉鎖,所有線程開始執(zhí)行。第二個閉鎖在于確保所有任務(wù)執(zhí)行完成后主線程才能繼續(xù)進(jìn)行,這樣保證了主線程等待所有任務(wù)線程執(zhí)行完成后才能得到需要的結(jié)果。在第二個閉鎖當(dāng)中,初始化了一個N次的計數(shù)器,每個任務(wù)執(zhí)行完成后都會將計數(shù)器減一,所有任務(wù)完成后計數(shù)器就變?yōu)榱?,這樣主線程閉鎖overLatch拿到此信號后就可以繼續(xù)往下執(zhí)行了。

    根據(jù)前面的happend-before法則可以知道閉鎖有以下特性:

    內(nèi)存一致性效果:線程中調(diào)用 countDown() 之前的操作 happen-before 緊跟在從另一個線程中對應(yīng) await() 成功返回的操作。

    在上面的例子中第二個閉鎖相當(dāng)于把一個任務(wù)拆分成N份,每一份獨(dú)立完成任務(wù),主線程等待所有任務(wù)完成后才能繼續(xù)執(zhí)行。這個特性在后面的線程池框架中會用到,其實(shí)FutureTask就可以看成一個閉鎖。后面的章節(jié)還會具體分析FutureTask的。

     

    同樣基于探索精神,仍然需要“窺探”下CountDownLatch里面到底是如何實(shí)現(xiàn)await*countDown的。

    首先,研究下await()方法。內(nèi)部直接調(diào)用了AQSacquireSharedInterruptibly(1)。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    前面一直提到的都是獨(dú)占鎖(排它鎖、互斥鎖),現(xiàn)在就用到了另外一種鎖,共享鎖。

    所謂共享鎖是說所有共享鎖的線程共享同一個資源,一旦任意一個線程拿到共享資源,那么所有線程就都擁有的同一份資源。也就是通常情況下共享鎖只是一個標(biāo)志,所有線程都等待這個標(biāo)識是否滿足,一旦滿足所有線程都被激活(相當(dāng)于所有線程都拿到鎖一樣)。這里的閉鎖CountDownLatch就是基于共享鎖的實(shí)現(xiàn)。

    閉鎖中關(guān)于AQStryAcquireShared的實(shí)現(xiàn)是如下代碼(java.util.concurrent.CountDownLatch.Sync.tryAcquireShared):

    public int tryAcquireShared(int acquires) {
        return getState() == 0? 1 : -1;
    }

    在這份邏輯中,對于閉鎖而言第一次await時tryAcquireShared應(yīng)該總是-1,因?yàn)閷τ陂]鎖CountDownLatch而言state的值就是初始化的count值。這也就解釋了為什么在countDown調(diào)用之前閉鎖的count總是>0。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    上面的邏輯展示了如何通過await將所有線程串聯(lián)并掛起,直到被喚醒或者條件滿足或者被中斷。整個過程是這樣的:

      1. 將當(dāng)前線程節(jié)點(diǎn)以共享模式加入AQSCLH隊列中(相關(guān)概念參考這里這里)。進(jìn)行2。
      2. 檢查當(dāng)前節(jié)點(diǎn)的前任節(jié)點(diǎn),如果是頭結(jié)點(diǎn)并且當(dāng)前閉鎖計數(shù)為0就將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),喚醒繼任節(jié)點(diǎn),返回(結(jié)束線程阻塞)。否則進(jìn)行3。
      3. 檢查線程是否該阻塞,如果應(yīng)該就阻塞(park),直到被喚醒(unpark)。重復(fù)2。
      4. 如果2、3有異常就拋出異常(結(jié)束線程阻塞)。

    這里有一點(diǎn)值得說明下,設(shè)置頭結(jié)點(diǎn)并喚醒繼任節(jié)點(diǎn)setHeadAndPropagate。由于前面tryAcquireShared總是返回1或者-1,而進(jìn)入setHeadAndPropagate時總是propagate>=0,所以這里propagate==1。后面喚醒繼任節(jié)點(diǎn)操作就非常熟悉了。

    private void setHeadAndPropagate(Node node, int propagate) {
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }

    從上面的所有邏輯可以看出countDown應(yīng)該就是在條件滿足(計數(shù)為0)時喚醒頭結(jié)點(diǎn)(時間最長的一個節(jié)點(diǎn)),然后頭結(jié)點(diǎn)就會根據(jù)FIFO隊列喚醒整個節(jié)點(diǎn)列表(如果有的話)。

    CountDownLatchcountDown代碼中看到,直接調(diào)用的是AQSreleaseShared(1),參考前面的知識,這就印證了上面的說法。

    tryReleaseShared中正是采用CAS操作減少計數(shù)(每次減-1)。

    public boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

    整個CountDownLatch就是這個樣子的。其實(shí)有了前面原子操作和AQS的原理及實(shí)現(xiàn),分析CountDownLatch還是比較容易的。

     



    ©2009-2014 IMXYLZ |求賢若渴
    posted on 2010-07-09 09:21 imxylz 閱讀(29345) 評論(6)  編輯  收藏 所屬分類: J2EE

    評論

    # re: 深入淺出 Java Concurrency (10): 鎖機(jī)制 part 5 閉鎖 (CountDownLatch)[未登錄] 2010-07-12 19:40 行云流水
    堅持。幾天沒有更新了,堅持一天3篇,支持支持。。  回復(fù)  更多評論
      

    # re: 深入淺出 Java Concurrency (10): 鎖機(jī)制 part 5 閉鎖 (CountDownLatch) 2010-07-12 23:40 xylz
    @行云流水

    多謝鼓勵!爭取不“太監(jiān)”了。  回復(fù)  更多評論
      

    # re: 深入淺出 Java Concurrency (10): 鎖機(jī)制 part 5 閉鎖 (CountDownLatch) 2012-02-02 12:15 漆黑之牙
    @xylz
    文章太長,看完頭都大了!  回復(fù)  更多評論
      

    # re: 深入淺出 Java Concurrency (10): 鎖機(jī)制 part 5 閉鎖 (CountDownLatch) 2012-06-19 00:15 vinfai
    應(yīng)該向你好好學(xué)習(xí)!都落伍了  回復(fù)  更多評論
      

    # re: 深入淺出 Java Concurrency (10): 鎖機(jī)制 part 5 閉鎖 (CountDownLatch) 2013-03-26 15:42 ffengtian
    所有的請求await的線程都阻塞了,而countdouwn操作只是state-1,哪個操作執(zhí)行喚醒阻塞的  回復(fù)  更多評論
      

    # re: 深入淺出 Java Concurrency (10): 鎖機(jī)制 part 5 閉鎖 (CountDownLatch) 2013-10-02 22:27 txbhcml
    @ffengtian
    doReleaseShared  回復(fù)  更多評論
      


    ©2009-2014 IMXYLZ
    主站蜘蛛池模板: 久久精品国产精品亚洲色婷婷| 亚洲乱码国产一区三区| 亚洲欧美精品午睡沙发| 国产91在线免费| 国产免费久久精品丫丫| 久久亚洲国产成人亚| 1000部啪啪未满十八勿入免费| 2020久久精品亚洲热综合一本| 国产免费变态视频网址网站| 91av视频免费在线观看| 亚洲AV无码成人网站在线观看| 亚洲精品偷拍视频免费观看 | 叮咚影视在线观看免费完整版| 久久久亚洲精品视频| 亚洲欧洲中文日韩av乱码| 日本a级片免费看| 99视频免费在线观看| 亚洲а∨天堂久久精品9966| 久久久久亚洲AV成人网人人软件| 91大神免费观看| 鲁丝片一区二区三区免费| 亚洲国产精华液2020| 亚洲国产成人久久综合碰碰动漫3d| 成人免费一级毛片在线播放视频| 免费人成在线观看播放a| 亚洲狠狠综合久久| 亚洲成人中文字幕| 亚洲成年轻人电影网站www| 亚洲中文字幕在线乱码| 夭天干天天做天天免费看| 免费精品99久久国产综合精品| 4hu四虎免费影院www| 亚洲熟妇丰满xxxxx| 亚洲卡一卡2卡三卡4卡无卡三| 亚洲不卡av不卡一区二区| heyzo亚洲精品日韩| 国产福利在线免费| 免费做爰猛烈吃奶摸视频在线观看| 很黄很污的网站免费| 久久免费国产视频| 国产精品永久免费视频|