Let's start by analyzing how ServerBootstrap starts up. In Netty, a Channel can be seen as an abstraction of a SocketChannel.
A ChannelPipeline holds ChannelHandlers, and dispatches the corresponding handler operations for each ChannelEvent,
such as a channel's open, bind and connect events.
Below we use TelnetServer as an example to walk through the server startup step by step.
public static void main(String[] args) throws Exception {
    // Configure the server.
    // new NioServerSocketChannelFactory initializes a NioServerSocketPipelineSink,
    // which handles the downstream events that reach the end of the pipeline.
    ServerBootstrap bootstrap = new ServerBootstrap(
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool()));

    // Set up the event pipeline factory.
    bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());

    // Bind and start to accept incoming connections.
    bootstrap.bind(new InetSocketAddress(8080));
}
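The TelnetServerPipelineFactory itself is not shown in this walkthrough. For context, here is a minimal sketch of what such a factory looks like, modeled on the Netty 3 telnet example; the frame/string codecs and the TelnetServerHandler name are assumptions for illustration, not taken from this article:

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class TelnetServerPipelineFactory implements ChannelPipelineFactory {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        // Split the inbound byte stream into lines and convert them to/from Strings.
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        // Application handler that answers telnet commands (assumed name).
        pipeline.addLast("handler", new TelnetServerHandler());
        return pipeline;
    }
}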
Go straight to ServerBootstrap's bind method:
public Channel bind(final SocketAddress localAddress) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }

    // The Binder will offer the bind ChannelFuture into this queue (see Binder.channelOpen below).
    final BlockingQueue<ChannelFuture> futureQueue =
        new LinkedBlockingQueue<ChannelFuture>();

    // Binder extends SimpleChannelUpstreamHandler and handles channelOpen.
    ChannelHandler binder = new Binder(localAddress, futureQueue);

    // The parent handler is null here (none was configured).
    ChannelHandler parentHandler = getParentHandler();

    // Initialize a DefaultChannelPipeline.
    // Before the port is bound, this boss pipeline contains only the Binder upstream handler.
    ChannelPipeline bossPipeline = pipeline();

    // DefaultChannelPipeline.addLast wraps the handler in a DefaultChannelHandlerContext;
    // the contexts are kept in a linked-list structure.
    // At this point the pipeline contains only the Binder.
    bossPipeline.addLast("binder", binder);
    if (parentHandler != null) {
        bossPipeline.addLast("userHandler", parentHandler);
    }

    // Everything starts here: getFactory() is the NioServerSocketChannelFactory.
    Channel channel = getFactory().newChannel(bossPipeline);

    // ... (the rest of the method waits for the Binder to offer the bind future
    // into futureQueue and then returns the bound channel; omitted here)
}
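For readers unfamiliar with that context list: each addLast call wraps the handler in a context node and appends it to a doubly linked list, whose head is where sendUpstream starts and whose tail is where sendDownstream starts. Below is a simplified, self-contained illustration of that idea; it is an illustrative sketch only, not the actual DefaultChannelPipeline source:

// Simplified illustration of the pipeline's context list (not Netty source).
final class Node {
    Node prev, next;
    final String name;
    final Object handler; // would be a ChannelHandler in Netty

    Node(String name, Object handler) {
        this.name = name;
        this.handler = handler;
    }
}

final class MiniPipeline {
    private Node head, tail;

    void addLast(String name, Object handler) {
        Node node = new Node(name, handler);
        if (head == null) {
            head = tail = node;   // first handler: head and tail point to it
        } else {
            node.prev = tail;     // append at the tail
            tail.next = node;
            tail = node;
        }
    }
}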
NioServerSocketChannelFactory.newChannel(ChannelPipeline pipeline) looks like this:
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
    // Initialize a NioServerSocketChannel; the pipeline contains the Binder,
    // and the sink is the NioServerSocketPipelineSink.
    return new NioServerSocketChannel(this, pipeline, sink);
}
In the constructor of NioServerSocketChannel we find the call fireChannelOpen(this), a static method of Channels:
public static void fireChannelOpen(Channel channel) {
    // Notify the parent handler.
    if (channel.getParent() != null) {
        fireChildChannelStateChanged(channel.getParent(), channel);
    }

    // This calls DefaultChannelPipeline.sendUpstream.
    channel.getPipeline().sendUpstream(
            new UpstreamChannelStateEvent(
                    channel, ChannelState.OPEN, Boolean.TRUE));
}
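For reference, an application-level upstream handler observes these state events through the channelOpen/channelBound/channelConnected callbacks of SimpleChannelUpstreamHandler. A minimal sketch (the handler name is illustrative, not part of this walkthrough):

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class LoggingStateHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("OPEN:  " + e.getChannel());
        super.channelOpen(ctx, e); // keep the event moving up the pipeline
    }

    @Override
    public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("BOUND: " + e.getValue()); // the value is the local SocketAddress
        super.channelBound(ctx, e);
    }
}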
DefaultChannelPipeline.sendUpstream(ChannelEvent e)
public void sendUpstream(ChannelEvent e) {
    // this.head is the context wrapping the Binder.
    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    if (head == null) {
        logger.warn(
                "The pipeline contains no upstream handlers; discarding: " + e);
        return;
    }
    sendUpstream(head, e);
}
which executes:
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    try {
        // ctx.getHandler() is the Binder, a SimpleChannelUpstreamHandler.
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
SimpleChannelUpstreamHandler.handleUpstream(ctx, e) will in turn call the Binder's channelOpen:
public void channelOpen(
        ChannelHandlerContext ctx,
        ChannelStateEvent evt) {
    try {
        // Set the pipeline factory on the NioServerSocketChannel's
        // DefaultServerSocketChannelConfig. Later, when connections are dispatched,
        // each accepted channel gets its pipeline from this factory,
        // i.e. the pipeline built by TelnetServerPipelineFactory.
        evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
    } finally {
        ctx.sendUpstream(evt);
    }

    // Call NioServerSocketChannel.bind, which ends up in
    // Channels.bind(Channel channel, SocketAddress localAddress).
    boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
    assert finished;
}
Channels.bind looks like this:
public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    ChannelFuture future = future(channel);
    // This goes back through DefaultChannelPipeline, this time downstream,
    // with a BOUND state event.
    channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
            channel, future, ChannelState.BOUND, localAddress));
    return future;
}
DefaultChannelPipeline.sendDownstream:
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
    if (tail == null) {
        try {
            getSink().eventSunk(this, e);
            return;
        } catch (Throwable t) {
            notifyHandlerException(e, t);
            return;
        }
    }
    sendDownstream(tail, e);
}
getActualDownstreamContext returns null here, because the only handler in the pipeline is the Binder, which implements ChannelUpstreamHandler only; so sendDownstream falls through to getSink().eventSunk(this, e):
DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
    if (ctx == null) {
        return null;
    }

    DefaultChannelHandlerContext realCtx = ctx;
    // The Binder is an upstream handler, so no downstream context is found
    // and this loop runs off the end of the list, returning null.
    while (!realCtx.canHandleDownstream()) {
        realCtx = realCtx.prev;
        if (realCtx == null) {
            return null;
        }
    }
    return realCtx;
}
So sendDownstream executes getSink().eventSunk(this, e); getSink() returns the NioServerSocketPipelineSink:
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        handleAcceptedSocket(e);
    }
}
private void handleServerSocket(ChannelEvent e) {
    if (!(e instanceof ChannelStateEvent)) {
        return;
    }

    ChannelStateEvent event = (ChannelStateEvent) e;
    NioServerSocketChannel channel =
        (NioServerSocketChannel) event.getChannel();
    ChannelFuture future = event.getFuture();
    ChannelState state = event.getState();
    Object value = event.getValue();

    // The event we sent was
    // new DownstreamChannelStateEvent(channel, future, ChannelState.BOUND, localAddress),
    // so the BOUND branch is taken.
    switch (state) {
    case OPEN:
        if (Boolean.FALSE.equals(value)) {
            close(channel, future);
        }
        break;
    case BOUND:
        if (value != null) {
            // The actual socket address binding happens here.
            bind(channel, future, (SocketAddress) value);
        } else {
            close(channel, future);
        }
        break;
    }
}
The corresponding NioServerSocketPipelineSink.bind method:
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {
    boolean bound = false;
    boolean bossStarted = false;
    try {
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;

        future.setSuccess();
        // Fire the channelBound event.
        fireChannelBound(channel, channel.getLocalAddress());

        Executor bossExecutor =
            ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
        bossExecutor.execute(
                new IoWorkerRunnable(
                        new ThreadRenamingRunnable(
                                new Boss(channel),
                                "New I/O server boss #" + id +
                                " (" + channel + ')')));
        bossStarted = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!bossStarted && bound) {
            close(channel, future);
        }
    }
}
First, let's see what Channels.fireChannelBound does:
public static void fireChannelBound(Channel channel, SocketAddress localAddress) {
    // channel.getPipeline() is still the DefaultChannelPipeline that contains only the Binder,
    // so SimpleChannelUpstreamHandler.handleUpstream dispatches this event to channelBound(ctx, evt).
    channel.getPipeline().sendUpstream(
            new UpstreamChannelStateEvent(
                    channel, ChannelState.BOUND, localAddress));
}
Back in the sink's bind method:
Executor bossExecutor =
    ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
// A boss thread is started on the bossExecutor.
// The boss thread dispatches new client connections to the workerExecutor;
// by default the number of workers is 2 * the number of CPU cores.
bossExecutor.execute(
        new IoWorkerRunnable(
                new ThreadRenamingRunnable(
                        new Boss(channel),
                        "New I/O server boss #" + id +
                        " (" + channel + ')')));
When the Boss is constructed, the server channel's OP_ACCEPT interest is registered with a new selector:
Boss(NioServerSocketChannel channel) throws IOException {
    this.channel = channel;

    selector = Selector.open();

    boolean registered = false;
    try {
        channel.socket.register(selector, SelectionKey.OP_ACCEPT);
        registered = true;
    } finally {
        if (!registered) {
            closeSelector();
        }
    }

    channel.selector = selector;
}
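Before moving on to Boss.run(), a side note on the worker count mentioned above: besides the two-argument constructor used in the TelnetServer example, NioServerSocketChannelFactory also offers a constructor variant that takes the worker count explicitly (the default is 2 * the number of available processors). A sketch of passing it explicitly; this is an illustrative configuration choice, not something the walkthrough's code does:

ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),   // boss threads (accept connections)
                Executors.newCachedThreadPool(),   // worker threads (read/write I/O)
                Runtime.getRuntime().availableProcessors() * 2));  // explicit worker count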
Eventually Boss.run() is called:
public void run() {
    // The current thread is the boss thread, i.e. the main reactor.
    final Thread currentThread = Thread.currentThread();

    channel.shutdownLock.lock();
    try {
        for (;;) {
            try {
                if (selector.select(1000) > 0) {
                    selector.selectedKeys().clear();
                }

                // Accept a new client connection.
                SocketChannel acceptedSocket = channel.socket.accept();
                if (acceptedSocket != null) {
                    // Dispatch the connection to a workerExecutor thread, i.e. a sub reactor.
                    registerAcceptedChannel(acceptedSocket, currentThread);
                }
            } catch (Throwable t) {
                // (the exception handling of the original source is omitted in this excerpt)
            }
        }
    } finally {
        channel.shutdownLock.unlock();
        closeSelector();
    }
}
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
    try {
        // Here we get the user's pipeline. Where was it set? In the first line of
        // Binder.channelOpen:
        //     evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
        // Up to this point the only pipeline was the DefaultChannelPipeline containing the Binder;
        // from here on, every NioAcceptedSocketChannel gets its pipeline
        // from TelnetServerPipelineFactory.
        ChannelPipeline pipeline =
            channel.getConfig().getPipelineFactory().getPipeline();

        // The NioWorker acts as the sub reactor.
        NioWorker worker = nextWorker();
        worker.register(new NioAcceptedSocketChannel(
                channel.getFactory(), pipeline, channel,
                NioServerSocketPipelineSink.this, acceptedSocket,
                worker, currentThread), null);
    } catch (Exception e) {
        // (the exception handling of the original source is omitted in this excerpt)
    }
}
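nextWorker() simply picks the next NioWorker from the factory's worker pool in round-robin order. Roughly, its Netty 3 implementation looks like this (quoted from memory, so treat it as a sketch):

NioWorker nextWorker() {
    // Round-robin over the worker array; workerIndex is an AtomicInteger field of the sink.
    return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}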
This calls NioWorker.register:
void register(NioSocketChannel channel, ChannelFuture future) {
    boolean server = !(channel instanceof NioClientSocketChannel);
    // Create a new task that will register the accepted SocketChannel with this worker's
    // selector (the channel becomes the attachment of its SelectionKey).
    Runnable registerTask = new RegisterTask(channel, future, server);
    Selector selector;

    synchronized (startStopLock) {
        if (!started) {
            // Open a selector if this worker didn't start yet.
            try {
                this.selector = selector = Selector.open();
            } catch (Throwable t) {
                throw new ChannelException(
                        "Failed to create a selector.", t);
            }

            // Start the worker thread with the new Selector.
            String threadName =
                (server ? "New I/O server worker #"
                        : "New I/O client worker #") + bossId + '-' + id;

            boolean success = false;
            try {
                // Start a thread to run this worker's selector loop.
                executor.execute(
                        new IoWorkerRunnable(
                                new ThreadRenamingRunnable(this, threadName)));
                success = true;
            } finally {
                if (!success) {
                    // Release the Selector if the execution fails.
                    try {
                        selector.close();
                    } catch (Throwable t) {
                        logger.warn("Failed to close a selector.", t);
                    }
                    this.selector = selector = null;
                    // The method will return to the caller at this point.
                }
            }
        } else {
            // Use the existing selector if this worker has been started.
            selector = this.selector;
        }

        assert selector != null && selector.isOpen();

        started = true;
        // Queue the registration; the worker thread performs it in processRegisterTaskQueue(),
        // which avoids calling register() concurrently with the worker's select().
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;
    }

    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
Now look at RegisterTask's run method:
public void run() {
    SocketAddress localAddress = channel.getLocalAddress();
    SocketAddress remoteAddress = channel.getRemoteAddress();
    if (localAddress == null || remoteAddress == null) {
        if (future != null) {
            future.setFailure(new ClosedChannelException());
        }
        close(channel, succeededFuture(channel));
        return;
    }

    try {
        if (server) {
            channel.socket.configureBlocking(false);
        }

        synchronized (channel.interestOpsLock) {
            // Register the accepted SocketChannel's read interest with the worker's selector.
            channel.socket.register(
                    selector, channel.getRawInterestOps(), channel);
        }
        if (future != null) {
            channel.setConnected();
            future.setSuccess();
        }
    } catch (IOException e) {
        if (future != null) {
            future.setFailure(e);
        }
        close(channel, succeededFuture(channel));
        if (!(e instanceof ClosedChannelException)) {
            throw new ChannelException(
                    "Failed to register a socket to the selector.", e);
        }
    }

    if (!server) {
        if (!((NioClientSocketChannel) channel).boundManually) {
            fireChannelBound(channel, localAddress);
        }
        fireChannelConnected(channel, remoteAddress);
    }
}
Back in NioWorker.register, executor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable(this, threadName))) was given this, i.e. the current NioWorker, so the worker thread ends up in NioWorker.run:
public void run() {
    // The current thread is this NioWorker's thread.
    thread = Thread.currentThread();

    boolean shutdown = false;
    Selector selector = this.selector;
    for (;;) {
        wakenUp.set(false);

        if (CONSTRAINT_LEVEL != 0) {
            selectorGuard.writeLock().lock();
            // This empty synchronization block prevents the selector
            // from acquiring its lock.
            selectorGuard.writeLock().unlock();
        }

        try {
            SelectorUtil.select(selector);
            // 'wakenUp.compareAndSet(false, true)' is always evaluated
            // before calling 'selector.wakeup()' to reduce the wake-up
            // overhead. (Selector.wakeup() is an expensive operation.)
            //
            // However, there is a race condition in this approach.
            // The race condition is triggered when 'wakenUp' is set to
            // true too early.
            //
            // 'wakenUp' is set to true too early if:
            // 1) Selector is waken up between 'wakenUp.set(false)' and
            //    'selector.select(...)'. (BAD)
            // 2) Selector is waken up between 'selector.select(...)' and
            //    'if (wakenUp.get()) { ... }'. (OK)
            //
            // In the first case, 'wakenUp' is set to true and the
            // following 'selector.select(...)' will wake up immediately.
            // Until 'wakenUp' is set to false again in the next round,
            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
            // any attempt to wake up the Selector will fail, too, causing
            // the following 'selector.select(...)' call to block
            // unnecessarily.
            //
            // To fix this problem, we wake up the selector again if wakenUp
            // is true immediately after selector.select(...).
            // It is inefficient in that it wakes up the selector for both
            // the first case (BAD - wake-up required) and the second case
            // (OK - no wake-up required).
            if (wakenUp.get()) {
                selector.wakeup();
            }

            cancelledKeys = 0;
            processRegisterTaskQueue();
            processWriteTaskQueue();
            processSelectedKeys(selector.selectedKeys());

            // Exit the loop when there's nothing to handle.
            // The shutdown flag is used to delay the shutdown of this
            // loop to avoid excessive Selector creation when
            // connections are registered in a one-by-one manner instead of
            // concurrent manner.
            if (selector.keys().isEmpty()) {
                if (shutdown ||
                    executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

                    synchronized (startStopLock) {
                        if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                            started = false;
                            try {
                                selector.close();
                            } catch (IOException e) {
                                logger.warn(
                                        "Failed to close a selector.", e);
                            } finally {
                                this.selector = null;
                            }
                            break;
                        } else {
                            shutdown = false;
                        }
                    }
                } else {
                    // Give one more second.
                    shutdown = true;
                }
            } else {
                shutdown = false;
            }
        } catch (Throwable t) {
            logger.warn(
                    "Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
Now look at processSelectedKeys(selector.selectedKeys()):
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            // Readable: read from the channel, which fires messageReceived upstream
            // through the accepted channel's pipeline.
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            // Writable: flush the pending writes queued by downstream write events.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}
To summarize the flow: in Netty, the boss thread listens for new socket connections and dispatches each accepted connection to a NioWorker. Each NioWorker maintains its own selector plus a register task queue and a write task queue; the worker thread drains those queues in its selector loop and performs the actual read and write operations for the channels registered with it.
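For comparison, here is a minimal, framework-free sketch of the same boss/worker split using plain java.nio. All names are illustrative; a real implementation such as Netty's hands the registration to the worker thread via a task queue instead of registering from the boss thread:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class MiniReactor {
    public static void main(String[] args) throws Exception {
        // Boss (main reactor): only accepts connections.
        Selector bossSelector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.socket().bind(new InetSocketAddress(8080));
        server.register(bossSelector, SelectionKey.OP_ACCEPT);

        // Worker (sub reactor): handles read readiness for accepted channels.
        final Selector workerSelector = Selector.open();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                ByteBuffer buf = ByteBuffer.allocate(8192);
                try {
                    for (;;) {
                        workerSelector.select(1000);
                        for (SelectionKey key : workerSelector.selectedKeys()) {
                            if (key.isValid() && key.isReadable()) {
                                SocketChannel ch = (SocketChannel) key.channel();
                                buf.clear();
                                if (ch.read(buf) < 0) { // EOF: peer closed the connection
                                    key.cancel();
                                    ch.close();
                                }
                                // a real handler would pass the bytes up a pipeline here
                            }
                        }
                        workerSelector.selectedKeys().clear();
                    }
                } catch (Exception ignored) {
                    // a real worker would log and recover
                }
            }
        }, "mini-worker");
        worker.start();

        for (;;) {
            bossSelector.select();
            for (SelectionKey key : bossSelector.selectedKeys()) {
                if (key.isAcceptable()) {
                    SocketChannel accepted = server.accept();
                    if (accepted != null) {
                        accepted.configureBlocking(false);
                        // Hand the accepted channel over to the worker's selector
                        // (Netty queues a RegisterTask and wakes the selector instead).
                        workerSelector.wakeup();
                        accepted.register(workerSelector, SelectionKey.OP_READ);
                    }
                }
            }
            bossSelector.selectedKeys().clear();
        }
    }
}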