0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

C語言線程池的實(shí)現(xiàn)方案

汽車電子技術(shù) ? 來源:老吳的嵌入式之旅 ? 作者:吳偉東Jack ? 2023-01-29 16:43 ? 次閱讀

一、簡介

https://github.com/Pithikos/C-Thread-Pool

這是一個(gè)簡單小巧的C語言線程池實(shí)現(xiàn),在 Github 上有 1.1K 的 star,很適合用來學(xué)習(xí) Linux 的多線程編程。

另外,里面還涉及到了信號、隊(duì)列、同步等知識點(diǎn),代碼讀起來還是挺過癮的。

特點(diǎn):

  • 符合 ANCI C and POSIX;
  • 支持暫停/恢復(fù)/等待功能;
  • 簡潔的 API;
  • 經(jīng)過嚴(yán)格的測試,附帶了豐富的測試用例;

二、使用

快速上手

example.c:

#include "thpool.h"

void task(void *arg){
 printf("Thread #%u working on %d
", (int)pthread_self(), (int) arg);
}

int main(){
 
 puts("Making threadpool with 4 threads");
 threadpool thpool = thpool_init(4);

 puts("Adding 10 tasks to threadpool");
 int i;
 for (i=0; i<8; i++){
  thpool_add_work(thpool, task, (void*)(uintptr_t)i);
 };

 thpool_wait(thpool);
 puts("Killing threadpool");
 thpool_destroy(thpool);
 
 return 0;
}

運(yùn)行效果:

$ gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example

$ ./example
Making threadpool with 4 threads
THPOOL_DEBUG: Created thread 0 in pool 
THPOOL_DEBUG: Created thread 1 in pool 
THPOOL_DEBUG: Created thread 2 in pool 
THPOOL_DEBUG: Created thread 3 in pool 
Adding 10 tasks to threadpool
Thread #1509455616 working on 0
Thread #1509455616 working on 4
Thread #1509455616 working on 5
Thread #1492670208 working on 2
Thread #1492670208 working on 7
Thread #1509455616 working on 6
Thread #1501062912 working on 1
Thread #1517848320 working on 3
Killing threadpool

代碼分析:

  • threadpool thpool = thpool_init(4) 創(chuàng)建了一個(gè)含有 4 個(gè)線程的線程池;
  • 然后調(diào)用 thpool_add_work(thpool, ...) 往線程池里放入了 8 個(gè)任務(wù);
  • 從結(jié)果來看:
    • 線程5616 搶到了任務(wù) 0 / 4 / 5 / 6;
    • 線程0208 搶到了任務(wù) 2 / 7;
    • 線程2919 搶到了任務(wù) 1;
    • 線程8320 搶到了任務(wù) 3;

API 簡介

示例 作用
thpool_init(4) 創(chuàng)建一個(gè)含有 4 個(gè)線程的線程池。
* thpool_add_work(thpool, (void)function_p, (void )arg_p)* ** 添加任務(wù), function_p 是任務(wù)要執(zhí)行的函數(shù),arg_p 是 function_p 的參數(shù)
thpool_wait(thpool) 等待所有任務(wù)完成。
thpool_destroy(thpool) 銷毀線程池,如果還有任務(wù)在執(zhí)行,則會(huì)先等待其完成。
thpool_pause(thpool) 讓所有的線程都停止工作,進(jìn)入睡眠狀態(tài)。
thpool_resume(thpool) 讓所有的線程都恢復(fù)工作。
thpool_num_threads_working(thpool) 返回當(dāng)前正在工作的線程數(shù)。

三、內(nèi)部實(shí)現(xiàn)

整體把握

核心代碼就是 2 個(gè)文件:thpool.c 和 thpool.h。

分解 thpool.c

7 個(gè)公共函數(shù):

struct thpool_* thpool_init(int num_threads) 
int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p) 
void thpool_wait(thpool_* thpool_p) 
void thpool_destroy(thpool_* thpool_p) 
void thpool_pause(thpool_* thpool_p) 
void thpool_resume(thpool_* thpool_p) 
int thpool_num_threads_working(thpool_* thpool_p)

正好就是前面說過的 7 個(gè) API,稍后重點(diǎn)分析。

5 個(gè)自定義的數(shù)據(jù)結(jié)構(gòu):

// 描述一個(gè)信號量
typedef struct bsem {...} bsem;

// 描述一個(gè)任務(wù)
typedef struct job {...} job;

// 描述一個(gè)任務(wù)隊(duì)列
typedef struct jobqueue {...} jobqueue;

// 描述一個(gè)線程
typedef struct thread {...} thread;

// 描述一個(gè)線程池
typedef struct thpool_ {...} thpool_;

14 個(gè)私有函數(shù):

// 構(gòu)造 struct thread,并調(diào)用 pthread_create() 創(chuàng)建線程
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id) 

// 當(dāng)線程被暫停時(shí)會(huì)在這里休眠
static void thread_hold(int sig_id) 

// 線程在此函數(shù)中執(zhí)行任務(wù)
static void* thread_do(struct thread* thread_p) 

// 銷毀 struct thread
static void thread_destroy (thread* thread_p) 

// 任務(wù)隊(duì)列相關(guān)的操作集合
static int jobqueue_init(jobqueue* jobqueue_p) 
static void jobqueue_clear(jobqueue* jobqueue_p) 
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob) 
static struct job* jobqueue_pull(jobqueue* jobqueue_p) 
static void jobqueue_destroy(jobqueue* jobqueue_p) 

// 信號量相關(guān)的操作集合
static void bsem_init(bsem *bsem_p, int value) 
static void bsem_reset(bsem *bsem_p) 
static void bsem_post(bsem *bsem_p) 
static void bsem_post_all(bsem *bsem_p) 
static void bsem_wait(bsem* bsem_p)

核心 API 的實(shí)現(xiàn)

1. thpool_init()

該函數(shù)用于創(chuàng)建一個(gè)線程池,先明確線程池的定義:

typedef struct thpool_{
 thread**   threads;                  /* pointer to threads        */
 volatile int num_threads_alive;      /* threads currently alive   */
 volatile int num_threads_working;    /* threads currently working */
 pthread_mutex_t  thcount_lock;       /* used for thread count etc */
 pthread_cond_t  threads_all_idle;    /* signal to thpool_wait     */
 jobqueue  jobqueue;                  /* job queue                 */
} thpool_;

thpool_init() 的實(shí)現(xiàn)思路:

  1. 分配 struct thpool_:
    • malloc(sizeof(struct thpool_))
  2. 初始化 struct thpool_;
    • malloc(num_threads * sizeof(struct thread *))
    • thread_init(thpool_p, &thpool_p->threads[n], n);
    • jobqueue_init(&thpool_p->jobqueue)
    • 初始化 jobqueue:
    • 創(chuàng)建用戶指定數(shù)目的線程,用一個(gè)二級指針來指向這一組線程;
  3. 返回 struct thpool_ *;

2. thpool_add_work()

該函數(shù)用于往線程池里添加一個(gè)任務(wù),先明確任務(wù)的定義:

typedef struct job{
 struct job*  prev; /* pointer to previous job */
 void   (*function)(void* arg);  /* function pointer */
 void*  arg;  /* function's argument */
} job;

程序里是用隊(duì)列來管理任務(wù)的,這里的 job 首先是一個(gè)隊(duì)列節(jié)點(diǎn),攜帶的數(shù)據(jù)是 function + arg。

thpool_add_work 的實(shí)現(xiàn)思路:

  1. 分配 struct job:
    • malloc(sizeof(struct job))
  2. 初始化 struct job;
    • newjob->function=function_p;
    • newjob->arg=arg_p;
  3. 添加到隊(duì)列中:
    • jobqueue_push(&thpool_p->jobqueue, newjob);

3. thpool_pause() 和 thpool_resume()

thpool_pause() 用于暫停所有的線程,通過信號機(jī)制來實(shí)現(xiàn):

void thpool_pause(thpool_* thpool_p) {
 int n;
 for (n=0; n < thpool_p->num_threads_alive; n++){
  pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
 }
}

給所有工作線程發(fā)送 SIGUSR1,該信號的處理行為就是讓線程休眠:

static void thread_hold(int sig_id) {
    (void)sig_id;
 threads_on_hold = 1;
 while (threads_on_hold){
  sleep(1);
 }
}

只需要 thpool_resume() 中,將 threads_on_hold = 0,就可以讓線程返回到原來被中止時(shí)的工作狀態(tài)。

4. thpool_wait()

wait 的實(shí)現(xiàn)比較簡單,只要還有任務(wù)或者還有線程處于工作狀態(tài),就執(zhí)行 pthread 的 wait 操作:

while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
  pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
 }

到此,我感覺已經(jīng)沒有太多難點(diǎn)了,感興趣的小伙伴們可以自行查閱源碼。

四、測試用例

優(yōu)秀的開源項(xiàng)目通常會(huì)附帶豐富的測試用例,此項(xiàng)目也不例外:

  • memleaks.sh:測試是否發(fā)生內(nèi)存泄露;
  • threadpool.sh: 測試線程池是否能正確地執(zhí)行任務(wù);
  • pause_resume.sh:測試 pause 和 resume 是否正常;
  • wait.sh:測試 wait 功能是否正常;
  • heap_stack_garbage:測試堆棧內(nèi)有垃圾數(shù)據(jù)時(shí)的情況;
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • Linux
    +關(guān)注

    關(guān)注

    87

    文章

    11207

    瀏覽量

    208717
  • C語言
    +關(guān)注

    關(guān)注

    180

    文章

    7594

    瀏覽量

    135858
  • 線程池
    +關(guān)注

    關(guān)注

    0

    文章

    56

    瀏覽量

    6826
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    503

    瀏覽量

    19636
  • GitHub
    +關(guān)注

    關(guān)注

    3

    文章

    465

    瀏覽量

    16359
收藏 人收藏

    評論

    相關(guān)推薦

    跨平臺的線程組件--TP組件

    問題產(chǎn)生 無論是Linux,RTOS,還是Android等開發(fā),我們都會(huì)用到多線程編程;但是往往很多人在編程時(shí),都很隨意的創(chuàng)建/銷毀線程的策略來實(shí)現(xiàn)線程編程;很明顯這是不合理的做法,
    的頭像 發(fā)表于 04-06 15:39 ?840次閱讀

    Java中的線程包括哪些

    java.util.concurrent 包來實(shí)現(xiàn)的,最主要的就是 ThreadPoolExecutor 類。 Executor: 代表線程的接口,有一個(gè) execute() 方法,給一個(gè) Runnable 類型對象
    的頭像 發(fā)表于 10-11 15:33 ?777次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實(shí)現(xiàn)

    線程的概念是什么?線程是如何實(shí)現(xiàn)的?
    發(fā)表于 02-28 06:20

    基于Nacos的簡單動(dòng)態(tài)化線程實(shí)現(xiàn)

    本文以Nacos作為服務(wù)配置中心,以修改線程核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)一個(gè)簡單的動(dòng)態(tài)化線程
    發(fā)表于 01-06 14:14 ?830次閱讀

    線程線程

    線程通常用于服務(wù)器應(yīng)用程序。 每個(gè)傳入請求都將分配給線程池中的一個(gè)線程,因此可以異步處理請求,而不會(huì)占用主線程,也不會(huì)延遲后續(xù)請求的處理
    的頭像 發(fā)表于 02-28 09:53 ?742次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    如何用C++實(shí)現(xiàn)一個(gè)線程呢?

    C++線程是一種多線程管理模型,把線程分成任務(wù)執(zhí)行和線程調(diào)度兩部分。
    發(fā)表于 06-08 14:53 ?1696次閱讀
    如何用<b class='flag-5'>C</b>++<b class='flag-5'>實(shí)現(xiàn)</b>一個(gè)<b class='flag-5'>線程</b><b class='flag-5'>池</b>呢?

    細(xì)數(shù)線程的10個(gè)坑

    JDK開發(fā)者提供了線程實(shí)現(xiàn)類,我們基于Executors組件,就可以快速創(chuàng)建一個(gè)線程
    的頭像 發(fā)表于 06-16 10:11 ?699次閱讀
    細(xì)數(shù)<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10個(gè)坑

    線程線程怎么釋放

    線程分組看,pool名開頭線程占616條,而且waiting狀態(tài)也是616條,這個(gè)點(diǎn)就非??梢闪?,我斷定就是這個(gè)pool開頭線程導(dǎo)致的問題。我們先排查為何這個(gè)
    發(fā)表于 07-31 10:49 ?2206次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的<b class='flag-5'>線程</b>怎么釋放

    Spring 的線程應(yīng)用

    我們在日常開發(fā)中,經(jīng)常跟多線程打交道,Spring 為我們提供了一個(gè)線程方便我們開發(fā),它就是 ThreadPoolTaskExecutor ,接下來我們就來聊聊 Spring 的線程
    的頭像 發(fā)表于 10-13 10:47 ?585次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應(yīng)用

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優(yōu)勢 C++線程
    的頭像 發(fā)表于 11-10 10:24 ?465次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來的目的就是:提高程序執(zhí)行效
    的頭像 發(fā)表于 11-10 16:37 ?483次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    如何用C語言實(shí)現(xiàn)線程

    線程是一種多線程處理形式,大多用于高并發(fā)服務(wù)器上,它能合理有效的利用高并發(fā)服務(wù)器上的線程資源;線程與進(jìn)程用于處理各項(xiàng)分支子功能,我們通常的
    的頭像 發(fā)表于 11-13 10:41 ?980次閱讀
    如何用<b class='flag-5'>C</b><b class='flag-5'>語言實(shí)現(xiàn)</b><b class='flag-5'>線程</b><b class='flag-5'>池</b>

    基于C++11的線程實(shí)現(xiàn)

    C++11 加入了線程庫,從此告別了標(biāo)準(zhǔn)庫不支持并發(fā)的歷史。然而 c++ 對于多線程的支持還是比較低級,稍微高級一點(diǎn)的用法都需要自己去實(shí)現(xiàn)
    的頭像 發(fā)表于 11-13 15:29 ?706次閱讀

    線程的創(chuàng)建方式有幾種

    線程是一種用于管理和調(diào)度線程的技術(shù),能夠有效地提高系統(tǒng)的性能和資源利用率。它通過預(yù)先創(chuàng)建一組線程并維護(hù)一個(gè)工作隊(duì)列,將任務(wù)提交給線程
    的頭像 發(fā)表于 12-04 16:52 ?801次閱讀

    什么是動(dòng)態(tài)線程?動(dòng)態(tài)線程的簡單實(shí)現(xiàn)思路

    因此,動(dòng)態(tài)可監(jiān)控線程一種針對以上痛點(diǎn)開發(fā)的線程管理工具。主要可實(shí)現(xiàn)功能有:提供對 Spring 應(yīng)用內(nèi)
    的頭像 發(fā)表于 02-28 10:42 ?567次閱讀