<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    上善若水
    In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
    posts - 146,comments - 147,trackbacks - 0

    前記

    很早以前就有讀Netty源碼的打算了,然而第一次嘗試的時候從Netty4開始,一直抓不到核心的框架流程,后來因為其他事情忙著就放下了。這次趁著休假重新撿起這個硬骨頭,因為Netty3現在還在被很多項目使用,因而這次決定先從Netty3入手,瞬間發現Netty3的代碼比Netty4中規中矩的多,很多概念在代碼本身中都有清晰的表達,所以半天就把整個框架的骨架搞清楚了。再讀Netty4對Netty3的改進總結,回去讀Netty4的源碼,反而覺得輕松了,一種豁然開朗的感覺。

    記得去年讀Jetty源碼的時候,因為代碼太龐大,并且自己的HTTP Server的了解太少,因而只能自底向上的一個一個模塊的疊加,直到最后把所以的模塊連接在一起而看清它的真正核心骨架。現在讀源碼,開始習慣先把骨架理清,然后延伸到不同的器官、血肉而看清整個人體。

    本文從Reactor模式在Netty3中的應用,引出Netty3的整體架構以及控制流程;然而除了Reactor模式,Netty3還在ChannelPipeline中使用了Intercepting Filter模式,這個模式也在Servlet的Filter中成功使用,因而本文還會從Intercepting Filter模式出發詳細介紹ChannelPipeline的設計理念。本文假設讀者已經對Netty有一定的了解,因而不會包含過多入門介紹,以及幫Netty做宣傳的文字。

    Netty3中的Reactor模式

    Reactor模式在Netty中應用非常成功,因而它也是在Netty中受大肆宣傳的模式,關于Reactor模式可以詳細參考本人的另一篇文章《Reactor模式詳解》,對Reactor模式的實現是Netty3的基本骨架,因而本小節會詳細介紹Reactor模式如何應用Netty3中。

    如果讀《Reactor模式詳解》,我們知道Reactor模式由Handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler構成,在Java的實現版本中,Channel對應Handle,Selector對應Synchronous Event Demultiplexer,并且Netty3還使用了兩層Reactor:Main Reactor用于處理Client的連接請求,Sub Reactor用于處理和Client連接后的讀寫請求(關于這個概念還可以參考Doug Lea的這篇PPT:Scalable IO In Java)。所以我們先要解決Netty3中使用什么類實現所有的上述模塊并把他們聯系在一起的,以NIO實現方式為例:

    模式是一種抽象,但是在實現中,經常會因為語言特性、框架和性能需要而做一些改變,因而Netty3對Reactor模式的實現有一套自己的設計:
    1. ChannelEvent:Reactor是基于事件編程的,因而在Netty3中使用ChannelEvent抽象的表達Netty3內部可以產生的各種事件,所有這些事件對象在Channels幫助類中產生,并且由它將事件推入到ChannelPipeline中,ChannelPipeline構建ChannelHandler管道,ChannelEvent流經這個管道實現所有的業務邏輯處理。ChannelEvent對應的事件有:ChannelStateEvent表示Channel狀態的變化事件,而如果當前Channel存在Parent Channel,則該事件還會傳遞到Parent Channel的ChannelPipeline中,如OPEN、BOUND、CONNECTED、INTEREST_OPS等,該事件可以在各種不同實現的Channel、ChannelSink中產生;MessageEvent表示從Socket中讀取數據完成、需要向Socket寫數據或ChannelHandler對當前Message解析(如Decoder、Encoder)后觸發的事件,它由NioWorker、需要對Message做進一步處理的ChannelHandler產生;WriteCompletionEvent表示寫完成而觸發的事件,它由NioWorker產生;ExceptionEvent表示在處理過程中出現的Exception,它可以發生在各個構件中,如Channel、ChannelSink、NioWorker、ChannelHandler中;IdleStateEvent由IdleStateHandler觸發,這也是一個ChannelEvent可以無縫擴展的例子。注:在Netty4后,已經沒有ChannelEvent類,所有不同事件都用對應方法表達,這也意味這ChannelEvent不可擴展,Netty4采用在ChannelInboundHandler中加入userEventTriggered()方法來實現這種擴展,具體可以參考這里
    2. ChannelHandler:在Netty3中,ChannelHandler用于表示Reactor模式中的EventHandler。ChannelHandler只是一個標記接口,它有兩個子接口:ChannelDownstreamHandler和ChannelUpstreamHandler,其中ChannelDownstreamHandler表示從用戶應用程序流向Netty3內部直到向Socket寫數據的管道,在Netty4中改名為ChannelOutboundHandler;ChannelUpstreamHandler表示數據從Socket進入Netty3內部向用戶應用程序做數據處理的管道,在Netty4中改名為ChannelInboundHandler。
    3. ChannelPipeline:用于管理ChannelHandler的管道,每個Channel一個ChannelPipeline實例,可以運行過程中動態的向這個管道中添加、刪除ChannelHandler(由于實現的限制,在最末端的ChannelHandler向后添加或刪除ChannelHandler不一定在當前執行流程中起效,參考這里)。ChannelPipeline內部維護一個ChannelHandler的雙向鏈表,它以Upstream(Inbound)方向為正向,Downstream(Outbound)方向為方向。ChannelPipeline采用Intercepting Filter模式實現,具體可以參考這里,這個模式的實現在后一節中還是詳細介紹。
    4. NioSelector:Netty3使用NioSelector來存放Selector(Synchronous Event Demultiplexer),每個新產生的NIO Channel都向這個Selector注冊自己以讓這個Selector監聽這個NIO Channel中發生的事件,當事件發生時,調用幫助類Channels中的方法生成ChannelEvent實例,將該事件發送到這個Netty Channel對應的ChannelPipeline中,而交給各級ChannelHandler處理。其中在向Selector注冊NIO Channel時,Netty Channel實例以Attachment的形式傳入,該Netty Channel在其內部的NIO Channel事件發生時,會以Attachment的形式存在于SelectionKey中,因而每個事件可以直接從這個Attachment中獲取相關鏈的Netty Channel,并從Netty Channel中獲取與之相關聯的ChannelPipeline,這個實現和Doug Lea的Scalable IO In Java一模一樣。另外Netty3還采用了Scalable IO In Java中相同的Main Reactor和Sub Reactor設計,其中NioSelector的兩個實現:Boss即為Main Reactor,NioWorker為Sub Reactor。Boss用來處理新連接加入的事件,NioWorker用來處理各個連接對Socket的讀寫事件,其中Boss通過NioWorkerPool獲取NioWorker實例,Netty3模式使用RoundRobin方式放回NioWorker實例。更形象一點的,可以通過Scalable IO In Java的這張圖表達:

    若與Ractor模式對應,NioSelector中包含了Synchronous Event Demultiplexer,而ChannelPipeline中管理著所有EventHandler,因而NioSelector和ChannelPipeline共同構成了Initiation Dispatcher。
    5. ChannelSink:在ChannelHandler處理完成所有邏輯需要向客戶端寫響應數據時,一般會調用Netty Channel中的write方法,然而在這個write方法實現中,它不是直接向其內部的Socket寫數據,而是交給Channels幫助類,內部創建DownstreamMessageEvent,反向從ChannelPipeline的管道中流過去,直到第一個ChannelHandler處理完畢,最后交給ChannelSink處理,以避免阻塞寫而影響程序的吞吐量。ChannelSink將這個MessageEvent提交給Netty Channel中的writeBufferQueue,最后NioWorker會等到這個NIO Channel已經可以處理寫事件時無阻塞的向這個NIO Channel寫數據。這就是上圖的send是從SubReactor直接出發的原因。
    6. Channel:Netty有自己的Channel抽象,它是一個資源的容器,包含了所有一個連接涉及到的所有資源的飲用,如封裝NIO Channel、ChannelPipeline、Boss、NioWorkerPool等。另外它還提供了向內部NIO Channel寫響應數據的接口write、連接/綁定到某個地址的connect/bind接口等,個人感覺雖然對Channel本身來說,因為它封裝了NIO Channel,因而這些接口定義在這里是合理的,但是如果考慮到Netty的架構,它的Channel只是一個資源容器,有這個Channel實例就可以得到和它相關的基本所有資源,因而這種write、connect、bind動作不應該再由它負責,而是應該由其他類來負責,比如在Netty4中就在ChannelHandlerContext添加了write方法,雖然netty4并沒有刪除Channel中的write接口。

    Netty3中的Intercepting Filter模式

    如果說Reactor模式是Netty3的骨架,那么Intercepting Filter模式則是Netty的中樞。Reactor模式主要應用在Netty3的內部實現,它是Netty3具有良好性能的基礎,而Intercepting Filter模式則是ChannelHandler組合實現一個應用程序邏輯的基礎,只有很好的理解了這個模式才能使用好Netty,甚至能得心應手。

    關于Intercepting Filter模式的詳細介紹可以參考這里,本節主要介紹Netty3中對Intercepting Filter模式的實現,其實就是DefaultChannelPipeline對Intercepting Filter模式的實現。在上文有提到Netty3的ChannelPipeline是ChannelHandler的容器,用于存儲與管理ChannelHandler,同時它在Netty3中也起到橋梁的作用,即它是連接Netty3內部到所有ChannelHandler的橋梁。作為ChannelPipeline的實現者DefaultChannelPipeline,它使用一個ChannelHandler的雙向鏈表來存儲,以DefaultChannelPipelineContext作為節點:
    public interface ChannelHandlerContext {
        Channel getChannel();

        ChannelPipeline getPipeline();

        String getName();

        ChannelHandler getHandler();

        
    boolean canHandleUpstream();
        
    boolean canHandleDownstream();
        
    void sendUpstream(ChannelEvent e);
        
    void sendDownstream(ChannelEvent e);
        Object getAttachment();

        
    void setAttachment(Object attachment);
    }

    private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
       
    volatile DefaultChannelHandlerContext next;
       
    volatile DefaultChannelHandlerContext prev;
       
    private final String name;
       
    private final ChannelHandler handler;
       
    private final boolean canHandleUpstream;
       
    private final boolean canHandleDownstream;
       
    private volatile Object attachment;
    .....
    }
    在DefaultChannelPipeline中,它存儲了和當前ChannelPipeline相關聯的Channel、ChannelSink以及ChannelHandler鏈表的head、tail,所有ChannelEvent通過sendUpstream、sendDownstream為入口流經整個鏈表:
    public class DefaultChannelPipeline implements ChannelPipeline {
        
    private volatile Channel channel;
        
    private volatile ChannelSink sink;
        
    private volatile DefaultChannelHandlerContext head;
        
    private volatile DefaultChannelHandlerContext tail;
    ......
        
    public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext head 
    = getActualUpstreamContext(this.head);
            
    if (head == null) {
                
    return;
            }
            sendUpstream(head, e);
        }

        
    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            
    try {
                ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
            } 
    catch (Throwable t) {
                notifyHandlerException(e, t);
            }
        }

        
    public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext tail 
    = getActualDownstreamContext(this.tail);
            
    if (tail == null) {
                
    try {
                    getSink().eventSunk(
    this, e);
                    
    return;
                } 
    catch (Throwable t) {
                    notifyHandlerException(e, t);
                    
    return;
                }
            }
            sendDownstream(tail, e);
        }

        
    void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            
    if (e instanceof UpstreamMessageEvent) {
                
    throw new IllegalArgumentException("cannot send an upstream event to downstream");
            }
            
    try {
                ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
            } 
    catch (Throwable t) {
                e.getFuture().setFailure(t);
                notifyHandlerException(e, t);
            }
        }
    對Upstream事件,向后找到所有實現了ChannelUpstreamHandler接口的ChannelHandler組成鏈(getActualUpstreamContext()),而對Downstream事件,向前找到所有實現了ChannelDownstreamHandler接口的ChannelHandler組成鏈(getActualDownstreamContext()):
        private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
            
    if (ctx == null) {
                
    return null;
            }
            DefaultChannelHandlerContext realCtx 
    = ctx;
            
    while (!realCtx.canHandleUpstream()) {
                realCtx 
    = realCtx.next;
                
    if (realCtx == null) {
                    
    return null;
                }
            }
            
    return realCtx;
        }
        
    private DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
            
    if (ctx == null) {
                
    return null;
            }
            DefaultChannelHandlerContext realCtx 
    = ctx;
            
    while (!realCtx.canHandleDownstream()) {
                realCtx 
    = realCtx.prev;
                
    if (realCtx == null) {
                    
    return null;
                }
            }
            
    return realCtx;
        }
    在實際實現ChannelUpstreamHandler或ChannelDownstreamHandler時,調用 ChannelHandlerContext中的sendUpstream或sendDownstream方法將控制流程交給下一個 ChannelUpstreamHandler或下一個ChannelDownstreamHandler,或調用Channel中的write方法發送 響應消息。
    public class MyChannelUpstreamHandler implements ChannelUpstreamHandler {
        
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            
    // handle current logic, use Channel to write response if needed.
            
    // ctx.getChannel().write(message);
            ctx.sendUpstream(e);
        }
    }

    public class MyChannelDownstreamHandler implements ChannelDownstreamHandler {
        
    public void handleDownstream(
                ChannelHandlerContext ctx, ChannelEvent e) 
    throws Exception {
            
    // handle current logic
            ctx.sendDownstream(e);
        }
    }
    當ChannelHandler向ChannelPipelineContext發送事件時,其內部從當前ChannelPipelineContext節點出發找到下一個ChannelUpstreamHandler或ChannelDownstreamHandler實例,并向其發送ChannelEvent,對于Downstream鏈,如果到達鏈尾,則將ChannelEvent發送給ChannelSink:
    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext prev 
    = getActualDownstreamContext(this.prev);
       
    if (prev == null) {
           
    try {
                getSink().eventSunk(DefaultChannelPipeline.
    this, e);
            } 
    catch (Throwable t) {
                notifyHandlerException(e, t);
            }
        } 
    else {
            DefaultChannelPipeline.
    this.sendDownstream(prev, e);
        }
    }

    public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext next 
    = getActualUpstreamContext(this.next);
       
    if (next != null) {
            DefaultChannelPipeline.
    this.sendUpstream(next, e);
        }
    }
    正是因為這個實現,如果在一個末尾的ChannelUpstreamHandler中先移除自己,在向末尾添加一個新的ChannelUpstreamHandler,它是無效的,因為它的next已經在調用前就固定設置為null了。

    ChannelPipeline作為ChannelHandler的容器,它還提供了各種增、刪、改ChannelHandler鏈表中的方法,而且如果某個ChannelHandler還實現了LifeCycleAwareChannelHandler,則該ChannelHandler在被添加進ChannelPipeline或從中刪除時都會得到同志:
    public interface LifeCycleAwareChannelHandler extends ChannelHandler {
        
    void beforeAdd(ChannelHandlerContext ctx) throws Exception;
        
    void afterAdd(ChannelHandlerContext ctx) throws Exception;
        
    void beforeRemove(ChannelHandlerContext ctx) throws Exception;
        
    void afterRemove(ChannelHandlerContext ctx) throws Exception;
    }

    public interface ChannelPipeline {
        
    void addFirst(String name, ChannelHandler handler);
        
    void addLast(String name, ChannelHandler handler);
        
    void addBefore(String baseName, String name, ChannelHandler handler);
        
    void addAfter(String baseName, String name, ChannelHandler handler);
        
    void remove(ChannelHandler handler);
        ChannelHandler remove(String name);

        
    <extends ChannelHandler> T remove(Class<T> handlerType);
        ChannelHandler removeFirst();

        ChannelHandler removeLast();

        
    void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
        ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

        
    <extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
        ChannelHandler getFirst();

        ChannelHandler getLast();

        ChannelHandler get(String name);

        
    <extends ChannelHandler> T get(Class<T> handlerType);
        ChannelHandlerContext getContext(ChannelHandler handler);

        ChannelHandlerContext getContext(String name);

        ChannelHandlerContext getContext(Class
    <? extends ChannelHandler> handlerType);
        
    void sendUpstream(ChannelEvent e);
        
    void sendDownstream(ChannelEvent e);
        ChannelFuture execute(Runnable task);

        Channel getChannel();

        ChannelSink getSink();

        
    void attach(Channel channel, ChannelSink sink);
        
    boolean isAttached();
        List
    <String> getNames();
        Map
    <String, ChannelHandler> toMap();
    }

    在DefaultChannelPipeline的ChannelHandler鏈條的處理流程為:

    參考:

    《Netty主頁》
    《Netty源碼解讀(四)Netty與Reactor模式》
    《Netty代碼分析》
    Scalable IO In Java
    Intercepting Filter Pattern
    posted on 2015-09-04 09:40 DLevin 閱讀(7672) 評論(0)  編輯  收藏 所屬分類: ArchitectureNetty
    主站蜘蛛池模板: 国产亚洲av片在线观看16女人| 激情综合亚洲色婷婷五月APP| 免费无码VA一区二区三区| 亚洲国产成人资源在线软件| 成**人免费一级毛片| 国产成人无码免费网站| 亚洲精品国产专区91在线| 国产gav成人免费播放视频| 免费人成网站在线观看不卡 | 久久精品国产精品亚洲艾草网美妙| 免费观看男人吊女人视频| 亚洲色无码专区一区| 久久亚洲高清观看| 白白国产永久免费视频| 久久免费观看国产精品88av| 国产精品亚洲综合一区在线观看| 中文字幕亚洲精品| 久久影院亚洲一区| 最新中文字幕电影免费观看| 玖玖在线免费视频| 在线观看免费亚洲| 亚洲国产精品无码久久久| 亚洲女久久久噜噜噜熟女| 日本免费电影一区| 黄色成人免费网站| 在线看片免费人成视频福利| 精品亚洲av无码一区二区柚蜜| 亚洲狠狠狠一区二区三区| 亚洲精品无码久久久久sm| 国产精品成人免费综合| 男女做羞羞的事视频免费观看无遮挡| 久久久WWW免费人成精品| 亚洲6080yy久久无码产自国产| 亚洲欧洲日产国码在线观看| 亚洲精品蜜桃久久久久久| 亚洲av无码乱码在线观看野外 | 成年在线观看免费人视频草莓| 日韩视频在线观看免费| aa在线免费观看| 乱爱性全过程免费视频| 18禁亚洲深夜福利人口|