0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

協(xié)程的作用、結(jié)構(gòu)及原理

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-08 16:39 ? 次閱讀

本文介紹了協(xié)程的作用、結(jié)構(gòu)、原理,并使用C++匯編實現(xiàn)了64位系統(tǒng)下的協(xié)程池。文章內(nèi)容避免了協(xié)程晦澀難懂的部分,用大量圖文來分析原理,適合新手閱讀學(xué)習(xí)。

GitHub源碼

1. Web服務(wù)器問題

現(xiàn)代分布式Web后臺服務(wù)邏輯通常由一系列RPC請求組成,若串行則耗時比較長。

圖片

此時一般都會使用線程池并行運行RPC請求,如圖中GetData函數(shù)

圖片

假設(shè)請求數(shù)據(jù)包不大,那么可假設(shè)GetData耗時組成如下圖所示。在非阻塞讀情況下,CPU將在Wait環(huán)節(jié)空轉(zhuǎn)浪費資源(不斷地read,得到返回碼-1)。

圖片

  1. 協(xié)程的引入

有沒有辦法只用一個線程并行執(zhí)行GetData呢?答案是:可以!我們假設(shè)有3個并行的GetData任務(wù),下圖線程1通過跳轉(zhuǎn)控制流,減少CPU資源浪費。執(zhí)行流為①⑦,在Wait階段則跳到其他任務(wù)如①⑤。運行結(jié)束后也跳到其他任務(wù)如⑥~⑦。通過這種方式,3個GetData能用一個線程以52ms的耗時并行執(zhí)行。

圖片

如果GetData任務(wù)可以被這樣分配,則可以減少線程切換的消耗。因為協(xié)程的調(diào)度是線程內(nèi)用戶態(tài)執(zhí)行的,CPU消耗非常小。

圖片

  1. 協(xié)程的原理

**從上文可知,協(xié)程之間的切換本質(zhì)是函數(shù)的跳轉(zhuǎn),即如何讓正在執(zhí)行的函數(shù)跳轉(zhuǎn)到另一個新的函數(shù)上,以及下次如何又跳轉(zhuǎn)回來。**如下面代碼所示:

void func1() {
printf("① 跳轉(zhuǎn)到func2");
Coroutine::CoYield(); // 通過該函數(shù)跳到func2
printf("③ func2跳轉(zhuǎn)回func1");
}

void func2() {
printf("② func2執(zhí)行完畢");
}

要實現(xiàn)這種能力,需要結(jié)合匯編知識。首先研究如下簡單函數(shù)的匯編語言

#include
using namespace std;

class Object {
public:
int val[12];
};

int func(Object *pObj1, Object *pObj2) {
pObj1->val[0] = 1;
pObj1->val[11] = 11;
pObj2->val[0] = 2;
pObj2->val[11] = 12;
int arr[100];
arr[0] = 3;
arr[99] = 99;
return pObj1->val[0];
}

int main() {
Object obj, obj2;
int a = func(&obj, &obj2);
return a;
}

下面看看在64位系統(tǒng)匯編中,func函數(shù)是如何執(zhí)行的。push %rbp是進(jìn)入func函數(shù)執(zhí)行的第一個指令,作用是把rbp的地址壓到棧頂。因為rsp始終指向棧頂,所以壓棧后,rsp的地址下移8字節(jié)。rdi和rsi相差48個字節(jié),該空間被class Object內(nèi)的int val[12]占用。

圖片

前兩個指令讓rbp指向rsp往下296字節(jié)的位置。后面兩個指令把rdi和rsi地址保存在最下面。

圖片

為什么rsp下移296字節(jié)?首先,上述代碼使用了臨時變量int arr[100],需要有400個字節(jié)的棧空間;其次,x64系統(tǒng)存有128字節(jié)的紅色區(qū)域可使用;最后,rdi和rsi地址共占16字節(jié)。因此,rbp到紅色區(qū)域底部的空間一共是 288 + 8 + 104 + 8 + 8 = 416字節(jié)。接下來才開始執(zhí)行func函數(shù)第一行代碼,給val[0]賦值。

圖片

然后分別給pObj1和pObj2的成員變量賦值

圖片

接下來給臨時變量arr賦值

圖片

最后讓eax指向返回值,恢復(fù)函數(shù)棧的棧底和棧頂。

圖片

  1. 協(xié)程的結(jié)構(gòu)

從前面我們知道,每個函數(shù)在內(nèi)存中都有棧頂rsp和棧底rbp。這兩個值決定了函數(shù)可操作的內(nèi)存范圍,如下圖所示

圖片

既然協(xié)程切換是從一個函數(shù)切換到另一個函數(shù),那么就需要知道兩個函數(shù)的rbp和rsp。然而,函數(shù)的rbp和rsp是執(zhí)行時設(shè)定的,代碼層面難以獲得。既然如此,我們可以實現(xiàn)騰出空間,讓函數(shù)在預(yù)期的rbp和rsp內(nèi)。定義一個類如下:

class Coroutine {
void* m_pRegister[14];
char m_pStack[1024];
std::function m_func;
};()>

那么在內(nèi)存模型中,該類的布局如下所示

圖片

這樣的協(xié)程在能被使用前需要做初始化,如下圖所示

圖片

在其他協(xié)程切換過來時,cpu寄存器可按m_pRegister預(yù)設(shè)的地址賦值,開始執(zhí)行DoWork函數(shù),函數(shù)代碼如下:

static void Coroutine::DoWork(Coroutine *pThis) {
pThis->m_func();
pThis->Yield(); // 轉(zhuǎn)讓控制流給同線程的其他協(xié)程
}

由于是靜態(tài)函數(shù),需令參數(shù)pThis為協(xié)程地址。所以,初始化時需要設(shè)置m_pRegister中的rdi為this。上述第二行代碼執(zhí)行時,rbp會設(shè)為this。所以執(zhí)行m_func時,如下圖所示:

圖片

  1. 協(xié)程間的切換

下面以Coroutine1切換到Coroutine2為例。主要分為兩步:1. 保存Coroutine1的上下文

圖片

  1. 加載Coroutine2的上下文

圖片

切換代碼可見源代碼Coroutine::Switch## 6. 協(xié)程池的實現(xiàn)

本文實現(xiàn)協(xié)程池比較簡單,初始化創(chuàng)建線程并設(shè)置thread_local變量以保存協(xié)程隊列狀態(tài)。并且,每個線程額外創(chuàng)建一個main協(xié)程用作Guard。在執(zhí)行時,每個線程通過輪詢的方式切換協(xié)程,若協(xié)程無任務(wù)則嘗試CAS獲取Job,否則直接執(zhí)行已有Job。當(dāng)Job執(zhí)行完或主動CoYield時,切換到下一個協(xié)程。為了避免CAS空轉(zhuǎn),在沒有任務(wù)時會阻塞休眠。當(dāng)任務(wù)來臨時則Notify所有線程的協(xié)程。

圖片

  1. 源代碼

example.cpp

/**
* @file example.cpp
* @author souma
* @brief 使用協(xié)程池的示例,編譯命令如下
* g++ example.cpp coroutine.cpp -lpthread -O3
* @version 0.1
* @date 2023-06-06
*
* @copyright Copyright (c) 2023
*
*/
#include
#include
#include "coroutine.h"

using namespace std;
using namespace comm;

void func(const string &sTaskName, uint32_t uWaitSeconds) {
printf("[%ld] [%s start], wait seconds[%u]n", time(nullptr), sTaskName.c_str(), uWaitSeconds);
time_t iStartSec = time(nullptr);
// 默認(rèn)可用65535字節(jié)的棧內(nèi)存,具體可看CO_STACK_SIZE
uint32_t uArrSize = 65535/4;
int arr[uArrSize];
while (time(nullptr) - iStartSec < uWaitSeconds) {
// 操作棧內(nèi)存
for (int i = 0; i < uArrSize; ++i) {
arr[i] = i;
}

// 切換控制流
printf("[%ld] [%s] -> [協(xié)程池]n", time(nullptr), sTaskName.c_str());
usleep(100);
Coroutine::CoYield(); // 只需這一個函數(shù)即可切換控制流
printf("[%ld] [協(xié)程池] -> [%s]n", time(nullptr), sTaskName.c_str());
}

// 檢查棧內(nèi)存是否正確
for (int i = 0; i < uArrSize; ++i) {
if (arr[i] != i) {
printf("棧內(nèi)存錯誤n");
exit(-1);
}
}
printf("[%ld] [%s end], expect_timecost[%d], real_timecost[%ld]n", time(nullptr), sTaskName.c_str(), uWaitSeconds, time(nullptr) - iStartSec);
}

int main() {
// 如果想當(dāng)線程池用,可以令第一個參數(shù)為線程數(shù),第二個參數(shù)為1。
// 在該場景下,使用小線程大協(xié)程不僅CPU消耗低,整體耗時也很低,可以自行測試。
CoroutinePool oPool(2, 300);
oPool.Run();

time_t iStartTime = time(nullptr);
const int iTaskCnt = 400;
vector> vecFuture;
for (int i = 0; i < iTaskCnt; ++i) {
// 模擬GetData中的Wait環(huán)節(jié), 1 ~ 5秒等待
shared_ptr pFuture = oPool.Submit([i](){func("Task" + to_string(i), random() % 5 + 1);});
if (pFuture != nullptr) {
vecFuture.emplace_back(pFuture);
}
}

// 阻塞等待所有Task完成
for (auto it = vecFuture.begin(); it != vecFuture.end(); ++it) {
(*it)->Get();
}

printf("demo's finished, time cost[%ld]n", time(nullptr) - iStartTime);
return 0;
}

coroutine.h

/**
* @file coroutine.h
* @author souma
* @brief 多線程無棧式協(xié)程池,請不要用-O0編譯否則會產(chǎn)生coredump
* @version 0.1
* @date 2023-06-06
*
* @copyright Copyright (c) 2023
*
*/
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace comm {
class Future;
class CoroutinePool;
class Coroutine;
struct CoroutinePoolCtx;
struct CoroutineTaskCtx;


struct CoroutinePoolCtx {
std::vector> m_vecCoroutine;
std::shared_ptr m_pMainCoroutine;
uint32_t m_uCursor;
uint32_t m_uWorkCnt;
};

struct CoroutineTaskCtx {
std::function m_userFunc;
std::shared_ptr m_pFuture;
};

// class ArraySyncQueue start
template
class ArraySyncQueue {
public:
ArraySyncQueue(uint32_t uCapacity, uint32_t uSleepUs = 100, uint32_t uRetryTimes = 3);
bool Push(T *pObj);
T* Pop();
inline bool IsFull() const { return m_uPushCursor == m_uPopCursor - 1 || (m_uPopCursor == 0 && m_uPushCursor == m_vecQueue.size() - 1); }
bool IsEmpty() const { return m_uPopCursor == m_uPushCursor; }

~ArraySyncQueue();

private:
uint32_t GetNextCursor(uint32_t uCursor);
private:
std::vector m_vecQueue;
uint32_t m_uPushCursor = 0;
uint32_t m_uPopCursor = 0;
uint32_t m_uSleepUs;
uint32_t m_uRetryTimes;
};
// class ArraySyncQueue end

// class Coroutine start
class Coroutine {
public:

friend class CoroutinePool;

/**
* @brief 調(diào)用該函數(shù)將執(zhí)行流交給其他協(xié)程,僅在協(xié)程池環(huán)境下有效
*
* @return true:協(xié)程切換成功, false:不在協(xié)程池環(huán)境中運行
*/
static bool CoYield();

Coroutine(const Coroutine &) = delete;
Coroutine(Coroutine &&) = delete;
Coroutine & operator=(const Coroutine &) = delete;
Coroutine & operator=(Coroutine &&) = delete;

private:
// 4096是預(yù)留給庫使用的棧內(nèi)存大小,后者是留給用戶使用的棧內(nèi)存大小
constexpr static uint32_t CO_STACK_SIZE = 4096 + 65535;

Coroutine();

/**
* @brief 當(dāng)前協(xié)程是否綁定了任務(wù)
*
* @return true:是
*/
inline bool HasTask() const { return m_pTaskCtx != nullptr; }

/**
* @brief 兩個協(xié)程切換,從pPrev切換到pNext
*/
static void Switch(Coroutine *pPrev, Coroutine *pNext);

/**
* @brief 將控制流轉(zhuǎn)給同線程的其他協(xié)程
*/
void Yield();

/**
* @brief 這個是給main協(xié)程用的
*/
void Register();

/**
* @brief 這個是給執(zhí)行用戶任務(wù)的協(xié)程用的
*/
void Register(std::shared_ptr pTaskCtx);

/**
* @return CoroutinePoolCtx& 當(dāng)前線程的協(xié)程上下文
*/
static CoroutinePoolCtx & GetCtx();

/**
* @brief 讓當(dāng)前線程的cursor往后移,輪詢協(xié)程
*/
static void MoveCursor();

/**
* @brief 協(xié)程包一層的函數(shù)
*/
static void DoWork(Coroutine *pThis);

/**
*
* @return void* 獲得自建rsp地址
*/
void* GetRsp();

/**
* 保存寄存器的值到m_pStack中
*/
void SaveReg();

private:
void* m_pRegister[14];
char m_pStack[CO_STACK_SIZE];
std::shared_ptr m_pTaskCtx;
};
// class Coroutine end

// class CoroutinePool start
class CoroutinePool {
public:
friend class Coroutine;
/**
* @brief 建立一個多線程協(xié)程池,即創(chuàng)建uThreadCnt個線程,每個線程含有uCoroutineCnt個協(xié)程
調(diào)用Run開始運行,調(diào)用Stop或直接析構(gòu)結(jié)束
* @param uThreadCnt 線程數(shù),小于1則為1
* @param uCoroutineCnt 每個線程的協(xié)程數(shù),小于1則為1
* @param uJobQueueSize 總?cè)蝿?wù)隊列大小,小于1則為1
*/
CoroutinePool(uint32_t uThreadCnt, uint32_t uCoroutineCnt, uint32_t uJobQueueSize = 1024000);

/**
* @brief 線程安全,可重入
* @return true:正常
*/
bool Run();

/**
* @brief 停止協(xié)程池 (會先保證池中任務(wù)完成再停止),線程安全可重入
*
*/
void Stop();

/**
* @param userFunc 用戶函數(shù)
* @return std::shared_ptr nullptr:協(xié)程池隊列滿了,提交不了
*/
std::shared_ptr Submit(const std::function &userFunc);

~CoroutinePool();
CoroutinePool(const CoroutinePool &) = delete;
CoroutinePool(CoroutinePool &&) = delete;
CoroutinePool & operator=(const CoroutinePool &) = delete;
CoroutinePool & operator=(CoroutinePool &&) = delete;

private:
static void LoopWork(CoroutinePool &oPool);

private:
bool m_bStarted;
uint32_t m_uThreadCnt;
uint32_t m_uRoutineCnt;
ArraySyncQueue m_queueJob;
std::vector> m_vecThread;
std::mutex m_oMutex;
std::condition_variable m_oCondition;
};
// class CoroutinePool end

// class Future start
class Future {
public:
/**
* @brief 阻塞獲得結(jié)果
*
* @param uTimeoutMs 超時時間
* @return true:成功, false:超時
*/
bool Get(uint32_t uTimeoutMs = -1);

/**
* @brief 設(shè)置狀態(tài)為完成
*/
void SetFinished();

Future();

Future(const Future&) = delete;
Future(Future&&) = delete;

Future & operator=(const Future&) = delete;
Future & operator=(Future&&) = delete;

private:
std::mutex m_oMutex;
std::condition_variable m_oCondition;
bool m_bFinished;
};
// class Future end
}()>*>()>

coroutine.cpp

/**
* @file coroutine.cpp
* @author souma
* @brief 協(xié)程池的具體實現(xiàn)
* @version 0.1
* @date 2023-06-06
*
* @copyright Copyright (c) 2023
*
*/

#include "coroutine.h"
#include

using namespace std;
namespace comm {

// class Coroutine start
Coroutine::Coroutine() {
m_pTaskCtx = nullptr;
}

void Coroutine::Register() {
m_pTaskCtx = make_shared();
m_pTaskCtx->m_userFunc = [](){};
m_pTaskCtx->m_pFuture = nullptr;
SaveReg();
}

void Coroutine::Register(shared_ptr pTaskCtx) {
m_pTaskCtx = pTaskCtx;
SaveReg();
}

inline void Coroutine::Yield() {
Coroutine::Switch(this, Coroutine::GetCtx().m_pMainCoroutine.get());
}

bool Coroutine::CoYield() {
if (GetCtx().m_vecCoroutine.size() == 0) {
return false;
}
GetCtx().m_vecCoroutine[GetCtx().m_uCursor]->Yield();
return true;
}

CoroutinePoolCtx & Coroutine::GetCtx() {
thread_local CoroutinePoolCtx coroutinePoolCtx;
return coroutinePoolCtx;
}

void Coroutine::MoveCursor() {
GetCtx().m_uCursor = GetCtx().m_uCursor == GetCtx().m_vecCoroutine.size() - 1 ? 0 : GetCtx().m_uCursor + 1;
}

extern "C" __attribute__((noinline, weak))
void Coroutine::Switch(Coroutine *pPrev, Coroutine *pNext) {
// 1.保存pPrev協(xié)程的上下文, rdi和pPrev同指向
// 2.加載pNext協(xié)程的上下文, rsi和pNext同指向
asm volatile(R"(
movq %rsp, %rax
movq %rbp, 104(%rdi)
movq %rax, 96(%rdi)
movq %rbx, 88(%rdi)
movq %rcx, 80(%rdi)
movq %rdx, 72(%rdi)
movq 0(%rax), %rax
movq %rax, 64(%rdi)
movq %rsi, 56(%rdi)
movq %rdi, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)

movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 48(%rsi), %rdi
movq 64(%rsi), %rax
movq 72(%rsi), %rdx
movq 80(%rsi), %rcx
movq 88(%rsi), %rbx
movq 96(%rsi), %rsp
movq 104(%rsi), %rbp
movq 56(%rsi), %rsi
movq %rax, (%rsp)
xorq %rax, %rax
)");
}

void Coroutine::DoWork(Coroutine *pThis) {
pThis->m_pTaskCtx->m_userFunc();
pThis->m_pTaskCtx->m_pFuture->SetFinished();
pThis->m_pTaskCtx.reset();
Coroutine::GetCtx().m_uWorkCnt--;
pThis->Yield();
}

void* Coroutine::GetRsp() {
// m_pRegister和m_pStack中間預(yù)留一個指針空間
auto sp = std::end(m_pStack) - sizeof(void*);
// 預(yù)定Rsp的地址保證能夠整除8字節(jié)
sp = decltype(sp)(reinterpret_cast(sp) & (~0xF));
return sp;
}

void Coroutine::SaveReg() {
void *pStack = GetRsp();
memset(m_pRegister, 0, sizeof m_pRegister);
void **pRax = (void**)pStack;
*pRax = (void*) DoWork;
// rsp
m_pRegister[12] = pStack;
// rax
m_pRegister[8] = *pRax;
// rdi
m_pRegister[6] = this;
}
// class Coroutine end

// class CoroutinePool start
CoroutinePool::CoroutinePool(uint32_t uThreadCnt, uint32_t uCoroutineCnt, uint32_t uJobQueueSize) : m_queueJob(uJobQueueSize) {
m_bStarted = false;
m_uThreadCnt = max(uThreadCnt, 1u);
m_uRoutineCnt = max(uCoroutineCnt, 1u);
}

bool CoroutinePool::Run() {
if (!__sync_bool_compare_and_swap(&m_bStarted, false, true)) {
return false;
}

for (decltype(m_uThreadCnt) i = 0; i < m_uThreadCnt; ++i) {
m_vecThread.emplace_back(make_shared(CoroutinePool::LoopWork, ref(*this)));
}
return true;
}

void CoroutinePool::Stop() {
if (!__sync_bool_compare_and_swap(&m_bStarted, true, false)) {
return;
}

m_oCondition.notify_all();
for (auto it = m_vecThread.begin(); it != m_vecThread.end(); ++it) {
(*it)->join();
}
m_vecThread.clear();
}

shared_ptr CoroutinePool::Submit(const function &userFunc) {
shared_ptr pNewFuture = make_shared();
CoroutineTaskCtx *pTaskCtx = new CoroutineTaskCtx;
pTaskCtx->m_pFuture = pNewFuture;
pTaskCtx->m_userFunc = userFunc;

if (!m_queueJob.Push(pTaskCtx)) {
delete pTaskCtx, pTaskCtx = nullptr;
return nullptr;
}
m_oCondition.notify_all();
return pNewFuture;
}

CoroutinePool::~CoroutinePool() {
Stop();
}

void CoroutinePool::LoopWork(CoroutinePool &oPool) {
Coroutine::GetCtx().m_uCursor = 0;
Coroutine::GetCtx().m_uWorkCnt = 0;
Coroutine::GetCtx().m_pMainCoroutine = shared_ptr(new Coroutine);
Coroutine::GetCtx().m_pMainCoroutine->Register();

Coroutine::GetCtx().m_vecCoroutine.clear();
for (decltype(oPool.m_uRoutineCnt) i = 0; i < oPool.m_uRoutineCnt; ++i) {
Coroutine::GetCtx().m_vecCoroutine.emplace_back(shared_ptr(new Coroutine));
}

Coroutine *pMainCoroutine, *pCurCoroutine;
while (oPool.m_bStarted || Coroutine::GetCtx().m_uWorkCnt > 0 || !oPool.m_queueJob.IsEmpty()) {

pMainCoroutine = Coroutine::GetCtx().m_pMainCoroutine.get();
pCurCoroutine = Coroutine::GetCtx().m_vecCoroutine[Coroutine::GetCtx().m_uCursor].get();

if (pCurCoroutine->HasTask()) {
Coroutine::Switch(pMainCoroutine, pCurCoroutine);
Coroutine::MoveCursor();
continue;
}

CoroutineTaskCtx *pTaskCtx = oPool.m_queueJob.Pop();
if (pTaskCtx == nullptr) {
if (Coroutine::GetCtx().m_uWorkCnt > 0) {
Coroutine::MoveCursor();
continue;
}
unique_lock oLock(oPool.m_oMutex);
oPool.m_oCondition.wait(oLock);
continue;
}

pCurCoroutine->Register(shared_ptr(pTaskCtx));
++Coroutine::GetCtx().m_uWorkCnt;
Coroutine::Switch(pMainCoroutine, pCurCoroutine);
Coroutine::MoveCursor();
}
}
// class CoroutinePool end

// class Future start
Future::Future() {
m_bFinished = false;
}

bool Future::Get(uint32_t uTimeoutMs) {
unique_lock oLock(m_oMutex);
if (m_bFinished) {
return true;
}
return m_oCondition.wait_for(oLock, chrono::milliseconds(uTimeoutMs)) == cv_status::no_timeout;
}

void Future::SetFinished() {
{
unique_lock oLock(m_oMutex);
m_bFinished = true;
}
m_oCondition.notify_all();
}
// class Future end

// class ArraySyncQueue start
template
ArraySyncQueue::ArraySyncQueue(uint32_t uCapacity, uint32_t uSleepUs, uint32_t uRetryTimes) {
for (uint32_t i = 0; i < std::max(uCapacity, 1u); ++i) {
m_vecQueue.emplace_back(nullptr);
}
m_uSleepUs = uSleepUs;
m_uRetryTimes = uRetryTimes;
}

template
bool ArraySyncQueue::Push(T *pObj) {
if (pObj == nullptr) {
return false;
}
uint32_t uRetryTimes = 0;
while (uRetryTimes <= m_uRetryTimes) {
uint32_t uPushCursor = m_uPushCursor;
if (uPushCursor == m_uPopCursor - 1 || (m_uPopCursor == 0 && uPushCursor == m_vecQueue.size() - 1)) {
// 隊列滿了
return false;
}

if (!__sync_bool_compare_and_swap(&m_vecQueue[uPushCursor], nullptr, pObj)) {
uRetryTimes++;
usleep(m_uSleepUs);
continue;
}

m_uPushCursor = GetNextCursor(uPushCursor);
return true;
}
// 競爭失敗
return false;
}

template
T* ArraySyncQueue::Pop() {
uint32_t uRetryTimes = 0;
while (uRetryTimes <= m_uRetryTimes) {
uint32_t uPopCursor = m_uPopCursor;
if (uPopCursor == m_uPushCursor) {
return nullptr;
}

T* pToReturn = m_vecQueue[uPopCursor];
if (pToReturn == nullptr || !__sync_bool_compare_and_swap(&m_vecQueue[uPopCursor], pToReturn, nullptr)) {
usleep(m_uSleepUs);
uRetryTimes++;
continue;
}
m_uPopCursor = GetNextCursor(uPopCursor);
return pToReturn;
}
return nullptr;
}

template
uint32_t ArraySyncQueue::GetNextCursor(uint32_t uCursor) {
if (uCursor == m_vecQueue.size() - 1) {
return 0;
}
return uCursor + 1;
}

template
ArraySyncQueue::~ArraySyncQueue() {
m_uRetryTimes = -1;
do {
T *pObj = Pop();
if (pObj == nullptr) {
return;
}
delete pObj, pObj = nullptr;
} while (true);
}
// class ArraySyncQueue end
}()>

8. 補充說明

8.1. 為什么不能-O0編譯?

在-O0的情況下,編譯器會給函數(shù)(coroutine.cpp:57)Coroutine::Switch包一層匯編指令,導(dǎo)致實際執(zhí)行匯編指令不是期望的。具體可以分別用-O0和-O3在GDB下disassemble看到差異。

8.2. 如果函數(shù)使用棧很大怎么辦?

源碼中定義的協(xié)程棧為CO_STACK_SIZE=4096 + 65535KB,若用戶函數(shù)使用的棧超過該范圍會產(chǎn)生coredump。簡單可行的解法是:1.盡量使用堆變量;2.改大CO_STACK_SIZE。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • Web服務(wù)器
    +關(guān)注

    關(guān)注

    0

    文章

    137

    瀏覽量

    24315
  • RPC
    RPC
    +關(guān)注

    關(guān)注

    0

    文章

    110

    瀏覽量

    11477
  • C++
    C++
    +關(guān)注

    關(guān)注

    21

    文章

    2085

    瀏覽量

    73302
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    501

    瀏覽量

    19580
收藏 人收藏

    評論

    相關(guān)推薦

    談?wù)?b class='flag-5'>協(xié)的那些事兒

    隨著異步編程的發(fā)展以及各種并發(fā)框架的普及,協(xié)作為一種異步編程規(guī)范在各類語言中地位逐步提高。我們不單單會在自己的程序中使用協(xié),各類框架如fastapi,aiohttp等也都是基于異步
    的頭像 發(fā)表于 01-26 11:36 ?1018次閱讀
    談?wù)?b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>的那些事兒

    協(xié)和線程有什么區(qū)別

    協(xié)和線程的區(qū)別協(xié)和線程的共同目的之一是實現(xiàn)系統(tǒng)資源的上下文調(diào)用,不過它們的實現(xiàn)層級不同;線程(Thraed)是比進(jìn)程小一級的的運行單位,多線程實現(xiàn)系統(tǒng)資源上下文調(diào)用,是編程語言交付
    發(fā)表于 12-10 06:23

    Python中的多核CPU共享數(shù)據(jù)之協(xié)詳解

    協(xié)又稱微線程,coroutne,協(xié)是一種用戶態(tài)的輕量級線程。通俗點講就是周末我在家里休息,假如我先洗漱,再煮飯,再下載電影看會很慢,用了協(xié)
    的頭像 發(fā)表于 12-07 10:23 ?6470次閱讀
    Python中的多核CPU共享數(shù)據(jù)之<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>詳解

    手機(jī)上的協(xié)處理器有什么作用_蘋果協(xié)處理器是干什么的

    本文首先介紹了協(xié)處理器概念,其次介紹了協(xié)處理器內(nèi)部結(jié)構(gòu)與手機(jī)協(xié)處理器的作用,最后介紹了蘋果的M8協(xié)
    的頭像 發(fā)表于 04-24 09:27 ?2.1w次閱讀

    關(guān)于C++ 20協(xié)最全面詳解

    花了一兩周的時間后,我想寫寫 C++20 協(xié)的基本用法,因為 C++ 的協(xié)讓我感到很奇怪,寫一個協(xié)
    的頭像 發(fā)表于 04-12 11:10 ?1.3w次閱讀
    關(guān)于C++ 20<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>最全面詳解

    Python后端項目的協(xié)是什么

    最近公司 Python 后端項目進(jìn)行重構(gòu),整個后端邏輯基本都變更為采用“異步”協(xié)的方式實現(xiàn)??粗鴿M屏幕經(jīng)過 async await(協(xié)在 Python 中的實現(xiàn))修飾的代碼,我頓時
    的頭像 發(fā)表于 09-23 14:38 ?1257次閱讀

    Python協(xié)與JavaScript協(xié)的對比及經(jīng)驗技巧

    前言以前沒怎么接觸前端,對 JavaScript 的異步操作不了解,現(xiàn)在有了點了解。一查發(fā)現(xiàn) Python 和 JavaScript 的協(xié)發(fā)展史簡直就是一毛一樣!這里大致做下橫向?qū)Ρ群涂偨Y(jié),便于
    的頭像 發(fā)表于 10-20 14:30 ?1748次閱讀

    通過例子由淺入深的理解yield協(xié)

    send:send() 方法致使協(xié)程前進(jìn)到下一個yield 語句,另外,生成器可以作為協(xié)使用
    的頭像 發(fā)表于 08-23 11:12 ?1918次閱讀

    使用channel控制協(xié)數(shù)量

    goroutine 是輕量級線程,調(diào)度由 Go 運行時進(jìn)行管理的。Go 語言的并發(fā)控制主要使用關(guān)鍵字 go 開啟協(xié) goroutine。Go 協(xié)(Goroutine)之間通過信道(
    的頭像 發(fā)表于 09-19 15:06 ?1042次閱讀

    詳解Linux線程、線程與異步編程、協(xié)與異步

    協(xié)不是系統(tǒng)級線程,很多時候協(xié)被稱為“輕量級線程”、“微線程”、“纖(fiber)”等。簡單來說可以認(rèn)為
    的頭像 發(fā)表于 03-16 15:49 ?846次閱讀

    協(xié)的概念及協(xié)的掛起函數(shù)介紹

    協(xié)是一種輕量級的線程,它可以在單個線程中實現(xiàn)并發(fā)執(zhí)行。與線程不同,協(xié)不需要操作系統(tǒng)的上下文切換,因此可以更高效地使用系統(tǒng)資源。Kotlin 協(xié)
    的頭像 發(fā)表于 04-19 10:20 ?805次閱讀

    Kotlin協(xié)實戰(zhàn)進(jìn)階之筑基篇3

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方庫來支持協(xié)
    的頭像 發(fā)表于 05-30 16:26 ?575次閱讀

    FreeRTOS任務(wù)與協(xié)介紹

    FreeRTOS 中應(yīng)用既可以使用任務(wù),也可以使用協(xié)(Co-Routine),或者兩者混合使用。但是任務(wù)和協(xié)使用不同的API函數(shù),因此不能通過隊列(或信號量)將數(shù)據(jù)從任務(wù)發(fā)送給協(xié)
    的頭像 發(fā)表于 09-28 11:02 ?860次閱讀

    協(xié)的實現(xiàn)與原理

    前言 協(xié)這個概念很久了,好多程序員是實現(xiàn)過這個組件的,網(wǎng)上關(guān)于協(xié)的文章,博客,論壇都是汗牛充棟,在知乎,github上面也有很多大牛寫了關(guān)于協(xié)
    的頭像 發(fā)表于 11-10 10:57 ?349次閱讀

    Linux線程、線程與異步編程、協(xié)與異步介紹

    協(xié)不是系統(tǒng)級線程,很多時候協(xié)被稱為“輕量級線程”、“微線程”、“纖(fiber)”等。簡單來說可以認(rèn)為
    的頭像 發(fā)表于 11-11 11:35 ?805次閱讀
    Linux線程、線程與異步編程、<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>與異步介紹