Linux內核提供了3個關鍵函數供用戶來操作epoll,分別是:
- epoll_create(), 創(chuàng)建eventpoll對象
- epoll_ctl(), 操作eventpoll對象
- epoll_wait(), 從eventpoll對象中返回活躍的事件
而操作系統(tǒng)內部會用到一個名叫epoll_event_callback()的回調函數來調度epoll對象中的事件,這個函數非常重要,故本文將會對上述4個函數進行源碼分析。
源碼來源
由于epoll的實現內嵌在內核中,直接查看內核源碼的話會有一些無關代碼影響閱讀。為此在GitHub上寫的簡化版TCP/IP協(xié)議棧,里面實現了epoll邏輯。鏈接為:
https://github.com/wangbojing/NtyTcp
存放著以上4個關鍵函數的文件是[srcnty_epoll_rb.c],本文接下來通過分析該程序的代碼來探索epoll能支持高并發(fā)連接的秘密。
兩個核心數據結構
(1)epitem
如圖所示,epitem是中包含了兩個主要的成員變量,分別是rbn和rdlink,前者是紅黑樹的節(jié)點,而后者是雙鏈表的節(jié)點,也就是說一個epitem對象即可作為紅黑樹中的一個節(jié)點又可作為雙鏈表中的一個節(jié)點。并且每個epitem中存放著一個event,對event的查詢也就轉換成了對epitem的查詢。
struct epitem {
RB_ENTRY(epitem) rbn;
/* RB_ENTRY(epitem) rbn等價于
struct {
struct type *rbe_left; //指向左子樹
struct type *rbe_right; //指向右子樹
struct type *rbe_parent; //指向父節(jié)點
int rbe_color; //該節(jié)點的顏色
} rbn
*/
LIST_ENTRY(epitem) rdlink;
/* LIST_ENTRY(epitem) rdlink等價于
struct {
struct type *le_next; //指向下個元素
struct type **le_prev; //前一個元素的地址
}*/
int rdy; //判斷該節(jié)點是否同時存在與紅黑樹和雙向鏈表中
int sockfd; //socket句柄
struct epoll_event event; //存放用戶填充的事件
};
(2)eventpoll
如圖所示,eventpoll中包含了兩個主要的成員變量,分別是rbr和rdlist,前者指向紅黑樹的根節(jié)點,后者指向雙鏈表的頭結點。即一個eventpoll對象對應二個epitem的容器。對epitem的檢索,將發(fā)生在這兩個容器上(紅黑樹和雙鏈表)。
struct eventpoll {
/*
struct ep_rb_tree {
struct epitem *rbh_root;
}
*/
ep_rb_tree rbr; //rbr指向紅黑樹的根節(jié)點
int rbcnt; //紅黑樹中節(jié)點的數量(也就是添加了多少個TCP連接事件)
LIST_HEAD( ,epitem) rdlist; //rdlist指向雙向鏈表的頭節(jié)點;
/* 這個LIST_HEAD等價于
struct {
struct epitem *lh_first;
}rdlist;
*/
int rdnum; //雙向鏈表中節(jié)點的數量(也就是有多少個TCP連接來事件了)
// ...略...
};
四個關鍵函數
(1) epoll_create()
//創(chuàng)建epoll對象,包含一顆空紅黑樹和一個空雙向鏈表
int epoll_create(int size) {
//與很多內核版本一樣,size參數沒有作用,只要保證大于0即可
if (size <= 0) return -1;
nty_tcp_manager *tcp = nty_get_tcp_manager(); //獲取tcp對象
if (!tcp) return -1;
struct _nty_socket *epsocket = nty_socket_allocate(NTY_TCP_SOCK_EPOLL);
if (epsocket == NULL) {
nty_trace_epoll("malloc failedn");
return -1;
}
// 1° 開辟了一塊內存用于填充eventpoll對象
struct eventpoll *ep = (struct eventpoll*)calloc(1, sizeof(struct eventpoll));
if (!ep) {
nty_free_socket(epsocket- >id, 0);
return -1;
}
ep- >rbcnt = 0;
// 2° 讓紅黑樹根指向空
RB_INIT(&ep- >rbr); //等價于ep- >rbr.rbh_root = NULL;
// 3° 讓雙向鏈表的頭指向空
LIST_INIT(&ep- >rdlist); //等價于ep- >rdlist.lh_first = NULL;
// 4° 并發(fā)環(huán)境下進行互斥
// ...該部分代碼與主線邏輯無關,可自行查看...
//5° 保存epoll對象
tcp- >ep = (void*)ep;
epsocket- >ep = (void*)ep;
return epsocket- >id;
}
對以上代碼的邏輯進行梳理,可以總結為以下6步:
- 創(chuàng)建eventpoll對象
- 讓eventpoll中的rbr指向空
- 讓eventpoll中的rdlist指向空
- 在并發(fā)環(huán)境下進行互斥
- 保存eventpoll對象
- 返回eventpoll對象的句柄(id)
(2)epoll_ctl()
該函數的邏輯其實很簡單,無非就是將用戶傳入的參數封裝為一個epitem對象,然后根據傳入的op是①EPOLL_CTL_ADD、②EPOLL_CTL_MOD還是③EPOLL_CTL_DEL,來決定是①將epitem對象插入紅黑樹中,②更新紅黑樹中的epitem對象,還是③移除紅黑樹中的epitem對象。
//往紅黑樹中加每個tcp連接以及相關的事件
int epoll_ctl(int epid, int op, int sockid, struct epoll_event *event) {
nty_tcp_manager *tcp = nty_get_tcp_manager();
if (!tcp) return -1;
nty_trace_epoll(" epoll_ctl -- > 1111111:%d, sockid:%dn", epid, sockid);
struct _nty_socket *epsocket = tcp- >fdtable- >sockfds[epid];
if (epsocket- >socktype == NTY_TCP_SOCK_UNUSED) {
errno = -EBADF;
return -1;
}
if (epsocket- >socktype != NTY_TCP_SOCK_EPOLL) {
errno = -EINVAL;
return -1;
}
nty_trace_epoll(" epoll_ctl -- > eventpolln");
struct eventpoll *ep = (struct eventpoll*)epsocket- >ep;
if (!ep || (!event && op != EPOLL_CTL_DEL)) {
errno = -EINVAL;
return -1;
}
if (op == EPOLL_CTL_ADD) {
//添加sockfd上關聯(lián)的事件
pthread_mutex_lock(&ep- >mtx);
struct epitem tmp;
tmp.sockfd = sockid;
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep- >rbr, &tmp); //先在紅黑樹上找,根據key來找,也就是這個sockid,找的速度會非常快
if (epi) {
//原來有這個節(jié)點,不能再次插入
nty_trace_epoll("rbtree is existn");
pthread_mutex_unlock(&ep- >mtx);
return -1;
}
//只有紅黑樹上沒有該節(jié)點【沒有用過EPOLL_CTL_ADD的tcp連接才能走到這里】;
//(1)生成了一個epitem對象,這個結構對象,其實就是紅黑的一個節(jié)點;
epi = (struct epitem*)calloc(1, sizeof(struct epitem));
if (!epi) {
pthread_mutex_unlock(&ep- >mtx);
errno = -ENOMEM;
return -1;
}
//(2)把socket(TCP連接)保存到節(jié)點中;
epi- >sockfd = sockid; //作為紅黑樹節(jié)點的key,保存在紅黑樹中
//(3)我們要增加的事件也保存到節(jié)點中;
memcpy(&epi- >event, event, sizeof(struct epoll_event));
//(4)把這個節(jié)點插入到紅黑樹中去
epi = RB_INSERT(_epoll_rb_socket, &ep- >rbr, epi); //實際上這個時候epi的rbn成員就會發(fā)揮作用,如果這個紅黑樹中有多個節(jié)點,那么RB_INSERT就會epi- >rbi相應的值:可以參考圖來理解
assert(epi == NULL);
ep- >rbcnt ++;
pthread_mutex_unlock(&ep- >mtx);
} else if (op == EPOLL_CTL_DEL) {
pthread_mutex_lock(&ep- >mtx);
struct epitem tmp;
tmp.sockfd = sockid;
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep- >rbr, &tmp);//先在紅黑樹上找,根據key來找,也就是這個sockid,找的速度會非???/span>
if (!epi) {
nty_trace_epoll("rbtree no existn");
pthread_mutex_unlock(&ep- >mtx);
return -1;
}
//只有在紅黑樹上找到該節(jié)點【用過EPOLL_CTL_ADD的tcp連接才能走到這里】;
//從紅黑樹上把這個節(jié)點移除
epi = RB_REMOVE(_epoll_rb_socket, &ep- >rbr, epi);
if (!epi) {
nty_trace_epoll("rbtree is no existn");
pthread_mutex_unlock(&ep- >mtx);
return -1;
}
ep- >rbcnt --;
free(epi);
pthread_mutex_unlock(&ep- >mtx);
} else if (op == EPOLL_CTL_MOD) {
struct epitem tmp;
tmp.sockfd = sockid;
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep- >rbr, &tmp); //先在紅黑樹上找,根據key來找,也就是這個sockid,找的速度會非???/span>
if (epi) {
//紅黑樹上有該節(jié)點,則修改對應的事件
epi- >event.events = event- >events;
epi- >event.events |= EPOLLERR | EPOLLHUP;
} else {
errno = -ENOENT;
return -1;
}
} else {
nty_trace_epoll("op is no existn");
assert(0);
}
return 0;
}
(3)epoll_wait()
//到雙向鏈表中去取相關的事件通知
int epoll_wait(int epid, struct epoll_event *events, int maxevents, int timeout) {
nty_tcp_manager *tcp = nty_get_tcp_manager();
if (!tcp) return -1;
struct _nty_socket *epsocket = tcp- >fdtable- >sockfds[epid];
struct eventpoll *ep = (struct eventpoll*)epsocket- >ep;
// ...此處主要是一些負責驗證性工作的代碼...
//(1)當eventpoll對象的雙向鏈表為空時,程序會在這個while中等待一定時間,
//直到有事件被觸發(fā),操作系統(tǒng)將epitem插入到雙向鏈表上使得rdnum >0時,程序才會跳出while循環(huán)
while (ep- >rdnum == 0 && timeout != 0) {
// ...此處主要是一些與等待時間相關的代碼...
}
pthread_spin_lock(&ep- >lock);
int cnt = 0;
//(1)取得事件的數量
//ep- >rdnum:代表雙向鏈表里邊的節(jié)點數量(也就是有多少個TCP連接來事件了)
//maxevents:此次調用最多可以收集到maxevents個已經就緒【已經準備好】的讀寫事件
int num = (ep- >rdnum > maxevents ? maxevents : ep- >rdnum); //哪個數量少,就取得少的數字作為要取的事件數量
int i = 0;
while (num != 0 && !LIST_EMPTY(&ep- >rdlist)) { //EPOLLET
//(2)每次都從雙向鏈表頭取得 一個一個的節(jié)點
struct epitem *epi = LIST_FIRST(&ep- >rdlist);
//(3)把這個節(jié)點從雙向鏈表中刪除【但這并不影響這個節(jié)點依舊在紅黑樹中】
LIST_REMOVE(epi, rdlink);
//(4)這是個標記,標記這個節(jié)點【這個節(jié)點本身是已經在紅黑樹中】已經不在雙向鏈表中;
epi- >rdy = 0; //當這個節(jié)點被操作系統(tǒng) 加入到 雙向鏈表中時,這個標記會設置為1。
//(5)把事件標記信息拷貝出來;拷貝到提供的events參數中
memcpy(&events[i++], &epi- >event, sizeof(struct epoll_event));
num --;
cnt ++; //拷貝 出來的 雙向鏈表 中節(jié)點數目累加
ep- >rdnum --; //雙向鏈表里邊的節(jié)點數量減1
}
pthread_spin_unlock(&ep- >lock);
//(5)返回 實際 發(fā)生事件的 tcp連接的數目;
return cnt;
}
該函數的邏輯也十分簡單,就是讓先看一下eventpoll對象的雙鏈表中是否有節(jié)點。如果有節(jié)點的話則取出節(jié)點中的事件填充到用戶傳入的指針所指向的內存中。如果沒有節(jié)點的話,則在while循環(huán)中等待一定時間,直到有事件被觸發(fā)后操作系統(tǒng)會將epitem插入到雙向鏈表上使得rdnum>0時(這個過程是由操作系統(tǒng)調用epoll_event_callback函數完成的),程序才會跳出while循環(huán),去雙向鏈表中取數據。
(4)epoll_event_callback()
通過跟蹤epoll_event_callback在內核中被調用的位置??芍?,當服務器在以下5種情況會調用epoll_event_callback:
- 客戶端connect()連入,服務器處于SYN_RCVD狀態(tài)時
- 三路握手完成,服務器處于ESTABLISHED狀態(tài)時
- 客戶端close()斷開連接,服務器處于FIN_WAIT_1和FIN_WAIT_2狀態(tài)時
- 客戶端send/write()數據,服務器可讀時
- 服務器可以發(fā)送數據時
接下來,我們來看一下epoll_event_callback的源碼:
//當發(fā)生客戶端三路握手連入、可讀、可寫、客戶端斷開等情況時,操作系統(tǒng)會調用這個函數,用以往雙向鏈表中增加一個節(jié)點【該節(jié)點同時 也在紅黑樹中】
int epoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) {
struct epitem tmp;
tmp.sockfd = sockid;
//(1)根據給定的key【這個TCP連接的socket】從紅黑樹中找到這個節(jié)點
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep- >rbr, &tmp);
if (!epi) {
nty_trace_epoll("rbtree not existn");
assert(0);
}
//(2)從紅黑樹中找到這個節(jié)點后,判斷這個節(jié)點是否已經被連入到雙向鏈表里【判斷的是rdy標志】
if (epi- >rdy) {
//這個節(jié)點已經在雙向鏈表里,那無非是把新發(fā)生的事件標志增加到現有的事件標志中
epi- >event.events |= event;
return 1;
}
//走到這里,表示 雙向鏈表中并沒有這個節(jié)點,那要做的就是把這個節(jié)點連入到雙向鏈表中
nty_trace_epoll("epoll_event_callback -- > %dn", epi- >sockfd);
pthread_spin_lock(&ep- >lock);
//(3)標記這個節(jié)點已經被放入雙向鏈表中,我們剛才研究epoll_wait()的時候,從雙向鏈表中把這個節(jié)點取走的時候,這個標志被設置回了0
epi- >rdy = 1;
//(4)把這個節(jié)點鏈入到雙向鏈表的表頭位置
LIST_INSERT_HEAD(&ep- >rdlist, epi, rdlink);
//(5)雙向鏈表中的節(jié)點數量加1,剛才研究epoll_wait()的時候,從雙向鏈表中把這個節(jié)點取走的時候,這個數量減了1
ep- >rdnum ++;
pthread_spin_unlock(&ep- >lock);
pthread_mutex_lock(&ep- >cdmtx);
pthread_cond_signal(&ep- >cond);
pthread_mutex_unlock(&ep- >cdmtx);
return 0;
}
以上代碼的邏輯也十分簡單,就是將eventpoll所指向的紅黑樹的節(jié)點插入到雙向鏈表中。
總結
epoll底層實現中有兩個關鍵的數據結構,一個是eventpoll另一個是epitem,其中eventpoll中有兩個成員變量分別是rbr和rdlist,前者指向一顆紅黑樹的根,后者指向雙向鏈表的頭。而epitem則是紅黑樹節(jié)點和雙向鏈表節(jié)點的綜合體,也就是說epitem即可作為樹的節(jié)點,又可以作為鏈表的節(jié)點,并且epitem中包含著用戶注冊的事件。
- 當用戶調用epoll_create()時,會創(chuàng)建eventpoll對象(包含一個紅黑樹和一個雙鏈表);
- 而用戶調用epoll_ctl(ADD)時,會在紅黑樹上增加節(jié)點(epitem對象);
- 接下來,操作系統(tǒng)會默默地在通過epoll_event_callback()來管理eventpoll對象。當有事件被觸發(fā)時,操作系統(tǒng)則會調用epoll_event_callback函數,將含有該事件的epitem添加到雙向鏈表中。
- 當用戶需要管理連接時,只需通過epoll_wait()從eventpoll對象中的雙鏈表下"摘取"epitem并取出其包含的事件即可。
-
函數
+關注
關注
3文章
4235瀏覽量
61965 -
代碼
+關注
關注
30文章
4670瀏覽量
67764 -
協(xié)議棧
+關注
關注
2文章
137瀏覽量
33571 -
epoll
+關注
關注
0文章
28瀏覽量
2938
發(fā)布評論請先 登錄
相關推薦
評論