【Linux】生产消费模型

发布于:2025-02-18 ⋅ 阅读:(111) ⋅ 点赞:(0)

目录

一、生产消费模型

(一)概念

(二)三种关系

(三)特点

二、基于阻塞队列的生产消费模型

三、基于环形队列的生产消费模型

(一)信号量       

1、概念

2、信号量的使用

(1)信号量的创建

(2)信号量的使用

(3)信号量的销毁

(二)生产消费模型

四、线程池 

(一)概念

(二)线程池

1、thread.hpp

2、pthreadpool.hpp

3、task.hpp

4、main.cpp


一、生产消费模型

(一)概念

        在传统程序中,程序的执行是串行且强耦合。生产消费模型是一种经典的线程同步机制,通过使用一个中间容器使得生成和消费两个过程解耦。生产者与消费者彼此并不直接通信,而是通过中间容器进行通信。

        通过这一中间容器,生产者生产的数据直接可放入容器中无需直接等待消费者消费数据,而消费者消费的数据可直接从容器取出而无需直接等待生产者生产数据。

(二)三种关系

        生产消费模型作为经典的线程同步机制,在实现过程中若没有同步互斥控制则会产生许多冲突问题,例如:生产者并没有将数据完全放入中间容器而消费者直接将数据拿走消费等。

        因此在实现生产消费模型需要注意三种关系:

        1、生产者与生产者之间的互斥关系;

        2、消费者与消费者之间的互斥关系;

        3、生产者与消费者之间的同步、互斥关系。

(三)特点

        生产消费模型在计算机并发编程中是一种非常重要的设计模式,它解决了生产者和消费者之间如何协调工作、如何安全地共享资源以及如何确保程序的高效运行。通过适当的同步机制,可以避免死锁、竞争条件等问题,确保生产者和消费者能够在多线程环境中高效、正确地协作。

二、基于阻塞队列的生产消费模型

        在多线程编程中阻塞队列是一种常用的实现生产消费的数据结构。它与普通队列的区别为:当队列为空时,从队列获取元素会被阻塞,直到插入元素后被唤醒;当队列为满时,向队列插入元素会被则是,直到取出元素后被唤醒。

        其实较于普通队列,阻塞队列也只是在其基础上增加了同步互斥机制,本文在实现该阻塞队列采用互斥锁以及条件变量。

#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
template <class T>
class blockqueue
{
public:
    blockqueue(int maxcapacity = 5)
        : _capacity(maxcapacity)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    ~blockqueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    void push(const T &data)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
        {
            pthread_cond_wait(&_pcond, &_mutex);
        }
        _q.push(data);
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T *data)
    {
        pthread_mutex_lock(&_mutex);
        while (is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *data = _q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    bool is_full()
    {
        return _q.size() == _capacity;
    }
    bool is_empty()
    {
        return _q.size() == 0;
    }

public:
    queue<T> _q;            // 阻塞队列
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _pcond;  // 生产者条件变量
    pthread_cond_t _ccond;  // 消费者条件变量
    int _capacity;          // 缓冲区容量
};

三、基于环形队列的生产消费模型

(一)信号量       

1、概念

        在之前我们解决同步互斥问题时,通常使用互斥锁和条件变量来控制线程同步。问题在于,当线程访问共享资源时,必须将整个资源加锁,导致即使线程只访问共享资源中的一部分(如资源A或资源B),也必须等待其他线程完成对整个资源的访问。这种方式可能会降低多线程的执行效率。为了解决这个问题,我们可以使用信号量,允许更加细粒度的控制:比如,对于一个数组而言,当使用互斥锁以及条件变量来控制线程同步,每当有线程访问时都必须将整个数组进行加锁,但采用信号量的方式,若多个线程的访问位置不同,实际可以使多个线程访问不同位置,更进一步提高效率。

2、信号量的使用

(1)信号量的创建
NAME
       sem_init - initialize an unnamed semaphore
SYNOPSIS
       #include <semaphore.h>

       int sem_init(sem_t *sem, int pshared, unsigned int value);

       Link with -pthread.
RETURN VALUE
       sem_init() returns 0 on success; on error, -1 is 
returned, and errno is set to indicate the error.

用途:sem_init()是一个信号量初始化函数,用于初始化一个未命名的信号量。

参数:

sem:指向要初始化的信号量的指针。
pshared:指定信号量的类型。如果pshared为0,表示信号量只能用于同一进程内的线程之间的同步;如果pshared为非零值,表示信号量可用于多个进程之间的同步。
value:指定信号量的初始值。
返回值:

如果sem_init()函数调用成功,返回值为0。
如果调用失败,返回值为-1,并设置errno为相应的错误代码。

(2)信号量的使用
NAME
       sem_wait, sem_timedwait, sem_trywait - lock a semaphore

SYNOPSIS
       #include <semaphore.h>
       int sem_wait(sem_t *sem);
       int sem_trywait(sem_t *sem);
       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
       Link with -pthread.

RETURN VALUE
       All of these functions return 0 on success; on error, the value of 
the semaphore is left unchanged, -1 is returned, and errno is set to 
indicate the error.

用途:sem_wait()相当于 P() 操作,将信号量的值减一,当信号量为0时,调用sem_wait()会被阻塞,直到信号量的值大于0.

参数:

sem:指向要初始化的信号量的指针。
返回值:

如果 sem_wait() 函数调用成功,返回值为0。
如果调用失败,返回值为-1,并设置errno为相应的错误代码。

NAME
       sem_post - unlock a semaphore
SYNOPSIS
       #include <semaphore.h>
       int sem_post(sem_t *sem);
       Link with -pthread.

RETURN VALUE
       sem_post() returns 0 on success; on error, the value of the semaphore 
is left unchanged, -1 is returned, and errno is set  to indicate the error.

用途:sem_post()相当于 V() 操作,将信号量的值加一,当有线程因为信号量为0被阻塞时,调用sem_post()会将其唤醒。

参数:

sem:指向要初始化的信号量的指针。
返回值:

如果 sem_wait() 函数调用成功,返回值为0。
如果调用失败,返回值为-1,并设置errno为相应的错误代码。

(3)信号量的销毁
NAME
       sem_destroy - destroy an unnamed semaphore
SYNOPSIS
       #include <semaphore.h>
       int sem_destroy(sem_t *sem);
       Link with -pthread.

RETURN VALUE
       sem_destroy() returns 0 on success; on error, -1 is returned, 
and errno is set to indicate the error.

用途:sem_destroy()是一个信号量销毁函数,用于销毁已初始化的信号量。

参数:

sem:指向要初始化的信号量的指针。
返回值:

如果sem_destroy()函数调用成功,返回值为0。
如果调用失败,返回值为-1,并设置errno为相应的错误代码。

(二)生产消费模型

        基于环形队列的生产消费模型与上文的阻塞队列相比只是中间容器不同。环形队列底层数据结构采用数组,同步互斥机制则采用信号量的机制。

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <vector>
using namespace std;
template <class T>
class ringqueue
{
private:
    void P(sem_t &_sem)
    {
        int n = sem_wait(&_sem);
        if (n != 0)
        {
            cout << strerror(errno) << endl;
        }
    }
    void V(sem_t &_sem)
    {
        int n = sem_post(&_sem);
        if (n != 0)
        {
            cout << strerror(errno) << endl;
        }
    }

public:
    ringqueue(int cap = 5) : _queue(cap), _cap(cap)
    {
        pthread_mutex_init(&_pMutex, nullptr);
        pthread_mutex_init(&_cMutex, nullptr);
        _pStep = _cStep = 0;
        sem_init(&_dateSem, 0, 0);
        sem_init(&_spaceSem, 0, _cap);
    }
    void push(const T &task)
    {
        P(_spaceSem);
        pthread_mutex_lock(&_pMutex);
        _queue[_pStep] = task;
        _pStep = (_pStep + 1) % _cap;
        pthread_mutex_unlock(&_pMutex);
        V(_dateSem);
    }
    void pop(T &task)
    {
        P(_dateSem);
        pthread_mutex_lock(&_cMutex);
        task = _queue[_cStep];
        _cStep = (_cStep + 1) % _cap;
        pthread_mutex_unlock(&_cMutex);
        V(_spaceSem);
    }
    ~ringqueue()
    {
        pthread_mutex_destroy(&_pMutex);
        pthread_mutex_destroy(&_cMutex);
        sem_destroy(&_dateSem);
        sem_destroy(&_spaceSem);
    }

private:
    vector<T> _queue;
    int _cap;
    int _pStep;
    int _cStep;
    sem_t _dateSem;
    sem_t _spaceSem;
    pthread_mutex_t _pMutex;
    pthread_mutex_t _cMutex;
};

四、线程池 

(一)概念

        在一些场景下可能会频繁创建线程,如果我们仅当需要线程时再去创建的话会影响缓存局部性和整体性能。因此我们可以在程序开始时就创建大量的线程以供用户调度使用,从而提高程序性能。

        线程池是一种线程使用模式,其避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利 用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

        线程池的应用场景:

        1、需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技 术是非常合适的。因为单个任务小,而任务数量巨大。但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

        2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

        3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误。

(二)线程池

1、thread.hpp

#include <iostream>
#include <pthread.h>
#include <functional>
#include <string>
using namespace std;
class Thread
{
    typedef std::function<void *(void *)> func_t;

private:
    static void *start_routine(void *arg)
    {
        Thread *th = static_cast<Thread *>(arg);
        th->callback(th->_arg);
    }
    void *callback(void *arg)
    {
        return _func(arg);
    }

public:
    void start(func_t func, void *arg = nullptr)
    {
        _func = func;
        _arg = arg;
        pthread_create(&_tid, nullptr, start_routine, this);
    }
    void join()
    {
        pthread_join(_tid, nullptr);
    }
    ~Thread()
    {
        join();
    }

private:
    func_t _func;
    pthread_t _tid;
    void *_arg;
};

2、pthreadpool.hpp

#include <vector>
#include <queue>
#include "thread.hpp"
template <class T>
class pthreadPool
{
private:
    void pop(T &date)
    {
        date = _tasks.front();
        _tasks.pop();
    }

    static void *handlerTask(void *arg)
    {
        pthreadPool *th = static_cast<pthreadPool *>(arg);
        while (1)
        {
            pthread_mutex_lock(&(th->_mutex));
            while (th->_tasks.empty())
                pthread_cond_wait(&(th->_cond), &(th->_mutex));
            T task;
            th->pop(task);
            cout << pthread_self() << "获得任务:" << task.toTaskString() << endl;
            cout << "计算结果为:" << task() << endl;
            pthread_mutex_unlock(&(th->_mutex));
        }
        pthread_mutex_lock(&(th->_mutex));
    }

public:
    pthreadPool(int num = 3) : _num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 0; i < _num; ++i)
        {
            _threads.push_back(new Thread());
        }
    }
    ~pthreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for (int i = 0; i < _num; ++i)
        {
            delete _threads[i];
        }
    }
    void run()
    {
        for (int i = 0; i < _num; ++i)
        {
            _threads[i]->start(handlerTask, this);
        }
    }
    void push(const T &date)
    {
        pthread_mutex_lock(&_mutex);
        _tasks.push(date);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_mutex);
    }
    static pthreadPool<T> *getInstance()
    {
        if (_tp == nullptr)
        {
            pthread_mutex_lock(&_sin);
            if (_tp == nullptr)
            {
                _tp = new pthreadPool(3);
            }
            pthread_mutex_unlock(&_sin);
        }
        return _tp;
    }
    pthreadPool(const pthreadPool<T> &tp) = delete;
    pthreadPool<T> operator=(pthreadPool<T>) = delete;

private:
    int _num;
    vector<Thread *> _threads;
    queue<T> _tasks;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    static pthreadPool<T> *_tp;
    static pthread_mutex_t _sin;
};
template <class T>
pthreadPool<T> *pthreadPool<T>::_tp = nullptr;

template <class T>
pthread_mutex_t pthreadPool<T>::_sin = PTHREAD_MUTEX_INITIALIZER;

3、task.hpp

#include <iostream>
#include <string>
using namespace std;
class calcTask
{
public:
    calcTask() {}
    calcTask(const int &x, const int &y, char &op)
        : _x(x), _y(y), _op(op)
    {
    }
    int mymath(int x, int y, char op)
    {
        int result = 0;
        switch (op)
        {
        case '+':
            result = x + y;
            break;
        case '-':
            result = x - y;
            break;
        case '*':
            result = x * y;
            break;
        case '/':
            if (y == 0)
            {
                cerr << "除零异常" << endl;
                result = -1;
            }
            else
                result = x / y;
            break;
        case '%':
            if (y == 0)
            {
                cerr << "模零异常" << endl;
                result = -1;
            }
            else
                result = x % y;
            break;
        default:
            break;
        }
        return result;
    }
    string operator()()
    {
        int result = mymath(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }
    int _x;
    int _y;
    char _op;
};

4、main.cpp

#include "pthreadpool.hpp"
#include "task.hpp"
#include <unistd.h>
int main()
{
    pthreadPool<calcTask> *tp = pthreadPool<calcTask>::getInstance();
    tp->run();
    while (1)
    {
        int x, y;
        char op;
        cin >> x >> op >> y;
        calcTask t(x, y, op);
        tp->push(t);
        sleep(1);
    }
    return 0;
}