2.6 Features
??? ActiveMQ包含了很多功能強(qiáng)大的特性,下面簡(jiǎn)要介紹其中的幾個(gè)。
2.6.1 Exclusive Consumer
???
Queue中的消息是按照順序被分發(fā)到consumers的。然而,當(dāng)你有多個(gè)consumers同時(shí)從相同的queue中提取消息時(shí),你將失去這個(gè)保
證。因?yàn)檫@些消息是被多個(gè)線程并發(fā)的處理。有的時(shí)候,保證消息按照順序處理是很重要的。例如,你可能不希望在插入訂單操作結(jié)束之前執(zhí)行更新這個(gè)訂單的操
作。
??? ActiveMQ從4.x版本起開(kāi)始支持Exclusive Consumer (或者說(shuō)Exclusive Queues)。
Broker會(huì)從多個(gè)consumers中挑選一個(gè)consumer來(lái)處理queue中所有的消息,從而保證了消息的有序處理。如果這個(gè)consumer
失效,那么broker會(huì)自動(dòng)切換到其它的consumer。
??? 可以通過(guò)Destination Options 來(lái)創(chuàng)建一個(gè)Exclusive Consumer,如下:
-
queue?=?
new
?ActiveMQQueue(
"TEST.QUEUE?consumer.exclusive=true"
);??
-
consumer?=?session.createConsumer(queue);??
??? 順便說(shuō)一下,可以給consumer設(shè)置優(yōu)先級(jí),以便針對(duì)網(wǎng)絡(luò)情況(如network hops)進(jìn)行優(yōu)化,如下:
- queue?=?new?ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true?&consumer.priority=10");??
?
2.6.2 Message Groups
???
用Apache官方文檔的話說(shuō),Message Groups rock!它是Exclusive
Consumer功能的增強(qiáng)。邏輯上,Message Groups 可以看成是一種并發(fā)的Exclusive
Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來(lái)區(qū)分message
group。Message Groups特性保證所有具有相同JMSXGroupID
的消息會(huì)被分發(fā)到相同的consumer(只要這個(gè)consumer保持active)。另外一方面,Message
Groups特性也是一種負(fù)載均衡的機(jī)制。
???
在一個(gè)消息被分發(fā)到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker
會(huì)檢查是否有某個(gè)consumer擁有這個(gè)message
group。如果沒(méi)有,那么broker會(huì)選擇一個(gè)consumer,并將它關(guān)聯(lián)到這個(gè)message
group。此后,這個(gè)consumer會(huì)接收這個(gè)message group的所有消息,直到:
- Consumer被關(guān)閉。
- Message group被關(guān)閉。通過(guò)發(fā)送一個(gè)消息,并設(shè)置這個(gè)消息的JMSXGroupSeq為0。
??
從4.1版本開(kāi)始,ActiveMQ支持一個(gè)布爾字段JMSXGroupFirstForConsumer 。當(dāng)某個(gè)message
group的第一個(gè)消息被發(fā)送到consumer的時(shí)候,這個(gè)字段被設(shè)置。如果客戶使用failover
transport連接到broker。在由于網(wǎng)絡(luò)問(wèn)題等造成客戶重新連接到broker的時(shí)候,相同message
group的消息可能會(huì)被分發(fā)到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會(huì)被重新設(shè)置。?
?? 以下是使用message groups的例子:
- Mesasge?message?=?session.createTextMessage("<foo>hey</foo>");??
- message.setStringProperty("JMSXGroupID",?"IBM_NASDAQ_20/4/05");??
- ...??
- producer.send(message);??
2.6.3 JMS Selectors
??? JMS Selectors用于在訂閱中,基于消息屬性對(duì)進(jìn)行消息的過(guò)濾。JMS Selectors由SQL92語(yǔ)法定義。以下是個(gè)Selectors的例子:
- consumer?=?session.createConsumer(destination,?"JMSType?=?'car'?AND?weight?>?2500");??
???? 在JMS Selectors表達(dá)式中,可以使用IN、NOT IN、LIKE等,例如:
??? LIKE '12%3' ('123' true,'12993' true,'1234' false)
??? LIKE 'l_se' ('lose' true,'loose' false)
??? LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
??? 需要注意的是,JMS Selectors表達(dá)式中的日期和時(shí)間需要使用標(biāo)準(zhǔn)的long型毫秒值。另外表達(dá)式中的屬性不會(huì)自動(dòng)進(jìn)行類型轉(zhuǎn)換,例如:
- myMessage.setStringProperty("NumberOfOrders",?"2");??
??? "NumberOfOrders > 1" 求值結(jié)果是false。關(guān)于JMS Selectors的詳細(xì)文檔請(qǐng)參考javax.jms.Message的javadoc。
???
上一小節(jié)介紹的Message Groups雖然可以保證具有相同message
group的消息被唯一的consumer順序處理,但是卻不能確定被哪個(gè)consumer處理。在某些情況下,Message
Groups可以和JMS Selector一起工作,例如:
???
設(shè)想有三個(gè)consumers分別是A、B和C。你可以在producer中為消息設(shè)置三個(gè)message
groups分別是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID =
'A'"作為selector。B和C也同理。這樣就可以保證message group A的消息只被consumer
A處理。需要注意的是,這種做法有以下缺點(diǎn):
- producer必須知道當(dāng)前正在運(yùn)行的consumers,也就是說(shuō)producer和consumer被耦合到一起。
- 如果某個(gè)consumer失效,那么應(yīng)該被這個(gè)consumer消費(fèi)的消息將會(huì)一直被積壓在broker上。
2.6.4 Pending Message Limit Strategy
???
首先簡(jiǎn)要介紹一下prefetch機(jī)制。ActiveMQ通過(guò)prefetch機(jī)制來(lái)提高性能,這意味這客戶端的內(nèi)存里可能會(huì)緩存一定數(shù)量的消息。緩存消
息的數(shù)量由prefetch limit來(lái)控制。當(dāng)某個(gè)consumer的prefetch
buffer已經(jīng)達(dá)到上限,那么broker不會(huì)再向consumer分發(fā)消息,直到consumer向broker發(fā)送消息的確認(rèn)。可以通過(guò)在
ActiveMQConnectionFactory或者ActiveMQConnection上設(shè)置ActiveMQPrefetchPolicy對(duì)象
來(lái)配置prefetch policy。也可以通過(guò)connection options或者destination options來(lái)配置。例如:
??? tcp://localhost:61616?jms.prefetchPolicy.all=50
??? tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
??? queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
??? prefetch size的缺省值如下:
- persistent queues (default value: 1000)
- non-persistent queues (default value: 1000)
- persistent topics (default value: 100)
- non-persistent topics (default value: Short.MAX_VALUE -1)
???
慢消費(fèi)者會(huì)在非持久的topics上導(dǎo)致問(wèn)題:一旦消息積壓起來(lái),會(huì)導(dǎo)致broker把大量消息保存在內(nèi)存中,broker也會(huì)因此而變慢。未來(lái)
ActiveMQ可能會(huì)實(shí)現(xiàn)磁盤(pán)緩存,但是這也還是會(huì)存在性能問(wèn)題。目前ActiveMQ使用Pending Message Limit
Strategy來(lái)解決這個(gè)問(wèn)題。除了prefetch
buffer之外,你還要配置緩存消息的上限,超過(guò)這個(gè)上限后,新消息到來(lái)時(shí)會(huì)丟棄舊消息。通過(guò)在配置文件的destination
map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。目前有以下兩種:
- ConstantPendingMessageLimitStrategy。這個(gè)策略使用常量限制。
例如:<constantPendingMessageLimitStrategy limit="50"/> - PrefetchRatePendingMessageLimitStrategy。這個(gè)策略使用prefetch size的倍數(shù)限制。
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
?? 在以上兩種方式中,如果設(shè)置0意味著除了prefetch之外不再緩存消息;如果設(shè)置-1意味著禁止丟棄消息。?
??? 此外,你還可以配置消息的丟棄策略,目前有以下兩種:
- oldestMessageEvictionStrategy。這個(gè)策略丟棄最舊的消息。
- oldestMessageWithLowestPriorityEvictionStrategy。這個(gè)策略丟棄最舊的,而且具有最低優(yōu)先級(jí)的消息。
?? 以下是個(gè)ActiveMQ配置文件的例子:
- <broker?persistent="false"?brokerName="${brokername}"?xmlns="http://activemq.org/config/1.0">??
- ????<destinationPolicy>??
- ??????<policyMap>??
- ????????<policyEntries>??
- ??????????<policyEntry?topic="PRICES.>">??
- ??????????????
- ????????????<subscriptionRecoveryPolicy>??
- ??????????????<timedSubscriptionRecoveryPolicy?recoverDuration="10000"?/>??
- ????????????</subscriptionRecoveryPolicy>??
- ??????????????
- ??????????????
- ????????????<pendingMessageLimitStrategy>??
- ??????????????<constantPendingMessageLimitStrategy?limit="10"/>??
- ????????????</pendingMessageLimitStrategy>??
- ??????????</policyEntry>??
- ????????</policyEntries>??
- ??????</policyMap>??
- ????</destinationPolicy>??
- ????...??
- </broker>??
?
2.6.5 Composite Destinations
???
從1.1版本起, ActiveMQ支持composite destinations。它允許用一個(gè)虛擬的destination
代表多個(gè)destinations。例如你可以通過(guò)composite
destinations在一個(gè)操作中同時(shí)向12個(gè)queue發(fā)送消息。在composite
destinations中,多個(gè)destination之間采用","分割。例如:
- Queue?queue?=?new?ActiveMQQueue("FOO.A,FOO.B,FOO.C");??
?? 如果你希望使用不同類型的destination,那么需要加上前綴如queue:// 或topic://,例如:?
- Queue?queue?=?new?ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");???
?? 以下是ActiveMQ配置文件進(jìn)行配置的一個(gè)例子:
- <destinationInterceptors>??
- ??<virtualDestinationInterceptor>??
- ????<virtualDestinations>??
- ??????<compositeQueue?name="MY.QUEUE">??
- ????????<forwardTo>??
- ??????????<queue?physicalName="FOO"?/>??
- ??????????<topic?physicalName="BAR"?/>??
- ????????</forwardTo>??
- ??????</compositeQueue>??
- ????</virtualDestinations>??
- ??</virtualDestinationInterceptor>??
- </destinationInterceptors>??
?? 可以在轉(zhuǎn)發(fā)前,先通過(guò)JMS Selector判斷一個(gè)消息是否需要轉(zhuǎn)發(fā),例如:
- <destinationInterceptors>??
- ??<virtualDestinationInterceptor>??
- ????<virtualDestinations>??
- ??????<compositeQueue?name="MY.QUEUE">??
- ????????<forwardTo>??
- ??????????<filteredDestination?selector="odd?=?'yes'"?queue="FOO"/>??
- ??????????<filteredDestination?selector="i?=?5"?topic="BAR"/>??
- ????????</forwardTo>??
- ??????</compositeQueue>??
- ????</virtualDestinations>??
- ??</virtualDestinationInterceptor>??
- </destinationInterceptors>??
?
2.6.6 Mirrored Queues
???
每個(gè)queue中的消息只能被一個(gè)consumer消費(fèi)。然而,有時(shí)候你可能希望能夠監(jiān)視生產(chǎn)者和消費(fèi)者之間的消息流。你可以通過(guò)使用Virtual
Destinations 來(lái)建立一個(gè)virtual queue 來(lái)把消息轉(zhuǎn)發(fā)到多個(gè)queues中。但是
為系統(tǒng)中每個(gè)queue都進(jìn)行如此的配置可能會(huì)很麻煩。
??? ActiveMQ支持Mirrored
Queues。Broker會(huì)把發(fā)送到某個(gè)queue的所有消息轉(zhuǎn)發(fā)到一個(gè)名稱類似的topic,因此監(jiān)控程序可以訂閱這個(gè)mirrored queue
topic。為了啟用Mirrored
Queues,首先要將BrokerService的useMirroredQueues屬性設(shè)置成true,然后可以通過(guò)
destinationInterceptors設(shè)置其它屬性,如mirror
topic的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一個(gè)例子:
- <broker?xmlns="http://activemq.org/config/1.0"?brokerName="MirroredQueuesBroker1"?useMirroredQueues="true">??
- ??
- ??<transportConnectors>??
- ????<transportConnector?uri="tcp://localhost:61616"/>??
- ??</transportConnectors>??
- ????
- ??<destinationInterceptors>??
- ??????<mirroredQueue?copyMessage?=?"true"?prefix="Mirror.Topic"/>??
- ??</destinationInterceptors>??
- ??...??
- </broker>??
??? 假如某個(gè)producer向名為Foo.Bar的queue中發(fā)送消息,那么你可以通過(guò)訂閱名為Mirror.Topic.Foo.Bar的topic來(lái)獲得發(fā)送到Foo.Bar中的所有消息。