多线程安全和性能测试

发布于:2025-08-15 ⋅ 阅读:(16) ⋅ 点赞:(0)

本文是实现一个多线程安全的动态数组(vector)实现与性能测试框架,通过条件编译(宏定义)提供了三种不同的线程安全策略,并包含了功能验证和性能测试的入口。其核心目的是对比不同线程安全方案在多线程环境下的正确性与效率。

一、整体结构

代码通过宏定义(MUTEX/ATOM/THREAD)切换三种线程安全的ThreadSafeVector实现,再通过SINGAL/TIME宏切换测试模式(功能验证/性能测试)。整体结构如下:

  • 三种线程安全的ThreadSafeVector实现(互斥锁、原子索引、线程本地存储)
  • 两种测试入口(功能正确性验证、性能计时)
  • 依赖C++标准库的线程(std::thread)、同步(std::mutex)、原子操作(std::atomic)等组件

二、实现

#include <iostream>
#include <vector>
#include <mutex>
#include <thread>
#include <atomic>
#include <chrono>
#include <memory>

/*
# 在编译时带宏
GCC : gcc -D<宏名>[=<宏值>] 源文件 -o 可执行文件 {{
    gcc -DDEBUG test.c -o test ( g++ -DATOM main.cpp -o main)
    gcc -DVERSION=2 test2.c -o test2
}
Makefile  : 
{
    CC = gcc
    CFLAGS = -Wall -DDEBUG // CFLAGS 变量包含了 -DDEBUG 选项

    all: test
        $(CC) $(CFLAGS) test.c -o test

    clean:
        rm -f test
}
环境变量 :
{
    export MY_MACRO=1
    gcc -DMY_MACRO=$MY_MACRO test.c -o test
}
*/ 

#ifdef MUTEX
/*
互斥锁保护动态数组
优点:简单易实现,直接利用标准库。  
缺点:高并发下锁竞争可能成为性能瓶颈。
*/
class ThreadSafeVector {
public:
    void add(int value) {
        std::lock_guard<std::mutex> lock(mtx);
        data.push_back(value);
    }

    size_t size() {
        std::lock_guard<std::mutex> lock(mtx);
        return data.size();
    }

    std::vector<int> vec() {
        return data;
    }

    std::string type() {
        return "mutex";
    }
private:
    std::vector<int> data;
    std::mutex mtx;
};
#endif



#ifdef ATOM
/* 
原子索引 + 预分配内存
优点:大部分写入操作无锁,性能较高。  
缺点:预分配可能导致内存浪费,动态扩容时仍需加锁。

原子索引:是一种在多线程环境下,通过 原子操作 管理共享资源(如数组写入位置)的技术。它的核心思想是使用原子变量(如 std::atomic)记录当前可写入的位置索引,
多个线程通过原子操作安全地竞争索引,避免数据竞争(Data Race)。  
实现思路:预分配固定大小的数组,通过原子变量管理当前写入位置索引。仅当空间不足时,通过锁动态扩容。
*/
class ThreadSafeVector {
public:
    ThreadSafeVector() {
        data.resize(INIT_SIZE);
    }

    void add(int value) {
        size_t idx = index.fetch_add(1, std::memory_order_relaxed);
        if (idx < data.size()) {
            data[idx] = value;
        } else {
            std::lock_guard<std::mutex> lock(mtx);
            if (idx >= data.size()) {
                data.resize(data.size() *2);
            }
            data[idx] = value;
        }
    }

    size_t size() {
        std::lock_guard<std::mutex> lock(mtx);
        return data.size();
    }

    std::vector<int> vec() {
        data.resize(index);
        return data;
    }

    std::string type() {
        return "atom";
    }
private:
    std::vector<int> data;
    std::atomic<size_t> index{0};
    std::mutex mtx;
    static const size_t INIT_SIZE = 1024;
};
#endif

#ifdef THREAD
/*
线程本地存储
优点:完全无锁写入,合并时才需同步。  
缺点:数据访问延迟,合并时可能阻塞。

线程本地存储(TLS): 是一种允许每个线程拥有独立数据副本的机制。在多线程环境中,每个线程操作自己的本地数据,
无需与其他线程竞争共享资源,从而完全避免锁的使用。当需要全局汇总数据时,再通过同步机制(如锁)合并各线程的本地数据。  
实现思路 :每个线程使用本地数组缓存数据,定期将数据合并到全局数组。 

thread_local 是 C++11 引入的一个存储类说明符,用于声明线程局部存储的变量。线程局部存储意味着每个线程都有该变量的一个独立副本,每个线程对该变量的操作都不会影响其他线程中的副本
*/
#define DATA_MERGE_LENGTH 100
class ThreadSafeVector {
public:
    ~ThreadSafeVector() {
        std::lock_guard<std::mutex> lock(mtx);
        global_data.insert(global_data.end(), local_data.begin(), local_data.end());
    }

    void add(int value) {
        local_data.push_back(value);
        if (local_data.size() >= DATA_MERGE_LENGTH) {
            merge();
        }
    }

    size_t size() {
        std::lock_guard<std::mutex> lock(mtx);
        return global_data.size();
    }

    std::vector<int> vec() {
        return global_data;
    }

    std::string type() {
        return "thread";
    }

private:
    void merge() {
        std::lock_guard<std::mutex> lock(mtx);
        global_data.insert(global_data.end(), local_data.begin(), local_data.end());
        local_data.clear();
    }

private:
    std::vector<int> global_data;
    thread_local static std::vector<int> local_data;
    std::mutex mtx;
};

// 定义线程局部存储变量
thread_local std::vector<int> ThreadSafeVector::local_data;
#endif


#ifdef SINGAL
void worker(ThreadSafeVector& vec, int start, int end) {
    for (int i = start; i < end; i++) {
        vec.add(i);
    }
}

/*
1. 低竞争场景:方案 1(互斥锁)简单可靠。
2. 高并发写入:方案 2(原子索引)性能更优。
3. 允许最终一致性:方案 3(线程本地存储)避免锁争用。
*/
int main() {
    ThreadSafeVector vec;
    std::cout << vec.type().data() << std::endl;
    int num_threads = 5;
    int num_elements_per_thread = 100;
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; i++) {
        int start = i * num_elements_per_thread;
        int end = start + num_elements_per_thread;
        threads.emplace_back(worker, std::ref(vec), start, end);
    }

    for (auto& th : threads) {
        th.join();
    }

    size_t expected_size = num_threads * num_elements_per_thread;
    if (vec.size() == expected_size) {
        std::cout << "corrent" << std::endl;
    } else {
        std::vector<int> data = vec.vec(); 
        std::cout << "incorrent vec data size : " << data.size() << std::endl;
        // for (int i = 0; i < vec.size(); i++) {
        //     std::cout << data[i] << std::endl;
        // }
    }
    return 0;
}
#endif

#ifdef TIME
constexpr int TOTAL_DATA = 10000;
constexpr int THREAD_NUM = 4;
constexpr int DATA_PER_THREAD = TOTAL_DATA / THREAD_NUM;

// 测试工具函数
template<typename T>
void run_test(T& container, const std::string& name){
    auto start = std::chrono::high_resolution_clock::now();

    std::vector<std::thread> threads;
    for (int i = 0; i < THREAD_NUM; ++i) {
        threads.emplace_back([&]() {
        for (int j = 0; j < DATA_PER_THREAD; ++j) {
            container.add(j);
        }
        });
    }

    for (auto& t : threads) {
        t.join();
    }

    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> duration = end - start;

    std::cout << name << " Time: " << duration.count() << "s\n";
}

int main() {
    ThreadSafeVector vec;
    run_test(vec, vec.type());
    return 0;
}
#endif

三、核心组件解析:三种ThreadSafeVector实现

1. 互斥锁实现(#ifdef MUTEX

核心思想:通过互斥锁(std::mutex)保护所有对vector的操作,确保同一时间只有一个线程访问数据。

方法 实现逻辑
add(int) std::lock_guard加锁,向vector尾部添加元素(push_back),自动解锁。
size() 加锁后返回vector的大小。
vec() 返回内部vector的副本(注意:此方法未加锁,存在线程安全隐患!)。
type() 返回实现类型标识(“mutex”)。
优缺点
  • 优点:实现简单,逻辑直观,适用于低并发场景。
  • 缺点:所有操作都需要加锁,高并发下锁竞争会导致性能瓶颈(线程阻塞等待锁)。
  • 适用场景:线程数量少、写入频率低的场景。
2. 原子索引+预分配内存(#ifdef ATOM

核心思想:通过原子变量记录写入位置,大部分操作无锁;仅当内存不足时加锁扩容,减少锁竞争。

关键设计 细节说明
预分配内存 初始容量INIT_SIZE = 1024,避免频繁扩容。
原子索引index std::atomic<size_t>记录下一个可写入的位置,通过fetch_add(1)原子操作获取索引(无锁)。
扩容逻辑 当索引超过当前容量时,加锁并将容量翻倍(resize(data.size()*2)),再写入数据。
size() 加锁返回vector大小(实际容量,非已写入元素数)。
vec() 调整vector大小为已写入元素数(index值),返回副本。
优缺点
  • 优点:大部分写入操作无锁(仅原子操作),性能优于互斥锁,适合高并发写入。
  • 缺点:预分配可能导致内存浪费;扩容时仍需加锁(但扩容频率低)。
  • 适用场景:可预估数据量范围、高并发写入的场景。
3. 线程本地存储(#ifdef THREAD

核心思想:每个线程操作自己的本地vector,积累到一定数量后再合并到全局vector,完全避免写入时的锁竞争。

关键设计 细节说明
线程本地存储local_data thread_local定义,每个线程有独立副本,写入时无需锁(local_data.push_back)。
合并机制 当本地vector大小达到DATA_MERGE_LENGTH(100)时,加锁将本地数据合并到全局global_data,并清空本地数据。
析构函数 对象销毁时合并剩余本地数据到全局,避免数据丢失。
size() 加锁返回全局vector的大小。
优缺点
  • 优点:写入操作完全无锁(线程内操作),并发性能最优;锁仅在合并时使用。
  • 缺点:数据存在延迟(未合并时全局数据不完整);线程销毁/对象析构时需合并,可能有短暂阻塞。
  • 适用场景:允许数据短暂不一致、超高并发写入的场景(如日志收集、统计计数)。

四、使用方法(编译与运行)

通过编译时指定宏来选择实现和测试模式,例如:

# 测试互斥锁实现的功能正确性
g++ -DMUTEX -DSINGAL main.cpp -o test_mutex && ./test_mutex
# 测试原子索引实现的性能
g++ -DATOM -DTIME main.cpp -o test_atom && ./test_atom
# 测试线程本地存储实现的功能
g++ -DTHREAD -DSINGAL main.cpp -o test_thread && ./test_thread

五、总结与对比

实现方案 线程安全核心 优点 缺点 适用场景
互斥锁(MUTEX) std::mutex 简单可靠,数据实时一致 高并发下锁竞争严重,性能差 低并发、要求实时一致
原子索引(ATOM) std::atomic+锁 大部分操作无锁,性能较好 预分配内存可能浪费 高并发、数据量可预估
线程本地存储 thread_local+锁 写入完全无锁,性能最优 数据有合并延迟,非实时一致 超高并发、允许最终一致性

网站公告

今日签到

点亮在社区的每一天
去签到