0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

無鎖隊(duì)列解決的問題

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-10 15:33 ? 次閱讀

為什么需要無鎖隊(duì)列

無鎖隊(duì)列解決了什么問題?無鎖隊(duì)列解決了鎖引起的問題。

cache失效

當(dāng)CPU要訪問主存的時(shí)候,這些數(shù)據(jù)首先要被copy到cache中,因?yàn)檫@些數(shù)據(jù)在不久的將來可能又會(huì)被處理器訪問;CPU訪問cache的速度要比訪問內(nèi)存要快的多;由于線程頻繁切換,會(huì)造成cache失效,將導(dǎo)致應(yīng)用程序性能下降。

阻塞引起的CPU浪費(fèi)

mutex是阻塞的,在一個(gè)負(fù)載較重的應(yīng)用程序中使用阻塞隊(duì)列來在線程之間傳遞消息,會(huì)導(dǎo)致頻繁的線程切換,大量的時(shí)間將被浪費(fèi)在獲取mutex,而不是處理任務(wù)上。

這就需要非阻塞來解決問題。任務(wù)之間不爭(zhēng)搶任何資源,在隊(duì)列中預(yù)定一個(gè)位置,然后在這個(gè)位置上插入或提取數(shù)據(jù)。這種機(jī)制使用了cas(compare and swap)的操作,它是一個(gè)原子操作,需要CPU指令支持。它的思想是先比較,再賦值。具體操作如下:它需要3個(gè)操作數(shù),m,A, B,其中m是一個(gè)內(nèi)存地址,將m指向的內(nèi)存中的數(shù)據(jù)與A比較,如果相等則將B寫入到m指向的內(nèi)存并返回true,如果不相等則什么也不做返回false。

cas語義如下

Compare And Swap
if (a == b) {
a = c;
}

cmpxchg(a, b, c)

bool CAS( int * pAddr, int nExpected, int nNew )
atomically {
if ( *pAddr == nExpected ) {
*pAddr = nNew ;
return true ;
}
return false ;
}

內(nèi)存的頻繁申請(qǐng)和釋放

當(dāng)一個(gè)任務(wù)從堆中分配內(nèi)存時(shí),標(biāo)準(zhǔn)的內(nèi)存分配機(jī)制會(huì)阻塞所有與這個(gè)任務(wù)共享地址空間的其它任務(wù)(進(jìn)程中的所有線程)。malloc本身也是加鎖的,保證線程安全。這樣也會(huì)造成線程之間的競(jìng)爭(zhēng)。標(biāo)準(zhǔn)隊(duì)列插入數(shù)據(jù)的時(shí)候,都回導(dǎo)致堆上的動(dòng)態(tài)內(nèi)存分配,會(huì)導(dǎo)致應(yīng)用程序性能下降。

小結(jié)

  • cache失效
  • 阻塞引起的CPU浪費(fèi)
  • 內(nèi)存的頻繁申請(qǐng)和釋放

這3個(gè)問題,本質(zhì)上都是由于線程切換帶來的問題。無鎖隊(duì)列就是從這幾個(gè)方面解決問題。

無鎖隊(duì)列使用場(chǎng)景

無鎖隊(duì)列適用于隊(duì)列push、pop非常頻繁的場(chǎng)景,效率要比mutex高很多; 比如,股票行情,1秒鐘至少幾十萬數(shù)據(jù)量。

無鎖隊(duì)列一般也會(huì)結(jié)合mutex + condition使用,如果數(shù)據(jù)量很小,比如一秒鐘幾百個(gè)、幾千個(gè)消息,那就會(huì)有很多時(shí)間是沒有消息需要處理的,消費(fèi)線程就會(huì)休眠,等待喚醒;所以對(duì)于消息量很小的情況,無鎖隊(duì)列的吞吐量并不會(huì)有很大的提升,沒有必要使用無鎖隊(duì)列。

無鎖隊(duì)列的實(shí)現(xiàn),主要分為兩類:

  1. 鏈表實(shí)現(xiàn);
  2. 數(shù)組實(shí)現(xiàn)。

鏈表實(shí)現(xiàn)有一個(gè)問題,就是會(huì)頻繁的從堆上申請(qǐng)內(nèi)存,所以效率也不會(huì)很高。

對(duì)于一寫一讀場(chǎng)景下,各種消息隊(duì)列的測(cè)試結(jié)果對(duì)比:

圖片

zmq無鎖隊(duì)列的實(shí)現(xiàn)原理

zmq中實(shí)現(xiàn)了一個(gè)無鎖隊(duì)列,這個(gè)無鎖隊(duì)列只支持單寫單讀的場(chǎng)景。zmq的無鎖隊(duì)列是十分高效的,號(hào)稱全世界最快的無鎖隊(duì)列。它的設(shè)計(jì)是非常優(yōu)秀的,有很多設(shè)計(jì)是值得借鑒的。我們可以直接把它用到項(xiàng)目中去,zmq只用了不到600行代碼就實(shí)現(xiàn)了無鎖隊(duì)列。

zmq的無鎖隊(duì)列主要由yqueue和ypipe組成。yqueue負(fù)責(zé)隊(duì)列的數(shù)據(jù)組織,ypipe負(fù)責(zé)隊(duì)列的操作。

原子操作函數(shù)

無鎖隊(duì)列的實(shí)現(xiàn),一定是基于原子操作的。

zmq無鎖隊(duì)列使用如下原子操作函數(shù)

// This class encapsulates several atomic operations on pointers.
template class atomic_ptr_t
{
public:
inline void set (T *ptr_); //非原子操作
inline T *xchg (T *val_); //原子操做,設(shè)置新值,返回舊值
inline T *cas (T *cmp_, T *val_);//原子操作
private:
volatile T *ptr;
}

set函數(shù),把私有成員ptr指針設(shè)置成參數(shù)ptr_的值,不是一個(gè)原子操作,需要使用者確保執(zhí)行set過程沒有其他線程使用ptr的值。

xchg函數(shù),把私有成員ptr指針設(shè)置成參數(shù)val_的值,并返回ptr設(shè)置之前的值。原子操作,線程安全。

cas函數(shù),原子操作,線程安全,把私有成員ptr指針與參數(shù)cmp_指針比較:

如果相等返回ptr設(shè)置之前的值,并把ptr更新為參數(shù)val_的值;

如果不相等直接返回ptr值。

chunk機(jī)制

每次分配可以存放N個(gè)元素的大塊內(nèi)存,減少內(nèi)存的分配和釋放。N值還有元素的類型,是可以根據(jù)自己的需要進(jìn)行設(shè)置的。N不能太小,如果太小,就退化成鏈表方式了,就會(huì)有內(nèi)存頻分的申請(qǐng)和釋放的問題。

// Individual memory chunk to hold N elements.
// 鏈表結(jié)點(diǎn)稱之為chunk_t
struct chunk_t
{
T values[N]; //每個(gè)chunk_t可以容納N個(gè)T類型的元素,以后就以一個(gè)chunk_t為單位申請(qǐng)內(nèi)存
chunk_t *prev;
chunk_t *next;
};

圖片

當(dāng)隊(duì)列空間不足時(shí)每次分配一個(gè)chunk_t,每個(gè)chunk_t能存儲(chǔ)N個(gè)元素。

當(dāng)一個(gè)chunk中的元素都出隊(duì)后,回收的chunk也不是馬上釋放,而是根據(jù)局部性原理先回收到spare_chunk里面,當(dāng)再次需要分配chunk_t的時(shí)候從spare_chunk中獲取。spare_chunk只保存一個(gè)chunk,即只保存最新的要回收的chunk;如果spare_chunk現(xiàn)在保存了一個(gè)chunk A,如果現(xiàn)在有一個(gè)更新的chunk B需要回收,那么spare_chunk會(huì)更新為chunk B,chunk A會(huì)被釋放;這個(gè)操作是通過cas完成的。

批量寫入

支持批量寫入,批量能夠提高吞吐量。

// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒完成,在沒完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();

// Move the "flush up to here" poiter.
if (!incomplete_)
{
f = &queue.back(); // 記錄要刷新的位置
// printf("1 f:%p, w:%pn", f, w);
}
else
{
// printf("0 f:%p, w:%pn", f, w);
}
}

通過第二個(gè)參數(shù)incomplete_來判斷write是否結(jié)束。

write(a, true);
write(b, true);
write(c, false);
flush();

flush后才更新到讀端。

怎樣喚醒讀端?

讀端沒有數(shù)據(jù)可讀,這個(gè)時(shí)候應(yīng)該怎么辦?

使用mutex + condition進(jìn)行wait,休眠;

寫端怎么喚醒讀端去讀取數(shù)據(jù)呢?

很多消息隊(duì)列,都是每次有消息,都進(jìn)行notify。如果發(fā)送端每發(fā)送一個(gè)消息都notify,性能會(huì)下降。調(diào)用notify,涉及到線程切換,內(nèi)核態(tài)與用戶態(tài)切換,會(huì)影響性能;檢測(cè)到讀端處于阻塞狀態(tài),在notify,效率才高。

zmq的無鎖隊(duì)列寫端只有在讀端處于休眠狀態(tài)的時(shí)候才會(huì)發(fā)送notify,是不是很厲害的樣子?寫端是怎么檢測(cè)到讀端處于休眠狀態(tài)的呢?

寫端在進(jìn)行flush的時(shí)候,如果返回false,說明讀端處于等待喚醒的狀態(tài),就可以進(jìn)行notify。

圖片

condition wait和notify,都需要由應(yīng)用程序自己去做。

我們修改代碼,將寫端修改為每次flush都notify;經(jīng)過測(cè)試,性能是會(huì)明顯下降的。

圖片

寫端為什么可以檢測(cè)到讀端的狀態(tài)的?

c值是唯一一個(gè)讀端和寫端都要設(shè)置的值,通過對(duì)c值進(jìn)行cas操作,寫端就可以判斷讀端是否處于等待喚醒的狀態(tài)。

圖片

圖片

yqueue的實(shí)現(xiàn)

yqueue主要負(fù)責(zé)隊(duì)列的數(shù)據(jù)組織,通過chunk機(jī)制進(jìn)行管理。

template
class yqueue_t
{
public:

inline yqueue_t();
inline ~yqueue_t();

inline T &front();
inline T &back();

inline void push();
inline void unpush();

inline void pop();

private:
struct chunk_t
{
T values[N];
chunk_t *prev;
chunk_t *next;
};

chunk_t *begin_chunk;
int begin_pos;
chunk_t *back_chunk;
int back_pos;
chunk_t *end_chunk;
int end_pos;

atomic_ptr_t spare_chunk; // 空閑塊(把所有元素都已經(jīng)出隊(duì)的塊稱為空閑塊),讀寫線程的共享變量
// 操作的時(shí)候使用xchg原子操作
// Disable copying of yqueue.
yqueue_t(const yqueue_t &);
const yqueue_t &operator=(const yqueue_t &);
};

數(shù)據(jù)的組織

chunk是通過鏈表進(jìn)行組織的;

yqueue_t內(nèi)部有三個(gè)chunk_t類型指針以及對(duì)應(yīng)的索引位置:

begin_chunk/begin_pos:begin_chunk指向第一個(gè)的chunk;begin_pos是隊(duì)列第一個(gè)元素在當(dāng)前chunk中的位置;

back_chunk/back_pos:back_chunk指向隊(duì)列尾所在的chunk;back_pos是隊(duì)列最后一個(gè)元素在當(dāng)前chunk的位置;

end_chunk/end_pos: end_chunk指向最后一個(gè)chunk;end_chunk和back_chunk大部分情況是一致的;end_pos 大部分情況是 back_pos + 1; end_pos主要是用來判斷是否要分配新的chunk。

圖片

上圖中:

由于back_pos已經(jīng)指向了back_chunk的最后一個(gè)元素,所以end_pos就指向了end_chunk的第一個(gè)元素。

back、push函數(shù)

back函數(shù)返回隊(duì)列尾部元素的引用;

// Returns reference to the back element of the queue.
// If the queue is empty, behaviour is undefined.
// 返回隊(duì)列尾部元素的引用,調(diào)用者可以通過該引用更新元素,結(jié)合push實(shí)現(xiàn)插入操作。
// 如果隊(duì)列為空,該函數(shù)是不允許被調(diào)用。
inline T &back() // 返回的是引用,是個(gè)左值,調(diào)用者可以通過其修改容器的值
{
return back_chunk->values[back_pos];
}

push函數(shù),更新back_chunk、back_pos的值,并且判斷是否需要新的chunk;如果需要新的chunk,先看spare_chunk是否為空: 如果spare_chunk有值,則將spare_chunk作為end_chunk; 否則新malloc一個(gè)chunk。

// Adds an element to the back end of the queue.
inline void push()
{
back_chunk = end_chunk;
back_pos = end_pos; //

if (++end_pos != N) //end_pos!=N表明這個(gè)chunk節(jié)點(diǎn)還沒有滿
return;

chunk_t *sc = spare_chunk.xchg(NULL); // 為什么設(shè)置為NULL?因?yàn)槿绻阎爸等〕鰜砹藙t沒有spare chunk了,所以設(shè)置為NULL
if (sc) // 如果有spare chunk則繼續(xù)復(fù)用它
{
end_chunk->next = sc;
sc->prev = end_chunk;
}
else // 沒有則重新分配
{
// static int s_cout = 0;
// printf("s_cout:%dn", ++s_cout);
end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一個(gè)chunk
alloc_assert(end_chunk->next);
end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}

可以使用back和push函數(shù)向隊(duì)列中插入元素:

// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();

front、pop函數(shù)

front函數(shù)返回隊(duì)列頭部元素的引用。

// Returns reference to the front element of the queue.
// If the queue is empty, behaviour is undefined.
// 返回隊(duì)列頭部元素的引用,調(diào)用者可以通過該引用更新元素,結(jié)合pop實(shí)現(xiàn)出隊(duì)列操作。
inline T &front() // 返回的是引用,是個(gè)左值,調(diào)用者可以通過其修改容器的值
{
return begin_chunk->values[begin_pos];
}

pop函數(shù),主要更新begin_pos;如果begin_pos == N,則回收chunk;將chunk保存到spare_chunk中。

// Removes an element from the front end of the queue.
inline void pop()
{
if (++begin_pos == N) // 刪除滿一個(gè)chunk才回收chunk
{
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;

// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,總是保存最新的空閑塊而釋放先前的空閑快
free(cs);
}
}

可以使用front和pop函數(shù)進(jìn)行出隊(duì)列操作

// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front();
queue.pop();

這里有兩點(diǎn)需要注意:

  1. pop掉的元素,其銷毀工作交給調(diào)用者完成,即是pop前調(diào)用者需要通過front()接口讀取并進(jìn)行銷毀(比如動(dòng)態(tài)分配的對(duì)象);
  2. 空閑塊的保存,要求是原子操作;因?yàn)殚e塊是讀寫線程的共享變量,因?yàn)樵趐ush中也使用了spare_chunk。

ypipe的實(shí)現(xiàn)

ypipe_t在yqueue_t的基礎(chǔ)上構(gòu)建一個(gè)單寫單讀的無鎖隊(duì)列。

template
class ypipe_t
{
public:
// Initialises the pipe.
inline ypipe_t();
// The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_t();

// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒完成,在沒完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。
inline void write(const T &value_, bool incomplete_);

// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite(T *value_);

// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已經(jīng)完成的數(shù)據(jù)到管道,返回false意味著讀線程在休眠,在這種情況下調(diào)用者需要喚醒讀線程。
// 批量刷新的機(jī)制, 寫入批量后喚醒讀線程;
// 反悔機(jī)制 unwrite
inline bool flush();

// Check whether item is available for reading.
// 這里面有兩個(gè)點(diǎn),一個(gè)是檢查是否有數(shù)據(jù)可讀,一個(gè)是預(yù)取
inline bool check_read();

// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_);

protected:
// Allocation-efficient queue to store pipe items.
// Front of the queue points to the first prefetched item, back of
// the pipe points to last un-flushed item. Front is used only by
// reader thread, while back is used only by writer thread.
yqueue_t queue;

// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w; //指向第一個(gè)未刷新的元素,只被寫線程使用

// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r; //指向第一個(gè)還沒預(yù)提取的元素,只被讀線程使用;代表可以讀取到哪個(gè)位置的元素

// Points to the first item to be flushed in the future.
T *f; //指向下一輪要被刷新的一批元素中的第一個(gè)

// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t c; //讀寫線程共享的指針,指向每一輪刷新的起點(diǎn)。當(dāng)c為空時(shí),表示讀線程睡眠(只會(huì)在讀線程中被設(shè)置為空)

// Disable copying of ypipe object.
ypipe_t(const ypipe_t &);
const ypipe_t &operator=(const ypipe_t &);


},>

核心思想是通過w、r、f指針,通過對(duì)c值的cas操作,解決讀寫線程的數(shù)據(jù)競(jìng)爭(zhēng)問題。

write

將一個(gè)元素入隊(duì)列;incomplete_表示write是否結(jié)束,如果是flase,將f設(shè)置為queue.back()。write最終只是更新了f值。

// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒完成,在沒完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();

// Move the "flush up to here" poiter.
if (!incomplete_)
{
f = &queue.back(); // 記錄要刷新的位置
}
else
{

}
}

flush

主要是將w更新到f的位置,說明已經(jīng)寫到的位置。

通過cas操作,嘗試將c值設(shè)置為f。通過flush的返回值,可以判斷讀端是否處于等待喚醒的狀態(tài)。

// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已經(jīng)完成的數(shù)據(jù)到管道,返回false意味著讀線程在休眠,在這種情況下調(diào)用者需要喚醒讀線程。
// 批量刷新的機(jī)制, 寫入批量后喚醒讀線程;
// 反悔機(jī)制 unwrite
inline bool flush()
{
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是還沒有新元素加入
return true;

// Try to set 'c' to 'f'.
// read時(shí)如果沒有數(shù)據(jù)可以讀取則c的值會(huì)被置為NULL
if (c.cas(w, f) != w) // 嘗試將c設(shè)置為f,即是準(zhǔn)備更新w的位置
{

// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新為新的f位置
w = f;
return false; //線程看到flush返回false之后會(huì)發(fā)送一個(gè)消息給讀線程,這需要寫業(yè)務(wù)去做處理
}
else // 讀端還有數(shù)據(jù)可讀取
{
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 更新f的位置
return true;
}
}

check_read

是一種預(yù)讀的機(jī)制,檢查是否有數(shù)據(jù)可讀;通過對(duì)c值的cas操作,來更新r值;r就是可以讀取到的位置。

c值如果和&queue.front()相等,標(biāo)志沒有數(shù)據(jù)可讀,將c值設(shè)置為NULL;寫端就可以通過c值判斷出讀端的狀態(tài)。

// Check whether item is available for reading.
// 這里面有兩個(gè)點(diǎn),一個(gè)是檢查是否有數(shù)據(jù)可讀,一個(gè)是預(yù)取
inline bool check_read()
{
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判斷是否在前幾次調(diào)用read函數(shù)時(shí)已經(jīng)預(yù)取數(shù)據(jù)了return true;
return true;

// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
// 兩種情況
// 1. 如果c值和queue.front()相等, 返回c值并將c值置為NULL,此時(shí)沒有數(shù)據(jù)可讀
// 2. 如果c值和queue.front()不相等, 返回c值,此時(shí)可能有數(shù)據(jù)讀取
r = c.cas(&queue.front(), NULL); //嘗試預(yù)取數(shù)據(jù),r代表可以讀取到哪個(gè)位置的元素

// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items are being deallocated.
if (&queue.front() == r || !r) //判斷是否成功預(yù)取數(shù)據(jù)
return false;

// There was at least one value prefetched.
return true;
}

基于環(huán)形數(shù)組的無鎖隊(duì)列

基于環(huán)形數(shù)組的無鎖隊(duì)列,也是利用cas操作解決多線程數(shù)據(jù)競(jìng)爭(zhēng)的問題;它支持多謝多讀。

template
class ArrayLockFreeQueue
{
public:

ArrayLockFreeQueue();
virtual ~ArrayLockFreeQueue();

QUEUE_INT size();

bool enqueue(const ELEM_T &a_data);

bool dequeue(ELEM_T &a_data);

bool try_dequeue(ELEM_T &a_data);

private:

ELEM_T m_thequeue[Q_SIZE];

volatile QUEUE_INT m_count;
volatile QUEUE_INT m_writeIndex;

volatile QUEUE_INT m_readIndex;

volatile QUEUE_INT m_maximumReadIndex;

inline QUEUE_INT countToIndex(QUEUE_INT a_count);
};

關(guān)鍵是對(duì)于三種下標(biāo)的操作:

  1. m_writeIndex;//新元素入列時(shí)存放位置在數(shù)組中的下標(biāo)
  2. m_readIndex;/ 下一個(gè)出列的元素在數(shù)組中的下標(biāo)
  3. m_maximumReadIndex; //最后一個(gè)已經(jīng)完成入列操作的元素在數(shù)組中的下標(biāo), 即可以讀到的最大索引。

通過對(duì)這3個(gè)下標(biāo)的cas操作,解決多線程數(shù)據(jù)競(jìng)爭(zhēng)的問題。

圖片

enqueue

template
bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data)
{
QUEUE_INT currentWriteIndex; // 獲取寫指針的位置
QUEUE_INT currentReadIndex;
// 1. 獲取可寫入的位置
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if(countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
return false; // 隊(duì)列已經(jīng)滿了
}
// 目的是為了獲取一個(gè)能寫入的位置
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
// 獲取寫入位置后 currentWriteIndex是一個(gè)臨時(shí)變量,保存我們寫入的位置
// We know now that this index is reserved for us. Use it to save the data
m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置

// 2. 更新可讀的位置,按著currentWriteIndex + 1的操作
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield(); // 當(dāng)線程超過cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。
}

AtomicAdd(&m_count, 1);

return true;

},>

首先判斷隊(duì)列是否已滿:(m_writeIndex + 1) %/Q_SIZE == m_readIndex;如果隊(duì)列已滿,則返回false。

enqueu的核心思想是預(yù)先占用一個(gè)可寫的位置,保證同一個(gè)位置只有一個(gè)線程會(huì)進(jìn)行寫操作;并且保證先獲取到位置的線程,先操作,保證了操作的順序性。這兩個(gè)都是通過cas操作保證的。

dequeue

template
bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data)
{
QUEUE_INT currentMaximumReadIndex;
QUEUE_INT currentReadIndex;

do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;

if(countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_thequeue[countToIndex(currentReadIndex)];

// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù)
return true;
}
} while(true);

assert(0);
// Add this return statement to avoid compiler warnings
return false;

},>

判斷隊(duì)列是否有數(shù)據(jù)可讀:m_readIndex == m_maximumReadIndex;

通過cas操作保證同一個(gè)位置,只有一個(gè)線程讀取。

多寫多讀測(cè)試結(jié)果

圖片

從測(cè)試結(jié)果可以看出,這個(gè)基于環(huán)形數(shù)組的隊(duì)列,比較適合1寫多讀的場(chǎng)景,性能會(huì)有很大的提升。

總結(jié)

本文主要介紹了以下內(nèi)容:

  • 無鎖隊(duì)列所解決的問題;
  • 無鎖隊(duì)列都是利用了cas操作,來解決多線程數(shù)據(jù)競(jìng)爭(zhēng)的問題;因?yàn)閏as操作的粒度要比mutex,spinlock要小很多;
  • zmq無鎖隊(duì)列實(shí)現(xiàn)原理,包括chunk機(jī)制、批量寫入、怎樣喚醒讀端等;yqueue、ypipe的具體的實(shí)現(xiàn),預(yù)讀機(jī)制、寫端如何檢測(cè)到讀端的狀態(tài)等;它只支持單寫單讀的場(chǎng)景;
  • 基于環(huán)形數(shù)組的無鎖隊(duì)列,它支持多寫多讀的場(chǎng)景;對(duì)于1寫多讀的場(chǎng)景,性能有很大提升;它是如何解決多線程競(jìng)爭(zhēng)問題的。
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • cpu
    cpu
    +關(guān)注

    關(guān)注

    68

    文章

    10804

    瀏覽量

    210841
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    6808

    瀏覽量

    88743
  • 內(nèi)存
    +關(guān)注

    關(guān)注

    8

    文章

    2966

    瀏覽量

    73814
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    503

    瀏覽量

    19636
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    《有》/《》/《簽約》/《解鎖》/《越獄》/《激活》專

    《有》/《》/《簽約》/《解鎖》/《越獄》/《激活》專業(yè)技術(shù)詞解析 在討論區(qū)里,大家看到:《有版》,《
    發(fā)表于 02-03 11:05 ?942次閱讀

    AWorks軟件設(shè)計(jì),郵箱、消息隊(duì)列和自旋使用方法

    本文介紹了郵箱、消息隊(duì)列和自旋的使用方法。信號(hào)量只能用于任務(wù)間的同步,不能傳遞更多的信息,為此,AWorks提供了郵箱和消息隊(duì)列服務(wù),它們的主要區(qū)別在于支持的消息長度不同,在郵箱中,每條消息的長度固定為4字節(jié),而在消息
    的頭像 發(fā)表于 06-13 09:13 ?1.2w次閱讀
    AWorks軟件設(shè)計(jì),郵箱、消息<b class='flag-5'>隊(duì)列</b>和自旋<b class='flag-5'>鎖</b>使用方法

    由淺入深的一步步迭代出隊(duì)列的實(shí)現(xiàn)原理

    什么是隊(duì)列,顧名思義,就類似于超市面前排起的一個(gè)隊(duì)伍,當(dāng)最前面的顧客買完了東西,后面的顧客整體向前移動(dòng),而處于新隊(duì)頭的顧客繼續(xù)消費(fèi)。如果沒有后來的顧客,那么最終這個(gè)隊(duì)伍將消失。而如果有新的顧客到來,那么他將排在隊(duì)伍最后等待購買。
    的頭像 發(fā)表于 07-09 09:25 ?6741次閱讀
    由淺入深的一步步迭代出<b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊(duì)列</b>的實(shí)現(xiàn)原理

    測(cè)控軟件研發(fā),助力儀器進(jìn)行遠(yuǎn)程數(shù)據(jù)管理

    不同的測(cè)量?jī)x器定制專業(yè)的測(cè)控軟件,為各種檢測(cè)儀器提供專業(yè)的數(shù)據(jù)監(jiān)控并生成直觀的數(shù)據(jù)報(bào)告。 目前的測(cè)控軟件系統(tǒng)支持各種測(cè)控應(yīng)用場(chǎng)景;跨平臺(tái)部署;自定義UI、拖拽式設(shè)計(jì);支持二次開發(fā),快速實(shí)施;應(yīng)用隊(duì)列、內(nèi)存數(shù)
    的頭像 發(fā)表于 12-03 17:11 ?1547次閱讀

    利用CAS技術(shù)實(shí)現(xiàn)隊(duì)列

    【 導(dǎo)讀 】:本文 主要講解利用CAS技術(shù)實(shí)現(xiàn)隊(duì)列。 關(guān)于隊(duì)列的實(shí)現(xiàn),網(wǎng)上有很多文章,雖
    的頭像 發(fā)表于 01-11 10:52 ?2244次閱讀
    利用CAS技術(shù)實(shí)現(xiàn)<b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊(duì)列</b>

    關(guān)于CAS等原子操作介紹 隊(duì)列的鏈表實(shí)現(xiàn)方法

    在開始說隊(duì)列之前,我們需要知道一個(gè)很重要的技術(shù)就是CAS操作——Compare & Set,或是 Compare & Swap,現(xiàn)在幾乎所有的CPU指令都支持CAS的原子操作
    的頭像 發(fā)表于 05-18 09:12 ?3327次閱讀
    關(guān)于CAS等原子操作介紹 <b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊(duì)列</b>的鏈表實(shí)現(xiàn)方法

    MPMCQueue有界多生產(chǎn)者多用戶隊(duì)列

    ./oschina_soft/MPMCQueue.zip
    發(fā)表于 06-22 10:23 ?0次下載
    MPMCQueue有界多生產(chǎn)者多用戶<b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊(duì)列</b>

    一文徹底搞懂內(nèi)存屏障與volatile

    內(nèi)存屏障與 volatile 是高并發(fā)編程中比較常用的兩個(gè)技術(shù),隊(duì)列的時(shí)候就會(huì)用到這兩項(xiàng)技術(shù)。然而這兩項(xiàng)技術(shù)涉及比較廣的基礎(chǔ)知識(shí),所以比較難以理解,也比較不容易解釋清楚。關(guān)于內(nèi)存屏障
    的頭像 發(fā)表于 11-29 11:43 ?2296次閱讀

    怎么設(shè)計(jì)實(shí)現(xiàn)一個(gè)高并發(fā)的環(huán)形連續(xù)內(nèi)存緩沖隊(duì)列

    隊(duì)列是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作,和棧一樣,隊(duì)列是一種操作受限制的線性表。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭
    的頭像 發(fā)表于 02-15 14:59 ?1221次閱讀
    怎么設(shè)計(jì)實(shí)現(xiàn)一個(gè)<b class='flag-5'>無</b><b class='flag-5'>鎖</b>高并發(fā)的環(huán)形連續(xù)內(nèi)存緩沖<b class='flag-5'>隊(duì)列</b>

    發(fā)燒友實(shí)測(cè) | i.MX8MP 編譯DPDK源碼實(shí)現(xiàn)rte_ring環(huán)隊(duì)列進(jìn)程間通信

    作者|donatello1996來源|電子發(fā)燒友題圖|飛凌嵌入式rte_ring是一個(gè)用CAS實(shí)現(xiàn)的FIFO環(huán)形隊(duì)列,支持多消費(fèi)者/生產(chǎn)者同時(shí)出入隊(duì)列,常用于多線程/多進(jìn)程之間的通
    的頭像 發(fā)表于 01-10 16:29 ?1889次閱讀
    發(fā)燒友實(shí)測(cè) | i.MX8MP 編譯DPDK源碼實(shí)現(xiàn)rte_ring<b class='flag-5'>無</b><b class='flag-5'>鎖</b>環(huán)<b class='flag-5'>隊(duì)列</b>進(jìn)程間通信

    源智能的應(yīng)用前景

    源智能是一種結(jié)合機(jī)械與電子特點(diǎn)的智能鎖具。由源智能、智能鑰匙和管理軟件共同組成,
    的頭像 發(fā)表于 09-22 10:18 ?1443次閱讀
    <b class='flag-5'>無</b>源智能<b class='flag-5'>鎖</b>的應(yīng)用前景

    如何實(shí)現(xiàn)一個(gè)多讀多寫的線程安全的隊(duì)列

    在ZMQ隊(duì)列的原理與實(shí)現(xiàn)一文中,我們已經(jīng)知道了ypipe可以實(shí)現(xiàn)一線程寫一線程讀的隊(duì)列,
    的頭像 發(fā)表于 11-08 15:25 ?1138次閱讀
    如何實(shí)現(xiàn)一個(gè)多讀多寫的線程安全的<b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊(duì)列</b>

    隊(duì)列的潛在優(yōu)勢(shì)

    隊(duì)列 先大致介紹一下隊(duì)列
    的頭像 發(fā)表于 11-09 09:23 ?530次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b><b class='flag-5'>隊(duì)列</b>的潛在優(yōu)勢(shì)

    CAS如何實(shí)現(xiàn)各種的數(shù)據(jù)結(jié)構(gòu)

    關(guān)于CAS等原子操作 在開始說隊(duì)列之前,我們需要知道一個(gè)很重要的技術(shù)就是CAS操作——Compare Swap,現(xiàn)在幾乎所有的CPU指令都支持CAS的原子操作,X86下對(duì)應(yīng)的是 CMPXCHG
    的頭像 發(fā)表于 11-10 11:00 ?501次閱讀
    CAS如何實(shí)現(xiàn)各種<b class='flag-5'>無</b><b class='flag-5'>鎖</b>的數(shù)據(jù)結(jié)構(gòu)

    CAS如何實(shí)現(xiàn)各種的數(shù)據(jù)結(jié)構(gòu)

    ,可用于在多線程編程中實(shí)現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時(shí)改寫某?數(shù)據(jù)時(shí)由于執(zhí)行順序不確定性以及中斷的不可預(yù)知性產(chǎn)?的數(shù)據(jù)不一致問題 有了CAS,我們就可以用它來實(shí)現(xiàn)各種(lock free)的數(shù)據(jù)結(jié)構(gòu) 實(shí)現(xiàn)原理 該操作通過將內(nèi)存中的值與指定數(shù)據(jù)進(jìn)行比較,
    的頭像 發(fā)表于 11-13 15:38 ?728次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b>CAS如何實(shí)現(xiàn)各種<b class='flag-5'>無</b><b class='flag-5'>鎖</b>的數(shù)據(jù)結(jié)構(gòu)