<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0

    UNIX網(wǎng)絡(luò)編程5種I/O模型

    • I/O 復(fù)用模型(最大的優(yōu)勢(shì)是多路復(fù)用)
      Linux提供select/poll,進(jìn)程通過將一個(gè)或多個(gè)fd傳遞給select或poll系統(tǒng)調(diào)用,阻塞在select操作上,這樣select/poll可以幫我們偵測(cè)多個(gè)fd是否處于就緒狀態(tài)。select/poll是順序掃描fd是否就緒,而且支持的fd數(shù)量有限,因此它的使用受到了一些制約。Linux還提供了一個(gè)epoll系統(tǒng)調(diào)用,epoll使用基于事件驅(qū)動(dòng)方式代替順序掃描,因此性能更高。當(dāng)有fd就緒時(shí),立即回調(diào)函數(shù)rollback
    • I/O 多路復(fù)用技術(shù)
      • I/O 多路復(fù)用技術(shù)通過把多個(gè)I/O 的阻塞復(fù)用到同一個(gè)select的阻塞上,從而使得系統(tǒng)在單線程的情況下可以同時(shí)處理多個(gè)客戶端請(qǐng)求
      • 目前支持I/O多路復(fù)用的系統(tǒng)調(diào)用有select、pselect、poll、epoll,在 Linux網(wǎng)絡(luò)編程過程中,很長一段時(shí)間都使用select做輪詢和網(wǎng)絡(luò)事件通知,然而select的些固有缺陷導(dǎo)致了它的應(yīng)用受到了很大的限制,最終Linux不得不在新的內(nèi)核版本中尋找select的替代方案,最終選擇了epoll
        • 支持一個(gè)進(jìn)程打開的socket描述符(FD ) 不受限制(僅受限于操作系統(tǒng)的最大文 件句柄數(shù))
          • select最大的缺陷就是單個(gè)進(jìn)程所打開的FD是有一定限制的,它由FD_SETSIZE設(shè) 置,默認(rèn)值是1024,選擇修改這個(gè)宏需要重新編譯內(nèi)核且網(wǎng)絡(luò)效率會(huì)下降
          • cat /proc/sys/fs/file- max
        • I/O 效率不會(huì)隨著FD數(shù)目的增加而線性下降
          • 由于網(wǎng)絡(luò)延時(shí)或者鏈路空閑,任一時(shí)刻只有少部分的socket是 “活躍”的,但是select/poll每次調(diào)用都會(huì)線性掃描全部的集合,導(dǎo)致效率呈現(xiàn)線性下降。epoll不存在這個(gè)問題,它只會(huì)對(duì)“活躍”的socket進(jìn)行操作
        • 使用mmap加速內(nèi)核與用戶空間的消息傳遞
          • 無論是select、poll還是epoll都需要內(nèi)核把FD消息通知給用戶空間,如何避免不必 要的內(nèi)存復(fù)制就顯得非常重要,epoll是通過內(nèi)核和用戶空間mmap同一塊內(nèi)存來實(shí)現(xiàn)的
          • mmap-map files or devices into memory
        • epoll的API更加簡(jiǎn)單
      • 用來克服select/poll缺點(diǎn)的方法不只有epoll, epoll只是一種Linux的實(shí)現(xiàn)方案。在 freeBSD下有kqueue
    • 從5種I/O模型來看,其實(shí)都涉及到兩個(gè)階段
      • 等待數(shù)據(jù)準(zhǔn)備就緒
      • 數(shù)據(jù)從內(nèi)核復(fù)制到用戶空間
        • 對(duì)于阻塞io,調(diào)用recvfrom,阻塞直到第二個(gè)階段完成或者錯(cuò)誤才返回
        • 對(duì)于非阻塞io,調(diào)用recvfrom,如果緩沖區(qū)沒有數(shù)據(jù)則直接返回錯(cuò)誤,一般都對(duì)非阻塞I/O 模型進(jìn)行輪詢檢査這個(gè)狀態(tài),看內(nèi)核是不是有數(shù)據(jù)到來;數(shù)據(jù)準(zhǔn)備后,第二個(gè)階段也是阻塞的
        • 對(duì)于I/O復(fù)用模型,第一個(gè)階段進(jìn)程阻塞在select調(diào)用,等待1個(gè)或多個(gè)套接字(多路)變?yōu)榭勺x,而第二個(gè)階段是阻塞的
          • 這里進(jìn)程是被select阻塞但不是被socket io阻塞
          • java nio實(shí)現(xiàn)
            • 是否阻塞configureBlocking(boolean block)
            • selector事件到來時(shí)(只是判斷是否可讀/可寫)->具體的讀寫還是由阻塞和非阻塞決定->如阻塞模式下,如果輸入流不足r字節(jié)則進(jìn)入阻塞狀態(tài),而非阻塞模式下則奉行能讀到多少就讀到多少的原則->立即返回->
            • 同理寫也是一樣->selector只是通知可寫->但是能寫多少數(shù)據(jù)也是有阻塞和非阻塞決定->如阻塞模式->如果底層網(wǎng)絡(luò)的輸出緩沖區(qū)不能容納r個(gè)字節(jié)則會(huì)進(jìn)入阻塞狀態(tài)->而非阻塞模式下->奉行能輸出多少就輸出多少的原則->立即返回
            • 對(duì)于accept->阻塞模式->沒有client連接時(shí),線程會(huì)一直阻塞下去->而非阻塞時(shí)->沒有客戶端連接->方法立刻返回null->
        • 對(duì)于信號(hào)驅(qū)動(dòng)I/O模型,應(yīng)用進(jìn)程建立SIGIO信號(hào)處理程序后立即返回,非阻塞,數(shù)據(jù)準(zhǔn)備就緒時(shí),生成SIGIO信號(hào)并通過信號(hào)回調(diào)應(yīng)用程序通過recvfrom來讀取數(shù)據(jù),第二個(gè)階段也是阻塞的
        • 而對(duì)于異步I/O模型來說,第二個(gè)階段的時(shí)候內(nèi)核已經(jīng)通知我們數(shù)據(jù)復(fù)制完成了
    • Java NIO的核心類庫多路復(fù)用器Selector就是基于epoll的多路復(fù)用技術(shù)實(shí)現(xiàn)
      • Enhancements in JDK 6 Release
        • A new java.nio.channels.SelectorProvider implementation that is based on the Linux epoll event notification facility is included. The epoll facility is available in the Linux 2.6, and newer, kernels. The new epoll-based SelectorProvider implementation is more scalable than the traditional poll-based SelectorProvider implementation when there are thousands of SelectableChannels registered with a Selector. The new SelectorProvider implementation will be used by default when the 2.6 kernel is detected. The poll-based SelectorProvider will be used when a pre-2.6 kernel is detected.
          • 即JDK6版本中默認(rèn)的SelectorProvider即為epoll(Linux 2.6 kernal)
      • macosx-sun.nio.ch.KQueueSelectorProvider
      • solaris-sun.nio.ch.DevPollSelectorProvider
      • linux
        • 2.6以上版本-sun.nio.ch.EPollSelectorProvider
        • 以下版本-sun.nio.ch.PollSelectorProvider
      • windows-sun.nio.ch.WindowsSelectorProvider
      • Oracle jdk會(huì)自動(dòng)選擇合適的Selector,如果想設(shè)置特定的Selector
        • -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
    • Netty Native transports
      • Since 4.0.16, Netty provides the native socket transport for Linux using JNI. This transport has higher performance and produces less garbage
      • Netty's epoll transport uses epoll edge-triggered while java's nio library uses level-triggered. Beside this the epoll transport expose configuration options that are not present with java's nio like TCPCORK, SOREUSEADDR and more.
        • 即Netty的Linux原生傳輸層使用了epoll邊緣觸發(fā)
        • 而jdk的nio類庫使用的是epoll水平觸發(fā)
      • epoll ET(Edge Triggered) vs LT(Level Triggered)
        • 簡(jiǎn)單來說就是當(dāng)邊緣觸發(fā)時(shí),只有 fd 變成可讀或可寫的那一瞬間才會(huì)返回事件。當(dāng)水平觸發(fā)時(shí),只要 fd 可讀或可寫,一直都會(huì)返回事件
        • 簡(jiǎn)單地說,如果你有數(shù)據(jù)過來了,不去取LT會(huì)一直騷擾你,提醒你去取,而ET就告訴你一次,愛取不取,除非有新數(shù)據(jù)到來,否則不再提醒
        • Nginx大部分event采用epoll EPOLLET(邊沿觸發(fā))的方法來觸發(fā)事件,只有l(wèi)isten端口的讀事件是EPOLLLT(水平觸發(fā)).對(duì)于邊沿觸發(fā),如果出現(xiàn)了可讀事件,必須及時(shí)處理,否則可能會(huì)出現(xiàn)讀事件不再觸發(fā),連接餓死的情況
    • Java7 NIO2
      • implemented using IOCP on Windows
        • WindowsAsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
        • 即nio2在windows的底層實(shí)現(xiàn)是iocp
      • Linux using epoll
        • UnixAsynchronousSocketChannelImpl implements Port.PollableChannel
        • 即nio2在linux 2.6后的底層實(shí)現(xiàn)還是epoll
          • 通過epoll模擬異步
          • 個(gè)人認(rèn)為也許linux內(nèi)核本身的aio實(shí)現(xiàn)方案其實(shí)并不是很完善,或多或少有這樣或者那樣的問題,即使用了aio,也沒有明顯的性能優(yōu)勢(shì)
            • Not faster than NIO (epoll) on unix systems (which is true)
    • Reactor/Proactor

      • 兩種IO設(shè)計(jì)模式
      • Reactor-Dispatcher/Notifier
        • Don't call us, we'll call you
      • Proactor-異步io
      • Reactor通過某種變形,可以將其改裝為Proactor,在某些不支持異步I/O的系統(tǒng)上,也可以隱藏底層的實(shí)現(xiàn),利于編寫跨平臺(tái)代碼
    • 參考

    Java I/O類庫的發(fā)展和改進(jìn)

    • BIO(blocking)
      • 采用BIO通信模型的服務(wù)端,通常由一個(gè)獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽客戶端的連接,它接收到客戶端連接請(qǐng)求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理,處理完成之后,通過輸出流返回應(yīng)答給客戶端,線程銷毀
        • 該模型最大的問題就是缺乏彈性伸縮能力,當(dāng)客戶端并發(fā)訪問量增加后,服務(wù)端的線程個(gè)數(shù)和客戶端并發(fā)訪問數(shù)呈1: 1 的正比關(guān)系,由于線程是Java虛擬機(jī)非常寶貴的系統(tǒng)資源,當(dāng)線程數(shù)膨脹之后,系統(tǒng)的性能將急劇下降,隨著并發(fā)訪問量的繼續(xù)增大,系統(tǒng)會(huì)發(fā)生線程堆棧溢出、創(chuàng)建新線程失敗等問題,并最終導(dǎo)致進(jìn)程宕機(jī)或者值死,不能對(duì)外提供服務(wù)
        • ServerSocket/Socket/輸入輸出流(阻塞)
    • 偽異步I/O
      • 為了改進(jìn)一線程一連接模型,后來又演進(jìn)出了一種通過線程池或者消息隊(duì)列實(shí)現(xiàn)1個(gè)或者多個(gè)線程處理N個(gè)客戶端的模型
      • 采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種叫做偽異步的I/O通信框架
      • 當(dāng)有新的客戶端接入時(shí),將客戶端的Socket封裝成一個(gè)Task,投遞到后端的線程池中進(jìn)行處理,JDK的線程池維護(hù)一個(gè)消息隊(duì)列和N個(gè)活躍線程,對(duì)消息隊(duì)列中的任務(wù)進(jìn)行處理
        • 當(dāng)對(duì)方發(fā)送請(qǐng)求或者應(yīng)答消息比較緩慢,或者網(wǎng)絡(luò)傳輸較慢時(shí),讀取輸入流一方的通信線程將被長時(shí)間阻塞,如果對(duì)方要60s 才能夠?qū)?shù)據(jù)發(fā)送完成,讀取一方的I/O線程也將會(huì)被同步阻塞60s, 在此期間,其他接入消息只能在消息隊(duì)列中排隊(duì)
        • 當(dāng)消息的接收方處理緩慢的時(shí)候,將不能及時(shí)地從TCP緩沖區(qū)讀取數(shù)據(jù),這將會(huì)導(dǎo)致發(fā)送方的TCP window size( 滑動(dòng)窗口)不斷減小,直到為0,雙方處于Keep-Alive狀態(tài),消息發(fā)送方將不能再向TCP緩沖區(qū)寫入消息
    • NIO
      • SocketChannel和ServerSocketChannel,支持阻塞和非阻塞兩種模式
      • Buffer/Channel/Selector(多路復(fù)用器,可同時(shí)輪詢多個(gè)Channel)
        • java.nio.ByteBuffer的幾個(gè)常用方法
          • flip、clear、compact、mark、rewind、hasRemaining、isDirect等
      • 客戶端發(fā)起的連接操作是異步的
      • SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的數(shù)據(jù)它不會(huì)同步等待,直接返回,這樣I/O 通信線程就可以處理其他的鏈路,不需要同步等待這個(gè)鏈路可用
      • JDK的 Selector在 Linux等主流操作系統(tǒng)上通過epoll實(shí)現(xiàn),它沒有連接句柄數(shù)的限制
    • AIO
      • AsynchronousServerSocketChannel、AsynchronousSocketChannel
      • CompletionHandler<V,A>
        • V The result type of the I/O operation
        • A The type of the object attached to the I/O operation
      • 既然已經(jīng)接收客戶端成功了,為什么還要再次調(diào)用accept方法呢?原因是這樣的:調(diào)用AsynchronousServerSocketChannel的accept方法后,如果有新的客戶端連接接入,系統(tǒng)將回調(diào)我們傳入的CompletionHandler實(shí)例的completed方法,表示新的客戶端已經(jīng)接入成功。因?yàn)橐粋€(gè)AsynchronousServerSocketChannel可以接收成千上萬個(gè)客戶端,所以需要繼續(xù)調(diào)用它的accept方法,接收其他的客戶端連接,最終形成一個(gè)循環(huán)。每當(dāng)接收一個(gè)客戶讀連接成功之后,再異步接收新的客戶端連接
    • 不選擇Java原生NIO編程的原因
      • N10的類庫和API繁雜,使用麻煩
      • 需要具備其他的額外技能做鋪墊,例如熟悉Java多線程
      • 可靠性能力補(bǔ)齊,工作景和難度都非常大。例如客戶端面臨斷連重連、網(wǎng)絡(luò)閃斷、半包讀寫、失敗緩存、網(wǎng)絡(luò)擁塞和異常碼流的處理等問題
      • JDK NIO 的 BUG, 例如臭名昭著的epollbug, 它會(huì)導(dǎo)致Selector空輪詢,最終導(dǎo)致CPU100%
    • 為什么選擇Netty
      • 健壯性、功能、性能、可定制性和可擴(kuò)展性在同類框架中都是首屈一指的,它已經(jīng)得到成百上千的商用項(xiàng)目驗(yàn)證
      • API使用簡(jiǎn)單
      • 預(yù)置了多種編解碼功能,支持多種主流協(xié)議
      • 可以通過ChannelHand丨er對(duì)通信框架進(jìn)行靈活地?cái)U(kuò)展
      • 性能高
      • Netty修復(fù)了己經(jīng)發(fā)現(xiàn)的所有JDKNIO BUG
      • 社區(qū)活躍,版本迭代周期短
      • 經(jīng)歷了大規(guī)模的商業(yè)應(yīng)用考驗(yàn),質(zhì)量得到驗(yàn)證

    Netty 入門

    • ServerBootstrap、EventLoopGroup(boss)、EventLoopGroup(worker)、NioServerSocketChannel、ChannelOption、ChannelInitializer、ChannelPipeline、ChannelFuture、ChannelHandlerAdapter、ChannelHandlerContext
    • Bootstrap、NioSocketChannel
    • try/finally、shutdownGracefully(boss、worker)
    • ChannelHandlerContext的 flush方法,它的作用是將消息發(fā)送隊(duì)列中的消息寫入SocketChannel中發(fā)送給對(duì)方.從性能角度考慮,為了防止頻繁地喚醒Selector進(jìn)行消息發(fā)送,Netty的 write方法并不直接將消息寫入SocketChannel中,調(diào)用write方法只是把待發(fā)送的消息放到發(fā)送緩沖數(shù)組中,再通過調(diào)用flush方法,將發(fā)送緩沖區(qū)中的消息全部寫到SocketChannel中
    • 基于Netty開發(fā)的都是非Web的Java應(yīng)用,它的打包形態(tài)非常簡(jiǎn)單,就是一個(gè)普通的.jar 包,通常可以使用Eclipse、Ant、Ivy、Gradle等進(jìn)行構(gòu)建

    TCP 粘包/拆包問題的解決之道

    • TCP是個(gè)“流”協(xié)議,所謂流,就是沒有界限的一串?dāng)?shù)據(jù)。大家可以想想河里的流水,它們是連成一片的,其間并沒有分界線。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為 , 一個(gè)完 整的包可能會(huì)被 TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。
    • 由于底層的TCP無法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的,這個(gè)問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計(jì)來解決
      • 消息定長,例如每個(gè)報(bào)文的大小為固定長度200字節(jié),如果不夠,空位補(bǔ)空格
      • 在包尾增加回車換行符進(jìn)行分割,例如FTP協(xié)議
      • 將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,通常設(shè)計(jì)思路為消息頭的第一個(gè)字段使用int32來表示消息的總長度
    • 沒有考慮讀半包問題,這在功能測(cè)試時(shí)往往沒有問題,但是一旦壓力上來,或者發(fā)送大報(bào)文之后,就會(huì)存在粘包/拆包問題,如循環(huán)發(fā)送100條消息,則可能會(huì)出現(xiàn)TCP粘包
    • 為了解決TCP粘包/拆包導(dǎo)致的半包讀寫問題,Netty默認(rèn)提供了多種編解碼器用于處理半包
      • LineBasedFrameDecoder
        • A decoder that splits the received {@link ByteBuf}s on line endings
      • StringDecoder
        • A decoder that splits the received {@link ByteBuf}s on line endings

    分隔符和定長解碼器的應(yīng)用

    • DelimiterBasedFrameDecoder
      • A decoder that splits the received {@link ByteBuf}s by one or more delimiters
    • FixedLengthFrameDecoder
      • A decoder that splits the received {@link ByteBuf}s by the fixed number of bytes

    編解碼技術(shù)

    • 基于Java提供的對(duì)象輸入/輸出流ObjectlnputStream和 ObjectOutputStream,可以直接把Java對(duì)象作為可存儲(chǔ)的字節(jié)數(shù)組寫入文件 ,也可以傳輸?shù)骄W(wǎng)絡(luò)上,Java序列化的目的:
      • 網(wǎng)絡(luò)傳輸
      • 對(duì)象持久化
    • Java序列化的缺點(diǎn)
      • 無法跨語言
      • 序列化后的碼流太大
        • 對(duì)于字符串
          • byte[] value = this.userName.getBytes();
          • buffer.putInt(value.length);
          • buffer.put(value);
      • 序列化性能太低
    • 業(yè)界主流的編解碼框架
      • Google的Protobuf
      • Facebook的Thrift
      • JBoss Marshalling
        • JBoss Marshalling是一個(gè)Java對(duì)象的序列化API包,修正了 JDK自帶的序列化包的很多問題,但又保持跟java.io.Serializable接口的兼容

    MessagePack編解碼

    • MessagePack介紹
      • It's like JSON. but fast and small
      • MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. But it's faster and smaller
      • http://msgpack.org/
      • 提供了對(duì)多語言的支持
    • API介紹

      // Create serialize objects.
           List<String> src = new ArrayList<String>();
           src. add (,,msgpackw);
           src.add("kumofs");
           src.add("viver">;
           MessagePack msgpack = new MessagePack();
           // Serialize
           byte[] raw = msgpack.write(src);
           // Deserialize directly using a template
           L±8t<String> dstl = msgpack. read (raw, Ten 5 >lates . tList (Ten^>lates. TString));
           
    • MessagePack編碼器和解碼器開發(fā)

      • MessageToByteEncoder<I
      • ByteToMessageDecoder、MessageToMessageDecoder<I
      • LengthFieldBasedFrameDecoder extends ByteToMessageDecoder
        • A decoder that splits the received {@link ByteBuf}s dynamically by the value of the length field in the message
        • public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast)
        • 功能很強(qiáng)大,可指定消息長度字段的偏移等,而不僅僅是消息頭的第一個(gè)字段就是長度
      • LengthFieldPrepender extends MessageToMessageEncoder
        • An encoder that prepends the length of the message.
        • 可自動(dòng)前面加上消息長度字段

    Google Protobuf 編解碼

    • 主要使用了netty默認(rèn)提供的關(guān)于protobuf的編解碼器
      • ProtobufVarint32FrameDecoder extends ByteToMessageDecoder
        • A decoder that splits the received {@link ByteBuf}s dynamically by the value of the Google Protocol Buffers
      • ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder
        • An encoder that prepends the the Google Protocol Buffers
      • ProtobufEncoder extends MessageToMessageEncoder
        • Encodes the requested Google Protocol Buffers Message And MessageLite into a {@link ByteBuf}
      • ProtobufDecoder extends MessageToMessageDecoder
        • Decodes a received {@link ByteBuf} into a Google Protocol Buffers Message And MessageLite
        • 注意其構(gòu)造函數(shù)要傳一個(gè)MessageLite對(duì)象,即協(xié)議類型,用來反序列化

     BEFORE DECODE (302 bytes) AFTER DECODE (300 bytes)
    +--------+---------------+ +---------------+
    | Length | Protobuf Data |----->| Protobuf Data |
    | 0xAC02 | (300 bytes) | | (300 bytes) |
    +--------+---------------+ +---------------+
    BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes)
    +---------------+ +--------+---------------+
    | Protobuf Data |-------------->| Length | Protobuf Data |
    | (300 bytes) | | 0xAC02 | (300 bytes) |
    +---------------+ +--------+---------------+
    
    • Protobuf的使用注意事項(xiàng)
      • ProtobufDecoder僅僅負(fù)責(zé)解碼,它不支持讀半包。因此,在 ProtobufDecoder前面, 一定要有能夠處理讀半包的解碼器
        • 使用Netty提供的ProtobufVarint32FrameDecoder,它可以處理半包消息
        • 繼承Netty提供的通用半包解碼器LengthFieldBasedFrameDecoder
        • 繼承ByteToMessageDecoder類,自己處理半包消息

    JBoss Marshalling 編解碼

    • JBoss的Marshalling完全兼容JDK序列化
    • MarshallingDecoder extends LengthFieldBasedFrameDecoder
      • Decoder which MUST be used with {@link MarshallingEncoder}
      • 需要傳入U(xiǎn)nmarshallerProvider和maxObjectSize
    • MarshallingEncoder extends MessageToByteEncoder
      • {@link MessageToByteEncoder} implementation which uses JBoss Marshalling to marshal an Object
      • 需要傳入MarshallerProvider
    • Netty的Marshalling編解碼器支持半包和粘包的處理,對(duì)于開發(fā)者而言,只需要正確地將 Marshalling編碼器和解碼器加入到ChannelPipeline 中,就能實(shí)現(xiàn)對(duì)Marshalling序列化的支持
    • HTTP協(xié)議開發(fā)應(yīng)用

      • HTTP請(qǐng)求消息(HttpRequest)
        • HTTP請(qǐng)求行
        • HTTP消息頭
        • HTTP請(qǐng)求正文
      • HTTP響應(yīng)消息(HttpResponse)
        • 狀態(tài)行、消息報(bào)頭、響應(yīng)正文
      • Netty HTTP文件服務(wù)器
        • HttpRequestDecoder
        • HttpObjectAggregator
        • HttpResponseEncoder
        • ChunkedWriteHandler、ChunkedFile
        • FullHttpRequest、FullHttpResponse、DefaultFullHttpResponse
      • Netty HTTP+ XML協(xié)議棧開發(fā)
        • 很多基于HTTP的應(yīng)用都是后臺(tái)應(yīng)用,HTTP僅僅是承載數(shù)據(jù)交換的一個(gè)通道,是一個(gè)載體而不是Web容器
        • JiBX是一款非常優(yōu)秀的XML (Extensible Markup Language) 數(shù)據(jù)綁定框架
          • JiBX is a tool for binding XML data to Java objects
          • Unmarshal是將XML文件轉(zhuǎn)換成Java對(duì)象,而 Marshal則是將Java對(duì)象編排成規(guī)范的XML文件
          • xpp-XML Pull Parsing
        • 過程
          • 構(gòu)造請(qǐng)求消息HttpXmlRequest(封裝一個(gè)FullHttpRequest和一個(gè)Object)
          • 定義請(qǐng)求消息編碼器HttpXmlRequestEncoder,對(duì)HttpXmlRequest進(jìn)行編碼
            • 對(duì)請(qǐng)求消息中的業(yè)務(wù)object通過jibx序列化為xml字符串,隨后將它封裝成Netty的 ByteBuf
            • 構(gòu)造HTTP消息頭-HttpHeaders/DefaultFullHttpRequest
            • 請(qǐng)求消息消息體不為空,也沒有使用Chunk方式,所以在HTTP消息頭中設(shè)置消息體的長度Content-Length
            • 后續(xù)Netty的 HTTP請(qǐng)求編碼器繼續(xù)對(duì)HTTP請(qǐng)求消息進(jìn)行編碼
          • 定義請(qǐng)求消息解碼器HttpXmlRequestDecoder
            • 從HTTP消息體中獲取請(qǐng)求碼流,通過JiBx框架對(duì)它進(jìn)行反序列化(FullHttpRequest#content),得到請(qǐng)求object對(duì)象,并封裝為HttpXmlRequest
            • 回調(diào)業(yè)務(wù)handler, 業(yè)務(wù)得到的就是解碼后的POJO對(duì)象和HTTP消息頭
            • 注意-decoder有一個(gè)參數(shù)是業(yè)務(wù)obj的clazz對(duì)象
          • 同理封裝一個(gè)應(yīng)答消息HttpXmlResponse(封裝一個(gè)FullHttpResponse和一個(gè)Obect)
          • 定義應(yīng)答消息編碼器HttpXmlResponseEncoder
            • 同上,對(duì)應(yīng)答消息中的object通過jibx序列化xml字符串并轉(zhuǎn)為ByteBuf
            • 構(gòu)造HTTP應(yīng)答消息FullHttpResponse,這里注意因?yàn)?Netty的 DefaultFullHttpResponse沒有提供動(dòng)態(tài)設(shè)置消息體content的接口,只能在第一次構(gòu)造的時(shí)候設(shè)置內(nèi)容,同上也需要設(shè)置Content-Length,Content-Type為text/xml
          • 定義應(yīng)答消息解碼器HttpXmlResponseDecoder
            • DefaultFullHttpResponse和HTTP應(yīng)答消息反序列化后的object對(duì)象構(gòu)造HttpXmlResponse(DefaultFullHttpResponse#content)
        • 流程
          • 客戶端
            • HttpResponseDecoder、HttpObjectAggregator、HttpXmlResponseDecoder(傳入object clazz)、HttpRequestEncoder、HttpXmlRequestEncoder
            • HttpXmlClientHandle
              • channelActive時(shí)構(gòu)造HttpXmlRequest
              • messageReceived中直接得到HttpXmlResponse
          • 服務(wù)端
            • HttpRequestDecoder、HttpObjectAggregator、HttpXmlRequestDecoder(傳入object clazz)、HttpResponseEncoder、HttpXmlResponseEncoder
            • HttpXmlServerHandler
              • messageReceived中直接得到HttpXmlRequest進(jìn)行業(yè)務(wù)處理,然后發(fā)送HttpXmlResponse,如果非keepAlive模式,則發(fā)送完畢候關(guān)閉鏈接(通過在ChannelFuture加一個(gè)Listener監(jiān)聽)

      WebSocket協(xié)議開發(fā)

      • WebSocket是 HTML5 開始提供的一種瀏覽器與服務(wù)器間進(jìn)行全雙工通信的網(wǎng)絡(luò)技術(shù)
      • 在 WebSocketAPI中,瀏覽器和服務(wù)器只需要做個(gè)握手的動(dòng)作,然后,瀏覽器和服務(wù)器之間就形成了一條快速通道,兩者就可以直接互相傳送數(shù)據(jù)了。WebSocket基于TCP雙向全雙工進(jìn)行消息傳遞,在同一時(shí)刻,既可以發(fā)送消息,也可以接收消息,相比HTTP的半雙工協(xié)議,性能得到很大提升
      • WebSocket設(shè)計(jì)出來的目的就是要取代輪詢和Comet技術(shù),使客戶端瀏覽器具備像 C/S 架構(gòu)下桌面系統(tǒng)一樣的實(shí)時(shí)通信能力
      • 為了建立一個(gè)WebSocket連接,客戶端瀏覽器首先要向服務(wù)器發(fā)起一個(gè)HTTP請(qǐng)求,這個(gè)請(qǐng)求和通常的HTTP請(qǐng)求不同.包含了一些附加頭信息,其中附加頭信息“Upgrade:WebSocket”表明這是個(gè)申請(qǐng)協(xié)議升級(jí)的HTTP請(qǐng)求。服務(wù)器端解析這些附加的頭信息,然后生成應(yīng)答信息返回給客戶端,客戶端和服務(wù)器端的WebSocket連接就建立起來了,雙方可以通過這個(gè)連接通道自由地傳速信息,并且這個(gè)連接會(huì)持續(xù)存在直到客戶端或者服務(wù)器端的某一方主動(dòng)關(guān)閉連接
      • 握手成功之后,服務(wù)端和客戶端就可以通過“messages”的方式進(jìn)行通信了,一個(gè)消息由一個(gè)或者多個(gè)幀組成
      • Netty WebSocket 協(xié)議開發(fā)
        • Netty內(nèi)置了WebSocket協(xié)議相關(guān)的api
        • WebSocketServer
          • HttpServerCodec、HttpObjectAggregator、ChunkedWriteHandler
          • WebSocketServerHandler
            • messageReceived,第一次握手請(qǐng)求消息由HTTP協(xié)議承載,所以它是一個(gè)HTTP消息,執(zhí)行handleHttpRequest方法來處理WebSocket握手請(qǐng)求,對(duì)握手請(qǐng)求消息進(jìn)行判斷,如果消息頭中沒有包含Upgrade字段或者它的值不是websocket, 則返回HTTP 400響應(yīng)
            • 握手請(qǐng)求簡(jiǎn)單校驗(yàn)通過之后,開始構(gòu)造握手工廠,創(chuàng)建握手處理類WebSocketServerHandshaker, 通過它構(gòu)造握手響應(yīng)消息返回給客戶端,同時(shí)將WebSocket相關(guān)的編碼和解碼類動(dòng)態(tài)添加到ChannelPipeline中,用于WebSocket消息的編解碼(WebSocketServerHandshaker#handshake)
            • 添加WebSocket Encoder和 WebSocket Decoder之后,服務(wù)端就可以自動(dòng)對(duì)WebSocket
            • 消息進(jìn)行編解碼了,后面的業(yè)務(wù)handler可以直接對(duì)WebSocket對(duì)象進(jìn)行操作(WebSocketFrame)
            • 直接對(duì)控制幀進(jìn)行判斷并返回應(yīng)答消息
        • 而客戶端則是嵌套在html中,由js進(jìn)行websocket的相關(guān)接口開發(fā)

      私有協(xié)議棧開發(fā)

      • 在傳統(tǒng)的Java應(yīng)用中,通常使用以下4 種方式進(jìn)行跨節(jié)點(diǎn)通信
        • 通過RM1進(jìn)行遠(yuǎn)程服務(wù)調(diào)用
        • 通過Java的 Socket+Java序列化的方式進(jìn)行跨節(jié)點(diǎn)調(diào)用
        • 利用一些開源的RPC框架進(jìn)行遠(yuǎn)程服務(wù)調(diào)用,例如Facebook的Thrift、 Apache 的Avro等
        • 利用標(biāo)準(zhǔn)的公有協(xié)議進(jìn)行跨節(jié)點(diǎn)服務(wù)調(diào)用,例如HTTP+XML、RESTful+JSON或 者 WebService
      • 跨節(jié)點(diǎn)的遠(yuǎn)程服務(wù)調(diào)用,除了鏈路層的物理連接外,還需要對(duì)請(qǐng)求和響應(yīng)消息進(jìn)行編解碼。在請(qǐng)求和應(yīng)答消息本身以外,也需要攜帶一些其他控制和管理類指令,例如鏈路建立的握手請(qǐng)求和響應(yīng)消息、鏈路檢測(cè)的心跳消息等。當(dāng)這些功能組合到一起之后,就會(huì)形成私有協(xié)議
      • Netty協(xié)議棧功能設(shè)計(jì)
        • Netty協(xié)議棧用于內(nèi)部各模塊之間的通信,它基于TCP/IP協(xié)議棧,是一個(gè)類HTTP協(xié)議的應(yīng)用層協(xié)議棧
        • 在分布式組網(wǎng)環(huán)境下,每個(gè)Netty節(jié) 點(diǎn) (Netty進(jìn)程)之間建立長連接,使用Netty協(xié)議進(jìn)行通信。Netty節(jié)點(diǎn)并沒有服務(wù)端和客戶端的區(qū)分,誰首先發(fā)起連接,誰就作為客戶端,另一方自然就成為服務(wù)端。一個(gè)Netty節(jié)點(diǎn)既可以作為客戶端連接另外的Netty節(jié)點(diǎn),也可以作為Netty服務(wù)端被其他Netty節(jié)點(diǎn)連接
      • 協(xié)議棧功能描述
        • 承載了業(yè)務(wù)內(nèi)部各模塊之間的消息交互和服務(wù)調(diào)用
        • 基于Netty的NIO通信框架,提供髙性能的異步通信能力
        • 提供消息的編解碼框架,可以實(shí)現(xiàn)POJO的序列化和反序列化
        • 提供基于IP地址的白名申.接入認(rèn)證機(jī)制
        • 鏈路的有效性校驗(yàn)機(jī)制
        • 鏈路的斷連重連機(jī)制
      • 通信模型
        • Netty協(xié)議棧客戶端發(fā)送握手請(qǐng)求消息,攜帶節(jié)點(diǎn)ID等有效身份認(rèn)證信息
        • Netty協(xié)議棧服務(wù)端對(duì)握手請(qǐng)求消息進(jìn)行合法性校驗(yàn),包括節(jié)點(diǎn)ID有效性校驗(yàn)、節(jié)點(diǎn)重復(fù)登錄校驗(yàn)和IP地址合法性校驗(yàn),校驗(yàn)通過后,返回登錄成功的握手應(yīng)答消息
        • 鏈路建立成功之后,客戶端發(fā)送業(yè)務(wù)消息
        • 鏈路成功之后,服務(wù)端發(fā)送心跳消息
        • 鏈路建立成功之后,客戶端發(fā)送心跳消息
        • 鏈路建立成功之后,服務(wù)端發(fā)送業(yè)務(wù)消息
        • 服務(wù)端退出時(shí),服務(wù)端關(guān)閉連接,客戶端感知對(duì)方關(guān)閉連接后,被動(dòng)關(guān)閉客戶端連接
        • Netty協(xié)議通信雙方鏈路建立成功之后,雙方可以進(jìn)行全雙工通信,無論客戶端還是服務(wù)端,都可以主動(dòng)發(fā)送請(qǐng)求消息給對(duì)方,通信方式可以是TWOWAY或者ONE WAY, 雙方之間的心跳采用Ping-Pong機(jī)制,當(dāng)鏈路處于空閑狀態(tài)時(shí),客戶端主動(dòng)發(fā)送Ping消息給服務(wù)端,服務(wù)端接收到Ping消息后發(fā)送應(yīng)答消息Pong給客戶端,如果客戶端連續(xù)發(fā)送N 條 Ping消息都沒有接收到服務(wù)端返回的Pong消息,說明鏈路已經(jīng)掛死或者對(duì)方處于異常狀態(tài),客戶端主動(dòng)關(guān)閉連接,間隔周期T 后發(fā)起重連操作,直到重連成功
      • 消息定義
        • 消息頭
          • crcCode int 32位 消息校驗(yàn)碼 = OxABEF(2字節(jié)) + 主版本號(hào)(1字節(jié)) + 次版本號(hào)(1字節(jié))
          • length int 32位 消息長度= 消息頭的長度+消息體長度
          • sessionID long 64位 集群節(jié)點(diǎn)全局唯一id
          • type byte 8位 消息類型(包括握手請(qǐng)求、應(yīng)答、心跳請(qǐng)求、應(yīng)答等)
          • priority byte 8位 消息優(yōu)先級(jí)
          • attachment Map<String,Object> 變長 可選字段,用于擴(kuò)展消息頭
        • 消息體
          • Object 變長
      • 鏈路的建立
        • 考慮到安全,鏈路建立需要通過基于IP 地址或者號(hào)段的黑白名單安全認(rèn)證機(jī)制,在實(shí)際商用項(xiàng)目中,安全認(rèn)證機(jī)制會(huì)更加嚴(yán)格,如通過密鑰對(duì)用戶名和密碼進(jìn)行安全認(rèn)證
        • 客戶端與服務(wù)端鏈路建立成功之后,由客戶端發(fā)送握手請(qǐng)求消息
        • 服務(wù)端接收到客戶端的握手請(qǐng)求消息之后,如 果 IP 校驗(yàn)通過,返回握手成功應(yīng)答消息給客戶端,應(yīng)用層鏈路建立成功
      • 鏈路的關(guān)閉
        • 當(dāng)對(duì)方宕機(jī)或者重啟時(shí),會(huì)主動(dòng)關(guān)閉鏈路,另一方讀取到操作系統(tǒng)的通知信號(hào),得知對(duì)方REST鏈路,需要關(guān)閉連接,釋放自身的句柄等資源
          • proc/sys/net/ipv4/tcp_retries2
        • 消息讀寫過程中,發(fā)生了 I/O 異常,需要主動(dòng)關(guān)閉連接
        • 心跳消息讀寫過程中發(fā)生了 I/O 異常,需要主動(dòng)關(guān)閉連接
        • 心跳超時(shí),需要主動(dòng)關(guān)閉連接
        • 發(fā)生編碼異常等不可恢復(fù)錯(cuò)誤時(shí),需要主動(dòng)關(guān)閉連接
      • 可靠性設(shè)計(jì)
        • Netty協(xié)議棧可能會(huì)運(yùn)行在非常惡劣的網(wǎng)絡(luò)環(huán)境中,網(wǎng)絡(luò)超時(shí)、閃斷、對(duì)方進(jìn)程僵死或者處理緩慢等情況都有可能發(fā)生
        • 心跳機(jī)制
          • 在網(wǎng)絡(luò)空閑時(shí)采用心跳機(jī)制來檢測(cè)鏈路的互通性, 一旦發(fā)現(xiàn)網(wǎng)絡(luò)故障,立即關(guān)閉鏈路,主動(dòng)重連
          • 當(dāng)網(wǎng)絡(luò)處于空閑狀態(tài)持續(xù)時(shí)間達(dá)到T(連續(xù)周期T沒有讀寫消息)時(shí),客戶端主動(dòng)發(fā)送Ping心跳消息給服務(wù)端
          • 如果在下一個(gè)周期T到來時(shí)客戶端沒有收到對(duì)方發(fā)送的Pong心眺應(yīng)答消息或者讀取到服務(wù)端發(fā)送的其他業(yè)務(wù)消息,則心跳失敗計(jì)數(shù)器加1
          • 每當(dāng)客戶端接收到服務(wù)的業(yè)務(wù)消息或者Pong應(yīng)答消息時(shí),將心跳失敗計(jì)數(shù)器清零:連續(xù)N次沒有接收到服務(wù)端的Pong消息或者業(yè)務(wù)消息,則關(guān)閉鏈路,間隔INTERVAL時(shí)間后發(fā)起重連操作
          • 服務(wù)端網(wǎng)絡(luò)空閑狀態(tài)持續(xù)時(shí)間達(dá)到T后,服務(wù)端將心跳失敗計(jì)數(shù)器加1;只要接收到客戶端發(fā)送的Ping消息或者其他業(yè)務(wù)消息,計(jì)數(shù)器清零
          • 服務(wù)端連續(xù)N次沒有接收到客戶端的Ping消息或者其他業(yè)務(wù)消息,則關(guān)閉鏈路,釋放資源,等待客戶端重連
          • 通過Ping-Pong雙向心跳機(jī)制,可以保證無論通信哪一方出現(xiàn)網(wǎng)絡(luò)故障,都能被及時(shí)地檢測(cè)出來。為了防止由于對(duì)方短時(shí)間內(nèi)繁忙沒有及時(shí)返回應(yīng)答造成的誤判,只有連續(xù)N次心跳檢測(cè)都失敗才認(rèn)定鏈路己經(jīng)損害,需要關(guān)閉鏈路并重建鏈路
          • 當(dāng)讀或者寫心跳消息發(fā)生I/O異常的時(shí)候,說明鏈路己經(jīng)中斷,此時(shí)需要立即關(guān)閉鏈路,如果是客戶端,需要重新發(fā)起連接。如果是服務(wù)端,需要清空緩存的半包信息,等待客戶端重連
            • 之前的項(xiàng)目中則直接是客戶端每30s向服務(wù)器發(fā)送一個(gè)ping消息同時(shí)服務(wù)器返回一個(gè)ping消息;如果30s左右(網(wǎng)絡(luò)延遲),鏈路空閑則斷開鏈接;偌一方出現(xiàn)宕機(jī)則30s后可直接檢測(cè)數(shù)來--但相對(duì)比而言,作者的方法更為嚴(yán)謹(jǐn)一些
        • 重連機(jī)制
          • 如果鏈路中斷,等待INTERVAL時(shí)間后,由客戶端發(fā)起重連操作,如果重連失敗,間隔周期INTERVAL后再次發(fā)起重連,直到重連成功
          • 為了保證服務(wù)端能夠有充足的時(shí)間釋放句柄資源,在首次斷連時(shí)客戶端需要等待INTERVAL時(shí)間之后再發(fā)起重連,而不是失敗后就立即重連
          • 為了保證句柄資源能夠及時(shí)釋放,無論什么場(chǎng)景下的重連失敗,客戶端都必須保證自身的資源被及時(shí)釋放
          • 重連失敗后,需要打印異常堆棧信息,方便后續(xù)的問題定位
        • 重復(fù)登錄保護(hù)
          • 當(dāng)客戶端握手成功之后,在鏈路處于正常狀態(tài)下,不允許客戶端重復(fù)登錄,以防止客戶端在異常狀態(tài)下反復(fù)重連導(dǎo)致句柄資源被耗盡
          • 緩存客戶端的地址列表,通過該列表檢查客戶端是否已登陸
          • 當(dāng)服務(wù)端連續(xù)N次心跳超時(shí)之后需要主動(dòng)關(guān)閉鏈路,清空該客戶端的地址緩存信息,以保證后續(xù)該客戶端可以重連成功,防止被重復(fù)登錄保護(hù)機(jī)制拒絕掉
            • 猜測(cè)是可能會(huì)出現(xiàn)類似客戶端認(rèn)為舊鏈接已經(jīng)logout了,嘗試重新登陸;但是服務(wù)器認(rèn)為舊的鏈路還在(如客戶端宕機(jī))等;所以還是需要心跳機(jī)制輔助
        • 消息緩存重發(fā)
          • 無論客戶端還是服務(wù)端,當(dāng)發(fā)生鏈路中斷之后,在鏈路恢復(fù)之前,緩存在消息隊(duì)列中待發(fā)送的消息不能丟失,等鏈路恢復(fù)之后,重新發(fā)送這些消息,保證鏈路中斷期間消息不丟失
          • 考慮到內(nèi)存溢出的風(fēng)險(xiǎn),建議消息緩存隊(duì)列設(shè)置上限,當(dāng)達(dá)到上限之后,應(yīng)該拒絕繼續(xù)向該隊(duì)列添加新的消息
        • 安全性設(shè)計(jì)
          • 為了保證整個(gè)集群環(huán)境的安全,內(nèi)部長連接采用基于IP 地址的安全認(rèn)證機(jī)制,服務(wù)端對(duì)握手請(qǐng)求消息的IP 地址進(jìn)行合法性校驗(yàn)-白名單
          • 如果將Netty協(xié)議棧放到公網(wǎng)中使用,需要采用更加嚴(yán)格的安全認(rèn)證機(jī)制,例如基于密鑰和AES加密的用戶名+密碼認(rèn)證機(jī)制,也可以采用SSL/TSL安全傳輸
        • 可擴(kuò)展性設(shè)計(jì)
          • 通過Netty消息頭中的可選附件attachment字段,業(yè)務(wù)可以方便地進(jìn)行自定義擴(kuò)展
          • Netty協(xié)議棧架構(gòu)需要具備一定的擴(kuò)展能力,例如統(tǒng)一的消息攔截、接口日志、安全、加解密等可以被方便地添加和刪除,不需要修改之前的邏輯代碼,類 似 Servlet的 FilterChain和 AOP, 但考慮到性能因素,不推薦通過AOP來實(shí)現(xiàn)功能的擴(kuò)展
            • 直接通過ChannelPipeline即可
      • 開發(fā)過程
        • 定義協(xié)議消息,NettyMessage,包括消息頭Header和消息體Object
          • 消息頭則包括之前說的crcCode、length等字段
        • 定義編解碼類
          • NettyMessageEncoder extends MessageToByteEncoder
            • 編碼header
            • 對(duì)于header中的attachment,則首先寫入map的長度,然后遍歷map,寫key的長度,寫key的bytes(String),最后通過JBoss Marshalling對(duì)value進(jìn)行編碼
              • 注意在作者的這個(gè)示例中,自己手動(dòng)寫的MarshallingEncoder,其實(shí)netty已經(jīng)提供了MarshallingEncoder(參數(shù)不同),通過源代碼可以看到,encode核心代碼部分基本相同
            • 編碼消息體
            • 最后將消息長度重寫
          • NettyMessageDecoder extends LengthFieldBasedFrameDecoder
            • 這里用到了Netty的LengthFieldBasedFrameDecoder解碼器,它支持自動(dòng)的TCP粘包和半包處理,只需要給出標(biāo)識(shí)消息長度的字段偏移量和消息長度自身所占的字節(jié)數(shù),netty就能自動(dòng)實(shí)現(xiàn)對(duì)半包的處理
            • 解碼過程和編碼過程恰恰相反,這里不再詳述
          • 握手和安全認(rèn)證
            • LoginAuthReqHandler,在通道激活時(shí)發(fā)起握手請(qǐng)求
            • 客戶端握手請(qǐng)求發(fā)送之后,按照協(xié)議規(guī)范,服務(wù)端需要返回握手應(yīng)答消息(channelRead中),首先判斷消息是否是握手應(yīng)答消息,如果不是,直接透?jìng)鹘o后面的 ChannelHandler 進(jìn)行處理;如果是握手應(yīng)答消息,則對(duì)應(yīng)答結(jié)果進(jìn)行判斷,如果非0 , 說明認(rèn)證失敗,關(guān)閉鏈路,重新發(fā)起連接
            • LoginAuthRespHandler#channelRead,對(duì)重復(fù)登陸進(jìn)行判斷,然后白名單判斷,如果均校驗(yàn)成功則構(gòu)造握手應(yīng)答消息
            • 當(dāng)發(fā)生異常關(guān)閉鏈路的時(shí)候需要將客戶端的信息從登錄注冊(cè)表中刪除,以保證后續(xù)客戶端可以重連成功
        • 心跳檢測(cè)機(jī)制
          • 握手成功之后,由客戶端主動(dòng)發(fā)送心跳消息,服務(wù)端接收到心跳消息之后,返回心跳應(yīng)答消息
          • 當(dāng)握手成功之后,握手請(qǐng)求Handler會(huì)繼續(xù)將握手成功消息向下透?jìng)髦罤eartBeatReqHandler,接收到之后對(duì)消息進(jìn)行判斷,如果是握手成功消息,則啟動(dòng)無限循環(huán) 定時(shí)器用于定期發(fā)送心跳消息。由于NioEventLoop是一個(gè)Schedule, 因此它支持定時(shí)器的執(zhí)行,如每5s發(fā)送一條心跳消息
          • 而服務(wù)器的HeartBeatRespHandler則比較簡(jiǎn)單,接收到心跳請(qǐng)求消息之后,構(gòu)造心跳應(yīng)答消息返回即可'
          • 心跳超時(shí)的實(shí)現(xiàn)非常簡(jiǎn)單,直接利用Netty的 ReadTimeoutHandler機(jī)制,當(dāng)一定周期內(nèi) (默認(rèn)值50s) 沒有讀取到對(duì)方任何消息時(shí),需要主動(dòng)關(guān)閉鏈路。如果是客戶端,重新發(fā)起連接;如果是服務(wù)端,釋放資源,清除客戶端登錄緩存信息,等待服務(wù)端重連
            • 可參考ReadTimeoutHandler#readTimedOut
          • 斷線重連
            • 當(dāng)客戶端感知斷連事件之后,釋放資源,重新發(fā)起連接
            • try塊代碼最后一行是future.channel().closeFuture().sync(),線程等待鏈接關(guān)閉
            • 當(dāng)關(guān)閉后執(zhí)行finally塊代碼,嘗試重連
        • 客戶端代碼
          • NettyMessageDecoder(1024 * 1024, 4, 4)、NettyMessageEncoder、ReadTimeoutHandler(50)、LoginAuthReqHandler、HeartBeatReqHandler
          • 這次我們綁定了本地端口,主要用于服務(wù)端重復(fù)登錄保護(hù),另外,從產(chǎn)品管理角度看,一般情況下不允許系統(tǒng)隨便使用隨機(jī)端口
          • 利用Netty的 ChannelPipeline和 Channe丨Handler機(jī)制,可以非常方便地實(shí)現(xiàn)功能解耦 和業(yè)務(wù)產(chǎn)品的定制。例如本例程中的心跳定時(shí)器、握手請(qǐng)求和后端的業(yè)務(wù)處理可以通過不同的Handler來實(shí)現(xiàn),類似于AOP。通過Handler Chain的機(jī)制可以方便地實(shí)現(xiàn)切面攔截和定制,相比于AOP它的性能更高
        • 服務(wù)端代碼
          • NettyMessageDecoder(1024 * 1024, 4, 4)、NettyMessageEncoder、ReadTimeoutHandler(50)、LoginAuthRespHandler、HeartBeatRespHandler
          • 客戶端宕機(jī)重啟之后,服務(wù)端需要能夠清除緩存信息,允許客戶端重新登錄

      protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
           if (!closed) {
           ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
           ctx.close();
           closed = true;
           }
           }
           

      • 總結(jié)
        • 當(dāng)鏈路斷連的時(shí)候,已經(jīng)放入發(fā)送隊(duì)列中的消息不能丟失,更加通用的做法是提供通知機(jī)制,將發(fā)送失敗的消息通知給業(yè)務(wù)測(cè),由業(yè)務(wù)做決定:是丟棄還是緩存重發(fā)
      • 關(guān)于編解碼的一些測(cè)試
        • sendBuf.setInt(4, sendBuf.readableBytes() - 8)
          • 這個(gè)是編碼最后將長度值寫入了,這個(gè)為啥-8呢
        • NettyMessageDecoder(1024 * 1024, 4, 4))
          • 可以看到解碼傳入的LengthFieldBasedFrameDecoder參數(shù)分別是4,4,兩個(gè)4分別代碼lengthFieldOffset和lengthFieldLength,即長度字段的偏移和長度字段的表示長度
        • 從下面的源碼注釋來看,這個(gè)長度其實(shí)是指長度字段后的所有字節(jié)的長度,從解碼角度來說也是如此,從源碼解讀亦如此,請(qǐng)注意這個(gè)問題
        • 所以建議好好看源代碼,源代碼才是王道

      lengthFieldOffset = 2
           lengthFieldLength = 3
           lengthAdjustment = 0
           initialBytesToStrip = 0
           BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
           +--------+--------+----------------+ +----------+----------+--------------
           |Header 1| Length |Actual Content |----->| Header 1 | Length | Actual Content |
           |0xCAFE | 0x00000C| "HELLO, WORLD"| | 0xCAFE | 0x00000C | "HELLO, WORLD"
           

      服務(wù)端創(chuàng)建

      • Netty服務(wù)端創(chuàng)建關(guān)鍵步驟
        • 創(chuàng)建ServerBootstrap實(shí)例。ServerBootstrap是 Netty服務(wù)端的啟動(dòng)輔助類,它提供了一系列的方法用于設(shè)置服務(wù)端啟動(dòng)相關(guān)的參數(shù)-門面模式對(duì)各種能力進(jìn)行抽象和封裝
          • 引入Builder模式,因?yàn)閰?shù)太多
        • 設(shè)置并綁定Reactor線程池。Netty的 Reactor線程池是EventLoopGroup,它實(shí)際就是 EventLoop的數(shù)組。EventLoop的職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器Selector上的Channel,Selector的輪詢操作由綁定的EventLoop線程run方法驅(qū)動(dòng),在一個(gè)循環(huán)體內(nèi)循環(huán)執(zhí)行.
          • EventLoop的職責(zé)不僅僅是處理網(wǎng)絡(luò)I/O 事件,用戶自定義的Task和定時(shí)任務(wù)Task也 統(tǒng) 由 EventLoop負(fù)責(zé)處理,這樣線程模型就實(shí)現(xiàn)了統(tǒng)一
          • 從調(diào)度層面看,也不存在從EventLoop線程中再啟動(dòng)其他類型的線程用于異步執(zhí)行另外的任務(wù),這樣就避免了多線程并發(fā)操作和鎖競(jìng)爭(zhēng),提升了I/O 線程的處理和調(diào)度性能
        • 設(shè)置并綁定服務(wù)端Channel。作為NIO服務(wù)端,需要?jiǎng)?chuàng)建ServerSocketChannel,Netty對(duì)原生的NIO類庫進(jìn)行了封裝,對(duì)應(yīng)實(shí)現(xiàn)是NioServerSocketChannel
          • Netty通過工廠類,利用反射創(chuàng)建NioServerSocketChannel對(duì)象。由于服務(wù)端監(jiān)聽端口往往只需要在系統(tǒng)啟動(dòng)時(shí)才會(huì)調(diào)用,因此反射對(duì)性能的影響并不大
          • ServerBootstrapChannelFactory
        • 鏈路建立的時(shí)候創(chuàng)建并初始化ChannelPipeline。ChannelPipeline并不是NIO服務(wù)端必需的,它本質(zhì)就是一個(gè)負(fù)責(zé)處理網(wǎng)絡(luò)事件的職責(zé)鏈,負(fù)責(zé)管理和執(zhí)行ChannelHandler。網(wǎng)絡(luò)事件以事件流的形式在ChannelPipeline中流轉(zhuǎn)。典型的網(wǎng)絡(luò)事件如下:
          • 鏈路注冊(cè)、鏈路激活、鏈路斷開、接收到請(qǐng)求消息、請(qǐng)求消息接收并處理完畢、發(fā)送應(yīng)答消息、鏈路發(fā)生異常、發(fā)生用戶自定義事件
        • 初始化 ChannelPipeline 完成之后,添加并設(shè)置 ChannelHandler是 Netty提供給用戶定制和擴(kuò)展的關(guān)鍵接口。利用ChannelHandler用戶可以完成大多數(shù)的功能定制,例如消息編解碼、心跳、安全認(rèn)證、TSL/SSL認(rèn)證、流量控制和流景整形等。
          • Netty同時(shí)也提供了大量的系統(tǒng)ChannelHandler供用戶使用 ,比較實(shí)用的系統(tǒng)ChannelHandler如:
            • ByteToMessageCodec、LengthFieldBasedFrameDecoder、 LoggingHandler、SslHandler、 IdleStateHandler、 ChannelTrafficShapingHandler、Base64Decoder 和 Base64Encoder
        • 綁定并啟動(dòng)監(jiān)聽端口。在綁定監(jiān)聽端口之前系統(tǒng)會(huì)做一系列的初始化和檢測(cè)工作,完成之后,會(huì)啟動(dòng)監(jiān)聽端口,并將ServerSocketChannel注冊(cè)到Selector上監(jiān)聽客戶端連接
        • Selector輪詢。由Reactor線程N(yùn)ioEventLoop負(fù)責(zé)調(diào)度和執(zhí)行Selector輪詢操作,選擇準(zhǔn)備就緒的Channel集合
          • 參考源代碼NioEventLoopGroup、NioEventLoop的源代碼實(shí)現(xiàn)
            • 會(huì)傳入SelectorProvider.provider用來打開Selector
            • 每一個(gè)NioEventLoop持有一個(gè)Selector
        • 當(dāng)輪詢到準(zhǔn)備就緒的Channel之后,就 由 Reactor線 程 NioEventLoop執(zhí)ChannelPipeline的相應(yīng)方法,最終調(diào)度并執(zhí)行ChannelHandler
          • 源碼請(qǐng)參考ChannelPipeline
          • fireXXX->DefaultChannelPipeline->AbstractChannelHandlerContext.invokeXXX(head)
        • 執(zhí)行Netty系統(tǒng)ChannelHandler和用戶添加定制的ChannelHandler

       I/O Request
           * via {@link Channel} or
           * {@link ChannelHandlerContext}
           * |
           * +---------------------------------------------------+---------------+
           * | ChannelPipeline | |
           * | \|/ |
           * | +---------------------+ +-----------+----------+ |
           * | | Inbound Handler N | | Outbound Handler 1 | |
           * | +----------+----------+ +-----------+----------+ |
           * | /|\ | |
           * | | \|/ |
           * | +----------+----------+ +-----------+----------+ |
           * | | Inbound Handler N-1 | | Outbound Handler 2 | |
           * | +----------+----------+ +-----------+----------+ |
           * | /|\ . |
           * | . . |
           * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
           * | [ method call] [method call] |
           * | . . |
           * | . \|/ |
           * | +----------+----------+ +-----------+----------+ |
           * | | Inbound Handler 2 | | Outbound Handler M-1 | |
           * | +----------+----------+ +-----------+----------+ |
           * | /|\ | |
           * | | \|/ |
           * | +----------+----------+ +-----------+----------+ |
           * | | Inbound Handler 1 | | Outbound Handler M | |
           * | +----------+----------+ +-----------+----------+ |
           * | /|\ | |
           * +---------------+-----------------------------------+---------------+
           * | \|/
           * +---------------+-----------------------------------+---------------+
           * | | | |
           * | [ Socket.read() ] [ Socket.write() ] |
           * | |
           * | Netty Internal I/O Threads (Transport Implementation) |
           * +-------------------------------------------------------------------+
           

      • Netty服務(wù)端創(chuàng)建源碼分析
        • EventLoopGroup acceptorGroup = new NioEventLoopGroup()、EventLoopGroup IOGroup = new NioEventLoopGroup()
          • acceptor線程池
          • io-processor線程池
          • 并不是必須要?jiǎng)?chuàng)建兩個(gè)不同的EventLoopGroup, 也可以只創(chuàng)建一個(gè)并共享
        • TCP的backlog參數(shù)
          • backlog指定了內(nèi)核為此套接口排隊(duì)的最大連接個(gè)數(shù),對(duì)于給定的監(jiān)聽套接口,內(nèi)核要維護(hù)兩個(gè)隊(duì)列:未鏈接隊(duì)列和已連接隊(duì)列
            • 和tcp建立鏈接的三次握手相關(guān)
            • backlog被規(guī)定為兩個(gè)隊(duì)列總和的最大值
            • Netty默認(rèn)的backlog為100,Lighttpd中此值達(dá)到128x8,可根據(jù)實(shí)際場(chǎng)景和網(wǎng)絡(luò)狀況進(jìn)行靈活設(shè)置
        • TCP參數(shù)設(shè)置完成后,用戶可以為啟動(dòng)輔助類和其父類分別指定Handler。兩類Handler的用途不同
          • AbstractBootstrap#handler,指定父類的handler
          • ServerBootstrap#childHandler,指定子類的handler
          • 通過看源代碼,發(fā)現(xiàn)此書書中的描述有點(diǎn)不準(zhǔn)確(或者說不直觀),其實(shí)父類中的handler是添加到ServerSocketChannel的pipeline的,這個(gè)handler在server啟動(dòng)后就行執(zhí)行,如LoggingHandler
          • 子類的handler是添加導(dǎo)SocketChannel的pipeline的
        • 最后一步,就是綁定本地端口,啟動(dòng)服務(wù)
          • AbstractBootstrap#doBind、initAndRegister
          • ServerBootstrap#init(Channel channel)
            • 設(shè)置Socket參數(shù)和NioServerSocketChannel的附加屬性
            • 將AbstractBootstrap的Handler添加到 NioServerSocketChannel的 ChannelPipeline中
            • 將用于服務(wù)端注冊(cè)的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中
              • ServerBootstrapAcceptor#channelRead
                • child.pipeline().addLast(childHandler),將子handler加到了SocketChannel的pipeline中
        • 當(dāng) NioServerSocketChannel 初始化完成之后,需要將它注冊(cè)到Reactor線程的多路復(fù)用器上監(jiān)聽新客戶端的接入
          • AbstractChannel$AbstractUnsafe#register#register0
            • AbstractNioChannel#doRegister->注冊(cè)到NioEventLoop的Selector上
            • pipeline.fireChannelRegistered,觸發(fā)注冊(cè)事件,傳遞給pipeline,執(zhí)行父類的 handler
            • 另外判斷是否是NioEventLoop自身發(fā)起的操作。如果是,則不存在并發(fā)操作,直接執(zhí)行Channel注冊(cè):如果由其他線程發(fā)起,則封裝成一個(gè)Task放入消息隊(duì)列中異步執(zhí)行。此處,由于是由ServerBootstrap所在線程執(zhí)行的注冊(cè)操作,所以會(huì)將其封裝成Task投遞到NioEventLoop中執(zhí)行
          • ServerBootstrap#createChannel
            • EventLoop eventLoop = group().next()
              • 這里順序選取了一個(gè)線程,注意這里的group是指parentGroup
      • 客戶端接入源碼分析
        • 負(fù)責(zé)處理網(wǎng)絡(luò)讀寫、連接和客戶端請(qǐng)求接入的Reactor線程就是NioEventLoop,當(dāng)多路復(fù)用器檢測(cè)到新的準(zhǔn)備就緒的Channel時(shí),默認(rèn)執(zhí)行processSelectedKeysOptimized
          • NioEventLoop#run
          • 由于Channel的Attachment是NioServerSocketChannel, 所以執(zhí)processSelectedKey
            • 根據(jù)就緒的操作位,執(zhí)行不同的操作
            • unsafe.read()
              • NioMessageUnsafe#read
                • NioServerSocketChannel#doReadMessages
                  • 接收新的客戶端連接(調(diào)用accept)并創(chuàng)建NioSocketChannels
                  • 注意初始化new NioSocketChannel(this, childEventLoopGroup().next(), ch),即要傳入childGroup中的一個(gè)線程
                • 接收到新的客戶端連接后,觸發(fā)ChannelPipeline ChannelRead方法
                • 執(zhí)行headChannelHandlerContext的fireChannelRead 方法,事件在 ChannelPipeline中傳遞,執(zhí)行ServerBootstrapAcceptor的channelRead方法
                  • 將啟動(dòng)時(shí)傳入的childHandler加入到客戶端SocketChannel的 ChannelPipeline中
                  • 設(shè)置客戶端SocketChannel的TCP參數(shù)
                  • 注冊(cè)SocketChannel到多路復(fù)用器
      • 總結(jié)
        • 源代碼分析是基于netty-all-5.0.0.Alpha1版本,其實(shí)有些代碼和4.x還是有一些區(qū)別的
        • 對(duì)于NioServerSocketChannel和NioSocketChannel,在其構(gòu)造中均會(huì)指定readInterestOp
          • super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT)
            • AbstractNioMessageServerChannel
          • super(parent, eventLoop, ch, SelectionKey.OP_READ)
            • AbstractNioChannel

      客戶端創(chuàng)建

      • Bootstrap是Socket客戶端創(chuàng)建工具類,用戶通過Bootstrap可以方便地創(chuàng)建的客戶端并發(fā)起異步 TCP連接操作
      • Netty客戶端創(chuàng)建流程分析
        • 用戶線程創(chuàng)建Bootstrap實(shí)例,通過API設(shè)置創(chuàng)建客戶端相關(guān)的參數(shù),異步發(fā)起客戶端連接
        • 創(chuàng)建處理客戶端連接、I/O讀寫的Reactor線程組NioEventLoopGroup。可以通過構(gòu)造函數(shù)指定I/O線程的個(gè)數(shù),默認(rèn)為CPU內(nèi)核數(shù)的2倍
        • 通過Bootstrap的ChannelFactory和用戶指定的Channel類型創(chuàng)建用于客戶端連接的NioSocketChannel,它的功能類似于JDK NIO類庫提供的SocketChannel
        • 創(chuàng)建默認(rèn)的Channel Handler Pipeline, 用于調(diào)度和執(zhí)行網(wǎng)絡(luò)事件
          • 注意這里調(diào)用的是父類的handler方法
        • 異步發(fā)起TCP連接,判斷連接是否成功。如果成功,則直接將NioSocketChannel注冊(cè)到多路復(fù)用器上,監(jiān)聽讀操作位,用于數(shù)據(jù)報(bào)讀取和消息發(fā)送:如果沒有立即連接成功,則注冊(cè)連接監(jiān)聽位到多路復(fù)用器,等待連接結(jié)果
        • 注冊(cè)對(duì)應(yīng)的網(wǎng)絡(luò)監(jiān)聽狀態(tài)位到多路復(fù)用器
        • 由多路復(fù)用器在1/0現(xiàn)場(chǎng)中輪詢各Channel, 處理連接結(jié)果
        • 如果連接成功,設(shè)置Future結(jié)果,發(fā)送連接成功事件,觸發(fā)ChannelPipeline執(zhí)行
        • 由ChannelPipeline調(diào)度執(zhí)行系統(tǒng)和用戶的ChannelHandler, 執(zhí)行業(yè)務(wù)邏輯
      • Netty客戶端創(chuàng)建源碼分析
        • Bootstrap是 Netty提供的客戶端連接工具類,主要用于簡(jiǎn)化客戶端的創(chuàng)建
        • 客戶端相對(duì)于服務(wù)端,只需要一個(gè)處理I/O讀寫的線程組即可
        • Bootstrap也提供了客戶端TCP參數(shù)設(shè)置接口
          • SO_TIMEOUT 控制讀取操作將阻塞多少毫秒
          • SO_SNDBUF 套接字使用的發(fā)送緩沖區(qū)大小
          • SO_RCVBUF 套接字使用的接收緩沖區(qū)大小
          • SO_REUSEADDR
            • 用于決定如果網(wǎng)絡(luò)上仍然有數(shù)據(jù)向舊的ServerSocket傳輸數(shù)據(jù) ,是否允許新的 ServerSocket綁定到與舊的ServerSocket同樣的端口上
          • CONNECTTIMEOUTMILLIS
            • 客戶端連接超時(shí)時(shí)間,由于NIO原生的客戶端并不提供設(shè)置連接超時(shí)的接口,因此 , Netty采用的是自定義連接超時(shí)定時(shí)器負(fù)責(zé)檢測(cè)和超時(shí)控制
          • TCPN0DELAY
            • 激活或禁止TCP_NODELAY套接字選項(xiàng),它決定是否使用Nagle算法。如果是時(shí)延敏感型的應(yīng)用,建議關(guān)閉Nagle算法
        • 對(duì)于TCP客戶端連接,默認(rèn)使用NioSocketChannel
          • BootstrapChannelFactory利用channelClass類型信息 , 通過反射機(jī)制創(chuàng)建NioSocketChannel對(duì)象
        • Bootstrap為了簡(jiǎn)化Handle 的編排,提供 Channellnitializer,它繼承了 ChannelHandlerAdapter, 當(dāng) TCP鏈路注冊(cè)成功之后,調(diào)用initChannel接口,用于設(shè)置用戶ChannelHandler
          • ChannelInitializer#channelRegistered
            • initChannel
        • ChannelFuture f = b.connect(host, port).sync(),發(fā)起客戶端連接
      • 客戶端連接操作
        • Bootstrap#connect
          • doConnect
            • 首先要?jiǎng)?chuàng)建和初始化NioSocketChannel
              • AbstractBootstrap#initAndRegister
                • createChannel-channelFactory().newChannel(group().next())
                  • 這里的group為父類的parentGroup
                • init(channel)
                • 初始化Channel之后,將其注冊(cè)到Selector上
                  • channel.unsafe().register(regFuture)
            • doConnect0
              • 從該操作開始,連接操作切換到了Netty的NIO線程N(yùn)ioEventLoop中進(jìn)行,此時(shí)客戶端返回,連接操作異步執(zhí)行
            • doConnectO最終調(diào)用HeadHandler的connect方法
              • AbstractNioUnsafe#connect
                • NioSocketChannel#doConnect
                  • javaChannel().socket().bind(localAddress)
                  • boolean connected = javaChannel().connect(remoteAddress)
                  • if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT)}
                • 異步連接返回之后,需要判斷連接結(jié)果,如果連接成功,則觸發(fā)ChannelActive事件,否則注冊(cè)SelectionKey.OP_CONNECT到多路復(fù)用器
        • 異步連接結(jié)果通知
          • NioEventLoop的 Selector輪詢客戶端連接Channel,當(dāng)服務(wù)端返回握手應(yīng)答之后,對(duì)連接結(jié)果進(jìn)行判斷
            • NioEventLoop#processSelectedKey
            • 監(jiān)聽SelectionKey.OP_CONNECT
              • AbstractNioUnsafe#finishConnect
                • doFinishConnect
                  • NioSocketChannel#doFinishConnect,判斷JDK的SocketChannel的連接結(jié)果
                • 連接成功之后,調(diào)用fulfillConnectPromise方法,觸發(fā)鏈路激活事件,該事件由ChannelPipeline進(jìn)行傳播
      • 客戶端連接超時(shí)機(jī)制
        • 在創(chuàng)建Netty客戶端的時(shí)候,可以通過ChannelOption.CONNECTTIMEOUTMILLIS配置項(xiàng)設(shè)置連接超時(shí)時(shí)間
        • 發(fā)起連接的同時(shí),啟動(dòng)連接超時(shí)檢測(cè)定時(shí)器
          • AbstractNioUnsafe.connect
            • 一旦超時(shí)定時(shí)器執(zhí)行,說明客戶端連接超時(shí)
            • 如果在連接超時(shí)之前獲取到連接結(jié)果,則刪除連接超時(shí)定時(shí)器,防止其被觸發(fā)
              • AbstractNioUnsafe#finishConnect中finally塊的處理

      ByteBuf和相關(guān)輔助類

      • NIO的ByteBuffer的主要缺點(diǎn)
        • 長度固定,一旦分配完成,它的容量不能動(dòng)態(tài)擴(kuò)展和收縮
        • 只有一個(gè)標(biāo)識(shí)位置的指針position, 讀寫的時(shí)候需要手工調(diào)用flip和rewind等,使用者必須小心謹(jǐn)慎地處理這些API
        • 一些高級(jí)和實(shí)用的特性不支持
      • Netty的ByteBuf
        • 7種 ava基礎(chǔ)類型、byte數(shù)組、ByteBuffer (ByteBuf) 等的讀寫
        • 緩沖區(qū)自身的copy和 slice等
        • 設(shè)置網(wǎng)絡(luò)字節(jié)序
        • 構(gòu)造緩沖區(qū)實(shí)例
        • 操作位置指針等方法
          • ByteBuf通過兩個(gè)位置指針來協(xié)助緩沖區(qū)的讀寫操作,讀操作使用readerlndex, 寫操作使用 writerlndex
          • 由于寫操作不修改readerlndex指針,讀操作不修改writerlndex指針,因此讀寫之間不再需要調(diào)整位置指針,這極大地簡(jiǎn)化了緩沖區(qū)的讀寫操作
        • ByteBuf會(huì)自動(dòng)進(jìn)行動(dòng)態(tài)擴(kuò)展
      • ByteBuf功能介紹
        • 順序讀操作(read)
        • 順序?qū)懖僮?write)
        • readerlndex和writerlndex
          • 調(diào)用 ByteBuf的read 操作時(shí),從readerlndex處開始讀取。readerlndex到 writerlndex之間的空間為可讀的字節(jié)緩沖區(qū);從 writerlndex到 capacity之間為可寫的字節(jié)緩沖區(qū);0到readerlndex 之間是已經(jīng)讀取過的緩沖區(qū),可以調(diào)用discardReadBytes操作來重用這部分空間,以節(jié)約內(nèi)存,防止ByteBuf的動(dòng)態(tài)擴(kuò)張
        • Discardable bytes
          • 將已讀的字節(jié)部分丟棄
          • 調(diào)用discardReadBytes會(huì)發(fā)生字節(jié)數(shù)組的內(nèi)存復(fù)制,所以,頻繁調(diào)用將會(huì)導(dǎo)致性能下降
        • Readable bytes 和 Writable bytes
          • 可讀空間段是數(shù)據(jù)實(shí)際存儲(chǔ)的區(qū)域,以read或者skip開頭的任何操作都將會(huì)從 readerlndex開始讀取或者跳過指定的數(shù)據(jù),操作完成之后readerlndex增加了讀取或者跳 過的字節(jié)數(shù)長度
          • 可寫空間段是尚未被使用可以填充的空閑空間,任何以write開頭的操作都會(huì)從writerlndex開始向空閑空間寫入字節(jié),操作完成之后writerlndex增加了寫入的字節(jié)數(shù)長度
        • Clear操作
        • Mark和Rest操作
        • 查找操作
        • Derived buffers
          • duplicate 返回當(dāng)前ByteBuf的復(fù)制對(duì)象,共享緩沖區(qū),讀寫索引獨(dú)立
          • copy復(fù)制一個(gè)新的ByteBuf對(duì)象,內(nèi)容和索引都獨(dú)立
          • slice 可讀子緩沖區(qū)(起始位置從readerlndex到 writerlndex),共享內(nèi)容,讀寫索引獨(dú)立維護(hù)
        • 轉(zhuǎn)換成標(biāo)準(zhǔn)的ByteBuffer
          • ByteBuf#nioBuffer、nioBuffer(index,length)
          • 返回后的ByteBuffer無法感知原ByteBuf的動(dòng)態(tài)擴(kuò)展操作
        • 隨機(jī)讀寫(set和 get)
      • 源碼概要分析
        • 從內(nèi)存分配的角度看,ByteBuf可以分為兩類
          • 堆內(nèi)存(HeapByteBuf) 字節(jié)
            • 可以被JVM 自動(dòng)回收;缺點(diǎn)就是如果進(jìn)行Socket的 I/O 讀寫,需要額外做一次內(nèi)存復(fù)制,將堆內(nèi)存對(duì)應(yīng)的緩沖區(qū)復(fù)制到內(nèi)核Channel中
          • 直接內(nèi)存(DirectByteBuf) 字節(jié)
            • 區(qū):非堆內(nèi)存,它在堆外進(jìn)行內(nèi)存分配;但是將它寫入或者從Socket Channel中讀取時(shí),由于少了一次內(nèi)存復(fù)制,速度比堆內(nèi)存快
          • ByteBuf的最佳實(shí)踐是在I/O通信線程的讀寫緩沖區(qū)使用DirectByteBuf, 后端業(yè)務(wù)消息的編解碼模塊使用HeapByteBuf, 這樣組合可以達(dá)到性能最優(yōu)
          • 從內(nèi)存回收角度看,ByteBuf也分為兩類:基于對(duì)象池的ByteBuf和普通ByteBuf。兩者的主要區(qū)別就是基于對(duì)象池的ByteBuf可以重用ByteBuf對(duì)象,它自己維護(hù)了一個(gè)內(nèi)存池,可以循環(huán)利用創(chuàng)建的ByteBuf,提升內(nèi)存的使用效率,降低由于高負(fù)載導(dǎo)致的頻繁GC
        • AbstractByteBuf
          • ByteBuffer的一個(gè)最大的缺點(diǎn)就是一旦完成分配之后不能動(dòng)態(tài)調(diào)整其容量。由于很多場(chǎng)景下我們無法預(yù)先判斷需要編碼和解碼的POJO對(duì)象長度,因此只能根據(jù)經(jīng)驗(yàn)數(shù)據(jù)給個(gè)估計(jì)值。如果這個(gè)值偏大,就會(huì)導(dǎo)致內(nèi)存的浪費(fèi);如果這個(gè)值偏小,遇到大消息編碼的時(shí)候就會(huì)發(fā)生緩沖區(qū)溢出異常。使用者需要自己捕獲這個(gè)異常,并重新計(jì)算緩沖區(qū)的大小,將原來的內(nèi)容復(fù)制到新的緩沖區(qū)中,然后重置指針。這種處理策略對(duì)用戶非常不友好,而且稍有不慎,就會(huì)引入新的問題
          • 采用倍增或者步進(jìn)算法,動(dòng)態(tài)擴(kuò)張需要進(jìn)行內(nèi)存復(fù)制,頻繁的內(nèi)存復(fù)制會(huì)導(dǎo)致性能下降;采用先倍增后步進(jìn)
        • AbstractReferenceCountedByteBuf
          • 引用計(jì)數(shù)
          • ReferenceCounted
            • A reference-counted object that requires explicit deallocation
            • retain
              • increases the reference count
            • release
              • decreases the reference count
        • UnpooledHeapByteBuf
          • UnpooledHeapByteBuf是基于堆內(nèi)存進(jìn)行內(nèi)存分配的字節(jié)緩沖區(qū),它沒有基于對(duì)象池 技術(shù)實(shí)現(xiàn),這就意味著每次I/O 的讀寫都會(huì)創(chuàng)建個(gè)新的UnpooledHeapByteBuf
        • PooledByteBuf
          • PoolArena,Netty的內(nèi)存池實(shí)現(xiàn)類
          • 為了集中管理內(nèi)存的分配和釋放,同時(shí)提高分配和釋放內(nèi);時(shí)候的性能,很多框架和應(yīng)用都會(huì)通過預(yù)先申請(qǐng)一大塊內(nèi)存,然后通過提供相應(yīng)的分配和釋放接口來使用內(nèi)存。這樣一來,對(duì)內(nèi)存的管理就被集中到幾個(gè)類或者函數(shù)中,由于不再頻繁使用系統(tǒng)調(diào)用來申請(qǐng)和釋放內(nèi)存,應(yīng)用或者系統(tǒng)的性能也會(huì)大大提髙。在這種設(shè)計(jì)思路下,預(yù)先申請(qǐng)的那一大塊內(nèi)存就被稱為Memory Arena
          • Netty的 PoolArena是由多個(gè)Chunk組成的大塊內(nèi)存區(qū)域,而每個(gè)Chunk則由一個(gè)或者多個(gè)Page組成
          • 由于采用內(nèi)存池實(shí)現(xiàn),所以新創(chuàng)建PooledDirectByteBuf對(duì)象時(shí)不能直接new —個(gè)實(shí)例,而是從內(nèi)存池中獲取,然后設(shè)置引用計(jì)數(shù)器的值
        • ByteBufHolder
          • 相當(dāng)于協(xié)議的消息體容器
        • ByteBufAllocator
          • responsible to allocate buffers
        • CompositeByteBuf
          • A virtual buffer which shows multiple buffers as a single merged buffer
          • CompositeByteBuf允許將多個(gè)ByteBuf的實(shí)例組裝到一起,形成一個(gè)統(tǒng)一的視圖
          • 如某個(gè)協(xié)議POJO對(duì)象包含兩部分:消息頭和消息體,它們都是ByteBuf對(duì)象。當(dāng)需要對(duì)消息進(jìn)行編碼的時(shí)候需要進(jìn)行整合
        • ByteBufUtil
          • A collection of utility methods that is related with handling ByteBuf
          • encodeString、decodeString、hexDump

      Channel 和 Unsafe

      • 類似于NIO的 Channel, Netty提供了自己的Channel和其子類實(shí)現(xiàn),用于異步I/O操作和其他相關(guān)的操作
      • Unsafe是個(gè)內(nèi)部接口,聚合在Channel中協(xié)助進(jìn)行網(wǎng)絡(luò)讀寫相關(guān)的操作,因?yàn)樗脑O(shè)計(jì)初衷就是Channel的內(nèi)部輔助類,不應(yīng)該被Netty框架的上層使用者調(diào)用,所以被命名為Unsafe
      • io.netty.channd.Channel是Netty網(wǎng)絡(luò)操作抽象類,它聚合了一組功能,包括但不限于網(wǎng)路的讀、寫,客戶端發(fā)起連接,主動(dòng)關(guān)閉連接,鏈路關(guān)閉,獲取通信雙方的網(wǎng)絡(luò)地址等。它也包含了Netty框架相關(guān)的一些功能,包括獲取該Channel的EventLoop , 獲取緩沖分配器 ByteBufAllocator和pipeline等
      • 為什么不使用JDK NIO 原生的Channel
        • JDK的SocketChannel和ServerSocketChannel沒有統(tǒng)一的Channel接口
        • JDK的SocketChannel和ServerSocketChannel的主要職責(zé)就是網(wǎng)絡(luò) I/O 操作,由于它們是SPI(service provider interface)類接口,由具體的虛擬機(jī)廠家來提供;直接實(shí)現(xiàn)SocketChannel 和 ServerSocketChannel抽象類,其工作量和重新開發(fā)一個(gè)新的 Channel 功能類是差不多的
        • Netty的Channel需要能夠跟 Netty的整體架構(gòu)融合在一起,例如 I/O 模型、基于 ChannelPipeline 的定制模型,以及基于元數(shù)據(jù)描述配置化的 TCP參數(shù)等,這些 JDK的SocketChannel和 ServerSocketChannel都沒有提供,需要重新封裝
        • 自定義的Channel, 功能實(shí)現(xiàn)更加靈活
      • Channel的功能介紹
        • 網(wǎng)絡(luò)I/O操作
          • read、write、flush、close、disconnect、close、connect、bind等
          • ctx.close() starts to flow through the ChannelPipeline from the point of the ChannelHandlerContext while ctx.channel().close() will start from the tail of the ChannelPipeline all the time
          • 從NioSocketChannel的doDisconnect實(shí)現(xiàn)來看,其直接調(diào)用了doClose,所以從TCP一層面上可以理解,disconnect和close一樣
        • 其他
          • 通過eventLoop()方法可以獲取到Channel注冊(cè)的EventLoop
          • 通過metadata()方法就可以獲取當(dāng)前Channel的TCP參數(shù)配置
          • parent()。對(duì)于服務(wù)端Channel而言,它的父Channel為空:對(duì)于客戶端Channel, 它的 Channel就是創(chuàng)建它的ServerSocketChannel
          • 用戶獲取Channel標(biāo)識(shí)的id
            • ChannelId、DefaultChannelId
      • Channel源碼分析
        • AbstractChannel
          • 聚合了所有Channel使用到的能力對(duì)象,由AbstractChannel提供初始化和統(tǒng)一封裝
          • 網(wǎng)絡(luò)讀寫操作會(huì)觸發(fā)CharmelPipeline對(duì)應(yīng)的事件方法。Netty基于事件驅(qū)動(dòng),我們也可以理解為當(dāng)Chnanel進(jìn)行I/O 操作時(shí)產(chǎn)生生對(duì)應(yīng)的I/O 事件,然后驅(qū)動(dòng)事件在ChannelPipeline中傳播,由對(duì)應(yīng)的ChannelHandler對(duì)事件進(jìn)行攔截和處理
          • 網(wǎng)絡(luò) I/O 操作直接調(diào)用 DefaultChannelPipeline 的相關(guān)方法,由DefaultChannelPipeline中對(duì)應(yīng)的ChannelHandler進(jìn)行具體的邏輯處理
          • 提供了一些公共API具體實(shí)現(xiàn) 例如localAddress()和remoteAddress()
        • AbstractNioChannel
          • Abstract base class for {@link Channel} implementations which use a Selector based approach.
          • doRegister、doBeginRead
        • AbstractNioByteChannel
          • {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes
          • doWrite,注意處理半包
            • 注意環(huán)形數(shù)組-ChannelOutboundBuffer#Entry[] buffer
        • AbstractNioMessageChannel
          • {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages
          • 一個(gè)發(fā)送的是ByteBuf或者FileRegion, 它們可以直接被發(fā)送:另一個(gè)發(fā)送的則是POJO對(duì)象
        • AbstractNioMessageServerChannel
          • 定義了一個(gè) EventLoopGroup 類型的childGroup, 用于給新接入的客戶端NioSocketChannel分配EventLoop
        • NioServerSocketChannel
          • 很多方法實(shí)現(xiàn)通過調(diào)用javaChannel
          • doReadMessages,javaChannel().accept()
            • new NioSocketChannel(this, childEventLoopGroup().next(), ch)
        • NioSocketChannel
          • doConnect
            • javaChannel().socket().bind
            • javaChannel().connect
          • doWrite
            • ChannelConfig#getWriteSpinCount:Returns the maximum loop count for a write operation
          • doReadBytes
            • Read bytes into the given ByteBuf and return the amount
      • Unsafe
        • Unsafe接口實(shí)際上是Channel接口的輔助接口
        • 實(shí)際的I/O讀寫操作都是由Unsafe接口負(fù)責(zé)完成的
        • AbstractChannel$AbstractUnsafe
          • register
          • bind
          • disconnect
            • doDisconnect() might have closed the channel
          • close
            • 從disconnect和close的實(shí)現(xiàn)來看,close做的工作更多,如首先判斷是否處于刷新狀態(tài),如果處于刷新狀態(tài)說明還有消息尚未發(fā)送出去,需要等到所有消息發(fā)送完成再關(guān)閉鏈路等
          • write
            • 實(shí)際上將消息添加到環(huán)形發(fā)送數(shù)組中
          • flush#flush0#doWrite
          • AbstractNioUnsafe
          • NioByteUnsafe
            • read
            • AdaptiveRecvByteBufAllocator,緩沖區(qū)大小可以動(dòng)態(tài)調(diào)整的ByteBuf分配器
              • Netty根據(jù)上次實(shí)際讀取的碼流大小對(duì)下次的接收Buffer緩沖區(qū)進(jìn)行預(yù)測(cè)和調(diào)整,能夠最大限度地滿足不同行業(yè)的應(yīng)用場(chǎng)景
              • 其根據(jù)本次讀取的實(shí)際字節(jié)數(shù)對(duì)下次接收緩沖區(qū)的容量進(jìn)行動(dòng)態(tài)調(diào)整
            • doReadBytes
            • 完成一次異步讀之后,就會(huì)觸發(fā)一次ChannelRead事件
              • 在沒有做仟何半包處理的情況下,以 ChannelRead的觸發(fā)次數(shù)做計(jì)數(shù)器來進(jìn)行性能分析和統(tǒng)計(jì),是完全錯(cuò)誤的
            • 連續(xù)讀操作做上限控制,默認(rèn)值為16次,無論TCP緩沖區(qū)有多少碼流需要讀取,只要連續(xù)16次沒有讀完,都需要強(qiáng)制退出,等待下次selector輪詢周期再執(zhí)行
            • 完成多路復(fù)用器本輪讀操作之后,觸發(fā)ChannelReadComplete事件

      ChannelPipeline和ChannelHandler

      • Netty的 Channel過濾器實(shí)現(xiàn)原理與ServletFilter機(jī)制一致,它將Channel的數(shù)據(jù)管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動(dòng)和傳遞。ChannelPipeline 持有 I/O 事件攔截器ChannelHandler的鏈表,由ChannelHandler對(duì) I/O 事件進(jìn)行攔截和處理,可以 方便地通過新增和刪除ChannelHandler來實(shí)現(xiàn)不同的業(yè)務(wù)邏輯定制,不需要對(duì)已有的ChannelHandler進(jìn)行修改,能夠?qū)崿F(xiàn)對(duì)修改封閉和對(duì)擴(kuò)展的支持
      • ChannelPipeline 的事件處理
        • 底層的SocketChannel read方法讀取 ByteBuf,觸發(fā) ChannelRead 事件,由 I/O線程 NioEventLoop 調(diào)用 ChannelPipeline 的 fireChannelRead(Object msg)方法,將消息 (ByteBuf) 傳輸?shù)?ChannelPipeline中
        • 消息依次被 HeadHandler、ChannelHandlerl、ChannelHandler2 ... TailHandler 攔 截和處理,在這個(gè)過程中,任何ChannelHandler都可以中斷當(dāng)前的流程,結(jié)束消息的傳遞
        • 調(diào)用ChannelHandlerContext的 write方法發(fā)送消息,消息從TailHandler開始,途經(jīng) ChannelHandlerN ... ChannelHandlerl、HeadHandler, 最終被添加到消息發(fā)送緩沖區(qū)中等待刷新和發(fā)送,在此過程中也可以中斷消息的傳遞
        • Netty中的事件分為inbound事件和outbound事件。inbound事件通常由I/O 線程觸發(fā)
          • ChannelHandlerContext#fireChannelRegistered
          • ChannelHandlerContext#fireChannelActive
          • ChannelHandlerContext#fireChannelRead
          • ChannelHandlerContext#fireChannelReadComplete
          • ChannelHandlerContext#fireExceptionCaught
          • ChannelHandlerContext#fireUserEventTriggered
          • ChannelHandlerContext#fireChannelWritabilityChanged
          • ChannelHandlerContext#fireChannelInactive
            • 舉例如DefaultChannelHandlerContext#fireChannelRead
              • findContextInbound(MASKCHANNELREAD)
                • Inbound
        • Outbound事件通常是由用戶主動(dòng)發(fā)起的網(wǎng)絡(luò)I/O操作
          • bind、connect、write、flush、read、disconnect、close
            • 舉例如DefaultChannelHandlerContext#write
              • findContextOutbound(MASK_WRITE)
                • Outbound
      • 自定義攔截器
        • 通常ChannelHandler只需要繼承 ChannelHandlerAdapter類覆蓋自己關(guān)心的方法即可
      • 構(gòu)建pipeline
        • 使用ServerBootstrap或者Bootstrap啟動(dòng)服務(wù)端或者客戶端時(shí),Netty會(huì)為每個(gè)Channel連接創(chuàng)建個(gè)獨(dú)立的pipeline
        • 對(duì)于使用者而言,只需要將自定義的攔截器加入到pipeline中即可
        • 對(duì)于類似編解碼這樣的ChannelHandler,它存在先后順序
        • Pipeline支持指定位置添加或者刪除攔截器
      • ChannelPipeline源碼分析
        • ChannelPipeline支持運(yùn)行期動(dòng)態(tài)修改,線程安全
        • 其內(nèi)部維護(hù)里一個(gè)鏈表(DefaultChannelHandlerContext.next/prev)和name2ctx的Map
      • ChannelPipeline 的 inbound 事件
        • pipeline中以fireXXX命名的方法都是從I/O線程流向用戶業(yè)務(wù)Handler的inbound事件
          • head.fireXXX 調(diào)用HeadHandler對(duì)應(yīng)的fireXXX方法
            • DefaultChannelHandlerContext#fireXXX
          • 執(zhí)行相關(guān)邏輯
      • ChannelPipeline的outbound 事件
        • 由用戶線程或者代碼發(fā)起的I/O操作被稱為outbound事件
        • Pipeline本身并不直接進(jìn)行 I/O 操作,在前面對(duì)Channel和Unsafe的介紹中我們知道 最終都是由Unsafe和Channel 來實(shí)現(xiàn)真正的 I/O操作的。 Pipeline負(fù)責(zé)將 I/O 事件通TailHandler 行調(diào)度和傳播,最終調(diào)用Unsafe的I/O方法進(jìn)行I/O操作
        • 整體由NioEventLoop進(jìn)行驅(qū)動(dòng)
      • ChannelHandler
        • 基于ChannelHandler接口,用戶可以方便地進(jìn)行業(yè)務(wù)邏輯定制
        • ChannelHandler 支持注解,Shareble和Skip(被Skip注解的方法不會(huì)被調(diào)用)
      • ChannelHandlerAdapter
        • 所有方法都被加了@Skip注解,這些方法在執(zhí)行的過程中會(huì)被忽略,直接跳到下一個(gè)ChannelHandler中執(zhí)行對(duì)應(yīng)的方法
      • ByteToMessageDecoder、MessageToMessageDecoder
      • LengthFieldBasedFrameDecoder
        • lengthFieldOffset、lengthFieldLength、lengthAdjustment、initialBytesToStrip
          • the offset of the length field
          • the length of the length field
          • the compensation value to add to the value of the length field
          • the number of first bytes to strip out from the decoded frame
            • lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
            • frameLength = getUnadjustedFrameLength 根據(jù)offset和fieldLen獲取幀長度
            • frameLength += lengthAdjustment + lengthFieldEndOffset
            • int actualFrameLength = frameLengthInt - initialBytesToStrip
              • 可以看到這個(gè)lengthAdjustment是個(gè)加法
              • in some protocols, the length field represents the length of the whole message, including the message header
        • 詳見LengthFieldBasedFrameDecoder的javadoc注釋和源代碼
      • MessageToByteEncoder
      • MessageToMessageEncoder
      • LengthFieldPrepender
        • An encoder that prepends the length of the message
        • lengthIncludesLengthFieldLength,消息長度將包含長度本身占用的字節(jié)數(shù)
          • length = msg.readableBytes() + lengthAdjustment
          • if (lengthIncludesLengthFieldLength) length += lengthFieldLength

      EventLoop和EventLoopGroup

      • Reactor單線程模型,是指所有的I/O操作都在同一個(gè)NIO線程上面完成
        • 由于Reactor模式使用的是異步非阻塞I/O, 所有的I/O操作都不會(huì)導(dǎo)致阻塞,理論上一個(gè)線程可以獨(dú)立處理所有I/O相關(guān)的操作
      • Reactor多線程模型與單線程模型最大的區(qū)別就是有一組NIO線程來處理I/O操作
        • 有專門一個(gè)NIO線程 — Acceptor線程用于監(jiān)聽服務(wù)端
        • 網(wǎng)絡(luò)I/O操作讀、寫等由一個(gè)NIO線程池負(fù)責(zé)
        • 1個(gè)NIO線程可以同時(shí)處理N條鏈路,但是1個(gè)鏈路只對(duì)應(yīng)一個(gè)NIO線程
          • 1個(gè)NIO線程負(fù)責(zé)監(jiān)聽和處理所有的客戶端連接可能會(huì)存在性能問題
      • 主從Reactor多線程模型
        • 服務(wù)端用于接收客戶端連接的不再是一個(gè)單獨(dú)的NIO線程,而是一個(gè)獨(dú)立的NIO線程池
      • 最佳實(shí)踐
        • 創(chuàng)建兩個(gè)NioEventLoopGroup,用于邏輯隔離NIOAcceptor和 NIO線程
        • 盡量不要在ChannelHandler中啟動(dòng)用戶線程(解碼后用于將POJO消息派發(fā)到后端業(yè)務(wù)線程的除外)
        • 解碼要放在NIO線程調(diào)用的解碼Handler中進(jìn)行
        • 如果業(yè)務(wù)邏輯操作非常簡(jiǎn)單,沒有復(fù)雜的業(yè)務(wù)邏輯計(jì)算,沒有可能會(huì)導(dǎo)致線程被阻塞的磁盤操作、數(shù)據(jù)庫操作、網(wǎng)路操作等,可以直接在NIO線程上完成業(yè)務(wù)邏輯編排,不需要切換到用戶線程
        • 如果業(yè)務(wù)邏輯處理復(fù)雜,不要在NIO線程上完成,建議將解碼后的POJO消息封裝成Task, 派發(fā)到業(yè)務(wù)線程池中由業(yè)務(wù)線程執(zhí)行,以保證NIO線程盡快被釋放,處理其他的I/O操作
      • NioEventLoop 設(shè)計(jì)原理
        • 并不是一個(gè)純粹的I/O線程,它除了負(fù)責(zé)I/O的讀寫之外
        • 通過調(diào)用 NioEventLoop 的 execute(Runnable task)方法實(shí)現(xiàn),Netty 有很多系統(tǒng)Task, 創(chuàng)建它們的主要原因是:當(dāng) I/O 線程和用戶線程同時(shí)操作網(wǎng)絡(luò)資源時(shí),為了防止并發(fā)操作導(dǎo)致的鎖競(jìng)爭(zhēng),將用戶線程的操作封裝成Task放入消息隊(duì)列中,由I/O 線程負(fù)責(zé)執(zhí)行,這樣就實(shí)現(xiàn)了局部無鎖化
        • 通過調(diào)用NioEventLoop的schedule處理定義任務(wù)
        • NioEventLoop中的run方法是SingleThreadEventExecutor中定義的抽象方法
      • NioEventLoop
        • 作為NIO框架的Reactor線程,NioEventLoop需要處理網(wǎng)絡(luò)I/O讀寫事件,因此它必須聚合一個(gè)多路復(fù)用器對(duì)象
        • rebuildSelector
          • 解決the infamous epoll 100% CPU bug
          • 在某個(gè)周期(例 如 100ms) 內(nèi)如果連續(xù)發(fā)生JV次空輪詢,說明觸發(fā)了 JDK NIO的epoll()死循環(huán)bug
        • 處理完I/O事件之后,NioEventLoop需要執(zhí)行非I/O操作的系統(tǒng)Task和定時(shí)任務(wù)
          • 為了保證兩者都能得到足夠的CPU時(shí)間被執(zhí)行,Netty提供了I/O比例供用戶定制
          • setIoRatio
            • The default value is{@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks

      Future 和 Promise

      • Netty強(qiáng)烈建議直接通過添加監(jiān)聽器的方式獲取I/O操作結(jié)果
        • 當(dāng) I/O操作完成之后,I/O線程會(huì)回調(diào)ChannelFuture中 GenericFutureListener的operationComplete方法,并把ChannelFuture對(duì)象當(dāng)作方法的入?yún)?/span>
      • 異步I/O 操作有兩類超時(shí):一個(gè)是TCP層面的I/O 超時(shí),另一個(gè)是業(yè)務(wù)邏輯層面的操作超時(shí)
      • Promise是可寫的Future,F(xiàn)uture自身并沒有寫操作相關(guān)的接口,Netty通 過Promise對(duì)Future進(jìn)行擴(kuò)展,用于設(shè)置I/O操作的結(jié)果
        • 循環(huán)判斷的原因是防止線程被意外喚醒導(dǎo)致的功能異常(虛假喚醒)
        • 由于在I/O線程中調(diào)用Promise的await或者sync方法會(huì)導(dǎo)致死鎖
          • checkDeadLock

      Netty架構(gòu)剖析

      • Netty采用了典型的三層網(wǎng)絡(luò)架構(gòu)進(jìn)行設(shè)計(jì)和開發(fā)
        • Reactor通信調(diào)度層
        • 職責(zé)鏈ChannelPipeline
        • 業(yè)務(wù)邏輯編排層(Service ChannelHandler)
      • 關(guān)鍵架構(gòu)質(zhì)量屬性
        • 高性能
          • 采用異步非阻塞的I/O類庫,基于Reactor模式
          • TCP接收和發(fā)送緩沖區(qū)使用直接內(nèi)存代替堆內(nèi)存
          • 支持通過內(nèi)存池的方式循環(huán)利用ByteBuf
          • 可配置的I/O 線程數(shù)、TCP參數(shù)
          • 采用環(huán)形數(shù)組緩沖區(qū)實(shí)現(xiàn)無鎖化并發(fā)編程
          • 合理地使用線程安全容器、原子類等
          • 關(guān)鍵資源的處理使用單線程串行化
          • 通過引用計(jì)數(shù)器及時(shí)地申請(qǐng)釋放不再被引用的對(duì)象
        • 可靠性
          • 鏈路有效性檢測(cè)(讀空閑超時(shí)機(jī)制、寫空閑超時(shí)機(jī)制)
          • 內(nèi)存保護(hù)機(jī)制
          • 優(yōu)雅停機(jī)
            • 優(yōu)雅停機(jī)往往需要設(shè)置個(gè)最大超時(shí)時(shí)間T,如果達(dá)到T后系統(tǒng)仍然沒有退出,則通過 Kill - 9 pid強(qiáng)殺當(dāng)前的進(jìn)程
        • 可定制性
          • 責(zé)任鏈模式、基于接口的開發(fā)、提供了大量工廠類、提供了大量的系統(tǒng)參數(shù)供用戶按需設(shè)置
        • 可擴(kuò)展性
          • 可以方便地進(jìn)行應(yīng)用層協(xié)議定制

      Java 多線程編程在 Netty中的應(yīng)用

      • Java內(nèi)存模型
        • 工作內(nèi)存和主內(nèi)存
        • Java內(nèi)存交互協(xié)議
        • 對(duì)于SUN的 JDK,在 Windows和 Linux操作系統(tǒng)上采用了內(nèi)核線程的實(shí)現(xiàn)方式
          • 這種線程由內(nèi)核來完成線程切換,內(nèi)核通過線程調(diào)度器對(duì)線程進(jìn)行調(diào)度,并負(fù)責(zé)將線程任務(wù)映射到不同的處理器上
      • Netty的并發(fā)編程實(shí)踐
        • 對(duì)共享的可變數(shù)據(jù)進(jìn)行正確的同步-synchronized
          • 鎖的范圍需要盡可能的小
        • 正確使用鎖
          • 始終使用wait循環(huán)來調(diào)用wait方法,永遠(yuǎn)不要在循環(huán)之外調(diào)用wait方法。這樣做的原因是盡管并不滿足被喚醒條件,但是由于其他線程調(diào)用notifyAlIO方法會(huì)導(dǎo)致被阻塞線程意外喚醒,此時(shí)執(zhí)行條件并不滿足
        • volatile的正確使用
          • 線程可見性
          • 禁止指令重排序優(yōu)化
          • volatile最適合使用的是個(gè)線程寫,其他線程讀的場(chǎng)合
        • CAS指令和原子類
          • 悲觀鎖
          • 樂觀鎖。簡(jiǎn)單地說,就是先進(jìn)行操作,操作完成之后再判斷操作是否成功,是否有并發(fā)問題,如果有則進(jìn)行失敗補(bǔ)償,如果沒有就算操作成功--CAS自旋
            • sun.misc.Unsafe
            • AtomicIntegerFieldUpdater
        • 線程安全類的應(yīng)用
          • ConcurrentLinkedQueue
          • JDK的線程安全容器底層采用了 CAS、volatile和 ReadWriteLock實(shí)現(xiàn),相比于傳統(tǒng) 重量級(jí)的同步鎖,采用了更輕量、細(xì)粒度的鎖,因此,性能會(huì)更高
          • Netty對(duì) JDK的線程池進(jìn)行了封裝和改造,但是,本質(zhì)上仍然是利用了線程池和線程安全隊(duì)列簡(jiǎn)化了多線程編程
        • 讀寫鎖的應(yīng)用
          • HashedWheelTimer
        • 線程安全性文檔說明
          • 在 Netty中,對(duì)于一些關(guān)鍵的類庫,給出了線程安全性的API DOC
        • 不要依賴線程優(yōu)先級(jí)
          • Netty中默認(rèn)的線程工廠實(shí)現(xiàn)類,開放了包含設(shè)置線程優(yōu)先級(jí)字段的構(gòu)造函數(shù)。這是 個(gè)錯(cuò)誤的決定.實(shí)際上JDK的線程優(yōu)先級(jí)是無法跨平臺(tái)正確運(yùn)行的

      高性能之道

      • I/O 通信性能三原則
        • 傳輸、協(xié)議、線程
      • Netty
        • 異步非阻塞通信
        • 高效的Reactor線程模型
          • 個(gè)人認(rèn)為采用第三種所謂主從模型的話,則需要綁定多個(gè)端口,每一個(gè)端口與一個(gè)boss thread綁定
            • 實(shí)際bind的時(shí)候才會(huì)創(chuàng)建NioServerSocketChannel
        • 無鎖化的串行設(shè)計(jì)
        • 高效的并發(fā)編程
        • 高性能的序列化框架
        • 零拷貝
          • Netty的接收和發(fā)送ByteBuffer采用DIRECT BUFFERS
          • 第二種“零拷貝”的實(shí)現(xiàn)CompositeByteBuf, 它對(duì)外將多個(gè)ByteBuf封裝成1個(gè)ByteBuf
          • 很多操作系統(tǒng)直接將文件緩沖區(qū)的內(nèi)容發(fā)送到目標(biāo)Channel中,而不需要通過循環(huán)拷貝的方式
        • 內(nèi)存池
          • PooledByteBufAllocator#DEFAULT#directBuffer
            • 使用內(nèi)存池分配器創(chuàng)建直接內(nèi)存緩沖區(qū)
            • 過 RECYCLER的 get方法循環(huán)使用ByteBuf對(duì)象,如果是非內(nèi)存池實(shí)現(xiàn),則直接創(chuàng)建 一個(gè)新的ByteBuf對(duì)象
              • setRefCnt方法設(shè)置引用計(jì)數(shù)器
          • Unpooled#directBuffer
            • 使用非堆內(nèi)存分配器創(chuàng)建的直接內(nèi)存緩沖區(qū)
        • 靈活的TCP參數(shù)配置能力

      可靠性

      • 網(wǎng)絡(luò)通信類故障
        • 客戶端連接超時(shí)
          • ChannelOption.CONNECTTIMEOUTMILLIS
          • 設(shè)置完連接超時(shí)之后,Netty在發(fā)起連接的時(shí)候,會(huì)根據(jù)超時(shí)時(shí)間創(chuàng)建ScheduledFuture 掛載在Reactor線程上,用于定時(shí)監(jiān)測(cè)是否發(fā)生連接超時(shí)
            • AbstractNioUnsafe#connect
            • 如果在超時(shí)期限內(nèi)處理完成連接操作,則取消連接超時(shí)定時(shí)任務(wù)
        • 通信對(duì)端強(qiáng)制關(guān)閉連接
          • 強(qiáng)制關(guān)閉客戶端,服務(wù)端己經(jīng)監(jiān)控到客戶端強(qiáng)制關(guān)閉了連接,釋放了連接句柄
            • I/O異常被統(tǒng)一處理,該異常向上拋,由NioByteUnsafe進(jìn)行統(tǒng)一異常處理
          • 鏈路關(guān)閉
            • 己方或者對(duì)方主動(dòng)關(guān)閉鏈接并不屬于異常場(chǎng)景,因此不會(huì)產(chǎn)生Exception事件通知 Pipeline
        • 定制I/O故障
          • 客戶端的斷連重連機(jī)制
          • 消息的緩存重發(fā)
          • 接口日志中詳細(xì)記錄故障細(xì)節(jié)
          • 運(yùn)維相關(guān)功能,例如告警、觸發(fā)郵件/短信等
            • Netty的處理策略是發(fā)生I/O 異常,底層的資源由它負(fù)責(zé)釋放,同時(shí)將異常堆找信息 以事件的形式通知給上層用戶,由用戶對(duì)異常進(jìn)行定制
        • 鏈路的有效性檢測(cè)
          • 心跳檢測(cè)的目的就是確認(rèn)當(dāng)前鏈路可用,對(duì)方活著并且能夠正常接收和發(fā)送消息
          • 心跳檢測(cè)機(jī)制
            • Ping-Pong型心跳:由通信一方定時(shí)發(fā)送Ping消息,對(duì)方接收到Ping消息之后,立即返回Pong應(yīng)答消息給對(duì)方,屬于請(qǐng)求-響應(yīng)型心跳
            • Ping-Ping型心跳:不區(qū)分心跳請(qǐng)求和應(yīng)答,由通信雙方按照約定定時(shí)向?qū)Ψ桨l(fā)送心跳Ping消息,它屬于雙向心跳
              • 連續(xù)N次心跳檢測(cè)都沒有收到對(duì)方的Pong應(yīng)答消息或者Ping請(qǐng)求消息,則認(rèn)為 鏈路己經(jīng)發(fā)生邏輯失效,這被稱作心跳超時(shí)
              • 讀取和發(fā)送心跳消息的時(shí)候如何直接發(fā)生了IO異常
            • Netty的心跳檢測(cè)實(shí)際上是利用了鏈路空閑檢測(cè)機(jī)制實(shí)現(xiàn)的
              • 讀空閑
              • 寫空閑
              • 讀寫空閑
                • 鏈路空閑的時(shí)候并沒有關(guān)閉鏈路,而是觸發(fā)IdleStateEvem事 件 ,用戶訂閱IdleStateEvent事件,用于自定義邏輯處理
          • Reactor線程的保護(hù)
            • 循環(huán)體內(nèi)一定要捕獲Throwable
            • 規(guī)避NIO BUG
          • 內(nèi)存保護(hù)
            • 鏈路總數(shù)的控制:每條鏈路都包含接收和發(fā)送緩沖區(qū),鏈路個(gè)數(shù)太多容易導(dǎo)致內(nèi) 存溢出
            • 單個(gè)緩沖區(qū)的上限控制
            • 緩沖區(qū)內(nèi)存釋放
            • NIO消息發(fā)送隊(duì)列的長度上限控制
              • 緩沖區(qū)的內(nèi)存泄漏保護(hù)
                • 為了提升內(nèi)存的利用率,Netty提供了內(nèi)存池和對(duì)象池.為了防止因?yàn)橛脩暨z漏導(dǎo)致內(nèi)存泄漏,Netty在 Pipeline的尾Handler中自動(dòng)對(duì)內(nèi)存進(jìn)行釋放
              • 實(shí)際的商用環(huán)境中,如果遇到畸形碼流攻擊、協(xié)議消息編碼異常、消息丟包等問題時(shí),可能會(huì)解析到一個(gè)超長的長度字段
              • 流量整形
                • Netty流量整形的原理是:對(duì)每次讀取到的ByteBuf可寫字節(jié)數(shù)進(jìn)行計(jì)算,獲取當(dāng)前的報(bào)文流量,然后與流量整形閾值對(duì)比。如果已經(jīng)達(dá)到或者超過了閾值。則計(jì)算等待時(shí)間delay, 將當(dāng)前的ByteBuf放到定時(shí)任務(wù)Task中緩存,由定時(shí)任務(wù)線程池在延遲delay之后繼續(xù)處理該ByteBuf
                • 用戶可以通過參數(shù)設(shè)置:報(bào)文的接收速率、報(bào)文的發(fā)送速率、整形周期
                • Netty也支持鏈路級(jí)的流量整形
          • 優(yōu)雅停機(jī)接口
      • 優(yōu)化建議

        • 發(fā)送隊(duì)列容量上限控制
          • 如果網(wǎng)絡(luò)對(duì)方處理速度比較慢,導(dǎo)致TCP滑窗長時(shí)間為0 ; 或者消息發(fā)送方發(fā)送速度 過快,或者一次批量發(fā)送消息量過大,都可能會(huì)導(dǎo)致ChannelOutboundBuffer的內(nèi)存膨脹, 這可能會(huì)導(dǎo)致系統(tǒng)的內(nèi)存溢出
          • 過啟動(dòng)項(xiàng)的ChannelOption設(shè)置發(fā)送隊(duì)列的長度或者通過-D 啟動(dòng)參數(shù)配置該長度
        • 回推發(fā)送失敗的消息
          • Mina的實(shí)現(xiàn),當(dāng)發(fā)生鏈路異常之后,Mina會(huì)將尚未發(fā)送的整包消息隊(duì)列封裝到異常對(duì)象中,然后推送給用戶Handler, 由用戶來決定后續(xù)的處理策略
      • 安全性

        • Netty通過Ssmandler提供了對(duì)SSL的支持,它支持的SSL協(xié)議類型包括:SSLV2、SSLV3 和 TLS
          • SSL單向認(rèn)證
          • SSL雙向認(rèn)證
          • 第三方CA認(rèn)證
        • Netty擴(kuò)展的安全特性
          • IP地址黑名單機(jī)制
          • 接入認(rèn)證

      展望

      • 屬性配置
        • 獲取見:SystemPropertyUtil
        • 如:io.netty.noUnsafe、io.netty.eventLoopThreads(默認(rèn)cpu個(gè)數(shù)*2)
    posted on 2017-01-19 22:00 landon 閱讀(3365) 評(píng)論(0)  編輯  收藏 所屬分類: Book
    主站蜘蛛池模板: 日韩免费毛片视频| 免费观看AV片在线播放| 吃奶摸下高潮60分钟免费视频| 亚洲视频精品在线观看| 手机看片国产免费永久| 国产色婷婷精品免费视频| 亚洲日本在线电影| 日本最新免费不卡二区在线| 亚洲日本中文字幕天天更新| 日本高清免费不卡视频| 国产成人精品亚洲| 亚洲国产精品自在拍在线播放 | 色综合久久精品亚洲国产| 无码国产精品久久一区免费 | 亚洲国产老鸭窝一区二区三区| 亚洲黄色网站视频| 亚洲精品免费在线观看| 久久精品国产亚洲AV香蕉| 国产成人yy免费视频| 国产精品亚洲综合久久| 日本不卡高清中文字幕免费| 免费国产黄网站在线看| 亚洲中文字幕无码一久久区| 久久久久久久99精品免费观看| 成人免费无码大片a毛片| 亚洲色大情网站www| 免费大片在线观看网站| 久久九九免费高清视频| 亚洲制服中文字幕第一区| 99视频全部免费精品全部四虎| 亚洲中久无码永久在线观看同| 亚洲人成网站色7799| 无码专区一va亚洲v专区在线 | 日本免费高清一本视频| 免费人成视频在线播放| 国产成人精品日本亚洲专区61| 色综合久久精品亚洲国产| 亚洲人成色777777在线观看| 久久久久久曰本AV免费免费| 青草久久精品亚洲综合专区| 久久亚洲高清观看|