Linux篇19多线程第三部分

发布于:2022-12-28 ⋅ 阅读:(508) ⋅ 点赞:(0)

1.线程同步

概念:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。

  • 下面我们来解释一下什么叫做“饥饿问题”

    还是举我们之前用过的例子。假如有一个 自习室,每次只允许一个人进去自习,自习室门外面的墙上挂着钥匙。ABCD几个人都想来上自习,A第一个来的,他拿着钥匙进入自习室并将门反锁了,后来的几个人BCD就只能在外面等着了。学了一会呢,A出来了,将钥匙挂在墙上了,可就在这时A突然又想继续学,他就又拿着钥匙进去学了,然后学了几分钟,A又坐不住了要出去,可是他一出去就又想再进入自习室。由于A离钥匙最近,别人争不过他,那么BCD就只能等,而A一直在重复这个过程,也没有好好地学习。这样就导致了BCD的“饥饿”。显然,这种做饭虽然保证了“互斥”,但是效率实在是低下。为了改善这种状况,我们修改规则,任何一个从自习室出来的人如果想要再进入 自习室,要重新排队,这样其他人就有效避免了“饥饿问题”。

2.条件变量

条件变量,是用来描述某种临界资源是否就绪的一种数据化描述。通常,条件变量需要与mutex互斥锁一起使用

我们先来认识一组条件变量的接口

  • 初始化

    #include <pthread.h>
    //方式1     
    int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
    //cond:要初始化的条件变量       attr:通常设置为NULL
    
    //方式2
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER
    
    
  • 销毁

    #include <pthread.h>
    int pthread_cond_destroy(pthread_cond_t *cond);
    //cond:要销毁的条件变量
    
    
  • 等待条件满足

    #include <pthread.h>
    int pthread_cond_wait(pthread_cond_t *restrict cond,  pthread_mutex_t *restrict mutex);
    // cond 在cond条件下等待   
    //mutex:配合使用的互斥锁.通常我们在使用条件等待接口时,已经加锁了,如果等待,这个接口就可以自动解锁,让其他线程可以访问临界资源。如果被唤醒,这个接口又会重新进行加锁
    

    为什么该接口需要互斥锁?

    1. 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件
      变量上的线程。
    2. 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
  • 唤醒等待

    #include <pthread.h>
    int pthread_cond_signal(pthread_cond_t *cond);
    //cond:要唤醒的条件变量
    

接下来我们来一个测试用例,使用一下这些接口

#include <iostream>
#include <pthread.h>

using namespace std;
pthread_mutex_t lock;
pthread_cond_t cond;

void* Run(void* arg)
{
  pthread_detach(pthread_self());
  const char* msg = (char*)arg;
  while(1)
  {
    pthread_cond_wait(&cond, &lock);
    cout << msg << " run..." << endl;
  }
  return (void*)0;

}
int main()
{
  //初始化锁与条件变量
  pthread_mutex_init(&lock, nullptr);
  pthread_cond_init(&cond, nullptr);

  //创建三个线程
  pthread_t t1, t2, t3;
  pthread_create(&t1, nullptr, Run, (void*)"thread 1");
  pthread_create(&t2, nullptr, Run, (void*)"thread 2");
  pthread_create(&t3, nullptr, Run, (void*)"thread 3");

  //用主线程控制三个新线程,按一下回车就唤醒一个线程
  while(1)
  {
    getchar();
    pthread_cond_signal(&cond);
  }
  //最后销毁锁和条件变量
  pthread_mutex_destroy(&lock);                                                                                       pthread_cond_destroy(&cond);
  return 0;
}


3.基于阻塞队列的生产者消费者模型

我们首先来讲一个生活中常见的小例子。

image-20220825133746142

那么为什么要使用生产者消费者模型呢?

  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

下面我们来一段基于生产者消费者模型的代码,来帮助大家更好的理解

main.cc

#include "BlockQueue.hpp"
#include "task.hpp"
using namespace std;
void* Producter(void* arg)
{
  auto bq = (BlockQueue<Task>*)arg;
  const char* arr = "+-*/";
  while(1)
  {
    //随机生产数字和操作符
    int x = rand()%100+1;
    int y = rand()%50;
    char op = arr[rand()%4];
    //生产任务并将任务放进阻塞队列
    Task t(x, y, op);
    bq->push(t);
    cout << "product task done" << endl;
  }
}
void* Consumer(void* arg)
{
  auto bq = (BlockQueue<Task>*)arg;
  while(1)
  {
    sleep(1);
    Task t;
    bq->pop(t);
    t.Run();                                                
  }
}
int main()
{
  srand((unsigned)time(nullptr));

  BlockQueue<Task>* bq = new BlockQueue<Task>();
  //创建两个线程,一个是生产者p一个是消费者c
  pthread_t p, c;
  pthread_create(&p, nullptr, Producter, (void*)bq);
  pthread_create(&c, nullptr, Consumer, (void*)bq);

  pthread_join(p, nullptr);
  pthread_join(c, nullptr);
  return 0;
}

BlcokQueue.hpp

#pragma once                                                                                                       
#include <queue>
#include <iostream>
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <cstdlib>
template<typename T>
#define NUM 32 //默认阻塞队列的容量设置为32
class BlockQueue 
{
  private:
    bool IsFull()
    {
      return _q.size() == _capacity;
    }
    bool IsEmpty()
    {
      return _q.empty();
    }
  public:
    BlockQueue(int capacity = NUM)
      :_capacity(capacity)
    {
      //在构造函数里先将锁与条件变量初始化
      pthread_mutex_init(&lock, nullptr);
      pthread_cond_init(&full, nullptr);
      pthread_cond_init(&empty, nullptr);
    }
    ~BlockQueue()
    {
      //在析构函数将锁与条件变量清除
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&full);
      pthread_cond_destroy(&empty);
    }

    //数据插入
    void push(const T& in)
    {
      //插入之前先加锁
      pthread_mutex_lock(&lock);
      //先判断是不是满了
      while(IsFull())//这里坚决不能使用if判断。原因有2。1:pthread_cond_wait调用失败,代码向下执行,本来就满了,还继续插入。2.如果是一个生产者三个消费者,生产者生产了一个数据就把三个等待的消费者唤醒了,可是数据只有一个啊,如果三个线    程都向下执行那就出错了,所以我们使用while,在进程被唤醒的时候多判断一次,而不是直接向下执行,这样避免了这种错误
      {
        //满了就等
        pthread_cond_wait(&full, &lock);
      }
      _q.push(in);
      if(_q.size() >= _capacity/2)
      {
        //阻塞队列里面数据数量大于一半,唤醒消费者来消费
        std::cout << "数据很多了,快来消费吧" << std::endl;
        pthread_cond_signal(&empty);
      }
      pthread_mutex_unlock(&lock);
    }
    
    //删除数据
    void pop(T& out)
    {
      pthread_mutex_lock(&lock);
      while(IsEmpty())
      {
        //如果是空的就等待
        pthread_cond_wait(&empty, &lock);
      }
      out = _q.front();
      _q.pop();
      if(_q.size() < _capacity/10)
      {//数据量小于十分之一,唤醒生产
        std::cout << "数据很少了,快来生产把" << std::endl;
        pthread_cond_signal(&full);
      }

      pthread_mutex_unlock(&lock);
    }

  private:
    std::queue<T> _q;//用来保存临界资源
    int _capacity;//容量
    pthread_mutex_t lock;
    pthread_cond_t empty;
    pthread_cond_t full;

};        

task.hpp

#pragma once                                                                                                                                                                                                                            
#include <iostream>

class Task
{
  public:
    Task(int x, int y, char op)
      :_x(x)
      ,_y(y)
      ,_op(op)
    {}

    Task()
    {}
    void Run()
    {
      int result = 0;
      switch(_op)
      {
        case '+':
          result = _x +_y;
          break;
        case '-':
          result = _x - _y;
          break;
        case '*':
          result = _x * _y;
          break;
        case '/':
          if(_y == 0)
          {
            std::cout<< "除数为0" << std::endl;
            result = -1;
            break;
          }
          else 
          {
            result = _x / _y;
            break;
          }
        default:
          break;
      }
      std::cout << _x << _op << _y << "=" << result << std::endl;
    }
    ~Task()
    {}
    Task& operator=(const Task& t)
    {
      _x = t._x;
      _y = t._y;
      _op = t._op;
      return *this;
    }
  private:
    int _x;
    int _y;
    char _op;
}; 

运行结果

image-20220825164035345

4.POSIX信号量

POSIX信号量可以用于线程间同步,达到无冲突的访问共享资源的目的

下面我们来认识一下POSIX信号量的几个接口

  • 初始化信号量

    #include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);
    //第一个参数sem:我们要初始化的信号量
    //第二个参数pshared,如果为0,这个信号量被多线程共享,非0,被多个进程共享
    //第三个参数,信号量的初始值
     
    
  • 销毁信号量

    #include <semaphore.h>
    int sem_destroy(sem_t *sem);
    
    
  • 等待信号量

    #include <semaphore.h>
    int sem_wait(sem_t *sem);
    //等待信号量,会将信号量的值减1
    
  • 发布信号量

    #include <semaphore.h>
    int sem_post(sem_t *sem);
    //发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
    

信号量的本质就是一个计数器,是用于描述临界资源中资源数目个数的计数器

申请到信号量的本质,并不是你已经开始使用临界资源中你所申请的那个区域,而是有了使用特定资源的权限

申请信号量:count–,P操作

释放信号量:count++, V操作

信号量也是临界资源,所以信号量的申请和释放也必须是原子的。在信号量的个数为1时,基本等价于互斥锁

接下来,我们还是以抢票的例子,对以上几个接口进行一下简单的运用

#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <string>
#include <unistd.h>                                  
class Sem
{
  public:
    Sem(int num)
    {
      sem_init(&_sem, 0, num);
    }
    ~Sem()
    {
      sem_destroy(&_sem);
    }
    void P()
    {
      sem_wait(&_sem);//count--
    }
    void V()
    {
      sem_post(&_sem);//count++
    }
  private:
    sem_t _sem;
};
Sem sem(1);
int tickets = 2000;
void* GetTickets(void* arg)
{
  std::string name = (char*)arg;
  while(1)
  {
    sem.P();
    if(tickets > 0)
    {
      usleep(10000);
      tickets--;
      std::cout << name << "抢到了一张票,剩余票数 " << tickets << std::endl;
      sem.V();
    }
    else 
    {
      sem.V();
      break;
    }
  }
  return (void*)0;
}
int main()
{
  pthread_t tid1, tid2, tid3, tid4, tid5;
  pthread_create(&tid1, nullptr, GetTickets, (void*)"thread 1");
  pthread_create(&tid2, nullptr, GetTickets, (void*)"thread 2");
  pthread_create(&tid3, nullptr, GetTickets, (void*)"thread 3");
  pthread_create(&tid4, nullptr, GetTickets, (void*)"thread 4");
  pthread_create(&tid5, nullptr, GetTickets, (void*)"thread 5");

  pthread_join(tid1, nullptr);
  pthread_join(tid2, nullptr);
  pthread_join(tid3, nullptr);
  pthread_join(tid4, nullptr);
  pthread_join(tid5, nullptr);

  sem.~Sem();
  return 0;
}    

5.基于环形队列的生产消费模型

image-20220904201251404

下面我们写一段环形队列的代码

Ring.hpp

#pragma once 
#include <iostream>                                  
#include <vector>
#include <pthread.h>
#include <semaphore.h>

#define NUM 5
template<typename T>
class RingQueue
{
  private:
    void P(sem_t& s)
    {
      sem_wait(&s);
    }
    void V(sem_t& s)
    {
      sem_post(&s);
    }
  public:
    RingQueue(int cap = NUM)
      :_cap(cap)
      ,c_pos(0)
      ,p_pos(0)
    {
      _q.resize(_cap);
      sem_init(&blank_sem, 0, _cap);
      sem_init(&data_sem, 0, 0);
    }
    ~RingQueue()
    {
      sem_destroy(&blank_sem);
      sem_destroy(&data_sem);
    }
    //生产数据,由生产者调用
    void Push(const T& in)
    {
      //插入数据,格子减一,数据加一
      P(blank_sem);
      _q[p_pos] = in;
      V(data_sem);

      p_pos++;
      p_pos %= _cap;
    }

    //消费数据,由消费者调用
    void Pop(T& out)
    {
      //删除数据,数据减一,格子加一
      P(data_sem);
      out = _q[c_pos];
      V(blank_sem);
      c_pos++;
      c_pos %= _cap;
    }
  private:
    std::vector<T> _q;
    int _cap;//环形队列的空间大小
    int c_pos;//消费者指向的位置
    int p_pos;//生产者指向的位置

    sem_t blank_sem;
    sem_t data_sem;
};                      

main.cc

#include "Ring.hpp"
#include <stdlib.h>

void* Product(void* arg)
{
  RingQueue<int>* rq = (RingQueue<int>*)arg;
  while(1)
  {
    int in = rand() % 100 + 1;
    rq->Push(in);
    std::cout<< "product done!" << in << std::endl;
  }
}

void* Consume(void* arg)
{
  RingQueue<int>* rq = (RingQueue<int>*)arg;
  int out;
  while(1)
  {
    rq->Pop(out);                                       
    std::cout<< "consume done!" << out << std::endl;
  }
}
int main()
{
  srand((unsigned long)time(nullptr));

  RingQueue<int>* rq = new RingQueue<int>();
  pthread_t p, c;
  pthread_create(&p, nullptr, Product, (void*)rq);
  pthread_create(&c, nullptr, Consume, (void*)rq);

  pthread_join(p, nullptr);
  pthread_join(c, nullptr);
  return 0;
}

6.线程池

线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

image-20220905104957175

下面我们来一段测试代码。设计一个线程池。

PthreadPool.hpp

#include <iostream>
#include <pthread.h>
#include <queue>
#include "Task.hpp"
#define NUM 5
template<typename T>
class PthreadPool
{
  public:
    PthreadPool(int num = NUM)
      :thread_num(num)
    {
      pthread_mutex_init(&lock, nullptr);
      pthread_cond_init(&cond, nullptr);
    }
    void Push(const T& in)
    {
      //插入任务先锁上
      pthread_mutex_lock(&lock);
      task_queue.push(in);
      pthread_mutex_unlock(&lock);
      WakeUp();
    }
    void Pop(T& out)
    {
      out = task_queue.front();
      task_queue.pop();
    }
    bool IsEmpty()
    {
      return task_queue.size() == 0 ? true : false;             
    }
    void Wait()
    {
      pthread_cond_wait(&cond, &lock);
    }
    void WakeUp()
    {
      pthread_cond_signal(&cond);
    }
    static void* Rountie(void* arg)
    {
      pthread_detach(pthread_self());
      PthreadPool* self = (PthreadPool*)arg;
      while(1)
      {
        //到任务队列取任务,先加锁,在判断任务队列是否为空
        pthread_mutex_lock(&self->lock);
        if(self->IsEmpty())
        {
          self->Wait();
        }
        //有任务
        T t;
        self->Pop(t);
        //处理任务
        t.Run();
        pthread_mutex_unlock(&self->lock);
      }

    }
    void InitThreadPool()
    {
      pthread_t tid;
      for(int i = 0; i < thread_num; i++)
      {
        pthread_create(&tid, nullptr, Rountie, this);
      }
    }
    ~PthreadPool()
    {
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&cond);
    }
  private:
    int thread_num;
    std::queue<T> task_queue;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};  

Task.hpp

#pragma once 
#include <iostream>
#include <pthread.h>
class Task
{
  public:
    Task(int x, int y, char op)
      :_x(x)
      ,_y(y)
      ,_op(op)
    {}
    Task()
    {}
    void Run()
    {
      int ret = 0;
      switch(_op)
      {
        case '+':
          ret = _x + _y;
          break;
        case '-':
          ret = _x - _y;
          break;
        case '*':
          ret = _x * _y;
          break;
        case '/':
          if(_y == 0)
          {
            std::cout << "div zero!" << std::endl;
            break;
          }
          ret = _x / _y;
          break;
        case '%':
          if(_y == 0)
          {
            std::cout << "mod zero!" << std::endl;
            break;
          }
          ret = _x % _y;
          break;                                                 
        default:
          break;
      }
      std::cout << "thread:" << pthread_self() << " " << _x << _op << _y << "=" << ret << std::endl;
    }
    ~Task()
    {}
  private:
    int _x;
    int _y;
    char _op;
};   

main.cc

#include "Task.hpp"
#include "PthreadPool.hpp"
#include <stdlib.h>
#include <unistd.h>
int main()
{
  PthreadPool<Task>* tp = new PthreadPool<Task>();
  tp->InitThreadPool();

  srand((unsigned long)time(nullptr));
  const char* op = "+-*/%";
  while(1)
  {
    int x = rand()%100 + 1;
    int y = rand()%100 + 1;
    int i = rand()%5;
    Task t(x, y, op[i]);                                         
    tp->Push(t);
   // sleep(1);
  }

  return 0;
}
    default:
      break;
  }
  std::cout << "thread:" << pthread_self() << " " << _x << _op << _y << "=" << ret << std::endl;
}
~Task()
{}

private:
int _x;
int _y;
char _op;
};


main.cc

```cpp
#include "Task.hpp"
#include "PthreadPool.hpp"
#include <stdlib.h>
#include <unistd.h>
int main()
{
  PthreadPool<Task>* tp = new PthreadPool<Task>();
  tp->InitThreadPool();

  srand((unsigned long)time(nullptr));
  const char* op = "+-*/%";
  while(1)
  {
    int x = rand()%100 + 1;
    int y = rand()%100 + 1;
    int i = rand()%5;
    Task t(x, y, op[i]);                                         
    tp->Push(t);
   // sleep(1);
  }

  return 0;
}
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到