本章我們將覆蓋:
- 如何使用消息過期
- 如何使指定隊列上的消息過期
- 如何讓隊列過期
- 管理駁回的(rejected)或過期的消息
- 理解其它備用交換器擴展
- 理解有效user-ID擴展
- 通知隊列消息者失敗
- 理解交換器到交換器擴展
- 在消息中嵌入消息目的地
介紹
在本章中,我們將展示關于RabbitMQ擴展上的一些食譜.這些擴展不是AMQP 0-9-1標準的一部分,使用它們會破壞其它AMQPbroker的兼容性。
另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出現了輕微的變化,這是一個簡單通往那里的路徑.最后, 它們通常是優化問題的有效解決方案。
本章中的例子將更為真實,例如,配置參數,如列表和交換器, 以及路由鍵名稱將定義在Constants接口中。事實上,一個真正的應用程序會遵循這樣的準則從配置文件中讀取配置文件,以在不同應用程序中共享。
然而,在下面的例子中,為了更簡短和較好的可讀性,我們并沒有指定Constants的命名空間。
如何讓消息過期
在本食譜中,我們將展示如何讓消息過期.食譜的資源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
- Producer.java
- Consumer.java
- GetOne.java
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做
本示例的核心是Producer.java文件.為了產生在給定生存時間(TTL)后過期的消息,我們需要執行下面的步驟:
1. 創建或聲明一個用來發送消息的交換器, 并將其綁定到隊列上,就像第1章使用AMQP看到的一樣:
channel.exchangeDeclare(exchange, "direct", false);
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, routingKey);
2. 像下面這樣初始化可選消息屬性TTL:
BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
3. 使用下面的代碼來發布消息:
channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
如何工作
在這個例子中,生產者創建了一個交換器,一個命名隊列,并將它們進行了綁定,當隊列上沒有附著任何消費者,過期消息就顯得非常有意義了。
設置過期時間TTL (以毫秒設置),會促使RabbitMQ在消息過期時,如果消息沒有被客戶端及時消費,立即刪除消息.
在我們的例子中,我們假設應用程序發布了JVM資源統計信息到給定隊列,如果存在消費者,那么會像平時一樣,獲取到實時數據,反之,如果不存在這樣的消費者,那么消息會給定生存時間后立即過期。通過這種方式,可以避免我們收集大量的數據。一旦消息者綁定到了隊列中,它會得到先前的消息(未過期)。進一步的試驗,你可以用GetOne.java文件來替換Consumer.java文件運行.
在調用 channel.basicGet() 時,會使你一次只能消費一個消息。
TIP
可使用channel.basicGet()方法來檢查未消費消息的隊列.也可以通過為第二參數傳遞false來調用,即autoAck標志.
在這里我們可以通過調用rabbitmqctl list_queues來監控RabbitMQ隊列的狀態。
也可參考
默認情況下,過期消息會丟失,但它們可以路由到其它地方。可參考管理拒絕消息或過期消息食譜來了解更多信息.
如何讓指定隊列上的消息過期
在本食譜中,我們將展示指定消息TTL的第二種方式.這次,我們不再通過消息屬性來指定,而是通過緩存消息的隊列來進行指定。在這種情況下,生產者只是簡單地發布消息到交換器中,因此,在交換器上綁定標準隊列和過期消息隊列是可行的。
要在這方面進行備注,須存在一個創建自定義的隊列的消費者。生產者是相當標準的.
像前面的食譜一樣,你可以在Chapter02/Recipe02/Java/src/rmqexample找到這三個源碼。
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做現在我們將展示創建特定消息TTL隊列的必要步驟。在我們的例子中,需要在Consumer.java文件中執行下面的步驟:
1. 按下面來聲明交換器:
channel.exchangeDeclare(exchange, "direct", false);
2. 創建或聲明隊列,像下在這樣為x-message-ttl可選參數指定10,000毫秒的超時時間:
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queue, false, false, false, arguments);
3. 綁定隊列到交換器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
在這個例子中,為了最終分析,我們再次假設生產者發送了JVM統計數據給RabbitMQ。最終因為Producer.java文件將其發到一個交換機,如果無消費者連接的話,消息最終會丟失。
想要監控或分析這些統計數據的消費有下面三種選擇:
- 綁定到一個臨時隊列,即調用無參的channel.queueDeclare()方法
- 綁定到一個非自動刪除的命名隊列
- 綁定到一個非自動刪除的命名隊列,并且指定x-message-ttl ,如步驟2中展示的一樣.
在第一種情況中,消費者將獲取實時統計數據,但當它掉線期間,它將不能在數據上執行分析。
在第二種情況中,為了能讓它掉線期間,能獲取到發送的消息,可以使用一個命名隊列(最終是持久化的).但在掉線較長時間后,再重啟時,它將有巨大的backlog來進行恢復,因此在隊列中可能存在大部分舊消息的垃圾。
在第三種情況中,舊消息垃圾會通過RabbitMQ自己來執行,以使我們從消費者和broker中獲益。
更多
當設置per-queue TTL, 就像本食譜中看到的一樣,只要未到超時時間,消息就不會被丟棄,此時消費者還可以嘗試消費它們。
當使用queue TTL時, 這里有一個細微的變化,但使用per-message TTL時,在broker隊列中可能會存在過期消息.
在這種情況下,這些過期消息仍然會占據資源(內存),同時broker統計數據中仍然會計數,直到它們不會到隊列頭部時。
也中參考
在這種情況下,過期消息也會恢復。參考管理駁回或過期消息食譜.
如何讓隊列過期
在第三種情況中,TTL不關聯任何消息,只關聯對列。這種情況對于服務器重啟和更新,是一個完美的選擇。一旦TTL超時,在最后一個消費者停止消費后,RabbitMQ會丟棄隊列.
前面TTL相關食譜,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java , Consumer.java ,and GetOne.java 相關文件。
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做
在前面的例子中,擴展只需要關注Consumer.java :
1. 使用下面的代碼來創建或聲明交換器:
channel.exchangeDeclare(exchange, "direct", false);
2. 創建或聲明隊列,并為x-expires可選參數指定30,000毫秒的超時時間:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("x-expires", 30000);
channel.queueDeclare(queue, false, false, false,arguments);
3. 將隊列綁定到交換器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
當我們運行Consumer.java或 GetOne.java 文件的時候, 超時隊列已經創建好了,在消費者附著到隊列上或調用channel.basicGet()時,它將持續存在.
只有當我們停止這兩個操作超過30秒時,隊列才會被刪除,并且隊列包含的消息也會清除。
TIP
無論生產者是否向其發送了消息,隊列事實上都是獨立刪除的。
在這個試驗課程中,我們可通過 rabbitmqctl list_queues 命令來監控RabbitMQ 隊列狀態.
因此,我們可以想像一種場景,有一個統計分析程序需要重啟來更新其代碼。由于命名隊列有較長的超時時間,因此重啟時,不會丟失任何消息。如果我們停止,隊列會在超過TTL后被刪除,無價值的消息將不再存儲。
管理駁回或過期消息
在這個例子中,我們將展示如何使用死信交換器來管理過期或駁回的消息. 死信交換器是一種正常的交換器,死消息會在這里重定向,如果沒有指定,死消息會被broker丟棄。
你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源碼文件:
- Producer.java
- Consumer.java
要嘗試過期消息,你可以使用第一個代碼來發送帶TTL的消息,就如如何使指定隊列上消息過期食譜中描述的一樣.
一旦啟動了,消費者不允許消息過期,但可以可以駁回消息,最終導致成為死消息。
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做
下面的步驟展示了使用死信交換器來管理過期或駁回消息:
1. 創建一個工作交換品節和死信交換器:
channel.exchangeDeclare(Constants.exchange, "direct", false);
channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
2. 創建使用使用死信交換器和 x-message-ttle參數的隊列:
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange",exchange_dead_letter);
channel.queueDeclare(queue, false, false, false,arguments);
3. 然后像下面這樣綁定隊列:
channel.queueBind(queue, exchange, "");
4. 最后使用channel.basicPublish()來向交換器發送消息 .
5. 要嘗試駁回消息,我們需要配置一個消費者,就像前面例子中看到的一樣,并使用下面的代碼來駁回消息:
basicReject(envelope.getDeliveryTag(), false);
如何工作
我們先從第一個場景開始(單獨使用producer): the expired messages. 在步驟中,我們創建兩個交換器,工作交換器和死信交換器。在步驟2中,我們使用下面兩個可選參數來創建隊列:
- 使用arguments.put("x-message-ttl", 10000)來設置消息TTL ,正如如何使指定隊列上消息過期食譜中描述的一樣.
- 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)來設置死信交換器名稱;
正如你所看到的,我們只是在配置中添加了可選的隊列參數。因此,當生產者發送消息到交換器時,它會隊列參數來路由。消息會在10秒后過期,之后它會重定向到exchange_dead_letter
TIP
死信交換器是一個標準的交換器,因此你可以基于任何目的來使用.
對于第二種場景,食譜的消費者會駁回消息.當消費者得到消息后, 它會使用basicReject()方法來發回一個否定應答(nack),當broker收到nack時,它會將消息重定向到exchange_dead_letter. 通過在死信交換器上綁定隊列,你可以管理這些消息。
當消息重定向到死信隊列時,broker會修改header消息,并在x-dead鍵中增加下面的值:
- reason : 表示隊列是否過期的或駁回的(requeue =false )
- queue : 表示隊列源,例如stat_queue_02/05
- time : 表示消息變為死信的日期和時間
- exchange : 表示交換器,如monitor_exchange_02/05
- routing-keys : 表示發送消息時原先使用的路由鍵
要在實踐中查看這些值,你可使用GetOneDeadLetterQ 類.這可以創建queue_dead_letter隊列并會綁定到exchange_dead_letter
更多
你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")來指定死信路由鍵 ,它將會代替原來的路由鍵.這也就意味著你可以用不同的路由鍵來將不同消息路由到同一個隊列中。相當棒。
理解交替交換器擴展
目前,在第1章使用 AMQP中我們已經展示了如何來處理未路由消息(消息發布到了交換器,但未能達到隊列). AMQP讓生產者通過此條件進行應答,并最終決定是否有需要再次將消息分發到不同的目的地。通過這種擴展,我們可在broker中指定一個交替交換器來路由消息,而并不會對生產者造成更多的干預,本食譜的代碼在Chapter02/Recipe05/Java/src/rmqexample .
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做
在本食譜中,我們會在Producer.java中聲明交替交換器.
1. 將交換器的名字(無目的地路由消息)-alternateExchange ,放到可選參數map的"alternate-exchange"中,如下所示:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange", alternateExchange);
2. 通過傳遞arguments map來聲明交換器來發送消息:
channel.exchangeDeclare(exchange, "direct", false, false,arguments);
3. 聲明alternateExchange自身(已經在步驟1中指定了),如下所示:
channel.exchangeDeclare(alternateExchange, "direct",false);
4. 聲明標準化持久化隊列,并使用路由鍵alertRK將其綁定到alternateExchange交換器中:
channel.queueDeclare(missingAlertQueue, true, false, false,null);
channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
如何工作
在這個例子中,我們再次使用了生成統計數據的producer,正如先前的例子一樣.但這次,我們添加了路由鍵來讓producer指定一個重要的級別,名為infoRK或alertRK (在例子中是隨機分配的).如果你運行一個producer以及至少一個consumer,將不會丟失任何消息,并且一切都會正常工作.
TIP
Consumers在交換器和隊列的聲明中,必須傳遞相同的可選參數,否則會拋出異常。
但如果沒有消費者監聽的話,而我們不想丟失報警的話,這就是為什么必須選擇讓producer創建alternateExchange (步驟3)并將其綁定到持久隊列-missingAlertQueue的原因 (步驟4).
在單獨運行producer的時候,你將看到報警存儲在這里.alternate交換器讓我們在不丟失消息的情況下可以路由消息.你可通過調用rabbitmqctllist_queues或運行CheckAlerts.java來檢查狀態 .
最后的代碼讓我們可以查看隊列的內容和第一個消息,但不會進行消費。完成這種行為是簡單的,它足可以避免這種事實:RabbitMQ client發送了ack,消息未消費,而只是進行監控。
現在,如果我們再次運行Consumer.java文件,它會從missingAlertQueue隊列中獲取并消費消息.這不是自動的,我們可以選擇性地從此隊列中獲取消息。
通過創建第二個消費者實例( missingAlertConsumer ) 并使用相同的代碼來從兩個不同隊列消費消息就可以完成這種效果。如果在處理實時消息時,想要得到不同的行為,那么我們可以創建一個不同的消費者。
更多
在這個例子中,步驟3和步驟4是可選的。 當定義交換器時,可為交替交換器指定名稱,對于其是否存在或是否綁定到任何隊列上,并不作強制要求 。如果交替交換器不存在,生產者可通過在丟失消息上設置mandatory標志來得到應答,就如在第1章中處理未路由消息食譜中看到的一樣。
甚至有可能出現另一種交換器-它自己的備用交換器,備用交換器可以是鏈式的,并且無目的地消息在按序地重試,直到找到一個目的地。
如果在交換器鏈的末尾仍然沒有找到目的地,消息將會丟失,生產者可通過調設置mandatory 標志和指定一個合適的ReturnListener參數得到通知。
理解經過驗證的user-ID擴展
依據AMQP, 當消費者得到消息時,它是不知道發送者信息的。一般說來,消費者不應該關心是誰生產的消息,對于生產者-消費者解藕來說是相當有利的。然而,有時出于認證需要,為了達到此目的,RabbitMQ 提供了有效的user-ID擴展。
在本例中,我們使用有效user-IDs模擬了訂單。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源碼.
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做
完成下面的步驟,以使用經過驗證的user IDs來模擬訂單:
1. 像下面一樣聲明或使用持久化隊列:
channel.queueDeclare(queue, true, false, false, null);
2.發送消息時,使用BasicProperties對象,在消息頭中指定一個user ID:
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest");
channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
3. 消費者獲取到訂單后,可像下面這樣打印訂單數據和消息頭:
System.out.println("The message has been placed by "+properties.getUserId());
如何工作
當設置了user-ID時,RabbitMQ 會檢查是否是同一個用戶打開的連接。在這個例子中,用戶是guest ,即RabbitMQ默認用戶.
通過調用properties.getUserId() 方法,消費者可以訪問發送者user ID。如果你想在步驟2中設置非當前用戶的userId,channel.basicPublish()會拋出異常.
TIP
如果不使用user-ID屬性,用戶將是非驗證的,properties.getUserId()方法會返回null.
也可參考
要更好的理解這個例子,你應該知道用戶和虛擬機管理,這部分內容將在下個章節中講解。在下個章節中,我們將了解如何通過在應用程序中使用SSL來提高程序的安全性。只使用user-ID屬性,我們可保證用戶已認證,但所有信息都是未加密的,因此很容易暴露。
根據AMQP標準,消費者不會得到隊列刪除的通知。一個正在刪除隊列上等待消息的消費者不會收到任何錯誤信息,并會無限期地等待。然而,RabbitMQ client提供了一種擴展來讓消息收到一個cancel參數-即消費者cancel通知。我們馬上就會看到這個例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代碼.
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
如何做
為了能讓擴展工作,你只需要執行下面的步驟:
1.在自定義的消費者中覆蓋handleCancel()方法,可繼承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
public void handleCancel(String consumerTag) throws IOException {
...
}
如何工作
在我們的例子中,我們選擇實現一個消費者,這個消費者只在生產者是持久化的,且隊列是由生產者創建的情況下才能工作。
因此,如果隊列是非持久化的,Consumer.java文件會立即錯誤退出. 此行為可以通過調用channel.queueDeclarePassive()來完成 .
Producer.java類在其啟動時會創建隊列,并在其關閉時調用channel.queueDelete()方法刪除隊列,如果當隊列關閉時,而消費者正在消費隊列,那么RabbitMQ client會調用步驟1中覆蓋的handleCancel()方法來立即通知消費者。
相對于顯示調用channel.basicCancel() 消費者使用handleCancel()方法可以任意理由來退出。只有在這種情況下,RabbitMQ client library會調用Consumer接口的方法: handleCancelOK()
更多
RabbitMQ client library 支持并聲明了這種特性。
也可參考
在集群中,如果一個節點失效了,也會發生同樣的事情:client在隊列刪除后仍然得不到通知,除非它定義了覆蓋了自己的handleCancel()方法。關于這點的更多信息,可參考Chapter 6,開發可伸縮性應用程序。
理解交換器到交換器擴展
默認情況下,AMQP支持交換器到隊列,但不支持交換器到交換器綁定。在本例中,我們將展示如何使用RabbitMQ 交換機到交換機擴展.
在本例中,我們將合并來自兩個不同交換器的消息到第三個交換器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源碼.
準備
為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣,并像廣播消息食譜中來運行生產者以及使用topic交換器來處理消息路由。
如何做
完成下面的步驟來使用RabbitMQ 交換器到交換器擴展:
1. 使用下面的代碼來聲明我們需要追蹤消息的交換器:
channel.exchangeDeclare(exchange, "topic", false);
2. 使用exchangeBind()來綁定其它例子中的交換器 :
channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
3. 啟動追蹤消費者:
TraceConsumer consumer = new TraceConsumer(channel);
String consumerTag = channel.basicConsume(myqueue, false,consumer);
如何工作
在步驟1中,我們創建了一個新的交換器,在步驟2中我們綁定到了下面的交換器:
- ref_exchange_c1_6 (廣播消息) 與exchange綁定.
- ref_exchange_c1_8 (使用topic來處理消息路由)與exchange綁定 .
在步驟3中, 消費者可以綁定一個隊列到exchange上以任意地獲取所有消息.
交換器到交換器擴展的工作方式與交換器到隊列綁定過程類似,你也可以指定一個路由鍵來過濾消息.在步驟2中,我們可以使用#(匹配所有消息)來作為路由鍵。通過改變路由鍵你可以使用制作一個filter!
在消息中內嵌消息目的地
在本例子中,我們會展示如何發送單個發布帶路由鍵的的消息.標準AMQP不提供此特性,但幸運的是,RabbitMQ使用消息屬性header提供了此特性. 這種擴展稱為sender-selected分發.
此擴展的行為類似于電子郵件邏輯.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).這也是為什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
- Producer.java
- Consumer.java
- StatsConsumer.java
- CCStatsConsumer.java
- BCCStatsConsumer.java
準備
To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
如何做
完成下面的步驟來使用單個發布帶路由鍵的的消息:
1. 使用下面的代碼來創建或聲明交換器:
channel.exchangeDeclare(exchange, "direct", false);
2. 在消息的header屬性中指定CC , BCC路由鍵:
List<String> ccList = new ArrayList<String>();
ccList.add(backup_alert_routing_key);
headerMap.put("CC", ccList);
List<String> ccList = new ArrayList<String>();
bccList.add(send_alert_routing_key);
headerMap.put("BCC", bccList);
BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
3. 使用下面的三個路由鍵來綁定三個隊列three queues to the exchange using the following three routing keys:
channel.queueBind(myqueue,exchange, alert_routing_key);
channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
4. 使用三個消費者來消費消息
如何工作
當生產者使用CC和BCC消息屬性來發送消息時,broker會在所有路由鍵的隊列上拷貝消息 。在本例中,stat類會直接使用路由鍵alert_routing_key來向交換器發送消息,同時它也會將消息拷貝到使用CC和BCC參數信息來將消息拷貝到myqueueCC_BK,myqueueBCC_SA隊列中。
當像e-mails一樣發生時,在分發消息到隊列前,BCC信息會被broker從消息頭中刪除,你可查看所有我們示例消費者的輸出來觀察這種行為。
更多
正常情況下,AMQP不會改變消息頭,但BCC擴展是例外。這種擴展可減少發往broker的消息數目。沒有此擴展,生產者只能使用不同的路由鍵來發送多個消息的拷貝。
posted on 2016-06-05 19:51
胡小軍 閱讀(1338)
評論(0) 編輯 收藏 所屬分類:
RabbitMQ