本文將告訴你如何使用Netty2來編一個網(wǎng)絡(luò)應(yīng)用程序(包括客戶端和服務(wù)端)。我會介紹一個簡單的SumUp協(xié)議,用來對整數(shù)求和。通過源代碼的一步步講解,你會了解到Netty2的每個特性。

SumUp 協(xié)議

SumUp服務(wù)會加總從客戶端送來的ADD消息中的所有值,并且為每個ADD消息返回一個RESULT消息。所有消息都是由header和body兩部分組成:

netty_s1.png

header包含type和sequence兩個字段。type表示消息的類型(0是RESULT消息,1是ADD消息)。sequence用來表示一組對應(yīng)的ADD和RESULT(也就是說,服務(wù)器回應(yīng)ADD消息時,應(yīng)在RESULT中使用與ADD一樣的sequence值)。

ADD 消息

ADD消息包含了要被求和的值。

netty_s2.png

RESULT消息

RESULT具有不固定長度的消息體。當(dāng)計算沒問題時,body內(nèi)容是加總的值(4bytes),如果有錯誤或溢位,則是2bytes. 見下圖:

netty_s3.png

netty_s4.png

實(shí)現(xiàn)MessageRecognizer

MessageRecognizer從送來的數(shù)據(jù)中重組出Message對象。這兒我們實(shí)現(xiàn)了一個SumUpMessageRecognizer,用于客戶端和服務(wù)端的信息重組。

public class SumUpMessageRecognizer implements MessageRecognizer {
  
   public static final int CLIENT_MODE = 1;
  
   public static final int SERVER_MODE = 2;
  
   private int mode;
  
   public SumUpMessageRecognizer(int mode) {
        switch (mode) {
            case CLIENT_MODE:
                    case SERVER_MODE:
                 this.mode = mode;
                 break;
                     default:
                 throw new IllegalArgumentException("invalid mode: " + mode);
        }
   }
  
   public Message recognize(ByteBuffer buf) throws MessageParseException {
        // return null if message type is not arrived yet.
        if (buf.remaining() < Constants.TYPE_LEN)
             return null;
    
        int type = buf.getShort();
        switch (mode) {
            // 如果是server模式,只讓它接收ADD消息.
            case SERVER_MODE:
                 switch (type) {
                     case Constants.ADD:
                      return new AddMessage();
                     default:
                          throw new MessageParseException("unknown type: " + type);
                 }
            //如果是客戶端模式,只讓它接收RESULT消息.
            case CLIENT_MODE:
                         switch (type) {
                     case Constants.RESULT:
                          return new ResultMessage();
                     default:
                              throw new MessageParseException("unknown type: " + type);
                 }
            default:
                 throw new InternalError(); // this cannot happen
        }
   }
  }

實(shí)現(xiàn)ADD和RESULT消息

netty_s5.png

我們必須實(shí)現(xiàn)ADD和RESULT消息: ADD和RESULT。 它們都有公共的header,最好的方式是實(shí)現(xiàn)一個AbstractMessage,并且從它繼承出Add和Result消息。

源代碼:

實(shí)現(xiàn)協(xié)議處理流程

實(shí)現(xiàn)了Messagerecognizer和Message之后,要實(shí)現(xiàn)Server和Client是非常容易的事情,通過下面的代碼,你會很容易理解如何去實(shí)現(xiàn)協(xié)議的處理流程。

實(shí)現(xiàn)Server

實(shí)現(xiàn)服務(wù)端兩個主要的類,一個是Server類,另一個是ServerSessionListener. Server類負(fù)責(zé)啟動主程序并監(jiān)聽連接。而ServerSessionListener用于處理和發(fā)送消息。

public class Server {

private static final int SERVER_PORT = 8080;
private static final int DISPATCHER_THREAD_POOL_SIZE = 16;

public static void main(String[] args) throws Throwable {
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
  
  // 啟動缺省數(shù)量的I/O工作線程
  ioProcessor.start();
  
  // 啟動指定數(shù)量的event dispatcher 線程
  eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
  eventDispatcher.start();
  
  // 準(zhǔn)備 message recognizer
  MessageRecognizer recognizer = new SumUpMessageRecognizer(
  SumUpMessageRecognizer.SERVER_MODE);
  
  // 準(zhǔn)備session監(jiān)聽器,用于處理通訊過程.
 ServerSessionListener listener = new ServerSessionListener();
 
 // 開啟server socket通道
 ServerSocketChannel ssc = ServerSocketChannel.open();
 ssc.socket().bind(new InetSocketAddress(SERVER_PORT));
 
 // 監(jiān)聽連接,并開始通訊
 System.out.println("listening on port " + SERVER_PORT);
 for (;;) {
 // 接受connection
 SocketChannel channel = ssc.accept();
 
 // 建立新的session
 Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher);
 
 // 添加session監(jiān)聽器
 session.addSessionListener(listener);
 
 // 開始通訊
 session.start();
 }
 }
 }
 

public class ServerSessionListener implements SessionListener {

        public ServerSessionListener() {
        }

        public void connectionEstablished(Session session) {
               System.out.println(session.getSocketAddress() + " connected");
               // 設(shè)置空閑時間為60秒
               session.getConfig().setIdleTime(60);
               // 設(shè)置sum的初始值為0。
               session.setAttachment(new Integer(0));
        }

        public void connectionClosed(Session session) {
               System.out.println(session.getSocketAddress() + " closed");
        }

        // 當(dāng)收到client發(fā)來的消息時,此方法被調(diào)用
        public void messageReceived(Session session, Message message) {
               System.out.println(session.getSocketAddress() + " RCVD: " + message);
               // client端只發(fā)送AddMessage. 其它情況要另作處理
               // 在這里只是簡單的進(jìn)行類型轉(zhuǎn)換處理
               AddMessage am = (AddMessage) message;
               // 將收到的消息里的值加上當(dāng)前sum的值.
               int sum = ((Integer) session.getAttachment()).intValue();
               int value = am.getValue();
               long expectedSum = (long) sum + value;
               if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
                       // 如果溢位返回錯誤消息
                       ResultMessage rm = new ResultMessage();
                       rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。
                       rm.setOk(false);
                       session.write(rm);
               } else {
                       //  加總
                       sum = (int) expectedSum;
                       session.setAttachment(new Integer(sum));
                       // 返回結(jié)果消息
                       ResultMessage rm = new ResultMessage();
                       rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。
                       rm.setOk(true);
                       rm.setValue(sum);
                       session.write(rm);
               }
        }

 

        public void messageSent(Session session, Message message) {
               System.out.println(session.getSocketAddress() + " SENT: " + message);
        }

 

        public void sessionIdle(Session session) {
               System.out.println(session.getSocketAddress()
                               + " disconnecting the idle");
               // 關(guān)閉空閑的會話。
               session.close();
        }

        // 異常發(fā)生時,將調(diào)用此方法
        public void exceptionCaught(Session session, Throwable cause) {
               System.out.println(Thread.currentThread().getName()
                               + session.getSocketAddress() + " exception:");
               cause.printStackTrace(System.out);

               if (cause instanceof MessageParseException) {
                       // 印出錯誤信息內(nèi)容,便于調(diào)試
                       MessageParseException mpe = (MessageParseException) cause;
                       ByteBuffer buf = mpe.getBuffer();
                       System.out.println(buf);
                       System.out.print("Buffer Content: ");
                       while (buf.remaining() > 0) {
                               System.out.print(buf.get() & 0xFF);
                               System.out.print(' ');
                       }

                       System.out.println();
               }
               // 關(guān)閉會話
               session.close();
        }
}

服務(wù)端運(yùn)行后,其輸出的內(nèi)容示例如下:

listening on port 8080
/127.0.0.1:4753 connected
/127.0.0.1:4753 RCVD: 0:ADD(4)
/127.0.0.1:4753 RCVD: 1:ADD(6)
/127.0.0.1:4753 RCVD: 2:ADD(2)
/127.0.0.1:4753 RCVD: 3:ADD(7)
/127.0.0.1:4753 RCVD: 4:ADD(8)
/127.0.0.1:4753 RCVD: 5:ADD(1)
/127.0.0.1:4753 SENT: 0:RESULT(4)
/127.0.0.1:4753 SENT: 1:RESULT(10)
/127.0.0.1:4753 SENT: 2:RESULT(12)
/127.0.0.1:4753 SENT: 3:RESULT(19)
/127.0.0.1:4753 SENT: 4:RESULT(27)
/127.0.0.1:4753 SENT: 5:RESULT(28)
/127.0.0.1:4753 closed

實(shí)現(xiàn)客戶端

跟服務(wù)端對應(yīng),主要由Client和ClientSessionListener組成。

public class Client {        
private static final String HOSTNAME = "localhost";         
private static final int PORT = 8080;         
private static final int CONNECT_TIMEOUT = 30; // seconds         
private static final int DISPATCHER_THREAD_POOL_SIZE = 4;         

public static void main(String[] args) throws Throwable {               
	// 預(yù)備要加總的值。               
	int[] values = new int[args.length];               
	for (int i = 0; i < args.length; i++) {                       
		values[i] = Integer.parseInt(args[i]);               
	}                
	// 初始化 I/O processor 和 event dispatcher               
	IoProcessor ioProcessor = new IoProcessor();               
	ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();                
	// 開始缺省數(shù)量的I/O工作線程               
	ioProcessor.start();                
	// 啟動指定數(shù)量的event dispatcher線程        
	eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE               
	eventDispatcher.start();                
	// 準(zhǔn)備 message recognizer               
	MessageRecognizer recognizer = new SumUpMessageRecognizer(                               
	SumUpMessageRecognizer.CLIENT_MODE);                
	// 準(zhǔn)備客戶端會話。               
	Session session = new Session(ioProcessor, new InetSocketAddress(                               
	HOSTNAME, PORT), recognizer, eventDispatcher);                                             
	session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);                              
	// 開始會話,并使用ClientSessionListener監(jiān)聽。               
	ClientSessionListener listener = new ClientSessionListener(values);               
	session.addSessionListener(listener);               
	session.start();                              
	// 一直等到加總完成               
	while ( !listener.isComplete() ) {                       
		Thread.sleep(1000);               
	}                              
	// 停止 I/O processor 和 event dispatcher               
	eventDispatcher.stop();               
	ioProcessor.stop();        
	} 
} 
public class ClientSessionListener implements SessionListener {
        private final int[] values;
        private boolean complete;
        public ClientSessionListener(int[] values) {
               this.values = values;
        }
        public boolean isComplete() {
               return complete;
        }
        // 當(dāng)連接建立好后會調(diào)用此方法。
        public void connectionEstablished(Session session) {
               System.out.println("connected to " + session.getSocketAddress());
               // 發(fā)送加總請求。
               for (int i = 0; i < values.length; i++) {
                       AddMessage m = new AddMessage();
                       m.setSequence(i);
                       m.setValue(values[i]);
                       session.write(m);
               }
        }
        public void connectionClosed(Session session) {
               System.out.println("disconnected from " + session.getSocketAddress());
        }
        // 當(dāng)收到server的回應(yīng)信息時,會調(diào)用此方法
        public void messageReceived(Session session, Message message) {
               System.out.println("RCVD: " + message);

               // 服務(wù)端只發(fā)送ResultMessage. 其它情況下
               // 要通過instanceOf來判斷它的類型.
               ResultMessage rm = (ResultMessage) message;
               if (rm.isOk()) {
                       // 如果ResultMessage是OK的.
                       // 根據(jù)ResultMessage的sequence值來判斷如果,
                       // 一次消息的sequence值,則
                       if (rm.getSequence() == values.length - 1) {
                               // 打印出結(jié)果.
                               System.out.println("The sum: " + rm.getValue());
                               // 關(guān)閉會話
                               session.close();
                               complete = true;
                       }
               } else {
                       // 如有錯誤,則打印錯誤信息,并結(jié)束會話.
                       System.out.println("server error, disconnecting...");
                       session.close();
                       complete = true;
               }
        }
 
        public void messageSent(Session session, Message message) {
	    	System.out.println("SENT: " + message);
        }
 
        public void sessionIdle(Session session) {
        }

         public void exceptionCaught(Session session, Throwable cause) {
               cause.printStackTrace(System.out);
               if (cause instanceof ConnectException) {
                       // 如果連接server失敗, 則間隔5秒重試連接.
                       System.out.println("sleeping...");
                       try {
                               Thread.sleep(5000);
                       } catch (InterruptedException e) {
                       }
                       System.out.println("reconnecting... " + session.getSocketAddress());
                       session.start();
               } else {
                       session.close();
               }
        }
 }

通過上面的例子,你也許會發(fā)現(xiàn)實(shí)現(xiàn)一個自定義的協(xié)議原來如此簡單。你如果用Netty試著去實(shí)現(xiàn)自己的smtp或pop協(xié)議,我想也不會是一件難事了。

Netty2的首頁在http://gleamynode.net/dev/projects/netty2/index.html,你可以在這找到本文的全部源碼。


Attachments:

netty_s1.png Info on netty_s1.png 3071 bytes
netty_s2.png Info on netty_s2.png 3142 bytes
netty_s3.png Info on netty_s3.png 4268 bytes
netty_s4.png Info on netty_s4.png 3316 bytes
netty_s5.png Info on netty_s5.png 13361 bytes