用Windows的IOCP、Linux的epoll、FreeBSD的kqueue寫了一個支持高并發(fā)、多CPU、跨平臺的TCP網(wǎng)絡(luò)服務(wù)框架。
測試
下載netfrm.v2.rar,解壓縮得到netfrm.v2目錄,里面有netfrm.v2.vcproj和src目錄。
測試代碼在src/main.cpp。
#include <stdio.h>
#include "./lance/ldebug.h"
#include "./lance/tcpsrv.hpp"
#include "./lance/systm.h"
class MyClient : public lance::net::Client
{
public: void OnConnect()
{
printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
recv(data, 255);
}
public: void OnDisconnect()
{
printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
}
public: void OnRecv(int len)
{
data[len] = 0x00;
printf("OnRecv: fd=%08x, data=%s\n", fd, data);
if (data[0] == 'a')
{
printf("user exit command\n");
close();
}
recv(data, 255);
}
public: char data[256];
};
int main(char * args[])
{
lance::net::TCPSrv<MyClient> srv;
srv.ip = 0;
srv.port = 1234;
srv.ptr = NULL;
srv.backlogs = 10;
srv.threads = 1;
srv.scheds = 0;
srv.start();
while(true)
{
lance::systm::sleep(2000);
}
return 0;
}
|
測試代碼綁定本機(jī)所有IP地址,在1234端口開啟網(wǎng)絡(luò)服務(wù),接收客戶端發(fā)送的字符串,并將這些字符串打印到控制臺上。
Windows平臺
在Windows XP SP2下用vs2003編譯測試通過。
用vs2003打開netfrm.v2.vcproj,然后編譯、運(yùn)行,會彈出控制臺窗口。
在Windows開始菜單->運(yùn)行->cmd,啟動Windows命令窗口,輸入telnet 127.0.0.1 1234,回車連接到測試網(wǎng)絡(luò)服務(wù),如果一切正常,網(wǎng)絡(luò)服務(wù)控制臺窗口將顯示連接信息,可以在Windows命令窗口隨便輸入信息,這時網(wǎng)絡(luò)服務(wù)控制臺窗口將打印輸入的信息。
如下圖所示:

500)this.width=500;" border="0" width="500">
圖1
輸入字符a表示斷開網(wǎng)絡(luò)連接。
Linux平臺
Linux在Red Hat Enterprise Linux 4下測試通過,其他Linux平臺需要Linux 2.6及以上支持epoll的內(nèi)核。
首先轉(zhuǎn)到src目錄:
$ cd src
編譯:
$ make –f Makefile.linux clean all
這時會在當(dāng)前目錄生成tcpsrv.0.1.bin的可執(zhí)行文件,執(zhí)行:
$ ./ tcpsrv.0.1.bin
再打開一個命令行窗口,測試:
$ telnet 127.0.0.1 1234
輸入字符串并回車,剛才執(zhí)行tcpsrv.0.1.bin的窗口將打印連接和字符串信息。
輸入a開頭的字符串將斷開連接。
FreeBSD平臺
FreeBSD在FreeBSD 6.2下測試通過,其他BSD平臺需要支持kqueue的內(nèi)核。
首先轉(zhuǎn)到src目錄:
$ cd src
編譯:
$ make –f Makefile.freebsd clean all
這時會在當(dāng)前目錄生成tcpsrv.0.1.bin的可執(zhí)行文件,執(zhí)行:
$ ./ tcpsrv.0.1.bin
再打開一個命令行窗口,測試:
$ telnet 127.0.0.1 1234
輸入字符串并回車,剛才執(zhí)行tcpsrv.0.1.bin的窗口將打印連接和字符串信息。
輸入a開頭的字符串將斷開連接。
使用
目錄結(jié)構(gòu):
src
|---lance
|---tcpsrv.hpp 主要接口文件
|---iocptcpsrv.hpp Windows IOCP網(wǎng)絡(luò)服務(wù)實(shí)現(xiàn)文件
|---eptcpsrv.hpp Linux epoll網(wǎng)絡(luò)服務(wù)實(shí)現(xiàn)文件
|---kqtcpsrv.hpp FreeBSD kqueue網(wǎng)絡(luò)服務(wù)實(shí)現(xiàn)文件
在某種平臺下使用時,src/lance/tcpsrv.hpp必須,其他文件根據(jù)平臺而定。
首先,創(chuàng)建一個Client類,這個類必須繼承lance::net::Client,重載事件通知方法。
// Client對象類,當(dāng)連接建立時自動創(chuàng)建,當(dāng)連接斷開時自動銷毀
class MyClient : public lance::net::Client
{
// 連接建立時被調(diào)動
public: void OnConnect()
{
printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
// 通知調(diào)度系統(tǒng)接收數(shù)據(jù)
// 數(shù)據(jù)這時并沒有真正接收,當(dāng)客戶端有數(shù)據(jù)發(fā)送來時
// 調(diào)度器自動接收數(shù)據(jù),然后通過OnRecv通知數(shù)據(jù)接收完成
recv(data, 255);
}
// 連接斷開時被調(diào)用
public: void OnDisconnect()
{
printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
}
// 當(dāng)有數(shù)據(jù)被接收時調(diào)用,接收的實(shí)際數(shù)據(jù)長度為len
public: void OnRecv(int len)
{
data[len] = 0x00;
printf("OnRecv: fd=%08x, data=%s\n", fd, data);
// 斷開連接命令
if (data[0] == 'a')
{
printf("user exit command\n");
// 通知調(diào)度系統(tǒng)斷開連接,當(dāng)調(diào)度系統(tǒng)處理完成后才真正斷開連接
close();
}
// 通知調(diào)度系統(tǒng)接收數(shù)據(jù)
// 數(shù)據(jù)這時并沒有真正接收,當(dāng)客戶端有數(shù)據(jù)發(fā)送來時
// 調(diào)度器自動接收數(shù)據(jù),然后通過OnRecv通知數(shù)據(jù)接收完成
recv(data, 255);
}
// 數(shù)據(jù)緩沖區(qū)
public: char data[256];
};
|
然后創(chuàng)建一個lance::net::TCPSrv<T>的實(shí)例,這個實(shí)例負(fù)責(zé)調(diào)度網(wǎng)絡(luò)服務(wù)。
具體代碼參考src/main.cpp,lance::net::Client的OnConnect、OnRecv、OnDisconnect都由工作線程池處理,所以里面可以進(jìn)行IO操作而不會影響系統(tǒng)響應(yīng)。
int main(char * args[])
{
lance::net::TCPSrv<MyClient> srv;
// 設(shè)置監(jiān)聽套接字綁定的IP
// 0為綁定所有本機(jī)可用IP地址
srv.ip = 0;
// 監(jiān)聽端口
srv.port = 1234;
// 綁定的對象或資源指針
// MyClient里面可以通過srv->ptr獲取這個指針
srv.ptr = NULL;
// 監(jiān)聽套接字連接隊列長度
srv.backlogs = 10;
// 處理線程池線程數(shù)
srv.threads = 1;
// 調(diào)度器線程數(shù),通常是本機(jī)CPU數(shù)的2倍
// 0表示自動選擇
srv.scheds = 0;
// 啟動網(wǎng)絡(luò)服務(wù)
srv.start();
// 循環(huán),保證進(jìn)程不退出
while(true)
{
lance::systm::sleep(2000);
}
return 0;
}
|
Windows平臺的預(yù)編譯宏是LANCE_WIN32。
Linux平臺的預(yù)編譯宏是LANCE_LINUX。
FreeBSD平臺的預(yù)編譯宏是LANCE_FREEBSD。
Windows平臺編譯需要使用WIN32_LEAN_AND_MEAN和_WIN32_WINNT=0x0500預(yù)編譯宏來避免Winsock2和Windows頭文件的沖突,否則會產(chǎn)生大量類型重定義錯誤。
#define EPOLL_MAX_NFDS 10000 // max sockets queried by epoll.
#define EPOLL_MAX_EVENTS 100 // max events queried by epoll.
#define EPOLL_MAX_QUEUE 1024 // max events in cache queue.
|
Linux平臺有額外三個預(yù)編譯宏,參考src/lance/eptcpsrv.hpp:
FreeBSD平臺有額外三個預(yù)編譯宏,參考src/lance/kqtcpsrv.hpp:
#define KQUEUE_MAX_NFDS 10000 // max sockets queried by kqueue.
#define KQUEUE_MAX_EVENTS 100 // max events queried by kqueue.
#define KQUEUE_MAX_QUEUE 1024 // max events in cache queue.
|
Windows IOCP設(shè)計
首先用戶接口部分,由兩個類lance::net:TCPSrv<T>,lance::net::Client。
lance::net::TCPSrv<T>管理監(jiān)聽套接字、事件調(diào)度和事件處理。
lance::net::Client管理連接套接字。
lance::net::TCPSrv<T>由lance::net::Listener<T>、lance::net::Scheduler<T>、lance::net::Processor<T>組成。
他們之間的關(guān)系如下:

500)this.width=500;" border="0" width="500">
圖2
Listener<T>管理監(jiān)聽套接字,有單獨(dú)的線程執(zhí)行,當(dāng)有連接到來時,創(chuàng)建一個Client的對象實(shí)例,然后通過IOCP系統(tǒng)調(diào)用通知調(diào)度器有連接到來,參考src/lance/iocptcpsrv.hpp
template<typename T>
void Scheduler<T>::push(T * clt)
{
::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
}
|
Scheduler<T>實(shí)際并不做很多事情,只是封裝IOCP句柄,Windows的IOCP功能很豐富,包括管理事件隊列和多CPU支持,所以Scheduler只是一個IOCP的映射。
Processor<T>管理線程池,這些線程池是工作線程,他們輪詢Scheduler的IOCP,從中取出系統(tǒng)事件,IOCP里面有三種事件,一種是客戶端連接事件,一種是客戶端數(shù)據(jù)事件,最后一種是連接斷開事件,當(dāng)有事件到來時,會得到Client對象的指針clt,Client的event包含了事件類型,參考src/lance/iocptcpsrv.hpp:
template<typename T>
DWORD WINAPI Processor<T>::run(LPVOID param)
{
Processor<T>& procor = *(Processor<T> *)param;
Scheduler<T>& scheder = *procor.scheder;
HANDLE iocp = scheder.iocp;
DWORD ready;
ULONG_PTR key;
WSAOVERLAPPED * overlap;
while (true)
{
::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE);
T * clt = (T *)key;
switch(clt->event)
{
case T::EV_RECV:
{
if (0 >= ready)
{
clt->event = T::EV_DISCONNECT;
::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
}
else
{
clt->OnRecv(ready);
}
}
break;
case T::EV_CONNECT:
{
if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0))
{
::closesocket(clt->fd);
delete clt;
}
else
{
clt->OnConnect();
}
}
break;
case T::EV_DISCONNECT:
{
clt->OnDisconnect();
::closesocket(clt->fd);
delete clt;
}
break;
case T::EV_SEND:
break;
}
}
return 0;
}
|
所以Client::OnConnect、Client::OnRecv、Client::OnDisconnect都在工作線程中進(jìn)行,這些處理過程中都可以有IO等耗時操作,一個連接的阻塞不會影響其他連接的響應(yīng)速度。
Client的其他方法Client::recv、Client::send和Client::close。
Client::recv是一個異步接收數(shù)據(jù)的方法,這個方面只是告訴IOCP想要接收客戶端的數(shù)據(jù),然后立即返回,由IOCP去負(fù)責(zé)接收數(shù)據(jù),有數(shù)據(jù)收到時,Processor<T>的工作線程會收到Client::EV_RECV的消息,Processor<T>會調(diào)用Client::OnRecv進(jìn)行通知。
Client::send是發(fā)送消息的函數(shù),這個函數(shù)是阻塞調(diào)用,等待消息發(fā)送成功后才返回。
Client::close是主動斷開客戶端連接的方法,這個方法不會直接調(diào)用closesocket(fd),而是調(diào)用shutdown(fd),shutdown(fd)會向Scheduler<T>觸發(fā)一個Client::EV_DISCONNECT的事件,然后Processor<T>調(diào)用Client::OnDisconnect通知連接斷開,執(zhí)行完Client::OnDisconnect后,由Processor<T>調(diào)用closesocket(fd)真正斷開連接,這樣設(shè)計一方面滿足任何情況下OnDisconnect都被調(diào)用,另一方面因?yàn)椴僮飨到y(tǒng)會重用已經(jīng)關(guān)閉的套接字fd,所以只有當(dāng)OnDisconnect執(zhí)行完畢后才真正調(diào)用closesocket讓操作系統(tǒng)回收fd,可以避免使用無效的套接字或者挪用其他連接的套接字。
Linux epoll和FreeBSD kqueue設(shè)計
Linux epoll和FreeBSD kqueue的機(jī)制幾乎一樣,只有函數(shù)名字和個數(shù)不一樣,所以一起分析,并且簡寫為Linux。
因?yàn)?font face="Times New Roman">Linux不像Windows一樣會管理事件隊列和多CPU支持,所以Linux需要額外實(shí)現(xiàn)事件隊列和多CPU支持。
Linux下用戶接口跟Windows一樣,有lance::net::TCPSrv<T>和lance::net::Client,因?yàn)榭缙脚_,所以他們提供的接口功能和意義也一樣,參考Windows一節(jié)。
lance::net::TCPSrv<T>管理連接套接字、事件隊列、多CPU支持、事件調(diào)度和事件處理。
lance::net::TCPSrv<T>由Listener<T>、Scheduler<T>、Processor<T>、Queue<T>組成。
他們之間關(guān)系圖如下:

500)this.width=500;" border="0" width="500">
圖3
Listener<T>管理監(jiān)聽套接字,有連接到來時創(chuàng)建一個Client的實(shí)例clt,初始化Client::event為Client::EV_CONNECT,然后將clt放入調(diào)度器,調(diào)度器為clt選擇一個合適的epoll/kqueue進(jìn)行綁定,然后將clt放入事件隊列Queue<T>等待被Processor<T>執(zhí)行。
Scheduler<T>管理epoll/kqueue,為了支持多CPU,一個Scheduler<T>可能管理多個epoll/kqueue,通過lance::net::TCPSrv::scheds進(jìn)行設(shè)置,當(dāng)lance::net::TCPSrv::scheds大于1時,Scheduler<T>將創(chuàng)建scheds個線程,每個線程管理一個epoll/kqueue。當(dāng)Listener<T>提交一個新的clt時,Scheduler<T>順序選擇一個epoll/kqueue進(jìn)行綁定,這是最簡單的均等選擇算法,epoll/kqueue會檢查綁定的clt的數(shù)據(jù)接收和連接斷開事件,如果有事件,會把產(chǎn)生這個事件的clt放入事件隊列Queue<T>等待被Processor<T>執(zhí)行,并且設(shè)置clt的套接字為休眠狀態(tài),因?yàn)?font face="Times New Roman">epoll/kqueue為狀態(tài)觸發(fā),如果事件在被Processor<T>處理前不休眠,會再次被觸發(fā),這樣Queue<T>將被迅速填滿。
多CPU時,依靠多個epoll/kqueue能有效利用這些CPU。
參考eptcpsrv.hpp:
template<typename T>
void Scheduler<T>::push(T * clt)
{
clt->epfd = epers[epoff].epfd;
epoff = (epoff+1 == scheds)?0:(epoff+1);
queue.in();
while (queue.full())
{
queue.fullWait();
}
if (queue.empty())
{
queue.emptyNotify();
}
queue.push(clt);
queue.out();
}
|
Queue<T>是有限緩沖隊列,有隊列最大長度EPOLL_MAX_QUEUE/KQUEUE_MAX_QUEUE,有限緩沖隊列結(jié)構(gòu)如下:
500)this.width=500;" border="0">
圖4
Queue<T>采用monitor模式,使用pthread_mutex_t lock保護(hù)臨界區(qū),使用pthread_cond_t emptySignal做隊列由空到不空的通知,也就是喚醒消費(fèi)者可以處理隊列,使用pthread_cond_t fullSignal做隊列由滿到不滿的通知,也就是喚醒生產(chǎn)者可以填充隊列,這里Scheduler<T>是生產(chǎn)者,Processor<T>是消費(fèi)者。
有時epoll/kqueue會一次產(chǎn)生多個事件,如果先前隊列為空,那么需要通知Processor<T>可以處理事件,因?yàn)?font face="Times New Roman">emptySignal.notify只能一次喚醒一個線程,為了更加高效的處理事件,應(yīng)該使用emptySignal.broadcast喚醒所有工作線程。
如果epoll/kqueue一次只產(chǎn)生了一個事件,并且先前隊列為空,那么只需要使用emptySignal.notify喚醒一個工作線程而不應(yīng)該使用emptySignal.broadcast喚醒工作線程,因?yàn)橹挥幸粋€事件,所以只有一個線程會處理事件,而其他線程會空轉(zhuǎn)一次消耗資源。
如果epoll/kqueue產(chǎn)生了事件,但是隊列不為空,那么不需要喚醒工作線程的操作,因?yàn)殛犃胁粸榭盏臅r候,沒有任何工作線程處于等待狀態(tài)。
代碼參考eptcpsrv.hpp/Queue<T>。
Processor<T>跟Windows基本一樣,Processor<T>從Queue<T>取出事件,然后根據(jù)clt->event事件類型調(diào)用響應(yīng)的事件通知函數(shù)。
Client::recv也是一個請求接收數(shù)據(jù)的過程,并不實(shí)際接收數(shù)據(jù),當(dāng)有數(shù)據(jù)到來時,Processor<T>的工作線程負(fù)責(zé)接收數(shù)據(jù),然后調(diào)用Client::OnRecv通知響應(yīng)的連接對象。
Cleint::send是一個同步阻塞函數(shù),等待數(shù)據(jù)真正發(fā)送完成后再返回。
Client::close跟Windows類似,只是調(diào)用shutdown來觸發(fā)斷開消息,然后處理流程跟Windows一致。
轉(zhuǎn)自:http://blog.chinaunix.net/u1/52224/showart_425449.html