這是1.6版本中加入的新屬性.當(dāng)容器啟動(dòng)時(shí),如果此屬性為true (默認(rèn)為false), 容器會(huì)檢查上下文中聲明的隊(duì)列是否中間件中存在的隊(duì)列是否一致.
如果屬性不匹配(如. auto-delete
) 或參數(shù) (e.g. x-message-ttl
) 存在, 容器 (和應(yīng)用程序上下文) 會(huì)拋出致命異常而導(dǎo)致啟動(dòng)失敗.如果是在恢復(fù)期間檢測(cè)到的問(wèn)題,容器會(huì)停止.
必須在上下文中存在單個(gè)RabbitAdmin
(或使用rabbitAdmin
屬性在容器上特別配置);否則此屬性必須為false
.
如果在初始啟動(dòng)期間,中間件還不可用,容器啟動(dòng)后,當(dāng)建立連接時(shí)會(huì)檢查條件.
重要
該檢查針對(duì)的是上下文的所有隊(duì)列,而不僅僅是特定監(jiān)聽器配置使用的隊(duì)列.如果你希望只檢查容器使用的隊(duì)列,你需要為這個(gè)容器配置單獨(dú)的RabbitAdmin
, 并使用rabbitAdmin
屬性為其提供一個(gè)引用.
參考“Conditional Declaration”章節(jié)來(lái)了解更多信息.
屬性
autoDeclare(auto-declare)
描述
從1.4版本開始, SimpleMessageListenerContainer
引入了這個(gè)新屬性.
當(dāng)設(shè)置為true
時(shí)(默認(rèn)值),容器會(huì)使用RabbitAdmin
來(lái)重新聲明所有 AMQP 對(duì)象(Queues, Exchanges, Bindings).
如果在啟動(dòng)期間探測(cè)到至少有一個(gè)隊(duì)列缺失了,可能因?yàn)樗亲詣?dòng)刪除隊(duì)列或過(guò)期隊(duì)列,但不管隊(duì)列缺失是基于什么原因,重新聲明仍會(huì)進(jìn)行處理(譯者注:太浪費(fèi)了).
要禁用這種行為, 可設(shè)置其屬性為false
. 但需要注意的是,如果所有隊(duì)列都缺失了(譯者注:全部還是部分),容器會(huì)啟動(dòng)失敗.
在1.6版本之前,如果在上下文中存在多個(gè)admin,容器會(huì)隨機(jī)選擇一個(gè).反之,如果沒(méi)有admin,它會(huì)從內(nèi)部創(chuàng)建一個(gè).
無(wú)論是哪種情況,這都將導(dǎo)致非預(yù)期結(jié)果出現(xiàn). 從1.6版本開始,為了能使autoDeclare
工作,必須要上下文中明確存在一個(gè)RabbitAdmin
,或者特定實(shí)例的引用必須要在容器中使用rabbitAdmin屬性中配置
.屬性
declarationRetries(declaration-retries)
描述
從1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer
有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.
用于設(shè)置被動(dòng)聲明失敗時(shí),重新嘗試的次數(shù).被動(dòng)聲明發(fā)生在當(dāng)消費(fèi)者啟動(dòng)了或從多個(gè)隊(duì)列中消費(fèi)時(shí),初始化期間部分隊(duì)列還不可用的情況下.
當(dāng)重試次數(shù)用完后,如果還是不能被動(dòng)聲明配置隊(duì)列,那么上面的missingQueuesFatal屬性將控制容器行為. 默認(rèn): 3次重試 (4 次嘗試).
屬性
failedDeclarationRetryInterval(failed-declaration-retry-interval)
描述
從1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer
有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.
重新嘗試被動(dòng)聲明的時(shí)間間隔. 被動(dòng)聲明發(fā)生在當(dāng)消費(fèi)者啟動(dòng)了或從多個(gè)隊(duì)列中消費(fèi)時(shí),初始化期間部分隊(duì)列還不可用的情況下. 默認(rèn): 5000 (5秒).
屬性
retryDeclarationInterval(missing-queue-retry-interval)
描述
從1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer
有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.
如果配置隊(duì)列的一個(gè)子集在消費(fèi)者初始化過(guò)程中可用,則消費(fèi)者將從這些隊(duì)列中開始消費(fèi)。消費(fèi)者將被動(dòng)地使用此間隔聲明丟失的隊(duì)列。
當(dāng)這個(gè)間隔過(guò)去后,會(huì)再次使用declarationRetries 和 failedDeclarationRetryInterval.
如果還有缺失隊(duì)列,消費(fèi)者在重新嘗試之前會(huì)等待此時(shí)間間隔.
這個(gè)過(guò)程會(huì)不停地進(jìn)行到所有隊(duì)列可用. 默認(rèn): 60000 (1分鐘).
屬性
consumerTagStrategy(consumer-tag-strategy)
描述
從1.4.5版本開始,SimpleMessageListenerContainer
有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.
之間,只能使用中間件生成的consumer tags;盡管現(xiàn)在這仍是默認(rèn)的配置,但現(xiàn)在你可以提供一個(gè)ConsumerTagStrategy的實(shí)現(xiàn), 這樣就可為每個(gè)消費(fèi)者創(chuàng)建獨(dú)特的tag.
屬性
idleEventInterval(idle-event-integer)
描述
從1.6版本開始,SimpleMessageListenerContainer
有了這個(gè)新屬性.
參考"Detecting Idle Asynchronous Consumers"章節(jié).
3.1.16 監(jiān)聽器并發(fā)
默認(rèn)情況下,監(jiān)聽器容器會(huì)啟動(dòng)單個(gè)消費(fèi)者來(lái)接收隊(duì)列中的消息.
當(dāng)檢查前面章節(jié)中的表格時(shí),你會(huì)發(fā)現(xiàn)有許多屬性可控制并發(fā).最簡(jiǎn)單的是concurrentConsumers
, 它會(huì)創(chuàng)建固定數(shù)量的消費(fèi)者來(lái)并發(fā)處理消息.
在1.3.0版本之前,這只能在容器停止時(shí)才可設(shè)置.
從1.3.0版本開始,你可以動(dòng)態(tài)調(diào)整 concurrentConsumers
屬性.如果容器運(yùn)行時(shí)修改了,會(huì)根據(jù)新設(shè)置來(lái)調(diào)需要的消費(fèi)者(添加或刪除).
此外,在容器中添加了一個(gè)新屬性 maxConcurrentConsumers
來(lái)基于工作負(fù)載來(lái)動(dòng)態(tài)調(diào)整并發(fā)數(shù).
它可與其它四個(gè)屬性一起工作: consecutiveActiveTrigger
, startConsumerMinInterval
, consecutiveIdleTrigger
, stopConsumerMinInterval
.
在默認(rèn)設(shè)置的情況下,加大消費(fèi)者的算法如下:
如果還沒(méi)有達(dá)到maxConcurrentConsumers
,如果現(xiàn)有消費(fèi)者活動(dòng)了10個(gè)連續(xù)周期且離最后消費(fèi)者啟動(dòng)至少消逝了10秒鐘,那么將啟動(dòng)新的消費(fèi)者. 如果消費(fèi)者在txSize
* receiveTimeout
毫秒內(nèi)至少收到一個(gè)消息,那么就認(rèn)為此消費(fèi)者是活動(dòng)的.
在默認(rèn)設(shè)置的情況下,減少消費(fèi)者的算法如下:
如果有超過(guò)concurrentConsumers
數(shù)量的消費(fèi)者在運(yùn)行,且檢測(cè)到消費(fèi)者連續(xù)超時(shí)(空閑)了10個(gè)周期,且最后一個(gè)消費(fèi)者至少停止了60秒,那么消費(fèi)者將停止.
超時(shí)依賴于receiveTimeout
和 txSize
屬性.當(dāng)在txSize
* receiveTimeout
毫秒內(nèi)未收到消息,則認(rèn)為消費(fèi)者是空閑的.
因此,當(dāng)有默認(rèn)超時(shí)(1秒)和 txSize為
4,那么在空閑40秒后,會(huì)認(rèn)為消費(fèi)者是空閑的并會(huì)停止(4超時(shí)對(duì)應(yīng)1個(gè)空閑檢測(cè)).
實(shí)際上,如果整個(gè)容器空閑一段時(shí)間,消費(fèi)者將只會(huì)被停止。這是因?yàn)閎roker將分享其在所有活躍的消費(fèi)者的工作。
3.1.17 專用消費(fèi)者
也是從1.3版本開始,監(jiān)聽器容器可配置單個(gè)專用消費(fèi)者; 這可以阻其它容器來(lái)消費(fèi)隊(duì)列直到當(dāng)前消費(fèi)者退出.
這樣的容器的并發(fā)性必須是1。
當(dāng)使用專用消費(fèi)者時(shí),其它容器會(huì)根據(jù)recoveryInterval
屬性來(lái)消費(fèi)隊(duì)列, 如果嘗試失敗,會(huì)記錄一個(gè) WARNing 信息.
3.1.18 監(jiān)聽器容器隊(duì)列
1.3版本在監(jiān)聽器容器中引入許多處理多個(gè)隊(duì)列的改善措施.
容器配置必須監(jiān)聽至少一個(gè)隊(duì)列以上; 以前也是這樣的情況,但現(xiàn)在可以在運(yùn)行時(shí)添加和刪除隊(duì)列了。當(dāng)任何預(yù)先獲取的消息被處理后,容器將回收(取消和重新創(chuàng)建)。
參考方法addQueues
, addQueueNames
, removeQueues
and removeQueueNames
.當(dāng)刪除隊(duì)列時(shí),至少要保留一個(gè)隊(duì)列.
現(xiàn)在,只要有可用隊(duì)列消費(fèi)者就會(huì)啟動(dòng) -先前如果沒(méi)有可用隊(duì)列,容器會(huì)停止.現(xiàn)在,唯一的問(wèn)題是是否有可用隊(duì)列.如果只是部分隊(duì)列可用,容器會(huì)每60秒嘗試被動(dòng)聲明(和消費(fèi))缺失隊(duì)列.
此外,如果消費(fèi)才從broker中收到了通道(例如,隊(duì)列被刪除)消費(fèi)者會(huì)嘗試重新恢復(fù),重新恢復(fù)的消費(fèi)會(huì)繼續(xù)處理來(lái)自其它配置隊(duì)列中的消息. 之前是隊(duì)列上的取消會(huì)取消整個(gè)消費(fèi)者,最終容器會(huì)因缺失隊(duì)列而停止.
如果你想永久刪除隊(duì)列,你應(yīng)該在刪除隊(duì)列的之前或之后更新容器,以避免消費(fèi).
3.1.19 恢復(fù):從錯(cuò)誤和代理失敗中恢復(fù)
介紹
Spring提供了一些關(guān)鍵的 (最流行的)高級(jí)特性來(lái)處理協(xié)議錯(cuò)誤或中間件失敗時(shí)的恢復(fù)與自動(dòng)重連接.
主要的重連接特性可通過(guò)CachingConnectionFactory
自身來(lái)開啟. 它也常有利于使用rabbitadmin自動(dòng)聲明的特點(diǎn).
除此之外, 如果你關(guān)心保證投遞,你也許需要在RabbitTemplate中使用channelTransacted
標(biāo)記以及在SimpleMessageListenerContainer中使用AcknowledgeMode.AUTO
(或者自己來(lái)手動(dòng)應(yīng)答) .
交換器、隊(duì)列和綁定的自動(dòng)聲明
RabbitAdmin
組件在啟動(dòng)時(shí)可聲明交換器,隊(duì)列,綁定.它是通過(guò)ConnectionListener懶執(zhí)行的
,因此如果啟動(dòng)時(shí)broker不存在,也沒(méi)有關(guān)系.
Connection
第一次使用時(shí)(如.發(fā)送消息) ,監(jiān)聽器會(huì)被觸發(fā),admin功能也會(huì)應(yīng)用.這種在監(jiān)聽器中自動(dòng)聲明的好處是,如果連接出于任何原因斷開了,(如. broker死了,網(wǎng)絡(luò)中斷問(wèn)題.),它們會(huì)在下次有需要的時(shí)候重新應(yīng)用.
這種方式的隊(duì)列聲明必須要有固定的名稱;要么是明確聲明,要么是由框架生成AnonymousQueue
.匿名隊(duì)列是非持久化的,專用的,且自動(dòng)刪除的.
重要
自動(dòng)聲明只在cachingConnectionFactory
緩存模式是CHANNEL
(默認(rèn))才可用. 這種限制的存在是因?yàn)閷S煤妥詣?dòng)刪除隊(duì)列是綁定到connection上的.
如果你在同步序列中使用RabbitTemplate時(shí)丟失了broker的連接,那么Spring AMQP會(huì)拋出一個(gè)AmqpException
(通常但并不總是AmqpIOException
).
我們不想隱藏存在問(wèn)題的事實(shí),因此你可以捕獲并對(duì)異常進(jìn)行處理.如果你懷疑連接丟失了,而且這不是你的錯(cuò),那么最簡(jiǎn)單的事情就是執(zhí)行再次嘗試操作. 重試操作可以手動(dòng)進(jìn)行,也可以使用Spring Retry來(lái)處理重試(強(qiáng)制或聲明).
Spring Retry 提供了兩個(gè)AOP攔截器并提供非常靈活的方式來(lái)指定retry的參數(shù)(嘗試的次數(shù),異常類型, 補(bǔ)償算法等等.). Spring AMQP同時(shí)也提供了一些方便的工廠bean來(lái)創(chuàng)建Spring Retry攔截器, 你可以使用強(qiáng)類型回調(diào)接口來(lái)實(shí)現(xiàn)恢復(fù)邏輯.參考Javadocs和 StatefulRetryOperationsInterceptor
和StatelessRetryOperationsInterceptor
的屬性來(lái)了解更多詳情.
如果沒(méi)有事務(wù),或者如果一個(gè)事務(wù)是在重試回調(diào)中啟動(dòng)的話,則無(wú)狀態(tài)重試是適當(dāng)?shù)摹W⒁猓鄬?duì)于有狀態(tài)重試,無(wú)狀態(tài)重試只是簡(jiǎn)單配置和分析,如果存在一個(gè)正在進(jìn)行的事務(wù)必須回滾或肯定會(huì)回滾的話, 這種無(wú)狀態(tài)重試則是不合適的.
在事務(wù)中間掉下來(lái)的連接與回退有同樣的效果, 所以對(duì)于事務(wù)開始于堆棧上的重連接來(lái)說(shuō),有狀態(tài)重試通常是最佳選擇(so for reconnection where the transaction is started higher up the stack, stateful retry is usually the best choice).
從1.3版本開始,提供了builder API來(lái)幫助在Java中使用這些攔截器(或者在 @Configuration
類中),例如:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只有部分retry特性能通過(guò)這種方式,更加高級(jí)的特性需要在RetryTemplate
中配置.
參考Spring Retry Javadocs 來(lái)了解可用策略,配置的完整信息.
消息監(jiān)聽器和異步情況
如果 MessageListener
因業(yè)務(wù)異常而失敗,異常可由消息監(jiān)聽器容器來(lái)處理,然后它會(huì)繼續(xù)回去監(jiān)聽其它信息.如果失敗是由于掉下的連接引起的(非業(yè)務(wù)異常),那么監(jiān)聽此消費(fèi)者的監(jiān)聽器將退出和重啟.
SimpleMessageListenerContainer
可以無(wú)逢地進(jìn)行處理,并且它會(huì)在日志中記錄監(jiān)聽器即將重啟.
事實(shí)上,它會(huì)循環(huán)不斷地嘗試重新啟動(dòng)消費(fèi)者,只有當(dāng)消費(fèi)者有非常糟糕的行為時(shí),才會(huì)放棄。一個(gè)副作用是,如果broker在容器啟動(dòng)時(shí)關(guān)閉,它將會(huì)繼續(xù)嘗試直到建立一個(gè)連接。
業(yè)務(wù)異常處理, 相對(duì)于協(xié)議錯(cuò)誤和連接丟失,它可能需要更多考慮和一些自定義配置,特別是處于事務(wù)或 容器應(yīng)答時(shí).
在2.8.x版本之前, RabbitMQ對(duì)于死信行為沒(méi)有定義,因此默認(rèn)情況下,一個(gè)因拒絕或因業(yè)務(wù)異常導(dǎo)致回退的消息可循環(huán)往復(fù)地重新分發(fā).
要限制客戶端的重新分發(fā)的次數(shù),一個(gè)選擇是在監(jiān)聽器的通知鏈中添加一個(gè)StatefulRetryOperationsInterceptor
. 攔截器有一個(gè)實(shí)現(xiàn)了自定義死信動(dòng)作的恢復(fù)回調(diào):
什么是適合你的特定的環(huán)境。
另一個(gè)選擇是設(shè)置容器的rejectRequeued屬性為false. 這會(huì)導(dǎo)致丟棄所有失敗的消息.當(dāng)使用RabbitMQ 2.8.x+時(shí),這也有利于傳遞消息到一個(gè)死的信件交換。
或者,你可以拋出一個(gè)AmqpRejectAndDontRequeueException
;這會(huì)阻止消息重新入列,不管defaultRequeueRejected
屬性設(shè)置的是什么.
通常情況下,可以組合使用這兩種技術(shù) 在通知鏈中使用StatefulRetryOperationsInterceptor
, 在此處是MessageRecover
拋出AmqpRejectAndDontRequeueException
. MessageRecover
會(huì)一直調(diào)用,直到耗盡了所有重試.
默認(rèn)MessageRecoverer
只是簡(jiǎn)單的消費(fèi)錯(cuò)誤消息,并發(fā)出WARN消息.在這種情況下,消息是通過(guò)應(yīng)答的,且不會(huì)發(fā)送到死信交換器中.
從1.3版本開始,提供了一個(gè)新的RepublishMessageRecoverer
,它允許在重試次數(shù)耗盡后,發(fā)布失敗消息:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
.build();
}
RepublishMessageRecoverer
會(huì)使用消息頭的額外信息來(lái)發(fā)布,這些信息包括異常信息,棧軌跡,原始交換器和路由鍵.額外的頭可通過(guò)創(chuàng)建其子類和覆蓋additionalHeaders()
方法來(lái)添加.
Spring Retry 可以非常靈活地決定哪些異常可調(diào)用重試. 默認(rèn)配置是對(duì)所有異常都進(jìn)行重試.用戶異常可以包裝在ListenerExecutionFailedException
中,我們需要確保分類檢查異常原因. 默認(rèn)的分類只是看頂部級(jí)別的異常。
從 Spring Retry 1.0.3開始, BinaryExceptionClassifier
有一個(gè)屬性traverseCauses
(默認(rèn)為false
). 當(dāng)當(dāng)為true時(shí),它將遍歷異常的原因,直到它找到一個(gè)匹配或沒(méi)有原因。
要使用分類重試,需要使用一個(gè)SimpleRetryPolicy
,其構(gòu)造函數(shù)將接受最大嘗試次數(shù),Exception的Map,以及一個(gè)boolean值(traverseCauses),且還需要將此策略注入給RetryTemplate
.
3.1.20 調(diào)試
Spring AMQP 提供廣泛的日志記錄,尤其是在DEBUG級(jí)別.
如果你想在應(yīng)用程序和broker之間監(jiān)控AMQP協(xié)議,你可以使用像WireShark的工具, 它有一個(gè)插件可用于解碼協(xié)議.
另一個(gè)選擇是, RabbitMQ java client自身攜帶了一個(gè)非常有用的工具類:Tracer
.當(dāng)以main方式運(yùn)行時(shí),默認(rèn)情況下,它監(jiān)聽于5673 ,并連接本地的5672端口.
只需要簡(jiǎn)單的運(yùn)行它,并修改你的連接工廠配置,將其連接到本地的5673端口. 它就會(huì)在控制臺(tái)中顯示解碼的協(xié)議信息.參考Tracer
javadocs 來(lái)了解詳細(xì)信息.
3.2 Logging Subsystem AMQP Appenders
框架為多個(gè)流行的日志系統(tǒng)提供了日志appenders:
- log4j (從Spring AMQP1.1版本開始)
- logback (從Spring AMQP1.4版本開始)
- log4j2 (從Spring AMQP1.6版本開始)
appenders使用正常機(jī)制為為子系統(tǒng)配置,可用屬性參照下面的規(guī)定。
3.2.1 共同屬性
下面的屬性對(duì)于所有appenders都可用:
Table 3.4. 共同Appender屬性
Property | Default | Description |
---|
exchangeName | logs | 用于發(fā)布日志事件的交換器名稱. |
exchangeType | topic | 發(fā)布日志事件的交換器類型- 只在appender聲明了交換器的情況下才需要. 參考declareExchange . |
routingKeyPattern | %c.%p | 日志子系統(tǒng)生成路由鍵的模式格式. |
applicationId |
| Application ID - 如果模式包含 %X{applicationId},則將其添加到路由鍵 . |
senderPoolSize | 2 | 用于發(fā)布日志事件的線程數(shù)目. |
maxSenderRetries | 30 | 當(dāng)broker不可用時(shí)或有某些錯(cuò)誤時(shí),重試的次數(shù). 延時(shí)重試像: N ^ log(N) , N 表示重試次數(shù). |
addresses |
| 一個(gè)逗號(hào)分隔的broker地址列表: host:port[,host:port]* -覆蓋host 和 port . |
host | localhost | 要連接RabbitMQ的主機(jī). |
port | 5672 | |
virtualHost | / | 要連接的RabbitMQ虛擬主機(jī). |
username | guest | 要連接RabbitMQ的用戶. |
password | guest | 要連接RabbitMQ的用戶密碼. |
contentType | text/plain | 日志消息的content-type屬性 .
|
contentEncoding |
| 日志消息的content-encoding屬性. |
declareExchange | false | 當(dāng)appender啟動(dòng)時(shí),是否需要聲明配置的交換器.也可參考 durable 和autoDelete . |
durable | true | 當(dāng)declareExchange 為 true ,durable 標(biāo)志才會(huì)設(shè)置此值. |
autoDelete | false | 當(dāng) declareExchange 為true , auto delete 標(biāo)志才會(huì)設(shè)置此值. |
charset | null | 當(dāng)將字符串轉(zhuǎn)成byte[]時(shí)要使用的編碼,默認(rèn)為null (使用系統(tǒng)默認(rèn)字符集).如果當(dāng)前平臺(tái)上不支持此字符集,將回退到使用系統(tǒng)字符集. |
deliveryMode | PERSISTENT | PERSISTENT 或 NON_PERSISTENT 用于決定RabbitMQ是否應(yīng)該持久化消息. |
generateId | false | 用于確定messageId 屬性是否需要設(shè)置成唯一值. |
clientConnectionProperties | null | 一個(gè)逗號(hào)分隔的key:value 對(duì),它是連接RabbitMQ時(shí)設(shè)置的自定義客戶端屬性 |
3.2.2 Log4j Appender
樣例log4j.properties片斷.
log4j.appender.amqp.addresses=foo:5672,bar:5672
log4j.appender.amqp=org.springframework.amqp.rabbit.log4j.AmqpAppender
log4j.appender.amqp.applicationId=myApplication
log4j.appender.amqp.routingKeyPattern=%X{applicationId}.%c.%p
log4j.appender.amqp.layout=org.apache.log4j.PatternLayout
log4j.appender.amqp.layout.ConversionPattern=%d %p %t [%c] - <%m>%n
log4j.appender.amqp.generateId=true
log4j.appender.amqp.charset=UTF-8
log4j.appender.amqp.durable=false
log4j.appender.amqp.deliveryMode=NON_PERSISTENT
log4j.appender.amqp.declareExchange=true
3.2.3 Log4j2 Appender
樣例 log4j2.xml 片斷.
<Appenders>
...
<RabbitMQ name="rabbitmq"
addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
charset="UTF-8"
senderPoolSize="3" maxSenderRetries="5">
</RabbitMQ>
</Appenders>
3.2.4 Logback Appender
樣例 logback.xml 片斷.
<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
</layout>
<addresses>foo:5672,bar:5672</addresses>
<abbreviation>36</abbreviation>
<applicationId>myApplication</applicationId>
<routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>false</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<declareExchange>true</declareExchange>
</appender>
3.2.5 定制Messages
每個(gè)appenders都可以子類化,以允許你在發(fā)布前修改消息.
Customizing the Log Messages.
public class MyEnhancedAppender extends AmqpAppender {
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
}
3.2.6 定制客戶端屬性
簡(jiǎn)化 String 屬性
每個(gè)appender都支持在RabbitMQ連接中添加客戶端屬性.
log4j.
log4j.appender.amqp.clientConnectionProperties=foo:bar,baz:qux
logback.
<appender name="AMQP"...>...
<clientConnectionProperties>foo:bar,baz:qux</clientConnectionProperties>
...</appender>
log4j2.
<Appenders>
...
<RabbitMQname="rabbitmq"...clientConnectionProperties="foo:bar,baz:qux"...</RabbitMQ></Appenders>
這些屬性是逗號(hào)分隔的key:value
隊(duì)列表; 鍵和值不能包含逗號(hào)或 冒號(hào).
當(dāng)RabbitMQ Admin UI中查看連接上,你會(huì)看到這些屬性.
Log4j和logback先進(jìn)技術(shù)
使用 log4j 和 logback appenders, appenders 可以是子類化的, 允許你在連接建立前,修改客戶連接屬性:
定制客戶端連接屬性.
public class MyEnhancedAppender extends AmqpAppender {
private String foo;
@Override
protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
clientProperties.put("foo", this.foo);
}
public void setFoo(String foo) {
this.foo = foo;
}
}
對(duì)于 log4j2, 添加 log4j.appender.amqp.foo=bar
到log4j.properties 來(lái)設(shè)置發(fā)展.
對(duì)于logback, 在logback.xml中添加 <foo>bar</foo>
.
當(dāng)然,對(duì)于像這個(gè)例子中簡(jiǎn)單的String 屬性,可以使用先前的技術(shù);
子類允許更豐富的屬性(如添加 Map
的numeric 屬性).
使用log4j2, 子類是不被支持的,因?yàn)?log4j2 使用靜態(tài)工廠方法.
3.3 樣例應(yīng)用程序
3.3.1 介紹
Spring AMQP Samples 項(xiàng)目包含了兩個(gè)樣例應(yīng)用程序. 第一個(gè)簡(jiǎn)單的"Hello World" 示例演示了同步和異步消息的處理. 它為理解基礎(chǔ)部分提供了一個(gè)很好的開端.
第二個(gè)基于股票交易的例子演示了真實(shí)應(yīng)用程序中的交互場(chǎng)景.在本章中,我們會(huì)每個(gè)示例進(jìn)行快速瀏覽,使您可以專注于最重要的組成部分.
這兩個(gè)例子都是基于Maven的,因此你可以直接將它們導(dǎo)入任何支持Maven的IDE中(如. SpringSource Tool Suite).
3.3.2 Hello World
介紹
Hello World示例演示了同步和異步消息處理.你可以導(dǎo)入spring-rabbit-helloworld 示例到IDE中并跟隨下面的討論.
同步例子
在src/main/java 目錄中,導(dǎo)航到org.springframework.amqp.helloworld 包中.
打開HelloWorldConfiguration 類,你可以注意到它包含了@Configuration 類級(jí)注解和一些@Bean 方法級(jí)注解.
這是Spring 的基于Java的配置.你可進(jìn)一步的了解here.
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =null;
connectionFactory =new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
配置中同樣也包含了RabbitAdmin的實(shí)例
, 它會(huì)默認(rèn)查找類型為Exchange, Queue, 或 Binding的bean并在broker中進(jìn)行聲明.
事實(shí)上,"helloWorldQueue" bean是在HelloWorldConfiguration 中生成的,因?yàn)樗?Queue的實(shí)例.
@Bean
public Queue helloWorldQueue() {
returnnew Queue(this.helloWorldQueueName);
}
重看"rabbitTemplate"bean配置,你會(huì)看到它將helloWorldQueue的名稱設(shè)成了"queue"屬性(用于接收消息) 以及"routingKey" 屬性(用于發(fā)送消息).
現(xiàn)在,我們已經(jīng)探索了配置,讓我們看看實(shí)際上使用這些組件的代碼。
首先,從同一個(gè)包內(nèi)打開Producer類。它包含一個(gè)用于創(chuàng)建Spring ApplicationContext的main()方法.
publicstaticvoid main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在上面的例子中你可以看到, 取回的AmqpTemplate用來(lái)發(fā)送消息.因?yàn)榭蛻舳舜a應(yīng)該盡可能地依賴于接口,因此類型是AmqpTemplate而不是RabbitTemplate.
即使在HelloWorldConfiguration中創(chuàng)建的bean是RabbitTemplate的實(shí)例,依賴于接口則意味著這端代碼更具有便攜性(portable) (配置可以獨(dú)立于代碼進(jìn)行修改).
因?yàn)閏onvertAndSend() 方法是通過(guò)模板來(lái)調(diào)用的,因此模板會(huì)將調(diào)用委派給它的MessageConverter實(shí)例.在這種情況下,它默認(rèn)使用的是SimpleMessageConverter,但也可以在HelloWorldConfiguration中為"rabbitTemplate"指定其它的實(shí)現(xiàn).
現(xiàn)在打開Consumer類. 它實(shí)際上共享了同一個(gè)配置基類,這意味著它將共享"rabbitTemplate" bean. 這就是為什么我們要使用"routingKey" (發(fā)送) 和"queue" (接收)來(lái)配置模板的原因.
正如你在Section 3.1.4, “AmqpTemplate”中看到的,你可以代替在發(fā)送方法中傳遞routingKey參數(shù),代替在接收方法中傳遞queue 參數(shù). Consumer 代碼基本上是Producer的鏡子,只不過(guò)調(diào)用的是receiveAndConvert()而非convertAndSend()方法.
publicstaticvoid main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
你如果運(yùn)行Producer,然后再運(yùn)行Consumer, 在控制臺(tái)輸出中,你應(yīng)該能看到消息"Received: Hello World"
異步示例
我們已經(jīng)講解了同步Hello World樣例, 是時(shí)候移動(dòng)到一個(gè)稍微先進(jìn),但更強(qiáng)大的選擇上了.稍微修改一下代碼,Hello World 樣例就可以可以提供異步接收的示例了,又名 Message-driven POJOs. 事實(shí)上,有一個(gè)子包明確地提供了這種功能: org.springframework.amqp.samples.helloworld.async.
再一次地我們將從發(fā)送端開始. 打開ProducerConfiguration類可注意到它創(chuàng)建了一個(gè)"connectionFactory"和"rabbitTemplate" bean.
這次,由于配置是專用于消息發(fā)送端,因此我們不需要任何隊(duì)列定義,RabbitTemplate只須設(shè)置routingKey屬性.
回想一下,消息是發(fā)送到交換器上的而不是直接發(fā)到隊(duì)列上的. AMQP默認(rèn)交換器是無(wú)名稱的direct類型交換器.
所有隊(duì)列都是通過(guò)使用它們的名稱作為路由鍵綁定到默認(rèn)交換器上的.這就是為什么在這里我們只提供路由鍵的原因.
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于這個(gè)示例展示的是異步消息處理,生產(chǎn)方設(shè)計(jì)為連續(xù)發(fā)送消息(盡管類似于同步版本中的 message-per-execution模型,但不太明顯,實(shí)際上它是消息驅(qū)動(dòng)消費(fèi)者)負(fù)責(zé)連續(xù)發(fā)送消息的組件是作為ProducerConfiguration類中的內(nèi)部類來(lái)定義的,每3秒執(zhí)行一次.
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
你不必要完全了解這些細(xì)節(jié),因?yàn)檎嬲年P(guān)注點(diǎn)是接收方(我們馬上就會(huì)講解).然而,如果你還熟悉Spring 3.0 任務(wù)調(diào)度支持,你可從here這里來(lái)了解.
簡(jiǎn)短故事是:在 ProducerConfiguration 中的"postProcessor" bean使用調(diào)度器來(lái)注冊(cè)了任務(wù).
現(xiàn)在,讓我們轉(zhuǎn)向接收方. 為強(qiáng)調(diào) Message-driven POJO 行為,將從對(duì)消息起反應(yīng)的組件開始.
此類被稱為HelloWorldHandler.
publicclass HelloWorldHandler {
publicvoid handleMessage(String text) {
System.out.println("Received: " + text);
}
}
相當(dāng)明顯的, 這是一個(gè)POJO. 它沒(méi)有繼承任何基類,它沒(méi)有實(shí)現(xiàn)任何接口,它甚至不包含任何導(dǎo)入. 它將通過(guò)Spring AMQP MessageListenerAdapter來(lái)適配MessageListener接口.然后適配器可配置在SimpleMessageListenerContainer上.
在這個(gè)例子中,容器是在ConsumerConfiguration類中創(chuàng)建的.你可以看到POJO是包裝在適配器中的.
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer是一個(gè)Spring生命周期組件,默認(rèn)會(huì)自動(dòng)啟動(dòng).如果你看了Consumer類的內(nèi)部,你會(huì)看到main()方法中除了一行啟動(dòng)創(chuàng)建ApplicationContext的代碼外,其它什么都沒(méi)有.
Producer的main()方法也只有一行啟動(dòng),因?yàn)橐?@Scheduled注解的組件會(huì)自動(dòng)開始執(zhí)行.你可以任何順序來(lái)啟動(dòng)Producer 和Consumer,你會(huì)看每秒就會(huì)發(fā)送消息和接收到消息.
3.3.3 股票交易(Stock Trading)
Stock Trading 示例演示了比Hello World示例更高級(jí)的消息場(chǎng)景.然而,配置卻是很相似的 - 只是有一點(diǎn)復(fù)雜.
由于我們已經(jīng)詳細(xì)講解了Hello World配置,因此在這里我們將重點(diǎn)關(guān)注不一樣的東西. 有一個(gè)服務(wù)器發(fā)送市場(chǎng)數(shù)據(jù)(股票報(bào)價(jià))到Topic交換器中.
然后,客戶端可訂閱市場(chǎng)數(shù)據(jù),即通過(guò)使用路由模式(如. "app.stock.quotes.nasdaq.*")來(lái)綁定隊(duì)列(e.g. "app.stock.quotes.nasdaq.*").
這個(gè)例子的其它主要功能是 有一個(gè)請(qǐng)求回復(fù)“股票交易”的互動(dòng),它是由客戶發(fā)起并由服務(wù)器來(lái)處理的. 這涉及到一個(gè)私有的“回復(fù)(replyTo)”隊(duì)列,發(fā)送客戶端的信息在請(qǐng)求消息中。
服務(wù)器的核心配置在RabbitServerConfiguration類中(位于 org.springframework.amqp.rabbit.stocks.config.server 包中).
它繼承了 AbstractStockAppRabbitConfiguration. 這是服務(wù)器和客戶端定義常用資源的地方,包括市場(chǎng)數(shù)據(jù)Topic交換器(其名稱為app.stock.marketdata) 以及服務(wù)器公開股票交易的隊(duì)列(其名稱為app.stock.request).
在那個(gè)公共配置文件中,你會(huì)看到在RabbitTemplate上配置了一個(gè)JsonMessageConverter.
服務(wù)器特有配置由2部分組成.首先,它在RabbitTemplate上配置了市場(chǎng)數(shù)據(jù)交換器,這樣在發(fā)送消息時(shí),就不必提供交換器名稱.它是通過(guò)基礎(chǔ)配置類中的抽象回調(diào)方法中定義做到這一點(diǎn)的.
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次, 聲明了股票請(qǐng)求隊(duì)列.在這里,它不需要任何明確的綁定,因?yàn)樗鼘⒁运约旱拿Q作為路由鍵來(lái)綁定到無(wú)名稱的默認(rèn)交換器上.正如先前提到的,AMQP規(guī)范定義了此種行為.
@Beanpublic Queue stockRequestQueue() {
returnnew Queue(STOCK_REQUEST_QUEUE_NAME);
}
現(xiàn)在你已經(jīng)看過(guò)了服務(wù)器的AMQP資源配置,導(dǎo)航到src/test/java目錄下的org.springframework.amqp.rabbit.stocks包.在那里你會(huì)實(shí)際的 提供了main()方法的Server類.
它基于server-bootstrap.xml 創(chuàng)建了一個(gè)ApplicationContext.在那里,你會(huì)看到發(fā)布虛假市場(chǎng)數(shù)據(jù)的調(diào)度任務(wù).
那個(gè)配置依賴于Spring 3.0的"task"命名空間支持.bootstrap配置文件也導(dǎo)入了其它一些文件.最令人關(guān)注的是位于src/main/resources目錄下的server-messaging.xml.在那里,你會(huì)看到"messageListenerContainer" bean,它負(fù)責(zé)處理股票交易請(qǐng)求.
最后在看一下定義在src/main/resources目錄下的server-handlers.xml,其中定義了一個(gè) "serverHandler" bean.這個(gè)bean是ServerHandler類的實(shí)例,它是Message-driven POJO 的好例子,它也有發(fā)送回復(fù)消息的能力.
注意,它自身并沒(méi)有與框架或任何AMQP概念耦合.它只是簡(jiǎn)單地接受TradeRequest并返回一個(gè)TradeResponse.
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
現(xiàn)在我們已經(jīng)看了服務(wù)端的重要配置和代碼,讓我們轉(zhuǎn)向客戶端.最佳起點(diǎn)是從 org.springframework.amqp.rabbit.stocks.config.client 包下的RabbitClientConfiguration開始.
注意,它聲明了兩個(gè)不帶明確參數(shù)的隊(duì)列.
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
那些是私有隊(duì)列, 唯一名稱會(huì)自動(dòng)自成.客戶端會(huì)用第一個(gè)生成的隊(duì)列來(lái)綁定由服務(wù)端公開的市場(chǎng)交換器.
記住在AMQP中,消費(fèi)者與隊(duì)列交互,而生產(chǎn)者與交換器交互. 隊(duì)列和交換器之間的綁定指示broker從給定的交換器中投遞或路由什么消息給隊(duì)列.
由于市場(chǎng)交換器是一個(gè)Topic交換器,綁定可通過(guò)路由正則表達(dá)式來(lái)表達(dá).
RabbitClientConfiguration聲明了一個(gè)Binding對(duì)象,其對(duì)象是通過(guò)BindingBuilder的便利API來(lái)生成的.
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
注意,實(shí)際值已經(jīng)在屬性文件(src/main/resources目錄下的"client.properties")中外部化了,因此我們使用Spring的@Value 注解來(lái)注入值.這通常是一個(gè)好主意,否則值就會(huì)硬編碼在類中,沒(méi)有修改就沒(méi)有重新編譯.
在這種情況下,通過(guò)修改綁定中的路由正則表達(dá)式,可很容易地運(yùn)行多個(gè)版本的Client.讓我們立即嘗試.
啟動(dòng)運(yùn)行org.springframework.amqp.rabbit.stocks.Server然后再運(yùn)行 org.springframework.amqp.rabbit.stocks.Client.你將會(huì)看到NASDAQ股票的交易報(bào)價(jià),因?yàn)殛P(guān)聯(lián)stocks.quote.pattern 鍵的值在client.properties中是app.stock.quotes.nasdaq.
現(xiàn)在,保持現(xiàn)有Server 和Client 運(yùn)行,將其屬性值修改為app.stock.quotes.nyse.再啟動(dòng)第二個(gè)Client實(shí)例.你會(huì)看到第一個(gè)client仍然接收NASDAQ 報(bào)價(jià),而第二個(gè)client接收的NYSE報(bào)價(jià). 你可以改變模式,獲取所有的股票報(bào)價(jià)或個(gè)別股票的報(bào)價(jià)。
最后一個(gè)我們將暴露的特性是從客戶端的角度來(lái)看待請(qǐng)求-回復(fù)交互.記住我們已經(jīng)看了ServerHandler,它會(huì)接受TradeRequest對(duì)象并返回TradeResponse對(duì)象. 客戶端相應(yīng)的代碼是 RabbitStockServiceGateway(位于org.springframework.amqp.rabbit.stocks.gateway 包).為發(fā)送消息,它會(huì)委派給RabbitTemplate.
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
thrownew AmqpException(e);
}
return message;
}
});
}
注意,在發(fā)送消息前,它設(shè)置了"replyTo"地址. 這提供了隊(duì)列,此隊(duì)列是由上面的"traderJoeQueue" bean 定義生成的. 以下是StockServiceGateway類的@Bean定義.
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果你沒(méi)有運(yùn)行服務(wù)器和客戶端,現(xiàn)在就啟動(dòng)它們. 嘗試使用100 TCKR的格式來(lái)發(fā)送請(qǐng)求.經(jīng)過(guò)一個(gè)簡(jiǎn)短的人工延遲來(lái)模擬“處理”請(qǐng)求,你應(yīng)該看到一個(gè)確認(rèn)消息出現(xiàn)在客戶端上。
3.4 測(cè)試支持
3.4.1 介紹
為異步程序?qū)懠蓽y(cè)試比測(cè)試簡(jiǎn)單程序更復(fù)雜. 當(dāng)引入了@RabbitListener這樣的注解時(shí),這尤其更加復(fù)雜.
現(xiàn)在的問(wèn)題是發(fā)送消息后,如何來(lái)驗(yàn)證, 監(jiān)聽器按預(yù)期收到了消息.
框架自身帶有許多單元測(cè)試和集成測(cè)試;有些使用mocks, 另外一些使用真實(shí)的RabbitMQ broker來(lái)集成測(cè)試. 您可以參照測(cè)試場(chǎng)景的一些想法進(jìn)行測(cè)試。
Spring AMQP 1.6版本引入了sring-rabbit-test
jar ,它提供一些測(cè)試復(fù)雜場(chǎng)景的測(cè)試. 預(yù)計(jì)這一項(xiàng)目將隨著時(shí)間的推移進(jìn)行擴(kuò)展,但我們需要社會(huì)反饋以幫助測(cè)試。請(qǐng)使用JIRA問(wèn)題或GitHub提供這樣的反饋。
3.4.2 Mockito Answer<?> 實(shí)現(xiàn)
當(dāng)前有兩個(gè)Answer<?>
實(shí)現(xiàn)可幫助測(cè)試:
第一個(gè), LatchCountDownAndCallRealMethodAnswer
提供了返回null和計(jì)數(shù)下一個(gè)鎖存器的Answer<Void>
.
LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());
...
assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
第二個(gè), LambdaAnswer<T>
提供了一種調(diào)用真正方法的機(jī)制,并提供機(jī)會(huì)來(lái)返回定制結(jié)果(基于InvocationOnMock和結(jié)果
).
public class Foo {
public String foo(String foo) {
return foo.toUpperCase();
}
}
Foo foo = spy(new Foo());
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
.when(foo).foo(anyString());
assertEquals("FOOFOO", foo.foo("foo"));
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
.when(foo).foo(anyString());
assertEquals("FOOfoo", foo.foo("foo"));
doAnswer(new LambdaAnswer<String>(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(foo).foo(anyString());
assertEquals("foofoo", foo.foo("foo"));
When using Java 7 or earlier:
doAnswer(new LambdaAnswer<String>(true, new ValueToReturn<String>() {
@Overridepublic String apply(InvocationOnMock i, String r) {
return r + r;
}
})).when(foo).foo(anyString());
3.4.3 @RabbitListenerTest and RabbitListenerTestHarness
在你的@Configuration
類中使用 @RabbitListenerTest
(它也會(huì)通過(guò)@EnableRabbit來(lái)啟用@RabbitListener
探測(cè)).注解會(huì)導(dǎo)致框架使用子類RabbitListenerTestHarness來(lái)代替標(biāo)準(zhǔn)RabbitListenerAnnotationBeanPostProcessor.
RabbitListenerTestHarness
通過(guò)兩種方式來(lái)增強(qiáng)監(jiān)聽器 - 將其包裝進(jìn)Mockito Spy
, 啟用了Mockito
存根和驗(yàn)證操作.也可在監(jiān)聽器添加Advice
來(lái)啟用對(duì)參數(shù),結(jié)果或異常的訪問(wèn).
你可以控制哪一個(gè)(或兩個(gè))來(lái)在@RabbitListenerTest上啟用屬性. 后者用于訪問(wèn)調(diào)用中更為低級(jí)數(shù)據(jù)- 它也支持測(cè)試線程阻塞,直到異步監(jiān)聽器被調(diào)用.
重要
final
@RabbitListener
不能被發(fā)現(xiàn)或通知 ,同時(shí),只有帶id屬性的監(jiān)聽器才能發(fā)現(xiàn)或通知.
讓我們看一些例子.
使用spy:
@Configuration
@RabbitListenerTest
public class Config {
@Bean
public Listener listener() {
returnnew Listener();
}
...
}
public class Listener {
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness;
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo" ));
Listener listener = this.harness.getSpy("foo");
assertNotNull(listener);
verify(listener).foo("foo");
}
@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);
LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer).when(listener).foo(anyString(), anyString());
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}
}

| 將harness 注入進(jìn)測(cè)試用于,這樣我們可訪問(wèn)spy. |

| 獲取spy引用,這樣我們可以驗(yàn)證是否按預(yù)期在調(diào)用. 由于這是一個(gè)發(fā)送和接收操作,因此不必暫停測(cè)試線程,因?yàn)?code style="font-family: Monaco, Consolas, Courier, 'Lucida Console', monospace; color: #6d180b; background-color: inherit;">RabbitTemplate 在等待回復(fù)時(shí)已經(jīng)暫停過(guò)了. |

| 在這種情況下,我們只使用了發(fā)送操作,因?yàn)槲覀冃枰粋€(gè)門閂來(lái)等待對(duì)容器線程中監(jiān)聽器的異步調(diào)用. 我們使用了Answer<?> 一個(gè)實(shí)現(xiàn)來(lái)幫助完成. |

| 配置spy來(lái)調(diào)用Answer . |
使用捕獲建議:
@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {
}
@Service
public class Listener {
private boolean failed;
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
thrownew RuntimeException(foo);
}
failed = false;
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness; 
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo" ));
InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS);
assertThat(invocationData.getArguments()[0], equalTo("foo"));
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}
@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage());
}
}

| 將harness注入進(jìn)測(cè)試用例,以便我們能獲取spy的訪問(wèn). |

| 使用 harness.getNextInvocationDataFor() 來(lái)獲取調(diào)用數(shù)據(jù) - 在這種情況下,由于它處于request/reply 場(chǎng)景,因?yàn)闆](méi)有必要等待,因?yàn)闇y(cè)試線程在RabbitTemplate 中等待結(jié)果的時(shí)候,已經(jīng)暫停過(guò)了. |

| 我們可以驗(yàn)證參數(shù)和結(jié)果是否與預(yù)期一致 |

| 這次,我們需要時(shí)間來(lái)等待數(shù)據(jù),因?yàn)樗谌萜骶€程上是異步操作,我們需要暫停測(cè)試線程. |

| 當(dāng)監(jiān)聽器拋出異常時(shí),可用調(diào)用數(shù)據(jù)中的throwable 屬性 |
4. Spring 整合- 參考
這部分參考文檔提供了在Spring集成項(xiàng)目中提供AMQP支持的快速介紹.
4.1 Spring 整合AMQP支持4.1.1 介紹
Spring Integration 項(xiàng)目包含了構(gòu)建于Spring AMQP項(xiàng)目之上的AMQP 通道適配器(Channel Adapters)和網(wǎng)關(guān)(Gateways). 那些適配器是在Spring集成項(xiàng)目中開發(fā)和發(fā)布的.在Spring 集成中, "通道適配器" 是單向的,而網(wǎng)關(guān)是雙向的(請(qǐng)求-響應(yīng)).
我們提供了入站通道適配器(inbound-channel-adapter),出站通道適配器( outbound-channel-adapter), 入站網(wǎng)關(guān)(inbound-gateway),以及出站網(wǎng)關(guān)(outbound-gateway).
由于AMQP 適配器只是Spring集成版本的一部分,因?yàn)槲臋n也只針對(duì)Spring集成發(fā)行版本部分可用.
作為一個(gè)品酒師,我們只快速了解這里的主要特征。
4.1.2 入站通道適配器
為了從隊(duì)列中接收AMQP消息,需要配置一個(gè)個(gè)<inbound-channel-adapter>
<amqp:inbound-channel-adapter channel="fromAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>
4.1.3 出站通道適配器
為了向交換器發(fā)送AMQP消息,需要配置一個(gè)<outbound-channel-adapter>. 除了交換名稱外,還可選擇提供路由鍵。
<amqp:outbound-channel-adapter channel="toAMQP" exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>
4.1.4 入站網(wǎng)關(guān)
為了從隊(duì)列中接收AMQP消息,并回復(fù)到它的reply-to地址,需要配置一個(gè)<inbound-gateway>.
<amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>
4.1.5 出站網(wǎng)關(guān)
為了向交換器發(fā)送AMQP消息并接收來(lái)自遠(yuǎn)程客戶端的響應(yīng),需要配置一個(gè)<outbound-gateway>.
除了交換名稱外,還可選擇提供路由鍵。
<amqp:outbound-gateway request-channel="toAMQP" reply-channel="fromAMQP" exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>
除了這份參考文檔,還有其它資源可幫助你了解AMQP.
5.1 進(jìn)階閱讀
對(duì)于那些不熟悉AMQP的人來(lái)說(shuō), 規(guī)范 實(shí)際上具有相當(dāng)?shù)目勺x性.
這當(dāng)然是信息的權(quán)威來(lái)源,對(duì)于熟悉規(guī)范的人來(lái)說(shuō),Spring AMQP代碼應(yīng)該很容易理解。
目前RabbitMQ實(shí)現(xiàn)基于2.8.x版本,并正式支持AMQP 0.8和9.1。我們推薦閱讀9.1文檔。
在RabbitMQ Getting Started 頁(yè)面上,還有許多精彩的文章,演示, 博客. 因?yàn)楫?dāng)前只有Spring AMQP實(shí)現(xiàn), 但我們?nèi)越ㄗh將其作為了解所有中間件相關(guān)概念的起點(diǎn).