<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢蝶

    生活、程序、未來
       :: 首頁 ::  ::  :: 聚合  :: 管理

    Clojure的并發(fā)(四)Agent深入分析和Actor

    Posted on 2010-07-19 18:48 dennis 閱讀(4132) 評(píng)論(0)  編輯  收藏 所屬分類: javaClojure
    Clojure 的并發(fā)(一) Ref和STM
    Clojure 的并發(fā)(二)Write Skew分析
    Clojure 的并發(fā)(三)Atom、緩存和性能
    Clojure 的并發(fā)(四)Agent深入分析和Actor
    Clojure 的并發(fā)(五)binding和let
    Clojure的并發(fā)(六)Agent可以改進(jìn)的地方
    Clojure的并發(fā)(七)pmap、pvalues和pcalls
    Clojure的并發(fā)(八)future、promise和線程

    四、 Agent和Actor

       除了用于協(xié)調(diào)同步的Ref,獨(dú)立同步的Ref,還有一類非常常見的需求:你可能希望狀態(tài)的更新是異步,你通常不關(guān)心更新的結(jié)果,這時(shí)候你可以考慮下使用Agent。

    1、創(chuàng)建agent:

    user=> (def counter (agent 0))
    #'user/counter

    user
    => counter
    #<Agent@9444d1: 0>


    通過agent函數(shù)你就可以創(chuàng)建一個(gè)agent,指向一個(gè)不可變的初始狀態(tài)。

    2、取agent的值,這跟Ref和Atom沒啥兩樣,都是通過deref或者@宏:
    user=> @counter
    0
    user
    => (deref counter)
    0

    3、更新agent,通過send或者send-off函數(shù)給agent發(fā)送任務(wù)去更新agent:
    user=> (send counter inc)
    #<Agent@9444d1: 0>

      send返回agent對(duì)象,內(nèi)部的值仍然是0,而非inc遞增之后的1,這是因?yàn)閟end是異步發(fā)送,更新是在另一個(gè)線程執(zhí)行,兩個(gè)線程(REPL主線程和更新任務(wù)的線程)的執(zhí)行順序沒有同步,顯示什么取決于兩者誰更快。更新肯定是發(fā)生了,查看counter的值:
    user=> @counter
    1

       果然更新到了1了。send的方法簽名:
    (send a f & args)

       其中f是更新的函數(shù),它的定義如下:
    (f state-of-agent & args)
       也就是它會(huì)在第一個(gè)參數(shù)接收當(dāng)前agent的狀態(tài),而args是send附帶的參數(shù)。

       還有個(gè)方法,send-off,它的作用于send類似:
    user=> (send-off counter inc)
    #<Agent@9444d1: 1>
    user=> @counter
    2

       send和send-off的區(qū)別在于,send是將任務(wù)交給一個(gè)固定大小的線程池執(zhí)行
    final public static ExecutorService pooledExecutor =
            Executors
    .newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
       默認(rèn)線程池大小是CPU核數(shù)加上2。因此send執(zhí)行的任務(wù)最好不要有阻塞的操作。而send-off則使用沒有大小限制(取決于內(nèi)存)的線程池:

    final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
      
       因此,send-off比較適合任務(wù)有阻塞的操作,如IO讀寫之類。請(qǐng)注意,所有的agent是共用這些線程池,這從這些線程池的定義看出來,都是靜態(tài)變量。

    4、異步轉(zhuǎn)同步
    ,剛才提到send和send-off都是異步將任務(wù)提交給線程池去處理,如果你希望同步等待結(jié)果返回,那么可以使用await函數(shù):
     (do (send counter inc) (await counter) (println @counter))

    send一個(gè)任務(wù)之后,調(diào)用await等待agent所有派發(fā)的更新任務(wù)結(jié)束,然后打印agent的值。await是阻塞當(dāng)前線程,直到至今為止所有任務(wù)派發(fā)執(zhí)行完畢才返回。await沒有超時(shí),會(huì)一直等待直到條件滿足,await-for則可以接受等待的超時(shí)時(shí)間,如果超過指定時(shí)間沒有返回,則返回nil,否則返回結(jié)果。
     (do (send counter inc) (await-for 100 counter) (println @counter))

    await-for接受的單位是毫秒。

    5、錯(cuò)誤處理


       agent也可以跟Ref和Atom一樣設(shè)置validator,用于約束驗(yàn)證。由于agent的更新是異步的,你不知道更新的時(shí)候agent是否發(fā)生異常,只有等到你去取值或者更新的時(shí)候才能發(fā)現(xiàn):
    user=> (def counter (agent 0 :validator number?))
    #
    'user/counter

    user
    => (send counter (fn[_] "foo"))
    #
    <clojure.lang.Agent@4de8ce62: 0>

       強(qiáng)制要求counter的值是數(shù)值類型,第二個(gè)表達(dá)式我們給counter發(fā)送了一個(gè)更新任務(wù),想將狀態(tài)更新為字符串"foo",由于是異步更新,返回的結(jié)果可能沒有顯示異常,當(dāng)你取值的時(shí)候,問題出現(xiàn)了:
    user=> @counter
    java.lang.Exception: Agent has errors (NO_SOURCE_FILE:
    0)

      告訴你agent處于不正常的狀態(tài),如果你想獲取詳細(xì)信息,可以通過agent-errors函數(shù):
    user=> (.printStackTrace (agent-errors counter))
    java.lang.IllegalArgumentException: No matching field found: printStackTrace 
    for class clojure.lang.PersistentList (NO_SOURCE_FILE:0)

       你可以恢復(fù)agent到前一個(gè)正常的狀態(tài),通過clear-agent-errors函數(shù):
     
    user=> (clear-agent-errors counter)
    nil
    user
    => @counter
    0

    6、加入事務(wù)

    agent跟atom不一樣,agent可以加入事務(wù),在事務(wù)里調(diào)用send發(fā)送一個(gè)任務(wù),當(dāng)事務(wù)成功的時(shí)候該任務(wù)將只會(huì)被發(fā)送一次,最多最少都一次。利用這個(gè)特性,我們可以實(shí)現(xiàn)在事務(wù)操作的時(shí)候?qū)懳募_(dá)到ACID中的D——持久性的目的:
    (def backup-agent (agent "output/messages-backup.clj" ))
    (def messages (ref []))
    (use 
    '[clojure.contrib.duck-streams :only (spit)])
    (defn add-message-with-backup [msg]
           (dosync
               (let [snapshot (commute messages conj msg)]
                    (send
    -off backup-agent (fn [filename]
                                            (spit filename snapshot)
                                            filename))
               snapshot)))

    定義了一個(gè)backup-agent用于保存消息,add-message-with-backup函數(shù)首先將狀態(tài)保存到messages,這是個(gè)普通的Ref,然后調(diào)用send-off給backup-agent一個(gè)任務(wù):
     (fn [filename]
              (spit filename snapshot)
             filename)
    這個(gè)任務(wù)是一個(gè)匿名函數(shù),它利用spit打開文件,寫入當(dāng)前的快照,并且關(guān)閉文件,文件名來自backup-agent的狀態(tài)值。注意到,我們是用send-off,send-off利用cache線程池,哪怕阻塞也沒關(guān)系。

    利用事務(wù)加上一個(gè)backup-agent可以實(shí)現(xiàn)類似數(shù)據(jù)庫的ACID,但是還是不同的,主要區(qū)別在于backup-agent的更新是異步,并不保證一定寫入文件,因此持久性也沒辦法得到保證。

    7、關(guān)閉線程池:


    前面提到agent的更新都是交給線程池去處理,在系統(tǒng)關(guān)閉的時(shí)候你需要關(guān)閉這兩個(gè)線程吃,通過shutdown-agents方法,你再添加任務(wù)將被拒絕:
    user=> (shutdown-agents)
    nil
    user
    => (send counter inc)
    java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
    0)
    user
    => (def counter (agent 0))
    #
    'user/counter
    user=> (send counter inc)    
    java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
    0)

    哪怕我重新創(chuàng)建了counter,提交任務(wù)仍然被拒絕,進(jìn)一步證明這些線程池是全局共享的。

    8、原理淺析

    前文其實(shí)已經(jīng)將agent的實(shí)現(xiàn)原理大體都說了,agent本身只是個(gè)普通的java對(duì)象,它的內(nèi)部維持一個(gè)狀態(tài)和一個(gè)隊(duì)列:
        volatile Object state;
        AtomicReference
    <IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);


    任務(wù)提交的時(shí)候,是封裝成Action對(duì)象,添加到此隊(duì)列

        
    public Object dispatch(IFn fn, ISeq args, boolean solo) {
            
    if (errors != null) {
                
    throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
            }
            
    //封裝成action對(duì)象
            Action action = new Action(this, fn, args, solo);
            dispatchAction(action);

            
    return this;
        }


        
    static void dispatchAction(Action action) {
            LockingTransaction trans 
    = LockingTransaction.getRunning();
            
    // 有事務(wù),加入事務(wù)
            if (trans != null)
                trans.enqueue(action);
            
    else if (nested.get() != null) {
                nested.set(nested.get().cons(action));
            }
            
    else {
                
    // 入隊(duì)
                action.agent.enqueue(action);
            }
        }

    send和send-off都是調(diào)用Agent的dispatch方法,只是兩者的參數(shù)不一樣,dispatch的第二個(gè)參數(shù) solo決定了是使用哪個(gè)線程池處理action:
    (defn send
      [#
    ^clojure.lang.Agent a f & args]
        (. a (dispatch f args 
    false)))

    (defn send
    -off
      [#
    ^clojure.lang.Agent a f & args]
        (. a (dispatch f args 
    true)))

    send-off將solo設(shè)置為true,當(dāng)為true的時(shí)候使用cache線程池:

       
    final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();

        
    final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();

            
    void execute() {
                
    if (solo)
                    soloExecutor.execute(
    this);
                
    else
                    pooledExecutor.execute(
    this);
            }

    執(zhí)行的時(shí)候調(diào)用更新函數(shù)并設(shè)置新的狀態(tài):

    try {
                        Object oldval 
    = action.agent.state;
                        Object newval 
    = action.fn.applyTo(RT.cons(action.agent.state, action.args));
                        action.agent.setState(newval);
                        action.agent.notifyWatches(oldval, newval);
                    }
                    
    catch (Throwable e) {
                        
    // todo report/callback
                        action.agent.errors = RT.cons(e, action.agent.errors);
                        hadError 
    = true;
                    }

    9、跟actor的比較:

    Agent跟Actor有一個(gè)顯著的不同,agent的action來自于別人發(fā)送的任務(wù)附帶的更新函數(shù),而actor的action則是自身邏輯的一部分。因此,如果想用agent實(shí)現(xiàn)actor模型還是相當(dāng)困難的,下面是我的一個(gè)嘗試:

    (ns actor)

    (defn receive [
    & args]
       (apply hash
    -map args))
    (defn self [] 
    *agent*)

    (defn spawn [recv
    -map]
        (agent recv
    -map))

    (defn 
    ! [actor msg]
        (send actor #(apply (get 
    %1 %2)  (vector %2)) msg))
    ;;啟動(dòng)一個(gè)actor
    (def actor (spawn 
                 (receive :hello #(println 
    "receive "%))))
    ;;發(fā)送消息 hello
    (
    ! actor :hello)

       利用spawn啟動(dòng)一個(gè)actor,其實(shí)本質(zhì)上是一個(gè)agent,而發(fā)送通過感嘆號(hào)!,給agent發(fā)送一個(gè)更新任務(wù),它從recv-map中查找消息對(duì)應(yīng)的處理函數(shù)并將消息作為參數(shù)來執(zhí)行。難點(diǎn)在于消息匹配,匹配這種簡單類型的消息沒有問題,但是如果匹配用到變量,暫時(shí)沒有想到好的思路實(shí)現(xiàn),例如實(shí)現(xiàn)兩個(gè)actor的ping/pong。

     
    主站蜘蛛池模板: 59pao成国产成视频永久免费| 亚洲日韩中文字幕| 免费看大美女大黄大色| 成人性做爰aaa片免费看| 看一级毛片免费观看视频| 色在线亚洲视频www| 亚洲男人都懂得羞羞网站| 久久精品夜色噜噜亚洲A∨| 日本高清免费不卡在线| 久久久高清免费视频| 人妻丰满熟妇无码区免费| 国产精品hd免费观看| 美女露隐私全部免费直播| 亚洲午夜无码久久| 亚洲国产成人久久99精品| 亚洲欧洲一区二区| 亚洲高清在线视频| 亚洲gv白嫩小受在线观看| 国产亚洲av片在线观看18女人 | 三上悠亚亚洲一区高清| 四虎永久在线免费观看| 国产免费久久精品久久久| 久久久久国色AV免费观看性色 | 亚洲一区欧洲一区| 亚洲国产精品一区二区久| 337p日本欧洲亚洲大胆艺术| 亚洲a一级免费视频| 亚洲不卡中文字幕无码| 久久精品国产亚洲av麻| 亚洲AV成人片色在线观看| 久久精品国产精品亚洲艾| 久久精品国产亚洲av麻豆色欲 | 高清一区二区三区免费视频| baoyu122.永久免费视频| a级毛片毛片免费观看久潮喷| 999zyz**站免费毛片| 中文字幕a∨在线乱码免费看| 国产免费高清69式视频在线观看| 一级特黄录像视频免费| 中国精品一级毛片免费播放| 毛片免费在线观看|