<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Terry.Li-彬

    虛其心,可解天下之問(wèn);專其心,可治天下之學(xué);靜其心,可悟天下之理;恒其心,可成天下之業(yè)。

      BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      143 隨筆 :: 344 文章 :: 130 評(píng)論 :: 0 Trackbacks

    2.6 Features
    ??? ActiveMQ包含了很多功能強(qiáng)大的特性,下面簡(jiǎn)要介紹其中的幾個(gè)。
    2.6.1 Exclusive Consumer
    ??? Queue中的消息是按照順序被分發(fā)到consumers的。然而,當(dāng)你有多個(gè)consumers同時(shí)從相同的queue中提取消息時(shí),你將失去這個(gè)保 證。因?yàn)檫@些消息是被多個(gè)線程并發(fā)的處理。有的時(shí)候,保證消息按照順序處理是很重要的。例如,你可能不希望在插入訂單操作結(jié)束之前執(zhí)行更新這個(gè)訂單的操 作。
    ??? ActiveMQ從4.x版本起開(kāi)始支持Exclusive Consumer (或者說(shuō)Exclusive Queues)。 Broker會(huì)從多個(gè)consumers中挑選一個(gè)consumer來(lái)處理queue中所有的消息,從而保證了消息的有序處理。如果這個(gè)consumer 失效,那么broker會(huì)自動(dòng)切換到其它的consumer。
    ??? 可以通過(guò)Destination Options 來(lái)創(chuàng)建一個(gè)Exclusive Consumer,如下:

    Java代碼
    1. queue?=? new ?ActiveMQQueue( "TEST.QUEUE?consumer.exclusive=true" );??
    2. consumer?=?session.createConsumer(queue);??
    ??? 順便說(shuō)一下,可以給consumer設(shè)置優(yōu)先級(jí),以便針對(duì)網(wǎng)絡(luò)情況(如network hops)進(jìn)行優(yōu)化,如下:
    Java代碼
    1. queue?=?new?ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true?&consumer.priority=10");??

    ?

    2.6.2 Message Groups
    ??? 用Apache官方文檔的話說(shuō),Message Groups rock!它是Exclusive Consumer功能的增強(qiáng)。邏輯上,Message Groups 可以看成是一種并發(fā)的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來(lái)區(qū)分message group。Message Groups特性保證所有具有相同JMSXGroupID 的消息會(huì)被分發(fā)到相同的consumer(只要這個(gè)consumer保持active)。另外一方面,Message Groups特性也是一種負(fù)載均衡的機(jī)制。
    ??? 在一個(gè)消息被分發(fā)到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker 會(huì)檢查是否有某個(gè)consumer擁有這個(gè)message group。如果沒(méi)有,那么broker會(huì)選擇一個(gè)consumer,并將它關(guān)聯(lián)到這個(gè)message group。此后,這個(gè)consumer會(huì)接收這個(gè)message group的所有消息,直到:

    • Consumer被關(guān)閉。
    • Message group被關(guān)閉。通過(guò)發(fā)送一個(gè)消息,并設(shè)置這個(gè)消息的JMSXGroupSeq為0。

    ?? 從4.1版本開(kāi)始,ActiveMQ支持一個(gè)布爾字段JMSXGroupFirstForConsumer 。當(dāng)某個(gè)message group的第一個(gè)消息被發(fā)送到consumer的時(shí)候,這個(gè)字段被設(shè)置。如果客戶使用failover transport連接到broker。在由于網(wǎng)絡(luò)問(wèn)題等造成客戶重新連接到broker的時(shí)候,相同message group的消息可能會(huì)被分發(fā)到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會(huì)被重新設(shè)置。?

    ?? 以下是使用message groups的例子:

    Java代碼
    1. Mesasge?message?=?session.createTextMessage("<foo>hey</foo>");??
    2. message.setStringProperty("JMSXGroupID",?"IBM_NASDAQ_20/4/05");??
    3. ...??
    4. producer.send(message);??

    2.6.3 JMS Selectors
    ??? JMS Selectors用于在訂閱中,基于消息屬性對(duì)進(jìn)行消息的過(guò)濾。JMS Selectors由SQL92語(yǔ)法定義。以下是個(gè)Selectors的例子:
    Java代碼
    1. consumer?=?session.createConsumer(destination,?"JMSType?=?'car'?AND?weight?>?2500");??
    ???? 在JMS Selectors表達(dá)式中,可以使用IN、NOT IN、LIKE等,例如:
    ??? LIKE '12%3' ('123' true,'12993' true,'1234' false)
    ??? LIKE 'l_se' ('lose' true,'loose' false)
    ??? LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
    ??? 需要注意的是,JMS Selectors表達(dá)式中的日期和時(shí)間需要使用標(biāo)準(zhǔn)的long型毫秒值。另外表達(dá)式中的屬性不會(huì)自動(dòng)進(jìn)行類型轉(zhuǎn)換,例如:
    Java代碼
    1. myMessage.setStringProperty("NumberOfOrders",?"2");??
    ??? "NumberOfOrders > 1" 求值結(jié)果是false。關(guān)于JMS Selectors的詳細(xì)文檔請(qǐng)參考javax.jms.Message的javadoc。
    ??? 上一小節(jié)介紹的Message Groups雖然可以保證具有相同message group的消息被唯一的consumer順序處理,但是卻不能確定被哪個(gè)consumer處理。在某些情況下,Message Groups可以和JMS Selector一起工作,例如:
    ??? 設(shè)想有三個(gè)consumers分別是A、B和C。你可以在producer中為消息設(shè)置三個(gè)message groups分別是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作為selector。B和C也同理。這樣就可以保證message group A的消息只被consumer A處理。需要注意的是,這種做法有以下缺點(diǎn):
    • producer必須知道當(dāng)前正在運(yùn)行的consumers,也就是說(shuō)producer和consumer被耦合到一起。
    • 如果某個(gè)consumer失效,那么應(yīng)該被這個(gè)consumer消費(fèi)的消息將會(huì)一直被積壓在broker上。

    2.6.4 Pending Message Limit Strategy
    ??? 首先簡(jiǎn)要介紹一下prefetch機(jī)制。ActiveMQ通過(guò)prefetch機(jī)制來(lái)提高性能,這意味這客戶端的內(nèi)存里可能會(huì)緩存一定數(shù)量的消息。緩存消 息的數(shù)量由prefetch limit來(lái)控制。當(dāng)某個(gè)consumer的prefetch buffer已經(jīng)達(dá)到上限,那么broker不會(huì)再向consumer分發(fā)消息,直到consumer向broker發(fā)送消息的確認(rèn)。可以通過(guò)在 ActiveMQConnectionFactory或者ActiveMQConnection上設(shè)置ActiveMQPrefetchPolicy對(duì)象 來(lái)配置prefetch policy。也可以通過(guò)connection options或者destination options來(lái)配置。例如:
    ??? tcp://localhost:61616?jms.prefetchPolicy.all=50
    ??? tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
    ??? queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
    ??? prefetch size的缺省值如下:

    • persistent queues (default value: 1000)
    • non-persistent queues (default value: 1000)
    • persistent topics (default value: 100)
    • non-persistent topics (default value: Short.MAX_VALUE -1)

    ??? 慢消費(fèi)者會(huì)在非持久的topics上導(dǎo)致問(wèn)題:一旦消息積壓起來(lái),會(huì)導(dǎo)致broker把大量消息保存在內(nèi)存中,broker也會(huì)因此而變慢。未來(lái) ActiveMQ可能會(huì)實(shí)現(xiàn)磁盤(pán)緩存,但是這也還是會(huì)存在性能問(wèn)題。目前ActiveMQ使用Pending Message Limit Strategy來(lái)解決這個(gè)問(wèn)題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過(guò)這個(gè)上限后,新消息到來(lái)時(shí)會(huì)丟棄舊消息。通過(guò)在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。目前有以下兩種:

    • ConstantPendingMessageLimitStrategy。這個(gè)策略使用常量限制。
      例如:<constantPendingMessageLimitStrategy limit="50"/>
    • PrefetchRatePendingMessageLimitStrategy。這個(gè)策略使用prefetch size的倍數(shù)限制。
      例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

    ?? 在以上兩種方式中,如果設(shè)置0意味著除了prefetch之外不再緩存消息;如果設(shè)置-1意味著禁止丟棄消息。?
    ??? 此外,你還可以配置消息的丟棄策略,目前有以下兩種:

    • oldestMessageEvictionStrategy。這個(gè)策略丟棄最舊的消息。
    • oldestMessageWithLowestPriorityEvictionStrategy。這個(gè)策略丟棄最舊的,而且具有最低優(yōu)先級(jí)的消息。

    ?? 以下是個(gè)ActiveMQ配置文件的例子:

    Xml代碼
    1. <broker?persistent="false"?brokerName="${brokername}"?xmlns="http://activemq.org/config/1.0">??
    2. ????<destinationPolicy>??
    3. ??????<policyMap>??
    4. ????????<policyEntries>??
    5. ??????????<policyEntry?topic="PRICES.>">??
    6. ????????????<!--??10?seconds?worth?-->??
    7. ????????????<subscriptionRecoveryPolicy>??
    8. ??????????????<timedSubscriptionRecoveryPolicy?recoverDuration="10000"?/>??
    9. ????????????</subscriptionRecoveryPolicy>??
    10. ??????????????
    11. ????????????<!--?lets?force?old?messages?to?be?discarded?for?slow?consumers?-->??
    12. ????????????<pendingMessageLimitStrategy>??
    13. ??????????????<constantPendingMessageLimitStrategy?limit="10"/>??
    14. ????????????</pendingMessageLimitStrategy>??
    15. ??????????</policyEntry>??
    16. ????????</policyEntries>??
    17. ??????</policyMap>??
    18. ????</destinationPolicy>??
    19. ????...??
    20. </broker>??

    ?

    2.6.5 Composite Destinations
    ??? 從1.1版本起, ActiveMQ支持composite destinations。它允許用一個(gè)虛擬的destination 代表多個(gè)destinations。例如你可以通過(guò)composite destinations在一個(gè)操作中同時(shí)向12個(gè)queue發(fā)送消息。在composite destinations中,多個(gè)destination之間采用","分割。例如:

    Java代碼
    1. Queue?queue?=?new?ActiveMQQueue("FOO.A,FOO.B,FOO.C");??

    ?? 如果你希望使用不同類型的destination,那么需要加上前綴如queue:// 或topic://,例如:?

    Java代碼
    1. Queue?queue?=?new?ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");???

    ?? 以下是ActiveMQ配置文件進(jìn)行配置的一個(gè)例子:

    Xml代碼
    1. <destinationInterceptors>??
    2. ??<virtualDestinationInterceptor>??
    3. ????<virtualDestinations>??
    4. ??????<compositeQueue?name="MY.QUEUE">??
    5. ????????<forwardTo>??
    6. ??????????<queue?physicalName="FOO"?/>??
    7. ??????????<topic?physicalName="BAR"?/>??
    8. ????????</forwardTo>??
    9. ??????</compositeQueue>??
    10. ????</virtualDestinations>??
    11. ??</virtualDestinationInterceptor>??
    12. </destinationInterceptors>??

    ?? 可以在轉(zhuǎn)發(fā)前,先通過(guò)JMS Selector判斷一個(gè)消息是否需要轉(zhuǎn)發(fā),例如:

    Xml代碼
    1. <destinationInterceptors>??
    2. ??<virtualDestinationInterceptor>??
    3. ????<virtualDestinations>??
    4. ??????<compositeQueue?name="MY.QUEUE">??
    5. ????????<forwardTo>??
    6. ??????????<filteredDestination?selector="odd?=?'yes'"?queue="FOO"/>??
    7. ??????????<filteredDestination?selector="i?=?5"?topic="BAR"/>??
    8. ????????</forwardTo>??
    9. ??????</compositeQueue>??
    10. ????</virtualDestinations>??
    11. ??</virtualDestinationInterceptor>??
    12. </destinationInterceptors>??

    ?

    2.6.6 Mirrored Queues
    ??? 每個(gè)queue中的消息只能被一個(gè)consumer消費(fèi)。然而,有時(shí)候你可能希望能夠監(jiān)視生產(chǎn)者和消費(fèi)者之間的消息流。你可以通過(guò)使用Virtual Destinations 來(lái)建立一個(gè)virtual queue 來(lái)把消息轉(zhuǎn)發(fā)到多個(gè)queues中。但是 為系統(tǒng)中每個(gè)queue都進(jìn)行如此的配置可能會(huì)很麻煩。
    ??? ActiveMQ支持Mirrored Queues。Broker會(huì)把發(fā)送到某個(gè)queue的所有消息轉(zhuǎn)發(fā)到一個(gè)名稱類似的topic,因此監(jiān)控程序可以訂閱這個(gè)mirrored queue topic。為了啟用Mirrored Queues,首先要將BrokerService的useMirroredQueues屬性設(shè)置成true,然后可以通過(guò) destinationInterceptors設(shè)置其它屬性,如mirror topic的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一個(gè)例子:

    Xml代碼
    1. <broker?xmlns="http://activemq.org/config/1.0"?brokerName="MirroredQueuesBroker1"?useMirroredQueues="true">??
    2. ??
    3. ??<transportConnectors>??
    4. ????<transportConnector?uri="tcp://localhost:61616"/>??
    5. ??</transportConnectors>??
    6. ????
    7. ??<destinationInterceptors>??
    8. ??????<mirroredQueue?copyMessage?=?"true"?prefix="Mirror.Topic"/>??
    9. ??</destinationInterceptors>??
    10. ??...??
    11. </broker>??
    ??? 假如某個(gè)producer向名為Foo.Bar的queue中發(fā)送消息,那么你可以通過(guò)訂閱名為Mirror.Topic.Foo.Bar的topic來(lái)獲得發(fā)送到Foo.Bar中的所有消息。
    posted on 2010-09-01 22:28 禮物 閱讀(1616) 評(píng)論(0)  編輯  收藏 所屬分類: ActiveMQ

    只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。

    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 野花香在线视频免费观看大全| 麻豆国产入口在线观看免费| 一级毛片完整版免费播放一区| 亚洲伊人久久大香线蕉结合| 亚洲av午夜福利精品一区人妖| 亚洲日韩精品无码专区网站| 国产成人精品免费视频软件| 国产青草视频在线观看免费影院| 亚洲精品白浆高清久久久久久 | 精品一区二区三区无码免费视频| av片在线观看永久免费| 久久亚洲精品11p| 亚洲AV成人无码网站| 色欲色香天天天综合网站免费| 免费女人高潮流视频在线观看| xxxxwww免费| 中文字幕无码视频手机免费看| 亚洲免费综合色在线视频| 在线亚洲人成电影网站色www | 亚洲永久精品ww47| 亚洲激情视频在线观看| 99ri精品国产亚洲| 亚洲1234区乱码| 亚洲精品国产精品| 九九全国免费视频| 最近免费中文字幕大全视频| 国产免费黄色大片| 精品亚洲一区二区三区在线观看 | 亚洲欧洲成人精品香蕉网| 亚洲色大18成人网站WWW在线播放| 日本亚洲中午字幕乱码| 一个人免费观看视频在线中文| 色窝窝免费一区二区三区| 亚洲AV无码第一区二区三区 | 国产黄色一级毛片亚洲黄片大全| 亚洲人成网站色在线观看| 看全免费的一级毛片| 手机看片国产免费永久| **毛片免费观看久久精品| 国产免费黄色大片| 亚洲欧洲AV无码专区|