介紹
Spring AMQP 由少數幾個模塊組成, 每個都以JAR的形式來表現.這些模塊是: spring-amqp和spring-rabbit. spring-amqp模塊包含org.springframework.amqp.core
包.
在那個包中,你會找到表示AMQP核心模塊的類. 我們的目的是提供通用的抽象,不依賴于任何特定中間件的實現或AMQP客戶端庫。
最終用戶代碼將更具有移植性,以便跨供應商實現,因為它們可以對抽象層開發。這些抽象是使用broker的特定模塊實現的,如spring-rabbit。
目前只有一個RabbitMQ實現;而對于抽象的驗證,除了RabbitMQ外,也已經在.Net平臺上使用Apache Qpid得到了驗證。
由于AMQP原則上工作于協議層次,RabbitMQ客戶端可以在任何支持相同的協議版本的broker中使用,但目前我們沒有測試其它任何broker。
這里的概述假設你已經熟悉了AMQP規范.如果你還沒有,那么你可以查看第5章,其它資源中列舉的資源.
Message
AMQP 0-8 和0-9-1 規范沒有定義一個消息類或接口.相反,當執行basicPublish()這樣的操作的時候
, 內容是以字節數組參數進行傳遞的,其它額外屬性也是以單獨參數進行傳遞的. Spring AMQP定義了一個 Message 類來作為AMQP領域模型表示的一部分.Message類的目的是在單個實例中簡化封裝消息體(body)和屬性(header),這樣API就可以變得更簡單. Message類的定義相當簡單.
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
returnthis.body;
}
public MessageProperties getMessageProperties() {
returnthis.messageProperties;
}
}
MessageProperties
接口定義了多個共同屬性,如messageId, timestamp, contentType 等等. 那些屬性可以通過調用setHeader(String key, Object value)
方法使用用戶定義頭(user-defined headers)來擴展.
Exchange
Exchange
接口代表的是AMQP Exchange,它是生產者發送消息的地方.broker虛擬主機中的交換器名稱都是唯一的,同時還有少量的其它屬性:
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
正如你所看到的, Exchange還有一個type (它是在ExchangeTypes中定義的常量)
. 基本類型是: Direct
, Topic
, Fanout
,和Headers
.
在核心包中,你可以找到每種類型的Exchange
接口實現.這些交換器類型會在處理隊列綁定時,行為有所不同.
例如,Direct交換器允許隊列以固定路由鍵進行綁定(通常是隊列的名稱).
Topic交換器支持使用路由正則表達式(*通配符明確匹配一個,而#通配符可匹配0個或多個).
Fanout交換器會把消息發布到所有綁定到它上面的隊列而不考慮任何路由鍵.
關于交換器類型的更多信息,查看Chapter 5, Other Resources.
AMQP規范還要求任何broker必須提供一個默認的無名字的(空字符串)Direct交換器.所有聲明的隊列都可以用它們的名字作為路由鍵綁定到默認交換器中. 在Section 3.1.4, “AmqpTemplate”你會了解到更多關于在Sring AMQP中使用默認交換器的使用情況.
Queue
Queue
代表的是消費者接收消息的組件. 像各種各樣的 Exchange 類,我們的實現目標是作為核心AMQP類型的抽象表示.
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
注意,構造器需要接受隊列名稱作為參數.根據實現, admin template可能會提供生成獨特隊列名稱的方法.這些隊列作為回復地址或用于臨時情景是非常有用的.
基于這種原因,自動生成隊列的exclusive和 autoDelete 屬性都應該設置為true.
參考 Section 3.1.10, “Configuring the broker” 來了解關于使用命名空間來聲明隊列,包括隊列參數的詳細情況.
Binding
生產者發送消息到Exchange,而消費者將從Queue中獲取消息,連接Queues與Exchanges之間的綁定對于通過消息來連接生產者和消費者是非常關鍵的.
在Spring AMQP中,我們定義了一個 Binding
類來表示這些連接. 讓我們重新回顧一下綁定隊列和交換器的操作.
你可以使用固定的路由鍵來綁定 Queue 到 DirectExchange上.
new Binding(someQueue, someDirectExchange, "foo.bar")
你可以使用路由正則表達式來綁定Queue到TopicExchange上.
new Binding(someQueue, someTopicExchange, "foo.*")
你可以不使用路由鍵來綁定Queue到FanoutExchange上.
new Binding(someQueue, someFanoutExchange)
我們還提供了BindingBuilder來方便操作
.
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
上面展示的BindingBuilder 類很清晰,但如果為bind()方法使用靜態導入,這種形式將工作得更好.
本身來說,Binding類的實例只能一個connection中持有數據.換句話說,它不是一個活力(active)組件.
但正如在后面Section 3.1.10, “Configuring the broker”看到的, Binding實例可由AmqpAdmin
類來觸發broker上的綁定操作.
同樣,在同一個章節中,你還會看到Binding實例可在@Configuration類中使用Spring @Bean風
格來定義.
還有方便的基類來簡化生成AMQP相關bean定義和識別隊列,交換器,綁定的方法,這樣當AMQP broker運行程序啟動時,就可以得到聲明.
AmqpTemplate
也在核心包中定義.作為涉及AMQP消息的主要組件, 會在它自己的章節中進行詳細介紹(參考Section 3.1.4, “AmqpTemplate”).
3.1.2 連接和資源管理
介紹
雖然我們在前面章節中描述的AMQP模型是通用的,適用于所有實現,但當我們說到資源管理時,其細節是針對特定broker實現的.因此,在這個章節中,我們只關注我們的"spring-rabbit"模塊,因為到現在為止,RabbitMQ是唯一支持的實現.
RabbitMQ broker中用于管理連接的中心組件是ConnectionFactory
接口. ConnectionFactory
實現的責任是提供一個org.springframework.amqp.rabbit.connection.Connection
的實例,它包裝了com.rabbitmq.client.Connection
.
我們提供的唯一具體實現提CachingConnectionFactory
,默認情況下,會建立應用程序可共享的單個連接代理.連接共享是可行的,因為在AMQP處理消息的工作單位實際是 "channel" (在某些方面,這類似于JMS中Connection 和 Sessionin的關系).
你可以想象,連接實例提供了一個createChannel方法。CachingConnectionFactory
實現支持這些channels的緩存,它會基于它們是否是事務的來單獨維護其緩存.
當創建CachingConnectionFactory的實例時
, hostname 可通過構造器來提供,username 和password 屬性也可以提供.如果你想配置channel緩存的大小(默認是25),你可以調用setChannelCacheSize()
方法.
從1.3版本開始,CachingConnectionFactory
也可以同channel一樣,配置緩存連接.在這種情況下每次調用createConnection()
都會創建一個新連接(或者從緩存中獲取空閑的連接).
關閉連接會將其返回到緩存中(如果還沒有達到緩存大小的話).在這些連接上創建的Channels同樣也會被緩存. 單獨連接的使用在某些環境中是有用的,如從HA 集群中消費, 連接負載均衡器,連接不同的集群成員.設置cacheMode
為 CacheMode.CONNECTION
.
這不會限制連接的數目,它用于指定允許空閑打開連接的數目.
從1.5.5版本開始,提供了一個新屬性connectionLimit
.當設置了此屬性時,它會限制連接的總數目,當達到限制值時,將channelCheckoutTimeLimit
來等待空閑連接.如果時間超時了,將拋出AmqpTimeoutException
.
重要
當緩存模式是CONNECTION時
, 隊列的自動聲明等等 (參考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”) 將不再支持.
此外,在寫作的時候,rabbitmq-client
包默認為每個連接(5個線程)創建了一個固定的線程池. 當需要使用大量連接時,你應該考慮在CachingConnectionFactory定制一個executor
. 然后,同一個executor會用于所有連接,其線程也是共享的.
executor的線程池是沒有界限的或按預期使用率來設置(通常, 一個連接至少應該有一個線程).如果在每個連接上創建了多個channels,那么池的大小會影響并發性,因此一個可變的線程池executor應該是最合適的.
理解緩存大小不是限制是很重要的, 它僅僅是可以緩存的通道數量.當說緩存大小是10時,在實際使用中,其實可以是任何數目的通道. 如果超過10個通道被使用,他們都返回到高速緩存,10個將在高速緩存中,其余的將物理關閉。
從1.6版本開始,默認通道緩存大小從1增加到了25. 在高容量,多線程環境中,較小的緩存意味著通道的創建和關閉將以很高的速率運行.加大默認緩存大小可避免這種開銷.
你可以監控通道的使用情況(通過RabbitMQ Admin UI) ,如果看到有很多通道在創建和關閉,你可以增大緩存大小.緩存只會增長按需(以適應應用程序的并發性要求),所以這個更改不會影響現有的低容量應用程序。
從1.4.2版本開始,CachingConnectionFactory
有一個channelCheckoutTimeout屬性
. 當此屬性的值大于0時,channelCacheSize
會變成連接上創建通道數目的限制.
如果達到了限制,調用線程將會阻塞,直到某個通道可用或者超時, 在后者的情況中,將拋出AmqpTimeoutException
異常.
在框架(如.RabbitTemplate)中使用的通道將會可靠地返回到緩存中.如果在框架外創建了通道 (如.直接訪問connection(s)并調用createChannel()
),你必須可靠地返回它們(通過關閉),也許需要在 finally
塊中以防止耗盡通道.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
當使用XML時,配置看起來像下面這樣:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username"value="guest"/>
<property name="password" value="guest"/>
</bean>
這里也有一個 SingleConnectionFactory
實現,它只能用于框架的單元測試代碼中.它比CachingConnectionFactory
簡單,因為它不會緩存通道,由于其缺乏性能和韌性,它不打算用于簡單的測試以外的實際使用.
如果基于某些原因,你需要自己來實現ConnectionFactory
,AbstractConnectionFactory
基類提供了一個非常好的起點.
ConnectionFactory
可使用rabbit命名空間來快速方便的建立:
<rabbit:connection-factory id="connectionFactory"/>
在多數情況下,這是很好的,因為框架會為你選擇最好的默認值.創建的實例會是CachingConnectionFactory
.要記住,默認的緩存大小是25.如果你想緩存更多的通道,你可以設置channelCacheSize 屬性值.在XML中,它看起來像下面這樣:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
在命名空間中,你也可以添加channel-cache-size 屬性:
<rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>
默認的緩存模式是CHANNEL, 但你可以使用緩存連接來替換;在這種情況下,我們會使用connection-cache-size
:
<rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
Host 和 port 屬性也可以在命名空間中提供:
<rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>
此外,如果運行集群環境中,使用addresses屬性.
<rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>
下面是一個自定義的線程工廠,其前輟線程名稱為rabbitmq-
.
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567" thread-factory="tf" channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
配置底層客戶端連接工廠
CachingConnectionFactory
使用的是 Rabbit client ConnectionFactory的實例
; 當在CachingConnectionFactory
設置等價屬性時,許多屬性(host, port, userName, password, requestedHeartBeat, connectionTimeout
) 來傳遞.
要設置其它屬性(例如clientProperties)
,可定義一個rabbit factory 的實例,并使用CachingConnectionFactory的適當構造器來提供引用.
當使用上面提到的命名空間時,要在connection-factory屬性中提供一個工廠的引用來配置. 為方便起見,提供了一個工廠,以協助在一個Spring應用程序上下文中配置連接工廠,在下一節討論。
<rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
RabbitConnectionFactoryBean 和配置SSL
從1.4版本開始, 提供了一個便利的RabbitConnectionFactoryBean
類通過依賴注入來配置底層客戶端連接工廠的SSL屬性.其它設置簡單地委派給底層工廠.以前你必須以編程方式配置SSL選項。
<rabbit:connection-factory id="rabbitConnectionFactory"connection-factory="clientConnectionFactory" host="${host}" port="${port}" virtual-host="${vhost}" username="${username}" password="${password}" />
<bean id="clientConnectionFactory" class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>
參考 RabbitMQ Documentation 來了解關于配置SSL的更多信息. 省略的keyStore
和 trustStore
配置將在無證書驗證的情況下,通過SSL來連接. Key和trust store 配置可以按如下提供:
sslPropertiesLocation
屬性是一個Spring Resource
,它指向一個包含下面key的屬性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore
的 truststore
是指向store的 Spring Resources
.通常情況下,這個屬性文件在操作系統之下安全的,應用程序只能讀取訪問.
從Spring AMQP 1.5版本開始,這些屬性可直接在工廠bean上設置.如果同時提供了discrete和 sslPropertiesLocation
屬性, 后者屬性值會覆蓋discrete值.
路由連接工廠
從1.3版本開始,引入了AbstractRoutingConnectionFactory
.這提供了一種機制來配置多個ConnectionFactories的映射,并通過在運行時使用lookupKey來決定目標ConnectionFactory
.
通常,實現會檢查線程綁定上下文. 為了方便, Spring AMQP提供了SimpleRoutingConnectionFactory
, 它會從SimpleResourceHolder中獲取當前線程綁定的lookupKey
:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<propertyname="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
在使用資源后,對其進行解綁是很重要的.更多信息參考AbstractRoutingConnectionFactory的JavaDocs
.
從1.4版本開始, RabbitTemplate
支持SpEL sendConnectionFactorySelectorExpression
和receiveConnectionFactorySelectorExpression
屬性,
它會在每個AMQP 協議交互操作(send
, sendAndReceive
, receive
or receiveAndReply
)進行評估, 為提供的AbstractRoutingConnectionFactory類解析lookupKey值
.
Bean 引用,如"@vHostResolver.getVHost(#root)"
可用于表達式中.對于send
操作, 要發送的消息是根評估對象;對于receive操作
, queueName 是根評估對象.
路由算法為:如果selector 表達式為null
,或等價于null
,或提供的ConnectionFactory
不是AbstractRoutingConnectionFactory的實例
,根據提供的ConnectionFactory 實現,
所有的工作都按之前的進行.同樣的結果也會發生:如果評估結果不為null
,但對于lookupKey 無目標ConnectionFactory,且
the AbstractRoutingConnectionFactory
使用lenientFallback = true進行了配置
.
當然,在AbstractRoutingConnectionFactory
的情況下,它會基于determineCurrentLookupKey()的路由實現來進行回退. 但,如果lenientFallback = false
, 將會拋出 IllegalStateException
異常.
Namespace 在<rabbit:template>
組件中也支持
send-connection-factory-selector-expression
和receive-connection-factory-selector-expression
屬性.
也是從1.4版本開始, 你可以在SimpleMessageListenerContainer配
置路由連接工廠. 在那種情況下,隊列名稱的列表將作為lookup key.例如,如果你在容器中配置setQueueNames("foo", "bar")
,lookup key將是"[foo,bar]"
(無空格).