@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "foo", value = "bar"),
@Argument(name = "baz")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
注意隊列的x-message-ttl
參數(shù)設(shè)為了10秒鐘,因為參數(shù)類型不是String
, 因此我們指定了它的類型,在這里是Integer
.有了這些聲明后,如果隊列已經(jīng)存在了,參數(shù)必須匹配現(xiàn)有隊列上的參數(shù).對于header交換器,我們設(shè)置binding arguments 要匹配頭中foo為bar,且baz可為任意值的消息. x-match
參數(shù)則意味著必須同時滿足兩個條件.
參數(shù)名稱,參數(shù)值,及類型可以是屬性占位符(${...}
) 或SpEL 表達式(#{...}
). name
必須要能解析為String
; type的表達式必須能解析為Class
或類的全限定名. value
必須能由DefaultConversionService
類型進行轉(zhuǎn)換(如上面例子中x-message-ttl
).
如果name 解析為null
或空字符串,那么將忽略 @Argument
.
元注解(Meta-Annotations)
有時,你想將同樣的配置用于多個監(jiān)聽器上. 為減少重復(fù)配置,你可以使用元注解來創(chuàng)建你自己的監(jiān)聽器注解:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public@interface MyAnonFanoutListener {
}
public class MetaListener {
@MyAnonFanoutListener
public void handle1(String foo) {
...
}
@MyAnonFanoutListener
public void handle2(String foo) {
...
}
}
在這個例子中,每個通過@MyAnonFanoutListener創(chuàng)建的監(jiān)聽器都會綁定一個匿名,自動刪除的隊列到fanout交換器 metaFanout上
. 元注解機制是簡單的,在那些用戶定義注解中的屬性是不會經(jīng)過檢查的- 因此你不能從元注解中覆蓋設(shè)置.當(dāng)需要高級配置時,使用一般的 @Bean
定義.
Enable Listener Endpoint Annotations
為了啟用 @RabbitListener
注解,需要在你的某個@Configuration類中添加@EnableRabbit
注解.
@Configuration
@EnableRabbit
publicclass AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
默認(rèn)情況下,基礎(chǔ)設(shè)施會查找一個名為rabbitListenerContainerFactory
的bean作為工廠來源來創(chuàng)建消息監(jiān)聽器容器. 在這種情況下,會忽略RabbitMQ 基礎(chǔ)設(shè)施計劃, processOrder
方法可使用核心輪詢大小為3個線程最大10個線程的池大小來調(diào)用.
可通過使用注解或?qū)崿F(xiàn)RabbitListenerConfigurer
接口來自定義監(jiān)聽器容器工廠. 默認(rèn)只需要注冊至少一個Endpoints,而不需要一個特定的容器工廠.查看javadoc來了解詳情和例子.
如果你更喜歡XML配置,可使用 <rabbit:annotation-driven>
元素.
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers "value="3"/>
<property name="maxConcurrentConsumers"value="10"/>
</bean>
注解方法的消息轉(zhuǎn)換
在調(diào)用監(jiān)聽器之前,在管道中有兩個轉(zhuǎn)換步驟. 第一個使用 MessageConverter
來將傳入的Spring AMQP Message
轉(zhuǎn)換成spring-消息系統(tǒng)的消息. 當(dāng)目標(biāo)方法調(diào)用時,消息負載將被轉(zhuǎn)換,如果有必要,也會參考消息參數(shù)類型來進行.
第一步中的默認(rèn) MessageConverter
是一個Spring AMQP SimpleMessageConverter
,它可以處理String
和 java.io.Serializable對象之間的轉(zhuǎn)換
; 其它所有的將保留為byte[]
. 在下面的討論中,我們稱其為消息轉(zhuǎn)換器.
第二個步驟的默認(rèn)轉(zhuǎn)換器是GenericMessageConverter
,它將委派給轉(zhuǎn)換服務(wù)(DefaultFormattingConversionService的實例
). 在下面的討論中,我們稱其為方法參數(shù)轉(zhuǎn)換器.
要改變消息轉(zhuǎn)換器,可在連接工廠bean中設(shè)置其相關(guān)屬性:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}
這配置了一個Jackson2 轉(zhuǎn)換器,希望頭信息能通過它來指導(dǎo)轉(zhuǎn)換.
你也可以考慮使用ContentTypeDelegatingMessageConverter
,它可以處理不同內(nèi)容類型的轉(zhuǎn)換.
大多數(shù)情況下,沒有必要來定制方法參數(shù)轉(zhuǎn)換器,除非你想要用自定義的ConversionService
.
在1.6版本之前,用于轉(zhuǎn)換JSON的類型信息必須在消息頭中提供或者需要一個自定義的ClassMapper
. 從1.6版本開始,如果沒有類型信息頭,類型可根據(jù)目標(biāo)方法參數(shù)推斷.
類型推斷只能用于 @RabbitListener
的方法級.
參考 the section called “Jackson2JsonMessageConverter” 來了解更多信息.
如果您希望自定義方法參數(shù)轉(zhuǎn)換器,您可以這樣做如下:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
...
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
return factory;
}
@Bean
public ConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}
@Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
...
}
編程式 Endpoint 注冊
RabbitListenerEndpoint
提供了一個Rabbit endpoint 模型并負責(zé)為那個模型配置容器.除了通過RabbitListener注解檢測外,這個基礎(chǔ)設(shè)施允許你通過編程來配置endpoints.
@Configuration
@EnableRabbit
publicclass AppConfig implements RabbitListenerConfigurer {
@Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在上面的例子中,我們使用了SimpleRabbitListenerEndpoint
(它使用MessageListener
來進行處理),但你也可以構(gòu)建你自己的endpoint變種來描述自定義的調(diào)用機制.
應(yīng)該指出的是,你也可以跳過@RabbitListener
的使用,通過RabbitListenerConfigurer來編程注冊你的endpoints.
Annotated Endpoint Method Signature
到目前為止,我們已經(jīng)在我們的端點上注入了一個簡單的字符串,但它實際上可以有一個非常靈活的方法簽名。讓我們重寫它,以一個自定義的頭來控制注入順序:
@Component
publicclass MyService {
@RabbitListener(queues = "myQueue")
publicvoid processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
下面是你可以在監(jiān)聽端點上注入的主要元素:
原生org.springframework.amqp.core.Message
.
用于接收消息的
com.rabbitmq.client.Channel
org.springframework.messaging.Message
代表的是傳入的AMQP消息.注意,這個消息持有自定義和標(biāo)準(zhǔn)的頭部信息 (AmqpHeaders定義
).
從1.6版本開始, 入站deliveryMode
頭可以AmqpHeaders.RECEIVED_DELIVERY_MODE
使用,代替了AmqpHeaders.DELIVERY_MODE
.
@Header
-注解方法參數(shù)可 提取一個特定頭部值,包括標(biāo)準(zhǔn)的AMQP頭.
@Headers
-注解參數(shù)為了訪問所有頭信息,必須能指定為java.util.Map
.
非注解元素(非支持類型(如. Message
和Channel
))可認(rèn)為是負荷(payload).你可以使用 @Payload來明確標(biāo)識
. 你也可以添加額外的 @Valid來進行驗證
.
注入Spring消息抽象的能力是特別有用的,它可受益于存儲在特定傳輸消息中的信息,而不需要依賴于特定傳輸API.
@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}
方法參數(shù)的處理是由DefaultMessageHandlerMethodFactory
提供的,它可以更進一步地定制以支持其它的方法參數(shù). 轉(zhuǎn)換和驗證支持也可以定制.
例如,如果我們想確保我們的Order在處理之前是有效的,我們可以使用@Valid
來注解負荷,并配置必須驗證器,就像下面這樣:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
@Override
publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
監(jiān)聽多個隊列
當(dāng)使用queues
屬性時,你可以指定相關(guān)的容器來監(jiān)聽多個隊列. 你可以使用 @Header
注解來指定對于那些隊列中收到的消息對POJO方法可用:
@Component
public class MyService {
@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
從1.5版本開始,隊列名稱可以使用屬性占位符和SpEL:
@Component
public class MyService {
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
在1.5版本之前,只有單個隊列可以這種方法進行指定,每個隊列需要一個單獨的屬性.
回復(fù)管理
MessageListenerAdapter
現(xiàn)有的支持已經(jīng)允許你的方法有一個非void的返回類型.在這種情況下,調(diào)用的結(jié)果被封裝在一個發(fā)送消息中,其消息發(fā)送地址要么是原始消息的ReplyToAddress頭指定的地址要么是監(jiān)聽器上配置的默認(rèn)地址.默認(rèn)地址現(xiàn)在可通過@SendTo
注解進行設(shè)置.
假設(shè)我們的processOrder
方法現(xiàn)在需要返回一個OrderStatus
, 可將其寫成下面這樣來自動發(fā)送一個回復(fù):
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你需要以傳輸獨立的方式來設(shè)置其它頭,你可以返回Message
,就像這樣:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
@SendTo
值按照exchange/routingKey模式(其中的一部分可以省略)來作為對exchange
和 routingKey
的回復(fù).有效值為:
foo/bar
- 以交換器和路由鍵進行回復(fù).
foo/
- 以交換器和默認(rèn)路由鍵進行回復(fù).
bar
or /bar
- 以路由鍵和默認(rèn)交換器進行回復(fù).
/
or empty - 以默認(rèn)交換器和默認(rèn)路由鍵進行回復(fù).
@SendTo
也可以沒有value
屬性. 這種情況等價于空的sendTo 模式. @SendTo
只能應(yīng)用于沒有replyToAddress
屬性的入站消息中.
從1.5版本開始, @SendTo
值可以通過bean SpEL 表達式初始化,例如…
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return"test.sendTo.reply.spel";
}
表達式必須能評估為String
,它可以是簡單的隊列名稱(將發(fā)送到默認(rèn)交換器中) 或者是上面談到的exchange/routingKey
形式.
在初始化時,#{...}
表達式只評估一次.
對于動態(tài)路由回復(fù),消息發(fā)送者應(yīng)該包含一個reply_to
消息屬性或使用運行時SpEL 表達式.
從1.6版本開始, @SendTo
可以是SpEL 表達式,它可在運行時根據(jù)請求和回復(fù)來評估:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表達式的運行時性質(zhì)是由 !{...}
定界符表示的. 表達式評估上下文的#root
對象有三個屬性:
request
- o.s.amqp.core.Message
請求對象.source
- 轉(zhuǎn)換后的 o.s.messaging.Message<?>
.result
- 方法結(jié)果.
上下文有一個map 屬性訪問器,標(biāo)準(zhǔn)類型轉(zhuǎn)換器以及一個bean解析器,允許引用上下文中的其它beans (如.@someBeanName.determineReplyQ(request, result)
).
總結(jié)一下, #{...}
只在初始化的時候評估一次, #root
對象代表的是應(yīng)用程序上下文; beans可通過其名稱來引用. !{...}
會在運行時,對于每個消息,都將使用root對象的屬性進行評估,bean可以使用其名稱進行引用,前輟為@
.
多方法監(jiān)聽器
從1.5.0版本開始,@RabbitListener
注解現(xiàn)在可以在類級上進行指定.與新的@RabbitHandler
注解一起,基于傳入消息的負荷類型,這可以允許在單個監(jiān)聽器上調(diào)用不同的方法.這可以用一個例子來描述:
@RabbitListener(id="multi", queues = "someQueue")
publicclass MultiListenerBean {
@RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
...
}
@RabbitHandler
public String baz(Baz baz) {
...
}
@RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
...
}
}
在這種情況下,獨立的 @RabbitHandler
方法會被調(diào)用,如果轉(zhuǎn)換后負荷是Bar
, Baz
或Qux
. 理解基于負荷類型系統(tǒng)來確定唯一方法是很重要的.類型檢查是通過單個無注解參數(shù)來執(zhí)行的,否則就要使用@Payload
進行注解. 注意同樣的方法簽名可應(yīng)用于方法級 @RabbitListener
之上.
注意,如果有必要,需要在每個方法上指定@SendTo,
在類級上它是不支持的.
@Repeatable @RabbitListener
從1.6版本開始,@RabbitListener
注解可用 @Repeatable進行標(biāo)記
. 這就是說,這個注解可多次出現(xiàn)在相同的注解元素上(方法或類).在這種情況下,對于每個注解,都會創(chuàng)建獨立的監(jiān)聽容器,它們每個都會調(diào)用相同的監(jiān)聽器@Bean
. Repeatable 注解能用于 Java 8+;當(dāng)在Java 7-使用時,同樣的效果可以使用 @RabbitListeners
"container" 注解(包含@RabbitListener注解的數(shù)組)來達到.
Proxy @RabbitListener and Generics
如果你的服務(wù)是用于代理(如,在 @Transactional的情況中
) ,當(dāng)接口有泛型參數(shù)時,需要要一些考慮.要有一個泛型接口和特定實現(xiàn),如:
interface TxService<P> {
String handle(P payload, String header);
}
static class TxServiceImpl implements TxService<Foo> {
@Override
@RabbitListener(...)
public String handle(Foo foo, String rk) {
...
}
}
你被迫切換到CGLIB目標(biāo)類代理,因為接口handle方法的實際實現(xiàn)只是一個橋接方法.在事務(wù)管理的情況下, CGLIB是通過注解選項來配置的- @EnableTransactionManagement(proxyTargetClass = true)
. 在這種情況下,所有注解都需要在實現(xiàn)類的目標(biāo)方法上進行聲明:
static class TxServiceImpl implements TxService<Foo> {
@Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
容器管理
由注解創(chuàng)建的容器不會在上下文中進行注冊.你可以調(diào)用 RabbitListenerEndpointRegistry的getListenerContainers()方法來獲取所有容器集合.然后,你可以迭代這個集合,例如,停止/啟動所有容器或調(diào)用在其注冊上調(diào)用Lifecycle
方法(調(diào)用每個容器中的操作).
你也可以使用id來獲取單個容器的引用,即 getListenerContainer(String id)
; 例如registry.getListenerContainer("multi")
.
從1.5.2版本開始,你可以調(diào)用getListenerContainerIds()方法來獲取所有注冊容器的id.
從1.5版本開始,你可在RabbitListener端點上為容器分配一個組(group).
這提供了一種機制來獲取子集容器的引用
; 添加一個group
屬性會使Collection<MessageListenerContainer>
類型的bean使用組名稱注冊在上下文中.
一些不同的線程可與異步消費者關(guān)聯(lián)。
當(dāng)RabbitMQ Client投遞消息時,來自于
SimpleMessageListener
配置的TaskExecutor中的線程會調(diào)用MessageListener.如果沒有配置,將會使用SimpleAsyncTaskExecutor
. 如果使用了池化的executor,須確保池大小可以支撐并發(fā)處理.
當(dāng)使用默認(rèn)SimpleAsyncTaskExecutor
時,對于調(diào)用監(jiān)聽器的線程,監(jiān)聽器容器的beanName
將用作threadNamePrefix
. 這有益于日志分析,在日志appender配置中,一般建議總是包含線程名稱.當(dāng)在SimpleMessageListenerContainer的taskExecutor屬性中指定TaskExecutor
時,線程名稱是不能修改的.建議你使用相似的技術(shù)來命名線程, 幫助在日志消息中的線程識別。
當(dāng)創(chuàng)建連接時,在
CachingConnectionFactory
配置的Executor將傳遞給RabbitMQ Client
,并且它的線程將用于投遞新消息到監(jiān)聽器容器.在寫作的時候,如果沒有配置,client會使用池大小為5的內(nèi)部線程池executor.
RabbitMQ client
使用ThreadFactory
來為低端I/O(socket)操作創(chuàng)建線程.要改變這個工廠,你需要配置底層RabbitMQ ConnectionFactory,
正如the section called “Configuring the Underlying Client Connection Factory”中所描述.
雖然高效,但異步消費者存在一個問題:如何來探測它們什么是空閑的 - 當(dāng)有一段時間沒有收到消息時,用戶可能想要采取某些動作.
從1.6版本開始, 當(dāng)沒有消息投遞時,可配置監(jiān)聽器容器來發(fā)布ListenerContainerIdleEvent
事件. 當(dāng)容器是空閑的,事件會每隔idleEventInterval
毫秒發(fā)布事件.
要配置這個功能,須在容器上設(shè)置idleEventInterval
:
xml
<rabbit:listener-container connection-factory="connectionFactory"...idle-event-interval="60000"...
>
<rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
Java
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}
@RabbitListener
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
return factory;
}
在上面這些情況中,當(dāng)容器空閑時,每隔60秒就會發(fā)布事件.
事件消費
通過實現(xiàn)ApplicationListener
可捕獲這些事件- 要么是一個一般的監(jiān)聽器,要么是一個窄化的只接受特定事件的監(jiān)聽器. 你也可以使用Spring Framework 4.2中引入的@EventListener.
下面的例子在單個類中組合使用了@RabbitListener
和@EventListener
.重點要理解,應(yīng)用程序監(jiān)聽器會收到所有容器的事件,因此如果你只對某個容器采取措施,那么你需要檢查監(jiān)聽器id.你也可以使用@EventListener
條件來達到此目的.
事件有4個屬性:
source
- 監(jiān)聽容器實例id
- 監(jiān)聽器id(或容器bean名稱)idleTime
- 當(dāng)事件發(fā)布時,容器已經(jīng)空閑的時間queueNames
- 容器監(jiān)聽的隊列名稱
public class Listener {
@RabbitListener(id="foo", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}
@EventListener(condition = "event.listenerId == 'foo'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}
}
重要
事件監(jiān)聽器會查看所有容器的事件,因此,在上面的例子中,我們根據(jù)監(jiān)聽器ID縮小了要接收的事件.
警告
如果你想使用idle事件來停止監(jiān)聽器容器,你不應(yīng)該在調(diào)用監(jiān)聽器的線程上來調(diào)用container.stop()
方法- 它會導(dǎo)致延遲和不必要的日志消息. 相反,你應(yīng)該把事件交給一個不同的線程,然后可以停止容器。
3.1.7 消息轉(zhuǎn)換器
介紹
AmqpTemplate
同時也定義了多個發(fā)送和接收消息(委派給MessageConverter)的方法.
MessageConverter
本身是很簡單的. 在每個方向上它都提供了一個方法:一個用于轉(zhuǎn)換成Message,另一個用于從Message中轉(zhuǎn)換.注意,當(dāng)轉(zhuǎn)換成Message時,除了object外,你還需要提供消息屬性. "object"參數(shù)通常對應(yīng)的是Message body.
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
AmqpTemplate中相關(guān)的消息發(fā)送方法列舉在下邊. 這比我們前面提到的要簡單,因為它們不需要Message
實例. 相反地, MessageConverter
負責(zé)創(chuàng)建每個消息(通過將提供的對象轉(zhuǎn)換成Message
body的字節(jié)數(shù)組,以及添加提供的MessageProperties)
.
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,這里只有兩個方法:一個接受隊列名稱,另一個依賴于模板設(shè)置的隊列屬性.
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
SimpleMessageConverter
MessageConverter
策略的默認(rèn)實現(xiàn)被稱為SimpleMessageConverter
. 如果你沒有明確配置,RabbitTemplate實例會使用此轉(zhuǎn)換器的實例.它能處理基于文本內(nèi)容,序列化Java對象,以及簡單的字節(jié)數(shù)組.
從 Message中轉(zhuǎn)換
如果傳入消息的內(nèi)容類型以"text" (如. "text/plain")開頭,它同時也會檢查內(nèi)容編碼屬性,以確定將消息body字節(jié)數(shù)組轉(zhuǎn)換成字符串所要使用的字符集. 如果在輸入消息中沒有指定內(nèi)容編碼屬性, 它默認(rèn)會使用"UTF-8"字符集.如果你需要覆蓋默認(rèn)設(shè)置,你可以配置一個SimpleMessageConverter
實例,設(shè)置其"defaultCharset" 屬性,再將其注入到RabbitTemplate
實例中.
如果傳入消息的內(nèi)容類型屬性值為"application/x-java-serialized-object", SimpleMessageConverter
將嘗試將字節(jié)數(shù)組反序列化為一個Java object. 雖然這對于簡單的原型是有用的,但一般不推薦依賴于Java序列化機制,因為它會生產(chǎn)者和消費者之間的緊密耦合。當(dāng)然,這也排除了在兩邊使用非Java的可能性.由于AMQP 是線路級協(xié)議, 因這樣的限制失去了許多優(yōu)勢,這是不幸的. 在后面的兩個章節(jié)中,我們將探討通過豐富的域?qū)ο蟮膬?nèi)容來替代java序列化.
對于其它內(nèi)容類型,SimpleMessageConverter
會以字節(jié)數(shù)組形式直接返回消息body內(nèi)容.
參考the section called “Java Deserialization” 來了解更多信息.
轉(zhuǎn)換成消息
當(dāng)從任意Java對象轉(zhuǎn)換成Message時, SimpleMessageConverter
同樣可以處理字節(jié)數(shù)組,字符串,以及序列化實例.它會將每一種都轉(zhuǎn)換成字節(jié)(在字節(jié)數(shù)組的情況下,不需要任何轉(zhuǎn)換), 并且會相應(yīng)地設(shè)置內(nèi)容類型屬性.如果要轉(zhuǎn)換的對象不匹配這些類型,Message body 將是null.
SerializerMessageConverter
除了它可以使用其它application/x-java-serialized-object轉(zhuǎn)換的Spring框架Serializer
和 Deserializer
實現(xiàn)來配置外,此轉(zhuǎn)換器類似于SimpleMessageConverter
.
參考the section called “Java Deserialization” 來了解更多信息.
Jackson2JsonMessageConverter
轉(zhuǎn)換成消息
正如前面章節(jié)提到的,一般來說依賴于Java序列化機制不是推薦的.另一個常見更靈活且可跨語言平臺的選擇JSON (JavaScript Object Notation).可通過在RabbitTemplate實例上配置轉(zhuǎn)換器來覆蓋默認(rèn)SimpleMessageConverter
.Jackson2JsonMessageConverter
使用的是com.fasterxml.jackson
2.x 包.
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
正如上面展示的, Jackson2JsonMessageConverter
默認(rèn)使用的是DefaultClassMapper
. 類型信息是添加到MessageProperties中的(也會從中獲取)
. 如果入站消息在MessageProperties中沒有包含類型信息,但你知道預(yù)期類型,你可以使用defaultType
屬性來配置靜態(tài)類型
<bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
轉(zhuǎn)換Message
入站消息會根據(jù)發(fā)送系統(tǒng)頭部中添加的類型信息來轉(zhuǎn)換成對象.
在1.6之前的版本中,如果不存在類型信息,轉(zhuǎn)換將失敗。從1.6版開始,如果類型信息丟失,轉(zhuǎn)換器將使用Jsckson默認(rèn)值(通常是一個map)來轉(zhuǎn)換JSON.
此外,從1.6版本開始,當(dāng)在方法上使用
@RabbitListener
注解時, 推斷類型信息會添加到MessageProperties
; 這允許轉(zhuǎn)換器轉(zhuǎn)換成目標(biāo)方法的參數(shù)類型.這只適用于無注解的參數(shù)或使用@Payload
注解的單個參數(shù). 在分析過程中忽略類型消息的參數(shù)。
重要
默認(rèn)情況下,推斷類型信息會覆蓋inbound __TypeId__
和發(fā)送系統(tǒng)創(chuàng)建的相關(guān)headers. 這允許允許接收系統(tǒng)自動轉(zhuǎn)換成不同的領(lǐng)域?qū)ο? 這只適用于具體的參數(shù)類型(不是抽象的或不是接口)或者來自java.util
包中的對象.其它情況下,將使用 __TypeId__
和相關(guān)的頭.也可能有你想覆蓋默認(rèn)行為以及總是使用__TypeId__信息的情況. 例如, 讓我們假設(shè)你有一個接受Foo參數(shù)的@RabbitListener
,但消息中包含了Bar(
它是的Foo
(具體類)的子類). 推斷類型是不正確的.要處理這種情況,需要設(shè)置Jackson2JsonMessageConverter 的TypePrecedence
屬性為TYPE_ID
而替換默認(rèn)的INFERRED
. 這個屬性實際上轉(zhuǎn)換器的DefaultJackson2JavaTypeMapper
,但為了方便在轉(zhuǎn)換器上提供了一個setter方法. 如果你想注入一個自定義類型mapper, 你應(yīng)該設(shè)置屬性mapper.
@RabbitListener
public void foo(Foo foo) {...}
@RabbitListener
public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void foo(Foo foo, o.s.amqp.core.Message message) {...}
@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void foo(Foo foo, String bar) {...}
@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<?> message) {...}
上面前4種情況下,轉(zhuǎn)換器會嘗試轉(zhuǎn)換成Foo
類型. 第五個例子是無效的,因為我們不能確定使用哪個參數(shù)來接收消息負荷. 在第六個例子中, Jackson 會根據(jù)泛型WildcardType來應(yīng)用
.
然而,你也可以創(chuàng)建一個自定義轉(zhuǎn)換器,并使用targetMethod
消息屬性來決定將JSON轉(zhuǎn)換成哪種類型.
這種類型接口只能在@RabbitListener
注解聲明在方法級上才可實現(xiàn).在類級@RabbitListener
, 轉(zhuǎn)換類型用來選擇調(diào)用哪個@RabbitHandler
方法.基于這個原因,基礎(chǔ)設(shè)施提供了targetObject
消息屬性,它可用于自定義轉(zhuǎn)換器來確定類型.
MarshallingMessageConverter
還有一個選擇是MarshallingMessageConverter
.它會委派到Spring OXM 包的 Marshaller
和 Unmarshaller
策略接口實現(xiàn).
你可從here了解更多. 在配置方面,最常見的是只提供構(gòu)造器參數(shù),因為大部分Marshaller
的實現(xiàn)都將實現(xiàn)Unmarshaller
.
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
ContentTypeDelegatingMessageConverter
這個類是在1.4.2版本中引入的,并可基于MessageProperties的contentType屬性允許委派給一個特定的MessageConverter
.默認(rèn)情況下,如果沒有contentType屬性或值沒有匹配配置轉(zhuǎn)換器時,它會委派給SimpleMessageConverter
.
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
重要
當(dāng)從不可信任的來源反序列化Java對象時,存在一個可能的漏洞.如果從不可信來源,使用內(nèi)容類型 application/x-java-serialized-object
來接收消息,你可以考慮配置允許哪些包/類能反序列化.
這既適用于SimpleMessageConverter,也適用于SerializerMessageConverter,當(dāng)它被配置為使用一個DefaultDeserializer時 -或含蓄地或通過配置方式的。
默認(rèn)情況下,白名單列表是空的,這意味著所有類都會反序列化.你可以設(shè)置模式列表,如 foo.*
, foo.bar.Baz
或 *.MySafeClass
.模式會按照順序進行檢查,直到找到匹配的模式.如果沒有找到匹配,將拋出SecurityException
.在這些轉(zhuǎn)換器上,可使用whiteListPatterns
屬性來設(shè)置.
消息屬性轉(zhuǎn)換器
MessagePropertiesConverter
策略接口用于Rabbit Client BasicProperties
與Spring AMQP MessageProperties
之間轉(zhuǎn)換. 默認(rèn)實現(xiàn)(DefaultMessagePropertiesConverter
)通常可滿雖大部分需求,但如果有需要,你可以自己實現(xiàn). 當(dāng)大小不超過1024字節(jié)時,默認(rèn)屬性轉(zhuǎn)換器將 BasicProperties
中的LongString
轉(zhuǎn)換成String
. 更大的 LongString
將不會進行轉(zhuǎn)換(參考下面的內(nèi)容.這個限制可通過構(gòu)造器參數(shù)來覆蓋.
從1.6版本開始, 現(xiàn)在headers 長超過 long string 限制(默認(rèn)為1024) 將被DefaultMessagePropertiesConverter
保留作為 LongString
. 你可以通過 the getBytes[]
, toString()
, 或getStream()
方法來訪問內(nèi)容.
此前, DefaultMessagePropertiesConverter
會將這樣的頭轉(zhuǎn)換成一個 DataInputStream
(實際上它只是引用了LongString的
DataInputStream
). 在輸出時,這個頭不會進行轉(zhuǎn)換(除字符串外,如在流上調(diào)用toString()方法 java.io.DataInputStream@1d057a39)
.
更大輸入LongString
頭現(xiàn)在可正確地轉(zhuǎn)換,在輸出時也一樣.
它提供了一個新的構(gòu)造器來配置轉(zhuǎn)換器,這樣可像以前一樣來工作:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
另外,從1.6版本開始,在 MessageProperties中添加了一個新屬性correlationIdString
.此前,當(dāng)在RabbitMQ 客戶端中轉(zhuǎn)換BasicProperties
時,將會執(zhí)行不必要的byte[] <-> String
轉(zhuǎn)換,這是因為 MessageProperties.correlationId
是一個byte[]
而 BasicProperties
使用的是String
.
(最終,RabbitMQ客戶端使用UTF-8字符串轉(zhuǎn)化為字節(jié)并放在協(xié)議消息中).
為提供最大向后兼容性,新屬性correlationIdPolicy
已經(jīng)被加入到了DefaultMessagePropertiesConverter
.它接受DefaultMessagePropertiesConverter.CorrelationIdPolicy
枚舉參數(shù).
默認(rèn)情況下,它設(shè)置為BYTES
(復(fù)制先前的行為).
對于入站消息:
STRING
- 只映射correlationIdString
屬性BYTES
- 只映射correlationId
屬性BOTH
- 會同時映射兩個屬性
對于出站消息:
STRING
- 只映射correlationIdString
屬性BYTES
- 只映射correlationId
屬性BOTH
- 兩種屬性都會考慮,但會優(yōu)先考慮String 屬性
也從1.6版本開始,入站deliveryMode
屬性不再需要映射 MessageProperties.deliveryMode
,相反使用MessageProperties.receivedDeliveryMode
來代替.另外,入站userId
屬性也不需要再映射MessageProperties.userId
,相反使用MessageProperties.receivedUserId
來映射.
這種變化是為了避免這些屬性的意外傳播,如果同樣的MessageProperties
對象用于出站消息時.
3.1.8 修改消息- 壓縮以及更多
提供了許多的擴展點,通過它們你可以對消息執(zhí)行預(yù)處理,要么在發(fā)送RabbitMQ之前,要么在接收到消息之后.
正如你在Section 3.1.7, “Message Converters”看到的,這樣的擴展點存在于AmqpTemplate
convertAndReceive
操作中,在那里你可以提供一個MessagePostProcessor
.
例如,你的POJO轉(zhuǎn)換之后, MessagePostProcessor
允許你在Message上設(shè)置自定義的頭或?qū)傩裕?/font>
從1.4.2版本開始,額外的擴展點已經(jīng)添加到RabbitTemplate
- setBeforePublishPostProcessors()
和setAfterReceivePostProcessors()
. 第一個開啟了一個post processor來在發(fā)送消息到RabbitMQ之前立即運行.當(dāng)使用批量時(參考 the section called “Batching”), 這會在批處理裝配之后發(fā)送之前調(diào)用.
第二個會在收到消息后立即調(diào)用.
這些擴展點對于壓縮這此功能是有用的,基于這些目的,提供了多個MessagePostProcessor
:
- GZipPostProcessor
- ZipPostProcessor
針對于發(fā)送前的消息壓縮,以及
- GUnzipPostProcessor
- UnzipPostProcessor
針對于消息解壓.
類似地, SimpleMessageListenerContainer
也有一個 setAfterReceivePostProcessors()
方法,
允許在消息收到由容器來執(zhí)行解壓縮.