<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2025年5月>
    27282930123
    45678910
    11121314151617
    18192021222324
    25262728293031
    1234567

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

         摘要: 在本章節(jié)中我們將覆蓋:創(chuàng)建一個本地服務(wù)器集群創(chuàng)建一個簡單集群自動添加一個RabbitMQ 集群引入消息負(fù)載均衡器創(chuàng)建集群客戶端介紹RabbitMQ提供了各種各樣的特性以及集群功能.通過使用集群,一組適當(dāng)配置的主機(jī)的行為與單個broker實例一樣,但集群帶有下面的目的:高可用性: 如果一個節(jié)點(diǎn)宕機(jī)了,分布式broker仍然能接受和處理消息.這方面內(nèi)容會在Chapter 7,Developi...  閱讀全文
    posted @ 2016-06-15 20:54 胡小軍 閱讀(1948) | 評論 (2)編輯 收藏
         摘要: 在本章中我們將覆蓋:使用Spring來開發(fā)web監(jiān)控程序使用Spring來開發(fā)異步web搜索使用STOMP來開發(fā)web監(jiān)控程序介紹RabbitMQ可以像客戶端一樣使用在服務(wù)端。當(dāng)前,RabbitMQ覆蓋了大部分使用的語言和技術(shù)來構(gòu)建web程序,如PHP,Node.js, Python, Ruby, 以及其它.你可以在http://www.rabbitmq.com/devtools.html找到全部...  閱讀全文
    posted @ 2016-06-14 22:07 胡小軍 閱讀(2284) | 評論 (0)編輯 收藏
         摘要: 在本章中我們將覆蓋:使用.NET client通過MQTT綁定iPhone應(yīng)用與RabbitMQ在Andriod上使用消息來更新Google Maps通過Andriod后端來發(fā)布消息使用Qpid來交換RabbitMQ消息使用Mosquitto來交換RabbitMQ消息使用.NET clients來綁定WCF程序介紹在前面的章節(jié)中,我們已經(jīng)介紹了基本概念。現(xiàn)在,我們要使用這些概念來創(chuàng)建真實的應(yīng)用程序...  閱讀全文
    posted @ 2016-06-13 20:25 胡小軍 閱讀(1772) | 評論 (2)編輯 收藏

    RabbitMQ的目標(biāo)是盡可能廣泛地支持大部分平臺.RabbitMQ 可運(yùn)行在任何支持Erlang的平臺上, 包括內(nèi)嵌系統(tǒng),多核集群,云服務(wù)器.

    下面的平臺支持Erlang,因此也可以運(yùn)行RabbitMQ:

    • Linux
    • Windows, NT through 10
    • Windows Server 2003/2008/2012
    • Mac OS X
    • Solaris
    • FreeBSD
    • TRU64
    • VxWorks

    RabbitMQ的開源版本大部分都部署在下面的平臺上:

    • Ubuntu and Debian-based Linux distributions
    • Fedora, CentOS and RPM-based Linux distributions
    • openSUSE and derived distributions (including SLES and SLERT)
    • Mac OS X
    • Windows XP and later

    Windows

    RabbitMQ可運(yùn)行Windows XP及其后續(xù)版本中(Server 2003, Vista, Windows 7, Windows 8, Windows 10, Server 2008 and Server 2012). 盡管沒有測試,但應(yīng)該可以運(yùn)行在Windows NT ,Windows 2000 上.

    64位的Windows Erlang VM從R15版本開始可用.建議使用最新的64位Erlang版本來運(yùn)行。參考Erlang version compatibility page.

    通用UNIX

    雖沒有官方支持,Erlang 和 RabbitMQ 能運(yùn)行在含有POSIX layer including Solaris, FreeBSD, NetBSD, OpenBSD的操作系統(tǒng)上.

    虛擬平臺

    RabbitMQ 可運(yùn)行物理或虛擬硬件上. 這可以允許不支持的平臺通過仿真來運(yùn)行RabbitMQ.
    參考EC2 guide 來了解RabbitMQ如何運(yùn)行在Amazon EC2上的更多信息.

    posted @ 2016-06-06 00:09 胡小軍 閱讀(1244) | 評論 (0)編輯 收藏

    名稱

    rabbitmq-server — 啟動RabbitMQ AMQP server

    語法

    rabbitmq-server [-detached]

    描述

    RabbitMQ是AMQP的實現(xiàn), 后者是高性能企業(yè)消息通信的新興標(biāo)準(zhǔn). RabbitMQ server是AMQP 中間件的健壯,可擴(kuò)展實現(xiàn).

    前端運(yùn)行rabbitmq-server,它會顯示橫幅消息,會報告啟動時的過程信息,最后會顯示"broker running",以表明RabbitMQ中間件已經(jīng)成功啟動。

    要關(guān)閉server,只需要終止過程或使用rabbitmqctl(1)(即:rabbitmqctl stop).

    環(huán)境變量

    RABBITMQ_MNESIA_BASE

    默認(rèn)是 /var/lib/rabbitmq/mnesia. 用于設(shè)置Mnesia 數(shù)據(jù)庫文件存放的目錄.

    RABBITMQ_LOG_BASE

    日志目錄 ,server生成的/var/log/rabbitmq. Log 日志文志會放置在文件會放置在此目錄.(如:window10下默認(rèn)安裝時,日志目錄為:C:\Users\Administrator\AppData\Roaming\RabbitMQ\log

    RABBITMQ_NODENAME

    默認(rèn)是rabbit. 當(dāng)你想在一臺機(jī)器上運(yùn)行多個節(jié)點(diǎn)時,此配置是相當(dāng)有用的, RABBITMQ_NODENAME在每個erlang-node和機(jī)器的組合中應(yīng)該唯一。

    參考clustering on a single machine guide 來更多細(xì)節(jié).

    RABBITMQ_NODE_IP_ADDRESS

    默認(rèn)情況下,RabbitMQ會綁定到所有網(wǎng)絡(luò)接口上,如果只想綁定某個網(wǎng)絡(luò)接口,可修改此設(shè)置。

    RABBITMQ_NODE_PORT

    默認(rèn)是5672.

    選項

    -detached

    以后端的方式來啟動進(jìn)程 ,注意,這會導(dǎo)致pid無法寫入到pid文件中.例如:

    rabbitmq-server -detached

    以后端方式來啟動RabbitMQ AMQP server.

    也可參考

    rabbitmq-env.conf(5) rabbitmqctl(1)

    posted @ 2016-06-06 00:06 胡小軍 閱讀(1183) | 評論 (0)編輯 收藏
         摘要: 本章我們將覆蓋:使用虛擬主機(jī)配置用戶使用SSL實現(xiàn)客戶端證書從瀏覽器中管理RabbitMQ配置RabbitMQ參數(shù)開Python程序來監(jiān)控RabbitMQ自己開發(fā)web程序來監(jiān)控RabbitMQ介紹一旦安裝后,RabbitMQ不需要任何配置就可以工作. 然而,RabbitMQ有許多的配置選項,這些配置選項使得它更為靈活,能工作于多種不同環(huán)境中.在本章中,我們將看到如何改變配置來滿足應(yīng)用程序的需求。...  閱讀全文
    posted @ 2016-06-05 20:10 胡小軍 閱讀(1606) | 評論 (0)編輯 收藏

    一.安裝Erlang

    1、下載推薦的安裝包

    2、安裝

    安裝依賴包

    yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

    #rpm -ivh esl-erlang_18.3-1~centos~7_amd64.rpm

    二.安裝RabbitMQ
    下載RabbitMQ
    # wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
    # rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm

    安裝rabbitmq-server的過程中遇到了一個問題:

    Error: Package: rabbitmq-server-3.6.1-1.noarch (/rabbitmq-server-3.6.1-1.noarch) 
    Requires: erlang >= R16B-3 
    You could try using --skip-broken to work around the problem 
    You could try running: rpm -Va --nofiles --nodigest

    這是由于erlang的版本問題,其實是沒有影響的,你可以使用下面的命令進(jìn)行安裝:

    #rpm -ivh --nodeps rabbitmq-server-3.6.1-1.noarch.rpm


    啟動

    #service rabbitmq-server start --后臺方式運(yùn)行

    #service rabbitmq-server stop  --停止運(yùn)行

    #service rabbitmq-server status --查看狀態(tài)

    #rabbitmq-server start 

    可以看到使用的日志文件

    日志目錄

    /var/log/rabbitmq

    #cat /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log 可以看到下面的日志記錄

    ...................................................................................................................................................................................................................................................

    =INFO REPORT==== 28-Apr-2016::04:20:10 ===
    node           : rabbit@iZ94nxslz66Z
    home dir       : /var/lib/rabbitmq
    config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
    cookie hash    : fisYwC976M1LblhTfYslpg==
    log            : /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log
    sasl log       : /var/log/rabbitmq/rabbit@iZ94nxslz66Z-sasl.log
    database dir   : /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Memory limit set to 397MB of 992MB total.
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Disk free limit set to 50MB
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Limiting to approx 65435 file handles (58889 sockets)
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    FHC read buffering:  OFF
    FHC write buffering: ON
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Database directory at /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z is empty. Initialising from scratch...
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Priority queues enabled, real BQ is rabbit_variable_queue
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Adding vhost '/'
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Creating user 'guest'
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Setting user tags for user 'guest' to [administrator]
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Setting permissions for 'guest' in '/' to '.*', '.*', '.*'
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    msg_store_transient: using rabbit_msg_store_ets_index to provide index
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    msg_store_persistent: using rabbit_msg_store_ets_index to provide index
    =WARNING REPORT==== 28-Apr-2016::04:20:11 ===
    msg_store_persistent: rebuilding indices from scratch
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    started TCP Listener on [::]:5672
    =INFO REPORT==== 28-Apr-2016::04:20:11 ===
    Server startup complete; 0 plugins started.
    =INFO REPORT==== 28-Apr-2016::04:21:52 ===
    Management plugin: using rates mode 'basic'
    =INFO REPORT==== 28-Apr-2016::04:21:52 ===
    Management plugin started. Port: 15672
    =INFO REPORT==== 28-Apr-2016::04:21:52 ===
    Statistics database started.
    =INFO REPORT==== 28-Apr-2016::04:21:52 ===
    Plugins changed; enabled [mochiweb,webmachine,rabbitmq_web_dispatch,
                              amqp_client,rabbitmq_management_agent,
                              rabbitmq_management], disabled []
    =INFO REPORT==== 28-Apr-2016::04:23:01 ===
    Stopping RabbitMQ
    =INFO REPORT==== 28-Apr-2016::04:23:01 ===
    stopped TCP Listener on [::]:5672
    =INFO REPORT==== 28-Apr-2016::04:23:01 ===
    Stopped RabbitMQ application
    =INFO REPORT==== 28-Apr-2016::04:23:01 ===
    Halting Erlang VM
    =INFO REPORT==== 28-Apr-2016::04:23:29 ===
    Starting RabbitMQ 3.6.1 on Erlang 18.3
    Copyright (C) 2007-2016 Pivotal Software, Inc.
    Licensed under the MPL.  See http://www.rabbitmq.com/
    ...................................................................................................................................................................................................................................................


    卸載

    #rpm -qa|grep rabbitmq
    rabbitmq-server-3.6.1-1.noarch
    #rpm -e --nodeps rabbitmq-server-3.6.1-1.noarch
    #rpm -qa|grep erlang
    esl-erlang-18.3-1.x86_64
    #rpm -e --nodeps esl-erlang-18.3-1.x86_64

    管理

    Rabbitmq服務(wù)器的主要通過rabbitmqctl和rabbimq-plugins兩個工具來管理,以下是一些常用功能。

    1). 服務(wù)器啟動與關(guān)閉

          啟動: rabbitmq-server –detached

          關(guān)閉:rabbitmqctl stop

          若單機(jī)有多個實例,則在rabbitmqctlh后加–n 指定名稱

    2). 插件管理

          開啟某個插件:rabbitmq-pluginsenable xxx

          關(guān)閉某個插件:rabbitmq-pluginsdisablexxx

          注意:重啟服務(wù)器后生效。

    3).virtual_host管理

          新建virtual_host: rabbitmqctladd_vhost  xxx

          撤銷virtual_host:rabbitmqctl  delete_vhost xxx

    4). 用戶管理

          新建用戶:rabbitmqctl add_user xxxpwd

          刪除用戶:   rabbitmqctl delete_user xxx

          改密碼: rabbimqctlchange_password {username} {newpassword}

          設(shè)置用戶角色:rabbitmqctlset_user_tags {username} {tag ...}

                  Tag可以為 administrator,monitoring, management

    5). 權(quán)限管理

          權(quán)限設(shè)置:set_permissions [-pvhostpath] {user} {conf} {write} {read}

                   Vhostpath

                   Vhost路徑

                   user

          用戶名

                  Conf

          一個正則表達(dá)式match哪些配置資源能夠被該用戶訪問。

                  Write

          一個正則表達(dá)式match哪些配置資源能夠被該用戶讀。

                   Read

          一個正則表達(dá)式match哪些配置資源能夠被該用戶訪問。

    6). 獲取服務(wù)器狀態(tài)信息

           服務(wù)器狀態(tài):rabbitmqctl status

           隊列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]

                    Queueinfoitem可以為:name,durable,auto_delete,arguments,messages_ready,

                    messages_unacknowledged,messages,consumers,memory

           Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]

                     Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.

           Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]       

                     Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments

           Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]

           Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。

           Channel信息:rabbitmqctl  list_channels[channelinfoitem ...]

          Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked

     

     

    常用命令:

    查看所有隊列信息

    # rabbitmqctl list_queues

    關(guān)閉應(yīng)用

    # rabbitmqctl stop_app

    啟動應(yīng)用,和上述關(guān)閉命令配合使用,達(dá)到清空隊列的目的

    # rabbitmqctl start_app

    清除所有隊列

    # rabbitmqctl reset

    更多用法及參數(shù),可以執(zhí)行如下命令查看

    # rabbitmqctl

     

     

    rabbitmq常用命令

    rabbitmq-server start  或者   service rabbitmq-server start     #啟動rabbitmq

    rabbitmqctl list_exchanges 

    rabbitmqctl list_bindings

    rabbitmqctl list_queues #分別查看當(dāng)前系統(tǒng)種存在的Exchange和Exchange上綁定的Queue信息。

    rabbitmqctl status  #查看運(yùn)行信息

    rabbitmqctl stop     #停止運(yùn)行rabbitmq

    rabbitmq-plugins enable rabbitmq_management  

    #啟動rabbitmq的圖形管理界面,這個操作必須重啟rabbitmq, 然后在web中 http://127.0.0.1:15672 用戶名和密碼都是guest guest。如果局域網(wǎng)無法訪問設(shè)置防火墻過濾規(guī)則或關(guān)閉防火墻。


    posted @ 2016-06-05 20:08 胡小軍 閱讀(3189) | 評論 (0)編輯 收藏

    概述

    RabbitMQ broker是一個或多個Erlang節(jié)點(diǎn)的邏輯分組,多個運(yùn)行的RabbitMQ應(yīng)用程序可共享用戶,虛擬主機(jī),隊列,交換機(jī),綁定以及運(yùn)行時參數(shù)。有時我們將多個節(jié)點(diǎn)的集合稱為集群。

    什么是復(fù)制?

    RabbitMQ broker操作所需的所有數(shù)據(jù)/狀態(tài)都可以在多個節(jié)點(diǎn)間復(fù)制. 例外是消息隊列,默認(rèn)情況下它駐留在一個節(jié)點(diǎn), 盡管它們對所有節(jié)點(diǎn)來說,是可見的,可達(dá)的.要在集群中跨節(jié)點(diǎn)復(fù)制隊列,可參考high availability 文檔(注意,你仍然先需要一個工作集群).

    主機(jī)名解析需求

    RabbitMQ節(jié)點(diǎn)彼此之間使用域名,要么是簡短的,要么是全限定的(FQDNs). 因此,集群中所有成員的主機(jī)名都必須是可解析的,也可用于機(jī)器上的命令行工具,如rabbitmqctl.

    主機(jī)名解析可使用任何一種標(biāo)準(zhǔn)的操作系統(tǒng)提供方法:

    • DNS 記錄
    • 本地主機(jī)文件(e.g. /etc/hosts)
    在更加嚴(yán)格的環(huán)境中,DNS記錄或主機(jī)文件修改是受限的,不可能的或不受歡迎的, Erlang VM可通過使用替代主機(jī)名解析方法來配置, 如一個替代的DNS服務(wù)器,一個本地文件,一個非標(biāo)準(zhǔn)的主機(jī)文件位置或一個混合方法. 這些方法可以與標(biāo)準(zhǔn)操作主機(jī)名解析方法一起協(xié)同工作。

    要使用FQDNs, 參考RABBITMQ_USE_LONGNAME in the Configuration guide.

    集群構(gòu)成

    集群可以通過多種方式來構(gòu)建:

    一個集群的構(gòu)成可以動態(tài)修改. 所有RabbitMQ brokers開始都是以單個節(jié)點(diǎn)來運(yùn)行的. 這些節(jié)點(diǎn)可以加入到集群中, 隨后也可以脫離集群再次成為單一節(jié)點(diǎn)。

    故障處理

    RabbitMQ brokers 可以容忍個別節(jié)點(diǎn)故障. 節(jié)點(diǎn)可以隨意地啟動和關(guān)閉,只要在已知關(guān)閉的時間內(nèi)能夠聯(lián)系到集群節(jié)點(diǎn).

    RabbitMQ 集群有多種模式來處理網(wǎng)絡(luò)分化, 主要是一致性方向. 集群是在LAN中使用的,不推薦在WAN中運(yùn)行集群. Shovel 或 Federation 插件對于跨WAN連接brokers ,有更好的解決方案. 注意 Shovel 和 Federation 不等同于集群.

    磁盤和內(nèi)存節(jié)點(diǎn)

    節(jié)點(diǎn)可以是磁盤節(jié)點(diǎn),也可以是內(nèi)存節(jié)點(diǎn)。多數(shù)情況下,你希望所有的節(jié)點(diǎn)都是磁盤節(jié)點(diǎn),但RAM節(jié)點(diǎn)是一種特殊情況,它可以提高集群中隊列和,交換機(jī),綁定的性能. 當(dāng)有疑問時,最好只使用磁盤節(jié)點(diǎn)。

    集群文字記錄(Transcript)

    下面是通過三臺機(jī)器-rabbit1rabbit2rabbit3來設(shè)置和操作RabbitMQ集群的文字記錄.

    我們假設(shè)用戶已經(jīng)登錄到這三臺機(jī)器上,并且都已經(jīng)在機(jī)器上安裝了RabbitMQ,以及rabbitmq-server 和rabbitmqctl 腳本都已經(jīng)在用戶的PATH環(huán)境變量中.

    This transcript can be modified to run on a single host, as explained more details below.

    節(jié)點(diǎn)(以及CLI工具)之間如何來認(rèn)證: Erlang Cookie

    RabbitMQ 節(jié)點(diǎn)和CLI 工具(如rabbitmqctl) 使用cookie來確定每個節(jié)點(diǎn)之間是否可以通信. 兩個節(jié)點(diǎn)之間要能通信,它們必須要有相同的共享密鑰Erlang cookie. cookie只是具有字母數(shù)字特征的字符串。只要你喜歡,它可長可短. 每個集群節(jié)點(diǎn)必須有相同的cookie.

    當(dāng)RabbitMQ 服務(wù)器啟動時,Erlang VM 會自動地創(chuàng)建一個隨機(jī)的cookie文件. 最簡單的處理方式是允許一個節(jié)點(diǎn)來創(chuàng)建文件,然后再將這個文件拷貝到集群的其它節(jié)點(diǎn)中。

    在 Unix 系統(tǒng)中, cookie的通常位于/var/lib/rabbitmq/.erlang.cookie 或$HOME/.erlang.cookie.

    在Windows中, 其位置在C:\Users\Current User\.erlang.cookie(%HOMEDRIVE% + %HOMEPATH%\.erlang.cookie) 或C:\Documents and Settings\Current User\.erlang.cookie, 對于RabbitMQ Windows service其位置在C:\Windows\.erlang.cookie。如果使用了Windows service ,  cookie可被放于這兩個位置中.

    作為替代方案,你可以在 rabbitmq-server 和 rabbitmqctl 腳本中調(diào)用erl時,插入"-setcookie cookie"選項.

    當(dāng)cookie未配置時 (例如,不相同), RabbitMQ 會記錄這樣的錯誤"Connection attempt from disallowed node" and "Could not auto-cluster".

    啟動獨(dú)立節(jié)點(diǎn)

    集群可通過重新配置,而將現(xiàn)有RabbitMQ 節(jié)點(diǎn)加入到集群配置中. 因此第一步是以正常的方式在所有節(jié)點(diǎn)上啟動RabbitMQ:

    rabbit1$ rabbitmq-server -detached 
    rabbit2$ rabbitmq-server -detached
    rabbit3$ rabbitmq-server -detached

    這會創(chuàng)建三個獨(dú)立的RabbitMQ brokers, 每個節(jié)點(diǎn)一個,可通過cluster_status命令來驗證:

    rabbit1$ rabbitmqctl cluster_status 
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
    rabbit3$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

    rabbitmq-server shell腳本來啟動RabbitMQ broker的節(jié)點(diǎn)名稱是rabbit@shorthostname,在這里,短節(jié)點(diǎn)名稱是小寫的(如上面的rabbit@rabbit1). 如果在windows上,你使用rabbitmq-server.bat批處理文件來啟動,短節(jié)點(diǎn)名稱是大寫的(如:rabbit@RABBIT1). 當(dāng)你輸入節(jié)點(diǎn)名稱時,不論是大寫還是小寫的,這些字符串都必須精確匹配。

    創(chuàng)建集群

    為了把這三個節(jié)點(diǎn)構(gòu)建到一個集群中,我們可以告訴其中的兩個節(jié)點(diǎn), 假設(shè)為rabbit@rabbit2 和 rabbit@rabbit3, 將加入到第三個節(jié)點(diǎn)的集群中,這第三個節(jié)點(diǎn)假設(shè)為rabbit@rabbit1.

    首先我們將rabbit@rabbit2加入到rabbit@rabbit1的集群中. 要做到這一點(diǎn),我們必須在rabbit@rabbit2 上停止RabbitMQ應(yīng)用程序,并將其加入到rabbit@rabbit1 集群中, 然后再重啟RabbitMQ 應(yīng)用程序. 

    注意:加入集群會隱式地重置節(jié)點(diǎn), 因此這會刪除此節(jié)點(diǎn)上先前存在的所有資源和數(shù)據(jù).(如何備份數(shù)據(jù))

    rabbit2$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit2 ...done.
    rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
    Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
    rabbit2$ rabbitmqctl start_app
    Starting node rabbit@rabbit2 ...done.

    在每個節(jié)點(diǎn)上通過運(yùn)行cluster_status 命令,我們可以看到兩個節(jié)點(diǎn)已經(jīng)加入了集群:

    rabbit1$ rabbitmqctl cluster_status 
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

    現(xiàn)在我們將rabbit@rabbit3節(jié)點(diǎn)加入到同一個集群中. 操作步驟同上面的一致,除了這次我們選擇rabbit2來加入集群,但這并不重要:

    rabbit3$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit3 ...done.
    rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
    Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
    rabbit3$ rabbitmqctl start_app
    Starting node rabbit@rabbit3 ...done.

    在任何一個節(jié)點(diǎn)上通過運(yùn)行cluster_status命令,我們可以看到三個節(jié)點(diǎn)已經(jīng)加入了集群:

    rabbit1$ rabbitmqctl cluster_status 
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit1,rabbit@rabbit2]}] ...done.
    rabbit3$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

    通過上面的步驟,當(dāng)集群運(yùn)行的時候,我們可以在任何時候?qū)⑿碌墓?jié)點(diǎn)加入到集群中.

    重啟集群節(jié)點(diǎn)

    注意,加入到集群中的節(jié)點(diǎn)可在任何時候停止, 對于崩潰來說也沒有問題. 在這兩種情況下,集群剩余的節(jié)點(diǎn)將不受影響地繼續(xù)操作,當(dāng)它們重啟的時候,這些崩潰的節(jié)點(diǎn)會再次自動追趕上其它的集群節(jié)點(diǎn)。

    我們關(guān)閉了節(jié)點(diǎn)rabbit@rabbit1和rabbit@rabbit3,并在每步觀察集群的狀態(tài):

    rabbit1$ rabbitmqctl stop 
    Stopping and halting node rabbit@rabbit1 ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]}] ...done.
    rabbit3$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]}] ...done.
    rabbit3$ rabbitmqctl stop
    Stopping and halting node rabbit@rabbit3 ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2]}] ...done.

    譯者注:關(guān)閉了rabbit1節(jié)點(diǎn)后,運(yùn)行的節(jié)點(diǎn)已經(jīng)沒有rabbit1節(jié)點(diǎn)了

    現(xiàn)在我們再次啟動節(jié)點(diǎn),并檢查集群狀態(tài):

    rabbit1$ rabbitmq-server -detached 
    rabbit1$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
    rabbit3$ rabbitmq-server -detached
    rabbit1$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}] ...done.
    rabbit3$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

    這里有一些重要的警告:

    • 當(dāng)整個集群崩潰的時候, 最后一個崩潰的節(jié)點(diǎn)必須第一個上線.如果不是這樣,節(jié)點(diǎn)將會等待最后一個磁盤節(jié)點(diǎn)30秒以確認(rèn)其重新上線,否則就會失敗. 如果最后一個下線的節(jié)點(diǎn),不能再重新上線,那么它可能會使用forget_cluster_node命令來從集群中刪除 - 查閱 rabbitmqctl頁面來了解更多信息.
    • 如果所有集群節(jié)點(diǎn)都在同一個時間內(nèi)停止且不受控制(如斷電)。在這種情況下,你可以在某個節(jié)點(diǎn)上使用force_boot命令使其再次成為可啟動的-查閱 rabbitmqctl頁面來了解更多信息.

    脫離集群

    當(dāng)節(jié)點(diǎn)不再是集群的一部分時,可以明確地將其從集群中刪除. 首先我們將節(jié)點(diǎn)rabbit@rabbit3從集群中刪除, 以使其回歸獨(dú)立操作.要做到這一點(diǎn),需要在rabbit@rabbit3節(jié)點(diǎn)上停止RabbitMQ 應(yīng)用程序,重設(shè)節(jié)點(diǎn),并重啟RabbitMQ應(yīng)用程序.

    rabbit3$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit3 ...done.
    rabbit3$ rabbitmqctl reset
    Resetting node rabbit@rabbit3 ...done.
    rabbit3$ rabbitmqctl start_app
    Starting node rabbit@rabbit3 ...done.

    在節(jié)點(diǎn)上運(yùn)行cluster_status 命令來確認(rèn)rabbit@rabbit3節(jié)點(diǎn)現(xiàn)在已不再是集群的一部分,并且會獨(dú)自操作:

    rabbit1$ rabbitmqctl cluster_status 
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
    rabbit3$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

    我們也可以遠(yuǎn)程地刪除節(jié)點(diǎn),這是相當(dāng)有用的,舉例來說,當(dāng)處理無反應(yīng)的節(jié)點(diǎn)時.舉例來說,我們可以從 rabbit@rabbit2中刪除rabbit@rabbi1.

    rabbit1$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit1 ...done.
    rabbit2$ rabbitmqctl forget_cluster_node rabbit@rabbit1
    Removing node rabbit@rabbit1 from cluster ... ...done.

    注意,rabbit1仍然認(rèn)為它與rabbit2處在一個集群中,但嘗試啟動時會出現(xiàn)一個錯誤.這時,我們需要對其進(jìn)行重置以使其能再次啟動.

    rabbit1$ rabbitmqctl start_app 
    Starting node rabbit@rabbit1 ... Error: inconsistent_cluster: Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees
    rabbit1$ rabbitmqctl reset
    Resetting node rabbit@rabbit1 ...done.
    rabbit1$ rabbitmqctl start_app Starting node rabbit@mcnulty ... ...done.

    現(xiàn)在, cluster_status 命令會顯示三個節(jié)點(diǎn)都是獨(dú)立節(jié)點(diǎn),并且操作是獨(dú)立的:

    rabbit1$ rabbitmqctl cluster_status 
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
    rabbit3$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

    注意:rabbit@rabbit2節(jié)點(diǎn)仍然殘留有集群的狀態(tài)(譯者注:怎么看出來的呢?), 但是 rabbit@rabbit1 和rabbit@rabbit3 節(jié)點(diǎn)是新鮮的RabbitMQ brokers.如果我們想重新初始化rabbit@rabbit2節(jié)點(diǎn),我們可以按其它節(jié)點(diǎn)的步驟來操作:

    rabbit2$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit2 ...done.
    rabbit2$ rabbitmqctl reset
    Resetting node rabbit@rabbit2 ...done.
    rabbit2$ rabbitmqctl start_app
    Starting node rabbit@rabbit2 ...done.

    升級集群

    當(dāng)從主版本或小版本進(jìn)行升級時 (如:從3.0.x 到3.1.x,或從2.x.x 到3.x.x),或者是升級Erlang時, 整個集群在升級時必須記下來(taken down) (因為集群不會像這樣來運(yùn)行多個混合的版本). 當(dāng)從補(bǔ)丁版本升級到另一個時(如:從3.0.x 到3.0.y)時,這種情況是不會出現(xiàn)的;這些版本在集群中是可以混合使用的(例外是3.0.0不能與 3.0.x 系列后的版本混合).

    在主版本與小版本之間升級時,RabbitMQ有必要的話會自動更新其持久化數(shù)據(jù). 在集群中,此任務(wù)是由第一個磁盤節(jié)點(diǎn)來啟動的("upgrader"節(jié)點(diǎn)). 因此在升級RabbitMQ集群時,你不需要嘗試先啟動RAM節(jié)點(diǎn),任何啟動的RAM節(jié)點(diǎn)都會發(fā)生錯誤,并且不能啟動.

    雖然不是嚴(yán)格必須的,但使用磁盤節(jié)點(diǎn)來作為升級節(jié)點(diǎn)通常是好的主意,最后停止那個節(jié)點(diǎn)。

    自動升級只適用于2.1.1及其之后的版本,如果你有更早的集群 ,你必須重新構(gòu)建升級.

    單臺機(jī)器上的集群

    在某些情況下,在一臺機(jī)器上運(yùn)行RabbitMQ節(jié)點(diǎn)的集群是有用的(試驗性質(zhì)). 

    要在一臺機(jī)器上運(yùn)行多個RabbitMQ節(jié)點(diǎn),必須確保節(jié)點(diǎn)含有不同的節(jié)點(diǎn)名稱,數(shù)據(jù)存儲路徑,日志文件位置,綁定到不同的端口,并包含那些插件使用的端口等等 .參考配置指南中的RABBITMQ_NODENAMERABBITMQ_NODE_PORT, 和 RABBITMQ_DIST_PORT文檔 ,以及 File and Directory Locations guide指南中的 RABBITMQ_MNESIA_DIRRABBITMQ_CONFIG_FILE, and RABBITMQ_LOG_BASE。

    你可以在同一個主機(jī)上通過重復(fù)調(diào)用rabbitmq-server(rabbitmq-server.bat on Windows)來手動地啟動多個節(jié)點(diǎn) . 例如:

    $ RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
    $ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=hare rabbitmq-server -detached
    $ rabbitmqctl -n hare stop_app
    $ rabbitmqctl -n hare join_cluster rabbit@`hostname -s`
    $ rabbitmqctl -n hare start_app

    這會設(shè)置兩個節(jié)點(diǎn)的集群,這兩個節(jié)點(diǎn)都是磁盤節(jié)點(diǎn). 注意,如果你想打開非AMQP的其它端口,你需要通過命令行進(jìn)行配置

    $ RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
    $ RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=hare rabbitmq-server -detached

    主機(jī)名稱變更

    RabbitMQ節(jié)點(diǎn)使用主機(jī)名來相互通信.因此,所有節(jié)點(diǎn)名稱都集群中的節(jié)點(diǎn)應(yīng)該都能被解析.對于像 rabbitmqctl這樣的工具來說,也是如此.

    除此之外,默認(rèn)情況下,RabbitMQ使用當(dāng)前系統(tǒng)的主機(jī)名稱來命名數(shù)據(jù)庫目錄.如果主機(jī)名變了,將會創(chuàng)建一個空的數(shù)據(jù)庫.為避免數(shù)據(jù)丟失,應(yīng)該總是設(shè)置一個固定的,可解析的主機(jī)名稱。無論何時,只要主機(jī)名變化了,你就必須要重啟RabbitMQ:

    $ /etc/init.d/rabbitmq-server restart

    類似的效果可通過使用 rabbit@localhost作為broker節(jié)點(diǎn)名稱來達(dá)到。這個解決方案的影響是集群將不會工作,因為選中的主機(jī)名不能被遠(yuǎn)程主機(jī)所解析。當(dāng)從遠(yuǎn)程主機(jī)調(diào)用時,類似地rabbitmqctl命令也會失敗. 免遭此缺點(diǎn)的復(fù)雜方案是使用DNS,如:如果運(yùn)行EC2,則使用 Amazon Route 53 。如果你想使用節(jié)點(diǎn)名稱的全限定主機(jī)名(RabbitMQ 默認(rèn)使用短名稱),那么可使用DNS解析, 可設(shè)置環(huán)境變量 RABBITMQ_USE_LONGNAME=true.


    防火墻節(jié)點(diǎn)

    當(dāng)在一個數(shù)據(jù)中心或可靠網(wǎng)絡(luò)時,帶防火墻的集群節(jié)點(diǎn)是存在的,但這些節(jié)點(diǎn)通常被防火墻隔離。再一次聲明,當(dāng)各節(jié)點(diǎn)之間的網(wǎng)絡(luò)連接不穩(wěn)定時,集群不建議在WAN在使用

    在多數(shù)配置中,你需要打開4369和25672端口以使用集群正常工作.

    Erlang 使用Port Mapper Daemon (epmd) 來解析集群中的節(jié)點(diǎn)名稱. 默認(rèn)epmd端口是4369,但它可以通過ERL_EPMD_PORT環(huán)境變量進(jìn)行修改.所有的節(jié)點(diǎn)都必須使用同一個端口。詳細(xì)信息可參考Erlang epmd manpage.

    一旦分布式Erlang節(jié)點(diǎn)通過empd解析后,其它節(jié)點(diǎn)將會嘗試直接通信。默認(rèn)地通信端口比RABBITMQ_NODE_PORT (即,默認(rèn)是25672)高了20000. 這可以通過RABBITMQ_DIST_PORT 環(huán)境變量修改

    跨集群Erlang版本

    集群中所有節(jié)點(diǎn)必須運(yùn)行相同版本的Erlang.

    從客戶端連接集群

    客戶端可以正常連接到集群中的任意節(jié)點(diǎn),如果那個節(jié)點(diǎn)發(fā)生故障了 ,只要有剩余集群節(jié)點(diǎn)幸存,當(dāng)客戶端發(fā)現(xiàn)在關(guān)閉的連接時,它就能夠重新連接到剩余幸存的集群節(jié)點(diǎn)上。一般來說,將節(jié)點(diǎn)主機(jī)名稱或IP地址放到客戶端程序是極其不明智的,這會導(dǎo)致缺乏靈活性,并需要客戶端程序重新編輯,編譯,重新配置以適應(yīng)集群配置變化或者集群節(jié)點(diǎn)變化。相反,我們建議采用更抽象的方法: 如有簡短TTL配置的動態(tài)DNS服務(wù)或普通的TCP負(fù)載均衡器. 一般來說,這方面的管理集群內(nèi)連接節(jié)點(diǎn)是超出了RabbitMQ本身的范圍,我們建議使用其他技術(shù)專門設(shè)計來解決這些問題。

    內(nèi)存節(jié)點(diǎn)集群

    內(nèi)存節(jié)點(diǎn)只在內(nèi)存中保存其元數(shù)據(jù)。它不會像磁盤節(jié)點(diǎn)將元數(shù)據(jù)寫入到磁盤中,但它們擁有更好的性能。 然而,也應(yīng)該注意到,由于持久化隊列數(shù)據(jù)總是存儲在磁盤上的,其性能提升只會影響資源管理(如: 添加/刪除隊列,交換機(jī),或虛擬主機(jī)), 但不會影響發(fā)布或消費(fèi)的速度.

    內(nèi)存節(jié)點(diǎn)是一個高級使用例子;當(dāng)設(shè)置你的第一個集群時,你應(yīng)該不使用它們。你應(yīng)該用足夠的磁盤節(jié)點(diǎn)來處理冗余需求,然后如果有必要,再用內(nèi)存節(jié)點(diǎn)進(jìn)行擴(kuò)展.

    集群中只含有內(nèi)存節(jié)點(diǎn)是相當(dāng)脆弱的,如果集群停止了,你將不能再次啟動,并且會導(dǎo)致數(shù)據(jù)丟失。RabbitMQ在許多情況下,會阻止創(chuàng)建只包含內(nèi)存節(jié)點(diǎn)的集群,但不能完全阻止。

    (譯者注:在集群構(gòu)建中,最好有兩個或以上的磁盤節(jié)點(diǎn),然后再考慮使用內(nèi)存節(jié)點(diǎn)進(jìn)行擴(kuò)展)

    創(chuàng)建內(nèi)存節(jié)點(diǎn)

    當(dāng)節(jié)點(diǎn)加入集群時,我們可將其聲明為內(nèi)存節(jié)點(diǎn). 我們可以通過使用像先前rabbitmqctl join_cluster命令再加--ram標(biāo)志來達(dá)到目的:

    rabbit2$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit2 ...done.
    rabbit2$ rabbitmqctl join_cluster --ram rabbit@rabbit1
    Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
    rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done.


    rabbit1$ rabbitmqctl cluster_status 
    Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
    rabbit2$ rabbitmqctl cluster_status
    Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

    改變節(jié)點(diǎn)類型

    我們可以將節(jié)點(diǎn)的類型從磁盤修改為內(nèi)存,反之亦然. 假設(shè)我們想反轉(zhuǎn)rabbit@rabbit2 和 rabbit@rabbit1的節(jié)點(diǎn)類型,即先將內(nèi)存節(jié)點(diǎn)轉(zhuǎn)換為磁盤節(jié)點(diǎn),隨后再將其從磁盤節(jié)點(diǎn)轉(zhuǎn)換為內(nèi)存節(jié)點(diǎn).要做到這點(diǎn),我們可以使用change_cluster_node_type命令. 首先節(jié)點(diǎn)必須先停止.

    rabbit2$ rabbitmqctl stop_app 
    Stopping node rabbit@rabbit2 ...done. rabbit2$
    rabbitmqctl change_cluster_node_type disc
    Turning rabbit@rabbit2 into a disc node ... ...done. Starting node rabbit@rabbit2 ...done.
    rabbit1$
    rabbitmqctl stop_app
    Stopping node rabbit@rabbit1 ...done.
    rabbit1$
    rabbitmqctl change_cluster_node_type ram
    Turning rabbit@rabbit1 into a ram node ...
    rabbit1$
    rabbitmqctl start_app
    Starting node rabbit@rabbit1 ...done.
    posted @ 2016-06-05 19:53 胡小軍 閱讀(3949) | 評論 (0)編輯 收藏
    本章我們將覆蓋:
    1. 如何使用消息過期
    2. 如何使指定隊列上的消息過期
    3. 如何讓隊列過期
    4. 管理駁回的(rejected)或過期的消息
    5. 理解其它備用交換器擴(kuò)展
    6. 理解有效user-ID擴(kuò)展
    7. 通知隊列消息者失敗
    8. 理解交換器到交換器擴(kuò)展
    9. 在消息中嵌入消息目的地
    介紹
    在本章中,我們將展示關(guān)于RabbitMQ擴(kuò)展上的一些食譜.這些擴(kuò)展不是AMQP 0-9-1標(biāo)準(zhǔn)的一部分,使用它們會破壞其它AMQPbroker的兼容性。
    另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出現(xiàn)了輕微的變化,這是一個簡單通往那里的路徑.最后, 它們通常是優(yōu)化問題的有效解決方案。

    本章中的例子將更為真實,例如,配置參數(shù),如列表和交換器, 以及路由鍵名稱將定義在Constants接口中。事實上,一個真正的應(yīng)用程序會遵循這樣的準(zhǔn)則從配置文件中讀取配置文件,以在不同應(yīng)用程序中共享。
    然而,在下面的例子中,為了更簡短和較好的可讀性,我們并沒有指定Constants的命名空間。

    如何讓消息過期
    在本食譜中,我們將展示如何讓消息過期.食譜的資源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
    1. Producer.java
    2. Consumer.java
    3. GetOne.java
    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
    如何做
    本示例的核心是Producer.java文件.為了產(chǎn)生在給定生存時間(TTL)后過期的消息,我們需要執(zhí)行下面的步驟:
    1. 創(chuàng)建或聲明一個用來發(fā)送消息的交換器, 并將其綁定到隊列上,就像第1章使用AMQP看到的一樣:
    channel.exchangeDeclare(exchange, "direct", false);
    channel.queueDeclare(queue, false, false, false, null);
    channel.queueBind(queue, exchange, routingKey);
    2. 像下面這樣初始化可選消息屬性TTL:
    BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
    3. 使用下面的代碼來發(fā)布消息:
    channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
    如何工作
    在這個例子中,生產(chǎn)者創(chuàng)建了一個交換器,一個命名隊列,并將它們進(jìn)行了綁定,當(dāng)隊列上沒有附著任何消費(fèi)者,過期消息就顯得非常有意義了。
    設(shè)置過期時間TTL (以毫秒設(shè)置),會促使RabbitMQ在消息過期時,如果消息沒有被客戶端及時消費(fèi),立即刪除消息.
    在我們的例子中,我們假設(shè)應(yīng)用程序發(fā)布了JVM資源統(tǒng)計信息到給定隊列,如果存在消費(fèi)者,那么會像平時一樣,獲取到實時數(shù)據(jù),反之,如果不存在這樣的消費(fèi)者,那么消息會給定生存時間后立即過期。通過這種方式,可以避免我們收集大量的數(shù)據(jù)。一旦消息者綁定到了隊列中,它會得到先前的消息(未過期)。進(jìn)一步的試驗,你可以用GetOne.java文件來替換Consumer.java文件運(yùn)行.
    在調(diào)用 channel.basicGet() 時,會使你一次只能消費(fèi)一個消息。
    TIP
    可使用channel.basicGet()方法來檢查未消費(fèi)消息的隊列.也可以通過為第二參數(shù)傳遞false來調(diào)用,即autoAck標(biāo)志.

    在這里我們可以通過調(diào)用rabbitmqctl list_queues來監(jiān)控RabbitMQ隊列的狀態(tài)。  

    也可參考
    默認(rèn)情況下,過期消息會丟失,但它們可以路由到其它地方。可參考管理拒絕消息或過期消息食譜來了解更多信息.

    如何讓指定隊列上的消息過期
    在本食譜中,我們將展示指定消息TTL的第二種方式.這次,我們不再通過消息屬性來指定,而是通過緩存消息的隊列來進(jìn)行指定。在這種情況下,生產(chǎn)者只是簡單地發(fā)布消息到交換器中,因此,在交換器上綁定標(biāo)準(zhǔn)隊列和過期消息隊列是可行的。
    要在這方面進(jìn)行備注,須存在一個創(chuàng)建自定義的隊列的消費(fèi)者。生產(chǎn)者是相當(dāng)標(biāo)準(zhǔn)的.
    像前面的食譜一樣,你可以在Chapter02/Recipe02/Java/src/rmqexample找到這三個源碼。
     
    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
    如何做
    現(xiàn)在我們將展示創(chuàng)建特定消息TTL隊列的必要步驟。在我們的例子中,需要在Consumer.java文件中執(zhí)行下面的步驟:
    1. 按下面來聲明交換器:
    channel.exchangeDeclare(exchange, "direct", false);
    2. 創(chuàng)建或聲明隊列,像下在這樣為x-message-ttl可選參數(shù)指定10,000毫秒的超時時間:
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-message-ttl", 10000);
    channel.queueDeclare(queue, false, false, false, arguments);
    3. 綁定隊列到交換器上:
    channel.queueBind(queue, exchange, routingKey);
    如何工作
    在這個例子中,為了最終分析,我們再次假設(shè)生產(chǎn)者發(fā)送了JVM統(tǒng)計數(shù)據(jù)給RabbitMQ。最終因為Producer.java文件將其發(fā)到一個交換機(jī),如果無消費(fèi)者連接的話,消息最終會丟失。
    想要監(jiān)控或分析這些統(tǒng)計數(shù)據(jù)的消費(fèi)有下面三種選擇:
    1. 綁定到一個臨時隊列,即調(diào)用無參的channel.queueDeclare()方法
    2. 綁定到一個非自動刪除的命名隊列
    3. 綁定到一個非自動刪除的命名隊列,并且指定x-message-ttl ,如步驟2中展示的一樣.
    在第一種情況中,消費(fèi)者將獲取實時統(tǒng)計數(shù)據(jù),但當(dāng)它掉線期間,它將不能在數(shù)據(jù)上執(zhí)行分析。
    在第二種情況中,為了能讓它掉線期間,能獲取到發(fā)送的消息,可以使用一個命名隊列(最終是持久化的).但在掉線較長時間后,再重啟時,它將有巨大的backlog來進(jìn)行恢復(fù),因此在隊列中可能存在大部分舊消息的垃圾。
    在第三種情況中,舊消息垃圾會通過RabbitMQ自己來執(zhí)行,以使我們從消費(fèi)者和broker中獲益。
    更多
    當(dāng)設(shè)置per-queue TTL, 就像本食譜中看到的一樣,只要未到超時時間,消息就不會被丟棄,此時消費(fèi)者還可以嘗試消費(fèi)它們。
    當(dāng)使用queue TTL時, 這里有一個細(xì)微的變化,但使用per-message TTL時,在broker隊列中可能會存在過期消息.
    在這種情況下,這些過期消息仍然會占據(jù)資源(內(nèi)存),同時broker統(tǒng)計數(shù)據(jù)中仍然會計數(shù),直到它們不會到隊列頭部時。
    也中參考
    在這種情況下,過期消息也會恢復(fù)。參考管理駁回或過期消息食譜.
    如何讓隊列過期
    在第三種情況中,TTL不關(guān)聯(lián)任何消息,只關(guān)聯(lián)對列。這種情況對于服務(wù)器重啟和更新,是一個完美的選擇。一旦TTL超時,在最后一個消費(fèi)者停止消費(fèi)后,RabbitMQ會丟棄隊列.
    前面TTL相關(guān)食譜,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java ,  Consumer.java ,and  GetOne.java 相關(guān)文件。
    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
    如何做
    在前面的例子中,擴(kuò)展只需要關(guān)注Consumer.java :
    1. 使用下面的代碼來創(chuàng)建或聲明交換器:
    channel.exchangeDeclare(exchange, "direct", false);
    2. 創(chuàng)建或聲明隊列,并為x-expires可選參數(shù)指定30,000毫秒的超時時間:
    Map<String, Object> arguments = new HashMap<String,Object>();
    arguments.put("x-expires", 30000);
    channel.queueDeclare(queue, false, false, false,arguments);
    3. 將隊列綁定到交換器上:
    channel.queueBind(queue, exchange, routingKey);
    如何工作
    當(dāng)我們運(yùn)行Consumer.java或 GetOne.java 文件的時候, 超時隊列已經(jīng)創(chuàng)建好了,在消費(fèi)者附著到隊列上或調(diào)用channel.basicGet()時,它將持續(xù)存在.
    只有當(dāng)我們停止這兩個操作超過30秒時,隊列才會被刪除,并且隊列包含的消息也會清除。
    TIP
    無論生產(chǎn)者是否向其發(fā)送了消息,隊列事實上都是獨(dú)立刪除的。

    在這個試驗課程中,我們可通過 rabbitmqctl list_queues 命令來監(jiān)控RabbitMQ 隊列狀態(tài).
    因此,我們可以想像一種場景,有一個統(tǒng)計分析程序需要重啟來更新其代碼。由于命名隊列有較長的超時時間,因此重啟時,不會丟失任何消息。如果我們停止,隊列會在超過TTL后被刪除,無價值的消息將不再存儲。
    管理駁回或過期消息
    在這個例子中,我們將展示如何使用死信交換器來管理過期或駁回的消息. 死信交換器是一種正常的交換器,死消息會在這里重定向,如果沒有指定,死消息會被broker丟棄。
    你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源碼文件:
    1. Producer.java
    2. Consumer.java
    要嘗試過期消息,你可以使用第一個代碼來發(fā)送帶TTL的消息,就如如何使指定隊列上消息過期食譜中描述的一樣.
    一旦啟動了,消費(fèi)者不允許消息過期,但可以可以駁回消息,最終導(dǎo)致成為死消息。
    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
    如何做
    下面的步驟展示了使用死信交換器來管理過期或駁回消息:
    1. 創(chuàng)建一個工作交換品節(jié)和死信交換器:
    channel.exchangeDeclare(Constants.exchange, "direct", false);
    channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
    2. 創(chuàng)建使用使用死信交換器和 x-message-ttle參數(shù)的隊列:
    arguments.put("x-message-ttl", 10000);
    arguments.put("x-dead-letter-exchange",exchange_dead_letter);
    channel.queueDeclare(queue, false, false, false,arguments);
    3. 然后像下面這樣綁定隊列:
    channel.queueBind(queue, exchange, "");
    4. 最后使用channel.basicPublish()來向交換器發(fā)送消息 .
    5. 要嘗試駁回消息,我們需要配置一個消費(fèi)者,就像前面例子中看到的一樣,并使用下面的代碼來駁回消息:
    basicReject(envelope.getDeliveryTag(), false);
    如何工作
    我們先從第一個場景開始(單獨(dú)使用producer): the expired messages. 在步驟中,我們創(chuàng)建兩個交換器,工作交換器和死信交換器。在步驟2中,我們使用下面兩個可選參數(shù)來創(chuàng)建隊列:
    1. 使用arguments.put("x-message-ttl", 10000)來設(shè)置消息TTL ,正如如何使指定隊列上消息過期食譜中描述的一樣.
    2. 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)來設(shè)置死信交換器名稱;
    正如你所看到的,我們只是在配置中添加了可選的隊列參數(shù)。因此,當(dāng)生產(chǎn)者發(fā)送消息到交換器時,它會隊列參數(shù)來路由。消息會在10秒后過期,之后它會重定向到exchange_dead_letter 
    TIP
    死信交換器是一個標(biāo)準(zhǔn)的交換器,因此你可以基于任何目的來使用.
    對于第二種場景,食譜的消費(fèi)者會駁回消息.當(dāng)消費(fèi)者得到消息后, 它會使用basicReject()方法來發(fā)回一個否定應(yīng)答(nack),當(dāng)broker收到nack時,它會將消息重定向到exchange_dead_letter. 通過在死信交換器上綁定隊列,你可以管理這些消息。
    當(dāng)消息重定向到死信隊列時,broker會修改header消息,并在x-dead鍵中增加下面的值:
    1. reason : 表示隊列是否過期的或駁回的(requeue =false )
    2. queue : 表示隊列源,例如stat_queue_02/05
    3. time : 表示消息變?yōu)樗佬诺娜掌诤蜁r間
    4. exchange : 表示交換器,如monitor_exchange_02/05
    5. routing-keys : 表示發(fā)送消息時原先使用的路由鍵
    要在實踐中查看這些值,你可使用GetOneDeadLetterQ 類.這可以創(chuàng)建queue_dead_letter隊列并會綁定到exchange_dead_letter 
    更多
    你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")來指定死信路由鍵 ,它將會代替原來的路由鍵.這也就意味著你可以用不同的路由鍵來將不同消息路由到同一個隊列中。相當(dāng)棒。
    理解交替交換器擴(kuò)展
    目前,在第1章使用 AMQP中我們已經(jīng)展示了如何來處理未路由消息(消息發(fā)布到了交換器,但未能達(dá)到隊列). AMQP讓生產(chǎn)者通過此條件進(jìn)行應(yīng)答,并最終決定是否有需要再次將消息分發(fā)到不同的目的地。通過這種擴(kuò)展,我們可在broker中指定一個交替交換器來路由消息,而并不會對生產(chǎn)者造成更多的干預(yù),本食譜的代碼在Chapter02/Recipe05/Java/src/rmqexample .

    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
    如何做
    在本食譜中,我們會在Producer.java中聲明交替交換器.
    1. 將交換器的名字(無目的地路由消息)-alternateExchange ,放到可選參數(shù)map的"alternate-exchange"中,如下所示:
    Map<String, Object> arguments = new HashMap<String,Object>();
    arguments.put("alternate-exchange", alternateExchange);
    2. 通過傳遞arguments map來聲明交換器來發(fā)送消息:
    channel.exchangeDeclare(exchange, "direct", false, false,arguments);
    3. 聲明alternateExchange自身(已經(jīng)在步驟1中指定了),如下所示:
    channel.exchangeDeclare(alternateExchange, "direct",false);
    4. 聲明標(biāo)準(zhǔn)化持久化隊列,并使用路由鍵alertRK將其綁定到alternateExchange交換器中:
    channel.queueDeclare(missingAlertQueue, true, false, false,null);
    channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
    如何工作
    在這個例子中,我們再次使用了生成統(tǒng)計數(shù)據(jù)的producer,正如先前的例子一樣.但這次,我們添加了路由鍵來讓producer指定一個重要的級別,名為infoRK或alertRK (在例子中是隨機(jī)分配的).如果你運(yùn)行一個producer以及至少一個consumer,將不會丟失任何消息,并且一切都會正常工作.
    TIP
    Consumers在交換器和隊列的聲明中,必須傳遞相同的可選參數(shù),否則會拋出異常。
    但如果沒有消費(fèi)者監(jiān)聽的話,而我們不想丟失報警的話,這就是為什么必須選擇讓producer創(chuàng)建alternateExchange (步驟3)并將其綁定到持久隊列-missingAlertQueue的原因 (步驟4).
    在單獨(dú)運(yùn)行producer的時候,你將看到報警存儲在這里.alternate交換器讓我們在不丟失消息的情況下可以路由消息.你可通過調(diào)用rabbitmqctllist_queues或運(yùn)行CheckAlerts.java來檢查狀態(tài) .
    最后的代碼讓我們可以查看隊列的內(nèi)容和第一個消息,但不會進(jìn)行消費(fèi)。完成這種行為是簡單的,它足可以避免這種事實:RabbitMQ client發(fā)送了ack,消息未消費(fèi),而只是進(jìn)行監(jiān)控。
    現(xiàn)在,如果我們再次運(yùn)行Consumer.java文件,它會從missingAlertQueue隊列中獲取并消費(fèi)消息.這不是自動的,我們可以選擇性地從此隊列中獲取消息。
    通過創(chuàng)建第二個消費(fèi)者實例( missingAlertConsumer ) 并使用相同的代碼來從兩個不同隊列消費(fèi)消息就可以完成這種效果。如果在處理實時消息時,想要得到不同的行為,那么我們可以創(chuàng)建一個不同的消費(fèi)者。

    更多
    在這個例子中,步驟3和步驟4是可選的。 當(dāng)定義交換器時,可為交替交換器指定名稱,對于其是否存在或是否綁定到任何隊列上,并不作強(qiáng)制要求 。如果交替交換器不存在,生產(chǎn)者可通過在丟失消息上設(shè)置mandatory標(biāo)志來得到應(yīng)答,就如在第1章中處理未路由消息食譜中看到的一樣。
    甚至有可能出現(xiàn)另一種交換器-它自己的備用交換器,備用交換器可以是鏈?zhǔn)降模⑶覠o目的地消息在按序地重試,直到找到一個目的地。
    如果在交換器鏈的末尾仍然沒有找到目的地,消息將會丟失,生產(chǎn)者可通過調(diào)設(shè)置mandatory 標(biāo)志和指定一個合適的ReturnListener參數(shù)得到通知。
    理解經(jīng)過驗證的user-ID擴(kuò)展
    依據(jù)AMQP, 當(dāng)消費(fèi)者得到消息時,它是不知道發(fā)送者信息的。一般說來,消費(fèi)者不應(yīng)該關(guān)心是誰生產(chǎn)的消息,對于生產(chǎn)者-消費(fèi)者解藕來說是相當(dāng)有利的。然而,有時出于認(rèn)證需要,為了達(dá)到此目的,RabbitMQ 提供了有效的user-ID擴(kuò)展。
    在本例中,我們使用有效user-IDs模擬了訂單。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源碼.
    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
    如何做
    完成下面的步驟,以使用經(jīng)過驗證的user IDs來模擬訂單:
    1. 像下面一樣聲明或使用持久化隊列:
    channel.queueDeclare(queue, true, false, false, null);
    2.發(fā)送消息時,使用BasicProperties對象,在消息頭中指定一個user ID:
    BasicProperties messageProperties = new BasicProperties.Builder()
    .timestamp(new Date())
    .userId("guest");
    channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
    3. 消費(fèi)者獲取到訂單后,可像下面這樣打印訂單數(shù)據(jù)和消息頭:
    System.out.println("The message has been placed by "+properties.getUserId());
    如何工作
    當(dāng)設(shè)置了user-ID時,RabbitMQ 會檢查是否是同一個用戶打開的連接。在這個例子中,用戶是guest ,即RabbitMQ默認(rèn)用戶.
    通過調(diào)用properties.getUserId() 方法,消費(fèi)者可以訪問發(fā)送者user ID。如果你想在步驟2中設(shè)置非當(dāng)前用戶的userId,channel.basicPublish()會拋出異常.
    TIP
    如果不使用user-ID屬性,用戶將是非驗證的,properties.getUserId()方法會返回null.
    也可參考
    要更好的理解這個例子,你應(yīng)該知道用戶和虛擬機(jī)管理,這部分內(nèi)容將在下個章節(jié)中講解。在下個章節(jié)中,我們將了解如何通過在應(yīng)用程序中使用SSL來提高程序的安全性。只使用user-ID屬性,我們可保證用戶已認(rèn)證,但所有信息都是未加密的,因此很容易暴露。
    隊列失敗時通知消費(fèi)者

    根據(jù)AMQP標(biāo)準(zhǔn),消費(fèi)者不會得到隊列刪除的通知。一個正在刪除隊列上等待消息的消費(fèi)者不會收到任何錯誤信息,并會無限期地等待。然而,RabbitMQ client提供了一種擴(kuò)展來讓消息收到一個cancel參數(shù)-即消費(fèi)者cancel通知。我們馬上就會看到這個例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代碼.

    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。

    如何做
    為了能讓擴(kuò)展工作,你只需要執(zhí)行下面的步驟:
    1.在自定義的消費(fèi)者中覆蓋handleCancel()方法,可繼承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
    public void handleCancel(String consumerTag) throws IOException {
    ...
    }
    如何工作
    在我們的例子中,我們選擇實現(xiàn)一個消費(fèi)者,這個消費(fèi)者只在生產(chǎn)者是持久化的,且隊列是由生產(chǎn)者創(chuàng)建的情況下才能工作。
    因此,如果隊列是非持久化的,Consumer.java文件會立即錯誤退出. 此行為可以通過調(diào)用channel.queueDeclarePassive()來完成 .
    Producer.java類在其啟動時會創(chuàng)建隊列,并在其關(guān)閉時調(diào)用channel.queueDelete()方法刪除隊列,如果當(dāng)隊列關(guān)閉時,而消費(fèi)者正在消費(fèi)隊列,那么RabbitMQ client會調(diào)用步驟1中覆蓋的handleCancel()方法來立即通知消費(fèi)者。
    相對于顯示調(diào)用channel.basicCancel() 消費(fèi)者使用handleCancel()方法可以任意理由來退出。只有在這種情況下,RabbitMQ client library會調(diào)用Consumer接口的方法:  handleCancelOK() 
    更多
    消費(fèi)者cancel通知是client library的擴(kuò)展,而不是AMQP client libraries的常規(guī)方法.一個實例它們的library必須將其聲明為可選屬性(參考 http://www.rabbitmq.com/consumer-cancel. html#capabilities ).
    RabbitMQ client library 支持并聲明了這種特性。
    也可參考
    在集群中,如果一個節(jié)點(diǎn)失效了,也會發(fā)生同樣的事情:client在隊列刪除后仍然得不到通知,除非它定義了覆蓋了自己的handleCancel()方法。關(guān)于這點(diǎn)的更多信息,可參考Chapter 6,開發(fā)可伸縮性應(yīng)用程序。
    理解交換器到交換器擴(kuò)展
    默認(rèn)情況下,AMQP支持交換器到隊列,但不支持交換器到交換器綁定。在本例中,我們將展示如何使用RabbitMQ 交換機(jī)到交換機(jī)擴(kuò)展.
    在本例中,我們將合并來自兩個不同交換器的消息到第三個交換器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源碼.
    準(zhǔn)備
    為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣,并像廣播消息食譜中來運(yùn)行生產(chǎn)者以及使用topic交換器來處理消息路由。
    如何做
    完成下面的步驟來使用RabbitMQ 交換器到交換器擴(kuò)展:
    1. 使用下面的代碼來聲明我們需要追蹤消息的交換器:
    channel.exchangeDeclare(exchange, "topic", false);
    2. 使用exchangeBind()來綁定其它例子中的交換器 :
    channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
    channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
    3. 啟動追蹤消費(fèi)者:
    TraceConsumer consumer = new TraceConsumer(channel);
    String consumerTag = channel.basicConsume(myqueue, false,consumer);
    如何工作
    在步驟1中,我們創(chuàng)建了一個新的交換器,在步驟2中我們綁定到了下面的交換器:
    1. ref_exchange_c1_6 (廣播消息) 與exchange綁定.
    2. ref_exchange_c1_8 (使用topic來處理消息路由)與exchange綁定 .
    在步驟3中, 消費(fèi)者可以綁定一個隊列到exchange上以任意地獲取所有消息.
    交換器到交換器擴(kuò)展的工作方式與交換器到隊列綁定過程類似,你也可以指定一個路由鍵來過濾消息.在步驟2中,我們可以使用#(匹配所有消息)來作為路由鍵。通過改變路由鍵你可以使用制作一個filter!
    在消息中內(nèi)嵌消息目的地
    在本例子中,我們會展示如何發(fā)送單個發(fā)布帶路由鍵的的消息.標(biāo)準(zhǔn)AMQP不提供此特性,但幸運(yùn)的是,RabbitMQ使用消息屬性header提供了此特性. 這種擴(kuò)展稱為sender-selected分發(fā).
    此擴(kuò)展的行為類似于電子郵件邏輯.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).這也是為什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
    1. Producer.java
    2. Consumer.java
    3. StatsConsumer.java
    4. CCStatsConsumer.java
    5. BCCStatsConsumer.java
    準(zhǔn)備
    To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
    如何做
    完成下面的步驟來使用單個發(fā)布帶路由鍵的的消息:
    1. 使用下面的代碼來創(chuàng)建或聲明交換器:
    channel.exchangeDeclare(exchange, "direct", false);
    2. 在消息的header屬性中指定CC , BCC路由鍵:
    List<String> ccList = new ArrayList<String>();
    ccList.add(backup_alert_routing_key);
    headerMap.put("CC", ccList);
    List<String> ccList = new ArrayList<String>();
    bccList.add(send_alert_routing_key);
    headerMap.put("BCC", bccList);
    BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
    channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
    3. 使用下面的三個路由鍵來綁定三個隊列three queues to the exchange using the following three routing keys:
    channel.queueBind(myqueue,exchange, alert_routing_key);
    channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
    channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
    4. 使用三個消費(fèi)者來消費(fèi)消息
    如何工作
    當(dāng)生產(chǎn)者使用CC和BCC消息屬性來發(fā)送消息時,broker會在所有路由鍵的隊列上拷貝消息 。在本例中,stat類會直接使用路由鍵alert_routing_key來向交換器發(fā)送消息,同時它也會將消息拷貝到使用CC和BCC參數(shù)信息來將消息拷貝到myqueueCC_BK,myqueueBCC_SA隊列中。
    當(dāng)像e-mails一樣發(fā)生時,在分發(fā)消息到隊列前,BCC信息會被broker從消息頭中刪除,你可查看所有我們示例消費(fèi)者的輸出來觀察這種行為。
    更多
    正常情況下,AMQP不會改變消息頭,但BCC擴(kuò)展是例外。這種擴(kuò)展可減少發(fā)往broker的消息數(shù)目。沒有此擴(kuò)展,生產(chǎn)者只能使用不同的路由鍵來發(fā)送多個消息的拷貝。
    posted @ 2016-06-05 19:51 胡小軍 閱讀(1339) | 評論 (0)編輯 收藏

    概述

    RabbitMQ Java client 將com.rabbitmq.client作為其頂層包. 關(guān)鍵類和接口有:

    • Channel
    • Connection
    • ConnectionFactory
    • Consumer
    協(xié)議操作可通過Channel接口來進(jìn)行.Connection用于開啟channels,注冊connection生命周期事件處理, 并在不需要時關(guān)閉connections.
    Connections是通過ConnectionFactory來初始化的,在ConnectionFactory中,你可以配置不同的connection設(shè)置,如:虛擬主機(jī)和用戶名等等.

    Connections 和 Channels

    核心API類是Connection和Channel, 它們代表對應(yīng)AMQP 0-9-1 connection 和 channel. 在使用前,可像下面這樣來導(dǎo)入:

    import com.rabbitmq.client.Connection; 
    import com.rabbitmq.client.Channel;

    連接到broker

    下面的代碼會使用給定的參數(shù)連接到AMQP broker:

    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(portNumber);
    Connection conn = factory.newConnection();

    也可以使用URIs 來設(shè)置連接參數(shù):

    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
    Connection conn = factory.newConnection();


    Connection 接口可用來打開一個channel:

    Channel channel = conn.createChannel(); 

    channel現(xiàn)在可用來發(fā)送和接收消息,正如后續(xù)章節(jié)中描述的一樣.

    要斷開連接,只需要簡單地關(guān)閉channel和connection:

    channel.close(); conn.close();

    關(guān)閉channel被認(rèn)為是最佳實踐,但在這里不是嚴(yán)格必須的 - 當(dāng)?shù)讓舆B接關(guān)閉的時候,channel也會自動關(guān)閉.

    使用 Exchanges 和 Queues

    采用交換器和隊列工作的客戶端應(yīng)用程序,是AMQP高級別構(gòu)建模塊。在使用前,必須先聲明.聲明每種類型的對象都需要確保名稱存在,如果有必要須進(jìn)行創(chuàng)建.

    繼續(xù)上面的例子,下面的代碼聲明了一個交換器和一個隊列,然后再將它們進(jìn)行綁定.

    channel.exchangeDeclare(exchangeName, "direct", true); 
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, routingKey);

    這實際上會聲明下面的對象,它們兩者都可以可選參數(shù)來定制. 在這里,它們兩個都沒有特定參數(shù)。

    1. 一個類型為direct,且持久化,非自動刪除的交換器
    2. 采用隨機(jī)生成名稱,且非持久化,私有的,自動刪除隊列

    上面的函數(shù)然后使用給定的路由鍵來綁定隊列和交換器.

    注意,當(dāng)只有一個客戶端時,這是一種典型聲明隊列的方式:它不需要一個已知的名稱,其它的客戶端也不會使用它(exclusive),并會被自動清除(autodelete).
    如果多個客戶端想共享帶有名稱的隊列,下面的代碼應(yīng)該更適合:

    channel.exchangeDeclare(exchangeName, "direct", true); 
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);

    這實際上會聲明:

    1. 一個類型為direct,且持久化,非自動刪除的交換器
    2. 一個已知名稱,且持久化的,非私有,非自動刪除隊列

    注意,Channel API 的方法都是重載的。這些 exchangeDeclarequeueDeclare 和queueBind 都使用的是預(yù)設(shè)行為.
    這里也有更多參數(shù)的長形式,它們允許你按需覆蓋默認(rèn)行為,允許你完全控制。


    發(fā)由消息

    要向交換器中發(fā)布消息,可按下面這樣來使用Channel.basicPublish方法:

    byte[] messageBodyBytes = "Hello, world!".getBytes(); 
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

    為了更好的控制,你可以使用重載方法來指定mandatory標(biāo)志,或使用預(yù)先設(shè)置的消息屬性來發(fā)送消息:

    channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

    這會使用分發(fā)模式2(持久化)來發(fā)送消息, 優(yōu)先級為1,且content-type 為"text/plain".你可以使用Builder類來構(gòu)建你自己的消息屬性對象:

    channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

    下面的例子使用自定義的headers來發(fā)布消息:

    Map<String, Object> headers = new HashMap<String, Object>(); 
    headers.put("latitude", 51.5252949);
    headers.put("longitude", -0.0905493);
    channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

    下面的例子使用expiration來發(fā)布消息:

    channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

    BasicProperties is an inner class of the autogenerated holder class AMQP.

    Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

    Channels 和并發(fā)考慮(線程安全性)

    Channel 實例不能在多個線程間共享。應(yīng)用程序必須在每個線程中使用不同的channel實例,而不能將同個channel實例在多個線程間共享。 有些channl上的操作是線程安全的,有些則不是,這會導(dǎo)致傳輸時出現(xiàn)錯誤的幀交叉。
    在多個線程共享channels也會干擾Publisher Confirms.

    通過訂閱來來接收消息

    import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

    接收消息最高效的方式是用Consumer接口來訂閱。當(dāng)消息到達(dá)時,它們會自動地進(jìn)行分發(fā),而不需要顯示地請求

    當(dāng)在調(diào)用Consumers的相關(guān)方法時, 個別訂閱總是通過它們的consumer tags來確定的, consumer tags可通過客戶端或服務(wù)端來生成,參考 the AMQP specification document.
    同一個channel上的消費(fèi)者必須有不同的consumer tags.

    實現(xiàn)Consumer的最簡單方式是繼承便利類DefaultConsumer.子類可通過在設(shè)置訂閱時,將其傳遞給basicConsume調(diào)用:

    boolean autoAck = false; 
    channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
    @Override
    publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
    String routingKey = envelope.getRoutingKey();
    String contentType = properties.getContentType();
    long deliveryTag = envelope.getDeliveryTag();
    // (process the message components here ...)
    channel.basicAck(deliveryTag, false);
    }
    });

    在這里,由于我們指定了autoAck = false,因此消費(fèi)者有必要應(yīng)答分發(fā)的消息,最便利的方式是在handleDelivery 方法中處理.

    更復(fù)雜的消費(fèi)者可能需要覆蓋更多的方法,實踐中,handleShutdownSignal會在channels和connections關(guān)閉時調(diào)用,handleConsumeOk 會在其它消費(fèi)者之前

    調(diào)用
    ,傳遞consumer tag(不明白,要研究)。

     

    消費(fèi)者可實現(xiàn)handleCancelOk 和 handleCancel方法來接收顯示和隱式取消操作通知。

    你可以使用Channel.basicCancel來顯示地取消某個特定的消費(fèi)者:

    channel.basicCancel(consumerTag);

    passing the consumer tag.

    消費(fèi)者回調(diào)是在單獨(dú)線程上處理的,這意味著消費(fèi)者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上調(diào)用阻塞方法。

    每個Channel都有其自己的dispatch線程.對于一個消費(fèi)者一個channel的大部分情況來說,這意味著消費(fèi)者不會阻擋其它的消費(fèi)者。如果在一個channel上多個消費(fèi)者,則必須意識到長時間運(yùn)行的消費(fèi)者可能阻擋此channel上其它消費(fèi)者回調(diào)調(diào)度.

    獲取單個消息

    要顯示地獲取一個消息,可使用Channel.basicGet.返回值是一個GetResponse實例, 在它之中,header信息(屬性) 和消息body都可以提取:

    boolean autoAck = false; 
    GetResponse response = channel.basicGet(queueName, autoAck);
    if (response == null) {
    // No message retrieved.
    } else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag(); ...

    因為autoAck = false,你必須調(diào)用Channel.basicAck來應(yīng)答你已經(jīng)成功地接收了消息:

    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

    處理未路由消息

    如果發(fā)布消息時,設(shè)置了"mandatory"標(biāo)志,但如果消息不能路由的話,broker會將其返回到發(fā)送客戶端 (通過 AMQP.Basic.Return 命令).

    要收到這種返回的通知, clients可實現(xiàn)ReturnListener接口,并調(diào)用Channel.setReturnListener.如果channel沒有配置return listener,那么返回的消息會默默地丟棄。

    channel.setReturnListener(new ReturnListener() {     
        publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
    ...
        }
    });

     return listener將被調(diào)用,例如,如果client使用"mandatory"標(biāo)志向未綁定隊列的direct類型交換器發(fā)送了消息.

    關(guān)閉協(xié)議

    AMQP client 關(guān)閉概述

    AMQP 0-9-1 connection和channel 使用相同的方法來管理網(wǎng)絡(luò)故障,內(nèi)部故障,以及顯示本地關(guān)閉.

    AMQP 0-9-1 connection  和 channel 有如下的生命周期狀態(tài):

    • open: 準(zhǔn)備要使用的對象
    • closing: 對象已顯示收到收到本地關(guān)閉通知, 并向任何支持的底層對象發(fā)出關(guān)閉請求,并等待其關(guān)閉程序完成
    • closed: 對象已收到所有底層對象的完成關(guān)閉通知,最終將執(zhí)行關(guān)閉操作

    這些對象總是以closed狀態(tài)結(jié)束的,不管基于什么原因引發(fā)的關(guān)閉,比如:應(yīng)用程序請求,內(nèi)部client library故障, 遠(yuǎn)程網(wǎng)絡(luò)請求或網(wǎng)絡(luò)故障.

    AMQP connection 和channel 對象會持有下面與關(guān)閉相關(guān)的方法:

    • addShutdownListener(ShutdownListener 監(jiān)聽器)和removeShutdownListener(ShutdownListener 監(jiān)聽器),用來管理監(jiān)聽器,當(dāng)對象轉(zhuǎn)為closed狀態(tài)時,將會觸發(fā)這些監(jiān)聽器.注意,在已經(jīng)關(guān)閉的對象上添加一個ShutdownListener將會立即觸發(fā)監(jiān)聽器
    • getCloseReason(), 允許同其交互以了解對象關(guān)閉的理由
    • isOpen(), 用于測試對象是否處于open狀態(tài)
    • close(int closeCode, String closeMessage), 用于顯示通知對象關(guān)閉

    可以像這樣來簡單使用監(jiān)聽器:

    import com.rabbitmq.client.ShutdownSignalException; 
    import com.rabbitmq.client.ShutdownListener;
    connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause) { ... } }
    );

    關(guān)閉環(huán)境信息

    可通過顯示調(diào)用getCloseReason()方法或通過使用ShutdownListener類中的業(yè)務(wù)方法的cause參數(shù)來ShutdownSignalException中獲取關(guān)閉原因的有用信息.

    ShutdownSignalException 類提供方法來分析關(guān)閉的原因.通過調(diào)用isHardError()方法,我們可以知道是connection錯誤還是channel錯誤.getReason()會返回相關(guān)cause的相關(guān)信息,這些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引發(fā)的異常,如網(wǎng)絡(luò)通信故障,在這種情況下,可通過getCause()方法來獲取信息).

    public void shutdownCompleted(ShutdownSignalException cause) {   if (cause.isHardError())   {     
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication()) {
    Method reason = cause.getReason(); ... } ... }
    else { Channel ch = (Channel)cause.getReference(); ... } }

    原子使用isOpen()方法

    channel和connection對象的isOpen()方法不建議在在生產(chǎn)代碼中使用,因為此方法的返回值依賴于shutdown cause的存在性. 下面的代碼演示了竟?fàn)帡l件的可能性:

    public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

    相反,我們應(yīng)該忽略這種檢查,并簡單地嘗試這種操作.如果代碼執(zhí)行期間,connection的channel關(guān)閉了,那么將拋出ShutdownSignalException,這就表明對象處于一種無效狀態(tài)了.當(dāng)broker意外關(guān)閉連接時,我們也應(yīng)該捕獲由SocketException引發(fā)的IOException,或者當(dāng)broker清理關(guān)閉時,捕獲ShutdownSignalException.

    public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

    高級連接選項

    Consumer線程池

    Consumer 線程默認(rèn)是通過一個新的ExecutorService線程池來自動分配的(參考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的線程池.下面的示例展示了一個比正常分配稍大的線程池:

    ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 
    Executors 和 ExecutorService 都是java.util.concurrent包中的類.

    當(dāng)連接關(guān)閉時,默認(rèn)的ExecutorService將會被shutdown(), 但用戶自定義的ExecutorService (如上面所示)將不會被shutdown(). 提供自定義ExecutorService的Clients必須確保最終它能被關(guān)閉(通過調(diào)用它的shutdown() 方法), 否則池中的線程可能會阻止JVM終止.

    同一個executor service,可在多個連接之間共享,或者連續(xù)地在重新連接上重用,但在shutdown()后,則不能再使用.

    使用這種特性時,唯一需要考慮的是:在消費(fèi)者回調(diào)的處理過程中,是否有證據(jù)證明有嚴(yán)重的瓶頸. 如果沒有消費(fèi)者執(zhí)行回調(diào),或很少,默認(rèn)的配置是綽綽有余. 開銷最初是很小的,分配的全部線程資源也是有界限的,即使偶爾可能出現(xiàn)一陣消費(fèi)活動.

    使用Host列表

    可以傳遞一個Address數(shù)組給newConnection()Address只是 com.rabbitmq.client 包中包含host和port組件的簡單便利類. 例如:

    Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 
    將會嘗試連接hostname1:portnumber1, 如果不能連接,則會連接hostname2:portnumber2,然后會返回數(shù)組中第一個成功連接(不會拋出IOException)上broker的連接.這完全等價于在factory上重復(fù)調(diào)用factory.newConnection()方法來設(shè)置host和port, 直到有一個成功返回.

    如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么線程池將與第一個成功連接相關(guān)聯(lián).

    心跳超時

    參考Heartbeats guide 來了解更多關(guān)于心跳及其在Java client中如何配置的更多信息.

    自定義線程工廠

    像Google App Engine (GAE)這樣的環(huán)境會限制線程直接實例化. 在這樣的環(huán)境中使用RabbitMQ Java client, 需要配置一個定制的ThreadFactory,即使用合適的方法來實例化線程,如: GAE's ThreadManager. 下面是Google App Engine的相關(guān)代碼.

    import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

    網(wǎng)絡(luò)故障時自動恢復(fù)

    Connection恢復(fù)

    clients和RabbitMQ節(jié)點(diǎn)之間的連接可發(fā)生故障. RabbitMQ Java client 支持連接和拓?fù)?queues, exchanges, bindings, 和consumers)的自動恢復(fù). 大多數(shù)應(yīng)用程序的連接自動恢復(fù)過程會遵循下面的步驟:

    1. 重新連接
    2. 恢復(fù)連接監(jiān)聽器
    3. 重新打開通道
    4. 恢復(fù)通道監(jiān)聽器
    5. 恢復(fù)通道basic.qos 設(shè)置,發(fā)布者確認(rèn)和事務(wù)設(shè)置
    拓?fù)浠謴?fù)包含下面的操作,每個通道都會執(zhí)行下面的步驟:
    1. 重新聲明交換器(except for predefined ones)
    2. 重新聲明隊列
    3. 恢復(fù)所有綁定
    4. 恢復(fù)所有消費(fèi)者
    要啟用自動連接恢復(fù),須使用factory.setAutomaticRecoveryEnabled(true):
    ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();
    如果恢復(fù)因異常失敗(如. RabbitMQ節(jié)點(diǎn)仍然不可達(dá)),它會在固定時間間隔后進(jìn)行重試(默認(rèn)是5秒). 時間間隔可以進(jìn)行配置:
    ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
    當(dāng)提供了address列表時,將會在所有address上逐個進(jìn)行嘗試:
    ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

    恢復(fù)監(jiān)聽器

    可在可恢復(fù)連接和通道上注冊一個或多個恢復(fù)監(jiān)聽器. 當(dāng)啟用了連接恢復(fù)時,ConnectionFactory#newConnection 和 Connection#createChannel 的連接已實現(xiàn)了com.rabbitmq.client.Recoverable,并提供了兩個方法:

    • addRecoveryListener
    • removeRecoveryListener
    注意,在使用這些方法時,你需要將connections和channels強(qiáng)制轉(zhuǎn)換為Recoverable.

    發(fā)布影響

    當(dāng)連接失敗時,使用Channel.basicPublish方法發(fā)送的消息將會丟失. client不會保證在連接恢復(fù)后,消息會得到分發(fā).要確保發(fā)布的消息到達(dá)了RabbitMQ,應(yīng)用程序必須使用Publisher Confirms 


    拓?fù)浠謴?fù)

    拓?fù)浠謴?fù)涉及交換器,隊列,綁定以及消費(fèi)者恢復(fù).默認(rèn)是啟用的,但也可以禁用:

    ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

    手動應(yīng)答和自動恢復(fù)

    當(dāng)使用手動應(yīng)答時,在消息分發(fā)與應(yīng)答之間可能存在網(wǎng)絡(luò)連接中斷. 在連接恢復(fù)后,RabbitMQ會在所有通道上重設(shè)delivery標(biāo)記. 也就是說,使用舊delivery標(biāo)記的basic.ackbasic.nack, 以及basic.reject將會引發(fā)channel exception. 為了避免發(fā)生這種情況, RabbitMQ Java client可以跟蹤,更新,以使它們在恢復(fù)期間單調(diào)地增長. Channel.basicAckChannel.basicNack, 以及Channel.basicReject 然后可以轉(zhuǎn)換這些 delivery tags,并且不再發(fā)送過期的delivery tags. 使用手動應(yīng)答和自動恢復(fù)的應(yīng)用程序必須負(fù)責(zé)處理重新分發(fā).

    未處理異常

    關(guān)于connection, channel, recovery, 和consumer lifecycle 的異常將會委派給exception handler,Exception handler是實現(xiàn)了ExceptionHandler接口的任何對象. 默認(rèn)情況下,將會使用DefaultExceptionHandler實例,它會將異常細(xì)節(jié)輸出到標(biāo)準(zhǔn)輸出上.

    可使用ConnectionFactory#setExceptionHandler來覆蓋原始handler,它將被用于由此factory創(chuàng)建的所有連接:

    ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         
    Exception handlers 應(yīng)該用于異常日志.

    Google App Engine上的RabbitMQ Java Client

    在Google App Engine (GAE) 上使用RabbitMQ Java client,必須使用一個自定義的線程工廠來初始化線程,如使用GAE's ThreadManager. 此外,還需要設(shè)置一個較小的心跳間隔(4-5 seconds) 來避免InputStream 上讀超時:

    ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

    警告和限制

    為了能使拓?fù)浠謴?fù), RabbitMQ Java client 維持了聲明隊列,交換器,綁定的緩存. 緩存是基于每個連接的.某些RabbitMQ的特性使得客戶端不能觀察到拓?fù)涞淖兓?如,當(dāng)隊列因TTL被刪除時. RabbitMQ Java client 會嘗試在下面的情況中使用緩存實體失效:

    • 當(dāng)隊列被刪除時.
    • 當(dāng)交換器被刪除時.
    • 當(dāng)綁定被刪除時.
    • 當(dāng)消費(fèi)者在自動刪除隊列上退出時.
    • 當(dāng)隊列或交換器不再綁定到自動刪除的交換器上時.
    然而, 除了單個連接外,client不能跟蹤這些拓?fù)渥兓? 依賴于自動刪除隊列或交換器的應(yīng)用程序,正如TTL隊列一樣 (注意:不是消息TTL!), 如果使用了自動連接恢復(fù),在知道不再使用或要刪除時,必須明確地刪除這些緩存實體,以凈化 client-side 拓?fù)鋍ache. 這些可通過Channel#queueDeleteChannel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind來進(jìn)行.

    RPC (Request/Reply) 模式

    為了便于編程, Java client API提供了一個使用臨時回復(fù)隊列的RpcClient類來提供簡單的RPC-style communication.

    此類不會在RPC參數(shù)和返回值上強(qiáng)加任何特定格式. 它只是簡單地提供一種機(jī)制來向帶特定路由鍵的交換器發(fā)送消息,并在回復(fù)隊列上等待響應(yīng).

    import com.rabbitmq.client.RpcClient;  
    RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

    (其實現(xiàn)細(xì)節(jié)為:請求消息使用basic.correlation_id唯一值字段來發(fā)送消息,并使用basic.reply_to來設(shè)置響應(yīng)隊列的名稱.)

    一旦你創(chuàng)建此類的實例,你可以使用下面的任意一個方法來發(fā)送RPC請求:

    byte[] primitiveCall(byte[] message); 
    String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

    primitiveCall 方法會將原始byte數(shù)組轉(zhuǎn)換為請求和響應(yīng)的消息體. stringCall只是一個primitiveCall的簡單包裝,將消息體視為帶有默認(rèn)字符集編碼的String實例.

    mapCall 變種稍為有些復(fù)雜: 它會將原始java值包含在java.util.Map中,并將其編碼為AMQP 0-9-1二進(jìn)制表示形式,并以同樣的方式來解碼response. (注意:在這里,對一些值對象類型有所限制,具體可參考javadoc.)

    所有的編組/解組便利方法都使用primitiveCall來作為傳輸機(jī)制,其它方法只是在它上面的做了一個封裝.

    posted @ 2016-06-04 00:37 胡小軍 閱讀(15635) | 評論 (1)編輯 收藏
    僅列出標(biāo)題
    共5頁: 上一頁 1 2 3 4 5 下一頁 
    主站蜘蛛池模板: 亚洲午夜电影在线观看高清| 亚洲欧洲无码一区二区三区| 亚洲色最新高清av网站| 一级毛片成人免费看a| 99久久人妻精品免费一区| 青草草在线视频永久免费| 亚洲综合国产一区二区三区| 久久久久se色偷偷亚洲精品av| jizz免费观看| 99久久国产热无码精品免费| 亚洲免费在线观看| 亚洲成年人电影在线观看| 免费亚洲视频在线观看| 久久国产免费观看精品3| 亚洲AⅤ视频一区二区三区| 777亚洲精品乱码久久久久久 | 亚洲伊人久久大香线蕉| 污视频网站免费观看| 69精品免费视频| 亚洲午夜精品久久久久久浪潮| 亚洲午夜电影一区二区三区| 国产精品免费观看视频| 青青久在线视频免费观看| 亚洲无av在线中文字幕| 亚洲成a人无码亚洲成www牛牛| 无码人妻一区二区三区免费看| 免费中文字幕在线观看| 精品亚洲AV无码一区二区三区| 久久久精品视频免费观看| 狼友av永久网站免费观看| 91亚洲精品视频| 你懂的在线免费观看| 又粗又硬又大又爽免费视频播放| 91亚洲性爱在线视频| 大地资源在线资源免费观看| 免费成人午夜视频| 亚洲人成自拍网站在线观看| 精品一区二区三区免费毛片爱| 亚洲国产日韩在线视频| 野花视频在线官网免费1| 成人免费在线视频|