<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 9, comments - 4, trackbacks - 0, articles - 21

    生產者/消費者模型

    Posted on 2007-10-27 16:23 一步一步努力向上爬 閱讀(3553) 評論(0)  編輯  收藏 所屬分類: 設計模式
    生產者/消費者模型是最基本的并發協作模型,是所有并發協作的基礎。可以這么說,其他的并發協作都是供求關系模型的變種。生產者,消費者之間的供求關系可以簡單的 使用管道來構造。讓我們看兩者之間的行為模式: *生產/消費模型:消費者如果無消費對象,就會阻塞直到有消費對象到達;一個消費對象僅供一個消費者消費。 *BlockingQueue: 如果隊列為空,則讀取操作將會阻塞直至隊列有新的內容到達;隊列中對象一旦被讀取,將從隊列中移走。 由此可見,阻塞隊列天然符合生產/消費模型的供求行為模式。在前面展示condition的用法的時候,曾經 用過生產者/消費者模型來舉例。那個例子如果改用BlockingQueue來寫的話就十分簡單
        ...
    BlockingQueue<String> q =new ArrayBlockingQueue<String> (10);
    ...
    public void supply () {
    q.put("product by "+Thread.currentThread().getId()+":"+(++productNo));
    }
    ...
    public void cunsume () {
    String product =q.take();
    System.out.println("consume product:"+product);
    }
    從BlockingQueue也可以看出,它和UNIX系統下面的Pipe十分相似。所不同的不過是兩點,首先,pipe是進程間的,命名管道甚至可以在非親緣進程間使用,而BlockingQueue 目前只是線程間的通信手段。當然,由于java本身強大的動態類裝載功能,這個缺陷對java程序之間的溝通限制并不大。其次,pipe是基于字節流的,而BlockingQueue是 基于對象的,這使得BlockingQueue更加易用,不過卻讓BlockingQueue綁定了Java語言,使進一步成為輕量級本地進程通信工具的難度增大。

    從前面對生產/消費模型的行為方式可以看出,生產/消費模型著重于規范消費者的行為模式,當消費速度超過生產速度的時候,消費者就會被阻塞。而對于生產者的行為則沒有 規定。當生產速度超過消費速度,生產者的行為模式可以分為以下幾種: #當積壓的產品達到一定數量時,生產者被阻塞 #無論有多少積壓產品,生產者都不會被阻塞 #不能有任何積壓產品,生產者在當前產品未被消費之前,會被阻塞 對于產品來說,也有不同的行為模式 #產品只有在被生產出來一段時間之后才能被消費(先花點時間晾晾干?) #不同類別的產品被消費的優先級不同(有鉆石的話,黃金就先放一邊吧:))

    根據生產者行為模式的不同Concurrent包提供了不同的BlockingQueue的實現 ||Queue種類||行為描述 |ArrayBlockingQueue|有限制的blocking queue,積壓的產品不得超過制訂數量 |DelayQueue|產品只有生產出一段時間之后,才能被消費,無限制的積壓產品 |LinkedBlockingQueue|同時支持有限制的blocking queue,也能支持無限制的積壓產品(數量不能超過Integer.MAX_VALUE) |PriorityBlockingQueue|不同產品的被消費優先級不同,無限制的積壓產品 |SynchronousQueue|不允許積壓產品

    這些不同的行為模式中,較為常見的除了ArrayBlockingQueue和LinkedBlockingQueue之外,PriorityBlockingQueue也非常重要。舉例來說,如果我們利用BlockingQueue 來實現一個郵件系統(著名的qmail就是利用pipe技術構建的核心架構)。我們知道郵件有不同的級別,如果當前隊列里有加急郵件需要處理的話,系統將優先處理加急郵件。 我們將以郵件傳遞為例子,說明PriorityBlockingQueue的使用方法。(注:這里的這個郵件模型只是一個非常簡陋的模型,用來說明PriorityBlockingQueue的使用方法而已, 和實際應用有很大的差距)

    首先,我們需要了解郵件傳遞過程的基本模型。在這個簡單的郵件傳送模型中涉及到下列概念 *MDA: Mail Deliver Agent, 負責接受指定用戶的郵件。 *MTA: Mail Transfer Agent, 負責接受遠程傳送過來的郵件,并將其傳送給收件人的MDA 它們和郵件用戶之間的關系如下圖

    其中MTA使用Queue傳送郵件給MDA。因此,不同的用戶會使用不同的Mail Queue。下面是MailQueue的代碼
    public class MailQueue<E> extends PriorityBlockingQueue<E>{
    public E take () throws InterruptedException {
    E ren =super.take();
    Utils._log("take:"+ren);
    return ren;
    }

    public void put (E o) {
    super.put(o);
    Utils._log("put:"+o);
    }
    }
    為了能夠根據收件人的Mail Address找到相應的Mail Queue, 使用一個MailQueueFactory來產生MailQueue
    public class MailQueueFactory {
    //A ConcurrentHashMap is used here instead of Hashtable
    static ConcurrentHashMap<MailAccount,MailQueue<Mail>> mailQueues =
    new ConcurrentHashMap<MailAccount,MailQueue<Mail>>();
    public static BlockingQueue<Mail> getMailQueue (MailDeliverer e) {
    return getMailQueue(e.getMailAccount());
    }

    public static BlockingQueue<Mail> getReceiveMailQueue (Mail m) {
    return getMailQueue (m.getReceiver());
    }

    public static BlockingQueue<Mail> getMailQueue (MailAccount e) {
    mailQueues.putIfAbsent (e,new MailQueue<Mail>());
    MailQueue<Mail> mailQ =mailQueues.get(e);

    return mailQ;
    }
    }
    需要注意的是,我們在MailQueueFactory里面使用了ConcurrentHashMap,而不是傳統的Hashtable, 雖然Hashtable是thread-safe,但是缺乏putIfAbsent這樣的 原子函數,如果不小心設計的話,會造成對同一個MailQueue重復初始化,從而導致死鎖問題。 下面看Mail的定義
    public class Mail implements Comparable{
    public final static int emergencyMail =0;
    public final static int normalMail =1;

    static AtomicInteger serialCounter =new AtomicInteger(0);

    private int mailLevel;
    private int serialNumber =serialCounter.addAndGet(1);
    private MailAccount receiver =null;
    private MailAccount sender =null;
    private Date sendTime =new Date();

    public Mail (String from, String to, int level) {
    ...
    }

    //Get functions
    ...

    public int compareTo(Object o) {
    if (o instanceof Mail) {
    return compareTo ((Mail)o);
    }
    return 0;
    }

    public int compareTo (Mail o) {
    if (o.mailLevel==this.mailLevel) { //Same level, compare the serial no
    if (o.serialNumber==this.serialNumber)
    return 0;
    if (o.serialNumber>this.serialNumber)
    return -1;
    return 1;
    }
    if (this.mailLevel==emergencyMail) return -1;
    return 1;
    }
    //Other functions
    ...
    }
    這里值得注意的是AtomicInteger的使用,它被用來做內部serialNumber的產生。另外就是compareTo函數的使用,PriorityBlockingQueue使用Comparable接口來判定元素的優先級別。這里所定義的優先級如下: *如果郵件類別相同,則序列號小的郵件有較大的優先級 *如果郵件類別不同,則emergencyMail有較大的優先級 最后是Deliver Agent 和 Transfer Agent的代碼
    public class MailDeliverer {
    MailAccount mailAccount =null;

    public MailDeliverer (MailAccount account) {
    this.mailAccount =account;
    }

    public MailAccount getMailAccount() {
    return mailAccount;
    }

    public Mail retrieveMail () {
    Mail mail =null;
    while (mail==null) {
    try {
    mail =MailQueueFactory.getMailQueue(this).take();
    }catch (Exception e) {
    Utils._log("Encounter Exception",e);
    }
    }
    return mail;
    }
    }

    public class MailTransfer {
    private static MailTransfer instance =new MailTransfer ();
    private MailTransfer () { }

    public static MailTransfer getInstance () {
    return instance;
    }

    public void processMail (Mail m) {
    BlockingQueue mailQ =MailQueueFactory.getReceiveMailQueue(m);
    try {
    mailQ.put(m);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 国产免费的野战视频| 久久久久亚洲精品无码网址| 免费无遮挡无遮羞在线看| 亚洲熟妇无码八AV在线播放| 成人毛片18女人毛片免费| 久久国产精品国产自线拍免费| 亚洲成AV人片高潮喷水| 日韩精品一区二区亚洲AV观看| 亚洲精品网站在线观看不卡无广告 | 久久精品亚洲中文字幕无码麻豆 | 亚洲人成毛片线播放| 亚洲va中文字幕无码久久| 免费h成人黄漫画嘿咻破解版| 亚洲熟妇无码一区二区三区| 久久精品国产精品亚洲艾草网| 久久精品国产亚洲一区二区三区| 国产乱子伦精品免费女| 成年女人午夜毛片免费视频| 日韩精品无码一区二区三区免费| 一级a性色生活片久久无少妇一级婬片免费放 | 亚洲午夜久久久影院伊人| 亚洲国产成人乱码精品女人久久久不卡| 拍拍拍又黄又爽无挡视频免费| 曰曰鲁夜夜免费播放视频 | 亚洲国产电影在线观看| 亚洲综合区图片小说区| 亚洲福利在线观看| 久久精品国产亚洲av成人| 国产精品亚洲片在线观看不卡| AV在线亚洲男人的天堂| 亚洲伊人久久成综合人影院| 亚洲毛片av日韩av无码| 亚洲精品国产va在线观看蜜芽| 免费在线观看中文字幕| 亚洲成AV人在线观看网址| 亚洲国产精品毛片av不卡在线| 亚洲国产电影av在线网址| 亚洲精品乱码久久久久久蜜桃| 亚洲日韩在线中文字幕第一页| 亚洲中文字幕成人在线| 久久99国产亚洲高清观看首页|