在本章中我們將覆蓋涉及:- 鏡像隊列
- 同步隊列
- 優化鏡像策略
- 在幾個broker之間分發消息
- 創建一個地理位置集群復制
- 過濾和轉發消息
- 將高可用技術結合在一起
- 客戶端高可用性
介紹
RabbitMQ通過數據復制來達到高可用,當數據完整性、服務連續性是最重要的時候, 這一點與存儲(如:RAID解決方案),數據庫,以及所有IT基礎設施解決方案是相同的。
事實上,這些解決方案不僅可以避免數據丟失,也可以避免計劃維護和系統故障時的停機時間。
我們將看到RabbitMQ中簡單而高效的鏡像隊列解決方案. 通過本食譜,我們將看到多個不同用例,以及接近最小化性能優化(處理高可用時,總會付出點成本。
然后 ,我們將看到如何實施地理位置復制,這種方案適用于
應用程序對QoS要求較高,尤其是整個網站掉線時(如, 由于網絡問題,應急電源問題,自然災害,或人為錯誤),也要保證可用性.
這種方案對于云計算資源也給出了建議.如,當使用Amazon Web Services (AWS)時,我們強烈建議在不同可用區域上來分配應用程序(
http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
由于不同區域之間的連接有較高延遲, 跨不同可用區域來創建集群是不可取的,但每個區域創建一個集群,然后像創建一個地理位置復制食譜中介紹的來復制數據是推薦的。
無論簡單與否,對于鏡像隊列或地理位置復制集群來說,客戶端只有在收到相應的回復通知(使用發布確認ack消息或使用AMQP標準的事務機制)時,它才會假設消息已成功分發,并擔保消息已經抵達鏡像隊列 .
TIP
如果你正在處理一個地理集群復制,客戶端發布消息后,在消息在多個節點上復制和在本地建立鏡像隊列之后,RabbitMQ將發送ack消息發回給客戶端. 地理消息的復制是異步的。 發布在本地集群的消息已經被認為是安全的,通常異步復制到遠程集群對于災難恢復是可容忍的.
在客戶端高可用食譜中,我們將看到如何正確地實現一個客戶端來處理連接高可用集群的情況。
鏡像隊列
RabbitMQ集群默認無鏡像隊列.隊列存儲在客戶端連接并創建它們的broker節點中。無論何時,如果這個節點發生了故障,所有存儲在此節點中的隊列和消息將變得不可用。
如果你將隊列和消息定義為持久化的,那么可能在節點恢復時,不會丟失數據,但這光這些是不夠的。
事實上,設計一個高可用程序是不能被接受的.在許多情況下,應用程序必須能夠承受一個組件的死亡,而且程序不能中斷。
ha策略可幫助解決這個問題.在本食譜中,我們將向你展示如何在集群所有節點上反射隊列.
準備
你需要有帶三個節點的集群
如何做
配置鏡像隊列有兩種方法,即使用rabbitmqctl來配置,或使用web管理插件(或它的API).我們將按下面的步驟來展示這兩種方法:
1. 首先,你需要一個RabbitMQ集群,可參考第6章-開發可伸縮應用程序-創建一個簡單集群食譜。
2. 在web管理中,導航到Admin | Policies | Add/ update a policy來配置策略.
3. 在Name字段中輸入mirror-all,在Pattern字段中輸入^mirr\. ,在
Definition中輸入ha-mode和 all.
4. 點擊Add policy,如下面的截圖所示:

5. 或者,也可以通過rabbitmqctl來執行下面的命令:
rabbitmqctl set_policy ha-all "^mirr\." '{"ha-mode":"all"}'
6. 創建一個名為mirr.q_connection_1_1的隊列. (重要的是前輟必須是mirr.; 你可以按你喜歡的來稱呼隊列.)
如何工作
在創建集群之后(步驟1),你需要使用ha策略來定義鏡像隊列的行為(步驟2).
TIP
RabbitMQ 策略是一種鍵值對,它可以用來描述federation plugin, mirrored queues, alternate exchanges, dead lettering, per queue TTLs, and maximum queue length的行為.
mirror-all參數是策略名稱,^mirr\.字符串是正則表達式. 我們指示RabbitMQ來反射所有名稱以mirr.開頭的隊列.
TIP
策略通過正則表達式來定義,這可以允許你創建靈活的,復雜的反射行為 .
最后一個參數是反射模式。通過ha-mode:all,隊列會使用master slave模式來集群所有節點上進行反射(slaves可以是多個). 無論何時,只要匹配給定模式的隊列在某個節點上創建了,就會在其它節點上進行復制,同時,只要有客戶端向它插入了消息,消息也會在所有slaves進行復制。
TIP
最佳實踐是客戶端應該總是連接持有master隊列的節點,只有在它發生故障時,才連接其它節點。
例如,如果我們有三個節點,名為rabbit@rabbitmqc1, rabbit@rabbitmqc2,和rabbit@rabbitmqc3, 如果我們在
rabbitmqc1節點上創建一個前輟為mirr.的隊列(對于這個隊列來說,它是master),那么隊列將在其它兩個節點(
slaves)上進行復制. 要檢查隊列的狀態,可打開web管理控制臺,并點擊隊列的Overview tab ,將會看類似下面的截圖:

如果rabbitmqc1節點宕機或不可用,將會為此隊提拔一個新的master.例如,如果我們調用rabbitmqctl stop_app來執行關閉操作, rabbit@
rabbitmqc3 節點將會被提拔為mirr.q_connection_1_1隊列的新master,如下面的截圖所示:

當我們重啟rabbit@rabbitmqc1, 隊列又會再次反射.多虧了策略,它將會成為slave,并且不再像先前一樣提拔為master:

集群再次成為全反射的了.
更多
在本食譜中,我們已經了解了如何來快速設置鏡像隊列,但對于基于RabbitMQ構建完整的高可用方案還是不夠的.事實上,RabbitMQ 反射配置只是讓broker保留了消息的拷貝,而不造成消息丟失.但是,適當在客戶端處理連接和消息也是很重要的。
在下面的食譜中,我們會了解鏡像隊列的詳細情況
也可參考
正如我們在鏡像隊列食譜中看到的,當配置了鏡像后,消息就會在集群節點間復制。
然而,在任何時候, 一個新節點都可以加入到集群中,并可以開始包含消息的鏡像隊列.那么集群是如何來表示這種存儲消息的呢?
我們假設有一個單獨節點且在其個隊列上存儲了一些消息的典型場景:

現在,如果我們向集群中添加了一個節點并正確地配置了ha策略,第一個節點的隊列變成鏡像隊列,并且后續消息會復制到新添加的節點上,如下所示:

重要的是,當第二個節點加入到集群時,master隊列中已有消息默認是不會復制到第二個節點上的。如果此時master掛掉了,消息將會丟失。
然而,在活(live)隊列的情況下,只要消費者消耗了單個拷貝消息,配置就會變成全部復制,如下截圖所示:

或者,在隊列沒被耗盡的情況下,重要的是要顯式地同步隊列到鏡像,以提高可靠性。(這就是為什么我們是增加鏡像的原因).
這不是默認的行為,因為同步任務會對broker的性能產生重要影響.
當啟動一個同步時,隊列將被卡住,直到這個過程完成. 在這個食譜中,我們將向您展示如何檢查復制狀態和隊列同步。
準備
你需要一個帶鏡像策略的RabbitMQ集群(參考鏡像隊列食譜).
如何做
為了看到未同步隊列的行為,我們將使用以下步驟來手動模擬節點失效的場景:
1. 配置鏡像隊列前輟為mirr.,正如在鏡像隊列中食譜中看到的一樣(我們稱為節點rabbit@rabbitmqc1、rabbit@rabbitmqc2).
2. 創建一個名為mirr.q_connection_1_1的隊列.
3. 從web控制臺來檢查隊列的狀態,你應該可以看到下面的截圖:

4. 你也可以使用 rabbitmqctl list_queues name policyslave_pids. 結果應該看起來像下面這樣:
mirr.q_connection_1_1 ha-all all
<rabbit@rabbitmqc1.2.2844.1>
[<rabbit@rabbitmqc2.2.3363.1>]
running
5. 使用rabbitmqctl stop_app來關閉
rabbit@rabbitmqc2 節點 (實際上它不影響節點).
6. 使用rabbit@rabbitmqc1 節點來向隊列發布非持久化消息node.
7. 使用 rabbitmqctl start_app來重啟
rabbit@rabbitmqc2節點上的應用程序. 然后,像下面截圖來檢查隊列:

8. 點擊Syncronise按扭來同步隊列或使用rabbitmqctl sync_queue mirr.q_connection_1_1來同步:

如何工作
通過步驟1-4創建穩定情況后,我們通過停止rabbit@rabbitmqc2節點模擬了節點故障情形.當我們使用rabbit@rabbitmqc1節點向隊列中發布消息時,消息不是鏡像的,因為我們只運行了一個節點。
當rabbit@rabbitmqc2節點再次運行時(步驟7),其隊列是未同步的,這一點我們已經在介紹章節中看過了.
你可使用web管理插件或rabbitmqctl來同步隊列.通過這種方式,你的消息可在集群節點間復制.
更多
我們已經看了如何手動來同步隊列,但也可通過自動的方式來操作. 當你配置ha策略時,只需要增加ha-sync-mode=automatic的配置項就可以了,如下面的截圖所示:
如果你用兩種策略來模擬故障場景,你可能會看到下面的截圖:

也可參考
在鏡像隊列食譜中,我們已經看到如何在集群節點間反射隊列.在多于兩個節點上復制消息可以改善系統的可利用性,但如果集群因高負載增長,它會對應用程序的性能產生負面影響。在本食譜中,我們將向您展示,為了保證每個隊列都有兩份拷貝,如何將每個隊列分散到兩個節點上:
通過這種方式,每個隊列都有一個master和一個slave.
準備
你需要一個帶鏡像策略的RabbitMQ集群(參考鏡像隊列食譜)
如何做
在本食譜中,我們使用了四臺機器;讓我們看詳細步驟:
1. 創建四個節點,其節點名稱分別為rabbit@rabbitmqc1, rabbit@rabbitmqc2, rabbit@rabbitmqc3, 和rabbit@rabbitmqc4的集群. 下面的截圖顯示了實際集群:

2. 使用下面的代碼來創建ha策略:
"mirr-pair" as name,
"^pair\."as pattern
"ha-mode":"exactly"
"ha-params": 2
"ha-sync-mode":"automatic" as parameters
參考下面的截圖:

3. 創建一個名稱為任意前輟的隊列.你可以直接使用web管理控制臺:

如何工作
在步驟2中,我們將ha-mode配置為exactly. 此參數需要ha-params參數(它表示用于反射隊列的節點個數,在我們的例子中是2個節點).在步驟3中,創建了一個前輟pair的新隊列,它將作為選擇節點的master, 并在另一個節點上得到一個slave隊列。
在真實應用程序中,也就是不連接管理控制臺,我們會從應用程序調用channel.queueDeclare()來創建隊列. 在這種情況下,連接的節點將成為master隊列.
為了能均衡地向集群分配隊列,使用負載均衡器來連接所有節點或讓客戶端以round-robin來連接集群中的所有可用節點是非常重要的,這一點我們已經在客戶端高可用性食譜中看過了。
HA/Mirroring 特性增加了負載,可能會影響性能.
更多
exactly 參數會從集群中自動選擇一個
節點作為slave. 你也可以使用更多的參數節點來選擇鏡像隊列是否應該替換,正如下面截圖中看到的一樣:

也可參考
ha-mirror 插件需要依賴一個集群,這一點,我們已經第6章-開發可伸縮應用程序 看過了,同樣的,它也不能容忍網絡分化問題。
為了能跨WAN復制消息,你可以使用federation插件.此插件不依賴于集群,因此你可在WAN上聯合更多的RabbitMQ實例,即使這此實例擁有不同的Erlang版本.
準備
你需要準備兩個或兩個以上的RabbitMQ節點. 在本例中,我們使用兩臺Linux機器,其RabbitMQ節點名稱分別為rabbit@rabbitmqc1、rabbit@rabbitmqc2.
如何做
必須先啟用federation插件;缺省情況下,它是禁用的.對于這兩臺機器,須執行下面的步驟:
1. 用下面的命令來啟用插件:
rabbitmq-plugins enable rabbitmq_federation
2. 用下面的命令來為插件啟用web管理控制臺:
rabbitmq-plugins enable rabbitmq_federation_management
3. 重啟 RabbitMQ,并使用rabbitmqctl status來檢查插件,如下面的截圖所示:

針對rabbitmqc1機器,執行下面的步驟:
4.也可以通過管理控制臺檢查狀態.在 Admin tab, 在右邊,你可以看到Federation Status,以及Federation Upstream,如下面的截圖所示:

5. 本國federation upstreams. 打開web管理控制臺并導航至Admin | Federation Upstreams | Add a new upstream,然后填寫下面的字段:
‰ Name: first_upstream
‰ URI: amqp://rabbitmqc2

6. 針對rabbitmqc1機器,配置federation policy. 打開管理控制臺,并導航至Admin | Policies | Add / update a policy, 然后填寫下面的字段:
‰ Name: fed_policy
‰ Pattern: ^fed\.
‰ Definition: federation-upstream-set:all
7. 針對rabbitmqc1機器,添加一個前輟為fed.的新交換機,如fed.myfanoutexchange.
8.
針對rabbitmqc1機器, 在web管理控制臺中導航至 Admin | Federation Status來檢查
upstream狀態, 如果一切正常,你應該可以看到下面的截圖:

如何工作
RabbitMQ federation 插件需要遵循步驟1、步驟2來啟用,當然,你也可以使用命令行工具或web管理控制臺來驗證其狀態。
要讓federation工作,你還需要定義一個upstream鏈接(指的是步驟4).
你可以在下游節點,rabbitmqc1上進行定義,并指定其上游節點,rabbitmqc2。
TIP
federation 插件不需要集群,URI配置可使用IP地址,例如, amqp://192.168.0.23. 在這里,不需要使用短主機名稱。
發布到上游節點上的federated交換器上的消息將在相應的交換機上傳播。
TIP
federation嚴格上單向的.向下游節點發布的消息不會被復制到上游。
當在rabbitmqc1節點上使用fed.前輟創建了交換機時,如fed.myfanoutexchange, 交換器也會在rabbitmqc2broker上創建.
消息是以異步的方式來復制的,這不同于鏡像隊列,因此不能保證高可用性.
更多
除了ha-mirror和federation插件,還有一個shovel插件可用來增強應用程序的可靠性. shovel是一種隊列到交換機的機制。通過將集成在broker中的RabbitMQ客戶端作為一個插件,可以消耗來自一個或多個隊列的消息,并將其重定向到其他的本地或遠程代理.
作為一個client,它可用于WAN連接,并能容忍網絡分化情景.
在本食譜中,我們將展示如何簡單地配置來使用插件.
我們將通過WAN連接的方式,在不同broker的兩個隊列之間來發送消息,看起來就像下面這樣:

準備
你需要兩個RabbitMQ實例;其節點名稱分別為rabbitmq@rabbitmqc1、rabbitmq@rabbitmqShovel.
如何做
你需要執行下面的步驟來在rabbitmq@rabbitmqShovel節點上安裝shovel插件:
1. 使用命令 rabbitmq-plugins enable rabbitmq_shovel來啟用插件.
2. 使用 rabbitmq-plugins enable rabbitmq_shovel_management命令來啟用插件的web管理控制臺.
3. 編輯,或在不存在的情況下,創建rabbitmq.config文件,并像下面一樣來增加shovel配置:
[{rabbitmq_shovel,
[{shovels,
[{my_books_shovel,
[
{sources,
[{broker, "amqp://yourrabbitmqip"}]},
{destinations, [ {broker, "amqp://"}]}
, {queue, <<"myBooksQueueCopy">>}
, {prefetch_count, 10}
, {reconnect_delay, 5}
]}
]}
]}
].
你可以使用Chapter07/Recipe05/simple_shovel_rabbitmq.config中的樣例配置文件.
4. 重啟broker.
5. 創建前圖所示的隊列.
6. 通過web管理控制臺來檢查shovel.
如何工作
在這個例子中,插件將從rabbitmq@rabbitmqc1的myBooksQueueCopy隊列來消費消息,并將這些消息發布到 rabbitmq@rabbitmqShovel的myBooksQueueCopy隊列. 這兩個brokers可以是地理位置上完全分離的,因為shovel插件實際上是內嵌RabbitMQ client,它從一個節點消息消費消息,然后又把消息轉發到另一個節點。
本例中的 shovel是隨同RabbitMQ啟動的,它會啟動對源隊列內容的輪詢,甚至是隊列本身無定義時,也會持續地輪詢。
source 和 destination參數是強制配置的.
TIP
如果沒有指定URI(amqp://)的話,插件會使用localhost作為broker IP.
我們創建復制隊列的原因是消息是由插件來消費的.因此myBooksQueue 隊列是由應用程序消費者來消費的,而myBooksQueueCopy是由shovel來消費的.
更多
也可參考
在本食譜中,我們已經了解簡單的消息傳遞.在下面的食譜中,我們將看到如何動態地來綁定shovel.
過濾和轉發消息
在本食譜中,我們將實現一個可選擇性地消息轉發.
我們將讓shovel插件轉發消息的子集到不同的目的地. 一種可能的使用場景是,有三個不同的站點,它們有下面的職責:- 一個用于接受book訂單
- 一個只需要以london為路由鍵的訂單
- 一個只需要以rome為路由鍵的訂單
其目的是在不干涉源broker的情況下,有選擇性地添加或刪除shovels:

準備
你需要準備三個brokers;我們將其命名為rbbitmq@rabbitmqc1, rabbitmq@rabbitmqShovelLondon,rabbitmq@rabbitmqShovelRome.
如何做
在本例中,rabbitmq@rabbitmqc1 broker是shovels將要連接的節點,此插件不是必須的,但我們須為其它broker啟用shovel:
1. 在rabbitmq@rabbitmqShovelLondon和rabbitmq@rabbitmqShovelRome節點上啟用shovel插件,就如在創建地理位置復制食譜中看到的一樣
2. 為rabbitmq@rabbitmqShovelLondon節點創建一個shovel腳本:
{sources, [ {broker, "amqp://rabbitmqc1IP"},
{declarations, [ 'queue.declare'
{routing_key, <<"london">>}
....
, {destinations, [ {broker, "amqp://"}]}
, {prefetch_count, 10}
,{publish_fields, [ {exchange, <<"my_exchange">>}, {routing_key,<<"from_london_order">>} ]}
...
你可在 Chapter07/Recipe06/london_shovel_rabbitmq.config中找到完整的配置.
3. 為rabbitmq@rabbitmqShovelRome節點創建一個shovel腳本:
{sources, [ {broker, "amqp://rabbitmqc1IP"},
{declarations, [ 'queue.declare'
...
{routing_key, <<"rome">>}
...
, {destinations, [ {broker, "amqp://"}]}
, {prefetch_count, 10}
,{publish_fields, [ {exchange, <<"my_exchange">>}, {routing_key,<<"from_rome_order">>} ]}
...
你可在Chapter07/Recipe06/rome_shovel_rabbitmq.config中找到完整的配置.
4. 重啟brokers.
5. 發送兩個消息到myBooksExchange, 一個使用london路由鍵,另一個使用rome路由鍵.
如何工作
步驟2和步驟3中的腳本包含了每個broker的交換機和隊列的聲明.因此只要運行聲明,shovel插件將在遠程節點上(默認)聲明、創建兩個隊列,如下面的截圖所示:

londonorders隊列已由rabbitmq@rabbitmqShovelLondon節點聲明,romeorders隊列已由rabbitmq@rabbitmqShovelRome節點聲明.
腳本同時也在相同的節點上聲明了myBooksExchange交換機,并使用london和rome進行了相應地綁定,如下面的截圖所示:

通過這種方式,當你使用rome路由鍵發布消息到myBooksExchange交換器時,消息將會路由到本地romeorders隊列上,然后會被運行在
rabbitmq@rabbitmqShovelRome節點立即消費。一旦消息到達節點,shovel會將其發布到本地my_exchange topic交換器,最終它會路由到新路由鍵為from_rome_order的my_queue.
你可以看到在下面的截圖:
shovel插件是一個強大的工具,它可以應用于不同的上下文中;如,你可以在單個broker上使用它來創建一個異步隊列鏡像.
也可參考
RabbitMQ 有三種不同的方式來在brokers中分發消息:
- 集群隊列鏡像
- Federation
- Shovel
在本食譜中,我們將向你展示如何結合集群,高可用隊列鏡像, 以及shovel來跨WAN來
從ha-cluster到單個RabbitMQ
節點傳輸消息, 如下面的截圖所示:

準備
你需要準備三個RabbitMQ節點.
如何做
在同一個LAN中需要有兩個節點,第三個節點應該在此LAN外.我們將節點分別命名為rabbit@rabbitmqc1,rabbit@rabbitmqc2, 和rabbit@rabbitmqShovel:
1. 使用rabbit@rabbitmqc1、rabbit@rabbitmqc2來創建集群,我們將其稱為Cluster1.
2.在Cluster1中,在集群中創建一個名為
myBooksExchange的topic交換機.
3.
在Cluster1中, 創建一個名為mirr.orders的ha-queue,并使用"#"路由鍵將其綁定到myBooksExchange.
4. 在rabbit@rabbitmqShovel上, 啟用shovel插件.
5. 在rabbit@rabbitmqShovel上,創建或編輯rabbitmq.config來增加shovel配置:

你可在 Chapter07/Recipe07/rabbitmq.config找到配置文件,并將其直接拷貝到配置文件夾中.
6. 在rabbit@rabbitmqShovel上重啟broker.
如何工作
在步驟1和2中,我們創建一個集群和兩個鏡像隊列。shovel配置(步驟5)創建了一個名為mirr.myshovelqueuecluster1的隊列.
此隊列是兩個RabbitMQ節點的鏡像.在集群中,你至少應該有兩個隊列:

shovel配置中的brokers參數包含節點的集群地址.如果一個節點發生了故障,shovel將連接到其它節點,在這里mirr.myshovelqueuecluster1隊列已經事先存在了,因為它是鏡像的。此配置只會在Cluster1節點發生故障時會丟失數據,其它情況下,是不會丟失數據的。
更多
在使用shovel插件時,它可以創建一些奇怪的拓撲,特別是在一個循環放置把兩個或兩個以上的shovels. 因此你如果想創建一個雙向復制,你應該使用federation插件來防止無止循環.在這種情況下,你可以設置 max-hops參數來達到此目的.
也可參考
客戶端高可用性
即使是RabbitMQ broker提供了許多高可用的服務端選項,但如果連接客戶端沒有采取措施的話,那么高可用性也是無用的:
- Clients必須確保生產的消息已經成功發送給RabbitMQ,并有適當的錯誤處理機制,例如,如果有需要重新發布消息.
- Clients消費消息時必須確保消息不是復制的; 鑒于轉發消息的可能性,消息有可能會重復
- 在消費者端的消息也一樣。這個操作通常被稱為重復數據刪除.
- Clients必須確保它們連接的RabbitMQ節點是健康的. 特別是等待消息的消費者,不能意識到在當服務器卡住了而不能接收消息的情況
- 客戶端應該嘗試連接到任何可用的集群節點,無論是基于最大可靠性還是統一的資源分配.
這些機制在client libraries中不是內置的,但應該按照指南來實現 (
http://www.rabbitmq.com/reliability.html 和
http://www.rabbitmq.com/ha.html#behaviour) 。
在本食譜中我們將展示這些實現.
一如既往,增加可靠性會造成性能的損失:集群配置,隊列鏡像在生產消息和消費消息時都需要檢查,這樣就增加了每個消息的延遲,并降低了最大消息速率。
大多數情況下,為降低在這些條件下的延遲,不需要做什么,但通過增長集群節點數目來以處理更高的信息速率是可能的.在這里,RabbitMQ的伸縮性扮演了這種角色.
我們將介紹如何開發客戶端來從RabbitMQ集群的鏡像隊列上可靠地生產和消費消息。
準備
要運行本食譜,你需要下面的工具:
- 必須有兩個或兩個以上節點的RabbitMQ集群
- 有鏡像隊列食譜中展示的ha-configuration;鏡像隊列須匹配正則表達式^mirr\.
- RabbitMQ Java Client API
如何做
這個例子由兩個Java程序組成:ProducerMain.java和ConsumerMain.java,你可在Chapter09/Recipe08找到源碼。UML圖如下:

上面的截圖展示了需要開發可靠客戶端的步驟.讓我們從生產者和消費者共同的步驟開始:
1. 寫一個通用的方法來試圖打開連接,直到成功連接(打開ReliableClient.java文件):
protected void waitForConnection() throws InterruptedException {
while (true) {
ConnectionFactory factory = new ConnectionFactory();
ArrayList<Address> addresses = new ArrayList<Address>();
for (int i = 0; i<Constants.hosts.length; ++i) {
addresses.add(new Address(Constants.hosts[i],Constants.port));
}
// randomize the order used to try the servers:
// distribute their usage
Collections.shuffle(addresses);
Address[] addrArr=new Address[Constants.hosts.length];
addresses.toArray(addrArr);
try {
connection = factory.newConnection(addrArr);
channel = connection.createChannel();
channel.exchangeDeclare(Constants.exchange, "direct",false);
channel.queueDeclare(Constants.queue,Constants.durableQueue, Constants.exclusiveQueue,Constants.autodeleteQueue, null);
channel.queueBind(Constants.queue,Constants.exchange,Constants.routingKey);
return;
} catch (Exception e) {
e.printStackTrace();
disconnect();
Thread.sleep(1000);
}
}
}
2. 編寫一個disconnect方法(打開ReliableClient.java文件):
protected void disconnect() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
channel = null;
}
if (connection != null && connection.isOpen()) {
connection.close();
connection = null;
}
} catch (IOException e) {
// just ignore
e.printStackTrace();
}
}
然后,讓我們看一下可靠的生產者是如何工作的(打開ReliableProducer.java文件):
3. 繼承ReliableClient類并像下面一樣來覆蓋waitForConnection()方法:
public class ReliableProducer extends ReliableClient {
...
@Override
protected void waitForConnection() throws
InterruptedException {
super.waitForConnection();
try {
channel.confirmSelect();
} catch (IOException e) {
e.printStackTrace();
}
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
ReliableProducer.this.removeItemsUpto(deliveryTag);
} else {
ReliableProducer.this.removeItem(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
ReliableProducer.this.requeueItemsUpto(deliveryTag);
} else {
ReliableProducer.this.requeueItem(deliveryTag);
}
}
});
}
4. 編寫一個方法來讓ReliableProducer類來從其內部dataQueue隊列來發布消息:
protected void publishFromLocalQueue() throws InterruptedException {
try {
for (;;) {
synchronized (dataQueue) {
if (dataQueue.isEmpty()) {
dataQueue.wait(1000);
// if the queue stays empty for more than
// one second, disconnect and
// wait offline
if (dataQueue.isEmpty()) {
System.out.println("disconnected for inactivity");
disconnect();
dataQueue.wait();
waitForConnection();
}
}
}
DataItem item = dataQueue.peek();
BasicProperties messageProperties = newBasicProperties.Builder().messageId(Long.toString(item.getId())).deliveryMode(2).build();
long deliveryTag = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.queue,messageProperties, item.getData().getBytes());
// only after successfully publishing,
// move the item to the
// container of pending items.
// They will be removed from it only
// upon the
// reception of the confirms from the broker.
synchronized (pendingItems) {
pendingItems.put(deliveryTag, item);
}
dataQueue.remove();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
} catch (IOException e) {
// do nothing: the connection will be closed
// and then retried
}
}
5. 在ProducerMain.java的main()方法中編寫一個循環方法,它將啟動一個后臺線程,它將用來等待一個連接,在local隊列中異步發送消息:
public void startAsynchronousPublisher() {
exService = Executors.newSingleThreadExecutor();
exService.execute(new Runnable() {
@Override
public void run() {
try {
for (;;) {
waitForConnection();
publishFromLocalQueue();
disconnect();
}
} catch (InterruptedException ex) {
// disconnect and exit
disconnect();
}
}
});
}
6. 在ProducerMain.java中編寫調用ReliableProducer的方法,它將在local dataQueue中臨時存儲消息:
public void send(String data) {
synchronized (dataQueue) {
dataQueue.add(data);
dataQueue.notify();
}
}
7. 在這里, dataQueue是線程安全類DataQueue的實例,它包含了一個唯一的索引:
public class DataQueue {
..
public synchronized long add(String data) {
++lastID;
dataQueue.add(new DataItem(data,lastID));
returnlastID;
}
...
}
現在讓我們看一下ReliableConsumer類中所需要的步驟:
8. 在這里,我們也覆蓋了ReliableClient.WaitForConnection()的方法:
public class ReliableConsumer extends ReliableClient {
...
@Override
protected void waitForConnection() throws InterruptedException {
super.waitForConnection();
try {
channel.basicConsume(Constants.queue, false, new Consumer() {
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("got handleCancel signal");
}
@Override
public void handleCancelOk(String consumerTag) {
System.out.println("got handleCancelOk signal");
}
@Override
public void handleConsumeOk(String consumerTag) {
System.out.println("got handleConsumeOK signal");
}
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws IOException {
long messageId =Long.parseLong(properties.getMessageId());
if (worker != null) {
// if the message is not a re-delivery,
// sure it is not aretransmission
if (!envelope.isRedeliver() ||
toBeWorked(messageId)) {
try {
worker.handle(new String(body));
// the message is ack'ed just after it has
// been
// secured (handled, stored in database...)
setAsWorked(messageId);
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (WorkerException e) {
// the message worker has reported
// an exception,
// so the message
// cannot be considered to be handled
// properly,so requeue it
channel.basicReject
(envelope.getDeliveryTag(), true);
}
}
}
}
@Override
public void handleRecoverOk(String consumerTag) {
System.out.println("got recoverOK signal");
}
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException cause) {
System.out.println("got shutdown signal");
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
9. 在后臺線程中編寫一個方法來啟動ReliableConsumer的異步消費者:
public void StartAsynchronousConsumer() {
exService = Executors.newSingleThreadExecutor();
exService.execute(new Runnable() {
@Override
public void run() {
try {
for (;;) {
waitForConnection();
synchronized (this) {
this.wait(5000);
}
disconnect();
}
} catch (InterruptedException ex) {
disconnect();
}
}
});
}
10. 讓ReliableConsumer類允許設置一個下面接口的回調:
public interface MessageWorker {
public void handle(String message) throws
WorkerException;
}
11. 代碼將被傳遞到ConsumerMain.java 中,并在每個接收的消息中調用:
reliableConsumer.setWorker(new MessageWorker() {
@Override
public void handle(String message) throws WorkerException
{
System.out.println("received: " + message);
++count;
}
});
如何工作
為了可靠地連接到集群,同時連接多個節點是很重要的。這一點可以顯示地完成,正如本例子中展示的一樣—在這里,是通過客戶端來輪詢不同的節點-或者使用負載均衡器。
TIP
根據情況的不同,通過隨機連接集群中的節點是有用的(這種方案稱為"active-active"配置:master 和 slave 都是活動的), 或最好是master, 或當master不可用時使用slave(這通常稱為active-passive 配置).
然后在生產方,當真正的應用程序發送一個消息時,消息不是實際發送的,而是放在一個臨時隊列中。直到消息到達那里之前,應用程序必須假定該消息尚未被broker接收。
只要后臺循環線程(指的是步驟4)成功發送了消息,它會從隊列中拉取消息,并將其存儲在pendingItems hash map中.即使到了現在,應用程序也不能確保消息已經成功地存儲到了鏡像隊列中,如果節點停機時,消息仍然會丟失.
TIP
正如我們已經指出的,即使我們使用鏡像隊列,我們需要確保我們不會在發布消息時或消費者消息時丟失消息。
ReliableProducer 類只有當其收到來自broker的確認時才能保證消息沒有丟失, 即它可通過channel.addConfirmListener() (步驟3)來檢查.在此時,將會使用delivery tag來從map中刪除.
注意,在本例中,在消息確認超時時,沒有提供重新傳送消息的機制.在真實應用程序中,這需要根據真實情況添加這種功能。
TIP
AMQP 0-9-1 不包含確認機制. 如果需要更嚴格的處理,你需要使用事務, 這通常在數量級上是低效的,因為它們的行為需要保持同步.
我們已經走了一半的路了;我們可以肯定的是,該消息處在一個hacluster的鏡像隊列中。真 的!但是,現在我們需要保證,在消費信息時,我們不會失去它。這還不夠,我們還需要確保,對于相同的消息,我們不會消費多次。
實際上,消費者的連接方法也啟動了一個后臺消費者,這是通過調用channel.basicConsume()來實現的,同時也實現了handleDelivery()回調(步驟8).
正如第1章節,使用AMQP中介紹的一樣,如果回調中的一切都是正常的,那么消息就收到了.用戶回調worker.handle()將被調用,且ack消息會發回到broker,此時,如果消息被正常消費的話,就會被刪除.
但如果用戶回調拋出WorkerException,客戶端拒絕了消息. 再次回到這個點,例子中的連接是打開的,這種情況下的消息會在同一個隊列上重新排隊(channel.basicReject()的第二個參數設置為true), 但也有可能這個消息會被重定向到dead-letter 隊列,或者在真實程序中拒絕和丟失.
然而,也有可能,消息被消費了,回調也被調用了,但ack消息因為網絡問題或broker意外關閉而沒有到達應用程序. 這也是將tem ID 設置在DataQueue實例的地方(步驟7). 需要在接收端防止消息重復. 事實上, 當它沒有被應答時,RabbitMQ會在下次重新連接時,重新傳輸消息.
TIP
由于當RabbitMQ懷疑處在高風險條件時,可以重新傳輸消息 ,因為我們需要一種機制來保證避免消息的重復.
在這個食譜中,我們假定有一個單調遞增從0開始的值,它將用來標記所需要的消息-再次說明,在真實程序中,可能會有不同的方案.
在這個食譜中,我們在下面的三次時刻中使用了計數器來保證可靠性:
1.deliveryTag參數(指的是步驟4), 它在生產端,用來在生產端檢查確認,并在確認后從pendingItems hash map中刪除
2.deliveryTag參數(指的是步驟8), 它在消費端,用來向RabbitMQ集群發送ack消息
3. DataQueue item ID (指的是步驟7,步驟8)用于在消費端避免重復消息
posted on 2016-07-02 19:11
胡小軍 閱讀(1913)
評論(0) 編輯 收藏 所屬分類:
RabbitMQ