<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類(lèi)

    隨筆檔案

    搜索

    •  

    最新評(píng)論

    閱讀排行榜

    評(píng)論排行榜

    Queue Affinity 和 LocalizedQueueConnectionFactory

    當(dāng)在集群中使用HA隊(duì)列時(shí),為了獲取最佳性能,可以希望連接到主隊(duì)列所在的物理broker. 雖然CachingConnectionFactory 可以配置為使用多個(gè)broker 地址; 這會(huì)失敗的,client會(huì)嘗試按順序來(lái)連接. LocalizedQueueConnectionFactory 使用管理插件提供的 REST API來(lái)確定包含master隊(duì)列的節(jié)點(diǎn).然后,它會(huì)創(chuàng)建(或從緩存中獲取)一個(gè)只連接那個(gè)節(jié)點(diǎn)的CachingConnectionFactory .如果連接失敗了,將會(huì)確定一個(gè)新的消費(fèi)者可連接的master節(jié)點(diǎn). LocalizedQueueConnectionFactory 使用默認(rèn)的連接工廠進(jìn)行配置,在隊(duì)列物理位置不能確定的情況下,它會(huì)按照正常情況來(lái)連接集群.

    LocalizedQueueConnectionFactory 是一個(gè)RoutingConnectionFactory , SimpleMessageListenerContainer 會(huì)使用隊(duì)列名稱作為其lookup key ,這些已經(jīng)在上面的 the section called “Routing Connection Factory” 討論過(guò)了.


    基于這個(gè)原因(使用隊(duì)列名稱來(lái)作查找鍵),LocalizedQueueConnectionFactory 只在容器配置為監(jiān)聽(tīng)某個(gè)單一隊(duì)列時(shí)才可使用.

    RabbitMQ 管理插件應(yīng)該在每個(gè)節(jié)點(diǎn)上開(kāi)啟.

    警告

    這種連接工廠用于長(zhǎng)連接,如用在SimpleMessageListenerContainer的連接.它的目的不是用于短連接, 如在 RabbitTemplate中使用,這是因?yàn)樵谶B接前,它要調(diào)用REST API. 此外,對(duì)于發(fā)布操作來(lái)說(shuō),隊(duì)列是未知的,不管如何, 消息會(huì)發(fā)布到所有集群成員中,因此查找節(jié)點(diǎn)的邏輯幾乎沒(méi)有什么意義。

    這里有一個(gè)樣例配置,使用了Spring Boot的RabbitProperties來(lái)配置工廠:

    @Autowired
    private RabbitProperties props;
    
    private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };
    
    private final String[] nodes = { "rabbit@host1", "rabbit@host2" };
    
    @Bean
    public ConnectionFactory defaultConnectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory();
        cf.setAddresses(this.props.getAddresses());
        cf.setUsername(this.props.getUsername());
        cf.setPassword(this.props.getPassword());
        cf.setVirtualHost(this.props.getVirtualHost());
        return cf;
    }
    
    @Bean
    public ConnectionFactory queueAffinityCF(
            @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
           return new LocalizedQueueConnectionFactory(defaultCF,
                StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
                this.adminUris, this.nodes,
                this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
                false, null);
    }

    注意,三個(gè)參數(shù)是 addressesadminUris 和 nodes的數(shù)組. 當(dāng)一個(gè)容器試圖連接一個(gè)隊(duì)列時(shí),它們是有位置性的,它決定了哪個(gè)節(jié)點(diǎn)上的隊(duì)列是mastered,并以同樣數(shù)組位置來(lái)連接其地址.

    發(fā)布者確認(rèn)和返回

    確認(rèn)和返回消息可通過(guò)分別設(shè)置CachingConnectionFactory的 publisherConfirms 和publisherReturns 屬性為ture來(lái)完成.

    當(dāng)設(shè)置了這些選項(xiàng)時(shí),由工廠創(chuàng)建的通道將包裝在PublisherCallbackChannel,這用來(lái)方便回調(diào). 當(dāng)獲取到這樣的通道時(shí),client可在channel上注冊(cè)一個(gè) PublisherCallbackChannel.ListenerPublisherCallbackChannel 實(shí)現(xiàn)包含一些邏輯來(lái)路由確認(rèn)/返回給適當(dāng)?shù)谋O(jiān)聽(tīng)器. 這些特性將在下面的章節(jié)中進(jìn)一步解釋.

    對(duì)于一些更多的背景信息, 可以參考下面的博客:Introducing Publisher Confirms.

    記錄通道關(guān)閉事件

    1.5版本中引入了允許用戶控制日志級(jí)別的機(jī)制.

    CachingConnectionFactory 使用默認(rèn)的策略來(lái)記錄通道關(guān)閉事件:

    • 不記錄通道正常關(guān)閉事件 (200 OK).
    • 如果通道是因?yàn)槭〉谋粍?dòng)的隊(duì)列聲明關(guān)閉的,將記錄為debug級(jí)別.
    • 如果通道關(guān)閉是因?yàn)?span style="color: #6d180b; font-family: Monaco, Consolas, Courier, 'Lucida Console', monospace; background-color: #f2f2f2;">basic.consume專用消費(fèi)者條件而拒絕引起的,將被記錄為INFO級(jí)別.
    • 所有其它的事件將記錄為ERROR級(jí)別.

    要修改此行為,需要在CachingConnectionFactory的closeExceptionLogger屬性中注入一個(gè)自定義的ConditionalExceptionLogger.

    也可參考the section called “Consumer Failure Events”.

    運(yùn)行時(shí)緩存屬性

    從1.6版本開(kāi)始CachingConnectionFactory 通過(guò)getCacheProperties()方法提供了緩存統(tǒng)計(jì). 這些統(tǒng)計(jì)數(shù)據(jù)可用來(lái)在生產(chǎn)環(huán)境中優(yōu)化緩存.例如, 最高水位標(biāo)記可用來(lái)確定是否需要加大緩存.如果它等于緩存大小,你也許應(yīng)該考慮進(jìn)一步加大.

    Table 3.1. CacheMode.CHANNEL的緩存屬性

    PropertyMeaning
    channelCacheSize

    當(dāng)前配置的允許空閑的最大通道數(shù)量.

    localPort

    連接的本地端口(如果可用的話). 在可以在RabbitMQ 管理界面中關(guān)聯(lián) connections/channels.

    idleChannelsTx

    當(dāng)前空閑(緩存的)的事務(wù)通道的數(shù)目.

    idleChannelsNotTx
    當(dāng)前空閑(緩存的)的非事務(wù)通道的數(shù)目.
    idleChannelsTxHighWater

    同時(shí)空閑(緩存的)的事務(wù)通道的最大數(shù)目

    idleChannelsNotTxHighWater

    同時(shí)空閑(緩存的)的非事務(wù)通道的最大數(shù)目.

    Table 3.2. CacheMode.CONNECTION的緩存屬性

    PropertyMeaning
    openConnections

    表示連接到brokers上連接對(duì)象的數(shù)目.

    channelCacheSize

    當(dāng)前允許空閑的最大通道數(shù)目

    connectionCacheSize

    當(dāng)前允許空閑的最大連接數(shù)目.

    idleConnections

    當(dāng)前空閑的連接數(shù)目.

    idleConnectionsHighWater

    目前已經(jīng)空閑的最大連接數(shù)目.

    idleChannelsTx:<localPort>

    在當(dāng)前連接上目前空閑的事務(wù)通道的數(shù)目. 屬性名的localPort部分可用來(lái)在RabbitMQ 管理界面中關(guān)聯(lián)connections/channels.

    idleChannelsNotTx:<localPort>

    在當(dāng)前連接上目前空閑和非事務(wù)通道的數(shù)目.屬性名的localPort部分可用來(lái)在RabbitMQ管理界面中關(guān)聯(lián)connections/channels 

    idleChannelsTxHighWater:
    <localPort>

    已同時(shí)空閑的事務(wù)通道的最大數(shù)目. 屬性名的 localPort部分可用來(lái)在RabbitMQ管理界面中關(guān)聯(lián)connections/channels.

    idleChannelsNotTxHighWater:
    <localPort>

    憶同時(shí)空閑的非事務(wù)通道的最大數(shù)目.屬性名的localPort部分可用來(lái)RabbitMQ管理界面中關(guān)聯(lián)connections/channels.


    cacheMode 屬性 (包含CHANNEL 或 CONNECTION ).

    Figure 3.1. JVisualVM Example

    cacheStats


    3.1.3 添加自定義Client 連接屬性

    CachingConnectionFactory 現(xiàn)在允許你訪問(wèn)底層連接工廠,例如, 設(shè)置自定義client 屬性:

    connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");

    當(dāng)在RabbitMQ管理界面中查看連接時(shí),將會(huì)看到這些屬性.


    3.1.4 AmqpTemplate

    介紹

    像其它Spring Framework提供的高級(jí)抽象一樣, Spring AMQP 提供了扮演核心角色的模板. 定義了主要操作的接口稱為AmqpTemplate. 這些操作包含了發(fā)送和接收消息的一般行為.換句話說(shuō),它們不是針對(duì)某個(gè)特定實(shí)現(xiàn)的,從其名稱"AMQP"就可看出.另一方面,接口的實(shí)現(xiàn)會(huì)盡量作為AMQP協(xié)議的實(shí)現(xiàn).不像JMS,它只是接口級(jí)別的API實(shí)現(xiàn), AMQP是一個(gè)線路級(jí)協(xié)議.協(xié)議的實(shí)現(xiàn)可提供它們自己的client libraries, 因此模板接口的實(shí)現(xiàn)都依賴特定的client library.目前,只有一個(gè)實(shí)現(xiàn):RabbitTemplate. 在下面的例子中,你會(huì)經(jīng)常看到"AmqpTemplate",但當(dāng)你查看配置例子或者任何實(shí)例化或調(diào)用setter方法的代碼時(shí),你都會(huì)看到實(shí)現(xiàn)類(lèi)型(如."RabbitTemplate").

    正如上面所提到的, AmqpTemplate 接口定義了所有發(fā)送和接收消息的基本操作. 我們將分別在以下兩個(gè)部分探索消息發(fā)送和接收。

    也可參考the section called “AsyncRabbitTemplate”.

    添加重試功能

    從1.3版本開(kāi)始, 你可為RabbitTemplate 配置使用 RetryTemplate 來(lái)幫助處理broker連接的問(wèn)題. 參考spring-retry 項(xiàng)目來(lái)了解全部信息;下面就是一個(gè)例子,它使用指數(shù)回退策略(exponential back off policy)和默認(rèn)的 SimpleRetryPolicy (向調(diào)用者拋出異常前,會(huì)做三次嘗試).

    使用XML命名空間:

    <rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
    <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
    <property name="initialInterval" value="500" />
    <property name="multiplier" value="10.0" />
    <property name="maxInterval"value="10000" />
    </bean>
    </property>
    </bean>

    使用 @Configuration:

    @Bean
    public AmqpTemplate rabbitTemplate();
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		RetryTemplate retryTemplate = new RetryTemplate();
    		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    		backOffPolicy.setInitialInterval(500);
    		backOffPolicy.setMultiplier(10.0);
    		backOffPolicy.setMaxInterval(10000);
    		retryTemplate.setBackOffPolicy(backOffPolicy);
    		template.setRetryTemplate(retryTemplate);
    		return template;
    }

    從1.4版本開(kāi)始,除了retryTemplate 屬性外,RabbitTemplate 上也支持recoveryCallback 選項(xiàng). 它可用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T>recoveryCallback)第二個(gè)參數(shù).

    RecoveryCallback 會(huì)有一些限制,因?yàn)樵趓etry context只包含lastThrowable 字段.在更復(fù)雜的情況下,你應(yīng)該使用外部RetryTemplate,這樣你就可以通過(guò)上下文屬性傳遞更多信息給RecoveryCallback

    retryTemplate.execute(
        new RetryCallback<Object, Exception>() {
    
            @Override
      public Object doWithRetry(RetryContext context) throws Exception {
                context.setAttribute("message", message);
                return rabbitTemplate.convertAndSend(exchange, routingKey, message);
            }
        }, new RecoveryCallback<Object>() {
    
            @Overridepublic Object recover(RetryContext context) throws Exception {
                Object message = context.getAttribute("message");
                Throwable t = context.getLastThrowable();
                // Do something with message
       return null;
            }
        });
    }

    在這種情況下,你不需要在RabbitTemplate中注入RetryTemplate.

    發(fā)布者確認(rèn)和返回

    AmqpTemplateRabbitTemplate 實(shí)現(xiàn)支持發(fā)布者確認(rèn)和返回.

    對(duì)于返回消息,模板的 mandatory 屬性必須設(shè)置為true, 或者對(duì)于特定消息,其 mandatory-expression 必須評(píng)估為true .
    此功能需要將CachingConnectionFactory 的publisherReturns 屬性設(shè)置為true (參考 the section called “Publisher Confirms and Returns”).
    返回是通過(guò)注冊(cè)在RabbitTemplate.ReturnCallback(通過(guò)調(diào)用setReturnCallback(ReturnCallback callback))來(lái)返回給客戶端的. 回調(diào)必須實(shí)現(xiàn)下面的方法:

    void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);

    每個(gè)RabbitTemplate只支持一個(gè)ReturnCallback .也可參考the section called “Reply Timeout”.

    對(duì)于發(fā)布者確認(rèn)(又名發(fā)布者應(yīng)答), 模板需要將 CachingConnectionFactory 中的publisherConfirms 屬性設(shè)置為true.
    確認(rèn)是通過(guò)注冊(cè)在RabbitTemplate.ConfirmCallback(通過(guò)調(diào)用setConfirmCallback(ConfirmCallback callback)) 發(fā)送給client的. 回調(diào)必須實(shí)現(xiàn)下面的方法:

    void confirm(CorrelationData correlationData, boolean ack, String cause);

    CorrelationData 對(duì)象是在發(fā)送原始消息的時(shí)候,由client提供的. ack 為true 表示確認(rèn),為false時(shí),表示不確認(rèn)(nack). 對(duì)于nack , cause可能會(huì)包含nack的原因(如果生成nack時(shí),它可用的話).
    一個(gè)例子是當(dāng)發(fā)送消息到一個(gè)不存在的交換器時(shí).在那種情況下,broker會(huì)關(guān)閉通道; 關(guān)閉的原因會(huì)包含在cause中cause 是1.4版本中加入的.

    RabbitTemplate中只支持一個(gè)ConfirmCallback.

    當(dāng)rabbit模板完成發(fā)送操作時(shí),會(huì)關(guān)閉通道; 這可以排除當(dāng)連接工廠緩存滿時(shí)(緩存中還有空間,通道沒(méi)有物理關(guān)閉,返回/確認(rèn)正常處理)確認(rèn)和返回的接待問(wèn)題.
    當(dāng)緩存滿了的時(shí)候, 框架會(huì)延遲5秒來(lái)關(guān)閉,以為接收確認(rèn)/返回消息留有時(shí)間.當(dāng)使用確認(rèn)時(shí),通道會(huì)在收到最后一個(gè)確認(rèn)時(shí)關(guān)閉.
    當(dāng)使用返回時(shí),通道會(huì)保持5秒的打開(kāi)狀態(tài).一般建議將連接工廠的
    channelCacheSize 設(shè)為足夠大,這樣發(fā)布消息的通道就會(huì)返回到緩存中,而不是被關(guān)閉.
    你可以使用RabbitMQ管理插件來(lái)監(jiān)控通道的使用情況;如果你看到通道打開(kāi)/關(guān)閉的非常迅速,那么你必須考慮加大緩存,從而減少服務(wù)器的開(kāi)銷(xiāo).

    Messaging 集成

    從1.4版本開(kāi)始, 構(gòu)建于RabbitTemplate上的RabbitMessagingTemplate提供了與Spring Framework消息抽象的集成(如.org.springframework.messaging.Message).
    This allows you to create the message to send in generic manner.

    驗(yàn)證 User Id

    從1.6版本開(kāi)始,模板支持user-id-expression (當(dāng)使用Java配置時(shí),為userIdExpression). 如果發(fā)送消息,user id屬性的值將在評(píng)估表達(dá)式后進(jìn)行設(shè)置.評(píng)價(jià)的根對(duì)象是要發(fā)送的消息。

    例子:

    <rabbit:template...user-id-expression="'guest'" />
    <rabbit:template...user-id-expression="@myConnectionFactory.username" />

    第一個(gè)示例是一個(gè)文本表達(dá)式;第二個(gè)例子將獲取上下文中連接工廠bean的username 屬性.

    3.1.5 發(fā)送消息

    介紹

    當(dāng)發(fā)送消息時(shí),可使用下面的任何一種方法:

    void send(Message message) throws AmqpException;
    
    void send(String routingKey, Message message) throws AmqpException;
    
    void send(String exchange, String routingKey, Message message) throws AmqpException;

    我們將使用上面列出的最后一個(gè)方法來(lái)討論,因?yàn)樗鼘?shí)際是最清晰的.它允許在運(yùn)行時(shí)提供一個(gè)AMQP Exchange 名稱和路由鍵(routing key).最后一個(gè)參數(shù)是負(fù)責(zé)初建創(chuàng)建Message實(shí)例的回調(diào).使用此方法來(lái)發(fā)送消息的示例如下:

    amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
        new Message("12.34".getBytes(), someProperties));

    如果你打算使用模板實(shí)例來(lái)多次(或多次)向同一個(gè)交換器發(fā)送消息時(shí),"exchange" 可設(shè)置在模板自已身上.在這種情況中,可以使用上面列出的第二個(gè)方法. 下面的例子在功能上等價(jià)于前面那個(gè):

    amqpTemplate.setExchange("marketData.topic");
    amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

    如果在模塊上設(shè)置"exchange"和"routingKey"屬性,那么方法就只接受Message 參數(shù):

    amqpTemplate.setExchange("marketData.topic");
    amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
    amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

    關(guān)于交換器和路由鍵更好的想法是明確的參數(shù)將總是會(huì)覆蓋模板默認(rèn)值.事實(shí)上, 即使你不在模板上明確設(shè)置這些屬性, 總是有默認(rèn)值的地方. 在兩種情況中,默認(rèn)值是空字符串,這是合情合理的. 
    就路由鍵而言,它并不總是首先需要的 (如. Fanout 交換器). 此外,綁定的交換器上的隊(duì)列可能會(huì)使用空字符串. 這些在模板的路由鍵中都是合法的.
    就交換器名稱而言,空字符串也是常常使用的,因?yàn)锳MQP規(guī)范定義了無(wú)名稱的"默認(rèn)交換器".
    由于所有隊(duì)列可使用它們的隊(duì)列名稱作為路由鍵自動(dòng)綁定到默認(rèn)交換器上(它是Direct交換器e) ,
    上面的第二個(gè)方法可通過(guò)默認(rèn)的交換器將簡(jiǎn)單的點(diǎn)對(duì)點(diǎn)消息傳遞到任何隊(duì)列.
    只需要簡(jiǎn)單的將隊(duì)列名稱作為路由鍵-
    在運(yùn)行時(shí)提供方法參數(shù):

    RabbitTemplate template = new RabbitTemplate(); // 使用默認(rèn)的無(wú)名交換器
    template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

    或者,如果你喜歡創(chuàng)建一個(gè)模板用于主要或?qū)iT(mén)向一個(gè)隊(duì)列發(fā)送消息以下是完全合理的:

    RabbitTemplate template = new RabbitTemplate(); // 使用默認(rèn)無(wú)名交換器
    template.setRoutingKey("queue.helloWorld"); // 但我們總是向此隊(duì)列發(fā)送消息
    template.send(new Message("Hello World".getBytes(), someProperties));

    Message Builder API

    1.3版本開(kāi)始,通過(guò) MessageBuilder 和 MessagePropertiesBuilder提供了消息構(gòu)建API; 它們提供了更加方便地創(chuàng)建消息和消息屬性的方法:

    Message message = MessageBuilder.withBody("foo".getBytes())
    	.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    	.setMessageId("123")
    	.setHeader("bar", "baz")
    	.build();

    MessageProperties props = MessagePropertiesBuilder.newInstance()
    	.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    	.setMessageId("123")
    	.setHeader("bar", "baz")
    	.build();
    Message message = MessageBuilder.withBody("foo".getBytes())
    	.andProperties(props)
    	.build();

    每個(gè)MessageProperies上定義的屬性都可以被設(shè)置. 其它方法包括setHeader(String key, String value),removeHeader(String key)removeHeaders(), 和copyProperties(MessageProperties properties).
    每個(gè)屬性方法都有一個(gè)set*IfAbsent() 變種. 
    在默認(rèn)的初始值存在的情況下, 方法名為set*IfAbsentOrDefault().

    提供了五個(gè)靜態(tài)方法來(lái)創(chuàng)建初始message builder:

    public static MessageBuilder withBody(byte[] body) 1
    public static MessageBuilder withClonedBody(byte[] body) 2
    public static MessageBuilder withBody(byte[] body, int from, int to) 3
    public static MessageBuilder fromMessage(Message message) 4
    public static MessageBuilder fromClonedMessage(Message message) 5

    1

    builder創(chuàng)建的消息body是參數(shù)的直接引用.

    2

    builder創(chuàng)建的消息body是包含拷貝原字節(jié)數(shù)組的新數(shù)組.

    3

    build創(chuàng)建的消息body是包含原字節(jié)數(shù)組范圍的新數(shù)組.查看Arrays.copyOfRange() 來(lái)了解更多信息.

    4

    builder創(chuàng)建的消息body是原body參數(shù)的直接引用. 參數(shù)的屬性將拷貝到新MessageProperties對(duì)象中.

    5

    builer創(chuàng)建的消息body包含參數(shù)body的新數(shù)組.參數(shù)的屬性將拷貝到新的MessageProperties 對(duì)象中.

    public static MessagePropertiesBuilder newInstance() 1
    public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2
    public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3

    1

    新消息屬性將使用默認(rèn)值進(jìn)行初始化

    2

    builder會(huì)使用提供的properties對(duì)象進(jìn)行初始化,build() 方法也會(huì)返回參數(shù)properties對(duì)象.

    3

    參數(shù)的屬性會(huì)拷貝到新的MessageProperties對(duì)象中.

    AmqpTemplateRabbitTemplate 實(shí)現(xiàn)中, 每個(gè)send() 方法的重載版本都接受一個(gè)額外的CorrelationData對(duì)象.
    當(dāng)啟用了發(fā)布者確認(rèn)時(shí),此對(duì)象會(huì)在3.1.4, “AmqpTemplate”的回調(diào)中返回.這允許發(fā)送者使用確認(rèn)(ack或nack)來(lái)關(guān)聯(lián)發(fā)送的消息.

    發(fā)布者返回

    當(dāng)模板的mandatory 屬性為true時(shí),返回消息將由 Section 3.1.4, “AmqpTemplate”描述的回調(diào)來(lái)返回.

    從1.4版本開(kāi)始,RabbitTemplate 支持 SpEL mandatoryExpression 屬性,它將對(duì)每個(gè)請(qǐng)求消息進(jìn)行評(píng)估,作為根評(píng)估對(duì)象來(lái)解析成布爾值. Bean引用,如"@myBean.isMandatory(#root)" 可用在此表達(dá)式中.

    發(fā)布者返回內(nèi)部也可用于RabbitTemplate 的發(fā)送和接收操作中. 參考the section called “Reply Timeout” 來(lái)了解更多信息.

    批量

    從1.4.2版本開(kāi)始,引入了BatchingRabbitTemplate.它是RabbitTemplate 的子類(lèi),覆蓋了send 方法,此方法可根據(jù)BatchingStrategy來(lái)批量發(fā)送消息只有當(dāng)一個(gè)批次完成時(shí)才會(huì)向RabbitMQ發(fā)送消息。

    public interface BatchingStrategy {
    
    	MessageBatch addToBatch(String exchange, String routingKey, Message message);
    
    	Date nextRelease();
    
    	Collection<MessageBatch> releaseBatches();
    
    }
    警告
    成批的數(shù)據(jù)是保存在內(nèi)存中的,如果出現(xiàn)系統(tǒng)故障,未發(fā)送的消息將會(huì)丟失.

    這里提供了一個(gè) SimpleBatchingStrategy .它支持將消息發(fā)送到單個(gè) exchange/routing key.它有下面的屬性:

    • batchSize - 發(fā)送前一個(gè)批次中消息的數(shù)量
    • bufferLimit - 批量消息的最大大小;如果超過(guò)了此值,它會(huì)取代batchSize并導(dǎo)致要發(fā)送的部分批處理
    • timeout - 當(dāng)沒(méi)有新的活動(dòng)添加到消息批處理時(shí)之后,將發(fā)送部分批處理的時(shí)間(a time after which a partial batch will be sent when there is no new activity adding messages to the batch)

    SimpleBatchingStrategy 通過(guò)在每個(gè)消息的前面嵌入4字節(jié)二進(jìn)制長(zhǎng)度來(lái)格式化批次消息. 這是通過(guò)設(shè)置springBatchFormat消息屬性為lengthHeader4向接收系統(tǒng)傳達(dá)的.


    重要

    批量消息自動(dòng)由監(jiān)聽(tīng)器容器來(lái)分批(de-batched)(使用springBatchFormat消息頭).拒絕批量消息中的任何一個(gè)會(huì)將導(dǎo)致拒絕整個(gè)批次消息.

    3.1.6 接收消息

    介紹

    Message 接收總是比發(fā)送稍顯復(fù)雜.有兩種方式來(lái)接收Message. 最簡(jiǎn)單的選擇是在輪詢方法調(diào)用中一次只接收一個(gè)消息更復(fù)雜的更常見(jiàn)的方法是注冊(cè)一個(gè)偵聽(tīng)器,按需異步的接收消息
    在下面兩個(gè)子章節(jié)中,我們將看到這兩種方法的示例
    .

    Polling Consumer

    AmqpTemplate 自身可用來(lái)輪詢消息接收.默認(rèn)情況下,如果沒(méi)有可用消息,將會(huì)立即返回 null;它是無(wú)阻塞的.
    從1.5版本開(kāi)始,你可以設(shè)置receiveTimeout,以毫秒為單位, receive方法會(huì)阻塞設(shè)定的時(shí)間來(lái)等待消息.小于0的值則意味著無(wú)限期阻塞 (或者至少要等到與broker的連接丟失).
    1.6版本引入了receive 方法的變種,以允許在每個(gè)調(diào)用上都可設(shè)置超時(shí)時(shí)間.

    警告

    由于接收操作會(huì)為每個(gè)消息創(chuàng)建一個(gè)新的QueueingConsumer,這種技術(shù)并不適用于大容量環(huán)境,可考慮使用異步消費(fèi)者,或?qū)?/span>receiveTimeout 設(shè)為0來(lái)應(yīng)對(duì)這種情況.

    這里有四個(gè)簡(jiǎn)單可用的receive 方法.同發(fā)送方的交換器一樣, 有一種方法需要直接在模板本身上設(shè)置的默認(rèn)隊(duì)列屬性, 還有一種方法需要在運(yùn)行接受隊(duì)列參數(shù).
    版本
    1.6 引入了接受timeoutMillis 的變種,基于每個(gè)請(qǐng)求重寫(xiě)了receiveTimeout 方法.

    Message receive() throws AmqpException;
    
    Message receive(String queueName) throws AmqpException;
    
    Message receive(long timeoutMillis) throws AmqpException;
    
    Message receive(String queueName, long timeoutMillis) throws AmqpException;

    與發(fā)送消息的情況類(lèi)似, AmqpTemplate 有一些便利的方法來(lái)接收POJOs 而非Message 實(shí)例, 其實(shí)現(xiàn)可提供一種方法來(lái)定制MessageConverter 以用于創(chuàng)建返回的Object:

    Object receiveAndConvert() throws AmqpException;
    
    Object receiveAndConvert(String queueName) throws AmqpException;
    
    Message receiveAndConvert(long timeoutMillis) throws AmqpException;
    
    Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

    類(lèi)似于sendAndReceive 方法,從1.3版本開(kāi)始, AmqpTemplate 有多個(gè)便利的receiveAndReply 方法同步接收,處理,以及回應(yīng)消息:

    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
    	   throws AmqpException;
    
    <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     	throws AmqpException;
    
    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    	String replyExchange, String replyRoutingKey) throws AmqpException;
    
    <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    	String replyExchange, String replyRoutingKey) throws AmqpException;
    
    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     	ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
    
    <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

    AmqpTemplate 實(shí)現(xiàn)會(huì)負(fù)責(zé)receive 和 reply 階段.在大多數(shù)情況下,如果有必要,你只需要提供ReceiveAndReplyCallback 的實(shí)現(xiàn)來(lái)為收到的消息執(zhí)行某些業(yè)務(wù)邏輯或?yàn)槭盏降南?gòu)建回應(yīng)對(duì)象.
    注意,ReceiveAndReplyCallback 可能返回null. 在這種情況下,將不會(huì)發(fā)送回應(yīng),receiveAndReply 的工作類(lèi)似于receive 方法. 這允許相同的隊(duì)列用于消息的混合物,其中一些可能不需要答復(fù)。

    自動(dòng)消息(請(qǐng)求和應(yīng)答)轉(zhuǎn)換只能適應(yīng)于提供的回調(diào)不是ReceiveAndReplyMessageCallback 實(shí)例的情況下- 它提供了一個(gè)原始的消息交換合同。

    ReplyToAddressCallback 只在這種情況中有用,需要根據(jù)收到的信息通過(guò)自定義邏輯來(lái)決定replyTo 地址,并在ReceiveAndReplyCallback中進(jìn)行回應(yīng)的情況. 默認(rèn)情況下,請(qǐng)求消息中的 replyTo 信息用來(lái)路由回復(fù).

    下面是一個(gè)基于POJO的接收和回復(fù)…​

    boolean received =
            this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
    
                    public Invoice handle(Order order) {
                            return processOrder(order);
                    }
            });
    if (received) {
            log.info("We received an order!");
    }

    異步消費(fèi)者

    重要 
    Spring AMQP 也支持注解監(jiān)聽(tīng)器endpoints(通過(guò)使用 @RabbitListener 注解)并提供了一個(gè)開(kāi)放的基礎(chǔ)設(shè)施,編程注冊(cè)端點(diǎn)。
    這是目前為止建立一個(gè)異步消費(fèi)者的最方便方式, 參考the section called “Annotation-driven Listener Endpoints”來(lái)了解更多詳情.
    消息監(jiān)聽(tīng)器

    對(duì)于異步消息接收, 會(huì)涉及到一個(gè)專用組件(不是AmqpTemplate).此組件可作為消息消費(fèi)回調(diào)的容器.
    稍后,我們會(huì)講解這個(gè)容器和它的屬性,但首先讓我們來(lái)看一下回調(diào),因?yàn)檫@里是你的應(yīng)用程序代碼與消息系統(tǒng)集成的地方. MessageListener 接口:

    public interface MessageListener {
        void onMessage(Message message);
    }

    如果出于任何理由,你的回調(diào)邏輯需要依賴于AMQP Channel實(shí)例,那么你可以使用ChannelAwareMessageListener. 它看起來(lái)是很相似的,但多了一個(gè)額外的參數(shù):

    public interface ChannelAwareMessageListener {
        void onMessage(Message message, Channel channel) throws Exception;
    }
    MessageListenerAdapter
    如果您希望在應(yīng)用程序邏輯和消息API之間保持嚴(yán)格的分離,則可以依賴于框架所提供的適配器實(shí)現(xiàn)。
    這是通常被稱為“消息驅(qū)動(dòng)的POJO”支持。當(dāng)使用適配器時(shí),只需要提供一個(gè)適配器本身應(yīng)該調(diào)用的實(shí)例引用即可。
    MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
        listener.setDefaultListenerMethod("myMethod");

    你也可以繼承適配器,并實(shí)現(xiàn)getListenerMethodName()方法(基于消息來(lái)動(dòng)態(tài)選擇不同的方法). 這個(gè)方法有兩個(gè)參數(shù):originalMessage 和extractedMessage后者是轉(zhuǎn)換后的結(jié)果.默認(rèn)情況下,需要配置SimpleMessageConverter ;
    參考
    the section called “SimpleMessageConverter” 來(lái)了解更多信息以及其它轉(zhuǎn)換器的信息.

    從1.4.2開(kāi)始,原始消息包含consumerQueue 和 consumerTag 屬性,這些屬性可用來(lái)確定消息是從那個(gè)隊(duì)列中收到的.

    從1.5版本開(kāi)始,你可以配置消費(fèi)者queue/tag到方法名稱的映射(map)以動(dòng)態(tài)選擇要調(diào)用的方法.如果map中無(wú)條目,我們將退回到默認(rèn)監(jiān)聽(tīng)器方法.

    容器

    你已經(jīng)看過(guò)了消息監(jiān)聽(tīng)回調(diào)上的各種各樣的選項(xiàng),現(xiàn)在我們將注意力轉(zhuǎn)向容器. 基本上,容器處理主動(dòng)(active)的職責(zé),這樣監(jiān)聽(tīng)器回調(diào)可以保持被動(dòng)(passive). 容器是“生命周期”組件的一個(gè)例子。
    它提供了啟動(dòng)和停止的方法
    .當(dāng)配置容器時(shí),你本質(zhì)上縮短了AMQP Queue和 MessageListener 實(shí)例之間的距離.你必須提供一個(gè)ConnectionFactory 的引用,隊(duì)列名稱或隊(duì)列實(shí)例.
    下面是使用默認(rèn)實(shí)現(xiàn)
    SimpleMessageListenerContainer 的最基礎(chǔ)的例子:

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setQueueNames("some.queue");
    container.setMessageListener(new MessageListenerAdapter(somePojo));

    作為一個(gè)主動(dòng)組件, 最常見(jiàn)的是使用bean定義來(lái)創(chuàng)建監(jiān)聽(tīng)器容器,這樣它就可以簡(jiǎn)單地運(yùn)行于后臺(tái).這可以通過(guò)XML來(lái)完成:

    <rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
    </rabbit:listener-container>

    或者你可以@Configuration 風(fēng)格:

    @Configuration
    public class ExampleAmqpConfiguration {
    
        @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(rabbitConnectionFactory());
            container.setQueueName("some.queue");
            container.setMessageListener(exampleListener());
            return container;
        }
    
        @Bean
    public ConnectionFactory rabbitConnectionFactory() {
            CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            return connectionFactory;
        }
    
        @Bean
    public MessageListener exampleListener() {
            returnnew MessageListener() {
                publicvoid onMessage(Message message) {
                    System.out.println("received: " + message);
                }
            };
        }
    }

    RabbitMQ Version 3.2開(kāi)始, broker支持消費(fèi)者優(yōu)先級(jí)了(參考 Using Consumer Priorities with RabbitMQ). 

    這可以通過(guò)在消費(fèi)者設(shè)置x-priority 參數(shù)來(lái)啟用. 

    SimpleMessageListenerContainer 現(xiàn)在支持設(shè)置消費(fèi)者參數(shù):

    container.setConsumerArguments(Collections.
    <String, Object> singletonMap("x-priority", Integer.valueOf(10)));

    為了方便,命名空間在listener元素上提供了priority 屬性:

    <rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
    </rabbit:listener-container>

    從1.3版本開(kāi)始,容器監(jiān)聽(tīng)的隊(duì)列可在運(yùn)行時(shí)進(jìn)行修改,參考 Section 3.1.18, “Listener Container Queues”.

    auto-delete 隊(duì)列

    當(dāng)容器配置為監(jiān)聽(tīng)auto-delete 隊(duì)列或隊(duì)列有x-expires 選項(xiàng)或者broker配置了Time-To-Live 策略,隊(duì)列將在容器停止時(shí)(最后的消費(fèi)者退出時(shí))由broker進(jìn)行刪除.
    在1.3版本之前,容器會(huì)因隊(duì)列缺失而不能重啟; 當(dāng)連接關(guān)閉/打開(kāi)時(shí),RabbitAdmin 只能自動(dòng)重新聲明隊(duì)列.

    1.3版本開(kāi)始, 在啟動(dòng)時(shí),容器會(huì)使用RabbitAdmin 來(lái)重新聲明缺失的隊(duì)列.

    您也可以使用條件聲明(the section called “Conditional Declaration”) 與auto-startup="false" 來(lái)管理隊(duì)列的延遲聲明,直到容器啟動(dòng).

    <rabbit:queue id="otherAnon" declared-by="containerAdmin" />
    <rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
    <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
    </rabbit:direct-exchange>
    <rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo"queues="otherAnon" admin="containerAdmin" />
    </rabbit:listener-container>
    <rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />

    在這種情況下,隊(duì)列和交換器是由 containerAdmin 來(lái)聲明的,auto-startup="false" 因此在上下文初始化期間不會(huì)聲明元素.同樣,出于同樣原因,容器也不會(huì)啟動(dòng).當(dāng)容器隨后啟動(dòng)時(shí),它會(huì)使用containerAdmin引用來(lái)聲明元素.

    批量消息

    批量消息會(huì)自動(dòng)地通過(guò)監(jiān)聽(tīng)器容器 (使用springBatchFormat 消息頭)來(lái)解批(de-batched). 拒絕批量消息中的任何一個(gè)都將導(dǎo)致整批消息被拒絕. 參考the section called “Batching” 來(lái)了解更多關(guān)于批量消息的詳情.


    消費(fèi)者失敗事件

    從1.5版本開(kāi)始,無(wú)論時(shí)候,當(dāng)監(jiān)聽(tīng)器(消費(fèi)者)經(jīng)歷某種失敗時(shí),SimpleMessageListenerContainer 會(huì)發(fā)布應(yīng)用程序事件. 事件ListenerContainerConsumerFailedEvent 有下面的屬性:

    • container - 消費(fèi)者經(jīng)歷問(wèn)題的監(jiān)聽(tīng)容器.
    • reason - 失敗的文本原因。
    • fatal - 一個(gè)表示失敗是否是致命的boolean值;對(duì)于非致命異常,容器會(huì)根據(jù)retryInterval嘗試重新啟動(dòng)消費(fèi)者.
    • throwable -捕捉到的Throwable.

    這些事件能通過(guò)實(shí)現(xiàn)ApplicationListener<ListenerContainerConsumerFailedEvent>來(lái)消費(fèi).

    當(dāng) concurrentConsumers 大于1時(shí),系統(tǒng)級(jí)事件(如連接失敗)將發(fā)布到所有消費(fèi)者.

    如果消費(fèi)者因隊(duì)列是專有使用而失敗了,默認(rèn)情況下,在發(fā)布事件的時(shí)候,也會(huì)發(fā)出WARN 日志. 要改變?nèi)罩拘袨?需要在SimpleMessageListenerContainer的exclusiveConsumerExceptionLogger屬性中提供自定義的ConditionalExceptionLogger.
    也可參考the section called “Logging Channel Close Events”.

    致命錯(cuò)誤都記錄在ERROR級(jí)別中,這是不可修改的。


    posted on 2016-08-13 12:38 胡小軍 閱讀(6251) 評(píng)論(0)  編輯  收藏 所屬分類(lèi): RabbitMQ
    主站蜘蛛池模板: 国产免费MV大全视频网站| 四虎影视免费永久在线观看| 久久亚洲AV成人无码国产| 国产无遮挡又黄又爽免费网站| 亚洲欧洲久久av| 一区二区免费在线观看| 免费在线观看a级毛片| 牛牛在线精品观看免费正| 亚洲欧洲一区二区三区| a毛片成人免费全部播放| 国产特黄一级一片免费| 亚洲精品人成无码中文毛片 | 一区二区视频免费观看| 亚洲精品国自产拍在线观看| 一级毛片免费在线观看网站| 国产亚洲视频在线播放| 高清永久免费观看| 精品亚洲一区二区| 久久精品电影免费动漫| 777亚洲精品乱码久久久久久| 69式互添免费视频| 亚洲首页国产精品丝袜| 成人免费视频试看120秒| AV激情亚洲男人的天堂国语| 天堂亚洲免费视频| 中国一级特黄的片子免费 | 欧洲乱码伦视频免费国产| 亚洲国产精品狼友中文久久久| www成人免费视频| 亚洲αv在线精品糸列| 一区二区三区福利视频免费观看| 亚洲视频在线不卡| 中文字幕人成无码免费视频| 久久亚洲精品11p| 国产亚洲精品自在线观看| 国产又黄又爽胸又大免费视频 | 亚洲第一精品电影网| 噼里啪啦电影在线观看免费高清| 亚洲色大成网站www| 免费成人在线观看| 中文字幕免费人成乱码中国|