C/C++实现高性能并行计算——1.pthreads并行编程(下)

发布于:2024-05-01 ⋅ 阅读:(23) ⋅ 点赞:(0)

系列文章目录

  1. pthreads并行编程(上)
  2. pthreads并行编程(中)
  3. pthreads并行编程(下)
  4. 使用OpenMP进行共享内存编程


前言

在上一篇文中讲到竞争条件和临界区,以及对应的解决方案——忙等待或者互斥量(尽量使用互斥量),主要针对线程之间的运行顺序没有先后的差别。这一节主要针对的是线程之间的运行是有先后顺序的情况。


一、同步和信号量

在这里插入图片描述信号量
在这里插入图片描述

5.1 信号量的值

信号量的值代表了可用资源的数量或者允许进入某个临界区(critical section,一段需要互斥访问的代码区域)的线程数量。这个数值是信号量操作的核心,因为它决定了线程或进程是否可以立即访问共享资源或必须等待。
具体来说,信号量的值代表了以下几个方面:

  1. 可用资源的数量:
    当信号量用于管理有限资源(如数据库连接、线程池的线程数、IO设备等)时,信号量的值表示当前可用的资源单位数。例如,如果有10个相同的资源可用,信号量的初始值可以设置为10。每次一个线程或进程通过执行P操作(sem_wait())来请求资源时,信号量的值就会减1。当信号量的值达到0时,这意味着没有更多的资源可用,后续的线程或进程必须等待,直到某个已经占用资源的线程或进程通过执行V操作(sem_post())释放资源,信号量的值才会增加。
  2. 许可进入的线程数量:
    在控制对临界区的访问时,信号量的值表示可以同时进入该区域的最大线程数。例如,如果某个临界区域一次只能安全地由一个线程进入,那么可以使用一个初始化值为1的二元信号量(或互斥锁)。这确保了每次只有一个线程可以进入并执行该区域的代码。如果信号量的值为1,意味着临界区是可访问的;如果为0,表示临界区已经被占用,其他线程必须等待。
  3. 同步和协调多个进程或线程:
    信号量也经常用于同步操作,确保线程以特定的顺序执行。例如,在某些线程必须在其他线程完成其任务之后才能开始的情况下,可以通过信号量来协调这些线程的启动顺序。

5.1 信号量函数语法

在 C 语言中,信号量的处理主要依赖于 POSIX 线程库(Pthreads),特别是在 UNIX 或类 UNIX 系统(如 Linux)上。POSIX 信号量定义在 <semaphore.h> 头文件中,提供了一组标准的函数来创建、操作和销毁信号量。

主要的信号量函数包括:

  1. sem_init - 初始化一个未命名的信号量。(初始化前还要声明哈)

    int sem_init(sem_t *sem, int pshared, unsigned int value);
    
    • 参数
      • sem: 指向信号量对象的指针。
      • pshared: 如果此参数非零,则信号量在进程间共享;如果为零,则只能在同一进程的线程间共享。
      • value: 信号量的初始值。
    • 返回值:成功时返回 0;错误时返回 -1 并设置 errno。
  2. sem_destroy - 销毁一个未命名的信号量。

    int sem_destroy(sem_t *sem);
    
    • 参数
      • sem: 指向信号量对象的指针。
    • 返回值:成功时返回 0;错误时返回 -1 并设置 errno。
  3. sem_wait - 减少信号量的值(P 操作,即 “wait” 操作)。

    int sem_wait(sem_t *sem);
    
    • 参数
      • sem: 指向信号量对象的指针。
    • 返回值:成功时返回 0;错误时返回 -1 并设置 errno。
  4. sem_post - 增加信号量的值(V 操作,即 “signal” 操作)。

    int sem_post(sem_t *sem);
    
    • 参数
      • sem: 指向信号量对象的指针。
    • 返回值:成功时返回 0;错误时返回 -1 并设置 errno。

以下是一个使用 POSIX 信号量的简单示例,演示了如何同步两个线程:

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

sem_t sem; //声明信号量类型对象

void* thread_func(void* arg) {
//这里的减少信号量和增加信号量的操作和互斥锁的上锁解锁很类似
    sem_wait(&sem);
    printf("Entered thread %d\n", *(int*)arg);
    sem_post(&sem);
    return NULL;
}

int main() {
    pthread_t t1, t2;
    int id1 = 1, id2 = 2;

    sem_init(&sem, 0, 1);  // 初始化信号量,初始值为1

    pthread_create(&t1, NULL, thread_func, &id1);
    pthread_create(&t2, NULL, thread_func, &id2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    sem_destroy(&sem);  // 销毁信号量

    return 0;
}

这个程序创建了一个信号量和两个线程,通过信号量来同步它们的执行。这样,即使两个线程几乎同时启动,它们也会一个接一个地执行,因为信号量控制。这个例子和互斥锁的原理几乎一样!!

5.2 二元信号量

信号量的值(是否为1、0或者其他正整数)有着重要的含义,特别是在控制多个线程或进程访问共享资源时。这里讨论一下信号量值为1和0的情况,以及这两个值在并发编程中的应用。

信号量值为1
当信号量的值为1时,它通常被称为二元信号量互斥信号量。这种信号量在功能上类似于互斥锁(mutex):

  • 互斥访问:确保同一时刻只有一个线程(或进程)可以进入临界区。信号量值为1意味着可用的资源数量为1,因此任何时刻只能有一个线程获取这个资源(即进入临界区)。
  • 操作
    • sem_wait():当线程执行sem_wait()时,如果信号量值为1,则将其减为0并允许线程进入临界区。如果信号量值已经是0(说明已被其他线程占用),当前线程会阻塞,直到信号量再次变为1。
    • sem_post():当线程执行sem_post()时,它将信号量值从0增加到1,表明线程已离开临界区,资源现在可用,等待的线程可以被唤醒并进入临界区。

信号量值为0
信号量的值为0时,表示没有可用资源,或者临界区不能被任何线程进入:

  • 阻塞状态:任何尝试执行sem_wait()的线程都将阻塞,因为信号量值为0表示不允许进入临界区。
  • 释放资源:只有当某个线程执行sem_post(),将信号量从0增加到1时,等待的线程才能解除阻塞并访问资源或进入临界区。

用途和场景

  • 信号量值为1
    • 用作互斥锁来保护对共享资源的访问。
    • 确保数据结构或文件等不会由多个线程同时修改。
  • 信号量值为0
    • 控制线程的启动序列,例如,在某些初始化操作完成之前阻止线程工作。
    • 等待外部事件,例如,一个线程可能需要等待其他线程准备好数据或状态。

总结
信号量的具体值(1、0或其他)定义了其控制线程行为的方式,其中值为1的信号量通常用作二元互斥锁,而值为0的信号量表示资源不可用,需要线程等待。

5.3 生产者和消费者的工作流程

让我们通过一个具体的例子来解释信号量值为1和0的情况,以及如何使用它们进行线程同步。这个例子涉及到两个线程:一个生产者线程(Producer)和一个消费者线程(Consumer)。生产者负责生成数据,消费者负责处理数据。我们将使用两个信号量来同步它们的行为:

  1. empty 信号量:表示缓冲区空的槽数。初始值设为缓冲区大小(假设为1,代表我们有一个单元的缓冲区)。
  2. full 信号量:表示缓冲区中满的槽数。初始值设为0,表示开始时没有数据可供消费。
  • 生产者
    • 检查empty信号量(等待有空位)。如果信号量大于0(表示有空间),则进入缓冲区生产数据,并通过sem_wait()减少empty的值。
    • 生产数据后,使用sem_post()增加full信号量的值,表示现在有一个产品可以被消费。
  • 消费者
    • 检查full信号量(等待产品可用)。如果信号量大于0(表示缓冲区中有数据),则进入缓冲区消费数据,并通过sem_wait()减少full的值。
    • 消费数据后,使用sem_post()增加empty信号量的值,表示缓冲区现在有空位可以生产新的产品。

代码实现

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h> // for sleep()

sem_t empty;
sem_t full;

// 模拟的共享缓冲区,这里只有一个槽位
int buffer = 0;

// 生产者函数
void* producer(void* arg) {
    for (int i = 0; i < 5; i++) {
        sem_wait(&empty);  // 等待空位
        buffer = i;        // 生产数据
        printf("Produced: %d\n", buffer);
        sem_post(&full);   // 释放信号量表示有数据可用
        sleep(1);          // 延时模拟耗时操作
    }
    return NULL;
}

// 消费者函数
void* consumer(void* arg) {
    for (int i = 0; i < 5; i++) {
        sem_wait(&full);   // 等待数据
        printf("Consumed: %d\n", buffer);
        buffer = 0;        // 消费数据
        sem_post(&empty);  // 释放信号量表示有空位
        sleep(1);          // 延时模拟耗时操作
    }
    return NULL;
}

int main() {
    pthread_t tid1, tid2;
    sem_init(&empty, 0, 1); // 缓冲区空位为1
    sem_init(&full, 0, 0);  // 缓冲区满位为0

    pthread_create(&tid1, NULL, producer, NULL);
    pthread_create(&tid2, NULL, consumer, NULL);

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    sem_destroy(&empty);
    sem_destroy(&full);

    return 0;
}

在这个例子中,emptyfull信号量确保生产者和消费者可以正确地同步对单个缓冲区的访问。这种方式避免了竞争条件和不一致的状态,并且保证了生产者不会在缓冲区满时写入数据,消费者也不会在缓冲区空时尝试读取数据。


二、路障(barrier)

  • 通过保证所有线程在程序中处于同一位置来同步线程。
  • 这个同步点又被称为路障(barrier),只有所有线程都抵达路障,线程才能继续运行下去,否则会阻塞在路障处。

pthread中没有对路障的一个实现,所以只能通过现有的知识来实现一个路障:

2.1 实现barrier ——忙等待&互斥量

int counter;  //计数器
int num_thread;
pthread_mutex_t barrier_mutex;

//线程函数
void* Thread_work(...){
	pthread_mutex_lock(&barrier_mutex);
	counter++;
	pthread_mutex_unlock(&barrier_mutex); //对全局变量counter进行一个保护
	
	while (counter < num_thread); // 这段代码描述的是提前到达这里的线程将会处于忙等待的状态
}

在这里插入图片描述除非这里在之后counter重新等于0,也就是说这里的counter的重用性不是很好!

2.2 实现barrier ——信号量

int counter;
sem_t count_sem;    //保护计数器,初始化为1。
sem_t barrier_sem;  //阻塞已经进入路障的线程
void* Thread_work(...){
	sem_wait(&count_sem);
	if (counter == num_thread - 1){  //这里表示最后一个线程已经到达路障处
		counter = 0;  //需要把全局变量计数器更新为0
		sem_post(&count_sem);
		//在之前的线程都被阻塞到了barrier_sem(不是二元信号量了)处
		for (j = 0; j < num_thread - 1; j++) sem_post(&barrier_sem);  //把阻塞在这里的信号量给唤醒
	}else{
		counter++;
		sem_post(&count_sem);
		sem_wait(&barrier_sem);
	}
	...		
}

在这里插入图片描述
为什么barrier_sem在第二次调用时会出现竞争条件?
当最后一个线程触发 sem_post(&barrier_sem) 释放其他所有线程时,这些线程可能会立即继续执行,而在它们重新运行到 Thread_work 函数并执行 sem_wait(&count_sem) 前,counter 已经被重置为0。如果某些线程运行比较快,并开始了新一轮的计数(在其他线程还没来得及通过旧的屏障),这将导致 counter 的值可能无法正确反映当前正在 barrier_sem 上等待的线程数。

2.3 实现barrier ——条件变量

  • 条件变量是一个数据对象,允许线程在某个特定条件或时间发生之前都处于挂起状态。当事件或者条件发生时,另一个线程可以通过信号来唤醒挂起线程。
  • 一个条件变量总是与一个互斥量相关联。(受到互斥量的保护)

条件变量的一般使用方法

lock mutex;
if condition has occured
	signal thread(s); //唤醒线程s
else{
	unlock the mutex and block; //解锁阻塞线程
}
unlock mutex;

条件变量是用于线程间同步的一种机制,允许线程在某些预设条件尚未满足时挂起执行,直到其他线程改变了这些条件并通知条件变量继续执行。在多线程编程中,条件变量通常与互斥锁(mutexes)结合使用,以避免竞争条件和提供对共享数据的保护。

使用条件变量的基本元素:

  1. 互斥锁(Mutex):保护共享资源,确保在检查条件和修改共享资源时不会出现并发冲突。
  2. 条件变量(Condition Variable):用于在特定条件尚未满足时阻塞一个或多个线程,并在条件满足时通知它们。

主要操作:

  • 等待(Wait):线程在继续执行前等待特定的条件成立。当线程调用等待操作时,它必须已经获得了互斥锁。等待操作会原子性地释放互斥锁并挂起调用线程的执行。当条件变量被通知(或广播)时,线程被唤醒,然后重新尝试获取互斥锁,继续执行。
  • 通知(Signal):通知操作用于唤醒一个等待该条件变量的线程。如果有多个线程在等待,通常选择一个线程被唤醒。
  • 广播(Broadcast):与通知操作类似,但它会唤醒等待同一条件变量的所有线程。
#include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  // 静态初始化条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  // 静态初始化互斥锁

// 初始化条件变量
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

// 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);

// 等待条件变量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);  //通过互斥量来阻塞,成功时返回0
//mutex: 指向已经锁定的互斥锁的指针。该互斥锁保护条件的状态。此函数会自动释放互斥锁,当条件变量被通知后,互斥锁会重新被锁定。

// 通知一个等待条件变量的线程
int pthread_cond_signal(pthread_cond_t *cond);

// 广播通知所有等待条件变量的线程
int pthread_cond_broadcast(pthread_cond_t *cond);

实现barrier

int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
void* Thread_work(...){ 
	pthread_mutex_lock(&mutex);
	counter++;
	if (counter == num_thread){
		counter = 0;
		pthread_cond_broadcast(&cond_var);
	}else 
		while (pthread_cond_wait(&cond_var, &mutex) != 0); //用来阻塞已经到达这个地方的线程,在这里是一个假唤醒,没什么用。
	pthread_mutex_unlock(&mutex);
}

互斥锁与条件变量函数之间的关系

pthread_cond_wait(&cond_var, &mutex) 调用中,线程首先会释放与条件变量关联的互斥锁 mutex,然后进入阻塞状态,等待条件变量被其他线程通过 pthread_cond_signalpthread_cond_broadcast 唤醒。当条件变量被唤醒时,线程会从阻塞状态恢复,并在函数返回之前自动重新获得之前释放的互斥锁。


总结

  1. 信号量,semaphore.h库产生,其数据类型是sem_t。其相关函数的使用。
  2. 条件变量,其数据类型是pthread_cond_t。其相关函数的使用。
  3. 路障,线程在实现的时候停下来同步,其实现方式有3种:
    • 忙等待&互斥锁
    • 信号量
    • 条件变量(其本身实现就包含互斥锁)。前两种方式代码变量重用性差。该方式是最好的!!!

参考

  1. 【团日活动】C++实现高性能并行计算——⑨pthreads并行编程

网站公告

今日签到

点亮在社区的每一天
去签到