Java 1.4開始提供的NIO API常用于開發高性能網絡服務器,本文演示了如何用這個API開發一個TCP Echo Server。

 

Java網絡服務器編程 一文演示了如何使用JavaSocket API編寫一個簡單的TCP Echo Server。其阻塞式IO的處理方式雖然簡單,但每個客戶端都需要一個單獨的Thread來處理,當服務器需要同時處理大量客戶端時,這種做法不再可行。使用NIO API可以讓一個或有限的幾個Thread同時處理連接到服務器上的所有客戶端。(關于NIO API的一些介紹,可以在Java NIO API詳解一文中找到。)

 

NIO API允許一個線程通過Selector對象同時監控多個SelectableChannel來處理多路IO,NIO應用程序一般按下圖所示工作:

Figure 1

 

Figure 1 所示,Client一直在循環地進行select操作,每次select()返回以后,通過selectedKeys()可以得到需要處理的SelectableChannel并對其一一處理。

 

這樣做雖然簡單但也有個問題,當有不同類型的SelectableChannel需要做不同的IO處理時,在圖中Client的代碼就需要判斷channel的類型然后再作相應的操作,這往往意味著一連串的if else。更糟糕的是,每增加一種新的channel,不但需要增加相應的處理代碼,還需要對這一串if else進行維護。(在本文的這個例子中,我們有ServerSocketChannelSocketChannel這兩種channel需要分別被處理。)

 

如果考慮將channel及其需要的IO處理進行封裝,抽象出一個統一的接口,就可以解決這一問題。在Listing 1中的NioSession就是這個接口。

 

NioSessionchannel()方法返回其封裝的SelectableChannel對象,interestOps()返回用于這個channel注冊的interestOps。registered()是當SelectableChannel被注冊后調用的回調函數,通過這個回調函數,NioSession可以得到channel注冊后的SelectionKeyprocess()函數則是NioSession接口的核心,這個方法抽象了封裝的SelectableChannel所需的IO處理邏輯。

 

Listing 1:

public interface NioSession {

 

    public SelectableChannel channel();

   

    public int interestOps();

   

    public void registered(SelectionKey key);

   

    public void process();  

}

 

NioSession一起工作的是NioWorker這個類(Listing 2),它是NioSession的調用者,封裝了一個Selector對象和Figure 1中循環select操作的邏輯。理解這個類可以幫助我們了解該如何使用NioSession這個接口。

 

NioWorker實現了Runnable接口,循環select操作的邏輯就在run()方法中。在NioWorker – NioSession這個框架中,NioSessionchannel注冊的時候會被作為attachment送入register函數,這樣,在每次select()操作的循環中,對于selectedKeys()中的每一個SelectionKey,我們都可以通過attachment拿到其相對應的NioSession然后調用其process()方法。

 

每次select()循環還有一個任務,就是將通過add()方法加入到這個NioWorkerNioSession注冊到Selector上。在Listing 2的代碼中可以看出,NioSession中的channel()被取出并注冊在Selector上,注冊所需的interestOpsNioSession中取出,NioSession本身則作為attachment送入register()函數。注冊成功后,NioSessionregistered()回調函數會被調用。

 

NioWorkeradd()方法的作用是將一個NioSession加入到該NioWorker中,并wakeup當前的select操作,這樣在下一次的select()調用之前,這個NioSession會被注冊。stop()方法則是讓一個正在run()NioWorker停止。closeAllChannels()會關閉當前注冊的所有channel,這個方法可在NioWorker不再使用時用來釋放IO資源。

 

Listing 2:

public class NioWorker implements Runnable {

   

    public NioWorker(Selector sel) {

       _sel = sel;

       _added = new HashSet();

    }

   

    public void run() {

       try {

           try {

             

              while (_run) {

                  _sel.select();

                  Set selected = _sel.selectedKeys();

                  for (Iterator itr = selected.iterator(); itr.hasNext();) {

                     SelectionKey key = (SelectionKey) itr.next();

                     NioSession s = (NioSession) key.attachment();

                     s.process();

                     itr.remove();

                  }

                 

                  synchronized (_added) {

                     for (Iterator itr = _added.iterator(); itr.hasNext();) {

                         NioSession s = (NioSession) itr.next();

                         SelectionKey key = s.channel().register(_sel, s.interestOps(), s);

                         s.registered(key);

                         itr.remove();

                     }

                  }

              }

             

           } finally {

              _sel.close();

           }

       } catch (IOException ex) {

           throw new Error(ex);

       }

    }

   

    public void add(NioSession s) {

       synchronized (_added) {

           _added.add(s);

       }

       _sel.wakeup();

    }

   

    public synchronized void stop() {

       _run = false;

       _sel.wakeup();

    }

   

    public void closeAllChannels() {

       for (Iterator itr = _sel.keys().iterator(); itr.hasNext();) {

           SelectionKey key = (SelectionKey) itr.next();

           try {        

              key.channel().close();

           } catch (IOException ex) {}

       }

    }

   

    protected Selector _sel = null;

    protected Collection _added = null;

    protected volatile boolean _run = true;

}

 

Echo Server這個例子中,我們需要一個ServerSocketChannel來接受新的TCP連接,對于每個TCP連接,我們還需要一個SocketChannel來處理這個TCP連接上的IO操作。把這兩種channel和上面的NioWorker – NioSession結構整合在一起,可以得到NioServerSessionNioEchoSession這兩個類,它們分別封裝了ServerSocketChannelSocketChannel及其對應的IO操作。下面這個UML類圖描述了這4個類的關系:

Figure 2

 

可以看到NioWorkerNioSession對新加入的兩個類沒有任何依賴性,NioServerSessionNioEchoSession通過實現NioSession這個接口為系統加入了新的功能。這樣的一個體系架構符合了Open-Close原則,新的功能可以通過實現NioSession被加入而無需對原有的模塊進行修改,這體現了面向對象設計的強大威力。

 

NioServerSession的實現(Listing 3)相對比較簡單,其封裝了一個ServerSocketChannel以及從這個channel上接受新的TCP連接的邏輯。NioServerSession還需要一個NioWorker的引用,這樣每接受一個新的TCP連接,NioServerSession就為其創建一個NioEchoSession的對象,并將這個對象加入到NioWorker中。

 

Listing 3:

public class NioServerSession implements NioSession {

   

    public NioServerSession(ServerSocketChannel channel, NioWorker worker) {

       _channel = channel;

       _worker = worker;

    }

   

    public void registered(SelectionKey key) {}

   

    public void process() {

       try {

           SocketChannel c = _channel.accept();

           if (c != null) {

              c.configureBlocking(false);

              NioEchoSession s = new NioEchoSession(c);

              _worker.add(s);

           }

       } catch (IOException ex) {

           throw new Error(ex);

       }

    }

   

    public SelectableChannel channel() {

       return _channel;

    }

   

    public int interestOps(){

       return SelectionKey.OP_ACCEPT;

    }

   

    protected ServerSocketChannel _channel;

    protected NioWorker _worker;

}

 

NioEchoSession的行為要復雜一些,NioEchoSession會先從TCP連接中讀取數據,再將這些數據用同一個連接寫回去,并重復這個步驟直到客戶端把連接關閉為止。我們可以把“Reading”和“Writing”看作NioEchoSession的兩個狀態,這樣可以用一個有限狀態機來描述它的行為,如下圖所示:

Figure 3

 

接下來的工作就是如何實現這個有限狀態機了。在這個例子中,我們使用State模式來實現它。下面這張UML類圖描述了NioEchoSession的設計細節。

Figure 4

 

NioEchoSession所處的狀態由EchoState這個抽象類來表現,其兩個子類分別對應了“Reading”和“Writing”這兩個狀態。NioEchoSession會將process()interestOps()這兩個方法delegateEchoState來處理,這樣,當NioEchoSession處于不同的狀態時,就會有不同的行為。

 

Listing 4EchoState的實現。EchoState定義了process()interestOps()這兩個抽象的方法來讓子類實現。NioEchoSession中的process()方法會被delegate到其當前EchoStateprocess()方法,NioEchoSession本身也會作為一個描述context的參數被送入EchoStateprocess()方法中。EchoState定義的interestOps()方法則會在NioEchoSession注冊和轉變State的時候被用到。

 

EchoState還定義了兩個靜態的方法來返回預先創建好的ReadStateWriteState,這樣做的好處是可以避免在NioEchoSession轉換state的時候創建一些不必要的對象從而影響性能。然而,這樣做要求state類必須是無狀態的,狀態需要保存在context類,也就是NioEchoSession中。

 

Listing 4:

public abstract class EchoState {

   

    public abstract void process(NioEchoSession s) throws IOException;

   

    public abstract int interestOps();

   

    public static EchoState readState() {

       return _read;

    }

   

    public static EchoState writeState() {

       return _write;

    }

   

    protected static EchoState _read = new ReadState();

    protected static EchoState _write = new WriteState();

}

 

Listing 5NioEchoSession的實現。NioEchoSession包含有一個SocketChannel,這個channel注冊后得到的SelectionKey,一個用于存放數據的ByteBuffer和一個記錄當前stateEchoState對象。在初始化時,EchoState被初始化為一個ReadState。NioEchoSessionprocess()方法和interestOps()方法都delegate到當前的EchoState中。其setState()方法用于切換當前state,在切換state后,NioEchoSession會通過SelectionKey更新注冊的interestOpsclose()方法用于關閉這個NioEchoSession對象。

 

Listing 5:

public class NioEchoSession implements NioSession {

   

    public NioEchoSession(SocketChannel c) {

       _channel = c;

       _buf = ByteBuffer.allocate(128);

       _state = EchoState.readState();

    }

   

    public void registered(SelectionKey key) {

       _key = key;

    }

   

    public void process() {

       try {

           _state.process(this);

       } catch (IOException ex) {

           close();

           throw new Error(ex);

       }

    }

   

    public SelectableChannel channel() {

       return _channel;

    }

   

    public int interestOps() {

       return _state.interestOps();

    }

   

    public void setState(EchoState state) {

       _state = state;

       _key.interestOps(interestOps());

    }

   

    public void close() {

       try {

           _channel.close();

       } catch (IOException ex) {

           throw new Error(ex);

       }

    }

   

    protected SocketChannel _channel = null;

    protected SelectionKey _key;

    protected ByteBuffer _buf = null;

    protected EchoState _state = null;

}

 

Listing 6Listing 7分別是ReadStateWriteState的實現。ReadStateprocess()中會先從NioEchoSessionchannel中讀取數據,如果未能讀到數據,NioEchoSession會繼續留在ReadState;如果讀取出錯,NioEchoSession會被關閉;如果讀取成功,NioEchoSession會被切換到WriteState。WriteState則負責將NioEchoSession中已經讀取的數據寫回到channel中,全部寫完后,NioEchoSession會被切換回ReadState。

 

Listing 6:

public class ReadState extends EchoState {

   

    public void process(NioEchoSession s)

       throws IOException

    {

       SocketChannel channel = s._channel;

       ByteBuffer buf = s._buf;

       int count = channel.read(buf);

 

       if (count == 0) {

           return;

       }

 

       if (count == -1) {

           s.close();

           return;

       }

 

       buf.flip();

       s.setState(EchoState.writeState());

    }

   

    public int interestOps() {

       return SelectionKey.OP_READ;

    }

}

 

Listing 7:

public class WriteState extends EchoState {

   

    public void process(NioEchoSession s)

       throws IOException

    {

       SocketChannel channel = s._channel;

       ByteBuffer buf = s._buf;

       channel.write(buf);

       if (buf.remaining() == 0) {

           buf.clear();

           s.setState(EchoState.readState());

       }

    }

   

    public int interestOps() {

       return SelectionKey.OP_WRITE;

    }

}

 

NioEchoServer(Listing 8)被用來啟動和關閉一個TCP Echo Server,這個類實現了Runnable接口,調用其run()方法就啟動了Echo Server。其shutdown()方法被用來關閉這個Echo Server,注意shutdown()run()finally block中的同步代碼確保了只有當Echo Server被關閉后,shutdown()方法才會返回。

 

Listing 8:

public class NioEchoServer implements Runnable {

   

    public void run() {

       try {

           ServerSocketChannel serv = ServerSocketChannel.open();

           try {

              serv.socket().bind(new InetSocketAddress(7));

              serv.configureBlocking(false);

              _worker = new NioWorker(Selector.open());

              NioServerSession s = new NioServerSession(serv, _worker);

              _worker.add(s);

              _worker.run();

           } finally {

              _worker.closeAllChannels();

              synchronized (this) {

                  notify();

              }

           }

       } catch (IOException ex) {

           throw new Error(ex);

       }

    }

   

    public synchronized void shutdown() {

       _worker.stop();

       try {

           wait();

       } catch (InterruptedException ex) {

           throw new Error(ex);

       }

    }

   

    protected NioWorker _worker = null;

}

 

最后,通過一個簡單的main()函數(Listing 9),我們就可以運行這個Echo Server了。

 

Listing 9:

    public static void main(String [] args) {

       new NioEchoServer().run();

    }

 

我們可以通過telnet程序來檢驗這個程序的運行狀況:

1. 打開一個命令行,輸入 telnet localhost 7 來運行一個telnet程序并連接到Echo Server上。

2. telnet程序中輸入字符,可以看到輸入的字符被顯示在屏幕上。(這是因為Echo Server將收到的字符寫回到客戶端)

3. 多打開幾個telnet程序進行測試,可以看到Echo Server能通過NIO API用一個Thread服務多個客戶端。

作者DaiJiaLin