<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    天道酬勤

    點點滴滴的足跡,幻化成漫天的云彩
    posts - 22, comments - 0, trackbacks - 0, articles - 2
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    ActiveMQ之三 -- 使用ActiveMQ來傳送文件

    Posted on 2011-08-28 23:40 匆匆過客 閱讀(5300) 評論(0)  編輯  收藏 所屬分類: Java
    這個方法還有待研究,目前還有如下幾個疑點:
    1. ActiveMQ 報出這樣的信息:
    INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this
    destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info
    2. 這種以異步方式傳送資料,能保證客戶端能以正確的順序接收到文件段麼?

    使用ActiveMQ傳送文件,發送端必須將文件拆成一段一段,每段封裝在獨立的Message中,逐次發送到客戶端。例如下面的例子,Producer通過發送命令,告訴文件傳送的開始,發送中,結束。客戶端接收到這些命令之后,就知道如何接收資料了。

    客戶端收到內容后,根據命令將內容合并到一個文件中。 
    package org.apache.activemq.exchange.file;

    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.StreamMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Consumer {

        
    /**
         * 
    @param args
         
    */
        
    public static void main(String[] args) throws JMSException, IOException {
            ConnectionFactory factory 
    = new ActiveMQConnectionFactory("tcp://localhost:61616");

            Connection connection 
    = factory.createConnection();
            connection.start();

            Session session 
    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination 
    = session.createTopic("EXCHANGE.FILE");

            MessageConsumer consumer 
    = session.createConsumer(destination);

            
    boolean appended = false;
            
    try {
                
    while (true) {
                    Message message 
    = consumer.receive(5000);
                    
    if (message == null) {
                        
    continue;
                    }

                    
    if (message instanceof StreamMessage) {
                        StreamMessage streamMessage 
    = (StreamMessage) message;
                        String command 
    = streamMessage.getStringProperty("COMMAND");
                        
                        
    if ("start".equals(command)) {
                            appended 
    = false;
                            
    continue;
                        }

                        
    if ("sending".equals(command)) {
                            
    byte[] content = new byte[4096];
                            String file_name 
    = message.getStringProperty("FILE_NAME");
                            BufferedOutputStream bos 
    = null;
                            bos 
    = new BufferedOutputStream(new FileOutputStream("c:/" + file_name, appended));
                            
    if (!appended) {
                                appended 
    = true;
                            }
                            
    while (streamMessage.readBytes(content) > 0) {
                                bos.write(content);
                            }
                            bos.close();
                            
    continue;
                        }

                        
    if ("end".equals(command)) {
                            appended 
    = false;
                            
    continue;
                        }
                    }
                }
            } 
    catch (JMSException e) {
                
    throw e;
            } 
    finally {
                
    if (connection != null) {
                    connection.close();
                }
            }

        }

    }

    發送端將文件分包,逐次發送到客戶端 
    package org.apache.activemq.exchange.file;

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.StreamMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Publisher {

        
    public static String FILE_NAME = "01.mp3";
        
        
    public static void main(String[] args) throws JMSException, IOException {
            ConnectionFactory factory 
    = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection 
    = factory.createConnection();
            connection.start();
            Session session 
    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination 
    = session.createTopic("EXCHANGE.FILE");        
            MessageProducer producer 
    = session.createProducer(destination);
            
    long time = System.currentTimeMillis();
            
            
    //通知客戶端開始接受文件
            StreamMessage message = session.createStreamMessage();
            message.setStringProperty(
    "COMMAND""start");
            producer.send(message);
            
            
    //開始發送文件
            byte[] content = new byte[4096];
            InputStream ins 
    = Publisher.class.getResourceAsStream(FILE_NAME);
            BufferedInputStream bins 
    = new BufferedInputStream(ins);
            
    while (bins.read(content) > 0) {
                
    //
                message = session.createStreamMessage();
                message.setStringProperty(
    "FILE_NAME", FILE_NAME);
                message.setStringProperty(
    "COMMAND""sending");
                message.clearBody();
                message.writeBytes(content);
                producer.send(message);
            }
            bins.close();
            ins.close();
            
            
    //通知客戶端發送完畢
            message = session.createStreamMessage();
            message.setStringProperty(
    "COMMAND""end");
            producer.send(message);
            
            connection.close();
            
            System.out.println(
    "Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
        }
    }
    主站蜘蛛池模板: 亚洲av手机在线观看| 亚洲成av人片在线看片| 亚洲午夜AV无码专区在线播放| 一本色道久久综合亚洲精品高清| 亚洲乱码日产一区三区| 亚洲丝袜美腿视频| 亚洲精品福利你懂| 免费人成网站永久| 久久99免费视频| 毛片a级三毛片免费播放| 免费人成视频在线观看视频| 婷婷久久久亚洲欧洲日产国码AV | 99精品免费观看| 一二三四影视在线看片免费 | 国产在线观看免费视频播放器 | 亚洲综合久久久久久中文字幕| 亚洲av无码专区亚洲av不卡| 免费一级毛片在线播放视频| 亚洲尹人九九大色香蕉网站| 精品视频免费在线| 97在线线免费观看视频在线观看| 亚洲成人黄色在线| 免费黄色一级毛片| 中文字幕亚洲综合久久| 好猛好深好爽好硬免费视频| 99久久免费精品国产72精品九九| 亚洲色大成网站WWW久久九九| 怡红院亚洲红怡院在线观看| AV无码免费永久在线观看| 亚洲国产精品无码中文字| 黄色毛片视频免费| 性做久久久久免费看| 亚洲乱码一二三四区乱码| 免费国产成人高清在线观看麻豆| 国产精品九九久久免费视频| 日韩免费福利视频| 中国国产高清免费av片| 国产午夜亚洲不卡| 国产精品久久永久免费| 色吊丝性永久免费看码| 免费播放特黄特色毛片|