C++ 线程与协程

最后更新于 29 天前 2901 字 预计阅读时间: 13 分钟


前言

最近也不知道学什么,被Kernel PWN搞头昏了,复现sekaictf 2023时候发现了一题关于C++协程的PWN,感觉挺有意思的,遂开始学习协程,C++线程也忘得差不多了顺便补一下。


线程

C++线程库在需要包含#include <thread>主要用到下列几个函数

  • std::thread
  • std::mutex
  • std::timed_mutex
  • std::lock_guard
  • std::unique_lock

std::thread原型如下,是一个类,存在构造函数

创建一个线程

#include<iostream>  
#include <thread>  
using namespace std;  

void func()  
{  
   for(int i=0;i<150;i++)  
   {  
       cout << i << endl;  
   }  
}  
int main()  
{  
   thread t(func);
   return 0;  
}

值得注意的是进程结束时会结束其子进程,上述代码在运行中会报错

究其原因其实是创建的线程没执行结束主线程将其结束掉了。

使用std::thread::join可以解决该问题


std::thread::join

  • 底层调用了pthread_join,阻塞主线程,主线程会等待其结束完后才向下执行。
#include<iostream>  
#include <thread>  
using namespace std;  

void func()  
{  
   for(int i=0;i<150;i++)  
   {  
       cout << i << endl;  
   }  
}  
int main()  
{  
   thread t(func);
   t.join();
   return 0;  
}

如果使用了std::thread::join函数则main函数会等待线程的结束,如此一来线程的作用就和函数一样了,无法和main函数同时进行。这就用到了std::thread::detach函数,


std::thread::detach

  • 该函数会分离子线程,子线程在后台运行父线程不能够通过std::thread对象名与子线程通信,对线程使用这个函数主线程不会去注意子线程的生命周期,当主线程结束子线程也会结束。
#include<iostream>  
#include <thread>  
using namespace std;  

void func()  
{  
   for(int i=0;i<150;i++)  
   {  
       cout << i << endl;  
   }  
}  
int main()  
{  
   thread t(func);
   t.detach();
   return 0;  
}

由于和主线程分离了,故在console中看不到输出

多线程下的数据竞争

主要现象是多线程下对同一个数据进行读写会造成错误在此不多赘述,这时候就用到了std::mutex,注意要加上#include <mutex>

std::mutex

  • 创建一个全局互斥量,使用lock方法给互斥量上锁,unlock方法给互斥量解锁。
#include<iostream>  
#include <thread>  
#include <mutex>
using namespace std;  

int sum = 0;
std::mutex m;
void func()  
{  
   for(int i=0;i<15000;i++)  
   {  
       m.lock();
       sum += 1;  
       m.unlock();
   }  
}  
int main()  
{  
   thread t1(func);
   thread t2(func);
   t1.join();
   t2.join();
   cout << sum << endl;
   return 0;  
}

std::lock_guard

  • 该方法会自动的在作用域内对一个互斥量加解锁,本质上是一个类在作用域开始执行构造函数,在作用域结束后调用析构函数。

加在for函数中,该方法的作用域为一次for循环,一次for循环后线程可以为互斥量加解锁。

void func()  
{  
   
   for(int i=0;i<1500;i++)  
   { 
       std::lock_guard<std::mutex> lock(m);
       sum += 1;  
       cout << i << endl;
   }  
}  

加在函数中则作用域为整个函数,则该线程会一直占据该函数直至结束后才会解锁互斥量。

void func()  
{  
   std::lock_guard<std::mutex> lock(m);
   for(int i=0;i<1500;i++)  
   { 
       sum += 1;  
       cout << i << endl;
   }  
}  

std::unique_lock

  • 和std::lock_guard的作用差不多,都是依靠构造析构加解锁,但是std::unique_lock还支持时间上的一些功能
  • std::unique_lock::try_lock:会尝试对互斥量进行加锁,如不成功则返回false
  • std::unique_lock::try_lock_for:会在指定时间内尝试对互斥量进行加锁,如在指定时间内不能获取互斥量的所有权则返回false,注意该互斥量要支持时间操作(std::timed_mutex)
  • std::unique_lock::try_lock_until:会在指定时间点尝试对互斥量进行加锁,如在指定时间点不能获取互斥量的所有权则返回false,注意该互斥量要支持时间操作(std::timed_mutex)

如下代码会导致sum的结果比预期少,可能的原因为线程1第一次加锁成功后睡眠1秒后解锁,下一次for循环还是线程1成功加锁导致线程二超时。

#include<iostream>   
#include <thread>  
#include <mutex>

int sum = 0;
std::timed_mutex m;
void func()  
{  
   for(int i=0;i<5;i++)  
   { 
       std::unique_lock<std::timed_mutex> lock(m,std::defer_lock);
       if (lock.try_lock_for(std::chrono::seconds(2)))
       {
           std::this_thread::sleep_for(std::chrono::seconds(1));
           sum += 1;
           std::cout << i << std::endl;
       }
       else
       {
           std::cout << "timeout" << std::endl;
       }
   }  

}  
int main()  
{  
   std::thread t1(func);
   std::thread t2(func);
   t1.join();
   t2.join();
   std::cout << sum << std::endl;
   return 0;  
}

std::call_once(once_flag& _Once, _Fn&& _Fx, _Args&&… _Ax)

  • 在多线程下,当一个类只能有一个实例时,需要用call_once来避免多线程创建多个实例

当A类被规定只能有一个实例时,使用std::call_once避免在多线程的情况下创建多个实例。

#include<iostream>  
#include <thread>  
#include <mutex>

static std::once_flag flag;
class A
{
public:
	static A* pA;
	A() {};
	A(A& a) = delete;
	A& operator=(A& a) = delete;
	~A() {};
	static void  init()
	{
		if (!pA) pA = new A();
	}
	static A& GetInstance() 
	{
		std::call_once(flag, init);
		return *pA;
	}
	void func()
	{
		std::cout << "func" << std::endl;
	}
}; 
A* A::pA = NULL;

void func()
{
	A::GetInstance().func();
}
int main()  
{
	std::thread t1(func);
	std::thread t2(func);
	t1.join();
	t2.join();
    return 0;  
}

std::condition_variable

  • std::condition_variable::wait(unique_lock& _Lck, _Predicate _Pred):解锁互斥量,并将执行该函数的线程加入等待队列。
  • std::condition_variable::notify_one:唤醒等待队列的第一个线程,并给与互斥量的锁,不用竞争锁。
  • std::condition_variable::notify_all:唤醒等待队列的所有线程,所有进程竞争锁。

std::condition_variable::notify_one

#include<iostream>  
#include <thread>  
#include <mutex>
#include<queue>
#include<condition_variable>

std::queue<int> q;
std::mutex m;
std::condition_variable cv;
void add_task()
{
	for(int a = 0; a < 100; a++)
	{
		std::this_thread::sleep_for(std::chrono::microseconds(100));
		std::unique_lock<std::mutex> lk(m);
		q.push(a);
		cv.notify_one();
		std::cout<<"task:"<<a<< std::endl;
	}
}
void pop_task()
{
	int thread_id = 1;
	while (1)
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, []() {return !q.empty();});
		int i = q.front();
		q.pop();
		std::cout<<"id:"<<thread_id<<"," << "run task" << i << std::endl;
	}
}
void pop_task2()
{
	int thread_id = 2;
	while (1)
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, []() {return !q.empty(); });
		int i = q.front();
		q.pop();
		std::cout << "id:" << thread_id << "," << "run task" << i << std::endl;
	}
}
int main()  
{
	std::thread t1(add_task);
	std::thread t2(pop_task);
	std::thread t3(pop_task2);
	t1.join();
	t2.join();
	t3.join();
    return 0;  
}

执行结果如下,每个进程按顺序执行。

std::condition_variable::notify_all结果如下图,线程会争抢锁导致有时不按顺序执行。

线程池

#include<iostream>  
#include<coroutine>  
#include <thread>  
#include <functional>  
#include <mutex>
#include<queue>
#include<condition_variable>
#include<vector>
std::queue< std::function<void()>> q;
std::mutex m;
std::condition_variable cv;
std::vector<std::function<void()>> thread_pool;
class Thread_pool
{
public:
	Thread_pool(int num){
		for (int i = 0; i < num; i++)
		{
			thread_pool.push_back([]() {
				while (1)
				{
					if (stop_flag)
					{
						return;
					}
					std::unique_lock<std::mutex> lock(m);
					cv.wait(lock, []() { return !q.empty(); });
					std::function<void()>task(std::move(q.front()));
					q.pop();
					lock.unlock();
					task();
				}
				});
		}
	}
	~Thread_pool() {
		if (!thread_pool.empty())
		{
			for (auto& t : thread_pool)
			{
				t();
			}
		}
		else {
			stop_flag = true;
			cv.notify_all();
		}
	}
private:
	static bool stop_flag;
};

bool Thread_pool::stop_flag = false;

void add_task(std::function<void()> p)
{
		std::unique_lock<std::mutex> lock(m);
		q.push(p);
		cv.notify_all();
}

int main()  
{

	Thread_pool pool(10);
	for (int x = 0; x < 15000; x++)
	{
		add_task([x]() {
			std::cout << "task"<<x<< std::endl;
			});
	}

	
	return 0;
}

协程

拖了好久了,主要是懒。。。。

什么是协程

协程是一种特殊的函数,用于简化异步编程和并发编程,可以在执行过程中暂停并在稍后恢复执行。在一些情况下比线程更加高效。

协程的定义

在一个函数中,只要出现了

  • co_await
  • co_yield
  • co_return

三者之一就会自动变成协程函数

co_await关键字是让协程暂停,对自定义类型使用时要遵守Awaitable(可等待)原则。

Awaitable即在自定义类型实现以下三个方法

  • await_ready:表示协程是否准备好,如果没准备好则执行await_suspend,否则继续执行协程。
  • await_suspend:表示协程被挂起后的行为
  • await_resume:表示协程恢复后的行为,也是co_await的返回结果

test类实现了下面三个方法,即test类是Awaitable的,在func中co_await t; 执行流程如下

  • co_await t挂起协程函数func并调用test::await_ready判断协程是否已经准备好
  • test::await_ready返回false表示协程未准备好进而执行test::await_suspend,且参数未该协程的句柄,拥有该句柄可以控制协程的恢复与暂停。
  • test::await_suspend执行h.resume();,表示恢复协程,并执行test::await_resume返回res
  • 执行test::await_resume后继续协程func的运行,v1=1;
int res = 0;
class Task  
{  
..........................
};  
class test  
{  
public:  
 bool await_ready() { return false; }  #2
 void  await_suspend(std::coroutine_handle<> h)  #3
 {  
	 res+=1;
         h.resume();
 }  
 int await_resume()  #4
 {  
	 return res;
 }  
}; 
Task func()  
{  
 test t;
 int v1=co_await t;   #1
 co_return v1;
}  
int main()
{
Task t=func();
} 

协程的返回值

可以注意到协程func的返回值是一个Task类,Task类是我们自定义的,但是协程定义其返回值类型必须有promise_type的内嵌类型。在promise_type类型中必须要实现以下方法

  • get_return_object:promise_type用于初始化Task类的方法
  • initial_suspend:协程刚开始时执行的函数
  • final_suspend:协程结束时执行的函数
  • return_value/return_void:co_return的行为,如无返回值则执行return_void,有则执行return_value。
  • unhandled_exception:协程句柄失效后的函数

Task类定义如下,在执行test::await_ready前还需要初始化协程,上面简化了这些操作。

class Task  
{  
public:  
 class promise_type  
 {  
 public:
   promise_type() {
	   this->val =1;
   }
   Task get_return_object() 
   {
	   return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
   }  
   std::suspend_never initial_suspend() { return {}; }
   std::suspend_always final_suspend() noexcept { return {}; }
   //void return_void() {}  
   void return_value(int v) {
	   this->val = v;
   }
   void unhandled_exception() {} 
	int val;
 };  
 std::coroutine_handle<promise_type> handle;
};  

正常流程如下

  • 执行co_await t后挂起协程并执行Task::promise_type::promise_type()构造函数
  • 执行Task::promise_type::get_return_object()根据promise_type得到协程的句柄初始化Task类。
  • 执行initial_suspend
  • 执行test类中的函数
  • 遇到co_return v1有返回值则执行Task::promise_type::return_value将val置为v1
  • Task t=func();得到Task类
  • 根据t.handle.promise()函数得到对应的promise_type对象,t.handle.promise().val得到val的值。

注意initial_suspend和final_suspend有两个不同的返回值分别是

  • suspend_never :表示协程不挂起,继续执行
  • suspend_always :表示协程挂起,需要handle.resume恢复运行

final_suspend的返回值类型要是suspend_always ,否则协程运行结束后销毁资源导致handle变成野指针。

完成了1+2+3的逻辑

#include <iostream>  
#include <coroutine>  
#include <functional>  
#include <Windows.h>  
int res = 0;
class test  
{  
public:  
 bool await_ready() { return false; }  
 void  await_suspend(std::coroutine_handle<> h)  
 {  
	 res+=1;
 }  
 int await_resume()  
 {  
	 return res;
 }  
};  

class Task  
{  
public:  
 class promise_type  
 {  
 public:
   promise_type() {
	   this->val =1;
   }
   Task get_return_object() 
   {
	   return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
   }  
   std::suspend_never initial_suspend() { return {}; }
   std::suspend_always final_suspend() noexcept { return {}; }
   //void return_void() {}  
   void return_value(int v) {
	   this->val = v;
   }
   void unhandled_exception() {} 
	int val;
 };  
 std::coroutine_handle<promise_type> handle;
};  

Task func()  
{  
 test t;
 int v1=co_await t;  
 v1 += co_await t;
 v1 += co_await t;
 co_return v1;
 
}  

int main()  
{  
Task t=func();  
t.handle.resume();
t.handle.resume();
t.handle.resume();
std::cout << t.handle.promise().val;
 return 0;  
}
此作者没有提供个人介绍。
最后更新于 2025-04-10