信号量Semaphore

发布于:2024-04-20 ⋅ 阅读:(15) ⋅ 点赞:(0)

什么是信号量?

C++中的信号量(Semaphore)是一种同步对象,用于控制对共享资源的访问,以防止多个线程或进程同时访问同一资源,从而避免数据不一致的问题。信号量通过维护一个计数值来实现这一功能,该计数值表示可以同时访问共享资源的线程或进程的数量。当一个线程或进程想要访问共享资源时,它会尝试减少信号量的计数值;如果计数值大于零,线程或进程可以继续执行,否则它必须等待直到信号量的计数值变为正数。当线程或进程完成对共享资源的访问后,它会释放信号量,即增加计数值,从而可能允许其他正在等待的线程或进程继续执行。

信号量可以干什么?

  1. 互斥
  2. 限制并发访问
  3. 线程同步
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>


class Semaphore {
public:
    Semaphore(int count = 1) : count_(count) {}

    void notify() {
        std::unique_lock<std::mutex> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ <= 0) {
            condition_.wait(lock);
        }
        --count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    int count_;
};

// 共享资源
int sharedResource = 0;

void workerFunction(Semaphore& semaphore) {
    for (int i = 0; i < 1000; ++i) {
        semaphore.wait(); // 加锁
        ++sharedResource;
        semaphore.notify(); // 解锁
    }
}

//实现互斥

int main() {
    Semaphore semaphore(1);
    std::vector<std::thread> threads;

    // 创建 10 个工作线程
    for (int i = 0; i < 10; ++i) {
        threads.push_back(std::thread(workerFunction, std::ref(semaphore)));
    }

    // 等待所有线程完成
    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Shared resource value: " << sharedResource << std::endl;

    return 0;
}
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore(int count = 1) : count_(count) {}

    void notify() {
        std::unique_lock<std::mutex> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ <= 0) {
            condition_.wait(lock);
        }
        --count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    int count_;
};

void workerFunction(int threadId, Semaphore& semaphore) {
    semaphore.wait(); // 获取访问权限
    std::cout << "Thread " << threadId << " is accessing the shared resource." << std::endl;
    // 模拟访问共享资源
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Thread " << threadId << " has finished accessing the shared resource." << std::endl;
    semaphore.notify(); // 释放访问权限
}

//限制并发访问

int main() {
    Semaphore semaphore(3); // 允许最多3个线程同时访问
    std::vector<std::thread> threads;

    // 创建10个工作线程
    for (int i = 0; i < 10; ++i) {
        threads.push_back(std::thread(workerFunction, i, std::ref(semaphore)));
    }

    // 等待所有线程完成
    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore(int count = 1) : count_(count) {}

    void notify() {
        std::unique_lock<std::mutex> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ <= 0) {
            condition_.wait(lock);
        }
        --count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    int count_;
};

void task1(Semaphore& semaphore1, Semaphore& semaphore2) {
    std::cout << "Task 1 is executing." << std::endl;
    // 模拟任务执行
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Task 1 is completed." << std::endl;
    // 通知任务2可以开始执行
    semaphore1.notify();
    // 等待任务2完成
   // semaphore2.wait();
}

void task2(Semaphore& semaphore1, Semaphore& semaphore2) {
    // 等待任务1完成
    semaphore1.wait();
    std::cout << "Task 2 is executing." << std::endl;
    // 模拟任务执行
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Task 2 is completed." << std::endl;
    // 通知任务3可以开始执行
    semaphore2.notify();
}

void task3(Semaphore& semaphore2) {
    // 等待任务2完成
    semaphore2.wait();
    std::cout << "Task 3 is executing." << std::endl;
    // 模拟任务执行
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Task 3 is completed." << std::endl;
}

//线程同步

int main() {
    Semaphore semaphore1(0), semaphore2(0);

    std::thread t1(task1, std::ref(semaphore1), std::ref(semaphore2));
    std::thread t2(task2, std::ref(semaphore1), std::ref(semaphore2));
    std::thread t3(task3, std::ref(semaphore2));

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

信号量怎么实现?

C++11

使用互斥锁和条件变量模拟信号量。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>

class Semaphore {
public:
    Semaphore(int count) : _count(count) {}

    void acquire() {
        std::unique_lock<std::mutex> lock(_mutex);
        while (_count == 0) {
            _cv.wait(lock);
        }
        --_count;
    }

    void release() {
        std::lock_guard<std::mutex> lock(_mutex);
        ++_count;
        _cv.notify_one();
    }

private:
    int _count;
    std::mutex _mutex;
    std::condition_variable _cv;
};

Semaphore taskSemaphore(4); // 最大并发数为4

void processTask(int taskID) {
    taskSemaphore.acquire();

    std::cout << "Task " << taskID << " started." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(10));
    std::cout << "Task " << taskID << " finished." << std::endl;

    taskSemaphore.release();
}

int main() {
    std::vector<std::thread> threads;
    const int numTasks = 10;

    for (int i = 1; i <= numTasks; ++i) {
        threads.emplace_back(processTask, i);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

C++20

使用std::counting_semaphore

#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>

// 假设我们有一个最大并发数为4的任务队列
std::counting_semaphore<4> taskSemaphore(4);

void processTask(int taskID) {
    // 请求一个任务许可
    taskSemaphore.acquire();

    std::cout << "Task " << taskID << " started." << std::endl;
    // 这里模拟任务处理时间
    std::this_thread::sleep_for(std::chrono::milliseconds(10000));
    std::cout << "Task " << taskID << " finished." << std::endl;

    // 任务处理完释放许可
    taskSemaphore.release();
}

int main() {
    std::vector<std::thread> threads;
    const int numTasks = 10;

    for (int i = 1; i <= numTasks; ++i) {
        threads.emplace_back(processTask, i);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

经典生产者消费者问题

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <chrono>
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore(int count = 1) : count_(count) {}

    void notify() {
        std::unique_lock<std::mutex> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ <= 0) {
            condition_.wait(lock);
        }
        --count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    int count_;
};

const int BUFFER_SIZE = 10;
std::queue<int> buffer;
std::mutex mutex_; // 互斥锁,用于保护共享数据
Semaphore empty(BUFFER_SIZE), full(0);

void producer(int producerId) {
    for (int i = 0; i < 100; ++i) {
        empty.wait(); // 等待缓冲区有空位
        std::unique_lock<std::mutex> lock(mutex_);
        buffer.push(i);
        std::cout << "Producer " << producerId << " produced: " << i << std::endl;
        lock.unlock();
        full.notify(); // 通知消费者有新数据
    }
}

void consumer(int consumerId) {
    for (int i = 0; i < 100; ++i) {
        full.wait(); // 等待缓冲区有数据
        std::unique_lock<std::mutex> lock(mutex_);
        int data = buffer.front();
        buffer.pop();
        std::cout << "Consumer " << consumerId << " consumed: " << data << std::endl;
        lock.unlock();
        empty.notify(); // 通知生产者有空位
    }
}

int main() {
    std::vector<std::thread> producers, consumers;

    // 创建3个生产者线程
    for (int i = 0; i < 3; ++i) {
        producers.push_back(std::thread(producer, i));
    }

    // 创建3个消费者线程
    for (int i = 0; i < 3; ++i) {
        consumers.push_back(std::thread(consumer, i));
    }

    // 等待所有生产者线程完成
    for (auto& producer : producers) {
        producer.join();
    }

    // 等待所有消费者线程完成
    for (auto& consumer : consumers) {
        consumer.join();
    }

    return 0;
}