并发(Concurrent),在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。
并行(Parallel),当系统有一个以上CPU时,当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。
https://github.com/anthonywilliams/ccia_code_samples
class background_task
{
public:void operator()() const{do_something();do_something_else();}
};background_task f;
std::thread my_thread(f);//√
std::thread my_thread(background_task()); //×
//相当与声明了一个名为my_thread的函数, 这个函数带有一个参数(函数指针指向没有参数并返回background_task对象的函数),
//返回一个 std::thread 对象的函数, 而非启动了一个线程。std::thread my_thread((background_task()));//√ 多组括号
std::thread my_thread{background_task()}; //√ 初始化语法std::thread my_thread([]{ //√ lambdado_something();do_something_else();});
#include
#include
#include
#include void do_work(unsigned id)
{}void f()
{std::vector threads;for(unsigned i=0;i<20;++i){threads.push_back(std::thread(do_work,i));}std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
}int main()
{f();
}
https://en.cppreference.com/w/cpp/utility/functional/mem_fn std::mem_fn用法
#include
#include
#include struct Foo {void display_greeting() {std::cout << "Hello, world.\n";}void display_number(int i) {std::cout << "number: " << i << '\n';}int add_xy(int x, int y) {return data + x + y;}template int add_many(Args... args) {return data + (args + ...);}auto add_them(auto... args) {return data + (args + ...);}int data = 7;
};int main() {auto f = Foo{};auto greet = std::mem_fn(&Foo::display_greeting);greet(f);auto print_num = std::mem_fn(&Foo::display_number);print_num(f, 42);auto access_data = std::mem_fn(&Foo::data);std::cout << "data: " << access_data(f) << '\n';auto add_xy = std::mem_fn(&Foo::add_xy);std::cout << "add_xy: " << add_xy(f, 1, 2) << '\n';// Working with smart pointerauto u = std::make_unique();std::cout << "access_data(u): " << access_data(u) << '\n';std::cout << "add_xy(u, 1, 2): " << add_xy(u, 1, 2) << '\n';// Working with member function template with parameter packauto add_many = std::mem_fn(&Foo::add_many);std::cout << "add_many(u, ...): " << add_many(u, 1, 2, 3) << '\n';auto add_them = std::mem_fn(&Foo::add_them);std::cout << "add_them(u, ...): " << add_them(u, 5, 7, 10.0f, 13.0) << '\n';
}
std::thread::hardware_concurrency() 在新版C++标准库中是一个很有用的函数。这个函数会返回能并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。但是,这也无法掩盖这个函数对启动线程数量的帮助。
//Download by www.cctry.com
#include
#include
#include
#include
#include
#include template
struct accumulate_block
{void operator()(Iterator first,Iterator last,T& result){result=std::accumulate(first,last,result);}
};template
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length=std::distance(first,last);if(!length)return init;unsigned long const min_per_thread=25;unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;unsigned long const hardware_threads=std::thread::hardware_concurrency();unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);unsigned long const block_size=length/num_threads;std::vector results(num_threads);std::vector threads(num_threads-1);Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_end=block_start;std::advance(block_end,block_size);threads[i]=std::thread(accumulate_block(),block_start,block_end,std::ref(results[i]));block_start=block_end;}accumulate_block()(block_start,last,results[num_threads-1]);std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));return std::accumulate(results.begin(),results.end(),init);
}int main()
{std::vector vi;for(int i=0;i<10;++i){vi.push_back(10);}int sum=parallel_accumulate(vi.begin(),vi.end(),5);std::cout<<"sum="<
线程标识类型为 std::thread::id , 并可以通过两种方式进行检索。 第一种, 可以通过调用 std::thread 对象的成员函数 get_id() 来直接获取。 如果 std::thread 对象没有与任何执行线程相关联,get_id()将返回 std::thread::type 默认构造值, 这个值表示“无线程”。 第二种, 当前线程中调用 std::this_thread::get_id() (这个函数定义在
std::thread::id master_thread;
void some_core_part_of_algorithm()
{if(std::this_thread::get_id()==master_thread){do_master_thread_work();}do_common_work();
}
共享数据带来的问题 | 使用互斥量保护数据 | 数据保护的替代方案
std::locak_guard 这样简单的模板类型的模板参数列表可以省略。std::lock_guard guard(some_mutex);
std::lock_guard guard(some_mutex);
class some_data
{int a;std::string b;
public:void do_something();
};class data_wrapper
{
private:some_data data;std::mutex m;
public:templatevoid process_data(Function func){std::lock_guard l(m);func(data);}
};some_data* unprotected;
void malicious_function(some_data& protected_data)//通过引用,绕过互斥量访问到了保护数据
{unprotected=&protected_data;
}data_wrapper x;
void foo()
{x.process_data(malicious_function);unprotected->do_something();
}
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。不过,事情没那么简单,比如:当有多个互斥量保护同一个类的独立实例时,一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免数据被并发修改,并确保每个实例上的互斥量都能锁住自己要保护的区域。不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
可以一次性锁住多个(两个以上)的互斥量, 并且没有副作用 (死锁风险)
#include class some_big_object
{};void swap(some_big_object& lhs,some_big_object& rhs)
{}class X
{
private:some_big_object some_detail;mutable std::mutex m;
public:X(some_big_object const& sd):some_detail(sd){}friend void swap(X& lhs, X& rhs){if(&lhs==&rhs)return;std::lock(lhs.m,rhs.m);std::lock_guard lock_a(lhs.m,std::adopt_lock);std::lock_guard lock_b(rhs.m,std::adopt_lock);swap(lhs.some_detail,rhs.some_detail);}
};int main()
{}
首先, 检查参数是否是不同的实例, 因为操作试图获取 std::mutex 对象上的锁, 所以当其被获取时, 结果很难预料。 (一个互斥量可以在同一线程上多次上锁, 标准库中 std::recursive_mutex 提供这样的功能。 详情见3.3.3节)。然后, 调用 std::lock() ①锁住两个互斥量, 并且两个 std:lock_guard 实例已经创建好②③。 提供 std::adopt_lock 参数除了表示 std::lock_guard 对象可获取锁之外, 还将锁交由 std::lock_guard 对象管理, 而不需要 std::lock_guard 对象再去构建新的锁(此参数作用就是让std::lock_guard在构造函数中不调用mutex的lock函数)。 当使用 std::lock 去锁lhs.m或rhs.m时, 可能会抛出异常; 这种情况下, 异常会传播到 std::lock 之外。 当 std::lock 成功的获取一个互斥量上的锁, 并且当其尝试从另一个互斥量上再获取锁时, 就会有异常抛出, 第一个锁也会随着异常的产生而自动释放, 所以 std::lock 要么将两个锁都锁住, 要不一个都不锁。
https://www.jianshu.com/p/08a8393ac93b C++11多线程 unique_lock详解
https://blog.csdn.net/u012507022/article/details/85909567 C++多线程std::lock
c++17新特性:scoped_lock,一种新的RAII类型模板类型, 与 std::lock_guard<> 的功能等价, 这个新类型能接受不定数量的互斥量类型作为模板参数, 以及相应的互斥量(数量和类型)作为构造参数。 互斥量支持构造即上锁, 与 std::lock 的用法相同, 其解锁阶段是在析构中进行。
void swap(X& lhs, X& rhs)
{if(&lhs==&rhs)return;std::scoped_lock guard(lhs.m,rhs.m); //c++17 自动推导模板参数swap(lhs.some_detail,rhs.some_detail);
}
链表的节点内部有锁,定义遍历的顺序, 一个线程必须先锁住A才能获取B的锁, 在锁住B之后才能获取C的锁。 这将消除死锁发生的可能性, 不允许反向遍历的列表上。 类似的约定常被用来建立其他的数据结构
定义锁的顺序是一种特殊情况, 但锁的层次的意义在于提供对运行时约定是否被坚持的检查。 这个建议需要对你的应用进行分层, 并且识别在给定层上所有可上锁的互斥量。 当代码试图对一个互斥量上锁, 在该层锁已被低层持有时, 上锁是不允许的。 你可以在运行时对其进行检查, 通过分配层数到每个互斥量上, 以及记录被每个线程上锁的互斥量。 下面的代码列表中将展示两个线程如何使用分层互斥。
高层级可以调用低层级,低层级不能被高层级调用 (类似通行证的级别)
hierarchical_mutex high_level_mutex(10000);//1
hierarchical_mutex low_level_mutex(5000); //2
hierarchical_mutex other_mutex(6000); //3
int do_low_level_stuff();
int low_level_func()
{std::lock_guard lk(low_level_mutex);//4return do_low_level_stuff();
}
void high_level_stuff(int some_param);
void high_level_func()
{std::lock_guard lk(high_level_mutex);high_level_stuff(low_level_func());
}
void thread_a()
{high_level_func();
}
void do_other_stuff();
void other_stuff()
{high_level_func();do_other_stuff();
}
void thread_b()
{std::lock_guard lk(other_mutex);other_stuff();
}
层级锁的实现
// Download by www.cctry.com
#include
#include
#include
class hierarchical_mutex
{std::mutex internal_mutex;unsigned long const hierarchy_value; // 大于hierarchy_value才能锁(厅级才能进入这个门)unsigned long previous_hierarchy_value; static thread_local unsigned long this_thread_hierarchy_value; // 线程当前的级别(当前的级别),级别不够不配锁void check_for_hierarchy_violation(){if (this_thread_hierarchy_value <= hierarchy_value){throw std::logic_error("mutex hierarchy violated");}}void update_hierarchy_value(){previous_hierarchy_value = this_thread_hierarchy_value;this_thread_hierarchy_value = hierarchy_value;}public:explicit hierarchical_mutex(unsigned long value) : hierarchy_value(value),previous_hierarchy_value(0){}void lock(){check_for_hierarchy_violation();internal_mutex.lock();update_hierarchy_value();}void unlock(){if (this_thread_hierarchy_value != hierarchy_value)throw std::logic_error("mutex hierarchy violated");this_thread_hierarchy_value = previous_hierarchy_value;internal_mutex.unlock();}bool try_lock(){check_for_hierarchy_violation();if (!internal_mutex.try_lock())return false;update_hierarchy_value();return true;}
};
thread_local unsigned longhierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); int main()
{hierarchical_mutex m1(42);hierarchical_mutex m2(2000);
}
根据已经定义好的机制, 如你已将一个hierarchical_mutex实例进行上锁, 那么你只能获取更低层级hierarchical_mutex实例上的锁, 这就会对代码进行一些限制 。类似应用场景:2个操作,改密码和设置地址。在设置地址(低)时不能改密码(高),但是可以设置地址之后再改密码。按照安全度将操作划分,防止设置地址时调用修改密码。
thread_local T 可以看作 ----> map
thread_local 作为类成员变量时必须是static的,可以确定其再静态存储区分配了
https://blog.csdn.net/qq_37233607/article/details/80159873 C11:std::unique_lock和std::lock_guard的区别
std::mutex m_mutex;
void print(int cnt)
{//用unique_lock来对锁进行管理.defer_lock是它的构造参数之一.用来推迟加锁.unique_lock lock(m_mutex, defer_lock); /*...这里可以进行一系列线程安全的操作...*///手动加锁.解锁。比lock_guard更灵活lock.lock();cout << std::this_thread::get_id() << " " << cnt << endl;lock.unlock();//这里用来计算sum的值.然后打印.因为sum是在函数内定义的局部变量.//所以下面的代码是线程安全的.没必要用锁对这段代码进行保护.所以在上面用unlock解锁.int sum = 0;for (int i = 0; i < cnt; ++i){sum += i;}//最后在手动加锁.不用手动释放.因为会在生命周期结束时自动释放. 不加也可以,unique_lock析构时会检查是否拥有锁lock.lock();cout << std::this_thread::get_id() << " " << sum << endl;
}
void swap(X& lhs, X& rhs)
{if(&lhs==&rhs)return;std::unique_lock lock_a(lhs.m,std::defer_lock);//defer_lock: 待会再上锁std::unique_lock lock_b(rhs.m,std::defer_lock);std::lock(lock_a,lock_b);swap(lhs.some_detail,rhs.some_detail);
}
std::unique_lock get_lock()
{extern std::mutex some_mutex;std::unique_lock lk(some_mutex);prepare_data();return lk; // 1 编译器会帮忙做move操作
}
void process_data()
{std::unique_lock lk(get_lock()); // 2do_something();
}
void get_and_process_data()
{std::unique_lock my_lock(the_mutex);//默认构造时就会上锁,除非传入std::defer_locksome_class data_to_process=get_next_data_chunk();my_lock.unlock(); // 1 不要让锁住的互斥量越过process()函数的调用result_type result=process(data_to_process);my_lock.lock(); // 2 为了写入数据, 对互斥量再次上锁write_result(data_to_process,result);
}
一个特别极端(但十分常见)的情况就是, 共享数据在并发访问和初始化时(都需要保护), 但是之后需要进行隐式同步。 这可能是因为数据作为只读方式创建, 所以没有同步问题; 或者因为必要的保护作为对数据操作的一部分。 任何情况下, 数据初始化后锁住一个互斥量, 纯粹是为了保护其初始化过程(这是没有必要的), 并且会给性能带来不必要的冲击。 出于以上的原因, C++标准提供了一种纯粹保护共享数据初始化过程的机制。
std::shared_ptr resource_ptr;
void foo()
{if(!resource_ptr){resource_ptr.reset(new some_resource); // 1 多线程不安全}resource_ptr->do_something();
}
锁粒度大
std::shared_ptr resource_ptr;
std::mutex resource_mutex;
void foo()
{std::unique_lock lk(resource_mutex); // 所有线程在此序列化if(!resource_ptr){resource_ptr.reset(new some_resource); // 只有初始化过程需要保护}lk.unlock();resource_ptr->do_something();
}
声名狼藉的 double-check
void undefined_behaviour_with_double_checked_locking()
{if(!resource_ptr) // 1{std::lock_guard lk(resource_mutex);if(!resource_ptr) // 2{resource_ptr.reset(new some_resource); // 3}}resource_ptr->do_something(); // 4
}
std::call_once 和 std::once_flag
C++标准委员会也认为条件竞争的处理很重要, 所以C++标准库提供了 std::once_flag 和 std::call_once 来处理这种情况。 比起锁住互斥量并显式的检查指针, 每个线程只需要使用 std::call_once 就可以, 在 std::call_once 的结束时, 就能安全的知道指针已经被其他的线程初始化了。 使用 std::call_once 比显式使用互斥量消耗的资源更少, 特别是当初始化完成后。 下面的例子展示了与清单3.11中的同样的操作, 这里使用了 std::call_once 。 在这种情况下, 初始化通过调用函数完成, 这样的操作使用类中的函数操作符来实现同样很简单。 如同大多数在标准库中的函数一样, 或作为函数被调用, 或作为参数被传递, std::call_once 可以和任何函数或可调用对象一起使用。
//.h
class Control
{
public:static Control *GetInstance();
private:Control();~Control();Control(const Control &other) = delete;const Control &operator=(const Control &other) = delete;// 唯一单实例对象指针static Control *instance_;
}
//.cpp
Control *Control::instance_ = nullptr;
Control *Control::GetInstance()
{static std::once_flag of;std::call_once(of, [&](){ instance_ = new (std::nothrow) Control(); });return instance_;
}std::shared_ptr resource_ptr;
std::once_flag resource_flag; // 1
void init_resource()
{resource_ptr.reset(new some_resource);
}
void foo()
{std::call_once(resource_flag, init_resource); // 可以完整的进行一次初始化resource_ptr->do_something();
}
值得注意的是, std::mutex 和 std::once_flag 的实例不能拷贝和移动, 需要通过显式定义相应的成员函数, 对这些类成员进行操作
其中一个局部变量被声明为static类型, 这种变量的在声明后就已经完成初始化; 对于多线程调用的函数, 这就意味着这里有条件竞争——抢着去定义这个变量。 很多在不支持C++11标准的编译器上, 在实践过程中, 这样的条件竞争是确实存在的, 因为在多线程中, 每个线程都认为他们是第一个初始化这个变量线程; 或一个线程对变量进行初始化, 而另外一个线程要使用这个变量时, 初始化过程还没完成。 在C++11标准中, 这些问题都被解决了: 初始化及定义完全在一个线程中发生, 并且没有其他线程可在初始化完成前对其进行处理, 条件竞争终止于初始化阶段, 这样比在之后再去处理好的多。 在只需要一个全局实例情况下, 这里提供一个 std::call_once 的替代方案。局部静态变量初始化过程线程安全。
class my_class;
my_class& get_my_class_instance()
{static my_class instance; // 线程安全的初始化过程return instance;
}//*.cpp
class WebSocketInfo
{
public:static WebSocketInfo &Instance();std::atomic_bool need_control{false}; //远程控制是否开启WebSocketInfo(){};~WebSocketInfo(){};hv::WebSocketClient ws;
};
//*.h
#define INSTANCE_IMP(class_name, ...) \class_name &class_name::Instance() \{ \static std::shared_ptr s_instance(new class_name(__VA_ARGS__)); \static class_name &s_instance_ref = *s_instance; \return s_instance_ref; \}INSTANCE_IMP(WebSocketInfo)
c++17新特性:C++17标准库提供了两种非常好的互斥量—— std::shared_mutex 和 std::shared_timed_mutex 。 C++14只提供了 std::shared_timed_mutex , 并且在C++11中并未提供任何互斥量类型。 如果你还在用支持C++14标准之前的编译器, 那你可以使用Boost库中实现的互斥量。 std::shared_mutex 和 std::shared_timed_mutex 的不同点在于, std::shared_timed_mutex 支持更多的操作方式(参考4.3节), std::shared_mutex 有更高的性能优势, 从而不支持更多的操作
比起使用 std::mutex 实例进行同步, 不如使用 std::shared_mutex 来做同步。 对于更新操作, 可以使用std::lock_guard
#include
c++11读写锁需要自行实现
c++读写锁实现
C++ 读写锁设计
当一个线程已经获取一个 std::mutex 时(已经上锁), 并对其再次上锁, 这个操作就是错误的, 并且继续尝试这样做的话, 就会产生未定义行为。 然而, 在某些情况下, 一个线程尝试获取同一个互斥量多次, 而没有对其进行一次释放是可以的。 之所以可以, 是因为C++标准库提供了 std::recursive_mutex 类。 除了可以对同一线程的单个实例上获取多个锁, 其他功能与 std::mutex 相同。 互斥量锁住其他线程前, 必须释放拥有的所有锁, 所以当调用lock()三次后, 也必须调用unlock()三次。 正确使用 std::lock_guard
大多数情况下, 当需要嵌套锁时, 就要对代码设计进行改动。 嵌套锁一般用在可并发访问的类上, 所以使用互斥量保护其成员数据。 每个公共成员函数都会对互斥量上锁, 然后完成对应的操作后再解锁互斥量。 不过, 有时成员函数会调用另一个成员函数, 这种情况下, 第二个成员函数也会试图锁住互斥量, 这就会导致未定义行为的发生。 “变通的”解决方案会将互斥量转为嵌套锁, 第二个成员函数就能成功的进行上锁, 并且函数能继续执行。但是, 不推荐这样的使用方式, 因为过于草率, 并且不合理。 特别是, 当锁被持有时, 对应类的不变量通常正在被修改。 这意味着, 当不变量正在改变的时候, 第二个成员函数还需要继续执行。 一个比较好的方式是, 从中提取出一个函数作为类的私有成员, 并且让其他成员函数都对其进行调用, 这个私有成员函数不会对互斥量进行上锁(在调用前必须获得锁)。 然后, 你仔细考虑一下, 在这种情况调用新函数时, 数据的状态。
我们看到各种在线程间保护共享数据的方法。 我们不仅想要保护数据, 还想对单独的线程进行同步。 例如, 在第一个线程完成前, 可能需要等待另一个线程执行完成。 通常情况下, 线程会等待一个特定事件发生, 或者等待某一条件达成。 这可能需要定期检查“任务完成”标识, 或将类似的东西放到共享数据中, 但这与理想情况差很多。像这种情况就需要在线程中进行同步, C++标准库提供了一些工具可用于同步操作, 形式上表现为条件变量(condition variables)和期望值(futures)。 并发技术规范(TS)中, 为期望值添加了更多的操作, 并与新的同步工具**锁存器(latches)(轻量级锁资源)和栅栏机制(barriers)**一起使用。
C++标准库对条件变量有两套实现: std::condition_variable 和 std::condition_variable_any 。 这两个实现都包含在
#include
#include
#include
#include std::mutex mut;
std::queue data_queue;
std::condition_variable data_cond;void data_preparation_thread()
{while(more_data_to_prepare()){data_chunk const data=prepare_data();std::lock_guard lk(mut);data_queue.push(data);data_cond.notify_one();}
}void data_processing_thread()
{while(true){std::unique_lock lk(mut); //必须使用unique_lock,而不是lock_guard。后者生命周期内无法解锁再上锁。data_cond.wait(lk,[]{return !data_queue.empty();});data_chunk data=data_queue.front();data_queue.pop();lk.unlock();process(data);if(is_last_chunk(data))break;}
}int main()
{std::thread t1(data_preparation_thread);//生产者std::thread t2(data_processing_thread); //消费者t1.join();t2.join();
}
wait()会去检查这些条件(通过调用所提供的lambda函数), 当条件满足(lambda函数返回true)时返回。 如果条件不满足(lambda函数返回false), wait()函数将解锁互斥量, 并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。 当准备数据的线程调用notify_one()通知条件变量时, 处理数据的线程从睡眠状态中苏醒, 重新获取互斥锁, 并且再次检查条件是否满足。 在条件满足的情况下, 从wait()返回并继续持有锁; 当条件不满足时, 线程将对互斥量解锁, 并且重新开始等待。 这就是为什么用 std::unique_lock 而不使用 std::lock_guard ——等待中的线程必须在等待期间解锁互斥量, 并在这之后对互斥量再次上锁, 而 std::lock_guard 没有这么灵活。 如果互斥量在线程休眠期间保持锁住状态, 准备数据的线程将无法锁住互斥量, 也无法添加数据到队列中; 同样的, 等待线程也永远不会知道条件何时满足。
#include
#include
#include
#include template
class threadsafe_queue
{
private:mutable std::mutex mut; //锁住互斥量事个可变操作,empyt()等const成员函数中的也需要锁std::queue data_queue;std::condition_variable data_cond;
public:threadsafe_queue(){}threadsafe_queue(threadsafe_queue const& other){std::lock_guard lk(other.mut);data_queue=other.data_queue;}void push(T new_value){std::lock_guard lk(mut);data_queue.push(new_value);data_cond.notify_one();}void wait_and_pop(T& value){std::unique_lock lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});//if ( data_cond.wait_until(lk, time, pred()) ) //会超时,返回值仍然是pred()的结果value=data_queue.front();data_queue.pop();}std::shared_ptr wait_and_pop(){std::unique_lock lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});std::shared_ptr res(std::make_shared(data_queue.front()));data_queue.pop();return res;}bool try_pop(T& value){std::lock_guard lk(mut);if(data_queue.empty)return false;value=data_queue.front();data_queue.pop();return true;}std::shared_ptr try_pop(){std::lock_guard lk(mut);if(data_queue.empty())return std::shared_ptr();std::shared_ptr res(std::make_shared(data_queue.front()));data_queue.pop();return res;}bool empty() const //{std::lock_guard lk(mut);return data_queue.empty();}
};
empty()是一个const成员函数, 并且传入拷贝构造函数的other形参是一个const引用; 因为其他线程可能有这个类型的非const引用对象, 并调用变种成员函数, 所以这里有必要对互斥量上锁。 又因为锁住互斥量是个可变操作,所以互斥量成员必须( 修饰) 为mutable①才能在empty()和拷贝构造函数中锁住。
#include
#include
struct X
{void foo(int,std::string const&);std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello"); // 调用p->foo(42, "hello"), p是指向x的指针
auto f2=std::async(&X::bar,x,"goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本struct Y
{double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141); // 调用tmpy(3.141), tmpy通过Y的移动构造函数得到
auto f4=std::async(std::ref(y),2.718); // 调用y(2.718)
X baz(X&);
std::async(baz,std::ref(x)); // 调用baz(x)class move_only
{
public:move_only();move_only(move_only&&)move_only(move_only const&) = delete;move_only& operator=(move_only&&);move_only& operator=(move_only const&) = delete;void operator()();
};
auto f5=std::async(move_only()); // 调用tmp(), tmp是通过std::move(move_only())构造得到auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在wait()或get()调用时执行
auto f8=std::async(std::launch::deferred | std::launch::async,baz,std::ref(x)); // 实现选择执行方式
auto f9=std::async(baz,std::ref(x));
f7.wait(); // 调用延迟函数
使用 std::async 会更容易让算法分割到各个任务中, 这样程序就能并发的执行了。不过, 这不是让 std::future 与任务实例相关联的唯一方式; 你也可以将任务包装入 std::packaged_task<> 实例中, 或通过编写代码的方式, 使用 std::promise<> 类型模板显示设置值。与 std::promise<> 对比, std::packaged_task<> 具有更高层的抽象, 所以我们从“高抽象”的模板说起。
std::packaged_task<> 对一个函数或可调用对象, 绑定一个期望值。 当调用std::packaged_task<>对象时, 它就会调用相关函数或可调用对象, 将期望状态置为就绪, 返回值也会被存储。 这可以用在构建线程池的结构单元(可见第9章), 或用于其他任务的管理, 比如: 在任务所在线程上运行其他任务, 或将它们顺序的运行在一个特殊的后台线程上。 当一个粒度较大的操作被分解为独立的子任务时, 其中每个子任务都可以包含在一个 std::packaged_task<> 实例中, 之后这个实例将传递到任务调度器或线程池中。 对任务细节进行抽象, 调度器仅处理std::packaged_task<>实例, 而非处理单独的函数。
std::packaged_task<> 的模板参数是一个函数签名, 比如void()就是一个没有参数也没有返回值的函数, 或int(std::string&, double*)就是有一个非const引用的 std::string 和一个指向double类型的指针, 并且返回类型是int。 当构造出一个 std::packaged_task<> 实例时, 就必须传入一个函数或可调用对象; 这个函数或可调用的对象, 需要能接收指定的参数和返回可转换为指定返回类型的值。 类型可以不完全匹配, 可以用一个int类型的参数和返回一个float类型的函数, 来构建 std::packaged_task 的实例, 因为这里类型可以隐式转换。
因为 std::packaged_task 对象是一个可调用对象, 所以可以封装在std::function对象中, 从而作为线程函数传递到 std::thread 对象中, 或作为可调用对象传递另一个函数中, 或可以直接进行调用。 当 std::packaged_task 作为一个函数被调用时, 实参将由函数调用操作符传递到底层函数, 并且返回值作为异步结果存储在 std::future , 可通过get_future()获取。 因此你可以把用 std::packaged_task 打包任务, 并在它被传到别处之前的适当时机取回期望值。 当需要异步任务的返回值时, 你可以等待期望的状态变为“就绪”。
std::mutex m;
std::deque< std::packaged_task > tasks;bool gui_shutdown_message_received();
void get_and_process_gui_message();void gui_thread()
{while(!gui_shutdown_message_received()){get_and_process_gui_message();std::packaged_task task;{std::lock_guard lk(m);if(tasks.empty())continue;task=std::move(tasks.front());tasks.pop_front();}task();}
}std::thread gui_bg_thread(gui_thread);template
std::future post_task_for_gui_thread(Func f)
{std::packaged_task task(f); //7std::future res=task.get_future(); //8std::lock_guard lk(m);tasks.push_back(std::move(task)); //9return res;
}
将一个任务传入队列: 提供的函数⑦可以提供一个打包好的任务, 可以通过这个任务⑧调用get_future()成员函数获取期望值对象, 并且在任务被推入列表⑨之前, 期望值将返回调用函数⑩。 当需要知道线程执行完任务时, 向图形界面线程发布消息的代码, 会等待期望值改变状态; 否则, 会丢弃这个期望值
这些任务能作为一个简单的函数调用来表达吗? 还有, 任务的结果能从很多地方得到吗? 这些问题可以使用第三种方法创建期望值来解决: 使用 std::promise 对值进行显示设置。
std::promise 提供设定值的方式(类型为T), 这个类型会和后面看到的std::future对象相关联。 一对 std::promise/std::future 会为这种方式提供一个可行的机制; 期望值可以阻塞等待线程, 同时, 提供数据的线程可以使用组合中的承诺值来对相关值进行设置, 并将期望值的状态置为“就绪”
可以通过一个给定的 std::promise 的get_future()成员函数来获取与之相关的std::future对象,跟 std::packaged_task 的用法类似。 当承诺值已经设置完毕(使用set_value()成员函数), 对应期望值的状态变为“就绪”, 并且可用于检索已存储的值。 当在设置值之前销毁 std::promise , 将会存储一个异常。
#include
#include
#include
#include // std::promise, std::futurevoid print_int(std::future& fut) {int x = fut.get(); // 获取共享状态的值.std::cout << "value: " << x << '\n'; // 打印 value: 10.
}int main ()
{std::promise prom; // 生成一个 std::promise 对象.std::future fut = prom.get_future(); // 和 future 关联.std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步. 不能不设置,否则会一直等待//prom.set_value(1); // error 不能设置多次t.join();return 0;
}
double square_root(double x)
{if(x<0){throw std::out_of_range(“x<0”);}return sqrt(x);
}
//异步调用
std::future f=std::async(square_root,-1);
double y=f.get();
函数作为 std::async 的一部分时, 当调用抛出一个异常时, 这个异常就会存储到期望值中,之后期望值的状态被置为“就绪”, 之后调用get()会抛出这个已存储异常(注意: 标准级别没有指定重新抛出的这个异常是原始的异常对象, 还是一个拷贝; 不同的编译器和库将会在这方面做出不同的选择)。 将函数打包入 std::packaged_task 任务包中后, 到任务被调用时, 同样的事情也会发生; 打包函数抛出一个异常, 这个异常将被存储在期望值中, 准备在get()调用时再次抛出。
std::promise 也能提供同样的功能。 当存入的是一个异常而非一个数值时, 就需要调用set_exception()成员函数, 而非set_value()。 这通常是用在一个catch块中, 并作为算法的一部分, 为了捕获异常, 使用异常填充承诺值 。std::copy_exception() 会直接存储一个新的异常而不抛出 (指定一个自定义的异常)
extern std::promise some_promise;
try
{some_promise.set_value(calculate_value());
}
catch(...)
{some_promise.set_exception(std::current_exception());//some_promise.set_exception(std::copy_exception(std::logic_error("foo "))); //自定义异常
}
很多线程在等待的时候, 只有一个线程能获取等待结果。 当多个线程需要等待相同的事件的结果, 就需要使用 std::shared_future 来替代 std::future
std::future 并行代码没有办法让多个线程等待同一个事件。 std::shared_future 可以来帮你解决。 因为 std::future 是只移动的, 所以其所有权可以在不同的实例中互相传递, 但是只有一个实例可以获得特定的同步结果, 而 std::shared_future 实例是可拷贝的, 所以多个对象可以引用同一关联期望值的结果。
每一个 std::shared_future 的独立对象上, 成员函数调用返回的结果还是不同步的, 所以为了在多个线程访问一个独立对象时避免数据竞争, 必须使用锁来对访问进行保护。 优先使用的办法: 为了替代只有一个拷贝对象的情况, 可以让每个线程都拥有自己对应的拷贝对象(指向相同的异步结果)。 这样, 当每个线程都通过自己拥有的 std::shared_future 对象获取结果, 那么多个线程访问共享同步结果就是安全的。
应用场景:可能会使用 std::shared_future 的情况, 例如: 实现类似于复杂的电子表格的并行执行; 每一个单元格有单一的终值, 这个终值可能是由其他单元格中的数据通过公式计算得到的。 公式计算得到的结果依赖于其他单元格, 然后可以使用一个 std::shared_future 对象引用第一个单元格的数据。 当每个单元格内的所有公式并行执行后, 任务会以期望的方式完成工作; 不过, 当其中有计算需要依赖其他单元格的值, 那么它就会被阻塞, 直到依赖单元格的数据准备就绪。 这将让系统在最大程度上使用硬件并发。
std::promise p;
std::future f(p.get_future());
assert(f.valid()); // 1 期望值 f 是合法的
std::shared_future sf(std::move(f));
assert(!f.valid()); // 2 期望值 f 现在是不合法的
assert(sf.valid()); // 3 sf 现在是合法的std::promise p;
std::shared_future sf(p.get_future()); // 1 隐式转移所有权std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p;
auto sf=p.get_future().share();
需要限定等待事件的时间, 不论是因为时间上有硬性规定(一段指定的代码需要在某段时间内完成), 还是因为在事件没有很快的触发时, 有必要的工作需要特定线程来完成。 为了处理这种情况, 有非常多各种等待函数都能够指定超时任务
一种是“时延”, 另一种是“绝对时间点”。 第一种方式, 需要指定一段时间(例如, 30毫秒);第二种方式, 就是指定一个时间点(例如, 世界标准时间[UTC]17:30:15.045987023, 2011年11月30日)。 多数等待函数提供变量, 对两种超时方式进行处理。
处理持续时间的变量以**_for**作为后缀,
处理绝对时间的变量以**_until**作为后缀。
所以, 当 std::condition_variable 的两个成员函数wait_for()和wait_until()成员函数分别有两个负载, 这两个负载都与wait()成员函数的负载相关——其中一个负载只是等待信号触发, 或时间超期, 亦或是伪唤醒, 并且醒来时会检查锁提供的谓词, 并且只有在检查为true时才会返回(这时条件变量的条件达成), 或直接超时。
获取毫秒时间戳
#include
uint64_t get_time_msec()
{struct timeval now;gettimeofday(&now, NULL);return (now.tv_sec * 1000 + now.tv_usec / 1000);
}#include
uint64_t getSystemTime()
{using namespace std::chrono;auto tp = time_point_cast(system_clock::now());auto tmp = duration_cast(tp.time_since_epoch()); return tmp.count();
}
//time_since_epoch函数表示求 从1970-01-01到该时间点的duration。
//duration_cast函数表示把精度强转到<>内的精度
//count表示数出有多少个滴答(chrono库里,精度表示一个滴答的时间间隔。毫秒精度,就是1毫秒1个滴答)
对于C++标准库来说, 时钟就是时间信息源。 并且, 时钟是一个类, 提供了四种不同的信息:
当前时间可以通过调用静态成员函数now()从时钟类中获取; 例如, std::chrono::system_clock::now() 是将返回系统时钟的当前时间。 特定的时间点类型可以通过time_point的数据typedef成员来指定, 所以some_clock::now()的类型就是some_clock::time_point。
时钟节拍被指定为1/x(x在不同硬件上有不同的值)秒, 这是由时间周期所决定——一个时钟一秒有25个节拍, 因此一个周期为 std::ratio<1, 25> , 当一个时钟的时钟节拍每2.5秒一次, 周期就可以表示为 std::ratio<5, 2> 。 当时钟节拍在运行时获取时, 可以使用一个给定的应用程序运行多次, 用执行的平均时间求出, 其中最短的时间可能就是时钟节拍, 或者是写在手册当中。 这就不保证, 在给定应用中观察到的节拍周期与指定的时钟周期是否相匹配。
C++14中 std::chrono_literals 命名空间中
using namespace std::chrono_literals;
auto one_day=24h;
auto half_an_hour=30min;
auto max_time_between_messages=30ms;std::chrono::milliseconds ms(54802);
std::chrono::seconds s= std::chrono::duration_cast(ms);//54s,该转换是截断的
基于时延的等待可由std::chrono::duration<> 来完成, 例如: 等待期望值状态变为就绪已经35毫秒
future.wait_for
std::future f=std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==std::future_status::ready)do_something_with(f.get());
// future::wait_for
#include // std::cout
#include // std::async, std::future
#include // std::chrono::milliseconds// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {for (int i=2; i// call function asynchronously:std::future fut = std::async (is_prime,700020007); // do something while waiting for function to set future:std::cout << "checking, please wait";std::chrono::milliseconds span (100);while (fut.wait_for(span)==std::future_status::timeout)std::cout << '.'; // 异步的做,每100ms可以打印一个 "." 作为提示信息bool x = fut.get();std::cout << "\n700020007 " << (x?"is":"is not") << " prime.\n";return 0;
}
//checking, please wait..........................................
//700020007 is prime.
等待函数会返回一个状态值, 表示是等待是超时, 还是继续等待。 这里可以等待期望值, 所以当函数等待超时时, 会返回 std::future_status::timeout ; 当期望值状态改变, 函数会返回 std::future_status::ready; 当与期望值相关的任务延迟了, 函数会返回 std::future_status::deferred 。 基于时延的等待是使用内部库的稳定时钟来计时的; 所以,即使系统时钟在等待时被调整(向前或向后), 35毫秒的时延在这里意味着, 的确耗时35毫秒。 当然, 系统调度的不确定性和不同操作系统的时钟精度都意味着: 线程调用和返回的实际时间间隔可能要比35毫秒长。
等待一个条件变量—超时功能
#include
#include
#include
std::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop()
{auto const timeout= std::chrono::steady_clock::now()+std::chrono::milliseconds(500);std::unique_lock lk(m);while(!done){if(cv.wait_until(lk,timeout)==std::cv_status::timeout)break;}return done;//return cv.wait_until(lk, timeout, [](){return done;}); //精简版
}
std::mutex 和std::recursive_mutex都不支持超时锁, 但是 std::timed_mutex 和 std::recursive_timed_mutex 支持。 这两种类型也有try_lock_for()和try_lock_until()成员函数, 可以在一段时期内尝试获取锁或在指定时间点前获取互斥锁。
| Class/Namespace | Functions | Return Values |
|---|---|---|
| std::this_thread namespace | sleep_for(duration) sleep_until(time_point) | N/A |
| std::condition_variable or std::condition_variable_an ywait_for(lock,duration) wait_until(lock,time_ point) | std::cv_status::timeout or std::cv_status::no_timeout wait_for(lock,duration, predicate) wait_until(lock,time_point, predicate) | bool—the return value of the predicate when woken |
| std::timed_mutex, std::recursive_timed_mutex or std::shared_timed_ mutextry_lock_for(duration) try_lock_until(time_point) | bool—true if the lock was acquired, false otherwise | |
| std::shared_timed_mutex | try_lock_shared_for(duration) try_lock_shared_until(time_ point) | bool—true if the lock was acquired, false otherwise |
| std::unique_lock | N/A—owns_lock() on the newly constructed object returns true if the lock was acquired, false otherwise try_lock_for(duration) try_lock_until(time_point) | bool—true if the lock was acquired, false otherwise |
| std::shared_lock | N/A—owns_lock() on the newly constructed object returns true if the lock was acquired, false otherwise try_lock_for(duration) try_lock_until(time_point) | bool—true if the lock was acquired, false otherwise |
| std::future | std::future_status::timeout if the wait timed out, std::future_status::ready if the future is ready, or std::future_status::deferred if the future holds a deferred function that hasn’t yet started |
术语函数化编程 (functional programming) 引用于一种编程方式, 这种方式中的函数结果只依赖于传入函数的参数, 并不依赖外部状态。 期望值作为拼图的最后一块, 它使得函数化编程模式并发化(FP-style concurrency)在C++中成为可能; 期望值对象可以在线程间互相传递, 并允许计算结果依赖于另外一个, 而非对共享数据的显式访问。
void splice (const_iterator position, list& x);
void splice (const_iterator position, list& x, const_iterator i);
void splice (const_iterator position, list& x, const_iterator first, const_iterator last);The first version (1) transfers all the elements of x into the container.
The second version (2) transfers only the element pointed by i from x into the container.
The third version (3) transfers the range [first,last) from x into the container.
快排再FP-模式的顺序实现, 需要传入列表, 并且返回一个列表
// Download by www.cctry.com
#include
#include
template
std::list sequential_quick_sort(std::list input)
{if (input.empty()){return input;}std::list result;result.splice(result.begin(), input, input.begin());T const &pivot = *result.begin();// 重新排列范围 [first,last) 中的元素,使得 pred 返回 true 的所有元素都位于返回 false 的所有元素之前。// 返回的迭代器指向第二组的第一个元素。auto divide_point = std::partition(input.begin(), input.end(),[&](T const &t){ return t < pivot; })std::list lower_part;lower_part.splice(lower_part.end(), input, input.begin(), divide_point);auto new_lower( sequential_quick_sort(std::move(lower_part)) );auto new_higher( sequential_quick_sort(std::move(input)) );return result;
}
#include
#include
#include
template
std::list parallel_quick_sort(std::list input)
{if (input.empty()){return input;}std::list result;result.splice(result.begin(), input, input.begin());T const &pivot = *result.begin();auto divide_point = std::partition(input.begin(), input.end(),[&](T const &t){ return t < pivot; });std::list lower_part;lower_part.splice(lower_part.end(), input, input.begin(), divide_point);std::future> new_lower( std::async(¶llel_quick_sort, std::move(lower_part)) );auto new_higher(parallel_quick_sort(std::move(input)));result.splice(result.end(), new_higher);result.splice(result.begin(), new_lower.get());return result;
}
这里最大的变化是,当前线程不对小于“中间”值部分的列表进行排序, 使用 std::async() ①在另一线程对其进行排序。 大于部分列表, 如同之前一样, 使用递归的方式进行排序②。 通过递归调用parallel_quick_sort(), 就可以利用硬件并发了。 std::async() 会启动一个新线程, 这样当递归三次时, 就会有八个线程在运行了; 当你递归十次(对于大约有1000个元素的列表), 如果硬件能处理这十次递归调用, 将会创建1024个执行线程。 当运行库认为这样做产生了太多的任务时(也许是因为数量超过了硬件并发的最大值), 运行库可能会同步的切换新产生的任务。 当任务过多时(已影响性能), 为了避免任务想线程传递的开销, 这些任务应该在使用get()函数获取结果的线程上运行, 而不是在新线程上运行。这完全符合 std::async 的实现, 为每一个任务启动一个线程(甚至是在任务超额时, 也就是在 std::launch::deferred 没有明确规定的情况下), 或为了同步执行所有任务(在 std::launch::async 有明确规定的情况下)。
比起使用 std::async() , 这里可以写一个spawn_task()函数对 std::packaged_task 和 std::thread 做简单的包装, 如清单4.14中的代码所示; 需要为函数结果创建一个 std::packaged_task 对象, 并从这个对象中获取期望值,或在线程中执行它返回期望值。 其本身并没有太多优势(事实上会造成大规模的超额任务), 但它可为转型成一个更复杂的实现铺平道路, 实现将会向队列添加任务, 而后使用线程池的方式来运行它们, 第9章再来讨论线程池。 使用 std::async 更适合于已知所有情况的任务, 并且在线程池中构建或执行过任务的线程要能完全控制。
template
std::future::type>
spawn_task(F&& f,A&& a)
{typedef std::result_of::type result_type;std::packaged_task task(std::move(f)));std::future res(task.get_future());std::thread t(std::move(task),std::move(a));t.detach();return res;
}
因避开了共享可变数据, 函数化编程可算作是并发编程的范型, 并且也是通讯顺序进程(CSP, Communicating Sequential Processer[3])的范型, 这里的线程理论上是完全分开的, 也就是没有共享数据, 但是有通讯通道允许信息在不同线程间进行传递。 这种范型被Erlang语言所采纳, 并且在**MPI(Message Passing Interface, 消息传递接口)**上常用来做C和C++的高性能运算。
CSP的概念十分简单: 当没有共享数据时, 每个线程可以进行独立思考,其行为纯粹基于所接收到的信息。 每个线程就都有自己的状态机: 当线程收到一条信息, 会以某种方式更新状态, 并且可能向其他线程发出一条或多条信息, 消息的处理机制依赖于线程的初始化状态。 这是一种将其形式以有限状态机的模式实现, 但是并不是唯一的方案; 状态机可以在应用程序中隐式实现。 这种方法在任何情况下, 都更加依赖于明确的行为要求和编程团队的专业知识。 无论选择用哪种方式去实现每个线程, 任务都会有独立的处理部分, 这样就能消除潜在的混乱(数据共享并发), 这就让编程变的更加简单, 且降低错误率。
真正通讯顺序处理没有共享数据, 所有消息都是通过消息队列传递, 但C++线程共享一块地址空间, 所以达不到真正通讯顺序处理的要求。 这就需要一些约定来支持: 作为应用或是库作者, 我们有责任确保在实现中, 线程不存在共享数据。 当然, 为了线程间的通信, 消息队列必须共享, 具体的细节要包含在库中。
试想有一天你要为实现ATM(自动取款机)写一个应用。 这个应用需要处理: 人们尝试取钱时和银行之间的交互情况,以及控制物理器械接受用户的卡片, 显示适当的信息, 处理按钮事件, 吐出现金, 还有退还用户的卡 。
一种处理方法是让代码将所有事情分配到三个独立线程上去: 一个线程去处理物理机械, 一个去处理ATM机的逻辑, 还有一个用来与银行通讯。 这些线程可以通过信息进行纯粹的通讯, 而不共享任何数据。 比如, 当有人在ATM机上插入了卡片或者按下按钮, 处理物理机械的线程将会发送一条信息到逻辑线程上, 并且逻辑线程将会发送一条消息到机械线程, 告诉机械线程可以分配多少钱, 等等。
一种为ATM机逻辑建模的方式, 就是将其当做一个状态机。 线程的每一个状态都会等待一条可接受的信息, 这条信息包含需要处理的内容。 这会让线程过渡到新的状态, 并且继续循环。 图4.3中将展示有状态参与的一个简单是实现。 这个简化实现中, 系统在等待一张卡插入。 当有卡插入时, 系统将会等待用户输入PIN(类似身份码的东西), 每次输入一个数字。 用户可以将最后输入的数字删除。 当数字输入完成, 需要验证PIN。 当验证有问题时, 程序就需要终止, 就需要为用户退出卡, 并且继续等待其他人将卡插入到机器中; 当验证通过时, 程序要等待用户取消交易或选择取款。 当用户选择取消交易, 程序可以结束, 并返还卡片。 当用户选择取出一定量的现金, 程序就要在吐出现金和返还卡片前等待银行方面的确认, 或显示“余额不足”的信息, 并返还卡片。 很明显, 一个真正的ATM机要考虑的东西更多、 更复杂, 但是对于我们来说, 这样描述已经足够了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p8Y1Cipd-1669131297385)(assets/image-20221030143308110.png)]
//Download by www.cctry.com
#include struct card_inserted
{std::string account;};
class atm
{messaging::receiver incoming;messaging::sender bank;messaging::sender interface_hardware;void (atm::*state)();std::string account;std::string pin;void waiting_for_card() //1{interface_hardware.send(display_enter_card()); //2incoming.wait() //3.handle([&](card_inserted const& msg) //4{account=msg.account;pin="";interface_hardware.send(display_enter_pin());state=&atm::getting_pin;});}void getting_pin();
public:void run() //5{state=&atm::waiting_for_card; //6try{for(;;){(this->*state)(); //7}}catch(messaging::close_queue const&){}}
};
之前提到的, 这个实现对于实际ATM机的逻辑来说非常简单, 但是能让你感受到信息传递编程的方式。 这里无需考虑同步和并发问题, 只需要考虑什么时候接收信息和发送信息即可。 为ATM逻辑所设的状态机运行在独立的线程上, 与系统的其他部分一起, 比如: 与银行通讯的接口, 以及运行在独立线程上的终端接口。 这种程序设计的方式被称为参与者模式(Actor model)——在系统中有很多独立的(运行在一个独立的线程上)参与者, 这些参与者会互相发送信息, 去执行手头上的任务, 并且不会共享状态, 除非是通过信息直接传入的。
运行从run()成员函数开始⑤, 初始化waiting_for_card⑥的状态, 然后反复执行当前状态的成员函数(无论这个状态时怎么样的)⑦。 状态函数是简易atm类的成员函数。 wait_for_card函数①依旧很简单: 它发送一条信息到接口, 让终端显示“等待卡片”的信息②, 之后就等待传入一条消息进行处理③。 处理的消息类型只能是card_inserted类的, 这里使用一个Lambda函数④对其进行处理。 当然, 可以传递任何函数或函数对象去处理函数, 但对于一个简单的例子来说, 使用Lambda表达式是最简单的方式。 注意, handle()函数调用与wait()函数进行连接的; 当收到的信息类型与处理类型不匹配, 收到的信息将会被丢弃, 并且线程继续等待, 直到接收到一条类型匹配的消息。
Lambda函数自身只是将用户的账号信息缓存到一个成员变量中去, 并且清除PIN信息, 再发送一条消息到硬件接口,让显示界面提示用户输入PIN, 然后将线程状态改为“获取PIN”。 当消息处理程序结束, 状态函数就会返回, 然后主循环会调用新的状态函数⑦。
void atm::getting_pin()
{incoming.wait().handle([&](digit_pressed const& msg){unsigned const pin_length=4;pin+=msg.digit;if(pin.length()==pin_length){bank.send(verify_pin(account,pin,incoming));state=&atm::verifying_pin;}}).handle([&](clear_last_pressed const& msg){if(!pin.empty()){pin.resize(pin.length()-1);}}).handle([&](cancel_pressed const& msg){state=&atm::done_processing;});
}
这次需要处理三种消息类型, 所以wait()函数后面接了三个handle()函数调用①②③。 每个handle()都有对应的消息类型作为模板参数, 并且将消息传入一个Lambda函数中(其获取消息类型作为一个参数)。 因为调用都被连接在了一起, wait()的实现知道它是等待一条digit_pressed消息, 或是一条clear_last_pressed肖息, 亦或是一条cancel_pressed消息, 这样其他的消息类型将会被丢弃。
当获取一条消息时, 无需再去改变状态。 比如, 当获取一条digit_pressed消息时, 仅需要将其添加到pin中, 除非那些数字是最终的输入。 (清单4.15中)主循环⑦将会再次调用getting_pin()去等待下一个数字(或清除数字, 或取消交易)。
这里对应的动作如图4.3所示, 每个状态盒的实现都由一个不同的成员函数构成, 等待相关信息并适当的更新状态。一个并发系统中, 这种编程方式可以极大的简化任务的设计, 因为每一个线程都完全被独立对待。 因此, 使用多线程去分离关注点时, 需要明确线程之间的任务应该如何分配。
并发技术扩展规范在 std::experiment 命名空间中提供了新的 std::promise 和 std::packaged_taks 。 与 std 命名空间中类型完全不同, 其返回实例类型为 std::experimental::future , 而不是 std::future 。 这能让使用者体会 std::experimental::future 所带来的新特性——持续性
假设你的任务运行会产生一个结果, 并且期望值持有这个结果。 然后, 需要写一些代码来处理这个结果。 使用 std::future 时, 必须等待期望值状态变为就绪态, 要不就使用全阻塞成员函数wait(), 或是使用wait_for()/wait_unitl()成员函数直到等待超时。 这会让代码变得非常复杂。 想要用一句话来说, 就是“完事俱备, 只等数据”, 这也就是持续性的意义。 为了给期望值添加持续性, 只需要在成员函数后添加then()即可。 比如:给定一个期望值fut, 添加持续性的调用即为fut**.then**(continuation)。
与 std::future 类似 , std::experimental::future 存储值也只能检索一次。 如果期望值正处于持续使用状态, 那这个期望值就不能被其他代码所访问。 因此, 使用fut.then()为fut期望值添加持续性后, 对原始期望值fut的操作就是非法的。 另外, 调用fut.then()会返回一个新期望值, 这个新期望值会持有持续性调用的结果。 具体代码, 如下所示:
std::experimental::future find_the_answer;
auto fut=find_the_answer();
auto fut2=fut.then(find_the_question);
assert(!fut.valid());
assert(fut2.valid());std::string find_the_question(std::experimental::future the_answer);
#include
template
std::experimental::future()())>
spawn_async(Func&& func){std::experimental::promise< decltype(std::declval()()) > p;//推断 可调用物Func的返回值auto res=p.get_future();std::thread t([p=std::move(p), f=std::decay_t(func)]()mutable{try{p.set_value_at_thread_exit(f());} catch(...){p.set_exception_at_thread_exit(std::current_exception());}});t.detach();return res;
}
值是从一个then()调用中返回, 其返回的期望值是一个完整的期望值。 这也就意味着, 可以将持续性进行连接
std::declval: 将任何一个类型T转换成引用类型,令在decltype表达式中不必经过构造函数就能使用成员函数。
假设你有一些列耗费时间的任务要完成, 并且想要使用多线程的方式异步完成这些任务, 从而减轻主线程上的计算压力。 例如: 有用户登录了你的应用时, 需要将登录凭证发送给后台; 然后, 对身份信息进行验证后, 进一步从后台获取用户的账户信息; 最后, 当索引到相关信息后, 使用获取到的信息对显示进行更新。 串行执行的话, 可以写成如下的方式:
#include
void process_login(std::string const& username, std::string const& password)
{try {user_id const id=backend.authenticate_user(username,password);user_data const info_to_display=backend.request_current_info(id);update_display(info_to_display);} catch(std::exception& e){display_error(e);}
}
你想要的是一段异步代码, 所以不想阻塞UI线程。 使用 std::async 将另一个列表全部放在后台线程上, 不过这依旧会阻塞UI线程, 在等待这些任务完成的同时, 会消耗大量的资源。 如果有很多这样的任务, 可以结束一些只在等待的线程, 从而节省资源。
#include
#include
std::future process_login(std::string const& username,std::string const& password)
{return std::async(std::launch::async,[=](){try {user_id const id=backend.authenticate_user(username,password);user_data const info_to_display=backend.request_current_info(id);update_display(info_to_display);} catch(std::exception& e){display_error(e);}});
}
为了避免阻塞相应线程, 需要有机制对每个完成的任务进行连接: 持续性。 下面的代码清单展示的处理过程大体相同, 但这次将整个任务分成了一系列任务, 并且每个任务在完成的时候回连接到前一个任务上。
#include
std::experimental::future process_login(std::string const& username,std::string const& password)
{return spawn_async([=](){return backend.authenticate_user(username,password);}).then([](std::experimental::future id){return backend.request_current_info(id.get());}).then([](std::experimental::future info_to_display){try{update_display(info_to_display.get());} catch(std::exception& e){display_error(e);}});
}
需要注意的是, 每个持续性函数都有一个std::experimental::future作为独立参数, 然后使用.get()来获取其拥有的值。 这意味着异常会沿着这个链条进行传播, 如果有函数抛出异常, 那么就会在调用info_to_display.get()时抛出, 捕获结构可以处理所有的异常类型, 就如清单4.18的catch那样。
因为需要等待消息通过网络或数据操作进行传输, 所函数内部会对后端模块进行调用, 但这时前端的任务可能还没有完成。 虽然已经将任务进行分割成独立的小任务, 但它们仍然会阻塞调用, 这样就会阻塞线程的运行, 这些需要在后端任务完成时, 前端处理就已经准备好了, 而不是对线程进行阻塞。 这样的话,backend.async_authenticate_user(username,password)返回std::experimental::future
你可能觉得这段代码比较复杂, 因为持续函数返回的期望值类型为 future
std::experimental::future process_login(std::string const& username,std::string const& password)
{return backend.async_authenticate_user(username,password).then([](std::experimental::future id){return backend.async_request_current_info(id.get());}).then([](std::experimental::future info_to_display){try{update_display(info_to_display.get());} catch(std::exception& e){display_error(e);}});
}
我们一直将注意力放在支持持续性的 std::experimental::future 上。 std::experimental::shared_future 同样支持持续性。 二者的区别在于, std::experimental::shared_future 对象可以具有多个持续性对象, 并且持续性参数是 std::experimental::shared_future, 而不是 std::experimental::future 。 std::experimental::shared_future 脱离了共享的本性——因为多个对象可以引用相同的共享状态, 如果只允许一个延续, 那么多个线程的情况下就会产生条件竞争, 每个线程都试图将持续性对象添加到在自己的 std::experimental::shared_future 对象中。 这种情况的确很糟糕, 所以才允许多持续性的存在。 当使用多持续性时, 可以通过同一个 std::experimental::shared_future 对象对其进行添加。 另外, 当只打算给第二个持续性传递对象时, 不能给第一个持续性的传递一个临时 std::experimental::shared_future 对象。 因此, 传递给延续性函数的参数也必须是 std::experimental::shared_future 对象
auto fut = spawn_async(some_function).share();
auto fut2 = fut.then([](std::experimental::shared_future data){do_stuff(data);
});
auto fut3 = fut.then([](std::experimental::shared_future data){return do_other_stuff(data);
});
假设你有很多的数据需要处理, 并且每个数据都可以单独的进行处理。 这是利用硬件的好机会, 可以产生异步任务组来处理数据项, 每个任务通过期望值来返回处理过后的数据。 不过, 当需要等待所有任务完成, 才能得到最终的结果, 对于逐个的对期望值进行收集, 然后在整理结果, 这总觉得不是很爽。 如果打算用一个异步任务来收集结果, 那就先要生成这个异步任务, 这样的话就会占用一个线程的资源, 并且需要不断的对期望值进行轮询, 当所有期望值状态为就绪时, 生成新的任务。 下面就展示了一个这样的例子:
#include
std::future process_data(std::vector& vec)
{size_t const chunk_size=whatever;std::vector> results;for(auto begin=vec.begin(),end=vec.end();beg!=end;){size_t const remaining_size=end-begin;size_t const this_chunk_size=std::min(remaining_size,chunk_size);results.push_back(std::async(process_chunk,begin,begin+this_chunk_size));begin+=this_chunk_size;}return std::async([all_results=std::move(results)](){std::vector v;v.reserve(all_results.size());for(auto& f: all_results){v.push_back(f.get());//1}return gather_results(v);});
}
这段代码会生成一个异步任务来等待处理结果, 在所有处理结果都就绪的情况下, 对结果进行整合。 不过, 每个任务都是独立的, 因此当结果可用前, 调度程序会在①处反复的进行唤醒, 当发现有没有变为就绪态的结果时, 再次回到休眠的状态。 这样的方式不仅会占用线程资源, 而且在之后对期望值的操作中会增加上下文切换频率, 从而使应用增加了很多额外的开销。
可以使用 std::experimental::when_all 来避免这里的等待和切换, 可以将一组需要等待的期望值传入when_all函数中, 其会返回一个新的期望值——当传入的所有期望值的状态都为就绪时, 这个新的期望值状态就会被置为就绪, 这个期望值可以和持续性配合起来一起处理其他的任务。 下面的代码就展示了这样的一种方式: 使用 std::experimental::when_all 从多个期望值中收集结果
std::experimental::future process_data(std::vector& vec)
{size_t const chunk_size=whatever;std::vector> results;for(auto begin=vec.begin(),end=vec.end();beg!=end;){size_t const remaining_size=end-begin;size_t const this_chunk_size=std::min(remaining_size,chunk_size);results.push_back(spawn_async(process_chunk,begin,begin+this_chunk_size));begin+=this_chunk_size;}return std::experimental::when_all(results.begin(),results.end()).then([](std::future< std::vector> > ready_results){std::vector> all_results = ready_results.get();std::vector v;v.reserve(all_results.size());for(auto& f: all_results){v.push_back(f.get());}return gather_results(v);});
}
这个例子中, 可以看到when_all函数会等待所有期望值的状态变为就绪, 然后再用 .then 调用调度函数①, 而不是使用async。 虽然Lambda表达式表面上看上去是一样的, 但这里将results的vector作为参数(包装到期望值中),而不是放在捕获器中, 并在之后对每个期望值使用get②, 从而无阻塞的获得所有处理后的结果。 这不需要对代码做太多的修改, 就能介绍系统的负担。
为了补全when_all, 我们也有when_any。 其也会产生一个期望值, 当期望值组中任意一个期望为就绪态, 那么这个新期望值的状态即为就绪。 这对于并发性任务是一个不错的选择, 不过就需要为第一个为就绪的线程找点事情来做。
使用 std::experimental::when_any 处理第一个被找到的值
#include
#include
#include
#include
struct MyData {};
struct FinalResult {};bool matches_find_criteria(MyData const &);
FinalResult process_found_value(MyData const &);std::experimental::future
find_and_process_value(std::vector &data) {unsigned const concurrency = std::thread::hardware_concurrency();unsigned const num_tasks = (concurrency > 0) ? concurrency : 2;std::vector> results;auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;auto chunk_begin = data.begin();std::shared_ptr> done_flag =std::make_shared>(false);for (unsigned i = 0; i < num_tasks; ++i) { //1auto chunk_end =(i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();results.push_back(std::experimental::async([=] { //2for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end);++entry) {if (matches_find_criteria(*entry)) {*done_flag = true;return &*entry;}}return (MyData *)nullptr;}));chunk_begin = chunk_end;}std::shared_ptr> final_result =std::make_shared>();struct DoneCheck {std::shared_ptr> final_result;DoneCheck(std::shared_ptr>final_result_): final_result(std::move(final_result_)) {}void operator()( //4std::experimental::future>>>results_param) {auto results = results_param.get();MyData *const ready_result = results.futures[results.index].get(); //5if (ready_result)final_result->set_value(process_found_value(*ready_result)); //6else {results.futures.erase(results.futures.begin() + results.index); //7if (!results.futures.empty()) {std::experimental::when_any( //8results.futures.begin(), results.futures.end()).then(std::move(*this));} else {final_result->set_exception(std::make_exception_ptr(std::runtime_error(“Not found”))); //9}}}};std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result)); //3return final_result->get_future(); //10
}
初始化循环①会产生num_tasks个异步任务, 每个任务都会执行②处的Lambda表达式。 这个Lambda表达式的捕获方式是拷贝, 所以每个任务都有自己的chunk_begin和chunk_end, 这里同样也拷贝了共享指针done_flag。 这就避免了生命周期所带来的问题。
当所有任务都已经产生, 希望对任务的返回结果进行处理。 可以调用when_any③通过连接持续性完成。 这次可将持续性以类的方式去编写, 因为想要对其进行递归复用。 当其中一个任务完成初始化, DoneCheck的函数操作符会被调用④。 首先, 已经准备好从就绪的期望值中获取值⑤, 并且当符合条件的值被找到, 可以对结果进行处理, 并对最终结果进行设置⑥。 否则, 就需要从集合中丢弃就绪的期望值⑦, 当还有很多期望值需要检查时, 会产生对when_any的再次调用⑧, 要再触发其持续性, 需要等待下个期望值到达就绪态。 如果没有剩下任何其他期望值, 就说明这个值没找到, 那么将会在期望值中存储一个异常⑨。 函数的返回值是一个期望值, 其包含有最终的结果⑩。 当然, 这个问题还有其他解法, 不过就想在这里展示一下如何使用when_any。
有时所等待的事件是一组线程, 或是要达到代码的特定点, 或是需要配合着处理了一定量的数据。 这种情况下, 最好使用锁存器或栅栏机制, 而不是期望值。 现在, 让我们来了解一下并发技术扩展规范所提供的锁存器和栅栏机制。
锁存器或是栅栏机制是什么意思。 锁存器是一种同步对象, 当它的计数器减为0时, 它就准备就绪了。 锁存器这个名称是基于其输出特性——当处于就绪态时, 其就会保持就绪态, 直到被销毁。 因此, 锁存器是为同步一系列事件发生的轻量级装置。
栅栏机制是一种可复用的同步装置, 其用于一组线程间的内部同步。 虽然, 锁存器不在乎是哪个线程使得计数器递减——同一个线程可以对计数器递减多次, 或多个线程对计数器递减一次, 再或是其中有些线程对计数器有两次的递减——对于栅栏机制来说, 每一个线程只能在每个周期到达栅栏一次。 当线程都抵达栅栏时, 会对线程进行阻塞, 直到所有线程都达到栅栏处, 这时阻塞将会被解除。 栅栏可以复用——线程可以再次到达栅栏处, 等待下一个周期的所有线程
std::experimental::latch 声明在 std::experimental::latch 时, 将计数器的值作为构造函数的唯一参数。 之后, 当等待的事件发生, 就会调用锁存器count_down成员函数; 当计数器为0时, 锁存器状态变为就绪。 可以调用wait成员函数对锁存器进行阻塞, 直到等待的锁存器处于就绪状态时释放; 如果需要对锁存器是否就绪的状态进行检查时, 可调用is_ready成员函数。 想要减少计数器1并阻塞直至它抵达0, 则可以调用count_down_and_wait成员函数。 下面代码清单展示一个简单的例子:
使用 std::experimental:: 等待所有事件
void foo(){unsigned const thread_count=...;latch done(thread_count); //1 my_data data[thread_count];std::vector > threads;for(unsigned i=0;i //2 data[i]=make_data(i);done.count_down(); //3 do_more_stuff(); //4 }));done.wait(); //5 process_data(data,thread_count); //6
}//7
使用需要等待的事件数量对done的构造进行初始化①, 并且使用 std::async 产生适量的线程②。 在进行下一步之前④, 每个线程生成了相应的数据块时, 都会对锁存器的计数器进行递减③。 在处理生成的数据⑥之前, 主线程只需要等待锁存器成为就绪态即可⑤。 ⑥处的数据处理可能会与对线程的最终处理同步进行④——所以这在函数末尾 std::future 析构之前⑦, 无法保证所有线程都已完成
需要注意的是, 在②传递给 std::async Lambda表达式中, 是通过引用的方式对除了i之外的所有内容进行捕获, 而i是通过值捕获的方式进行传递。 这是因为i是这里的循环计数器, 如果通过引用捕获将会导致数据竞争和未定义的行为, 而数据和完成状态是我们需要共享访问的东西。 此外, 在这种情况下, 只需要一个锁存器就够了, 因为线程在数据准备好之后, 还有其他任务要做; 否则, 就需要在处理数据前, 等待所有期望值, 从确保所有任务都已经完成。
process_data中对data的访问是安全的⑥, 即便这个值是其他线程上的任务存储的, 因为锁存器是一个同步对象,所以线程调用cound_down改变计数器的行为是可见的, 从而保证对wait的调用和返回在同一个锁存器对象上为可见。 本质上, 对count_down的调用与对wait的调用同步
锁存器之外, 并发技术扩展规范还为我们提供了用于同步一组线程的可复用的同步对象——栅栏机制。
c++20 新特性
并发技术扩展规范提供了两种栅栏机制, std::experimental::barrier和 std::experimental::flex_barrier 。 前者更简单, 所以开销更低; 后者更灵活, 但是开销较大。
假设有一组线程对某些数据进行处理。 每个线程都在处理独立的任务,因此在处理过程中无需同步,但当所有线程都必须处理下一个数据项前, 完成当前的任务。std::experimental::barrier正是针对这样的情况而设计。 这里可以为同步组, 指定线程的数量, 并为这组线程构造栅栏。 当每个线程完成其处理任务时, 都会到达栅栏处, 并且通过调用栅栏对象的arrive_and_wait成员函数, 等待小组的其他成员线程。 当最后一个线程抵达时, 所有线程将被释放, 并且栅栏会被重置。 组中的线程可以继续接下来的任务, 或是处理下一个数据项, 或是进入下一个处理阶段。
锁存器一旦就绪就会保持状态, 不会有释放等待线程, 重置, 以及复用的过程。 栅栏机制也只能用于一组线程内的同步——除非组中只有一个线程, 否则无法等待栅栏就绪。 可以通过显式调用栅栏对象的arrive_and_drop成员函数让线程退出组, 这样线程就不用再受栅栏的约束, 这样下一个周期到达的线程数就必须要比当前周期到达的线程数少一个了。
result_chunk process(data_chunk);
std::vector
divide_into_chunks(data_block data, unsigned num_threads);void process_data(data_source &source, data_sink &sink) {unsigned const concurrency = std::thread::hardware_concurrency();unsigned const num_threads = (concurrency > 0) ? concurrency : 2;std::experimental::barrier sync(num_threads);std::vector threads(num_threads);std::vector chunks;result_block result;for (unsigned i = 0; i < num_threads; ++i) {threads[i] = joining_thread([&, i] {while (!source.done()) {//6if (!i) {//1 串行data_block current_block = source.get_next_data_block();//获取一个大任务chunks = divide_into_chunks(current_block, num_threads);//分派成小任务至chunks}sync.arrive_and_wait();//2 等待任务分派完毕result.set_chunk(i, num_threads, process(chunks[i]));//3 领取任务并执行sync.arrive_and_wait();//4 等待任务执行完毕if (!i) {//5 串行sink.write_data(std::move(result));}}});}
}// 7
使用栅栏来对一组线程进行同步。 这里的数据来源是source, 并且输出是sink, 不过为了并发运行, 需要将数据划分成num_threads块。 这个操作是串行的, 所以需要在初始化数据块①是进行,并且初始化过程只运行在i为0的线程上。 并行执行任务之前, 所有线程都会在栅栏处等待数据划分完成②, 而后每个线程都会处理属于自己的数据块, 并且再次同步之前④,将结果更新到result中③。然后就会到达下一个需要串行处理域,这里只有0号线程可以将结果输出到sink⑤。 这时,所有线程都会循环等待,直到将source中的任务全部处理完(done)⑥。 当线程进入循环时, 串行部分与循环是连接在一起的; 因为在串行部分, 只有0号线程会执行, 所以也没什么问题, 在第一个栅栏处②, 会将所有线程进行同步。 当所有的处理都结束了, 就意味着所有线程将会退出循环,并等待所有joining_thread对象的外部函数结束时, 对这些对象进行析构⑦(joining_thread在第2章的清单2.7中有过介绍)
需要着重注意的是, arrive_and_wait函数的调用位置。 所有线程就绪前, 确定没有线程在运行是很重要的。 第一个同步点, 所有线程都在等待0号线程到达; 而第二个同步点, 情况刚好相反, 0号线程在等待其他线程都到达之后,才能将完成的结果写入sink中。
std::experimental::flex_barrier , 不过这个类型的栅栏更加的灵活。 灵活之处在于, 栅栏拥有完成阶段, 一旦参
与线程集中的所有线程都到达同步点, 则由参与线程之一执行完成阶段。
std::experimental::flex_barrier 与 std::experimental::barrier 有一点不同: 其有一个额外的构造函数, 需要传递传入一个完整的函数和线程数量。 当所有线程都到达栅栏处, 那么这个函数就由其中一个线程运行。 其不仅指定了一种串行代码块的运行方式, 并且还提供了一种修改需要在下一个周期到达栅栏处线程个数的方式。 对于线程的技术可以修改成任何数字, 无论这个数字比当前数字高或低; 因为这个功能, 开发者就能保证下一次到达栅栏处的线程数量时正确无误的。
void process_data(data_source &source, data_sink &sink) {unsigned const concurrency = std::thread::hardware_concurrency();unsigned const num_threads = (concurrency > 0) ? concurrency : 2;std::vector chunks;auto split_source = [&] {//1if (!source.done()) {data_block current_block = source.get_next_data_block();chunks = divide_into_chunks(current_block, num_threads);}};split_source();//2result_block result;std::experimental::flex_barrier sync(num_threads, [&] {//3 after_waitsink.write_data(std::move(result)); //把需要并行执行的抽离出来split_source();//4return -1;//5});std::vector threads(num_threads);for (unsigned i = 0; i < num_threads; ++i) {threads[i] = joining_thread([&, i] {while (!source.done()) {//6result.set_chunk(i, num_threads, process(chunks[i]));sync.arrive_and_wait();//7}});}
}
这里使用一个Lambda表达式对数据进行拆分①。 这个Lambda表达式会在运行前被调用②, 并封装在迭代开始时的0号线程上运行。
第二个区别在于, sync对象的类型为 std::experimental::flex_barrier , 并且需要将一个完整的函数和线程数量对实例进行构造③。 该函数会在所有线程抵达栅栏处的时候, 运行在0号线程上, 然后由0号线程调用Lambda表达式对数据进行拆分, 当拆分结束后, 下一轮迭代开始④。 返回值-1表示线程数目保持不变, 返回值为0或其他数值则指定的是下一个周期中参与迭代的线程数量。
主循环⑥就简单了: 其只包含了并行部分的代码, 所以只要有一个同步点就够了⑦。 使用 std::experimental::flex_barrier 能够很好的对代码进行简化
使用完整函数作为串行段是一种很强大的功能,因为这能够改变参与并行的线程数量。例如: 流水线类型代码在运行时,当流水线的各级都在进行处理时,线程的数量在初始阶段和执行阶段要少于主线程处理阶段。
同步操作对于使用并发编写应用来说,是很重要的一部分:如果没有同步,线程基本上就是独立的,也可写成单独的应用,因其任务之间的相关性,它们才可作为一个群体直接执行。本章讨论了各式各样的同步操作,有条件变量、期望值、承诺值、打包任务、锁存器和栅栏机制。也讨论了替代同步的解决方案:函数化模式编程,完全独立执行的函数,不会受到外部环境的影响;还有,消息传递模式,以消息子系统为中介,向线程异步的发送消息;以及持续性方式,其指定了操作的后续任务,并由系统负责调度。
已经讨论了很多C++中的高层工具,现在我们来看一下底层工具是如何工作的:C++内存模型和原子操作。
C++标准中,有一个十分重要特性,常被程序员们所忽略。它不是一个新语法特性,也不是新工具,它就是多线程(感知)内存模型。内存模型没有明确的定义基本部件应该如何工作的话,之前介绍的那些工具就无法正常工作。那为什么大多数程序员都没有注意到它呢?当使用互斥量保护数据和条件变量,或者是“期望”上的信号事件时,对于互斥量为什么能起到这样作用,大多数人不会去关心。只有当试图去“接触硬件”,才能详尽的了解到内存模型是如何起作用的。
C++是一个系统级别的编程语言,标准委员会的目标之一就是不需要比C++还要底层的高级语言。C++应该向程序员提供足够的灵活性,无障碍的去做他们想要做的事情:当需要的时候,可以上他们“接触硬件”。原子类型和原子操作就允许他们“接触硬件”,并提供底层级别的同步操作,通常会将常规指令数缩减到1~2个CPU指令。
本章,我们将讨论内存模型的基本知识,再了解一下原子类型和操作,最后了解与原子类型操作相关的各种同步。这个过程会比较复杂:除非已经打算使用原子操作(比如,第7章的无锁数据结构)同步你的代码;否则,就没有必要了解过多的细节。
内存模型:一方面是基本结构,这与内存布局的有关,另一方面就是并发。并发基本结构很重要,特别是低层原子操
作。
一个C++程序中所有数据都是由对象构成。不是说创建一个int的衍生类,或者是基本类型中存在有成员函数,或是像在Smalltalk和Ruby语言那样一"一切都是对象”。对象仅仅是对c++数据构建块的声明。C++标准定义类对象为"存储区域”,但对象还是可以将自己的特性赋予其他对象,比如:相应类型和生命周期。
像int或f1oat这样的对象是基本类型。当然,也有用户定义类的实例。一些对象(比如,数组,衍生类的实例,特殊(具有非静态数据成员)类的实例)拥有子对象,但是其他对象就没有。
无论对象是怎么样的类型,对象都会存储在一个或多个内存位置上。每个内存位置不是标量类型的对象,就是标量类型的子对象,比如,unsigned short、my_class*或序列中的相邻位域。当使用位域时就需要注意:虽然相邻位域中是不同的对象,但仍视其为相同的内存位置。
为了避免条件党争,两个线程就需要一定的执行顶序。第一种方式,如第3章所述,使用互斥量来确定访问的顺序:当同一互斥量在两个线程同时访问前被锁住,那么在同一时间内就只有一个线程能够访问到对应的内存位置,所以后一个访问必须在前一个访问之后。另一种是使用原子操作(详见5.2节中对于原子操作的定义),决定两个线程的访问顺序。使用原子操作来规定顺序在5.3节中会有介绍。当多于两个线程访问同一个内存地址时,对每个访问这都需要定义一个顺序。
当程序对同一内存地址中的数据访问存在竞争,可以使用原子操作来避免未定义行为。当然,这不会影响竞争的产生——原子操作并没有指定访问顺序——但原子操作把程序拉回到定义行为的区域内。
原子操作是个不可分副的操作。系统的所有线程中,不可能观察到原子操作完成了一半;要么就是做了,要么就是没做,只有这两种可能。如果读取对象的加载操作是原子的,那么这个对象的所有修改操作也是原子的,所以加载操作得到的值要么是对象的初始值,要么是某次修改操作存入的值。
另一方面,非原子操作可能会被另一个线程观察到只完成一半。如果这个操作是一个存储操作,那么其他线程看到的值,可能既不是存储前的值,也不是存储的值,而是别的什么值。如果非原子操作是一个加载操作,它可能先取到对象的一部分,然后值被另一个线程修改,然后它再取到剩余的部分,所以它取到的既不是第一个值,也不是第二个值,而是两个值的某种组合。如第3章所述,这就有了竞争风险,但在也就构成了数据竞争(见5.1节),会出现末定义行为。
C++中多数时候,需要原子类型对应得到原子的操作,我们先来看下这些类型。
标准原子类型定义在头文件is_lock_free()成员函数,这个函数可以让用户查询某原子类型的操作是直接用的原子指令返回true, 还是内部用了一个锁结构返回false。
原子操作的关键就是使用一种同步操作方式,来替换使用互斥量的同步方式:如果操作内部使用互斥量实现,那么期望达到的性能提升就是不可能的事情。所以要对原子操作进行实现,最好使用用于获取且基于互斥量的实现来替代。这就是第7章所要讨论的无锁数据结构。
只有 std::atomic_flag 类型不提供is_lock_free()。 该类型是一个简单的布尔标志,并且在这种类型上的操作都是无锁的;当有一个简单无锁的布尔标志时,可以使用该类型实现一个简单的锁,并且可以实现其他基础原子类型。当觉得“真的很简单”时,就说明对 std::atomic_flag 明确初始化后, 做查询和设置(使用test_and_set()成员函数),或清除(使用clear()成员函数)都很容易。 这就是:无赋值,无拷贝,没有测试和清除,没有任何多余操作。
剩下的原子类型都可以通过特化 std::atomic<> 类型模板得到, 并且拥有更多的功能, 但不可能都是无锁的(如之前
解释的那样)。 在主流平台上, 原子变量都是无锁的内置类型(例如 std::atomic
std::atomic<> 类模板不仅仅是一套可特化的类型, 作为一个原发模板也可以使用用户定义类型创建对应的原子变量。 因为, 它是一个通用类模板, 操作被限制为load(), store()(赋值和转换为用户类型), exchange(),compare_exchange_weak()和compare_exchange_strong()
每种函数类型的操作都有一个内存排序参数, 这个参数可以用来指定存储的顺序。 5.3节中, 会对存储顺序选项进行详
述。 现在, 只需要知道操作分为三类:
Store操作, 可选如下顺序: memory_order_relaxed, memory_order_release, memory_order_seq_cst。
Load操作, 可选如下顺序: memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_seq_cst。
Read-modify-write(读-改-写)操作, 可选如下顺序: memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release,memory_order_acq_rel, memory_order_seq_cst。
std::atomic_flag 是最简单的原子类型, 它表示了一个布尔标志。 这个类型的对象可以在两个状态间切换: 设置和清除。 只作为一个构建块存在。 我从未期待这个类型被使用, 除非在十分特别的情况下。 正因如此, 它将作为讨论其他原子类型的起点, 因为它会展示了原子类型所使用的通用策略。
std::atomic_flag 类型的对象必须被ATOMIC_FLAG_INIT初始化。初始化标志位是“清除”状态。这里没得选择,这
个标志总是初始化为“清除”:
std::atomic_flag f = ATOMIC_FLAG_INIT;
当标志对象已初始化, 那么只能做三件事情: 销毁,清除或设置(查询之前的值)。 这些操作对应的函数分别是:clear()成员函数和test_and_set()成员函数。clear()和test_and_set()成员函数可以指定好内存顺序。clear()是一个存储操作, 所以不能有memory_order_acquire或memory_order_acq_rel语义, 但是test_and_set()是一个“读-改-写”操作, 可以应用于任何内存顺序。每一个原子操作,默认的内存序都是memory_order_seq_cst。例如:
f.clear(std::memory_order_release); // 1
bool x=f.test_and_set(); // 2
不能拷贝构造另一个 std::atomic_flag 对象; 并且,不能将一个对象赋予另一个 std::atomic_flag 对象。 这不是 std::atomic_flag 特有的,而是所有原子类型共有的。一个原子类型的所有操作都是原子的,因赋值和拷贝调用了两个对象,这就就破坏了操作的原子性。这样的话,拷贝构造和拷贝赋值都会将第一个对象的值进行读取,然后再写入另外一个。 对于两个独立的对象,这里就有两个独立的操作了,合并这两个操作必定是不原子的。 因此, 操作就不被允许。
有限的特性使得 std::atomic_flag 非常适合于作自旋互斥锁。 初始化标志是“清除”, 并且互斥量处于解锁状态。 为了锁上互斥量, 循环运行test_and_set()直到旧值为false, 就意味着这个线程已经被设置为true了。 解锁互斥量是一件很简单的事情, 将标志清除即可。 实现如下面的程序清单所示:
#include
class spinlock_mutex
{std::atomic_flag flag;
public:spinlock_mutex():flag(ATOMIC_FLAG_INIT){}void lock(){// Sets the atomic_flag and returns whether it was already set immediately before the call.while(flag.test_and_set(std::memory_order_acquire));}void unlock(){flag.clear(std::memory_order_release);}
};
这样的互斥量是最基本的,但它已经足够 std::lock_guard<> 使用了(详见第3章)。其本质就是在lock()中等待,所以几乎不可能有竞争的存在,并且可以确保互斥。 当看到内存序语义时,会了解到它们是如何对一个互斥锁保证必要的强制顺序的。这个例子将在5.3.6节中展示。 std::atomic_flag 局限性太强,没有非修改查询操作,甚至不能像普通的布尔标志那样使用。
最基本的原子整型类型就是std::atomic
std::atomic b(true);
b=false;
另外,非原子bool类型的赋值操作不同于通常的操作(转换成对应类型的引用,再赋给对应的对象):它返回一个bool值来代替指定对象。原子类型中的另一种模式:赋值操作通过返回值(返回相关的非原子类型)完成,而非返回用。如果原子变量的引用被返回了,任何依赖与这个赋值结果的代码都需要显式加载这个值。潜在的问题是,结果可能会被其他线程修改。通过返回非原子值进行赋值的方式,可以避免多余的加载过程,并且得到就是实际存储的值。
compare_exchange_weak && compare_exchange_strong
而compare_exchange_weak和compare_exchange_strong则是著名的CAS(compare and set)。参数会要求在这里传入期待的数值和新的数值。它们对比变量的值和期待的值是否一致,如果是,则替换为用户指定的一个新的数值。如果不是,则将变量的值和期待的值交换。
compare_exchange_strong:atomic库中的一个函数,入参是3个,expect,desire,memoryorder,意思是如果当前的变
量this的值==expect值,则将this值改为desire,并返回true,否则,返回false,不进行修改,即进行一个读的操作。通常用于例如线程B等待线程A执行完毕,或者执行到某个步骤。此时线程B可以进行while等待,线程A在执行到对应步骤,将对应的原子变量置为expect值即可。类似于“接力运动”。这里由于会进行读写操作,所以,memory order一般是acq rel,而A线程由于要保证都执行完毕,执行顺序没有关系,所以一般是Release的memory order。
class Foo{};
Foo some_array[5];
std::atomic p(some_array);
Foo* x=p.fetch_add(2); // p加2, 并返回原始值
assert(x==some_array);
assert(p.load()==&some_array[2]);
x=(p-=1); // p减1, 并返回原始值
assert(x==&some_array[1]);
assert(p.load()==&some_array[1]);
fetch_add()和fetch_sub()都是“读-改-写”操作
假设两个线程,一个向数据结构中填充数据,另一个读取数据结构中的数据。为了避免恶性条件竞争,第一个线程设置一个标志,用来表明数据已经准备就绪,并且第二个线程在这个标志设置前不能读取数据。下面的程序清单就是这样的情况:
#include
#include
#include
#include
#include std::vector data;
std::atomic_bool data_ready(false);void reader_thread()
{while(!data_ready.load())//1{std::this_thread::sleep_for(std::chrono::milliseconds(1));}std::cout<<"The answer="<data.push_back(42);//3data_ready=true; //4
}
先把等待数据的循环①放在一边(你需要这个循环,否则无法在线程间进行数据共享:每一个数据项都必须是原子的)。当非原子读②和写③对同一数据结构进行无序访问时,将会导致未定义行为的发生,因此这个循环就是确保访问循序被严格的遵守的。
强制访问顺序是由对 std::atomic
https://blog.csdn.net/thalo1204/article/details/84948441
六个内存序列选项可应用于对原子类型的操作 memory_order_
relaxed, consume, acquire, release, acq_rel, seq_cst
代表三种内存模型:
排序一致序列(sequentially consistent)
获取-释放序列(consume acquire release acq_rel) (非全局队列,线程队列,可以保证同一线程内的顺序)
松散序列(relaxed)
排序一致队列 (队列)
默认序列命名为排序一致,因为程序中的行为从任意角度去看,序列顺序都保持一致。如果原子类型实例上的所有操作都是序列一致的,那么一个多线程程序的行为,就会以某种特殊的排序执行,如单线程那样。这是目前来看,最容易理解的内存序列,也是将其设置为默认的原因:所有线程都必须了解,不同的操作也要遵守相同的顺序。因为其简单的行为,可以使用原子变量进行编写。通过不同的线程,可以写出所有序列上可能的操作,就可以消除那些不一致,以及验证代码的行为是否与预期相符。也就意味着,所有操作都不能重排;如果代码在一个线程中,将一个操作放在另一个操作前面,那么这个顺序就必须让其他线程有所了解。
非排序一致内存模型
当踏出序列一致的世界,事情就开始变的复杂。可能最需要处理的问题就是:再也不会有全局的序列了。这就意味着不同线程看到相同操作,不一定有着相同的顺序,还有对于不同线程的操作,都会一个接着另一个执行的想法不在可行。不仅是有没有考虑事情真的同时发生的问题,还有就是线程没办法保证一致性。为了写出(或仅是了解)任何一段使用非默认内存序列的代码,这不仅仅是编译器可以重新排列指令的问题。即使线程运行相同的代码,它们都能拒绝遵循事件发生的顶序,因为操作在其他线程上没有明确的顺序限制:而不同的CPU缓存和内部缓冲区,在同样的存储空间中可以存储不同的值。这非常重要,这里我再重申一遍:线程没办法保证一致性。
原子类型上的操作以松散序列执行,没有任何同步关系。同一线程中对于同一变量的操作还是服从先发执行的关系,但是不同线程几乎不需要相对的顺序。唯一的要求是在访问同一线程中的单个原子变量不能重排序,当给定线程看到原子变量的特定值时,随后线程的读操作就不会去检索变量较早的那个值。当使用nemo ry_.order._relaxed,就不需要任何额外的同步,对于每个变量的修改顺序只是线程间共享的事情。
#include
#include
#include std::atomic x,y;
std::atomic z;void write_x_then_y()
{x.store(true,std::memory_order_relaxed);y.store(true,std::memory_order_relaxed);
}void read_y_then_x()
{while(!y.load(std::memory_order_relaxed));if(x.load(std::memory_order_relaxed))++z;
}int main()
{x=false;y=false;z=0;std::thread a(write_x_then_y);std::thread b(read_y_then_x);a.join();b.join();assert(z.load()!=0);//断言会触发
}
这次assert⑤可能会触发,因为加载x的操作④可能读取到false,即使加载y的操作③读取到true,并且存储x的操作①先发与存储y的操作②。×和y是两个不同的变量,所以这里没有顺序去保证每个操作产生相关值的可见性。
非限制操作对于不同变量可以自由重排序,只要它们服从任意的先发执行关系即可(比如,在同一线程中),它们不会引入同步相关的顺序。清单5.5中的先发执行关系如图5.4所示(只是其中一个可能的结果)。尽管,在不同的存储/加载操作间有着先发执行关系,这里不是在一对存储于载入之间了,所以载入操作可以看到”违反”顺序的存储操作。
非限制操作对于不同变量可以自由重排序,只要它们服从任意的先发执行关系即可(比如,在同一线程中),它们不会引入同步相关的顺序。清单5.5中的先发执行关系如图5.4所示(只是其中一个可能的结果)。尽管,在不同的存储/加载操作间有着先发执行关系,这里不是在一对存储于载入之间了,所以载入操作可以看到“违反”"顶序的存储操作。
#include
#include
#include std::atomic x(0),y(0),z(0);
std::atomic go(false);
unsigned const loop_count=10;struct read_values
{int x,y,z;
};
read_values values1[loop_count];
read_values values2[loop_count];
read_values values3[loop_count];
read_values values4[loop_count];
read_values values5[loop_count];
void increment(std::atomic* var_to_inc,read_values* values)
{while(!go)std::this_thread::yield();//3for(unsigned i=0;ivalues[i].x=x.load(std::memory_order_relaxed);values[i].y=y.load(std::memory_order_relaxed);values[i].z=z.load(std::memory_order_relaxed);var_to_inc->store(i+1,std::memory_order_relaxed);//4std::this_thread::yield();}
}void read_vals(read_values* values)
{while(!go)std::this_thread::yield();//5 自旋,等待for(unsigned i=0;ivalues[i].x=x.load(std::memory_order_relaxed);values[i].y=y.load(std::memory_order_relaxed);values[i].z=z.load(std::memory_order_relaxed);std::this_thread::yield();}
}void print(read_values* v)
{for(unsigned i=0;iif(i)std::cout<<",";std::cout<<"("<std::thread t1(increment,&x,values1);std::thread t2(increment,&y,values2);std::thread t3(increment,&z,values3);std::thread t4(read_vals,values4);std::thread t5(read_vals,values5);go=true;//6t5.join();t4.join();t3.join();t2.join();t1.join();print(values1);//7print(values2);print(values3);print(values4);print(values5);
}
第一组值中x增1,第二组值中y增1,并且第三组中z增1。
x元素只在给定集中增加,y和z也一样,但是增加是不均匀的,并且相对顺序在所有线程中都不同。
线程3看不到x或y的任何更新;它能看到的只有z的更新。这并不妨碍别的线程观察z的更新,并同时观察x和y的更新。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-F1rqlIFq-1669131297389)(assets/image-20221107092922924.png)]
要想获取额外的同步,且不使用全局排序一致,可以使用获取-释放序列(acquire-release ordering)。
这个序列是松散序列(relaxed ordering)的加强版;虽然操作依旧没有统一的顺序,但是在这个序列引入了同步。这种序列模型中,原子加载就是获取(acquire)操作(memory._order._acquire),原子存储就是释放(memory._order._release)操作,原子读-改-写操作(例如fetch_add()或exchange())在这里,不是"获取”,就是"释放”,或者两者兼有的操作(memory._order._acq_rel)。这里,同步在线程释放和获取间是成对的(pairwise)。释放操作与获取操作同步,这样就能读取已写入的值。这意味着不同线程看到的序列虽不同,但这些序列都是受限的。下面列表5.8中是使用获取-释放序列(而非序列一致方式),对清单5.4的一次重写。
std::atomic x,y;
std::atomic z;void write_x_then_y()
{x.store(true,std::memory_order_relaxed);//1y.store(true,std::memory_order_release);//2
}void read_y_then_x()
{while(!y.load(std::memory_order_acquire));//3 自选,等待y被设置位trueif(x.load(std::memory_order_relaxed)) //4 ++z;
}int main()
{x=false;y=false;z=0;std::thread a(write_x_then_y);std::thread b(read_y_then_x);a.join();b.join();assert(z.load()!=0);// 断言不会被触发
}
最后,读取y③时会得到true,和存储时写入的一样②。因为存储使用的是memory_order_release,读取使用的是memory_order_acquire,存储就与读取就同步了。因为这两个操作是由同一个线程完成的,所以存储x①先行于加载y②。对y的存储同步与对y的加载,存储x也就先行于对y的加载,并且扩展先行于×的读取。因此,加载x的值必为true,并且断言⑤不会触发。如果对于y的加载不是在while循环中,情况可能就会有所不同;加载y的时候可能会读取到f1se,这种情况下对于读取到的x是什么值,就没有要求了。为了保证同步,加载和释放操作必须成对。所以,无论有何影响,释放操作存储的值必须要让获取操作看到。当存储如②或加载如③,都是一个释放操作时,对x的访问就无序了,也就无法保证④处读到的是true,并且还会触发断言。
如何理解模型中获取-释放的语义?让我们看一下例子。首先,线程a运行write_x_then_y函数,然后告诉在x屋的记录员,“请写下true作为组1的一部分,信息来源于线程a”,之后记录员工整的写下了这些信息。而后,线程a告诉在y屋的记录员,“请写下true作为组1的一部分,信息来源于线程a”。在此期间,线程b运行read_y_then_x。线程b持续向y屋的记录员询问值与组的信息,直到它听到记录员说”true”。记录员可能需要告诉他很多遍,不过最终记录员还是说了"true”。y屋的记录员不仅仅是说”true”,他还要说"组1最后是由线程a写入”。
现在,线程b会持续询问x屋的记录员,但这次他会说"请给我一个值,我知道这个值是组1的值,并且是由线程写入的"。所以现在,x屋中的记录员就开始查找组1中由线程a写入的值。这里他注意到,他写入的值是true,同样也是他列表中的最后一个值,所以它必须读出这个值;否则,他将打破这个游戏的规则。
与同步传递相关的获取-释放序列
#include
#include
#include std::atomic data[5];
std::atomic sync1(false),sync2(false);void thread_1()
{data[0].store(42, std::memory_order_relaxed);data[1].store(97, std::memory_order_relaxed);data[2].store(17, std::memory_order_relaxed);data[3].store(-141,std::memory_order_relaxed);data[4].store(2003,std::memory_order_relaxed);sync1.store(true,std::memory_order_release); //线程内有序
}void thread_2()
{while(!sync1.load(std::memory_order_acquire)); //sync1设置为true后,data内的数据肯定已经被设置过sync2.store(std::memory_order_release);
}void thread_3()
{while(!sync2.load(std::memory_order_acquire));assert(data[0].load(std::memory_order_relaxed)==42);//断言通过assert(data[1].load(std::memory_order_relaxed)==97);assert(data[2].load(std::memory_order_relaxed)==17);assert(data[3].load(std::memory_order_relaxed)==-141);assert(data[4].load(std::memory_order_relaxed)==2003);
}int main()
{std::thread t1(thread_1);std::thread t2(thread_2);std::thread t3(thread_3);t1.join();t2.join();t3.join();
}
std::atomic sync(0);
void thread_1()
{// ...sync.store(1,std::memory_order_release);
}void thread_2()
{int expected=1;//如果和期望值一样,sync设置为2while(!sync.compare_exchange_strong(expected,2, std::memory_order_acq_rel))expected=1;
}void thread_3()
{while(sync.load(std::memory_order_acquire)<2);// ...
}
如果使用“读-改-写”操作,选择语义就很重要了。这个例子中,想要同时进行获取和释放的语义,所以memory_order_acq_rel是一个不错的选择,但也可以使用其他序列。即使存储了一个值,使用memory_order_acquire语义的fetch_sub不会和任何东西同步的,因为没有释放操作。同样,使用memory_order_release语义的fetch_or也不会和任何存储操作进行同步,因为对于fetch_or的读取,并不是一个获取操作。使用memory_order_acq_rel语义的“读-改-写”操作,每一个动作都包含获取和释放操作,所以可以和之前的存储操作进行同步,并且可以对随后的加载操作进行同步。
如果将“获取-释放”操作和“序列一致”操作进行混合,“序列一致”的加载动作,就像使用了获取语义的加载操作;并且序列一致的存储操作,就如使用了释放语义的存储。“序列一致”的读-改-写操作行为,就像同时使用了获取和释放的操作。“松散操作”依旧那么松散,但其会和额外的同步进行绑定(也就是使用”获取-释放”的语义)。
介绍本章节的时候,说过memory_order_consume是“获取-释放”序列模型的一部分,但是在前面并没有对其进行过多的讨论。因为memory_order_consume很特别:它完全依赖于数据,并且其展示了与线程间先行关系(可见5.3.2节)的不同之处。这个内存序非常特殊,即使在C++17中也不推荐你使用它。这里我们只为了完整的覆盖内存序列而讨论, memory_order_consume不应该出现在你的代码中。
#include
#include
#include
#include
struct X
{int i;std::string s;
};std::atomic p;
std::atomic a;void create_x()
{X* x=new X;x->i=42;x->s="hello";a.store(99,std::memory_order_relaxed); //1p.store(x,std::memory_order_release); //2
}void use_x()
{X* x;while(!(x=p.load(std::memory_order_consume))) //3std::this_thread::sleep_for(std::chrono::microseconds(1));assert(x->i==42); //4assert(x->s=="hello"); //5assert(a.load(std::memory_order_relaxed)==99);//6
}
int main()
{std::thread t1(create_x);std::thread t2(use_x);t1.join();t2.join();
}
尽管,对a的存储①在存储p②之前,并且存储p的操作标记为memory_order_release,加载p③的操作标记为memory_order_consume,这意味着存储p仅先行那些需要加载p的操作。同样,也意味着X结构体中数据成员所在的断言语句④⑤不会被触发,因为对x变量操作的表达式对加载p的操作携带有依赖。另一方面,对于加载变量a⑥的断言就不能确定是否会被触发;这个操作并不依赖于p的加载操作,所以这里没法保证数据已经被读取。当然,这个情况也很明显,因为这个操作被标记为memory_order_relaxed。
通过其他线程,即使有(有序的)多个“读-改-写”操作(所有操作都已经做了适当的标记)在存储和加载操作之间, 依旧可以获取原子变量存储与加载的同步关系。现在,已经讨论所有可能使用到的内存序列“标签”,在这里可以做一个简单的概述。当存储操作被标记为memory_order_release,memory_order_acq_rel或memory_order_seq_cst,加载被标记为memory_order_consum,memory_order_acquire或memory_order_seq_cst,并且操作链上的每一加载操作都会读取之前操作写入的值,因此链上的操作构成了一个释放序列(release sequence), 并且初始化存储同步(对应memory_order_acquire或memory_order_seq_cst)或是前序依赖(对应memory_order_consume)的最终加载。操作链上的任何原子“读-改-写”操作可以拥有任意个存储序列(甚至是memory_order_relaxed)。
不使用锁,使用原子变量的内存访问顺序做同步
#include
#include
#include
std::vector queue_data;
std::atomic count;void wait_for_more_items() {}
void process(int data){}void populate_queue()
{unsigned const number_of_items=20;queue_data.clear();for(unsigned i=0;iqueue_data.push_back(i);}count.store(number_of_items,std::memory_order_release);//1 初始化存储
}void consume_queue_items()
{while(true){int item_index;if((item_index=count.fetch_sub(1,std::memory_order_acquire))<=0) //2 一个“读-改-写”操作{wait_for_more_items();//3 等待更多元素continue;}process(queue_data[item_index-1]);//4 安全读取queue_data}
}int main()
{std::thread a(populate_queue);std::thread b(consume_queue_items);std::thread c(consume_queue_items);a.join();b.join();c.join();
}
一种处理方式是让线程产生数据,并存储到一个共享缓存中,而后调用count.store(number_of_items, memory_order_release) ①让其他线程知道数据是可用的。线程群会消耗队列中的元素,之后可能调用count_fetch_sub(1,memory_order_acquire)②向队列索取一个元素。在这之前,需要对共享缓存进行完整的读取⑨。一旦cout归零,那么队列中就没有元素了,当没有元素耗线程必须等待③。
当只有一个消费者线程时还好,fetch_sub()是一个带有memory_order_acquire的读取操作,并且存储操作是带有memory_order_release语义,所以存储与加载同步,线程可以从缓存中读取元素。当有两个读取线程时,第二个fetch_sub()操作将看到被第一个线程修改的值,且没有值通过store写入其中。先不管释放序列的规则,第二个线程与第一个线程不存在先行关系,并且对共享缓存中值的读取也不安全,除非第一个fetch_sub()是带有memory_order_release语义的,这个语义为两个消费者线程建立了不必要的同步。无论是释放序列的规则,还是带有memory_order_release语义的fetch_sub操作,第二个消费者看到的是一个空的queue_data,无法从其获取任何数据,并且还会产生条件竞争。幸运的是,第一个fetch_sub()对释放顺序做了一些事情,所以store()能同步与第二个fetch_sub()操作。 两个消费者线程间不需要同步关系。这个过程在图5.7中展示,其中虚线表示的就是释放顺序,实线表示的是先行关系。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xI192WWH-1669131297389)(assets/image-20221107134757997.png)]
虽然,大多数同步关系,是对原子变量的操作应用了内存序列,但这里依旧有必要额外介绍一个对排序的约束——栅栏
(fences)。
如果原子操作库缺少了栅栏,那么这个库就不完整。栅栏操作会对内存序列进行约束,使其无法对任何数据进行修改,典型的做法是与使用memory_order_relaxed约束序的原子操作一起使用。栅栏属于全局操作,执行栅栏操作可以影响到在线程中的其他原子操作。因为这类操作就像画了一条任何代码都无法跨越的线一样,所以栅栏操作通常也被称为内存栅栏(memory barriers)。回忆一下5.3.3节,自由操作可以使用编译器或者硬件的方式,在独立的变量上自由的进行重新排序。 不过,栅栏操作就会限制这种自由,并且会介绍之前没有介绍到的“先行”和“同步”关系。
栅栏操作可以让自由变量变得有序
#include
#include
#include std::atomic x,y;
std::atomic z;void write_x_then_y()
{x.store(true,std::memory_order_relaxed);//1std::atomic_thread_fence(std::memory_order_release);//2被栅栏分开的12操作,是有序的y.store(true,std::memory_order_relaxed);//3
}void read_y_then_x()
{while(!y.load(std::memory_order_relaxed));//4std::atomic_thread_fence(std::memory_order_acquire);//5if(x.load(std::memory_order_relaxed))//6++z;
}int main()
{x=false;y=false;z=0;std::thread a(write_x_then_y);std::thread b(read_y_then_x);a.join();b.join();assert(z.load()!=0);
}
释放栅栏②与获取栅栏⑤同步, 这是因为加载y的操作④读取③处存储的值。 所以, ①处存储x先行于⑥处加载x, 最后x读取出来必为true, 并且断言不会被触发⑦。 原先不带栅栏的存储和加载x是无序的, 并且断言是可能会触发。 这两个栅栏都是必要的: 需要在一个线程中进行释放, 然后在另一个线程中进行获取, 这样才能构建出同步关系。
这个例子中,如果存储y的操作③标记为memory_order_release,而非memory_order_relaxed,释放栅栏②也会对这个操作产生影响。同样,当加载y的操作④标记为memory_order_acquire时,获取栅栏⑤也会对之产生影响。 使用栅栏的想法是:当获取操作能看到释放栅栏操作后的存储结果,那么这个栅栏就与获取操作同步;并且,当加载操作在获取栅栏操作前,看到一个释放操作的结果,那么这个释放操作同步于获取栅栏。当然,也可以使用双边栅栏操作,举一个简单的例子:当一个加载操作在获取栅栏前,看到一个值有存储操作写入,且这个存储操作发生在释放栅栏后,那么释放栅栏与获取栅栏同步。
虽然,栅栏同步依赖于读取/写入的操作发生于栅栏之前/后,但是这里有一点很重要:同步点,就是栅栏本身。当执
行清单5.12中的write_x_then_y, 并且在栅栏操作之后对x进行写入,就像下面的代码一样。触发断言的条件就不保证一定为true了,尽管写入x的操作在写入y的操作之前发生。
void write_x_then_y()
{std::atomic_thread_fence(std::memory_order_release);x.store(true,std::memory_order_relaxed);y.store(true,std::memory_order_relaxed);
}
这里的两个操作就不会被栅栏分开,并且也不再有序。只有当栅栏出现在存储x和存储y操作之间时,这个顺序才是硬
性的。当然,栅栏是否存在不会影响任何拥有先行关系的执行序列,这种情况是因为一些其他原子操作。
#include
#include
#include bool x=false;
std::atomic y;
std::atomic z;void write_x_then_y()
{x=true; //1 在栅栏前存储xstd::atomic_thread_fence(std::memory_order_release);y.store(true,std::memory_order_relaxed);//2 在栅栏后存储y
}void read_y_then_x()
{while(!y.load(std::memory_order_relaxed));//3 在#2写入前,持续等待std::atomic_thread_fence(std::memory_order_acquire);if(x) //4 这里读取到的值,是#1中写入++z;
}int main()
{x=false;y=false;z=0;std::thread a(write_x_then_y);std::thread b(read_y_then_x);a.join();b.join();assert(z.load()!=0);
}
栅栏仍然为存储x①和存储y②,还有加载y③和加载x④提供一个执行序列,并且这里仍然有一个先行关系,在存储x和加载x之间,所以断言⑤不会被触发。②中的存储和③中对y的加载,都必须是原子操作;否则,将会在y上产生条件竞争,不过一旦读取线程看到存储到y的操作,栅栏将会对x执行有序的操作。这个执行顺序意味着,即使它被另外的线程修改或被其他线程读取,×上也不存在条件竞争。
#include
#include
#include
#include struct empty_stack: std::exception
{const char* what() const throw(){return "empty stack";}
};template
class threadsafe_stack
{
private:std::stack data;mutable std::mutex m;
public:threadsafe_stack(){}threadsafe_stack(const threadsafe_stack& other){std::lock_guard lock(other.m);data=other.data;}threadsafe_stack& operator=(const threadsafe_stack&) = delete;void push(T new_value){std::lock_guard lock(m);data.push(std::move(new_value));}std::shared_ptr pop(){std::lock_guard lock(m);if(data.empty()) throw empty_stack();std::shared_ptr const res(std::make_shared(std::move(data.top())));data.pop();return res;}void pop(T& value){std::lock_guard lock(m);if(data.empty()) throw empty_stack();value=std::move(data.top());data.pop();}bool empty() const{std::lock_guard lock(m);return data.empty();}
};
序列化线程会隐性的限制程序性能,这就是栈争议声最大的地方:当一个线程在等待锁时,就会无所事事。对于栈来说, 等待添加元素也是没有意义的,所以当线程需要等待时,会定期检查empty()或pop(),以及对empty_stack异常进行关注。 这样的现实会限制栈的实现方式,线程等待时会浪费宝贵的资源去检查数据,或要求用户编写外部等待和提示的代码(例如: 使用条件变量),这就使内部锁失去存在的意义——也就造成资源的浪费。
#include
#include
#include
#include template
class threadsafe_queue
{
private:mutable std::mutex mut;std::queue data_queue;std::condition_variable data_cond;
public:threadsafe_queue(){}void push(T new_value){std::lock_guard lk(mut);data_queue.push(std::move(new_value));data_cond.notify_one(); //1}void wait_and_pop(T& value) //2{std::unique_lock lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});value=std::move(data_queue.front());data_queue.pop();}std::shared_ptr wait_and_pop() //3{std::unique_lock lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});//4std::shared_ptr res(std::make_shared(std::move(data_queue.front())));data_queue.pop();return res;}bool try_pop(T& value){std::lock_guard lk(mut);if(data_queue.empty())return false;value=std::move(data_queue.front());data_queue.pop();}std::shared_ptr try_pop(){std::lock_guard lk(mut);if(data_queue.empty())return std::shared_ptr();//5std::shared_ptr res(std::make_shared(std::move(data_queue.front())));data_queue.pop();return res;}bool empty() const{std::lock_guard lk(mut);return data_queue.empty();}
};int main()
{threadsafe_queue rq;
}
#include
#include
#include
#include template
class threadsafe_queue
{
private:mutable std::mutex mut;std::queue > data_queue;std::condition_variable data_cond;
public:threadsafe_queue(){}void push(T new_value){std::shared_ptr data(std::make_shared(std::move(new_value)));std::lock_guard lk(mut);data_queue.push(data);data_cond.notify_one();}void wait_and_pop(T& value){std::unique_lock lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});value=std::move(*data_queue.front());data_queue.pop();}std::shared_ptr wait_and_pop(){std::unique_lock lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});std::shared_ptr res=data_queue.front();data_queue.pop();return res;}bool try_pop(T& value){std::lock_guard lk(mut);if(data_queue.empty())return false;value=std::move(*data_queue.front());data_queue.pop();}std::shared_ptr try_pop(){std::lock_guard lk(mut);if(data_queue.empty())return std::shared_ptr();std::shared_ptr res=data_queue.front();data_queue.pop();return res;}bool empty() const{std::lock_guard lk(mut);return data_queue.empty();}
};
std:shared_ptr<>持有数据的好处:新实例分配结束时,不会被锁在push()⑤当中(而在清单6.2中,只能在pop()持有锁时完成)。因为内存分配需要在性能上付出很高的代价(性能较低),所以使用std:shared_ptr<>对队列的性能有很大的提升,其减少了互斥量持有的时间,允许其他线程在分配内存的同时,对队列进行其他的操作。
极致的性能,push pop 的都是指针。将分配内存的操作放在了锁之外。
#include template
class queue
{
private:struct node{T data;std::unique_ptr next;//析构的时候不用手动的delete了node(T data_):data(std::move(data_)){}};std::unique_ptr head;//1node* tail; //2public:queue():tail(nullptr){}queue(const queue& other)=delete;queue& operator=(const queue& other)=delete;std::shared_ptr try_pop(){if(!head){return std::shared_ptr();}std::shared_ptr const res(std::make_shared(std::move(head->data)));std::unique_ptr const old_head=std::move(head);head=std::move(old_head->next);//3if(!head)tail=nullptr;return res;}void push(T new_value){std::unique_ptr p(new node(std::move(new_value)));node* const new_tail=p.get();if(tail){tail->next=std::move(p);//4}else{head=std::move(p);//5}tail=new_tail;//6}
};
std::unique_ptr
虽然,这种实现对于单线程来说没什么问题,但当在多线程下尝试使用细粒度锁时,就会出现问题。因为在给定的实现中有两个数据项(head①和tail②);即使,使用两个互斥量来保护头指针和尾指针,也会出现问题。
最明显的问题就是push()可以同时修改头指针⑤和尾指针⑥,所以push()函数会同时获取两个互斥量。虽然会将两个互斥量都上锁,但这问题还不算太糟糕。糟糕的是push()和pop()都能访问next指针指向的节点:push()可更新tail->next④,随后try_pop()读取read->next③。 当队列中只有一个元素时, head==tail, 所以head->next和tail->next是同一个对象, 并且这个对象需要保护。不过,“在同一个对象在未被head和tail同时访问时,push()和try_pop()锁住的是同一个锁”就不对了。
通过分离数据实现并发
可以使用“预分配一个虚拟节点(无数据),确保这个节点永远在队列的最后,用来分离头尾指针能访问的节点”的办法,走出这个困境。对于一个空队列来说,head和tail都属于虚拟指针,而非空指针。这个办法挺好,因为当队列为空时,try_pop()不能访问head->next了。当添加一个节点入队列时(这时有真实节点了),head和tail现在指向不同的节点,所以就不会在head->next和tail->next上产生竞争。这里的缺点是,必须额外添加一个间接层次的指针数据,来做虚拟节点。下面的代码描述了这个方案如何实现。
#include
template
class queue
{
private:struct node{std::shared_ptr data;std::unique_ptr next;};std::unique_ptr head;node* tail;public:queue():head(new node),tail(head.get()){}queue(const queue& other)=delete;queue& operator=(const queue& other)=delete;std::shared_ptr try_pop(){if(head.get()==tail){return std::shared_ptr();}std::shared_ptr const res(head->data);std::unique_ptr const old_head=std::move(head);head=std::move(old_head->next);return res;}void push(T new_value){std::shared_ptr new_data(std::make_shared(std::move(new_value)));std::unique_ptr p(new node);tail->data=new_data;node* const new_tail=p.get();tail->next=std::move(p);tail=new_tail;}
};
现在的push()只能访问tail,而不能访问head,这就是一个可以访问head和tail的try_pop(),但是tail只需在最初进行比较, 所以所存在的时间很短。重大的提升在于虚拟节点意味着try_pop()和push()不能对同一节点进行操作,所以就不再需要互斥了。
为了最大程度的并发化,所以需要上锁的时间尽可能的少。push()很简单:互斥量需要对tail的访问上锁,就需要对每一个新分配的节点进行上锁⑧,还有对当前尾节点进行赋值的时候⑨也需要上锁。锁需要持续到函数结束时才能解开。
try_pop()就不简单了。首先,需要使用互斥量锁住head,一直到head弹出。实际上,互斥量决定了哪一个线程进行弹出操作。一旦head被改变⑤,才能解锁互斥量;当在返回结果时,互斥量就不需要进行上锁了⑥,这使得访问tail需要一个尾互斥量。因为,只需要访问tail一次,且只有在访问时才需要互斥量。这个操作最好是通过函数进行包装。事实上,因为代码只有在成员需要head时,互斥量才上锁,这项也需要包含在包装函数中。
#include
#include template
class threadsafe_queue
{
private:struct node{std::shared_ptr data;std::unique_ptr next;};std::mutex head_mutex;std::unique_ptr head;std::mutex tail_mutex;node* tail;node* get_tail(){std::lock_guard tail_lock(tail_mutex);return tail;}std::unique_ptr pop_head(){std::lock_guard head_lock(head_mutex);if(head.get()==get_tail()){return nullptr;}std::unique_ptr const old_head=std::move(head);head=std::move(old_head->next);return old_head;}public:threadsafe_queue():head(new node),tail(head.get()){}threadsafe_queue(const threadsafe_queue& other)=delete;threadsafe_queue& operator=(const threadsafe_queue& other)=delete;std::shared_ptr try_pop(){std::unique_ptr old_head=pop_head();return old_head?old_head->data:std::shared_ptr();}void push(T new_value){std::shared_ptr new_data(std::make_shared(std::move(new_value)));std::unique_ptr p(new node);node* const new_tail=p.get();std::lock_guard tail_lock(tail_mutex);tail->data=new_data;tail->next=std::move(p);tail=new_tail;}
};
观察不变量前,需要确定的状态有:
tail->next == nullptr
tail->data == nullptr
head == taill(意味着空列表)
单元素列表 head->next = tail
列表中的每一个节点x, x!=tail且x->data指向一个T类型的实例, 并且x->next指向列表中下一个节点。 x->next == tail意味着x就是列表中最后一个节点
顺着head的next节点找下去, 最终会找到tail
push()很简单:仅修改了被tail_mutex的数据,因为新的尾节点是一个空节点,并且其data和next都为旧的尾节点(实际上的尾节点)设置好,所以其能维持不变量的状态。
有趣的部分在于try_pop()上,不仅需要对tail_mutex上锁来保护对tail的读取;还要保证在从头读取数据时,不会产生数据竞争。如果没有这些互斥量,当一个线程调用try_pop()的同时,另一个线程调用push(),这里操作顺序将不可预测。尽管,每一个成员函数都持有一个互斥量,这些互斥量保护的数据不会同时被多个线程访问到;并且,队列中的所有数据来源,都是通过调用push()得到。线程可能会无序的访问同一数据地址,就会有数据竞争(正如你在第5章看到的那样),以及未定义行为。幸运的是,get_tail()中的tail_mutex解决了所有的问题。因为调用get_tail()将会锁住同名锁,就像push()一样,这就为两个操作规定好了顺序。要不就是get_tail()在push()之前被调用,线程可以看到旧的尾节点,要不就是在push()之后完成,线程就能看到tail的新值,以及真正tail的值,并且新值会附加到之前的tail值上。
这个结构对并发访问的考虑要多于清单6.2中的代码,因为锁粒度更加的小,并且更多的数据不在锁的保护范围内。比如,push()中新节点和新数据的分配都不需要锁来保护。多线程情况下,节点及数据的分配是“安全”并发的。同一时间内,只有一个线程可以将它的节点和数据添加到队列中,所以代码中只是简单使用了指针赋值的形式,相较于基于 std::queue<> 的实现,这个结构中就不需要对于 std::queue<> 的内部操作进行上锁这一步。
锁的力度更小:没有锁住整个数据结构,临界区只是操作指针,分配内存并没有临界区
#include
#include
#include template
class threadsafe_queue
{
private:struct node{std::shared_ptr data;std::unique_ptr next;};std::mutex head_mutex;std::unique_ptr head;std::mutex tail_mutex;node* tail;std::condition_variable data_cond;
public:threadsafe_queue():head(new node),tail(head.get()){}threadsafe_queue(const threadsafe_queue& other)=delete;threadsafe_queue& operator=(const threadsafe_queue& other)=delete;std::shared_ptr try_pop();bool try_pop(T& value);std::shared_ptr wait_and_pop();void wait_and_pop(T& value);void push(T new_value);bool empty();
};
实现
template
void threadsafe_queue::push(T new_value)
{std::shared_ptr new_data(std::make_shared(std::move(new_value)));std::unique_ptr p(new node);{std::lock_guard tail_lock(tail_mutex);tail->data=new_data;node* const new_tail=p.get();tail->next=std::move(p);tail=new_tail;}data_cond.notify_one();
}#include
#include
#include
#include "node.hpp"
template
class threadsafe_queue
{
private://std::atomic tail;//std::atomic head;node* get_tail(){std::lock_guard tail_lock(tail_mutex);return tail;}std::unique_ptr pop_head(){std::unique_ptr const old_head=std::move(head);head=std::move(old_head->next);return old_head;}std::unique_lock wait_for_data(){std::unique_lock head_lock(head_mutex);data_cond.wait(head_lock,[&]{return head!=get_tail();});return std::move(head_lock);}std::unique_ptr wait_pop_head(){std::unique_lock head_lock(wait_for_data());return pop_head();}std::unique_ptr wait_pop_head(T& value){std::unique_lock head_lock(wait_for_data());value=std::move(*head->data);return pop_head();}public:std::shared_ptr wait_and_pop(){std::unique_ptr const old_head=wait_pop_head();return old_head->data;}void wait_and_pop(T& value){std::unique_ptr const old_head=wait_pop_head(value);}
};#include
#include
template
class threadsafe_queue
{
private:std::unique_ptr try_pop_head(){std::lock_guard head_lock(head_mutex);if(head.get()==get_tail()){return std::unique_ptr();}return pop_head();}std::unique_ptr try_pop_head(T& value){std::lock_guard head_lock(head_mutex);if(head.get()==get_tail()){return std::unique_ptr();}value=std::move(*head->data);return pop_head();}public:std::shared_ptr try_pop(){std::unique_ptr const old_head=try_pop_head();return old_head?old_head->data:std::shared_ptr();}bool try_pop(T& value){std::unique_ptr const old_head=try_pop_head(value);return old_head;}bool empty(){std::lock_guard head_lock(head_mutex);return (head==get_tail());}
};
这个查询表作为一个整体,通过单独的操作,对每一个桶进行锁定,并且通过使用 std::shared_mutex 允许读者线程对每一个桶并发访问。
#include
#include
#include
#include
#include
#include
#include template >
class threadsafe_lookup_table
{
private:class bucket_type{private:typedef std::pair bucket_value;typedef std::list bucket_data;typedef typename bucket_data::iterator bucket_iterator;bucket_data data;mutable std::shared_mutex mutex;//1bucket_iterator find_entry_for(Key const& key) const//2{return std::find_if(data.begin(),data.end(),[&](bucket_value const& item){return item.first==key;});}public:Value value_for(Key const& key,Value const& default_value) const{std::shared_lock lock(mutex);//3bucket_iterator const found_entry=find_entry_for(key);return (found_entry==data.end())?default_value : found_entry->second;}void add_or_update_mapping(Key const& key,Value const& value){std::unique_lock lock(mutex);//4bucket_iterator const found_entry=find_entry_for(key);if(found_entry==data.end()){data.push_back(bucket_value(key,value));}else{found_entry->second=value;}}void remove_mapping(Key const& key){std::unique_lock lock(mutex);//5bucket_iterator const found_entry=find_entry_for(key);if(found_entry!=data.end()){data.erase(found_entry);}}};std::vector > buckets;//6Hash hasher;bucket_type& get_bucket(Key const& key) const//7{std::size_t const bucket_index=hasher(key)%buckets.size();return *buckets[bucket_index];}public:typedef Key key_type;typedef Value mapped_type;typedef Hash hash_type;threadsafe_lookup_table(unsigned num_buckets=19, Hash const& hasher_=Hash()):buckets(num_buckets),hasher(hasher_){for(unsigned i=0;ibuckets[i].reset(new bucket_type);}}threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other)=delete;Value value_for(Key const& key,Value const& default_value=Value()) const{return get_bucket(key).value_for(key,default_value);//8}void add_or_update_mapping(Key const& key,Value const& value){get_bucket(key).add_or_update_mapping(key,value);//9}void remove_mapping(Key const& key){get_bucket(key).remove_mapping(key);//10}
};
二叉树的方式,不会对提高并发访问的能力;每一个查找或者修改操作都需要访问根节点,因此,根节点需要上锁。虽然,访问线程在向下移动时,这个锁可以进行释放,但相比横跨整个数据结构的单锁,并没有什么优势。
有序数组是最坏的选择,因为你无法提前言明数组中哪段是有序的,所以你需要用一个锁将整个数组锁起来。
那么就剩哈希表了。假设有固定数量的桶,每个桶都有一个键值(关键特性),以及散列函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支特多读者单作者)时,就能将并发访问的可能性增加N倍,这里N是桶的数量。
实现中使用了 std::vector
因为桶的数量是固定的,所以get_bucket()⑦可以无锁调用,⑧⑨⑩也都一样。并且对桶的互斥量上锁,要不就是共享(只读)所有权时③,要不就是在获取唯一(读/写)权时④⑤。这里的互斥量,可适用于每个成员函数。
这三个函数都使用到了find_entry_for()成员函数②,用来确定数据是否在桶中。每一个桶都包含一个“键值-数据”的std::list<>列表,所以添加和删除数据就会很简单。
选择当前状态的快照返回,依次获取所有锁
std::map threadsafe_lookup_table::get_map() const
{std::vector > locks;for(unsigned i=0;ilocks.push_back(std::unique_lock(buckets[i].mutex));}std::map res;for(unsigned i=0;ifor(bucket_iterator it=buckets[i].data.begin();it!=buckets[i].data.end();++it){res.insert(*it);}}return res;
}
#include
#include template
class threadsafe_list
{struct node//1{std::mutex m;std::shared_ptr data;std::unique_ptr next;node()://2next(){}node(T const& value)://3data(std::make_shared(value)){}};node head;public:threadsafe_list(){}~threadsafe_list(){remove_if([](T const&){return true;});}threadsafe_list(threadsafe_list const& other)=delete;threadsafe_list& operator=(threadsafe_list const& other)=delete;void push_front(T const& value){std::unique_ptr new_node(new node(value));//4std::lock_guard lk(head.m);new_node->next=std::move(head.next);//5head.next=std::move(new_node);//6}templatevoid for_each(Function f)//7{node* current=&head;std::unique_lock lk(head.m);//8while(node* const next=current->next.get())//9{std::unique_lock next_lk(next->m);//10lk.unlock();//11f(*next->data);//12current=next;lk=std::move(next_lk);//13}}templatestd::shared_ptr find_first_if(Predicate p)//14{node* current=&head;std::unique_lock lk(head.m);while(node* const next=current->next.get()){std::unique_lock next_lk(next->m);lk.unlock();if(p(*next->data))//15{return next->data;//16}current=next;lk=std::move(next_lk);}return std::shared_ptr();}templatevoid remove_if(Predicate p)//17{node* current=&head;std::unique_lock lk(head.m);while(node* const next=current->next.get()){std::unique_lock next_lk(next->m);if(p(*next->data))//18{std::unique_ptr old_next=std::move(current->next);current->next=std::move(next->next);next_lk.unlock();}//20else{lk.unlock();//21current=next;lk=std::move(next_lk);}}}
};
清单6.13中的threadsafe_list是一个单链表,可从node的结构①中看出。一个默认构造的node作为链表的head,其next指针②指向的是NULL。新节点都通过oush_front()函数添加;构造第一个新节点④,其将会在堆上分配内存③来对数据进行存储,同时将next指针置为NULL。然后,为了设置next的值⑤,需要获取head节点的互斥锁,也就是插入节点到列表的头部,让头节点的head.next指向这个新节点⑥。目前,还没有什么问题:只需要锁住一个互斥量,就能将新的数据添加进入链表,所以不存在死锁的问题。同样,(缓慢的)内存分配操作在锁的范围外,所以锁能保护需要更新的一对指针。那么,再来看一下迭代功能。
首先,来看一下for_each()⑦。这个操作对队列中的每个元素执行Function(函数指针);大多数标准算法库中,都会通过传值方式来执行这个函数,要不就传入一个通用的函数,要不就传入一个有函数操作的类型对象。这种情况下,函数必须接受类型为T的值作为参数。链表中会有一个"手递手"的上锁过程。这个过程开始时,需要锁住hed及节点⑧的互斥量。然后,安全的获取指向下一个节点的指针(使用gt()获取,因为对这个指针没有所有权)。当指针不为NULL⑨,为了继续对数据进行处理,就需要对指向的节点进行上锁⑩。当锁住了那个节点,就可以对上一个节点进行释放了①,并调用指定函数@。当函数执行完成时,就可以更新当前指针所指向的节点(刚刚处理过的节点),并将所有权从next_1k移动移动到lk国。因为for_each传递的每个数据都是能被Function:接受的,所以当需要的时,或需要拷贝到另一个容器的时,或其他情况时,都可以考虑使用这种方式更新每个元素。如果函数的行为没什么问题,这种方式是安全的,因为在获取节点互斥锁时,已经获取锁的节点正在被函数所处理。
那么,所有的互斥量中会有死锁或条件竞争吗?答案无疑是“否”,要看提供的函数(谓词)是否有良好的行为。迭代通常都使用一种方式,从hed节点开始,并且在释放当前节点锁之前,将下一个节点的互斥量锁住,所以就不可能会有不同线程有不同的上锁顺序。唯一可能出现条件竞争的地方就在remove1f()@中删除已有节点的时候。因为,操作在解锁互斥量后进行(其导致的未定义行为,可对已上锁的互斥量进行破坏)。不过,可以确定这的确是安全的,因为现在还特有前一个节点(当前节点)的互斥锁,所以不会有新的线程尝试去获取正在删除节点的互斥锁。
本章开始, 我们讨论了设计并发数据结构的意义, 并给出了一些指导意见。 然后, 通过设计一些通用的数据结构(栈,队列, 哈希表和单链表), 探究了指导意见在实践中的实际意义和应用, 并使用锁来保护数据和避免数据竞争。 现在,应该回看一下本章实现的那些数据结构, 再回顾一下如何增加并发访问的几率, 以及哪里会存在潜在条件竞争
设计无锁并发数据结构
无锁结构中内存管理技术
对无锁数据结构设计的简单指导
使用互斥量、条件变量,以及期望值可以用来同步阻塞算法和数据结构。调用库函数将会挂起执行线程,直到其他线程完成某个特定的动作。库函数将调用阻塞操作来对线程进行阻塞,在阻塞移除前线程无法继续自己的任务。通常,操作系统会完全挂起一个阻塞线程(并将其时间片交给其他线程),直到其被其他线程“解阻塞”;”解阻塞”的方式很多,比如解锁一个互斥锁、通知条件变量达成,或让“期望”就绪。
不使用阻塞库的数据结构和算法被称为“无阻塞结构”。不过,无阻塞的数据结构并非都是无锁的,那么就让我们见识一下各种各样的无阻塞数据结构吧!
#include class spinlock_mutex
{std::atomic_flag flag;
public:spinlock_mutex():flag(ATOMIC_FLAG_INIT){}void lock(){while(flag.test_and_set(std::memory_order_acquire));}void unlock(){flag.clear(std::memory_order_release);}
};
这段代码没有调用任何阻塞函数,lock()只是让循环特续调用test_and_set(),并返回false。这就是为什么取名为“自旋锁"的原因一代码“自旋”于循环当中。所以没有阻塞调用,任意代码使用该spinlock_mutex互斥量来保护共享数据都是非阻塞的。不过,自旋锁并不是无锁结构。这里用了一个锁,并且一次能锁住一个线程。让我们来看一下无锁结构的具体定义,这将有助于你判断那些类型的数据结构是无锁的。这些类型有:
无锁结构就意味着线程可以并发的访问数据结构,线程不能做相同的操作;一个无锁队列可能允许一个线程进行压入数据,另一个线程弹出数据,当有两个线程同时尝试添加元素时,这个数据结构将被破坏。不仅如此,当其中一个访问线程被调度器中途挂起时,其他线程必须能够继续完成自己的工作,而无需等待挂起线程。
具有“比较/交换”操作的数据结构,通常在“比较/交换"操作实现中都有一个循环。使用“比较/交换”操作的原因:当有其他线程同时对指定的数据进行修改时,代码将尝试恢复数据。当其他线程被挂起时,“比较/交换"操作执行成功,这样的代码就是无锁的。当执行失败时,就需要一个自旋锁,且这个结构就是“无阻塞-有锁”的结构。
无锁算法中的循环会让一些线程处于”饥饿”状态。如有线程在”错误”时间执行,那么第一个线程将会不停的尝试所要完成的操作(其他程序继续执行)。“无锁-无等待”数据结构的出现,就为了避免这种问题。
无等待数据结构:首先是无锁数据结构,并且每个线程都能在有限的时间内完成操作,暂且不管其他线程是如何工作的。由于会和别的线程产生冲突,所以算法可以进行无数次尝试,因此并不是无等待的。本章的大多数例子都有一种特性一对compare_exchange_weak或compare_exchange_strong操作进行循环,并且循环次数没有上限。操作系统对线程进行进行管理,有些线程的循环次数非常多,有些线程的循环次数就非常少。因此,这些操作时无等待的。
正确实现一个无锁的结构十分困准。因为,要保证每一个线程都能在有限的时间里完成操作,就需要保证每一个操作可以被一次性执行完;当线程执行到某个操作时,应该不会让其他线程的操作失败。这就会让算法中所使用到的操作变的相当复杂。
考虑到获取无锁或无等待的数据结构所有权很困难,这就有理由来实现一个数据结构了;需要保证所要得获益要大于实现成本。那么,就先来找一下实现成本和所得获益的平衡点吧!
另一方面,当不能限制访问数据结构的线程数量时,就需要注意不变量的状态,或选择替代品来保持不变量的状态。同时,还需要注意操作的顺序约束。为了避免未定义行为,及相关的数据竞争,必须使用原子操作对修改操作进行限制。不过,仅使用原子操作是不够的:需要确定被其他线程看到的修改,是遵循正确的顺序。
#include template
class lock_free_stack
{
private:struct node{T data;node* next;node(T const& data_):data(data_){}};std::atomic head;
public:void push(T const& data){node* const new_node=new node(data);new_node->next=head.load();//如果head更新了,这条语句要春来一遍while(!head.compare_exchange_weak(new_node->next,new_node));}//自己乱加的T data pop(){node* old_node = head.load();while(old_node && !head.compare_exchange_weak(old_node, head->next));return old_node ? old_node->data : T()/*返回默认值*/;}
};
bool compare_exchange_weak( T& expected, T desired)
当前值与期望值(expect)相等时,修改当前值为设定值(desired),返回true
当前值与期望值(expect)不等时,将期望值(expect)修改为当前值,返回false
weak版和strong版的区别:
weak版本的CAS允许偶然出乎意料的返回(比如在字段值和期待值一样的时候却返回了false),不过在一些循环算法中,这是可以接受的。通常它比起strong有更高的性能。
代码的亮点是使用“比较/交换”操作:返回false时,因为比较失败(例如,head被其他线程锁修改),会使用head中的内容更新new_node->next(第一个参数)的内容。循环中不需要每次都重新加载head指针,因为编译器会完成这件事。同样, 因为循环可能直接就失败了,所以使用compare_exchange_weak要好于使用compare_exchange_strong。
第3章中介绍栈结构时,了解了在返回值的时候会出现异常安全问题:有异常被抛出时,复制的值将丢失。这种情况下, 传入引用是一种可以接受的解决方案;这样就能保证当有异常抛出时,栈上的数据不会丢失。不幸的是,不能这样做; 只能在单一线程对值进行返回时,才进行拷贝以确保拷贝操作的安全性,这就意味着在拷贝结束后这个节点就会被删除。 因此,通过引用获取返回值的方式没有任何优点:直接返回也是可以的。若想要安全的返回,必须使用第3章中的其他方法:返回指向数据值的(智能)指针。
#include
#include template
class lock_free_stack
{
private:struct node{std::shared_ptr data;node* next;node(T const& data_):data(std::make_shared(data_)){}};std::atomic head;
public:void push(T const& data){node* const new_node=new node(data);new_node->next=head.load();while(!head.compare_exchange_weak(new_node->next,new_node));}std::shared_ptr pop(){node* old_head=head.load();while(old_head &&!head.compare_exchange_weak(old_head,old_head->next));return old_head ? old_head->data : std::shared_ptr();}
};
高负荷的情况,因为其他线程在初始化之后都能进入pop(),所以to_be_deleted链表将会无界的增加,并且会再次泄露。不存在任何静态的情况时,就得为回收节点寻找替代机制。关键是要确定无线程访问给定节点,这样给定节点就能被回收。最简单的替换机制就是使用风险指针(hazard pointer)。
hazard pointer
7.9 无锁栈——使用无锁 std::shared_ptr<> 的实现
template
class lock_free_stack
{
private:struct node{std::shared_ptr data;std::shared_ptr next;node(T const& data_):data(std::make_shared(data_)){}};std::shared_ptr head;
public:void push(T const& data){std::shared_ptr const new_node=std::make_shared(data);new_node->next=std::atomic_load(&head);while(!std::atomic_compare_exchange_weak(&head,&new_node->next,new_node));}std::shared_ptr pop(){std::shared_ptr old_head=std::atomic_load(&head);while(old_head && !std::atomic_compare_exchange_weak(&head,&old_head,std::atomic_load(&old_head->next)));if(old_head) {std::atomic_store(&old_head->next,std::shared_ptr());return old_head->data;}return std::shared_ptr();}~lock_free_stack(){while(pop());}
};
7.11-7.12使用分离引用计数(内外部引用计数)的方式push/pop一个节点到无锁栈中
#include
#include template
class lock_free_stack
{
private:struct node;struct counted_node_ptr{int external_count;node* ptr;};struct node{std::shared_ptr data;std::atomic internal_count;counted_node_ptr next;node(T const& data_):data(std::make_shared(data_)),internal_count(0){}};std::atomic head;
public:~lock_free_stack(){while(pop());}void push(T const& data){counted_node_ptr new_node;new_node.ptr=new node(data);new_node.external_count=1;new_node.ptr->next=head.load();while(!head.compare_exchange_weak(new_node.ptr->next,new_node));}
};#include
#include template
class lock_free_stack
{
private:struct counted_node_ptr{int external_count;node* ptr;};std::atomic head;void increase_head_count(counted_node_ptr& old_counter){counted_node_ptr new_counter;do{new_counter=old_counter;++new_counter.external_count;}while(!head.compare_exchange_strong(old_counter,new_counter));old_counter.external_count=new_counter.external_count;}
public:std::shared_ptr pop(){counted_node_ptr old_head=head.load();for(;;){increase_head_count(old_head);node* const ptr=old_head.ptr;if(!ptr){return std::shared_ptr();}if(head.compare_exchange_strong(old_head,ptr->next)){std::shared_ptr res;res.swap(ptr->data);int const count_increase=old_head.external_count-2;if(ptr->internal_count.fetch_add(count_increase)==-count_increase){delete ptr;}return res;}else if(ptr->internal_count.fetch_sub(1)==1){delete ptr;}}}
};
再次优化内存序
7.13 基于引用计数和松散原子操作的无锁栈
//Download by www.cctry.com
#include
#include template
class lock_free_stack
{
private:struct node;struct counted_node_ptr{int external_count;node* ptr;};struct node{std::shared_ptr data;std::atomic internal_count;counted_node_ptr next;node(T const& data_):data(std::make_shared(data_)),internal_count(0){}};std::atomic head;void increase_head_count(counted_node_ptr& old_counter){counted_node_ptr new_counter;do{new_counter=old_counter;++new_counter.external_count;}while(!head.compare_exchange_strong(old_counter,new_counter,std::memory_order_acquire,std::memory_order_relaxed));old_counter.external_count=new_counter.external_count;}
public:~lock_free_stack(){while(pop());}void push(T const& data){counted_node_ptr new_node;new_node.ptr=new node(data);new_node.external_count=1;new_node.ptr->next=head.load(std::memory_order_relaxed);while(!head.compare_exchange_weak(new_node.ptr->next,new_node,std::memory_order_release,std::memory_order_relaxed));}std::shared_ptr pop(){counted_node_ptr old_head=head.load(std::memory_order_relaxed);for(;;){increase_head_count(old_head);node* const ptr=old_head.ptr;if(!ptr){return std::shared_ptr();}if(head.compare_exchange_strong(old_head,ptr->next,std::memory_order_relaxed)){std::shared_ptr res;res.swap(ptr->data);int const count_increase=old_head.external_count-2;if(ptr->internal_count.fetch_add(count_increase,std::memory_order_release)==-count_increase){delete ptr;}return res;}else if(ptr->internal_count.fetch_add(-1,std::memory_order_relaxed)==1){ptr->internal_count.load(std::memory_order_acquire);delete ptr;}}}
};
做push()的线程,会先构造数据项和节点,再设置head。做pop()的线程,会先加载head的值,再做在循环中对head做“比较/交换”操作,并增加引用计数,再读取对应的node节点,获取next的指向值,现在可以看到一组需求关系。next的值是普通的非原子对象,所以为了保证读取安全,必须确定存储(推送线程)和加载(弹出线程)的先行关系。因为原子操作就是push()函数中的compare_exchange_weak(),需要释放操作来获取两个线程间的先行关系,compare_exchange_weak()必须是 std::memory_order_release 或更严格的内存序。当compare_exchange_weak()调用失败,什么都不会改变,并且可以持续循环下去,所以使用 std::memory_order_relaxed 就足够了。
pop()的实现呢? 为了确定先行关系, 必须在访问next值之前使用 std::memory_order_acquire 或更严格的内存序操作。 因为,increase_head_count()中使用compare_exchange_strong()就获取next指针指向的旧值,所以要其获取成功就需要确定内存序。如同调用push()那样,当交换失败,循环会继续,所以在失败时使用松散的内存序:
清单7.16 使用带有引用计数tail,实现的无锁队列中的push()
#include
#include
template
class lock_free_queue
{
private:struct node;struct counted_node_ptr{int external_count;node* ptr;};std::atomic head;std::atomic tail;struct node_counter{unsigned internal_count:30;unsigned external_counters:2;};struct node{std::atomic data;std::atomic count;counted_node_ptr next;node(){node_counter new_count;new_count.internal_count=0;new_count.external_counters=2;count.store(new_count);next.ptr=nullptr;next.external_count=0;}};
public:void push(T new_value){std::unique_ptr new_data(new T(new_value));counted_node_ptr new_next;new_next.ptr=new node;new_next.external_count=1;counted_node_ptr old_tail=tail.load();for(;;){increase_external_count(tail,old_tail);T* old_data=nullptr;if(old_tail.ptr->data.compare_exchange_strong(old_data,new_data.get())){old_tail.ptr->next=new_next;old_tail=tail.exchange(new_next);free_external_counter(old_tail);new_data.release();break;}old_tail.ptr->release_ref();}}
};
node初始化时,internal_count设置为0,external_counter设置为2④,因为当新节点加入队列中时,都会被tail和上一个节点的next指针所指向。当其他节点push时,tail指向新的节点,旧节点的external_counter减为1。当节点pop时,external_counter减为0。可以delete了。
清单7.17 使用尾部引用计数, 将节点从无锁队列中弹出
#include
#include
template
class lock_free_queue
{
private:
#include "node.hpp"struct counted_node_ptr{int external_count;node* ptr;};std::atomic head;std::atomic tail;
public:std::unique_ptr pop(){counted_node_ptr old_head=head.load(std::memory_order_relaxed);for(;;){increase_external_count(head,old_head);node* const ptr=old_head.ptr;if(ptr==tail.load().ptr){ptr->release_ref();return std::unique_ptr();}if(head.compare_exchange_strong(old_head,ptr->next)){T* const res=ptr->data.exchange(nullptr);free_external_counter(old_head);return std::unique_ptr(res);}ptr->release_ref();}}
};
清单7.18 在无锁队列中释放一个节点引用
#include
#include
template
class lock_free_queue
{
private: struct node;struct node_counter{unsigned internal_count:30;unsigned external_counters:2;};struct counted_node_ptr{int external_count;node* ptr;};struct node{std::shared_ptr data;std::atomic internal_count;counted_node_ptr next;node(T const& data_):data(std::make_shared(data_)),internal_count(0){}void release_ref(){node_counter old_counter=count.load(std::memory_order_relaxed);node_counter new_counter;do{new_counter=old_counter;--new_counter.internal_count;}while(!count.compare_exchange_strong(old_counter,new_counter,std::memory_order_acquire,std::memory_order_relaxed));if(!new_counter.internal_count &&!new_counter.external_counters){delete this;}}};std::atomic head;std::atomic tail;
};
清单7.19 从无锁队列中获取一个节点的引用
template
class lock_free_queue
{
private:static void increase_external_count(std::atomic& counter,counted_node_ptr& old_counter){counted_node_ptr new_counter;do{new_counter=old_counter;++new_counter.external_count;}while(!counter.compare_exchange_strong(old_counter,new_counter,std::memory_order_acquire,std::memory_order_relaxed));old_counter.external_count=new_counter.external_count;}
};
清单7.20 无锁队列中释放节点外部计数器
template
class lock_free_queue
{
private:static void free_external_counter(counted_node_ptr &old_node_ptr){node* const ptr=old_node_ptr.ptr;int const count_increase=old_node_ptr.external_count-2;node_counter old_counter=ptr->count.load(std::memory_order_relaxed);node_counter new_counter;do{new_counter=old_counter;--new_counter.external_counters;new_counter.internal_count+=count_increase;}while(!ptr->count.compare_exchange_strong(old_counter,new_counter,std::memory_order_acquire,std::memory_order_relaxed));if(!new_counter.internal_count &&!new_counter.external_counters){delete ptr;}}
};
指导建议: 使用 std::memory_order_seq_cst 的原型
指导建议: 对无锁内存的回收策略
引用计数
指导建议: 小心ABA问题
1 线程1读取原子变量X,并且发现其值是A。
2 线程1对这个值进行一些操作,比如,解引用(当其是一个指针的时候),或做查询,或其他操作。
3 操作系统将线程1挂起。
4 其他线程对x执行一些操作,并且将其值改为B。
5 另一个线程对相关的数据进行修改(线程1持有),让其不再合法。可能会在释放指针指向的内存时,代码产生剧烈的反应(大问题);或者只是修改了相关值而已(小问题)。
6 再来一个线程将x的值改回为A。如果A是一个指针,那么其可能指向一个新的对象,只是与旧对象共享同一个地址而已。
7.线程1继续运行,并且对x执行"比较/交换"操作,将A进行对比。这里,"比较/交换”成功(因为其值还是A),不过这是一个错误的A(the wrong A value)。从第2步中读取的数据不再合法,但是线程1无法言明这个问题,并且之后的操作将会损坏数据结构。解决这个问题的一般方法是, 让变量x中包含一个ABA计数器。 “比较/交换”会对加入计数器的x进行操作, 每次的值都不一样,计数随之增长。 所以, x还是原值的前提下, 即使有线程对x进行修改, “比较/交换”还是会失败。
“ABA问题”在使用释放链表和循环使用节点的算法中很是普遍, 而将节点返回给分配器, 则不会引起这个问题。
指导建议: 识别忙等待循环和帮助其他线程
最终队列的例子中,已经见识到线程在执行push操作时,必须等待另一个push操作流程的完成。等待线程就会被孤立,将会陷入到忙等待循环中,当线程尝试失败的时候会继续循环,这会浪费CPU的计算周期。忙等待循环结束时,就象一个阻塞操作解除,和使用互斥锁的行为一样。通过对算法的修改,当之前的线程还没有完成操作前,让等待线程执行未完成的步骤,就能让忙等待的线程不再被阻塞。队列例中需要将一个数据成员转换为一个原子变量,而不是使用非原子变量和使用“比较/交换”操作来做这件事;要是在更加复杂的数据结构中,这将需要更加多的变化来满足需求。
从第6章中的基于锁的数据结构起,本章简要的描述了一些无锁数据结构的实现(通过实现栈和队列)。这个过程中,需要小心使用原子操作的内存序,为了保证无数据竞争,以及让每个线程看到一个数据结构实例。也能了解到,在无锁结构中对内存的管理是越来越难。还有,如何通过帮助线程的方式,来避免忙等待循环。
设计无锁数据结构是一项很困难的任务,并且很容易犯错;不过,这样的数据结构在某些重要情况下可对其性能进行增强。但愿通过本章的的一些例子,以及一些指导意见,可以帮助你设计出自己的无锁数据结构,或实现一份研究报告中的数据结构,或用以发现离职同事代码中的bug。
不管在线程间共享怎么样的数据,需要考虑数据结构应该如何使用,并且如何在线程间同步数据。通过设计并发访问的数据结构,就能对数据结构进行功能性封装,其他部分的代码就着重于对数据的执行,而非数据的同步。在第8章中看到类似的行为:将并发数据结构转为一般的并发代码。并行算法是使用多线程的方式提高性能,因为算法需要工作线程共享它们的数据,所以对并发数据结构的选择就很关键了。
清单8.1 使用栈的并行快速排序算法——等待数据块排序
#include
#include
#include
#include
template
struct sorter
{struct chunk_to_sort{std::list data;std::promise > promise;};thread_safe_stack chunks;std::vector threads;unsigned const max_thread_count;std::atomic end_of_data;sorter():max_thread_count(std::thread::hardware_concurrency()-1),end_of_data(false){}~sorter(){end_of_data=true;for(unsigned i=0;ithreads[i].join();}}void try_sort_chunk(){boost::shared_ptr chunk=chunks.pop();if(chunk){sort_chunk(chunk);}}std::list do_sort(std::list& chunk_data){if(chunk_data.empty()){return chunk_data;}std::list result;result.splice(result.begin(),chunk_data,chunk_data.begin());T const& partition_val=*result.begin();typename std::list::iterator divide_point=std::partition(chunk_data.begin(),chunk_data.end(),[&](T const& val){return val > new_lower=new_lower_chunk.promise.get_future();chunks.push(std::move(new_lower_chunk));if(threads.size()threads.push_back(std::thread(&sorter::sort_thread,this));}std::list new_higher(do_sort(chunk_data));result.splice(result.end(),new_higher);while(new_lower.wait_for(std::chrono::seconds(0)) !=std::future_status::ready){try_sort_chunk();}result.splice(result.begin(),new_lower.get());return result;}void sort_chunk(boost::shared_ptr const& chunk){chunk->promise.set_value(do_sort(chunk->data));}void sort_thread(){while(!end_of_data){try_sort_chunk();std::this_thread::yield();}}
};template
std::list parallel_quick_sort(std::list input)
{if(input.empty()){return input;}sorter s;return s.do_sort(input);
}
几种划分方法:1,处理前划分;2,递归划分(都需要事先知道数据的长度固定),还有上面的那种划分方式。事情并非总是这样好解决,当数据是动态生成,或是通过外部输入,那么这里的办法就不适用了。这种情况下,基于任务类型的划分方式,就要好于基于数据的划分方式。
或者通过流水线方式:pipeline方式
随着处理器数量的增加,另一个问题就会来影响性能:多个处理器尝试访问同一个数据。
当两个线程并发的在不同处理器上执行,并且对同一数据进行读取,通常不会出现问题;因为数据将会拷贝到每个线程的缓存中,并且让两个处理器同时进行处理。不过,当有线程对数据进行修改的时候,这个修改需要更新到其他核芯的缓存中去,就要耗费一定的时间。根据线程的操作性质,以及使用到的内存序,这样的修改可能会让第二个处理器停下来,等待硬件内存更新缓存中的数据。即便是精确的时间取决于硬件的物理结构,不过根据CPU指令,这是一个特别特别慢的操作,相当于执行成百上千个独立指令。
std::atomic counter(0);
void processing_loop()
{while(counter.fetch_add(1,std::memory_order_relaxed)<100000000){do_something();}
}
多线程循环中counter的数据将在每个缓存中传递若干次,这就叫做乒乓缓存(cache ping-pong)。这种情况会对应用的性能有着重大的影响。当一个处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情,所以对于整个应用来说这是一个坏消息。
缓存行是共享的(即使没有数据存在),因此使用伪共享来称呼这种方式。
考虑竞争(contention),伪共享(false sharing)和数据距离(data proximity)
#include
#include
#include
#include
template
struct accumulate_block
{T operator()(Iterator first,Iterator last) //1{return std::accumulate(first,last,T());//2}
};template
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length=std::distance(first,last);if(!length)return init;unsigned long const min_per_thread=25;unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;unsigned long const hardware_threads=std::thread::hardware_concurrency();unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);unsigned long const block_size=length/num_threads;std::vector > futures(num_threads-1); //3std::vector threads(num_threads-1);Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_end=block_start;std::advance(block_end,block_size);std::packaged_task task( //4accumulate_block());futures[i]=task.get_future();//5threads[i]=std::thread(std::move(task),block_start,block_end);//6block_start=block_end;}T last_result=accumulate_block(block_start,last);//7std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));T result=init;for(unsigned long i=0;i<(num_threads-1);++i){result+=futures[i].get();}result += last_result;return result;
}
class join_threads
{std::vector& threads;
public:explicit join_threads(std::vector& threads_):threads(threads_){}~join_threads(){for(unsigned long i=0;iif(threads[i].joinable())threads[i].join();}}
};template
struct accumulate_block
{T operator()(Iterator first,Iterator last){return std::accumulate(first,last,T());}
};template
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length=std::distance(first,last);if(!length)return init;unsigned long const min_per_thread=25;unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;unsigned long const hardware_threads=std::thread::hardware_concurrency();unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);unsigned long const block_size=length/num_threads;std::vector > futures(num_threads-1);std::vector threads(num_threads-1);join_threads joiner(threads);Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_end=block_start;std::advance(block_end,block_size);std::packaged_task task(accumulate_block());futures[i]=task.get_future();threads[i]=std::thread(std::move(task),block_start,block_end);block_start=block_end;}T last_result=accumulate_block(block_start,last);T result=init;for(unsigned long i=0;i<(num_threads-1);++i){result+=futures[i].get();}result += last_result;return result;
}
#include
#include
template
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length=std::distance(first,last);//1unsigned long const max_chunk_size=25;if(length<=max_chunk_size){return std::accumulate(first,last,init);//2}else{Iterator mid_point=first;std::advance(mid_point,length/2);//3std::future first_half_result=std::async(parallel_accumulate,//4first,mid_point,init);T second_half_result=parallel_accumulate(mid_point,last,T());//5return first_half_result.get()+second_half_result;//6}
}
将这个数据块分成两部分,然后再生成一个异步任务对另一半数据进行处理④。第二半的数据是通过直接的递归调用来处理的⑤,之后将两个块的结果加和到一起⑥。标准库能保证 std::async 的调用能够充分的利用硬件线程,并且不会产生线程的超额认购,一些“异步”调用是在调用get()⑥后同步执行的。
优雅的地方,不仅在于利用硬件并发的优势,并且还能保证异常安全。如果有异常在递归调用⑤中抛出,通过调用std::async ④所产生的期望值,将会在异常传播时被销毁。这就需要依次等待异步任务的完成,因此也能避免悬空线程的出现。另外,当异步任务抛出异常,且被期望值所捕获,在对get()⑥调用的时候,期望值中存储的异常会再次抛出。
当程序“串行”部分的时间用fs来表示, 那么性能增益§就可以通过处理器数量(N)进行估计:
P = 1 /(fs + (1–fs)/N)
while(true)
{event_data event=get_event();if(event.type==quit)break;process(event);
}
通过使用并发分离关注,可以将一个很长的任务交给一个全新的线程,并且留下一个专用的GUI线程来处理这些事件。线程可以通过简单的机制进行通讯,而不是将事件处理代码和任务代码混在一起。
#include
#include
#include
std::thread task_thread;
std::atomic task_cancelled(false);void gui_thread()
{while(true){event_data event=get_event();if(event.type==quit)break;process(event);}
}void task()
{while(!task_complete() && !task_cancelled){do_next_operation();}if(task_cancelled){perform_cleanup();}else{post_gui_event(task_complete);}
}void process(event_data const& event)//非阻塞抛线程
{switch(event.type){case start_task:task_cancelled=false;task_thread=std::thread(task);break;case stop_task:task_cancelled=true;task_thread.join();break;case task_complete:task_thread.join();display_results();break;default://...}
}
C++标准库中三个标准函数的并行实现
为了实现这个函数的并行版本,需要对每个线程上处理的元素进行划分。事先知道元素数量,所以可以处理前对数据进行划分(详见8.1.1节)。假设只有并行任务运行,就可以使用std:thread:hardware_concurrency()来决定线程的数量。同样,这些元素都能被独立的处理,所以可以使用连续的数据块来避免伪共享(详见8.2.3节)。
这里的算法有点类似于并行版的 std::accumulate (详见8.4.1节),不过比起计算每一个元素的加和,这里对每个元素仅仅使用了一个指定功能的函数。因为不需要返回结果,但想要将异常传递给调用者,就需要使用 std::packaged_task 和 std::future 机制对线程中的异常进行转移。
清单8.7
#include
#include
struct join_threads
{join_threads(std::vector&){}
};template
void parallel_for_each(Iterator first,Iterator last,Func f)
{unsigned long const length=std::distance(first,last);if(!length)return;unsigned long const min_per_thread=25;unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;unsigned long const hardware_threads=std::thread::hardware_concurrency();unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);unsigned long const block_size=length/num_threads;std::vector > futures(num_threads-1);std::vector threads(num_threads-1);join_threads joiner(threads);Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_end=block_start;std::advance(block_end,block_size);std::packaged_task task([=](){std::for_each(block_start,block_end,f);});futures[i]=task.get_future();threads[i]=std::thread(std::move(task));block_start=block_end;}std::for_each(block_start,last,f);for(unsigned long i=0;i<(num_threads-1);++i){futures[i].get();}
}
最重要的不同在于期望值向量对 std::future类型①变量进行存储, 因为工作线程不会返回值, 并且简单的Lambda函数会对block_start到block_end上的任务②执行f函数, 为了避免传入线程的构造函数③。 工作线程不需要返回一个值时, 调用futures[i].get()④只是提供检索工作线程异常的方法; 如果不想把异常传递出去, 就可以省略这一步。
清单8.8
#include
#include
template
void parallel_for_each(Iterator first,Iterator last,Func f)
{unsigned long const length=std::distance(first,last);if(!length)return;unsigned long const min_per_thread=25;if(length<(2*min_per_thread)){std::for_each(first,last,f);}else{Iterator const mid_point=first+length/2;std::future first_half=std::async(¶llel_for_each,first,mid_point,f);parallel_for_each(mid_point,last,f);first_half.get();}
}
当第一个元素就满足查找标准, 就没有必要对其他元素进行搜索了。如果不中断其他线程,那么串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候,停止搜索并返回。
中断其他线程的一个办法就是使用原子变量作为一个标识,处理过每一个元素后就对这个标识进行检查。如果标识被设置,就有线程找到了匹配元素,所以算法就可以停止并返回了。用这种方式来中断线程,就可以将那些没有处理的数据保持原样,并且在更多的情况下,相较于串行方式,性能能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍每个线程的运行。
如何返回值和传播异常呢?现在有两个选择。可以使用一个期望值数组,使用 std::packaged_task 来转移值和异常,主线程上对返回值和异常进行处理;或者使用 std::promise 对工作线程上的最终结果直接进行设置。这完全依赖于想怎么样处理工作线程上的异常。如果想停止第一个异常(即使还没有对所有元素进行处理),就可以使用 std::promise 对异常和最终值进行设置。另外,如果想让其他工作线程继续查找,可以使用 std::packaged_task 来存储所有的异常,当线程没有找到匹配元素时,异常将再次抛出。
选择 std::promise ,因为其行为和 std::find 更为接近。这里需要注意一下搜索的元素是不是在提供的搜索范围内。此, 在所有线程结束前,获取期望值上的结果。如果被期望值阻塞住,所要查找的值不在范围内,就会持续的等待下去。
#include
#include
struct join_threads
{join_threads(std::vector&){}
};template
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{struct find_element//1{void operator()(Iterator begin,Iterator end,MatchType match,std::promise* result,std::atomic* done_flag){try{for(;(begin!=end) && !done_flag->load();++begin)//2{if(*begin==match){result->set_value(begin);//3done_flag->store(true);//4return;}}}catch(...)//5{try{result->set_exception(std::current_exception());//6 和set_value一样只能设置一次done_flag->store(true);}catch(...)//7{}}}};unsigned long const length=std::distance(first,last);if(!length)return last;unsigned long const min_per_thread=25;unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;unsigned long const hardware_threads=std::thread::hardware_concurrency();unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);unsigned long const block_size=length/num_threads;std::promise result;std::atomic done_flag(false);std::vector threads(num_threads-1);{join_threads joiner(threads);Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_end=block_start;std::advance(block_end,block_size);threads[i]=std::thread(find_element(),block_start,block_end,match,&result,&done_flag);block_start=block_end;}find_element()(block_start,last,match,&result,&done_flag);}if(!done_flag.load()){return last;}return result.get_future().get();//8
}
如果有一个异常被抛出,就会被通用处理代码⑤捕获,并且在承诺值⑥中尝试存储前,对done_flag进行设置。如果对应promise已经被设置,设置在承诺值上的值可能会抛出一个异常,所以这里⑦发生的任何异常,都可以捕获并丢弃。
当线程调用find_element查询一个值,或者抛出一个异常时,如果其他线程看到done_f1ag被设置,那么其他线程将会终止。如果多线程同时找到匹配值或抛出异常,它们将会对承诺值产生竞争。不过,这是良性的条件竞争;因为,成功的竞争者会作为“第一个”返回线程,因此这个结果可以接受。
清单8.10
#include
#include
template
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,std::atomic& done)
{try{unsigned long const length=std::distance(first,last);unsigned long const min_per_thread=25;if(length<(2*min_per_thread)){for(;(first!=last) && !done.load();++first){if(*first==match){done=true;return first;}}return last;}else{Iterator const mid_point=first+(length/2);std::future async_result=std::async(¶llel_find_impl,mid_point,last,match,std::ref(done));Iterator const direct_result=parallel_find_impl(first,mid_point,match,done);return (direct_result==mid_point)?async_result.get():direct_result;}}catch(...){done=true;throw;}
}template
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{std::atomic done(false);return parallel_find_impl(first,last,match,done);
}
std::async 可以用来提供“异常-安全”和“异常-传播”特性。 如果直接递归抛出异常, 期望值的析构函数就能让异步执行的线程提前结束;
[1, 2, 3, 4, 5] --> [1, 3(1+2), 6(1+2+3), 10(1+2+3+4), 15(1+2+3+4+5)]
8.11
确定某个范围部分和的一种的方式,就是在独立块中计算部分和,然后将第一块中最后的元素的值,与下一块中的所有元素进行相加,依次类推。如果有个序列[1,2,3,4,5,6,7,8,9],然后将其分为三块,那么在第一次计算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后将6(第一块的最后一个元素)加到第二个块中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再将第二块的最后一个元素21加到第三块中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。
#include
#include
#include
struct join_threads
{join_threads(std::vector&){}
};template
void parallel_partial_sum(Iterator first,Iterator last)
{typedef typename Iterator::value_type value_type;struct process_chunk{void operator()(Iterator begin,Iterator last,std::future* previous_end_value,std::promise* end_value){try{Iterator end=last;++end;std::partial_sum(begin,end,begin);if(previous_end_value){value_type& addend=previous_end_value->get();*last+=addend;if(end_value){end_value->set_value(*last);}std::for_each(begin,last,[addend](value_type& item){item+=addend;});}else if(end_value){end_value->set_value(*last);}}catch(...){if(end_value){end_value->set_exception(std::current_exception());}else{throw;}}}};unsigned long const length=std::distance(first,last);if(!length)return last;unsigned long const min_per_thread=25;unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;unsigned long const hardware_threads=std::thread::hardware_concurrency();unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);unsigned long const block_size=length/num_threads;typedef typename Iterator::value_type value_type;std::vector threads(num_threads-1);std::vector >end_values(num_threads-1);std::vector >previous_end_values;previous_end_values.reserve(num_threads-1);join_threads joiner(threads);Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_last=block_start;std::advance(block_last,block_size-1);threads[i]=std::thread(process_chunk(),block_start,block_last,(i!=0)?&previous_end_values[i-1]:0,&end_values[i]);block_start=block_last;++block_start;previous_end_values.push_back(end_values[i].get_future());}Iterator final_element=block_start;std::advance(final_element,std::distance(block_start,last)-1);process_chunk()(block_start,final_element,(num_threads>1)?&previous_end_values.back():0,0);
}
#include
#include
class barrier
{unsigned const count;std::atomic spaces;std::atomic generation;
public:explicit barrier(unsigned count_)://1count(count_),spaces(count),generation(0){}void wait(){unsigned const my_generation=generation;//2if(!--spaces)//3{spaces=count;//4++generation;//5}else{while(generation==my_generation)//6std::this_thread::yield();//7}}
};
这个实现中,用一定数量的"座位"构造了一个barrier①,这个数量将会存储count变量中。起初,栅栏中的spaces.与count数量相当。当有线程都在等待时,spaces的数量就会减少③。当spaces的数量减到0时,spaces的值将会重置为count④,并且generation变量会增加,以向线程发出信号,让这些等待线程能够继续运行⑤。如果spaces没有到达0,那么线程会继续等待。这个实现使用了一个简单的自旋锁⑥,对generation的检查会在wait()开始的时候进行②。因为generation只会在所有线程都到达栅栏的时候更新⑤,在等待的时候使用yield()⑦就不会让CPU处于忙等待的状态。
struct barrier
{std::atomic count;std::atomic spaces;std::atomic generation;barrier(unsigned count_):count(count_),spaces(count_),generation(0){}void wait(){unsigned const gen=generation.load();if(!--spaces){spaces=count.load();++generation;}else{while(generation.load()==gen){std::this_thread::yield();}}}void done_waiting(){--count;if(!--spaces){spaces=count.load();++generation;}}
};
template
void parallel_partial_sum(Iterator first,Iterator last)
{typedef typename Iterator::value_type value_type;struct process_element{void operator()(Iterator first,Iterator last,std::vector& buffer,unsigned i,barrier& b){value_type& ith_element=*(first+i);bool update_source=false;for(unsigned step=0,stride=1;stride<=i;++step,stride*=2){value_type const& source=(step%2)?buffer[i]:ith_element;value_type& dest=(step%2)?ith_element:buffer[i];value_type const& addend=(step%2)?buffer[i-stride]:*(first+i-stride);dest=source+addend;update_source=!(step%2);b.wait();}if(update_source){ith_element=buffer[i];}b.done_waiting();}};unsigned long const length=std::distance(first,last);if(length<=1)return;std::vector buffer(length);barrier b(length);std::vector threads(length-1);join_threads joiner(threads);Iterator block_start=first;for(unsigned long i=0;i<(length-1);++i){threads[i]=std::thread(process_element(),first,last,std::ref(buffer),i,std::ref(b));}process_element()(first,last,buffer,length-1,b);
}
不管怎样,主要的工作都是调用process_element的函数操作符来完成的。每一步,都会从原始数据或缓存中获取第i个元素②,并且将获取到的元素加到指定str1d的元素中去③,如果从原始数据开始读取的元素,加和后的数需要存储在缓存中④。然后,在开始下一步前,会在栅栏处等待⑤。当strid超出了给定数据的范围,当最终结果已经存在缓存中时,就需要更新原始数据中的数据,同样这也意味着本次加和结束。最后,在调用栅栏中的done_waiting()函数⑦。
本章我们讨论了很多东西。我们从划分线程间的工作开始(比如,数据提前划分或让线程形成流水线)。之后,以低层
次视角来看多线程下的性能问题,顺带了解了伪共享和数据通讯:了解访问数据的模式对性能的影响。再后,了解了
附加注意事项是如何影响并发代码设计的,比如:异常安全和可扩展性。最后,用一些并行算法实现来结束了本章,
设计这些并行算法实现时碰到的问题,在设计其他并行代码的时候也会遇到。
线程池
处理线程池中任务的依赖关系
池中线程如何获取任务
中断线程
#include
#include
#include
struct join_threads
{join_threads(std::vector&){}
};class thread_pool
{std::atomic_bool done;thread_safe_queue > work_queue;std::vector threads;join_threads joiner;void worker_thread(){while(!done){std::function task;if(work_queue.try_pop(task)){task();}else{std::this_thread::yield();}}}
public:thread_pool():done(false),joiner(threads){unsigned const thread_count=std::thread::hardware_concurrency();try{for(unsigned i=0;ithreads.push_back(std::thread(&thread_pool::worker_thread,this));}}catch(...){done=true;throw;}}~thread_pool(){done=true;}templatevoid submit(FunctionType f){work_queue.push(std::function(f));}
};
注意成员声明的顺序很重要:done标志和worker_queue必须在threads数组之前声明,而数据必须在joiner前声明。这就能确保成员以正确的顺序销毁;比如,所有线程都停止运行时,队列就可以安全的销毁。
清单9.2
#include
#include
#include
#include
#include
#include class function_wrapper
{struct impl_base {virtual void call()=0;virtual ~impl_base() {}};std::unique_ptr impl;templatestruct impl_type: impl_base{F f;impl_type(F&& f_): f(std::move(f_)) {}void call() { f(); }};
public:templatefunction_wrapper(F&& f):impl(new impl_type(std::move(f))){}void call() { impl->call(); }function_wrapper(function_wrapper&& other):impl(std::move(other.impl)){}function_wrapper& operator=(function_wrapper&& other){impl=std::move(other.impl);return *this;}function_wrapper(const function_wrapper&)=delete;function_wrapper(function_wrapper&)=delete;function_wrapper& operator=(const function_wrapper&)=delete;
};class thread_pool
{
public:std::deque work_queue;templatestd::future::type>submit(FunctionType f){typedef typename std::result_of::type result_type;std::packaged_task task(std::move(f));std::future res(task.get_future());work_queue.push_back(std::move(task));return res;}// rest as before
};
packaged_task的future返回值作为等待任务
9.3
#include
#include
#include template
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length=std::distance(first,last);if(!length)return init;unsigned long const block_size=25;unsigned long const num_blocks=(length+block_size-1)/block_size;std::vector > futures(num_blocks-1);thread_pool pool;Iterator block_start=first;for(unsigned long i=0;i<(num_threads-1);++i){Iterator block_end=block_start;std::advance(block_end,block_size);futures[i]=pool.submit(accumulate_block());block_start=block_end;}T last_result=accumulate_block()(block_start,last);T result=init;for(unsigned long i=0;i<(num_blocks-1);++i){result+=futures[i].get();}result += last_result;return result;
}
template
struct sorter // 1
{thread_pool pool; // 2std::list do_sort(std::list& chunk_data){if(chunk_data.empty()){return chunk_data;}std::list result;result.splice(result.begin(),chunk_data,chunk_data.begin());T const& partition_val=*result.begin();typename std::list::iterator divide_point=std::partition(chunk_data.begin(),chunk_data.end(),[&](T const& val){return val new_lower_chunk;new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);std::future > new_lower= // 3pool.submit(std::bind(&sorter::do_sort,this, std::move(new_lower_chunk)));std::list new_higher(do_sort(chunk_data));result.splice(result.end(),new_higher);while(!new_lower.wait_for(std::chrono::seconds(0)) == std::future_status::timeout){pool.run_pending_task(); // 4}result.splice(result.begin(),new_lower.get());return result;}
};
9.6
为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线
程上的任务队列没有任务时,去全局的任务列表中取任务。下面列表中的实现,使用了一个thread_local变量,来
保证每个线程都拥有自己的任务列表(如全局列表那样)。
class thread_pool
{thread_safe_queue pool_work_queue;typedef std::queue local_queue_type;static thread_local std::unique_ptr local_work_queue;void worker_thread(){local_work_queue.reset(new local_queue_type);while(!done){run_pending_task();}}public:templatestd::future::type>submit(FunctionType f){typedef std::result_of::type result_type;std::packaged_task task(f);std::future res(task.get_future());if(local_work_queue){local_work_queue->push(std::move(task));}else{pool_work_queue.push(std::move(task));}return res;}void run_pending_task(){function_wrapper task;if(local_work_queue && !local_work_queue->empty()){task=std::move(local_work_queue->front());local_work_queue->pop();task();}else if(pool_work_queue.try_pop(task)){task();}else{std::this_thread::yield();}}// rest as before
};
从自己的任务队列中取任务,从全局队列中取任务,从其他线程的任务队列中取任务
thread_local和std::vector
#include
#include
#include
class thread_pool
{typedef function_wrapper task_type;std::atomic_bool done;thread_safe_queue pool_work_queue;std::vector > queues;std::vector threads;join_threads joiner;static thread_local work_stealing_queue* local_work_queue;static thread_local unsigned my_index;void worker_thread(unsigned my_index_){my_index=my_index_;local_work_queue=queues[my_index].get();while(!done){run_pending_task();}}bool pop_task_from_local_queue(task_type& task){return local_work_queue && local_work_queue->try_pop(task);}bool pop_task_from_pool_queue(task_type& task){return pool_work_queue.try_pop(task);}bool pop_task_from_other_thread_queue(task_type& task){for(unsigned i=0;iunsigned const index=(my_index+i+1)%queues.size();if(queues[index]->try_steal(task)){return true;}}return false;}public:thread_pool():joiner(threads),done(false){unsigned const thread_count=std::thread::hardware_concurrency();try{for(unsigned i=0;iqueues.push_back(std::unique_ptr(new work_stealing_queue));threads.push_back(std::thread(&thread_pool::worker_thread,this,i));}}catch(...){done=true;throw;}}~thread_pool(){done=true;}templateusing task_handle=std::unique_future;templatetask_handle::type> submit(FunctionType f){typedef std::result_of::type result_type;std::packaged_task task(f);task_handle res(task.get_future());if(local_work_queue){local_work_queue->push(std::move(task));}else{pool_work_queue.push(std::move(task));}return res;}void run_pending_task(){task_type task;if(pop_task_from_local_queue(task) ||pop_task_from_pool_queue(task) ||pop_task_from_other_thread_queue(task)){task();}else{std::this_thread::yield();}}
};
清单9.9
class interrupt_flag
{
public:void set();bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag;//1class interruptible_thread
{std::thread internal_thread;interrupt_flag* flag;
public:templateinterruptible_thread(FunctionType f){std::promise p;//2internal_thread=std::thread([f,&p]{//3p.set_value(&this_thread_interrupt_flag);f();//4});flag=p.get_future().get();//5}void interrupt(){if(flag){flag->set();//6}}
};
提供函数f是包装了一个Lambda函数③,线程将会持有f副本和本地承诺值变量§的引用②。新线程中,Lambda函数设置承诺值变量的值到this_thread_interrupt_flag(在thread_local①中声明)的地址中,为的是让线程能够调用提供函数的副本④。调用线程会等待与其期望值相关的承诺值就绪,并且将结果存入到flag成员变量中⑤。注意,即使Lambda函数在新线程上执行,对本地变量p进行悬空引用都没有问题,因为在新线程返回之前,interruptible_thread构造函数会等待变量p,直到变量p不被引用。实现没有考虑汇入线程或分离线程,所以需要flag变量在线程退出或分离前已经声明,这样就能避免悬空问题。interrupt()函数相对简单:需要线程去做中断时,需要合法指针作为中断标志,所以可以对标志进行设置⑥。
void interruption_point()
{if(this_thread_interrupt_flag.is_set()){throw thread_interrupted();}
}void foo()
{while(!done){interruption_point();process_next_item();}
}
最好是在线程等待或阻塞的时候中断线程,因为这时的线程不能运行,也就不能调用interruption_point()函数!线程等待时,什么方式才能去中断线程呢
简而言之依然是loop中执行函数,每次检查一个标志位判断是否终端,如果终端结束执行。
使用C++17新特性并行算法
class X{mutable std::mutex m;int data;
public:X():data(0){}int get_value() const{std::lock_guard guard(m);return data;}void increment(){std::lock_guard guard(m);++data;}
};void increment_all(std::vector& v){std::for_each(std::execution::par,v.begin(),v.end(),[](X& x){x.increment();});
}
std::execution::par 的执行策略,表示允许使用多个线程调用此并行算法。这是一种权限,而不是一个申请——如果需要,这个算法依旧可以以串行的方式运行。通过指定执行策略,算法的需求复杂性已经发生变化,并且要比串行版的算法要求要宽松。因为并行算法要利用系统的并行性,从而算法通常会做更多的工作——如果将任务划分在100个处理上进行,即便是总任务量是原先的的2倍,那仍旧能获得50倍的加速。
三个标注执行策略:
std::execution::sequenced_policy //顺序策略
std::execution::parallel_policy //并行策略
std::execution::parallel_unsequenced_policy //并行不排序策略
定义在 头文件中。 这个头文件中也定义了三个相关的策略对象可以传递到算法中
std::execution::seq
std::execution::par
std::execution::par_unseq
算法复杂化
抛出异常时的行为
算法执行的位置、 方式和时间
#include
#include
#include
#include
#include
struct log_info {std::string page;time_t visit_time;std::string browser;// any other fields
};extern log_info parse_log_line(std::string const &line);using visit_map_type= std::unordered_map;visit_map_type count_visits_per_page(std::vector const &log_lines) {struct combine_visits {visit_map_type operator()(visit_map_type lhs, visit_map_type rhs) const {if(lhs.size() < rhs.size())std::swap(lhs, rhs);for(auto const &entry : rhs) {lhs[entry.first]+= entry.second;}return lhs;}visit_map_type operator()(log_info log, visit_map_type map) const {++map[log.page];return map;}visit_map_type operator()(visit_map_type map, log_info log) const {++map[log.page];return map;}visit_map_type operator()(log_info log1, log_info log2) const {visit_map_type map;++map[log1.page];++map[log2.page];return map;}};return std::transform_reduce(std::execution::par, log_lines.begin(), log_lines.end(),visit_map_type(), combine_visits(), parse_log_line);
}
// T transform_reduce( ExecutionPolicy&& policy,
// ForwardIt1 first1, ForwardIt1 last1,
// ForwardIt2 first2,
// T init,
// BinaryReductionOp reduce,
// BinaryTransformOp transform );
1 不必要的阻塞 2 条件竞争
死锁 | 活锁 | I/O阻塞或外部输入
数据竞争 | 破坏不变量 | 生命周期问题
并发访问时, 哪些数据需要保护?
如何确定访问数据受到了保护?
是否会有多个线程同时访问这段代码?
这个线程获取了哪个互斥量?
其他线程可能获取哪些互斥量?
两个线程间的操作是否有依赖关系? 如何满足这种关系?
这个线程加载的数据还是合法数据吗? 数据是否被其他线程修改过?
当假设其他线程可以对数据进行修改, 这将意味着什么? 并且, 怎么确保这样的事情不会发生?
一种方式是使用一组 std::promise 来表示就绪状态。每个线程使用承诺值来表示是否准备好,然后让 std::promise 等待(复制)一个 std::shared_future ; 主线程会等待每个线程上的承诺值设置后,才按下“开始”键。这就能保证每个线程能够同时开始,并且在准备代码执行完成后,并发代码就可以开始执行了;任何线程的特定设置都需要在设置承诺值前完成。 最终,主线程会等待所有线程完成,并且检查其最终状态。还需要格外关心异常,所有线程在准备好的情况下,再按下“开始”键;否则,未准备好的线程就不会运行。
void test_concurrent_push_and_pop_on_empty_queue()
{threadsafe_queue q;std::promise go,push_ready,pop_ready;std::shared_future ready(go.get_future());std::future push_done;std::future pop_done;try{push_done=std::async(std::launch::async,[&q,ready,&push_ready](){push_ready.set_value();ready.wait();q.push(42);});pop_done=std::async(std::launch::async,[&q,ready,&pop_ready](){pop_ready.set_value();ready.wait();return q.pop();});push_ready.get_future().wait();pop_ready.get_future().wait();go.set_value();push_done.get();assert(pop_done.get()==42);assert(q.empty());}catch(...){go.set_value();throw;}
}