C++的线程管理
线程类(Thread)
执行线程是一个指令序列,它可以在多线程环境中,与其他此类指令序列同时执行,同时共享相同的地址空间。
一个初始化的线程对象代表一个活动的执行线程;这样的线程对象是可连接的,并且具有唯一的线程ID。
默认构造的(未初始化的)线程对象是不可连接的,并且它的线程 id 对于所有不可连接的线程都是通用的。
如果从可连接线程移出,或者对它们调用 join 或 detach,则该线程将变得不可连接。
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
void foo()
{
sleep(10); // sleep 10 seconds
cout << "I'm foo, awake now" << endl;
}
void bar(int x)
{
sleep(5); // sleep 10 seconds
cout << "I'm bar, awake now" << endl;
}
int main()
{
thread T1 (foo); // spawn new thread that calls foo()
thread T2 (bar,0); // spawn new thread that calls bar(0)
cout << "main, foo and bar now execute concurrently...\n";
// synchronize threads:
T1.join(); // pauses until first finishes
T2.join(); // pauses until second finishes
cout << "foo and bar completed.\n";
return 0;
}
程序运行屏幕输出
main, foo and bar now execute concurrently...
I'm bar, awake now
I'm foo, awake now
foo and bar completed.
线程构造器
- thread() noexcept;
- template <class Fn, class... Args>explicit thread (Fn&& fn, Args&&... args);
- thread (const thread&) = delete;
- thread (thread&& x) noexcept;
约定构造器
thread() noexcept;
构造一个线程对象, 它不包含任何执行线程。
初始化构造器
template <class Fn, class... Args>explicit thread (Fn&& fn, Args&&... args);
构造一个线程对象,它拥有一个可连接执行线程。
新的执行线程调用 fn, 并传递 args 作为参数。
此构造的完成与 fn 的副本开始运行同步。
#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
using namespace std;
void f1(int n)
{
for (int i = 0; i < 5; ++i)
{
cout << "Thread 1 executing\n";
++n;
this_thread::sleep_for(chrono::milliseconds(10));
}
}
void f2(int& n, int sz)
{
for (int i = 0; i < sz; ++i)
{
cout << "Thread 2 executing\n";
++n;
this_thread::sleep_for(chrono::milliseconds(10));
}
}
int main()
{
int n = 0;
thread t2(f1, n + 1); // pass by value
thread t3(f2, ref(n), 6); // pass by reference
t2.join();
t3.join();
cout << "Final value of n is " << n << '\n';
}
程序运行屏幕输出
Thread 1 executing
Thread 2 executing
Thread 1 executing
Thread 2 executing
Thread 2 executing
Thread 1 executing
Thread 1 executing
Thread 2 executing
Thread 1 executing
Thread 2 executing
Thread 2 executing
Final value of n is 6
复制构造器
thread (const thread&) = delete;
删除构造函数,线程对象不能复制。
移动构造器
thread (thread&& x) noexcept;
构造一个线程对象,该对象获取 x 表示的执行线程(如果有)。此操作不会以任何方式影响移动线程的执行,它只是传输其处理程序。
x 对象不再代表任何执行线程。
#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
#include <unistd.h>
using namespace std;
void f2(int& n)
{
thread::id this_id = this_thread::get_id();
cout << "Thread " << this_id << " executing" << endl;
for (int i = 0; i < 5; ++i)
{
++n;
this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
int main()
{
int n = 0;
thread t3(f2, ref(n));
thread t4(move(t3));
t4.join();
cout << "Final value of n is " << n << '\n';
}
程序运行屏幕输出
Thread 140291256411904 executing
Final value of n is 5
多线程
atomic
atomic类型是封装值的类型,保证其访问不会导致数据争用,并且可用于同步不同线程之间的内存访问。
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <random>
using namespace std;
atomic<bool> ready (false);
atomic_flag winner = ATOMIC_FLAG_INIT;
void count1m (int id) {
random_device dev;
mt19937 rng(dev());
uniform_int_distribution<mt19937::result_type> dist6(50,100); // distribution in range [1, 6]
while (!ready) {
this_thread::yield();
}
int val = dist6(rng);
this_thread::sleep_for(chrono::milliseconds(val));
if (!winner.test_and_set()) {
cout << "thread #" << id << " won!\n";
}
}
int main ()
{
vector<thread> threads;
cout << "5 threads compete...\n";
for (int i=1; i<=5; ++i)
threads.push_back(thread(count1m,i));
ready = true;
for (auto& th : threads) th.join();
return 0;
}
程序运行2次,屏幕输出
threads$ ./atomic
5 threads compete...
thread #3 won!
threads$ ./atomic
5 threads compete...
thread #4 won!
condition_variable
条件变量是一个能够阻塞调用线程,直到通知恢复的对象。
当调用其等待函数之一时,它使用 unique_lock(通过互斥锁)来锁定线程。该线程将保持阻塞状态,直到被另一个对同一 condition_variable 对象调用通知函数的线程唤醒。
Condition_variable 类型的对象始终使用 unique_lock 进行等待。
应用实列
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <condition_variable>
using namespace std;
mutex mtx;
condition_variable cv;
bool ready = false;
void wait_init_ready (int id) {
unique_lock<mutex> lck(mtx);
cout << "Init " << id << " start ..." << endl;
while (!ready)
cv.wait(lck);
cout << "Init " << id << " done !!!" << '\n';
}
void init_complete() {
unique_lock<mutex> lck(mtx);
ready = true;
cv.notify_all();
}
int main ()
{
vector<thread> threads;
for (int i=0; i<5; ++i)
threads.push_back(thread(wait_init_ready, i));
init_complete();
for (auto& th : threads) th.join();
return 0;
}
程序运行屏幕输出
Init 0 start ...
Init 1 start ...
Init 3 start ...
Init 3 done !!!
Init 1 done !!!
Init 4 start ...
Init 4 done !!!
Init 0 done !!!
Init 2 start ...
Init 2 done !!!
future
具有允许异步访问特定提供程序(可能在不同线程中)设置的值的功能的标头。
这些提供者中的每一个(要么是promise或packaged_task对象,要么是对async的调用)与未来对象共享对共享状态的访问:提供者使共享状态准备就绪的点与未来对象访问共享状态的点同步状态。
promise
Promise 是一个对象,它可以存储类型 T 的值,以便将来的对象(可能在另一个线程中)检索,从而提供同步点。
在构造时,Promise 对象与一个新的共享状态相关联,它们可以在该状态上存储类型 T 的值或从 std::exception 派生的异常。
通过调用成员 get_future,可以将该共享状态关联到未来对象。调用后,两个对象共享相同的共享状态:
- Promise 对象是异步提供者,预计会在某个时刻为共享状态设置一个值。
- future 对象是一个异步返回对象,可以检索共享状态的值,并在必要时等待它准备好。
共享状态的生命周期至少持续到与其关联的最后一个对象释放它或被销毁为止。因此,如果也与 future 相关联,它可以在最初获得它的 Promise 对象中存活下来。
应用实列
#include <iostream>
#include <functional>
#include <thread>
#include <future>
using namespace std;
struct data_pkt {
int id;
uint8_t data[20];
};
void wait_new_value (future<data_pkt>& fut) {
data_pkt x = fut.get();
cout << "value: " << x.id << '\n';
}
int main ()
{
data_pkt pkt;
promise<data_pkt> prom; // create promise
future<data_pkt> fut = prom.get_future(); // engagement with future
thread th1 (wait_new_value, ref(fut)); // send future to new thread
pkt.id = 1;
prom.set_value (pkt); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}
程序运行屏幕输出
value: 1
future
future 是一个可以从某些提供程序对象或函数检索值的对象,如果在不同的线程中,则可以正确同步此访问。
“有效”未来是与共享状态关联的未来对象,并通过调用以下函数之一来构造:
异步
承诺::get_future
打包任务::获取未来
future 对象仅在有效时才有用。默认构造的未来对象无效(除非移动分配了有效的未来)。
在有效的 future 上调用 future::get 会阻塞线程,直到提供者使共享状态准备就绪(通过设置值或异常)。这样,两个线程可以通过一个等待另一个线程设置值来同步。
共享状态的生命周期至少持续到与其关联的最后一个对象释放它或被销毁为止。因此,如果与未来相关联,共享状态可以在最初获取它的对象(如果有)中继续存在。
应用实列
#include <iostream>
#include <future>
#include <chrono>
#include <signal.h>
using namespace std;
bool ready = false;
mutex mtx;
condition_variable cv;
struct data_pkt {
int code;
uint8_t data[32];
};
void term(int signum)
{
if (signum == SIGINT)
{
printf("Received SIGINT(ctrl+c), exiting ... \n");
unique_lock<mutex> lck(mtx);
ready = true;
cv.notify_all();
}
else
{
time_t mytime = time(0);
printf("%d: %s\n", signum, asctime(localtime(&mytime)));
printf("%d\n",signum);
}
}
bool async_promise (data_pkt &pkt) {
cout << "async_promise start ..." << endl;
struct sigaction act;
act.sa_handler = term;
sigaction(SIGQUIT, &act, NULL);
sigaction(SIGINT, &act, NULL);
unique_lock<mutex> lck(mtx);
while (!ready)
cv.wait(lck);
cout << "async_promise condition variable ready" << endl;
pkt.code = 1900;
return true;
}
int main ()
{
data_pkt pkt;
// call function asynchronously:
future<bool> fut = async (async_promise, ref(pkt));
// do something while waiting for function to set future:
cout << "checking, please wait";
chrono::milliseconds span (100);
while (fut.wait_for(span) == future_status::timeout)
cout << '.' << flush;
bool x = fut.get(); // retrieve return value
cout << pkt.code << endl;
return 0;
}
checking, please waitasync_promise start ...
............................^CReceived SIGINT(ctrl+c), exiting ...
async_promise condition variable ready
1900
函数模板 std::async 异步运行函数 f ,可能在一个单独的线程中,该线程可能是线程池的一部分,并返回一个 std::future ,它最终将保存该函数调用的结果。