0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

TCP和UDP連接介紹

科技綠洲 ? 來(lái)源:Linux開(kāi)發(fā)架構(gòu)之路 ? 作者:Linux開(kāi)發(fā)架構(gòu)之路 ? 2023-11-11 15:40 ? 次閱讀

作為一名開(kāi)發(fā)人員我們經(jīng)常會(huì)聽(tīng)到HTTP協(xié)議、TCP/IP協(xié)議、UDP協(xié)議、Socket、Socket長(zhǎng)連接、Socket連接池等字眼,然而它們之間的關(guān)系、區(qū)別及原理并不是所有人都能理解清楚,這篇文章就從網(wǎng)絡(luò)協(xié)議基礎(chǔ)開(kāi)始到Socket連接池,一步一步解釋他們之間的關(guān)系。

七層網(wǎng)絡(luò)模型

首先從網(wǎng)絡(luò)通信的分層模型講起:七層模型,亦稱OSI(Open System Interconnection)模型。自下往上分為:物理層、數(shù)據(jù)鏈路層、網(wǎng)絡(luò)層、傳輸層、會(huì)話層、表示層和應(yīng)用層。所有有關(guān)通信的都離不開(kāi)它,下面這張圖片介紹了各層所對(duì)應(yīng)的一些協(xié)議和硬件

圖片

通過(guò)上圖,我知道IP協(xié)議對(duì)應(yīng)于網(wǎng)絡(luò)層,TCP、UDP協(xié)議對(duì)應(yīng)于傳輸層,而HTTP協(xié)議對(duì)應(yīng)于應(yīng)用層,OSI并沒(méi)有Socket,那什么是Socket,后面我們將結(jié)合代碼具體詳細(xì)介紹。

TCP和UDP連接

關(guān)于傳輸層TCP、UDP協(xié)議可能我們平時(shí)遇見(jiàn)的會(huì)比較多,有人說(shuō)TCP是安全的,UDP是不安全的,UDP傳輸比TCP快,那為什么呢,我們先從TCP的連接建立的過(guò)程開(kāi)始分析,然后解釋UDP和TCP的區(qū)別。

TCP的三次握手和四次分手

我們知道TCP建立連接需要經(jīng)過(guò)三次握手,而斷開(kāi)連接需要經(jīng)過(guò)四次分手,那三次握手和四次分手分別做了什么和如何進(jìn)行的。

圖片

第一次握手:建立連接??蛻舳税l(fā)送連接請(qǐng)求報(bào)文段,將SYN位置為1,Sequence Number為x;然后,客戶端進(jìn)入SYN_SEND狀態(tài),等待服務(wù)器的確認(rèn);
第二次握手:服務(wù)器收到客戶端的SYN報(bào)文段,需要對(duì)這個(gè)SYN報(bào)文段進(jìn)行確認(rèn),設(shè)置Acknowledgment Number為x+1(Sequence Number+1);同時(shí),自己自己還要發(fā)送SYN請(qǐng)求信息,將SYN位置為1,Sequence Number為y;服務(wù)器端將上述所有信息放到一個(gè)報(bào)文段(即SYN+ACK報(bào)文段)中,一并發(fā)送給客戶端,此時(shí)服務(wù)器進(jìn)入SYN_RECV狀態(tài);
第三次握手:客戶端收到服務(wù)器的SYN+ACK報(bào)文段。然后將Acknowledgment Number設(shè)置為y+1,向服務(wù)器發(fā)送ACK報(bào)文段,這個(gè)報(bào)文段發(fā)送完畢以后,客戶端和服務(wù)器端都進(jìn)入ESTABLISHED狀態(tài),完成TCP三次握手。

完成了三次握手,客戶端和服務(wù)器端就可以開(kāi)始傳送數(shù)據(jù)。以上就是TCP三次握手的總體介紹。通信結(jié)束客戶端和服務(wù)端就斷開(kāi)連接,需要經(jīng)過(guò)四次分手確認(rèn)。

第一次分手:主機(jī)1(可以使客戶端,也可以是服務(wù)器端),設(shè)置Sequence Number和Acknowledgment Number,向主機(jī)2發(fā)送一個(gè)FIN報(bào)文段;此時(shí),主機(jī)1進(jìn)入FIN_WAIT_1狀態(tài);這表示主機(jī)1沒(méi)有數(shù)據(jù)要發(fā)送給主機(jī)2了;
第二次分手:主機(jī)2收到了主機(jī)1發(fā)送的FIN報(bào)文段,向主機(jī)1回一個(gè)ACK報(bào)文段,Acknowledgment Number為Sequence Number加1;主機(jī)1進(jìn)入FIN_WAIT_2狀態(tài);主機(jī)2告訴主機(jī)1,我“同意”你的關(guān)閉請(qǐng)求;
第三次分手:主機(jī)2向主機(jī)1發(fā)送FIN報(bào)文段,請(qǐng)求關(guān)閉連接,同時(shí)主機(jī)2進(jìn)入LAST_ACK狀態(tài);
第四次分手:主機(jī)1收到主機(jī)2發(fā)送的FIN報(bào)文段,向主機(jī)2發(fā)送ACK報(bào)文段,然后主機(jī)1進(jìn)入TIME_WAIT狀態(tài);主機(jī)2收到主機(jī)1的ACK報(bào)文段以后,就關(guān)閉連接;此時(shí),主機(jī)1等待2MSL后依然沒(méi)有收到回復(fù),則證明Server端已正常關(guān)閉,那好,主機(jī)1也可以關(guān)閉連接了。

可以看到一次tcp請(qǐng)求的建立及關(guān)閉至少進(jìn)行7次通信,這還不包過(guò)數(shù)據(jù)的通信,而UDP不需3次握手和4次分手。

TCP和UDP的區(qū)別

1、TCP是面向鏈接的,雖然說(shuō)網(wǎng)絡(luò)的不安全不穩(wěn)定特性決定了多少次握手都不能保證連接的可靠性,但TCP的三次握手在最低限度上(實(shí)際上也很大程度上保證了)保證了連接的可靠性;而UDP不是面向連接的,UDP傳送數(shù)據(jù)前并不與對(duì)方建立連接,對(duì)接收到的數(shù)據(jù)也不發(fā)送確認(rèn)信號(hào),發(fā)送端不知道數(shù)據(jù)是否會(huì)正確接收,當(dāng)然也不用重發(fā),所以說(shuō)UDP是無(wú)連接的、不可靠的一種數(shù)據(jù)傳輸協(xié)議。
2、也正由于1所說(shuō)的特點(diǎn),使得UDP的開(kāi)銷(xiāo)更小數(shù)據(jù)傳輸速率更高,因?yàn)椴槐剡M(jìn)行收發(fā)數(shù)據(jù)的確認(rèn),所以UDP的實(shí)時(shí)性更好。知道了TCP和UDP的區(qū)別,就不難理解為何采用TCP傳輸協(xié)議的MSN比采用UDP的QQ傳輸文件慢了,但并不能說(shuō)QQ的通信是不安全的,因?yàn)?a target="_blank">程序員可以手動(dòng)對(duì)UDP的數(shù)據(jù)收發(fā)進(jìn)行驗(yàn)證,比如發(fā)送方對(duì)每個(gè)數(shù)據(jù)包進(jìn)行編號(hào)然后由接收方進(jìn)行驗(yàn)證啊什么的,即使是這樣,UDP因?yàn)樵诘讓訁f(xié)議的封裝上沒(méi)有采用類似TCP的“三次握手”而實(shí)現(xiàn)了TCP所無(wú)法達(dá)到的傳輸效率。

問(wèn)題

關(guān)于傳輸層我們會(huì)經(jīng)常聽(tīng)到一些問(wèn)題

1.TCP服務(wù)器最大并發(fā)連接數(shù)是多少?

關(guān)于TCP服務(wù)器最大并發(fā)連接數(shù)有一種誤解就是“因?yàn)?a target="_blank">端口號(hào)上限為65535,所以TCP服務(wù)器理論上的可承載的最大并發(fā)連接數(shù)也是65535”。首先需要理解一條TCP連接的組成部分:客戶端IP、客戶端端口、服務(wù)端IP、服務(wù)端端口。所以對(duì)于TCP服務(wù)端進(jìn)程來(lái)說(shuō),他可以同時(shí)連接的客戶端數(shù)量并不受限于可用端口號(hào),理論上一個(gè)服務(wù)器的一個(gè)端口能建立的連接數(shù)是全球的IP數(shù)*每臺(tái)機(jī)器的端口數(shù)。實(shí)際并發(fā)連接數(shù)受限于linux可打開(kāi)文件數(shù),這個(gè)數(shù)是可以配置的,可以非常大,所以實(shí)際上受限于系統(tǒng)性能。通過(guò)#ulimit -n 查看服務(wù)的最大文件句柄數(shù),通過(guò)ulimit -n xxx 修改 xxx是你想要能打開(kāi)的數(shù)量。也可以通過(guò)修改系統(tǒng)參數(shù)

#vi /etc/security/limits.conf
*softnofile65536
*hardnofile65536

2.為什么TIME_WAIT狀態(tài)還需要等2MSL后才能返回到CLOSED狀態(tài)?

這是因?yàn)殡m然雙方都同意關(guān)閉連接了,而且握手的4個(gè)報(bào)文也都協(xié)調(diào)和發(fā)送完畢,按理可以直接回到CLOSED狀態(tài)(就好比從SYN_SEND狀態(tài)到ESTABLISH狀態(tài)那樣);但是因?yàn)槲覀儽仨氁傧刖W(wǎng)絡(luò)是不可靠的,你無(wú)法保證你最后發(fā)送的ACK報(bào)文會(huì)一定被對(duì)方收到,因此對(duì)方處于LAST_ACK狀態(tài)下的Socket可能會(huì)因?yàn)槌瑫r(shí)未收到ACK報(bào)文,而重發(fā)FIN報(bào)文,所以這個(gè)TIME_WAIT狀態(tài)的作用就是用來(lái)重發(fā)可能丟失的ACK報(bào)文。

3.TIME_WAIT狀態(tài)還需要等2MSL后才能返回到CLOSED狀態(tài)會(huì)產(chǎn)生什么問(wèn)題

通信雙方建立TCP連接后,主動(dòng)關(guān)閉連接的一方就會(huì)進(jìn)入TIME_WAIT狀態(tài),TIME_WAIT狀態(tài)維持時(shí)間是兩個(gè)MSL時(shí)間長(zhǎng)度,也就是在1-4分鐘,Windows操作系統(tǒng)就是4分鐘。進(jìn)入TIME_WAIT狀態(tài)的一般情況下是客戶端,一個(gè)TIME_WAIT狀態(tài)的連接就占用了一個(gè)本地端口。一臺(tái)機(jī)器上端口號(hào)數(shù)量的上限是65536個(gè),如果在同一臺(tái)機(jī)器上進(jìn)行壓力測(cè)試模擬上萬(wàn)的客戶請(qǐng)求,并且循環(huán)與服務(wù)端進(jìn)行短連接通信,那么這臺(tái)機(jī)器將產(chǎn)生4000個(gè)左右的TIME_WAIT Socket,后續(xù)的短連接就會(huì)產(chǎn)生address already in use : connect的異常,如果使用Nginx作為方向代理也需要考慮TIME_WAIT狀態(tài),發(fā)現(xiàn)系統(tǒng)存在大量TIME_WAIT狀態(tài)的連接,通過(guò)調(diào)整內(nèi)核參數(shù)解決。

vi /etc/sysctl.conf

編輯文件,加入以下內(nèi)容:

net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30

然后執(zhí)行 /sbin/sysctl -p 讓參數(shù)生效。

net.ipv4.tcp_syncookies = 1 表示開(kāi)啟SYN Cookies。當(dāng)出現(xiàn)SYN等待隊(duì)列溢出時(shí),啟用cookies來(lái)處理,可防范少量SYN攻擊,默認(rèn)為0,表示關(guān)閉;
net.ipv4.tcp_tw_reuse = 1 表示開(kāi)啟重用。允許將TIME-WAIT sockets重新用于新的TCP連接,默認(rèn)為0,表示關(guān)閉;
net.ipv4.tcp_tw_recycle = 1 表示開(kāi)啟TCP連接中TIME-WAIT sockets的快速回收,默認(rèn)為0,表示關(guān)閉。
net.ipv4.tcp_fin_timeout 修改系統(tǒng)默認(rèn)的TIMEOUT時(shí)間

HTTP協(xié)議

關(guān)于TCP/IP和HTTP協(xié)議的關(guān)系,網(wǎng)絡(luò)有一段比較容易理解的介紹:“我們?cè)趥鬏敂?shù)據(jù)時(shí),可以只使用(傳輸層)TCP/IP協(xié)議,但是那樣的話,如果沒(méi)有應(yīng)用層,便無(wú)法識(shí)別數(shù)據(jù)內(nèi)容。如果想要使傳輸?shù)臄?shù)據(jù)有意義,則必須使用到應(yīng)用層協(xié)議。應(yīng)用層協(xié)議有很多,比如HTTP、FTP、TELNET等,也可以自己定義應(yīng)用層協(xié)議。
HTTP協(xié)議即超文本傳送協(xié)議(Hypertext Transfer Protocol ),是Web聯(lián)網(wǎng)的基礎(chǔ),也是手機(jī)聯(lián)網(wǎng)常用的協(xié)議之一,WEB使用HTTP協(xié)議作應(yīng)用層協(xié)議,以封裝HTTP文本信息,然后使用TCP/IP做傳輸層協(xié)議將它發(fā)到網(wǎng)絡(luò)上。
由于HTTP在每次請(qǐng)求結(jié)束后都會(huì)主動(dòng)釋放連接,因此HTTP連接是一種“短連接”,要保持客戶端程序的在線狀態(tài),需要不斷地向服務(wù)器發(fā)起連接請(qǐng)求。通常 的做法是即時(shí)不需要獲得任何數(shù)據(jù),客戶端也保持每隔一段固定的時(shí)間向服務(wù)器發(fā)送一次“保持連接”的請(qǐng)求,服務(wù)器在收到該請(qǐng)求后對(duì)客戶端進(jìn)行回復(fù),表明知道 客戶端“在線”。若服務(wù)器長(zhǎng)時(shí)間無(wú)法收到客戶端的請(qǐng)求,則認(rèn)為客戶端“下線”,若客戶端長(zhǎng)時(shí)間無(wú)法收到服務(wù)器的回復(fù),則認(rèn)為網(wǎng)絡(luò)已經(jīng)斷開(kāi)。
下面是一個(gè)簡(jiǎn)單的HTTP Post application/json數(shù)據(jù)內(nèi)容的請(qǐng)求:

POST  HTTP/1.1
Host: 127.0.0.1:9017
Content-Type: application/json
Cache-Control: no-cache

{"a":"a"}

關(guān)于Socket(套接字)

現(xiàn)在我們了解到TCP/IP只是一個(gè)協(xié)議棧,就像操作系統(tǒng)的運(yùn)行機(jī)制一樣,必須要具體實(shí)現(xiàn),同時(shí)還要提供對(duì)外的操作接口。就像操作系統(tǒng)會(huì)提供標(biāo)準(zhǔn)的編程接口,比如Win32編程接口一樣,TCP/IP也必須對(duì)外提供編程接口,這就是Socket。現(xiàn)在我們知道,Socket跟TCP/IP并沒(méi)有必然的聯(lián)系。Socket編程接口在設(shè)計(jì)的時(shí)候,就希望也能適應(yīng)其他的網(wǎng)絡(luò)協(xié)議。所以,Socket的出現(xiàn)只是可以更方便的使用TCP/IP協(xié)議棧而已,其對(duì)TCP/IP進(jìn)行了抽象,形成了幾個(gè)最基本的函數(shù)接口。比如create,listen,accept,connect,read和write等等。
不同語(yǔ)言都有對(duì)應(yīng)的建立Socket服務(wù)端和客戶端的庫(kù),下面舉例Nodejs如何創(chuàng)建服務(wù)端和客戶端:
服務(wù)端:

const net = require('net');
const server = net.createServer();
server.on('connection', (client) = > {
  client.write('Hi!n'); // 服務(wù)端向客戶端輸出信息,使用 write() 方法
  client.write('Bye!n');
  //client.end(); // 服務(wù)端結(jié)束該次會(huì)話
});
server.listen(9000);

服務(wù)監(jiān)聽(tīng)9000端口
下面使用命令行發(fā)送http請(qǐng)求和telnet

$ curl http://127.0.0.1:9000
Bye!

$telnet 127.0.0.1 9000
Trying 192.168.1.21...
Connected to 192.168.1.21.
Escape character is '^]'.
Hi!
Bye!
Connection closed by foreign host.

注意到curl只處理了一次報(bào)文。
客戶端

const client = new net.Socket();
client.connect(9000, '127.0.0.1', function () {
});
client.on('data', (chunk) = > {
  console.log('data', chunk.toString())
  //data Hi!
  //Bye!
});

Socket長(zhǎng)連接

所謂長(zhǎng)連接,指在一個(gè)TCP連接上可以連續(xù)發(fā)送多個(gè)數(shù)據(jù)包,在TCP連接保持期間,如果沒(méi)有數(shù)據(jù)包發(fā)送,需要雙方發(fā)檢測(cè)包以維持此連接(心跳包),一般需要自己做在線維持。 短連接是指通信雙方有數(shù)據(jù)交互時(shí),就建立一個(gè)TCP連接,數(shù)據(jù)發(fā)送完成后,則斷開(kāi)此TCP連接。比如Http的,只是連接、請(qǐng)求、關(guān)閉,過(guò)程時(shí)間較短,服務(wù)器若是一段時(shí)間內(nèi)沒(méi)有收到請(qǐng)求即可關(guān)閉連接。其實(shí)長(zhǎng)連接是相對(duì)于通常的短連接而說(shuō)的,也就是長(zhǎng)時(shí)間保持客戶端與服務(wù)端的連接狀態(tài)。
通常的短連接操作步驟是:
連接→數(shù)據(jù)傳輸→關(guān)閉連接;

而長(zhǎng)連接通常就是:
連接→數(shù)據(jù)傳輸→保持連接(心跳)→數(shù)據(jù)傳輸→保持連接(心跳)→……→關(guān)閉連接;

什么時(shí)候用長(zhǎng)連接,短連接?
長(zhǎng)連接多用于操作頻繁,點(diǎn)對(duì)點(diǎn)的通訊,而且連接數(shù)不能太多情況,。每個(gè)TCP連接都需要三步握手,這需要時(shí)間,如果每個(gè)操作都是先連接,再操作的話那么處理 速度會(huì)降低很多,所以每個(gè)操作完后都不斷開(kāi),次處理時(shí)直接發(fā)送數(shù)據(jù)包就OK了,不用建立TCP連接。例如:數(shù)據(jù)庫(kù)的連接用長(zhǎng)連接, 如果用短連接頻繁的通信會(huì)造成Socket錯(cuò)誤,而且頻繁的Socket創(chuàng)建也是對(duì)資源的浪費(fèi)。

什么是心跳包為什么需要:
心跳包就是在客戶端和服務(wù)端間定時(shí)通知對(duì)方自己狀態(tài)的一個(gè)自己定義的命令字,按照一定的時(shí)間間隔發(fā)送,類似于心跳,所以叫做心跳包。網(wǎng)絡(luò)中的接收和發(fā)送數(shù)據(jù)都是使用Socket進(jìn)行實(shí)現(xiàn)。但是如果此套接字已經(jīng)斷開(kāi)(比如一方斷網(wǎng)了),那發(fā)送數(shù)據(jù)和接收數(shù)據(jù)的時(shí)候就一定會(huì)有問(wèn)題。可是如何判斷這個(gè)套接字是否還可以使用呢?這個(gè)就需要在系統(tǒng)中創(chuàng)建心跳機(jī)制。其實(shí)TCP中已經(jīng)為我們實(shí)現(xiàn)了一個(gè)叫做心跳的機(jī)制。如果你設(shè)置了心跳,那TCP就會(huì)在一定的時(shí)間(比如你設(shè)置的是3秒鐘)內(nèi)發(fā)送你設(shè)置的次數(shù)的心跳(比如說(shuō)2次),并且此信息不會(huì)影響你自己定義的協(xié)議。也可以自己定義,所謂“心跳”就是定時(shí)發(fā)送一個(gè)自定義的結(jié)構(gòu)體(心跳包或心跳幀),讓對(duì)方知道自己“在線”,以確保鏈接的有效性。
實(shí)現(xiàn):
服務(wù)端:

const net = require('net');

let clientList = [];
const heartbeat = 'HEARTBEAT'; // 定義心跳包內(nèi)容確保和平時(shí)發(fā)送的數(shù)據(jù)不會(huì)沖突

const server = net.createServer();
server.on('connection', (client) = > {
  console.log('客戶端建立連接:', client.remoteAddress + ':' + client.remotePort);
  clientList.push(client);
  client.on('data', (chunk) = > {
    let content = chunk.toString();
    if (content === heartbeat) {
      console.log('收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包');
    } else {
      console.log('收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù):', content);
      client.write('服務(wù)端的數(shù)據(jù):' + content);
    }
  });
  client.on('end', () = > {
    console.log('收到客戶端end');
    clientList.splice(clientList.indexOf(client), 1);
  });
  client.on('error', () = > {
    clientList.splice(clientList.indexOf(client), 1);
  })
});
server.listen(9000);
setInterval(broadcast, 10000); // 定時(shí)發(fā)送心跳包
function broadcast() {
  console.log('broadcast heartbeat', clientList.length);
  let cleanup = []
  for (let i=0;i< clientList.length;i+=1) {
    if (clientList[i].writable) { // 先檢查 sockets 是否可寫(xiě)
      clientList[i].write(heartbeat);
    } else {
      console.log('一個(gè)無(wú)效的客戶端');
      cleanup.push(clientList[i]); // 如果不可寫(xiě),收集起來(lái)銷(xiāo)毀。銷(xiāo)毀之前要 Socket.destroy() 用 API 的方法銷(xiāo)毀。
      clientList[i].destroy();
    }
  }
  //Remove dead Nodes out of write loop to avoid trashing loop index
  for (let i=0; i< cleanup.length; i+=1) {
    console.log('刪除無(wú)效的客戶端:', cleanup[i].name);
    clientList.splice(clientList.indexOf(cleanup[i]), 1);
  }
}

服務(wù)端輸出結(jié)果:

客戶端建立連接: ::ffff:127.0.0.1:57125
broadcast heartbeat 1
收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù): Thu, 29 Mar 2018 03:45:15 GMT
收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包
收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù): Thu, 29 Mar 2018 03:45:20 GMT
broadcast heartbeat 1
收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù): Thu, 29 Mar 2018 03:45:25 GMT
收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包
客戶端建立連接: ::ffff:127.0.0.1:57129
收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包
收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù): Thu, 29 Mar 2018 03:46:00 GMT
收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù): Thu, 29 Mar 2018 03:46:04 GMT
broadcast heartbeat 2
收到客戶端發(fā)過(guò)來(lái)的數(shù)據(jù): Thu, 29 Mar 2018 03:46:05 GMT
收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包

客戶端代碼:

const net = require('net');

const heartbeat = 'HEARTBEAT'; 
const client = new net.Socket();
client.connect(9000, '127.0.0.1', () = > {});
client.on('data', (chunk) = > {
  let content = chunk.toString();
  if (content === heartbeat) {
    console.log('收到心跳包:', content);
  } else {
    console.log('收到數(shù)據(jù):', content);
  }
});

// 定時(shí)發(fā)送數(shù)據(jù)
setInterval(() = > {
  console.log('發(fā)送數(shù)據(jù)', new Date().toUTCString());
  client.write(new Date().toUTCString());
}, 5000);

// 定時(shí)發(fā)送心跳包
setInterval(function () {
  client.write(heartbeat);
}, 10000);

客戶端輸出結(jié)果:

發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:04 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:04 GMT
收到心跳包: HEARTBEAT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:09 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:09 GMT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:14 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:14 GMT
收到心跳包: HEARTBEAT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:19 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:19 GMT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:24 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:24 GMT
收到心跳包: HEARTBEAT

定義自己的協(xié)議

如果想要使傳輸?shù)臄?shù)據(jù)有意義,則必須使用到應(yīng)用層協(xié)議比如Http、Mqtt、Dubbo等?;赥CP協(xié)議上自定義自己的應(yīng)用層的協(xié)議需要解決的幾個(gè)問(wèn)題:

  1. 心跳包格式的定義及處理
  2. 報(bào)文頭的定義,就是你發(fā)送數(shù)據(jù)的時(shí)候需要先發(fā)送報(bào)文頭,報(bào)文里面能解析出你將要發(fā)送的數(shù)據(jù)長(zhǎng)度
  3. 你發(fā)送數(shù)據(jù)包的格式,是json的還是其他序列化的方式

下面我們就一起來(lái)定義自己的協(xié)議,并編寫(xiě)服務(wù)的和客戶端進(jìn)行調(diào)用:
定義報(bào)文頭格式: length:000000000xxxx; xxxx代表數(shù)據(jù)的長(zhǎng)度,總長(zhǎng)度20,舉例子不嚴(yán)謹(jǐn)。
數(shù)據(jù)表的格式: Json
服務(wù)端:

const net = require('net');
const server = net.createServer();
let clientList = [];
const heartBeat = 'HeartBeat'; // 定義心跳包內(nèi)容確保和平時(shí)發(fā)送的數(shù)據(jù)不會(huì)沖突
const getHeader = (num) = > {
  return 'length:' + (Array(13).join(0) + num).slice(-13);
}
server.on('connection', (client) = > {
  client.name = client.remoteAddress + ':' + client.remotePort
  // client.write('Hi ' + client.name + '!n');
  console.log('客戶端建立連接', client.name);

  clientList.push(client)
  let chunks = [];
  let length = 0;
  client.on('data', (chunk) = > {
    let content = chunk.toString();
    console.log("content:", content, content.length);
    if (content === heartBeat) {
      console.log('收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包');
    } else {
      if (content.indexOf('length:') === 0){
        length = parseInt(content.substring(7,20));
        console.log('length', length);
        chunks =[chunk.slice(20, chunk.length)];
      } else {
        chunks.push(chunk);
      }
      let heap = Buffer.concat(chunks);
      console.log('heap.length', heap.length)
      if (heap.length >= length) {
        try {
          console.log('收到數(shù)據(jù)', JSON.parse(heap.toString()));
          let data = '服務(wù)端的數(shù)據(jù)數(shù)據(jù):' + heap.toString();;
          let dataBuff =  Buffer.from(JSON.stringify(data));
          let header = getHeader(dataBuff.length)
          client.write(header);
          client.write(dataBuff);
        } catch (err) {
          console.log('數(shù)據(jù)解析失敗');
        }
      }
    }
  })

  client.on('end', () = > {
    console.log('收到客戶端end');
    clientList.splice(clientList.indexOf(client), 1);
  });
  client.on('error', () = > {
    clientList.splice(clientList.indexOf(client), 1);
  })
});
server.listen(9000);
setInterval(broadcast, 10000); // 定時(shí)檢查客戶端 并發(fā)送心跳包
function broadcast() {
  console.log('broadcast heartbeat', clientList.length);
  let cleanup = []
  for(var i=0;i< clientList.length;i+=1) {
    if(clientList[i].writable) { // 先檢查 sockets 是否可寫(xiě)
      // clientList[i].write(heartBeat); // 發(fā)送心跳數(shù)據(jù)
    } else {
      console.log('一個(gè)無(wú)效的客戶端')
      cleanup.push(clientList[i]) // 如果不可寫(xiě),收集起來(lái)銷(xiāo)毀。銷(xiāo)毀之前要 Socket.destroy() 用 API 的方法銷(xiāo)毀。
      clientList[i].destroy();
    }
  }
  // 刪除無(wú)效的客戶端
  for(i=0; i< cleanup.length; i+=1) {
    console.log('刪除無(wú)效的客戶端:', cleanup[i].name);
    clientList.splice(clientList.indexOf(cleanup[i]), 1)
  }
}

日志打?。?/p>

客戶端建立連接 ::ffff:127.0.0.1:50178
 content: length:0000000000031 20
 length 31
 heap.length 0
 content: "Tue, 03 Apr 2018 06:12:37 GMT" 31
 heap.length 31
 收到數(shù)據(jù) Tue, 03 Apr 2018 06:12:37 GMT
 broadcast heartbeat 1
 content: HeartBeat 9
 收到客戶端發(fā)過(guò)來(lái)的一個(gè)心跳包
 content: length:0000000000031"Tue, 03 Apr 2018 06:12:42 GMT" 51
 length 31
 heap.length 31
 收到數(shù)據(jù) Tue, 03 Apr 2018 06:12:42 GMT

客戶端

const net = require('net');
const client = new net.Socket();
const heartBeat = 'HeartBeat'; // 定義心跳包內(nèi)容確保和平時(shí)發(fā)送的數(shù)據(jù)不會(huì)沖突
const getHeader = (num) = > {
  return 'length:' + (Array(13).join(0) + num).slice(-13);
}
client.connect(9000, '127.0.0.1', function () {});
let chunks = [];
let length = 0;
client.on('data', (chunk) = > {
  let content = chunk.toString();
  console.log("content:", content, content.length);
  if (content === heartBeat) {
    console.log('收到服務(wù)端發(fā)過(guò)來(lái)的一個(gè)心跳包');
  } else {
    if (content.indexOf('length:') === 0){
      length = parseInt(content.substring(7,20));
      console.log('length', length);
      chunks =[chunk.slice(20, chunk.length)];
    } else {
      chunks.push(chunk);
    }
    let heap = Buffer.concat(chunks);
    console.log('heap.length', heap.length)
    if (heap.length >= length) {
      try {
        console.log('收到數(shù)據(jù)', JSON.parse(heap.toString()));
      } catch (err) {
        console.log('數(shù)據(jù)解析失敗');
      }
    }
  }
});
// 定時(shí)發(fā)送數(shù)據(jù)
setInterval(function () {
  let data = new Date().toUTCString();
  let dataBuff =  Buffer.from(JSON.stringify(data));
  let header =getHeader(dataBuff.length);
  client.write(header);
  client.write(dataBuff);
}, 5000);
// 定時(shí)發(fā)送心跳包
setInterval(function () {
  client.write(heartBeat);
}, 10000);

日志打?。?/p>

content: length:0000000000060 20
 length 60
 heap.length 0
 content: "服務(wù)端的數(shù)據(jù)數(shù)據(jù):"Tue, 03 Apr 2018 06:12:37 GMT"" 44
 heap.length 60
 收到數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):"Tue, 03 Apr 2018 06:12:37 GMT"
 content: length:0000000000060"服務(wù)端的數(shù)據(jù)數(shù)據(jù):"Tue, 03 Apr 2018 06:12:42 GMT"" 64
 length 60
 heap.length 60
 收到數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):"Tue, 03 Apr 2018 06:12:42 GMT"

客戶端定時(shí)發(fā)送自定義協(xié)議數(shù)據(jù)到服務(wù)端,先發(fā)送頭數(shù)據(jù),在發(fā)送內(nèi)容數(shù)據(jù),另外一個(gè)定時(shí)器發(fā)送心跳數(shù)據(jù),服務(wù)端判斷是心跳數(shù)據(jù),再判斷是不是頭數(shù)據(jù),再是內(nèi)容數(shù)據(jù),然后解析后再發(fā)送數(shù)據(jù)給客戶端。從日志的打印可以看出客戶端先后writeheader和data數(shù)據(jù),服務(wù)端可能在一個(gè)data事件里面接收到。
這里可以看到一個(gè)客戶端在同一個(gè)時(shí)間內(nèi)處理一個(gè)請(qǐng)求可以很好的工作,但是想象這么一個(gè)場(chǎng)景,如果同一時(shí)間內(nèi)讓同一個(gè)客戶端去多次調(diào)用服務(wù)端請(qǐng)求,發(fā)送多次頭數(shù)據(jù)和內(nèi)容數(shù)據(jù),服務(wù)端的data事件收到的數(shù)據(jù)就很難區(qū)別哪些數(shù)據(jù)是哪次請(qǐng)求的,比如兩次頭數(shù)據(jù)同時(shí)到達(dá)服務(wù)端,服務(wù)端就會(huì)忽略其中一次,而后面的內(nèi)容數(shù)據(jù)也不一定就對(duì)應(yīng)于這個(gè)頭的。所以想復(fù)用長(zhǎng)連接并能很好的高并發(fā)處理服務(wù)端請(qǐng)求,就需要連接池這種方式了。

Socket連接池

什么是Socket連接池,池的概念可以聯(lián)想到是一種資源的集合,所以Socket連接池,就是維護(hù)著一定數(shù)量Socket長(zhǎng)連接的集合。它能自動(dòng)檢測(cè)Socket長(zhǎng)連接的有效性,剔除無(wú)效的連接,補(bǔ)充連接池的長(zhǎng)連接的數(shù)量。從代碼層次上其實(shí)是人為實(shí)現(xiàn)這種功能的類,一般一個(gè)連接池包含下面幾個(gè)屬性:

  1. 空閑可使用的長(zhǎng)連接隊(duì)列
  2. 正在運(yùn)行的通信的長(zhǎng)連接隊(duì)列
  3. 等待去獲取一個(gè)空閑長(zhǎng)連接的請(qǐng)求的隊(duì)列
  4. 無(wú)效長(zhǎng)連接的剔除功能
  5. 長(zhǎng)連接資源池的數(shù)量配置
  6. 長(zhǎng)連接資源的新建功能

場(chǎng)景: 一個(gè)請(qǐng)求過(guò)來(lái),首先去資源池要求獲取一個(gè)長(zhǎng)連接資源,如果空閑隊(duì)列里面有長(zhǎng)連接,就獲取到這個(gè)長(zhǎng)連接Socket,并把這個(gè)Socket移到正在運(yùn)行的長(zhǎng)連接隊(duì)列。如果空閑隊(duì)列里面沒(méi)有,且正在運(yùn)行的隊(duì)列長(zhǎng)度小于配置的連接池資源的數(shù)量,就新建一個(gè)長(zhǎng)連接到正在運(yùn)行的隊(duì)列去,如果正在運(yùn)行的不下于配置的資源池長(zhǎng)度,則這個(gè)請(qǐng)求進(jìn)入到等待隊(duì)列去。當(dāng)一個(gè)正在運(yùn)行的Socket完成了請(qǐng)求,就從正在運(yùn)行的隊(duì)列移到空閑的隊(duì)列,并觸發(fā)等待請(qǐng)求隊(duì)列去獲取空閑資源,如果有等待的情況。

這里簡(jiǎn)單介紹Nodejs的Socket連接池generic-pool模塊的源碼。
主要文件目錄結(jié)構(gòu)

.
|————lib  ------------------------- 代碼庫(kù)
| |————DefaultEvictor.js ---------- 
| |————Deferred.js ---------------- 
| |————Deque.js ------------------- 
| |————DequeIterator.js ----------- 
| |————DoublyLinkedList.js -------- 
| |————DoublyLinkedListIterator.js- 
| |————factoryValidator.js -------- 
| |————Pool.js -------------------- 連接池主要代碼
| |————PoolDefaults.js ------------ 
| |————PooledResource.js ---------- 
| |————Queue.js ------------------- 隊(duì)列
| |————ResourceLoan.js ------------ 
| |————ResourceRequest.js --------- 
| |————utils.js ------------------- 工具
|————test ------------------------- 測(cè)試目錄
|————README.md  ------------------- 項(xiàng)目描述文件
|————.eslintrc  ------------------- eslint靜態(tài)檢查配置文件
|————.eslintignore  --------------- eslint靜態(tài)檢查忽略的文件
|————package.json ----------------- npm包依賴配置

下面介紹庫(kù)的使用:

初始化連接池

'use strict';
const net = require('net');
const genericPool = require('generic-pool');

function createPool(conifg) {
  let options = Object.assign({
    fifo: true,                             // 是否優(yōu)先使用老的資源
    priorityRange: 1,                       // 優(yōu)先級(jí)
    testOnBorrow: true,                     // 是否開(kāi)啟獲取驗(yàn)證
    // acquireTimeoutMillis: 10 * 1000,     // 獲取的超時(shí)時(shí)間
    autostart: true,                        // 自動(dòng)初始化和釋放調(diào)度啟用
    min: 10,                                // 初始化連接池保持的長(zhǎng)連接最小數(shù)量
    max: 0,                                 // 最大連接池保持的長(zhǎng)連接數(shù)量
    evictionRunIntervalMillis: 0,           // 資源釋放檢驗(yàn)間隔檢查 設(shè)置了下面幾個(gè)參數(shù)才起效果
    numTestsPerEvictionRun: 3,              // 每次釋放資源數(shù)量
    softIdleTimeoutMillis: -1,              // 可用的超過(guò)了最小的min 且空閑時(shí)間時(shí)間 達(dá)到釋放
    idleTimeoutMillis: 30000                // 強(qiáng)制釋放
    // maxWaitingClients: 50                // 最大等待
  }, conifg.options);
  const factory = {

    create: function () {
      return new Promise((resolve, reject) = > {
        let socket = new net.Socket();
        socket.setKeepAlive(true);
        socket.connect(conifg.port, conifg.host);
        // TODO 心跳包的處理邏輯
        socket.on('connect', () = > {
          console.log('socket_pool', conifg.host, conifg.port, 'connect' );
          resolve(socket);
        });
        socket.on('close', (err) = > { // 先end 事件再close事件
          console.log('socket_pool', conifg.host, conifg.port, 'close', err);
        });
        socket.on('error', (err) = > {
          console.log('socket_pool', conifg.host, conifg.port, 'error', err);
          reject(err);
        });
      });
    },
    //銷(xiāo)毀連接
    destroy: function (socket) {
      return new Promise((resolve) = > {
        socket.destroy(); // 不會(huì)觸發(fā)end 事件 第一次會(huì)觸發(fā)發(fā)close事件 如果有message會(huì)觸發(fā)error事件
        resolve();
      });
    },
    validate: function (socket) { //獲取資源池校驗(yàn)資源有效性
      return new Promise((resolve) = > {
        // console.log('socket.destroyed:', socket.destroyed, 'socket.readable:', socket.readable, 'socket.writable:', socket.writable);
        if (socket.destroyed || !socket.readable || !socket.writable) {
          return resolve(false);
        } else {
          return resolve(true);
        }
      });
    }
  };
  const pool = genericPool.createPool(factory, options);
  pool.on('factoryCreateError', (err) = > { // 監(jiān)聽(tīng)新建長(zhǎng)連接出錯(cuò) 讓請(qǐng)求直接返回錯(cuò)誤
    const clientResourceRequest = pool._waitingClientsQueue.dequeue();
    if (clientResourceRequest) {
      clientResourceRequest.reject(err);
    }
  });
  return pool;
};

let pool = createPool({
  port: 9000,
  host: '127.0.0.1',
  options: {min: 0, max: 10}
});

使用連接池

下面連接池的使用,使用的協(xié)議是我們之前自定義的協(xié)議。

let pool = createPool({
  port: 9000,
  host: '127.0.0.1',
  options: {min: 0, max: 10}
});
const getHeader = (num) = > {
  return 'length:' + (Array(13).join(0) + num).slice(-13);
}
const request = async (requestDataBuff) = > {
  let client;
  try {
    client = await pool.acquire();
  } catch (e) {
    console.log('acquire socket client failed: ', e);
    throw e;
  }
  let timeout = 10000;
  return new Promise((resolve, reject) = > {
    let chunks = [];
    let length = 0;
    client.setTimeout(timeout);
    client.removeAllListeners('error');
    client.on('error', (err) = > {
      client.removeAllListeners('error');
      client.removeAllListeners('data');
      client.removeAllListeners('timeout');
      pool.destroyed(client);
      reject(err);
    });
    client.on('timeout', () = > {
      client.removeAllListeners('error');
      client.removeAllListeners('data');
      client.removeAllListeners('timeout');
      // 應(yīng)該銷(xiāo)毀以防下一個(gè)req的data事件監(jiān)聽(tīng)才返回?cái)?shù)據(jù)
      pool.destroy(client);
      // pool.release(client);
      reject(`socket connect timeout set ${timeout}`);
    });
    let header = getHeader(requestDataBuff.length);
    client.write(header);
    client.write(requestDataBuff);
    client.on('data', (chunk) = > {
      let content = chunk.toString();
      console.log('content', content, content.length);
      // TODO 過(guò)濾心跳包
      if (content.indexOf('length:') === 0){
        length = parseInt(content.substring(7,20));
        console.log('length', length);
        chunks =[chunk.slice(20, chunk.length)];
      } else {
        chunks.push(chunk);
      }
      let heap = Buffer.concat(chunks);
      console.log('heap.length', heap.length);
      if (heap.length >= length) {
        pool.release(client);
        client.removeAllListeners('error');
        client.removeAllListeners('data');
        client.removeAllListeners('timeout');
        try {
          // console.log('收到數(shù)據(jù)', JSON.parse(heap.toString()));
          resolve(JSON.parse(heap.toString()));
        } catch (err) {
          reject(err);
          console.log('數(shù)據(jù)解析失敗');
        }
      }
    });
  });
}
request(Buffer.from(JSON.stringify({a: 'a'})))
  .then((data) = > {
    console.log('收到服務(wù)的數(shù)據(jù)',data)
  }).catch(err = > {
    console.log(err);
  });

request(Buffer.from(JSON.stringify({b: 'b'})))
  .then((data) = > {
    console.log('收到服務(wù)的數(shù)據(jù)',data)
  }).catch(err = > {
    console.log(err);
  });

setTimeout(function () { //查看是否會(huì)復(fù)用Socket 有沒(méi)有建立新的連接
  request(Buffer.from(JSON.stringify({c: 'c'})))
    .then((data) = > {
      console.log('收到服務(wù)的數(shù)據(jù)',data)
    }).catch(err = > {
    console.log(err);
  });

  request(Buffer.from(JSON.stringify({d: 'd'})))
    .then((data) = > {
      console.log('收到服務(wù)的數(shù)據(jù)',data)
    }).catch(err = > {
    console.log(err);
  });
}, 1000)

日志打印:

socket_pool 127.0.0.1 9000 connect
 socket_pool 127.0.0.1 9000 connect
 content length:0000000000040"服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"a":"a"}" 44
 length 40
 heap.length 40
 收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"a":"a"}
 content length:0000000000040"服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"b":"b"}" 44
 length 40
 heap.length 40
 收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"b":"b"}
 content length:0000000000040 20
 length 40
 heap.length 0
 content "服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"c":"c"}" 24
 heap.length 40
 收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"c":"c"}
 content length:0000000000040"服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"d":"d"}" 44
 length 40
 heap.length 40
 收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"d":"d"}

這里看到前面兩個(gè)請(qǐng)求都建立了新的Socket連接 socket_pool 127.0.0.1 9000 connect,定時(shí)器結(jié)束后重新發(fā)起兩個(gè)請(qǐng)求就沒(méi)有建立新的Socket連接了,直接從連接池里面獲取Socket連接資源。

源碼分析

發(fā)現(xiàn)主要的代碼就位于lib文件夾中的Pool.js
構(gòu)造函數(shù):
lib/Pool.js

/**
   * Generate an Object pool with a specified `factory` and `config`.
   *
   * @param {typeof DefaultEvictor} Evictor
   * @param {typeof Deque} Deque
   * @param {typeof PriorityQueue} PriorityQueue
   * @param {Object} factory
   *   Factory to be used for generating and destroying the items.
   * @param {Function} factory.create
   *   Should create the item to be acquired,
   *   and call it's first callback argument with the generated item as it's argument.
   * @param {Function} factory.destroy
   *   Should gently close any resources that the item is using.
   *   Called before the items is destroyed.
   * @param {Function} factory.validate
   *   Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false
   *   If it should be removed from pool.
   * @param {Object} options
   */
  constructor(Evictor, Deque, PriorityQueue, factory, options) {
    super();
    factoryValidator(factory); // 檢驗(yàn)我們定義的factory的有效性包含create destroy validate
    this._config = new PoolOptions(options); // 連接池配置
    // TODO: fix up this ugly glue-ing
    this._Promise = this._config.Promise;

    this._factory = factory;
    this._draining = false;
    this._started = false;
    /**
     * Holds waiting clients
     * @type {PriorityQueue}
     */
    this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange); // 請(qǐng)求的對(duì)象管管理隊(duì)列queue 初始化queue的size 1 { _size: 1, _slots: [ Queue { _list: [Object] } ] }
    /**
     * Collection of promises for resource creation calls made by the pool to factory.create
     * @type {Set}
     */
    this._factoryCreateOperations = new Set(); // 正在創(chuàng)建的長(zhǎng)連接

    /**
     * Collection of promises for resource destruction calls made by the pool to factory.destroy
     * @type {Set}
     */
    this._factoryDestroyOperations = new Set(); // 正在銷(xiāo)毀的長(zhǎng)連接

    /**
     * A queue/stack of pooledResources awaiting acquisition
     * TODO: replace with LinkedList backed array
     * @type {Deque}
     */
    this._availableObjects = new Deque(); // 空閑的資源長(zhǎng)連接

    /**
     * Collection of references for any resource that are undergoing validation before being acquired
     * @type {Set}
     */
    this._testOnBorrowResources = new Set(); // 正在檢驗(yàn)有效性的資源

    /**
     * Collection of references for any resource that are undergoing validation before being returned
     * @type {Set}
     */
    this._testOnReturnResources = new Set();

    /**
     * Collection of promises for any validations currently in process
     * @type {Set}
     */
    this._validationOperations = new Set();// 正在校驗(yàn)的中間temp

    /**
     * All objects associated with this pool in any state (except destroyed)
     * @type {Set}
     */
    this._allObjects = new Set(); // 所有的鏈接資源 是一個(gè) PooledResource對(duì)象

    /**
     * Loans keyed by the borrowed resource
     * @type {Map}
     */
    this._resourceLoans = new Map(); // 被借用的對(duì)象的map release的時(shí)候用到

    /**
     * Infinitely looping iterator over available object
     * @type {DequeIterator}
     */
    this._evictionIterator = this._availableObjects.iterator(); // 一個(gè)迭代器

    this._evictor = new Evictor();

    /**
     * handle for setTimeout for next eviction run
     * @type {(number|null)}
     */
    this._scheduledEviction = null;

    // create initial resources (if factory.min > 0)
    if (this._config.autostart === true) { // 初始化最小的連接數(shù)量
      this.start();
    }
  }

可以看到包含之前說(shuō)的空閑的資源隊(duì)列,正在請(qǐng)求的資源隊(duì)列,正在等待的請(qǐng)求隊(duì)列等。
下面查看 Pool.acquire 方法
lib/Pool.js

/**
   * Request a new resource. The callback will be called,
   * when a new resource is available, passing the resource to the callback.
   * TODO: should we add a seperate "acquireWithPriority" function
   *
   * @param {Number} [priority=0]
   *   Optional.  Integer between 0 and (priorityRange - 1).  Specifies the priority
   *   of the caller if there are no available resources.  Lower numbers mean higher
   *   priority.
   *
   * @returns {Promise}
   */
  acquire(priority) { // 空閑資源隊(duì)列資源是有優(yōu)先等級(jí)的 
    if (this._started === false && this._config.autostart === false) {
      this.start(); // 會(huì)在this._allObjects 添加min的連接對(duì)象數(shù)
    }
    if (this._draining) { // 如果是在資源釋放階段就不能再請(qǐng)求資源了
      return this._Promise.reject(
        new Error("pool is draining and cannot accept work")
      );
    }
    // 如果要設(shè)置了等待隊(duì)列的長(zhǎng)度且要等待 如果超過(guò)了就返回資源不可獲取
    // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime
    if (
      this._config.maxWaitingClients !== undefined &&
      this._waitingClientsQueue.length >= this._config.maxWaitingClients
    ) {
      return this._Promise.reject(
        new Error("max waitingClients count exceeded")
      );
    }

    const resourceRequest = new ResourceRequest(
      this._config.acquireTimeoutMillis, // 對(duì)象里面的超時(shí)配置 表示等待時(shí)間 會(huì)啟動(dòng)一個(gè)定時(shí) 超時(shí)了就觸發(fā)resourceRequest.promise 的reject觸發(fā)
      this._Promise
    );
    // console.log(resourceRequest)
    this._waitingClientsQueue.enqueue(resourceRequest, priority); // 請(qǐng)求進(jìn)入等待請(qǐng)求隊(duì)列
    this._dispense(); // 進(jìn)行資源分發(fā) 最終會(huì)觸發(fā)resourceRequest.promise的resolve(client) 

    return resourceRequest.promise; // 返回的是一個(gè)promise對(duì)象resolve卻是在其他地方觸發(fā)
  }
/**
   * Attempt to resolve an outstanding resource request using an available resource from
   * the pool, or creating new ones
   *
   * @private
   */
  _dispense() {
    /**
     * Local variables for ease of reading/writing
     * these don't (shouldn't) change across the execution of this fn
     */
    const numWaitingClients = this._waitingClientsQueue.length; // 正在等待的請(qǐng)求的隊(duì)列長(zhǎng)度 各個(gè)優(yōu)先級(jí)的總和
    console.log('numWaitingClients', numWaitingClients)  // 1

    // If there aren't any waiting requests then there is nothing to do
    // so lets short-circuit
    if (numWaitingClients < 1) {
      return;
    }
    //  max: 10, min: 4
    console.log('_potentiallyAllocableResourceCount', this._potentiallyAllocableResourceCount) // 目前潛在空閑可用的連接數(shù)量
    const resourceShortfall =
      numWaitingClients - this._potentiallyAllocableResourceCount; // 還差幾個(gè)可用的 小于零表示不需要 大于0表示需要新建長(zhǎng)連接的數(shù)量
    console.log('spareResourceCapacity', this.spareResourceCapacity) // 距離max數(shù)量的還有幾個(gè)沒(méi)有創(chuàng)建
    const actualNumberOfResourcesToCreate = Math.min(
      this.spareResourceCapacity, // -6
      resourceShortfall // 這個(gè)是 -3
    ); // 如果resourceShortfall >0 表示需要新建但是這新建的數(shù)量不能超過(guò)spareResourceCapacity最多可創(chuàng)建的
    console.log('actualNumberOfResourcesToCreate', actualNumberOfResourcesToCreate) // 如果actualNumberOfResourcesToCreate >0 表示需要?jiǎng)?chuàng)建連接
    for (let i = 0; actualNumberOfResourcesToCreate > i; i++) {
      this._createResource(); // 新增新的長(zhǎng)連接
    }

    // If we are doing test-on-borrow see how many more resources need to be moved into test
    // to help satisfy waitingClients
    if (this._config.testOnBorrow === true) { // 如果開(kāi)啟了使用前校驗(yàn)資源的有效性
      // how many available resources do we need to shift into test
      const desiredNumberOfResourcesToMoveIntoTest =
        numWaitingClients - this._testOnBorrowResources.size;// 1
      const actualNumberOfResourcesToMoveIntoTest = Math.min(
        this._availableObjects.length, // 3
        desiredNumberOfResourcesToMoveIntoTest // 1
      );
      for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) { // 需要有效性校驗(yàn)的數(shù)量 至少滿足最小的waiting clinet
        this._testOnBorrow(); // 資源有效校驗(yàn)后再分發(fā)
      }
    }

    // if we aren't testing-on-borrow then lets try to allocate what we can
    if (this._config.testOnBorrow === false) { // 如果沒(méi)有開(kāi)啟有效性校驗(yàn) 就開(kāi)啟有效資源的分發(fā)
      const actualNumberOfResourcesToDispatch = Math.min(
        this._availableObjects.length,
        numWaitingClients
      );
      for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) { // 開(kāi)始分發(fā)資源
        this._dispatchResource();
      }
    }
  }
/**
   * Attempt to move an available resource to a waiting client
   * @return {Boolean} [description]
   */
  _dispatchResource() {
    if (this._availableObjects.length < 1) {
      return false;
    }

    const pooledResource = this._availableObjects.shift(); // 從可以資源池里面取出一個(gè)
    this._dispatchPooledResourceToNextWaitingClient(pooledResource); // 分發(fā)
    return false;
  }
  /**
   * Dispatches a pooledResource to the next waiting client (if any) else
   * puts the PooledResource back on the available list
   * @param  {PooledResource} pooledResource [description]
   * @return {Boolean}                [description]
   */
  _dispatchPooledResourceToNextWaitingClient(pooledResource) {
    const clientResourceRequest = this._waitingClientsQueue.dequeue(); // 可能是undefined 取出一個(gè)等待的quene
    console.log('clientResourceRequest.state', clientResourceRequest.state);
    if (clientResourceRequest === undefined ||
      clientResourceRequest.state !== Deferred.PENDING) {
      console.log('沒(méi)有等待的')
      // While we were away either all the waiting clients timed out
      // or were somehow fulfilled. put our pooledResource back.
      this._addPooledResourceToAvailableObjects(pooledResource); // 在可用的資源里面添加一個(gè)
      // TODO: do need to trigger anything before we leave?
      return false;
    }
    // TODO clientResourceRequest 的state是否需要判斷 如果已經(jīng)是resolve的狀態(tài) 已經(jīng)超時(shí)回去了 這個(gè)是否有問(wèn)題
    const loan = new ResourceLoan(pooledResource, this._Promise); 
    this._resourceLoans.set(pooledResource.obj, loan); // _resourceLoans 是個(gè)map k= >value  pooledResource.obj 就是socket本身
    pooledResource.allocate(); // 標(biāo)識(shí)資源的狀態(tài)是正在被使用
    clientResourceRequest.resolve(pooledResource.obj); //  acquire方法返回的promise對(duì)象的resolve在這里執(zhí)行的
    return true;
  }

上面的代碼就按種情況一直走下到最終獲取到長(zhǎng)連接的資源,其他更多代碼大家可以自己去深入了解。

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • TCP
    TCP
    +關(guān)注

    關(guān)注

    8

    文章

    1324

    瀏覽量

    78755
  • UDP
    UDP
    +關(guān)注

    關(guān)注

    0

    文章

    317

    瀏覽量

    33801
  • 網(wǎng)絡(luò)通信
    +關(guān)注

    關(guān)注

    4

    文章

    769

    瀏覽量

    29693
  • 模型
    +關(guān)注

    關(guān)注

    1

    文章

    3032

    瀏覽量

    48350
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    通信必備知識(shí)!TCPUDP協(xié)議介紹及使用

    TCPUDP是兩個(gè)最常用的通訊協(xié)議。TCP是面向連接的協(xié)議,需要在收發(fā)數(shù)據(jù)前與對(duì)方建立可靠的連接,建立
    的頭像 發(fā)表于 03-15 08:19 ?1543次閱讀
    通信必備知識(shí)!<b class='flag-5'>TCP</b>與<b class='flag-5'>UDP</b>協(xié)議<b class='flag-5'>介紹</b>及使用

    TCPUDP的區(qū)別分析

      傳輸層協(xié)議主要有TCPUDP。UDP提供無(wú)連接的通信,不能保證數(shù)據(jù)包被發(fā)送到目標(biāo)地址,典型的即時(shí)傳輸少量數(shù)據(jù)的應(yīng)用程序通常使用UDP。
    發(fā)表于 09-18 10:29 ?2次下載

    TCP如何與UDP命令連接詳細(xì)指南說(shuō)明

    本文檔的主要內(nèi)容詳細(xì)介紹的是TCP如何與UDP命令連接詳細(xì)指南說(shuō)明。
    發(fā)表于 02-28 08:00 ?6次下載
    <b class='flag-5'>TCP</b>如何與<b class='flag-5'>UDP</b>命令<b class='flag-5'>連接</b>詳細(xì)指南說(shuō)明

    TCP, ISO- on- TCP, UDP連接

    TSEND“ & ?TRCV “ 發(fā)送和接收數(shù)據(jù)(TCP 和ISO - on- TCP)?TUSEND“ & ?TURCV“ 發(fā)送和接收數(shù)據(jù)(UDP) 自動(dòng)連接管理的通訊塊
    的頭像 發(fā)表于 06-12 15:11 ?4936次閱讀
    <b class='flag-5'>TCP</b>, ISO- on- <b class='flag-5'>TCP</b>, <b class='flag-5'>UDP</b><b class='flag-5'>連接</b>

    tcpudp協(xié)議的異同

    。UDP 校驗(yàn)和則是包含 UDP 首部和數(shù)據(jù)在內(nèi)的校驗(yàn)結(jié)果。 TCP協(xié)議 TCP協(xié)議基于網(wǎng)絡(luò)層的 IP 協(xié)議提供的是有連接、可靠服務(wù),是基于
    的頭像 發(fā)表于 11-12 14:45 ?3841次閱讀
    <b class='flag-5'>tcp</b>和<b class='flag-5'>udp</b>協(xié)議的異同

    如何使用Netcat命令建立和測(cè)試TCPUDP連接

    Netcat或nc是一個(gè)命令行程序,它使用TCPUDP協(xié)議通過(guò)網(wǎng)絡(luò)連接讀取和寫(xiě)入數(shù)據(jù)。
    的頭像 發(fā)表于 12-12 17:39 ?7307次閱讀

    UDPTCP的區(qū)別

    在上一則文章中,對(duì) TCP 的**三次握手建立連接**和**四次揮手釋放連接**進(jìn)行了詳細(xì)地闡述,本節(jié)教程針對(duì)于 TCP 的其他內(nèi)容進(jìn)行講解,首先是同處于傳輸層協(xié)議的`
    的頭像 發(fā)表于 01-20 17:05 ?1563次閱讀
    <b class='flag-5'>UDP</b>和<b class='flag-5'>TCP</b>的區(qū)別

    TCPUDP的原理以及區(qū)別

    TCP是基于連接的,而UDP是基于非連接的。 **tcp傳輸數(shù)據(jù)穩(wěn)定可靠** ,適用于對(duì)網(wǎng)絡(luò)通訊質(zhì)量要求較高的場(chǎng)景,需要準(zhǔn)確無(wú)誤的傳輸
    的頭像 發(fā)表于 05-18 17:14 ?854次閱讀
    <b class='flag-5'>TCP</b>和<b class='flag-5'>UDP</b>的原理以及區(qū)別

    基于Socket的UDPTCP編程解析 1

    TCP(傳輸控制協(xié)議)和UDP(用戶數(shù)據(jù)報(bào)協(xié)議是網(wǎng)絡(luò)體系結(jié)TCP/IP模型中傳輸層一層中的兩個(gè)不同的通信協(xié)議。 TCP:傳輸控制協(xié)議,一種面向
    的頭像 發(fā)表于 05-18 17:22 ?857次閱讀
    基于Socket的<b class='flag-5'>UDP</b>和<b class='flag-5'>TCP</b>編程解析 1

    基于Socket的UDPTCP編程解析 2

    TCP(傳輸控制協(xié)議)和UDP(用戶數(shù)據(jù)報(bào)協(xié)議是網(wǎng)絡(luò)體系結(jié)TCP/IP模型中傳輸層一層中的兩個(gè)不同的通信協(xié)議。 TCP:傳輸控制協(xié)議,一種面向
    的頭像 發(fā)表于 05-18 17:22 ?560次閱讀
    基于Socket的<b class='flag-5'>UDP</b>和<b class='flag-5'>TCP</b>編程解析 2

    TCPUDP可以同時(shí)綁定相同的端口嗎?

    TCPUDP可以同時(shí)綁定相同的端口嗎?TCPUDP可以同時(shí)綁定相同的端口嗎?解答這個(gè)問(wèn)題之前,我們需要先來(lái)了解什么是TCP
    的頭像 發(fā)表于 02-06 11:16 ?1635次閱讀
    <b class='flag-5'>TCP</b>和<b class='flag-5'>UDP</b>可以同時(shí)綁定相同的端口嗎?

    TCPUDP的區(qū)別

    1.TCPUDP的區(qū)別 TCP是面向連接的,UDP是面向無(wú)連接的;
    的頭像 發(fā)表于 11-09 09:35 ?3305次閱讀
    <b class='flag-5'>TCP</b>和<b class='flag-5'>UDP</b>的區(qū)別

    TCPUDP的基本區(qū)別

    TCPUDP基本區(qū)別 基于連接與無(wú)連接 TCP要求系統(tǒng)資源較多,UDP較少;
    的頭像 發(fā)表于 11-13 15:27 ?4397次閱讀
    <b class='flag-5'>TCP</b>與<b class='flag-5'>UDP</b>的基本區(qū)別

    UDPTCP的主要區(qū)別 UDP能否像TCP一樣實(shí)現(xiàn)可靠傳輸?

    UDPTCP的主要區(qū)別 UDP能否像TCP一樣實(shí)現(xiàn)可靠傳輸?TCP如何實(shí)現(xiàn)可靠性傳輸? UDP
    的頭像 發(fā)表于 01-22 16:10 ?605次閱讀

    udp是什么意思 簡(jiǎn)述TCPUDP的區(qū)別和聯(lián)系

    中的兩個(gè)基本協(xié)議。然而,TCPUDP之間存在一些重要的區(qū)別和聯(lián)系。 首先,TCP是一種面向連接的協(xié)議,而UDP是無(wú)
    的頭像 發(fā)表于 02-02 16:33 ?1003次閱讀