C++学习:六个月从基础到就业——多线程编程:std::thread基础

发布于:2025-05-21 ⋅ 阅读:(14) ⋅ 点赞:(0)

C++学习:六个月从基础到就业——多线程编程:std::thread基础

本文是我C++学习之旅系列的第五十四篇技术文章,也是第四阶段"并发与高级主题"的第一篇,介绍C++11引入的多线程编程基础知识。查看完整系列目录了解更多内容。

引言

在现代计算机科学中,多线程编程已成为提高程序性能的关键技术。随着多核处理器的普及,有效利用并行计算能力变得日益重要。C++11标准引入了线程支持库,使C++开发者能够直接在语言层面进行多线程编程,无需依赖操作系统特定的API或第三方库。

本文将深入介绍C++11的std::thread类的基础知识,包括线程的创建、管理、参数传递、异常处理以及线程同步的基本概念。通过本文的学习,你将能够编写基本的多线程C++程序,为后续深入学习并发编程打下基础。

目录

多线程编程基础

并发与并行

在讨论多线程编程之前,我们需要理解两个基本概念:并发(Concurrency)和并行(Parallelism)。

并发是指程序的不同部分可以"同时"执行,但实际上可能是通过时间片轮转在单核处理器上交替执行。并发是一个程序结构概念,强调的是任务的独立性。

并行是指程序的不同部分真正同时执行,通常在多核处理器上。并行是一个执行概念,强调的是性能的提升。

#include <iostream>
#include <thread>

void printMessage(const std::string& message) {
    std::cout << message << std::endl;
}

int main() {
    // 创建两个线程,在多核处理器上可能并行执行
    std::thread t1(printMessage, "Hello from thread 1!");
    std::thread t2(printMessage, "Hello from thread 2!");
    
    // 等待线程完成
    t1.join();
    t2.join();
    
    return 0;
}

多线程的优势

多线程编程具有以下主要优势:

  1. 提高性能:通过并行处理,多线程可以更有效地利用多核处理器,加速计算密集型任务。

  2. 响应性增强:在用户界面应用中,使用独立线程处理耗时操作可以保持界面响应迅速。

  3. 资源利用率提高:当一个线程等待I/O操作完成时,其他线程可以继续执行,提高整体资源利用率。

  4. 简化复杂问题:某些问题在多线程模型下更容易表达和理解。

多线程的挑战

尽管多线程编程带来诸多优势,但也面临以下挑战:

  1. 同步问题:多线程访问共享资源需要适当同步,否则会导致数据竞争和不确定行为。

  2. 死锁风险:不当的线程同步可能导致死锁,使程序永久卡住。

  3. 调试困难:多线程程序的执行具有不确定性,使得调试更加复杂。

  4. 可伸缩性问题:创建过多线程会导致线程切换开销增加,反而降低性能。

  5. 设计复杂性:多线程程序的设计和实现通常比单线程程序更复杂。

std::thread类

C++11引入的std::thread类是C++标准库中进行多线程编程的核心组件。它封装了操作系统的线程API,提供了平台无关的线程管理功能。

线程的创建

创建线程的最基本方式是构造一个std::thread对象,并传递一个可调用对象(函数、函数对象或lambda表达式)作为线程函数:

#include <iostream>
#include <thread>

// 普通函数作为线程函数
void hello() {
    std::cout << "Hello from thread!" << std::endl;
}

int main() {
    // 创建线程,执行hello函数
    std::thread t(hello);
    
    // 等待线程完成
    t.join();
    
    std::cout << "Main thread continues execution." << std::endl;
    
    return 0;
}

线程创建后会立即开始执行,与主线程并发运行。在上面的例子中,主线程通过调用join()方法等待新线程完成。

函数对象与Lambda表达式

除了普通函数外,我们还可以使用函数对象和Lambda表达式作为线程函数:

#include <iostream>
#include <thread>

// 函数对象
class Task {
public:
    void operator()() const {
        std::cout << "Task is executing in thread." << std::endl;
    }
};

int main() {
    // 使用函数对象
    Task task;
    std::thread t1(task);
    t1.join();
    
    // 使用临时函数对象(需要额外的括号避免语法解析歧义)
    std::thread t2((Task()));  // 额外的括号
    t2.join();
    
    // 使用Lambda表达式
    std::thread t3([]() {
        std::cout << "Lambda is executing in thread." << std::endl;
    });
    t3.join();
    
    return 0;
}

注意,当使用临时函数对象时,需要额外的括号避免"最令人恐惧的解析"(most vexing parse)问题,否则编译器会将std::thread t2(Task());解释为一个函数声明,而不是对象定义。

成员函数作为线程函数

线程函数也可以是类的成员函数,但需要提供一个对象实例:

#include <iostream>
#include <thread>

class Counter {
private:
    int count = 0;
    
public:
    void increment(int times) {
        for (int i = 0; i < times; ++i) {
            ++count;
        }
        std::cout << "Final count: " << count << std::endl;
    }
};

int main() {
    Counter counter;
    
    // 创建线程执行成员函数
    std::thread t(&Counter::increment, &counter, 1000000);
    t.join();
    
    return 0;
}

在上面的例子中,我们传递了成员函数指针、对象指针和函数参数给std::thread构造函数。

线程的参数传递

基本参数传递

向线程函数传递参数非常简单,只需在std::thread构造函数中的线程函数参数后添加额外的参数:

#include <iostream>
#include <thread>
#include <string>

void printMessage(const std::string& message, int count) {
    for (int i = 0; i < count; ++i) {
        std::cout << message << " " << i << std::endl;
    }
}

int main() {
    // 传递两个参数给线程函数
    std::thread t(printMessage, "Message", 5);
    t.join();
    
    return 0;
}

需要注意的是,参数是以值传递的方式传给线程函数的,即使函数参数声明为引用类型。

引用参数的传递

如果要传递引用,需要使用std::refstd::cref包装器:

#include <iostream>
#include <thread>
#include <string>
#include <functional>  // 为std::ref和std::cref

void modifyString(std::string& str) {
    str += " - Modified by thread";
}

int main() {
    std::string message = "Original message";
    
    // 使用std::ref传递引用
    std::thread t(modifyString, std::ref(message));
    t.join();
    
    std::cout << "After thread: " << message << std::endl;
    
    return 0;
}

不使用std::ref的话,线程函数会收到message的一个副本,而不是引用,修改不会影响原始变量。

移动语义与线程

C++11的移动语义在线程参数传递中非常有用,尤其是对于不可复制但可移动的对象:

#include <iostream>
#include <thread>
#include <memory>
#include <vector>

void processUniquePtr(std::unique_ptr<int> ptr) {
    // 处理独占指针
    *ptr += 10;
    std::cout << "Value in thread: " << *ptr << std::endl;
}

int main() {
    // 创建一个独占指针
    auto ptr = std::make_unique<int>(42);
    
    // 使用std::move转移所有权到线程
    std::thread t(processUniquePtr, std::move(ptr));
    
    // 此时ptr为nullptr
    if (ptr == nullptr) {
        std::cout << "Original pointer is now nullptr" << std::endl;
    }
    
    t.join();
    
    return 0;
}

在上面的例子中,我们使用std::moveunique_ptr的所有权转移到线程函数中。这是必要的,因为unique_ptr不可复制,只能移动。

线程的生命周期管理

join操作

join()方法用于等待线程完成。调用线程会阻塞,直到目标线程执行完毕:

#include <iostream>
#include <thread>
#include <chrono>

void longTask() {
    // 模拟耗时任务
    std::cout << "Long task started" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Long task completed" << std::endl;
}

int main() {
    std::cout << "Main thread starting" << std::endl;
    
    std::thread t(longTask);
    
    std::cout << "Main thread waiting for worker thread..." << std::endl;
    t.join();  // 主线程阻塞,等待t完成
    
    std::cout << "Worker thread has completed. Main thread continues." << std::endl;
    
    return 0;
}

需要注意的是,一个线程只能被join()一次。尝试多次join()同一个线程会导致未定义行为,通常会抛出异常。

detach操作

detach()方法用于将线程与std::thread对象分离。分离后,线程会在后台独立运行,不再受std::thread对象的控制:

#include <iostream>
#include <thread>
#include <chrono>

void backgroundTask() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Background task completed" << std::endl;
}

int main() {
    {
        std::cout << "Creating a detached thread" << std::endl;
        std::thread t(backgroundTask);
        t.detach();  // 线程在后台运行,不等待它完成
        
        std::cout << "Thread detached, main thread continues..." << std::endl;
    }  // t销毁,但线程继续在后台运行
    
    // 睡眠足够长的时间,确保能看到后台线程的输出
    std::this_thread::sleep_for(std::chrono::seconds(3));
    
    std::cout << "Main thread ending" << std::endl;
    
    return 0;
}

使用detach()时需要特别小心:

  • 分离后无法再获取线程的控制权
  • 主线程结束时,即使后台线程还在运行,程序也会终止
  • 要确保线程访问的资源在线程运行期间保持有效

可连接状态检查

线程对象有两种状态:可连接(joinable)和不可连接(non-joinable)。只有处于可连接状态的线程才能被join()detach()

#include <iostream>
#include <thread>

void simpleTask() {
    std::cout << "Task executing..." << std::endl;
}

int main() {
    // 默认构造的线程对象是不可连接的
    std::thread t1;
    std::cout << "t1 joinable: " << t1.joinable() << std::endl;  // 输出:0
    
    // 初始化后的线程是可连接的
    std::thread t2(simpleTask);
    std::cout << "t2 joinable: " << t2.joinable() << std::endl;  // 输出:1
    
    // join后线程变为不可连接
    t2.join();
    std::cout << "After join, t2 joinable: " << t2.joinable() << std::endl;  // 输出:0
    
    // 创建另一个线程
    std::thread t3(simpleTask);
    std::cout << "t3 joinable: " << t3.joinable() << std::endl;  // 输出:1
    
    // detach后线程变为不可连接
    t3.detach();
    std::cout << "After detach, t3 joinable: " << t3.joinable() << std::endl;  // 输出:0
    
    return 0;
}

以下情况下线程是不可连接的:

  • 默认构造的std::thread对象
  • 已经被join()detach()的线程
  • 通过移动操作转移了所有权的线程

线程标识符与线程本地存储

获取线程ID

每个线程都有一个唯一的标识符,可以通过get_id()方法或std::this_thread::get_id()获取:

#include <iostream>
#include <thread>
#include <sstream>

// 打印当前线程ID的辅助函数
std::string getThreadIdString() {
    std::ostringstream oss;
    oss << std::this_thread::get_id();
    return oss.str();
}

void threadFunction() {
    std::cout << "Thread function running in thread " 
              << getThreadIdString() << std::endl;
}

int main() {
    std::cout << "Main thread ID: " << getThreadIdString() << std::endl;
    
    std::thread t(threadFunction);
    std::cout << "Created thread with ID: " << t.get_id() << std::endl;
    
    t.join();
    
    // join后,线程ID变为默认值
    std::cout << "After join, thread ID: " << t.get_id() << std::endl;
    
    return 0;
}

线程ID可用于识别和区分不同的线程,在调试和日志记录中特别有用。

线程本地存储

线程本地存储(Thread Local Storage, TLS)允许每个线程拥有变量的私有副本。C++11引入了thread_local关键字来声明线程局部变量:

#include <iostream>
#include <thread>
#include <string>

// 线程局部变量
thread_local int counter = 0;
thread_local std::string threadName = "Unknown";

void incrementCounter(const std::string& name) {
    threadName = name;  // 设置此线程的名称
    
    for (int i = 0; i < 5; ++i) {
        ++counter;  // 递增此线程的计数器
        std::cout << "Thread " << threadName << ": counter = " << counter << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    // 在主线程中访问
    threadName = "Main";
    std::cout << "Initial counter in main thread: " << counter << std::endl;
    
    // 创建两个线程,各自拥有counter的副本
    std::thread t1(incrementCounter, "Thread1");
    std::thread t2(incrementCounter, "Thread2");
    
    // 在主线程中递增counter
    for (int i = 0; i < 3; ++i) {
        ++counter;
        std::cout << "Thread " << threadName << ": counter = " << counter << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    
    t1.join();
    t2.join();
    
    // 主线程中的counter不受其他线程影响
    std::cout << "Final counter in main thread: " << counter << std::endl;
    
    return 0;
}

线程本地存储的使用场景:

  • 线程安全的单例模式
  • 每线程缓存
  • 线程特定的状态信息
  • 避免使用互斥量的简单线程隔离

线程与异常处理

线程函数中的异常

线程函数中抛出的异常不会传播到创建线程的上下文中。如果不在线程内部捕获异常,程序将调用std::terminate终止:

#include <iostream>
#include <thread>
#include <stdexcept>

void threadWithException() {
    try {
        std::cout << "Thread starting..." << std::endl;
        throw std::runtime_error("Exception in thread!");
    }
    catch (const std::exception& e) {
        std::cout << "Caught exception in thread: " << e.what() << std::endl;
    }
}

void threadWithUncaughtException() {
    std::cout << "Thread starting..." << std::endl;
    throw std::runtime_error("Uncaught exception in thread!");
    // 这个异常不会被捕获,程序将终止
}

int main() {
    // 正确处理异常的线程
    std::thread t1(threadWithException);
    t1.join();
    
    std::cout << "After first thread" << std::endl;
    
    // 包含未捕获异常的线程 - 会导致程序终止
    // std::thread t2(threadWithUncaughtException);
    // t2.join();
    
    std::cout << "Main thread ending" << std::endl;
    
    return 0;
}

由于线程异常不会传播,正确的线程设计应在线程函数内部捕获和处理所有可能的异常。

RAII与线程管理

在C++中,我们常常使用RAII(Resource Acquisition Is Initialization)模式来确保资源的正确释放。对于线程管理,这一点也很重要,可以确保线程始终被正确地join()detach()

#include <iostream>
#include <thread>

// 线程包装器,实现RAII
class ThreadGuard {
private:
    std::thread& t;
    
public:
    // 构造函数接收线程引用
    explicit ThreadGuard(std::thread& t_) : t(t_) {}
    
    // 析构函数确保线程被join
    ~ThreadGuard() {
        if (t.joinable()) {
            t.join();
        }
    }
    
    // 禁止复制和赋值
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
};

void someFunction() {
    std::cout << "Thread function executing..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Thread function completed." << std::endl;
}

int main() {
    try {
        std::thread t(someFunction);
        ThreadGuard guard(t);  // RAII包装器确保t被join
        
        // 模拟异常
        // throw std::runtime_error("Simulated exception");
        
        std::cout << "Main thread continuing..." << std::endl;
    }
    catch (const std::exception& e) {
        std::cout << "Exception caught: " << e.what() << std::endl;
    }
    
    std::cout << "Main thread exiting safely." << std::endl;
    return 0;
}

C++17引入了std::jthread类,它是std::thread的改进版本,自动实现了RAII模式,并提供了取消线程的能力。在C++20中,它已成为标准的一部分。

实际应用案例

并行计算示例

以下是一个使用多线程并行计算向量点积的例子:

#include <iostream>
#include <vector>
#include <thread>
#include <numeric>
#include <functional>
#include <future>

// 计算部分点积
double partialDotProduct(const std::vector<double>& v1, 
                         const std::vector<double>& v2,
                         size_t start, size_t end) {
    return std::inner_product(v1.begin() + start, v1.begin() + end,
                              v2.begin() + start, 0.0);
}

// 并行计算点积
double parallelDotProduct(const std::vector<double>& v1,
                          const std::vector<double>& v2,
                          unsigned numThreads) {
    std::vector<std::future<double>> futures(numThreads);
    std::vector<std::thread> threads(numThreads);
    
    // 计算每个线程处理的元素数量
    size_t length = v1.size();
    size_t blockSize = length / numThreads;
    
    // 启动线程
    for (unsigned i = 0; i < numThreads; ++i) {
        // 计算当前线程处理的范围
        size_t start = i * blockSize;
        size_t end = (i == numThreads - 1) ? length : (i + 1) * blockSize;
        
        // 创建promise和future
        std::promise<double> promise;
        futures[i] = promise.get_future();
        
        // 创建线程
        threads[i] = std::thread(
            [&v1, &v2, start, end, promise = std::move(promise)]() mutable {
                double result = partialDotProduct(v1, v2, start, end);
                promise.set_value(result);
            }
        );
    }
    
    // 等待所有线程完成并获取结果
    double result = 0.0;
    for (unsigned i = 0; i < numThreads; ++i) {
        threads[i].join();
        result += futures[i].get();
    }
    
    return result;
}

int main() {
    // 创建两个测试向量
    std::vector<double> v1(1'000'000, 1.0);
    std::vector<double> v2(1'000'000, 2.0);
    
    // 单线程计算
    auto start = std::chrono::high_resolution_clock::now();
    double singleThreadResult = std::inner_product(v1.begin(), v1.end(), v2.begin(), 0.0);
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> singleThreadTime = end - start;
    
    // 多线程计算
    start = std::chrono::high_resolution_clock::now();
    unsigned numThreads = std::thread::hardware_concurrency();  // 获取CPU核心数
    double multiThreadResult = parallelDotProduct(v1, v2, numThreads);
    end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> multiThreadTime = end - start;
    
    // 输出结果
    std::cout << "Single thread result: " << singleThreadResult 
              << " (Time: " << singleThreadTime.count() << "ms)" << std::endl;
    std::cout << "Multi thread result: " << multiThreadResult 
              << " (Time: " << multiThreadTime.count() << "ms)" << std::endl;
    std::cout << "Speedup: " << singleThreadTime.count() / multiThreadTime.count()
              << "x" << std::endl;
    
    return 0;
}

在这个例子中,我们将大向量分成多个块,由不同线程计算部分点积,然后汇总结果。在多核处理器上,这种并行计算通常能显著提高性能。

后台任务处理

多线程也常用于执行不应阻塞主线程的后台任务,如下载、IO操作等:

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>

// 线程安全的任务队列
template<typename T>
class TaskQueue {
private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::atomic<bool> quit_{false};
    
public:
    // 添加任务到队列
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        cond_.notify_one();  // 通知一个等待线程
    }
    
    // 从队列获取任务
    bool pop(T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        
        // 等待直到队列有元素或收到退出信号
        cond_.wait(lock, [this] { 
            return !queue_.empty() || quit_; 
        });
        
        // 如果是退出信号且队列为空,返回false
        if (queue_.empty()) return false;
        
        item = std::move(queue_.front());
        queue_.pop();
        return true;
    }
    
    // 设置退出信号
    void quit() {
        quit_ = true;
        cond_.notify_all();  // 通知所有等待线程
    }
    
    // 检查队列是否为空
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
};

// 模拟文件下载任务
void downloadFile(const std::string& url) {
    std::cout << "Downloading: " << url << "..." << std::endl;
    
    // 模拟下载时间
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    std::cout << "Download completed: " << url << std::endl;
}

// 后台下载线程函数
void downloadWorker(TaskQueue<std::string>& taskQueue) {
    std::string url;
    
    // 循环处理队列中的任务
    while (taskQueue.pop(url)) {
        downloadFile(url);
    }
    
    std::cout << "Download worker exiting..." << std::endl;
}

int main() {
    TaskQueue<std::string> downloadQueue;
    
    // 创建后台工作线程
    std::thread workerThread(downloadWorker, std::ref(downloadQueue));
    
    // 添加下载任务
    downloadQueue.push("http://example.com/file1.zip");
    downloadQueue.push("http://example.com/file2.zip");
    downloadQueue.push("http://example.com/file3.zip");
    
    // 模拟主线程其他工作
    for (int i = 0; i < 5; ++i) {
        std::cout << "Main thread doing other work..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    
    // 添加更多任务
    downloadQueue.push("http://example.com/file4.zip");
    downloadQueue.push("http://example.com/file5.zip");
    
    // 等待所有任务完成
    while (!downloadQueue.empty()) {
        std::cout << "Waiting for downloads to complete..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    
    // 发送退出信号并等待工作线程结束
    downloadQueue.quit();
    workerThread.join();
    
    std::cout << "Main thread exiting." << std::endl;
    
    return 0;
}

这个示例实现了一个简单的后台任务处理系统,主线程可以向队列添加任务,而工作线程在后台处理这些任务。这种模式在GUI应用、服务器程序等场景中很常见。

用户界面响应性改进

多线程可以显著提高用户界面的响应性。下面是一个简化的示例,演示如何在后台线程执行耗时操作,同时保持主线程响应用户输入:

#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <mutex>

// 模拟耗时计算
void heavyComputation(std::atomic<double>& progress, 
                      std::atomic<bool>& shouldStop) {
    for (int i = 0; i <= 100; ++i) {
        // 检查是否应该停止
        if (shouldStop) {
            std::cout << "Computation cancelled!" << std::endl;
            return;
        }
        
        // 执行"计算"
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        // 更新进度
        progress = i;
    }
    
    std::cout << "Computation completed successfully!" << std::endl;
}

// 显示进度的线程
void displayProgress(const std::atomic<double>& progress, 
                    const std::atomic<bool>& shouldStop) {
    while (!shouldStop && progress < 100) {
        std::cout << "Progress: " << progress << "%" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

int main() {
    std::atomic<double> progress(0);
    std::atomic<bool> shouldStop(false);
    
    std::cout << "Starting heavy computation..." << std::endl;
    std::cout << "Press 'c' to cancel or any other key to check progress." << std::endl;
    
    // 启动计算线程
    std::thread computationThread(heavyComputation, std::ref(progress), std::ref(shouldStop));
    
    // 启动显示进度的线程
    std::thread displayThread(displayProgress, std::ref(progress), std::ref(shouldStop));
    
    // 主线程处理用户输入
    char input;
    while (progress < 100 && !shouldStop) {
        input = std::cin.get();
        if (input == 'c' || input == 'C') {
            std::cout << "Cancellation requested." << std::endl;
            shouldStop = true;
        } else {
            std::cout << "Current progress: " << progress << "%" << std::endl;
        }
    }
    
    // 等待线程完成
    computationThread.join();
    displayThread.join();
    
    std::cout << "Program exiting." << std::endl;
    
    return 0;
}

在这个示例中,我们创建了两个线程:一个执行耗时计算,另一个定期显示进度。同时,主线程保持响应用户输入,允许用户随时取消计算。这种模式可以容易地扩展到实际的GUI应用程序中。

常见问题与注意事项

竞态条件

当多个线程同时访问共享数据,并且至少有一个线程修改数据时,就会发生竞态条件(Race Condition):

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

// 全局计数器
int counter = 0;
std::mutex counterMutex;  // 保护counter的互斥量

// 不安全的递增函数 - 存在竞态条件
void incrementUnsafe(int numTimes) {
    for (int i = 0; i < numTimes; ++i) {
        ++counter;  // 竞态条件!
    }
}

// 安全的递增函数 - 使用互斥量
void incrementSafe(int numTimes) {
    for (int i = 0; i < numTimes; ++i) {
        std::lock_guard<std::mutex> lock(counterMutex);
        ++counter;  // 受互斥量保护
    }
}

int main() {
    int numThreads = 10;
    int incrementsPerThread = 100000;
    
    // 测试不安全的版本
    counter = 0;
    std::vector<std::thread> unsafeThreads;
    
    for (int i = 0; i < numThreads; ++i) {
        unsafeThreads.emplace_back(incrementUnsafe, incrementsPerThread);
    }
    
    for (auto& t : unsafeThreads) {
        t.join();
    }
    
    std::cout << "Unsafe counter value: " << counter 
              << " (Expected: " << numThreads * incrementsPerThread << ")" << std::endl;
    
    // 测试安全的版本
    counter = 0;
    std::vector<std::thread> safeThreads;
    
    for (int i = 0; i < numThreads; ++i) {
        safeThreads.emplace_back(incrementSafe, incrementsPerThread);
    }
    
    for (auto& t : safeThreads) {
        t.join();
    }
    
    std::cout << "Safe counter value: " << counter 
              << " (Expected: " << numThreads * incrementsPerThread << ")" << std::endl;
    
    return 0;
}

在不安全版本中,多个线程可能同时读取counter的值,增加它,然后写回,这可能导致某些递增操作被覆盖。安全版本使用互斥量确保每次只有一个线程可以修改counter,从而避免竞态条件。

死锁与活锁

死锁(Deadlock)是指两个或多个线程互相等待对方持有的资源,导致所有线程都无法继续执行:

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::mutex mutexA;
std::mutex mutexB;

// 可能导致死锁的函数
void deadlockFunction1() {
    std::cout << "Thread 1 trying to lock mutexA..." << std::endl;
    std::lock_guard<std::mutex> lockA(mutexA);
    std::cout << "Thread 1 locked mutexA" << std::endl;
    
    // 添加延迟增加死锁可能性
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    
    std::cout << "Thread 1 trying to lock mutexB..." << std::endl;
    std::lock_guard<std::mutex> lockB(mutexB);
    std::cout << "Thread 1 locked mutexB" << std::endl;
    
    std::cout << "Thread 1 releasing both locks" << std::endl;
}

void deadlockFunction2() {
    std::cout << "Thread 2 trying to lock mutexB..." << std::endl;
    std::lock_guard<std::mutex> lockB(mutexB);
    std::cout << "Thread 2 locked mutexB" << std::endl;
    
    // 添加延迟增加死锁可能性
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    
    std::cout << "Thread 2 trying to lock mutexA..." << std::endl;
    std::lock_guard<std::mutex> lockA(mutexA);
    std::cout << "Thread 2 locked mutexA" << std::endl;
    
    std::cout << "Thread 2 releasing both locks" << std::endl;
}

// 安全版本,使用std::lock防止死锁
void noDeadlockFunction1() {
    std::cout << "Safe Thread 1 trying to lock both mutexes..." << std::endl;
    std::scoped_lock lock(mutexA, mutexB);  // C++17的std::scoped_lock
    std::cout << "Safe Thread 1 locked both mutexes" << std::endl;
    
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    
    std::cout << "Safe Thread 1 releasing both locks" << std::endl;
}

void noDeadlockFunction2() {
    std::cout << "Safe Thread 2 trying to lock both mutexes..." << std::endl;
    std::scoped_lock lock(mutexB, mutexA);  // 注意顺序不同,但不会导致死锁
    std::cout << "Safe Thread 2 locked both mutexes" << std::endl;
    
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    
    std::cout << "Safe Thread 2 releasing both locks" << std::endl;
}

int main() {
    // 示范死锁(注意:这会使程序卡住)
    std::cout << "Demonstrating deadlock (program will hang):" << std::endl;
    /*
    std::thread t1(deadlockFunction1);
    std::thread t2(deadlockFunction2);
    t1.join();
    t2.join();
    */
    
    // 展示避免死锁的方法
    std::cout << "\nDemonstrating deadlock prevention:" << std::endl;
    std::thread t3(noDeadlockFunction1);
    std::thread t4(noDeadlockFunction2);
    t3.join();
    t4.join();
    
    return 0;
}

为避免死锁:

  1. 始终以相同顺序锁定多个互斥量
  2. 使用std::lockstd::scoped_lock同时锁定多个互斥量
  3. 避免在持有锁时调用用户代码(可能会尝试获取其他锁)
  4. 使用层次锁定,为每个互斥量分配层级,只允许按层级顺序锁定

活锁(Livelock)类似于死锁,但线程并非阻塞等待,而是持续尝试某个无法完成的操作,导致CPU资源被消耗而无进展。

线程数量的选择

选择适当的线程数量对于优化性能至关重要:

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <numeric>
#include <algorithm>

// 线程数量性能测试函数
void threadCountBenchmark(const std::vector<int>& data) {
    // 最大线程数为硬件并发线程数(通常是CPU核心数)
    unsigned int maxThreads = std::thread::hardware_concurrency();
    std::cout << "Hardware concurrency: " << maxThreads << " threads" << std::endl;
    
    // 测试不同线程数量
    for (unsigned int numThreads = 1; numThreads <= maxThreads * 2; numThreads += std::max(1u, maxThreads / 4)) {
        // 计算每个线程处理的元素数
        size_t blockSize = data.size() / numThreads;
        
        auto start = std::chrono::high_resolution_clock::now();
        
        std::vector<std::thread> threads;
        std::vector<long long> partialSums(numThreads);
        
        // 创建线程
        for (unsigned int i = 0; i < numThreads; ++i) {
            size_t startIdx = i * blockSize;
            size_t endIdx = (i == numThreads - 1) ? data.size() : (i + 1) * blockSize;
            
            threads.emplace_back([&data, &partialSums, i, startIdx, endIdx](){
                // 模拟计算密集型任务
                long long sum = 0;
                for (size_t j = startIdx; j < endIdx; ++j) {
                    sum += data[j] * data[j];  // 计算平方和
                }
                partialSums[i] = sum;
            });
        }
        
        // 等待所有线程完成
        for (auto& t : threads) {
            t.join();
        }
        
        // 合并结果
        long long totalSum = std::accumulate(partialSums.begin(), partialSums.end(), 0LL);
        
        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
        
        std::cout << "Threads: " << numThreads 
                  << ", Time: " << duration << "ms"
                  << ", Result: " << totalSum << std::endl;
    }
}

int main() {
    // 创建大量数据
    const size_t dataSize = 100'000'000;
    std::vector<int> data(dataSize);
    for (size_t i = 0; i < dataSize; ++i) {
        data[i] = i % 100;  // 简单模式
    }
    
    // 运行基准测试
    threadCountBenchmark(data);
    
    return 0;
}

选择线程数量的一般指南:

  1. 对于计算密集型任务,线程数接近或等于CPU核心数通常是最优的
  2. 对于IO密集型任务,线程数可以超过CPU核心数,因为线程经常处于等待状态
  3. 避免创建过多线程,这会增加线程切换开销
  4. 考虑使用线程池来控制线程数量和重用线程

调试多线程程序

多线程程序的调试比单线程程序更具挑战性,主要原因在于线程执行顺序的不确定性。以下是一些有用的调试技巧:

  1. 使用线程ID标记日志
#include <iostream>
#include <thread>
#include <sstream>
#include <iomanip>
#include <mutex>

std::mutex logMutex;  // 保护日志输出的互斥量

// 带线程ID的日志记录函数
void log(const std::string& message) {
    std::lock_guard<std::mutex> lock(logMutex);
    std::ostringstream tid;
    tid << std::this_thread::get_id();
    std::cout << "[Thread " << std::setw(5) << tid.str() << "] " << message << std::endl;
}

void workerFunction(int id) {
    log("Worker " + std::to_string(id) + " starting");
    std::this_thread::sleep_for(std::chrono::milliseconds(id * 100));
    log("Worker " + std::to_string(id) + " step 1");
    std::this_thread::sleep_for(std::chrono::milliseconds(id * 50));
    log("Worker " + std::to_string(id) + " finishing");
}

int main() {
    log("Main thread starting");
    
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(workerFunction, i + 1);
    }
    
    log("All workers started");
    
    for (auto& t : threads) {
        t.join();
    }
    
    log("All workers completed");
    return 0;
}
  1. 使用调试器的线程窗口:现代调试器如Visual Studio、GDB和LLDB都提供了线程窗口,可以查看所有线程的状态并在线程之间切换。

  2. 使用条件编译的调试帮助器:在关键点添加调试信息。

  3. 记录时间戳:在日志中添加时间戳,帮助分析事件顺序。

  4. 使用原子操作进行计数和检查:使用原子变量跟踪关键状态转换。

  5. 使用线程分析工具:如Intel Thread Checker、Valgrind的DRD和Helgrind工具等。

总结

在这篇文章中,我们介绍了C++11的std::thread类及其基本用法,包括线程的创建、参数传递、生命周期管理以及常见问题。多线程编程是现代C++开发中不可或缺的一部分,掌握这些基础知识将为你构建高性能、响应迅速的应用程序奠定基础。

主要要点回顾:

  1. 线程创建与基本操作:使用std::thread创建线程,传递函数、函数对象或lambda表达式作为线程函数。

  2. 参数传递:使用值传递、std::ref引用传递或移动语义传递参数到线程函数。

  3. 线程管理:使用join()等待线程完成或detach()允许线程在后台运行。

  4. 线程本地存储:使用thread_local关键字创建线程私有的变量。

  5. 异常处理:线程函数中的异常必须在线程内部捕获,否则程序将终止。

  6. 线程安全问题:了解竞态条件、死锁等多线程编程常见问题,以及防范措施。

  7. 实际应用:使用多线程可以提高计算性能、改善用户界面响应性、实现后台任务处理等。

然而,本文只是多线程编程的开始。在接下来的文章中,我们将深入探讨更多高级主题,如互斥量、锁、条件变量等同步原语,它们对于构建线程安全的数据结构和算法至关重要。


这是我C++学习之旅系列的第五十四篇技术文章。查看完整系列目录了解更多内容。


网站公告

今日签到

点亮在社区的每一天
去签到