<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Dict.CN 在線詞典, 英語學習, 在線翻譯

    都市淘沙者

    荔枝FM Everyone can be host

    統計

    留言簿(23)

    積分與排名

    優秀學習網站

    友情連接

    閱讀排行榜

    評論排行榜

    用NIO實現的一個Chat Demo [轉]

    發現網上找到的許多NIO的用例在跑起來后都有許多問題,最常見的就是沒有對interest event進行合理的registry和unregistry,導致程序一直在loopling,又或者當客戶端或服務器端連接斷開時有死循環的跡象。忍不住自己做了一個demo,我想可以作為一個NIO應用的模板去修改,只要把doRead,doWrite之類的用基于線程的Handler去處理,那就基本可以滿足需求了。
    這個Demo的目的是在Client和Server間建立類似QQ聊天那樣的功能,讓客戶端和服務器端都支持用戶輸入和異步消息顯示(因為服務器端要支持用戶的console輸入,所以不要用多個客戶端進行連接,否則可能會出現難以預測的問題)。
    代碼中用紅色顯示的地方是我認為需要注意的地方,說老實話NIO雖然很強大,但完全用Non-Blocking來編程,有許多需要小心的地方,一不小心還可能造成死循環。就像ReentrantLock之于Synchronized,如果基本的IO能滿足需求,就不必強求應用NIO。
    注意:OP_WRITE應該是在寫入準備就緒的時候才添加到SelectionKey里面去,而且在寫入完成后一定要去除,否則selector.select()方法就不會被blocking而造成死循環。

    MyNioServer.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Set;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.Selector;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;

    public class MyNioServer {

        private int BUFFERSIZE = 1024*10;
        private String CHARSET = "GBK";
        private Selector sel;

        public MyNioServer(int port) throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(
                    new InetSocketAddress(InetAddress.getLocalHost(), port));
            sel = Selector.open();
            ssc.register(sel, SelectionKey.OP_ACCEPT);
        }

        public void startup() {
            System.out.println("Server start...");
            try {
                while (!Thread.interrupted()) {
                    int keysCount = sel.select();
                    System.out.println("Catched " + keysCount + " SelectionKeys");
                    if (keysCount < 1) {
                        continue;
                    }
                    Set<SelectionKey> set = sel.selectedKeys();
                    Iterator<SelectionKey> it = set.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isAcceptable()) {
                            System.out.println("Key isAcceptable");
                            doAccept(key);
                        }
                        if (key.isValid() && key.isReadable()) {
                            System.out.println("Key isReadable");
                            doRead(key);
                        }
                        if (key.isValid() && key.isWritable()) {
                            System.out.println("Key isWritable");
                            doWrite(key);
                        }
                    }
                    set.clear();
                }
                System.err.println("Program is interrupted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Server stop...");
            shutdown();
        }
       
        public void shutdown(){
            Set<SelectionKey> keys = sel.keys();
            for(SelectionKey key:keys){
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void doAccept(SelectionKey key) {
            try {
                SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                sc.configureBlocking(false);
                SelectionKey newkey = sc.register(sel, SelectionKey.OP_READ);
                newkey.attach(new LinkedList<ByteBuffer>());
                new Thread(new UserInteractive(newkey)).start();
            } catch (IOException e) {
                e.printStackTrace();
                System.err.println("Failed to accept new client.");
            }
            System.out.println("end doAccept");
        }

        // TODO buffersize performance testing
        private void doRead(SelectionKey key) {
            try {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                StringBuffer sb = new StringBuffer();
                int count = 0;
                while ( (count = sc.read(bb)) > 0) {
                    bb.flip();
                    sb.append(Charset.forName(CHARSET).decode(bb));
                    bb.flip();
                }
                //if client disconnected, read return -1
                if(count == -1){
                    System.out.println("client disconnected");
                    disconnect(key);   
                } else {
                    System.out.println("message received from client:" + sb.toString());
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            System.out.println("end doRead");
        }

        private void doWrite(SelectionKey key) {
            SocketChannel sc = (SocketChannel) key.channel();
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
            ByteBuffer bb = outseq.poll();
            if(bb == null){
                return;
            }
            try {
                while(bb.hasRemaining()){
                    sc.write(bb);
                }           
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            if (outseq.size() == 0) {
                System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                key.interestOps(SelectionKey.OP_READ);
            }
            System.out.println("end doWrote");
        }

        private void disconnect(SelectionKey key) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //TODO find out how to shutdown
        private class UserInteractive implements Runnable {

            SelectionKey key;

            public UserInteractive(SelectionKey key) {
                this.key = key;
            }

            public void run() {
                System.out.println("UserInteractive thread start...");
                BufferedReader br = new BufferedReader(new InputStreamReader(
                        System.in));
                while (true) {
                    try {
                        String inputLine = br.readLine();
                        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                        bb = ByteBuffer.wrap(inputLine.getBytes());
                        ((LinkedList<ByteBuffer>) key.attachment()).offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        key.selector().wakeup();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            try {
                MyNioServer server = new MyNioServer(10001);
                server.startup();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Exception caught, program exiting…");
            }
        }
    }


    MyNioClient.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.Charset;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.SelectionKey;
    import java.text.MessageFormat;
    import java.util.LinkedList;
    import java.util.Set;
    import java.util.Iterator;

    public class MyNioClient {

        private int BUFFERSIZE = 1024*10;
        private String CHARSET = "GBK";
        private Selector sel;

        public MyNioClient(int port) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.configureBlocking(false);    // this operation need to be executed before socket.connnect, for OP_CONNECT event
            sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            sel = Selector.open();
            sc.register(sel, SelectionKey.OP_CONNECT |SelectionKey.OP_READ);
        }

        public void startup() {
            System.out.println("Client start...");
            try {
                while (!Thread.interrupted()) {
                    int keysCount = sel.select();
                    System.out.println("Catched " + keysCount + " SelectionKeys");
                    if (keysCount < 1) {
                        continue;
                    }               
                    Set<SelectionKey> selectedKeys = sel.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        //printKeyInfo(key);
                        if (key.isConnectable()) {
                            System.out.println("Key isConnectable");
                            doConnect(key);
                        } else if (key.isValid() && key.isReadable()) {
                            System.out.println("Key isReadable");
                            doRead(key);
                        } else if (key.isValid() && key.isWritable()) {
                            System.out.println("Key isWritable");
                            doWrite(key);
                        }
                    }
                    selectedKeys.clear();
                }
                System.err.println("Program is interrupted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Client stop...");
            shutdown();
        }
       
        public void shutdown(){
            Set<SelectionKey> keys = sel.keys();
            for(SelectionKey key:keys){
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void printKeyInfo(SelectionKey key) {
            String keyStr = MessageFormat
                    .format(
                            "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                            key.interestOps(), key.readyOps(), key.isValid(), key
                                    .isAcceptable(), key.isConnectable(), key
                                    .isReadable(), key.isWritable());
            System.out.println(keyStr);
        }

        private void doConnect(SelectionKey key) {
            try {
                boolean flag = ((SocketChannel) key.channel()).finishConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            System.out.println("unregister OP_CONNECT from interestOps");
            key.interestOps(SelectionKey.OP_READ);
            key.attach(new LinkedList<ByteBuffer>());
            new Thread(new UserInteractive(key)).start();
        }

        private void doRead(SelectionKey key) {
            try {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                StringBuffer sb = new StringBuffer();
                while (sc.read(bb) > 0) {
                    bb.flip();
                    sb.append(Charset.forName(CHARSET).decode(bb));
                    bb.flip();
                }
                System.out.println("message received from server:" + sb.toString());
            } catch (IOException e) {
                e.printStackTrace();
                disconnect(key);
                System.exit(1);
            }
            System.out.println("now end readMessage");
        }

        private void doWrite(SelectionKey key) {
            SocketChannel sc = (SocketChannel) key.channel();
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
            ByteBuffer bb = outseq.poll();
            if(bb == null){
                return;
            }
            try {
                while(bb.hasRemaining()){
                    sc.write(bb);
                }           
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            if (outseq.size() == 0) {
                System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                key.interestOps(SelectionKey.OP_READ);
            }
            System.out.println("end doWrote");
        }

        private void disconnect(SelectionKey key) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private class UserInteractive implements Runnable {

            SelectionKey key;

            public UserInteractive(SelectionKey key) {
                this.key = key;
            }

            public void run() {
                LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                        .attachment();
                 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    try {
                        String inputLine = br.readLine();
                        if ("quit".equalsIgnoreCase(inputLine)) {
                            key.channel().close();
                            System.exit(1);
                            break;
                        }
                        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                        bb = ByteBuffer.wrap(inputLine.getBytes());
                        outseq.offer(bb);
                        System.out
                        .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ
                                | SelectionKey.OP_WRITE);
                        sel.wakeup();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            try {
                MyNioClient client = new MyNioClient(10001);
                client.startup();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Exception caught, program exiting...");
            }
        }

    }

    posted on 2010-05-29 12:38 都市淘沙者 閱讀(1353) 評論(1)  編輯  收藏 所屬分類: 多線程并發編程

    評論

    # re: 用NIO實現的一個Chat Demo [轉][未登錄] 2015-04-08 23:36 harry

    請教一個問題,為什么UserInteractive里面 SelctionKey.wakeup以后,就成了寫就緒模式呢(key.isWritable()是true)  回復  更多評論   

    主站蜘蛛池模板: 亚洲深深色噜噜狠狠爱网站| 亚洲乱码在线观看| 久久久久久av无码免费看大片| 国产精品视_精品国产免费| 亚洲熟伦熟女专区hd高清| 国产精品免费观看久久| 亚洲区日韩精品中文字幕| 精品无码国产污污污免费| 亚洲aⅴ无码专区在线观看春色 | 2020天堂在线亚洲精品专区| a拍拍男女免费看全片| 亚洲人成综合在线播放| 好爽又高潮了毛片免费下载 | 春意影院午夜爽爽爽免费| 国产精品亚洲产品一区二区三区| 一级毛片不卡免费看老司机| 在线亚洲午夜理论AV大片| 日本中文字幕免费高清视频| 亚洲最大的视频网站| 在线观看免费宅男视频| 白白色免费在线视频| 国产精品久久久亚洲| 久久www免费人成看片| 亚洲人成未满十八禁网站| 亚洲美日韩Av中文字幕无码久久久妻妇 | 中文字幕一精品亚洲无线一区| 国产免费无码一区二区| 亚洲导航深夜福利| 国产极品粉嫩泬免费观看| 精精国产www视频在线观看免费| 亚洲国产精品久久久久婷婷软件| 一二三四免费观看在线视频中文版| 大桥未久亚洲无av码在线| 亚洲国产精品一区第二页 | 波多野结衣免费视频观看| 99在线热播精品免费99热| 亚洲一级特黄特黄的大片| 高清在线亚洲精品国产二区| 99精品视频在线观看免费专区| 亚洲乱码中文字幕在线| 亚洲处破女AV日韩精品|