<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Terry.Li-彬

    虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業。

      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks

    2.6 Features
    ??? ActiveMQ包含了很多功能強大的特性,下面簡要介紹其中的幾個。
    2.6.1 Exclusive Consumer
    ??? Queue中的消息是按照順序被分發到consumers的。然而,當你有多個consumers同時從相同的queue中提取消息時,你將失去這個保 證。因為這些消息是被多個線程并發的處理。有的時候,保證消息按照順序處理是很重要的。例如,你可能不希望在插入訂單操作結束之前執行更新這個訂單的操 作。
    ??? ActiveMQ從4.x版本起開始支持Exclusive Consumer (或者說Exclusive Queues)。 Broker會從多個consumers中挑選一個consumer來處理queue中所有的消息,從而保證了消息的有序處理。如果這個consumer 失效,那么broker會自動切換到其它的consumer。
    ??? 可以通過Destination Options 來創建一個Exclusive Consumer,如下:

    Java代碼
    1. queue?=? new ?ActiveMQQueue( "TEST.QUEUE?consumer.exclusive=true" );??
    2. consumer?=?session.createConsumer(queue);??
    ??? 順便說一下,可以給consumer設置優先級,以便針對網絡情況(如network hops)進行優化,如下:
    Java代碼
    1. queue?=?new?ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true?&consumer.priority=10");??

    ?

    2.6.2 Message Groups
    ??? 用Apache官方文檔的話說,Message Groups rock!它是Exclusive Consumer功能的增強。邏輯上,Message Groups 可以看成是一種并發的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來區分message group。Message Groups特性保證所有具有相同JMSXGroupID 的消息會被分發到相同的consumer(只要這個consumer保持active)。另外一方面,Message Groups特性也是一種負載均衡的機制。
    ??? 在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker 會檢查是否有某個consumer擁有這個message group。如果沒有,那么broker會選擇一個consumer,并將它關聯到這個message group。此后,這個consumer會接收這個message group的所有消息,直到:

    • Consumer被關閉。
    • Message group被關閉。通過發送一個消息,并設置這個消息的JMSXGroupSeq為0。

    ?? 從4.1版本開始,ActiveMQ支持一個布爾字段JMSXGroupFirstForConsumer 。當某個message group的第一個消息被發送到consumer的時候,這個字段被設置。如果客戶使用failover transport連接到broker。在由于網絡問題等造成客戶重新連接到broker的時候,相同message group的消息可能會被分發到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會被重新設置。?

    ?? 以下是使用message groups的例子:

    Java代碼
    1. Mesasge?message?=?session.createTextMessage("<foo>hey</foo>");??
    2. message.setStringProperty("JMSXGroupID",?"IBM_NASDAQ_20/4/05");??
    3. ...??
    4. producer.send(message);??

    2.6.3 JMS Selectors
    ??? JMS Selectors用于在訂閱中,基于消息屬性對進行消息的過濾。JMS Selectors由SQL92語法定義。以下是個Selectors的例子:
    Java代碼
    1. consumer?=?session.createConsumer(destination,?"JMSType?=?'car'?AND?weight?>?2500");??
    ???? 在JMS Selectors表達式中,可以使用IN、NOT IN、LIKE等,例如:
    ??? LIKE '12%3' ('123' true,'12993' true,'1234' false)
    ??? LIKE 'l_se' ('lose' true,'loose' false)
    ??? LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
    ??? 需要注意的是,JMS Selectors表達式中的日期和時間需要使用標準的long型毫秒值。另外表達式中的屬性不會自動進行類型轉換,例如:
    Java代碼
    1. myMessage.setStringProperty("NumberOfOrders",?"2");??
    ??? "NumberOfOrders > 1" 求值結果是false。關于JMS Selectors的詳細文檔請參考javax.jms.Message的javadoc。
    ??? 上一小節介紹的Message Groups雖然可以保證具有相同message group的消息被唯一的consumer順序處理,但是卻不能確定被哪個consumer處理。在某些情況下,Message Groups可以和JMS Selector一起工作,例如:
    ??? 設想有三個consumers分別是A、B和C。你可以在producer中為消息設置三個message groups分別是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作為selector。B和C也同理。這樣就可以保證message group A的消息只被consumer A處理。需要注意的是,這種做法有以下缺點:
    • producer必須知道當前正在運行的consumers,也就是說producer和consumer被耦合到一起。
    • 如果某個consumer失效,那么應該被這個consumer消費的消息將會一直被積壓在broker上。

    2.6.4 Pending Message Limit Strategy
    ??? 首先簡要介紹一下prefetch機制。ActiveMQ通過prefetch機制來提高性能,這意味這客戶端的內存里可能會緩存一定數量的消息。緩存消 息的數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那么broker不會再向consumer分發消息,直到consumer向broker發送消息的確認。可以通過在 ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象 來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
    ??? tcp://localhost:61616?jms.prefetchPolicy.all=50
    ??? tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
    ??? queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
    ??? prefetch size的缺省值如下:

    • persistent queues (default value: 1000)
    • non-persistent queues (default value: 1000)
    • persistent topics (default value: 100)
    • non-persistent topics (default value: Short.MAX_VALUE -1)

    ??? 慢消費者會在非持久的topics上導致問題:一旦消息積壓起來,會導致broker把大量消息保存在內存中,broker也會因此而變慢。未來 ActiveMQ可能會實現磁盤緩存,但是這也還是會存在性能問題。目前ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限后,新消息到來時會丟棄舊消息。通過在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。目前有以下兩種:

    • ConstantPendingMessageLimitStrategy。這個策略使用常量限制。
      例如:<constantPendingMessageLimitStrategy limit="50"/>
    • PrefetchRatePendingMessageLimitStrategy。這個策略使用prefetch size的倍數限制。
      例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

    ?? 在以上兩種方式中,如果設置0意味著除了prefetch之外不再緩存消息;如果設置-1意味著禁止丟棄消息。?
    ??? 此外,你還可以配置消息的丟棄策略,目前有以下兩種:

    • oldestMessageEvictionStrategy。這個策略丟棄最舊的消息。
    • oldestMessageWithLowestPriorityEvictionStrategy。這個策略丟棄最舊的,而且具有最低優先級的消息。

    ?? 以下是個ActiveMQ配置文件的例子:

    Xml代碼
    1. <broker?persistent="false"?brokerName="${brokername}"?xmlns="http://activemq.org/config/1.0">??
    2. ????<destinationPolicy>??
    3. ??????<policyMap>??
    4. ????????<policyEntries>??
    5. ??????????<policyEntry?topic="PRICES.>">??
    6. ????????????<!--??10?seconds?worth?-->??
    7. ????????????<subscriptionRecoveryPolicy>??
    8. ??????????????<timedSubscriptionRecoveryPolicy?recoverDuration="10000"?/>??
    9. ????????????</subscriptionRecoveryPolicy>??
    10. ??????????????
    11. ????????????<!--?lets?force?old?messages?to?be?discarded?for?slow?consumers?-->??
    12. ????????????<pendingMessageLimitStrategy>??
    13. ??????????????<constantPendingMessageLimitStrategy?limit="10"/>??
    14. ????????????</pendingMessageLimitStrategy>??
    15. ??????????</policyEntry>??
    16. ????????</policyEntries>??
    17. ??????</policyMap>??
    18. ????</destinationPolicy>??
    19. ????...??
    20. </broker>??

    ?

    2.6.5 Composite Destinations
    ??? 從1.1版本起, ActiveMQ支持composite destinations。它允許用一個虛擬的destination 代表多個destinations。例如你可以通過composite destinations在一個操作中同時向12個queue發送消息。在composite destinations中,多個destination之間采用","分割。例如:

    Java代碼
    1. Queue?queue?=?new?ActiveMQQueue("FOO.A,FOO.B,FOO.C");??

    ?? 如果你希望使用不同類型的destination,那么需要加上前綴如queue:// 或topic://,例如:?

    Java代碼
    1. Queue?queue?=?new?ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");???

    ?? 以下是ActiveMQ配置文件進行配置的一個例子:

    Xml代碼
    1. <destinationInterceptors>??
    2. ??<virtualDestinationInterceptor>??
    3. ????<virtualDestinations>??
    4. ??????<compositeQueue?name="MY.QUEUE">??
    5. ????????<forwardTo>??
    6. ??????????<queue?physicalName="FOO"?/>??
    7. ??????????<topic?physicalName="BAR"?/>??
    8. ????????</forwardTo>??
    9. ??????</compositeQueue>??
    10. ????</virtualDestinations>??
    11. ??</virtualDestinationInterceptor>??
    12. </destinationInterceptors>??

    ?? 可以在轉發前,先通過JMS Selector判斷一個消息是否需要轉發,例如:

    Xml代碼
    1. <destinationInterceptors>??
    2. ??<virtualDestinationInterceptor>??
    3. ????<virtualDestinations>??
    4. ??????<compositeQueue?name="MY.QUEUE">??
    5. ????????<forwardTo>??
    6. ??????????<filteredDestination?selector="odd?=?'yes'"?queue="FOO"/>??
    7. ??????????<filteredDestination?selector="i?=?5"?topic="BAR"/>??
    8. ????????</forwardTo>??
    9. ??????</compositeQueue>??
    10. ????</virtualDestinations>??
    11. ??</virtualDestinationInterceptor>??
    12. </destinationInterceptors>??

    ?

    2.6.6 Mirrored Queues
    ??? 每個queue中的消息只能被一個consumer消費。然而,有時候你可能希望能夠監視生產者和消費者之間的消息流。你可以通過使用Virtual Destinations 來建立一個virtual queue 來把消息轉發到多個queues中。但是 為系統中每個queue都進行如此的配置可能會很麻煩。
    ??? ActiveMQ支持Mirrored Queues。Broker會把發送到某個queue的所有消息轉發到一個名稱類似的topic,因此監控程序可以訂閱這個mirrored queue topic。為了啟用Mirrored Queues,首先要將BrokerService的useMirroredQueues屬性設置成true,然后可以通過 destinationInterceptors設置其它屬性,如mirror topic的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一個例子:

    Xml代碼
    1. <broker?xmlns="http://activemq.org/config/1.0"?brokerName="MirroredQueuesBroker1"?useMirroredQueues="true">??
    2. ??
    3. ??<transportConnectors>??
    4. ????<transportConnector?uri="tcp://localhost:61616"/>??
    5. ??</transportConnectors>??
    6. ????
    7. ??<destinationInterceptors>??
    8. ??????<mirroredQueue?copyMessage?=?"true"?prefix="Mirror.Topic"/>??
    9. ??</destinationInterceptors>??
    10. ??...??
    11. </broker>??
    ??? 假如某個producer向名為Foo.Bar的queue中發送消息,那么你可以通過訂閱名為Mirror.Topic.Foo.Bar的topic來獲得發送到Foo.Bar中的所有消息。
    posted on 2010-09-01 22:28 禮物 閱讀(1616) 評論(0)  編輯  收藏 所屬分類: ActiveMQ
    主站蜘蛛池模板: 亚洲国产V高清在线观看| 久久久久亚洲av成人无码电影| 亚洲成AV人片高潮喷水| 亚洲av区一区二区三| 大地资源中文在线观看免费版| 亚洲精品电影天堂网| 免费久久精品国产片香蕉| 四虎国产精品免费永久在线| 波多野结衣亚洲一级| 国产成人精品日本亚洲专区| 希望影院高清免费观看视频| 免费无码一区二区| 亚洲免费视频网址| 美腿丝袜亚洲综合| 成人免费在线观看网站| 深夜国产福利99亚洲视频| 午夜不卡久久精品无码免费| 鲁死你资源站亚洲av| 亚洲国产成人私人影院| 免费v片在线观看品善网| 在线观看的免费网站无遮挡| 性生大片视频免费观看一级| 亚洲乱码一二三四五六区| 亚洲女同成av人片在线观看| 韩国18福利视频免费观看| 国产好大好硬好爽免费不卡| 日韩亚洲综合精品国产| 亚洲国产电影在线观看| 亚洲成av人在线视| 亚洲av麻豆aⅴ无码电影| 久久电影网午夜鲁丝片免费| 日本免费人成网ww555在线| 深夜福利在线免费观看| 日韩亚洲产在线观看| 91嫩草亚洲精品| 亚洲成AV人片在线观看无| 亚洲欧洲自拍拍偷精品 美利坚| 在线免费观看一区二区三区| 最近中文字幕大全中文字幕免费| 男女拍拍拍免费视频网站| 免费激情网站国产高清第一页|