Posted on 2010-08-18 18:08
dennis 閱讀(4888)
評論(0) 編輯 收藏 所屬分類:
java
這個問題的由來是有一個朋友報告xmemcached在高并發(fā)下會頻繁斷開重連,導致cache不可用,具體請看這個
issue。
我一開始以為是他使用方式上有問題,溝通了半天還是搞不明白。后來聽聞他說他的代碼會中斷運行時間過長的任務,這些任務內(nèi)部調(diào)用了xmemcached跟memcached交互。我才開始懷疑是不是因為中斷引起了連接的關閉。
我們都知道,nio的socket channel都是實現(xiàn)了
java.nio.channels.InterruptibleChannel接口,看看這個接口描述:
A channel that can be asynchronously closed and interrupted.
A channel that implements this interface is asynchronously closeable: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the channel's close method. This will cause the blocked thread to receive an AsynchronousCloseException.
A channel that implements this interface is also
interruptible: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the blocked thread's interrupt method.
This will cause the channel to be closed, the blocked thread to receive a
ClosedByInterruptException, and the blocked thread's interrupt status to be set.
If a thread's interrupt status is already set and it invokes a blocking I/O operation upon a channel then the channel will be closed and the thread will immediately receive a ClosedByInterruptException; its interrupt status will remain set.
意思是說實現(xiàn)了這個接口的channel,首先可以被異步關閉,阻塞的線程拋出AsynchronousCloseException,其次阻塞在該 channel上的線程如果被中斷,會引起channel關閉并拋出ClosedByInterruptException的異常。如果在調(diào)用 channel的IO方法之前,線程已經(jīng)設置了中斷狀態(tài),同樣會引起channel關閉和拋出ClosedByInterruptException。
回到xmemcached的問題,為什么中斷會引起xmemcached關閉連接?難道xmemcached會在用戶線程調(diào)用channel的IO operations。答案是肯定的,xmemcached的網(wǎng)絡層實現(xiàn)了一個小優(yōu)化,當連接里的緩沖隊列為空的情況下,會直接調(diào)用 channel.write嘗試發(fā)送數(shù)據(jù);如果隊列不為空,則放入緩沖隊列,等待Reactor去執(zhí)行實際的發(fā)送工作,這個優(yōu)化是為了最大化地提高發(fā)送效率。這會導致在用戶線程中調(diào)用channel.write(緩沖的消息隊列為空的時候),如果這時候用戶線程中斷,就會導致連接斷開,這就是那位朋友反饋的問題的根源。
Netty3的早期版本也有同樣的優(yōu)化,但是在之后的版本,這個優(yōu)化被另一個方案替代,寫入消息的時候無論如何都會放入緩沖隊列,但是Netty會判斷當前寫入的線程是不是NioWorker, 如果是的話,就直接flush整個發(fā)送隊列做IO寫入,如果不是,則加入發(fā)送緩沖區(qū)等待NioWorker線程去發(fā)送。這個是在NioWorker的 writeFromUserCode方法里實現(xiàn)的:
void writeFromUserCode(final NioSocketChannel channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;
}
if (scheduleWriteIfNecessary(channel)) {
return;
}
// 這里我們可以確認 Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
write0(channel);
}
我估計netty的作者后來也意識到了在用戶線程調(diào)用channel的IO操作的危險性。xmemcached這個問題的解決思路也應該跟Netty差不多。但是從我的角度,我希望交給用戶去選擇,如果你確認你的用戶線程沒有調(diào)用中斷,那么允許在用戶線程去write可以達到更高的發(fā)送效率,更短的響應時間;如果你的用戶線程有調(diào)用中斷,那么最好有個選項去設置,發(fā)送的消息都將加入緩沖隊列讓Reactor去寫入,有更好的吞吐量,同時避免用戶線程涉及到 IO操作。