<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 9, comments - 4, trackbacks - 0, articles - 21

    生產者/消費者模型

    Posted on 2007-10-27 16:23 一步一步努力向上爬 閱讀(3552) 評論(0)  編輯  收藏 所屬分類: 設計模式
    生產者/消費者模型是最基本的并發協作模型,是所有并發協作的基礎。可以這么說,其他的并發協作都是供求關系模型的變種。生產者,消費者之間的供求關系可以簡單的 使用管道來構造。讓我們看兩者之間的行為模式: *生產/消費模型:消費者如果無消費對象,就會阻塞直到有消費對象到達;一個消費對象僅供一個消費者消費。 *BlockingQueue: 如果隊列為空,則讀取操作將會阻塞直至隊列有新的內容到達;隊列中對象一旦被讀取,將從隊列中移走。 由此可見,阻塞隊列天然符合生產/消費模型的供求行為模式。在前面展示condition的用法的時候,曾經 用過生產者/消費者模型來舉例。那個例子如果改用BlockingQueue來寫的話就十分簡單
        ...
    BlockingQueue<String> q =new ArrayBlockingQueue<String> (10);
    ...
    public void supply () {
    q.put("product by "+Thread.currentThread().getId()+":"+(++productNo));
    }
    ...
    public void cunsume () {
    String product =q.take();
    System.out.println("consume product:"+product);
    }
    從BlockingQueue也可以看出,它和UNIX系統下面的Pipe十分相似。所不同的不過是兩點,首先,pipe是進程間的,命名管道甚至可以在非親緣進程間使用,而BlockingQueue 目前只是線程間的通信手段。當然,由于java本身強大的動態類裝載功能,這個缺陷對java程序之間的溝通限制并不大。其次,pipe是基于字節流的,而BlockingQueue是 基于對象的,這使得BlockingQueue更加易用,不過卻讓BlockingQueue綁定了Java語言,使進一步成為輕量級本地進程通信工具的難度增大。

    從前面對生產/消費模型的行為方式可以看出,生產/消費模型著重于規范消費者的行為模式,當消費速度超過生產速度的時候,消費者就會被阻塞。而對于生產者的行為則沒有 規定。當生產速度超過消費速度,生產者的行為模式可以分為以下幾種: #當積壓的產品達到一定數量時,生產者被阻塞 #無論有多少積壓產品,生產者都不會被阻塞 #不能有任何積壓產品,生產者在當前產品未被消費之前,會被阻塞 對于產品來說,也有不同的行為模式 #產品只有在被生產出來一段時間之后才能被消費(先花點時間晾晾干?) #不同類別的產品被消費的優先級不同(有鉆石的話,黃金就先放一邊吧:))

    根據生產者行為模式的不同Concurrent包提供了不同的BlockingQueue的實現 ||Queue種類||行為描述 |ArrayBlockingQueue|有限制的blocking queue,積壓的產品不得超過制訂數量 |DelayQueue|產品只有生產出一段時間之后,才能被消費,無限制的積壓產品 |LinkedBlockingQueue|同時支持有限制的blocking queue,也能支持無限制的積壓產品(數量不能超過Integer.MAX_VALUE) |PriorityBlockingQueue|不同產品的被消費優先級不同,無限制的積壓產品 |SynchronousQueue|不允許積壓產品

    這些不同的行為模式中,較為常見的除了ArrayBlockingQueue和LinkedBlockingQueue之外,PriorityBlockingQueue也非常重要。舉例來說,如果我們利用BlockingQueue 來實現一個郵件系統(著名的qmail就是利用pipe技術構建的核心架構)。我們知道郵件有不同的級別,如果當前隊列里有加急郵件需要處理的話,系統將優先處理加急郵件。 我們將以郵件傳遞為例子,說明PriorityBlockingQueue的使用方法。(注:這里的這個郵件模型只是一個非常簡陋的模型,用來說明PriorityBlockingQueue的使用方法而已, 和實際應用有很大的差距)

    首先,我們需要了解郵件傳遞過程的基本模型。在這個簡單的郵件傳送模型中涉及到下列概念 *MDA: Mail Deliver Agent, 負責接受指定用戶的郵件。 *MTA: Mail Transfer Agent, 負責接受遠程傳送過來的郵件,并將其傳送給收件人的MDA 它們和郵件用戶之間的關系如下圖

    其中MTA使用Queue傳送郵件給MDA。因此,不同的用戶會使用不同的Mail Queue。下面是MailQueue的代碼
    public class MailQueue<E> extends PriorityBlockingQueue<E>{
    public E take () throws InterruptedException {
    E ren =super.take();
    Utils._log("take:"+ren);
    return ren;
    }

    public void put (E o) {
    super.put(o);
    Utils._log("put:"+o);
    }
    }
    為了能夠根據收件人的Mail Address找到相應的Mail Queue, 使用一個MailQueueFactory來產生MailQueue
    public class MailQueueFactory {
    //A ConcurrentHashMap is used here instead of Hashtable
    static ConcurrentHashMap<MailAccount,MailQueue<Mail>> mailQueues =
    new ConcurrentHashMap<MailAccount,MailQueue<Mail>>();
    public static BlockingQueue<Mail> getMailQueue (MailDeliverer e) {
    return getMailQueue(e.getMailAccount());
    }

    public static BlockingQueue<Mail> getReceiveMailQueue (Mail m) {
    return getMailQueue (m.getReceiver());
    }

    public static BlockingQueue<Mail> getMailQueue (MailAccount e) {
    mailQueues.putIfAbsent (e,new MailQueue<Mail>());
    MailQueue<Mail> mailQ =mailQueues.get(e);

    return mailQ;
    }
    }
    需要注意的是,我們在MailQueueFactory里面使用了ConcurrentHashMap,而不是傳統的Hashtable, 雖然Hashtable是thread-safe,但是缺乏putIfAbsent這樣的 原子函數,如果不小心設計的話,會造成對同一個MailQueue重復初始化,從而導致死鎖問題。 下面看Mail的定義
    public class Mail implements Comparable{
    public final static int emergencyMail =0;
    public final static int normalMail =1;

    static AtomicInteger serialCounter =new AtomicInteger(0);

    private int mailLevel;
    private int serialNumber =serialCounter.addAndGet(1);
    private MailAccount receiver =null;
    private MailAccount sender =null;
    private Date sendTime =new Date();

    public Mail (String from, String to, int level) {
    ...
    }

    //Get functions
    ...

    public int compareTo(Object o) {
    if (o instanceof Mail) {
    return compareTo ((Mail)o);
    }
    return 0;
    }

    public int compareTo (Mail o) {
    if (o.mailLevel==this.mailLevel) { //Same level, compare the serial no
    if (o.serialNumber==this.serialNumber)
    return 0;
    if (o.serialNumber>this.serialNumber)
    return -1;
    return 1;
    }
    if (this.mailLevel==emergencyMail) return -1;
    return 1;
    }
    //Other functions
    ...
    }
    這里值得注意的是AtomicInteger的使用,它被用來做內部serialNumber的產生。另外就是compareTo函數的使用,PriorityBlockingQueue使用Comparable接口來判定元素的優先級別。這里所定義的優先級如下: *如果郵件類別相同,則序列號小的郵件有較大的優先級 *如果郵件類別不同,則emergencyMail有較大的優先級 最后是Deliver Agent 和 Transfer Agent的代碼
    public class MailDeliverer {
    MailAccount mailAccount =null;

    public MailDeliverer (MailAccount account) {
    this.mailAccount =account;
    }

    public MailAccount getMailAccount() {
    return mailAccount;
    }

    public Mail retrieveMail () {
    Mail mail =null;
    while (mail==null) {
    try {
    mail =MailQueueFactory.getMailQueue(this).take();
    }catch (Exception e) {
    Utils._log("Encounter Exception",e);
    }
    }
    return mail;
    }
    }

    public class MailTransfer {
    private static MailTransfer instance =new MailTransfer ();
    private MailTransfer () { }

    public static MailTransfer getInstance () {
    return instance;
    }

    public void processMail (Mail m) {
    BlockingQueue mailQ =MailQueueFactory.getReceiveMailQueue(m);
    try {
    mailQ.put(m);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 亚洲女女女同性video| 亚洲成a人片在线观看中文app| 亚洲欧美精品午睡沙发| 国产妇乱子伦视频免费| 亚洲妓女综合网99| 男男AV纯肉无码免费播放无码| wwwxxx亚洲| 日韩人妻无码免费视频一区二区三区| 成人亚洲国产va天堂| 国产精品久久久久影院免费| 在线观看亚洲专区| 久久国产成人精品国产成人亚洲| 丁香花在线观看免费观看图片| 亚洲码国产精品高潮在线| 暖暖日本免费中文字幕| 亚洲电影免费观看| 狼友av永久网站免费观看| 国产亚洲漂亮白嫩美女在线 | 黄色网址大全免费| 亚洲日韩在线第一页| 视频免费在线观看| 18gay台湾男同亚洲男同| 亚欧免费视频一区二区三区| 亚洲午夜精品一区二区麻豆| 五月婷婷亚洲综合| 无码日韩精品一区二区免费暖暖| 亚洲第一精品电影网| 日本高清免费不卡在线| 久久免费香蕉视频| 亚洲国产精品久久网午夜| 国产免费一区二区三区VR| 两个人看的www免费高清| 亚洲国产视频网站| 亚洲阿v天堂在线2017免费| 久久精品电影免费动漫| 亚洲av日韩综合一区久热| 亚洲av无码乱码国产精品| 久久久久久国产精品免费免费| 久青草国产免费观看| 亚洲人成影院午夜网站| 精品国产亚洲男女在线线电影|