<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Dict.CN 在線詞典, 英語學習, 在線翻譯

    都市淘沙者

    荔枝FM Everyone can be host

    統(tǒng)計

    留言簿(23)

    積分與排名

    優(yōu)秀學習網(wǎng)站

    友情連接

    閱讀排行榜

    評論排行榜

    用NIO實現(xiàn)的一個Chat Demo [轉]

    發(fā)現(xiàn)網(wǎng)上找到的許多NIO的用例在跑起來后都有許多問題,最常見的就是沒有對interest event進行合理的registry和unregistry,導致程序一直在loopling,又或者當客戶端或服務器端連接斷開時有死循環(huán)的跡象。忍不住自己做了一個demo,我想可以作為一個NIO應用的模板去修改,只要把doRead,doWrite之類的用基于線程的Handler去處理,那就基本可以滿足需求了。
    這個Demo的目的是在Client和Server間建立類似QQ聊天那樣的功能,讓客戶端和服務器端都支持用戶輸入和異步消息顯示(因為服務器端要支持用戶的console輸入,所以不要用多個客戶端進行連接,否則可能會出現(xiàn)難以預測的問題)。
    代碼中用紅色顯示的地方是我認為需要注意的地方,說老實話NIO雖然很強大,但完全用Non-Blocking來編程,有許多需要小心的地方,一不小心還可能造成死循環(huán)。就像ReentrantLock之于Synchronized,如果基本的IO能滿足需求,就不必強求應用NIO。
    注意:OP_WRITE應該是在寫入準備就緒的時候才添加到SelectionKey里面去,而且在寫入完成后一定要去除,否則selector.select()方法就不會被blocking而造成死循環(huán)。

    MyNioServer.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Set;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.Selector;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;

    public class MyNioServer {

        private int BUFFERSIZE = 1024*10;
        private String CHARSET = "GBK";
        private Selector sel;

        public MyNioServer(int port) throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(
                    new InetSocketAddress(InetAddress.getLocalHost(), port));
            sel = Selector.open();
            ssc.register(sel, SelectionKey.OP_ACCEPT);
        }

        public void startup() {
            System.out.println("Server start...");
            try {
                while (!Thread.interrupted()) {
                    int keysCount = sel.select();
                    System.out.println("Catched " + keysCount + " SelectionKeys");
                    if (keysCount < 1) {
                        continue;
                    }
                    Set<SelectionKey> set = sel.selectedKeys();
                    Iterator<SelectionKey> it = set.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isAcceptable()) {
                            System.out.println("Key isAcceptable");
                            doAccept(key);
                        }
                        if (key.isValid() && key.isReadable()) {
                            System.out.println("Key isReadable");
                            doRead(key);
                        }
                        if (key.isValid() && key.isWritable()) {
                            System.out.println("Key isWritable");
                            doWrite(key);
                        }
                    }
                    set.clear();
                }
                System.err.println("Program is interrupted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Server stop...");
            shutdown();
        }
       
        public void shutdown(){
            Set<SelectionKey> keys = sel.keys();
            for(SelectionKey key:keys){
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void doAccept(SelectionKey key) {
            try {
                SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                sc.configureBlocking(false);
                SelectionKey newkey = sc.register(sel, SelectionKey.OP_READ);
                newkey.attach(new LinkedList<ByteBuffer>());
                new Thread(new UserInteractive(newkey)).start();
            } catch (IOException e) {
                e.printStackTrace();
                System.err.println("Failed to accept new client.");
            }
            System.out.println("end doAccept");
        }

        // TODO buffersize performance testing
        private void doRead(SelectionKey key) {
            try {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                StringBuffer sb = new StringBuffer();
                int count = 0;
                while ( (count = sc.read(bb)) > 0) {
                    bb.flip();
                    sb.append(Charset.forName(CHARSET).decode(bb));
                    bb.flip();
                }
                //if client disconnected, read return -1
                if(count == -1){
                    System.out.println("client disconnected");
                    disconnect(key);   
                } else {
                    System.out.println("message received from client:" + sb.toString());
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            System.out.println("end doRead");
        }

        private void doWrite(SelectionKey key) {
            SocketChannel sc = (SocketChannel) key.channel();
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
            ByteBuffer bb = outseq.poll();
            if(bb == null){
                return;
            }
            try {
                while(bb.hasRemaining()){
                    sc.write(bb);
                }           
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            if (outseq.size() == 0) {
                System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                key.interestOps(SelectionKey.OP_READ);
            }
            System.out.println("end doWrote");
        }

        private void disconnect(SelectionKey key) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //TODO find out how to shutdown
        private class UserInteractive implements Runnable {

            SelectionKey key;

            public UserInteractive(SelectionKey key) {
                this.key = key;
            }

            public void run() {
                System.out.println("UserInteractive thread start...");
                BufferedReader br = new BufferedReader(new InputStreamReader(
                        System.in));
                while (true) {
                    try {
                        String inputLine = br.readLine();
                        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                        bb = ByteBuffer.wrap(inputLine.getBytes());
                        ((LinkedList<ByteBuffer>) key.attachment()).offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        key.selector().wakeup();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            try {
                MyNioServer server = new MyNioServer(10001);
                server.startup();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Exception caught, program exiting…");
            }
        }
    }


    MyNioClient.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.Charset;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.SelectionKey;
    import java.text.MessageFormat;
    import java.util.LinkedList;
    import java.util.Set;
    import java.util.Iterator;

    public class MyNioClient {

        private int BUFFERSIZE = 1024*10;
        private String CHARSET = "GBK";
        private Selector sel;

        public MyNioClient(int port) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.configureBlocking(false);    // this operation need to be executed before socket.connnect, for OP_CONNECT event
            sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            sel = Selector.open();
            sc.register(sel, SelectionKey.OP_CONNECT |SelectionKey.OP_READ);
        }

        public void startup() {
            System.out.println("Client start...");
            try {
                while (!Thread.interrupted()) {
                    int keysCount = sel.select();
                    System.out.println("Catched " + keysCount + " SelectionKeys");
                    if (keysCount < 1) {
                        continue;
                    }               
                    Set<SelectionKey> selectedKeys = sel.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        //printKeyInfo(key);
                        if (key.isConnectable()) {
                            System.out.println("Key isConnectable");
                            doConnect(key);
                        } else if (key.isValid() && key.isReadable()) {
                            System.out.println("Key isReadable");
                            doRead(key);
                        } else if (key.isValid() && key.isWritable()) {
                            System.out.println("Key isWritable");
                            doWrite(key);
                        }
                    }
                    selectedKeys.clear();
                }
                System.err.println("Program is interrupted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Client stop...");
            shutdown();
        }
       
        public void shutdown(){
            Set<SelectionKey> keys = sel.keys();
            for(SelectionKey key:keys){
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void printKeyInfo(SelectionKey key) {
            String keyStr = MessageFormat
                    .format(
                            "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                            key.interestOps(), key.readyOps(), key.isValid(), key
                                    .isAcceptable(), key.isConnectable(), key
                                    .isReadable(), key.isWritable());
            System.out.println(keyStr);
        }

        private void doConnect(SelectionKey key) {
            try {
                boolean flag = ((SocketChannel) key.channel()).finishConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            System.out.println("unregister OP_CONNECT from interestOps");
            key.interestOps(SelectionKey.OP_READ);
            key.attach(new LinkedList<ByteBuffer>());
            new Thread(new UserInteractive(key)).start();
        }

        private void doRead(SelectionKey key) {
            try {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                StringBuffer sb = new StringBuffer();
                while (sc.read(bb) > 0) {
                    bb.flip();
                    sb.append(Charset.forName(CHARSET).decode(bb));
                    bb.flip();
                }
                System.out.println("message received from server:" + sb.toString());
            } catch (IOException e) {
                e.printStackTrace();
                disconnect(key);
                System.exit(1);
            }
            System.out.println("now end readMessage");
        }

        private void doWrite(SelectionKey key) {
            SocketChannel sc = (SocketChannel) key.channel();
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
            ByteBuffer bb = outseq.poll();
            if(bb == null){
                return;
            }
            try {
                while(bb.hasRemaining()){
                    sc.write(bb);
                }           
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            if (outseq.size() == 0) {
                System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                key.interestOps(SelectionKey.OP_READ);
            }
            System.out.println("end doWrote");
        }

        private void disconnect(SelectionKey key) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private class UserInteractive implements Runnable {

            SelectionKey key;

            public UserInteractive(SelectionKey key) {
                this.key = key;
            }

            public void run() {
                LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                        .attachment();
                 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    try {
                        String inputLine = br.readLine();
                        if ("quit".equalsIgnoreCase(inputLine)) {
                            key.channel().close();
                            System.exit(1);
                            break;
                        }
                        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                        bb = ByteBuffer.wrap(inputLine.getBytes());
                        outseq.offer(bb);
                        System.out
                        .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ
                                | SelectionKey.OP_WRITE);
                        sel.wakeup();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            try {
                MyNioClient client = new MyNioClient(10001);
                client.startup();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Exception caught, program exiting...");
            }
        }

    }

    posted on 2010-05-29 12:38 都市淘沙者 閱讀(1355) 評論(1)  編輯  收藏 所屬分類: 多線程并發(fā)編程

    評論

    # re: 用NIO實現(xiàn)的一個Chat Demo [轉][未登錄] 2015-04-08 23:36 harry

    請教一個問題,為什么UserInteractive里面 SelctionKey.wakeup以后,就成了寫就緒模式呢(key.isWritable()是true)  回復  更多評論   

    主站蜘蛛池模板: 亚洲免费在线观看视频| 又大又硬又爽免费视频| 伊人久久大香线蕉亚洲| 黄色网址大全免费| 国产资源免费观看| 国产偷国产偷亚洲高清人 | 在线播放高清国语自产拍免费| 337p日本欧洲亚洲大胆色噜噜| 人妻丰满熟妇无码区免费 | 亚洲成a人一区二区三区| 免费人妻精品一区二区三区| 亚洲av午夜精品一区二区三区 | 国产小视频免费观看| 国产精品免费观看调教网| 亚洲视频在线观看网站| 亚洲成人影院在线观看| 亚洲免费人成视频观看| 亚洲欧美日韩综合久久久| 四虎影在线永久免费四虎地址8848aa| 丰满亚洲大尺度无码无码专线 | 亚洲av无码一区二区三区乱子伦 | 毛片基地免费观看| 亚洲人成网站18禁止| 亚洲成a人片在线观看日本| 在线观看免费中文视频| 亚洲字幕AV一区二区三区四区| 午夜免费福利网站| 一级特黄aaa大片免费看| 伊人久久综在合线亚洲2019| 免费下载成人电影| 羞羞漫画页面免费入口欢迎你| 亚洲精品免费视频| 妞干网免费视频在线观看| 免费不卡在线观看AV| 在线观看黄片免费入口不卡| 亚洲日本乱码一区二区在线二产线| 精品久久久久久久免费加勒比| 18女人腿打开无遮掩免费| 免费精品国产自产拍在线观看| 国产精品亚洲专区在线观看| 亚洲综合色成在线播放|