0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

線程池技術(shù)簡介與Apollo線程池類源代碼分析

YB7m_Apollo_Dev ? 來源:未知 ? 作者:李倩 ? 2018-06-05 14:30 ? 次閱讀

* 本文修改后的代碼已上傳到GitHub網(wǎng)站Apollo項目中。

{ 1 }

線程池技術(shù)簡介

1線程池的定義

線程池是一種多線程形式,首先開啟指定數(shù)量的后臺工作線程,并將多個待執(zhí)行任務(wù)添加到任務(wù)隊列,然后將隊列中的任務(wù)逐個交給空閑的工作線程執(zhí)行(如下圖所示)。

2使用線程池的原因

創(chuàng)建/銷毀線程伴隨著操作系統(tǒng)的資源開銷,過于頻繁的創(chuàng)建/銷毀線程,會很大程度上影響處理效率。若創(chuàng)建線程消耗時間T1,執(zhí)行任務(wù)消耗時間T2,銷毀線程消耗時間T3,如果T1+T3>T2,則開啟一個線程來執(zhí)行一個任務(wù)就很不劃算,而使用線程池緩存線程,就可利用已有的閑置線程來執(zhí)行新任務(wù),有效避免T1+T3帶來的系統(tǒng)開銷。

線程并發(fā)數(shù)量過多,搶占系統(tǒng)資源從而導(dǎo)致阻塞。我們知道線程會共享系統(tǒng)資源,如果同時執(zhí)行的線程數(shù)量過多,可能會導(dǎo)致系統(tǒng)資源不足而產(chǎn)生操作卡頓甚至出現(xiàn)假死現(xiàn)象,運用線程池能有效地控制線程最大并發(fā)數(shù),有效避免上述問題。

對線程進行一些簡單的管理。比如:延時執(zhí)行、定時循環(huán)執(zhí)行等策略,運用線程池就較容易實現(xiàn)。

3C++中如何使用線程池

C++標(biāo)準(zhǔn)庫不提供線程池,如需使用需自行撰寫線程池類。GitHub中有多個線程池類的實現(xiàn),Apollo項目也參考了其中的一個實現(xiàn)【https://github.com/vit-vit/CTPL】。

{ 2 }

Apollo線程池類源代碼分析

Apollo線程池文件位于[your_apollo_root_dir]/modules/common/util/ctpl_stl.h,包含任務(wù)隊列類Queue和線程池類ThreadPool,其中Queue位于命名空間apollo::common::util::detail內(nèi),ThreadPool位于命名空間apollo::common::util內(nèi)。

1任務(wù)隊列類Queue

任務(wù)隊列類Queue基于C++標(biāo)準(zhǔn)庫的隊列類std::queue實現(xiàn),只是對push、pop和empty三個函數(shù)進行了加鎖操作。

template class Queue { public: bool push(T const &value) { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); q_.push(value); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { // 使用std::lock_guard效率更高 std::unique_lock lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;};

根據(jù)這篇博客【https://blog.csdn.net/tgxallen/article/details/73522233】的介紹,可使用std::lock_guard和std::unique_lock提供RAII(資源獲取即初始化,Resource Acquisition Is Initialization,參見該網(wǎng)頁【https://blog.csdn.net/doc_sgl/article/details/43028009】)風(fēng)格的加鎖操作,其中std::lock_guard的系統(tǒng)開銷更小,std::unique_lock更為靈活(可適時解鎖)。就我們的任務(wù)隊列類Queue而言,不需要std::unique_lock提供的靈活性,因此使用std::lock_guard更為合適。另外,我還增加一個接受右值引用的push函數(shù),以方便下文中的ThreadPool使用,修改后的類如下:

class Queue { public: bool push(const T &value) { std::lock_guard lock(mutex_); q_.push(value); return true; } // 增加一個接受右值引用的push函數(shù) bool push(T &&value) { std::lock_guard lock(mutex_); q_.emplace(std::move(value)); return true; } // deletes the retrieved element, do not use for non integral types bool pop(T &v) { // NOLINT std::lock_guard lock(mutex_); if (q_.empty()) { return false; } v = q_.front(); q_.pop(); return true; } bool empty() { std::lock_guard lock(mutex_); return q_.empty(); } private: std::queue q_; std::mutex mutex_;};

2線程池類ThreadPool

線程池類ThreadPool的主要功能是創(chuàng)建n_threads個后臺工作線程,將任務(wù)函數(shù)f包裝成std::function的形式存入任務(wù)隊列q_,根據(jù)當(dāng)前工作線程空閑情況,適時從任務(wù)隊列q_中提取一個任務(wù)函數(shù)并執(zhí)行之。注意復(fù)制構(gòu)造函數(shù)ThreadPool(const ThreadPool &)、移動構(gòu)造函數(shù)ThreadPool(ThreadPool &&)、復(fù)制運算符ThreadPool &operator=(const ThreadPool &)、移動運算符ThreadPool &operator=(ThreadPool &&)全部設(shè)置為private,表明禁止使用這些函數(shù)。其實C++11標(biāo)準(zhǔn)完成可以通過在函數(shù)聲明后加上= delete;的方式來禁用,源代碼以注釋的方式給出了這種實現(xiàn)方式。

下面分析該類中幾個比較重要的成員函數(shù)。

2.2.1 Push函數(shù)

Push函數(shù)的作用是將任務(wù)函數(shù)f包裝成std::function的形式存入任務(wù)隊列q_。Push函數(shù)有兩個版本,一個允許任務(wù)函數(shù)f帶可變參數(shù)Rest &&... rest,一個不允許任務(wù)函數(shù)f帶額外參數(shù),函數(shù)體內(nèi)部代碼大同小異,下面以帶可變參數(shù)的版本進行分析,代碼如下:

template auto Push(F &&f, Rest &&... rest) -> std::future { // std::placeholders::_1表示通過std::bind函數(shù)綁定后得到的異步任務(wù)對象接受的第一個參數(shù)是自由參數(shù) auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); // 最好使用std::make_shared創(chuàng)建智能指針對象,后面不用操心指針內(nèi)存的釋放 // auto _f = std::make_shared>([pck](int id) { (*pck)(id); }); auto _f = new std::function([pck](int id) { (*pck)(id); }); q_.push(_f); // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_one(); return pck->get_future();}

Push函數(shù)的返回值為一個std::future對象,std::future對象內(nèi)存儲的數(shù)據(jù)類型由(f(0, rest...)函數(shù)的返回值類型確定,decltype(f(0, rest...))的作用就是獲取(f(0, rest...)函數(shù)的返回值類型。std::future提供一種異步操作結(jié)果的訪問機制,從字面意思來理解,它表示未來,我覺得這個名字非常貼切,因為一個異步操作的結(jié)果不可能馬上獲取,只能在未來某個時候得到。關(guān)于std::future,這篇博客【https://blog.csdn.net/yockie/article/details/50595958】講得挺不錯,大家可以借鑒。

因為任務(wù)函數(shù)f的聲明各式各樣,有的不帶參數(shù),有的接受一個參數(shù),有的接受兩個參數(shù)……因此不能將其直接存儲到任務(wù)隊列q_,于是先利用std::bind函數(shù)將其包裝為一個異步操作任務(wù)std::packaged_task對象pck(接受一個整型參數(shù),返回值類型為(f(0, rest...)函數(shù)的返回值類型),再利用Lambda表達式將pck包裝為一個std::function對象,這樣就可以存儲到任務(wù)隊列q_中了。這里原作者直接使用new運算符創(chuàng)建裸指針_f,后面還需想辦法釋放指針內(nèi)存,我認(rèn)為不是很合適,使用std::make_shared創(chuàng)建智能指針可以自動管理內(nèi)存,更加省事,但使用std::shared_ptr>智能指針就不能使用Queue::push(const T &value)版本將其存儲到任務(wù)隊列,為此我在Queue類中添加了一個接受右值引用參數(shù)的版本Queue::push(T &&value),使用該版本就可以順利將智能指針存儲進去了。

接下來,使用條件變量std::condition_variable對象cv_.notify_one()函數(shù)通知各個線程任務(wù)隊列已經(jīng)發(fā)生了改變,讓空閑線程趕緊從任務(wù)隊列中拉取新任務(wù)執(zhí)行;最后通過pck->get_future()返回一個std::future對象,以便調(diào)用者能從中取出函數(shù)執(zhí)行完畢后的返回值。

我看過很多C++多線程方面的書籍(”C++ Concurrency in Action”比較經(jīng)典),一般不對cv_.notify_one();進行加鎖操作,因為這樣做除了降低效率外,還很容易引起死鎖,故需去除加鎖操作,具體原因參見該網(wǎng)頁【https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one】以及另一個網(wǎng)頁【http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one】。

以下是修改后的版本:

template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future();}

2.2.2Pop函數(shù)

Pop函數(shù)的作用是從任務(wù)隊列q_中取出并返回一個任務(wù),代碼如下:

std::function Pop() { std::function *_f = nullptr; q_.pop(_f); // 如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception occurred std::function f; if (_f) f = *_f; return f;}

首先,從從任務(wù)隊列q_中取出一個任務(wù)函數(shù)對象的裸指針_f,若非空,則將其賦值給std::function f并返回。該函數(shù)里使用一個小花招,即創(chuàng)建一個智能指針std::unique_ptr> func(_f),當(dāng)超出該對象的作用域時,就會在其析構(gòu)函數(shù)中調(diào)用delete運算符釋放內(nèi)存。如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。

以下是修改后的版本:

std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f;}

2.2.3Stop函數(shù)

Stop函數(shù)的作用停止線程池工作,若不允許等待,則直接停止當(dāng)前正在執(zhí)行的工作線程,同時清空任務(wù)隊列;若允許等待,則等待當(dāng)前正在執(zhí)行的工作線程完成,代碼如下:

void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?{ ? ? ?// 這里不要加鎖,否則易引起死鎖 ? ? ? ?std::unique_lock lock(mutex_); cv_.notify_all(); // stop all waiting threads } for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();}

函數(shù)中的布爾變量is_stop_、is_done_、flags_[i]為什么都不用加鎖呢?這是因為它們都是原子類型std::atomic,所謂原子類型就是一條CPU指令就能完成取值或?qū)懼挡僮鞯淖兞款愋汀++標(biāo)準(zhǔn)可保證std::atomic類型變量在任何架構(gòu)操作系統(tǒng)中均只使用一條CPU指令就可完成取值或?qū)懼挡僮?,其他形如std::atomic的類型,雖然將其聲明為原子類型,但在某些架構(gòu)操作系統(tǒng)中,并不能只使用一條CPU指令完成取值或?qū)懼挡僮?。綜上所述,std::atomic類型的變量可以在多線程中不加鎖操作。

根據(jù)2.2.1節(jié)的分析,cv_.notify_all();的加鎖操作應(yīng)去除。

修改后的代碼如下:

void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear();}

2.2.4ClearQueue函數(shù)

ClearQueue函數(shù)的作用是清空任務(wù)隊列q_,代碼如下:

void ClearQueue() { std::function *_f; // empty the queue while (q_.pop(_f)) { delete _f; }}

使用while循環(huán)從任務(wù)隊列q_中逐個彈出任務(wù)函數(shù)指針_f,因為_f使用new運算符創(chuàng)建,故需使用delete運算符刪除以釋放內(nèi)存。如果任務(wù)隊列q_中存儲的是智能指針,就不必手工刪除對象來釋放內(nèi)存了。

以下是使用智能指針的版本:

void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } }

2.2.5Resize函數(shù)

Resize函數(shù)的作用是更改線程池內(nèi)工作線程的數(shù)量,代碼如下:

void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } { // stop the detached threads that were waiting // 這里不要加鎖,否則易引起死鎖 std::unique_lock lock(mutex_); cv_.notify_all(); } // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }}

如果兩個變量is_stop_、is_done_都不為真,表明線程池仍在使用,可以更改線程池內(nèi)工作線程的數(shù)量,否則沒必要對一個停用的線程池更改工作線程的數(shù)量。若新線程數(shù)n_threads大于當(dāng)前的工作線程數(shù)old_n_threads,則將工作線程數(shù)組threads_和線程標(biāo)志數(shù)組flags_的尺寸修改為新數(shù)目,同時使用for循環(huán)調(diào)用SetThread(i)函數(shù)逐個重新創(chuàng)建工作線程;若新線程數(shù)n_threads小于當(dāng)前的工作線程數(shù)old_n_threads,則將先完成old_n_threads - n_threads個線程正在執(zhí)行的任務(wù),之后將工作線程數(shù)組threads_和線程標(biāo)志數(shù)組flags_的尺寸修改為新數(shù)目。

根據(jù)2.2.1節(jié)的分析,cv_.notify_all();的加鎖操作應(yīng)去除,具體原因參見該網(wǎng)頁

注意:Resize函數(shù)很危險,應(yīng)盡量少調(diào)用,若必須調(diào)用,則應(yīng)當(dāng)在創(chuàng)建線程池的那個線程內(nèi)調(diào)用,而不要在其他線程中調(diào)用。

修改的代碼如下:

void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } }}

2.2.6SetThread函數(shù)

SetThread函數(shù)的作用重新創(chuàng)建指定序號i的工作線程,代碼如下:

void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::function *_f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue // 如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。 std::unique_ptr> func( _f); // at return, delete the function even if an exception // occurred // 執(zhí)行任務(wù)函數(shù) (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command // 這里必須使用std::unique_lock,因為后面條件變量cv_等待期間,需要解鎖。 std::unique_lock lock(mutex_); ++n_waiting_; // 等待任務(wù)隊列傳來的新任務(wù) cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() }

上述代碼看起來比較復(fù)雜,實際上只有三條語句,第一條是std::shared_ptr> flag(flags_[i]);,即使用flags_[i]來初始化標(biāo)志變量flag;第二條看起來很長,實際上就是創(chuàng)建一個Lambda表達式變量f;第三條是threads_[i].reset(new std::thread(f));,使用Lambda表達式變量f作為工作線程的任務(wù)函數(shù),創(chuàng)建序號為i的工作線程。

那么Lambda表達式變量f何時啟動呢?當(dāng)任務(wù)隊列q_.pop(_f)的返回值為true時,表明從任務(wù)隊列q_中取到了一個新任務(wù),于是調(diào)用(*_f)(i);執(zhí)行之,如果當(dāng)前任務(wù)隊列沒有任務(wù),則使用:

cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag;});

等待新任務(wù)的到來,在新任務(wù)到來之前,當(dāng)前工作線程處于休眠狀態(tài)。

該函數(shù)同樣使用一個小花招,即創(chuàng)建一個智能指針std::unique_ptr> func(_f),當(dāng)超出該對象的作用域時,就會在其析構(gòu)函數(shù)中調(diào)用delete運算符釋放內(nèi)存。如果任務(wù)隊列q_中存儲的是智能指針,就不必使用這種小花招來釋放內(nèi)存了。

2.2.7修改后的ThreadPool類代碼

為完整起見,這里給出修改后的ThreadPool類代碼。

class ThreadPool { public: ThreadPool() { Init(); } explicit ThreadPool(int n_threads) { Init(); Resize(n_threads); } // the destructor waits for all the functions in the queue to be finished ~ThreadPool() { Stop(true); } // get the number of running threads in the pool int size() { return static_cast(threads_.size()); } // number of idle threads int NumIdle() { return n_waiting_; } std::thread &GetThread(const int i) { return *(threads_[i]); } // change the number of threads in the pool // should be called from one thread, otherwise be careful to not interleave, // also with stop() // n_threads must be >= 0 void Resize(const int n_threads) { if (!is_stop_ && !is_done_) { int old_n_threads = static_cast(threads_.size()); if (old_n_threads <= ? ? ? ? ?n_threads) { ?// if the number of threads is increased ? ? ? ?threads_.resize(n_threads); ? ? ? ?flags_.resize(n_threads); ? ? ? ?for (int i = old_n_threads; i < n_threads; ++i) { ? ? ? ? ?flags_[i] = std::make_shared>(false); SetThread(i); } } else { // the number of threads is decreased for (int i = old_n_threads - 1; i >= n_threads; --i) { *(flags_[i]) = true; // this thread will finish threads_[i]->detach(); } // stop the detached threads that were waiting cv_.notify_all(); // safe to delete because the threads are detached threads_.resize(n_threads); // safe to delete because the threads // have copies of shared_ptr of the // flags, not originals flags_.resize(n_threads); } } } // empty the queue void ClearQueue() { std::shared_ptr> f; // empty the queue while (q_.pop(f)) { // do nothing } } // pops a functional wrapper to the original function std::shared_ptr> Pop() { std::shared_ptr> f; q_.pop(f); return f; } // wait for all computing threads to finish and stop all threads // may be called asynchronously to not pause the calling thread while waiting // if is_wait == true, all the functions in the queue are run, otherwise the // queue is cleared without running the functions void Stop(bool is_wait = false) { if (!is_wait) { if (is_stop_) { return; } is_stop_ = true; for (int i = 0, n = size(); i < n; ++i) { ? ? ? ?*(flags_[i]) = true; ?// command the threads to stop ? ? ?} ? ? ?ClearQueue(); ?// empty the queue ? ?} else { ? ? ?if (is_done_ || is_stop_) return; ? ? ?is_done_ = true; ?// give the waiting threads a command to finish ? ?} ? ?cv_.notify_all(); ?// stop all waiting threads ? ?for (int i = 0; i < static_cast(threads_.size()); ++i) { // wait for the computing threads to finish if (threads_[i]->joinable()) { threads_[i]->join(); } } // if there were no threads in the pool but some functions in the queue, the // functions are not deleted by the threads // therefore delete them here ClearQueue(); threads_.clear(); flags_.clear(); } template auto Push(F &&f, Rest &&... rest) -> std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } // run the user's function that excepts argument int - id of the running // thread. returned value is templatized // operator returns std::future, where the user can get the result and rethrow // the catched exceptins template auto Push(F &&f) -> std::future { auto pck = std::make_shared>( std::forward(f)); auto _f = std::make_shared>( [pck](int id) { (*pck)(id); }); // It is not necessary to lock q_ because it is locked in the Queue class. q_.push(std::move(_f)); cv_.notify_one(); return pck->get_future(); } private: // deleted ThreadPool(const ThreadPool &); // = delete; ThreadPool(ThreadPool &&); // = delete; ThreadPool &operator=(const ThreadPool &); // = delete; ThreadPool &operator=(ThreadPool &&); // = delete; void SetThread(int i) { std::shared_ptr> flag( flags_[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() { std::atomic &_flag = *flag; std::shared_ptr> _f; bool is_pop_ = q_.pop(_f); while (true) { while (is_pop_) { // if there is anything in the queue (*_f)(i); if (_flag) { // the thread is wanted to stop, return even if the queue is not // empty yet return; } else { is_pop_ = q_.pop(_f); } } // the queue is empty here, wait for the next command { std::unique_lock lock(mutex_); ++n_waiting_; cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() { is_pop_ = q_.pop(_f); return is_pop_ || is_done_ || _flag; }); --n_waiting_; if (!is_pop_) { // if the queue is empty and is_done_ == true or *flag // then return return; } } } }; threads_[i].reset( new std::thread(f)); // compiler may not support std::make_unique() } void Init() { is_stop_ = false; is_done_ = false; n_waiting_ = 0; } std::vector> threads_; std::vector>> flags_; detail::Queue>> q_; std::atomic is_done_; std::atomic is_stop_; std::atomic n_waiting_; // how many threads are waiting std::mutex mutex_; std::condition_variable cv_;};

2.2.8增加的單元測試代碼

為檢驗修改后代碼的正確性,增添如下單元測試代碼。第一個待測試函數(shù)filter_duplicates_str接受的第一個參數(shù)為一個整型ID值,我在測試代碼中只是將其作為一個占位符,實際并未使用,后面接受四個C風(fēng)格字符串,該函數(shù)的任務(wù)是去除四個字符串中的重復(fù)詞并把去重后的結(jié)果按字母升序排列,結(jié)果以std::string的形式返回;第二個待測試函數(shù)filter_duplicates只接受的一個整型ID值參數(shù),我在測試代碼中只是將其作為一個占位符,實際并未使用,該函數(shù)的任務(wù)是去除一串固定字符串中的重復(fù)詞并把去重后的結(jié)果按字母升序排列,結(jié)果以std::string的形式返回。因為C++編譯器不能推導(dǎo)出重載函數(shù)的正確版本,因此第二個待測函數(shù)并未使用重載函數(shù)形式。兩個待測函數(shù)均使用線程池執(zhí)行1000次,最后檢查返回結(jié)果與預(yù)期結(jié)果的一致性。

#include "modules/common/util/ctpl_stl.h"#include #include #include #include #include #include #include "gtest/gtest.h"namespace apollo {namespace common {namespace util {namespace {// ...// Attention: don't use overloaded functions, otherwise the compiler can't// deduce the correct edition.std::string filter_duplicates_str(int id, const char* str1, const char* str2, const char* str3, const char* str4) { // id is unused. std::stringstream ss_in; ss_in << str1 << " " << str2 << " " << str3 << " " << str4; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}std::string filter_duplicates(int id) { // id is unused. std::stringstream ss_in; ss_in << "a a b b b c foo foo bar foobar foobar hello world hello hello world"; ?std::set string_set; std::istream_iterator beg(ss_in); std::istream_iterator end; std::copy(beg, end, std::inserter(string_set, string_set.end())); std::stringstream ss_out; std::copy(std::begin(string_set), std::end(string_set), std::ostream_iterator(ss_out, " ")); return ss_out.str();}} // namespaceTEST(ThreadPool, filter_duplicates) { const unsigned int hardware_threads = std::thread::hardware_concurrency(); const unsigned int threads = std::min(hardware_threads != 0 ? hardware_threads : 2, 50U); ThreadPool p(threads); std::vector> futures1, futures2; for (int i = 0; i < 1000; ++i) { ? ?futures1.push_back(std::move(p.Push( ? ? ? ?filter_duplicates_str, "thread pthread", "pthread thread good news", ? ? ? ?"today is a good day", "she is a six years old girl"))); ? ?futures2.push_back(std::move(p.Push(filter_duplicates))); ?} ?for (int i = 0; i < 1000; ++i) { ? ?std::string result1 = futures1[i].get(); ? ?std::string result2 = futures2[i].get(); ? ?EXPECT_STREQ( ? ? ? ?result1.c_str(), ? ? ? ?"a day girl good is news old pthread she six thread today years "); ? ?EXPECT_STREQ(result2.c_str(), "a b bar c foo foobar hello world "); ?}}} ?// namespace util} ?// namespace common} ?// namespace apollo

{ 3 }

Apollo Planning模塊對于線程池的使用分析

Apollo Planning模塊通過PlanningThreadPool類來完成對線程池ThreadPool的包裝調(diào)用。PlanningThreadPool類位于頭文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.h及對應(yīng)的實現(xiàn)文件[your_apollo_root_dir]/modules/planning/common/planning_thread_pool.cc中,位于命名空間apollo::planning內(nèi)。

1PlanningThreadPool類

PlanningThreadPool類的聲明如下:

class PlanningThreadPool { public: void Init(); void Stop() { if (thread_pool_) { thread_pool_->Stop(true); } } template void Push(F &&f, Rest &&... rest) { func_.push_back(std::move(thread_pool_->Push(f, rest...))); } template void Push(F &&f) { func_.push_back(std::move(thread_pool_->Push(f))); } void Synchronize(); private: std::unique_ptr thread_pool_; bool is_initialized = false; // 這里的func_用得非常不恰當(dāng),因為這里保存的是std::future對象, // 而非std::function對象,將其修改為futures_很有必要。 std::vector> func_; DECLARE_SINGLETON(PlanningThreadPool);};

PlanningThreadPool通過宏DECLARE_SINGLETON定義一個單實例類,因此不能直接在棧(stack)和堆(heap)上創(chuàng)建該類對象,而只能通過PlanningThreadPool::instance()獲取該類的唯一實例。該類中的成員變量func_非常具有誤導(dǎo)性,實際上它是一個保存著多個std::future對象的動態(tài)數(shù)組,而不是保存std::function對象,也就是說它保存的是函數(shù)的異步返回值對象,而非異步函數(shù)對象本身,因此這里將其修改為futures_很有必要。

2PlanningThreadPool類的使用

在Planning模塊使用PlanningThreadPool類的步驟如下:

3.2.1初始化線程池

在Planning::Init()函數(shù)(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句完成PlanningThreadPool類對象的初始化:

// initialize planning thread pool PlanningThreadPool::instance()->Init();

3.2.2 利用線程池完成并發(fā)處理

在合適的位置調(diào)用線程池完成某個功能的并發(fā)處理,一般而言是在某個循環(huán)體內(nèi)。注意:需進行并發(fā)處理的任務(wù),相互之間不能有先后依賴關(guān)系,因為使用線程池執(zhí)行并發(fā)任務(wù)時根本不知道哪個任務(wù)會先執(zhí)行,哪個任務(wù)會后執(zhí)行。

Planning模塊目前在以下幾處使用了線程池:

ReferenceLineInfo::AddObstacles函數(shù)ReferenceLineInfo::AddObstacles函數(shù)(位于[your_apollo_root_dir]/modules/planning/common/reference_line_info.cc中)在for循環(huán)內(nèi)使用PlanningThreadPool::instance()->Push添加線程池任務(wù),用于增加當(dāng)前的障礙物信息,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務(wù)全部完成。

bool ReferenceLineInfo::AddObstacles(const std::vector& obstacles) {if (FLAGS_use_multi_thread_to_add_obstacles) {std::vector ret(obstacles.size(), 0);for (size_t i = 0; i < obstacles.size(); ++i) { ?const auto* obstacle = obstacles.at(i); ?PlanningThreadPool::instance()->Push(std::bind( &ReferenceLineInfo::AddObstacleHelper, this, obstacle, &(ret[i])));}PlanningThreadPool::instance()->Synchronize();if (std::find(ret.begin(), ret.end(), 0) != ret.end()) { return false;}} else {// ...}return true;}

DPRoadGraph::GenerateMinCostPath函數(shù)DPRoadGraph::GenerateMinCostPath函數(shù)(位于[your_apollo_root_dir]/modules/planning/tasks/dp_poly_path/dp_road_graph.cc中)在每級航點(way point)上多個橫向采樣點的for循環(huán)內(nèi)使用PlanningThreadPool::instance()->Push添加線程池任務(wù),用于計算本級航點的最小代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務(wù)全部完成。

bool DPRoadGraph::GenerateMinCostPath( const std::vector &obstacles, std::vector *min_cost_path) { // ... for (std::size_t level = 1; level < path_waypoints.size(); ++level) { ? ?const auto &prev_dp_nodes = graph_nodes.back(); ? ?const auto &level_points = path_waypoints[level]; ? ?graph_nodes.emplace_back(); ? ?for (size_t i = 0; i < level_points.size(); ++i) { ? ? ?const auto &cur_point = level_points[i]; ? ? ?graph_nodes.back().emplace_back(cur_point, nullptr); ? ? ?auto &cur_node = graph_nodes.back().back(); ? ? ?if (FLAGS_enable_multi_thread_in_dp_poly_path) { ? ? ? ?PlanningThreadPool::instance()->Push(std::bind( &DPRoadGraph::UpdateNode, this, std::ref(prev_dp_nodes), level, total_level, &trajectory_cost, &(front), &(cur_node))); } else { UpdateNode(prev_dp_nodes, level, total_level, &trajectory_cost, &front, &cur_node); } } if (FLAGS_enable_multi_thread_in_dp_poly_path) { PlanningThreadPool::instance()->Synchronize(); } } // ...}

DpStGraph::CalculateTotalCost函數(shù)DpStGraph::CalculateTotalCost函數(shù)(位于[your_apollo_root_dir]/modules/planning/tasks/dp_st_speed/dp_st_graph.cc中)在for循環(huán)內(nèi)使用PlanningThreadPool::instance()->Push添加線程池任務(wù),對于時間采樣值c上的不同距離采樣值r: next_lowest_row<=r<=next_highest_row計算抵達節(jié)點(c, r)的最小總代價,使用PlanningThreadPool::instance()->Synchronize()等待線程池任務(wù)全部完成。

Status DpStGraph::CalculateTotalCost() { // col and row are for STGraph // t corresponding to col // s corresponding to row uint32_t next_highest_row = 0; uint32_t next_lowest_row = 0; for (size_t c = 0; c < cost_table_.size(); ++c) { ? ?int highest_row = 0; ? ?int lowest_row = cost_table_.back().size() - 1; ? ?for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?if (FLAGS_enable_multi_thread_in_dp_st_graph) { ? ? ? ?PlanningThreadPool::instance()->Push( std::bind(&DpStGraph::CalculateCostAt, this, c, r)); } else { CalculateCostAt(c, r); } } if (FLAGS_enable_multi_thread_in_dp_st_graph) { PlanningThreadPool::instance()->Synchronize(); } for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) { ? ? ?const auto& cost_cr = cost_table_[c][r]; ? ? ?if (cost_cr.total_cost() < std::numeric_limits::infinity()) { int h_r = 0; int l_r = 0; GetRowRange(cost_cr, &h_r, &l_r); highest_row = std::max(highest_row, h_r); lowest_row = std::min(lowest_row, l_r); } } next_highest_row = highest_row; next_lowest_row = lowest_row; } return Status::OK();}

3.2.3 銷毀線程池

在Planning::Stop()函數(shù)(位于[your_apollo_root_dir]/modules/planning/planning.cc)中添加如下語句以便 銷毀線程池:

1 PlanningThreadPool::instance()->Stop();

自Apollo平臺開放已來,我們收到了大量開發(fā)者的咨詢和反饋,越來越多開發(fā)者基于Apollo擦出了更多的火花,并愿意將自己的成果貢獻出來,這充分體現(xiàn)了Apollo『貢獻越多,獲得越多』的開源精神。為此我們開設(shè)了『開發(fā)者說』板塊,希望開發(fā)者們能夠踴躍投稿,更好地為廣大自動駕駛開發(fā)者營造一個共享交流的平臺!

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 源代碼
    +關(guān)注

    關(guān)注

    96

    文章

    2943

    瀏覽量

    66617
  • 線程池
    +關(guān)注

    關(guān)注

    0

    文章

    56

    瀏覽量

    6826
  • Apollo
    +關(guān)注

    關(guān)注

    5

    文章

    340

    瀏覽量

    18378

原文標(biāo)題:開發(fā)者說 | Apollo項目線程池技術(shù)淺析

文章出處:【微信號:Apollo_Developers,微信公眾號:Apollo開發(fā)者社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    Java中的線程包括哪些

    java.util.concurrent 包來實現(xiàn)的,最主要的就是 ThreadPoolExecutor 。 Executor: 代表線程的接口,有一個 execute() 方法,給一個 Runnable 類型對象
    的頭像 發(fā)表于 10-11 15:33 ?777次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實現(xiàn)的

    線程的概念是什么?線程是如何實現(xiàn)的?
    發(fā)表于 02-28 06:20

    基于線程技術(shù)集群接入點的應(yīng)用研究

    本文在深入研究高級線程技術(shù)的基礎(chǔ)上,分析、研究了固定線程數(shù)目的線程
    發(fā)表于 01-22 14:21 ?5次下載

    java自帶的線程方法

    二、原理分析 從上面使用線程的例子來看,最主要就是兩步,構(gòu)造ThreadPoolExecutor對象,然后每來一個任務(wù),就調(diào)用ThreadPoolExecutor對象的execute方法。 1
    發(fā)表于 09-27 11:06 ?0次下載

    基于Nacos的簡單動態(tài)化線程實現(xiàn)

    本文以Nacos作為服務(wù)配置中心,以修改線程核心線程數(shù)、最大線程數(shù)為例,實現(xiàn)一個簡單的動態(tài)化線程
    發(fā)表于 01-06 14:14 ?830次閱讀

    線程線程

    線程通常用于服務(wù)器應(yīng)用程序。 每個傳入請求都將分配給線程池中的一個線程,因此可以異步處理請求,而不會占用主線程,也不會延遲后續(xù)請求的處理
    的頭像 發(fā)表于 02-28 09:53 ?742次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    Java線程核心原理

    看過Java線程源碼的小伙伴都知道,在Java線程池中最核心的就是ThreadPoolExecutor,
    的頭像 發(fā)表于 04-21 10:24 ?816次閱讀

    細數(shù)線程的10個坑

    JDK開發(fā)者提供了線程的實現(xiàn),我們基于Executors組件,就可以快速創(chuàng)建一個線程
    的頭像 發(fā)表于 06-16 10:11 ?699次閱讀
    細數(shù)<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10個坑

    線程線程怎么釋放

    線程分組看,pool名開頭線程占616條,而且waiting狀態(tài)也是616條,這個點就非常可疑了,我斷定就是這個pool開頭線程導(dǎo)致的問題。我們先排查為何這個
    發(fā)表于 07-31 10:49 ?2206次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的<b class='flag-5'>線程</b>怎么釋放

    Spring 的線程應(yīng)用

    。 使用@Async聲明多線程 SpringBoot 提供了注解 @Async 來使用線程, 具體使用方法如下: 在啟動(配置)添加
    的頭像 發(fā)表于 10-13 10:47 ?585次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應(yīng)用

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優(yōu)勢 C++線程
    的頭像 發(fā)表于 11-10 10:24 ?465次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來的目的就是:提高程序執(zhí)行效
    的頭像 發(fā)表于 11-10 16:37 ?483次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    線程七大核心參數(shù)執(zhí)行順序

    線程是一種用于管理和調(diào)度線程執(zhí)行的技術(shù),通過將任務(wù)分配到線程池中的線程進行處理,可以有效地控制
    的頭像 發(fā)表于 12-04 16:45 ?913次閱讀

    線程的創(chuàng)建方式有幾種

    線程是一種用于管理和調(diào)度線程技術(shù),能夠有效地提高系統(tǒng)的性能和資源利用率。它通過預(yù)先創(chuàng)建一組線程并維護一個工作隊列,將任務(wù)提交給
    的頭像 發(fā)表于 12-04 16:52 ?801次閱讀

    什么是動態(tài)線程?動態(tài)線程的簡單實現(xiàn)思路

    因此,動態(tài)可監(jiān)控線程一種針對以上痛點開發(fā)的線程管理工具。主要可實現(xiàn)功能有:提供對 Spring 應(yīng)用內(nèi)線程
    的頭像 發(fā)表于 02-28 10:42 ?567次閱讀