POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P() --P操作
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V() --V操作
我们需要知道:
1.信号量的本质是一把计数器
2.申请信号量的本质就是预订资源
3.PV操作是原子的
以往把公共资源当做一个整体来使用时,需要mutex(锁),但如果不把公共资源当做一个整体,让多线程不访问临界资源的同一个区域呢?
那么可以这样做:申请信号量(申请成功,就一定有资源可以访问,可以不用再判断资源是否就绪,因为申请的时候就判断了)->访问指定的一个位置(访问临界资源)->释放信号量
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
对于“资源”的认识:生产者将空间视为资源,消费者将数据视为资源
多生产者多消费者模型--生产者派发任务--消费者执行任务
LockGuard.hpp
封装锁
// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock): _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
RingQueue.hpp
环形队列
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size)
, _size(size)
, _p_step(0)
, _c_step(0)
{
//sem_init的第二个参数设置为0-->线程间共享
sem_init(&_space_sem, 0, size);//生产者信号量的初始化--信号量本质是资源的计数器,对于生产者而言,资源就是可利用的空间
sem_init(&_data_sem, 0, 0);//对于消费者而言,资源即数据,初始时未有数据
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
void Push(const T &in)
{
// 生产
// 先申请信号量,再申请锁--相比先申请锁,再申请信号量,速度快
P(_space_sem);//生产者申请信号量--相当于资源(可利用空间)数目--
{
LockGuard lockGuard(&_p_mutex);//在该代码块内,构造时申请锁,析构时释放锁
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
}
V(_data_sem);//相当于资源(数据)数据++
}
void Pop(T *out)
{
// 消费
P(_data_sem);//消费者申请信号量,数据资源的数目--
{
LockGuard lockGuard(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
}
V(_space_sem);//空间资源的数目++
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
std::vector<T> _ringqueue;
int _size;
int _p_step; // 生产者的生产位置
int _c_step; // 消费者的消费位置
sem_t _space_sem; // 生产者的信号量
sem_t _data_sem; // 消费者的信号量
pthread_mutex_t _p_mutex;//生产者的锁
pthread_mutex_t _c_mutex;//消费者的锁
};
Task.hpp
简单的计算任务
const int defaultvalue = 0;
const std::string opers = "+-*/%)(&%^";
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: data_x(x)
, data_y(y)
, oper(op)
, result(defaultvalue)
, code(ok)
{}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
break;
case '%':
{
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
void operator()()
{
Run();
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=";
s += std::to_string(result);
s += " [";
s += std::to_string(code);
s += "]";
return s;
}
~Task()
{}
private:
int data_x;
int data_y;
char oper; // + - * / %
int result;
int code; // 结果码,0: 结果可信 !0: 结果不可信
};
Makefile
testringqueue:Main.cc
g++ -o $@ $^ -lpthread
.PHONY:clean
clean:
rm testringqueue
Main.cc
测试代码
void *Productor(void *args)//生产者派发任务
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
// 1. 有数据
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % (opers.size())];
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
rq->Push(t);
}
}
void *Consumer(void *args)//消费者执行任务
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->Pop(&t);
t();//执行任务
std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ pthread_self());
pthread_t c[3], p[2];
//环形队列
RingQueue<Task> *rq = new RingQueue<Task>();
pthread_create(&p[0], nullptr, Productor, rq);
pthread_create(&p[1], nullptr, Productor, rq);
pthread_create(&c[0], nullptr, Consumer, rq);
pthread_create(&c[1], nullptr, Consumer, rq);
pthread_create(&c[2], nullptr, Consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
return 0;
}
在介绍线程池之前,我们先聊聊题外话--日志
日志:1. 可以向显示器打印,也可以向文件中写入 2. 日志包括-->时间,内容,日志等级,文件名,代码行数,进程pid...
localtime:
vsnprintf:
va_start
va_end
设计日志:
Log.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
enum//日志等级
{
Debug = 0,
Info,
Warning,
Error,
Fatal
};
enum//日志输出方式
{
Screen = 10,//输出到屏幕
OneFile,//输出到一个文件中
ClassFile//输出到不同的文件中
};
std::string LevelToString(int level)
{
switch(level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknown";
}
}
const int defaultstyle = Screen;
const std::string default_filename = "log.";
const std::string logdir = "log";
class Log
{
public:
Log()
:style(defaultstyle)
,filename(default_filename)
{
mkdir(logdir.c_str(), 0775);
}
~Log() {}
void Enable(int sty)//将日志输出方式设置成想要的--如果需要的话
{
style = sty;
}
std::string TimeStampExLocalTime()//获得时间--年月日时分秒
{
time_t currtime = time(nullptr);//获取当前时间--时间戳形式--转化我们日常中喜闻乐见的时间形式
struct tm* curr = localtime(&currtime);//struct tm结构体中包含年月日时分秒等
char time_buffer[128];
snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
curr->tm_year+1900,curr->tm_mon+1,curr->tm_mday,
curr->tm_hour,curr->tm_min,curr->tm_sec);
return time_buffer;
}
void WriteLogToOneFile(const std::string &logname, const std::string &message)//日志内容都往一个文件中打印
{
umask(0);
int fd = open(logname.c_str(),O_CREAT | O_WRONLY |O_APPEND,0666);
if(fd<0)
return;
write(fd,message.c_str(),message.size());
close(fd);
}
void WriteLogToClassFile(const std::string &levelstr, const std::string &message)//根据传入的日志等级,向不同的文件中打印
{
std::string logname = logdir;
logname+='/';
logname+=filename;
logname+=levelstr;
WriteLogToOneFile(logname,message);
}
void WriteLog(const std::string &levelstr, const std::string &message)//日志输出接口
{
switch(style)
{
case Screen:
std::cout << message;
break;
case OneFile:
WriteLogToClassFile("all", message);
break;
case ClassFile:
WriteLogToClassFile(levelstr, message);
break;
default:
break;
}
}
void LogMessage(int level, const char *format, ...) // 类C的一个日志接口 第一个参数-->日志等级 +可变参数
{
char leftbuffer[1024];
std::string levelstr = LevelToString(level);
std::string currtime = TimeStampExLocalTime();
std::string idstr = std::to_string(getpid());
snprintf(leftbuffer,sizeof(leftbuffer),"[%s][%s][%s] ",levelstr.c_str(), currtime.c_str(), idstr.c_str());
char rightbuffer[1024];
va_list args;//类似于 char *, void *,即指针
va_start(args, format);// args 指向了可变参数部分
vsnprintf(rightbuffer,sizeof(rightbuffer), format, args);//将可变参数部分写入rightbuffer
va_end(args); // args = nullptr;
std::string loginfo = leftbuffer;
loginfo += rightbuffer;
WriteLog(levelstr, loginfo);
}
private:
int style;//日志输出方式
std::string filename;//日志文件的名字
};
Makefile:
testLog:Main.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -rf testLog
Main.cc:
#include <iostream>
#include "Log.hpp"
int main()
{
Log lg;
lg.Enable(OneFile);
lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
return 0;
}
线程池
线程池: 是一种线程使用模式
线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着 监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
线程池的应用场景:
1. 需要大量的线程来完成任务,且完成任务的时间比较短
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用
(突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误)
线程池示例:
1. 创建固定数量线程池,循环从任务队列中获取任务对象
2. 获取到任务对象后,执行任务对象中的任务接口
下面设计一个简单的单例线程池:
现在我们设定:队列中的任务就是简单的加减乘除取模运算
线程池ThreadPool.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
static const int defaultnum = 5;//默认预先创建好5个线程
class ThreadData
{
public:
ThreadData(const std::string &name)
: threadname(name)
{}
~ThreadData()
{}
public:
std::string threadname;
};
template <class T>
class ThreadPool//线程池
{
private:
ThreadPool(int thread_num = defaultnum)
:_thread_num(thread_num)
{
pthread_mutex_init(&_mutex,nullptr);//初始化锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
// 构建指定个数的线程
for(int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-";
threadname+=std::to_string(i+1);
ThreadData td(threadname);//线程的名字
_threads.emplace_back(threadname,
std::bind(&ThreadPool<T>::ThreadRun, this,
std::placeholders::_1),
td);//插入线程
lg.LogMessage(Info, "%s is created...\n", threadname.c_str());//打印日志
}
}
ThreadPool(const ThreadPool<T> &tp)=delete;//禁用拷贝构造
const ThreadPool<T> &operator=(const ThreadPool<T> tp)=delete;//禁用赋值重载
public:
static ThreadPool<T> *GetInstance()//得到唯一的线程池的指针
{
if(instance==nullptr)
{
LockGuard lockguard(&sig_lock);
if (instance == nullptr)
{
lg.LogMessage(Info, "创建单例成功...\n");
instance = new ThreadPool<T>();
}
}
return instance;
}
bool Start()//启动所有线程
{
for (auto &thread : _threads)
{
thread.Start();
lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
void ThreadWait(const ThreadData &td)
{
lg.LogMessage(Debug, "no task, %s is sleeping...\n", td.threadname.c_str());
pthread_cond_wait(&_cond, &_mutex);//等待条件变量就绪(即交易场所/队列中没有任务/数据可被线程拿走执行),线程会先解锁-->等待
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);//条件变量准备就绪(即交易场所/队列中有任务/数据可被线程拿走执行),每次唤醒一个线程
}
void ThreadRun(ThreadData &td)
{
while (true)
{
//取任务
T t;
{
LockGuard lockguard(&_mutex);//取交易场所/队列取任务/数据要先申请锁
while(_q.empty())//队列无任务--线程进入等待
{
ThreadWait(td);
lg.LogMessage(Debug, "thread %s is wakeup\n", td.threadname.c_str());
}
t = _q.front();
_q.pop();
}
//处理任务
t();//在测试时,T为Task类型,重载了(),自动调用Task类的Run(),完成简单的加减乘除计算任务
lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
td.threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
void Push(T &in)//生产/插入任务
{
lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
LockGuard lockguard(&_mutex);//去交易场所/队列生产任务也要先申请锁
_q.push(in);
ThreadWakeup();//队列中有任务了,进行通知
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
private:
std::queue<T> _q;//队列--提供场所
std::vector<Thread<ThreadData>> _threads;//预先要创建好一批线程
int _thread_num;
pthread_mutex_t _mutex;//锁--实现互斥机制--交易场所(队列)要使用到的锁
pthread_cond_t _cond;//条件变量--实现同步机制
static ThreadPool<T>*instance;//要实现一个线程池单例
static pthread_mutex_t sig_lock;//锁--得到指向线程池的指针instance需要用到的锁,因为instance全局有效,可被共享访问,属于临界资源,所以需要加锁保护
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;//全局锁的初始化
线程Thread.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
template<class T>
using func_t = std::function<void(T&)>;
template<class T>
class Thread
{
public:
Thread(const std::string &threadname, func_t<T> func, T data)
:_tid(0)
, _threadname(threadname)
, _isrunning(false)
, _func(func)
, _data(data)
{}
static void *ThreadRoutine(void *args) //pthread_create的第三个参数:void *(*start_routine) (void *),类内方法有一个隐藏的参数:Thread*this,不符合,所以设计成静态方法
{
Thread *ts = static_cast<Thread *>(args);
ts->_func(ts->_data);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);//将this作为ThreadRoutine的参数传入
if(n == 0)
{
_isrunning = true;
return true;
}
else return false;
}
bool Join()
{
if(!_isrunning) return true;
int n = pthread_join(_tid, nullptr);
if(n == 0)
{
_isrunning = false;
return true;
}
return false;
}
std::string ThreadName()
{
return _threadname;
}
bool IsRunning()
{
return _isrunning;
}
~Thread()
{}
private:
pthread_t _tid;//线程id
std::string _threadname;//线程名
bool _isrunning;//线程是否允许
func_t<T> _func;//线程要执行的函数
T _data;//线程要执行函数的参数
};
任务Task.hpp:
#pragma once
#include <iostream>
#include <string>
const int defaultvalue = 0;
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
const std::string opers = "+-*/%";
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: data_x(x)
, data_y(y)
, oper(op)
, result(defaultvalue)
, code(ok)
{}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
break;
case '%':
{
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
void operator()()
{
Run();
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=";
s += std::to_string(result);
s += " [";
s += std::to_string(code);
s += "]";
return s;
}
~Task()
{}
private:
int data_x;
int data_y;
char oper; // + - * / %
int result;
int code; // 结果码,0: 结果可信 !0: 结果不可信,如1,2,3,4
};
日志Log.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
enum//日志等级
{
Debug = 0,
Info,
Warning,
Error,
Fatal
};
enum//日志输出方式
{
Screen = 10,//输出到屏幕
OneFile,//输出到一个文件中
ClassFile//输出到不同的文件中
};
std::string LevelToString(int level)
{
switch(level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknown";
}
}
const int defaultstyle = Screen;
const std::string default_filename = "log.";
const std::string logdir = "log";
class Log
{
public:
Log()
:style(defaultstyle)
,filename(default_filename)
{
mkdir(logdir.c_str(), 0775);
}
~Log() {}
void Enable(int sty)//将日志输出方式设置成想要的--如果需要的话
{
style = sty;
}
std::string TimeStampExLocalTime()//获得时间--年月日时分秒
{
time_t currtime = time(nullptr);//获取当前时间--时间戳形式--转化我们日常中喜闻乐见的时间形式
struct tm* curr = localtime(&currtime);//struct tm结构体中包含年月日时分秒等
char time_buffer[128];
snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
curr->tm_year+1900,curr->tm_mon+1,curr->tm_mday,
curr->tm_hour,curr->tm_min,curr->tm_sec);
return time_buffer;
}
void WriteLogToOneFile(const std::string &logname, const std::string &message)//日志内容都往一个文件中打印
{
umask(0);
int fd = open(logname.c_str(),O_CREAT | O_WRONLY |O_APPEND,0666);
if(fd<0)
return;
write(fd,message.c_str(),message.size());
close(fd);
}
void WriteLogToClassFile(const std::string &levelstr, const std::string &message)//根据传入的日志等级,向不同的文件中打印
{
std::string logname = logdir;
logname+='/';
logname+=filename;
logname+=levelstr;
WriteLogToOneFile(logname,message);
}
void WriteLog(const std::string &levelstr, const std::string &message)//日志输出接口
{
switch(style)
{
case Screen:
std::cout << message;
break;
case OneFile:
WriteLogToClassFile("all", message);
break;
case ClassFile:
WriteLogToClassFile(levelstr, message);
break;
default:
break;
}
}
void LogMessage(int level, const char *format, ...) // 类C的一个日志接口 第一个参数-->日志等级 +可变参数
{
char leftbuffer[1024];
std::string levelstr = LevelToString(level);
std::string currtime = TimeStampExLocalTime();
std::string idstr = std::to_string(getpid());
snprintf(leftbuffer,sizeof(leftbuffer),"[%s][%s][%s] ",levelstr.c_str(), currtime.c_str(), idstr.c_str());
char rightbuffer[1024];
va_list args;//类似于 char *, void *,即指针
va_start(args, format);// args 指向了可变参数部分
vsnprintf(rightbuffer,sizeof(rightbuffer), format, args);//将可变参数部分写入rightbuffer
va_end(args); // args = nullptr;
std::string loginfo = leftbuffer;
loginfo += rightbuffer;
WriteLog(levelstr, loginfo);
}
private:
int style;//日志输出方式
std::string filename;//日志文件的名字
};
Log lg;
class Conf
{
public:
Conf()
{
lg.Enable(Screen);
}
~Conf()
{}
};
Conf conf;
锁的封装LockGuard.hpp:
#pragma once
#include <pthread.h>
// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
:_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
: _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
Main.cc:
#include <iostream>
#include <memory>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"
int main()
{
sleep(10);
ThreadPool<Task>::GetInstance()->Start();//启动所有线程去队列中取任务执行
srand((uint64_t)time(nullptr) ^ getpid());
while (true)
{
int x = rand() % 100 + 1;
usleep(1234);
int y = rand() % 200;
usleep(1234);
char oper = opers[rand() % opers.size()];
Task t(x, y, oper);
ThreadPool<Task>::GetInstance()->Push(t);
sleep(1);
}
ThreadPool<Task>::GetInstance()->Wait();
return 0;
}
Makefile:
threadpool:Main.cc
g++ -o $@ $^ -lpthread -std=c++14
.PHONY:clean
clean:
rm -rf threadpool
线程安全的单例模式
什么是单例模式
单例模式是一种 "经典的, 常用的, 常考的" 设计模式
什么是设计模式
针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是设计模式
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据
饿汉实现方式和懒汉实现方式
为了方便理解什么是饿汉和懒汉,举个洗碗的小栗子:
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭
吃完饭, 先不洗碗, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度
饿汉方式实现单例模式
template<class T>
class Singleton
{
static T data;
public:
static T* GetInstance()
{
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例
懒汉方式实现单例模式
template<class T>
class Singleton
{
static T* p;
public:
static T* GetInstance()
{
if (p == nullptr)
{
p = new T();
}
return p;
}
};
若是如以上代码一样设计,则可能会导致线程安全问题
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例
懒汉方式实现单例模式(线程安全版本)
template<class T>
class Singleton
{
volatile static T* p;//需要使用volatile 关键字, 否则可能被编译器优化--p指针是全局变量,可被多个线程共享
static std::mutex mtx;
public:
static T* GetInstance()
{
if (p == nullptr)// 双重判定空指针, 降低锁冲突的概率, 提高性能
{
mtx.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new
if (p == nullptr)
{
p = new T();
}
mtx.unlock();
}
return p;
}
};
以上代码的注意事项:
1. 加锁解锁的位置
2. 双重 if 判定, 避免不必要的锁竞争
3. volatile关键字防止过度优化
使用volatile关键字,能够做到:每次使用某个变量时必须每次重新从内存中读取值,而不是使用该变量第一次加载到寄存器中的值
关于编译器的优化:
在本次线程内, 当读取一个变量时,为提高存取速度,编译器优化时有时会先把变量读取到一个寄存器中,以后,再取变量值时,就直接从寄存器中取值
当变量值在本线程里改变时,会同时把变量的新值copy到该寄存器中,以便保持一致
但是当变量在其他线程中改变了值,该寄存器的值却不会相应改变,从而导致读取的值和实际的变量值不一致
当该寄存器在别的线程中改变了值,原变量的值不会改变,从而导致读取的值和实际的变量值不一致
被多个线程同时共享的变量,可以使用volatile修饰,以防止编译器的过度优化
STL,智能指针和线程安全
STL中的容器是否是线程安全的?
答案:当然不是啦!
因为STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。而且对于不同的容器, 加锁方式的不同, 性能可能也不同
因此 STL 默认不是线程安全。 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全
智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题
对于 shared_ptr, 多个对象需要共用一个引用计数变量,会涉及++/--操作 所以会存在线程安全问题
但是标准库实现的时候考虑到了这 个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
其他常见的各种锁
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁,当其他线程想要访问数据时,被阻塞挂起
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前, 会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试
读者写者问题(稍作了解)
读写锁
在编写多线程的时候,有这样一种常见情况:有些公共数据修改的机会比较少。相比较改写,它们读的机会反而更频繁,一般来说,在读的过程中,往往伴随着查找的操作,中间耗时很长,给这种代码段加锁,会极大地降低程序的效率,因此,读写锁的出现可以专门处理这种多读少写的情况
注意:写独占,读共享,读锁优先级高
读写锁接口
设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
读者写者模型遵守321原则(名字仅是方便记忆):
3:三种关系 读读(读者与读者之间没关系,并发,因为读者不会把数据取走)--写写(写者与写者之间是互斥关系)--读写(读者与写者之间是互斥与同步关系)
2:两种角色:读者和写者(执行流)
1:一个交易场所,即一段内存空间
读者比较多,写者比较少,同时,读场景比较多
自旋锁
一般申请锁失败了,申请线程都会被挂起,在将来等待被唤醒
还有一种情况是,如果申请失败,会直接返回,立马继续重新申请锁,我们一般把这种锁叫做自旋锁
拿日常生活中的小事来说,你在楼下等待你的朋友,我们在等人的时候,如果对方下楼的时间不同,我们等待的策略也会有所变化,比如朋友还要很长一段时间才下楼,那么我们可能会选择先去某个地方做自己的事情,如果朋友马上就要下楼了,那么我们就会在原地等待