目录
一、信号量
1、概念
信号量(Semaphore)是一种用于解决多线程或多进程环境下共享资源同步问题的机制,最初由荷兰计算机科学家 Edsger Dijkstra 提出。在操作系统和并发编程中,信号量主要用于限制对有限资源的并发访问数量。
共享资源与临界资源: 当一个资源在同一时间内只能被一个执行流(例如线程或进程)访问时,我们就称这个资源为临界资源。为了保证数据一致性,对于这样的资源,我们需要实现互斥访问,即任何时候只允许一个执行流进入临界区(访问资源的那段代码)。信号量正是实现这一目的的工具之一。
资源计数与分配:
- 资源总数与剩余数量: 信号量其实就是一个带有计数功能的变量,它的值代表了可使用的资源数目。初始化时设置信号量的值即可表示总共有多少个资源可供使用。每次一个执行流请求资源时(执行 P 操作,也称为“wait”或“decrement”操作),信号量的值减 1,若此时信号量值为正,则请求成功;若为 0,则请求失败,执行流会被挂起,放入等待队列。
- 资源分配的保证: 当一个执行流成功执行 P 操作(semaphore -= 1)后,就认为它获得了资源的使用权。程序员不需要关心具体哪个资源被分配给了哪个执行流,只需要知道当前执行流已经获得了资源许可即可。
借助电影院购票场景理解:
- 假设电影院有 N 个座位,相当于有 N 个资源。信号量初始化为 N,代表全部座位可售。
- 每个观众(执行流)想要购票(获取资源)时,会调用类似于 P 操作的方法,售票系统会减少一个可用座位数,直至无座可售时,新来的观众将会等待。
- 观众看完电影离开(执行 V 操作,也称为“signal”或“increment”操作),则会释放座位,增加信号量的值,从而唤醒一个等待中的观众(执行流)。
信号量的使用理解: 在实际编程中,当我们声明并初始化一个信号量时,就已经确定了它所代表的资源总数。当线程调用信号量的 wait 函数时,它会尝试获取一个资源单位,如果资源充足,则获取成功并继续执行;如果资源不足,则线程被阻塞,直到有线程执行了 signal 函数释放了资源。程序员需要根据实际应用场景设计信号量的使用逻辑,确保资源的安全有效分配和释放。
2、使用
POSIX信号量和System V信号量都是用于同步多线程或多进程访问共享资源的机制,确保在并发环境下对资源的访问是有序和互斥的。尽管它们都服务于同样的目标,但在API设计和特性上存在一些差异。
POSIX信号量是POSIX标准的一部分,尤其适合于线程间的同步,当然也可以用于进程间同步,尤其是在多线程应用程序中更为常见。以下是信号量的一些关键操作:
初始化信号量:
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
这个函数用于创建一个新的信号量。参数
sem
是指向信号量对象的指针;pshared
参数表明信号量是否能在多个进程中共享,如果设置为0,则信号量只能在当前进程中被多个线程共享;如果设置为非零值(通常是一个常量PTHREAD_PROCESS_SHARED
),则信号量可以在多个进程间共享。value
参数则是信号量的初始值,表示可供访问的资源数量。销毁信号量:
int sem_destroy(sem_t *sem);
当不再需要信号量时,调用此函数将其销毁。只有当所有线程/进程都不再使用该信号量时,才能安全地销毁它。
等待信号量(降低信号量值):
int sem_wait(sem_t *sem); // 通常对应于 P 操作
当调用
sem_wait
函数时,如果信号量的值大于0,则将其减1并允许调用线程继续执行;如果信号量值为0,则线程会被阻塞,直到其他线程或进程调用sem_post
使其值增大为止。发布信号量(增加信号量值):
int sem_post(sem_t *sem); // 通常对应于 V 操作
当调用
sem_post
函数时,信号量的值会增加1。如果有其他线程正在等待该信号量(因为之前信号量值为0),那么其中一个等待的线程将被唤醒并允许继续执行。
二、基于环形队列的生产消费模型
1、概念
在环形缓冲区的生产者-消费者问题中,我们借助信号量同步机制来有效协调两个角色的操作,以确保数据的一致性和正确性,同时充分利用并发执行的优势。具体而言,我们希望:
互斥同步控制:在生产者和消费者操作即将触碰到缓冲区的同一位置时,必须确保这两者之间实现互斥关系,即在同一时刻,只能有一个线程执行操作(要么是生产者写入数据,要么是消费者读取数据),以此避免因并发访问引发的数据冲突。
条件约束:
- 当缓冲区已满时,生产者进程将会被阻止继续添加数据,直到消费者进程完成数据消费并腾出空间。此时,通过降低spacesem的值,生产者会暂停在P(spacesem)操作上,等待消费者释放空间。
- 当缓冲区为空时,消费者进程则无法继续提取数据,直到生产者填充新的数据进来。此刻,消费者会在P(datasem)操作上等待,直到生产者增加了datasem的值,表明有数据可供消费。
并发执行优化:在缓冲区既不是满也不是空的状态下,即有一定空间剩余又有一定数据待处理时,我们期望生产者和消费者能够并发执行,以提高系统整体性能。
为实现以上目标,我们可以使用信号量来控制资源访问:
空间信号量(spacesem):
spacesem
代表剩余空间资源的信号量,初始值为 N(即缓冲区大小)。生产者在向缓冲区添加元素前会调用 P 操作(也叫 wait 或 acquire 操作)来检查是否有足够的空间,这会导致spacesem
的值减一。如果缓冲区未满,则生产者可以在特定位置进行生产;生产完成后,通过 V 操作(也叫 signal 或 release 操作)增加spacesem
的值,表示已使用的空间减少,可供生产的空间增多。数据信号量(datasem):
datasem
代表可用数据资源的信号量,初始值为 0。消费者在从缓冲区读取元素前会调用 P 操作来检查是否有可消费的数据,这会导致datasem
增加。如果缓冲区内有数据可供消费,消费者可以消费特定位置的数据;消费完成后,通过 V 操作减少datasem
的值,表示缓冲区中的有效数据减少,更多的空间等待被填充。
通过精心配置spacesem(剩余空间信号量)和datasem(有效数据信号量)的使用,我们能够实现以下理想效果:
- 防止套圈现象:确保生产者不会覆盖消费者尚未处理的数据,即便在高并发环境中也能维持正确的数据顺序。
- 动态调度:
- 当缓冲区为空时,优先激活生产者线程,使其能够填充数据,而消费者线程将在尝试获取数据时因P(datasem)操作失败而进入等待状态。
- 当缓冲区满载时,消费者线程享有优先权,会先于生产者执行,从缓冲区移除数据,而生产者线程在尝试添加数据时因P(spacesem)操作受阻而暂时挂起。
- 并发执行机制:只要缓冲区并非处于满或空的边界状态,生产者和消费者就可以同时安全地执行各自的读写操作,充分挖掘并发执行所带来的效率提升。
2、实现
makefile
ring_queue:testQ.cc
g++ -o $@ $^ -lpthread
.PHONY: clean
clean:
rm -f ring_queue
sem.hpp信号量操作
sem.hpp
提供了一个简单的类 Sem
,用于实现基于操作系统接口的信号量。信号量是一种在多线程或多进程环境下进行同步和互斥控制的重要工具。
#ifndef _SEM_HPP_
#define _SEM_HPP_
// 包含标准输入输出流库,用于可能的日志记录或调试输出
#include <iostream>
// 包含POSIX信号量操作的头文件
#include <semaphore.h>
// 定义Sem类,它将包装一个POSIX信号量结构sem_t
class Sem
{
private:
// 内部存储的信号量对象
sem_t sem;
public:
// 构造函数,根据给定的初始值初始化信号量
Sem(int value)
{
// 使用sem_init函数初始化信号量,第二个参数为0表示该信号量不是进程间共享的
sem_init(&sem, 0, value);
}
// 下降操作(wait/pthread_sem_wait):减少信号量的计数,若计数非正则阻塞当前线程
void p(){sem_wait(&sem);}
// 上升操作(post/pthread_sem_post):增加信号量的计数,可能唤醒一个等待的线程
void v(){sem_post(&sem);}
// 析构函数,在对象销毁时清理信号量资源
~Sem()
{// 使用sem_destroy函数销毁信号量
sem_destroy(&sem);
}
};
#endif // _SEM_HPP_
首先,通过预处理器宏防止多次包含该头文件 (
#ifndef _SEM_HPP_
,#define _SEM_HPP_
)。然后引入了
<iostream>
和<semaphore.h>
头文件,其中<semaphore.h>
提供了POSIX信号量相关的函数。定义了一个名为
Sem
的类,它封装了 POSIX 的sem_t
类型信号量。类中有一个私有成员变量
sem
,它是实际的 POSIX 信号量对象。构造函数
Sem(int value)
初始化信号量,传入的value
表示信号量的初始值。成员函数
p()
对应于 POSIX 函数sem_wait()
, 用于减一信号量,如果信号量小于零,则阻塞当前线程直到其他线程对信号量执行v()
操作。成员函数
v()
对应于 POSIX 函数sem_post()
, 用于增加信号量的值,唤醒一个因调用p()
而被阻塞的线程(如果有的话)。析构函数
~Sem()
在类实例销毁时调用sem_destroy()
函数,释放与信号量关联的系统资源。
通过这个 Sem
类,用户可以更方便地在 C++ 中使用信号量进行线程同步操作,避免直接调用 POSIX API 函数。在提供的 RingQueue.hpp
文件中,就使用了这个 Sem
类来同步生产和消费操作,确保环形队列在多线程环境下的正确使用。
ringQueue.hpp环形队列
ringQueue.hpp
定义了一个模板类 RingQueue
,这是一个用于多线程环境中的环形队列数据结构。
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_
// 包含必要的头文件,包括iostream用于输出、vector用于动态数组存储队列元素、pthread.h用于线程操作,以及自定义的信号量类sem.hpp
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
// 定义默认队列容量
const int g_default_num = 5;
// 定义泛型环形队列模板类,其中T为存储在队列中的元素类型
template <class T>
class RingQueue
{
public:
// 构造函数,接受一个可选的默认容量值,默认为g_default_num
RingQueue(int default_num = g_default_num)
: _ring_queue(default_num), // 初始化队列容器
_num(default_num), // 存储实际容量
c_step(0), // 消费者索引
p_step(0), // 生产者索引
space_sem(default_num), // 空间信号量,初始值为容量
data_sem(0) // 数据信号量,初始值为0
{
// 初始化两个互斥锁,用于保护队列访问的线程安全性
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&plock, nullptr);
}
// 析构函数,在销毁对象时清理资源
~RingQueue()
{
pthread_mutex_destroy(&clock); // 销毁消费者互斥锁
pthread_mutex_destroy(&plock); // 销毁生产者互斥锁
}
// 入队操作,将给定的元素放入队列
void push(const T &in)
{
// 信号量减1,表明需要占用一个存储空间
space_sem.p();
// 加锁生产者互斥锁以确保原子性
pthread_mutex_lock(&plock);
// 将元素存入队列,并使生产者索引循环递增
_ring_queue[p_step++] = in;
p_step %= _num; // 确保索引始终在有效范围内循环
// 解锁生产者互斥锁
pthread_mutex_unlock(&plock);
// 增加数据信号量,表明队列中有新数据可供消费
data_sem.v();
}
// 出队操作,从队列中移除并返回一个元素
void pop(T *out)
{
// 信号量减1,表明需要获取一个数据
data_sem.p();
// 加锁消费者互斥锁以确保原子性
pthread_mutex_lock(&clock);
// 从队列中取出元素并将其赋值给传入的指针,然后更新消费者索引
*out = _ring_queue[c_step++];
c_step %= _num; // 确保索引始终在有效范围内循环
// 解锁消费者互斥锁
pthread_mutex_unlock(&clock);
// 增加空间信号量,表明队列中腾出了一个新的存储空间
space_sem.v();
}
private:
// 使用vector作为底层实现的环形队列
std::vector<T> _ring_queue;
// 队列的实际容量
int _num;
// 消费者和生产者的当前索引
int c_step;
int p_step;
// 使用自定义的Sem类管理的空间和数据信号量
Sem space_sem; // 控制队列剩余空间
Sem data_sem; // 控制队列中待消费的数据数量
// 用于同步线程访问队列的互斥锁
pthread_mutex_t clock; // 消费者互斥锁
pthread_mutex_t plock; // 生产者互斥锁
};
#endif // _RING_QUEUE_HPP_
模板类:
RingQueue
是一个模板类,允许存储任何类型的元素(由模板参数T
指定)。环形队列结构:队列内部采用一个动态分配的
std::vector<T>
_ring_queue
来模拟环形队列的空间结构。同步机制:
- 使用自定义的
Sem
类(来自sem.hpp
文件)来管理信号量,其中space_sem
用于表示队列剩余空间的数量,data_sem
用于表示队列中有多少可用数据。 - 使用
pthread_mutex_t
类型的clock
和plock
互斥锁来保护队列状态的修改,分别对应于队列的读取和写入操作。
- 使用自定义的
构造函数:
RingQueue
类的构造函数接受一个可选参数default_num
,用来初始化队列的大小,默认为全局常量g_default_num
设置的值(这里为5)。同时,在构造函数中初始化信号量和互斥锁,并设置步进索引c_step
和p_step
为0。成员函数:
push(const T &in)
:向队列中添加一个元素。首先通过space_sem.p()
函数等待有足够的空间,然后锁定写入互斥锁plock
,将元素加入队列,并更新写入索引p_step
。最后,通过data_sem.v()
唤醒等待获取数据的线程。pop(T *out)
:从队列中移除并返回一个元素。首先通过data_sem.p()
函数等待队列中有数据可读,然后锁定读取互斥锁clock
,读取并移除队列中的元素,并更新读取索引c_step
。最后,通过space_sem.v()
唤醒等待插入数据的线程。
析构函数:在销毁
RingQueue
对象时,会调用析构函数,释放之前初始化的两个互斥锁。
此环形队列设计能够有效地在多线程环境中实现在生产者线程和消费者线程之间的同步,从而避免数据竞争和死锁等问题。在测试程序 testQ.cc
中,可以看到多个消费者线程和生产者线程交替执行,共享同一个 RingQueue
对象进行数据的生产和消费。
testQ.cc主函数
#include "ringQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
// 定义消费者函数,其任务是从环形队列中取出并消耗数据
void *consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (1)
{
sleep(1); // 消费者每秒从队列中取出一个数据
int x;
rq->pop(&x); // 从队列中弹出数据
// 输出消费的数据及当前线程ID
std::cout << "消费: " << x << "[" << pthread_self() << "]" << std::endl;
}
}
// 定义生产者函数,其任务是生成随机数据并将其放入环形队列
void *producer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (1)
{
//sleep(1); // 生产者可以随时产生数据(注释掉睡眠是为了加快演示速度)
int x = rand() % 100 + 1; // 生成1到100之间的随机数
// 输出生产的随机数及当前线程ID
std::cout << "生产: " << x << "[" << pthread_self() << "]" << std::endl;
rq->push(x); // 将随机数推入环形队列
}
}
// 主函数,创建并初始化环形队列,同时启动多个消费者线程和生产者线程
int main()
{
// 初始化随机数种子以确保每次运行时产生的随机数不同
srand((unsigned)time(nullptr) ^ getpid());
// 创建一个环形队列实例
RingQueue<int> *rq = new RingQueue<int>();
// pthread_t p, c;
// pthread_create(&c, nullptr, consumer, (void *)rq);
// pthread_create(&p, nullptr, producer, (void *)rq);
// pthread_join(p, nullptr);
// pthread_join(c, nullptr);
// 创建多个消费者线程和生产者线程
pthread_t c[3], p[2];
pthread_create(c, nullptr, consumer, (void *)rq);
pthread_create(c + 1, nullptr, consumer, (void *)rq);
pthread_create(c + 2, nullptr, consumer, (void *)rq);
pthread_create(p, nullptr, producer, (void *)rq);
pthread_create(p + 1, nullptr, producer, (void *)rq);
// 等待所有消费者线程完成
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
// 等待所有生产者线程完成
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
return 0; // 主程序结束
}
首先,包含了必要的头文件:
"ringQueue.hpp"
:包含了前面定义的环形队列模板类RingQueue
。<cstdlib>
、<ctime>
、<sys/types.h>
和<unistd.h>
:这些头文件提供了随机数生成、线程创建与管理、以及进程ID和休眠等函数的支持。
接下来,定义了两个线程函数:
consumer(void *args)
:代表消费者线程。该函数从环形队列RingQueue<int>
中取出一个整数值并打印出来,然后休眠一秒以模拟消耗处理时间。producer(void *args)
:代表生产者线程。该函数生成一个随机整数(范围在1到100之间),打印出这个数值,并将其推入环形队列中。
在 main()
函数中:
- 初始化随机数种子,结合当前时间和进程ID确保不同运行之间的随机性。
- 创建一个
RingQueue<int>
实例,用于存储整数数据。 - 创建多个消费者线程(这里是3个)和生产者线程(这里是2个),它们都传递同一个
RingQueue<int>
实例作为参数。 - 使用
pthread_create()
创建并启动线程。 - 使用
pthread_join()
等待所有线程完成工作,即等待所有消费者线程消费完数据,并且生产者线程停止生产。
当程序运行时,生产者线程不断生成随机数并放入队列,而消费者线程则不断地从队列中取出数据并打印。由于使用了信号量 space_sem
和 data_sem
进行同步控制,确保了在生产与消费过程中不会发生竞争条件或死锁等问题。输出结果显示了生产者线程生产的数字和消费者线程消费的数字,以及各自线程的标识符。
[hbr@VM-16-9-centos RingQueue]$ ./ring_queue
生产: 87[139929069377280]
生产: 6[139929069377280]
生产: 37[139929069377280]
生产: 95[139929069377280]
生产: 16[139929069377280]
生产: 56[139929069377280]
生产: 2[139929060984576]
消费: 87[139929077769984]
生产: 66[139929069377280]
消费: 6[139929086162688]
生产: 77[139929060984576]
消费: 37[139929094555392]
生产: 95[139929069377280]
消费: 95[139929077769984]
生产: 62[139929060984576]
消费: 16[139929086162688]
生产: 32[139929069377280]
消费: 56[139929094555392]
生产: 42[139929060984576]
消费: 2[139929077769984]生产:
22[139929069377280]
消费: 66[139929086162688]
生产: 37[139929060984576]
消费: 77[139929094555392]
生产: 1[139929069377280]
消费: 95[139929077769984]
生产: 62[139929060984576]
消费: 62[139929086162688]
生产: 65[139929069377280]
消费: 32[139929094555392]
生产: 87[139929060984576]
^C
3、多生产多消费的意义
多生产多消费的模式强调的是如何有效地将任务或数据从私人化状态转换为公共状态,并进一步快速分发给多个处理单元,以此实现最大程度的并发执行、资源优化和系统稳定性增强。
高效的任务调度与执行: 生产过程本质上是将个体或独立模块生成的“私有”任务或数据转化为可供全局访问和处理的“公共资源”。多生产意味着多个源头能同时生成任务或填充数据池,加快任务或数据产生的总体效率。
消费过程则是从公共资源池中获取这些任务或数据,并将其转移到各个“私有”的执行环境进行处理。多消费则保证了多个消费者能够并行地处理这些任务,提高了任务消化和处理的整体性能。
灵活的负载分布与适应性: 在这一模式下,生产速率和消费速率不再受限于单个节点的能力,而是可以通过多个生产者和消费者的协同来达到最优平衡。当生产速度波动时,公共空间(如队列、缓冲区等)可以起到平滑波动的作用,保持系统稳定运行。
降低耦合度与提高响应速度: 生产者和消费者之间通过共享的中间媒介进行交互,无需直接依赖对方的状态或行为,这极大减少了各部分之间的耦合程度。同时,一旦数据或任务进入公共空间,消费者就可以立即开始处理,减少了等待时间和延迟。
充分利用硬件资源: 通过多生产多消费模型,可以更好地利用多核处理器或多台计算机的计算能力,减少空闲资源,从而提高系统的整体效能。
4、信号量意义
// 信号量本质是一把计数器-> 计数器的意义是什么??可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断! // 申请锁 -> 判断与访问 -> 释放锁 --> 本质是我们并不清楚临界资源的情况!! // 信号量要提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况! 讲解一下
信号量作为一种同步机制,其本质不仅仅是一把计数器,更是一种智能计数器,它的核心价值在于:
资源管理透明化:
- 计数器的值代表了可供使用的资源数量。例如,对于一个信号量来说,计数值可以反映某种临界资源(如内存块、文件句柄、许可等)当前的可用数量。
- 通过查看信号量的计数值,进程或线程能够在进入临界区之前就获知资源是否可用,而不必直接去检查具体的资源状态,从而减少了对临界区内部状态的不确定性和复杂判断。
简化同步逻辑:
- 在传统的“申请锁 -> 判断资源 -> 访问资源 -> 释放锁”的流程中,每次进程或线程想要访问资源时都需要执行一套完整的加锁和解锁操作,并且在临界区内还要做额外的资源可用性检查。
- 而使用信号量时,只需调用相应的PV操作(P操作:wait,V操作:signal)即可。P操作会尝试减少信号量的计数值(如果计数值为0,则阻塞等待),V操作则会增加计数值(唤醒等待队列中的一个进程或线程)。
- 这样,进程在试图获得资源时,不需要了解其他进程的状态细节,仅通过信号量的原子操作即可完成资源的获取和释放。
并发控制:
- 当信号量的值为非负数时,表明还有资源可用,线程可以直接进行资源访问,否则会被挂起,直到其他线程释放资源增加了信号量的计数值。
- 信号量可以实现更复杂的同步策略,不仅限于一对一的互斥访问,还可以控制同时访问同一资源的并发线程数,适用于有限资源的共享和多路同步等问题。
因此,信号量通过计数器有效地解决了资源的并发访问问题,使得资源管理更加高效和可控,同时也降低了程序设计和实现的复杂度。
5、完整无注释代码
//sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_
#include <iostream>
#include <semaphore.h>
class Sem
{
private:
sem_t sem;
public:
Sem(int value){sem_init(&sem,0,value);}
void p(){sem_wait(&sem);}
void v(){sem_post(&sem);}
~Sem(){sem_destroy(&sem);}
};
#endif
//ringQueue.hpp
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
const int g_default_num = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int default_num = g_default_num)
: _ring_queue(default_num),
_num(default_num),
c_step(0), p_step(0),
space_sem(default_num),
data_sem(0)
{
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&plock, nullptr);
}
~RingQueue()
{
pthread_mutex_destroy(&clock);
pthread_mutex_destroy(&plock);
}
void push(const T &in)
{
space_sem.p();
pthread_mutex_lock(&plock);
_ring_queue[p_step++] = in;
p_step %= _num;
pthread_mutex_unlock(&plock);
data_sem.v();
}
void pop(T *out)
{
data_sem.p();
pthread_mutex_lock(&clock);
*out = _ring_queue[c_step++];
c_step %= _num;
pthread_mutex_unlock(&clock);
space_sem.v();
}
private:
std::vector<T> _ring_queue;
int _num;
int c_step;
int p_step;
Sem space_sem;
Sem data_sem;
pthread_mutex_t clock;
pthread_mutex_t plock;
};
#endif
//testQ.cc
#include "ringQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
void *consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (1)
{
sleep(1);
int x;
rq->pop(&x);
std::cout << "消费: " << x << "[" << pthread_self() << "]" << std::endl;
}
}
void *producer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (1)
{
//sleep(1);
int x = rand() % 100 + 1;
std::cout << "生产: " << x << "[" << pthread_self() << "]" << std::endl;
rq->push(x);
}
}
int main()
{
srand((unsigned)time(nullptr) ^ getpid());
RingQueue<int> *rq = new RingQueue<int>();
// pthread_t p, c;
// pthread_create(&c, nullptr, consumer, (void *)rq);
// pthread_create(&p, nullptr, producer, (void *)rq);
// pthread_join(p, nullptr);
// pthread_join(c, nullptr);
pthread_t c[3], p[2];
pthread_create(c, nullptr, consumer, (void *)rq);
pthread_create(c + 1, nullptr, consumer, (void *)rq);
pthread_create(c + 2, nullptr, consumer, (void *)rq);
pthread_create(p, nullptr, producer, (void *)rq);
pthread_create(p + 1, nullptr, producer, (void *)rq);
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
return 0;
}
三、线程池
1、概念
线程池是一种高效的多线程编程模型,其核心思想是预先创建一组固定的线程,并让它们等待任务到来。当有新的任务需要执行时,不是每次都创建新的线程,而是从线程池中选择一个空闲的线程来执行任务。这种方式可以极大地减少系统创建和销毁线程的开销,同时提高系统的响应速度和资源利用率。
线程池主要优点如下:
降低系统开销:频繁地创建和销毁线程会产生一定的系统开销,包括内存分配、上下文切换等。线程池通过重用已存在的线程来减少这部分开销。
提高效率:对于CPU密集型任务,线程池可以更好地利用多核处理器的优势,使得各个内核能均衡地处理任务;而对于IO密集型任务,线程池也能充分利用线程等待IO完成时的空闲时间去处理其他任务,从而提升总体性能。
资源控制:通过限制线程池中线程的数量,可以根据系统的实际情况(如CPU核心数、内存大小)合理控制并发程度,防止因大量并发导致的资源竞争和过度调度。
线程池的应用场景:
Web服务器:对于快速响应用户请求的服务端应用,例如Web服务器处理HTTP请求,任务通常较短且数量巨大,采用线程池能够快速调度线程处理请求,提高服务器吞吐量。
性能敏感场景:对性能要求极高的应用,通过线程池可以高效利用系统资源,保证服务稳定且高性能运行。
应对突发流量:对于可能出现突发大流量请求的情况,线程池可以平滑地处理这些请求,防止瞬间创建大量线程导致的系统资源耗尽问题。
线程池主要有两种常见的实现方式:
固定大小线程池:预先设定好线程池中线程的数量,在接收到任务时,空闲线程会立即执行任务,若所有线程都在工作,则新任务会被放入任务队列中等待,直到有线程空闲下来。
可伸缩线程池:根据系统负载或任务队列长度动态调整线程池的大小,可以在一定程度上自动适应不同工作负载情况下的需求。
2、实现思路
我们将实现一个基于POSIX线程库(pthread)实现的C++多线程程序框架,包括了线程池(ThreadPool)、互斥锁(Mutex)、自动锁(lockGuard)和日志系统(log)等功能组件。
下面对各个组件的主要功能和实现思路:
Mutex: 这是一个封装了pthread_mutex_t的类,提供了lock和unlock接口,用于控制临界区的访问。当多个线程需要访问共享资源时,必须先获取锁才能进入临界区,离开时释放锁,以避免竞态条件和数据冲突。
lockGuard: 这是一个RAII(Resource Acquisition Is Initialization)风格的类,构造时自动调用Mutex的lock方法锁定互斥量,在析构时自动调用unlock方法解锁。这样可以确保在作用域结束时,即使发生异常也能保证锁会被正确释放,提高了代码的安全性和健壮性。
log:定义了日志级别和日志输出格式,实现了日志记录功能。logMessage函数接收日志级别、格式字符串和可变参数列表,生成完整的日志信息并写入到指定的日志文件中。在实际应用中,可以根据日志级别过滤不需要输出的日志信息。
Task:表示待执行的任务,包含任务处理所需的参数(x, y)和一个Lambda函数对象(func_),重载了operator()使其能够在线程中被调用执行。
Thread:封装了pthread线程,包含了线程创建、启动、加入(等待结束)等功能。在构造时初始化线程名和回调函数,并在start方法中调用pthread_create创建线程。
ThreadPool:线程池模板类,负责管理和调度一组工作线程执行任务。主要包括以下几个核心部分:
- 初始化线程池时创建指定数量的工作线程,并启动。
- 使用互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)实现任务队列的线程安全访问。
- 生产者(main函数)通过pushTask方法将新任务放入任务队列。
- 消费者(工作线程)在routine方法中循环检查任务队列,发现任务时取出并执行,执行完一个任务后继续检查下一个任务。
3、具体实现
lockGuard.hpp
lockGuard.hpp
是一个用于简化互斥锁管理的C++头文件,它采用RAII(Resource Acquisition Is Initialization)设计模式,通过构造函数自动锁定互斥量,并在析构函数中自动解锁互斥量,从而确保即使发生异常,也能正确地释放资源。
#pragma once
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
// std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
// std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
Mutex 类:
- Mutex 类是对 POSIX 线程库中关键同步机制——互斥锁(pthread_mutex_t)的一种高级封装。在构造阶段,该类接收一个指向互斥锁的指针,并将其存储于成员变量 pmtx_ 中以便后续操作。
- Mutex 类提供了 lock() 和 unlock() 成员函数,分别用于获取互斥锁(阻塞直到成功)和释放互斥锁,这两种操作是多线程环境中进行临界区同步的基础。
lockGuard 类:
- lockGuard 类遵循 C++ 中经典的 Resource Acquisition Is Initialization (RAII) 设计模式,是一种智能辅助类,用于自动管理互斥锁的生命周期。在构造时,lockGuard 类型的对象接收一个指向互斥锁的指针,并在其内部立即调用互斥锁的 lock() 方法,确保在对象创建的同时锁定互斥锁资源。
- 当 lockGuard 对象生命周期结束,例如离开当前作用域时(如函数返回、遇到异常等情况),编译器会自动调用该对象的析构函数。在此析构过程中,lockGuard 会调用互斥锁的 unlock() 方法来解除锁定状态。这一机制确保了在任何情况下,只要 lockGuard 对象的作用域结束,关联的互斥锁都能够及时释放,有效防止了因手动管理锁而导致的竞态条件、死锁等问题
通过这种方式,当你在代码块的开始处创建一个 lockGuard
对象时,能够确保该代码块内的所有操作都在互斥锁保护下执行,而在代码块结束时,无论是否抛出异常,互斥锁都会被正确地释放,从而避免了手动管理锁可能导致的资源泄露或死锁问题。例如:
void someFunction()
{
pthread_mutex_t myMutex;
// 初始化互斥锁...
{
lockGuard lock(&myMutex); // 在这里加锁
// ...在此区域内的代码将在互斥锁保护下执行...
} // 当离开此代码块时,lockGuard对象会被销毁,自动解锁互斥锁
// 此时互斥锁已被释放
}
makefile
thread_pool:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:
rm -f thread_pool
log.hpp
log.hpp
提供了一个基本的日志框架,可用于多线程环境下的日志记录,支持不同级别的日志输出,并且日志信息包含了时间戳、日志级别以及自定义信息。并且采用了文件追加的方式写入日志,因此可以持续记录应用程序运行过程中的事件信息,方便后期排查问题或监控系统状态。
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
日志级别定义:
这里定义了五种日志级别,分别为DEBUG、NORMAL、WARNING、ERROR和FATAL,并用一个字符数组gLevelMap
对应存储了这些级别的字符串表示形式。
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
LOGFILE
宏被定义为一个表示日志文件路径的字符串常量。这意味着程序在运行过程中产生的日志将会被写入到名为 threadpool.log
的文件中,
#define LOGFILE "./threadpool.log"
处理可变数量参数相关函数
va_list
, va_start
, va_arg
, 和 vsnprintf
是 C 语言中用于处理可变数量参数的机制相关的函数和宏。这些函数和宏主要用于编写可以接受任意数量参数的函数,如格式化输出函数(如 printf
的变体)。下面分别解释每个术语:
va_list:
va_list
是一个类型名,通常定义为某种类型的指针(具体类型取决于编译器实现)。它用于保存函数内部可变参数列表的状态。当你需要在一个函数内遍历那些未知数量的参数时,会声明一个va_list
类型的变量来指向这些参数。va_start: 这是一个宏,用于初始化
va_list
变量,使其指向第一个可变参数。它的语法通常是:va_start(va_list variable, last_fixed_argument);
其中,
variable
是你要使用的va_list
变量名,last_fixed_argument
是函数固定参数列表中的最后一个已知参数(通常是在变参前的最后一个参数)。va_arg: 这也是一个宏,用于从
va_list
中依次取出下一个参数。每次调用va_arg
,它都会返回当前指针指向的参数,并将其内部指针移动到参数列表中的下一个位置。其使用方式如下:type argument = va_arg(va_list variable, type);
其中,
type
是你想要提取的参数的类型,variable
是之前通过va_start
初始化过的va_list
。va_end: 这也是一个宏,用于清理
va_list
变量,在不再需要访问可变参数列表后调用。虽然在logMessage
函数中没有直接使用va_end
,但在处理完可变参数后应调用此宏以正确清理状态。vsnprintf: 这是一个安全版本的格式化输出函数,类似于
sprintf
,但提供了长度检查,防止缓冲区溢出。int vsnprintf(char *str, size_t size, const char *format, va_list ap);
它接受一个指向缓冲区的指针,缓冲区的最大大小,以及一个格式化字符串和一个
va_list
参数,用于从变参列表中读取和格式化数据。函数返回实际需要的字符数(不包括结束符\0
),如果超过缓冲区大小,则不会写出超出部分。示例:
#include <stdio.h> #include <stdarg.h> void my_printf(const char *fmt, ...) { va_list args; va_start(args, fmt); char buffer[1024]; int result = vsnprintf(buffer, sizeof(buffer), fmt, args); va_end(args); // 输出格式化后的字符串到标准输出或其他地方 printf("%s", buffer); } int main() { my_printf("Hello, %s! The number is %d.\n", "World", 42); return 0; }
在这个例子中,
my_printf
使用va_start
初始化va_list
,接着用vsnprintf
格式化并存入缓冲区,最后用va_end
清理va_list
。这样就可以处理类似printf
的可变数量和类型的参数了。
日志输出函数 logMessage
:
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
- 关键参数:
level
: 指定日志级别,例如 DEBUG、NORMAL、WARNING、ERROR 或 FATAL 等,这些级别事先已在全局数组gLevelMap
中预定义,以对应不同的严重性层次。format
: 提供一个类似于 C 语言printf
函数的格式化字符串,用以精确控制用户自定义日志内容的布局和样式。...
: 可变参数列表,与format
字符串协同工作,实际填充日志消息中的具体内容。
- 函数执行流程如下:
在记录任何日志之前,首先检查当前设置是否启用了 DEBUG 级别的日志输出。若 DEBUG 级别未被启用,或者当前日志级别不是 DEBUG,则针对非 DEBUG 级别的日志,函数将继续执行记录操作。
函数内部分配了两个具有固定大小的字符缓冲区,分别是
stdBuffer
和logBuffer
。stdBuffer
负责存储规范化的日志头部信息,包含日志级别标识及当前 Unix 时间戳;而logBuffer
则用于承载用户定制的、格式化后的日志内容,其中包括可能的文件名和行号等详细信息。使用
va_start
宏初始化va_list
变量args
,以便能逐个访问传递给logMessage
函数的所有可变参数。随后,通过调用
vsnprintf
函数,依据format
字符串及其对应的args
变量,将可变参数按预期格式填充至logBuffer
中。填充完成后,必需调用
va_end
宏来正确清理va_list
变量,这是遵循 C/C++ 可变参数编程规范的重要环节。接下来,函数以追加模式打开预设的日志文件
LOGFILE
,并在其中写入日志内容。首先,将日志头部(包括日志级别和时间戳)格式化并存储进stdBuffer
,然后将stdBuffer
和已经填充好内容的logBuffer
合并成完整的日志条目。最终,通过
fprintf
函数将合并后的日志信息一次性写入文件,并确保在完成写入后关闭日志文件,以此确保日志记录过程的流畅性和一致性。
Task.hpp
Task.hpp
主要定义了一个用于表示和执行简单计算任务的类,其中的任务可以通过传递给类构造函数的函数对象进行定制。在实际运行时,这些任务将由线程池中的线程调度执行。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
// 定义一个函数指针类型,用于表示接受两个int参数并返回一个int值的函数
typedef std::function<int(int, int)> func_t;
class Task
{
public:
// Task类用于封装一个计算任务,包含操作数及执行计算的函数
Task() {} // 默认构造函数,不传入任何参数
Task(int x, int y, func_t func) : x_(x), y_(y), func_(func) {} // 构造函数,传入操作数x和y以及计算函数func
// 重载运算符(),使得可以通过Task对象像函数一样调用,传入线程名称来执行任务并记录日志
void operator ()(const std::string &name)
{
// 原本这里是输出结果,现在改为使用logMessage记录日志
// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
}
public:
// 任务的操作数
int x_;
int y_;
// 任务的计算函数,采用std::function模板类存储,便于传入任意满足签名要求的函数
func_t func_;
};
类定义:
Task
类是一个简单的任务容器,它持有两个整数变量x_
和y_
以及一个函数对象func_
。x_
和y_
可能代表任务需要处理的数据,而func_
则是一个std::function
类型的对象,用于存储任务的实际执行逻辑。
构造函数:
Task()
是一个默认构造函数,不带任何参数。- 另一个构造函数接受两个整数参数
x
和y
,以及一个函数对象func
。在构造时,它将这些参数赋值给对应的成员变量。
重载运算符 ()
:
- 重载了运算符
()
,使得Task
类的对象可以像函数一样被调用。 - 当调用
task_instance("thread_name")
时,它会执行相应的日志记录操作,并调用func_
执行任务的主体逻辑。 - 在代码中,它会使用
logMessage
函数记录一条关于任务完成的消息,消息内容包括线程名称、操作数和计算结果,并附带当前文件名和行号。
数据成员:
int x_
和int y_
分别表示任务需要处理的两个整数值。func_t func_
是一个std::function<int(int, int)>
类型的成员,用来存储任务的具体计算逻辑,这里的计算逻辑是一个接收两个整数参数并返回一个整数的函数。
thread.hpp
thread.hpp
提供了一个基础的线程类 Thread
,它可以用来创建、启动、加入和命名线程,并通过 ThreadData
类将参数传递给线程执行函数。在实际应用中,如上述代码片段所示的线程池场景中,Thread
类实例将会作为工作线程,不断地从线程池中获取任务并执行。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 定义一个函数指针类型,用于表示线程执行的函数,该函数接受一个void*类型的参数并返回void*类型的结果
typedef void *(*fun_t)(void *);
// ThreadData类用于存储传递给线程的参数
class ThreadData
{
public:
// 线程参数
void *args_;
// 线程名称
std::string name_;
};
// Thread类用于创建和管理一个操作系统级别的线程
class Thread
{
public:
// Thread构造函数,接收线程编号(用于命名线程)、线程执行函数和传递给该函数的参数
Thread(int num, fun_t callback, void *args)
: func_(callback)
{
// 根据线程编号生成线程名称
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
name_ = nameBuffer;
// 保存参数
tdata_.args_ = args;
tdata_.name_ = name_;
}
// 开启线程执行
void start()
{
// 使用POSIX线程API创建并启动线程
pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
}
// 等待线程结束
void join()
{
// 使用POSIX线程API等待线程结束
pthread_join(tid_, nullptr);
}
// 获取线程名称
std::string name()
{
return name_;
}
// 线程析构函数,确保线程资源正确释放(在当前情况下不需要显式做任何事情)
~Thread()
{}
private:
// 线程执行的函数指针
fun_t func_;
// 用于传递给线程函数的参数结构体
ThreadData tdata_;
// POSIX线程标识符
pthread_t tid_;
// 线程名称
std::string name_;
};
thread.hpp
文件定义了一个用于多线程编程的基本线程类(Thread
)以及辅助线程数据类(ThreadData
)。以下是这两个类的主要组成部分:
ThreadData 类:
class ThreadData
{
public:
void *args_;
std::string name_;
};
ThreadData
类是为了传递给线程初始化函数的辅助数据结构,它包含两个成员:
void *args_
:一个通用指针,用于传递任意类型的参数给线程函数。std::string name_
:线程的名称字符串。
Thread 类:
Thread
类是线程的主要容器,它封装了 POSIX 线程(pthread)的相关操作:
class Thread
{
public:
Thread(int num, fun_t callback, void *args) : func_(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
name_ = nameBuffer;
tdata_.args_ = args;
tdata_.name_ = name_;
}
void start()
{
pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
}
void join()
{
pthread_join(tid_, nullptr);
}
std::string name()
{
return name_;
}
~Thread()
{}
private:
std::string name_;
fun_t func_;
ThreadData tdata_;
pthread_t tid_;
};
- 构造函数:接受三个参数,分别是线程编号(
num
)、回调函数指针(fun_t
类型,即线程执行体)和传递给线程函数的参数(void *args
)。构造函数会为线程生成一个名字,并将回调函数和参数保存在ThreadData
结构中。 start()
函数:负责创建并启动一个新的线程,使用pthread_create()
函数实现。join()
函数:等待指定线程终止,使用pthread_join()
函数实现。name()
函数:返回线程的名字。- 析构函数:清理资源,但在这里没有特殊操作,因为线程已经通过
join()
函数被正确地等待和终止。
threadPool.hpp
整个 ThreadPool
类的设计实现了这样一个系统:
- 外部使用者可以通过调用
pushTask()
方法将任务放入线程池,工作线程则通过routine()
不断地从任务队列中取出任务执行。 - 通过互斥锁和条件变量保证了多线程环境下的任务安全分配与执行。
- 在测试主程序
testMain.cc
中可以看到,主线程创建并启动线程池后,不断产生新的任务并推送到线程池中,线程池内的工作线程则异步地执行这些任务。 - 同时,还使用了日志模块记录相关的操作和信息。
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
// 定义默认线程数量常量
const int g_thread_num = 3;
// ThreadPool类模板,基于生产者-消费者模型,用于并发执行不同类型的任务(T)
template <class T>
class ThreadPool
{
public:
// 返回线程池内部使用的互斥锁
pthread_mutex_t *getMutex()
{
return &lock;
}
// 判断任务队列是否为空
bool isEmpty()
{
return task_queue_.empty();
}
// 线程等待条件变量,当任务队列为空时阻塞
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
// 从任务队列中取出并移除一个任务
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
// ThreadPool构造函数,初始化线程池并创建指定数量的工作线程
ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
// 创建指定数量的工作线程,每个线程运行静态成员函数routine
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, &ThreadPool::routine, this));
}
}
// ThreadPool析构函数,确保所有工作线程正常结束并释放资源
~ThreadPool()
{
// 等待所有工作线程结束
for (auto &iter : threads_)
{
iter->join();
delete iter;
}
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
// 启动所有工作线程
void run()
{
for (auto &iter : threads_)
{
iter->start();
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
// 静态成员函数,作为工作线程的执行入口
static void *routine(void *args)
{
// 解引用指向ThreadPool实例的指针
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
// 循环处理任务
while (true)
{
T task;
{
// 使用RAII的lockGuard对任务队列进行加锁
lockGuard lockguard(tp->getMutex());
// 等待任务队列非空
while (tp->isEmpty())
tp->waitCond();
// 从任务队列中取出一个任务
task = tp->getTask();
// 退出作用域后,lockGuard自动释放互斥锁,让其他线程有机会添加任务
}
task(td->name_);// 执行任务
// 继续下一轮循环等待新的任务
}
}
// 将新任务推送到线程池的任务队列中
void pushTask(const T &task)
{
// 使用RAII的lockGuard对任务队列进行加锁
lockGuard lockguard(&lock);
// 将任务压入任务队列,并发出条件变量信号,通知工作线程有新任务到来
task_queue_.push(task);
pthread_cond_signal(&cond);
}
private:
// 存储工作线程实例的容器
std::vector<Thread *> threads_;
// 工作线程的数量
int num_;
// 任务队列,用于存放待执行的任务
std::queue<T> task_queue_;
// 用于同步任务队列访问的互斥锁
pthread_mutex_t lock;
// 用于通知工作线程有新任务到来的条件变量
pthread_cond_t cond;
};
threadPool.hpp
文件定义了一个基于C++模板技术的线程池类 ThreadPool<T>
,它能灵活处理不同种类的任务(此处以模板参数 T
表示任务类型,例如 Task
类)。线程池的核心目标在于高效管理和调度一组线程,以按序执行任务队列中的各项任务,从而实现生产者-消费者模型。
核心成员变量
pthread_mutex_t lock
: 这个互斥锁用于同步对任务队列的操作,确保在多线程环境下对任务队列的访问具备线程安全性。pthread_cond_t cond
: 这个条件变量用于在任务队列为空时令工作线程进入等待状态,一旦有新任务加入队列,则唤醒相应的线程继续执行。std::vector<Thread*> threads_
: 用于存储所有工作线程实例的容器。int num_
: 表示线程池中线程的数量。std::queue<T> task_queue_
: 是一个存放待处理任务的先进先出(FIFO)队列。
构造函数
当创建线程池实例时,首先初始化互斥锁和条件变量,并根据用户指定的线程数创建相应数量的工作线程。每个工作线程内部均调用 routine
函数来执行任务。
关键成员函数
getMutex()
: 提供互斥锁的指针,便于在其它代码位置上进行加锁与解锁操作。isEmpty()
: 判断任务队列是否为空。waitCond()
: 让当前线程暂停并等待条件变量发出信号(即当任务队列中有新增任务时)。getTask()
: 从任务队列中取出并移除首个任务。run()
: 启动所有预先创建好的工作线程,各个线程将按照各自逻辑执行routine
函数。routine(void*)
: 这是一个静态成员函数,作为工作线程的实际执行逻辑。它负责从任务队列中取出任务并执行,因其需作为回调函数传递给pthread_create
函数,所以设计为静态成员函数。pushTask(const T&)
: 新增一项任务至任务队列,并通过pthread_cond_signal
发出信号通知任意一个正等待条件变量的工作线程,表明有新任务可供处理。
线程池的销毁
在线程池实例被销毁时,确保所有工作线程已完成其任务并退出,同时清理并释放相关资源,包括销毁互斥锁和条件变量。
综上所述,该线程池设计巧妙地运用了C++模板技术和POSIX线程API,构建出了一种适用于多线程环境的任务执行框架,有效提升了程序执行效率和资源使用率。此外,还无缝集成了日志模块以追踪和记录线程池的运行状况。
互斥和同步
在threadPool.hpp
中,线程池(ThreadPool
)使用了互斥锁(pthread_mutex_t
)和条件变量(pthread_cond_t
)来进行线程间的同步和通信。
互斥锁(
pthread_mutex_t lock;
):- 在线程池中,互斥锁用于保护任务队列(
std::queue<T> task_queue_
)的安全访问,确保在任何时候只有一个线程能够修改队列的状态。例如,在pushTask()
方法中,首先获取互斥锁,然后将新任务加入队列,最后释放锁。这样,当多个线程尝试同时向队列中添加任务时,互斥锁确保了每次只有一个线程能够执行此操作,从而避免了数据竞争和线程安全问题。
- 在线程池中,互斥锁用于保护任务队列(
条件变量(
pthread_cond_t cond;
):条件变量配合互斥锁使用,允许线程在某个特定条件未满足时挂起自身,当条件满足时由其他线程唤醒它们。在
ThreadPool
中,条件变量主要用于工作线程的调度。工作线程在
routine()
函数中会不断检查任务队列是否为空。如果队列为空,线程就会调用pthread_cond_wait(&cond, &lock);
,这时线程会释放掉之前获取的互斥锁,并进入等待状态,直到另一个线程调用pthread_cond_signal(&cond);
或pthread_cond_broadcast(&cond);
唤醒它。当主线程或其他线程调用
pushTask()
方法添加新任务时,不仅会将任务添加到队列中,还会在添加完任务后发送一个条件变量信号,通知等待的线程有新任务可以处理了。此时,处于等待状态的线程会被唤醒,重新获得互斥锁并从队列中取出任务执行。
总结起来,线程池利用互斥锁实现了任务队列的线程安全访问,而条件变量则是用来协调线程在任务队列为空时的等待行为,有效地实现了任务分配与执行之间的协同合作,提高了系统资源利用率和程序效率。
去掉routinewhile内部的大括号可以吗
去掉 routine
函数中 while 循环内部的大括号会导致 lockGuard 的作用范围扩大,也就是说,互斥锁在整个 while 循环内都会保持锁定状态,而非仅在检查和获取任务时锁定。这样的改动会导致以下问题:
效率降低:如果线程在获取任务后需要花费较长时间执行任务,那么在此期间,其他线程即使有新任务也无法向任务队列中添加,因为互斥锁一直被该线程持有。这会显著降低线程池的并发性和整体效率。
死锁风险:若在执行任务过程中涉及其他同步原语(例如条件变量或其他互斥锁),可能会导致死锁情况出现。因为此时线程不仅持有任务队列的锁,还可能尝试获取其他资源的锁,而其他线程可能正在等待此线程释放任务队列的锁。
因此,保留 while 循环内部的大括号是必要的,以确保互斥锁仅在必要的时候(即访问和操作任务队列时)被持有,从而提高并发性和避免潜在的死锁问题。
在
threadPool.hpp
中的routine
函数中,while 循环内部的大括号{}
包含了两段关键操作:
获取并锁定互斥锁(mutex)来保护任务队列(
task_queue_
)的安全访问:lockGuard lockguard(tp->getMutex());
这行代码创建了一个
lockGuard
对象,它在构造时自动调用pthread_mutex_lock()
对任务队列的互斥锁进行加锁,并且在其析构时(也就是离开大括号时)自动调用pthread_mutex_unlock()
解锁。这样确保了在对任务队列进行操作(如检查是否为空、获取任务)时,只有一个线程可以访问。在锁定互斥锁的情况下进行以下操作:
- 当任务队列为空时,调用
waitCond()
让线程进入等待状态,直到接收到信号(通过pthread_cond_signal()
)表示有新任务加入队列。 - 当队列中有任务时,从任务队列中取出一个任务(
task = tp->getTask();
)。
- 当任务队列为空时,调用
将这两步操作放在大括号内,意味着一旦任务被取出,互斥锁将立即得到释放,这样其他线程就可以在当前线程执行任务时安全地向任务队列添加新任务,而不必等待当前线程完成任务处理。这种设计有助于提高多线程环境下的并发效率,避免了因长期持有锁而导致的线程阻塞现象。
threadPool.hpp方案二
在这个修改后的版本中,我们引入了一个额外的互斥量 swapMutex
,用来保护队列切换操作的原子性。当生产者满足切换条件时,会通过原子标志 swap_flag
和互斥锁 swapMutex
安全地切换队列指针。消费者在获取任务前也会检查并处理队列切换。注意,此处仅给出了基本的切换逻辑框架,具体何时切换队列取决于您的实际需求。
#include <queue>
template <class T>
class ThreadPool
{
private:
std::queue<T> queue1, queue2;
std::queue<T> *p_queue{&queue1}, *c_queue{&queue2};
std::atomic<bool> swap_flag{false}; // 标记是否需要切换队列
// ... 其他已存在的成员变量和方法 ...
public:
// ... 已存在的公共方法 ...
void pushTask(const T &task)
{
lockGuard lockguard(getMutex());
p_queue->push(task);
if (/* 满足切换条件 */) // 示例中未给出确切的切换条件,请按实际情况设定
{
std::lock_guard<std::mutex> lock(swapMutex);
std::swap(p_queue, c_queue);
swap_flag.store(true, std::memory_order_release);
pthread_cond_broadcast(&cond);
}
else
{
pthread_cond_signal(&cond);
}
}
// 消费者获取并处理任务的方法,这里增加了对队列切换的处理
static void *routine(void *args)
{
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while (tp->c_queue->empty() && !tp->swap_flag.load(std::memory_order_acquire))
tp->waitCond();
if (tp->swap_flag.load(std::memory_order_acquire))
{
std::lock_guard<std::mutex> lock(tp->swapMutex);
std::swap(tp->p_queue, tp->c_queue);
tp->swap_flag.store(false, std::memory_order_release);
}
// 获取任务
task = tp->c_queue->front();
tp->c_queue->pop();
}
task(td->name_);
}
}
private:
// 新增互斥量,用于保护队列切换操作
std::mutex swapMutex;
// ... 其他已存在的私有成员方法 ...
};
testMain.cc
是的,在
testMain.cc
中的 lambda 表达式可以作为参数传递给Task.hpp
中构造函数中的func_t
类型func
。这里的func_t
是一个std::function<int(int, int)>
类型别名,它能存储任何匹配int(int, int)
签名的可调用对象。Lambda 表达式由于能够捕获外部变量并在内部定义执行逻辑,因此它可以隐式转换为接受两个int
参数并返回int
的函数对象,从而满足func_t
类型的要求。具体来说,
Task
类构造函数接收两个整数参数x
和y
以及一个func_t
类型的参数func
,然后在调用Task
实例的operator()
时会执行传入的 lambda 函数,计算两个整数的和。在testMain.cc
中,创建了一个Task
对象t
,并传入了适当的 lambda 表达式,这个表达式会在线程池中的工作线程中被执行。
#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
// 创建一个线程池实例
ThreadPool<Task> tp(3);
tp.run();
while (true)
{
// 生产任务
int x = rand() % 100 + 1;
usleep(7721);
int y = rand() % 30 + 1;
Task t(x, y, [](int x, int y) -> int {
return x + y;
});
// 记录日志
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
// 将任务推送到线程池
tp.pushTask(t);
sleep(1);
}
return 0;
}
testMain.cc
是一个C++源文件,它实现了主函数,用于测试前面定义的 ThreadPool
类以及与之配合使用的 Task
类。以下是详细的解释:
包含头文件:首先,包含了所需的头文件,包括实现线程池和相关辅助类的库,以及
<ctime>
、<cstdlib>
、<iostream>
和<unistd.h>
,分别用于时间处理、随机数生成、I/O操作和休眠控制。初始化随机数生成器:使用
srand
函数初始化随机数生成器,确保每次运行程序时产生的随机数序列不同。创建线程池:声明了一个
ThreadPool<Task>
类型的对象tp
,指定线程池中的线程数量为3个。这意味着在线程池中有3个工作线程可以并发处理任务。启动线程池:调用
tp.run()
方法启动线程池中所有的线程。每个线程都会进入一个循环,等待从任务队列中获取任务并执行。主循环:程序进入一个无限循环,在循环内部:
- 生成任务:使用
rand()
函数生成两个随机整数x
和y
,然后构造一个Task
对象t
,传入这两个随机数及一个Lambda函数,该函数负责计算两个整数的和。 - 记录日志:利用
logMessage
函数记录一条日志消息,表明已经准备好了一个任务,并显示其具体的加法运算表达式。 - 提交任务:调用
tp.pushTask(t)
方法将任务t
加入到线程池的任务队列中,等待工作线程来处理。 - 延迟:让主线程休眠1秒,模拟任务之间的延迟,避免过快地生成大量任务。
- 生成任务:使用
主函数结束:虽然无限循环理论上不会自动终止,但在实际应用中可能会有某种外部条件或者信号来控制循环的退出。最后,函数返回0,表示程序正常结束。
整个程序的设计是典型的生产者-消费者模式,主线程作为“生产者”,不断地生成随机任务并将其提交到线程池的任务队列中;而线程池内的工作线程作为“消费者”,从任务队列中取出任务并执行。日志系统则用于跟踪和记录任务的创建和执行情况。
完整代码:
lockGuard.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
// std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
// std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
log.hpp:
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
// printf("%s%s\n", stdBuffer, logBuffer);
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
Task.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
void operator ()(const std::string &name)
{
// std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl;
logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
}
public:
int x_;
int y_;
// int type;
func_t func_;
};
thread.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// typedef std::function<void* (void*)> fun_t;
typedef void *(*fun_t)(void *);
class ThreadData
{
public:
void *args_;
std::string name_;
};
class Thread
{
public:
Thread(int num, fun_t callback, void *args) : func_(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
name_ = nameBuffer;
tdata_.args_ = args;
tdata_.name_ = name_;
}
void start()
{
pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
}
void join()
{
pthread_join(tid_, nullptr);
}
std::string name()
{
return name_;
}
~Thread()
{
}
private:
std::string name_;
fun_t func_;
ThreadData tdata_;
pthread_t tid_;
};
threadPool.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num = 3;
// 本质是: 生产消费模型
template <class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
private:
ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, routine, this));
}
}
void run()
{
for (auto &iter : threads_)
{
iter->start();
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
// 线程池本质也是一个生产消费模型
// void *routine(void *args)
// 消费过程
static void *routine(void *args)
{
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while (tp->isEmpty())
tp->waitCond();
// 读取任务
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
}
task(td->name_);
// lock
// while(task_queue_.empty()) wait();
// 获取任务
// unlock
// 处理任务
}
}
// 2. pushTask()
void pushTask(const T &task)
{
lockGuard lockguard(&lock);
task_queue_.push(task);
pthread_cond_signal(&cond);
}
// test func
// void joins()
// {
// for (auto &iter : threads_)
// {
// iter->join();
// }
// }
~ThreadPool()
{
for (auto &iter : threads_)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread *> threads_;
int num_;
std::queue<T> task_queue_;
pthread_mutex_t lock;
pthread_cond_t cond;
};
testMain.cc:
#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
// 创建一个线程池实例
ThreadPool<Task> tp(3);
tp.run();
while (true)
{
// 生产任务
int x = rand() % 100 + 1;
usleep(7721);
int y = rand() % 30 + 1;
Task t(x, y, [](int x, int y) -> int {
return x + y;
});
// 记录日志
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
// 将任务推送到线程池
tp.pushTask(t);
sleep(1);
}
return 0;
}
四、单例模式
- 单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
1、设计模式
设计模式,特别是在计算机科学领域,是一种广泛认可的、解决特定问题的最佳实践或通用方案,它并非具体代码,而是描述如何组织代码以解决某一类设计问题的模板。设计模式不仅提升了代码的可读性、可复用性和可维护性,也促进了团队间的沟通效率和设计的一致性。
2、单例模式
单例模式作为众多设计模式中的一种“经典”且“常用”的模式,它确保在任何情况下,对于特定类只有一个实例存在,并提供全局访问点来获取这个唯一的实例。这种模式适用于那些因为资源消耗较大或者逻辑上必须保持全局唯一性的场景,就像在一个家庭中,一个男人只能有一个合法的妻子一样,体现了单一性原则。
- 举例来说,在服务器开发中,有时会遇到需要一次性加载大量数据(例如上百GB)进内存的情况。这时,通常会采用单例模式来创建一个管理这些数据的类,确保整个应用生命周期中只有唯一的数据加载实例,避免重复加载和资源浪费。
3、特点
- 单例模式的核心特点是“单一实例”,即同一类的对象只有一个实例存在。
- 通过控制类的实例化过程,避免外部多次实例化同一个类,从而达到节约系统资源、控制共享资源的目的。
- 单例模式的应用场合通常包括那些需要频繁实例化但又希望减少系统开销的情况,或者需要维护全局唯一状态和服务的场景。
4、饿汉实现方式和懒汉实现方式
对于单例模式的实现方式,有两种主要的策略:
饿汉式: 在类加载时就立即初始化并创建单例实例,就像吃完饭立刻洗碗一样,不论下顿饭何时开始,碗已经准备好随时可以使用。这种方式保证了线程安全,但可能造成不必要的内存占用,尤其是在实例并不一定会被使用的前提下。
懒汉式: 实例的创建被延迟到真正需要时才发生,即首次访问该单例时才进行实例化。如同吃完饭先把碗放下,等到下次需要用碗时再去清洗。这种方式的核心理念在于“延时加载”,它可以优化程序启动速度,特别是当单例初始化过程复杂或资源消耗较大的时候。然而,单纯的懒汉式实现可能会带来线程安全问题,因此在多线程环境下通常需要额外的同步机制来保证安全地创建单例。
5、两种方式实现
饿汉式实现单例模式:
template <typename T>
class Singleton {
private:
static T data; // 静态成员变量,类加载时初始化
public:
static T* GetInstance() {
return &data; // 直接返回预先创建好的唯一实例
}
};
// 注:实际使用时需要在类外部定义静态成员变量T data
// 示例:T Singleton<T>::data;
通过上述饿汉式单例模式的实现,无论何时通过Singleton<T>类获取实例,系统都会确保在整个进程范围内只存在一个T类型的实例。类加载时,静态成员变量data会被初始化,这样在任何时候调用GetInstance()方法,都只会返回同一个已初始化的实例。
懒汉式实现单例模式(非线程安全版本):
template <typename T>
class Singleton {
private:
static T* inst; // 静态成员变量,初始值为NULL
public:
static T* GetInstance() {
if (inst == NULL) { // 第一次调用时判断是否已创建实例
inst = new T(); // 若未创建,则新建一个实例
}
return inst; // 返回已存在的实例
}
};
// 同样需在类外部定义静态成员变量T* Singleton<T>::inst = nullptr;
此处的懒汉式单例模式实现确实存在一个显著问题,即线程不安全。在多线程环境下,若多个线程同时首次调用GetInstance()方法,由于缺少互斥锁等同步机制,可能会导致创建多个T类型的实例。不过,一旦实例被创建出来,后续调用GetInstance()则不会再出现此问题,因为它会直接返回先前创建的实例。
为了实现线程安全的懒汉式单例模式,通常会在GetInstance()方法内部添加适当的同步机制,如使用mutex或其他锁机制来确保同一时间只有一个线程执行new T()操作。
6、懒汉方式实现单例模式(线程安全版本)
在多线程环境中采用的单例模式是为了确保在整个程序运行期间,无论多少个线程试图访问 ThreadPool
类,都只会创建一个 ThreadPool
实例。这样做的好处有:
资源利用率:线程池是一种管理线程资源的机制,其中包含了一组可复用的工作线程和任务队列。单例模式可以确保所有的线程都在同一个线程池中调度任务,而不是各自创建独立的线程池,从而有效地节约系统资源(如内存、CPU时间等)。
一致性:通过单一的线程池管理所有的并发任务,能够更好地控制和协调这些任务的执行顺序、优先级以及资源分配,维护系统行为的一致性。
线程安全性:在多线程环境下,如果不采用单例模式,可能会有多个线程同时创建多个线程池实例,造成混乱和资源冲突。通过单例模式和双重检查锁机制,可以确保线程安全地初始化和访问线程池实例。
简化编程模型:程序员只需通过
ThreadPool::getThreadPool()
方法就能获取全局唯一的线程池实例,简化了多线程编程模型,无需关心线程池实例的创建和销毁。
总结来说,在多线程编程中,通过单例模式实现线程池能够提供高效、统一且线程安全的任务调度服务,有助于构建稳定可靠的并发系统。
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
// 定义默认线程数量
const int g_thread_num = 3;
// 类模板ThreadPool,代表一个线程池,可以处理不同类型的任务(T)
template <class T>
class ThreadPool
{
public:
// 获取线程池内部使用的互斥锁
pthread_mutex_t *getMutex()
{
return &lock;
}
// 判断任务队列是否为空
bool isEmpty()
{
return task_queue_.empty();
}
// 线程等待条件变量
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
// 从任务队列中取出并移除一个任务
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
private:
// ThreadPool构造函数,初始化线程池,创建指定数量的工作线程
ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, &ThreadPool::routine, this));
}
}
// 删除拷贝构造函数和赋值操作符,避免线程池实例的拷贝
ThreadPool(const ThreadPool<T> &other) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete;
public:
// 获取线程池的单例实例
static ThreadPool<T> *getThreadPool(int num = g_thread_num)
{
// 使用双重检查锁定模式确保线程安全地初始化单例
if (nullptr == thread_ptr)
{
// 加锁
lockGuard lockguard(&mutex);
// 如果在加锁后仍然没有初始化,则创建一个新的线程池实例
if (nullptr == thread_ptr)
{
thread_ptr = new ThreadPool<T>(num);
}
// 不需要显式解锁,因为lockGuard会在作用域结束时自动解锁
}
return thread_ptr;
}
// 启动线程池中的所有工作线程
void run()
{
for (auto &iter : threads_)
{
iter->start();
// 记录线程启动成功的日志消息
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
// 静态方法,作为工作线程的执行入口
static void *routine(void *args)
{
// 解封装传入的参数
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
// 工作线程循环执行,直到收到终止信号
while (true)
{
T task;
// 上锁,同步访问任务队列
{
lockGuard lockguard(tp->getMutex());
// 等待非空任务到来
while (tp->isEmpty())
tp->waitCond();
// 从任务队列中取出一个任务
task = tp->getTask();
}
// 执行任务
task(td->name_);
// 这里假设任务完成后会自动重置循环条件,否则需要显式判断是否退出循环
}
}
// 将新任务推送到线程池的任务队列中
void pushTask(const T &task)
{
// 加锁,同步访问任务队列
lockGuard lockguard(&lock);
// 将任务放入队列,并通知条件变量,有一个新的任务可被处理
task_queue_.push(task);
pthread_cond_signal(&cond);
}
// 线程池析构函数,清理所有线程资源
~ThreadPool()
{
// 确保所有工作线程完成其任务后再销毁
for (auto &iter : threads_)
{
iter->join();
delete iter;
}
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
// 存储工作线程实例的容器
std::vector<Thread *> threads_;
// 工作线程的数量
int num_;
// 任务队列,用于存放待执行的任务
std::queue<T> task_queue_;
// 单例实例指针
static ThreadPool<T> *thread_ptr;
// 用于保护线程池单例初始化的全局互斥锁
static pthread_mutex_t mutex;
// 用于控制线程同步的互斥锁
pthread_mutex_t lock;
// 条件变量,用于实现线程间的通信,如通知工作线程有新任务到来
pthread_cond_t cond;
};
// 初始化静态成员变量
template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
在这个代码片段中,ThreadPool
类采用了 C++ 的“懒汉式”线程不安全单例模式,并针对多线程环境进行了改进。首先我们看到 ThreadPool
类的实例化方法是通过 getThreadPool()
函数实现的,而不是传统的公有构造函数。这样做可以确保任何时候全局只存在一个 ThreadPool
实例。
为了实现线程安全,getThreadPool()
函数内嵌套了一层双重检查锁(Double-Checked Locking Pattern, DCLP)。具体步骤如下:
- 首先定义一个静态指针
thread_ptr
作为单例实例的全局存储,并初始化为nullptr
。private: static ThreadPool<T>* thread_ptr; }; template <typename T> ThreadPool<T>* ThreadPool<T>::thread_ptr = nullptr;
- 定义一个静态互斥锁
mutex
,用于在多线程环境下保护thread_ptr
的初始化操作。private: static pthread_mutex_t mutex; }; template <typename T> pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
- 在
getThreadPool()
函数内部,首先检查thread_ptr
是否为nullptr
,如果不是,则直接返回已存在的实例,无需加锁。public: // 获取线程池的单例实例 static ThreadPool<T>* getThreadPool(int num = g_thread_num) { // 使用双重检查锁定模式确保线程安全地初始化单例 if (nullptr == thread_ptr) { // 加锁 lockGuard lockguard(&mutex); // 如果在加锁后仍然没有初始化,则创建一个新的线程池实例 if (nullptr == thread_ptr) { thread_ptr = new ThreadPool<T>(num); } // 不需要显式解锁,因为lockGuard会在作用域结束时自动解锁 } return thread_ptr; }
- 如果
thread_ptr
为nullptr
,则利用lockGuard
类对mutex
进行加锁,然后再次检查thread_ptr
是否为nullptr
,这是因为有可能在第一次检查后,另一个线程已经完成了初始化。只有当第二次检查仍然发现thread_ptr
为nullptr
时,才会真正创建ThreadPool
单例实例。 - 创建完单例实例后,将其赋值给
thread_ptr
,然后释放锁。此后其他线程再调用getThreadPool()
时,就会直接返回已经创建好的单例实例,避免重复初始化。
- 如果
通过这样的设计,既保证了单例的唯一性,又减少了多线程环境下不必要的锁竞争,提高了系统性能。同时,为了避免拷贝和赋值操作,还重载了复制构造函数和赋值运算符,并设置为删除(= delete
),确保单例不可被复制。
#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
// void *run(void *args)
// {
// while(true)
// {
// ThreadPool<Task>::getThreadPool();
// }
// }
int main()
{
// logMessage(NORMAL, "%s %d %c %f \n", "这是一条日志信息", 1234, 'c', 3.14);
srand((unsigned long)time(nullptr) ^ getpid());
// ThreadPool<Task> *tp = new ThreadPool<Task>();
// ThreadPool<Task> *tp = ThreadPool<Task>::getThreadPool();
// 那么,如果单例本身也在被多线程申请使用呢??
ThreadPool<Task>::getThreadPool()->run();
//thread1,2,3,4
while(true)
{
//生产的过程,制作任务的时候,要花时间
int x = rand()%100 + 1;
usleep(7721);
int y = rand()%30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
// std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl;
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
// 推送任务到线程池中
ThreadPool<Task>::getThreadPool()->pushTask(t);
sleep(1);
}
return 0;
}
首先,程序初始化随机数种子,确保每次运行时产生的随机数不同。这是通过调用
srand((unsigned long)time(nullptr) ^ getpid())
实现的,使用当前时间和进程ID组合生成随机种子。紧接着,程序调用了
ThreadPool<Task>::getThreadPool()->run();
,这一行代码获取了全局唯一的ThreadPool<Task>
实例,并启动了线程池。线程池中的线程将会在routine
函数中无限循环等待并处理任务。进入
while(true)
循环,模拟了一个不断产生新任务的生产者角色。在循环中:a. 随机生成两个整数
x
和y
,模拟计算任务的具体参数。b. 添加一定延时(
usleep(7721)
),模拟任务的创建需要消耗一定的时间。c. 创建一个
Task
对象t
,并将x
、y
以及一个 lambda 函数作为参数传入,lambda 函数负责计算x+y
的结果。d. 使用
logMessage(DEBUG, ...)
函数记录日志,表明任务已经被创建完成。e. 调用
ThreadPool<Task>::getThreadPool()->pushTask(t);
将生成的任务t
加入到线程池的任务队列中。f. 使用
sleep(1)
休眠1秒,以便连续不断地生成新的任务。整个
while(true)
循环会一直持续运行,除非程序被手动停止或遇到异常终止。
五、STL,智能指针和线程安全
1、STL中的容器是否是线程安全的?
- STL中的容器并不是设计为线程安全的,其根本原因在于STL追求极致的性能表现。
为了达到最高的效率,STL容器的设计并未内置线程同步机制。一旦在多线程环境下强制实施锁机制来保障线程安全,很可能会引入显著的性能开销,特别是考虑到不同容器类型(例如哈希表中的锁表与锁桶策略)所需的同步复杂度各异。因此,STL容器默认不具备线程安全特性,如果要在多线程应用程序中使用它们,开发人员必须自行确保适当的线程同步措施得以实现。
2、智能指针是否是线程安全的?
至于智能指针,情况有所不同:
unique_ptr
是独占所有权的智能指针,它的生命周期只与其所在的作用域绑定,不涉及多个线程共享资源,故在单线程情境下使用时无需担心线程安全问题。然而,当试图在多线程环境下跨线程转移所有权时,依然需要额外的同步控制以保证线程安全。shared_ptr
则涉及到多个对象共享资源的管理,尤其是在维护引用计数时可能存在线程安全问题。幸运的是,C++标准库在实现shared_ptr
时已经充分考虑到了这一挑战,采用了原子操作(如Compare and Swap, CAS)技术来确保引用计数的增减操作在多线程环境下的线程安全和高效性。这意味着在多线程环境下,尽管shared_ptr
自身并不完全排除线程安全问题(特别是在访问其所指向的对象时),但其引用计数管理部分是线程安全的。
六、 其他常见的各种锁(了解)
悲观锁: 在悲观锁的策略下,每当一个线程试图访问共享数据时,都会假定数据随时可能被其他线程更改,因此采取保守做法——在读取数据前先锁定数据(可能是获取读锁、写锁或行级锁)。这样一来,其他线程在尝试访问相同数据时,如果遇到已加锁的情况,就会被迫进入等待或阻塞状态,直至锁被释放。
乐观锁: 与悲观锁相反,乐观锁在读取数据阶段并不急于锁定资源,而是乐观地假设在同一时间段内不会有其他线程修改数据。当线程准备更新数据时,会检查自上次读取以来数据是否有过变更。乐观锁实现通常有两种方式:一是通过维护数据版本号,只有版本未发生变化才执行更新操作;二是利用 Compare-and-Swap(CAS)原子指令来验证和更新数据。
CAS操作: CAS(Compare-and-Swap)是一种原子指令,它的工作原理是在修改内存值时,首先比较当前内存地址上的值是否与期望的旧值一致,如果一致则将内存地址上的值更新为新的指定值。否则,不做任何修改并报告操作失败。在并发环境中,线程通常会反复执行CAS操作,也就是所谓的“自旋”,直至成功为止,以此来实现无锁并发控制。
自旋锁: 自旋锁是一种特殊的锁机制,当线程试图获取已被占用的锁时,不是立刻进入睡眠状态等待锁释放,而是持续循环(即“自旋”)检查锁是否已经被释放。这种方法适用于锁被持有的时间非常短的情况,因为在锁很快就能释放的情况下,自旋等待比线程上下文切换成本更低。然而,如果锁被长时间持有,自旋锁会导致线程空转消耗CPU资源。
公平锁与非公平锁:
- 公平锁:顾名思义,公平锁遵循先来后到的原则,线程在等待队列中的排序与获取锁的顺序一致,即最早请求锁的线程将在锁释放后第一个获得锁,保证所有线程都能公平地获取锁的机会。
- 非公平锁:与此相反,非公平锁在解锁后重新获取时,并不保证等待最久的线程一定能得到锁,新来的线程有可能抢占正在等待的线程,率先获得锁。非公平锁在某些情况下能提高系统的整体吞吐量,但也可能导致线程饥饿现象,即某个线程长期得不到锁。
七、读者写者问题
1、读写锁
在多线程编程中,经常遇到一种典型场景:共享数据结构被频繁读取而较少进行写入操作。尤其在读取过程中,通常伴随着复杂的查找运算,耗时较长。在这种情况下,对相关代码区域单纯地施加互斥锁可能会显著降低程序的整体性能。为了解决这一问题,我们引入了一种特殊的同步机制——读写锁。
读写锁作为一种优化手段,特别针对多读少写的场景进行了设计。其基本原理是:
读取操作:多个读取线程可以同时访问共享数据,实现“读取共享”。这意味着在无写入线程介入时,多个读取线程可以并发执行而不互相阻塞,从而提升了系统并行处理能力。
写入操作:写入操作具有排他性,同一时刻仅允许一个写入线程持有写锁并对数据进行修改,即“写入独占”。
写独占,读共享,读锁优先级高
值得注意的是,在读写锁的机制中,通常赋予读锁更高的优先级,也就是说,如果有读取请求等待,那么写入请求需要等待所有读取请求完成后才能获取写锁。这样的设计进一步优化了读密集型场景下的性能表现,确保了尽可能多的读取操作得以并发执行。
2、借助伪代码理解
// 读者(Reader)的伪代码实现
lock() // 加锁
reader_count++ // 计数器加一,表示新增一个读者
unlock() // 解锁,允许其他读者在计数器大于0时同时访问资源
# 读取行为:执行读取资源的操作
lock() // 再次加锁,在更新计数器前确保无其他线程修改
reader_count-- // 计数器减一,表示一个读者完成读取
unlock() // 解锁,允许其他线程访问资源
// 写者(Writer)的伪代码实现
lock() // 加锁
while (reader_count > 0) { // 检查是否有读者正在访问资源
wait(); // 如果有读者在读取,则写者等待
}
// 写入行为:当计数器为0时,执行写入资源的操作
unlock() // 解锁,允许其他线程访问资源
读者:
- 加锁(lock):防止其他线程同时访问资源。
- 计数器加一(reader_count++):表示有一个新的读者正在访问资源。
- 解锁(unlock):释放锁,允许其他读者同时访问资源(如果计数器大于0)。
- 执行读取操作(# 读取行为)。
- 再次加锁(lock):确保在更新计数器之前没有其他线程修改它。
- 计数器减一(reader_count--):表示一个读者已经完成读取。
- 解锁(unlock):释放锁,允许其他线程访问资源。
写者:
- 加锁(lock):防止其他线程同时访问资源。
- 检查计数器(if(reader_count>0)):如果还有读者在访问资源,则不允许写入并立即返回。
- 如果计数器为0,执行写入操作(// 写入行为)。
- 解锁(unlock):释放锁,允许其他线程访问资源。