* 本文修改后的代碼已上傳到GitHub網(wǎng)站Apollo項目中。
{ 1 }
線程池技術(shù)簡介
1線程池的定義
線程池是一種多線程形式,首先開啟指定數(shù)量的后臺工作線程,并將多個待執(zhí)行任務(wù)添加到任務(wù)隊列,然后將隊列中的任務(wù)逐個交給空閑的工作線程執(zhí)行(如下圖所示)。
2使用線程池的原因
創(chuàng)建/銷毀線程伴隨著操作系統(tǒng)的資源開銷,過于頻繁的創(chuàng)建/銷毀線程,會很大程度上影響處理效率。若創(chuàng)建線程消耗時間T1,執(zhí)行任務(wù)消耗時間T2,銷毀線程消耗時間T3,如果T1+T3>T2,則開啟一個線程來執(zhí)行一個任務(wù)就很不劃算,而使用線程池緩存線程,就可利用已有的閑置線程來執(zhí)行新任務(wù),有效避免T1+T3帶來的系統(tǒng)開銷。
線程并發(fā)數(shù)量過多,搶占系統(tǒng)資源從而導(dǎo)致阻塞。我們知道線程會共享系統(tǒng)資源,如果同時執(zhí)行的線程數(shù)量過多,可能會導(dǎo)致系統(tǒng)資源不足而產(chǎn)生操作卡頓甚至出現(xiàn)假死現(xiàn)象,運用線程池能有效地控制線程最大并發(fā)數(shù),有效避免上述問題。
對線程進行一些簡單的管理。比如:延時執(zhí)行、定時循環(huán)執(zhí)行等策略,運用線程池就較容易實現(xiàn)。
3C++中如何使用線程池
C++標(biāo)準(zhǔn)庫不提供線程池,如需使用需自行撰寫線程池類。GitHub中有多個線程池類的實現(xiàn),Apollo項目也參考了其中的一個實現(xiàn)【https://github.com/vit-vit/CTPL】。
{ 2 }
Apollo線程池類源代碼分析
Apollo線程池文件位于[your_apollo_root_dir]/modules/common/util/ctpl_stl.h,包含任務(wù)隊列類Queue和線程池類ThreadPool,其中Queue位于命名空間apollo::common::util::detail內(nèi),ThreadPool位于命名空間apollo::common::util內(nèi)。
1任務(wù)隊列類Queue
任務(wù)隊列類Queue基于C++標(biāo)準(zhǔn)庫的隊列類std::queue實現(xiàn),只是對push、pop和empty三個函數(shù)進行了加鎖操作。
|
template class Queue { public: bool push(T const &value) { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); q_.push(value); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;}; |
根據(jù)這篇博客【https://blog.csdn.net/tgxallen/article/details/73522233】的介紹,可使用std::lock_guard和std::unique_lock提供RAII(資源獲取即初始化,Resource Acquisition Is Initialization,參見該網(wǎng)頁【https://blog.csdn.net/doc_sgl/article/details/43028009】)風(fēng)格的加鎖操作,其中std::lock_guard的系統(tǒng)開銷更小,std::unique_lock更為靈活(可適時解鎖)。就我們的任務(wù)隊列類Queue而言,不需要std::unique_lock提供的靈活性,因此使用std::lock_guard更為合適。另外,我還增加一個接受右值引用的push函數(shù),以方便下文中的ThreadPool使用,修改后的類如下:
|
class Queue { public: bool push(const T &value) { std::lock_guard lock(mutex_); q_.push(value); return true; } // 增加一個接受右值引用的push函數(shù) bool push(T &&value) { std::lock_guard lock(mutex_); q_.emplace(std::move(value)); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT std::lock_guard lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { std::lock_guard lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;}; |
2線程池類ThreadPool
線程池類ThreadPool的主要功能是創(chuàng)建n_threads個后臺工作線程,將任務(wù)函數(shù)f包裝成std::function的形式存入任務(wù)隊列q_,根據(jù)當(dāng)前工作線程空閑情況,適時從任務(wù)隊列q_中提取一個任務(wù)函數(shù)并執(zhí)行之。注意復(fù)制構(gòu)造函數(shù)ThreadPool(const ThreadPool &)、移動構(gòu)造函數(shù)ThreadPool(ThreadPool &&)、復(fù)制運算符ThreadPool &operator=(const ThreadPool &)、移動運算符ThreadPool &operator=(ThreadPool &&)全部設(shè)置為private,表明禁止使用這些函數(shù)。其實C++11標(biāo)準(zhǔn)完成可以通過在函數(shù)聲明后加上= delete;的方式來禁用,源代碼以注釋的方式給出了這種實現(xiàn)方式。
下面分析該類中幾個比較重要的成員函數(shù)。
2.2.1 Push函數(shù)
Push函數(shù)的作用是將任務(wù)函數(shù)f包裝成std::function的形式存入任務(wù)隊列q_。Push函數(shù)有兩個版本,一個允許任務(wù)函數(shù)f帶可變參數(shù)Rest &&... rest,一個不允許任務(wù)函數(shù)f帶額外參數(shù),函數(shù)體內(nèi)部代碼大同小異,下面以帶可變參數(shù)的版本進行分析,代碼如下:
|
template auto Push(F &&f, Rest &&... rest) -> std::future { // std::placeholders::_1表示通過std::bind函數(shù)綁定后得到的異步任務(wù)對象接受的第一個參數(shù)是自由參數(shù) auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); // 最好使用std::make_shared創(chuàng)建智能指針對象,后面不用操心指針內(nèi)存的釋放 // auto _f = std::make_shared>([pck](int id) { (*pck)(id); }); auto _f = new std::function([pck](int id) { (*pck)(id); }); q_.push(_f); // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_one(); return pck->get_future();} |
Push函數(shù)的返回值為一個std::future對象,std::future對象內(nèi)存儲的數(shù)據(jù)類型由(f(0, rest...)函數(shù)的返回值類型確定,decltype(f(0, rest...))的作用就是獲取(f(0, rest...)函數(shù)的返回值類型。std::future提供一種異步操作結(jié)果的訪問機制,從字面意思來理解,它表示未來,我覺得這個名字非常貼切,因為一個異步操作的結(jié)果不可能馬上獲取,只能在未來某個時候得到。關(guān)于std::future,這篇博客【https://blog.csdn.net/yockie/article/details/50595958】講得挺不錯,大家可以借鑒。
因為任務(wù)函數(shù)f的聲明各式各樣,有的不帶參數(shù),有的接受一個參數(shù),有的接受兩個參數(shù)……因此不能將其直接存儲到任務(wù)隊列q_,于是先利用std::bind函數(shù)將其包裝為一個異步操作任務(wù)std::packaged_task對象pck(接受一個整型參數(shù),返回值類型為(f(0, rest...)函數(shù)的返回值類型),再利用Lambda表達式將pck包裝為一個std::function對象,這樣就可以存儲到任務(wù)隊列q_中了。這里原作者直接使用new運算符創(chuàng)建裸指針_f,后面還需想辦法釋放指針內(nèi)存,我認(rèn)為不是很合適,使用std::make_shared創(chuàng)建智能指針可以自動管理內(nèi)存,更加省事,但使用std::shared_ptr>智能指針就不能使用Queue::push(const T &value)版本將其存儲到任務(wù)隊列,為此我在Queue類中添加了一個接受右值引用參數(shù)的版本Queue::push(T &&value),使用該版本就可以順利將智能指針存儲進去了。
接下來,使用條件變量std::condition_variable對象cv_.notify_one()函數(shù)通知各個線程任務(wù)隊列已經(jīng)發(fā)生了改變,讓空閑線程趕緊從任務(wù)隊列中拉取新任務(wù)執(zhí)行;最后通過pck->get_future()返回一個std::future對象,以便調(diào)用者能從中取出函數(shù)執(zhí)行完畢后的返回值。
我看過很多C++多線程方面的書籍(”C++ Concurrency in Action”比較經(jīng)典),一般不對cv_.notify_one();進行加鎖操作,因為這樣做除了降低效率外,還很容易引起死鎖,故需去除加鎖操作,具體原因參見該網(wǎng)頁【https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one】以及另一個網(wǎng)頁【http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one】。
以下是修改后的版本:
|
template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future();} |
2.2.2Pop函數(shù)
Pop函數(shù)的作用是從任務(wù)隊列q_中取出并返回一個任務(wù),代碼如下:
|
std::function Pop() { std::function *_f = nullptr; q_.pop(_f); // 如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception occurred std::function f; if (_f) f = *_f; return f;} |
首先,從從任務(wù)隊列q_中取出一個任務(wù)函數(shù)對象的裸指針_f,若非空,則將其賦值給std::function f并返回。該函數(shù)里使用一個小花招,即創(chuàng)建一個智能指針std::unique_ptr> func(_f),當(dāng)超出該對象的作用域時,就會在其析構(gòu)函數(shù)中調(diào)用delete運算符釋放內(nèi)存。如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。
以下是修改后的版本:
|
std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f;} |
2.2.3Stop函數(shù)
Stop函數(shù)的作用停止線程池工作,若不允許等待,則直接停止當(dāng)前正在執(zhí)行的工作線程,同時清空任務(wù)隊列;若允許等待,則等待當(dāng)前正在執(zhí)行的工作線程完成,代碼如下:
|
void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?{ ? ? ?// 這里不要加鎖,否則易引起死鎖 ? ? ? ?std::unique_lock lock(mutex_); cv_.notify_all(); // stop all waiting threads } for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();} |
函數(shù)中的布爾變量is_stop_、is_done_、flags_[i]為什么都不用加鎖呢?這是因為它們都是原子類型std::atomic,所謂原子類型就是一條CPU指令就能完成取值或?qū)懼挡僮鞯淖兞款愋汀++標(biāo)準(zhǔn)可保證std::atomic類型變量在任何架構(gòu)操作系統(tǒng)中均只使用一條CPU指令就可完成取值或?qū)懼挡僮?,其他形如std::atomic的類型,雖然將其聲明為原子類型,但在某些架構(gòu)操作系統(tǒng)中,并不能只使用一條CPU指令完成取值或?qū)懼挡僮?。綜上所述,std::atomic類型的變量可以在多線程中不加鎖操作。
根據(jù)2.2.1節(jié)的分析,cv_.notify_all();的加鎖操作應(yīng)去除。
修改后的代碼如下:
|
void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();} |
2.2.4ClearQueue函數(shù)
ClearQueue函數(shù)的作用是清空任務(wù)隊列q_,代碼如下:
|
void ClearQueue() { std::function *_f; // empty the queue while (q_.pop(_f)) { delete _f; }} |
使用while循環(huán)從任務(wù)隊列q_中逐個彈出任務(wù)函數(shù)指針_f,因為_f使用new運算符創(chuàng)建,故需使用delete運算符刪除以釋放內(nèi)存。如果任務(wù)隊列q_中存儲的是智能指針,就不必手工刪除對象來釋放內(nèi)存了。
以下是使用智能指針的版本:
|
void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } } |
2.2.5Resize函數(shù)
Resize函數(shù)的作用是更改線程池內(nèi)工作線程的數(shù)量,代碼如下:
|
void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } { // stop the detached threads that were waiting // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_all(); } // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }} |
如果兩個變量is_stop_、is_done_都不為真,表明線程池仍在使用,可以更改線程池內(nèi)工作線程的數(shù)量,否則沒必要對一個停用的線程池更改工作線程的數(shù)量。若新線程數(shù)n_threads大于當(dāng)前的工作線程數(shù)old_n_threads,則將工作線程數(shù)組threads_和線程標(biāo)志數(shù)組flags_的尺寸修改為新數(shù)目,同時使用for循環(huán)調(diào)用SetThread(i)函數(shù)逐個重新創(chuàng)建工作線程;若新線程數(shù)n_threads小于當(dāng)前的工作線程數(shù)old_n_threads,則將先完成old_n_threads - n_threads個線程正在執(zhí)行的任務(wù),之后將工作線程數(shù)組threads_和線程標(biāo)志數(shù)組flags_的尺寸修改為新數(shù)目。
根據(jù)2.2.1節(jié)的分析,cv_.notify_all();的加鎖操作應(yīng)去除,具體原因參見該網(wǎng)頁
注意:Resize函數(shù)很危險,應(yīng)盡量少調(diào)用,若必須調(diào)用,則應(yīng)當(dāng)在創(chuàng)建線程池的那個線程內(nèi)調(diào)用,而不要在其他線程中調(diào)用。
修改的代碼如下:
|
void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }} |
2.2.6SetThread函數(shù)
SetThread函數(shù)的作用重新創(chuàng)建指定序號i的工作線程,代碼如下:
|
void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::function *_f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue // 如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception // occurred // 執(zhí)行任務(wù)函數(shù) (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command // 這里必須使用std::unique_lock,因為后面條件變量cv_等待期間,需要解鎖。 std::unique_lock lock(mutex_); ++n_waiting_; // 等待任務(wù)隊列傳來的新任務(wù) cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() } |
上述代碼看起來比較復(fù)雜,實際上只有三條語句,第一條是std::shared_ptr> flag(flags_[i]);,即使用flags_[i]來初始化標(biāo)志變量flag;第二條看起來很長,實際上就是創(chuàng)建一個Lambda表達式變量f;第三條是threads_[i].reset(new std::thread(f));,使用Lambda表達式變量f作為工作線程的任務(wù)函數(shù),創(chuàng)建序號為i的工作線程。
那么Lambda表達式變量f何時啟動呢?當(dāng)任務(wù)隊列q_.pop(_f)的返回值為true時,表明從任務(wù)隊列q_中取到了一個新任務(wù),于是調(diào)用(*_f)(i);執(zhí)行之,如果當(dāng)前任務(wù)隊列沒有任務(wù),則使用:
|
cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag;}); |
等待新任務(wù)的到來,在新任務(wù)到來之前,當(dāng)前工作線程處于休眠狀態(tài)。
該函數(shù)同樣使用一個小花招,即創(chuàng)建一個智能指針std::unique_ptr> func(_f),當(dāng)超出該對象的作用域時,就會在其析構(gòu)函數(shù)中調(diào)用delete運算符釋放內(nèi)存。如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。
2.2.7修改后的ThreadPool類代碼
為完整起見,這里給出修改后的ThreadPool類代碼。
|
class ThreadPool { public: ThreadPool() { Init(); } explicit ThreadPool(int n_threads) { Init(); Resize(n_threads); } // the destructor waits for all the functions in the queue to be finished ~ThreadPool() { Stop(true); } // get the number of running threads in the pool int size() { return static_cast(threads_.size()); } // number of idle threads int NumIdle() { return n_waiting_; } std::thread &GetThread(const int i) { return *(threads_[i]); } // change the number of threads in the pool // should be called from one thread, otherwise be careful to not interleave, // also with stop() // n_threads must be >= 0 void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } } } // empty the queue void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } } // pops a functional wrapper to the original function std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f; } // wait for all computing threads to finish and stop all threads // may be called asynchronously to not pause the calling thread while waiting // if is_wait == true, all the functions in the queue are run, otherwise the // queue is cleared without running the functions void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear(); } template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } // run the user's function that excepts argument int - id of the running // thread. returned value is templatized // operator returns std::future, where the user can get the result and rethrow // the catched exceptins template auto Push(F &&f) -> std::future { auto pck = std::make_shared>( std::forward(f)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } private: // deleted ThreadPool(const ThreadPool &); // = delete; ThreadPool(ThreadPool &&); // = delete; ThreadPool &operator=(const ThreadPool &); // = delete; ThreadPool &operator=(ThreadPool &&); // = delete; void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::shared_ptr> _f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command { std::unique_lock lock(mutex_); ++n_waiting_; cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() } void Init() { is_stop_ = false; is_done_ = false; n_waiting_ = 0; } std::vector> threads_; std::vector>> flags_; detail::Queue>> q_; std::atomic is_done_; std::atomic is_stop_; std::atomic n_waiting_; // how many threads are waiting std::mutex mutex_; std::condition_variable cv_;}; |
2.2.8增加的單元測試代碼
為檢驗修改后代碼的正確性,增添如下單元測試代碼。第一個待測試函數(shù)filter_duplicates_str接受的第一個參數(shù)為一個整型ID值,我在測試代碼中只是將其作為一個占位符,實際并未使用,后面接受四個C風(fēng)格字符串,該函數(shù)的任務(wù)是去除四個字符串中的重復(fù)詞并把去重后的結(jié)果按字母升序排列,結(jié)果以std::string的形式返回;第二個待測試函數(shù)filter_duplicates只接受的一個整型ID值參數(shù),我在測試代碼中只是將其作為一個占位符,實際并未使用,該函數(shù)的任務(wù)是去除一串固定字符串中的重復(fù)詞并把去重后的結(jié)果按字母升序排列,結(jié)果以std::string的形式返回。因為C++編譯器不能推導(dǎo)出重載函數(shù)的正確版本,因此第二個待測函數(shù)并未使用重載函數(shù)形式。兩個待測函數(shù)均使用線程池執(zhí)行1000次,最后檢查返回結(jié)果與預(yù)期結(jié)果的一致性。
|
#include "modules/common/util/ctpl_stl.h"#include #include #include #include #include #include #include "gtest/gtest.h"namespace apollo {namespace common {namespace util {namespace {// ...// Attention: don't use overloaded functions, otherwise the compiler can't// deduce the correct edition.std::string filter_duplicates_str(int id, const char* str1, const char* str2, const char* str3, const char* str4) { // id is unused. std::stringstream ss_in; ss_in << str1 << " " << str2 << " " << str3 << " " << str4; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}std::string filter_duplicates(int id) { // id is unused. std::stringstream ss_in; ss_in << "a a b b b c foo foo bar foobar foobar hello world hello hello world"; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}} // namespaceTEST(ThreadPool, filter_duplicates) { const unsigned int hardware_threads = std::thread::hardware_concurrency(); const unsigned int threads = std::min(hardware_threads != 0 ? hardware_threads : 2, 50U); ThreadPool p(threads); std::vector> futures1, futures2; for (int i = 0; i < 1000; ++i) { ? ?futures1.push_back(std::move(p.Push( ? ? ? ?filter_duplicates_str, "thread pthread", "pthread thread good news", ? ? ? ?"today is a good day", "she is a six years old girl"))); ? ?futures2.push_back(std::move(p.Push(filter_duplicates))); ?} ?for (int i = 0; i < 1000; ++i) { ? ?std::string result1 = futures1[i].get(); ? ?std::string result2 = futures2[i].get(); ? ?EXPECT_STREQ( ? ? ? ?result1.c_str(), ? ? ? ?"a day girl good is news old pthread she six thread today years "); ? ?EXPECT_STREQ(result2.c_str(), "a b bar c foo foobar hello world "); ?}}} ?// namespace util} ?// namespace common} ?// namespace apollo |
{ 3 }
Apollo Planning模塊對于線程池的使用分析
Apollo Planning模塊通過PlanningThreadPool類來完成對線程池ThreadPool的包裝調(diào)用。PlanningThreadPool類位于頭文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.h及對應(yīng)的實現(xiàn)文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.cc中,位于命名空間apollo::planning內(nèi)。
1PlanningThreadPool類
PlanningThreadPool類的聲明如下:
|
class PlanningThreadPool { public: void Init(); void Stop() { if (thread_pool_) { thread_pool_->Stop(true); } } template void Push(F &&f, Rest &&... rest) { func_.push_back(std::move(thread_pool_->Push(f, rest...))); } template void Push(F &&f) { func_.push_back(std::move(thread_pool_->Push(f))); } void Synchronize(); private: std::unique_ptr thread_pool_; bool is_initialized = false; // 這里的func_用得非常不恰當(dāng),因為這里保存的是std::future對象, // 而非std::function對象,將其修改為futures_很有必要。 std::vector> func_; DECLARE_SINGLETON(PlanningThreadPool);}; |
PlanningThreadPool通過宏DECLARE_SINGLETON定義一個單實例類,因此不能直接在棧(stack)和堆(heap)上創(chuàng)建該類對象,而只能通過PlanningThreadPool::instance()獲取該類的唯一實例。該類中的成員變量func_非常具有誤導(dǎo)性,實際上它是一個保存著多個std::future對象的動態(tài)數(shù)組,而不是保存std::function對象,也就是說它保存的是函數(shù)的異步返回值對象,而非異步函數(shù)對象本身,因此這里將其修改為futures_很有必要。
2PlanningThreadPool類的使用
在Planning模塊使用PlanningThreadPool類的步驟如下:
3.2.1初始化線程池
在Planning::Init()函數(shù)(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句完成PlanningThreadPool類對象的初始化:
|
// initialize planning thread pool PlanningThreadPool::instance()->Init(); |
3.2.2 利用線程池完成并發(fā)處理
在合適的位置調(diào)用線程池完成某個功能的并發(fā)處理,一般而言是在某個循環(huán)體內(nèi)。注意:需進行并發(fā)處理的任務(wù),相互之間不能有先后依賴關(guān)系,因為使用線程池執(zhí)行并發(fā)任務(wù)時根本不知道哪個任務(wù)會先執(zhí)行,哪個任務(wù)會后執(zhí)行。
Planning模塊目前在以下幾處使用了線程池:
ReferenceLineInfo::AddObstacles函數(shù)ReferenceLineInfo::AddObstacles函數(shù)(位于[your_apollo_root_dir]/modules/planning/common/reference_line_info.cc中)在for循環(huán)內(nèi)使用PlanningThreadPool::instance()->Push添加線程池任務(wù),用于增加當(dāng)前的障礙物信息,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務(wù)全部完成。
|
bool ReferenceLineInfo::AddObstacles(const std::vector& obstacles) {if (FLAGS_use_multi_thread_to_add_obstacles) {std::vector ret(obstacles.size(), 0);for (size_t i = 0; i < obstacles.size(); ++i) { ?const auto* obstacle = obstacles.at(i); ?PlanningThreadPool::instance()->Push(std::bind( &ReferenceLineInfo::AddObstacleHelper, this, obstacle, &(ret[i])));}PlanningThreadPool::instance()->Synchronize();if (std::find(ret.begin(), ret.end(), 0) != ret.end()) { return false;}} else {// ...}return true;} |
DPRoadGraph::GenerateMinCostPath函數(shù)DPRoadGraph::GenerateMinCostPath函數(shù)(位于[your_apollo_root_dir]/modules/planning/tasks/dp_poly_path/dp_road_graph.cc中)在每級航點(way point)上多個橫向采樣點的for循環(huán)內(nèi)使用PlanningThreadPool::instance()->Push添加線程池任務(wù),用于計算本級航點的最小代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務(wù)全部完成。
|
bool DPRoadGraph::GenerateMinCostPath( const std::vector &obstacles, std::vector *min_cost_path) { // ... for (std::size_t level = 1; level < path_waypoints.size(); ++level) { ? ?const auto &prev_dp_nodes = graph_nodes.back(); ? ?const auto &level_points = path_waypoints[level]; ? ?graph_nodes.emplace_back(); ? ?for (size_t i = 0; i < level_points.size(); ++i) { ? ? ?const auto &cur_point = level_points[i]; ? ? ?graph_nodes.back().emplace_back(cur_point, nullptr); ? ? ?auto &cur_node = graph_nodes.back().back(); ? ? ?if (FLAGS_enable_multi_thread_in_dp_poly_path) { ? ? ? ?PlanningThreadPool::instance()->Push(std::bind( &DPRoadGraph::UpdateNode, this, std::ref(prev_dp_nodes), level, total_level, &trajectory_cost, &(front), &(cur_node))); } else { UpdateNode(prev_dp_nodes, level, total_level, &trajectory_cost, &front, &cur_node); } } if (FLAGS_enable_multi_thread_in_dp_poly_path) { PlanningThreadPool::instance()->Synchronize(); } } // ...} |
DpStGraph::CalculateTotalCost函數(shù)DpStGraph::CalculateTotalCost函數(shù)(位于[your_apollo_root_dir]/modules/planning/tasks/dp_st_speed/dp_st_graph.cc中)在for循環(huán)內(nèi)使用PlanningThreadPool::instance()->Push添加線程池任務(wù),對于時間采樣值c上的不同距離采樣值r: next_lowest_row<=r<=next_highest_row計算抵達節(jié)點(c, r)的最小總代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務(wù)全部完成。
|
Status DpStGraph::CalculateTotalCost() { // col and row are for STGraph // t corresponding to col // s corresponding to row uint32_t next_highest_row = 0; uint32_t next_lowest_row = 0; for (size_t c = 0; c < cost_table_.size(); ++c) { ? ?int highest_row = 0; ? ?int lowest_row = cost_table_.back().size() - 1; ? ?for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?if (FLAGS_enable_multi_thread_in_dp_st_graph) { ? ? ? ?PlanningThreadPool::instance()->Push( std::bind(&DpStGraph::CalculateCostAt, this, c, r)); } else { CalculateCostAt(c, r); } } if (FLAGS_enable_multi_thread_in_dp_st_graph) { PlanningThreadPool::instance()->Synchronize(); } for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?const auto& cost_cr = cost_table_[c][r]; ? ? ?if (cost_cr.total_cost() < std::numeric_limits::infinity()) { int h_r = 0; int l_r = 0; GetRowRange(cost_cr, &h_r, &l_r); highest_row = std::max(highest_row, h_r); lowest_row = std::min(lowest_row, l_r); } } next_highest_row = highest_row; next_lowest_row = lowest_row; } return Status::OK();} |
3.2.3 銷毀線程池
在Planning::Stop()函數(shù)(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句以便 銷毀線程池:
1 |
PlanningThreadPool::instance()->Stop(); |
自Apollo平臺開放已來,我們收到了大量開發(fā)者的咨詢和反饋,越來越多開發(fā)者基于Apollo擦出了更多的火花,并愿意將自己的成果貢獻出來,這充分體現(xiàn)了Apollo『貢獻越多,獲得越多』的開源精神。為此我們開設(shè)了『開發(fā)者說』板塊,希望開發(fā)者們能夠踴躍投稿,更好地為廣大自動駕駛開發(fā)者營造一個共享交流的平臺!
評論