1、 Linux线程概念
1.1、什么是线程
- 在一个程序里的一个执行路线就叫做线程(thread)更准确的定义是:线程是“一个进程内部的控制序列”
- 一切进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
线程与进程:
- 进程是一个执行起来的程序,进程 = 内核数据结构 + 代码和数据;进程是承担分配系统资源的基本实体
- 线程是一个进程内的执行流,执行粒度比进程要更细,是进程内部的一个执行分支;线程是OS调度的基本单位
- Linux下的线程,是用进程模拟实现的,复用了进程的历史代码
- Linux执行流,统一称为轻量级进程(LWP),Linux中没有真正意义上的线程,线程使用LWP进行模拟实现的;所以一个进程是由多个PCB构成的,而不是只有一个PCB。
1.2 线程的优点
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
最主要的区别是线程的切换虚拟内存空间依然是相同的,但是进程切换是不同的。这两种上下文切换的处理都是通过操作系统内核来完成的。内核的这种切换过程伴随的最显著的性能损耗是将寄存器中的内容切换出
另外一个隐藏的损耗是上下文的切换会扰乱处理器的缓存机制。简单的说,一旦去切换上下文,处理器中所有已经缓存的内存地址一瞬间都作废了。还有一个显著的区别是当你改变虚拟内存空间的时候,处理的页表缓冲TLB(快表)会被全部刷新,这将导致内存的访问在一段时间内相当的低效。但是在线程的切换中,不会出现这个问题,当然还有硬件cache - 线程占用的资源要比进程很少
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作
1.3 线程的缺点
- 性能损失:一个很少被外部事件阻塞的计算密集型线程往往无法与其它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
- 健壮性降低:编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
- 缺乏访问控制:进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
- 编程难度提高:编写与调试一个多线程程序比单线程程序困难得多
1.4 线程异常
- 单个线程如果出现除零,野指针问题导致线程奔溃,进程也会随着奔溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
1.5 线程用途
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
2、Linux线程控制
2.1 进程和线程
- 进程是资源分配的基本单位
- 线程是调度的基本单位
- 线程共享进程数据,但也拥有自己的一部分数据:线程ID、一组寄存器、栈、errno、信号屏蔽字、调度优先级
进程的多个线程共享同一个地址空间,因此Text Segment、Data Segment等都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,同时各个线程还共享以下进程资源和环境:文件描述符、每种信号的处理方式(SIG_IGN、SIG_DFL或者自定义的信号处理函数)、当前工作目录、用户id和组id。
2.2 POSIX线程库
POSIX线程库:
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
- 要使用这些函数库,要通过引入头文<pthread,h>
- 链接这些线程函数库时要使用编译器命令的“-lpthread”选项
线程创建
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
功能:创建新线程
参数:
thread
:指向线程标识符的指针
attr
:线程属性,NULL表示默认属性
start_routine
:线程执行的函数
arg
:传递给线程函数的参数返回值:成功返回0,失败返回错误码
例子:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <string.h> #include <stdlib.h> void* handler(void* arg) { int* i = static_cast<int*>(arg); std::cout << *i << std::endl; for(;;) { printf("I am thread 1\n"); sleep(1); } } int main() { pthread_t id; int ret; int arg = 10; if((ret = pthread_create(&id,nullptr,handler,&arg)) != 0) { fprintf(stderr,"pthread_create: %s\n",strerror(ret)); exit(EXIT_FAILURE); } int i; for(;;) { printf("I am main thread\n"); sleep(1); } return 0; }
获取线程ID
#include <pthread.h> //获取线程ID pthread_t pthread_self(void);
功能:返回一个pthread_t类型的变量,指代的是调用pthread_self函数的线程的"ID"。
#include <iostream> #include <unistd.h> #include <pthread.h> #include <string.h> #include <stdlib.h> void* handler(void* arg) { printf("thread1 ID:%0lx\n",pthread_self()); return nullptr; } int main() { pthread_t id; int ret; int arg = 10; if((ret = pthread_create(&id,nullptr,handler,&arg)) != 0) { fprintf(stderr,"pthread_create: %s\n",strerror(ret)); exit(EXIT_FAILURE); } printf("thread1 ID:%0lx\n",id); printf("mainthread ID:%0lx\n",pthread_self()); return 0; }
这个"ID"是当前调用线程的线程标识符(thread ID),该标识符由线程库分配,用于唯一标识线程。线程 ID 仅在进程内唯一,不同进程的线程 ID 可能重复。这个pthread_t类型的ID实际上是一个在虚拟地址空间上的一个地址,通过这个地址可以找到关于这个线程的基本信息,包括线程ID,线程栈,寄存器等属性。pthread库是用户空间的线程库,但底层依赖内核提供的系统调用(如
clone
)实现线程创建和管理。内核为每个线程分配全局唯一的线程ID(TID),而pthread库在用户层也会维护独立的线程标识符(pthread_t)。
内核分配的线程ID是系统全局唯一的,通过
gettid()
系统调用可获取。此ID用于内核调度和资源管理,不同于用户层的pthread_t
。例如:#include <unistd.h> #include <sys/syscall.h> pid_t tid = syscall(SYS_gettid);
pthread_t与内核TID的区别:
pthread_t
是pthread库的用户层标识符,类型由实现决定(可能为整数或结构体),两者关联性取决于实现,Linux中pthread_t
通常直接映射到内核TID。
使用PS命令查看线程信息(-L选项):ps -aL
LWP的ID就是真正的线程ID,即为TID。PID和LWP相同的是主线程,主线程的栈在虚拟地址空间的栈上,而其他线程的栈是在共享区(堆栈之间),因为pthread系列函数都是pthread库提供给我们的,而pthread库在共享区,所以除了主线程之外的其他线程的栈都在共享区。
线程终止
线程终止有三种基本方法:return返回(对主线程不适用);线程调用pthread_exit终止自己;线程调用pthread_cancel终止同一进程中的另一个线程。
pthread_exit函数void pthread_exit(void *retval);
功能:终止调用线程
参数:
retval
:线程返回值,可由其他线程通过pthread_join
获取pthead_cancel函数
int pthread_cancel(pthread_t thread);
- 功能:取消一个指定的执行中的线程(一定要在执行中)
- 参数:thread线程ID
- 返回值:成功返回0;失败返回错误码
线程等待
int pthread_join(pthread_t thread, void **retval);
功能:等待指定线程终止
参数:
thread
:要等待的线程ID
retval
:存储被等待线程的返回值返回值:成功返回0,失败返回错误码
为什么需要线程等待:因为退出的线程,其空间没有被释放,仍然在进程的地址空间内;并且创建新的线程不会复用刚才退出线程的地址空间
调用该函数的线程将挂起等待,直到id为thread的线程终止;thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的:
1、如果thread线程通过return返回,retval所指向的单元里存放的是thread线程函数的返回值。
2、如果thread线程被别的线程调用pthread_cancel异常终掉,retval所指向的单元里存放的是常数PTHREAD_CANCELED。
3、如果thread线程是自己调用pthread_exit终止的,retval所指向的单元存放的是传给pthread_exit的参数。
4、如果对thread线程的终止状态不感兴趣,可以传NULL给retval参数。例子:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cstdlib> void *thread1(void *arc) { printf("thread1 returning ... \n"); int *p = (int*)malloc(sizeof(int)); *p = 1111; return (void*)p; } void *thread2(void *arc) { printf("thread2 returning ... \n"); int *p = (int*)malloc(sizeof(int)); *p = 22222; pthread_exit((void*)p); } void *thread3(void *arc) { while(1) { std::cout << "thread3 is running ... \n" << std::endl; sleep(1); } return nullptr; } int main() { pthread_t id; void *ret; pthread_create(&id, nullptr, thread1, nullptr); pthread_join(id,&ret); printf("thread return,thread id:%lX,return code:%d\n",id,*(int*)ret); free(ret); pthread_create(&id, nullptr, thread2, nullptr); pthread_join(id,&ret); printf("thread return,thread id:%lX,return code:%d\n",id,*(int*)ret); free(ret); pthread_create(&id, nullptr, thread3, nullptr); sleep(3); pthread_cancel(id); pthread_join(id,&ret); if(ret == PTHREAD_CANCELED) printf("thread return, thread id %lX, return code:PTHREAD_CANCELED",id); else printf("thread return,thread id %lX, return code:NULL\n",id); return 0; }
线程分离
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
int pthread_detach(pthread_t thread);
功能:将线程标记为分离状态,线程终止时自动释放资源
参数:
thread
:要分离的线程ID返回值:成功返回0,失败返回错误码
可以是线程组内其他线程对目标线程分离,也可以是线程对自己分离:
pthread_detach(pthread_self());
例子:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cstdlib> void *handler(void *arc) { for(int i = 0; i < 5; i++) { std::cout << "thread is running...\n" << std::endl; sleep(2); } std::cout << "thread is return" << std::endl; return nullptr; } int main() { pthread_t id; pthread_create(&id,nullptr,handler,nullptr); pthread_detach(id); for(int i = 0; i < 5; i++) { std::cout << "main is running...\n" << std::endl; sleep(3); } std::cout << "main is return" << std::endl; return 0; }
3、线程ID和进程地址空间布局
- pthread_create函数会产生一个线程ID,存放在第一个参数指向的地址中。
- 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
- pthread_create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。
- pthread_t 本质是一个进程地址空间上的一个地址:
4、线程封装
这是传入函数无参版本的:
//Thread.hpp
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <stdlib.h>
#include <string>
#include <pthread.h>
#include <functional>
#include <atomic>
#include <unistd.h>
namespace ThreadModule
{
std::atomic<std::uint32_t> cnt(1); // 原子计数器,用于形成线程编号
using func_t = std::function<void()>;
enum class TSTATUS
{
NEW,
RUNNING,
STOP
};
//封装自己的线程类
class Thread
{
private:
static void *Routine(void* args)
{
Thread* t = static_cast<Thread*>(args);
t->_func();
return nullptr;
}
public:
Thread(func_t func):_func(func),_status(TSTATUS::NEW),_joinable(true)
{
_name = "Thread-" + std::to_string(cnt++);
_pid = getpid();
}
bool Start()
{
//避免多次启动
if(_status == TSTATUS::RUNNING)
{
std::cout << _name << "启动失败,当前线程已经执行" << std::endl;
return false;
}
else
{
if(_status == TSTATUS::STOP)
std::cout << "正在重新启动线程" << std::endl;
int ret;
std::cout << _name << "已启动" << std::endl;
_status = TSTATUS::RUNNING;
if((ret = pthread_create(&_tid,nullptr,Routine,this)) != 0)
{
fprintf(stderr,"phread_create:%s\n",strerror(ret));
return false;
}
return true;
}
return false;
}
bool Stop()
{
if(_status == TSTATUS::RUNNING)
{
int ret;
if((ret = pthread_cancel(_tid)) != 0)
{
fprintf(stderr,"pthread_cancel:%s\n",strerror(ret));
return false;
}
_status = TSTATUS::STOP;
std::cout << _name << "已停止" << std::endl;
return true;
}
return false;
}
bool Join()
{
if(_joinable)
{
int ret;
if((ret = pthread_join(_tid,nullptr)) != 0)
{
fprintf(stderr,"pthread_join:%s\n",strerror(ret));
return false;
}
_status = TSTATUS::STOP;
std::cout << _name << "资源已被回收" << std::endl;
return true;
}
return false;
}
void Detach()
{
_joinable = false;
pthread_detach(_tid);
}
bool IsJoinable()
{
return _joinable;
}
std::string Name()
{
return _name;
}
~Thread()
{
}
public:
std::string _name;//线程名
pthread_t _tid;//线程id
pid_t _pid;//进程id
bool _joinable;//是否可以分离
func_t _func;//函数
TSTATUS _status;//状态
};
}
//main.cpp
#include "Thread.hpp"
using namespace ThreadModule;
void func()
{
int cnt = 5;
while(cnt--)
{
std::cout << "pthread running" << std::endl;
sleep(1);
}
return;
}
int main()
{
Thread thread(func);
thread.Start();
thread.Start();
thread.Join();
thread.Start();
thread.Detach();
for(int i = 1; i <= 10; i++)
{
std::cout << "main is running" << std::endl;
sleep(1);
}
return 0;
}
这是传入函数可变参数版本的:
//Thread.hpp
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <stdlib.h>
#include <string>
#include <pthread.h>
#include <functional>
#include <atomic>
#include <unistd.h>
namespace ThreadModule
{
std::atomic<std::uint32_t> cnt(1); // 原子计数器,用于形成线程编号
enum class TSTATUS
{
NEW,
RUNNING,
STOP
};
//封装自己的线程类
template <typename T>
class Thread
{
using func_t = std::function<void(T)>;
private:
static void *Routine(void* args)
{
Thread<T>* t = static_cast<Thread<T>*>(args);
t->_func(t->_data);
return nullptr;
}
public:
Thread(func_t func, T data):_func(func),_data(data),_status(TSTATUS::NEW),_joinable(true)
{
_name = "Thread-" + std::to_string(cnt++);
_pid = getpid();
}
bool Start()
{
//避免多次启动
if(_status == TSTATUS::RUNNING)
{
std::cout << _name << "启动失败,当前线程已经执行" << std::endl;
return false;
}
else
{
if(_status == TSTATUS::STOP)
std::cout << "正在重新启动线程" << std::endl;
int ret;
std::cout << _name << "已启动" << std::endl;
_status = TSTATUS::RUNNING;
if((ret = pthread_create(&_tid,nullptr,Routine,this)) != 0)
{
fprintf(stderr,"phread_create:%s\n",strerror(ret));
return false;
}
return true;
}
return false;
}
bool Stop()
{
if(_status == TSTATUS::RUNNING)
{
int ret;
if((ret = pthread_cancel(_tid)) != 0)
{
fprintf(stderr,"pthread_cancel:%s\n",strerror(ret));
return false;
}
_status = TSTATUS::STOP;
std::cout << _name << "已停止" << std::endl;
return true;
}
return false;
}
bool Join()
{
if(_joinable)
{
int ret;
if((ret = pthread_join(_tid,nullptr)) != 0)
{
fprintf(stderr,"pthread_join:%s\n",strerror(ret));
return false;
}
_status = TSTATUS::STOP;
std::cout << _name << "资源已被回收" << std::endl;
return true;
}
return false;
}
void Detach()
{
_joinable = false;
pthread_detach(_tid);
}
bool IsJoinable()
{
return _joinable;
}
std::string Name()
{
return _name;
}
~Thread()
{
}
public:
std::string _name;//线程名
pthread_t _tid;//线程id
pid_t _pid;//进程id
bool _joinable;//是否可以分离
func_t _func;//函数
TSTATUS _status;//状态
T _data;
};
}
//main.cpp
#include "Thread.hpp"
using namespace ThreadModule;
void func(int a)
{
int cnt = 5;
std::cout << a << std::endl;
while(cnt--)
{
std::cout << "pthread running" << std::endl;
sleep(1);
}
return;
}
int main()
{
int a = 888;
Thread<int> thread(func,a);
thread.Start();
thread.Start();
thread.Join();
thread.Start();
thread.Detach();
for(int i = 1; i <= 10; i++)
{
std::cout << "main is running" << std::endl;
sleep(1);
}
return 0;
}
5、线程互斥
相关概念
- 临界资源:多线程执行流共享的资源叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
互斥量mutex
大部分情况下,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况下变量归属于单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。多个线程并发的操作共享变量,会带来一些问题。
比如下面的抢票代码:
#include <iostream> #include <unistd.h> #include <string> #include <cstring> #include <pthread.h> #include <cstdlib> int ticket = 100; void *route(void *argc) { char *id = (char*)argc; while(1) { if(ticket > 0) { sleep(1); std::cout << id << "sells ticket:" << ticket-- << std::endl; } else break; } return nullptr; } int main() { pthread_t t1,t2,t3,t4; pthread_create(&t1,nullptr,route,(void*)"thread1"); pthread_create(&t2,nullptr,route,(void*)"thread2"); pthread_create(&t3,nullptr,route,(void*)"thread3"); pthread_create(&t4,nullptr,route,(void*)"thread4"); pthread_join(t1,nullptr); pthread_join(t2,nullptr); pthread_join(t3,nullptr); pthread_join(t4,nullptr); return 0; }
为什么票数可能减到-2呢?原因如下:
- if语句判断条件为真以后,代码可以并发得切换到其他线程
- sleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- --ticket操作本身就不是一个原子操作,而是对应三条汇编指令:
1、load:将共享变量ticket从内存加载到寄存器中
2、update:更新寄存器里面的值,执行-1操作
3、store:将新值从寄存器写回到共享变量ticket的内存地址
而大部分情况下,一条指令就是原子的(当然也有一些多条指令是原子的)要解决以上问题,需要做到三点:
- 代码必须实现互斥机制:当某个线程进入临界区执行时,其他线程不得进入同一临界区。
- 在多线程环境中,若临界区空闲且多个线程同时请求执行,系统只允许其中一个线程进入临界区。
- 非临界区执行的线程,不得妨碍其他线程正常进入临界区。
要做到这三点,本质上就是需要一把锁,Linux上提供的这把锁叫互斥锁:
互斥量/锁的接口
1、初始化互斥量:
方法1:静态分配:phread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
方法2:动态分配:
#include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
- mutex: 指向待初始化的互斥锁对象的指针。
- attr: 互斥锁属性对象指针,若为
NULL
则使用默认属性。- 成功返回
0
,失败返回错误码(非errno
)。2、销毁互斥量:
- 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
3、互斥量加锁和解锁:
int pthread_mutex_lock(pthread_mutex_t *mutex); //加锁 int pthread_mutex_ulock(pthread_mutex_t *mutex); //解锁 返回值:成功返回0,失败返回错误号
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_lock调用就会陷入阻塞(执行流被挂起),等待互斥量解锁。
使用锁来改进上面的模拟抢票系统:
#include <iostream> #include <unistd.h> #include <string> #include <cstring> #include <pthread.h> #include <cstdlib> int ticket = 100; pthread_mutex_t mutex; void *route(void *argc) { char *id = (char*)argc; while(1) { pthread_mutex_lock(&mutex); if(ticket > 0) { usleep(1000); std::cout << id << "sells ticket:" << ticket-- << std::endl; pthread_mutex_unlock(&mutex); } else { pthread_mutex_unlock(&mutex); break; } } return nullptr; } int main() { pthread_t t1,t2,t3,t4; pthread_mutex_init(&mutex,nullptr); pthread_create(&t1,nullptr,route,(void*)"thread1"); pthread_create(&t2,nullptr,route,(void*)"thread2"); pthread_create(&t3,nullptr,route,(void*)"thread3"); pthread_create(&t4,nullptr,route,(void*)"thread4"); pthread_join(t1,nullptr); pthread_join(t2,nullptr); pthread_join(t3,nullptr); pthread_join(t4,nullptr); pthread_mutex_destroy(&mutex); return 0; }
结果符合预期。
互斥量实现原理
单纯的i++或者++i都不是原子的,有可能会有数据一致性问题;为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。lock和unlock的伪代码改一下:
互斥量的封装 -- 使用RAII风格管理锁
#pragma once #include <iostream> #include <pthread.h> #include <string> namespace LockModule { class Mutex { public: Mutex(const Mutex&) = delete; const Mutex& operator=(const Mutex&) = delete; Mutex() { int ret; if((ret = pthread_mutex_init(&_mutex, nullptr)) != 0) { std::cout << "Mutex init error" << std::endl; exit(EXIT_FAILURE); } } void Lock() { int ret; if((ret = pthread_mutex_lock(&_mutex)) != 0) { std::cout << "Mutex lock error" << std::endl; exit(EXIT_FAILURE); } } void Unlock() { int ret; if((ret = pthread_mutex_unlock(&_mutex)) != 0) { std::cout << "Mutex unlock error" << std::endl; exit(EXIT_FAILURE); } } ~Mutex() { int ret; if((ret = pthread_mutex_destroy(&_mutex)) != 0) { std::cout << "Mutex destroy error" << std::endl; exit(EXIT_FAILURE); } } pthread_mutex_t* GetMutex() { return &_mutex; } private: pthread_mutex_t _mutex; }; //采用RAII风格,进行锁管理 class LockGuard { public: LockGuard(Mutex &mutex):_mutex(mutex) { _mutex.Lock(); } //析构函数中解锁 ~LockGuard() { _mutex.Unlock(); } private: Mutex &_mutex; }; }
6、线程同步
相关概念
条件变量:
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,直到线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
条件变量相关接口
条件变量(Condition Variable)是线程同步的一种机制,通常与互斥锁(Mutex)配合使用,用于阻塞线程直到特定条件满足。Linux通过POSIX线程库(pthread)提供相关接口,主要包括以下核心函数:
初始化条件变量
函数原型:
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
参数说明:
cond
:指向条件变量的指针。attr
:属性参数,通常设为NULL
表示默认属性。静态初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁条件变量
函数原型:
int pthread_cond_destroy(pthread_cond_t *cond);
注意事项:
- 必须在没有线程等待该条件变量时调用。
- 静态初始化的条件变量无需销毁。
等待条件变量
函数原型:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
行为:
- 原子性地释放
mutex
并阻塞当前线程。- 被唤醒后重新获取
mutex
并返回。超时等待:
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
abstime
:绝对时间(从1970年1月1日起的秒和纳秒)。
唤醒等待线程
唤醒单个线程:
int pthread_cond_signal(pthread_cond_t *cond);
唤醒所有线程:
int pthread_cond_broadcast(pthread_cond_t *cond);
使用示例
#include <iostream> #include <string> #include <pthread.h> #include <unistd.h> #include <cstring> #include <cstdlib> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; void *handler(void *argc) { std::string name = (char*)argc; while(1) { pthread_mutex_lock(&mutex); pthread_cond_wait(&cond,&mutex); std::cout << name << " is running" << std::endl; pthread_mutex_unlock(&mutex); //sleep(1); } } int main() { pthread_t tid1,tid2; int ret; if(ret = (pthread_create(&tid1,nullptr,handler,(void*)"thread-1")) != 0) { std::cerr << "pthread_create error: " << strerror(ret) << std::endl; return -1; } if(ret = (pthread_create(&tid2,nullptr,handler,(void*)"thread-2")) != 0) { std::cerr << "pthread_create error: " << strerror(ret) << std::endl; return -1; } while(1) { pthread_cond_signal(&cond); sleep(1); } if(ret = (pthread_join(tid1,nullptr)) != 0) { std::cerr << "pthread_join error: " << strerror(ret) << std::endl; return -1; } if(ret = (pthread_join(tid2,nullptr)) != 0) { std::cerr << "pthread_join error: " << strerror(ret) << std::endl; return -1; } return 0; }
注意事项
- 虚假唤醒:即使未调用唤醒函数,线程也可能被唤醒。通常需在循环中检查条件。
- 死锁风险:确保在调用
pthread_cond_wait
前持有正确的互斥锁。- 性能优化:频繁唤醒可能影响性能,建议结合事件驱动模型优化。
为什么pthread_cond_wait需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
- 举一个错误的设计:先上锁,发现条件不满足,直接解锁,然后在等待条件变量?(错误的)例如:
由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait。所以解锁和等待必须是一个原子操作。// 错误的设计 pthread_mutex_lock(&mutex); while (condition_is_false) { pthread_mutex_unlock(&mutex); //解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过 pthread_cond_wait(&cond); pthread_mutex_lock(&mutex); } pthread_mutex_unlock(&mutex);
int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t *mutex);进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样。
条件变量使用规范
等待条件代码:
pthread_mutex_lock(&mutex); while (条件为假) pthread_cond_wait(cond, mutex); //在这里等待,被唤醒后还需要通过while再次检测条件是否为真,为真继续向下执行,为假继续等待 修改条件 pthread_mutex_unlock(&mutex);
给条件发送信号代码:
pthread_mutex_lock(&mutex); 设置条件为真 pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);
条件变量的封装
锁使用的是上面封装的锁
#ifndef __MYCOND__HPP__ #define __MYCOND__HPP__ #include <iostream> #include <pthread.h> #include <cstring> #include "Lock.hpp" namespace CondModule { using namespace LockModule; class Cond { public: Cond() { int ret = pthread_cond_init(&_cond,nullptr); if(ret != 0) { fprintf(stderr,"pthread_cond_init:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } void Wait(Mutex &mutex) { int ret = pthread_cond_wait(&_cond,mutex.GetMutex()); if(ret != 0) { fprintf(stderr,"pthread_cond_wait:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } void Notify() { int ret = pthread_cond_signal(&_cond); if(ret != 0) { fprintf(stderr,"pthread_cond_signal:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } void NotifyAll() { int ret = pthread_cond_broadcast(&_cond); if(ret != 0) { fprintf(stderr,"pthread_cond_broadcast:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } ~Cond() { int ret = pthread_cond_destroy(&_cond); if(ret != 0) { fprintf(stderr,"pthread_cond_destroy:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } private: pthread_cond_t _cond; }; } #endif
7、生产者消费者模型
概念
生产者消费者模型是一种经典的并发编程模式,用于解决多线程或多进程环境中生产者和消费者之间的协同问题。生产者负责生成数据或任务,消费者负责处理这些数据或任务,两者通过共享的缓冲区(如队列)进行通信。该模型的核心目标是解耦生产与消费过程,避免两者因速度不匹配导致的资源浪费或阻塞。
简单来说就是:
- 3种关系 -- 生产者与生产者:互斥;消费者和消费者:互斥;生产者和消费者:互斥且同步;
- 2种角色 -- 生产者和消费者;
- 1个交易场所 -- 共享缓冲区;
生产者消费者模型的优点:
- 解耦
- 支持并发
- 支持忙先不均(生产者消费者两者速度不均)
基于BlockingQueue的生产者消费者模型
概念:在多线程编程中阻塞队列(BlockingQueue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
c++ queue模拟阻塞队列的单生产单消费模型
//BlockQueue.hpp #ifndef __BLOCKQUEUE__ #define __BLOCKQUEUE__ #include <iostream> #include <queue> #include <pthread.h> #include <string> namespace myBlockQueue { template<typename T> class BlockQueue { private: bool isFull() { return _block_queue.size() >= _cap; } bool isEmpty() { return _block_queue.empty(); } public: BlockQueue(int cap) { _cap = cap; _product_wait_num = 0; _consum_wait_num = 0; pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_product_cond,nullptr); pthread_cond_init(&_consum_cond,nullptr); } //生产者调用接口 bool Enqueue(const T &in) { pthread_mutex_lock(&_mutex); //判断是否为满,满了就等待 //if(isFull()) //这里使用while(isFull())而不是使用if(isFull())的原因\ 避免造成伪唤醒,即线程被唤醒但条件并不满足\ 例如:此时有五个生产者在等待,但此时生产者最多生产的数据只能是一个,\ 如果此时同时唤醒五个生产者,那必然只有一个生产者此时可以生产数据(生产一个后队列就满了),另外四个还不可以生产\ 但这四个也被唤醒了,也会向下执行代码,所以需要while再次循环判断是否满足条件,而不是使用if让执行流向下执行 while(isFull()) { _product_wait_num++; pthread_cond_wait(&_product_cond,&_mutex); _product_wait_num--; } //生产数据 _block_queue.push(in); //此时队列不为空了,判断是否有消费者在等待,通知消费者来消费 if(_consum_wait_num > 0) { pthread_cond_signal(&_consum_cond); } pthread_mutex_unlock(&_mutex); return true; } //消费者调用接口 bool Pop(T *out) { pthread_mutex_lock(&_mutex); //这里使用while循环判断条件是否满足,同样是避免造成伪唤醒的问题 while(isEmpty()) { _consum_wait_num++; pthread_cond_wait(&_consum_cond,&_mutex); _consum_wait_num--; } //消费数据 *out = _block_queue.front(); _block_queue.pop(); //判断是否有生产者在等待,通知生产者生产数据 if(_product_wait_num > 0) { pthread_cond_signal(&_product_cond); } pthread_mutex_unlock(&_mutex); return true; } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_product_cond); pthread_cond_destroy(&_consum_cond); } private: std::queue<T> _block_queue; int _cap; pthread_mutex_t _mutex; pthread_cond_t _product_cond; pthread_cond_t _consum_cond; int _product_wait_num; int _consum_wait_num; }; } #endif
//main.cpp #include "BlockQueue.hpp" #include <unistd.h> #include <cstdlib> using namespace myBlockQueue; void* ProductStart(void* args) { BlockQueue<int>* bq = (BlockQueue<int>*)args; while(true) { int data = rand() % 100 + 1; bq->Enqueue(data); printf("product data:%d\n",data); sleep(2); } } void* ConsumStart(void* args) { BlockQueue<int>* bq = (BlockQueue<int>*)args; while(true) { int data; bq->Pop(&data); printf("consum data:%d\n",data); sleep(4); } } int main() { BlockQueue<int> bq(10); pthread_t product,consum; pthread_create(&product,nullptr,ProductStart,&bq); pthread_create(&consum,nullptr,ConsumStart,&bq); pthread_join(product,nullptr); pthread_join(consum,nullptr); return 0; }
执行结果:
c++ queue模拟阻塞队列的多生产多消费模型
在上述的单生产、单消费中已有消费者和生产者的关系,需要新加消费者和消费者的关系、生产者和生产者的关系。 但上述代码都是用同一个锁,所以天然形成了上面三个关系。所以上面的代码同样适用于多生产多消费模型。
改为多生产多消费模型+封装的互斥锁和信号量:
//BlockQueue.hpp #ifndef __BLOCKQUEUE__ #define __BLOCKQUEUE__ #include <iostream> #include <queue> #include <pthread.h> #include <string> #include "Cond.hpp" #include "Lock.hpp" namespace myBlockQueue { using namespace LockModule; using namespace CondModule; template <typename T> class BlockQueue { private: bool isFull() { return _block_queue.size() >= _cap; } bool isEmpty() { return _block_queue.empty(); } public: BlockQueue(int cap) { _cap = cap; _product_wait_num = 0; _consum_wait_num = 0; } // 生产者调用接口 bool Enqueue(const T &in) { // RAII思想,自动加锁解锁 LockGuard lock(_mutex); // 判断是否为满,满了就等待 // if(isFull()) // 这里使用while(isFull())而不是使用if(isFull())的原因\ 避免造成伪唤醒,即线程被唤醒但条件并不满足\ 例如:此时有五个生产者在等待,但此时生产者最多生产的数据只能是一个,\ 如果此时同时唤醒五个生产者,那必然只有一个生产者此时可以生产数据(生产一个后队列就满了),另外四个还不可以生产\ 但这四个也被唤醒了,也会向下执行代码,所以需要while再次循环判断是否满足条件,而不是使用if让执行流向下执行 while (isFull()) { _product_wait_num++; _product_cond.Wait(_mutex); _product_wait_num--; } // 生产数据 _block_queue.push(in); // 此时队列不为空了,判断是否有消费者在等待,通知消费者来消费 if (_consum_wait_num > 0) { _consum_cond.Notify(); } return true; } // 消费者调用接口 bool Pop(T *out) { LockGuard lock(_mutex); // 这里使用while循环判断条件是否满足,同样是避免造成伪唤醒的问题 while (isEmpty()) { _consum_wait_num++; _consum_cond.Wait(_mutex); _consum_wait_num--; } // 消费数据 *out = _block_queue.front(); _block_queue.pop(); // 判断是否有生产者在等待,通知生产者生产数据 if (_product_wait_num > 0) { _product_cond.Notify(); } return true; } ~BlockQueue() { // 不用自己调用析构 //_mutex.Destroy(); //_product_cond.Destroy(); //_consum_cond.Destroy(); } private: std::queue<T> _block_queue; int _cap; Mutex _mutex; Cond _product_cond; Cond _consum_cond; int _product_wait_num; int _consum_wait_num; }; } #endif
//main.cpp #include "BlockQueue.hpp" #include <unistd.h> #include <cstdlib> using namespace myBlockQueue; template <typename T> class Data { public: Data(BlockQueue<T>* bq,const std::string name): _bq(bq),_name(name) {} BlockQueue<T>* GetBlockQueue() { return _bq; } std::string GetName() { return _name; } private: BlockQueue<T>* _bq; std::string _name; }; void* ProductStart(void* args) { Data<int> *da = (Data<int>*)args; BlockQueue<int>* bq = da->GetBlockQueue(); std::string name = da->GetName(); while(true) { int val = rand() % 100 + 1; bq->Enqueue(val); printf("%s data:%d\n",name.c_str(),val); } return nullptr; } void* ConsumStart(void* args) { Data<int> *da = (Data<int>*)args; BlockQueue<int>* bq = da->GetBlockQueue(); std::string name = da->GetName(); while(true) { int val; bq->Pop(&val); printf("%s data:%d\n",name.c_str(),val); } return nullptr; } int main() { BlockQueue<int> bq(5); pthread_t product1,product2,product3,product4,product5; pthread_t consum1,consum2,consum3,consum4,consum5; Data<int> data1(&bq,"product-1"); Data<int> data2(&bq,"product-2"); Data<int> data3(&bq,"product-3"); Data<int> data4(&bq,"product-4"); Data<int> data5(&bq,"product-5"); Data<int> data6(&bq,"consum-1"); Data<int> data7(&bq,"consum-2"); Data<int> data8(&bq,"consum-3"); Data<int> data9(&bq,"consum-4"); Data<int> data10(&bq,"consum-5"); pthread_create(&product1,nullptr,ProductStart,&data1); pthread_create(&product2,nullptr,ProductStart,&data2); pthread_create(&product3,nullptr,ProductStart,&data3); pthread_create(&product4,nullptr,ProductStart,&data4); pthread_create(&product5,nullptr,ProductStart,&data5); pthread_create(&consum1,nullptr,ConsumStart,&data6); pthread_create(&consum2,nullptr,ConsumStart,&data7); pthread_create(&consum3,nullptr,ConsumStart,&data8); pthread_create(&consum4,nullptr,ConsumStart,&data9); pthread_create(&consum5,nullptr,ConsumStart,&data10); pthread_join(product1,nullptr); pthread_join(product2,nullptr); pthread_join(product3,nullptr); pthread_join(product4,nullptr); pthread_join(product5,nullptr); pthread_join(consum1,nullptr); pthread_join(consum2,nullptr); pthread_join(consum3,nullptr); pthread_join(consum4,nullptr); pthread_join(consum5,nullptr); return 0; }
基于环形队列的生产消费模型
POSIX信号量
POSIX信号量是多线程或多进程同步机制的重要工具,分为命名信号量和未命名信号量(基于内存的信号量)。命名信号量通过文件系统路径名标识,可用于不相关进程间同步;未命名信号量通常用于同一进程内的线程间或相关进程间的同步。
主要接口函数:
sem_init 初始化未命名信号量。原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared
为0表示线程间共享,非0表示进程间共享;value
为信号量初始值。成功返回0,失败返回-1。sem_open 创建或打开命名信号量。原型:
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
name
为信号量路径名,oflag
指定打开方式(如O_CREAT),mode
设置权限,value
为初始值。成功返回信号量指针,失败返回SEM_FAILED。sem_wait 信号量P操作(减一)。原型:
int sem_wait(sem_t *sem);
若信号量值为0则阻塞,直到信号量值变为正。成功返回0,失败返回-1。
sem_post 信号量V操作(加一)。原型:
int sem_post(sem_t *sem);
唤醒等待该信号量的线程/进程。成功返回0,失败返回-1。
sem_getvalue 获取信号量当前值。原型:
int sem_getvalue(sem_t *sem, int *sval);
成功时
sval
存储信号量值并返回0,失败返回-1。sem_close 关闭命名信号量。原型:
int sem_close(sem_t *sem);
释放进程关联的资源,但不销毁信号量。成功返回0,失败返回-1。
sem_unlink 删除命名信号量。原型:
int sem_unlink(const char *name);
当所有进程都关闭信号量后,系统自动销毁它。成功返回0,失败返回-1。
sem_destroy 销毁未命名信号量。原型:
int sem_destroy(sem_t *sem);
成功返回0,失败返回-1。
命名信号量(进程间同步):
sem_t *sem = sem_open("/my_sem", O_CREAT, 0644, 1); sem_wait(sem); // 临界区操作 sem_post(sem); sem_close(sem); sem_unlink("/my_sem");
未命名信号量(线程间同步):
sem_t sem; sem_init(&sem, 0, 1); sem_wait(&sem); // 临界区操作 sem_post(&sem); sem_destroy(&sem);
注意事项
- 命名信号量的名称通常以斜杠开头(如
/sem_name
),且长度受系统限制。sem_destroy
调用前需确保没有线程/进程在等待该信号量。- 多进程共享未命名信号量时,信号量需位于共享内存区域。
- 信号量操作是原子性的,但需注意错误处理(如检查返回值)。
信号量封装
#ifndef __MYSEM__ #define __MYSEM__ #include <iostream> #include <semaphore.h> namespace SemModule { class Sem { public: Sem(int n) : _num(n) { // 0表示线程间共享 sem_init(&_sem, 0, _num); } // P操作 void P() { sem_wait(&_sem); } // V操作 void V() { sem_post(&_sem); } ~Sem() { sem_destroy(&_sem); } int GetNum() { return _num; } private: sem_t _sem; int _num; }; } #endif
基于环形队列的生产消费模型环形队列采用数组模拟,用模运算来模拟环状特性:
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
现在使用信号量这个计数器,就很简单的进行多线程间的同步过程:
- 我们采用数组模拟,模运算来模拟环状特性,为空或为满时都指向同一个位置。
- 对于这个环形队列起始时有空间N个,数据0个,对于生产者来说关注的是剩余空间,对于消费者来说关注的是剩余数据,任何人访问临界资源之前都必须申请信号量。
//生产: int tail = 0; P(空间); ring[tail] = data; tail++; V(数据);
//消费: int head = 0; P(数据); int data = ring[head]; head++; V(空间);
当生产消费同时访问同一个位置时,只能是队列为空或为满:
如果队列为空,此时需要保证生产者,原子性先生产 -- 此时数据信号量为0,空间信号量为N,数据信号量阻塞,这样就形成生产消费同一位置互斥且同步,生产优先。
如果队列为满,此时需要保证消费者,原子性先消费 -- 此时空间信号量为0,数据信号量为N,空间信号量阻塞,这样就形成生产消费同一位置互斥且同步,消费优先。当生产消费不是同一个位置时,即除了队列为空或为满时,其他情况都是生产消费不在同一个位置,生产者和消费者,可以同时进行并发访问。
总结:为空为满时,通过信号量实现互斥同步;其他情况,生产者和消费者并发进行。
三种关系:生产者之间互斥;消费者之间互斥;生产和消费互斥和同步。
需要锁两个:生产者与生产者之间加锁;消费者与消费者之间加锁。
需要信号量两个:一个表示空间;一个表示数据。
代码:
//RingQueue.hpp #ifndef __RINGQUEUE__ #define __RINGQUEUE__ #include <iostream> #include <vector> #include <semaphore.h> #include <pthread.h> #include <string> #include <cstdlib> #include <unistd.h> #include "Sem.hpp" #include "Lock.hpp" namespace myRingQueue { using namespace LockModule; using namespace SemModule; template <typename T> class RingQueue { public: RingQueue(int cap) : _ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0), _room_sem(cap), _data_sem(0) {} // 生产者入队 void Enqueue(const T &in) { _room_sem.P(); //这里申请信号量和上锁位置可以互换但是存在区别:先上锁则是访问临界资源和申请信号量串行处理;如果后上锁则是访问临界资源和申请信号量并行处理,提高效率。 { LockGuard lock(_productor_mutex); // 能进入这里此时一定有空间 _ring_queue[_productor_step++] = in; _productor_step %= _cap; } _data_sem.V(); } // 消费者出队 void Pop(T *out) { _data_sem.P(); { LockGuard lock(_consumer_mutex); *out = _ring_queue[_consumer_step++]; _consumer_step %= _cap; } _room_sem.V(); } ~RingQueue() {} private: std::vector<T> _ring_queue; // 环形队列 int _cap; // 上线容量 // 生产消费的下标 int _productor_step; int _consumer_step; // 定义信号量 Sem _room_sem; // 生产者关心 Sem _data_sem; // 消费者关心 // 锁 Mutex _productor_mutex; Mutex _consumer_mutex; }; } #endif
//Sem.hpp #ifndef __MYSEM__ #define __MYSEM__ #include <iostream> #include <semaphore.h> namespace SemModule { class Sem { public: Sem(int n) : _num(n) { // 0表示线程间共享 sem_init(&_sem, 0, _num); } // P操作 void P() { sem_wait(&_sem); } // V操作 void V() { sem_post(&_sem); } ~Sem() { sem_destroy(&_sem); } int GetNum() { return _num; } private: sem_t _sem; int _num; }; } #endif
//Lock.hpp #ifndef __LOCK__ #define __LOCK__ #include <iostream> #include <pthread.h> #include <string> namespace LockModule { class Mutex { public: Mutex(const Mutex&) = delete; const Mutex& operator=(const Mutex&) = delete; Mutex() { int ret; if(ret = (pthread_mutex_init(&_mutex, nullptr)) != 0) { std::cout << "Mutex init error" << std::endl; exit(EXIT_FAILURE); } } void Lock() { int ret; if(ret = (pthread_mutex_lock(&_mutex)) != 0) { std::cout << "Mutex lock error" << std::endl; exit(EXIT_FAILURE); } } void Unlock() { int ret; if(ret = (pthread_mutex_unlock(&_mutex)) != 0) { std::cout << "Mutex unlock error" << std::endl; exit(EXIT_FAILURE); } } ~Mutex() { int ret; if(ret = (pthread_mutex_destroy(&_mutex)) != 0) { std::cout << "Mutex destroy error" << std::endl; exit(EXIT_FAILURE); } } pthread_mutex_t* GetMutex() { return &_mutex; } private: pthread_mutex_t _mutex; }; //采用RAII风格,进行锁管理 class LockGuard { public: LockGuard(Mutex &mutex):_mutex(mutex) { _mutex.Lock(); } //析构函数中解锁 ~LockGuard() { _mutex.Unlock(); } private: Mutex &_mutex; }; } #endif
//main.cpp #include "RingQueue.hpp" using namespace LockModule; using namespace myRingQueue; using namespace SemModule; template <typename T> class Data { public: Data(RingQueue<T>* rq,const std::string name): _rq(rq),_name(name) {} RingQueue<T>* GetRingQueue() { return _rq; } std::string GetName() { return _name; } private: RingQueue<T>* _rq; std::string _name; }; void *product(void *argc) { sleep(3); Data<int> *data = static_cast<Data<int>*>(argc); RingQueue<int> *rq = data->GetRingQueue(); std::string name = data->GetName(); int val = 1; while(true) { rq->Enqueue(val++); std::cout << name << " product: " << val << std::endl; sleep(3); } return nullptr; } void *consum(void *argc) { sleep(10); Data<int> *data = static_cast<Data<int>*>(argc); RingQueue<int> *rq = data->GetRingQueue(); std::string name = data->GetName(); int val; while(true) { rq->Pop(&val); std::cout << name << " consume: " << val << std::endl; sleep(1); } return nullptr; } int main() { RingQueue<int> rq(4); pthread_t product1,product2,product3,product4,product5; pthread_t consum1,consum2,consum3,consum4,consum5; Data<int> data1(&rq,"product-1"); Data<int> data2(&rq,"product-2"); Data<int> data3(&rq,"product-3"); Data<int> data4(&rq,"product-4"); Data<int> data5(&rq,"product-5"); Data<int> data6(&rq,"consumer-1"); Data<int> data7(&rq,"consumer-2"); Data<int> data8(&rq,"consumer-3"); Data<int> data9(&rq,"consumer-4"); Data<int> data10(&rq,"consumer-5"); pthread_create(&product1, nullptr, product, &data1); pthread_create(&product2, nullptr, product, &data2); pthread_create(&product3, nullptr, product, &data3); pthread_create(&product4, nullptr, product, &data4); pthread_create(&product5, nullptr, product, &data5); pthread_create(&consum1, nullptr, consum, &data6); pthread_create(&consum2, nullptr, consum, &data7); pthread_create(&consum3, nullptr, consum, &data8); pthread_create(&consum4, nullptr, consum, &data9); pthread_create(&consum5, nullptr, consum, &data10); pthread_join(product1, nullptr); pthread_join(product2, nullptr); pthread_join(product3, nullptr); pthread_join(product4, nullptr); pthread_join(product5, nullptr); pthread_join(consum1, nullptr); pthread_join(consum2, nullptr); pthread_join(consum3, nullptr); pthread_join(consum4, nullptr); pthread_join(consum5, nullptr); return 0; }
#makefile bin=ringbuffer_cp cc=g++ src=$(wildcard *.cpp) obj=$(src:.cc=.o) $(bin):$(obj) $(cc) -o $@ $^ -lpthread %.o:%.cc $(cc) -c $< -std=c++17 .PHONY:clean clean: rm -f $(bin) .PHONY:test test: echo $(src) echo $(obj)
8、策略模式 和 日志
日志:日志是系统、应用程序或设备在运行过程中自动生成的记录文件,用于追踪事件、状态变化、错误信息或用户操作。日志通常以时间顺序存储,包含时间戳、日志内容、日志等级等关键信息,帮助分析系统行为或排查问题。
日志格式必须有的指标:时间戳、日志等级、日志内容;
可选指标:文件名行号、进程线程相关id信息;
日志有现成的解决方案,如:spdlog、glog、Boost.Log、Log4cxx等等,我们依旧采用自定义日志的方式。
采用日志格式:
[可读性很好的时间] [日志等级] [进程pid] [打印对应的文件名] [行号] - 消息内容支持可变参数例如:[2011-11-11 11:11:11] [DEBUG] [111111] [main.cc] [11] - hello world
这里我们采用设计模式--策略模式来进行日志的设计。
策略模式:策略模式(Strategy Pattern)属于行为型设计模式,允许在运行时动态选择算法或行为。它将算法封装成独立的类,使得算法可以独立于使用它的客户端变化。
核心思想:
- 定义策略接口:声明所有具体策略类必须实现的方法。
- 具体策略类:实现策略接口,提供具体的算法实现。
- 上下文类(Context):持有一个策略对象的引用,通过委托调用具体策略。
基于策略模式实现的日志
策略模式实现结构图:
日志记录流程时序图:
//Log.hpp #ifndef __LOG__ #define __LOG__ #include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <fstream> #include <sstream> #include <memory> #include <filesystem> #include <unistd.h> #include <time.h> #include "Lock.hpp" namespace LogModule { using namespace LockModule; //获取一下当前系统时间 std::string CurrentTime() { //时间戳 time_t time_stamp = time(nullptr); struct tm curr; localtime_r(&time_stamp, &curr); //将时间戳转化成可读性较强的时间信息 char buffer[1024]; snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d:%02d:%02d", curr.tm_year + 1900, curr.tm_mon + 1, curr.tm_mday, curr.tm_hour, curr.tm_min, curr.tm_sec); return buffer; } const std::string defaultlogpath = "./log/"; const std::string defaultlogname = "log.txt"; enum class LogLevel { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; std::string LevelToString(LogLevel level) { switch(level) { case LogLevel::DEBUG: return "DEBUG"; case LogLevel::INFO: return "INFO"; case LogLevel::WARNING: return "WARNING"; case LogLevel::ERROR: return "ERROR"; case LogLevel::FATAL: return "FATAL"; default: return "None"; } } //刷新策略 class LogStrategy { public: virtual ~LogStrategy() = default; virtual void SyncLog(const std::string &message) = 0; }; //控制台策略 class ConsoleLogStrategy:public LogStrategy { public: ConsoleLogStrategy(){} ~ConsoleLogStrategy(){} void SyncLog(const std::string &message) { LockGuard lockGuard(_lock); std::cout << message << std::endl; } private: Mutex _lock; }; //磁盘策略 class FileLogStrategy:public LogStrategy { public: FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname) :_logpath(logpath),_logname(logname) { //确定_logpath是否存在 LockGuard lockGuard(_lock); if(std::filesystem::exists(_logpath)) { return; } //不存在就尝试创建路径 try { std::filesystem::create_directories(_logpath); } catch(std::filesystem::filesystem_error &e) { std::cerr << e.what() << "\n"; } } ~FileLogStrategy(){} void SyncLog(const std::string &message) { LockGuard lockGuard(_lock); std::string log = _logpath + _logname; //./log/log.txt std::ofstream out(log, std::ios::app); //追加方式写入 //判断是否打开 if(!out.is_open()) { return; } out << message << "\n"; out.close(); } private: std::string _logpath; std::string _logname; Mutex _lock; }; //日志类:构建日志字符串,根据策略,进行刷新 class Logger { public: Logger() { //默认采用控制台策略 _strategy = std::make_shared<ConsoleLogStrategy>(); } //启动控制台策略 void EnableConsoleLog() { _strategy = std::make_shared<ConsoleLogStrategy>(); } //启动磁盘策略 void EnableFileLog() { _strategy = std::make_shared<FileLogStrategy>(); } ~Logger() {} //内部类:构建字符串 class LogMessage { public: LogMessage(LogLevel level,const std::string &filename,int line,Logger &logger) :_level(level),_currtime(CurrentTime()),_pid(getpid()),_filename(filename),_line(line),_logger(logger) { std::stringstream ssbuffer; ssbuffer << "[" << _currtime << "] " << "[" << LevelToString(_level) << "] " << "[" << _pid << "] " << "[" << _filename << "] " << "[" << _line << "] - "; _loginfo = ssbuffer.str(); } template <typename T> LogMessage &operator<<(const T &info) { std::stringstream ss; ss << info; _loginfo += ss.str(); return *this; } ~LogMessage() { if(_logger._strategy) _logger._strategy->SyncLog(_loginfo); } private: std::string _currtime;//当前日志的时间 LogLevel _level; //日志等级 pid_t _pid; //进程pid std::string _filename;//源文件名称 int _line; //日志所在的行号 Logger &_logger; //负责根据不同的策略进行刷新 std::string _loginfo; //一条完整的日志记录 }; LogMessage operator()(LogLevel level,const std::string &filename, int line) { return LogMessage(level,filename,line,*this); } private: std::shared_ptr<LogStrategy> _strategy; //日志刷新的策略方案 }; //全局一个logger对象 Logger logger; //__FILE__:表示当前文件名,__LINE__:表示当前调用所在行号 #define LOG(level) logger(level,__FILE__,__LINE__) #define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog() #define ENABLE_FILE_LOG() logger.EnableFileLog() } #endif
//Lock.hpp #pragma once #include <iostream> #include <pthread.h> #include <string> namespace LockModule { class Mutex { public: Mutex(const Mutex&) = delete; const Mutex& operator=(const Mutex&) = delete; Mutex() { int ret; if(ret = (pthread_mutex_init(&_mutex, nullptr)) != 0) { std::cout << "Mutex init error" << std::endl; exit(EXIT_FAILURE); } } void Lock() { int ret; if(ret = (pthread_mutex_lock(&_mutex)) != 0) { std::cout << "Mutex lock error" << std::endl; exit(EXIT_FAILURE); } } void Unlock() { int ret; if(ret = (pthread_mutex_unlock(&_mutex)) != 0) { std::cout << "Mutex unlock error" << std::endl; exit(EXIT_FAILURE); } } ~Mutex() { int ret; if(ret = (pthread_mutex_destroy(&_mutex)) != 0) { std::cout << "Mutex destroy error" << std::endl; exit(EXIT_FAILURE); } } pthread_mutex_t* GetMutex() { return &_mutex; } private: pthread_mutex_t _mutex; }; //采用RAII风格,进行锁管理 class LockGuard { public: LockGuard(Mutex &mutex):_mutex(mutex) { _mutex.Lock(); } //析构函数中解锁 ~LockGuard() { _mutex.Unlock(); } private: Mutex &_mutex; }; }
//main.cpp #include "Log.hpp" using namespace LogModule; int main() { logger.EnableFileLog(); LOG(LogLevel::DEBUG) << "Hello Chen Sun"; logger.EnableConsoleLog(); LOG(LogLevel::INFO) << "Hello CHEN SUN"; return 0; }
9、线程池
概念
线程池:线程池是一种多线程处理形式,通过预先创建并管理一组线程,避免频繁创建和销毁线程的开销。线程池的核心思想是将任务提交到队列中,由池中的线程自动执行,提高系统资源利用率和响应速度。
核心组件:
- 任务队列:存放待执行的任务,通常采用阻塞队列实现。
- 工作线程:池中实际执行任务的线程,循环从队列中获取任务并运行。
- 线程池管理器:负责创建、销毁线程,以及动态调整池的大小。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
线程池的种类:
- 固定大小线程池(FixedThreadPool):创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口
- 缓存线程池(CachedThreadPool):线程数量动态调整,空闲线程会被回收。适用于短期异步任务或负载波动大的场景,但可能因任务暴增耗尽资源。
线程池的优势:
- 降低资源消耗:复用已创建的线程,减少线程创建和销毁的开销。
- 提高响应速度:任务到达时直接由空闲线程执行,无需等待线程创建。
- 增强可管理性:统一监控和调优线程数量,避免无限制创建线程导致系统崩溃。
线程池设计
实现固定大小线程池,预先创建若干个工作线程;同时创建一个阻塞队列作为任务队列,存放待执行的任务;循环从队列中获取任务并让线程池中某个线程运行。
//ThreadPool.hpp #ifndef __THREADPOOL__ #define __THREADPOOL__ #include <iostream> #include <string> #include <cstring> #include <queue> #include <vector> #include <unistd.h> #include <memory> #include "Cond.hpp" #include "Lock.hpp" #include "Log.hpp" #include "Task.hpp" #include "Thread.hpp" namespace ThreadPoolModule { using namespace LogModule; using namespace LockModule; using namespace CondModule; using namespace ThreadModule; //用于接收单个线程的指针 using thread_t = std::shared_ptr<Thread<std::string>>; const static int defaultnum = 5; template<typename T> class ThreadPool { private: //判断任务队列是否为空 bool IsEmpty() { return _taskq.empty(); } //线程的执行逻辑 -- 1、拿任务;2、执行任务 void HandlerTask(std::string name) { LOG(LogLevel::INFO) << "Thread:" << name << " 开始运行"; while(true) { //1、拿任务 T t; { LockGuard lockguard(_lock); //任务队列如果为空且线程池运行(如果线程池停止,就不能继续等待) while(IsEmpty() && _isrunning) { _wait_num++; _cond.Wait(_lock); _wait_num--; } //如果线程池停止了,此时存在两个情况:\ 1、任务队列不为空--继续执行任务队列的任务;2、任务队列为空--直接退出即可 if(!_isrunning && IsEmpty()) break; t = _taskq.front(); _taskq.pop(); } //2、处理任务 -- 这里不需要加锁,可以并发执行任务 t(name); } LOG(LogLevel::INFO) << "thread:" << name << "退出"; } public: ThreadPool(int num_thread = defaultnum) :_thread_num(num_thread),_wait_num(0),_isrunning(false) { //创建自定义线程类对象 for(int i = 0; i < _thread_num; i++) { _threads.push_back(std::make_shared<Thread<std::string>>(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1),"Thread-" + std::to_string(i + 1))); LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功"; } } //任务加入任务队列 void Equeue(T &&in) { LockGuard lockguard(_lock); //如果线程池停止,不能继续加入任务 if(!_isrunning) return; _taskq.push(std::move(in)); //通知线程队列已有任务,可以获取 if(_wait_num > 0) _cond.Notify(); } void Start() { if(_isrunning) return; LockGuard lockguard(_lock); _isrunning = true; for(auto &thread_ptr:_threads) { LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功"; thread_ptr->Start(); //启动线程 } } //执行剩下所有任务,停止线程池 void Stop() { LockGuard lockguard(_lock); if(_isrunning) { _isrunning = false; //唤醒所有线程去执行剩下的任务 if(_wait_num > 0) _cond.NotifyAll(); } } //回收线程 void Wait() { for(auto &thread_ptr:_threads) { thread_ptr->Join(); LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功"; } } ~ThreadPool() { //在析构函数中调用Stop和Wait Stop(); sleep(3); Wait(); } private: std::vector<thread_t> _threads; //线程数组 int _thread_num; //线程数量 std::queue<T> _taskq; //任务队列 int _wait_num; //任务等待数量 Mutex _lock; Cond _cond; bool _isrunning; }; } #endif
//ThreadPool.cpp #include "ThreadPool.hpp" #include "Task.hpp" #include <memory> using namespace ThreadPoolModule; int main() {// ENABLE_CONSOLE_LOG(); ENABLE_FILE_LOG(); std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>(); tp->Start(); int cnt = 10; while (cnt) { tp->Equeue(PushTask); cnt--; sleep(1); } return 0; }
//Thread.hpp #include <iostream> #include <cstring> #include <cstdlib> #include <stdlib.h> #include <string> #include <pthread.h> #include <functional> #include <atomic> #include <unistd.h> namespace ThreadModule { std::atomic<std::uint32_t> cnt(1); // 原子计数器,用于形成线程编号 enum class TSTATUS { NEW, RUNNING, STOP }; //封装自己的线程类 template <typename T> class Thread { using func_t = std::function<void(T)>; private: static void *Routine(void* args) { Thread<T>* t = static_cast<Thread<T>*>(args); t->_func(t->_data); return nullptr; } public: Thread(func_t func, T data):_func(func),_data(data),_status(TSTATUS::NEW),_joinable(true) { _name = "Thread-" + std::to_string(cnt++); _pid = getpid(); } bool Start() { //避免多次启动 if(_status == TSTATUS::RUNNING) { std::cout << _name << "启动失败,当前线程已经执行" << std::endl; return false; } else { if(_status == TSTATUS::STOP) std::cout << "正在重新启动线程" << std::endl; int ret; std::cout << _name << "已启动" << std::endl; _status = TSTATUS::RUNNING; if((ret = pthread_create(&_tid,nullptr,Routine,this)) != 0) { fprintf(stderr,"phread_create:%s\n",strerror(ret)); return false; } return true; } return false; } bool Stop() { if(_status == TSTATUS::RUNNING) { int ret; if((ret = pthread_cancel(_tid)) != 0) { fprintf(stderr,"pthread_cancel:%s\n",strerror(ret)); return false; } _status = TSTATUS::STOP; std::cout << _name << "已停止" << std::endl; return true; } return false; } bool Join() { if(_joinable) { int ret; if((ret = pthread_join(_tid,nullptr)) != 0) { fprintf(stderr,"pthread_join:%s\n",strerror(ret)); return false; } _status = TSTATUS::STOP; std::cout << _name << "资源已被回收" << std::endl; return true; } return false; } void Detach() { _joinable = false; pthread_detach(_tid); } bool IsJoinable() { return _joinable; } std::string Name() { return _name; } ~Thread() { } public: std::string _name;//线程名 pthread_t _tid;//线程id pid_t _pid;//进程id bool _joinable;//是否可以分离 func_t _func;//函数 TSTATUS _status;//状态 T _data; }; }
//Task.hpp #ifndef __TASK__ #define __TASK__ #include <iostream> #include <string> #include <functional> #include "Log.hpp" using namespace LogModule; using task_t = std::function<void(std::string name)>; void PushTask(std::string name) { LOG(LogLevel::DEBUG) << "我是一个推送数据到服务器的一个任务,我正在被执行" << '[' << name << ']'; } #endif
//Log.hpp #ifndef __LOG__ #define __LOG__ #include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <fstream> #include <sstream> #include <memory> #include <filesystem> #include <unistd.h> #include <time.h> #include "Lock.hpp" namespace LogModule { using namespace LockModule; //获取一下当前系统时间 std::string CurrentTime() { //时间戳 time_t time_stamp = time(nullptr); struct tm curr; localtime_r(&time_stamp, &curr); //将时间戳转化成可读性较强的时间信息 char buffer[1024]; snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d:%02d:%02d", curr.tm_year + 1900, curr.tm_mon + 1, curr.tm_mday, curr.tm_hour, curr.tm_min, curr.tm_sec); return buffer; } const std::string defaultlogpath = "./log/"; const std::string defaultlogname = "log.txt"; enum class LogLevel { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; std::string LevelToString(LogLevel level) { switch(level) { case LogLevel::DEBUG: return "DEBUG"; case LogLevel::INFO: return "INFO"; case LogLevel::WARNING: return "WARNING"; case LogLevel::ERROR: return "ERROR"; case LogLevel::FATAL: return "FATAL"; default: return "None"; } } //刷新策略 class LogStrategy { public: virtual ~LogStrategy() = default; virtual void SyncLog(const std::string &message) = 0; }; //控制台策略 class ConsoleLogStrategy:public LogStrategy { public: ConsoleLogStrategy(){} ~ConsoleLogStrategy(){} void SyncLog(const std::string &message) { LockGuard lockGuard(_lock); std::cout << message << std::endl; } private: Mutex _lock; }; //磁盘策略 class FileLogStrategy:public LogStrategy { public: FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname) :_logpath(logpath),_logname(logname) { //确定_logpath是否存在 LockGuard lockGuard(_lock); if(std::filesystem::exists(_logpath)) { return; } //不存在就尝试创建路径 try { std::filesystem::create_directories(_logpath); } catch(std::filesystem::filesystem_error &e) { std::cerr << e.what() << "\n"; } } ~FileLogStrategy(){} void SyncLog(const std::string &message) { LockGuard lockGuard(_lock); std::string log = _logpath + _logname; //./log/log.txt std::ofstream out(log, std::ios::app); //追加方式写入 //判断是否打开 if(!out.is_open()) { return; } out << message << "\n"; out.close(); } private: std::string _logpath; std::string _logname; Mutex _lock; }; //日志类:构建日志字符串,根据策略,进行刷新 class Logger { public: Logger() { //默认采用控制台策略 _strategy = std::make_shared<ConsoleLogStrategy>(); } //启动控制台策略 void EnableConsoleLog() { _strategy = std::make_shared<ConsoleLogStrategy>(); } //启动磁盘策略 void EnableFileLog() { _strategy = std::make_shared<FileLogStrategy>(); } ~Logger() {} //内部类:构建字符串 class LogMessage { public: LogMessage(LogLevel level,const std::string &filename,int line,Logger &logger) :_level(level),_currtime(CurrentTime()),_pid(getpid()),_filename(filename),_line(line),_logger(logger) { std::stringstream ssbuffer; ssbuffer << "[" << _currtime << "] " << "[" << LevelToString(_level) << "] " << "[" << _pid << "] " << "[" << _filename << "] " << "[" << _line << "] - "; _loginfo = ssbuffer.str(); } template <typename T> LogMessage &operator<<(const T &info) { std::stringstream ss; ss << info; _loginfo += ss.str(); return *this; } ~LogMessage() { if(_logger._strategy) _logger._strategy->SyncLog(_loginfo); } private: std::string _currtime;//当前日志的时间 LogLevel _level; //日志等级 pid_t _pid; //进程pid std::string _filename;//源文件名称 int _line; //日志所在的行号 Logger &_logger; //负责根据不同的策略进行刷新 std::string _loginfo; //一条完整的日志记录 }; LogMessage operator()(LogLevel level,const std::string &filename, int line) { return LogMessage(level,filename,line,*this); } private: std::shared_ptr<LogStrategy> _strategy; //日志刷新的策略方案 }; //全局一个logger对象 Logger logger; //__FILE__:表示当前文件名,__LINE__:表示当前调用所在行号 #define LOG(level) logger(level,__FILE__,__LINE__) #define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog() #define ENABLE_FILE_LOG() logger.EnableFileLog() } #endif
//Lock.hpp #pragma once #include <iostream> #include <pthread.h> #include <string> namespace LockModule { class Mutex { public: Mutex(const Mutex&) = delete; const Mutex& operator=(const Mutex&) = delete; Mutex() { int ret; if(ret = (pthread_mutex_init(&_mutex, nullptr)) != 0) { std::cout << "Mutex init error" << std::endl; exit(EXIT_FAILURE); } } void Lock() { int ret; if(ret = (pthread_mutex_lock(&_mutex)) != 0) { std::cout << "Mutex lock error" << std::endl; exit(EXIT_FAILURE); } } void Unlock() { int ret; if(ret = (pthread_mutex_unlock(&_mutex)) != 0) { std::cout << "Mutex unlock error" << std::endl; exit(EXIT_FAILURE); } } ~Mutex() { int ret; if(ret = (pthread_mutex_destroy(&_mutex)) != 0) { std::cout << "Mutex destroy error" << std::endl; exit(EXIT_FAILURE); } } pthread_mutex_t* GetMutex() { return &_mutex; } private: pthread_mutex_t _mutex; }; //采用RAII风格,进行锁管理 class LockGuard { public: LockGuard(Mutex &mutex):_mutex(mutex) { _mutex.Lock(); } //析构函数中解锁 ~LockGuard() { _mutex.Unlock(); } private: Mutex &_mutex; }; }
//Cond.hpp #ifndef __MYCOND__HPP__ #define __MYCOND__HPP__ #include <iostream> #include <pthread.h> #include <cstring> #include "Lock.hpp" namespace CondModule { using namespace LockModule; class Cond { public: Cond() { int ret = pthread_cond_init(&_cond,nullptr); if(ret != 0) { fprintf(stderr,"pthread_cond_init:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } void Wait(Mutex &mutex) { int ret = pthread_cond_wait(&_cond,mutex.GetMutex()); if(ret != 0) { fprintf(stderr,"pthread_cond_wait:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } void Notify() { int ret = pthread_cond_signal(&_cond); if(ret != 0) { fprintf(stderr,"pthread_cond_signal:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } void NotifyAll() { int ret = pthread_cond_broadcast(&_cond); if(ret != 0) { fprintf(stderr,"pthread_cond_broadcast:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } ~Cond() { int ret = pthread_cond_destroy(&_cond); if(ret != 0) { fprintf(stderr,"pthread_cond_destroy:%s\n",strerror(ret)); exit(EXIT_FAILURE); } } private: pthread_cond_t _cond; }; } #endif
#makefile bin=thread_pool cc=g++ src=$(wildcard *.cpp) obj=$(src:.cc=.o) $(bin):$(obj) $(cc) -o $@ $^ -lpthread %.o:%.cc $(cc) -c $< -std=c++17 .PHONY:clean clean: rm -f $(bin) $(obj) .PHONY:test test: echo $(src) echo $(obj)
[2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 构建线程Thread-1对象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 构建线程Thread-2对象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 构建线程Thread-3对象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 构建线程Thread-4对象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 构建线程Thread-5对象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 启动线程Thread-1...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 启动线程Thread-2...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-1 开始运行 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 启动线程Thread-3...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 启动线程Thread-4...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-3 开始运行 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-2 开始运行 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 启动线程Thread-5...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-5 开始运行 [2025-06-17 16:56:51] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-5] [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-4 开始运行 [2025-06-17 16:56:52] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-5] [2025-06-17 16:56:53] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-1] [2025-06-17 16:56:54] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-3] [2025-06-17 16:56:55] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-4] [2025-06-17 16:56:56] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-2] [2025-06-17 16:56:57] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-5] [2025-06-17 16:56:58] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-1] [2025-06-17 16:56:59] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-3] [2025-06-17 16:57:00] [DEBUG] [11927] [Task.hpp] [15] - 我是一个推送数据到服务器的一个任务,我正在被执行[Thread-4] [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-4退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-2退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-5退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-1退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-3退出 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收线程Thread-1...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收线程Thread-2...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收线程Thread-3...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收线程Thread-4...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收线程Thread-5...成功
10、线程安全的单例模式
单例模式:某些类,只应该具有一个对象(实例),就称之为单例;在很多服务器开发场景中,经常需要让服务器加载很多的数据到内存中,此时往往要用一个单例的类来管理这些数据。
饿汉实现方式和懒汉实现方式:
- 饿汉方式在类加载时就完成单例对象的初始化,利用类加载机制保证线程安全。
template<typename T> class Singleton { static T data; public: static T* GetInstance() { return &data; } }; //只通过Singleton这个包装类来使用T对象,则一个进程中只有一个T对象的实例。
- 懒汉模式延迟单例对象的初始化,仅在首次调用时创建实例。
template <typename T> class Singleton { static T* inst; public: static T* GetInstance() { if(inst == nullptr) inst = new T(); return inst; } }; //但这个懒汉模式线程不安全,可能存在两个线程同时调用,可能会创建出两份T对象的实例。 //懒汉模式,线程安全版本 template <typename T> class Singleton { volatile static T *inst; //需要设置volatile关键字,否则可能被编译器优化。 static std::mutex lock; //上面自定义的锁 public: static T *GetInstance() { if(inst == nullptr) { lock.lock(); if(inst == nullptr)//双重判断是为了不必要的锁竞争 inst = new T(); lock.unlock(); } return inst; } };
单例式线程池:
//ThreadPool.hpp
#ifndef __THREADPOOL__
#define __THREADPOOL__
#include <iostream>
#include <string>
#include <cstring>
#include <queue>
#include <vector>
#include <unistd.h>
#include <memory>
#include "Cond.hpp"
#include "Lock.hpp"
#include "Log.hpp"
#include "Task.hpp"
#include "Thread.hpp"
namespace ThreadPoolModule
{
using namespace LogModule;
using namespace LockModule;
using namespace CondModule;
using namespace ThreadModule;
// 用于接收单个线程的指针
using thread_t = std::shared_ptr<Thread<std::string>>;
const static int defaultnum = 5;
template <typename T>
class ThreadPool
{
private:
//复制拷贝禁用
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(const Thread<T>&) = delete;
//构造设置为私有
ThreadPool(int num_thread = defaultnum)
: _thread_num(num_thread), _wait_num(0), _isrunning(false)
{
// 创建自定义线程类对象
for (int i = 0; i < _thread_num; i++)
{
_threads.push_back(std::make_shared<Thread<std::string>>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), "Thread-" + std::to_string(i + 1)));
LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";
}
}
// 判断任务队列是否为空
bool IsEmpty()
{
return _taskq.empty();
}
// 线程的执行逻辑 -- 1、拿任务;2、执行任务
void HandlerTask(std::string name)
{
LOG(LogLevel::INFO) << "Thread:" << name << " 开始运行";
while (true)
{
// 1、拿任务
T t;
{
LockGuard lockguard(_lock);
// 任务队列如果为空且线程池运行(如果线程池停止,就不能继续等待)
while (IsEmpty() && _isrunning)
{
_wait_num++;
_cond.Wait(_lock);
_wait_num--;
}
//如果线程池停止了,此时存在两个情况:\
1、任务队列不为空--继续执行任务队列的任务;2、任务队列为空--直接退出即可
if (!_isrunning && IsEmpty())
break;
t = _taskq.front();
_taskq.pop();
}
// 2、处理任务 -- 这里不需要加锁,可以并发执行任务
t(name);
}
LOG(LogLevel::INFO) << "thread:" << name << "退出";
}
public:
static ThreadPool<T> *GetInstace(int num = defaultnum)
{
//懒汉方式
if(_instace == nullptr)
{
LockGuard lockguard(_mutex);
if(_instace == nullptr) //双重判断避免不必要的锁争抢问题
{
_instace = new ThreadPool<T>(num);
LOG(LogLevel::DEBUG) << "创建线程池单例";
return _instace;
}
}
LOG(LogLevel::DEBUG) << "获取线程池单例";
return _instace;
}
// 任务加入任务队列
void Equeue(T &&in)
{
LockGuard lockguard(_lock);
// 如果线程池停止,不能继续加入任务
if (!_isrunning)
return;
_taskq.push(std::move(in));
// 通知线程队列已有任务,可以获取
if (_wait_num > 0)
_cond.Notify();
}
void Start()
{
if (_isrunning)
return;
LockGuard lockguard(_lock);
_isrunning = true;
for (auto &thread_ptr : _threads)
{
LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";
thread_ptr->Start(); // 启动线程
}
}
// 执行剩下所有任务,停止线程池
void Stop()
{
LockGuard lockguard(_lock);
if (_isrunning)
{
_isrunning = false;
// 唤醒所有线程去执行剩下的任务
if (_wait_num > 0)
_cond.NotifyAll();
}
}
// 回收线程
void Wait()
{
for (auto &thread_ptr : _threads)
{
thread_ptr->Join();
LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";
}
}
~ThreadPool()
{}
private:
std::vector<thread_t> _threads; // 线程数组
int _thread_num; // 线程数量
std::queue<T> _taskq; // 任务队列
int _wait_num; // 任务等待数量
Mutex _lock;
Cond _cond;
bool _isrunning;
//添加单例模式
static ThreadPool<T> *_instace;
static Mutex _mutex;
};
//初始化
template<typename T>
ThreadPool<T> *ThreadPool<T>::_instace = nullptr;
template<typename T>
Mutex ThreadPool<T>::_mutex;
}
#endif
//ThreadPool.cpp
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>
using namespace ThreadPoolModule;
int main()
{
ENABLE_CONSOLE_LOG();
//ENABLE_FILE_LOG();
int cnt = 20;
//首次调用创建唯一对象
ThreadPool<task_t>::GetInstace(10)->Start();
while (cnt)
{
ThreadPool<task_t>::GetInstace()->Equeue(PushTask);
cnt--;
sleep(1);
}
ThreadPool<task_t>::GetInstace()->Stop();
sleep(3);
ThreadPool<task_t>::GetInstace()->Wait();
return 0;
}
11、线程安全和重入问题
线程安全:就是多个线程在访问共享资源时,能够正确地执行,不会相互干扰或破坏彼此的执行结
果。一般而言,多个线程并发同一段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进行操作,并且没有锁保护的情况下,容易出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
重入可以分为两种情况:
- 多线程重入函数
- 信号导致一个执行流重复进入函数
常见的线程不安全的情况:
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况:
- 调用了malloc/free函数,因为malloc函数就是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见可重入的情况:
- 不使用全局变量或静态变量
- 不使用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系:
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的
可重入与线程安全区别:
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入
12、死锁
1、死锁概念
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源而处于的一种永久等待状态。
- 为了方便表述,假设现在线程A,线程B,一个线程必须同时持有锁1和锁2,才能进行后续资源的访问:
申请一把锁是原子的,但申请两把锁就不一定是原子的了
可能造成结果就是:
- 死锁的四个必要条件:
1、互斥条件:一个资源每次只能被一个执行流使用
2、请求与保持条件:一个执行流因请求资源而阻塞时,对以获得的资源保持不放
3、不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能强行剥夺
4、循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
2、破坏死锁
死锁的产生需要满足四个必要条件:互斥条件、占有并等待、非抢占条件和循环等待。破坏其中任何一个条件即可避免死锁。
- 破坏互斥条件:互斥条件指资源一次只能被一个进程占用。破坏互斥条件可以通过允许多个进程共享资源,但某些资源(如打印机)无法共享,因此该方法适用性有限。
- 破坏占有并等待条件:
一次性申请所有资源:进程在运行前申请所有所需资源,否则不执行;
释放已占有资源:若进程无法获得新资源,则释放已占有资源并重新申请。 - 破坏非抢占条件:
若进程无法获得资源,则释放已占有资源,等待一段时间后重新申请。
操作系统强制回收资源,可能导致进程回滚。 - 破坏循环等待条件:
资源有序分配法:为资源编号,进程只能按编号顺序申请资源。
超时机制。
银行家算法:动态检查资源分配状态,避免进入不安全状态。
2.1 银行家算法
具体参考:操作系统——银行家算法(Banker's Algorithm) - 王陸 - 博客园
13、读写锁
读者写者问题:
- 三种关系:读者和读者--并发关系;读者和写者--互斥&&同步;写者和写者--互斥;
- 两种角色:读者和写者;
- 一个交易场所:缓冲区
伪代码:
uint32_t reader_count = 0; lock_t count_lock; lock_t writer_lock; //Reader //加锁 lock(count_lock); if(read_count == 0) lock(writer_lock); //只需要第一个读者上锁 ++reader_count; ulock(count_lock); //read lock(count_lock) --reader_count; if(reader_count == 0) unlock(writer_lock); ulock(count_lock); //Writer lock(writer_lock); //write unlock(writer_lock);
读写锁:
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率;读写锁就是专门处理这种多读少写的情况。
|
读者优先(Reader-Preference):在这种策略中,系统会尽可能多地允许多个读者同时访问资源(比如共享文件或数据),而不会优先考虑写者。这意味着当有读者正在读取时,新到达的读者会立即被允许进入读取区,而写者则会被阻塞,直到所有读者都离开读取区。读者优先策略可能会导致写者饥饿(即写者长时间无法获得写入权限),特别是当读者频繁到达时。
写者优先(Writer-Preference):在这种策略中,系统会优先考虑写者。当写者请求写入权限时,系统会尽快地让写者进入写入区,即使此时有读者正在读取。这通常意味着一旦有写者到达,所有后续的读者都会被阻塞,直到写者完成写入并离开写入区。写者优先策略可以减少写者等待的时间,但可能会导致读者饥饿(即读者长时间无法获得读取权限),特别是当写者频繁到达时。
读写锁接口:读写锁(Reader-Writer Lock)接口,用于实现多线程环境下的高效共享资源访问。读写锁允许并发读操作,但写操作必须独占访问。
Linux读写锁接口
Linux提供了读写锁(Reader-Writer Lock)的接口,用于实现多线程环境下的高效共享资源访问。读写锁允许并发读操作,但写操作必须独占访问。以下是主要接口和用法:
初始化读写锁:
读写锁的类型为
pthread_rwlock_t
,初始化方式有两种:静态初始化:
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
动态初始化(需销毁):
pthread_rwlock_init(&rwlock, NULL);
设置读写优先:
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref); /* pref 共有 3 种选择 PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先, 可能会导致写者饥饿情况 PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先, 目前有 BUG, 导致表现行为和PTHREAD_RWLOCK_PREFER_READER_NP 一致 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先, 但写者不能递归加锁 */
获取读锁(共享锁):
- 阻塞方式获取读锁:
pthread_rwlock_rdlock(&rwlock);
- 非阻塞方式尝试获取读锁:
int ret = pthread_rwlock_tryrdlock(&rwlock); if (ret != 0) { // 获取锁失败处理 }
- 获取写锁(独占锁):
- 阻塞方式获取写锁:
pthread_rwlock_wrlock(&rwlock);
- 非阻塞方式尝试获取写锁:
int ret = pthread_rwlock_trywrlock(&rwlock); if (ret != 0) { // 获取锁失败处理 }
- 释放读写锁:
无论读锁还是写锁,统一使用:
pthread_rwlock_unlock(&rwlock);
销毁读写锁:
动态初始化的读写锁需要销毁:
pthread_rwlock_destroy(&rwlock);
示例代码:
#include <pthread.h> #include <stdio.h> pthread_rwlock_t rwlock; int shared_data = 0; void* reader(void* arg) { pthread_rwlock_rdlock(&rwlock); printf("Reader: %d\n", shared_data); pthread_rwlock_unlock(&rwlock); return NULL; } void* writer(void* arg) { pthread_rwlock_wrlock(&rwlock); shared_data++; printf("Writer: %d\n", shared_data); pthread_rwlock_unlock(&rwlock); return NULL; } int main() { pthread_rwlock_init(&rwlock, NULL); pthread_t threads[10]; for (int i = 0; i < 5; i++) { pthread_create(&threads[i], NULL, writer, NULL); } for (int i = 5; i < 10; i++) { pthread_create(&threads[i], NULL, reader, NULL); } for (int i = 0; i < 10; i++) { pthread_join(threads[i], NULL); } pthread_rwlock_destroy(&rwlock); return 0; }
注意事项:
- 读写锁可能导致写者饥饿问题(读者持续获取锁导致写者无法执行)
- 适合读多写少的场景,写多读少时性能可能不如互斥锁
- 销毁已锁定的读写锁会导致未定义行为
- 同一线程重复获取写锁可能导致死锁(除非使用递归锁属性)