第三章 · 动手写一个线程池
从函数指针到条件变量,一步步实现多线程调度器
全新篇章:自己动手写一个线程池!
在这一章,也就是 Chapter 3,我们会自己动手实现一个多线程调度器——线程池(ThreadPool)!听起来是不是超级厉害 (≧∇≦)ノ
这会有一点难度,但只要理清逻辑,耐心肯定能解决的。在写代码之前,我们需要先搞明白,多线程到底会遇到什么绝境,以及前人们为了活命,被迫发明了什么工具。
什么是“任务”和“队列”?
我们要建一个线程池,里面预先创建好几个工作线程 (Worker Thread)。既然要工作,首先得有任务。主线程 (Main Thread) 会源源不断地把任务提交进来,工作线程就从里面拿任务去执行。
这个存放任务的地方,就是任务队列 (Task Queue)。
在 C 语言中,一个“任务”到底是什么呢?
想象一下,这个线程池是一个通用的组件。它不关心具体的业务,今天可能被主线程叫去修图片,明天可能被叫去算工资。如果在线程池的代码里写死一句 process_image();,那这个组件就彻底绑死在图片处理上了,再也干不了别的活。
我们怎么把“一段未知的代码”或者“一个动作”当成参数,塞进队列里传给工作线程呢? 我们平时用的变量只能存数字、存字符,怎么才能存一个“动作”?
为了解决这个问题,C 语言的前辈们提供了一个极其强大的武器:函数指针 (Function Pointer)。
如果之前没有专门接触过底层语言,你可能会疑惑:函数指针和普通指针有什么区别?
- 普通指针(比如
int*或者char*):它们指向的是数据(变量、数组等)。它们告诉 CPU:“去这个内存地址拿点材料过来算一算”。 - 函数指针:它指向的是代码指令。我们写好的函数代码,在编译运行后,其实也是静静地躺在内存里的某一块特定区域。函数指针保存的,就是这段代码在内存里的“入口首地址”。它告诉 CPU:“直接跳到这个地址,开始执行那里的动作”。
来看看 thread_pool.h 里的定义,虽然它的语法看起来有点像天书:
typedef void (*thread_task_fn)(void*);
typedef struct {
thread_task_fn fn;
void* arg;
} ThreadTask;typedef void (*thread_task_fn)(void*);:怎么拆解这句语法呢?- 核心在中间的
(*thread_task_fn),那个*明确告诉你,这是一个指针。 - 右边的
(void*)表示这个函数需要接收一个void*类型的参数。 - 左边的
void表示这个函数执行完不返回任何东西。 - 连起来就是:定义了一种名为
thread_task_fn的专门的“动作指针类型”。主线程只要把想要执行的函数地址存进fn里,工作线程从队列里拿出这个任务后,直接调用fn(arg);,就能顺藤摸瓜,完美执行主线程临时指派的任何动作了。
- 核心在中间的
void* arg;:除了动作,还得有干活用的材料(参数)。既然工作线程一开始不知道具体是什么任务,自然也不知道参数到底是什么类型(是整型?是字符串?还是结构体?)。void*就是 C 语言里的“万能盲盒”,任何类型的数据地址都能被塞进这个盲盒打包送进去。等真正进入了那个特定的任务函数内部,开发者自己心里清楚盒子里装的是什么,再把它强转回原来的真实类型拿出来用就行了。
为了高效利用内存,我们在 ThreadPool 结构体中把队列设计成了环形队列 (Circular Queue):
// 主线程塞入任务后,尾指针后移:
pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
// 工作线程取出任务后,头指针后移:
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;用固定大小的数组配合取模运算 %,就能实现一个无限循环利用的流水线!数学老师说过的取模终于有用了
什么是“互斥锁”?
现在流水线有了,工作线程也准备就绪了。但如果你直接让他们去队列里拿任务,灾难就会发生。
想象一下:工作线程 A 看到 queue_head 是 0,准备去拿 0 号任务;但就在他刚伸出手的一瞬间,时间片轮转,工作线程 B 也看到了 queue_head 是 0,也去拿 0 号任务。最终不仅任务被重复执行,queue_head 还会被加两次,甚至引发段错误,整个队列瞬间崩溃。
这里的问题在于,读取 queue_head、取出任务、修改 queue_head 这一系列动作不是原子的(它们随时可以被打断)。
是谁打断了它们?答案是:中断 (Interrupt)。
如果你之前没有接触过底层,你可能会问:什么是中断? 其实,CPU 本质上是一个极其庞大但又无比刻板的自动状态机。它只要通了电,就会不知疲倦地一条接一条地执行指令。但如果它永远只执行眼前的代码,那系统一旦卡死或者进入死循环,整个电脑就彻底没救了。 所以硬件工程师发明了“中断”。这就像在 CPU 的大脑里埋了一个定时炸弹(时钟发生器)。每隔几毫秒,炸弹就会“砰”地炸一下(触发时钟中断)。不管 CPU 此时正在执行多么重要、多么神圣的代码,它都必须无条件立刻停下手头的工作,把控制权强制交还给操作系统的调度器。 正是依靠这种极其霸道的时钟中断机制,操作系统才能强行把 CPU 从一个线程手里抢过来,切给另一个线程,从而让你感觉几百个程序在“同时”运行。
时序混乱
如果工作线程 A 刚读完 queue_head 就遭遇了时钟中断,被强行“冻结”调度走;工作线程 B 刚好接管了 CPU,他就会读取到一样的 queue_head,然后拿走同一个任务。等 A 再被调度回来解冻时,他还以为自己手里拿到的是独一无二的任务。灾难就此发生!
像队列这种只要时序不对就会导致灾难、同一时刻只能被一个线程安全操作的资源,我们称之为临界资源。
为了解决这个绝境,前人们发明了一种机制来抵抗这种时序混乱。这种机制就像公共卫生间门上的那个“有人/无人”的标志牌——当你进去后,把牌子翻到“有人”(上锁),如果此时你被“中断”打断去休息了,别人因为拿不到锁,在外面就只能干等(空转或睡眠);等你回来干完活出来时,再把牌子翻回“无人”(解锁)。
最后,我们来给这个伟大的发明下个彻底的定义。 这在计算机里被称为互斥锁 (Mutex, Mutual Exclusion)。 什么是互斥? 互斥就是人为地制造一种绝对的排他性。在微观的 CPU 状态机世界里,所有的指令都是交错执行、充满时序变数的。而互斥,就是硬生生地在这片混沌之中,强行劈出一块绝对安全的“静止时空”(临界区)。在这个时空里,你是唯一的霸主,没有中断的威胁,没有并发的干扰。
它本质上就是内存里的一个变量。但神奇的是,操作系统和 CPU 硬件为它提供了特殊的指令保证:无论有多少个线程同时去抢这个锁,最终有且只有 1 个线程能成功把它设置为“已锁定”状态。
pthread_mutex_lock(&pool->mutex); // 抢锁!抢不到的线程只能在门外乖乖排队睡觉(被操作系统挂起)
// ... 安全地操作队列 (取出任务,修改 queue_head) ...
// 这里的代码被称为“临界区”
pthread_mutex_unlock(&pool->mutex); // 完事了,把锁解开,操作系统会放下一个排队的线程进来只有在 lock 和 unlock 之间,代码才是绝对安全的。这也是互斥锁被发明的唯一目的:保护临界资源。
单锁不够怎么办?
现在工作线程很守规矩了,每次去拿任务都会先加锁。但这又引出了一个致命的逻辑死局:如果当前队列是空的(主线程还没发任务),工作线程该怎么办?
-
死循环狂刷?
while (queue_size == 0) { /* 疯狂加锁、解锁、查看 */ }绝对不行!这叫忙等待(Busy Waiting)。几个线程会在空队列前疯狂空转,让 CPU 占用率瞬间飙升到 100%,极度浪费资源。风扇狂转的声音已经响起来了
-
睡一会儿再看?
if (queue_size == 0) { sleep(1); }也不行。如果刚睡下 0.01 秒主线程就发任务了,系统就要白白干等 1 秒钟,响应极慢。用户已经开始砸键盘了
只靠一把互斥锁,我们陷入了死局。我们需要一种全新的机制:当条件不满足(没有任务)时,让线程真正地挂起休眠(完全不占 CPU);而当条件满足(新任务到来)时,其他线程能精准地把它唤醒。
这就是操作系统为我们发明的神兵利器:条件变量 (Condition Variable, pthread_cond_t)。
你可以把条件变量理解为一个等待队列和一个唤醒信号。
当工作线程发现没任务时,就拿着互斥锁调用 pthread_cond_wait 挂起睡眠。当主线程往队列里塞了新任务后,就调用 pthread_cond_signal 或 pthread_cond_broadcast 发送信号,把挂起的工作线程唤醒起来干活。
唤醒信号有两种,叫错人导致整个程序死锁怎么办?
唤醒 API 有两种:pthread_cond_signal 只唤醒一个在等待的线程,而 pthread_cond_broadcast 唤醒所有等待的线程。
如果在复杂的逻辑里,不仅工作线程会因为没任务而挂起,主线程也会因为队列塞满而挂起(等待 not_full 条件)。
假设队列满了,主线程挂起了。此时两个工作线程 A 和 B 刚好干完活,队列空出了位置。工作线程 A 干完活后,如果只用 signal(随机唤醒一个线程),万一它唤醒的不是主线程,而是刚好也在挂起的工作线程 B,那主线程就永远挂死在那里了,整个程序彻底死锁!
核心观点 · broadcast
所以我们得出并发编程的一个重要观点:如果不确定该唤醒谁,总是使用 broadcast 唤醒所有潜在可能被唤醒的线程! 宁可错杀唤醒大家一起来抢锁,也不能漏掉那个真正该醒的线程。除了极度明确的单一场景,用 broadcast 永远是最安全的兜底方案。
wait 与 while
条件变量必须和互斥锁搭配使用!这是初学者最容易栽跟头的地方。
首先,我们要讲清楚 pthread_cond_wait(&cond, &mutex) 这个 API 到底在底层施了什么魔法。当你调用它时,它会在内核里原子地做两件事:
- 把你手里的互斥锁
mutex给解开(这样主线程才能拿到锁发任务)。 - 把你这个线程塞进等待队列挂起睡眠。 等到未来某一天,唤醒信号把你叫醒后,它又会在内部自动去重新抢回那把互斥锁。
看看正确的工作线程拿任务代码:
pthread_mutex_lock(&pool->mutex); // 1. 先加锁
// 2. 必须用 while 循环判断条件!不能用 if!
while (pool->queue_size == 0 && pool->stop == 0) {
// 3. 挂起睡眠。
// 注意:这句话会自动解开 mutex 锁!否则你拿着锁睡觉,主线程连发任务的锁都抢不到了!
// 等它被唤醒时,又会在内部自动重新抢回 mutex 锁!
pthread_cond_wait(&pool->not_empty, &pool->mutex);
}
// 4. 被叫醒了,而且 queue_size > 0 啦!从队列中取出任务...
pthread_mutex_unlock(&pool->mutex); // 5. 关键:拿到任务后,第一时间先解锁!!!
// 6. 真正开始干活(千万别拿着锁干活,不然多线程就退化成串行了!)
task.fn(task.arg);为什么醒来后还要用 while 循环再查一遍队列是不是空的?
想象这个极其真实的场景:
- 工作线程 A 在等待队列里睡眠。
- 主线程放了一个新任务,发送了信号,唤醒了工作线程 A。
- A 准备去抢互斥锁(
mutex)。 - 就在这短短的几毫秒内,一个刚刚干完活、刚好路过的工作线程 B,顺手抢先一步拿到了锁,把那个新任务秒抢走执行了!
- A 终于抢到了锁,走到队列一看:空空如也!
这就是所谓的虚假唤醒 (Spurious Wakeup)。你被叫醒了,但不代表轮到你拿锁的时候,条件依旧满足(任务还在)。
所以,A 醒来后的第一件事,必须用 while 循环再看一眼:队列还是空的吗?如果是,那就得乖乖回去接着睡。这就让 while 循环成为了逻辑上的必然,而不是死记硬背的规则。
核心观点 · while 循环
在这里,我们引出并发编程中关于条件变量的另一条铁律(核心观点):
总是在唤醒后,用 while 循环再次检查同步条件! 千万别用 if。你必须假定这个世界充满了虚假唤醒。
优雅谢幕
最后,主线程决定销毁线程池了(调用 thread_pool_destroy)。它不能直接强杀线程,必须妥善安置。
1. 为什么需要 working_count?
在 thread_pool_wait 里,我们需要等所有任务完成。很多同学觉得只要 queue_size == 0(队列空了)就算做完了。
大家思考一个场景:主线程放了最后一个任务,被线程 A 拿走了,此时 queue_size 变成 0。主线程一看队列空了,以为搞定了准备销毁。但它没看到的是,A 还在满头大汗地跑着最后一个任务的代码呢!
所以只有当**“待办列表是空的” (queue_size == 0) 且 “没有线程正在执行任务” (working_count == 0)** 时,才是真正的万事大吉。
2. 怎么用 pthread_join 安全回收线程?
主线程先加锁设置 pool->stop = 1,然后用 pthread_cond_broadcast 把所有因为队列为空而在睡眠的工作线程全部叫醒。
工作线程醒来一看 stop == 1,就知道没活干而且要退出了,直接 return NULL; 结束生命周期。
主线程必须确保所有的工作线程都已经真正退出。怎么确认?用 pthread_join!
join 这个概念在并发编程里极其常见,你可以把它生动地理解为现实生活中的**“死等”**:
- NPY:等我洗个头就出门
NPY 用了join(洗头),你只能在楼下干等 - NPY:等我打完这局游戏就来
NPY 用了join(游戏) - 舍友:等我修好这个 bug 就吃饭
你只能饿着肚子join(修bug) - 导师:等我出差回来就讨论这个课题
课题进度被join(出差)彻底阻塞了
所以,pthread_join(thread, NULL) 的含义就是:等指定的线程彻底结束了,当前的代码才会继续往下走。
for (int i = 0; i < pool->thread_count; ++i) {
pthread_join(pool->threads[i], NULL);
}pthread_join 会让主线程阻塞等在这里,直到对应的线程跑完了最后一行代码彻底退出,主线程才会走向下一个继续等。确认所有线程都安全退出后,正式释放内存,结束程序!
你的任务 (TODO)
在 src/ch3/thread_pool.c 中,我们已经帮你搭好了骨架,并在关键的地方留下了 TODO 注释。
你需要完成:
thread_pool_worker:实现工作线程的逻辑。安全地等待任务、取出任务,修改queue_size/queue_head/working_count,并在干完活后发送信号更新状态。thread_pool_submit:实现主线程提交任务的逻辑。安全地把新任务塞进环形队列尾部(更新queue_tail和queue_size),并唤醒睡眠的工作线程。thread_pool_wait:让主线程在这里乖乖等所有的任务都被处理完。thread_pool_destroy:安全下班!通知所有线程退出,并用pthread_join回收它们。
卡住了?
这可能是你写过的最复杂的 C 语言多线程代码之一了。如果卡住了,多回头看看第四步条件变量配合 while 循环的那个例子,想想加锁和解锁的时机。
加油!写出自己的线程池,成就感绝对是无与伦比的!(๑•̀ㅂ•́)و✧