<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 195, comments - 34, trackbacks - 0, articles - 1
    Java AIO初探(異步網(wǎng)絡(luò)IO) 

      按照《Unix網(wǎng)絡(luò)編程》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO復(fù)用、信號驅(qū)動IO和異步IO,按照POSIX標(biāo)準(zhǔn)來劃分只分為兩類:同步IO和異步IO.如何區(qū)分呢?首先一個IO操作其實(shí)分成了兩個步驟:發(fā)起IO請求和實(shí)際的IO操作,同步IO和異步IO的區(qū)別就在于第二個步驟是否阻塞,如果實(shí)際的IO讀寫阻塞請求進(jìn)程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信號驅(qū)動IO都是同步IO,如果不阻塞,而是操作系統(tǒng)幫你做完IO操作再將結(jié)果返回給你,那么就是異步IO.阻塞IO和非阻塞IO的區(qū)別在于第一步,發(fā)起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO,如果不阻塞,那么就是非阻塞IO.

        Java nio 2.0的主要改進(jìn)就是引入了異步IO(包括文件和網(wǎng)絡(luò)),這里主要介紹下異步網(wǎng)絡(luò)IO API的使用以及框架的設(shè)計(jì),以TCP服務(wù)端為例。首先看下為了支持AIO引入的新的類和接口:

        java.nio.channels.AsynchronousChannel標(biāo)記一個channel支持異步IO操作。

        java.nio.channels.AsynchronousServerSocketChannel ServerSocket的aio版本,創(chuàng)建TCP服務(wù)端,綁定地址,監(jiān)聽端口等。

        java.nio.channels.AsynchronousSocketChannel面向流的異步socket channel,表示一個連接。

        java.nio.channels.AsynchronousChannelGroup異步channel的分組管理,目的是為了資源共享。一個AsynchronousChannelGroup綁定一個線程池,這個線程池執(zhí)行兩個任務(wù):處理IO事件和派發(fā)CompletionHandler.AsynchronousServerSocketChannel創(chuàng)建的時候可以傳入一個AsynchronousChannelGroup,那么通過AsynchronousServerSocketChannel創(chuàng)建的AsynchronousSocketChannel將同屬于一個組,共享資源。

        java.nio.channels.CompletionHandler異步IO操作結(jié)果的回調(diào)接口,用于定義在IO操作完成后所作的回調(diào)工作。AIO的API允許兩種方式來處理異步操作的結(jié)果:返回的Future模式或者注冊CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的調(diào)用是由AsynchronousChannelGroup的線程池派發(fā)的。顯然,線程池的大小是性能的關(guān)鍵因素。AsynchronousChannelGroup允許綁定不同的線程池,通過三個靜態(tài)方法來創(chuàng)建:public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,

        ThreadFactory threadFactory)

        throws IOException

        public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,

        int initialSize)

        public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)

        throws IOException

        需要根據(jù)具體應(yīng)用相應(yīng)調(diào)整,從框架角度出發(fā),需要暴露這樣的配置選項(xiàng)給用戶。

        在介紹完了aio引入的TCP的主要接口和類之后,我們來設(shè)想下一個aio框架應(yīng)該怎么設(shè)計(jì)。參考非阻塞nio框架的設(shè)計(jì),一般都是采用Reactor模式,Reacot負(fù)責(zé)事件的注冊、select、事件的派發(fā);相應(yīng)地,異步IO有個Proactor模式,Proactor負(fù)責(zé)CompletionHandler的派發(fā),查看一個典型的IO寫操作的流程來看兩者的區(qū)別:

        Reactor:  send(msg) -> 消息隊(duì)列是否為空,如果為空  -> 向Reactor注冊O(shè)P_WRITE,然后返回 -> Reactor select -> 觸發(fā)Writable,通知用戶線程去處理 ->先注銷Writable(很多人遇到的cpu 100%的問題就在于沒有注銷),處理Writeable,如果沒有完全寫入,繼續(xù)注冊O(shè)P_WRITE.注意到,寫入的工作還是用戶線程在處理。

        Proactor: send(msg) -> 消息隊(duì)列是否為空,如果為空,發(fā)起read異步調(diào)用,并注冊CompletionHandler,然后返回。 -> 操作系統(tǒng)負(fù)責(zé)將你的消息寫入,并返回結(jié)果(寫入的字節(jié)數(shù))給Proactor -> Proactor派發(fā)CompletionHandler.可見,寫入的工作是操作系統(tǒng)在處理,無需用戶線程參與。事實(shí)上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。

        CompletionHandler有三個方法,分別對應(yīng)于處理成功、失敗、被取消(通過返回的Future)情況下的回調(diào)處理:

        public interface CompletionHandler {

        void completed(V result, A attachment);

        void failed(Throwable exc, A attachment);

        void cancelled(A attachment);

        }

        其中的泛型參數(shù)V表示IO調(diào)用的結(jié)果,而A是發(fā)起調(diào)用時傳入的attchment.

        在初步介紹完aio引入的類和接口后,我們看看一個典型的tcp服務(wù)端是怎么啟動的,怎么接受連接并處理讀和寫,這里引用的代碼都是yanf4j 的aio分支中的代碼,可以從svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

        第一步,創(chuàng)建一個AsynchronousServerSocketChannel,創(chuàng)建之前先創(chuàng)建一個AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以綁定一個AsynchronousChannelGroup,那么通過這個AsynchronousServerSocketChannel建立的連接都將同屬于一個AsynchronousChannelGroup并共享資源:this.asynchronousChannelGroup = AsynchronousChannelGroup。withCachedThreadPool(Executors.newCachedThreadPool(),this.threadPoolSize);然后初始化一個AsynchronousServerSocketChannel,通過open方法:this.serverSocketChannel = AsynchronousServerSocketChannel。open(this.asynchronousChannelGroup);通過nio 2.0引入的SocketOption類設(shè)置一些TCP選項(xiàng):this.serverSocketChannel。setOption(StandardSocketOption.SO_REUSEADDR,true);this.serverSocketChannel。setOption(StandardSocketOption.SO_RCVBUF,16*1024);

        綁定本地地址:

        this.serverSocketChannel。bind(new InetSocketAddress("localhost",8080), 100);其中的100用于指定等待連接的隊(duì)列大小(backlog)。完了嗎?還沒有,最重要的監(jiān)聽工作還沒開始,監(jiān)聽端口是為了等待連接上來以便accept產(chǎn)生一個AsynchronousSocketChannel來表示一個新建立的連接,因此需要發(fā)起一個accept調(diào)用,調(diào)用是異步的,操作系統(tǒng)將在連接建立后,將最后的結(jié)果——AsynchronousSocketChannel返回給你:

        public void pendingAccept(){

        if (this.started  this.serverSocketChannel.isOpen()) { this.acceptFuture = this.serverSocketChannel.accept(null,

        new AcceptCompletionHandler());

        } else {

        throw new IllegalStateException("Controller has been closed");

        }

        注意,重復(fù)的accept調(diào)用將會拋出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一個參數(shù)是你想傳給CompletionHandler的attchment,第二個參數(shù)就是注冊的用于回調(diào)的CompletionHandler,最后返回結(jié)果Future.你可以對future做處理,這里采用更推薦的方式就是注冊一個CompletionHandler.那么accept的CompletionHandler中做些什么工作呢?顯然一個赤裸裸的AsynchronousSocketChannel是不夠的,我們需要將它封裝成session,一個session表示一個連接(mina里就叫IoSession了),里面帶了一個緩沖的消息隊(duì)列以及一些其他資源等。在連接建立后,除非你的服務(wù)器只準(zhǔn)備接受一個連接,不然你需要在后面繼續(xù)調(diào)用pendingAccept來發(fā)起另一個accept請求:

        private final class AcceptCompletionHandler implements

        CompletionHandler {

        @Override

        public void cancelled(Object attachment){

        logger.warn("Accept operation was canceled");

        }

      @Override

        public void completed(AsynchronousSocketChannel socketChannel,

        Object attachment){

        try {

        logger.debug("Accept connection from " + socketChannel.getRemoteAddress());

        configureChannel(socketChannel);

        AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);

        Session session = new AioTCPSession(sessionConfig,AioTCPController.this.configuration。getSessionReadBufferSize(),AioTCPController.this.sessionTimeout);session.start();

        registerSession(session);

        } catch(Exception e){

        e.printStackTrace();logger.error("Accept error", e);

        notifyException(e);

        } finally {

        pendingAccept();

        }

        @Override

        public void failed(Throwable exc, Object attachment) { logger.error("Accept error", exc);

        try {

        notifyException(exc);

        } finally {

        pendingAccept();

        }

        注意到了吧,我們在failed和completed方法中在最后都調(diào)用了pendingAccept來繼續(xù)發(fā)起accept調(diào)用,等待新的連接上來。有的同學(xué)可能要說了,這樣搞是不是遞歸調(diào)用,會不會堆棧溢出?實(shí)際上不會,因?yàn)榘l(fā)起accept調(diào)用的線程與CompletionHandler回調(diào)的線程并非同一個,不是一個上下文中,兩者之間沒有耦合關(guān)系。要注意到,CompletionHandler的回調(diào)共用的是AsynchronousChannelGroup綁定的線程池,因此千萬別在回調(diào)方法中調(diào)用阻塞或者長時間的操作,例如sleep,回調(diào)方法最好能支持超時,防止線程池耗盡。

        連接建立后,怎么讀和寫呢?回憶下在nonblocking nio框架中,連接建立后的第一件事是干什么?注冊O(shè)P_READ事件等待socket可讀。異步IO也同樣如此,連接建立后馬上發(fā)起一個異步read調(diào)用,等待socket可讀,這個是Session.start方法中所做的事情:

        public class AioTCPSession {

        protected void start0(){

        pendingRead();

        }

        protected final void pendingRead(){

        if (!isClosed()  this.asynchronousSocketChannel.isOpen()) { if (!this.readBuffer.hasRemaining()) { this.readBuffer = ByteBufferUtils。increaseBufferCapatity(this.readBuffer);

        }

        this.readFuture = this.asynchronousSocketChannel.read(this.readBuffer, this, this.readCompletionHandler);

        } else {

        throw new IllegalStateException(

        "Session Or Channel has been closed");

        }

        }

        AsynchronousSocketChannel的read調(diào)用與AsynchronousServerSocketChannel的accept調(diào)用類似,同樣是非阻塞的,返回結(jié)果也是一個Future,但是寫的結(jié)果是整數(shù),表示寫入了多少字節(jié),因此read調(diào)用返回的是Future,方法的第一個參數(shù)是讀的緩沖區(qū),操作系統(tǒng)將IO讀到數(shù)據(jù)拷貝到這個緩沖區(qū),第二個參數(shù)是傳遞給CompletionHandler的attchment,第三個參數(shù)就是注冊的用于回調(diào)的CompletionHandler.這里保存了read的結(jié)果Future,這是為了在關(guān)閉連接的時候能夠主動取消調(diào)用,accept也是如此。現(xiàn)在可以看看read的CompletionHandler的實(shí)現(xiàn):

        public final class ReadCompletionHandler implements

        CompletionHandler {

        private static final Logger log = LoggerFactory

        。getLogger(ReadCompletionHandler.class);

        protected final AioTCPController controller;

        public ReadCompletionHandler(AioTCPController controller){

        this.controller = controller;

        }

        @Override

        public void cancelled(AbstractAioSession session){

        log.warn("Session(" + session.getRemoteSocketAddress()

        + ")read operation was canceled");

        }

        @Override

        public void completed(Integer result, AbstractAioSession session) { if (log.isDebugEnabled())

        log.debug("Session(" + session.getRemoteSocketAddress()

        + ")read +" + result + " bytes");

        if(result 0){

        session.updateTimeStamp();session.getReadBuffer()。flip();session.decode();session.getReadBuffer()。compact();

        }

        } finally {

        try {

        session.pendingRead();

        } catch(IOException e){

        session.onException(e);session.close();

        }

        controller.checkSessionTimeout();

        }

        @Override

        public void failed(Throwable exc, AbstractAioSession session) { log.error("Session read error", exc);session.onException(exc);session.close();

        }

        }

     如果IO讀失敗,會返回失敗產(chǎn)生的異常,這種情況下我們就主動關(guān)閉連接,通過session.close()方法,這個方法干了兩件事情:關(guān)閉channel和取消read調(diào)用:if (null != this.readFuture) { this.readFuture.cancel(true);

        }

        this.asynchronousSocketChannel.close();   在讀成功的情況下,我們還需要判斷結(jié)果result是否小于0,如果小于0就表示對端關(guān)閉了,這種情況下我們也主動關(guān)閉連接并返回。如果讀到一定字節(jié),也就是result大于0的情況下,我們就嘗試從讀緩沖區(qū)中decode出消息,并派發(fā)給業(yè)務(wù)處理器的回調(diào)方法,最終通過pendingRead繼續(xù)發(fā)起read調(diào)用等待socket的下一次可讀。可見,我們并不需要自己去調(diào)用channel來進(jìn)行IO讀,而是操作系統(tǒng)幫你直接讀到了緩沖區(qū),然后給你一個結(jié)果表示讀入了多少字節(jié),你處理這個結(jié)果即可。而nonblocking IO框架中,是reactor通知用戶線程socket可讀了,然后用戶線程自己去調(diào)用read進(jìn)行實(shí)際讀操作。這里還有個需要注意的地方,就是decode出來的消息的派發(fā)給業(yè)務(wù)處理器工作最好交給一個線程池來處理,避免阻塞group綁定的線程池。

        IO寫的操作與此類似,不過通常寫的話我們會在session中關(guān)聯(lián)一個緩沖隊(duì)列來處理,沒有完全寫入或者等待寫入的消息都存放在隊(duì)列中,隊(duì)列為空的情況下發(fā)起write調(diào)用:

        protected void write0(WriteMessage message){

        boolean needWrite = false;

        synchronized (this.writeQueue) { needWrite = this.writeQueue.isEmpty();this.writeQueue.offer(message);

        }

        if(needWrite){

        pendingWrite(message);

        }

        protected final void pendingWrite(WriteMessage message){

        message = preprocessWriteMessage(message);

        if (!isClosed()  this.asynchronousSocketChannel.isOpen()) { this.asynchronousSocketChannel.write(message.getWriteBuffer(),this, this.writeCompletionHandler);

        } else {

        throw new IllegalStateException(

        "Session Or Channel has been closed");

        }

        write調(diào)用返回的結(jié)果與read一樣是一個Future,而write的CompletionHandler處理的核心邏輯大概是這樣:

        @Override

        public void completed(Integer result, AbstractAioSession session) { if (log.isDebugEnabled())

        log.debug("Session(" + session.getRemoteSocketAddress()

        + ")writen " + result + " bytes");

        WriteMessage writeMessage;

        Queue writeQueue = session.getWriteQueue();

        synchronized(writeQueue){

        writeMessage = writeQueue.peek();if (writeMessage.getWriteBuffer() == null || !writeMessage.getWriteBuffer()。hasRemaining()) { writeQueue.remove();if (writeMessage.getWriteFuture() != null) { writeMessage.getWriteFuture()。setResult(Boolean.TRUE);

        }

        try {

        session.getHandler()。onMessageSent(session,writeMessage.getMessage());

        } catch(Exception e){

        session.onException(e);

        }

        writeMessage = writeQueue.peek();

        }

        if (writeMessage != null) {

        try {

        session.pendingWrite(writeMessage);

        } catch(IOException e){

        session.onException(e);session.close();

        }

        compete方法中的result就是實(shí)際寫入的字節(jié)數(shù),然后我們判斷消息的緩沖區(qū)是否還有剩余,如果沒有就將消息從隊(duì)列中移除,如果隊(duì)列中還有消息,那么繼續(xù)發(fā)起write調(diào)用。

        重復(fù)一下,這里引用的代碼都是yanf4j aio分支中的源碼,感興趣的朋友可以直接check out出來看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio.在引入了aio之后,java對于網(wǎng)絡(luò)層的支持已經(jīng)非常完善,該有的都有了,java也已經(jīng)成為服務(wù)器開發(fā)的首選語言之一。java的弱項(xiàng)在于對內(nèi)存的管理上,由于這一切都交給了GC,因此在高性能的網(wǎng)絡(luò)服務(wù)器上還是Cpp的天下。java這種單一堆模型比之erlang的進(jìn)程內(nèi)堆模型還是有差距,很難做到高效的垃圾回收和細(xì)粒度的內(nèi)存管理。

        這里僅僅是介紹了aio開發(fā)的核心流程,對于一個網(wǎng)絡(luò)框架來說,還需要考慮超時的處理、緩沖buffer的處理、業(yè)務(wù)層和網(wǎng)絡(luò)層的切分、可擴(kuò)展性、性能的可調(diào)性以及一定的通用性要求。



    主站蜘蛛池模板: 四虎影视精品永久免费网站| 人人狠狠综合久久亚洲88| 老司机午夜性生免费福利| 久久久久亚洲av毛片大| 在线观看的免费网站无遮挡| 亚洲一区二区无码偷拍| 亚洲国产综合无码一区| 24小时日本在线www免费的| 日本在线观看免费高清| 91天堂素人精品系列全集亚洲| 日韩高清在线免费看| 国产一区二区免费视频| 亚洲精品无码久久久久秋霞| 好看的亚洲黄色经典| 女人18毛片a级毛片免费视频| caoporn成人免费公开| 亚洲乱码中文字幕小综合| 亚洲精品无码激情AV| 手机看黄av免费网址| 国产一级高青免费| 亚洲另类无码专区丝袜| 亚洲男人第一av网站| 亚洲v国产v天堂a无码久久| 91在线视频免费91| 最好免费观看高清在线| 粉色视频免费入口| wwwxxx亚洲| 亚洲久本草在线中文字幕| 亚洲国产一区二区视频网站| 欧美大尺寸SUV免费| 秋霞人成在线观看免费视频 | 人成免费在线视频| 亚洲伊人精品综合在合线| 久久精品夜色国产亚洲av| 亚洲人成网站18禁止一区| 成年女人色毛片免费看| 在线精品一卡乱码免费| 日韩免费电影网址| 久久久久久久久久久免费精品| 女bbbbxxxx另类亚洲| ASS亚洲熟妇毛茸茸PICS|