本文將告訴你如何使用Netty2來編一個網(wǎng)絡(luò)應(yīng)用程序(包括客戶端和服務(wù)端)。我會介紹一個簡單的SumUp協(xié)議,用來對整數(shù)求和。通過源代碼的一步步講解,你會了解到Netty2的每個特性。
SumUp服務(wù)會加總從客戶端送來的ADD消息中的所有值,并且為每個ADD消息返回一個RESULT消息。所有消息都是由header和body兩部分組成:
header包含type和sequence兩個字段。type表示消息的類型(0是RESULT消息,1是ADD消息)。sequence用來表示一組對應(yīng)的ADD和RESULT(也就是說,服務(wù)器回應(yīng)ADD消息時,應(yīng)在RESULT中使用與ADD一樣的sequence值)。
ADD消息包含了要被求和的值。
RESULT具有不固定長度的消息體。當(dāng)計算沒問題時,body內(nèi)容是加總的值(4bytes),如果有錯誤或溢位,則是2bytes. 見下圖:
MessageRecognizer從送來的數(shù)據(jù)中重組出Message對象。這兒我們實(shí)現(xiàn)了一個SumUpMessageRecognizer,用于客戶端和服務(wù)端的信息重組。
public class SumUpMessageRecognizer implements MessageRecognizer {
public static final int CLIENT_MODE = 1;
public static final int SERVER_MODE = 2;
private int mode;
public SumUpMessageRecognizer(int mode) {
switch (mode) {
case CLIENT_MODE:
case SERVER_MODE:
this.mode = mode;
break;
default:
throw new IllegalArgumentException("invalid mode: " + mode);
}
}
public Message recognize(ByteBuffer buf) throws MessageParseException {
// return null if message type is not arrived yet.
if (buf.remaining() < Constants.TYPE_LEN)
return null;
int type = buf.getShort();
switch (mode) {
// 如果是server模式,只讓它接收ADD消息.
case SERVER_MODE:
switch (type) {
case Constants.ADD:
return new AddMessage();
default:
throw new MessageParseException("unknown type: " + type);
}
//如果是客戶端模式,只讓它接收RESULT消息.
case CLIENT_MODE:
switch (type) {
case Constants.RESULT:
return new ResultMessage();
default:
throw new MessageParseException("unknown type: " + type);
}
default:
throw new InternalError(); // this cannot happen
}
}
}
我們必須實(shí)現(xiàn)ADD和RESULT消息: ADD和RESULT。 它們都有公共的header,最好的方式是實(shí)現(xiàn)一個AbstractMessage,并且從它繼承出Add和Result消息。
源代碼:
實(shí)現(xiàn)了Messagerecognizer和Message之后,要實(shí)現(xiàn)Server和Client是非常容易的事情,通過下面的代碼,你會很容易理解如何去實(shí)現(xiàn)協(xié)議的處理流程。
實(shí)現(xiàn)服務(wù)端兩個主要的類,一個是Server類,另一個是ServerSessionListener. Server類負(fù)責(zé)啟動主程序并監(jiān)聽連接。而ServerSessionListener用于處理和發(fā)送消息。
public class Server {
private static final int SERVER_PORT = 8080;
private static final int DISPATCHER_THREAD_POOL_SIZE = 16;
public static void main(String[] args) throws Throwable {
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
// 啟動缺省數(shù)量的I/O工作線程
ioProcessor.start();
// 啟動指定數(shù)量的event dispatcher 線程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
eventDispatcher.start();
// 準(zhǔn)備 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.SERVER_MODE);
// 準(zhǔn)備session監(jiān)聽器,用于處理通訊過程.
ServerSessionListener listener = new ServerSessionListener();
// 開啟server socket通道
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(SERVER_PORT));
// 監(jiān)聽連接,并開始通訊
System.out.println("listening on port " + SERVER_PORT);
for (;;) {
// 接受connection
SocketChannel channel = ssc.accept();
// 建立新的session
Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher);
// 添加session監(jiān)聽器
session.addSessionListener(listener);
// 開始通訊
session.start();
}
}
}
public class ServerSessionListener implements SessionListener {
public ServerSessionListener() {
}
public void connectionEstablished(Session session) {
System.out.println(session.getSocketAddress() + " connected");
// 設(shè)置空閑時間為60秒
session.getConfig().setIdleTime(60);
// 設(shè)置sum的初始值為0。
session.setAttachment(new Integer(0));
}
public void connectionClosed(Session session) {
System.out.println(session.getSocketAddress() + " closed");
}
// 當(dāng)收到client發(fā)來的消息時,此方法被調(diào)用
public void messageReceived(Session session, Message message) {
System.out.println(session.getSocketAddress() + " RCVD: " + message);
// client端只發(fā)送AddMessage. 其它情況要另作處理
// 在這里只是簡單的進(jìn)行類型轉(zhuǎn)換處理
AddMessage am = (AddMessage) message;
// 將收到的消息里的值加上當(dāng)前sum的值.
int sum = ((Integer) session.getAttachment()).intValue();
int value = am.getValue();
long expectedSum = (long) sum + value;
if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
// 如果溢位返回錯誤消息
ResultMessage rm = new ResultMessage();
rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。
rm.setOk(false);
session.write(rm);
} else {
// 加總
sum = (int) expectedSum;
session.setAttachment(new Integer(sum));
// 返回結(jié)果消息
ResultMessage rm = new ResultMessage();
rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。
rm.setOk(true);
rm.setValue(sum);
session.write(rm);
}
}
public void messageSent(Session session, Message message) {
System.out.println(session.getSocketAddress() + " SENT: " + message);
}
public void sessionIdle(Session session) {
System.out.println(session.getSocketAddress()
+ " disconnecting the idle");
// 關(guān)閉空閑的會話。
session.close();
}
// 異常發(fā)生時,將調(diào)用此方法
public void exceptionCaught(Session session, Throwable cause) {
System.out.println(Thread.currentThread().getName()
+ session.getSocketAddress() + " exception:");
cause.printStackTrace(System.out);
if (cause instanceof MessageParseException) {
// 印出錯誤信息內(nèi)容,便于調(diào)試
MessageParseException mpe = (MessageParseException) cause;
ByteBuffer buf = mpe.getBuffer();
System.out.println(buf);
System.out.print("Buffer Content: ");
while (buf.remaining() > 0) {
System.out.print(buf.get() & 0xFF);
System.out.print(' ');
}
System.out.println();
}
// 關(guān)閉會話
session.close();
}
}
服務(wù)端運(yùn)行后,其輸出的內(nèi)容示例如下:
listening on port 8080
/127.0.0.1:4753 connected
/127.0.0.1:4753 RCVD: 0:ADD(4)
/127.0.0.1:4753 RCVD: 1:ADD(6)
/127.0.0.1:4753 RCVD: 2:ADD(2)
/127.0.0.1:4753 RCVD: 3:ADD(7)
/127.0.0.1:4753 RCVD: 4:ADD(8)
/127.0.0.1:4753 RCVD: 5:ADD(1)
/127.0.0.1:4753 SENT: 0:RESULT(4)
/127.0.0.1:4753 SENT: 1:RESULT(10)
/127.0.0.1:4753 SENT: 2:RESULT(12)
/127.0.0.1:4753 SENT: 3:RESULT(19)
/127.0.0.1:4753 SENT: 4:RESULT(27)
/127.0.0.1:4753 SENT: 5:RESULT(28)
/127.0.0.1:4753 closed
跟服務(wù)端對應(yīng),主要由Client和ClientSessionListener組成。
public class Client {
private static final String HOSTNAME = "localhost";
private static final int PORT = 8080;
private static final int CONNECT_TIMEOUT = 30; // seconds
private static final int DISPATCHER_THREAD_POOL_SIZE = 4;
public static void main(String[] args) throws Throwable {
// 預(yù)備要加總的值。
int[] values = new int[args.length];
for (int i = 0; i < args.length; i++) {
values[i] = Integer.parseInt(args[i]);
}
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
// 開始缺省數(shù)量的I/O工作線程
ioProcessor.start();
// 啟動指定數(shù)量的event dispatcher線程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE
eventDispatcher.start();
// 準(zhǔn)備 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.CLIENT_MODE);
// 準(zhǔn)備客戶端會話。
Session session = new Session(ioProcessor, new InetSocketAddress(
HOSTNAME, PORT), recognizer, eventDispatcher);
session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);
// 開始會話,并使用ClientSessionListener監(jiān)聽。
ClientSessionListener listener = new ClientSessionListener(values);
session.addSessionListener(listener);
session.start();
// 一直等到加總完成
while ( !listener.isComplete() ) {
Thread.sleep(1000);
}
// 停止 I/O processor 和 event dispatcher
eventDispatcher.stop();
ioProcessor.stop();
}
}
public class ClientSessionListener implements SessionListener {
private final int[] values;
private boolean complete;
public ClientSessionListener(int[] values) {
this.values = values;
}
public boolean isComplete() {
return complete;
}
// 當(dāng)連接建立好后會調(diào)用此方法。
public void connectionEstablished(Session session) {
System.out.println("connected to " + session.getSocketAddress());
// 發(fā)送加總請求。
for (int i = 0; i < values.length; i++) {
AddMessage m = new AddMessage();
m.setSequence(i);
m.setValue(values[i]);
session.write(m);
}
}
public void connectionClosed(Session session) {
System.out.println("disconnected from " + session.getSocketAddress());
}
// 當(dāng)收到server的回應(yīng)信息時,會調(diào)用此方法
public void messageReceived(Session session, Message message) {
System.out.println("RCVD: " + message);
// 服務(wù)端只發(fā)送ResultMessage. 其它情況下
// 要通過instanceOf來判斷它的類型.
ResultMessage rm = (ResultMessage) message;
if (rm.isOk()) {
// 如果ResultMessage是OK的.
// 根據(jù)ResultMessage的sequence值來判斷如果,
// 一次消息的sequence值,則
if (rm.getSequence() == values.length - 1) {
// 打印出結(jié)果.
System.out.println("The sum: " + rm.getValue());
// 關(guān)閉會話
session.close();
complete = true;
}
} else {
// 如有錯誤,則打印錯誤信息,并結(jié)束會話.
System.out.println("server error, disconnecting...");
session.close();
complete = true;
}
}
public void messageSent(Session session, Message message) {
System.out.println("SENT: " + message);
}
public void sessionIdle(Session session) {
}
public void exceptionCaught(Session session, Throwable cause) {
cause.printStackTrace(System.out);
if (cause instanceof ConnectException) {
// 如果連接server失敗, 則間隔5秒重試連接.
System.out.println("sleeping...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("reconnecting... " + session.getSocketAddress());
session.start();
} else {
session.close();
}
}
}
通過上面的例子,你也許會發(fā)現(xiàn)實(shí)現(xiàn)一個自定義的協(xié)議原來如此簡單。你如果用Netty試著去實(shí)現(xiàn)自己的smtp或pop協(xié)議,我想也不會是一件難事了。
Netty2的首頁在http://gleamynode.net/dev/projects/netty2/index.html
,你可以在這找到本文的全部源碼。