线程的创建和使用

​ 线程库中创建多线程的库是thread库,基本的线程创建是使用std::thread,如果创建了线程但是没有等待线程结束就会报错,此时有两种方式,第一种使用join()方法让主线程等待子线程完成工作。第二个就是分离子线程使其单独在后台运行,方法为detach()。当整个进程结束时整个子线程的资源才会被跟着回收

​ 其中每个线程只能使用一次join,如果多次使用就会抛出异常,其中方法joinable()就是判断是否可以进行join或者detach()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <thread>
#include <string>


void printStr(std::string str) {
std::cout << str << std::endl;
}

int main() {


// 创建线程
std::thread thread1(printStr,"Hello_world");

//分离线程(主进程结束后子线程不会报错但是也会被强制结束)
//thread1.detach();

//主线程等待子线程结束
//thread1.join();

//返回bool判断是否可以调用join或者detach
//join只能使用一次,再次使用会报错
bool isjoin = thread1.joinable();
if (isjoin)
thread1.join();//或者thread1.detach();
return 0;
}

线程中的无定义行为(Undefined Behavior,UB)

cpp程序中无定义行为(UB)是一种非常严重的程序错误其中主要是两个方面的原因:

1. 对象生命周期的规划
1. 指针失效

第一点中其中对象生命周期的规划主要是作用域中体现,如当一个局部变量在所在作用域执行完后其生命周期就结束了,结束后其原来的占用地址内存被回收,那一块内存就不再是原来的内容了。

第二点中是指针的失效,即原来的指针在解引用后如果再次访问的话就造成了UB,此时会有以下几个情况

  • 程序继续运行,但是读取的内容是垃圾值无效数值。
  • 触发系统保护,抛出SIGSEGV。
  • 彻底优化掉本次访问,直接相当于略过。

那么再加上线程运行后会把这个问题放大,当主线程执行完毕后很多资源被释放掉后子线程就及其容易进入UB,本身在线程中执行时间执行级别会被放大,其问题发生概率也会被直接放大。

在整个程序周期中有以下几点可以尽量避免程序触发UB:

  1. 延长生命周期
  2. 主线程等待,尽量使用join避免使用或者少使用detach
  3. 将需要使用的内容直接拷贝而非直接引用

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <thread>


void func1(int& x){
x+=1;
}

void func2(){
int a = 1;
std::thread t(func1,std::ref(a));//同样触发UB,在线程启动后a的资源就被释放了
}

void func3(int*x){
std::cout << *x << std::endl;
}

int main(){
int a = 1;
std:;thread t1(func1,a);//这里会报错或者会出
//如果想传递这个参数需要使用std::ref打包,如下:
std::thread t2(func1,std::ref);

/*
这个示例现实的值并非我们传入的1,也可能崩溃,也有概率会输出1,看是否是在delete之前就把指令执行完了
*/
int* s = new int(1);
std::thread t3(func3,s);
delete s;

t1.join();
t2.join();
t3.join();
return 0;
}

类对象UB示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <thread>
#include <memory>

class MyClass{
public:
void func(){
std::cout << 'Thread' <<std::this_thread::get_id()
<< "shared" <, std::endl;
std::cout << "Thread" << std::this_thread::get_id()
<<"finished" << std::endl;

}
};

int main(){
std::shared_ptr<MyClass> obj = std::make_shared<MyClass>();
std::thread t(&MyClass::func,*obj);
t.join();//与上述例子其实都差不多
return 0;
}

可以使用智能指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
#include <memory>

class Cls{
public:
void foo(){
std::cout << "Hello" << std::endl;
}
}

int main(){
std::shared_ptr<A> a = std::make_shared<A>();
std::thread t(&a::foo,a);
return 0;
}


互斥锁(量)解决多线程共享问题

数据共享问题

​ 在多个线程中共享数据时需要注意线程安全问题,如果多个线程同时访问一个变量并且至少其中一个线程对其进行了写操作,那么就会出现数据竞争问题,数据竞争问题可能会导致程序崩溃、产生未定义结果、或者得到错误的结果。

​ 为了避免数据竞争问题,需要有同步机制来确保多个线程之间共享数据的安全。常见的同步机制包括互斥量,条件变量,原子操作等。

以下为简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>

void funcadd() {
for (int i = 0; i < 1000; i++) {
a += 1;
}
}


int main(){

std::thread t1(funcadd);
std::thread t2(funcadd);

t1.join();
t2.join();

std::cout << a << std::endl;

return 0;
}

运行以上代码后基本不会得到预期的2000数值且每次运行结果都很可能会不一样

正确写法是在funcadd中加一个锁,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int a = 0;

void funcadd() {
for (int i = 0; i < 1000; i++) {
mtx.lock();
a += 1;
mtx.unlock();
}
}

根据运行结果可知,只要当多线程运行的数值结果能与单线程运行时得到相同的结果时这个多线程运行就是安全的。


死锁

值得注意的是在锁的操作中还有个死锁的情况

场景:

  • 线程A 需要操作lk1 和lk2
  • 线程B需要操作lk2 和lk1

操作:

​ 让程序直接运行这两个线程就会出现死锁的状态。

原因:

​ 当线程A,B执行后由于两者是并行运行,所以会有线程A拿到lk1的所有权,线程B拿到lk2的所有权,当线程A准备申请lk2的所有权时因为在B手中所以陷入等待。同样,线程B也在等待线程A,于是两者就永远等待下去了。就是死锁。

场景复现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#incldue <thread>
#include <mutex>

std::mutex m1,m2;

void func1() {
m1.lock();
std::this_thread::sleep_for(std::chrono::seconds(2));
m2.lock();
m2.unlock();
m1.unlock();
}

void func2() {
m2.lock();
std::this_thread::sleep_for(std::chrono::seconds(2));
m1.lock();
m1.unlock();
m2.unlock();
}

int main(){
std::thread t1(func1);
std::thread t2(func2);

t1.join();
t2.join();

std::cout << "执行完成" << std::endl;
return 0;
}

在以上代码中两个锁永远没有释放,双方一直在等待对方解开锁就造成了死锁


模板锁的用法

有两个主要的模板锁分别是std::lock_guardstd::unique_lock这两者各有自己的不同点主要为:

  • lock_guard:用于保护数据共享防止多个线程同时访问同一资源而导致的数据竞争问题。
  1. 当构造函数被调用时,该互斥锁会被自动锁定。
  2. 当析构函数被调用时,该互斥锁会自动解除锁定。
  3. std::lock_guard对象不能被复制或者移动,因此他只能在局部作用域中使用。

用法示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int sum =0;

void func(){
for(int i =0 ;i<50;i++){
std::lock_guard<std::mutex> lk(mtx);
sum++;
}
}

int main(){
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
return 0;
}
  • unique_lock:用于在多线程程序中对互斥量进行加锁和解锁。主要是可以对互斥量进行更为灵活的管理,包括延迟加载、条件变量、超时等。

其提供了几个成员方法:

  1. lock():尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到成功加锁
  2. try_lock():尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程所持有,则立即返回false否则返回true
  3. try_lock_for(const std::chrono::duration<Rep,Period>& rel_time):尝试对互斥量进行加锁,入股哦当前互斥量已经被其他线程所持有,则当前线程会被阻塞,直到互斥锁加锁成功或超过了指定的时间。
  4. unlock():对互斥锁进行解锁操作。

其中还提供了一些构造函数:

  1. unique_lock()noexcept =default 默认构造函数,创建一个未关联任何互斥量的std::unique_lock对象。
  2. explicit unique_lock(mutex_type& m)构造函数,使用给定的互斥量m进行初始化,并对该互斥量进行加锁。
  3. unique_lock(mutex_type& m,defer_lock_t) noexcept构造函数,使用给定的互斥量m进行初始化,但不对该互斥量进行加锁操作。
  4. unique_lock(mutex_type& m,try_to_lock_t) noexcept构造函数,使用给定的互斥量m进行初始化,并尝试对该互斥量进行加锁操作,如果加锁失败,则创建的std::unique_lock对象不与任何互斥量关联。
  5. unique_lock(mutex& m,adopt_lock_t) noexcept构造函数,使用给定的互斥量m进行初始化并且假定该互斥量已经呗当前线程成功加锁。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>

std::mutex mtx;

void defer_lock_demo()
{
std::unique_lock<std::mutex> lk(mtx, std::defer_lock); //延迟加锁
std::cout << "加锁前"<< std::endl;
lk.lock(); // 手动加锁
std::cout << "加锁后"<<std::endl;
lk.unlock(); // 手动解锁
}

void try_lock_demo()
{
std::unique_lock<std::mutex> lk(mtx, std::try_to_lock); //尝试加锁
std::cout << "尝试加锁结果:" << (lk.owns_lock() ? "成功":"失败")<<std::endl;
}

void timeout_demo()
{
std::unique_lock<std::mutex> lk;
auto now = std::chrono::steady_clock::now();
if (lk.try_lock_until(now + std::chrono::milliseconds(100))) // 3. 超时加锁
std::cout << "100ms 后加锁\n"<< std::endl;
else
std::cout << "100ms后加锁失败" << std::endl;
}

void adopt_lock_demo()
{
mtx.lock(); // 已加锁
std::unique_lock<std::mutex> lk(mtx, std::adopt_lock); // 4. 接管所有权
std::cout << "使用 adopt_lock\n";
} // 自动解锁

int main()
{
std::thread t1(defer_lock_demo);
std::thread t2(try_lock_demo);
std::thread t3(timeout_demo);
std::thread t4(adopt_lock_demo);

for (auto& t : {std::ref(t1), std::ref(t2), std::ref(t3), std::ref(t4)})
t.join();
return 0;
}



std::call_once介绍以及使用场景

std::call_once其主要作用是保证一个函数在多个线程中只被调用一次,其原型如下:

1
2
_EXPORT_STD template <class _Fn, class... _Args>
void(call_once)(once_flag& _Once, _Fn&& _Fx, _Args&&... _Ax)

其中 _Once是标志,Fx是我们需要保证调用一次的函数,Ax则是参数。大多数的适用场景在于构造类的单例模式,如果执行的内容中有单例模式类的构造,有可能出现多个线程同时构造造成的多个实例。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>


class A {
public:
A() {}
A(const A&) = delete;
A& operator=(const A&) = delete;

// 对外唯一入口
static A& getInstance() {
instance_ = new A;
return *instance_;
}
void print() { std::cout << "A::print() " << std::this_thread::get_id() << '\n'; }
private:
static A* instance_;
};

在以上单例创建中,当我们调用线程后,多个线程都执行了创建A的过程的话,就可能回造成同时进行多个getInstance这样类的单例就会被破坏。那么此时就需要call_once来保证其单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <mutex>
#include <thread>

class A {
public:
A(const A&) = delete;
A& operator=(const A&) = delete;

// 对外唯一入口
static A& getInstance() {
std::call_once(initFlag_, []() {
instance_ = new A; // 只执行一次
});
return *instance_;
}

void print() { std::cout << "线程为: " << std::this_thread::get_id() << '\n'; }

private:
A() {}

static std::once_flag initFlag_;
static A* instance_; // 原始指针,永不释放
};


std::once_flag A::initFlag_;
A* A::instance_ = nullptr;

int main() {
std::thread pool[10];
for (auto& t : pool)
t = std::thread([]() { A::getInstance().print(); });
for (auto& t : pool) t.join();
return 0;
}

  • 注意:call_once只能在子线程中使用,不能在主线程中使用(main函数),否则会报错。

条件变量的使用(condition_variable)

具体使用流程如下:

  1. 创建一个std::condition_variable对象
  2. 创建一个互斥锁std::mutex对象来保护共享资源的访问。
  3. 在需要等待条件变量的地方:使用std::unique_lock<std::mutex>对象锁定互斥锁并且调用std::condition_variable::await_until()或者std::condition_variable::await_for()函数等待条件变量
  4. 在其他线程中需要通知等待的线程时,调用std::condition_variable::notify_one()或者std::condition_variable::notify_all()来通知等待的线程

在以上流程中,最经典的场景为:生产者—消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <vector>

std::condition_variable cv;
std::queue<int> tasks;
std::mutex mtx;
bool finish = false;

//生产者
void producer(size_t id,size_t num) {
for (size_t i = 0; i < num; i++)
{
std::unique_lock<std::mutex> lk(mtx);
tasks.push(i * i);
std::cout << "["<< id <<"] task push" << i * i << std::endl;
}
cv.notify_all();//通知所有线程
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

//消费者
void consumer(size_t id) {
while (true) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, []() {
return !tasks.empty() || finish;
});
if (!tasks.empty()) {
int data = tasks.front();
tasks.pop();
lk.unlock();//用完即解锁
std::cout <<"["<< id << "]consumer get data: " << data << std::endl;
}
else if (finish)
break;
}
}


int main() {
size_t PRODUCER = 2;
size_t CONSUMER = 3;
size_t TOTAL = 20;

std::vector<std::thread> consumers, producers;

for (size_t i = 0; i < CONSUMER; i++)
consumers.emplace_back(consumer, i);//启动所有消费者

for (size_t i = 0; i < PRODUCER; i++)
producers.emplace_back(producer, i, TOTAL);//启动所有生产者

for (auto& t : producers)t.join(); //等待所有生产者执行完毕
{//生产结束
std::unique_lock<std::mutex> lk(mtx);
finish = true;
}
cv.notify_all();//再次提醒所有消费者检查finish,如果队列空了并且finish为true则退出

for (auto& t : consumers) t.join();

std::cout << "执行完成" << std::endl;

return 0;
}

线程池具体实现

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <vector>
#include <functional>

class ThreadPool {
public:
explicit ThreadPool(const size_t max_threads) {
for (size_t i = 0; i < max_threads; i++) {
threads.emplace_back([this]() {
while (1) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [this]() {
return !tasks.empty() || stop;
});

if (stop && tasks.empty()) return;

std::function<void()> task(std::move(tasks.front()));
tasks.pop();
lk.unlock();
task();
}
});
}

}

~ThreadPool() {
{
std::unique_lock<std::mutex> lk(mtx);
stop = true;
}
cv.notify_all();

for (auto& t : threads) t.join();
}

template<typename F,typename ...Args>
void enqueue(F&& f ,Args && ...args) {
std::function<void()> task =
std::bind(std::forward<F>(f), std::forward<Args>(args)...);
{
std::unique_lock<std::mutex> lk(mtx);
tasks.emplace(std::move(task));
}
cv.notify_one();
}

private:
std::vector <std::thread> threads;
std::queue <std::function<void()>> tasks;

std::mutex mtx;
std::condition_variable cv;

bool stop = false;
};

int main() {

ThreadPool threadpool(10);

for (size_t i = 0; i < 10; i++)
{
threadpool.enqueue([i] {
static std::mutex io_mtx; //为了让输出流正常输出
{
std::unique_lock<std::mutex> lock(io_mtx);
std::cout << "第" << i << "个任务开始进行" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::unique_lock<std::mutex> lock(io_mtx);
std::cout << "第" << i << "个任务已经完成" << std::endl;
}
});
}

std::cout << "线程池执行完毕" << std::endl;

}

异步并发

  • async future:

async``future是一个函数模板并且用于异步执行的一个函数,并且返回值是一个类型为std::future对象,表示异步操作的结果。使用std::async可以很方便的进行异步编程,避免了手动创建线程和管理线程的麻烦。其中调用使用std::async(执行策略,函数, 参数...);。接收使用std::future<返回值类型>,获取结果使用.get()方法。其中执行策略有两种:

  1. std::launch::async:直接开新的线程或者复用线程池异步跑。
  2. std::launch::deferred:惰性求值,只有进行get方法后才进行运行。线程id与main的线程相同

使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <future>
#include <chrono>

int slow_add(int a, int b) {
std::this_thread::sleep_for(std::chrono::seconds(2));
return a + b;
}

int main() {
std::future<int> result = std::async(std::launch::async,slow_add, 3, 4);

std::cout << "主线程继续运行" << std::endl;
std::cout << "结果 = " << result.get() << std::endl;
}
  • packaged_task:

在c++中,packaged_task是一个类模板,用于将一个可调用对象(如函数,函数对象或lambda表达式)封装成一个异步操作,并返回一个std::future对象,表示异步操作的结果。package_task可以方便的将一个函数或可调用对象转换成一个异步操作,供其他线程使用。调用模板为std::packaged_task<返回值类型,(参数1类型,参数2类型,...)> 名称(函数)获取future则使用其.get_future(),其使用需要自己为其分配一个线程运行

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <future>
#include <thread>

int add(int a,int b){ return a+b; }

int main(){
std::packaged_task<int(int,int)> task(add); // 包函数
std::future<int> result = task.get_future(); // 拿 future

std::thread t(std::move(task), 3, 4); // 这里需要使用move转换成右值传入
t.join();

std::cout << "packaged_task 结果 = " << result.get() << '\n';
}
  • promise:

promise用于在一个线程中产生一个值,并且在另一个线程中获取这个值。promise通常和async一起使用实现异步编程。

基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <future>
#include <thread>

void work(std::promise<int> p){ // promise 可移动
std::this_thread::sleep_for(std::chrono::seconds(1));
p.set_value(42); // 手动给结果
}

int main(){
std::promise<int> prom;
std::future<int> result = prom.get_future();

std::thread t(work, std::move(prom)); // 把 promise 交给干活线程
std::cout << "等结果…" <<std::endl;
std::cout << "promise 结果 = " << result.get() << std::endl;
t.join();
}

原子操作

  • std::atomic:

它提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中的数据竞争问题。他的用法就类似普通变量,但是操作是原子性的,也就意味着在多线程中他不会出现资源竞争问题。

常用操作:

  • load():将std::atomic变量的值加载到当前线程的本地缓存中并且返回这个值。
  • store(value):将value的值存储到std::atomic的变量中,并且保证操作是原子性的。
  • exchange(value):将value的值存到std::atomic的变量之中,并且返回原来存储的数据。
  • compare_exchange_weak(expected, desired):只有当前值等于expected 才改成 desired,返回是否成功;可能假失败(有时确实相等但是依旧返回false,是由于底层执行时进行竞争所造成的,再次检查就可能恢复正常),适合循环
  • compare_exchange_strong(expected, desired) :同上,但是不允许假失败,成本高,适合单次判断
  • fetch_add(delta) / fetch_sub(delta):对原子变量进行加减delta
  • is_lock_free() :查询该对象是否无锁
  • 同时也支持语法糖:++/- - /+= /-=

完整示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <atomic>
#include <iostream>
#include <thread>

template<typename T>
void show(const char* op, T old, T now, bool ok = true) {
std::cout << op << " 旧值=" << old
<< " 新值=" << now
<< (ok ? " 成功\n" : " 失败")
<< std::endl;
}

int main() {
std::atomic<int> a{10};
std::cout << "初始 a = " << a.load() << std::endl;

/* 1. load / store */
int x = a.load(); // 原子读
std::cout << "load 得到 " << x << std::endl;
a.store(20); // 原子写
std::cout << "store 20 后 a = " << a.load() << std::endl;


int old = a.exchange(99);
show("exchange(99)", old, 99);


old = a.fetch_add(5);
show("fetch_add(5)", old, old + 5);
old = a.fetch_sub(3);
show("fetch_sub(3)", old, old - 3);

++a; --a;
std::cout << "++a 再 --a 后 a = " << a.load() << std::endl;


int expected = 100;
bool okWeak = a.compare_exchange_weak(expected, 200);
show("compare_exchange_weak(100→200)", 100, 200, okWeak);

expected = 101;
okWeak = a.compare_exchange_weak(expected, 200);
show("compare_exchange_weak(101→200)", expected, 200, okWeak);

expected = a.load();
bool okStrong = a.compare_exchange_strong(expected, 300);
show("compare_exchange_strong(当前→300)", expected, 300, okStrong);

/* 5. 无锁查询 */
std::cout << "\nis_lock_free = "
<< (a.is_lock_free() ? "true" : "false") << std::endl;
}