<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評(píng)論

    閱讀排行榜

    評(píng)論排行榜

    3.1.15 消息監(jiān)聽器容器配置

    有相當(dāng)多的配置SimpleMessageListenerContainer 相關(guān)事務(wù)和服務(wù)質(zhì)量的選項(xiàng),它們之間可以互相交互.當(dāng)使用命名空間來(lái)配置<rabbit:listener-container/>時(shí),

    下表顯示了容器屬性名稱和它們等價(jià)的屬性名稱(在括號(hào)中).

    未被命名空間暴露的屬性,以`N/A`表示.
    Table 3.3. 消息監(jiān)聽器容器的配置選項(xiàng)
    屬性
    (group)
    描述

    只在使用命名空間時(shí)可用. 當(dāng)經(jīng)過(guò)指定時(shí),類型為Collection<MessageListenerContainer> 的bean會(huì)使用它這個(gè)名稱進(jìn)行注冊(cè),容器會(huì)將每個(gè)<listener/> 元素添加到集合中.這就允許,如,通過(guò)迭代集合來(lái)啟動(dòng)/停止該組容器.如果多個(gè)<listener-container/> 元素有相同的group值, 那么集合中的容器是所有指定容器的總和.
    屬性

    channelTransacted(channel-transacted)

    描述

    Boolean標(biāo)志,用于表示在事務(wù)中的所有消息都應(yīng)該應(yīng)答(手動(dòng)地或自動(dòng)地)

    屬性

    acknowledgeMode(acknowledge)

    描述

    • NONE: 不發(fā)送應(yīng)答(與channelTransacted=true不兼容). RabbitMQ 稱此為 "autoack",因?yàn)閎roker假設(shè)消費(fèi)者沒(méi)有采取任何動(dòng)作應(yīng)答了所有消息.
    • MANUAL:監(jiān)聽器必須調(diào)用Channel.basicAck()來(lái)手動(dòng)應(yīng)答所有消息.
    • AUTO :容器會(huì)自動(dòng)應(yīng)答所有消息, 除非MessageListener 拋出了異常. 注意acknowledgeMode 與channelTransacted 是互補(bǔ)的- 如果通道是事務(wù)的,那么broker除了ack外,還需要提交通知. 這是默認(rèn)模式. 也可參考txSize.

    屬性

    transactionManager(transaction-manager)

    描述

    監(jiān)聽器操作的外部事務(wù)管理器. 也是與channelTransacted互補(bǔ)的 -如果通道是事務(wù)的,那么其事務(wù)會(huì)用外部事務(wù)來(lái)同步.

    屬性

    prefetchCount(prefetch)

    描述

    可接受來(lái)自broker一個(gè)socket幀中的消息數(shù)目. 數(shù)值越大,消息分發(fā)速度就越快, 但無(wú)序處理的風(fēng)險(xiǎn)更高. 
    如果acknowledgeMode為NONE它會(huì)忽略. 它會(huì)增長(zhǎng),如果有必要,須匹配txSize.

    屬性

    shutdownTimeout(N/A)

    描述
    當(dāng)容器關(guān)閉時(shí)(例如. 關(guān)閉ApplicationContext),用于等待正在傳輸消息的上限時(shí)間.默認(rèn)是5秒. 當(dāng)達(dá)到上限時(shí),如果通道是非事務(wù)的,消息將被丟棄.

    屬性

    txSize(transaction-size)

    描述
    當(dāng)acknowledgeMode 為AUTO時(shí),在發(fā)送ack前(等待每一個(gè)消息達(dá)到接收超時(shí)設(shè)置),容器將試圖處理這個(gè)數(shù)目的消息 . 當(dāng)事務(wù)通道提交后也是一樣的.如果prefetchCount 小于txSize,prefetchCount 會(huì)增長(zhǎng)以匹配txSize.

    屬性
    receiveTimeout(receive-timeout)
    描述
    等待消息的最大時(shí)間.如果acknowledgeMode=NONE 這只有很小的效果 - 容器只旋轉(zhuǎn)一輪,并要求另一個(gè)消息. 當(dāng)在txSize>1的事務(wù)通道中有最大效果,因?yàn)樗軐?dǎo)致已經(jīng)消費(fèi)但沒(méi)有應(yīng)答的消息直接超時(shí)過(guò)期.

    屬性

    autoStartup(auto-startup)

    描述

    用于當(dāng)ApplicationContext啟動(dòng)時(shí)(作為SmartLifecycle 回調(diào)的一部分,發(fā)生在所有bean初始化之后)是否同時(shí)啟動(dòng)容器的標(biāo)志.默認(rèn)為true,如果在容器啟動(dòng)時(shí),中間件暫不可用,那么可將其設(shè)為false,隨后在確認(rèn)中間件已啟動(dòng)后,手動(dòng)調(diào)用start() 方法來(lái)啟動(dòng).

    屬性

    phase(phase)

    描述

    當(dāng)autoStartup為true時(shí),容器中的生命周期階段應(yīng)該啟動(dòng)和停止.值越小,容器就會(huì)越早啟動(dòng),并越晚停止.默認(rèn)值Integer.MAX_VALUE,這意味著容器會(huì)越晚啟動(dòng)并盡快停止.

    屬性

    adviceChain(advice-chain)

    描述

    應(yīng)用于監(jiān)聽器執(zhí)行路徑上的AOP Advice數(shù)組. 它可用于額外的橫切關(guān)注點(diǎn),如broker死亡事件中的自動(dòng)重試. 
    注意,只要broker還活著,出現(xiàn)AMQP錯(cuò)誤后的重新連接是由CachingConnectionFactory來(lái)處理的.

    屬性

    taskExecutor(task-executor)

    描述

    執(zhí)行監(jiān)聽器調(diào)用程序的Spring TaskExecutor引用(或標(biāo)準(zhǔn)JDK 1.5+ Executor). 默認(rèn)是 SimpleAsyncTaskExecutor, 用于內(nèi)部管理線程.

    屬性

    errorHandler(error-handler)

    描述

    在MessageListener執(zhí)行期間,用于處理未捕獲異常的ErrorHandler策略的引用. 默認(rèn)是 ConditionalRejectingErrorHandler.

    屬性

    concurrentConsumers(concurrency)

    描述
    每個(gè)監(jiān)聽器上初始啟動(dòng)的并發(fā)消費(fèi)者數(shù)目. 參考Section 3.1.16, “Listener Concurrency”.

    屬性

    axConcurrentConsumers(max-concurrency)

    描述

    啟動(dòng)并發(fā)消費(fèi)者的最大數(shù)目,如果有必要,可以按需設(shè)置.必須要大于或等于concurrentConsumers

    參考Section 3.1.16, “Listener Concurrency”.

    屬性

    startConsumerMinInterval(min-start-interval)

    描述
    啟動(dòng)新消費(fèi)者之間的時(shí)間間隔,單位為毫秒. 
    參考 Section 3.1.16, “Listener Concurrency”. 默認(rèn) 10000 (10 秒).

    屬性

    stopConsumerMinInterval(min-stop-interval)

    描述

    停止消費(fèi)者的時(shí)間間隔, 由于最后一個(gè)消費(fèi)者已經(jīng)停止了,這時(shí)可以檢測(cè)到空閑消費(fèi)者.

    參考Section 3.1.16, “Listener Concurrency”. 默認(rèn) 60000 (1 分鐘).

    屬性

    consecutiveActiveTrigger(min-consecutive-active)

    描述

    消費(fèi)者收到連續(xù)消息的最小數(shù)量,當(dāng)考慮啟動(dòng)一個(gè)新的消費(fèi)者,接收不會(huì)發(fā)生超時(shí)。也會(huì)受txsize影響參考 Section 3.1.16, “Listener Concurrency”. 默認(rèn)為10.

    屬性

    consecutiveIdleTrigger(min-consecutive-idle)

    描述

    在考慮停止一個(gè)消費(fèi)者,消費(fèi)者必須經(jīng)歷的最小接收超時(shí)時(shí)間,也會(huì)受txsize影響

    參考 Section 3.1.16, “Listener Concurrency”. 默認(rèn)為10.

    屬性

    connectionFactory(connection-factory)

    描述
    connectionFactory的引用; 當(dāng)使用XML命名空間配置時(shí),默認(rèn)引用bean名稱是"rabbitConnectionFactory".

    屬性

    defaultRequeueRejected(requeue-rejected)

    描述

    用以確定因監(jiān)聽器拋出異常而遭受拒絕的消息是否需要重新入列. 默認(rèn)為true.
    屬性
    recoveryInterval(recovery-interval)
    描述
    如果消費(fèi)者不是因致命原因而導(dǎo)致啟動(dòng)失敗,則用于設(shè)置重啟消費(fèi)者的時(shí)間間隔,單位毫秒. 默認(rèn)為5000.與recoveryBackOff互斥.
    屬性
    recoveryBackOff(recovery-back-off)
    描述
    果消費(fèi)者不是因致命原因而導(dǎo)致啟動(dòng)失敗,則用于指定 BackOff 啟動(dòng)消費(fèi)者的時(shí)間間隔. 默認(rèn)是每5秒無(wú)限重試的FixedBackOff. 與recoveryInterval互斥.
    屬性
    exclusive(exclusive)
    描述
    用于確定容器中的單個(gè)消費(fèi)者是否具有獨(dú)占訪問(wèn)隊(duì)列的權(quán)限。當(dāng)其值為1時(shí),容器的concurrency必須為1時(shí)。如果另一個(gè)消費(fèi)者有獨(dú)占訪問(wèn)權(quán),容器將根據(jù)恢復(fù)時(shí)間間隔或恢復(fù)后退試圖恢復(fù)消費(fèi)者。
    當(dāng)使用命名空間時(shí),此屬性會(huì)隨著隊(duì)列名稱出現(xiàn)在<rabbit:listener/>元素中。默認(rèn)為false。
    屬性
    rabbitAdmin(admin)
    描述
    監(jiān)聽器監(jiān)聽了多個(gè)自動(dòng)刪除隊(duì)列時(shí),當(dāng)其發(fā)現(xiàn)在啟動(dòng)時(shí)隊(duì)列消失了,容器會(huì)使用RabbitAdmin 來(lái)聲明消失的隊(duì)列,并進(jìn)行交換器的相關(guān)綁定.
    如果此元素配置成使用條件聲明(參考
    the section called “Conditional Declaration”), 容器必須使用配置的admin來(lái)聲明那些元素.
    這里指定的admin;只在使用帶有條件聲明的自動(dòng)刪除隊(duì)列時(shí)才需要. 如果你不想在容器啟動(dòng)前聲明自動(dòng)刪除隊(duì)列,可在amdin中將 
    auto-startup 設(shè)為false. 默認(rèn)情況下,RabbitAdmin 會(huì)聲明所有非條件元素.
    屬性
    missingQueuesFatal(missing-queues-fatal)
    描述

    從1.3.5版本開始,SimpleMessageListenerContainer 就有了這個(gè)新屬性.

    當(dāng)設(shè)為true (默認(rèn)值)時(shí),如果配置隊(duì)列在中間件都不可用, 這會(huì)視為是致命的.這會(huì)導(dǎo)致應(yīng)用程序上下文初始化失敗; 同時(shí), 當(dāng)容器還在運(yùn)行時(shí)刪除了隊(duì)列,也會(huì)發(fā)生這樣的情況.
    默認(rèn)情況下,消費(fèi)者進(jìn)行3次重試來(lái)連接隊(duì)列(5秒時(shí)間間隔),如果所有嘗試都失敗了則會(huì)停止容器.

    在以前版本中,此選項(xiàng)是不可配置的.

    當(dāng)設(shè)置為false, 再做了三次重試后,容器將進(jìn)入恢復(fù)模式, 這也伴隨其它問(wèn)題,如中間件已經(jīng)發(fā)生了故障.

    容器會(huì)根據(jù)recoveryInterval 屬性來(lái)嘗試恢復(fù). 在每次恢復(fù)嘗試期間,每個(gè)消費(fèi)者會(huì)以5秒的時(shí)間間隔來(lái)嘗試4次被動(dòng)聲明. 這個(gè)過(guò)程將無(wú)限期地繼續(xù)下去(譯者注:有點(diǎn)沖突)。

    你也可以使用properties bean來(lái)為所有的容器全局設(shè)置屬性,如下所示:

    <util:properties id="spring.amqp.global.properties">
    <prop key="smlc.missing.queues.fatal">false</prop>
    </util:properties>

    如果容器明確的設(shè)置了 missingQueuesFatal 屬性,全局屬性的值對(duì)此容器將無(wú)效.

    默認(rèn)的retry屬性(5秒間隔3次重試)可通過(guò)下面的屬性值來(lái)覆蓋.

    屬性
    mismatchedQueuesFatal(mismatched-queues-fatal)
    描述

    這是1.6版本中加入的新屬性.當(dāng)容器啟動(dòng)時(shí),如果此屬性為true (默認(rèn)為false), 容器會(huì)檢查上下文中聲明的隊(duì)列是否中間件中存在的隊(duì)列是否一致.

    如果屬性不匹配(如. auto-delete) 或參數(shù) (e.g. x-message-ttl) 存在, 容器 (和應(yīng)用程序上下文) 會(huì)拋出致命異常而導(dǎo)致啟動(dòng)失敗.如果是在恢復(fù)期間檢測(cè)到的問(wèn)題,容器會(huì)停止.

    必須在上下文中存在單個(gè)RabbitAdmin (或使用rabbitAdmin屬性在容器上特別配置);否則此屬性必須為false.

    如果在初始啟動(dòng)期間,中間件還不可用,容器啟動(dòng)后,當(dāng)建立連接時(shí)會(huì)檢查條件.

    重要

    該檢查針對(duì)的是上下文的所有隊(duì)列,而不僅僅是特定監(jiān)聽器配置使用的隊(duì)列.如果你希望只檢查容器使用的隊(duì)列,你需要為這個(gè)容器配置單獨(dú)的RabbitAdmin , 并使用rabbitAdmin 屬性為其提供一個(gè)引用. 

    參考“Conditional Declaration”章節(jié)來(lái)了解更多信息.

    屬性

    autoDeclare(auto-declare)
    描述

    從1.4版本開始, SimpleMessageListenerContainer 引入了這個(gè)新屬性.

    當(dāng)設(shè)置為true 時(shí)(默認(rèn)值),容器會(huì)使用RabbitAdmin 來(lái)重新聲明所有 AMQP 對(duì)象(Queues, Exchanges, Bindings).
    如果在啟動(dòng)期間探測(cè)到至少有一個(gè)隊(duì)列缺失了,可能因?yàn)樗亲詣?dòng)刪除隊(duì)列或過(guò)期隊(duì)列,但不管隊(duì)列缺失是基于什么原因,重新聲明仍會(huì)進(jìn)行處理(譯者注:太浪費(fèi)了).
    要禁用這種行為, 可設(shè)置其屬性為false. 但需要注意的是,如果所有隊(duì)列都缺失了(譯者注:全部還是部分),容器會(huì)啟動(dòng)失敗.

    在1.6版本之前,如果在上下文中存在多個(gè)admin,容器會(huì)隨機(jī)選擇一個(gè).反之,如果沒(méi)有admin,它會(huì)從內(nèi)部創(chuàng)建一個(gè).
    無(wú)論是哪種情況,這都將導(dǎo)致非預(yù)期結(jié)果出現(xiàn)
    . 從1.6版本開始,為了能使autoDeclare 工作,必須要上下文中明確存在一個(gè)RabbitAdmin,或者特定實(shí)例的引用必須要在容器中使用rabbitAdmin屬性中配置.

    屬性

    declarationRetries(declaration-retries)
    描述

    1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer 有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.

    用于設(shè)置被動(dòng)聲明失敗時(shí),重新嘗試的次數(shù).被動(dòng)聲明發(fā)生在當(dāng)消費(fèi)者啟動(dòng)了或從多個(gè)隊(duì)列中消費(fèi)時(shí),初始化期間部分隊(duì)列還不可用的情況下.
    當(dāng)重試次數(shù)用完后,如果還是不能被動(dòng)聲明配置隊(duì)列,那么上面的missingQueuesFatal屬性將控制容器行為. 默認(rèn): 3次重試 (4 次嘗試).

    屬性

    failedDeclarationRetryInterval(failed-declaration-retry-interval)
    描述

    1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer 有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.

    重新嘗試被動(dòng)聲明的時(shí)間間隔. 被動(dòng)聲明發(fā)生在當(dāng)消費(fèi)者啟動(dòng)了或從多個(gè)隊(duì)列中消費(fèi)時(shí),初始化期間部分隊(duì)列還不可用的情況下. 默認(rèn): 5000 (5秒).

    屬性

    retryDeclarationInterval(missing-queue-retry-interval)
    描述

    1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer 有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.

    如果配置隊(duì)列的一個(gè)子集在消費(fèi)者初始化過(guò)程中可用,則消費(fèi)者將從這些隊(duì)列中開始消費(fèi)。消費(fèi)者將被動(dòng)地使用此間隔聲明丟失的隊(duì)列。

    當(dāng)這個(gè)間隔過(guò)去后,會(huì)再次使用declarationRetries 和 failedDeclarationRetryInterval

    如果還有缺失隊(duì)列,消費(fèi)者在重新嘗試之前會(huì)等待此時(shí)間間隔. 

    這個(gè)過(guò)程會(huì)不停地進(jìn)行到所有隊(duì)列可用. 默認(rèn): 60000 (1分鐘).

    屬性

    consumerTagStrategy(consumer-tag-strategy)
    描述

    從1.4.5版本開始,SimpleMessageListenerContainer 有了這個(gè)新屬性. 命名空間屬性在1.5.x中可用.

    之間,只能使用中間件生成的consumer tags;盡管現(xiàn)在這仍是默認(rèn)的配置,但現(xiàn)在你可以提供一個(gè)ConsumerTagStrategy的實(shí)現(xiàn), 這樣就可為每個(gè)消費(fèi)者創(chuàng)建獨(dú)特的tag.

    屬性

    idleEventInterval(idle-event-integer)
    描述

    從1.6版本開始,SimpleMessageListenerContainer 有了這個(gè)新屬性. 

    參考"Detecting Idle Asynchronous Consumers"章節(jié).

    3.1.16 監(jiān)聽器并發(fā)

    默認(rèn)情況下,監(jiān)聽器容器會(huì)啟動(dòng)單個(gè)消費(fèi)者來(lái)接收隊(duì)列中的消息.

    當(dāng)檢查前面章節(jié)中的表格時(shí),你會(huì)發(fā)現(xiàn)有許多屬性可控制并發(fā).最簡(jiǎn)單的是concurrentConsumers, 它會(huì)創(chuàng)建固定數(shù)量的消費(fèi)者來(lái)并發(fā)處理消息.

    在1.3.0版本之前,這只能在容器停止時(shí)才可設(shè)置.

    從1.3.0版本開始,你可以動(dòng)態(tài)調(diào)整 concurrentConsumers 屬性.如果容器運(yùn)行時(shí)修改了,會(huì)根據(jù)新設(shè)置來(lái)調(diào)需要的消費(fèi)者(添加或刪除).

    此外,在容器中添加了一個(gè)新屬性 maxConcurrentConsumers 來(lái)基于工作負(fù)載來(lái)動(dòng)態(tài)調(diào)整并發(fā)數(shù). 

    它可與其它四個(gè)屬性一起工作: consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval.
    在默認(rèn)設(shè)置的情況下,加大消費(fèi)者的算法如下:

    如果還沒(méi)有達(dá)到maxConcurrentConsumers ,如果現(xiàn)有消費(fèi)者活動(dòng)了10個(gè)連續(xù)周期且離最后消費(fèi)者啟動(dòng)至少消逝了10秒鐘,那么將啟動(dòng)新的消費(fèi)者. 如果消費(fèi)者在txSize * receiveTimeout 毫秒內(nèi)至少收到一個(gè)消息,那么就認(rèn)為此消費(fèi)者是活動(dòng)的.

    在默認(rèn)設(shè)置的情況下,減少消費(fèi)者的算法如下:

    如果有超過(guò)concurrentConsumers 數(shù)量的消費(fèi)者在運(yùn)行,且檢測(cè)到消費(fèi)者連續(xù)超時(shí)(空閑)了10個(gè)周期,且最后一個(gè)消費(fèi)者至少停止了60秒,那么消費(fèi)者將停止.
    超時(shí)依賴于receiveTimeout 和 txSize 屬性.當(dāng)在txSize * receiveTimeout 毫秒內(nèi)未收到消息,則認(rèn)為消費(fèi)者是空閑的.
    因此,當(dāng)有默認(rèn)超時(shí)(1秒)和 txSize為4,那么在空閑40秒后,會(huì)認(rèn)為消費(fèi)者是空閑的并會(huì)停止(4超時(shí)對(duì)應(yīng)1個(gè)空閑檢測(cè)).


    實(shí)際上,如果整個(gè)容器空閑一段時(shí)間,消費(fèi)者將只會(huì)被停止。這是因?yàn)閎roker將分享其在所有活躍的消費(fèi)者的工作。

    3.1.17 專用消費(fèi)者

    也是從1.3版本開始,監(jiān)聽器容器可配置單個(gè)專用消費(fèi)者; 這可以阻其它容器來(lái)消費(fèi)隊(duì)列直到當(dāng)前消費(fèi)者退出. 

    這樣的容器的并發(fā)性必須是1。

    當(dāng)使用專用消費(fèi)者時(shí),其它容器會(huì)根據(jù)recoveryInterval 屬性來(lái)消費(fèi)隊(duì)列, 如果嘗試失敗,會(huì)記錄一個(gè) WARNing 信息.

    3.1.18 監(jiān)聽器容器隊(duì)列

    1.3版本在監(jiān)聽器容器中引入許多處理多個(gè)隊(duì)列的改善措施.

    容器配置必須監(jiān)聽至少一個(gè)隊(duì)列以上以前也是這樣的情況,但現(xiàn)在可以在運(yùn)行時(shí)添加和刪除隊(duì)列了。當(dāng)任何預(yù)先獲取的消息被處理后,容器將回收(取消和重新創(chuàng)建)。
    參考方法
    addQueuesaddQueueNamesremoveQueuesand removeQueueNames.當(dāng)刪除隊(duì)列時(shí),至少要保留一個(gè)隊(duì)列.

    現(xiàn)在,只要有可用隊(duì)列消費(fèi)者就會(huì)啟動(dòng) -先前如果沒(méi)有可用隊(duì)列,容器會(huì)停止.現(xiàn)在,唯一的問(wèn)題是是否有可用隊(duì)列.如果只是部分隊(duì)列可用,容器會(huì)每60秒嘗試被動(dòng)聲明(和消費(fèi))缺失隊(duì)列.

    此外,如果消費(fèi)才從broker中收到了通道(例如,隊(duì)列被刪除)消費(fèi)者會(huì)嘗試重新恢復(fù),重新恢復(fù)的消費(fèi)會(huì)繼續(xù)處理來(lái)自其它配置隊(duì)列中的消息. 之前是隊(duì)列上的取消會(huì)取消整個(gè)消費(fèi)者,最終容器會(huì)因缺失隊(duì)列而停止.

    如果你想永久刪除隊(duì)列,你應(yīng)該在刪除隊(duì)列的之前或之后更新容器,以避免消費(fèi).

    3.1.19 恢復(fù):從錯(cuò)誤和代理失敗中恢復(fù)

    介紹

    Spring提供了一些關(guān)鍵的 (最流行的)高級(jí)特性來(lái)處理協(xié)議錯(cuò)誤或中間件失敗時(shí)的恢復(fù)與自動(dòng)重連接. 

    主要的重連接特性可通過(guò)CachingConnectionFactory 自身來(lái)開啟. 它也常有利于使用rabbitadmin自動(dòng)聲明的特點(diǎn).
    除此之外, 如果你關(guān)心保證投遞,你也許需要在RabbitTemplate中使用
    channelTransacted 標(biāo)記以及在SimpleMessageListenerContainer中使用AcknowledgeMode.AUTO (或者自己來(lái)手動(dòng)應(yīng)答) .

    交換器、隊(duì)列和綁定的自動(dòng)聲明

    RabbitAdmin 組件在啟動(dòng)時(shí)可聲明交換器,隊(duì)列,綁定.它是通過(guò)ConnectionListener懶執(zhí)行的,因此如果啟動(dòng)時(shí)broker不存在,也沒(méi)有關(guān)系. 
    Connection 第一次使用時(shí)(如.發(fā)送消息) ,監(jiān)聽器會(huì)被觸發(fā),admin功能也會(huì)應(yīng)用.這種在監(jiān)聽器中自動(dòng)聲明的好處是,如果連接出于任何原因斷開了,(如. broker死了,網(wǎng)絡(luò)中斷問(wèn)題.),它們會(huì)在下次有需要的時(shí)候重新應(yīng)用.

    這種方式的隊(duì)列聲明必須要有固定的名稱;要么是明確聲明,要么是由框架生成AnonymousQueue.匿名隊(duì)列是非持久化的,專用的,且自動(dòng)刪除的.

    重要

    自動(dòng)聲明只在cachingConnectionFactory 緩存模式是CHANNEL (默認(rèn))才可用. 這種限制的存在是因?yàn)閷S煤妥詣?dòng)刪除隊(duì)列是綁定到connection上的.

    同步操作中的故障和重試選項(xiàng)

    如果你在同步序列中使用RabbitTemplate時(shí)丟失了broker的連接,那么Spring AMQP會(huì)拋出一個(gè)AmqpException (通常但并不總是AmqpIOException).
    我們不想隱藏存在問(wèn)題的事實(shí),因此你可以捕獲并對(duì)異常進(jìn)行處理.
    如果你懷疑連接丟失了,而且這不是你的錯(cuò),那么最簡(jiǎn)單的事情就是執(zhí)行再次嘗試操作. 重試操作可以手動(dòng)進(jìn)行,也可以使用Spring Retry來(lái)處理重試(強(qiáng)制或聲明).

    Spring Retry 提供了兩個(gè)AOP攔截器并提供非常靈活的方式來(lái)指定retry的參數(shù)(嘗試的次數(shù),異常類型, 補(bǔ)償算法等等.). Spring AMQP同時(shí)也提供了一些方便的工廠bean來(lái)創(chuàng)建Spring Retry攔截器, 你可以使用強(qiáng)類型回調(diào)接口來(lái)實(shí)現(xiàn)恢復(fù)邏輯.參考Javadocs和 StatefulRetryOperationsInterceptor 和StatelessRetryOperationsInterceptor 的屬性來(lái)了解更多詳情. 

    如果沒(méi)有事務(wù),或者如果一個(gè)事務(wù)是在重試回調(diào)中啟動(dòng)的話,則無(wú)狀態(tài)重試是適當(dāng)?shù)摹W⒁猓鄬?duì)于有狀態(tài)重試,無(wú)狀態(tài)重試只是簡(jiǎn)單配置和分析,如果存在一個(gè)正在進(jìn)行的事務(wù)必須回滾或肯定會(huì)回滾的話, 這種無(wú)狀態(tài)重試則是不合適的.
    在事務(wù)中間掉下來(lái)的連接與回退有同樣的效果, 所以對(duì)于事務(wù)開始于堆棧上的重連接來(lái)說(shuō),有狀態(tài)重試通常是最佳選擇(so for reconnection where the transaction is started higher up the stack, stateful retry is usually the best choice).

    從1.3版本開始,提供了builder API來(lái)幫助在Java中使用這些攔截器(或者在 @Configuration 類中),例如:

    @Bean
    public StatefulRetryOperationsInterceptor interceptor() {
    	return RetryInterceptorBuilder.stateful()
    			.maxAttempts(5)
    			.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
    			.build();
    }

    只有部分retry特性能通過(guò)這種方式,更加高級(jí)的特性需要在RetryTemplate 中配置. 

    參考Spring Retry Javadocs 來(lái)了解可用策略,配置的完整信息.

    消息監(jiān)聽器和異步情況

    如果 MessageListener 因業(yè)務(wù)異常而失敗,異常可由消息監(jiān)聽器容器來(lái)處理,然后它會(huì)繼續(xù)回去監(jiān)聽其它信息.如果失敗是由于掉下的連接引起的(非業(yè)務(wù)異常),那么監(jiān)聽此消費(fèi)者的監(jiān)聽器將退出和重啟.

    SimpleMessageListenerContainer 可以無(wú)逢地進(jìn)行處理,并且它會(huì)在日志中記錄監(jiān)聽器即將重啟. 

    事實(shí)上,它會(huì)循環(huán)不斷地嘗試重新啟動(dòng)消費(fèi)者,只有當(dāng)消費(fèi)者有非常糟糕的行為時(shí),才會(huì)放棄。一個(gè)副作用是,如果broker在容器啟動(dòng)時(shí)關(guān)閉,它將會(huì)繼續(xù)嘗試直到建立一個(gè)連接。

    業(yè)務(wù)異常處理, 相對(duì)于協(xié)議錯(cuò)誤和連接丟失,它可能需要更多考慮和一些自定義配置,特別是處于事務(wù)或 容器應(yīng)答時(shí).
    在2.8.x版本之前, RabbitMQ對(duì)于死信行為沒(méi)有定義,因此默認(rèn)情況下,一個(gè)因拒絕或因業(yè)務(wù)異常導(dǎo)致回退的消息可循環(huán)往
    復(fù)地重新分發(fā).
    要限制客戶端的重新分發(fā)的次數(shù),一個(gè)選擇是在監(jiān)聽器的通知鏈中添加一個(gè)
    StatefulRetryOperationsInterceptor. 攔截器有一個(gè)實(shí)現(xiàn)了自定義死信動(dòng)作的恢復(fù)回調(diào): 

    什么是適合你的特定的環(huán)境。

    另一個(gè)選擇是設(shè)置容器的rejectRequeued屬性為false. 這會(huì)導(dǎo)致丟棄所有失敗的消息.當(dāng)使用RabbitMQ 2.8.x+時(shí),這也有利于傳遞消息到一個(gè)死的信件交換。

    或者,你可以拋出一個(gè)AmqpRejectAndDontRequeueException;這會(huì)阻止消息重新入列,不管defaultRequeueRejected 屬性設(shè)置的是什么.

    通常情況下,可以組合使用這兩種技術(shù) 在通知鏈中使用StatefulRetryOperationsInterceptor, 在此處是MessageRecover 拋出AmqpRejectAndDontRequeueExceptionMessageRecover 會(huì)一直調(diào)用,直到耗盡了所有重試.
    默認(rèn)MessageRecoverer 只是簡(jiǎn)單的消費(fèi)錯(cuò)誤消息,并發(fā)出WARN消息.在這種情況下,消息是通過(guò)應(yīng)答的,且不會(huì)發(fā)送到死信交換器中.

    從1.3版本開始,提供了一個(gè)新的RepublishMessageRecoverer,它允許在重試次數(shù)耗盡后,發(fā)布失敗消息:

    @Bean
    RetryOperationsInterceptor interceptor() {
    	return RetryInterceptorBuilder.stateless()
    			.maxAttempts(5)
    			.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
    			.build();
    }

    RepublishMessageRecoverer 會(huì)使用消息頭的額外信息來(lái)發(fā)布,這些信息包括異常信息,棧軌跡,原始交換器和路由鍵.額外的頭可通過(guò)創(chuàng)建其子類和覆蓋additionalHeaders()方法來(lái)添加.

    重試的異常分類

    Spring Retry 可以非常靈活地決定哪些異常可調(diào)用重試. 默認(rèn)配置是對(duì)所有異常都進(jìn)行重試.用戶異常可以包裝在ListenerExecutionFailedException 中,我們需要確保分類檢查異常原因默認(rèn)的分類只是看頂部級(jí)別的異常。

    從 Spring Retry 1.0.3開始BinaryExceptionClassifier 有一個(gè)屬性traverseCauses (默認(rèn)為false). 當(dāng)當(dāng)為true時(shí),它將遍歷異常的原因,直到它找到一個(gè)匹配或沒(méi)有原因。

    要使用分類重試,需要使用一個(gè)SimpleRetryPolicy ,其構(gòu)造函數(shù)將接受最大嘗試次數(shù),Exception的Map,以及一個(gè)boolean值(traverseCauses),且還需要將此策略注入給RetryTemplate.

    3.1.20 調(diào)試

    Spring AMQP 提供廣泛的日志記錄,尤其是在DEBUG級(jí)別.

    如果你想在應(yīng)用程序和broker之間監(jiān)控AMQP協(xié)議,你可以使用像WireShark的工具, 它有一個(gè)插件可用于解碼協(xié)議.
    另一個(gè)選擇是, RabbitMQ java client自身攜帶了一個(gè)非常有用的工具類:Tracer.當(dāng)以main方式運(yùn)行時(shí),默認(rèn)情況下,它監(jiān)聽于5673 ,并連接本地的5672端口.
    只需要簡(jiǎn)單的運(yùn)行它,并修改你的連接工廠配置,將其連接到本地的5673端口. 它就會(huì)在控制臺(tái)中顯示解碼的協(xié)議信息.參考Tracer javadocs 來(lái)了解詳細(xì)信息.


    3.2 Logging Subsystem AMQP Appenders

    框架為多個(gè)流行的日志系統(tǒng)提供了日志appenders:

    • log4j (從Spring AMQP1.1版本開始)
    • logback (從Spring AMQP1.4版本開始)
    • log4j2 (從Spring AMQP1.6版本開始)

    appenders使用正常機(jī)制為為子系統(tǒng)配置,可用屬性參照下面的規(guī)定。

    3.2.1 共同屬性

    下面的屬性對(duì)于所有appenders都可用:

    Table 3.4. 共同Appender屬性


    PropertyDefaultDescription
    exchangeName
    logs

    用于發(fā)布日志事件的交換器名稱.

    exchangeType
    topic

    發(fā)布日志事件的交換器類型- 只在appender聲明了交換器的情況下才需要. 參考declareExchange.

    routingKeyPattern
    %c.%p

    日志子系統(tǒng)生成路由鍵的模式格式.

    applicationId

    Application ID - 如果模式包含 %X{applicationId},則將其添加到路由鍵.

    senderPoolSize
    2

    用于發(fā)布日志事件的線程數(shù)目.

    maxSenderRetries
    30

    當(dāng)broker不可用時(shí)或有某些錯(cuò)誤時(shí),重試的次數(shù). 延時(shí)重試像: N ^ log(N)N 表示重試次數(shù).

    addresses

    一個(gè)逗號(hào)分隔的broker地址列表: host:port[,host:port]* -覆蓋host 和 port.

    host
    localhost

    要連接RabbitMQ的主機(jī).

    port
    5672
    virtualHost
    /

    要連接的RabbitMQ虛擬主機(jī).

    username
    guest

    要連接RabbitMQ的用戶.

    password
    guest

    要連接RabbitMQ的用戶密碼.

    contentType
    text/plain

    日志消息的content-type屬性

    contentEncoding

     日志消息的content-encoding屬性.

    declareExchange
    false

    當(dāng)appender啟動(dòng)時(shí),是否需要聲明配置的交換器.也可參考 durable 和autoDelete.

    durable
    true

    當(dāng)declareExchange 為 true ,durable 標(biāo)志才會(huì)設(shè)置此值.

    autoDelete
    false

    當(dāng) declareExchange 為true , auto delete 標(biāo)志才會(huì)設(shè)置此值.

    charset
    null

    當(dāng)將字符串轉(zhuǎn)成byte[]時(shí)要使用的編碼,默認(rèn)為null (使用系統(tǒng)默認(rèn)字符集).如果當(dāng)前平臺(tái)上不支持此字符集,將回退到使用系統(tǒng)字符集.

    deliveryMode
    PERSISTENT

    PERSISTENT 或 NON_PERSISTENT 用于決定RabbitMQ是否應(yīng)該持久化消息.

    generateId
    false

    用于確定messageId 屬性是否需要設(shè)置成唯一值.

    clientConnectionProperties
    null

    一個(gè)逗號(hào)分隔的key:value 對(duì),它是連接RabbitMQ時(shí)設(shè)置的自定義客戶端屬性

    3.2.2 Log4j Appender

    樣例log4j.properties片斷. 

    log4j.appender.amqp.addresses=foo:5672,bar:5672
    log4j.appender.amqp=org.springframework.amqp.rabbit.log4j.AmqpAppender
    log4j.appender.amqp.applicationId=myApplication
    log4j.appender.amqp.routingKeyPattern=%X{applicationId}.%c.%p
    log4j.appender.amqp.layout=org.apache.log4j.PatternLayout
    log4j.appender.amqp.layout.ConversionPattern=%d %p %t [%c] - <%m>%n
    log4j.appender.amqp.generateId=true
    log4j.appender.amqp.charset=UTF-8
    log4j.appender.amqp.durable=false
    log4j.appender.amqp.deliveryMode=NON_PERSISTENT
    log4j.appender.amqp.declareExchange=true

    3.2.3 Log4j2 Appender

    樣例 log4j2.xml 片斷

    <Appenders>
        ...
        <RabbitMQ name="rabbitmq"
            addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
            exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
            applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
            contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
            charset="UTF-8"
            senderPoolSize="3" maxSenderRetries="5">
        </RabbitMQ>
    </Appenders>

    3.2.4 Logback Appender

    樣例 logback.xml 片斷

    <appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
        <layout>
            <pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
        </layout>
        <addresses>foo:5672,bar:5672</addresses>
        <abbreviation>36</abbreviation>
        <applicationId>myApplication</applicationId>
        <routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
        <generateId>true</generateId>
        <charset>UTF-8</charset>
        <durable>false</durable>
        <deliveryMode>NON_PERSISTENT</deliveryMode>
        <declareExchange>true</declareExchange>
    </appender>

    3.2.5 定制Messages

    每個(gè)appenders都可以子類化,以允許你在發(fā)布前修改消息.

    Customizing the Log Messages. 

    public class MyEnhancedAppender extends AmqpAppender {
    
        @Override
     public Message postProcessMessageBeforeSend(Message message, Event event) {
            message.getMessageProperties().setHeader("foo", "bar");
            return message;
        }
    
    }

    3.2.6 定制客戶端屬性

    簡(jiǎn)化 String 屬性

    每個(gè)appender都支持在RabbitMQ連接中添加客戶端屬性.

    log4j. 

    log4j.appender.amqp.clientConnectionProperties=foo:bar,baz:qux

    logback. 

    <appender name="AMQP"...>...
    <clientConnectionProperties>foo:bar,baz:qux</clientConnectionProperties>
    ...</appender>

    log4j2. 

    <Appenders>
        ...
        <RabbitMQname="rabbitmq"...clientConnectionProperties="foo:bar,baz:qux"...</RabbitMQ></Appenders>

    這些屬性是逗號(hào)分隔的key:value 隊(duì)列表; 鍵和值不能包含逗號(hào)或 冒號(hào).

    當(dāng)RabbitMQ Admin UI中查看連接上,你會(huì)看到這些屬性.

    Log4j和logback先進(jìn)技術(shù)

    使用 log4j 和 logback appenders, appenders 可以是子類化的, 允許你在連接建立前,修改客戶連接屬性:

    定制客戶端連接屬性. 

    public class MyEnhancedAppender extends AmqpAppender {
    
        private String foo;
    
        @Override
    protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
            clientProperties.put("foo", this.foo);
        }
    
        public void setFoo(String foo) {
            this.foo = foo;
        }
    
    }

    對(duì)于 log4j2, 添加 log4j.appender.amqp.foo=bar 到log4j.properties 來(lái)設(shè)置發(fā)展.

    對(duì)于logback, 在logback.xml中添加 <foo>bar</foo> .

    當(dāng)然,對(duì)于像這個(gè)例子中簡(jiǎn)單的String 屬性,可以使用先前的技術(shù);

    子類允許更豐富的屬性(如添加 Map 的numeric 屬性).

    使用log4j2, 子類是不被支持的,因?yàn)?log4j2 使用靜態(tài)工廠方法.

    3.3 樣例應(yīng)用程序

    3.3.1 介紹

     Spring AMQP Samples 項(xiàng)目包含了兩個(gè)樣例應(yīng)用程序. 第一個(gè)簡(jiǎn)單的"Hello World" 示例演示了同步和異步消息的處理. 它為理解基礎(chǔ)部分提供了一個(gè)很好的開端.
    第二個(gè)基于股票交易的例子演示了真實(shí)應(yīng)用程序中的交互場(chǎng)景.在本章中,我們會(huì)每個(gè)示例進(jìn)行快速瀏覽,
    使您可以專注于最重要的組成部分.
    這兩個(gè)例子都是基于Maven的,因此你可以直接將它們導(dǎo)入任何支持Maven的IDE中(如. 
    SpringSource Tool Suite).

    3.3.2 Hello World

    介紹

    Hello World示例演示了同步和異步消息處理.你可以導(dǎo)入spring-rabbit-helloworld 示例到IDE中并跟隨下面的討論.

    同步例子

    src/main/java 目錄中,導(dǎo)航到org.springframework.amqp.helloworld 包中.

    打開HelloWorldConfiguration 類,你可以注意到它包含了@Configuration 類級(jí)注解和一些@Bean 方法級(jí)注解. 

    這是Spring 的基于Java的配置.你可進(jìn)一步的了解here.

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =null;
        connectionFactory =new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; }

    配置中同樣也包含了RabbitAdmin的實(shí)例, 它會(huì)默認(rèn)查找類型為Exchange, Queue, 或 Binding的bean并在broker中進(jìn)行聲明.
    事實(shí)上,"helloWorldQueue" bean是在HelloWorldConfiguration 中生成的,因?yàn)樗?Queue的實(shí)例.

    @Bean
    public Queue helloWorldQueue() {
        returnnew Queue(this.helloWorldQueueName);
    }

    重看"rabbitTemplate"bean配置,你會(huì)看到它將helloWorldQueue的名稱設(shè)成了"queue"屬性(用于接收消息) 以及"routingKey" 屬性(用于發(fā)送消息).

    現(xiàn)在,我們已經(jīng)探索了配置,讓我們看看實(shí)際上使用這些組件的代碼。
    首先,從同一個(gè)包內(nèi)打開Producer類。它包含一個(gè)用于創(chuàng)建Spring ApplicationContext的main()方法.


    publicstaticvoid main(String[] args) {

        ApplicationContext context =
            new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class); amqpTemplate.convertAndSend("Hello World"); System.out.println("Sent: Hello World"); }

    在上面的例子中你可以看到, 取回的AmqpTemplate用來(lái)發(fā)送消息.因?yàn)榭蛻舳舜a應(yīng)該盡可能地依賴于接口,因此類型是AmqpTemplate而不是RabbitTemplate.
    即使在HelloWorldConfiguration中創(chuàng)建的bean是RabbitTemplate的實(shí)例,依賴于接口則意味著這端代碼更具有便攜性(portable) (配置可以獨(dú)立于代碼進(jìn)行修改).
    因?yàn)閏onvertAndSend() 方法是通過(guò)模板來(lái)調(diào)用的,因此模板會(huì)將調(diào)用委派給它的MessageConverter實(shí)例.在這種情況下,它默認(rèn)使用的是SimpleMessageConverter,但也可以在HelloWorldConfiguration中為"rabbitTemplate"指定其它的實(shí)現(xiàn).

    現(xiàn)在打開Consumer類. 它實(shí)際上共享了同一個(gè)配置基類,這意味著它將共享"rabbitTemplate" bean. 這就是為什么我們要使用"routingKey" (發(fā)送) 和"queue" (接收)來(lái)配置模板的原因.
    正如你在Section 3.1.4, “AmqpTemplate”中看到的,你可以代替在發(fā)送方法中傳遞routingKey參數(shù),代替在接收方法中傳遞queue 參數(shù).  Consumer 代碼基本上是Producer的鏡子,只不過(guò)調(diào)用的是receiveAndConvert()而非convertAndSend()方法.

    publicstaticvoid main(String[] args) {
        ApplicationContext context =
            new AnnotationConfigApplicationContext(RabbitConfiguration.class);
        AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
        System.out.println("Received: " + amqpTemplate.receiveAndConvert());
    }

    你如果運(yùn)行Producer,然后再運(yùn)行Consumer, 在控制臺(tái)輸出中,你應(yīng)該能看到消息"Received: Hello World" 

    異步示例

    我們已經(jīng)講解了同步Hello World樣例, 是時(shí)候移動(dòng)到一個(gè)稍微先進(jìn),但更強(qiáng)大的選擇上了.稍微修改一下代碼,Hello World 樣例就可以可以提供異步接收的示例了,又名 Message-driven POJOs. 事實(shí)上,有一個(gè)子包明確地提供了這種功能: org.springframework.amqp.samples.helloworld.async.

    再一次地我們將從發(fā)送端開始. 打開ProducerConfiguration類可注意到它創(chuàng)建了一個(gè)"connectionFactory"和"rabbitTemplate" bean.
    這次,由于配置是專用于消息發(fā)送端,因此我們不需要任何隊(duì)列定義,RabbitTemplate只須設(shè)置routingKey屬性.
    回想一下,消息是發(fā)送到交換器上的而不是直接發(fā)到隊(duì)列上的. AMQP默認(rèn)交換器是無(wú)名稱的direct類型交換器.
    所有隊(duì)列都是通過(guò)使用它們的名稱作為路由鍵綁定到默認(rèn)交換器上的.這就是為什么在這里我們只提供路由鍵的原因.

    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.helloWorldQueueName);
        return template;
    }

    由于這個(gè)示例展示的是異步消息處理,生產(chǎn)方設(shè)計(jì)為連續(xù)發(fā)送消息(盡管類似于同步版本中的 message-per-execution模型,但不太明顯,實(shí)際上它是消息驅(qū)動(dòng)消費(fèi)者)負(fù)責(zé)連續(xù)發(fā)送消息的組件是作為ProducerConfiguration類中的內(nèi)部類來(lái)定義的,每3秒執(zhí)行一次

    static class ScheduledProducer {
    
       @Autowired
     private volatile RabbitTemplate rabbitTemplate;
    
        private final AtomicInteger counter = new AtomicInteger();
    
        @Scheduled(fixedRate = 3000)
     public void sendMessage() {
            rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
        }
    }

    你不必要完全了解這些細(xì)節(jié),因?yàn)檎嬲年P(guān)注點(diǎn)是接收方(我們馬上就會(huì)講解).然而,如果你還熟悉Spring 3.0 任務(wù)調(diào)度支持,你可從here這里來(lái)了解.
    簡(jiǎn)短故事是:在 ProducerConfiguration 中的"postProcessor" bean使用調(diào)度器來(lái)注冊(cè)了任務(wù).

    現(xiàn)在,讓我們轉(zhuǎn)向接收方. 為強(qiáng)調(diào) Message-driven POJO 行為,將從對(duì)消息起反應(yīng)的組件開始. 

    此類被稱為HelloWorldHandler.

    publicclass HelloWorldHandler {
    
        publicvoid handleMessage(String text) {
            System.out.println("Received: " + text);
        }
    
    }

    相當(dāng)明顯的, 這是一個(gè)POJO. 它沒(méi)有繼承任何基類,它沒(méi)有實(shí)現(xiàn)任何接口,它甚至不包含任何導(dǎo)入. 它將通過(guò)Spring AMQP MessageListenerAdapter來(lái)適配MessageListener接口.然后適配器可配置在SimpleMessageListenerContainer上.
    在這個(gè)例子中,容器是在ConsumerConfiguration類中創(chuàng)建的.你可以看到POJO是包裝在適配器中的.

    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueName(this.helloWorldQueueName);
        container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
        return container;
    }

    SimpleMessageListenerContainer是一個(gè)Spring生命周期組件,默認(rèn)會(huì)自動(dòng)啟動(dòng).如果你看了Consumer類的內(nèi)部,你會(huì)看到main()方法中除了一行啟動(dòng)創(chuàng)建ApplicationContext的代碼外,其它什么都沒(méi)有.
    Producer的main()方法也只有一行啟動(dòng),因?yàn)橐?@Scheduled注解的組件會(huì)自動(dòng)開始執(zhí)行.你可以任何順序來(lái)啟動(dòng)Producer 和Consumer,你會(huì)看每秒就會(huì)發(fā)送消息和接收到消息.

    3.3.3 股票交易(Stock Trading)

    Stock Trading 示例演示了比Hello World示例更高級(jí)的消息場(chǎng)景.然而,配置卻是很相似的 - 只是有一點(diǎn)復(fù)雜.
    由于我們已經(jīng)詳細(xì)講解了Hello World配置,因此在這里我們將重點(diǎn)關(guān)注不一樣的東西. 有一個(gè)服務(wù)器發(fā)送市場(chǎng)數(shù)據(jù)(
    股票報(bào)價(jià))到Topic交換器中.
    然后,客戶端可訂閱市場(chǎng)數(shù)據(jù),即通過(guò)使用路由模式(如. "app.stock.quotes.nasdaq.*")來(lái)綁定隊(duì)列(e.g. "app.stock.quotes.nasdaq.*").
    這個(gè)例子的其它主要功能是 有一個(gè)
    請(qǐng)求回復(fù)“股票交易”的互動(dòng),它是由客戶發(fā)起并由服務(wù)器來(lái)處理的這涉及到一個(gè)私有的“回復(fù)(replyTo)”隊(duì)列,發(fā)送客戶端的信息在請(qǐng)求消息中。

    服務(wù)器的核心配置在RabbitServerConfiguration類中(位于 org.springframework.amqp.rabbit.stocks.config.server 包中).
    它繼承了 AbstractStockAppRabbitConfiguration. 這是服務(wù)器和客戶端定義常用資源的地方,包括市場(chǎng)數(shù)據(jù)Topic交換器(其名稱為app.stock.marketdata) 以及服務(wù)器公開股票交易的隊(duì)列(其名稱為app.stock.request).
    在那個(gè)公共配置文件中,你會(huì)看到在RabbitTemplate上配置了一個(gè)JsonMessageConverter.

    服務(wù)器特有配置由2部分組成.首先,它在RabbitTemplate上配置了市場(chǎng)數(shù)據(jù)交換器,這樣在發(fā)送消息時(shí),就不必提供交換器名稱.它是通過(guò)基礎(chǔ)配置類中的抽象回調(diào)方法中定義做到這一點(diǎn)的.

    public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
    }

    其次, 聲明了股票請(qǐng)求隊(duì)列.在這里,它不需要任何明確的綁定,因?yàn)樗鼘⒁运约旱拿Q作為路由鍵來(lái)綁定到無(wú)名稱的默認(rèn)交換器上.正如先前提到的,AMQP規(guī)范定義了此種行為.

    @Beanpublic Queue stockRequestQueue() {
        returnnew Queue(STOCK_REQUEST_QUEUE_NAME);
    }

    現(xiàn)在你已經(jīng)看過(guò)了服務(wù)器的AMQP資源配置,導(dǎo)航到src/test/java目錄下的org.springframework.amqp.rabbit.stocks包.在那里你會(huì)實(shí)際的 提供了main()方法的Server類.
    它基于server-bootstrap.xml 創(chuàng)建了一個(gè)ApplicationContext.在那里,你會(huì)看到發(fā)布虛假市場(chǎng)數(shù)據(jù)的調(diào)度任務(wù).

    那個(gè)配置依賴于Spring 3.0的"task"命名空間支持.bootstrap配置文件也導(dǎo)入了其它一些文件.最令人關(guān)注的是位于src/main/resources目錄下的server-messaging.xml.在那里,你會(huì)看到"messageListenerContainer" bean,它負(fù)責(zé)處理股票交易請(qǐng)求.
    最后在看一下定義在src/main/resources目錄下的server-handlers.xml,其中定義了一個(gè) "serverHandler" bean
    .這個(gè)bean是ServerHandler類的實(shí)例,它是Message-driven POJO 的好例子,它也有發(fā)送回復(fù)消息的能力.
    注意,它自身并沒(méi)有與框架或任何AMQP概念耦合.它只是簡(jiǎn)單地接受TradeRequest并返回一個(gè)TradeResponse.

    public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
    }

    現(xiàn)在我們已經(jīng)看了服務(wù)端的重要配置和代碼,讓我們轉(zhuǎn)向客戶端.最佳起點(diǎn)是從 org.springframework.amqp.rabbit.stocks.config.client 包下的RabbitClientConfiguration開始. 

    注意,它聲明了兩個(gè)不帶明確參數(shù)的隊(duì)列.

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }
    
    @Bean
    public Queue traderJoeQueue() {
        return amqpAdmin().declareQueue();
    }

    那些是私有隊(duì)列, 唯一名稱會(huì)自動(dòng)自成.客戶端會(huì)用第一個(gè)生成的隊(duì)列來(lái)綁定由服務(wù)端公開的市場(chǎng)交換器.
    記住在AMQP中,消費(fèi)者與隊(duì)列交互,而生產(chǎn)者與交換器交互. 隊(duì)列和交換器之間的綁定指示broker從給定的交換器中投遞或路由什么消息給隊(duì)列.
    由于市場(chǎng)交換器是一個(gè)Topic交換器,綁定可通過(guò)路由正則表達(dá)式來(lái)表達(dá). 

    RabbitClientConfiguration聲明了一個(gè)Binding對(duì)象,其對(duì)象是通過(guò)BindingBuilder的便利API來(lái)生成的.

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;
    
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
            marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    注意,實(shí)際值已經(jīng)在屬性文件(src/main/resources目錄下的"client.properties")中外部化了,因此我們使用Spring的@Value 注解來(lái)注入值.這通常是一個(gè)好主意,否則值就會(huì)硬編碼在類中,沒(méi)有修改就沒(méi)有重新編譯.

    在這種情況下,通過(guò)修改綁定中的路由正則表達(dá)式,可很容易地運(yùn)行多個(gè)版本的Client.讓我們立即嘗試.

    啟動(dòng)運(yùn)行org.springframework.amqp.rabbit.stocks.Server然后再運(yùn)行 org.springframework.amqp.rabbit.stocks.Client.你將會(huì)看到NASDAQ股票的交易報(bào)價(jià),因?yàn)殛P(guān)聯(lián)stocks.quote.pattern 鍵的值在client.properties中是app.stock.quotes.nasdaq.
    現(xiàn)在,保持現(xiàn)有Server 和Client 運(yùn)行,將其屬性值修改為app.stock.quotes.nyse.再啟動(dòng)第二個(gè)
    Client實(shí)例.你會(huì)看到第一個(gè)client仍然接收NASDAQ 報(bào)價(jià),而第二個(gè)client接收的NYSE報(bào)價(jià). 
    你可以改變模式,獲取所有的股票報(bào)價(jià)或個(gè)別股票的報(bào)價(jià)。

    最后一個(gè)我們將暴露的特性是從客戶端的角度來(lái)看待請(qǐng)求-回復(fù)交互.記住我們已經(jīng)看了ServerHandler,它會(huì)接受TradeRequest對(duì)象并返回TradeResponse對(duì)象. 客戶端相應(yīng)的代碼是 RabbitStockServiceGateway(位于org.springframework.amqp.rabbit.stocks.gateway 包).為發(fā)送消息,它會(huì)委派給RabbitTemplate.

    public void send(TradeRequest tradeRequest) {
        getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
                try {
                    message.getMessageProperties().setCorrelationId(
                        UUID.randomUUID().toString().getBytes("UTF-8"));
                }
                catch (UnsupportedEncodingException e) {
                    thrownew AmqpException(e);
                }
                return message;
            }
        });
    }

    注意,在發(fā)送消息前,它設(shè)置了"replyTo"地址. 這提供了隊(duì)列,此隊(duì)列是由上面的"traderJoeQueue" bean 定義生成的. 以下是StockServiceGateway類的@Bean定義.

    @Bean
    public StockServiceGateway stockServiceGateway() {
        RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
        gateway.setRabbitTemplate(rabbitTemplate());
        gateway.setDefaultReplyToQueue(traderJoeQueue());
        return gateway;
    }

    如果你沒(méi)有運(yùn)行服務(wù)器和客戶端,現(xiàn)在就啟動(dòng)它們. 嘗試使用100 TCKR的格式來(lái)發(fā)送請(qǐng)求.經(jīng)過(guò)一個(gè)簡(jiǎn)短的人工延遲來(lái)模擬“處理”請(qǐng)求,你應(yīng)該看到一個(gè)確認(rèn)消息出現(xiàn)在客戶端上。

    3.4 測(cè)試支持

    3.4.1 介紹

    為異步程序?qū)懠蓽y(cè)試比測(cè)試簡(jiǎn)單程序更復(fù)雜. 當(dāng)引入了@RabbitListener這樣的注解時(shí),這尤其更加復(fù)雜.

    現(xiàn)在的問(wèn)題是發(fā)送消息后,如何來(lái)驗(yàn)證, 監(jiān)聽器按預(yù)期收到了消息.

    框架自身帶有許多單元測(cè)試和集成測(cè)試;有些使用mocks, 另外一些使用真實(shí)的RabbitMQ broker來(lái)集成測(cè)試. 您可以參照測(cè)試場(chǎng)景的一些想法進(jìn)行測(cè)試。

    Spring AMQP 1.6版本引入了sring-rabbit-test jar ,它提供一些測(cè)試復(fù)雜場(chǎng)景的測(cè)試. 預(yù)計(jì)這一項(xiàng)目將隨著時(shí)間的推移進(jìn)行擴(kuò)展,但我們需要社會(huì)反饋以幫助測(cè)試。請(qǐng)使用JIRA問(wèn)題或GitHub提供這樣的反饋。

    3.4.2 Mockito Answer<?> 實(shí)現(xiàn)

    當(dāng)前有兩個(gè)Answer<?> 實(shí)現(xiàn)可幫助測(cè)試:

    第一個(gè), LatchCountDownAndCallRealMethodAnswer 提供了返回null和計(jì)數(shù)下一個(gè)鎖存器的Answer<Void>.

    LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
    doAnswer(answer)
        .when(listener).foo(anyString(), anyString());
    
    ...
    
    assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));

    第二個(gè), LambdaAnswer<T> 提供了一種調(diào)用真正方法的機(jī)制,并提供機(jī)會(huì)來(lái)返回定制結(jié)果(基于InvocationOnMock和結(jié)果).

    public class Foo {
    
        public String foo(String foo) {
            return foo.toUpperCase();
        }
    
    }
    Foo foo = spy(new Foo());
    
    doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
        .when(foo).foo(anyString());
    assertEquals("FOOFOO", foo.foo("foo"));
    
    doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
        .when(foo).foo(anyString());
    assertEquals("FOOfoo", foo.foo("foo"));
    
    doAnswer(new LambdaAnswer<String>(false, (i, r) ->
        "" + i.getArguments()[0] + i.getArguments()[0])).when(foo).foo(anyString());
    assertEquals("foofoo", foo.foo("foo"));

    When using Java 7 or earlier:

    doAnswer(new LambdaAnswer<String>(true, new ValueToReturn<String>() {
        @Overridepublic String apply(InvocationOnMock i, String r) {
            return r + r;
        }
    })).when(foo).foo(anyString());

    3.4.3 @RabbitListenerTest and RabbitListenerTestHarness

    在你的@Configuration 類中使用 @RabbitListenerTest (它也會(huì)通過(guò)@EnableRabbit來(lái)啟用@RabbitListener 探測(cè)).注解會(huì)導(dǎo)致框架使用子類RabbitListenerTestHarness來(lái)代替標(biāo)準(zhǔn)RabbitListenerAnnotationBeanPostProcessor.

    RabbitListenerTestHarness 通過(guò)兩種方式來(lái)增強(qiáng)監(jiān)聽器 - 將其包裝進(jìn)Mockito Spy, 啟用了Mockito 存根和驗(yàn)證操作.也可在監(jiān)聽器添加Advice 來(lái)啟用對(duì)參數(shù),結(jié)果或異常的訪問(wèn).
    你可以控制哪一個(gè)(或兩個(gè))來(lái)在
    @RabbitListenerTest啟用屬性
    . 后者用于訪問(wèn)調(diào)用中更為低級(jí)數(shù)據(jù)- 它也支持測(cè)試線程阻塞,直到異步監(jiān)聽器被調(diào)用.

    重要

    final @RabbitListener 不能被發(fā)現(xiàn)或通知 ,同時(shí),只有帶id屬性的監(jiān)聽器才能發(fā)現(xiàn)或通知.

    讓我們看一些例子.

    使用spy:

    @Configuration
    @RabbitListenerTest
    public class Config {
    
        @Bean
     public Listener listener() {
            returnnew Listener();
        }
    
        ...
    
    }
    
    public class Listener {
    
        @RabbitListener(id="foo", queues="#{queue1.name}")
     public String foo(String foo) {
            return foo.toUpperCase();
        }
    
        @RabbitListener(id="bar", queues="#{queue2.name}")
     public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
            ...
        }
    
    }
    
    public class MyTests {
    
        @Autowired
        private RabbitListenerTestHarness harness;
    1@Test
     public void testTwoWay() throws Exception {
            assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"    ));
    
            Listener listener = this.harness.getSpy("foo"); 2
            assertNotNull(listener);
            verify(listener).foo("foo");
        }
    
        @Test
    public void testOneWay() throws Exception {
            Listener listener = this.harness.getSpy("bar");
            assertNotNull(listener);
    
            LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2); 3
            doAnswer(answer).when(listener).foo(anyString(), anyString());4
      this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
      this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
    
            assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
            verify(listener).foo("bar", this.queue2.getName());
            verify(listener).foo("baz", this.queue2.getName());
        }
    
    }

    1

    將harness 注入進(jìn)測(cè)試用于,這樣我們可訪問(wèn)spy.

    2

    獲取spy引用,這樣我們可以驗(yàn)證是否按預(yù)期在調(diào)用. 由于這是一個(gè)發(fā)送和接收操作,因此不必暫停測(cè)試線程,因?yàn)?code style="font-family: Monaco, Consolas, Courier, 'Lucida Console', monospace; color: #6d180b; background-color: inherit;">RabbitTemplate 在等待回復(fù)時(shí)已經(jīng)暫停過(guò)了.

    3

    在這種情況下,我們只使用了發(fā)送操作,因?yàn)槲覀冃枰粋€(gè)門閂來(lái)等待對(duì)容器線程中監(jiān)聽器的異步調(diào)用. 我們使用了Answer<?> 一個(gè)實(shí)現(xiàn)來(lái)幫助完成.

    4

    配置spy來(lái)調(diào)用Answer.

    使用捕獲建議:
    @Configuration
    @ComponentScan
    @RabbitListenerTest(spy = false, capture = true)
    public class Config {
    
    }
    
    @Service
    public class Listener {
    
        private boolean failed;
    
        @RabbitListener(id="foo", queues="#{queue1.name}")
     public String foo(String foo) {
            return foo.toUpperCase();
        }
    
        @RabbitListener(id="bar", queues="#{queue2.name}")
    public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
            if (!failed && foo.equals("ex")) {
                failed = true;
                thrownew RuntimeException(foo);
            }
            failed = false;
        }
    
    }
    
    public class MyTests {
    
         @Autowired
      private RabbitListenerTestHarness harness; 1
      @Test
      public void testTwoWay() throws Exception {
            assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"  ));
    
            InvocationData invocationData =
                this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS); 2
            assertThat(invocationData.getArguments()[0], equalTo("foo"));     3
            assertThat((String) invocationData.getResult(), equalTo("FOO"));
        }
    
        @Test
      public void testOneWay() throws Exception {
            this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
            this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
            this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
    
            InvocationData invocationData =
                this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS); 4
            Object[] args = invocationData.getArguments();
            assertThat((String) args[0], equalTo("bar"));
            assertThat((String) args[1], equalTo(queue2.getName()));
    
            invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
            args = invocationData.getArguments();
            assertThat((String) args[0], equalTo("baz"));
    
            invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
            args = invocationData.getArguments();
            assertThat((String) args[0], equalTo("ex"));
            assertEquals("ex", invocationData.getThrowable().getMessage()); 5
        }
    
    }

    1

    將harness注入進(jìn)測(cè)試用例,以便我們能獲取spy的訪問(wèn).

    2

    使用 harness.getNextInvocationDataFor() 來(lái)獲取調(diào)用數(shù)據(jù) - 在這種情況下,由于它處于request/reply 場(chǎng)景,因?yàn)闆](méi)有必要等待,因?yàn)闇y(cè)試線程在RabbitTemplate 中等待結(jié)果的時(shí)候,已經(jīng)暫停過(guò)了.

    3

    我們可以驗(yàn)證參數(shù)和結(jié)果是否與預(yù)期一致

    4

    這次,我們需要時(shí)間來(lái)等待數(shù)據(jù),因?yàn)樗谌萜骶€程上是異步操作,我們需要暫停測(cè)試線程.

    5

    當(dāng)監(jiān)聽器拋出異常時(shí),可用調(diào)用數(shù)據(jù)中的throwable 屬性


    4. Spring 整合- 參考

    這部分參考文檔提供了在Spring集成項(xiàng)目中提供AMQP支持的快速介紹.

    4.1 Spring 整合AMQP支持4.1.1 介紹

    Spring Integration 項(xiàng)目包含了構(gòu)建于Spring AMQP項(xiàng)目之上的AMQP 通道適配器(Channel Adapters)和網(wǎng)關(guān)(Gateways). 那些適配器是在Spring集成項(xiàng)目中開發(fā)和發(fā)布的.在Spring 集成中, "通道適配器" 是單向的,而網(wǎng)關(guān)是雙向的(請(qǐng)求-響應(yīng)).
    我們提供了入站通道適配器(inbound-channel-adapter),出站通道適配器( outbound-channel-adapter), 入站網(wǎng)關(guān)(inbound-gateway),以及出站網(wǎng)關(guān)(outbound-gateway).

    由于AMQP 適配器只是Spring集成版本的一部分,因?yàn)槲臋n也只針對(duì)Spring集成發(fā)行版本部分可用. 

    作為一個(gè)品酒師,我們只快速了解這里的主要特征。

    4.1.2 入站通道適配器

    為了從隊(duì)列中接收AMQP消息,需要配置一個(gè)個(gè)<inbound-channel-adapter>

    <amqp:inbound-channel-adapter channel="fromAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>

    4.1.3 站通道適配器
    為了向交換器發(fā)送AMQP消息,需要配置一個(gè)<outbound-channel-adapter>. 除了交換名稱外,還可選擇提供路由鍵。

    <amqp:outbound-channel-adapter channel="toAMQP" exchange-name="some.exchange"    routing-key="foo" amqp-template="rabbitTemplate"/>

    4.1.4 入站網(wǎng)關(guān)

    為了從隊(duì)列中接收AMQP消息,并回復(fù)到它的reply-to地址,需要配置一個(gè)<inbound-gateway>.

    <amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP"         queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>

    4.1.5 出站網(wǎng)關(guān)

    為了向交換器發(fā)送AMQP消息并接收來(lái)自遠(yuǎn)程客戶端的響應(yīng),需要配置一個(gè)<outbound-gateway>.

    除了交換名稱外,還可選擇提供路由鍵。

    <amqp:outbound-gateway request-channel="toAMQP" reply-channel="fromAMQP"       exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>

    5. 其它資源

    除了這份參考文檔,還有其它資源可幫助你了解AMQP.

    5.1 進(jìn)階閱讀

    對(duì)于那些不熟悉AMQP的人來(lái)說(shuō),  規(guī)范 實(shí)際上具有相當(dāng)?shù)目勺x性. 

    這當(dāng)然是信息的權(quán)威來(lái)源,對(duì)于熟悉規(guī)范的人來(lái)說(shuō),Spring AMQP代碼應(yīng)該很容易理解。
    目前RabbitMQ實(shí)現(xiàn)基于2.8.x版本,并正式支持AMQP 0.8和9.1。我們推薦閱讀9.1文檔。

    在RabbitMQ Getting Started 頁(yè)面上,還有許多精彩的文章,演示, 博客. 因?yàn)楫?dāng)前只有Spring AMQP實(shí)現(xiàn), 但我們?nèi)越ㄗh將其作為了解所有中間件相關(guān)概念的起點(diǎn).




    posted on 2016-08-13 16:24 胡小軍 閱讀(6554) 評(píng)論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 亚洲综合无码一区二区| 久久一区二区三区免费播放| 四虎永久免费地址在线观看| 亚洲欧美不卡高清在线| 成人免费无码大片a毛片软件| 久久久久亚洲国产| 91免费资源网站入口| 亚洲AV无码国产精品色| 99在线视频免费观看视频| 香蕉大伊亚洲人在线观看| 免费看少妇作爱视频| 色窝窝亚洲av网| | 亚洲人成77777在线观看网| 国产91免费在线观看| 亚洲高清视频在线| 免费羞羞视频网站| 无遮挡呻吟娇喘视频免费播放| 亚洲AV成人精品日韩一区18p| eeuss草民免费| 图图资源网亚洲综合网站| 久久中文字幕免费视频| 亚洲人成网网址在线看| 免费理论片51人人看电影| 美女免费精品高清毛片在线视| 亚洲视频人成在线播放| 三年片在线观看免费| 亚洲色图在线播放| 99久久这里只精品国产免费| 亚洲Aⅴ在线无码播放毛片一线天 亚洲avav天堂av在线网毛片 | 国产一区在线观看免费| 九九全国免费视频| 国产AV无码专区亚洲Av| 88xx成人永久免费观看| 亚洲人成图片网站| 可以免费观看一级毛片黄a| 成人A毛片免费观看网站| 亚洲一区二区成人| 午夜视频免费观看| www.xxxx.com日本免费| 亚洲最新视频在线观看|