• 认真地记录技术中遇到的坑!

C++11 线程库 第二节 在线程间共享数据

C/C++ Moxun 10个月前 (03-05) 251次浏览 0个评论

2.1 线程之间共享数据的问题

   实际上这就是因为竞争引起的同步和互斥的问题。
   - 从总体上看,线程之间的问题都是由对数据的修改造成的。如果所有线程对共享数据的使用方式都是只读的,那这是没有任何问题的,因为一个线程所读取的数据不会受到另一个线程是否正在读取相同的数据而影响。但是如果有线程需要修改共享数据,那么此时,你将需要格外小心了。
   一个帮助程序员推导代码的概念——不变量:对于特定的数据结构总是为真的语句。这些不变量在更新中经常被打破。
   修改线程之间的最简单的潜在问题就是破坏不变量,书上举的是链表删除的例子。P32

2.1.1竞争条件

     在并发环境中,竞争条件就是结果取决于两个或更多线程上的操作执行相对顺序的一切事物,线程竞争各自的操作。在说并发时,竞争条件通常指破坏了不变量时产生的竞争
     C++标准库定义了数据数据竞争,它表示因对单个对象的并发修改而产生的特定类型的竞争条件。数据竞争会导致未定义行为。有问题的竞争通常发生在完成操作需要修改两个或者多个不同的数据块的地方,例如双向链表的节点删除问题。

2.2.2避免有问题的竞争条件

   A.用保护机制来封装你的数据结构,以确保只有实际执行修改的线程能够在不变量损坏的地方看到中间数据。从其它角度看,要么这种修改还没开始,要么已经完成。
   B.另一个选择是修改数据结构的设计及其不变量,从而令修改作为一系列不可分割的变更来完成,每个修改均保留其不变量,这 通常称为无锁编程。
   C.将对数据结构的更新作为一个事务处理,就如同在一个事务内完成数据库的更新 一样,所需的一系列数据修改和读取被存储在一个事务日志中,然后在单个步骤内提交,如果该提交因为数据结构已被另一个线程修改而无法进行,该事务重启,这叫做软事务内存(STM),C++不直接支持STM
   C++标准提供的保护共享数据的最基本机制是互斥元。

2.2用互斥元保护共享数据

   线程访问共享数据带来的一个问题是互斥,简单的说互斥就是有多个线程需要对共享数据进行读写操作,此时我们要求在同一时刻只能有一个线程访问共享数据,否则会造成读写冲突。
   一个解决互斥问题的方案是,将访问共享数据的代码块标识为互斥的,这意味着如果任何线程运行了访问共享数据的代码块,则其它线程不能在运行访问共享数据的代码块,必须等待当前正在执行的线程运行完成。这是使用互斥元的操作原语所能得到的。
   具体的操作步骤是,在访问共享数据时,先加锁(lock)互斥元,当访问共享数据结束后再解锁互斥元(unlock)。
   注:感觉上,互斥元就是标识一段代码是互斥的。
   注:这里有一个非常重要的前提条件,有助于理解后面将要讲解的关于锁的问题。线程库会确保一旦一个线程已经锁定某个互斥元,所有其它试图锁定相同互斥元的线程必须等待,直到成功锁定了该互斥元的代码解锁此互斥元。

2.2.1 使用C++中的互斥元

    在C++中通过构造std::mutex的实例来创建互斥元,调用成员函数lock来锁定它,调用成员函数unlock来对它解锁。但是如果直接调用成员函数意味着你必须格外小心,需要在代码离开的每条路径上调用unlock,包括catch子块。
    还记得RAII和在前一节中我们为了保证在发生异常时线程对象被结合或者分离所编写的thread_guard类和scoped_thread类吗?我们可以采用同样的思想,利用局部对象管理资源。C++标准库为我们提供了这样lock_guard类模板类,它在构造时对互斥元调用lock,在析构时调用unlock,这样就实现了资源的自动管理。mutex和lock_guard都在头文件中。
    ```cpp

#include
#include
#include

using namespace std;

list my_list;
mutex my_mutex; //申明一个互斥元

void add_to_list(int new_value)
{
lock_guard my_guard_mutex(my_mutex); //使用lock_guard对下面的代码段加锁,当离开作用域时会在析构函数中自动调用unlock()
my_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
std::lock_guard my_guard_mutex(my_mutex); //对同一个互斥元执行加锁,这意味着这个代码片段和上一次加锁的代码片段是互斥的
return find(my_list.begin(), my_list.end(), value_to_find) != my_list.end();
}

/在这个例子中,我们使用全局对象把互斥元和共享数据组织在了一起
*但是更通用的做法是在一个类中将互斥元和受保护的数据组织在一起,这样清楚的标明了互斥元和数据之间的关系,还可以对此封装函数以及强制保护
*另外通常把受保护的数据和互斥元的访问权限设为private
/

/此外,假设现在上面的两个函数是类的成员函数,如果其中一个返回了受保护数据的引用或者指针,那么即使所有的成员函数都以良好的顺序锁定了互斥元也是没有用的
*因为此时你已经在保护中捅了一个窟窿,能够访问(并且能修改)该指针或引用的任意代码段都可以访问这个共享数据而无需锁定该互斥元
/

      检查成员函数是否返回受保护数据的引用或者指针这是容易的,但是有些情况下,用户函数(例如你的成员函数传参时传递了一个函数指针进来)会返回受保护数据的指针或者引用,此时也会破坏互斥元对受保护数据的保护。

      一个可以参照的原则是:不要把受保护数据的指针或者引用传递到锁的范围之外,无论是通过从函数中返回它们、将其存放在外部可见的内存中,还是作为参数传递给用户提供的函数。



##2.2.2接口中固有的竞争条件
        分析这个问题时要牢记:所谓多线程并发指的是在同一时刻有多个线程在同时运行着。
        这里接口之间的竞争条件,是指数据接口在没有使用互斥元保护的情况。具体分析还是看书吧!P40-P44
        这几页中还介绍了一个线程安全stack的实现,非常值得细细阅读。

## 2.2.3 死锁:问题和解决方案
        死锁的产生:线程之间相互等待。这是在需要锁定两个或者更多互斥元以执行操作时的最大问题。避免死锁的一种思路:始终用相同的顺序锁定这两个互斥元。另外,C++标准库中的std::lock可以帮助解决死锁——std::lock可以同时锁定两个或者等多的互斥元,而没有死锁的风险。
        使用示例:
        ```cpp
class some_big_object;
void swap(some_big_object& lhs,some_big_object &rhs);
class X
    {
    private:
         some_big_object some_detail;
         std::mutex m;
    public:
          X(some_big_object const &sd): some_detail(sd){}
          friend void swap(X&lhs, X& rhs)
              {
              if(&lhs == &rhs)  //检查参数确保它们是不同的实例
                  return;
              std::lock(lhs.m,rhs.m);//调用lock锁定这两个实例的互斥元,因为试图在已经锁定了的std::mutex上获取锁是未定义行为
              std::lock_guard  lock_a(lhs.m,std::adopt_lock); //std::adopt_lock的作用是告诉std::lock_guard对象该互斥元已经被锁定,并且它只应沿用互斥元上已有锁的所有权,而不是试图在构造函数中锁定互斥元。它的目的是确保受保护的操作在引发异常的情况下函数退出时也能正确的解锁互斥元
              std::lock_guard  lock_b(rhs.m,std::adopt_lock);
              swap(lhs.some_detail,rhs.some_detail); 
          }
}
 std::lock在锁定lhs.m或者rhs.m的时候都有可能引发异常,这个异常会被传播出std::lock,如果std::lock已经成功的在一个互斥元上获取了锁,当它试图在另一个互斥元上获取锁的时候就会引发异常,前一个互斥元就会自动释放。

2.2.4 避免死锁的进一步处理

  死锁并不一定是有相互等待锁而引发的,例如,令每个线程在thread对象上为令一个线程调用join(),这就会引发死锁,因为两个线程都无法取得进展,因为正在等待着另一个线程完成。避免死锁的准则大致可以归纳如下:如果有另外一个线程在等你,那么你就别等他。
  A.避免嵌套锁
  在你已经持有一个锁的时候,就不要再获取锁了(加锁)。获取多个锁,为了避免死锁,可以使用std::lock。如果一定要使用嵌套锁,那么必须确保证加锁和释放锁的顺序是一样的,比如先加的锁后释放,例如:lock1,lock2,unlock2,unlock1,那么在其它地方就不允许出现lock1,lock2,unlock1,unlock2.
  B.在持有锁时,避免调用用户提供的代码
  因为你不知道用户代码中做了什么,它可能也会获取锁,这样就违背了第一条原则,当然如果有些情况下是无法避免嵌套锁的
  C.以固定的顺序获取锁
  如果你绝对需要获取两个或者更多的锁,并且不能以std::lock的单个操作取得,次优的做法是在每个线程中以相同的顺序获取它们。
  D.使用锁层次
  其实它就是定义了锁定的顺序,但是锁层次可以提供一种方法,来检查在运行时是否遵循了约定。其思路是将应用程序分层,并且确认所有能够在任意给定的层级上被锁定的互斥元。当代码试图锁定一个互斥元时,如果它在较低层已经持有锁,那么就不允许它锁定该互斥元。通过给每一个互斥元分配层号,并记录下每个线程都锁定了哪些互斥元,你就可以在运行时进行检查了。  
  关于层次锁的讲解在书P48,例子非常细致需要仔细研读。
  分层次互斥元的简单实现:
class hierarchical_mutex
{
    std::mutex internal_mutex;
    unsigned long const hierarchy_vaule;
    unsigned long previous_hierarchy_value;
    static thread_local unsigned long this_thread_hierarchy_value;   //用来表示当前线程的层次值,它被初始化long类型的最大值
    void check_for_hierarchy_violation()
    {
        if (this_thread_hierarchy_value <= hierarchy_vaule)
            throw std::logic_error("mutex hierarchy violated");
    }

    void update_hierarchy_value()
    {
        previous_hierarchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_vaule;
    }
public:
    explicit hierarchical_mutex(unsigned long value) :hierarchy_vaule(value), previous_hierarchy_value(0)
    {}
    void lock()
    {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        update_hierarchy_value();
    }

    void unlock()
    {
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();
    }

    bool try_lock()
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        update_hierarchy_value();
        return true;
    }
};

thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);  //初始化当前线程的层次值

关于这个类的具体说明参见书P50

所有循环等待(join)都有可能导致死锁,另外在持有锁时等待另一个线程是一个坏主意,因为线程可能也在等待这个锁。

2.2.5用std::unique_lock灵活锁定

    std::unique_lock实例并不总是拥有与之关联的互斥元,首先,你可以把std::adopt_lock作为第二参数传递给构造函数,以便让锁对象来管理互斥元上的锁,其次,也可以把std::defer_lock作为第二参数传递,来表示该互斥元在构造时应保持未被锁定,这个锁就可以在这之后通过在std::unique_lock对象(不是互斥元)上调用loc(),或是通过将std::unique_lock对象本身传递给std::Lock来获取。std::unique_lock占用更多的空间使用起来比lock_guard略慢。
    使用示例```cpp

class some_big_object;
void swap(some_big_object &lhs,some_big_object &rhs);
class X{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const &sd):some_detail(sd){}
friend void swap(X& lhs ,X& rhs)
if(&lhs == &rhs)
return;
std::unique_lock lock_a(lhs.m,std::defer_lock);
std::unique_lock lock_b(rhs.m,std::defer_lock);
swap(lhs.some_detail,rhs.some_detail);
}

unique_lock提供了lock、unlock、try_lock三个成员函数,它们会转发给底层互斥元上同名的函数去做实际工作,并且只是更新在std::unique_lock实例内部的一个标识,来表示该实例当前是否拥有此互斥元,为了确保unlock()在析构函数中被正确调用,这个标识是必须的。如果该实例确实已拥有此互斥元,则析构函数必需调用unlock(),并且,如果该实例没有拥有此互斥元,则析构函数决不能调用unlock()。可以通过调用owns_lock成员函数来查询这个标识。这个标识必须被存储在某个地方,std::unique_lock使用于延迟锁定、锁的所有权需要从一个作用域转移到另一个作用域等。
## 2.2.6 在作用域之间转移锁的所有权
          使用示例:
          ```cpp
std::unique_lock get_lock()
    {
    extern std::mutex some_mutex;
    std::unique_lock lk(some_mutex);
    prepare_data();
    return lk;
}

void process_data()
    {
    std::unique_lock lk(get_lock());
    do_something();
}
//unique_lock支持与互斥元一样的用来锁定和解锁的基本成员函数集合,以便在明显不需要锁的时候解锁

mutex :互斥元(互斥量),作用,互斥的占有一个变量,在一段时间内仅能有一个线程访问。

2.2.7 锁粒度

   锁粒度:用来描述单个锁占保护的数据量。细粒度锁保护着较少的数据,粗粒度锁保护着较多的数据。选择一个足够粗粒度的锁来保护确实需要保护的数据,另外要保证只有在真正需要锁的时候才持有锁(别忘了,某个线程持有这个锁的时候,可能其它线程处在等待状态)。如果可能,仅在实际访问共享数据的时候锁定互斥元,尝试在锁的外面做任意的数据处理,另外在持有锁时,尽可能不要做确实很耗时的操作,例如文件I/O,除非你是想保护对文件的访问,因为这会延迟其它线程。
   对于这种应用场景,我们可以使用unique_lock来解决问题,示例:```cpp

void get_and_process_data()
{
std::unique_lock my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
my_lock.unlock();
result_type result = process(data_to_process);
my_lock.lock();
write_result(data_to_process,result);
}

# 2.3 用于共享数据保护的替代工具
        一个极端的情况,共享数据只在初始化的时候需要并发访问的保护(这个例子可以参考单例模型的两种实现),但那之后不需要显式同步。

## 2.3.1在初始化时保护共享数据
       延迟初始化:指这样的场景,构造起来非常昂贵,只有在实际使用的时候才会去构造它。那么像这类资源,在每次操作之前就需要先检查它是否初始化过,没有就在使用之前初始化它。往往这些资源除去初始化操作外只有初始化会在并发中产生竞争。


    '''cpp
    //使用互斥元进行线程安全的延迟初始化
    std::shared_ptr<some_resource>  resource_ptr;
    std::mutex resource_mutex;
    void foo()
    {
         std::unique_lock<std::mutex>   lk(resource_ptr);
         if(!resource_ptr)
         {
               resource_ptr.reset(new  some_resource);
         }
         lk.unlock();
         resource_ptr->do_something();
    }
    '''
这段代码的运行本身从线程安全的角度来讲是没有问题的,假设每个线程的都会使用这个只初始化一次的资源,那么每个线程在都需要锁,这样就造成了大量的时间消耗。因此,伟大的前辈想到了二次检查锁定的办法,示例如下:


    '''cpp

    void undefined_behaviour_with_double_checked_locking()
    {
           if(!resource_ptr)
           {
                  std::lock_guard<std::mutex>   lk(resource_mutex);
                  if(!resource_ptr)
                  {
                         resource_ptr.reset(new some_resource);
                  }
            }
            resource_ptr->do_something();
    }
    '''
    但是这段代码在运行时并非毫无问题,关于它可能引发的问题详解参见:http://blog.csdn.net/nodeathphoenix/article/details/51657973
    简单来说,加锁之后的代码不是原子操作,new 就包含了几个步骤1.分配内存 2.构造对象,存入已分配的内存3.使指针指向这块内存。但是在硬件指令优化的时候2和3的顺序可能发生变化,再加上线程的调度,那么某个线程访问到的指针可能并未指向实际的对象,这样就会引发错误。
    为了解决这个问题,C++标准委员会提供了std::once_flag和std::call_once来处理这种情况,到std::call_once返回时,指针将会被某个线程初始化(完全同步)。std::call_once可以与任意函数或可调用对象合作。使用示例:
    ```cpp
std::shared_ptr<some_resource>  resource_ptr;
std::once_flag  resource_flag;

void init_resource()
    {
    resource_ptr.reset(new some_resource);
}

void foo()
    {
    std::call_once(resource_flag,init_resource);   //初始化会被正好调用一次
    resource_ptr->do_something();
}

使用std::call_once 的线程安全的类成员函数的延迟初始化

class X
    {
    private:
          connection_info  connect_details;
          connection_handle connect;
          std::once_flag connect_init_flag;
          void  open_connection()
              {
                    connection = connection_manager.open(connection_details);
          }
    public:
         X(connection_info const &connection_details_):connection_details(connection_details_){}
    void send_data(data_packet  const& data)
        {
        std::call_once(connection_init_flag,&X::open_connection,this);
        connection.send_data(data);
    }
    data_packet  recevice_data()
        {
        std::call_once(connection_init_flag,&X::open_connection,this);
        return connection.receive_data();
    }
}
 注:1.std::mutex和std::once_flag的实例都不能被复制或移动,所以如果想像类成员一样使用,就必须显式定义这些你所需要的特殊成员函数。
 2.一个在初始化过程中可能会有竞争条件的场景,是将局部变量声明为static的,这种变量的初始化,被定义为在时间控制首次经过其声明时发生。在C++11中,初始化被定义为只发生在一个线程上,并且其它线程不可以继续直到初始化完成。

2.3.2保护很少更新的数据结构

读写互斥元:这个互斥元考虑到了两种用法,由单个“写”线程独占访问或共享,由多个“读”线程并发访问。C++11中为提供支持,但是Boost库中有——boost::shared_mutex。对于更新操作,,std::lock_guard 和std::unique_lock可用于锁定,以取代相应的std::mutex特化。这确保了独占访问,就像std::mutex一样,那些不需要更新数据结构的线程能够转而使用boost::shared_lock来获得共享访问。这与std::unique_lock用起来是相同的,除了多个线程在同一时间、同一boost::share_mutex上可能会具有共享锁。唯一的限制是,如果任意一个线程拥有一个共享锁,试图获取独占锁的线程会被阻塞,直到其它线程全都撤回它们的锁,同样地,如果任意一个线程具有独占锁,其它线程都能不获取共享锁或独占锁,直到一个线程撤回了它的锁。

示例代码如下:“`cpp
#include

#include
#include
#include <boost/thread/shared_mutex.hpp>
class dns_entry;
class dns_cache
{
std::map<std::string,dns_entry> entries;
mutable boost::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const & domain) const
{
boost::shared_lock lk(entry_mutex);
std::map<std::string,dns_entry>::const_iterator const it = entries.find(domain);
return (it== entries.end()) ? dns_entry():it->second;
}

void update_or_add_entry(std::string const &domain,dns_entry const & dns_details)
    {
    std::lock_guard<boost::shared_mutex> lk(entry_mutex);
    entries[domain] = dns_details;
}

}
“`

2.3.3 递归锁

在使用std::mutex的情况下,一个线程视图锁定其已经拥有的互斥元是错误的,并且试图这么做会产生未定义的行为。然而在某些情况下,线程多次重新获取一个互斥元却无需事先释放它是可取的。为了支持这个操作,C++标准库提供了std::recursive_mutex,它就像std::mutex,区别在于你可以在同一个线程中的单个实例上获取多个锁。在互斥元能够被另一个线程锁定之前,你必须释放所有的锁,因此如果你调用lock三次,就要调用三次unlock。正确使用std::lock_guard和std::unique_lock将会为你处理。


喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址