為什么需要無鎖隊(duì)列
無鎖隊(duì)列解決了什么問題?無鎖隊(duì)列解決了鎖引起的問題。
cache失效
當(dāng)CPU要訪問主存的時(shí)候,這些數(shù)據(jù)首先要被copy到cache中,因?yàn)檫@些數(shù)據(jù)在不久的將來可能又會(huì)被處理器訪問;CPU訪問cache的速度要比訪問內(nèi)存要快的多;由于線程頻繁切換,會(huì)造成cache失效,將導(dǎo)致應(yīng)用程序性能下降。
阻塞引起的CPU浪費(fèi)
mutex是阻塞的,在一個(gè)負(fù)載較重的應(yīng)用程序中使用阻塞隊(duì)列來在線程之間傳遞消息,會(huì)導(dǎo)致頻繁的線程切換,大量的時(shí)間將被浪費(fèi)在獲取mutex,而不是處理任務(wù)上。
這就需要非阻塞來解決問題。任務(wù)之間不爭(zhēng)搶任何資源,在隊(duì)列中預(yù)定一個(gè)位置,然后在這個(gè)位置上插入或提取數(shù)據(jù)。這種機(jī)制使用了cas(compare and swap)的操作,它是一個(gè)原子操作,需要CPU指令支持。它的思想是先比較,再賦值。具體操作如下:它需要3個(gè)操作數(shù),m,A, B,其中m是一個(gè)內(nèi)存地址,將m指向的內(nèi)存中的數(shù)據(jù)與A比較,如果相等則將B寫入到m指向的內(nèi)存并返回true,如果不相等則什么也不做返回false。
cas語義如下
if (a == b) {
a = c;
}
cmpxchg(a, b, c)
bool CAS( int * pAddr, int nExpected, int nNew )
atomically {
if ( *pAddr == nExpected ) {
*pAddr = nNew ;
return true ;
}
return false ;
}
內(nèi)存的頻繁申請(qǐng)和釋放
當(dāng)一個(gè)任務(wù)從堆中分配內(nèi)存時(shí),標(biāo)準(zhǔn)的內(nèi)存分配機(jī)制會(huì)阻塞所有與這個(gè)任務(wù)共享地址空間的其它任務(wù)(進(jìn)程中的所有線程)。malloc本身也是加鎖的,保證線程安全。這樣也會(huì)造成線程之間的競(jìng)爭(zhēng)。標(biāo)準(zhǔn)隊(duì)列插入數(shù)據(jù)的時(shí)候,都回導(dǎo)致堆上的動(dòng)態(tài)內(nèi)存分配,會(huì)導(dǎo)致應(yīng)用程序性能下降。
小結(jié)
- cache失效
- 阻塞引起的CPU浪費(fèi)
- 內(nèi)存的頻繁申請(qǐng)和釋放
這3個(gè)問題,本質(zhì)上都是由于線程切換帶來的問題。無鎖隊(duì)列就是從這幾個(gè)方面解決問題。
無鎖隊(duì)列使用場(chǎng)景
無鎖隊(duì)列適用于隊(duì)列push、pop非常頻繁的場(chǎng)景,效率要比mutex高很多; 比如,股票行情,1秒鐘至少幾十萬數(shù)據(jù)量。
無鎖隊(duì)列一般也會(huì)結(jié)合mutex + condition使用,如果數(shù)據(jù)量很小,比如一秒鐘幾百個(gè)、幾千個(gè)消息,那就會(huì)有很多時(shí)間是沒有消息需要處理的,消費(fèi)線程就會(huì)休眠,等待喚醒;所以對(duì)于消息量很小的情況,無鎖隊(duì)列的吞吐量并不會(huì)有很大的提升,沒有必要使用無鎖隊(duì)列。
無鎖隊(duì)列的實(shí)現(xiàn),主要分為兩類:
- 鏈表實(shí)現(xiàn);
- 數(shù)組實(shí)現(xiàn)。
鏈表實(shí)現(xiàn)有一個(gè)問題,就是會(huì)頻繁的從堆上申請(qǐng)內(nèi)存,所以效率也不會(huì)很高。
對(duì)于一寫一讀場(chǎng)景下,各種消息隊(duì)列的測(cè)試結(jié)果對(duì)比:
zmq無鎖隊(duì)列的實(shí)現(xiàn)原理
zmq中實(shí)現(xiàn)了一個(gè)無鎖隊(duì)列,這個(gè)無鎖隊(duì)列只支持單寫單讀的場(chǎng)景。zmq的無鎖隊(duì)列是十分高效的,號(hào)稱全世界最快的無鎖隊(duì)列。它的設(shè)計(jì)是非常優(yōu)秀的,有很多設(shè)計(jì)是值得借鑒的。我們可以直接把它用到項(xiàng)目中去,zmq只用了不到600行代碼就實(shí)現(xiàn)了無鎖隊(duì)列。
zmq的無鎖隊(duì)列主要由yqueue和ypipe組成。yqueue負(fù)責(zé)隊(duì)列的數(shù)據(jù)組織,ypipe負(fù)責(zé)隊(duì)列的操作。
原子操作函數(shù)
無鎖隊(duì)列的實(shí)現(xiàn),一定是基于原子操作的。
zmq無鎖隊(duì)列使用如下原子操作函數(shù)
template class atomic_ptr_t
{
public:
inline void set (T *ptr_); //非原子操作
inline T *xchg (T *val_); //原子操做,設(shè)置新值,返回舊值
inline T *cas (T *cmp_, T *val_);//原子操作
private:
volatile T *ptr;
}
set函數(shù),把私有成員ptr指針設(shè)置成參數(shù)ptr_的值,不是一個(gè)原子操作,需要使用者確保執(zhí)行set過程沒有其他線程使用ptr的值。
xchg函數(shù),把私有成員ptr指針設(shè)置成參數(shù)val_的值,并返回ptr設(shè)置之前的值。原子操作,線程安全。
cas函數(shù),原子操作,線程安全,把私有成員ptr指針與參數(shù)cmp_指針比較:
如果相等返回ptr設(shè)置之前的值,并把ptr更新為參數(shù)val_的值;
如果不相等直接返回ptr值。
chunk機(jī)制
每次分配可以存放N個(gè)元素的大塊內(nèi)存,減少內(nèi)存的分配和釋放。N值還有元素的類型,是可以根據(jù)自己的需要進(jìn)行設(shè)置的。N不能太小,如果太小,就退化成鏈表方式了,就會(huì)有內(nèi)存頻分的申請(qǐng)和釋放的問題。
// 鏈表結(jié)點(diǎn)稱之為chunk_t
struct chunk_t
{
T values[N]; //每個(gè)chunk_t可以容納N個(gè)T類型的元素,以后就以一個(gè)chunk_t為單位申請(qǐng)內(nèi)存
chunk_t *prev;
chunk_t *next;
};
當(dāng)隊(duì)列空間不足時(shí)每次分配一個(gè)chunk_t,每個(gè)chunk_t能存儲(chǔ)N個(gè)元素。
當(dāng)一個(gè)chunk中的元素都出隊(duì)后,回收的chunk也不是馬上釋放,而是根據(jù)局部性原理先回收到spare_chunk里面,當(dāng)再次需要分配chunk_t的時(shí)候從spare_chunk中獲取。spare_chunk只保存一個(gè)chunk,即只保存最新的要回收的chunk;如果spare_chunk現(xiàn)在保存了一個(gè)chunk A,如果現(xiàn)在有一個(gè)更新的chunk B需要回收,那么spare_chunk會(huì)更新為chunk B,chunk A會(huì)被釋放;這個(gè)操作是通過cas完成的。
批量寫入
支持批量寫入,批量能夠提高吞吐量。
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒完成,在沒完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();
// Move the "flush up to here" poiter.
if (!incomplete_)
{
f = &queue.back(); // 記錄要刷新的位置
// printf("1 f:%p, w:%pn", f, w);
}
else
{
// printf("0 f:%p, w:%pn", f, w);
}
}
通過第二個(gè)參數(shù)incomplete_來判斷write是否結(jié)束。
write(b, true);
write(c, false);
flush();
flush后才更新到讀端。
怎樣喚醒讀端?
讀端沒有數(shù)據(jù)可讀,這個(gè)時(shí)候應(yīng)該怎么辦?
使用mutex + condition進(jìn)行wait,休眠;
寫端怎么喚醒讀端去讀取數(shù)據(jù)呢?
很多消息隊(duì)列,都是每次有消息,都進(jìn)行notify。如果發(fā)送端每發(fā)送一個(gè)消息都notify,性能會(huì)下降。調(diào)用notify,涉及到線程切換,內(nèi)核態(tài)與用戶態(tài)切換,會(huì)影響性能;檢測(cè)到讀端處于阻塞狀態(tài),在notify,效率才高。
zmq的無鎖隊(duì)列寫端只有在讀端處于休眠狀態(tài)的時(shí)候才會(huì)發(fā)送notify,是不是很厲害的樣子?寫端是怎么檢測(cè)到讀端處于休眠狀態(tài)的呢?
寫端在進(jìn)行flush的時(shí)候,如果返回false,說明讀端處于等待喚醒的狀態(tài),就可以進(jìn)行notify。
condition wait和notify,都需要由應(yīng)用程序自己去做。
我們修改代碼,將寫端修改為每次flush都notify;經(jīng)過測(cè)試,性能是會(huì)明顯下降的。
寫端為什么可以檢測(cè)到讀端的狀態(tài)的?
c值是唯一一個(gè)讀端和寫端都要設(shè)置的值,通過對(duì)c值進(jìn)行cas操作,寫端就可以判斷讀端是否處于等待喚醒的狀態(tài)。
yqueue的實(shí)現(xiàn)
yqueue主要負(fù)責(zé)隊(duì)列的數(shù)據(jù)組織,通過chunk機(jī)制進(jìn)行管理。
class yqueue_t
{
public:
inline yqueue_t();
inline ~yqueue_t();
inline T &front();
inline T &back();
inline void push();
inline void unpush();
inline void pop();
private:
struct chunk_t
{
T values[N];
chunk_t *prev;
chunk_t *next;
};
chunk_t *begin_chunk;
int begin_pos;
chunk_t *back_chunk;
int back_pos;
chunk_t *end_chunk;
int end_pos;
atomic_ptr_t spare_chunk; // 空閑塊(把所有元素都已經(jīng)出隊(duì)的塊稱為空閑塊),讀寫線程的共享變量
// 操作的時(shí)候使用xchg原子操作
// Disable copying of yqueue.
yqueue_t(const yqueue_t &);
const yqueue_t &operator=(const yqueue_t &);
};
數(shù)據(jù)的組織
chunk是通過鏈表進(jìn)行組織的;
yqueue_t內(nèi)部有三個(gè)chunk_t類型指針以及對(duì)應(yīng)的索引位置:
begin_chunk/begin_pos:begin_chunk指向第一個(gè)的chunk;begin_pos是隊(duì)列第一個(gè)元素在當(dāng)前chunk中的位置;
back_chunk/back_pos:back_chunk指向隊(duì)列尾所在的chunk;back_pos是隊(duì)列最后一個(gè)元素在當(dāng)前chunk的位置;
end_chunk/end_pos: end_chunk指向最后一個(gè)chunk;end_chunk和back_chunk大部分情況是一致的;end_pos 大部分情況是 back_pos + 1; end_pos主要是用來判斷是否要分配新的chunk。
上圖中:
由于back_pos已經(jīng)指向了back_chunk的最后一個(gè)元素,所以end_pos就指向了end_chunk的第一個(gè)元素。
back、push函數(shù)
back函數(shù)返回隊(duì)列尾部元素的引用;
// If the queue is empty, behaviour is undefined.
// 返回隊(duì)列尾部元素的引用,調(diào)用者可以通過該引用更新元素,結(jié)合push實(shí)現(xiàn)插入操作。
// 如果隊(duì)列為空,該函數(shù)是不允許被調(diào)用。
inline T &back() // 返回的是引用,是個(gè)左值,調(diào)用者可以通過其修改容器的值
{
return back_chunk->values[back_pos];
}
push函數(shù),更新back_chunk、back_pos的值,并且判斷是否需要新的chunk;如果需要新的chunk,先看spare_chunk是否為空: 如果spare_chunk有值,則將spare_chunk作為end_chunk; 否則新malloc一個(gè)chunk。
inline void push()
{
back_chunk = end_chunk;
back_pos = end_pos; //
if (++end_pos != N) //end_pos!=N表明這個(gè)chunk節(jié)點(diǎn)還沒有滿
return;
chunk_t *sc = spare_chunk.xchg(NULL); // 為什么設(shè)置為NULL?因?yàn)槿绻阎爸等〕鰜砹藙t沒有spare chunk了,所以設(shè)置為NULL
if (sc) // 如果有spare chunk則繼續(xù)復(fù)用它
{
end_chunk->next = sc;
sc->prev = end_chunk;
}
else // 沒有則重新分配
{
// static int s_cout = 0;
// printf("s_cout:%dn", ++s_cout);
end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一個(gè)chunk
alloc_assert(end_chunk->next);
end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}
可以使用back和push函數(shù)向隊(duì)列中插入元素:
queue.back() = value_;
queue.push();
front、pop函數(shù)
front函數(shù)返回隊(duì)列頭部元素的引用。
// If the queue is empty, behaviour is undefined.
// 返回隊(duì)列頭部元素的引用,調(diào)用者可以通過該引用更新元素,結(jié)合pop實(shí)現(xiàn)出隊(duì)列操作。
inline T &front() // 返回的是引用,是個(gè)左值,調(diào)用者可以通過其修改容器的值
{
return begin_chunk->values[begin_pos];
}
pop函數(shù),主要更新begin_pos;如果begin_pos == N,則回收chunk;將chunk保存到spare_chunk中。
inline void pop()
{
if (++begin_pos == N) // 刪除滿一個(gè)chunk才回收chunk
{
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,總是保存最新的空閑塊而釋放先前的空閑快
free(cs);
}
}
可以使用front和pop函數(shù)進(jìn)行出隊(duì)列操作
// Return it to the caller.
*value_ = queue.front();
queue.pop();
這里有兩點(diǎn)需要注意:
- pop掉的元素,其銷毀工作交給調(diào)用者完成,即是pop前調(diào)用者需要通過front()接口讀取并進(jìn)行銷毀(比如動(dòng)態(tài)分配的對(duì)象);
- 空閑塊的保存,要求是原子操作;因?yàn)殚e塊是讀寫線程的共享變量,因?yàn)樵趐ush中也使用了spare_chunk。
ypipe的實(shí)現(xiàn)
ypipe_t在yqueue_t的基礎(chǔ)上構(gòu)建一個(gè)單寫單讀的無鎖隊(duì)列。
class ypipe_t
{
public:
// Initialises the pipe.
inline ypipe_t();
// The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_t();
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒完成,在沒完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。
inline void write(const T &value_, bool incomplete_);
// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite(T *value_);
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已經(jīng)完成的數(shù)據(jù)到管道,返回false意味著讀線程在休眠,在這種情況下調(diào)用者需要喚醒讀線程。
// 批量刷新的機(jī)制, 寫入批量后喚醒讀線程;
// 反悔機(jī)制 unwrite
inline bool flush();
// Check whether item is available for reading.
// 這里面有兩個(gè)點(diǎn),一個(gè)是檢查是否有數(shù)據(jù)可讀,一個(gè)是預(yù)取
inline bool check_read();
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_);
protected:
// Allocation-efficient queue to store pipe items.
// Front of the queue points to the first prefetched item, back of
// the pipe points to last un-flushed item. Front is used only by
// reader thread, while back is used only by writer thread.
yqueue_t queue;
// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w; //指向第一個(gè)未刷新的元素,只被寫線程使用
// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r; //指向第一個(gè)還沒預(yù)提取的元素,只被讀線程使用;代表可以讀取到哪個(gè)位置的元素
// Points to the first item to be flushed in the future.
T *f; //指向下一輪要被刷新的一批元素中的第一個(gè)
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t c; //讀寫線程共享的指針,指向每一輪刷新的起點(diǎn)。當(dāng)c為空時(shí),表示讀線程睡眠(只會(huì)在讀線程中被設(shè)置為空)
// Disable copying of ypipe object.
ypipe_t(const ypipe_t &);
const ypipe_t &operator=(const ypipe_t &);
},>
核心思想是通過w、r、f指針,通過對(duì)c值的cas操作,解決讀寫線程的數(shù)據(jù)競(jìng)爭(zhēng)問題。
write
將一個(gè)元素入隊(duì)列;incomplete_表示write是否結(jié)束,如果是flase,將f設(shè)置為queue.back()。write最終只是更新了f值。
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒完成,在沒完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();
// Move the "flush up to here" poiter.
if (!incomplete_)
{
f = &queue.back(); // 記錄要刷新的位置
}
else
{
}
}
flush
主要是將w更新到f的位置,說明已經(jīng)寫到的位置。
通過cas操作,嘗試將c值設(shè)置為f。通過flush的返回值,可以判斷讀端是否處于等待喚醒的狀態(tài)。
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已經(jīng)完成的數(shù)據(jù)到管道,返回false意味著讀線程在休眠,在這種情況下調(diào)用者需要喚醒讀線程。
// 批量刷新的機(jī)制, 寫入批量后喚醒讀線程;
// 反悔機(jī)制 unwrite
inline bool flush()
{
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是還沒有新元素加入
return true;
// Try to set 'c' to 'f'.
// read時(shí)如果沒有數(shù)據(jù)可以讀取則c的值會(huì)被置為NULL
if (c.cas(w, f) != w) // 嘗試將c設(shè)置為f,即是準(zhǔn)備更新w的位置
{
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新為新的f位置
w = f;
return false; //線程看到flush返回false之后會(huì)發(fā)送一個(gè)消息給讀線程,這需要寫業(yè)務(wù)去做處理
}
else // 讀端還有數(shù)據(jù)可讀取
{
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 更新f的位置
return true;
}
}
check_read
是一種預(yù)讀的機(jī)制,檢查是否有數(shù)據(jù)可讀;通過對(duì)c值的cas操作,來更新r值;r就是可以讀取到的位置。
c值如果和&queue.front()相等,標(biāo)志沒有數(shù)據(jù)可讀,將c值設(shè)置為NULL;寫端就可以通過c值判斷出讀端的狀態(tài)。
// 這里面有兩個(gè)點(diǎn),一個(gè)是檢查是否有數(shù)據(jù)可讀,一個(gè)是預(yù)取
inline bool check_read()
{
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判斷是否在前幾次調(diào)用read函數(shù)時(shí)已經(jīng)預(yù)取數(shù)據(jù)了return true;
return true;
// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
// 兩種情況
// 1. 如果c值和queue.front()相等, 返回c值并將c值置為NULL,此時(shí)沒有數(shù)據(jù)可讀
// 2. 如果c值和queue.front()不相等, 返回c值,此時(shí)可能有數(shù)據(jù)讀取
r = c.cas(&queue.front(), NULL); //嘗試預(yù)取數(shù)據(jù),r代表可以讀取到哪個(gè)位置的元素
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items are being deallocated.
if (&queue.front() == r || !r) //判斷是否成功預(yù)取數(shù)據(jù)
return false;
// There was at least one value prefetched.
return true;
}
基于環(huán)形數(shù)組的無鎖隊(duì)列
基于環(huán)形數(shù)組的無鎖隊(duì)列,也是利用cas操作解決多線程數(shù)據(jù)競(jìng)爭(zhēng)的問題;它支持多謝多讀。
class ArrayLockFreeQueue
{
public:
ArrayLockFreeQueue();
virtual ~ArrayLockFreeQueue();
QUEUE_INT size();
bool enqueue(const ELEM_T &a_data);
bool dequeue(ELEM_T &a_data);
bool try_dequeue(ELEM_T &a_data);
private:
ELEM_T m_thequeue[Q_SIZE];
volatile QUEUE_INT m_count;
volatile QUEUE_INT m_writeIndex;
volatile QUEUE_INT m_readIndex;
volatile QUEUE_INT m_maximumReadIndex;
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};
關(guān)鍵是對(duì)于三種下標(biāo)的操作:
- m_writeIndex;//新元素入列時(shí)存放位置在數(shù)組中的下標(biāo)
- m_readIndex;/ 下一個(gè)出列的元素在數(shù)組中的下標(biāo)
- m_maximumReadIndex; //最后一個(gè)已經(jīng)完成入列操作的元素在數(shù)組中的下標(biāo), 即可以讀到的最大索引。
通過對(duì)這3個(gè)下標(biāo)的cas操作,解決多線程數(shù)據(jù)競(jìng)爭(zhēng)的問題。
enqueue
bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data)
{
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if(countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
return false; // 隊(duì)列已經(jīng)滿了
}
// 目的是為了獲取一個(gè)能寫入的位置
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
// 獲取寫入位置后 currentWriteIndex是一個(gè)臨時(shí)變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置
// 2. 更新可讀的位置,按著currentWriteIndex + 1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當(dāng)線程超過cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}
AtomicAdd(&m_count, 1);
return true;
},>
首先判斷隊(duì)列是否已滿:(m_writeIndex + 1) %/Q_SIZE == m_readIndex;如果隊(duì)列已滿,則返回false。
enqueu的核心思想是預(yù)先占用一個(gè)可寫的位置,保證同一個(gè)位置只有一個(gè)線程會(huì)進(jìn)行寫操作;并且保證先獲取到位置的線程,先操作,保證了操作的順序性。這兩個(gè)都是通過cas操作保證的。
dequeue
bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data)
{
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
if(countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)];
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù)
return true;
}
} while(true);
assert(0);
// Add this return statement to avoid compiler warnings
return false;
},>
判斷隊(duì)列是否有數(shù)據(jù)可讀:m_readIndex == m_maximumReadIndex;
通過cas操作保證同一個(gè)位置,只有一個(gè)線程讀取。
多寫多讀測(cè)試結(jié)果
從測(cè)試結(jié)果可以看出,這個(gè)基于環(huán)形數(shù)組的隊(duì)列,比較適合1寫多讀的場(chǎng)景,性能會(huì)有很大的提升。
總結(jié)
本文主要介紹了以下內(nèi)容:
- 無鎖隊(duì)列所解決的問題;
- 無鎖隊(duì)列都是利用了cas操作,來解決多線程數(shù)據(jù)競(jìng)爭(zhēng)的問題;因?yàn)閏as操作的粒度要比mutex,spinlock要小很多;
- zmq無鎖隊(duì)列實(shí)現(xiàn)原理,包括chunk機(jī)制、批量寫入、怎樣喚醒讀端等;yqueue、ypipe的具體的實(shí)現(xiàn),預(yù)讀機(jī)制、寫端如何檢測(cè)到讀端的狀態(tài)等;它只支持單寫單讀的場(chǎng)景;
- 基于環(huán)形數(shù)組的無鎖隊(duì)列,它支持多寫多讀的場(chǎng)景;對(duì)于1寫多讀的場(chǎng)景,性能有很大提升;它是如何解決多線程競(jìng)爭(zhēng)問題的。
-
cpu
+關(guān)注
關(guān)注
68文章
10804瀏覽量
210841 -
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
6808瀏覽量
88743 -
內(nèi)存
+關(guān)注
關(guān)注
8文章
2966瀏覽量
73814 -
線程
+關(guān)注
關(guān)注
0文章
503瀏覽量
19636
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論