Having gone through how the server starts up, let's look at how the client starts. The example is TelnetClient, the counterpart of TelnetServer.
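import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class TelnetClient {
    public static void main(String[] args) throws Exception {
        // Parse the target host and port from the command line.
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // The first executor serves the boss thread, the second the worker threads.
        ClientBootstrap bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        // Configure the pipeline factory.
        bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        // Wait until the connection attempt succeeds or fails.
        Channel channel = future.awaitUninterruptibly().getChannel();
        if (!future.isSuccess()) {
            future.getCause().printStackTrace();
            bootstrap.releaseExternalResources();
            return;
        }

    }
}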
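Let's go straight to the connect method. The single-argument connect used above delegates to this two-argument overload of ClientBootstrap: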
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    ChannelPipeline pipeline;
    try {
        pipeline = getPipelineFactory().getPipeline();
    } catch (Exception e) {
        throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
    }
    // The NioClientSocketChannel constructor fires the channelOpen event.
    // The upstream handlers in TelnetClientPipelineFactory do not override channelOpen,
    // so the event is simply passed along the pipeline.
    Channel ch = getFactory().newChannel(pipeline);
    // Set the options.
    ch.getConfig().setOptions(getOptions());
    // Bind.
    if (localAddress != null) {
        ch.bind(localAddress);
    }
    // Connect.
    return ch.connect(remoteAddress);
}
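Next, ch.connect(remoteAddress) runs. The call resolves up the class hierarchy NioClientSocketChannel --> NioSocketChannel --> AbstractChannel, which in turn calls the static helper Channels.connect: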
// AbstractChannel
public ChannelFuture connect(SocketAddress remoteAddress) {
    return Channels.connect(this, remoteAddress);
}

// Channels
public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    // Cancellable future that is completed once the connection attempt finishes.
    ChannelFuture future = future(channel, true);
    channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
            channel, future, ChannelState.CONNECTED, remoteAddress));
    return future;
}
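The point worth noticing is that connect hands the future back immediately, while the connection attempt itself finishes later on an I/O thread that then marks the future as succeeded or failed. As a rough JDK-only analogy (CompletableFuture stands in for ChannelFuture, and AsyncConnectSketch with its methods is a made-up name, so none of this is Netty's API), the idea can be sketched like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; CompletableFuture plays the role of ChannelFuture.
class AsyncConnectSketch {

    /** Returns at once; the given executor completes the future later. */
    static CompletableFuture<String> connect(ExecutorService ioThread, String remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        ioThread.execute(() -> {
            // Stand-in for the real connection attempt: an empty address "fails".
            if (remoteAddress.isEmpty()) {
                future.completeExceptionally(new IllegalArgumentException("empty address"));
            } else {
                future.complete("connected to " + remoteAddress);
            }
        });
        return future;
    }

    /** Waits for the outcome, much like awaitUninterruptibly() followed by isSuccess(). */
    static String demo(String remoteAddress) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            return connect(ioThread, remoteAddress)
                    .handle((ok, cause) -> cause == null ? ok : "failed: " + cause.getMessage())
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
        }
    }
}
```

Calling AsyncConnectSketch.demo with a non-empty address yields the success message, and an empty address yields the failure message, which corresponds to the isSuccess / getCause check in TelnetClient.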
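The CONNECTED event now travels downstream through the pipeline built by TelnetClientPipelineFactory, from the last handler toward the first. The only downstream handler there is StringEncoder (a subclass of OneToOneEncoder). Its handleDownstream method does not process this event and simply forwards it, which ends up in DefaultChannelHandlerContext.sendDownstream: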
public void sendDownstream(ChannelEvent e) {
    // There is no other downstream handler before StringEncoder, so prev is null here.
    DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
    if (prev == null) {
        try {
            getSink().eventSunk(DefaultChannelPipeline.this, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    } else {
        DefaultChannelPipeline.this.sendDownstream(prev, e);
    }
}
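The backward walk performed by sendDownstream can be illustrated without Netty. The following is a minimal sketch, not Netty's actual code: MiniPipeline, Handler and the trace list are hypothetical names invented for the illustration. A downstream event starts at the tail, visits only the handlers marked as downstream handlers, and lands in the sink once no earlier downstream handler remains.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a Netty 3 style pipeline; none of these types are Netty classes.
class MiniPipeline {

    /** A named handler that either takes part in downstream traffic or does not. */
    static final class Handler {
        final String name;
        final boolean downstream;

        Handler(String name, boolean downstream) {
            this.name = name;
            this.downstream = downstream;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    /** Records who saw the event, in order; the last entry is always the sink. */
    final List<String> trace = new ArrayList<>();

    MiniPipeline addLast(String name, boolean downstream) {
        handlers.add(new Handler(name, downstream));
        return this;
    }

    /** Sends an event from the tail toward the head, then hands it to the sink. */
    void sendDownstream(String event) {
        int index = previousDownstream(handlers.size() - 1);
        while (index >= 0) {
            // Handlers in this sketch only forward the event, as StringEncoder does for CONNECTED.
            trace.add(handlers.get(index).name);
            index = previousDownstream(index - 1);
        }
        // No earlier downstream handler is left, so the sink receives the event.
        trace.add("sink:" + event);
    }

    /** Returns the index of the closest downstream handler at or before 'from', or -1. */
    private int previousDownstream(int from) {
        for (int i = from; i >= 0; i--) {
            if (handlers.get(i).downstream) {
                return i;
            }
        }
        return -1;
    }
}
```

For instance, with handlers added in the order framer, decoder, encoder, handler, where only encoder is a downstream handler (placeholder names, not taken from TelnetClientPipelineFactory), sendDownstream("CONNECTED") records encoder first and then sink:CONNECTED.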
This runs NioClientSocketPipelineSink.eventSunk, which in turn calls the following connect method:
private void connect(
        final NioClientSocketChannel channel, final ChannelFuture cf,
        SocketAddress remoteAddress) {
    try {
        // If connect returns true, the connection is already established: call
        // worker.register so that an NioWorker thread handles reads and writes for the channel.
        // Otherwise hand the channel to boss.register and let the boss thread finish the connection.
        if (channel.socket.connect(remoteAddress)) {
            channel.worker.register(channel, cf);
        } else {
            // Add a listener on this client socket channel's close future, so that the
            // connect future fails if the channel is closed before the connection completes.
            channel.getCloseFuture().addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture f)
                        throws Exception {
                    if (!cf.isDone()) {
                        cf.setFailure(new ClosedChannelException());
                    }
                }
            });
            cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            channel.connectFuture = cf;
            boss.register(channel);
        }
    } catch (Throwable t) {
        cf.setFailure(t);
        fireExceptionCaught(channel, t);
        channel.worker.close(channel, succeededFuture(channel));
    }
}
boss.register then runs so that the boss thread can complete the connection for this channel. This is also where the boss thread gets started:
void register(NioClientSocketChannel channel) {
    // RegisterTask.run registers the channel for SelectionKey.OP_CONNECT.
    Runnable registerTask = new RegisterTask(this, channel);

    // (Excerpt: the code that starts the boss thread on first use is not shown.)
    boolean offered = registerTaskQueue.offer(registerTask);
    assert offered;

    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
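The hand-off in register follows a common selector-thread idiom: another thread enqueues a task and wakes the selector, and the selector thread drains the queue once select returns. Below is a small JDK-only sketch of that idiom. SelectorLoopSketch and its methods are hypothetical names, and the loop is deliberately simplified (for example, it resets the wakenUp flag after select returns), so it should not be read as Netty's Boss implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a boss thread: one selector, one task queue.
class SelectorLoopSketch {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Thread thread;
    private volatile boolean running = true;

    SelectorLoopSketch() throws IOException {
        selector = Selector.open();
        thread = new Thread(this::loop, "sketch-boss");
        thread.start();
    }

    /** Callable from any thread: queue the task, then wake the selector at most once. */
    void register(Runnable task) {
        taskQueue.offer(task);
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    /** Selector thread: wait for I/O or a wakeup, then run every queued task. */
    private void loop() {
        try {
            while (running) {
                selector.select(500);
                wakenUp.set(false);
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stops the loop, waits for the thread to exit, then releases the selector. */
    void shutdown() throws IOException, InterruptedException {
        running = false;
        selector.wakeup();
        thread.join();
        selector.close();
    }

    /** Submits one task from the calling thread and reports whether the loop ran it. */
    static boolean demo() {
        try {
            SelectorLoopSketch boss = new SelectorLoopSketch();
            CountDownLatch ran = new CountDownLatch(1);
            boss.register(ran::countDown);
            boolean ok = ran.await(5, TimeUnit.SECONDS);
            boss.shutdown();
            return ok;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The compareAndSet guard keeps several concurrent callers from issuing redundant wakeup calls, which is the same purpose the wakenUp flag serves in the excerpt above.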
Finally boss.run starts, and its processSelectedKeys calls connect:
private void connect(SelectionKey k) {
    NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
    try {
        if (ch.socket.finishConnect()) {
            k.cancel();
            // Only after the connection has succeeded is the channel registered with its
            // NioWorker, whose thread handles reads and writes for this socket channel.
            ch.worker.register(ch, ch.connectFuture);
        }
    } catch (Throwable t) {
        ch.connectFuture.setFailure(t);
        fireExceptionCaught(ch, t);
        ch.worker.close(ch, succeededFuture(ch));
    }
}
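Stripped of the boss and worker bookkeeping, the two connect methods shown so far follow the standard java.nio non-blocking connect protocol: SocketChannel.connect may return true right away, and if it returns false the channel is registered for OP_CONNECT and completed later with finishConnect. Here is a minimal JDK-only sketch of that protocol, assuming a throwaway server on an ephemeral loopback port. NonBlockingConnectSketch and connectToLocalServer are made-up names for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper built on the JDK only; it is not part of Netty.
class NonBlockingConnectSketch {

    /** Connects to a local throwaway server and reports whether the connection completed. */
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {

            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            if (client.connect(server.getLocalAddress())) {
                // Connected immediately: this is the branch that calls worker.register.
                return client.isConnected();
            }

            // Not finished yet: wait for OP_CONNECT, which is the boss thread's job in Netty.
            client.register(selector, SelectionKey.OP_CONNECT);
            long deadline = System.currentTimeMillis() + 5000;
            while (System.currentTimeMillis() < deadline) {
                selector.select(500);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable() && client.finishConnect()) {
                        key.cancel(); // connect interest is no longer needed
                        return client.isConnected();
                    }
                }
                selector.selectedKeys().clear();
            }
            return false; // gave up after the deadline
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whether the immediate branch or the selector branch is taken depends on the platform, which is why the sketch handles both, just as the sink's connect method does.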
From this point on, sending and receiving data is the job of the NioWorker thread.