<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年6月>
    2930311234
    567891011
    12131415161718
    19202122232425
    262728293012
    3456789

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    在本章中,將包含下面的內容:
    1. 連接中間件
    2. 生產消息
    3. 消費消息
    4. 使用JSON序列化消息
    5. 使用RPC消息 
    6.  廣播消息
    7. 使用direct交換器來處理消息路由
    8. 使用topic交換器來處理消息路由
    9. 保證消息處理
    10. 分發消息到多個消費者
    11. 使用消息屬性
    12. 事務消息
    13. 處理未路由消息
    要運行本章內的示例,你需要首先:
    1. 安裝Java JDK 1.6+
    2. 安裝Java RabbitMQ client library
    3. 正確地配置CLASSPATH 以及你喜歡的開發環境(Eclipse,NetBeans, 等等)
    4. 在某臺機器上安裝RabbitMQ server (也可以是同一個本地機器)
    連接到中間件
    每個使用AMQP的應用程序都必須建立一個與AMQP中間件的連接.默認情況下,RabbitMQ (以及任何其它1.0版本之前的AMQP中間件) 通過運行于5672端口之上且相當可靠傳輸協議-TCP來工作的, 即IANA分配的端口. 

    要創建一個連接RabbitMQ中間件的Java客戶端,你必須執行下面的步驟:
    1. 從Java RabbitMQ client library中必須的類:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    2. 創建客戶端ConnectionFactory的實例:
    ConnectionFactory factory = new ConnectionFactory();
    3. 設置ConnectionFactory 選項:
    factory.setHost(rabbitMQhostname);
    4. 連接RabbitMQ broker:
    Connection connection = factory.newConnection();
    5. 從剛創建的連接中創建一個通道:
    Channel channel = connection.createChannel();
    6. 一旦在RabbitMQ上完成了工作,就需要釋放通道和連接:
    channel.close();
    connection.close();

    How it works…
    使用Java client API, 應用程序必須創建一個ConnectionFactory實例,并且使用setHost()方法來設置運行RabbitMQ的主機.在導入相關類后(第1步),我們實例化了工廠對象(第2步).在這個例子中,我們只是用可選的命令行參數來設置主機名稱,但是,在后面的章節中,你可以找到更多關于連接選項的信息.第4步,實際上我們已經創建了連接到RabbitMQ中間件的連接.
    在這里,我們使用了默認的連接參數,用戶:guest,密碼:guest,以及虛擬主機:/,后面我們會討論這些參數.

    但現在我還沒有準備好與中間件通信,我們必須設置一個通信的channel(第5步).這是AMQP中的一個高級概念,使用此抽象,可以讓多個不同的消息會話使用同一個邏輯connection.
    實際上, Java client library 中的所有通信操作都是通過channel實例的方法來執行的.如果你正在開發多線程應用程序,強烈建議在每個線程中使用不同的channel.如果多個線程使用同一個channel,在channel方法調用中會順序執行,從而導致性能損失.最佳實踐是打開一個connection,并將其在多個不同線程之間分享.每個線程負責其獨立channel的創建,使用和銷毀.

    可對任何RabbitMQ connection指定多個不同的可選屬性.你可以在在線文檔(http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc)上找到它們. 除了AMQP 虛擬主機外,其它選項都不需要說明.

    虛擬主機是一個管理容器,在單個RabbitMQ實例中,允許配置多個邏輯上獨立的中間件主機, 以讓多個不同獨立應用程序能夠共享同一個RabbitMQ server. 每個虛擬主機都能獨立地配置權限,交換器,隊列,并在邏輯上獨立的環境中工作.

    也可以連接字符串(連接URI)來指定連接選項,即使用factory.setUri() 方法:
    ConnectionFactory factory = new ConnectionFactory();
    String uri="amqp://user:pass@hostname:port/vhost";
    factory.setUri(uri);

    URI必須與 RFC3 (http://www.ietf.org/rfc/rfc3986.txt)的語法規范保持一致.

    生產消息 
    在本配方中, 我們將學習了如何將消息發送到AMQP隊列. 我們將介紹AMQP消息的構建塊:消息,隊列,以及交換器.你可以在Chapter01/Recipe02/src/rmqexample中找到代碼.
    w to do it…
    在連接到中間件后, 像前面配方中看到的一樣,你可以按下面的步驟來來發送消息:

    1. 聲明隊列, 在 com.rabbitmq.client.Channel上調用queueDeclare()方法:
    String myQueue = "myFirstQueue";
    channel.queueDeclare(myQueue, true, false, false, null); //創建一個名為myFirstQueue,持久化的,非限制的,不自動刪除的隊列,
    2. 發送第一個消息到RabbitMQ broker:
    String message = "My message to myFirstQueue";
    channel.basicPublish("",myQueue, null, message.getBytes());
    3. 使用不同的選項發送第二個消息:
    channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

    注意:隊列名稱是大小寫敏感的: MYFIRSTQUEUE與myFirstQueue是不同的.
    如何工作
    在第一個基本例子中,我們能夠發送一個消息到RabbitMQ.在信道建立后,第一個步驟可以確保目標隊列存在,這項任務是通過調用queueDeclare()方法來聲明隊列的(步驟1).
    如果隊列已經存在的話,此方法不會做任何事情,否則,它會自己創建一個隊列.如果隊列已存在,但使用了不同的參數進行創建,queueDeclare() 方法會拋出異常.

    注意,大部分的AMQP操作只是Channel Java接口的方法.
    所有與broker交互的操作都需要通過channel來實施.
    讓我們來深入探討queueDeclare() 方法. 其模板可以在http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/查看. 文檔看起來像下面這樣:

    實際上我們使用了第二個重載的方法:
    AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue, boolean durable, boolean exclusive, booleanautoDelete,java.util.Map<java.lang.String,java.lang.Object> arguments)
    throws java.io.IOException
    其參數含義是:
    1. queue: 用來存儲消息的隊列名稱
    2. durable: 用于指定當服務器重啟時,隊列是否能復活.注意,當服務器重啟時,如果要保證消息持久性,必須將隊列聲明為持久化的
    3. exclusive: 用于指定此隊列是否只限制于當前連接.
    4. autoDelete:當隊列不再使用時,用于指示RabbitMQ broker是否要自動刪除隊列.
    5. arguments: 這是一個可選的隊列構建參數map.

    在第二步中,實際上我們才會將消息發送到RabbitMQ broker.
    RabbitMQ絕不打開消息體,對它來說,消息是透明的,因此你可以使用任何喜歡的序列化格式.通常我們會使用JSON, 但也可以使用XML, ASN.1, 標準的或自定義的ASCII或二進制格式. 最重要的事情是客戶端程序需要知道如何來解析數據.

    現在我們來深度解析 basicPublish()方法:
    void basicPublish(java.lang.String exchange,java.lang.String routingKey, AMQP.BasicProperties props, byte[] body) throws java.io.IOException

    在我們的例子中,exchange參數被設置成空字符串"", 即默認的交換器routingKey 參數設置成了隊列的名稱. 在這種情況下,消息將直接發送到routingKey指定的隊列中body 參數設置成了字符串的字節數組,也就是我們想要發送的消息props 參數默認設置成了null; 這些是消息屬性,我們將在Using message properties中深入討論.

    在步驟3中,我們發送了完全相同的消息,但將消息屬性設置成了MessageProperties.PERSISTENT_TEXT_PLAIN;通過這種方式我們要求RabbitMQ將此消息標記為持久化消息.
    兩個消息已經分發到了RabbitMQ broker, 邏輯上已經在myFirstQueue隊列上排隊了. 消息會駐留在緩沖區中,直到有一個客戶端來消費(通常來說,是一個不同的客戶端).
    如果隊列和消息都聲明為持久化,消息就會被標記為持久化的,broker會將其存儲在磁盤上.如果兩個條件中的任何一個缺失,消息將會存儲在內存中. 對于后者來說,當服務器重啟時,緩沖消息將不會復活,但消息的投遞和獲取會更快.我們將Chapter 8Performance Tuning for RabbitMQ來深入探討這個主題.

    更多
    在本章節中,我們將討論檢查RabbitMQ狀態的方法,以及隊列是否存在的方法.

    如何檢查RabbitMQ狀態
    要檢查RabbitMQ狀態,你可以使用rabbitmqctl命令行工具.在Linux設置中,它應該在PATH環境變量中.在Windows中,可在programs |
    RabbitMQ Server RabbitMQ Command Prompt (sbin dir). 我們可從命令行提示窗口中運行rabbitmqctl.bat.

    我們可以使用rbbitmqclt list_queues來檢查隊列狀態.在下面的截屏中,顯示了運行例子之前和之后的情景.


    在上面的截屏中,我們可以看到myfirstqueue隊列,其后跟著數字2, 它表示緩存在我們隊列中的消息數目(待發送消息數目).
    現在我們可以嘗試重啟RabbitMQ, 或者重啟主機.成功重啟RabbitMQ依賴于使用的OS:
    在Linux, RedHat, Centos, Fedora, Raspbian上:
    service rabbitmq-server restart
    在Linux, Ubuntu, Debian上:
    /etc/init.d/rabbitmq restart
    在Windows上:
    sc stop rabbitmq / sc start rabbitmq
    當我們再次運行rabbitmqclt list_queues 時,能期望有多少個消息呢?

    檢查隊列是否已經存在
    要確定特定隊列是否已經存在, 用channel.queueDeclarePassive()來代替channel.queueDeclare(). 兩個方法在隊列已經存在的情況下,會表現出相同的行為,否則,channel.queueDeclare()會創建隊列,但channel.queueDeclarePassive()會拋出異常.

    消費消息
    在本配方中,我們將關閉此回路.我們已經知道了如何將消息發送到RabbitMQ—或者任何AMQP broker—現在,我們要學習如何獲取這些消息.
    你可以在Chapter01/Recipe03/src/rmqexample/ nonblocking 找到源碼.

    如何做
    要消費前面配方中發送的消息,我們需要執行下面的步驟:
    1. 聲明我們要從哪里消費消息的隊列:
    String myQueue="myFirstQueue";
    channel.queueDeclare(myQueue, true, false, false, null);
    2. 定義一個繼承自DefaultConsumer的消費類:
    public class ActualConsumer extends DefaultConsumer {
    public ActualConsumer(Channel channel) {
    super(channel);
    }
    @Override
    public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
    String message = new String(body);
    System.out.println("Received: " + message);
    }
    }

    3. 創建一個消費對象實例,再綁定到我們的channel上:
    ActualConsumer consumer = new ActualConsumer(channel);
    4. 開始消費消息:
    String consumerTag = channel.basicConsume(myQueue, true,consumer);
    5. 一旦完成,取消消費者(
    其API含義為:取消當前消費者(不能再收到此隊列上的消息,重新運行消費者可以收到消息)并調用consumer的handleCancelOk方法):
    channel.basicCancel(consumerTag);
    如何運作
    在我們建立了與AMQP broker的connection 和channel后,我們需要確保我們從哪個隊列來消費消息(step 1).事實上,消費者可以在生產者發送消息到隊列之前就已經啟動了,此時有可能隊列還不存在,為了避免隊列上后續操作失敗,我們需要聲明隊列(譯者注:但消費聲明隊列這個動作并不必須的,只要生產者聲明了隊列,消費者不需要調用queueDeclare方法同樣可以消費消息,在這里只能認為是一種保險措施).
    TIP:
    通過允許生產者和消費者聲明相同的隊列,我們可以解藕其存在性,同時啟動的順序也不重要.

    步驟2的核心,我們通過覆蓋handleDelivery()方法定義了我們特定的消費者,以及在步驟3中我們進行實例化。在Java client API中,消費者回調是通過com.rabbitmq.client.Consumer接口定義的.我們從 DefaultConsumer擴展了我們的消費者,DefaultConsumer提供了Consumer 接口所有方法中無具體操作的實現.在步驟3中,通過調用channel.basicConsume(),我們讓消費者開始了消費消息.每個channel的消費者總是同一個線程上執行,而且是獨立于調用者的.
    現在我們已經從myQueue中激活了一個消費者,Java client library就會開始從RabbitMQ broker的隊列中獲取消息,并且會對每個消費者都調用handleDelivery().
    channel.basicConsume()方法調用后,我們會坐等主線程結束. 消息正在以非阻塞方式進行消費。
    只有當我們按Enter之后, 執行過程會到步驟5,然后消費者退出.在這個時刻,消費者線程會停止調用我們的消費者對象,因此我們可以釋放資源并退出。

    更多
    在本章節中,我們將了解更多關于消費者線程以及阻塞語義的用法.
    更多的消費者線程
    在連接定義期間,RabbitMQ Java API 會按消費者線程需要分配一個線程池。所有綁定到同一個channel的消費者都會使用線程池中的單個線程來運行;但是,有可能不同channel的消費者也可通過同一個線程來處理. 這就是為什么要在消費方法避免長時間操作的原因,為了避免阻塞其它消費者,可以在我們的自己定義的線程池中進行處理,就像我們例子中展示的一樣,但這不是必須的。我們已經定義了一個線程池, java.util.concurrent.ExecutorService, 因此可在連接期間將其傳入:
    ExecutorService eService = Executors.newFixedThreadPool(10);
    Connection connection = factory.newConnection(eService);
    這是由我們來進行管理的,因此我們要負責對其終止:
    eService.shutdown();
    但是,必須要記住的是,如果你沒有定義你自己的ExecutorService線程池,Java client library會在連接創建期間創建一個,并會在銷毀對應連接時,自動銷毀連接池。

    阻塞語義
    也可以使用阻塞語義,但如果不是用于簡單程序和測試用例,我們強烈反對這種方法;本配方中的消息消費是非阻塞的。然而,如果你要查找阻塞方案的源代碼的話,可以參考Chapter01/Recipe03/
    src/rmqexample/blocking.
    See also
    在官方的http://www.rabbitmq.com/releases/rabbitmq-java-client/currentjavadoc/com/rabbitmq/client/Consumer.html 的Javadoc文檔中,你可以找到消費者接口的所有可用方法。

    使用JSON來序列化消息體(y)tion with JSON
    在AMQP中,消息是不透明的實體,AMQP不提供任何標準的方式來編解碼消息.但是,web應用程序經常使用JSON來作為應用程序層格式,JavaSciprt序列化格式已經變成了事實上的標準,在這種情況下,RabbitMQ client Java library 可以包含一些實用函數.另一方面,這也不是唯一的協議,任何程序可以選擇它自己的協議(XML, Google Protocol Buffers, ASN.1, or proprietary).
    在這個例子中,我們將展示如何使用JSON協議來編解碼消息 體. 我們會使用Java編寫的發布者(Chapter01/Recipe04/Java_4/src/rmqexample)來發送消息,并用 Python語言編寫的消費者來消費消息 (Chapter01/Recipe04/Python04).

    如何做How to do it…
    要實現一個Java生產者和一個Python消費者, 你可以執行下面的步驟:
    1. Java: 除了導入Connecting to the broker配方中提到的包外,我們還要導入:
    import com.rabbitmq.tools.json.JSONWriter;
    2. Java: 創建一個非持久化隊列:
    String myQueue="myJSONBodyQueue_4";
    channel.queueDeclare(MyQueue, false, false, false, null);
    3. Java: 創建一個使用樣例數據的Book列表:
    List<Book>newBooks = new ArrayList<Book>();
    for (inti = 1; i< 11; i++) {
    Book book = new Book();
    book.setBookID(i);
    book.setBookDescription("History VOL: " + i );
    book.setAuthor("John Doe");
    newBooks.add(book);
    }

    4. Java: 使用JSONwriter來序列化newBooks實例:
    JSONWriter rabbitmqJson = new JSONWriter();
    String jsonmessage = rabbitmqJson.write(newBooks);
    5. Java: 最后發送jsonmessage:
    channel.basicPublish("",MyQueue,null,jsonmessage.getBytes());
    6. Python: 要使用Pika library,我們必須要導入下面的包:
    import pika;
    import json;
    Python 有JSON處理的內鍵包.
    7. Python: 創建RabbitMQ的連接,使用下面的代碼:
    connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
    8. Python: 聲明隊列,綁定消費者,然后再注冊回調:
    channel = connection.channel()
    my_queue = "myJSONBodyQueue_4"
    channel.queue_declare(queue=my_queue)
    channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
    channel.start_consuming()
    How it works…
    在我們設置環境后(步驟1和步驟2),我們使用了write(newbooks)來序列化newbooks類。此方法返回返回的JSON字符串,就像下面的展示的一樣:
    [
    {
    "author" : "John Doe",
    "bookDescription" : "History VOL: 1",
    "bookID" : 1
    },
    {
    "author" : "John Doe",
    "bookDescription" : "History VOL: 2",
    "bookID" : 2
    }
    ]

    步驟4中,我們發布了一個jsonmessage到myJSONBodyQueue_4隊列中.現在Python消費者可以從同一個隊列中獲取消息。在Python中我們看如何操作:
    connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
    channel = connection.channel()
    queue_name = "myJSONBodyQueue_4"
    channel.queue_declare(queue=my_queue)
    ..
    channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
    channel.start_consuming()
    正如Java實現中看到的一樣,我們必須創建一個連接,然后再創建一個通道.channel.queue_declare(queue=myQueue)方法,我們聲明了非持久化,不受連接限制,不會自己刪除的隊列。 如果要改變隊列的屬性,我們方法中添加參數,就像下面這樣:
    channel.queue_declare(queue=myQueue,durable=True)

    當不同AMQP clients聲明了相同隊列時,那么確保有相同的durable, exclusive, 和autodelete 屬性是相當重要的(如果隊列名稱相同,但屬性不同會拋異常),否則,
     channel.queue_declare()會拋異常。
    對于channel.basic_consume()方法, client會從給定的隊列中消費消息,當接收到消息后,會調用consumer_callback()回調方法。
    在Java中我們是在消費者接口中定義的回調,但在Python中,它們只是傳遞給basic_consume()方法, 更多的功能,更少的聲明,是Python的典范.
    consumer_callback回調如下:
    def consumer_callback(ch, method, properties, body):
    newBooks=json.loads(body);
    print" Count books:",len(newBooks);
    for item in newBooks:
    print 'ID:',item['bookID'], '-
    Description:',item['bookDescription'],' -
    Author:',item['author']
    回調接收到消息后,使用json.loads()來反序列化消息,然后就可以準備讀取newBooks的結構了。
    更多
    包含在RabbitMQ client library中的JSON幫助類是非常簡單的,在真實項目中,你可以使用外部JSON library.如:強大的google-gson (https://code.google.com/p/google-gson/) 或 jackson (http://jackson.codehaus.org/).
    使用RPC消息
    遠程過程調用(RPC)通常用于client-server架構. client提出需要執行服務器上的某些操作請求,然后等待服務器響應.
    消息架構試圖使用發后即忘(fire-and-forget)的消息形式來實施一種完全不同的解決方案,但是可以使用設計合理的AMQP隊列和增加型RPC來實施,如下所示:


    上面的圖形描述了request queue是與responder相關聯的,reply queues 與callers是相聯的.但是,當我們在使用RabbitMQ的時候,所有的涉及的端點(callers和responders) 都是AMQP clients.現在我們將描述Chapter01/Recipe05/Java_5/src/rmqexample/rpc例子中的操作步驟.

    如何做
    執行下面的步驟來實現RPC responder:
    1. 聲明一個請求隊列, responder會在此處來等候RPC請求:
    channel.queueDeclare(requestQueue, false, false, false,null);
    2. 通過覆蓋DefaultConsumer.handleDelivery()來定義我們特定的RpcResponderConsumer消費者, 在接收到每個RPC請求的時,消費者將:
    ? 執行RPC請求中的操作
    ? 準備回復消息
    ? 通過下面的代碼在回復屬性中設置correlation ID:
    BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
    將答案發送到回復隊列中:
    getChannel().basicPublish("", properties.getReplyTo(),replyProperties, reply.getBytes());
    ?發送應答給RPC request:
    getChannel().basicAck(envelope.getDeliveryTag(), false);
    3. 開始消費消息,直到我們看到了回復消息才停止:
    現在讓我們來執行下面的步驟來實現RPC caller:
    1. 聲明請求隊列,在這里responder會等待RPC請求:
    channel.queueDeclare(requestQueue, false, false, false,null);
    2. 創建一個臨時的,私有的,自動刪除的回復隊列:
    String replyQueue = channel.queueDeclare().getQueue();
    3. 定義我們特定的消費者RpcCallerConsumer, 它用于接收和處理RPC回復. 它將:
    ? 當收到回復時,通過覆蓋handleDelivery()用于指明要做什么
    (在我們的例子中,定義了AddAction()):
    public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws java.io.IOException {
    String messageIdentifier =properties.getCorrelationId();
    String action = actions.get(messageIdentifier);
    actions.remove(messageIdentifier);
    String response = new String(body);
    OnReply(action, response);
    }
    4. 調用channel.basicConsume()方法啟動消息消費.
    5. 準備和序列化請求(我們例子中是messageRequest).
    6. 初始化一個任意的唯一消息標識符(messageIdentifier).
    7. 當消費者收到相應回復的時候,定義應該做什么,通過使用messageIdentifier來綁定動作.在我們的例子中,我們通過調用我們自定義的方法 consumer.AddAction()來完成的.
    8. 發布消息到請求隊列,設置其屬性:
    BasicProperties props = new BasicProperties.Builder().correlationId(messageIdentifier)replyTo(replyQueue).build();
    channel.basicPublish("", requestQueue,props,messageRequest.getBytes());
    如何工作
    在我們的例子中,RPC responder扮演的是RPC server的角色; responder會監聽requestQueue公共隊列(步驟1),這里放置了調用者的請求.
    另一方面,每個調用者會在其私有隊列上消費responder的回復信息(步驟5).當caller發送消息時(步驟11),它包含兩個屬性:一個是用于監聽的臨時回復隊列 (replyTo())名稱,另一個是消息標識(correlationId()),當回復消息時,用于標識caller.事實上,在我們的例子中,我們已經實現了一個異步的RPC caller. The action to be performed by the RpcCallerConsumer (step 6) when the reply comes back is recorded by the nonblocking consumer by calling AddAction() (step 10).
    回到responder, RPC邏輯全在RpcResponderConsumer中.這不同于特定的非阻塞consumer,就像我們在消費消息配方中看到的一樣,但不同的是下面兩點細節:
    回復的隊列名稱是通過消息屬性來獲取的,即properties.getReplyTo().其值已經被caller設成了私有,臨時回復隊列.
    回復消息必須包含在correlation ID標識的隊列中(待查)

    TIP
    RPC responde不會使用correlation ID;它只用來讓caller收到對應請求的回復消息
    更多
    本章節我們會討論阻塞RPC的使用.
    使用阻塞RPC
    有時,簡單性比可擴展性更重要.在這種情況下,可以使用包含在 Java RabbitMQ client library中實現了阻塞RPC語義的幫助類:
    com.rabbitmq.client.RpcClient
    com.rabbitmq.client.StringRpcServer
    邏輯是相同的,
    但沒有非阻塞消費者參與, 并且臨時隊列和correlation IDs 的處理對于用戶來說是透明的.
    你可以在Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc找到相關的例子.
    擴展注意
    當有多個callers會發生什么呢?它主要以標準的RPC client/server 架構來工作.但如果運行多個reponders會怎樣呢?
    在這種情況下,所有的responders都會從請求隊列中關注消費消息. 此外, responders可位于不同的主機.  這個主題的更多信息請參考配方-分發消息給多個消費者.
    廣播消息es
    在本例中,我們看到如何將同一個消息發送給有可能很大量的消費者.這是一個典型的廣播消息到大量客戶端的消息應用.舉例來說,在大型多人游戲中更新記分板的時候,或在一個社交網絡應用中發布新聞的時候,都需要將消息廣播給多個消費者. 
    在本配方中,我們同時探討生產者和消費者實現.因為它是非常典型的消費者可以使用不同的技術和編程語言,在AMQP中,我們將使用Java, Python, 以及Ruby來展示這種互通性.
    我們會感謝AMQP中隔離交換器和隊列帶來的好處.Chapter01/Recipe06/中找到源碼.
    如何做
    要做這道菜,我們需要四個不同的代碼:
    1. Java發布者
    2. Java消費者
    3. Python消費者
    4. Ruby消費者
    準備Java發布者:
    1. 聲明一個fanout類型的交換器:
    channel.exchangeDeclare(myExchange, "fanout");
    2. 發送一個消息到交換器:
    channel.basicPublish(myExchange, "", null,jsonmessage.getBytes());

    然后準備Java消費者:
    1. 聲明同一個生產者聲明的fanout交換器:
    channel.exchangeDeclare(myExchange, "fanout");
    2. 自動創建一個新的臨時隊列:
    String queueName = channel.queueDeclare().getQueue();
    3. 將隊列綁定到交換器上:
    channel.queueBind(queueName, myExchange, "");
    4. 定義一個自定義,非阻塞消費者,這部分內容已經在消費消息食譜中看到過了
    .
    5. 調用channel.basicConsume()來消費消息
    相對于Java消費者來說,Python消費者的源碼是非常簡單的,因此這里沒必要再重復必要的步驟,只需要遵循Java消費者的步驟,可參考Chapter01/Recipe06/Python_6/PyConsumer.py的代碼.
    Ruby消費者中,你必須使用"bunny" 然后再使用URI連接.
    可查看在Chapter01/Recipe06/Ruby_6/RbConsumer.rb的源碼
    現在我們要把這些整合到一起來看食譜:
    1. 啟動一個Java生產者的實例; 消息將立即進行發布.
    2. 啟動一個或多個Java/Python/Ruby的消費者實例; 消費者只有當它們運行的時候,才能接接收到消息.
    3. 停止其中一個消費者,而生產者繼續運行,然后再重啟這個消費者,我們可以看到消費者在停止期間會丟失消息.
    如何運作
    生產者和消費者都通過單個連接連上了RabbitMQ,消息的邏輯路徑如下圖所示:
    在步驟1中,我們已經聲明了交換器,與隊列聲明的邏輯一樣: 如果指定的交換器不存在,將會進行創建;否則,不做任何事情.exchangeDeclare()方法的第二個參數是一個字符串, 它用于指定交換器的類型,在我們這里,交換器類型是fanout.
    在步驟2中,生產者向交換器發送了一條消息. 你可以使用下面的命令來查看它以及其它已定義的交換器:
    rabbitmqctl list_exchanges
    channel.basicPublish() 方法的第二個參數是路由鍵(routing key),在使用fanout交換器時,此參數通常會忽略.第三個設置為null的參數, 此參數代表可選的消息屬性(更多信息可參考使用消息屬性食譜).第四個參數是消息本身.
    當我們啟動一個消費者的時候,它創建一個它自己的臨時隊列(步驟9). 使用channel.queueDeclare()空重載,我們會創建一個非持久化,私有的,自動刪除的,隊列名稱自動生成的隊列.
    運行一對消費者,并用rabbitmqctl list_queues查看,我們可以兩個隊列,每個消費者一個, 還有奇怪的名字,還有前面食譜中用到的持久化隊列myFirstQueue ,如下圖所示:
    在步驟5中,我們將隊列綁定到了myExchange交換器上.可以用下面的命令來監控這些綁定:
    rabbitmqctl list_bindings
    監控是AMQP非常重要的一面; 消息是通過交換器來路由到綁定隊列的,且會在隊列中緩存.

    TIP
    交換器不會緩存消息,它只是邏輯元素.
    fanout交換器在通過消息拷貝,來將消息路由到每個綁定的隊列中,因此,如果沒有綁定隊列,消息就不會被消費者接收(參考處理未路由消息食譜來了解更多信息).
    一旦我們關閉了消費者,我們暗中地銷毀了其私有臨時隊列(這就是為什么隊列是自動刪除的,否則,這些隊列在未使用后會保留下來,broker上的隊列數目會無限地增長), 消息也不會緩存了.
    當重啟消費者的時候,它會創建一個新的獨立的隊列,只要我們將其綁定到myExchange上,發布者發送的消息就會緩存到這個隊列上,并被消費者消費.

    更多
    當RabbitMQ第一次啟動的時候,它創建一些預定的交換器. 執行rabbitmqctl list_exchanges命令,我們可以觀察到許多存在的交換器,也包含了我們在本食譜中定義的交換器:
    所有出現在這里的amq.*交換器都是由AMQP brokers預先定義的,它可用來代替你定義你自己的交換器;它們不需要聲明.
    我們可以使用amq.fanout來替換myLastnews.fanout_6, 對于簡單應用程序來說,這是很好的選擇. 但一般來說,應用程序來聲明和使用它們自己的交換器.
    本食譜使用的重載,交換器是非自動刪除的(won't be deleted as soon as the last client detaches it) 和非持久化的(won't survive server restarts). 你可以在http://www.rabbitmq.com/releases/ rabbitmq-java-client/current-javadoc/找到更多的選項和重載.

    使用Direct交換器來路由消息
    要本食譜中,我們將看到如何選擇消費消息子集(部分消息), 只路由那些感涂在的AMQP隊列,以及忽略其它隊列.
    一個典型的使用場景是實現一個聊天器, 在這里每個隊列代表了一個用戶.我們可以查看下面的目錄找到相關的例子:Chapter01/Recipe07/Java_7/src/rmqexample/direct

    我們將展示如何同時實現生產者和消費者.實現生產者,執行下面的步驟:
    1. 聲明一個direct交換器:
    channel.exchangeDeclare(exchangeName, "direct", false,false, null);
    2. 發送一些消息到交換器,使用任意的routingKey 值:
    channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
    要實現消費者,執行下面的步驟:
    1. 聲明同樣的交換器,步驟與上面步驟相同.
    2. 創建一個臨時隊列:
    String myQueue = channel.queueDeclare().getQueue();
    3. 使用bindingKey將隊列綁定到交換器上. 假如你要使用多個binding key,可按需要多次執行這個操作:
    channel.queueBind(myQueue,exchangeName,bindingKey);
    4. 在創建了適當的消費對象后,可以參考消費消息食譜來消費消息.
    如何工作
    在本食譜中,我們使用任意的字符串(也稱為路由鍵)來向direct交換器發布消息(step 2).在fanout交換器中,如果沒有綁定隊列的話,消息是不是存儲的,但在這里,根據在綁定時指定的綁定鍵,消費者可以選擇消息轉發這些隊列(步驟5).
    僅當路由鍵與綁定鍵相同的消息才會被投遞到這些隊列.
    TIP
    過濾操作是由AMQP broker來操作,而不是消費者;路由鍵與綁定鍵不同的消息是不會放置到隊列中的.但是,可允許多個隊列使用相同的綁定鍵,broker會將匹配的消息進行拷貝,并投遞給它們.也允許在同一個隊列/交換綁定上綁定多個不同的綁定鍵,這樣就可以投遞所有相應的消息.
    更多
    假如我們使用指定的路由鍵來將消息投遞到交換器,但在這個指定鍵上卻沒有綁定隊列,那么消息會默默的銷毀.
    然而, 當發生這種情況時,生產者可以檢測這種行為,正如處理未路由消息食譜中描述的一樣.

    使用topic交換器來路由消息
    Direct 和topic 交換器在概念上有點相似,最大的不同點是direct交換器使用精準匹配來選擇消息的目的地,而topic交換器允許使用通配符來進行模式匹配.
    例如, BBC使用使用topic交換器來將新故事路由到恰當的RSS訂閱.
    你可以在這里找到topic交換器的例子:Chapter01/Recipe08/Java_8/src/rmqexample/topic

    如何做
    我們先從生產者開始:
    1. 聲明一個topic交換器:
    channel.exchangeDeclare(exchangeName, "topic", false,false, null);
    2. 使用任意的路由鍵將消息發送到交換器:
    channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
    接下來,消費者:
    1. 聲明相同的交換,如步驟1做的一樣.
    2. 創建一個臨時隊列:
    String myQueue = channel.queueDeclare().getQueue();
    3. 使用綁定鍵將隊列綁定到交換器上,這里也可以包含通配符:
    channel.queueBind(myQueue,exchangeName,bindingKey);
    4. 在創建適當的消費者對象后,可以像消息消息食譜中一樣來消費消息.
    如何工作
    以先前的食譜中,用字符串標記來將消息發送到topic交換器中(步驟2),但對于topic交換器來說,組合多個逗號分隔的單詞也是很重要的;它們會被當作主題消息.例如,在我們的例子中,我們用:
    technology.rabbitmq.ebook
    sport.golf.paper
    sport.tennis.ebook
    要消息這些消息,消費者需要將myQueue綁定到交換器上(步驟5)

    使用topic交換器, 步驟5中指定的訂閱綁定/綁定鍵可以是一系列逗號分隔的單詞或通配符. AMQP通配符只包括:
    1. #: 匹配0或多個單詞
    2. *: 只精確匹配一個單詞
    例如:
    1. #.ebook 和 *.*.ebook 可匹配第一個和第三個發送消息
    2. sport.# and sport.*.* 可匹配第二個和第三個發送消息
    3. # 可匹配任何消息
    在最后一種情況中,topic交換器的行為類似于fanout交換器, 但性能不同,當使用這種形式時性能更高

    更多
    再次說明,如果消息不能投遞到任何隊列,它們會被默默地銷毀.當發生此種情況時,生產者可以檢測這種行為,就如處理未路由消息食譜中描述的一樣.

    保證消息處理
    在這個例子中,我們將展示在消費消息時,我們如何來使用明確的應答.消息在消費者獲取并對broker作出應答前,它會一直存在于隊列中.應答可以是明確的或隱含的.在先前的例子中,我們使用的是隱含應答.為了能實際查看這個例子,你可以運行生產消息食譜中的發布者,然后你運行消費者來獲取消息,可在Chapter01/Recipe09/Java_9/中找到.

    如何做
    為了能保證消費者處理完消息后能應答消息,你可以執行下面的步驟:
    1. 聲明一個隊列:
    channel.queueDeclare(myQueue, true, false, false,null);
    2. 綁定消費者與隊列,并設置basicConsume()方法的autoAck參數為false:
    ActualConsumer consumer = new ActualConsumer(channel);
    boolean autoAck = false; // n.b.
    channel.basicConsume(MyQueue, autoAck, consumer);
    3. 消費消息,并發送應答:
    public void handleDelivery(String consumerTag,Envelope envelope, BasicPropertiesproperties,byte[] body) throws java.io.IOException {
    String message = new String(body);
    this.getChannel().basicAck(envelope.getDeliveryTag(),false);

    }
    如何工作
    在創建隊列后(步驟1),我們將消費者加入到隊列中,并且定義了應答行為(步驟2).
    參數autoack = false表明RabbitMQ client API會自己來發送明確的應答.
    在我們從隊列收到消息后,我們必須向RabbitMQ發送應答,以表示我們收到到消息并適當地處理了,因此我們調用了channel.basicAck()(步驟3).
    RabbitMQ只有在收到了應答后,才會從隊列中刪除消息.
    TIP
    如果在消費者不發送應答,消費者會繼續接收后面的消息;但是,當你斷開了消費者后,所有的消息仍會保留在隊列中.消息在RabbitMQ收到應答前,都認為沒有被消費.可以注解basicAck()調用來演示這種行為.
    channel.basicAck()方法有兩個參數:
    1. deliveryTag
    2. multiple
    deliveryTag參數是由服務器為消息指定的值,你可以通過使用delivery.getEnvelope().getDeliveryTag()來獲取.
    如果multiple設置為false,client只會應答deliveryTag參數的消息, 否則,client會應答此消息之前的所有消息. 通過向RabbitMQ應答一組消息而不是單個消息,此標志允許我們優化消費消息.
    TIP
    消息只能應答一次,如果對同一個消息應答了多次,方法會拋出preconditionfailed 異常.
    調用channel.basicAck(0,true),則所有未應答的消息都會得到應答,0 代表所有消息.此外,調用channel.basicAck(0,false) 會引發異常.

    更多
    下面的章節,我們還會討論basicReject()方法,此方法是RabbitMQ擴展,它允許更好的靈活性.

    也可參考
    分發消息到多個消費者食譜是一個更好解釋明確應答真實例子.

    分發消息到多個消費者
    在這個例子中,我們將展示如何來創建一個動態負責均衡器,以及如何將消息分發到多個消費者.我們將創建一個文件下載器.
    你可在Chapter01/Recipe10/Java_10/找到源碼.

    如何做
    為了能讓兩個以上的RabbitMQ clients能盡可能的負載均衡來消費消息,你必須遵循下面的步驟:
    1. 聲明一個命令隊列, 并按下面這樣指定basicQos:
    channel.queueDeclare(myQueue, false, false, false,null);
    channel.basicQos(1);
    2. 使用明確應答來綁定一個消費者:
    channel.basicConsume(myQueue, false, consumer);
    3. 使用channel.basicPublish()來發送一個或多個消息.
    4. 運行兩個或多個消費者.


    如何工作
    發布者發送了一條帶下載地址的消息:
    String messageUrlToDownload="http://www.rabbitmq.com/releases/rabbitmq-dotnetclient/v3.0.2/rabbitmq-dotnet-client-3.0.2-user-guide.pdf";
    channel.basicPublish("",MyQueue,null,messageUrlToDownload.getBytes());
    消費者獲取到了這個消息:
    System.out.println("Url to download:" + messageURL);
    downloadUrl(messageURL);
    一旦下載完成,消費者將向broker發送應答,并開始準備下載下一個:
    getChannel().basicAck(envelope.getDeliveryTag(),false);
    System.out.println("Ack sent!");
    System.out.println("Wait for the next download...");
    消費者按塊的方式能獲取消息,但實際上,當消費者發送應答時,消息就會從隊列中刪除,在先前的食譜中,我們已經看過這種情況了.
    另一個方面,在本食譜中使用了多個消費才,第一個會預先提取消息,其它后啟動的消費者在隊列中找不到任何可用的消息.為了平等地發分發消息,我們需要使用channel.basicQos(1)來指定一次只預先提取一個消息.

    也可參考
    在Chapter 8Performance Tuning for RabbitMQ中可以找到更多負載均衡的信息.
    使用消息屬性
    在這個例子中,我們將展示如何AMQP消息是如何分解的,以及如何使用消息屬性.
    你可在Chapter01/Recipe11/Java_11/找到源碼.

    如何做
    要訪問消息屬性,你必須執行下面的步驟:
    1. 聲明一個隊列:
    channel.queueDeclare(MyQueue, false, false, false,null);
    2. 創建一個BasicProperties類:
    Map<String,Object>headerMap = new HashMap<String,Object>();
    headerMap.put("key1", "value1");
    headerMap.put("key2", new Integer(50) );
    headerMap.put("key3", new Boolean(false));
    headerMap.put("key4", "value4");
    BasicProperties messageProperties = new BasicProperties.Builder()
    .timestamp(new Date())
    .contentType("text/plain")
    .userId("guest")
    .appId("app id: 20")
    .deliveryMode(1)
    .priority(1)
    .headers(headerMap)
    .clusterId("cluster id: 1")
    .build();

    3. 使用消息屬性來發布消息:
    channel.basicPublish("",myQueue,messageProperties,message.getBytes())
    4. 消費消息并打印屬性:
    System.out.println("Property:" + properties.toString());

    如何工作
    AMQP 消息(也稱為內容)被分成了兩部分:
    1. 內容頭
    2. 內容體(先前例子我們已經看到過了)
    在步驟2中,我們使用BasicProperties創建一個內容頭:
    Map<String,Object>headerMap = new HashMap<String, Object>();
    BasicProperties messageProperties = new BasicProperties.Builder()
    .timestamp(new Date())
    .userId("guest")
    .deliveryMode(1)
    .priority(1)
    .headers(headerMap)
    .build();
    在這個對象中,我們設置了下面的屬性:
    1. timestamp: 消息時間戳.
    2. userId: 哪個用戶發送的消息(默認是"guest"). 在下面的章節中,我們將了解用戶管理.
    3. deliveryMode: 如果設置為1,則消息是非持久化的, 如果設置為2,則消息是持久化的(你可以參考食譜連接broker).
    4. priority: 用于定義消息的優先級,其值可以是0到9.
    5. headers: 一個HashMap<String, Object>頭,你可以在其中自由地定義字段.
    TIP
    RabbitMQ BasicProperties 類是一個AMQP內容頭實現.BasicProperties的屬性可通過BasicProperties.Builder()構建.頭準備好了,我們可使用
    channel.basicPublish("",myQueue, messageProperties,message.getBytes())來發送消息,在這里,messageProperties是消息頭,message是消息體.
    在步驟中,消費者獲得了一個消息:
    public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
    System.out.println("***********message header****************");
    System.out.println("Message sent at:"+ properties.getTimestamp());
    System.out.println("Message sent by user:"+ properties.getUserId());
    System.out.println("Message sent by App:"+properties.getAppId());
    System.out.println("all properties :" + properties.toString());
    System.out.println("**********message body**************");
    String message = new String(body);
    System.out.println("Message Body:"+message);
    }
    參數properties包含了消息頭,body包含了消息體.

    更多
    使用消息屬性可以優化性能.將審計信息或日志信息寫入body,通常是一種典型的錯誤,因為消費者需要解析body來獲取它們.
    body 消息只可以包含應用程序數據(如,一個Book class),而消息屬性可以持有消息機制相關或其它實現細節的相關信息.
    例如 ,如果消費者想知道消息是何時發送的,那么你可以使用timestamp屬性, 或者消費者需要根據一個定制標記來區分消息,你可以將它們放入header HashMap屬性中.

    也可參考
    MessageProperties類對于標準情況,包含了一些預先構建的BasicProperties類. 可查看http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/
    MessageProperties.html
    在這個例子中,我們只是使用了一些屬性,你可在http://www.rabbitmq.com/releases//rabbitmq-java-client/currentjavadoc/com/rabbitmq/client/AMQP.BasicProperties.html獲取更多信息.

    消息事務
    在本例中,我們將討論如何使用channel事務. 在生產消息食譜中,我們已經了解了如何來使用持久化消息,但如果broker不能將消息寫入磁盤的話,那么你就會丟失消息.使用AQMP事務,你可以確保消息不會丟失.
    你可在Chapter01/Recipe12/Java_12/找到相關源碼.

    如何做
    通過下面的步驟,你可以使用事務性消息:
    1. 創建持久化隊列
    channel.queueDeclare(myQueue, true, false, false, null);
    2. 設置channel為事務模式:
    channel.txSelect();
    3. 發送消息到隊列,然后提交操作:
    channel.basicPublish("", myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    channel.txCommit();



    如何工作
    在創建了持久化隊列后(step 1),我們將channel設置成了事務模式,使用的方法是txSelect() (step 2). 使用 txCommit()確保消息存儲在隊列并寫入磁盤,然后消息將投遞給消費者.txCommit() 或txRollback()之前,必須至少調用一次txSelect().
    在一個DBMS中,你可以使用回滾方法.在下面的情況下,消息不會被存儲或投遞:
    channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes());
    channel.txRollback();

    更多
    事務會降低應用程序的性能,因為broker不會緩存消息,且tx操作是同步的.

    也可參考
    在后面的章節中,我們會討論發布確認插件,這是一種較快確認操作的方式.

    處理未路由消息
    在這個例子中,我們將展示如何管理未路由的消息. 未路由消息指的是沒有目的地的消息.如,一個消息發送到了無任何綁定隊列的交換器上.
    未路由消息不同于死消息 ,前者是發送到無任何隊列目的地的交換器上,而后者指的是消息到達了隊列,但由于消費者的決策,過期TTL,或者超過隊列長度限制而被拒絕的消息 你可以在Chapter01/Recipe13/Java_13/找到源碼.

    如何做
    為了處理未路由的消息,你需要執行下面的操作:
    1. 第一步實現ReturnListener接口:
    public class HandlingReturnListener implements ReturnListener
    @Override
    public void handleReturn…
    2. 將HandlingReturnListener類添加到channel.addReturnListener():
    channel.addReturnListener(new HandlingReturnListener());
    3. 然后創建一個交換機:
    channel.exchangeDeclare(myExchange, "direct", false, false,null);

    4. 最后發布一個強制消息到交換器:
    boolean isMandatory = true;
    channel.basicPublish(myExchange, "",isMandatory, null,message.getBytes());

    如何工作
    當我們運行發布者的時候,發送到myExchange的消息因為沒有綁定任何隊列不會到達任何目的地.但這些消息不會,它們會被重定向到一個內部隊列. .HandlingReturnListener類會使用handleReturn()來處理這些消息.ReturnListener類綁定到了一個發布者channel上, 且它會獵捕那些不能路由的消息
    在源碼示例中,你可以找到消費者,你也可以一起運行生產者和消費者,然后再停止消費者.

    更多
    如果沒有設置channel ReturnListener, 未路由的消息只是被broker默默的拋棄.在這種情況下,你必須注意未路由消息,將mandatory 標記設置為true是相當重要的,如果為false,未路由的消息也會被拋棄.
    posted on 2016-06-03 23:22 胡小軍 閱讀(2601) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 在线观看www日本免费网站| 全黄性性激高免费视频| 性做久久久久免费观看| 亚洲国产一区国产亚洲| 国产亚洲精品VA片在线播放| a级成人毛片免费图片| 四虎永久成人免费影院域名| 亚洲av日韩av永久在线观看| 1000部啪啪毛片免费看| 成人精品国产亚洲欧洲| 国产美女无遮挡免费视频网站 | 国产免费一区二区三区| 国产综合亚洲专区在线| 亚洲男同gay片| 四色在线精品免费观看| 亚洲精品在线免费观看视频| 麻豆成人久久精品二区三区免费| 亚洲国产精品成人久久| 国产午夜精品理论片免费观看| 亚洲无线码在线一区观看| 国产在线观看xxxx免费| 亚洲精品天堂在线观看| 亚洲国产一区国产亚洲| 精品国产香蕉伊思人在线在线亚洲一区二区| 免费的黄色的网站| 亚洲七七久久精品中文国产| 特级毛片aaaa级毛片免费| 亚洲热线99精品视频| 国产美女无遮挡免费视频 | 最新仑乱免费视频| 亚洲国产美女精品久久久| 国产yw855.c免费视频| 18禁黄网站禁片免费观看不卡| 国产精品1024在线永久免费| 亚洲精品国产精品乱码视色| 免费A级毛片无码专区| a免费毛片在线播放| MM1313亚洲国产精品| 精品亚洲成在人线AV无码| 国产亚洲精品免费| 久久久久国色AV免费观看性色|