?
JMS
(
Java Message Service
,
Java消息服務(wù)
)是一組Java應(yīng)用
程序
接口(Java
API
),它提供創(chuàng)建、發(fā)送、接收、讀取
消息
的服務(wù)。由Sun公司和它的合作伙伴設(shè)計的JMS API定義了一組公共的應(yīng)用程序接口和相應(yīng)語法,使得Java程序能夠和其他消息
組件
進(jìn)行通信。
????? JMS是一種與廠商無關(guān)的 API,用來訪問消息收發(fā)系統(tǒng)。它
類
似于
JDBC
(Java
Database
Connectivity):這里,JDBC 是可以用來訪問許多不同關(guān)系
數(shù)據(jù)庫
的 API,而 JMS 則提供同樣與廠商無關(guān)的訪問方法,以訪問消息收發(fā)服務(wù)。許多廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。
JMS 使您能夠通過消息收發(fā)服務(wù)(有時稱為消息中介程序或路由器)從一個 JMS 客戶機(jī)向另一個 JML 客戶機(jī)發(fā)送消息。消息是 JMS 中的一種
類型
對象
,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關(guān)該消息的
元數(shù)據(jù)
組成。消息主體則攜帶著應(yīng)用程序的數(shù)據(jù)或有效負(fù)載。根據(jù)有效負(fù)載的類型來劃分,可以將消息分為幾種類型,它們分別攜帶:簡單文本 (TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節(jié)流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負(fù)載的消息 (Message)。
????? 消息收發(fā)系統(tǒng)是異步的,也就是說,JMS 客戶機(jī)可以發(fā)送消息而不必等待回應(yīng)。比較可知,這完全不同于基于 RPC 的(基于遠(yuǎn)程過程的)系統(tǒng),如 EJB 1.1、CORBA 和 Java RMI 的引用實現(xiàn)。在 RPC 中,客戶機(jī)調(diào)用服務(wù)器上某個分布式對象的一個方法。在方法調(diào)用返回之前,該客戶機(jī)被阻塞;該客戶機(jī)在可以執(zhí)行下一條指令之前,必須等待方法調(diào)用結(jié)束。在 JMS 中,客戶機(jī)將消息發(fā)送給一個虛擬通道(主題或隊列),而其它 JMS 客戶機(jī)則預(yù)訂或監(jiān)聽這個虛擬通道。當(dāng) JMS 客戶機(jī)發(fā)送消息時,它并不等待回應(yīng)。它執(zhí)行發(fā)送操作,然后繼續(xù)執(zhí)行下一條指令。消息可能最終轉(zhuǎn)發(fā)到一個或許多個客戶機(jī),這些客戶機(jī)都不需要作出回應(yīng)。
JMS的通用接口集合以異步方式發(fā)送或接收消息。異步方式接收消息顯然是使用間斷網(wǎng)絡(luò)連接的客戶機(jī),諸如移動電話和PDA的最好的選擇。另外, JMS采用一種寬松結(jié)合方式整合企業(yè)系統(tǒng)的方法,其主要的目的就是創(chuàng)建能夠使用跨平臺數(shù)據(jù)信息的、可移植的企業(yè)級應(yīng)用程序,而把開發(fā)人力解放出來。
Java消息服務(wù)支持兩種消息模型:Point-to-Point消息(P2P)和發(fā)布訂閱消息(Publish Subscribe messaging,簡稱Pub/Sub)。JMS規(guī)范并不要求供應(yīng)商同時支持這兩種消息模型,但開發(fā)者應(yīng)該熟悉這兩種消息模型的優(yōu)勢與缺點。
P2P消息模型是在點對點之間傳遞消息時使用。如果應(yīng)用程序開發(fā)者希望每一條消息都能夠被處理,那么應(yīng)該使用P2P消息模型。與Pub/Sub消息模型不同,P2P消息總是能夠被傳送到指定的位置。
Pub/Sub模型在一到多的消息廣播時使用。如果一定程度的消息傳遞的不可靠性可以被接受的話,那么應(yīng)用程序開發(fā)者也可以使用Pub/Sub消息模型。換句話說,它適用于所有的消息消費程序并不要求能夠收到所有的信息或者消息消費程序并不想接收到任何消息的情況。
JMS通過允許創(chuàng)建持久訂閱來簡化時間相關(guān)性,即使消息預(yù)訂者未激活也可以接收到消息。此外,使用持久訂閱還可通過隊列提供靈活性和可靠性,而仍然允許消息被發(fā)給許多的接收者。 Topic Subscriber topic Subscriber = topicSession.createDurableSubscriber(topic, subscriptionName); Connection對象表示了到兩種消息模型中的任一種的消息系統(tǒng)的連接。服務(wù)器端和客戶機(jī)端對象要求管理創(chuàng)建的JMS連接的狀態(tài)。連接是由Connection Factory創(chuàng)建的并且通過JNDI查尋定位。 //取得用于 P2P的 QueueConnectionFactory QueueConnectionFactory = queueConnectionFactory( ); Context messaging = new InitialContext( ); QueueConnectionFactory = (QueueConnectionFactory) Messaging.lookup(“QueueConnectionFactory”); //取得用于 pub/sub的 TopicConnectionFactory TopicConnectonFactory topicConnectionFactory; Context messaging = new InitialContext(); topicConnectionFactory = (TopicConnectionFactory) messaging.lookup(“TopicConnectionFactory”); 注意:用于P2P的代碼和用于PublishSubscribe的代碼非常相似。
如果session被標(biāo)記為transactional的話,確認(rèn)消息就通過確認(rèn)和校正來自動地處理。如果session沒有標(biāo)記為 transactional,你有三個用于消息確認(rèn)的選項。
· AUTO_ACKNOWLEDGE session將自動地確認(rèn)收到一則消息。
· CLIENT_ACKNOWLEDGE 客戶端程序?qū)⒋_認(rèn)收到一則消息,調(diào)用這則消息的確認(rèn)方法。 · DUPS_OK_ACKNOWLEDGE 這個選項命令session“懶散的”確認(rèn)消息傳遞,可以想到,這將導(dǎo)致消息提供者傳遞的一些復(fù)制消息可能會出錯。這種確認(rèn)的方式只應(yīng)當(dāng)用于消息消費程序可以容忍潛在的副本消息存在的情況。 queueSession = queueConnection.createQueueSession(false, session.AUTO_ACKNOWLEDGE);//P2P topicSession = topicConnection.createTopicSession(false, session.AUTO_ACKNOWLEDGE); //Pub-Sub
注意:在本例中,一個session目的從連結(jié)中創(chuàng)建,非值指出session是non-transactional的,并且 session將自動地確認(rèn)收到一則消息。
JMS現(xiàn)在有兩種傳遞消息的方式。標(biāo)記為NON_PERSISTENT的消息最多投遞一次,而標(biāo)記為PERSISTENT的消息將使用暫存后再轉(zhuǎn)送的機(jī)理投遞。如果一個JMS服務(wù)離線,那么持久性消息不會丟失但是得等到這個服務(wù)恢復(fù)聯(lián)機(jī)時才會被傳遞。所以默認(rèn)的消息傳遞方式是非持久性的。即使使用非持久性消息可能降低內(nèi)務(wù)和需要的存儲器,并且這種傳遞方式只有當(dāng)你不需要接收所有的消息時才使用。
雖然 JMS規(guī)范并不需要JMS供應(yīng)商實現(xiàn)消息的優(yōu)先級路線,但是它需要遞送加快的消息優(yōu)先于普通級別的消息。JMS定義了從0到9的優(yōu)先級路線級別,0是最低的優(yōu)先級而9則是最高的。更特殊的是0到4是正常優(yōu)先級的變化幅度,而5到9是加快的優(yōu)先級的變化幅度。舉例來說: topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); //Pub-Sub 或 queueSender.send(message, DeliveryMode.PERSISTENT, 8, 10000);//P2P 這個代碼片斷,有兩種消息模型,映射遞送方式是持久的,優(yōu)先級為加快型,生存周期是10000 (以毫秒度量 )。如果生存周期設(shè)置為零,這則消息將永遠(yuǎn)不會過期。當(dāng)消息需要時間限制否則將使其無效時,設(shè)置生存周期是有用的。
JMS定義了五種不同的消息正文格式,以及調(diào)用的消息類型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級別的兼容性。
· StreamMessage -- Java原始值的數(shù)據(jù)流
· MapMessage--一套名稱-值對
· TextMessage--一個字符串對象
· ObjectMessage--一個序列化的 Java對象
· BytesMessage--一個未解釋字節(jié)的數(shù)據(jù)流
JMS應(yīng)用程序接口提供用于創(chuàng)建每種類型消息和設(shè)置荷載的方法例如,為了在一個隊列創(chuàng)建并發(fā)送一個TextMessage實例,你可以使用下列語句: TextMessage message = queueSession.createTextMessage(); message.setText(textMsg); 以異步方式接收消息,需要創(chuàng)建一個消息監(jiān)聽器然后注冊一個或多個使用MessageConsumer的JMS MessageListener接口。會話(主題或隊列)負(fù)責(zé)產(chǎn)生某些消息,這些消息被傳送到使用onMessage方法的監(jiān)聽者那里。 import javax.jms.*; public class ExampleListener implements MessageListener { //把消息強(qiáng)制轉(zhuǎn)化為TextMessage格式 public void onMessage(Message message) { TextMessage textMsg = null; // 打開并處理這段消息 } } 當(dāng)我們創(chuàng)建QueueReceiver和TopicSubscriber時,我們傳遞消息選擇器字符串: //P2P QueueReceiver QueueReceiver receiver; receiver = session.createReceiver(queue, selector); //Pub-Sub TopicSubscriber TopicSubscriber subscriber; subscriber = session.createSubscriber(topic, selector); 為了啟動消息的交付,不論是Pub/Sub還是P2P,都需要調(diào)用start方法。 TopicConnection.start( ); //pub-sub QueueConnection.start( ); //P2P TopicConnection.start ( );// pub-sub QueueConnection.start ( );// P2P
當(dāng)一條消息被捕捉時,這條消息做為一條必須被強(qiáng)制轉(zhuǎn)化為適當(dāng)消息類型的普通Message對象到達(dá)。這是一個被用來提取或打開消息內(nèi)容的getter方法。下列代碼片段使用StreamMessage類型。 private void unPackMessage (Message message) { String eName; String position; double rate; StreamMessage message; Message = session.createStreamMessage( ); //注意下面的代碼必須按照我給出的順序書寫 message.writeString(eName); message.writeString(position); message.writeDouble(rate); //實現(xiàn)處理消息的必要的程序邏輯 }
停止消息的傳遞,無論是Pub/Sub還是P2P,都調(diào)用stop方法。 TopicConnection.start( ); //pub-sub QueueConnection.start( ); //P2P TopicConnection.start ( );// pub-sub QueueConnection.start ( );// P2P 其他的J2EE組件--servlet或EJB--可以當(dāng)作消息生產(chǎn)者;然而,它們可能只能同步操作,這可能是因為它們的請求-應(yīng)答的性質(zhì)決定的。雖然XML目前還不是被支持的消息類型,發(fā)送一個XML文件和創(chuàng)建一條文本類型消息以及把XML文件添加到消息的有效負(fù)載都一樣簡單,都是以非專有的方式傳送數(shù)據(jù)。值得注意的是,一些JMS供應(yīng)廠商已經(jīng)提供了可用的XML消息類型。但是使用非標(biāo)準(zhǔn)的消息類型可能會出現(xiàn)可移植性問題。 String reportData; //reportData內(nèi)容為XML 文檔 TextMessage message; message = session.createTextMessage(); message.setText (reportData);
消息驅(qū)動組件(MDB)是一個當(dāng)消息到達(dá)時被容器調(diào)用的異步消息消費程序。和entity和session EJB不同,MDB沒有本地和遠(yuǎn)程接口并且是匿名的;它們對于客戶是不可見的。MDB是JMS系統(tǒng)的一部分,作為消費者實現(xiàn)服務(wù)器上的商業(yè)邏輯程序。 一個客戶程序可能通過使用JNDI定位一個與MDB相關(guān)聯(lián)的JMS。 例如: Context initialContext = new InitialContext(); Queue reportInfoQueue = (javax.jms.Queue)initialContext.lookup (“java:comp/env/jms/reportInfoQueue”); MDB是由Bean類和相應(yīng)的XML部署描述符組成。 Bean 類實現(xiàn)MessageDriveBean 接口: import javax.ejb.*; import jms.Message.*; public interface MessageDriveBean { public void ejbCreate(); public void ejbRemove(); public void setMessageDrivenContext(MessageDrivenContext ctx); } 消息監(jiān)聽器接口: import javax.jms.*; public interface MessageListener { public void onMessage( ); }
部署描述符 <!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/j2ee/dtds/ejb-jar_2_0.dtd"> <ejb-jar> <enterprise-beans> <message-driven> <ejb-name>MDB</ejb-name> <ejb-class>MDB</ejb-class> <transaction-type>Container</transaction-type> <message-driven-destination> <jms-destination-type>javax.jms.Queue</jms-destination-type> </message-driven-destination> <security-identity> <run-as-specified-identity> <role-name>everyone</role-name> </run-as-specified-identity> </security-identity> </message-driven> </enterprise-beans> </ejb-jar>
? 既然我們現(xiàn)在已經(jīng)有了一些基本的JMS知識,那么我們可以使用JMS做什么呢?任何事情都可以。
? 例如,分別用于銷售、庫存、客戶服務(wù)和賬目處理的系統(tǒng)。這些部門之間的系統(tǒng)很可能已經(jīng)存在了很長時間,這些處理要求把事務(wù)移動到系統(tǒng)中去,這并不是一個小的工作。這就是消息服務(wù)適用的地點。
當(dāng)售貨員完成銷售的時候,一條消息被發(fā)給庫存系統(tǒng);一旦訂單消息發(fā)送給收發(fā)貨人員,就可以按照訂單出貨了。當(dāng)訂單成功地發(fā)貨,系統(tǒng)將通知顧客服務(wù)和會計系統(tǒng)這個訂單已經(jīng)成功的交易了。所有對應(yīng)的每個子系統(tǒng)都自動地根據(jù)收到的消息進(jìn)行更新。
JMS一般都不是用來整合一個系統(tǒng),而是整合許多可能參與消息驅(qū)動環(huán)境的系統(tǒng)。JMS是一個用于開發(fā)和集成企業(yè)應(yīng)用程序的重要的工具。因為許多公司都有以前遺留下來的系統(tǒng)和新近開發(fā)的系統(tǒng)綜合起來的系統(tǒng),消息的使用是整合整個企業(yè)的重要的步驟。
JMS接口描述
????? JMS 支持兩種消息類型PTP 和Pub/Sub,分別稱作:PTP Domain 和Pub/Sub Domain,這兩種接口都繼承統(tǒng)一的JMS Parent 接口,JMS 主要接口如下所示:
JMS Parent?? |
?PTPDomain |
Pub/Sub Domain
|
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver,QueueBrowser |
TopicSubscriber |
????? 以下是對這些接口的簡單描述:
??? ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接
??? Connection :JMS 客戶端到JMS Provider 的連接
??? Destination :消息的目的地
??? Session: 一個發(fā)送或接收消息的線程
??? MessageProducer: 由Session 對象創(chuàng)建的用來發(fā)送消息的對象
??? MessageConsumer: 由Session 對象創(chuàng)建的用來接收消息的對象
?
JMS消息模型
JMS 消息由以下幾部分組成:消息頭,屬性,消息體。
??? 消息頭(Header) - 消息頭包含消息的識別信息和路由信息,消息頭包含一些標(biāo)準(zhǔn)的屬性如:JMSDestination,JMSMessageID 等。
消息頭
|
由誰設(shè)置
|
JMSDestination |
send 或 publish 方法 |
JMSDeliveryMode |
send 或 publish 方法 |
JMSExpiration |
send 或 publish 方法 |
JMSPriority |
send 或 publish 方法 |
JMSMessageID |
send 或 publish 方法 |
JMSTimestamp |
send 或 publish 方法 |
JMSCorrelationID |
客戶 |
JMSReplyTo |
客戶 |
JMSType |
客戶 |
JMSRedelivered |
JMS Provider |
????? 屬性(Properties) - 除了消息頭中定義好的標(biāo)準(zhǔn)屬性外,JMS 提供一種機(jī)制增加新屬性到消息頭中,這種新屬性包含以下幾種:
??? 1. 應(yīng)用需要用到的屬性;
??? 2. 消息頭中原有的一些可選屬性;
??? 3. JMS Provider 需要用到的屬性。
??? 標(biāo)準(zhǔn)的JMS 消息頭包含以下屬性:
?? JMSDestination --消息發(fā)送的目的地
?? JMSDeliveryMode --傳遞模式, 有兩種模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示該消息一定要被送到目的地,否則會導(dǎo)致應(yīng)用錯誤。NON_PERSISTENT 表示偶然丟失該消息是被允許的,這兩種模式使開發(fā)者可以在消息傳遞的可靠性和吞吐量之間找到平衡點。
?? JMSMessageID 唯一識別每個消息的標(biāo)識,由JMS Provider 產(chǎn)生。
?? JMSTimestamp 一個消息被提交給JMS Provider 到消息被發(fā)出的時間。
?? JMSCorrelationID 用來連接到另外一個消息,典型的應(yīng)用是在回復(fù)消息中連接到原消息。
?? JMSReplyTo 提供本消息回復(fù)消息的目的地址。
?? JMSRedelivered 如果一個客戶端收到一個設(shè)置了JMSRedelivered 屬性的消息,則表示可能該客戶端曾經(jīng)在早些時候收到過該消息,但并沒有簽收(acknowledged)。
?? JMSType 消息類型的識別符。
?? JMSExpiration 消息過期時間,等于QueueSender 的send 方法中的timeToLive 值或TopicPublisher 的publish 方法中的timeToLive 值加上發(fā)送時刻的GMT 時間值。如果timeToLive值等于零,則JMSExpiration 被設(shè)為零,表示該消息永不過期。如果發(fā)送后,在消息過期時間之后消息還沒有被發(fā)送到目的地,則該消息被清除。
?? JMSPriority 消息優(yōu)先級,從0-9 十個級別,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 嚴(yán)格按照這十個優(yōu)先級發(fā)送消息,但必須保證加急消息要先于普通消息到達(dá)。
??? 消息體(Body) - JMS API 定義了5種消息體格式,也叫消息類型,你可以使用不同形式發(fā)送接收數(shù)據(jù)并可以兼容現(xiàn)有的消息格式,下面描述這5種類型:
消息類型
|
消息體
|
TextMessage |
java.lang.String對象,如xml文件內(nèi)容 |
MapMessage |
名/值對的集合,名是String對象,值類型可以是Java任何基本類型 |
BytesMessage |
字節(jié)流 |
StreamMessage |
Java中的輸入輸出流 |
ObjectMessage |
Java中的可序列化對象 |
Message |
沒有消息體,只有消息頭和屬性。 |
下例演示創(chuàng)建并發(fā)送一個TextMessage到一個隊列:
?
TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
queueSender.send(message);
?
下例演示接收消息并轉(zhuǎn)換為合適的消息類型:
?
Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}
posted on 2007-01-18 14:38
???MengChuChen 閱讀(782)
評論(0) 編輯 收藏 所屬分類:
webotx