消息中間件原理及JMS簡(jiǎn)介之二
作者:orangelizq
2.3 消息中間件的傳遞模式
消息中間件一般有兩種傳遞模型:點(diǎn)對(duì)點(diǎn)模型(PTP)和發(fā)布-訂閱模型(Pub/Sub)[2]。
1. 點(diǎn)對(duì)點(diǎn)模型(PTP)
點(diǎn)對(duì)點(diǎn)模型用于消息生產(chǎn)者和消息消費(fèi)者之間點(diǎn)到點(diǎn)的通信。消息生產(chǎn)者將消息發(fā)動(dòng)到由某個(gè)名字標(biāo)識(shí)的特定消費(fèi)者。這個(gè)名字實(shí)際上對(duì)應(yīng)于消息服務(wù)中的一個(gè)隊(duì)列(Queue),在消息傳動(dòng)給消費(fèi)者之前它被存儲(chǔ)在這個(gè)隊(duì)列中。隊(duì)列可以是持久的,以保證在消息服務(wù)出現(xiàn)故障時(shí)仍然能夠傳遞消息。
2. 發(fā)布-訂閱模型(Pub/Sub)
發(fā)布-訂閱模型用稱為主題(topic)的內(nèi)容分層結(jié)構(gòu)代替了PTP模型中的惟一目的地,發(fā)送應(yīng)用程序發(fā)布自己的消息,指出消息描述的是有關(guān)分層結(jié)構(gòu)中的一個(gè)主題的信息。希望接收這些消息的應(yīng)用程序訂閱了這個(gè)主題。訂閱包含子主題的分層結(jié)構(gòu)中的主題的訂閱者可以接收該主題和其子主題發(fā)表的所有消息。
下圖展示了發(fā)布和訂閱模型:[2]

多個(gè)應(yīng)用程序可以就一個(gè)主題發(fā)布和訂閱消息,而應(yīng)用程序?qū)ζ渌巳匀皇悄涿摹?/span>MOM 起著代理(broker)的作用,將一個(gè)主題已發(fā)表的消息路由給該主題的所有訂閱者。
2.4 消息中間件產(chǎn)品與JMS
從上個(gè)世紀(jì)90年代初,隨著不同廠商消息中間件大量上市,消息中間件技術(shù)得到了長(zhǎng)足的發(fā)展。目前,IBM和BEA的中間件產(chǎn)品在銀行、證券、電信等高端行業(yè),以及IT等行業(yè)中得到廣泛應(yīng)用。IBM憑借其在1999年推出的應(yīng)用服務(wù)器WebSphere,扎根金融、證券等行業(yè),在超大型以及系統(tǒng)整合型應(yīng)用方面優(yōu)勢(shì)突出;BEA則是專門從事中間件開(kāi)發(fā)的公司,它的應(yīng)用服務(wù)器WebLogic在美國(guó)市場(chǎng)占有率超過(guò)60%,在國(guó)內(nèi)電信及證券行業(yè)占據(jù)主要地位;Sun、Oracle、Sybase和Borland等廠商也都有自己的應(yīng)用服務(wù)器;近年來(lái),以金蝶、東方通等公司為代表的國(guó)產(chǎn)中間件產(chǎn)品也發(fā)展迅速。[3]
由于沒(méi)有統(tǒng)一的規(guī)范和標(biāo)準(zhǔn),基于消息中間件的應(yīng)用不可移植,不同的消息中間件也不能互操作,這大大阻礙了消息中間件的發(fā)展。 Java Message Service(JMS, Java消息服務(wù))是SUN及其伙伴公司提出的旨在統(tǒng)一各種消息中間件系統(tǒng)接口的規(guī)范。它定義了一套通用的接口和相關(guān)語(yǔ)義,提供了諸如持久、驗(yàn)證和事務(wù)的消息服務(wù),它最主要的目的是允許Java應(yīng)用程序訪問(wèn)現(xiàn)有的消息中間件。JMS規(guī)范沒(méi)有指定在消息節(jié)點(diǎn)間所使用的通訊底層協(xié)議,來(lái)保證應(yīng)用開(kāi)發(fā)人員不用與其細(xì)節(jié)打交道,一個(gè)特定的JMS實(shí)現(xiàn)可能提供基于TCP/IP、HTTP、UDP或者其它的協(xié)議。
目前許多廠商采用并實(shí)現(xiàn)了JMS API,現(xiàn)在,JMS產(chǎn)品能夠?yàn)槠髽I(yè)提供一套完整的消息傳遞功能,下面是一些比較流行的JMS商業(yè)軟件和開(kāi)源產(chǎn)品。
1.IBM MQSeries
IBM MQ系列產(chǎn)品提供的服務(wù)使得應(yīng)用程序可以使用消息隊(duì)列進(jìn)行相互交流,通過(guò)一系列基于Java的API,提供了MQSeries在Java中應(yīng)用開(kāi)發(fā)的方法。它支持點(diǎn)到點(diǎn)和發(fā)布/訂閱兩種消息模式,在基本消息服務(wù)的基礎(chǔ)上增加了結(jié)構(gòu)化消息類,通過(guò)工作單元提供數(shù)據(jù)整合等內(nèi)容。
2.WebLogic
WebLogic是BEA公司實(shí)現(xiàn)的基于工業(yè)標(biāo)準(zhǔn)的J2EE應(yīng)用服務(wù)器,支持大多數(shù)企業(yè)級(jí)JavaAPI,它完全兼容JMS規(guī)范,支持點(diǎn)到點(diǎn)和發(fā)布/訂閱消息模式,它具有以下一些特點(diǎn):
1) 通過(guò)使用管理控制臺(tái)設(shè)置JMS配置信息;
2) 支持消息的多點(diǎn)廣播;
3) 支持持久消息存儲(chǔ)的文件和數(shù)據(jù)庫(kù);
4) 支持XML消息,動(dòng)態(tài)創(chuàng)建持久隊(duì)列和主題。
3.SonicMQ
SonicMQ是Progress公司實(shí)現(xiàn)的JMS產(chǎn)品。除了提供基本的消息驅(qū)動(dòng)服務(wù)之外,SonicMQ也提供了很多額外的企業(yè)級(jí)應(yīng)用開(kāi)發(fā)工具包,它具有以下一些基本特征:
1) 提供JMS規(guī)范的完全實(shí)現(xiàn),支持點(diǎn)到點(diǎn)消息模式和發(fā)布/訂閱消息模式;
2) 支持層次安全管理;
3) 確保消息在Internet上的持久發(fā)送;
4) 動(dòng)態(tài)路由構(gòu)架(DRA)使企業(yè)能夠通過(guò)單個(gè)消息服務(wù)器動(dòng)態(tài)的交換消息;
5) 支持消息服務(wù)器的集群。
4.Active MQ
Active MQ是一個(gè)基于Apcache 2.0 licenced發(fā)布,開(kāi)放源碼的JMS產(chǎn)品。其特點(diǎn)為:
1) 提供點(diǎn)到點(diǎn)消息模式和發(fā)布/訂閱消息模式;
2) 支持JBoss、Geronimo等開(kāi)源應(yīng)用服務(wù)器,支持Spring框架的消息驅(qū)動(dòng);
3) 新增了一個(gè)P2P傳輸層,可以用于創(chuàng)建可靠的P2P JMS網(wǎng)絡(luò)連接;
4) 擁有消息持久化、事務(wù)、集群支持等JMS基礎(chǔ)設(shè)施服務(wù)。
5.OpenJMS
OpenJMS是一個(gè)開(kāi)源的JMS規(guī)范的實(shí)現(xiàn),它包含以下幾個(gè)特征:
1) 它支持點(diǎn)到點(diǎn)模型和發(fā)布/訂閱模型;
2) 支持同步與異步消息發(fā)送;
3) 可視化管理界面,支持Applet;
4) 能夠與Jakarta Tomcat這樣的Servlet容器結(jié)合;
5) 支持RMI、TCP、HTTP與SSL協(xié)議。
三、消息中間件應(yīng)用之JMS
3.1 JMS簡(jiǎn)介
Java Message Service 規(guī)范 1.1 聲稱:JMS 是一組接口和相關(guān)語(yǔ)義,它定義了 JMS 客戶如何訪問(wèn)企業(yè)消息產(chǎn)品的功能。
在 JMS 之前,每一家 MOM 廠商都用專有 API 為應(yīng)用程序提供對(duì)其產(chǎn)品的訪問(wèn),通常可用于許多種語(yǔ)言,其中包括 Java 語(yǔ)言。JMS 通過(guò) MOM 產(chǎn)品為 Java 程序提供了一個(gè)發(fā)送和接收消息的標(biāo)準(zhǔn)的、便利的方法。用 JMS 編寫(xiě)的程序可以在任何實(shí)現(xiàn) JMS 標(biāo)準(zhǔn)的 MOM 上運(yùn)行。
JMS 可移植性的關(guān)鍵在于:JMS API 是由 Sun 作為一組接口而提供的。提供了 JMS 功能的產(chǎn)品是通過(guò)提供一個(gè)實(shí)現(xiàn)這些接口的提供者來(lái)做到這一點(diǎn)的。開(kāi)發(fā)人員可以通過(guò)定義一組消息和一組交換這些消息的客戶機(jī)應(yīng)用程序建立 JMS 應(yīng)用程序。[4]
JMS1.0版本于1998年推出,最新的版本是2002發(fā)布的JMS 1.1規(guī)范。JMS支持消息中間件的兩種傳遞模式:點(diǎn)到點(diǎn)模式和發(fā)布-訂閱模式。在JMS 1.1以前的版本中,每一種都有自己的特定于該模式的一組客戶機(jī)接口。JMS1.1版本提供了單一的一組接口,它允許客戶機(jī)可以在兩個(gè)模式中發(fā)送和接收消息。這些“模式無(wú)關(guān)的接口”保留了每一個(gè)模式的語(yǔ)義和行為,是實(shí)現(xiàn) JMS 客戶機(jī)的最好選擇。
統(tǒng)一模式的好處是:
1) 使得用于客戶機(jī)的編程更簡(jiǎn)單。
2) 隊(duì)列和主題的操作可以是同一事務(wù)的一部分。
3) 為JMS提供者提供了優(yōu)化其實(shí)現(xiàn)的機(jī)會(huì)。
3.2 JMS體系結(jié)構(gòu)
3.2.1 JMS接口描述
JMS 支持兩種消息類型PTP 和Pub/Sub,分別稱作:PTP Domain 和Pub/Sub Domain,這兩種接口都繼承統(tǒng)一的JMS Parent 接口,JMS 主要接口如下所示:
JMS Parent
|
PTP Domain
|
Pub/Sub Domain
|
ConnectionFactory
|
QueueConnectionFactory
|
TopicConnectionFactory
|
Connection
|
QueueConnection
|
TopicConnection
|
Destination
|
Queue
|
Topic
|
Session
|
QueueSession
|
TopicSession
|
MessageProducer
|
QueueSender
|
TopicPublisher
|
MessageConsumer
|
QueueReceiver
|
TopicSubscriber
|
以下是對(duì)這些接口的簡(jiǎn)單描述:
ConnectionFactory:連接工廠,JMS 用它創(chuàng)建連接
Connection:JMS 客戶端到JMS Provider 的連接
Destination:消息的目的地
Session:一個(gè)發(fā)送或接收消息的線程
MessageProducer: 由Session 對(duì)象創(chuàng)建的用來(lái)發(fā)送消息的對(duì)象
MessageConsumer: 由Session 對(duì)象創(chuàng)建的用來(lái)接收消息的對(duì)象
3.2.2 JMS消息模型
JMS 消息由以下幾部分組成:消息頭,屬性,消息體。[4]
l 消息頭(header):JMS消息頭包含了許多字段,它們是消息發(fā)送后由JMS提供者或消息發(fā)送者產(chǎn)生,用來(lái)表示消息、設(shè)置優(yōu)先權(quán)和失效時(shí)間等等,并且為消息確定路由。
l 屬性(property):由消息發(fā)送者產(chǎn)生,用來(lái)添加刪除消息頭以外的附加信息。
l 消息體(body):由消息發(fā)送者產(chǎn)生,JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
3.3 JMS編程實(shí)踐
廣義上說(shuō),一個(gè)JMS應(yīng)用是幾個(gè)JMS 客戶端交換消息,開(kāi)發(fā)JMS客戶端應(yīng)用由以下幾步構(gòu)成:[2]
1) 用JNDI 得到ConnectionFactory對(duì)象;
2) 用JNDI 得到目標(biāo)隊(duì)列或主題對(duì)象,即Destination對(duì)象;
3) 用ConnectionFactory創(chuàng)建Connection 對(duì)象;
4) 用Connection對(duì)象創(chuàng)建一個(gè)或多個(gè)JMS Session;
5) 用Session 和Destination 創(chuàng)建MessageProducer和MessageConsumer;
6) 通知Connection 開(kāi)始傳遞消息。
3.3.1 消息生產(chǎn)者編程
消息生產(chǎn)者程序如下:[2]
package org.jms.test;
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Sender {
public static void main(String[] args) {
new Sender().send();
}
public void send() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
//Prompt for JNDI names
System.out.println("Enter ConnectionFactory name:");
String factoryName = reader.readLine();
System.out.println("Enter Destination name:");
String destinationName = reader.readLine();
//Look up administered objects
InitialContext initContext = new InitialContext();
ConnectionFactory factory =
(ConnectionFactory) initContext.lookup(factoryName);
Destination destination = (Destination) initContext.lookup(destinationName);
initContext.close();
//Create JMS objects
Connection connection = factory.createConnection();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = session.createProducer(queue);
//Send messages
String messageText = null;
while (true) {
System.out.println("Enter message to send or 'quit':");
messageText = reader.readLine();
if ("quit".equals(messageText))
break;
TextMessage message = session.createTextMessage(messageText);
sender.send(message);
}
//Exit
System.out.println("Exiting...");
reader.close();
connection.close();
System.out.println("Goodbye!");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
3.3.2 消息消費(fèi)者編程
消息消費(fèi)者程序如下:[2]
package compute;
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Receiver implements MessageListener {
private boolean stop = false;
public static void main(String[] args) {
new Receiver().receive();
}
public void receive() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
//Prompt for JNDI names
System.out.println("Enter ConnectionFactory name:");
String factoryName = reader.readLine();
System.out.println("Enter Destination name:");
String destinationName = reader.readLine();
reader.close();
//Look up administered objects
InitialContext initContext = new InitialContext();
ConnectionFactory factory =
(ConnectionFactory) initContext.lookup(factoryName);
Destination destination = (Destination) initContext.lookup(destinationName);
initContext.close();
//Create JMS objects
Connection connection = factory.createConnection();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = session.createConsumer(queue);
receiver.setMessageListener(this);
connection.start();
//Wait for stop
while (!stop) {
Thread.sleep(1000);
}
//Exit
System.out.println("Exiting...");
connection.close();
System.out.println("Goodbye!");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public void onMessage(Message message) {
try {
String msgText = ((TextMessage) message).getText();
System.out.println(msgText);
if ("stop".equals(msgText))
stop = true;
} catch (JMSException e) {
e.printStackTrace();
stop = true;
}
}
}
以上程序都較為簡(jiǎn)單,基本上為自解釋的。
四、消息中間件總結(jié)
消息中間件自從產(chǎn)生以來(lái)發(fā)展迅速,在分布式聯(lián)機(jī)事務(wù)處理環(huán)境中,它擔(dān)當(dāng)通訊資源管理器(CRM)的角色,為分布式應(yīng)用提供實(shí)時(shí)、高效、可靠的、跨越不同操作系統(tǒng)、不同網(wǎng)絡(luò)的消息傳遞服務(wù),同時(shí)消息中間件減少了開(kāi)發(fā)跨平臺(tái)應(yīng)用程序的復(fù)雜性。在要求可靠傳輸?shù)南到y(tǒng)中可以利用消息中間件作為一個(gè)通訊平臺(tái),向應(yīng)用提供可靠傳輸功能來(lái)傳遞消息和文件。
參考文獻(xiàn)
[1] 李華琰、郭英奎 著:Java 中間件開(kāi)發(fā)技術(shù) . 中國(guó)水利水電出版社,2005
[2] JMS在線教程 . 2004 http://www6.software.ibm.com/developerworks/cn/education/java/j-jms
[3] 基于JMS的消息中間件的研究和設(shè)計(jì) . 姚剛. 2006
[4] 消息中間件和JMS . 2005. http://www.huihoo.org/jfox/jfoxmq/mom_jms.html
[5] 基于JMS的數(shù)據(jù)匯集系統(tǒng)的研究與實(shí)現(xiàn). 2006.
http://www.cndw.com/tech/program/2006042760285.asp
Author: orangelizq
email: orangelizq@163.com

歡迎大家訪問(wèn)我的個(gè)人網(wǎng)站
萌萌的IT人
posted on 2008-01-27 16:17
桔子汁 閱讀(21412)
評(píng)論(2) 編輯 收藏 所屬分類:
J2EE