Posted on 2011-08-28 23:40
匆匆過客 閱讀(5292)
評論(0) 編輯 收藏 所屬分類:
Java
這個方法還有待研究,目前還有如下幾個疑點:
1. ActiveMQ 報出這樣的信息:
INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info
2. 這種以異步方式傳送資料,能保證客戶端以正確的順序接收到文件段麼?
使用ActiveMQ傳送文件時,發送端必須將文件拆成一段一段,每段封裝在獨立的Message中,逐次發送到客戶端。例如下面的例子,Producer通過發送命令,告訴客戶端文件傳送的開始、發送中和結束。客戶端接收到這些命令之後,就知道如何接收資料了。
客戶端收到內容後,根據命令將內容合併到一個文件中。
package org.apache.activemq.exchange.file;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Receives a file that is delivered in pieces over the ActiveMQ topic
 * {@code EXCHANGE.FILE} and reassembles it on the local disk.
 *
 * <p>Each incoming {@link StreamMessage} carries a {@code COMMAND} string property:
 * <ul>
 *   <li>{@code start} - a new transfer begins; the next chunk truncates the target file;</li>
 *   <li>{@code sending} - the body holds one chunk of file data and the
 *       {@code FILE_NAME} property names the target file under {@code c:/};</li>
 *   <li>{@code end} - the transfer is finished; the append state is reset.</li>
 * </ul>
 */
public class Consumer {

    /** Size of the scratch buffer used to drain one message body. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Connects to the broker at {@code tcp://localhost:61616} and processes
     * messages until an exception terminates the loop.
     *
     * @param args unused
     * @throws JMSException if the connection, session, or a message read fails
     * @throws IOException  if the target file cannot be opened or written
     */
    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        // Everything after createConnection() is guarded so that a failure while
        // starting the connection or creating the session still closes it.
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("EXCHANGE.FILE");
            MessageConsumer consumer = session.createConsumer(destination);

            // false -> the next chunk truncates the file; true -> it is appended.
            boolean appended = false;
            while (true) {
                Message message = consumer.receive(5000);
                // receive() yields null on timeout; instanceof is false for null,
                // so timeouts and non-stream messages are both skipped here.
                if (!(message instanceof StreamMessage)) {
                    continue;
                }
                StreamMessage streamMessage = (StreamMessage) message;
                String command = streamMessage.getStringProperty("COMMAND");

                if ("start".equals(command) || "end".equals(command)) {
                    appended = false;
                } else if ("sending".equals(command)) {
                    // NOTE(review): FILE_NAME arrives over the network and is used
                    // unchecked in a file path; consider rejecting names that
                    // contain path separators or "..".
                    String fileName = streamMessage.getStringProperty("FILE_NAME");
                    writeChunk(streamMessage, "c:/" + fileName, appended);
                    appended = true;
                }
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Copies the byte content of one message into the given file.
     *
     * @param streamMessage message whose body holds the chunk bytes
     * @param path          file to write to
     * @param append        {@code true} to append, {@code false} to truncate first
     * @throws JMSException if reading the message body fails
     * @throws IOException  if the file cannot be opened or written
     */
    private static void writeChunk(StreamMessage streamMessage, String path, boolean append)
            throws JMSException, IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(path, append));
        try {
            int read;
            while ((read = streamMessage.readBytes(buffer)) > 0) {
                // Write only the bytes actually read; the rest of the buffer
                // holds stale data from a previous iteration.
                out.write(buffer, 0, read);
            }
        } finally {
            // Always release the file handle, even if reading or writing failed.
            out.close();
        }
    }
}
發送端將文件分包,逐次發送到客戶端
package org.apache.activemq.exchange.file;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Sends a classpath resource over the ActiveMQ topic {@code EXCHANGE.FILE},
 * split into chunks that each travel in their own {@link StreamMessage}.
 *
 * <p>The {@code COMMAND} string property of every message tells the receiver
 * what to do: one {@code start} message, then one {@code sending} message per
 * chunk (also carrying {@code FILE_NAME}), and finally one {@code end} message.
 */
public class Publisher {

    /** Name of the resource to send, resolved relative to this class. */
    public static String FILE_NAME = "01.mp3";

    /** Maximum number of file bytes placed into a single message. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Connects to the broker at {@code tcp://localhost:61616}, publishes the
     * resource named by {@link #FILE_NAME}, and prints the elapsed time.
     *
     * @param args unused
     * @throws JMSException if the connection, session, or a send fails
     * @throws IOException  if the resource is missing or cannot be read
     */
    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        long time;
        // Guarded so the connection is closed even when sending fails part-way.
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("EXCHANGE.FILE");
            MessageProducer producer = session.createProducer(destination);
            time = System.currentTimeMillis();

            // Resolve the resource before announcing the transfer, so a missing
            // file does not leave the receiver with a dangling "start".
            InputStream ins = Publisher.class.getResourceAsStream(FILE_NAME);
            if (ins == null) {
                throw new IOException("Resource not found on classpath: " + FILE_NAME);
            }
            BufferedInputStream bins = new BufferedInputStream(ins);
            try {
                // Tell the client that a file transfer is starting.
                StreamMessage message = session.createStreamMessage();
                message.setStringProperty("COMMAND", "start");
                producer.send(message);

                // Send the file content chunk by chunk.
                byte[] content = new byte[BUFFER_SIZE];
                int length;
                while ((length = bins.read(content)) > 0) {
                    message = session.createStreamMessage();
                    message.setStringProperty("FILE_NAME", FILE_NAME);
                    message.setStringProperty("COMMAND", "sending");
                    // Send only the bytes actually read; a short read (always the
                    // case for the last chunk) would otherwise carry stale bytes.
                    message.writeBytes(content, 0, length);
                    producer.send(message);
                }
            } finally {
                // Closing the buffered stream also closes the underlying one.
                bins.close();
            }

            // Tell the client that the transfer is complete.
            StreamMessage endMessage = session.createStreamMessage();
            endMessage.setStringProperty("COMMAND", "end");
            producer.send(endMessage);
        } finally {
            connection.close();
        }
        System.out.println("Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
    }
}