JDK 1.4以前,Java的IO操作集中在java.io這個包中,是基于流的同步(blocking)API。對于大多數應用來說,這樣的API使用很方便,然而,一些對性能要求較高的應用,尤其是服務端應用,往往需要一個更為有效的方式來處理IO。從JDK 1.4起,NIO API作為一個基于緩沖區,并能提供異步(non-blocking)IO操作的API被引入。本文對其進行深入的介紹。

NIO API主要集中在java.nio和它的subpackages中:

 

java.nio

定義了Buffer及其數據類型相關的子類。其中被java.nio.channels中的類用來進行IO操作的ByteBuffer的作用非常重要。

 

java.nio.channels

定義了一系列處理IOChannel接口以及這些接口在文件系統和網絡通訊上的實現。通過Selector這個類,還提供了進行異步IO操作的辦法。這個包可以說是NIO API的核心。

 

java.nio.channels.spi

定義了可用來實現channelselector API的抽象類。

 

java.nio.charset

       定義了處理字符編碼和解碼的類。

 

java.nio.charset.spi

       定義了可用來實現charset API的抽象類。

 

java.nio.channels.spijava.nio.charset.spi這兩個包主要被用來對現有NIO API進行擴展,在實際的使用中,我們一般只和另外的3個包打交道。下面將對這3個包一一介紹。

 

Package java.nio

這個包主要定義了Buffer及其子類。Buffer定義了一個線性存放primitive type數據的容器接口。對于除boolean以外的其他primitive type,都有一個相應的Buffer子類,ByteBuffer是其中最重要的一個子類。

 

下面這張UML類圖描述了java.nio中的類的關系:



 

Buffer

定義了一個可以線性存放primitive type數據的容器接口。Buffer主要包含了與類型(byte, char…)無關的功能。值得注意的是Buffer及其子類都不是線程安全的。

 

每個Buffer都有以下的屬性:

 

capacity

這個Buffer最多能放多少數據。capacity一般在buffer被創建的時候指定。

limit

Buffer上進行的讀寫操作都不能越過這個下標。當寫數據到buffer中時,limit一般和capacity相等,當讀數據時,limit代表buffer中有效數據的長度。

position

/寫操作的當前下標。當使用buffer的相對位置進行讀/寫操作時,讀/寫會從這個下標進行,并在操作完成后,buffer會更新下標的值。

mark

一個臨時存放的位置下標。調用mark()會將mark設為當前的position的值,以后調用reset()會將position屬性設置為mark的值。mark的值總是小于等于position的值,如果將position的值設的比mark小,當前的mark值會被拋棄掉。

 

這些屬性總是滿足以下條件:

0 <= mark <= position <= limit <= capacity

 

limitposition的值除了通過limit()position()函數來設置,也可以通過下面這些函數來改變:

 

Buffer clear()

position設為0,把limit設為capacity,一般在把數據寫入Buffer前調用。

Buffer flip()

limit設為當前position,把position設為0,一般在從Buffer讀出數據前調用。

Buffer rewind()

position設為0limit不變,一般在把數據重寫入Buffer前調用。

 

Buffer對象有可能是只讀的,這時,任何對該對象的寫操作都會觸發一個ReadOnlyBufferExceptionisReadOnly()方法可以用來判斷一個Buffer是否只讀。

 

ByteBuffer

Buffer的子類中,ByteBuffer是一個地位較為特殊的類,因為在java.io.channels中定義的各種channelIO操作基本上都是圍繞ByteBuffer展開的。

 

ByteBuffer定義了4static方法來做創建工作:

 

ByteBuffer allocate(int capacity)

創建一個指定capacityByteBuffer

ByteBuffer allocateDirect(int capacity)

創建一個directByteBuffer,這樣的ByteBuffer在參與IO操作時性能會更好(很有可能是在底層的實現使用了DMA技術),相應的,創建和回收directByteBuffer的代價也會高一些。isDirect()方法可以檢查一個buffer是否是direct的。

ByteBuffer wrap(byte [] array)

ByteBuffer wrap(byte [] array, int offset, int length)

把一個byte數組或byte數組的一部分包裝成ByteBuffer

 

ByteBuffer定義了一系列getput操作來從中讀寫byte數據,如下面幾個:

 

byte get()

ByteBuffer get(byte [] dst)

byte get(int index)

 

ByteBuffer put(byte b)

ByteBuffer put(byte [] src)

ByteBuffer put(int index, byte b)

 

這些操作可分為絕對定位和相對定為兩種,相對定位的讀寫操作依靠position來定位Buffer中的位置,并在操作完成后會更新position的值。

在其它類型的buffer中,也定義了相同的函數來讀寫數據,唯一不同的就是一些參數和返回值的類型。

 

除了讀寫byte類型數據的函數,ByteBuffer的一個特別之處是它還定義了讀寫其它primitive數據的方法,如:

 

int getInt()

       ByteBuffer中讀出一個int值。

ByteBuffer putInt(int value)

       寫入一個int值到ByteBuffer中。

 

讀寫其它類型的數據牽涉到字節序問題,ByteBuffer會按其字節序(大字節序或小字節序)寫入或讀出一個其它類型的數據(int,long…)。字節序可以用order方法來取得和設置:

 

ByteOrder order()

       返回ByteBuffer的字節序。

ByteBuffer order(ByteOrder bo)

       設置ByteBuffer的字節序。

 

ByteBuffer另一個特別的地方是可以在它的基礎上得到其它類型的buffer。如:

 

CharBuffer asCharBuffer()

為當前的ByteBuffer創建一個CharBuffer的視圖。在該視圖buffer中的讀寫操作會按照ByteBuffer的字節序作用到ByteBuffer中的數據上。

 

用這類方法創建出來的buffer會從ByteBufferposition位置開始到limit位置結束,可以看作是這段數據的視圖。視圖bufferreadOnly屬性和direct屬性與ByteBuffer的一致,而且也只有通過這種方法,才可以得到其他數據類型的direct buffer

 

ByteOrder

用來表示ByteBuffer字節序的類,可將其看成java中的enum類型。主要定義了下面幾個static方法和屬性:

 

ByteOrder BIG_ENDIAN

       代表大字節序的ByteOrder

ByteOrder LITTLE_ENDIAN

       代表小字節序的ByteOrder

ByteOrder nativeOrder()

       返回當前硬件平臺的字節序。

 

MappedByteBuffer

ByteBuffer的子類,是文件內容在內存中的映射。這個類的實例需要通過FileChannelmap()方法來創建。

 

 

接下來看看一個使用ByteBuffer的例子,這個例子從標準輸入不停地讀入字符,當讀滿一行后,將收集的字符寫到標準輸出:

    public static void main(String [] args)

       throws IOException

    {

       // 創建一個capacity256ByteBuffer

       ByteBuffer buf = ByteBuffer.allocate(256);

       while (true) {

           // 從標準輸入流讀入一個字符

           int c = System.in.read();

           // 當讀到輸入流結束時,退出循環

           if (c == -1)

              break;

          

           // 把讀入的字符寫入ByteBuffer

           buf.put((byte) c);

           // 當讀完一行時,輸出收集的字符

           if (c == '\n') {

              // 調用flip()使limit變為當前的position的值,position變為0,

              // 為接下來從ByteBuffer讀取做準備

              buf.flip();

              // 構建一個byte數組

              byte [] content = new byte[buf.limit()];

              // ByteBuffer中讀取數據到byte數組中

              buf.get(content);

              // byte數組的內容寫到標準輸出

              System.out.print(new String(content));

              // 調用clear()使position變為0,limit變為capacity的值,

              // 為接下來寫入數據到ByteBuffer中做準備

              buf.clear();

           }

       }

    }

 

 

Package java.nio.channels

這個包定義了Channel的概念,Channel表現了一個可以進行IO操作的通道(比如,通過FileChannel,我們可以對文件進行讀寫操作)。java.nio.channels包含了文件系統和網絡通訊相關的channel類。這個包通過SelectorSelectableChannel這兩個類,還定義了一個進行異步(non-blockingIO操作的API,這對需要高性能IO的應用非常重要。

 

下面這張UML類圖描述了java.nio.channelsinterface的關系:



 

Channel

Channel表現了一個可以進行IO操作的通道,該interface定義了以下方法:

 

boolean isOpen()

       Channel是否是打開的。

void close()

       關閉這個Channel,相關的資源會被釋放。

 

ReadableByteChannel

定義了一個可從中讀取byte數據的channel interface

 

int read(ByteBuffer dst)

channel中讀取byte數據并寫到ByteBuffer中。返回讀取的byte數。

 

WritableByteChannel

定義了一個可向其寫byte數據的channel interface

 

int write(ByteBuffer src)

       ByteBuffer中讀取byte數據并寫到channel中。返回寫出的byte數。

 

ByteChannel

ByteChannel并沒有定義新的方法,它的作用只是把ReadableByteChannelWritableByteChannel合并在一起。

 

ScatteringByteChannel

繼承了ReadableByteChannel并提供了同時往幾個ByteBuffer中寫數據的能力。

 

GatheringByteChannel

繼承了WritableByteChannel并提供了同時從幾個ByteBuffer中讀數據的能力。

 

InterruptibleChannel

用來表現一個可以被異步關閉的Channel。這表現在兩方面:

1.    當一個InterruptibleChannelclose()方法被調用時,其它block在這個InterruptibleChannelIO操作上的線程會接收到一個AsynchronousCloseException

2.    當一個線程blockInterruptibleChannelIO操作上時,另一個線程調用該線程的interrupt()方法會導致channel被關閉,該線程收到一個ClosedByInterruptException,同時線程的interrupt狀態會被設置。

 

 

接下來的這張UML類圖描述了java.nio.channels中類的關系:



 

異步IO

異步IO的支持可以算是NIO API中最重要的功能,異步IO允許應用程序同時監控多個channel以提高性能,這一功能是通過SelectorSelectableChannelSelectionKey3個類來實現的。

 

SelectableChannel代表了可以支持異步IO操作的channel,可以將其注冊在Selector上,這種注冊的關系由SelectionKey這個類來表現(見UML圖)。Selector這個類通過select()函數,給應用程序提供了一個可以同時監控多個IO channel的方法:

 

應用程序通過調用select()函數,讓Selector監控注冊在其上的多個SelectableChannel,當有channelIO操作可以進行時,select()方法就會返回以讓應用程序檢查channel的狀態,并作相應的處理。

 

下面是JDK 1.4中異步IO的一個例子,這段code使用了異步IO實現了一個time server

    private static void acceptConnections(int port) throws Exception {

       // 打開一個Selector

       Selector acceptSelector =

           SelectorProvider.provider().openSelector();

 

       // 創建一個ServerSocketChannel,這是一個SelectableChannel的子類

       ServerSocketChannel ssc = ServerSocketChannel.open();

       // 將其設為non-blocking狀態,這樣才能進行異步IO操作

       ssc.configureBlocking(false);

 

       // ServerSocketChannel對應的socket綁定IP和端口

       InetAddress lh = InetAddress.getLocalHost();

       InetSocketAddress isa = new InetSocketAddress(lh, port);

       ssc.socket().bind(isa);

 

       // ServerSocketChannel注冊到Selector上,返回對應的SelectionKey

       SelectionKey acceptKey =

           ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);

 

       int keysAdded = 0;

 

       // select()函數來監控注冊在Selector上的SelectableChannel

       // 返回值代表了有多少channel可以進行IO操作 (ready for IO)

       while ((keysAdded = acceptSelector.select()) > 0) {

           // selectedKeys()返回一個SelectionKey的集合,

           // 其中每個SelectionKey代表了一個可以進行IO操作的channel

           // 一個ServerSocketChannel可以進行IO操作意味著有新的TCP連接連入了

           Set readyKeys = acceptSelector.selectedKeys();

           Iterator i = readyKeys.iterator();

 

           while (i.hasNext()) {

              SelectionKey sk = (SelectionKey) i.next();

              // 需要將處理過的keyselectedKeys這個集合中刪除

              i.remove();

              // SelectionKey得到對應的channel

              ServerSocketChannel nextReady =

                  (ServerSocketChannel) sk.channel();

              // 接受新的TCP連接

              Socket s = nextReady.accept().socket();

              // 把當前的時間寫到這個新的TCP連接中

              PrintWriter out =

                  new PrintWriter(s.getOutputStream(), true);

              Date now = new Date();

              out.println(now);

              // 關閉連接

              out.close();

           }

       }

    }

這是個純粹用于演示的例子,因為只有一個ServerSocketChannel需要監控,所以其實并不真的需要使用到異步IO。不過正因為它的簡單,可以很容易地看清楚異步IO是如何工作的。

 

SelectableChannel

這個抽象類是所有支持異步IO操作的channel(如DatagramChannelSocketChannel)的父類。SelectableChannel可以注冊到一個或多個Selector上以進行異步IO操作。

 

SelectableChannel可以是blockingnon-blocking模式(所有channel創建的時候都是blocking模式),只有non-blockingSelectableChannel才可以參與異步IO操作。

 

SelectableChannel configureBlocking(boolean block)

       設置blocking模式。

boolean isBlocking()

       返回blocking模式。

 

通過register()方法,SelectableChannel可以注冊到Selector上。

 

int validOps()

返回一個bit mask,表示這個channel上支持的IO操作。當前在SelectionKey中,用靜態常量定義了4IO操作的bit值:OP_ACCEPTOP_CONNECTOP_READOP_WRITE

SelectionKey register(Selector sel, int ops)

將當前channel注冊到一個Selector上并返回對應的SelectionKey。在這以后,通過調用Selectorselect()函數就可以監控這個channelops這個參數是一個bit mask,代表了需要監控的IO操作。

SelectionKey register(Selector sel, int ops, Object att)

這個函數和上一個的意義一樣,多出來的att參數會作為attachment被存放在返回的SelectionKey中,這在需要存放一些session state的時候非常有用。

boolean isRegistered()

       channel是否已注冊在一個或多個Selector上。

 

SelectableChannel還提供了得到對應SelectionKey的方法:

 

SelectionKey keyFor(Selector sel)

返回該channeSelector上的注冊關系所對應的SelectionKey。若無注冊關系,返回null

 

Selector

Selector可以同時監控多個SelectableChannelIO狀況,是異步IO的核心。

 

Selector open()

       Selector的一個靜態方法,用于創建實例。

 

在一個Selector中,有3SelectionKey的集合:

1. key set代表了所有注冊在這個Selector上的channel,這個集合可以通過keys()方法拿到。

2. Selected-key set代表了所有通過select()方法監測到可以進行IO操作的channel,這個集合可以通過selectedKeys()拿到。

3. Cancelled-key set代表了已經cancel了注冊關系的channel,在下一個select()操作中,這些channel對應的SelectionKey會從key setcancelled-key set中移走。這個集合無法直接訪問。

 

以下是select()相關方法的說明:

 

int select()

監控所有注冊的channel,當其中有注冊的IO操作可以進行時,該函數返回,并將對應的SelectionKey加入selected-key set

int select(long timeout)

       可以設置超時的select()操作。

int selectNow()

       進行一個立即返回的select()操作。

Selector wakeup()

       使一個還未返回的select()操作立刻返回。

 

SelectionKey

代表了SelectorSelectableChannel的注冊關系。

 

Selector定義了4個靜態常量來表示4IO操作,這些常量可以進行位操作組合成一個bit mask

 

int OP_ACCEPT

有新的網絡連接可以acceptServerSocketChannel支持這一異步IO

int OP_CONNECT

       代表連接已經建立(或出錯),SocketChannel支持這一異步IO

int OP_READ

int OP_WRITE

       代表了讀、寫操作。

 

以下是其主要方法:

 

Object attachment()

返回SelectionKeyattachmentattachment可以在注冊channel的時候指定。

Object attach(Object ob)

       設置SelectionKeyattachment

SelectableChannel channel()

       返回該SelectionKey對應的channel

Selector selector()

       返回該SelectionKey對應的Selector

void cancel()

       cancel這個SelectionKey所對應的注冊關系。

int interestOps()

       返回代表需要Selector監控的IO操作的bit mask

SelectionKey interestOps(int ops)

       設置interestOps

int readyOps()

       返回一個bit mask,代表在相應channel上可以進行的IO操作。

 

ServerSocketChannel

支持異步操作,對應于java.net.ServerSocket這個類,提供了TCP協議IO接口,支持OP_ACCEPT操作。

 

ServerSocket socket()

       返回對應的ServerSocket對象。

SocketChannel accept()

       接受一個連接,返回代表這個連接的SocketChannel對象。

 

SocketChannel

支持異步操作,對應于java.net.Socket這個類,提供了TCP協議IO接口,支持OP_CONNECTOP_READOP_WRITE操作。這個類還實現了ByteChannelScatteringByteChannelGatheringByteChannel接口。

DatagramChannel和這個類比較相似,其對應于java.net.DatagramSocket,提供了UDP協議IO接口。

 

Socket socket()

       返回對應的Socket對象。

boolean connect(SocketAddress remote)

boolean finishConnect()

connect()進行一個連接操作。如果當前SocketChannelblocking模式,這個函數會等到連接操作完成或錯誤發生才返回。如果當前SocketChannelnon-blocking模式,函數在連接能立刻被建立時返回true,否則函數返回false,應用程序需要在以后用finishConnect()方法來完成連接操作。

 

Pipe

包含了一個讀和一個寫的channel(Pipe.SourceChannelPipe.SinkChannel),這對channel可以用于進程中的通訊。

 

FileChannel

用于對文件的讀、寫、映射、鎖定等操作。和映射操作相關的類有FileChannel.MapMode,和鎖定操作相關的類有FileLock。值得注意的是FileChannel并不支持異步操作。

 

Channels

這個類提供了一系列static方法來支持stream類和channel類之間的互操作。這些方法可以將channel類包裝為stream類,比如,將ReadableByteChannel包裝為InputStreamReader;也可以將stream類包裝為channel類,比如,將OutputStream包裝為WritableByteChannel

 

 

Package java.nio.charset

這個包定義了Charset及相應的encoderdecoder。下面這張UML類圖描述了這個包中類的關系,可以將其中CharsetCharsetDecoderCharsetEncoder理解成一個Abstract Factory模式的實現:



 

Charset

代表了一個字符集,同時提供了factory method來構建相應的CharsetDecoderCharsetEncoder

 

Charset提供了以下static的方法:

 

SortedMap availableCharsets()

       返回當前系統支持的所有Charset對象,用charset的名字作為setkey

boolean isSupported(String charsetName)

       判斷該名字對應的字符集是否被當前系統支持。

Charset forName(String charsetName)

       返回該名字對應的Charset對象。

 

Charset中比較重要的方法有:

 

String name()

       返回該字符集的規范名。

Set aliases()

       返回該字符集的所有別名。

CharsetDecoder newDecoder()

       創建一個對應于這個Charsetdecoder

CharsetEncoder newEncoder()

       創建一個對應于這個Charsetencoder

 

CharsetDecoder

將按某種字符集編碼的字節流解碼為unicode字符數據的引擎。

 

CharsetDecoder的輸入是ByteBuffer,輸出是CharBuffer。進行decode操作時一般按如下步驟進行:

 

1. 調用CharsetDecoderreset()方法。(第一次使用時可不調用)

2. 調用decode()方法0n次,將endOfInput參數設為false,告訴decoder有可能還有新的數據送入。

3. 調用decode()方法最后一次,將endOfInput參數設為true,告訴decoder所有數據都已經送入。

4. 調用decoderflush()方法。讓decoder有機會把一些內部狀態寫到輸出的CharBuffer中。

 

CharsetDecoder reset()

       重置decoder,并清除decoder中的一些內部狀態。

CoderResult decode(ByteBuffer in, CharBuffer out, boolean endOfInput)

ByteBuffer類型的輸入中decode盡可能多的字節,并將結果寫到CharBuffer類型的輸出中。根據decode的結果,可能返回3CoderResultCoderResult.UNDERFLOW表示已經沒有輸入可以decodeCoderResult.OVERFLOW表示輸出已滿;其它的CoderResult表示decode過程中有錯誤發生。根據返回的結果,應用程序可以采取相應的措施,比如,增加輸入,清除輸出等等,然后再次調用decode()方法。

CoderResult flush(CharBuffer out)

有些decoder會在decode的過程中保留一些內部狀態,調用這個方法讓這些decoder有機會將這些內部狀態寫到輸出的CharBuffer中。調用成功返回CoderResult.UNDERFLOW。如果輸出的空間不夠,該函數返回CoderResult.OVERFLOW,這時應用程序應該擴大輸出CharBuffer的空間,然后再次調用該方法。

CharBuffer decode(ByteBuffer in)

一個便捷的方法把ByteBuffer中的內容decode到一個新創建的CharBuffer中。在這個方法中包括了前面提到的4個步驟,所以不能和前3個函數一起使用。

 

decode過程中的錯誤有兩種:malformed-input CoderResult表示輸入中數據有誤;unmappable-character CoderResult表示輸入中有數據無法被解碼成unicode的字符。如何處理decode過程中的錯誤取決于decoder的設置。對于這兩種錯誤,decoder可以通過CodingErrorAction設置成:

1. 忽略錯誤

2. 報告錯誤。(這會導致錯誤發生時,decode()方法返回一個表示該錯誤的CoderResult。)

3. 替換錯誤,用decoder中的替換字串替換掉有錯誤的部分。

 

CodingErrorAction malformedInputAction()

       返回malformed-input的出錯處理。

CharsetDecoder onMalformedInput(CodingErrorAction newAction)

       設置malformed-input的出錯處理。

CodingErrorAction unmappableCharacterAction()

       返回unmappable-character的出錯處理。

CharsetDecoder onUnmappableCharacter(CodingErrorAction newAction)

       設置unmappable-character的出錯處理。

String replacement()

       返回decoder的替換字串。

CharsetDecoder replaceWith(String newReplacement)

       設置decoder的替換字串。

 

CharsetEncoder

unicode字符數據編碼為特定字符集的字節流的引擎。其接口和CharsetDecoder相類似。

 

CoderResult

描述encode/decode操作結果的類。

 

CodeResult包含兩個static成員:

 

CoderResult OVERFLOW

       表示輸出已滿

CoderResult UNDERFLOW

       表示輸入已無數據可用。

 

其主要的成員函數有:

 

boolean isError()

boolean isMalformed()

boolean isUnmappable()

boolean isOverflow()

boolean isUnderflow()

       用于判斷該CoderResult描述的錯誤。

 

int length()

       返回錯誤的長度,比如,無法被轉換成unicode的字節長度。

void throwException()

       拋出一個和這個CoderResult相對應的exception

 

CodingErrorAction

表示encoder/decoder中錯誤處理方法的類。可將其看成一個enum類型。有以下static屬性:

 

CodingErrorAction IGNORE

       忽略錯誤。

CodingErrorAction REPLACE

       用替換字串替換有錯誤的部分。

CodingErrorAction REPORT

報告錯誤,對于不同的函數,有可能是返回一個和錯誤有關的CoderResult,也有可能是拋出一個CharacterCodingException

 

 

參考文獻

David Flanagan – Java in a Nutshell


作者:DaiJiaLin                       

mailto:woodydai@gmail.com

http://blog.csdn.net/DaiJiaLin