实现一个可中断线程的线程类
在上文中简单的实现了一个线程中断退出方案,下面是这个实现代码:
int shared = 0;// 线程函数
void foo(atomic<bool> &stop_token, string name) {int count = 0;while (!stop_token.load(memory_order_acquire) {cout << name << ": " << count++ << endl;this_thread::sleep_for(chrono::milliseconds(200));}// 返回前读sharedcout << shared << endl;
}int main() {atomic<bool> stop_token{false};thread thr(foo, ref(stop_token), "test");this_thread::sleep_for(chrono::seconds(1));shared = 42; // 中断前写sharedstop_token.store(true, memory_order_release);thr.join();
}
虽然能够满足线程中断功能要求,但是在方案设计上仍有一些问题:
首先,它是是专用的,无法做到通用性,它是为一个特殊的应用场景实现的,如果换成另一个场景,就得重新实现。
其次,中断标记变量stop_token封装性不好,每次启动一个线程,都需要手动定义一个stop_token变量。
再次,stop_token的生存期和线程及线程对象是分离的,在启动线程时以引用类型作为参数传递,存在悬挂引用的风险。
既然C++是面向对象的语言,可以考虑使用类来封装,让stop_token成为线程对象的数据成员。
封装stop_token
基于上面的分析,我们可以实现一个具有中断功能的线程类,中断标记变量stop_token是这个类的数据成员,初步实现部分代码如下:
// 类的名称是ithread,即interruptable thread(可中断线程)。
class ithread {atomic<bool> stop_token;thread thr;
public:template<typename F, typename... Args>ithread(F &&f, Args && ...args) : stop_token{false},thr(std::forward<F>(f), stop_token, std::forward<Args>(args)...) {}...
};
假设线程函数是:
void foo(atomic<bool> &stop_token, const char *name);
那么在构造ithread对象时:
ithread ithr(foo, "123456");
会编译失败,因为thread构造时:thr(std::forward(f), stop_token, std::forward(args)…),是一个可变参数的函数模板调用,无法单独为foo()传输stop_token参数。
再者,如果线程函数中又调用了子函数,子函数也要检查是否中断退出,也得需要把这个stop_token作为子函数的参数。
由此可见,把stop_token作为显式参数传递并不方便,它不应该作为线程函数的参数,而是让线程函数在内部直接访问它,即给stop_token提供一个众所周知的访问点,可以直接在线程函数内通过这个访问点访问。
熟悉设计模式的朋友知道,单例模式有一个特点,它提供了一个全局知名的访问点,应用程序在函数内可以直接通过这个访问点来使用单例对象,因此,单例对象实例通常在类中使用static public修饰,这样就可以在程序中任何位置直接访问。不过,单例对象是可以在整个进程范围内访问的,而这里要求是仅在线程范围内访问,因此只是把stop_token在类内使用static修饰是不可行的,还得要借助于其它技术。
隐藏stop_token参数
如何控制一个static变量是线程独有的呢?那就是thread_local变量,因此:
class ithread {thread_local static atomic<bool> stop_token;thread thr;
public:...static bool stop_requested() {return stop_token;}void request_stop() {...}...
}
先在类中定义了thread_local static数据成员stop_token,并定义了两个和线程中断退出相关的函数:非静态成员函数request_stop()用于发送中断请求,静态成员函数stop_requested()用于判断是否设置了中断标识,它访问了static数据成员stop_token。
数据成员stop_token是一个thread_local变量,它是线程私有的变量,当线程函数访问它时,就是访问自己所在线程的那个stop_token变量,尽管所有线程使用了相同的变量名称,但是线程之间也不会冲突。这样,不需要使用参数传递,可以在函数内直接调用stop_requested()来判断是否中断退出:
// 线程函数
void foo(const char *name) {if (ithread::stop_requested()) {....return;}
}
显然,这为编写函数带来了极大的便利性。函数不需要传递额外的参数,如果想要响应中断请求就使用ithread::stop_requested()来检查,不想响应就不使用它。
共享stop_token
在ithread类内新增一个指针类型的数据成员stoken_ptr,它指向中断标记变量stop_token,访问stoken_ptr就是访问stop_token,这样线程对象就可以通过stoken_ptr和目标线程共享stop_token变量了。凡是在线程对象中访问stop_token的地方,都通过stoken_ptr来访问,而目标线程中访问stop_token的地方,直接调用static成员函数stop_requested()。实现代码如下:
class ithread {thread_local static atomic<bool> stop_token;atomic<bool> *stoken_ptr; // 作为数据成员,指向stop_tokenthread thr;// run是ithread线程的启动函数void run(std::function<void()> callable, std::promise<atomic<bool> *> &prom) {stop_token.store(false, memory_order_relaxed);prom.set_value(&stop_token); // Acallable();}public:template<typename F, typename... Args>ithread(F &&f, Args && ...args) : stoken_ptr{nullptr} {std::promise<atomic<bool> *> prom;thr = thread(&ithread::run, this, bind(std::forward<F>(f), std::forward<Args>(args)...),ref(prom));stoken_ptr = prom.get_future().get(); // B}static bool stop_requested() { // 检查是否中断线程return stop_token.load(memory_order_acquire); // C}void request_stop() { // 随时可以调用此接口中断线程stoken_ptr->store(true, memory_order_release); // D}~ithread() {request_stop();if (thr.joinable()) {thr.join();}}
};
定义一个辅助成员函数run(),它负责在启动线程之后,调用线程函数之前初始化stop_token,此时run()是在目标线程中运行的,因此它所初始化的stop_token是属于目标线程的私有变量,同时stoken_ptr指针指向了stop_token,因此它指向的stop_token是目标线程的私有变量。这样,在C和D两处访问的都是目标线程私有的stop_token变量。
此外,使用了future和promise来保证stop_token和stoken_ptr的初始化顺序。构造ithread对象时,在构造函数返回之前,使用future::get()等待stop_token初始化完成(B处),当线程完成它的stop_token初始化之后,使用promise::set_value()设置stoken_ptr的值(A处),这样在对象构造完成之后,stoken_ptr就已经正确指向线程的stop_token了。
stop_token变量既是中断标识变量也是同步变量,因为它是atomic原子类型的,可以对它进行适当的内存序标记。
stop_token生存期
线程对象是通过裸指针类型的数据成员stoken_ptr来引用目标线程的stop_token,因此,线程对象是共享但不拥有stop_token的所有权。
stop_token是thread_local变量,它是线程的私有对象,它的生存期和线程生存期是一致的,当线程结束之后,它也就随之销毁了。而stoken_ptr是线程对象的私有数据,它的生存期和线程对象的生存期一致。当线程运行结束之后,stop_token就被销毁了,此时如果线程对象仍然存活,那么它的数据成员stoken_ptr就成为一个悬挂指针,如果继续访问它就很危险。例如下面的使用场景:
int main() {ithread thr(foo, "test");thr.detach();...// 在此期间,线程退出,stop_token销毁了thr.request_stop(); // 通知后台线程中断退出...
}
thr线程在detach之后很快就运行结束了,此时stop_token已经销毁了,如果后来thr线程对象再调用request_stop()时,因为stoken_ptr指向的stop_token已经被销毁,它成为悬挂指针,再对它进行解引用操作可能会发生意外。
因此,stop_token的所有权应该是线程和线程对象共有,它应该在堆上分配,并使用std::shared_ptr来管理。
共有stop_token所有权
如果使用shared_ptr来管理在堆上分配的stop_token,可能要使用shared_ptr的原子类型了,即定义atomic<shared_ptr> stop_token。因为shared_ptr不是平凡的可拷贝类型,无法定义它的原子类型,在C++11中也没有为它提供特化的原子类型,因此无法直接使用。
仔细分析一下这里使用原子类型的目的,一是保证stop_token读写时的原子操作,二是使用它来设置内存序。
先看一,stop_token在这里的操作只有读或写操作,不使用别的原子操作,比如CAS、RMW等,而stop_token的类型是bool型的,长度只有一个字节,地址自然是内存对齐的,因此,对它进行读和写操作肯定都是原子操作。可见,即使不用atomic类型也能满足读、写的原子操作要求。
再看二,要想保证内存序,除了使用原子变量之外,也可以使用内存屏障atomic_thread_fence()来保证,因此,即使不用atomic类型也能满足指定内存序的要求。
因此,stop_token最终使用shared_ptr类型,修改为如下实现:
class ithread {thread_local static shared_ptr<bool> stop_token;shared_ptr<bool> stoken_sptr;...
public:...static bool stop_requested() {bool ret = *stop_token;atomic_thread_fence(memory_order_acquire);return ret;}void request_stop() {atomic_thread_fence(memory_order_release);*stoken_sptr = true;}...
};
request_stop()中的内存序也可以在*stoken_sptr的写操作后面使用seq_cst,尽管seq_cst是最严格的内存序,使用它可能性能要差一些,但是,首先,调用request_stop()的次数并不多,其次,seq_cst内存序能保证修改stop_token为true时,能以最快的速度传输到各个CPU的cache中,可以提高目标线程对中断的响应速度。
void request_stop() {atomic_thread_fence(memory_order_release);*stoken_sptr = true;atomic_thread_fence(memory_order_seq_cst);
}
最终实现
加上几个管理线程的成员函数,如get_id()、joinable()、join()、detach(),它们都是把调用转发到thread类的对应成员函数。这样ithread类的最终实现是:
class ithread {thread_local static shared_ptr<bool> stop_token;shared_ptr<bool> stoken_sptr;thread thr;// run是ithread线程的启动函数void run(std::function<void()> callable, std::promise<shared_ptr<bool>> &prom) {stop_token = make_shared<bool>(false);prom.set_value(stop_token);callable();}public:template<typename F, typename... Args>ithread(F &&f, Args && ...args) : stoken_sptr{nullptr} {std::promise<shared_ptr<bool>> prom;thr = thread(&ithread::run, this, bind(std::forward<F>(f), std::forward<Args>(args)...),ref(prom));stoken_sptr = prom.get_future().get();}static bool stop_requested() {bool ret = *stop_token;atomic_thread_fence(memory_order_acquire);return ret;}void request_stop() {atomic_thread_fence(memory_order_release);*stoken_sptr = true;}~ithread() {request_stop();if (joinable()) {join();}}std::thread::id get_id() {return thr.get_id();}bool joinable() const noexcept {return thr.joinable();}void join() {thr.join();}void detach() {thr.detach();}
};thread_local shared_ptr<bool> ithread::stop_token;