<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    天道酬勤

    點點滴滴的足跡,幻化成漫天的云彩
    posts - 22, comments - 0, trackbacks - 0, articles - 2
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    ActiveMQ之三 -- 使用ActiveMQ來傳送文件

    Posted on 2011-08-28 23:40 匆匆過客 閱讀(5292) 評論(0)  編輯  收藏 所屬分類: Java
    這個方法還有待研究,目前還有如下幾個疑點:
    1. ActiveMQ 報出這樣的信息:
    INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this
    destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info
    2. 這種以異步方式傳送資料,能保證客戶端能以正確的順序接收到文件段麼?

    使用ActiveMQ傳送文件,發送端必須將文件拆成一段一段,每段封裝在獨立的Message中,逐次發送到客戶端。例如下面的例子,Producer通過發送命令,告訴文件傳送的開始,發送中,結束。客戶端接收到這些命令之后,就知道如何接收資料了。

    客戶端收到內容后,根據命令將內容合并到一個文件中。 
    package org.apache.activemq.exchange.file;

    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.StreamMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Consumer {

        
    /**
         * 
    @param args
         
    */
        
    public static void main(String[] args) throws JMSException, IOException {
            ConnectionFactory factory 
    = new ActiveMQConnectionFactory("tcp://localhost:61616");

            Connection connection 
    = factory.createConnection();
            connection.start();

            Session session 
    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination 
    = session.createTopic("EXCHANGE.FILE");

            MessageConsumer consumer 
    = session.createConsumer(destination);

            
    boolean appended = false;
            
    try {
                
    while (true) {
                    Message message 
    = consumer.receive(5000);
                    
    if (message == null) {
                        
    continue;
                    }

                    
    if (message instanceof StreamMessage) {
                        StreamMessage streamMessage 
    = (StreamMessage) message;
                        String command 
    = streamMessage.getStringProperty("COMMAND");
                        
                        
    if ("start".equals(command)) {
                            appended 
    = false;
                            
    continue;
                        }

                        
    if ("sending".equals(command)) {
                            
    byte[] content = new byte[4096];
                            String file_name 
    = message.getStringProperty("FILE_NAME");
                            BufferedOutputStream bos 
    = null;
                            bos 
    = new BufferedOutputStream(new FileOutputStream("c:/" + file_name, appended));
                            
    if (!appended) {
                                appended 
    = true;
                            }
                            
    while (streamMessage.readBytes(content) > 0) {
                                bos.write(content);
                            }
                            bos.close();
                            
    continue;
                        }

                        
    if ("end".equals(command)) {
                            appended 
    = false;
                            
    continue;
                        }
                    }
                }
            } 
    catch (JMSException e) {
                
    throw e;
            } 
    finally {
                
    if (connection != null) {
                    connection.close();
                }
            }

        }

    }

    發送端將文件分包,逐次發送到客戶端 
    package org.apache.activemq.exchange.file;

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.StreamMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Publisher {

        
    public static String FILE_NAME = "01.mp3";
        
        
    public static void main(String[] args) throws JMSException, IOException {
            ConnectionFactory factory 
    = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection 
    = factory.createConnection();
            connection.start();
            Session session 
    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination 
    = session.createTopic("EXCHANGE.FILE");        
            MessageProducer producer 
    = session.createProducer(destination);
            
    long time = System.currentTimeMillis();
            
            
    //通知客戶端開始接受文件
            StreamMessage message = session.createStreamMessage();
            message.setStringProperty(
    "COMMAND""start");
            producer.send(message);
            
            
    //開始發送文件
            byte[] content = new byte[4096];
            InputStream ins 
    = Publisher.class.getResourceAsStream(FILE_NAME);
            BufferedInputStream bins 
    = new BufferedInputStream(ins);
            
    while (bins.read(content) > 0) {
                
    //
                message = session.createStreamMessage();
                message.setStringProperty(
    "FILE_NAME", FILE_NAME);
                message.setStringProperty(
    "COMMAND""sending");
                message.clearBody();
                message.writeBytes(content);
                producer.send(message);
            }
            bins.close();
            ins.close();
            
            
    //通知客戶端發送完畢
            message = session.createStreamMessage();
            message.setStringProperty(
    "COMMAND""end");
            producer.send(message);
            
            connection.close();
            
            System.out.println(
    "Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
        }
    }
    主站蜘蛛池模板: 亚洲精品视频专区| 狠狠色伊人亚洲综合成人| 亚洲国产精品lv| 日本卡1卡2卡三卡免费| 亚洲欧洲无码AV电影在线观看| 亚洲美女视频免费| 亚洲第一福利视频| 57pao国产成永久免费视频| 91亚洲视频在线观看| 久久久久久国产a免费观看黄色大片| 国产免费不卡v片在线观看 | 国产福利免费视频| 亚洲国产精品狼友中文久久久| 亚洲国产精品无码av| 国产成人精品无码免费看| 亚洲高清在线视频| 免费A级毛片在线播放| 亚洲人成伊人成综合网久久| 最近中文字幕无吗高清免费视频| 亚洲精品少妇30p| 久久美女网站免费| 91嫩草亚洲精品| 日本一区二区三区日本免费| 美女被艹免费视频| 久久精品国产精品亚洲艾草网| 亚洲aⅴ天堂av天堂无码麻豆 | 亚洲香蕉在线观看| 精品国产麻豆免费网站| 国产精品hd免费观看| 久久久久亚洲精品天堂| 免费爱爱的视频太爽了| 久久www免费人成看国产片| 亚洲AV无码乱码国产麻豆穿越 | 亚洲乱码一区av春药高潮| 国产美女无遮挡免费视频网站| 亚洲丁香色婷婷综合欲色啪| 免费精品国偷自产在线在线 | 黄色a级免费网站| 久久久亚洲欧洲日产国码是AV| 成在线人免费无码高潮喷水| 亚洲久本草在线中文字幕|