Linux信号量与线程池、日志

发布于:2024-04-14 ⋅ 阅读:(157) ⋅ 点赞:(0)

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步

初始化信号量

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数: 
 pshared:0表示线程间共享,非零表示进程间共享 
 value:信号量初始值 

销毁信号量  

int sem_destroy(sem_t *sem); 

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem); //P() --P操作

发布信号量 

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);//V() --V操作

我们需要知道: 

1.信号量的本质是一把计数器

2.申请信号量的本质就是预订资源

 3.PV操作是原子的

以往把公共资源当做一个整体来使用时,需要mutex(锁),但如果不把公共资源当做一个整体,让多线程不访问临界资源的同一个区域呢?

那么可以这样做:申请信号量(申请成功,就一定有资源可以访问,可以不用再判断资源是否就绪,因为申请的时候就判断了)->访问指定的一个位置(访问临界资源)->释放信号量

基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性

对于“资源”的认识:生产者将空间视为资源,消费者将数据视为资源

多生产者多消费者模型--生产者派发任务--消费者执行任务

LockGuard.hpp   

封装锁

// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock):_lock(lock)
    {}
    void Lock()
    {
        pthread_mutex_lock(_lock);
    }
    void Unlock()
    {
        pthread_mutex_unlock(_lock);
    }
    ~Mutex()
    {}

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock): _mutex(lock)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.Unlock();
    }
private:
    Mutex _mutex;
};

RingQueue.hpp

环形队列

const int defaultsize = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
public:
     RingQueue(int size = defaultsize)
    : _ringqueue(size)
    , _size(size)
    , _p_step(0)
    , _c_step(0)
    {
        //sem_init的第二个参数设置为0-->线程间共享
        sem_init(&_space_sem, 0, size);//生产者信号量的初始化--信号量本质是资源的计数器,对于生产者而言,资源就是可利用的空间
        sem_init(&_data_sem, 0, 0);//对于消费者而言,资源即数据,初始时未有数据

        pthread_mutex_init(&_p_mutex, nullptr);
        pthread_mutex_init(&_c_mutex, nullptr);
    }

    void Push(const T &in)
    {
        // 生产
        // 先申请信号量,再申请锁--相比先申请锁,再申请信号量,速度快
        P(_space_sem);//生产者申请信号量--相当于资源(可利用空间)数目--
        {
            LockGuard lockGuard(&_p_mutex);//在该代码块内,构造时申请锁,析构时释放锁
            _ringqueue[_p_step] = in;
            _p_step++;
            _p_step %= _size;
        }
        V(_data_sem);//相当于资源(数据)数据++
    }

    void Pop(T *out)
    {
        // 消费
        P(_data_sem);//消费者申请信号量,数据资源的数目--
        {
            LockGuard lockGuard(&_c_mutex);
            *out = _ringqueue[_c_step];
            _c_step++;
            _c_step %= _size;
        }
        V(_space_sem);//空间资源的数目++
    }

    ~RingQueue()
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_p_mutex);
        pthread_mutex_destroy(&_c_mutex);
    }

private:
    std::vector<T> _ringqueue;
    int _size;

    int _p_step; // 生产者的生产位置
    int _c_step; // 消费者的消费位置

    sem_t _space_sem; // 生产者的信号量
    sem_t _data_sem;  // 消费者的信号量

    pthread_mutex_t _p_mutex;//生产者的锁
    pthread_mutex_t _c_mutex;//消费者的锁
};

Task.hpp

简单的计算任务

const int defaultvalue = 0;

const std::string opers = "+-*/%)(&%^";

enum
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op)
    : data_x(x)
    , data_y(y)
    , oper(op)
    , result(defaultvalue)
    , code(ok)
    {}

    void Run()
    {
        switch (oper)
        {
        case '+':
            result = data_x + data_y;
            break;
        case '-':
            result = data_x - data_y;
            break;
        case '*':
            result = data_x * data_y;
            break;
        case '/':
        {
            if (data_y == 0)
                code = div_zero;
            else
                result = data_x / data_y;
        }
        break;
        case '%':
        {
            if (data_y == 0)
                code = mod_zero;
            else
                result = data_x % data_y;
        }
        break;
        default:
            code = unknow;
            break;
        }
    }

    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=?";

        return s;
    }

    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=";
        s += std::to_string(result);
        s += " [";
        s += std::to_string(code);
        s += "]";

        return s;
    }

    ~Task()
    {}

private:
    int data_x;
    int data_y;
    char oper; // + - * / %

    int result;
    int code; // 结果码,0: 结果可信 !0: 结果不可信
};

Makefile

testringqueue:Main.cc
	g++ -o $@ $^ -lpthread
.PHONY:clean
clean:
	rm testringqueue

Main.cc

测试代码

void *Productor(void *args)//生产者派发任务
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        sleep(1);
        // 1. 有数据
        int data1 = rand() % 10; 
        usleep(rand() % 123);
        int data2 = rand() % 10;
        usleep(rand() % 123);
        char oper = opers[rand() % (opers.size())];
        Task t(data1, data2, oper);
        std::cout << "productor task: " << t.PrintTask() << std::endl;

        rq->Push(t);
    }
}

void *Consumer(void *args)//消费者执行任务
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->Pop(&t);

        t();//执行任务
        std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;
    }
}

int main()
{
    srand((uint64_t)time(nullptr) ^ pthread_self());
    pthread_t c[3], p[2];

    //环形队列
    RingQueue<Task> *rq = new RingQueue<Task>();
    pthread_create(&p[0], nullptr, Productor, rq);
    pthread_create(&p[1], nullptr, Productor, rq);
    pthread_create(&c[0], nullptr, Consumer, rq);
    pthread_create(&c[1], nullptr, Consumer, rq);
    pthread_create(&c[2], nullptr, Consumer, rq);

    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(c[2], nullptr);

    return 0;
}

在介绍线程池之前,我们先聊聊题外话--日志

日志:1. 可以向显示器打印,也可以向文件中写入 2. 日志包括-->时间,内容,日志等级,文件名,代码行数,进程pid...

localtime:

 vsnprintf:

 va_start

va_end

设计日志:

Log.hpp:
#pragma once

#include <iostream>
#include <fstream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

enum//日志等级
{
    Debug = 0,
    Info,
    Warning,
    Error,
    Fatal
};

enum//日志输出方式
{
    Screen = 10,//输出到屏幕
    OneFile,//输出到一个文件中
    ClassFile//输出到不同的文件中
};

std::string LevelToString(int level)
{
    switch(level)
    {
    case Debug:
        return "Debug";
    case Info:
        return "Info";
    case Warning:
        return "Warning";
    case Error:
        return "Error";
    case Fatal:
        return "Fatal";
    default:
        return "Unknown";
    }
}

const int defaultstyle = Screen;
const std::string default_filename = "log.";
const std::string logdir = "log";

class Log
{
public:
    Log()
    :style(defaultstyle)
    ,filename(default_filename)
    {
        mkdir(logdir.c_str(), 0775);
    }

    ~Log() {}

    void Enable(int sty)//将日志输出方式设置成想要的--如果需要的话
    {
        style = sty;
    }

    std::string TimeStampExLocalTime()//获得时间--年月日时分秒
    {
        time_t currtime = time(nullptr);//获取当前时间--时间戳形式--转化我们日常中喜闻乐见的时间形式
        struct tm* curr = localtime(&currtime);//struct tm结构体中包含年月日时分秒等
        char time_buffer[128];
        snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
        curr->tm_year+1900,curr->tm_mon+1,curr->tm_mday,
        curr->tm_hour,curr->tm_min,curr->tm_sec);
        return time_buffer;
    }

    void WriteLogToOneFile(const std::string &logname, const std::string &message)//日志内容都往一个文件中打印
    {
        umask(0);
        int fd = open(logname.c_str(),O_CREAT | O_WRONLY |O_APPEND,0666);
        if(fd<0)
            return;
        
        write(fd,message.c_str(),message.size());
        close(fd);
    }

    void WriteLogToClassFile(const std::string &levelstr, const std::string &message)//根据传入的日志等级,向不同的文件中打印
    {
        std::string logname = logdir;
        logname+='/';
        logname+=filename;
        logname+=levelstr;

        WriteLogToOneFile(logname,message);
    }

    void WriteLog(const std::string &levelstr, const std::string &message)//日志输出接口
    {
        switch(style)
        {
        case Screen:
            std::cout << message;
            break;
        case OneFile:
            WriteLogToClassFile("all", message);
            break;
        case ClassFile:
            WriteLogToClassFile(levelstr, message);
            break;
        default:
            break;
        }
    }

    void LogMessage(int level, const char *format, ...) // 类C的一个日志接口 第一个参数-->日志等级 +可变参数
    {
        char leftbuffer[1024];
        std::string levelstr = LevelToString(level);
        std::string currtime = TimeStampExLocalTime();
        std::string idstr = std::to_string(getpid());
        snprintf(leftbuffer,sizeof(leftbuffer),"[%s][%s][%s] ",levelstr.c_str(), currtime.c_str(), idstr.c_str());


        char rightbuffer[1024];
        va_list args;//类似于 char *, void *,即指针
        va_start(args, format);// args 指向了可变参数部分
        vsnprintf(rightbuffer,sizeof(rightbuffer), format, args);//将可变参数部分写入rightbuffer
        va_end(args); // args = nullptr;

        std::string loginfo = leftbuffer;
        loginfo += rightbuffer;
        WriteLog(levelstr, loginfo);
    }

private:
    int style;//日志输出方式
    std::string filename;//日志文件的名字
};

Makefile: 

testLog:Main.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -rf testLog

Main.cc:

#include <iostream>
#include "Log.hpp"

int main()
{
    Log lg;
    lg.Enable(OneFile);
    lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
    lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
    return 0;
}

 

线程池 

线程池: 是一种线程使用模式

线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着 监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

线程池的应用场景:

1. 需要大量的线程来完成任务,且完成任务的时间比较短

2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求

3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用

(突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误)

线程池示例:

1. 创建固定数量线程池,循环从任务队列中获取任务对象

2. 获取到任务对象后,执行任务对象中的任务接口

下面设计一个简单的单例线程池:

现在我们设定:队列中的任务就是简单的加减乘除取模运算

线程池ThreadPool.hpp:
#pragma once

#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

static const int defaultnum = 5;//默认预先创建好5个线程

class ThreadData
{
public:
    ThreadData(const std::string &name) 
    : threadname(name)
    {}
    ~ThreadData()
    {}
public:
    std::string threadname;
};

template <class T>
class ThreadPool//线程池
{
private:
    ThreadPool(int thread_num = defaultnum)
    :_thread_num(thread_num)
    {
        pthread_mutex_init(&_mutex,nullptr);//初始化锁
        pthread_cond_init(&_cond,nullptr);//初始化条件变量
        // 构建指定个数的线程
        for(int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread-";
            threadname+=std::to_string(i+1);
            ThreadData td(threadname);//线程的名字
            _threads.emplace_back(threadname,
                                  std::bind(&ThreadPool<T>::ThreadRun, this,
                                            std::placeholders::_1),
                                  td);//插入线程
            lg.LogMessage(Info, "%s is created...\n", threadname.c_str());//打印日志
        }
    }

    ThreadPool(const ThreadPool<T> &tp)=delete;//禁用拷贝构造
    const ThreadPool<T> &operator=(const ThreadPool<T> tp)=delete;//禁用赋值重载
public:
    static ThreadPool<T> *GetInstance()//得到唯一的线程池的指针
    {
        if(instance==nullptr)
        {
            LockGuard lockguard(&sig_lock);
            if (instance == nullptr)
            {
                lg.LogMessage(Info, "创建单例成功...\n");
                instance = new ThreadPool<T>();
            }
        }
        return instance;
    }

    bool Start()//启动所有线程
    {
        for (auto &thread : _threads)
        {
            thread.Start();
            lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
        }
        return true;
    }

    void ThreadWait(const ThreadData &td)
    {
        lg.LogMessage(Debug, "no task, %s is sleeping...\n", td.threadname.c_str());
        pthread_cond_wait(&_cond, &_mutex);//等待条件变量就绪(即交易场所/队列中没有任务/数据可被线程拿走执行),线程会先解锁-->等待
    }

    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);//条件变量准备就绪(即交易场所/队列中有任务/数据可被线程拿走执行),每次唤醒一个线程
    }

    void ThreadRun(ThreadData &td)
    {
        while (true)
        {
            //取任务
            T t;
            {
                LockGuard lockguard(&_mutex);//取交易场所/队列取任务/数据要先申请锁
                while(_q.empty())//队列无任务--线程进入等待
                {
                    ThreadWait(td);
                    lg.LogMessage(Debug, "thread %s is wakeup\n", td.threadname.c_str());
                }
                t = _q.front();
                _q.pop();
            }
            //处理任务
            t();//在测试时,T为Task类型,重载了(),自动调用Task类的Run(),完成简单的加减乘除计算任务
            lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
                          td.threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str());
        }
    }

    void Push(T &in)//生产/插入任务
    {
        lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
        LockGuard lockguard(&_mutex);//去交易场所/队列生产任务也要先申请锁
        _q.push(in);
        ThreadWakeup();//队列中有任务了,进行通知
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
        }
    }

private:
    std::queue<T> _q;//队列--提供场所
    std::vector<Thread<ThreadData>> _threads;//预先要创建好一批线程
    int _thread_num;
    pthread_mutex_t _mutex;//锁--实现互斥机制--交易场所(队列)要使用到的锁
    pthread_cond_t _cond;//条件变量--实现同步机制

    static ThreadPool<T>*instance;//要实现一个线程池单例
    static pthread_mutex_t sig_lock;//锁--得到指向线程池的指针instance需要用到的锁,因为instance全局有效,可被共享访问,属于临界资源,所以需要加锁保护
};

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;//全局锁的初始化
线程Thread.hpp:
#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>

template<class T>
using func_t = std::function<void(T&)>;

template<class T>
class Thread
{
public:
    Thread(const std::string &threadname, func_t<T> func, T data)
    :_tid(0)
    , _threadname(threadname)
    , _isrunning(false)
    , _func(func)
    , _data(data)
    {}

    static void *ThreadRoutine(void *args) //pthread_create的第三个参数:void *(*start_routine) (void *),类内方法有一个隐藏的参数:Thread*this,不符合,所以设计成静态方法
    {
        Thread *ts = static_cast<Thread *>(args);

        ts->_func(ts->_data);

        return nullptr;
    }

    bool Start()
    {
        int n = pthread_create(&_tid, nullptr, ThreadRoutine, this);//将this作为ThreadRoutine的参数传入
        if(n == 0) 
        {
            _isrunning = true;
            return true;
        }
        else return false;
    }

    bool Join()
    {
        if(!_isrunning) return true;
        int n = pthread_join(_tid, nullptr);
        if(n == 0)
        {
            _isrunning = false;
            return true;
        }
        return false;
    }

    std::string ThreadName()
    {
        return _threadname;
    }

    bool IsRunning()
    {
        return _isrunning;
    }

    ~Thread()
    {}
private:
    pthread_t _tid;//线程id
    std::string _threadname;//线程名
    bool _isrunning;//线程是否允许
    func_t<T> _func;//线程要执行的函数
    T _data;//线程要执行函数的参数
};




任务Task.hpp:
#pragma once
#include <iostream>
#include <string>

const int defaultvalue = 0;

enum
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};

const std::string opers = "+-*/%";

class Task
{

public:
    Task()
    {}

    Task(int x, int y, char op)
    : data_x(x)
    , data_y(y)
    , oper(op)
    , result(defaultvalue)
    , code(ok)
    {}

    void Run()
    {
        switch (oper)
        {
        case '+':
            result = data_x + data_y;
            break;
        case '-':
            result = data_x - data_y;
            break;
        case '*':
            result = data_x * data_y;
            break;
        case '/':
        {
            if (data_y == 0)
                code = div_zero;
            else
                result = data_x / data_y;
        }
        break;
        case '%':
        {
            if (data_y == 0)
                code = mod_zero;
            else
                result = data_x % data_y;
        }
        break;
        default:
            code = unknow;
            break;
        }
    }

    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=?";

        return s;
    }

    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(data_x);
        s += oper;
        s += std::to_string(data_y);
        s += "=";
        s += std::to_string(result);
        s += " [";
        s += std::to_string(code);
        s += "]";

        return s;
    }

    ~Task()
    {}
private:
    int data_x;
    int data_y;
    char oper; // + - * / %

    int result;
    int code; // 结果码,0: 结果可信 !0: 结果不可信,如1,2,3,4
};
日志Log.hpp:
#pragma once

#include <iostream>
#include <fstream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

enum//日志等级
{
    Debug = 0,
    Info,
    Warning,
    Error,
    Fatal
};

enum//日志输出方式
{
    Screen = 10,//输出到屏幕
    OneFile,//输出到一个文件中
    ClassFile//输出到不同的文件中
};

std::string LevelToString(int level)
{
    switch(level)
    {
    case Debug:
        return "Debug";
    case Info:
        return "Info";
    case Warning:
        return "Warning";
    case Error:
        return "Error";
    case Fatal:
        return "Fatal";
    default:
        return "Unknown";
    }
}

const int defaultstyle = Screen;
const std::string default_filename = "log.";
const std::string logdir = "log";

class Log
{
public:
    Log()
    :style(defaultstyle)
    ,filename(default_filename)
    {
        mkdir(logdir.c_str(), 0775);
    }

    ~Log() {}

    void Enable(int sty)//将日志输出方式设置成想要的--如果需要的话
    {
        style = sty;
    }

    std::string TimeStampExLocalTime()//获得时间--年月日时分秒
    {
        time_t currtime = time(nullptr);//获取当前时间--时间戳形式--转化我们日常中喜闻乐见的时间形式
        struct tm* curr = localtime(&currtime);//struct tm结构体中包含年月日时分秒等
        char time_buffer[128];
        snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
        curr->tm_year+1900,curr->tm_mon+1,curr->tm_mday,
        curr->tm_hour,curr->tm_min,curr->tm_sec);
        return time_buffer;
    }

    void WriteLogToOneFile(const std::string &logname, const std::string &message)//日志内容都往一个文件中打印
    {
        umask(0);
        int fd = open(logname.c_str(),O_CREAT | O_WRONLY |O_APPEND,0666);
        if(fd<0)
            return;
        
        write(fd,message.c_str(),message.size());
        close(fd);
    }

    void WriteLogToClassFile(const std::string &levelstr, const std::string &message)//根据传入的日志等级,向不同的文件中打印
    {
        std::string logname = logdir;
        logname+='/';
        logname+=filename;
        logname+=levelstr;

        WriteLogToOneFile(logname,message);
    }

    void WriteLog(const std::string &levelstr, const std::string &message)//日志输出接口
    {
        switch(style)
        {
        case Screen:
            std::cout << message;
            break;
        case OneFile:
            WriteLogToClassFile("all", message);
            break;
        case ClassFile:
            WriteLogToClassFile(levelstr, message);
            break;
        default:
            break;
        }
    }

    void LogMessage(int level, const char *format, ...) // 类C的一个日志接口 第一个参数-->日志等级 +可变参数
    {
        char leftbuffer[1024];
        std::string levelstr = LevelToString(level);
        std::string currtime = TimeStampExLocalTime();
        std::string idstr = std::to_string(getpid());
        snprintf(leftbuffer,sizeof(leftbuffer),"[%s][%s][%s] ",levelstr.c_str(), currtime.c_str(), idstr.c_str());


        char rightbuffer[1024];
        va_list args;//类似于 char *, void *,即指针
        va_start(args, format);// args 指向了可变参数部分
        vsnprintf(rightbuffer,sizeof(rightbuffer), format, args);//将可变参数部分写入rightbuffer
        va_end(args); // args = nullptr;

        std::string loginfo = leftbuffer;
        loginfo += rightbuffer;
        WriteLog(levelstr, loginfo);
    }

private:
    int style;//日志输出方式
    std::string filename;//日志文件的名字
};

Log lg;

class Conf
{
public:
    Conf()
    {
        lg.Enable(Screen);
    }
    ~Conf()
    {}
};

Conf conf;
锁的封装LockGuard.hpp:
#pragma once

#include <pthread.h>

// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock)
    :_lock(lock)
    {}
    void Lock()
    {
        pthread_mutex_lock(_lock);
    }
    void Unlock()
    {
        pthread_mutex_unlock(_lock);
    }
    ~Mutex()
    {}

private:
    pthread_mutex_t *_lock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock)
    : _mutex(lock)
    {
        _mutex.Lock();
    }
    ~LockGuard()
    {
        _mutex.Unlock();
    }
private:
    Mutex _mutex;
};
Main.cc:
#include <iostream>
#include <memory>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"

int main()
{
    sleep(10);
    ThreadPool<Task>::GetInstance()->Start();//启动所有线程去队列中取任务执行
    srand((uint64_t)time(nullptr) ^ getpid());
    while (true)
    {
        int x = rand() % 100 + 1;
        usleep(1234);
        int y = rand() % 200;
        usleep(1234);
        char oper = opers[rand() % opers.size()];
        Task t(x, y, oper);
        ThreadPool<Task>::GetInstance()->Push(t);
        sleep(1);
    }

   ThreadPool<Task>::GetInstance()->Wait();

    return 0;
}
Makefile:
threadpool:Main.cc
	g++ -o $@ $^ -lpthread -std=c++14
.PHONY:clean
clean:
	rm -rf threadpool

线程安全的单例模式

什么是单例模式

单例模式是一种 "经典的, 常用的, 常考的" 设计模式

什么是设计模式

针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是设计模式

单例模式的特点

某些类, 只应该具有一个对象(实例), 就称之为单例

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据

饿汉实现方式和懒汉实现方式

为了方便理解什么是饿汉和懒汉,举个洗碗的小栗子:

吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭

吃完饭, 先不洗碗, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式

懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度

饿汉方式实现单例模式

template<class T>
class Singleton
{
	static T data;
public:
	static T* GetInstance()
	{
		return &data;
	}
};

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例

懒汉方式实现单例模式 

template<class T>
class Singleton
{
	static T* p;
public:
	static T* GetInstance()
	{
		if (p == nullptr)
		{
			p = new T();
		}
		return p;
	}
};

若是如以上代码一样设计,则可能会导致线程安全问题

第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例

懒汉方式实现单例模式(线程安全版本) 

template<class T>
class Singleton
{
	volatile static T* p;//需要使用volatile 关键字, 否则可能被编译器优化--p指针是全局变量,可被多个线程共享
	static std::mutex mtx;
public:
	static T* GetInstance()
	{
		if (p == nullptr)// 双重判定空指针, 降低锁冲突的概率, 提高性能
		{
			mtx.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new
			if (p == nullptr)
			{
				p = new T();
			}
			mtx.unlock();
		}
		return p;
	}
};

以上代码的注意事项:

1. 加锁解锁的位置

2. 双重 if 判定, 避免不必要的锁竞争

3. volatile关键字防止过度优化  

使用volatile关键字,能够做到:每次使用某个变量时必须每次重新从内存中读取值,而不是使用该变量第一次加载到寄存器中的值

关于编译器的优化:

在本次线程内, 当读取一个变量时,为提高存取速度,编译器优化时有时会先把变量读取到一个寄存器中,以后,再取变量值时,就直接从寄存器中取值

当变量值在本线程里改变时,会同时把变量的新值copy到该寄存器中,以便保持一致

但是当变量在其他线程中改变了值,该寄存器的值却不会相应改变,从而导致读取的值和实际的变量值不一致

当该寄存器在别的线程中改变了值,原变量的值不会改变,从而导致读取的值和实际的变量值不一致

被多个线程同时共享的变量,可以使用volatile修饰,以防止编译器的过度优化

STL,智能指针和线程安全

STL中的容器是否是线程安全的?

答案:当然不是啦!

因为STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。而且对于不同的容器, 加锁方式的不同, 性能可能也不同

因此 STL 默认不是线程安全。 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全

智能指针是否是线程安全的? 

对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题

对于 shared_ptr, 多个对象需要共用一个引用计数变量,会涉及++/--操作 所以会存在线程安全问题

但是标准库实现的时候考虑到了这 个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数 

 其他常见的各种锁

悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁,当其他线程想要访问数据时,被阻塞挂起

乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前, 会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作

CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试

读者写者问题(稍作了解)

读写锁

在编写多线程的时候,有这样一种常见情况:有些公共数据修改的机会比较少。相比较改写,它们读的机会反而更频繁,一般来说,在读的过程中,往往伴随着查找的操作,中间耗时很长,给这种代码段加锁,会极大地降低程序的效率,因此,读写锁的出现可以专门处理这种多读少写的情况

 注意:写独占,读共享,读锁优先级高

读写锁接口

设置读写优先

int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref); 
/* 
pref 共有 3 种选择 
 
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况 
 
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和 
PTHREAD_RWLOCK_PREFER_READER_NP 一致 
 
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁 
*/ 

初始化 

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t 
*restrict attr); 

销毁  

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 

加锁和解锁  

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); 
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); 
 
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); 

读者写者模型遵守321原则(名字仅是方便记忆):

3:三种关系   读读(读者与读者之间没关系,并发,因为读者不会把数据取走)--写写(写者与写者之间是互斥关系)--读写(读者与写者之间是互斥与同步关系)

2:两种角色:读者和写者(执行流)

1:一个交易场所,即一段内存空间

读者比较多,写者比较少,同时,读场景比较多

 自旋锁

一般申请锁失败了,申请线程都会被挂起,在将来等待被唤醒

还有一种情况是,如果申请失败,会直接返回,立马继续重新申请锁,我们一般把这种锁叫做自旋锁

拿日常生活中的小事来说,你在楼下等待你的朋友,我们在等人的时候,如果对方下楼的时间不同,我们等待的策略也会有所变化,比如朋友还要很长一段时间才下楼,那么我们可能会选择先去某个地方做自己的事情,如果朋友马上就要下楼了,那么我们就会在原地等待


网站公告

今日签到

点亮在社区的每一天
去签到