The Cafe Sample(小賣部訂餐例子)
小賣部有一個(gè)訂飲料服務(wù),客戶可以通過訂單來訂購所需要飲料。小賣部提供兩種咖啡飲料
LATTE(拿鐵咖啡)和MOCHA(摩卡咖啡)。每種又都分冷飲和熱飲
整個(gè)流程如下:
1.有一個(gè)下訂單模塊,用戶可以按要求下一個(gè)或多個(gè)訂單。
2.有一個(gè)訂單處理模塊,處理訂單中那些是關(guān)于訂購飲料的。
3.有一個(gè)飲料訂購處理模塊,處理拆分訂購的具體是那些種類的飲料,把具體需要生產(chǎn)的飲料要求發(fā)給生產(chǎn)模塊
4.有一個(gè)生產(chǎn)模塊
這個(gè)例子利用Spring Integration實(shí)現(xiàn)了靈活的,可配置化的模式集成了上述這些服務(wù)模塊。
先來看一下配置文件
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- 啟動(dòng)Message bus 消息服務(wù)總線 支持四個(gè)屬性
auto-startup[boolean是否自動(dòng)啟動(dòng) default=true]如果設(shè)置false,則需要手動(dòng)調(diào)用applicationContext.start()方法
auto-create-channels[boolean是否自動(dòng)注冊(cè)MessageChannel default=false],如果使用的MessagChannle不存在
error-channel 設(shè)置錯(cuò)誤時(shí)信息發(fā)送的MessageChannle,如果不設(shè)置,則使用DefaultErrorChannel
dispatcher-pool-size 使用的啟動(dòng)線程數(shù),默認(rèn)為10-->
<message-bus/>
<!-- 啟動(dòng)支持元數(shù)據(jù)標(biāo)記 -->
<annotation-driven/>
<!-- 設(shè)置 @Component標(biāo)識(shí)的元數(shù)據(jù)掃描包(package) -->
<context:component-scan base-package="org.springframework.integration.samples.cafe"/>
<!-- 下面啟動(dòng)了四個(gè) MessageChannel服務(wù) 處理接收發(fā)送端發(fā)過來的消息和把消息流轉(zhuǎn)到消息的消費(fèi)端 -->
<!-- 屬性說明: capacity 消息最大容量默認(rèn)為100 publish-subscribe是否是發(fā)布訂閱模式,默認(rèn)為否
id bean的id名稱 datatype ? -->
<channel id="orders"/> <!-- 訂單Channel -->
<channel id="drinks"/> <!-- 飲料訂單Channel,處理飲料的類別 -->
<channel id="coldDrinks"/> <!-- 冷飲生產(chǎn)Channel -->
<channel id="hotDrinks"/> <!-- 熱飲生產(chǎn)Channel -->
<!-- 消息處理終端 接收 channel coldDrinks的消息后,執(zhí)行barista.prepareColdDrink方法 生產(chǎn)冷飲 -->
<!-- 屬性說明: input-channel 接收消息的Channel必須 default-output-channel設(shè)置默認(rèn)回復(fù)消息Channel
handler-ref 引用bean的id名稱 handler-method Handler處理方法名(參數(shù)類型必須與發(fā)送消息的payLoad使用的一致)
error-handler設(shè)置錯(cuò)誤時(shí)信息發(fā)送的MessageChannle reply-handler 消息回復(fù)的Channel -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息處理終端 接收 channel hotDrinks的消息后,執(zhí)行barista.prepareHotDrink方法 生產(chǎn)熱飲 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
<!-- 定義一個(gè)啟動(dòng)下定單操作的bean,它通過 channel orders下定單 -->
<beans:bean id="cafe" class="org.springframework.integration.samples.cafe.Cafe">
<beans:property name="orderChannel" ref="orders"/>
</beans:bean>
</beans:beans>
下面我們來看一下源代碼目錄:
我們來看一下整體服務(wù)是怎么啟動(dòng)的
首先我們來看一下CafeDemo這個(gè)類,它觸發(fā)下定單操作、
1 public class CafeDemo {
2
3 public static void main(String[] args) {
4 //加載Spring 配置文件
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //啟動(dòng) Spring容器(啟動(dòng)所有實(shí)現(xiàn) org.springframework.context.Lifecycle接口的實(shí)現(xiàn)類的start方法)
13 context.start();
14 //從Spring容器 取得cafe實(shí)例
15 Cafe cafe = (Cafe) context.getBean("cafe");
16 DrinkOrder order = new DrinkOrder();
17 //一杯熱飲 參數(shù)說明1.飲料類型 2.數(shù)量 3.是否是冷飲(true表示冷飲)
18 Drink hotDoubleLatte = new Drink(DrinkType.LATTE, 2, false);
19 Drink icedTripleMocha = new Drink(DrinkType.MOCHA, 3, true);
20 order.addDrink(hotDoubleLatte);
21 order.addDrink(icedTripleMocha);
22 //下100個(gè)訂單
23 for (int i = 0; i < 100; i++) {
24 //調(diào)用cafe的placeOrder下訂單
25 cafe.placeOrder(order);
26 }
27 }
28 }
下面是Cafe的源代碼
1 public class Cafe {
2
3 private MessageChannel orderChannel;
4
5
6 public void setOrderChannel(MessageChannel orderChannel) {
7 this.orderChannel = orderChannel;
8 }
9
10 //其實(shí)下訂單操作,調(diào)用的是orderChannel(orders channel)的send方法,把消息發(fā)出去
11 public void placeOrder(DrinkOrder order) {
12 this.orderChannel.send(new GenericMessage<DrinkOrder>(order));
13 //GenericMessage有三個(gè)構(gòu)建方法,參考如下
14 //new GenericMessage<T>(Object id, T payload);
15 //new GenericMessage<T>(T payload);
16 //new GenericMessage<T>(T payload, MessageHeader headerToCopy)
17 }
18 }
下面我們來看一下哪個(gè)類標(biāo)記有@MessageEndpoint(input="orders") 表示它會(huì)消費(fèi)orders Channel的消息
我們發(fā)現(xiàn)OrderSplitter類標(biāo)記這個(gè)元數(shù)據(jù),下面是源代碼,我們來分析
1 //標(biāo)記 MessageEndpoint 元數(shù)據(jù), input表示 設(shè)置后所有 orders Channel消息都會(huì)被OrderSplitter收到
2 @MessageEndpoint(input="orders")
3 public class OrderSplitter {
4
5 //@Splitter表示,接收消息后,調(diào)用這個(gè)類的該方法. 其的參數(shù)類型必須與message的 payload屬性一致。
6 //即在new GenericMessage<T>的泛型中指定
7 //元數(shù)據(jù)設(shè)置的 channel屬性表示,方法執(zhí)行完成后,會(huì)把方法返回的結(jié)果保存到message的payload屬性后,發(fā)送到指定的channel中去
8 //這里指定發(fā)送到 drinks channel
9 @Splitter(channel="drinks")
10 public List<Drink> split(DrinkOrder order) {
11 return order.getDrinks(); //方法中,是把訂單中的飲料訂單取出來
12 }
13 }
接下來,與找OrderSplitter方法相同,我們要找哪個(gè)類標(biāo)記有@MessageEndpoint(input="drinks") 表示它會(huì)消費(fèi)drinks Channel的消息
找到DrinkRouter這個(gè)類
1 @MessageEndpoint(input="drinks")
2 public class DrinkRouter {
3
4 //@Router表示,接收消息后,調(diào)用這個(gè)類的該方法. 其的參數(shù)類型必須與message的 payload屬性一致。
5 //方法執(zhí)行完畢后,其返回值為 在容器中定義的channel名稱。channel名稱必須存在
6 @Router
7 public String resolveDrinkChannel(Drink drink) {
8 return (drink.isIced()) ? "coldDrinks" : "hotDrinks"; //方法中,是根據(jù)處理飲料是否是冷飲,送不同的channel處理
9 }
10 }
備注:@Router可以把消息路由到多個(gè)channel,實(shí)現(xiàn)方式如下
@Router
public MessageChannel route(Message message) {
}
@Router
public List<MessageChannel> route(Message message) {
}
@Router
public String route(Foo payload) {
}
@Router
public List<String> route(Foo payload) {
}
接下來,我們就要找 MessageEndpoint 標(biāo)記為處理 "coldDrinks" 和 "hotDrinks" 的類,我們發(fā)現(xiàn)
這個(gè)兩個(gè)類并不是通過元數(shù)據(jù)@MessageEndpoint來實(shí)現(xiàn)的,而是通過容器配置
(下面會(huì)演示如何用元數(shù)據(jù)配置,但元數(shù)據(jù)配置有局限性。這兩種配置方式看大家喜好,系統(tǒng)中都是可以使用)
下面是容器配置信息:
<!-- 消息處理終端 接收 channel coldDrinks的消息后,執(zhí)行barista.prepareColdDrink方法 生產(chǎn)冷飲 -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息處理終端 接收 channel hotDrinks的消息后,執(zhí)行barista.prepareHotDrink方法 生產(chǎn)熱飲 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
我們來看一下源代碼:
1 @Component //這個(gè)必須要有,表示是一個(gè)消息處理組件
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 public void prepareColdDrink(Drink drink) {
31 try {
32 Thread.sleep(this.coldDrinkDelay);
33 } catch (InterruptedException e) {
34 Thread.currentThread().interrupt();
35 }
36 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
37 }
38
39 }
如果要用元數(shù)據(jù)標(biāo)識(shí)實(shí)現(xiàn)上述方法:
要用元數(shù)據(jù)配置,它不像容器配置,可以在一個(gè)類中,支持多個(gè)不同的Handler方法。以處理prepareColdDrink方法為例
1 @MessageEndpoint(input="coldDrinks") //加了該元數(shù)據(jù),它會(huì)自動(dòng)掃描,并作為@Componet標(biāo)記處理
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 @Handler//回調(diào)處理的方法
31 public void prepareColdDrink(Drink drink) {
32 try {
33 Thread.sleep(this.coldDrinkDelay);
34 } catch (InterruptedException e) {
35 Thread.currentThread().interrupt();
36 }
37 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
38 }
39 }
這樣整個(gè)流程就執(zhí)行完了,最終我們的飲料產(chǎn)品就按照訂單生產(chǎn)出來了。累了吧,喝咖啡提神著呢!!!
初充:
下面是針對(duì) Spring Integration adapter擴(kuò)展的學(xué)習(xí)筆記
JMS Adapters
jms adapters 目前有兩種實(shí)現(xiàn)
JmsPollingSourceAdapter 和 JmsMessageDrivenSourceAdapter. 前者是使用Srping的JmsTemplate模板類通過輪循的方式接收消息
后者是使用則通過代理Spring的DefaultMessageListenerContainer實(shí)例,實(shí)現(xiàn)消息驅(qū)動(dòng)的方式。
xml配置如下:
JmsPollingSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsPollingSourceAdapter">
<constructor-arg ref="jmsTemplate"/>
<property name="channel" ref="exampleChannel"/>
<property name="period" value="5000"/> <!-- 輪循時(shí)間間隔 -->
<property name="messageMapper" ref=""/> <!-- message轉(zhuǎn)換 -->
</bean>
<!-- 備注:消息的轉(zhuǎn)換方式如下:
收到JMS Message消息后,SourceAdapter會(huì)調(diào)用Spring的MessageConverter實(shí)現(xiàn)類,把javax.jms.Message對(duì)象
轉(zhuǎn)換成普通Java對(duì)象,再調(diào)用Spring Integration的MessageMapper把該對(duì)象轉(zhuǎn)成 org.springframework.integration.message.Message對(duì)象 -->
JmsMessageDrivenSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsMessageDrivenSourceAdapter">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="exampleQueue"/>
<property name="channel" ref="exampleChannel"/>
<property name="messageConverter" ref=""/> <!-- jms消息對(duì)象轉(zhuǎn)換 -->
<property name="messageMapper" ref="" /> <!-- 普通java對(duì)象轉(zhuǎn)換成 Spring Integration Message -->
<property name="sessionAcknowledgeMode" value="1" />
<!-- sesssion回復(fù)模式 AUTO_ACKNOWLEDGE=1 CLIENT_ACKNOWLEDGE=2 DUPS_OK_ACKNOWLEDGE=3 SESSION_TRASACTED=0-->
</bean>
另外還有一個(gè)比較有用的類JmsTargetAdapter 它實(shí)現(xiàn)了MessageHandler接口。它提把Spring Integration Message對(duì)象轉(zhuǎn)換成
JMS消息并發(fā)送到指定的消息隊(duì)列。與JMS服務(wù)連接的實(shí)現(xiàn)可以通過設(shè)定 jmsTemplate屬性引用或是connectionFactory和destination
或destinationName屬性。
<bean class="org.springframework.integration.adapter.jms.JmsTargetAdapter">
<constructor-arg ref="connectionFactory"/>
<constructor-arg value="example.queue"/>
<!--或是以下配置
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="exampleQueue"/>
或是
<constructor-arg ref="jmsTemplate"/> -->
</bean>
Good Luck!
Yours Matthew!