<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    上善若水
    In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
    posts - 146,comments - 147,trackbacks - 0

    一直想好好學習concurrent包中的各個類的實現,然而經??戳艘稽c就因為其他事情干擾而放下了。發現這樣太不利于自己的成長了,因而最近打算潛心一件一件的完成自己想學習的東西。

    對concurrent包的學習打算先從Lock的實現開始,因而自然而然的就端起了AbstractQueuedSynchronizer,然而要讀懂這個類的源碼并不是那么容易,因而我就開始問自己一個問題:如果自己要去實現這個一個Lock對象,應該如何實現呢?

    要實現Lock對象,首先理解什么是鎖?我自己從編程角度簡單的理解,所謂鎖對象(互斥鎖)就是它能保證一次只有一個線程能進入它保護的臨界區,如果有一個線程已經拿到鎖對象,那么其他對象必須讓權等待,而在該線程退出這個臨界區時需要喚醒等待列表中的其他線程。更學術一些,《計算機操作系統》中對同步機制準則的歸納(P50):

    1. 空閑讓進。當無進程處于臨界區時,表明臨界資源處于空閑狀態,應允許一個請求進入臨界區的進程立即進入自己的臨界區,以有效的利用臨界資源。
    2. 忙則等待。當已有進程進入臨界區時,表明臨界資源正在被訪問,因而其他試圖進入臨界區的進程必須等待,以保證對臨界區資源的互斥訪問。
    3. 有限等待。對要求訪問臨界資源的進程,應保證在有限時間內能進入自己的臨界區,以免陷入“死等”狀態。
    4. 讓權等待。當進程不能進入自己的臨界區時,應該釋放處理機,以免進程陷入“忙等”狀態。

    說了那么多,其實對互斥鎖很簡單,只需要一個標記位,如果該標記位為0,表示沒有被占用,因而直接獲得鎖,然后把該標記位置為1,此時其他線程發現該標記位已經是1,因而需要等待。這里對這個標記位的比較并設值必須是原子操作,而在JDK5以后提供的atomic包里的工具類可以很方便的提供這個原子操作。然而上面的四個準則應該漏了一點,即釋放鎖的線程(進程)和得到鎖的線程(進程)應該是同一個,就像一把鑰匙對應一把鎖(理想的),所以一個非常簡單的Lock類可以這么實現:

    public class SpinLockV1 {
        
    private final AtomicInteger state = new AtomicInteger(0);
        
    private volatile Thread owner; // 這里owner字段可能存在中間值,不可靠,因而其他線程不可以依賴這個字段的值
        
        
    public void lock() {
            
    while (!state.compareAndSet(01)) { }
            owner 
    = Thread.currentThread();
        }
        
        
    public void unlock() {
            Thread currentThread 
    = Thread.currentThread();
            
    if (owner != currentThread || !state.compareAndSet(10)) {
                
    throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
            }
            owner 
    = null;
        }
    }

    一個簡單的測試方法:

        @Test
        
    public void testLockCorrectly() throws InterruptedException {
            
    final int COUNT = 100;
            Thread[] threads 
    = new Thread[COUNT];
            SpinLockV1 lock 
    = new SpinLockV1();
            AddRunner runner 
    = new AddRunner(lock);
            
    for (int i = 0; i < COUNT; i++) { 
                threads[i] 
    = new Thread(runner, "thread-" + i);
                threads[i].start();
            }
            
            
    for (int i = 0; i < COUNT; i++) {
                threads[i].join();
            }
            
            assertEquals(COUNT, runner.getState());
        }
        
        
    private static class AddRunner implements Runnable {
            
    private final SpinLockV1 lock;
            
    private int state = 0;

            
    public AddRunner(SpinLockV1 lock) {
                
    this.lock = lock;
            }
            
            
    public void run() {
                lock.lock();
                
    try {
                    quietSleep(
    10);
                    state
    ++;
                    System.out.println(Thread.currentThread().getName() 
    + "" + state);
                } 
    finally {
                    lock.unlock();
                }
            }
            
            
    public int getState() {
                
    return state;
            }
        }

    然而這個SpinLock其實并不需要state這個字段,因為owner的賦值與否也是一種狀態,因而可以用它作為一種互斥狀態:

    public class SpinLockV2 {
        
    private final AtomicReference<Thread> owner = new AtomicReference<Thread>(null);
        
        
    public void lock() {
            
    final Thread currentThread = Thread.currentThread();
            
    while (!owner.compareAndSet(null, currentThread)) { }
        }
        
        
    public void unlock() {
            Thread currentThread 
    = Thread.currentThread();
            
    if (!owner.compareAndSet(currentThread, null)) {
                
    throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
            }
        }
    }

    這在操作系統中被定義為整形信號量,然而整形信號量如果沒拿到鎖會一直處于“忙等”狀態(沒有遵循有限等待和讓權等待的準則),因而這種鎖也叫Spin Lock,在短暫的等待中它可以提升性能,因為可以減少線程的切換,concurrent包中的Atomic大部分都采用這種機制實現,然而如果需要長時間的等待,“忙等”會占用不必要的CPU時間,從而性能會變的很差,這個時候就需要將沒有拿到鎖的線程放到等待列表中,這種方式在操作系統中也叫記錄型信號量,它遵循了讓權等待準則(當前沒有實現有限等待準則)。在JDK6以后提供了LockSupport.park()/LockSupport.unpark()操作,可以將當前線程放入一個等待列表或將一個線程從這個等待列表中喚醒。然而這個park/unpark的等待列表是一個全局的等待列表,在unpartk的時候還是需要提供需要喚醒的Thread對象,因而我們需要維護自己的等待列表,但是如果我們可以用JDK提供的工具類ConcurrentLinkedQueue,就非常容易實現,如LockSupport文檔中給出來的代碼事例

    class FIFOMutex {
       
    private final AtomicBoolean locked = new AtomicBoolean(false);
       
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

       
    public void lock() {
         
    boolean wasInterrupted = false;
         Thread current 
    = Thread.currentThread();
         waiters.add(current);

         
    // Block while not first in queue or cannot acquire lock
         while (waiters.peek() != current || !locked.compareAndSet(falsetrue)) {
            LockSupport.park(
    this);
            
    if (Thread.interrupted()) // ignore interrupts while waiting
              wasInterrupted = true;
         }

         waiters.remove();
         
    if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
       }

       
    public void unlock() {
         locked.set(
    false);
         LockSupport.unpark(waiters.peek());
       }
     }

    在該代碼事例中,有一個線程等待隊列和鎖標記字段,每次調用lock時先將當前線程放入這個等待隊列中,然后拿出隊列頭線程對象,如果該線程對象正好是當前線程,并且成功 使用CAS方式設置locked字段(這里需要兩個同時滿足,因為可能出現一個線程已經從隊列中移除了但還沒有unlock,此時另一個線程調用lock方法,此時隊列頭的線程就是第二個線程,然而由于第一個線程還沒有unlock或者正在unlock,因而需要使用CAS原子操作來判斷是否要park),表示該線程競爭成功,獲得鎖,否則將當前線程park,這里之所以要放在 while循環中,因為park操作可能無理由返回(spuriously),如文檔中給出的描述:

    LockSupport.park()
    public static void park(Object blocker)
    Disables the current thread for thread scheduling purposes unless the permit is available.

    If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

    • Some other thread invokes unpark with the current thread as the target; or
    • Some other thread interrupts the current thread; or
    • The call spuriously (that is, for no reason) returns.

    This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.

    Parameters:
    blocker - the synchronization object responsible for this thread parking
    Since:
    1.6
    我在實現自己的類時就被這個“無理由返回”坑了好久。對于已經獲得鎖的線程,將該線程從等待隊列中移除,這里由于ConcurrentLinkedQueue是線程安全的,因而能保證每次都是隊列頭的線程得到鎖,因而在得到鎖匙將隊列頭移除。unlock邏輯比較簡單,只需要將locked字段打開(設置為false),喚醒(unpark)隊列頭的線程即可,然后該線程會繼續在lock方法的while循環中繼續競爭unlocked字段,并將它自己從線程隊列中移除表示獲得鎖成功。當然安全起見,最好在unlock中加入一些驗證邏輯,如解鎖的線程和加鎖的線程需要相同。

    然而本文的目的是自己實現一個Lock對象,即只使用一些基本的操作,而不使用JDK提供的Atomic類和ConcurrentLinkedQueue。類似的首先我們也需要一個隊列存放等待線程隊列(公平起見,使用先進先出隊列),因而先定義一個Node對象用以構成這個隊列:

     

        protected static class Node {
            
    volatile Thread owner;
            
    volatile Node prev;
            
    volatile Node next;
            
            
    public Node(Thread owner) {
                
    this.owner = owner;
                
    this.state = INIT;
            }
            
            
    public Node() {
                
    this(Thread.currentThread());
            }
        }

    簡單起見,隊列頭是一個起點的placeholder,每個調用lock的線程都先將自己競爭放入這個隊列尾,每個隊列頭后一個線程(Node)即是獲得鎖的線程,所以我們需要有head Node字段用以快速獲取隊列頭的后一個Node,而tail Node字段用來快速插入新的Node,所以關鍵在于如何線程安全的構建這個隊列,方法還是一樣的,使用CAS操作,即CAS方法將自己設置成tail值,然后重新構建這個列表:

        protected boolean enqueue(Node node) {
            
    while (true) {
                
    final Node preTail = tail;
                node.prev 
    = preTail;
                
    if (compareAndSetTail(preTail, node)) {
                    preTail.next 
    = node;
                    
    return node.prev == head;
                }
            }
        }

    在當前線程Node以線程安全的方式放入這個隊列后,lock實現相對就比較簡單了,如果當前Node是的前驅是head,該線程獲得鎖,否則park當前線程,處理park無理由返回的問題,因而將park放入while循環中(該實現是一個不可重入的實現):

        public void lock() {
            
    // Put the latest node to a queue first, then check if the it is the first node
            
    // this way, the list is the only shared resource to deal with
            Node node = new Node();
            
    if (enqueue(node)) {
                current 
    = node.owner;
            } 
    else {
                
    while (node.prev != head) {
                    LockSupport.park(
    this); // This may return "spuriously"!!, so put it to while
                }

                current 
    = node.owner;
            }
        }

    unlock的實現需要考慮多種情況,如果當前Node(head.next)有后驅,那么直接unpark該后驅即可;如果沒有,表示當前已經沒有其他線程在等待隊列中,然而在這個判斷過程中可能會有其他線程進入,因而需要用CAS的方式設置tail,如果設置失敗,表示此時有其他線程進入,因而需要將該新進入的線程unpark從而該新進入的線程在調用park后可以立即返回(這里的CAS和enqueue的CAS都是對tail操作,因而能保證狀態一致):

        public void unlock() {
            Node curNode 
    = unlockValidate();
            Node next 
    = curNode.next;
            
    if (next != null) {
               
    head.next = next;
                next.prev 
    = head;
                LockSupport.unpark(next.owner);
            } 
    else {
                
    if (!compareAndSetTail(curNode, head)) {
                   
    while (curNode.next == null) { } // Wait until the next available
                    // Another node queued during the time, so we have to unlock that, or else, this node can never unparked
                    unlock();
                } 
    else {
                   
    compareAndSetNext(head, curNode, null); // Still use CAS here as the head.next may already been changed
                }
            }
        }

    具體的代碼和測試類可以參考查看這里


    其實直到自己寫完這個類后才直到者其實這是一個MCS鎖的變種,因而這個實現每個線程park在自身對應的node上,而由前一個線程unpark它;而AbstractQueuedSynchronizer是CLH鎖,因為它的park由前驅狀態決定,雖然它也是由前一個線程unpark它。具體可以參考這里。

    posted on 2015-08-11 06:08 DLevin 閱讀(5448) 評論(0)  編輯  收藏 所屬分類: CodeTools 、MultiThreading
    主站蜘蛛池模板: av成人免费电影| 亚洲av色香蕉一区二区三区蜜桃| 插鸡网站在线播放免费观看| 亚洲Av无码乱码在线znlu| 在线精品自拍亚洲第一区| 免费人成网站在线高清| 日日夜夜精品免费视频| 亚洲人成未满十八禁网站| 黑人粗长大战亚洲女2021国产精品成人免费视频 | 色www永久免费| 成人免费无码大片A毛片抽搐 | 日日操夜夜操免费视频| 黄床大片30分钟免费看| 国产偷窥女洗浴在线观看亚洲| 亚洲永久在线观看| 鲁丝片一区二区三区免费| 亚洲精品成人无限看| 国产天堂亚洲国产碰碰| 亚洲精品视频免费观看| 中文字幕在线免费看| 亚洲精选在线观看| 中文字幕的电影免费网站| 亚洲av永久无码精品漫画 | 亚洲精品V天堂中文字幕| 91精品国产免费网站| 亚洲精品高清国产一线久久| 男女午夜24式免费视频| 亚洲另类古典武侠| 97在线视频免费公开观看| 国产91在线|亚洲| 九月婷婷亚洲综合在线| 亚洲国产精品成人AV在线| 亚洲精品视频久久久| 91精品成人免费国产片| 亚洲毛片无码专区亚洲乱| 国产网站在线免费观看| 久久免费精品视频| 亚洲精品无码成人片久久不卡| 亚洲中文字幕无码中文字在线| 国产高清免费视频| 中文字幕亚洲综合久久综合 |