Netty項目中,自帶了很多使用的例子,對于剛剛開始接觸和學習Netty源碼的開發者來說,可以通過例子來更好的理解Netty的具體實現。源碼可以再netty 4.0的example找到。
1 public class EchoServerHandler extends ChannelInboundByteHandlerAdapter {
2 private static final Logger logger = Logger.getLogger(
3 EchoServerHandler.class.getName());
4
5 @Override
6 public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
7 ByteBuf out = ctx.nextOutboundByteBuffer();
8 out.discardReadBytes();
9 out.writeBytes(in);
10 ctx.flush();
11 }
12
13 @Override
14 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
15 // Close the connection when an exception is raised.
16 logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
17 ctx.close();
18 }
19 }
Line 1: 聲明一個EchoServerHandler, 并且繼承了ChannelInboundByteHandlerAdapter。 這樣EchoServerHandler就可以處理client發送過來的request。
Line 6: 重寫inboundBufferUpdated方法,對client發送過來的request進行對ByteBuffer對象的操作。關于ByteBuffer的概念將在以后章節討論。
Line 7: ctx.nextOutboundByteBuffer()將返回一個ByteBuffer對象。如果該對象不存在,那么拋出UnsupportedOperationException異常。
Line 14: 重寫exceptionCaught在server端捕獲異常。
1 public class EchoServer {
2
3 private final int port;
4
5 public EchoServer(int port) {
6 this.port = port;
7 }
8
9 public void run() throws Exception {
10 // Configure the server.
11 ServerBootstrap b = new ServerBootstrap();
12 try {
13 b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
14 .channel(NioServerSocketChannel.class)
15 .option(ChannelOption.SO_BACKLOG, 100)
16 .localAddress(new InetSocketAddress(port))
17 .childOption(ChannelOption.TCP_NODELAY, true)
18 .handler(new LoggingHandler(LogLevel.INFO))
19 .childHandler(new ChannelInitializer<SocketChannel>() {
20 @Override
21 public void initChannel(SocketChannel ch) throws Exception {
22 ch.pipeline().addLast(
23 new LoggingHandler(LogLevel.INFO),
24 new EchoServerHandler());
25 }
26 });
27
28 // Start the server.
29 ChannelFuture f = b.bind().sync();
30
31 // Wait until the server socket is closed.
32 f.channel().closeFuture().sync();
33 } finally {
34 // Shut down all event loops to terminate all threads.
35 b.shutdown();
36 }
37 }
38
39 public static void main(String[] args) throws Exception {
40 int port;
41 if (args.length > 0) {
42 port = Integer.parseInt(args[0]);
43 } else {
44 port = 8080;
45 }
46 new EchoServer(port).run();
47 }
48 }
Line 11: 通過ServerBootStrap對象,來啟動服務器
Line 13: 通過group方法,來綁定EventLoopGroup,EventLoopGroup用來處理SocketChannel和Channel上面的所有時間和IO。
Line 16: localAddress方法用于綁定服務器地址和端口。
Line 18,19: handler方法和childhandler方法用于指定各種ChannelHandler對象,指定的ChannelHandler對象將用于處理client端來的request。
Line 21: 初始化一個Channel對象,并且綁定之前定義的EchoServerHandler類。
Line 22: 將EchoServerHandler添加到Pipeline中。
Line 29: ServerBootstrap對象準備就緒,啟動server,
Line 32: 等待直到server socket關閉
public class EchoClientHandler extends ChannelInboundByteHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.write(firstMessage);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
EchoClientHandler類的實現與EchoServerHandler類似,也都繼承了ChannelInboundByteHandlerAdapter。不同在于重寫了channelActive()方法。
1 public class EchoClient {
2
3 private final String host;
4 private final int port;
5 private final int firstMessageSize;
6
7 public EchoClient(String host, int port, int firstMessageSize) {
8 this.host = host;
9 this.port = port;
10 this.firstMessageSize = firstMessageSize;
11 }
12
13 public void run() throws Exception {
14 // Configure the client.
15 Bootstrap b = new Bootstrap();
16 try {
17 b.group(new NioEventLoopGroup())
18 .channel(NioSocketChannel.class)
19 .option(ChannelOption.TCP_NODELAY, true)
20 .remoteAddress(new InetSocketAddress(host, port))
21 .handler(new ChannelInitializer<SocketChannel>() {
22 @Override
23 public void initChannel(SocketChannel ch) throws Exception {
24 ch.pipeline().addLast(
25 new LoggingHandler(LogLevel.INFO),
26 new EchoClientHandler(firstMessageSize));
27 }
28 });
29
30 // Start the client.
31 ChannelFuture f = b.connect().sync();
32
33 // Wait until the connection is closed.
34 f.channel().closeFuture().sync();
35 } finally {
36 // Shut down the event loop to terminate all threads.
37 b.shutdown();
38 }
39 }
40
41 public static void main(String[] args) throws Exception {
42 // Print usage if no argument is specified.
43 if (args.length < 2 || args.length > 3) {
44 System.err.println(
45 "Usage: " + EchoClient.class.getSimpleName() +
46 " <host> <port> [<first message size>]");
47 return;
48 }
49
50 // Parse options.
51 final String host = args[0];
52 final int port = Integer.parseInt(args[1]);
53 final int firstMessageSize;
54 if (args.length == 3) {
55 firstMessageSize = Integer.parseInt(args[2]);
56 } else {
57 firstMessageSize = 256;
58 }
59
60 new EchoClient(host, port, firstMessageSize).run();
61 }
62 }
Line 20: 指定遠程服務器的地址和端口(localhost:8080)
備注:因為筆者開始寫Netty源碼分析的時候,Netty 4.0還是處于Alpha階段,之后的API可能還會有改動,筆者將會及時更改。使用開源已經有好幾年的時間了,一直沒有時間和精力來具體研究某個開源項目的具體實現,這次是第一次寫開源項目的源碼分析,如果文中有錯誤的地方,歡迎讀者可以留言指出。對于轉載的讀者,請注明文章的出處。
希望和廣大的開發者/開源愛好者進行交流,歡迎大家的留言和討論。
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;