yanf4j發布一個
0.50-beta2版本,這個版本最重要的改進就是引入了
客戶端連接非阻塞API,主要最近的工作要用到,所以添加了。兩個核心類
TCPConnectorController和
UDPConnectorController分別用于TCP和UDP的客戶端連接控制。例如,現在的UDP echo client可以寫成:
//客戶端echo handler
class EchoClientHandler extends HandlerAdapter {
public void onReceive(Session udpSession, Object t) {
DatagramPacket datagramPacket = (DatagramPacket) t;
System.out.println("recv:" + new String(datagramPacket.getData()));
}
@Override
public void onMessageSent(Session session, Object t) {
System.out.println("send:" + new String((byte[]) t));
}
}
//連接代碼,并發送UDP包
UDPConnectorController connector = new UDPConnectorController();
connector.setSoTimeout(1000);
connector.setHandler(new EchoClientHandler());
connector.connect(new InetSocketAddress(InetAddress.getByName(host),
port));
for (int i = 0; i < 10000; i++) {
String s = "hello " + i;
DatagramPacket packet = new DatagramPacket(s.getBytes(), s.length());
connector.send(packet);
}
UDP不是面向連接的,因此connect方法僅僅是調用了底層DatagramChannel.connect方法,用來限制接收和發送的packet的遠程端點。
再來看看TCPConnectorController的使用,同樣看Echo Client的實現:
//客戶端的echo handler
class EchoHandler extends HandlerAdapter<String> {
@Override
public void onConnected(Session session) {
try {
//一連接就發送NUM個字符串
for (int i = 0; i < NUM; i++)
session.send(generateString(i));
} catch (Exception e) {
}
}
public String generateString(int len) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < MESSAGE_LEN; i++)
sb.append(i);
return sb.toString();
}
@Override
public void onReceive(Session session, String t) {
//打印接收到字符串
if (DEBUG)
System.out.println("recv:" + t);
}
}
//...連接API,TCPConnectorController示例
Configuration configuration = new Configuration();
configuration.setTcpSessionReadBufferSize(256 * 1024); // 設置讀的緩沖區大小
TCPConnectorController connector = new TCPConnectorController(configuration,
new StringCodecFactory());
connector.setHandler(new EchoHandler());
connector.setCodecFactory(new StringCodecFactory());
try {
connector.Connect(new InetSocketAddress("localhost", 8080));
} catch (IOExceptione) {
e.printStackTrace();
}
注意,connect方法
并不阻塞,而是立即返回,連接是否建立可以通過
TCPConnectorController.isConnected()方法來判斷,因此通常你可能會這樣使用:
try {
connector.Connect(new InetSocketAddress("localhost", 8080));
while(!connector.isConnected())
;
} catch (Exception e) {
e.printStackTrace();
}
來強制確保后面對connector的使用是已經連接上的connector,然而更好的做法是在Handler的onConnected()回調方法中處理邏輯,因為這個方法僅僅在連接建立后才會被調用。
兩個ConnectorController都有系列send方法,用于發送數據:
TCPConnectorController.send(Object msg) throws InterruptedException
UDPConnectorController.send(DatagramPacket packet) throws InterruptedException
UDPConnectorController.send(SocketAddress targetAddr, Object msg)throws InterruptedException
0.50-beta2帶來的另一個修改就是Session接口添加
setReadBufferByteOrder方法,用于設置session接收緩沖區的字節序,默認是網絡字節序,也就是大端法。這個方法建議在Handler的onSessionStarted回調方法中調用。
在0.50-beta最重要的修改是引入了
session發送隊列緩沖區的流量控制選項。默認情況下,session的發送緩沖隊列是無界的,隊列的push和pop也全然不會阻塞。在設置了緩沖隊列的高低水位選項后即引入了發送流量控制,規則如下:
a)當發送隊列中的數據總量大于高水位標記(highWaterMark),Session.send將阻塞
b)在條件a的作用下,Session.send的阻塞將持續到發送隊列中的數據總量小于于低水位標記(lowWaterMark)才解除。
緩沖隊列高低水位的設置通過Controller的下列方法設置:
public void setSessionWriteQueueHighWaterMark(int highWaterMark);
public void setSessionWriteQueueLowWaterMark(int lowWaterMark);
緩沖隊列的流量控制想法來自ACE的ACE_Message_Queue,是通過com.google.code.yanf4j.util.MessageQueue類實現的。
0.50-beta還引入了Session.send(Object msg)的重載版本 Session.send(Object msg,long timeout),在超過timeout時間后send仍然阻塞時即終止send。注意,現在Session.send的這兩個方法都返回一個bool值來表示send成功與否,并且都將響應中斷(僅限啟動了流量控制選項)拋出InterruptedException。