• 认真地记录技术中遇到的坑!

C++11 第三节 同步并发操作

C/C++ Moxun 10个月前 (03-07) 284次浏览 0个评论

同步:即指多个线程之间需要按照某种特定顺序来调用才能完成一个完整的任务。C++11线程中提供了条件变量(condition variables)和期值(future)为形式的工具来处理这个问题。

3.1等待事件或其他条件

处理同步的三种思路:

假设线程B的执行需要线程A的运行结果
– A.设置一个共享数据,依次来标识线程A的运行状态,当线程A完成时它将共享数据更新为约定好的值。线程B一直去检查共享区的数据。这种方式实现的同步带来的问题是,造成CPU资源的浪费,因为线程B不同的加锁解锁,占用了其它线程的可用资源。
– B.使用std::this_thread::sleep_for()函数,让等待中的线程在检查之间休眠一会儿。例如:

bool flag;
std::mutex m;
void wait_for_flag()
    {
     std::unique_lock<std::mutex>   lk(m);
    while(!flag)
        {
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock();
    }
}

这种方式存在的问题是:很难设置恰当的休眠时间,休眠时间过短则还会浪费CPU资源,休眠时间过长则导致等待的线程已经完成,但是它还在休眠造成延时。
– C.用C++提供的工具来等待事件。等待由另一个线程触发一个事件的最基本机制是条件变量(condition variable),从概念上说,条件变量与某些事件或其他条件相关,并且一个或多个线程可以等待该条件被满足,当某个线程已经确定条件得到满足,它就可以通知一个或多个正在条件变量上进行等待的线程,以便唤醒它们使得它们继续执行。

3.1.1 用条件变量等待条件

C++库提供了两个条件变量的实现:std::condition_variable和std::consition_variable::any,这两个实现都在库的头文件中声明。两者都适合和互斥元一起工作,以便提供恰当的同步;前者仅限于和std::mutex一起工作,而后者则可以与符合称为类似互斥元的最低标准的任何东西一起工作,所以以_any为后缀。首选condition_variable。

使用condition_variable实现线程同步的示例:

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_preparation_thread()
    {
    while(more_data_to_prepare())
        {
        data_chunk const data = prepare_data();
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_con.notify_one(); //通知等待中的线程
    }
}

void data_processing_thread()
    {
    while(true)
        {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[] return !data_queue.empty()); //传入锁以及表示正在等待的条件的lambda函数,检查条件,如果条件不满足,wait()解锁互斥元,并将该线程置于阻塞或者等待状态。当来自数据准备线程中对notify_one()的调用通知条件变量时,线程从睡眠状态苏醒(解除其阻塞),重新获得互斥元上的锁,并再次检查条件,如果条件已经满足,就从wait()返回值,互斥元仍然被锁定。如果不满足条件,该线程解锁互斥元,并恢复等待,所以才使用unique_lock而不是std::lock_guard-等待中的线程在等待期间必须解锁互斥元,并在这之后重新将其锁定
        data_chunk data = data.queue.front();
        data_queue.pop();
        lk.unlock();
        process(data);
        if(is_last_chunk(data))
            break;
    }
}

wait()中也可以直接传入函数,在wait()的调用中,条件变量可能对所提供的变量检查任意多次,然而它总是在互斥元被锁定的情况下这样做,并且(当且仅当)用来测试的条件返回true,它才会立即返回,当等待线程重新获取互斥元并检车条件时,如果它并非直接响应另一个线程的通知,这就是伪唤醒。

notify_all(),将导致所有当前执行这wait()的线程检查其等待中的条件。

3.2使用future等待一次性事件

如果一个线程需要等待特定性的一次性事件,那么它就会获取一个furture来代表这一事件。然后,该线程就可以周期性地在这个future上等待一小段时间以检查事件是否发生(检查出发告示板),而在检查间隙执行其它的任务。另外它还可以去做另外一个任务,直到其所需的事件已经发生才继续进行,随后就等待future变为就绪。future可能会有与之相关的数据,或可能没有。一旦事件已经发生,即future已变为就绪,future就无法复位。

C++标准库中有两类future,一类是唯一future(std::future<>)和共享future(std::shared_future<>),std::future的实例是仅有的一个指向其关联事件的实例,而多个std::shared_future的实例可以指向同一事件,对后者而言,所有实例将同时变成就绪,并且它们都可以访问所有与该事件相关联的数据。模板类型就是future实例关联数据的类型。而std::future则是无关联数据,future仅用于线程间通信,它本身不提供同步机制。如果多个线程需要访问同一个future对象,它们必须通过互斥元或其它同步机制来保护访问。但是多个线程可以分别访问自己的shared_future<>的副本而无需进一步的同步,即使它们都指向一个异步结果。

3.2.1 从后台任务返回值

从线程中传会结果,要用到函数模板std::async<>它也声明于头文件中,在不需要立刻得到记过的时候,可以使用std::async来启动一个异步任务,std::async返回一个std::future对象而不是给你一个std::thread,让你在上面等待,std::future对象最终将持有函数的返回值,当你需要这个值的时候,只要在future上调用get(),线程就会阻塞直到future就绪,然后返回该值。示例:

#include <future>
#include <iostream>
#include <thread>
#include "windows.h"

using namespace std;

int getTest()
{
    Sleep(10000);
    return 10;
}

int main(int argc, char *argv[])
{
    std::future<int> the_answer = std::async(std::launch::async,getTest);
    cout << "已执行async!" << endl;
    Sleep(10000);
    auto beg = clock();
    int iTest = the_answer.get();
    auto end = clock();
    cout << "得到答案:" << iTest << "!消耗时间是:" << end - beg << endl;
    getchar();
    return EXIT_SUCCESS;
}

上述代码执行结果如图所示:
C++11 第三节 同步并发操作

std::async允许将额外的参数添加到调用中,来将附加参数传递给函数,这和std::thread是同样的方式。如果第一个参数是指向成员函数的指针,第二个参数则提供了用来应用该成员函数的对象(直接的,或通过指针,或封装在ref中),其余的参数则作为参数传递给该成员函数。否则,第二个以及后续的参数将作为参数,传递给第一个参数所指定的函数或者可调用对象。
传参示例:

#incude <string>
#include <future>

struct X
    {
    void foo(int,std::string const&);
    std::string bar(std::string const&);
};
X x;
auto f1 = std::async(&X::foo,&x,42"Hello"); //调用p->foo(42,"Hello"),其中p是&X
auto f2 = std::async(&X::bar,x,"goodbye"); //调用tmpx.bar("goodbye"),其中tmpx是x的副本

struct Y
    {
    double operator()(double);
}
Y y;
auto f3 = std::async(Y(),3.141); //调用tmpy(3.141),其中tmpy是从Y()移动构造来的
auto f4 = std::async(std::ref(y),2.718);  //调用y(2.718)

X baz(X&);
std::sync(baz,std::ref(x));  //调用baz(x);

class move_only
    {
    public:
    move_only();
    move_only(move_only &&);
    move_only(move_only const &) = delete;
    move_only& operator = (move_only &&);
    move_only& operator = (move_only const &) = delete;
    void operator()();
};

auto f5 = std::async(move_only());  //调用tmp(),其中tmp是从std::move(move_only())构造的

默认情况下,std::async是否启动一个新线程,或者在等待future时任务是否同步运行都取决于具体实现方式。但你也可以在函数调用之前使用一个额外的参数来指定究竟使用何种方式,这个参数的类型是std::launch,可以是std::launch::deferred,以表明该函数调用将会延迟,直到在future上调用wait()或者get()为止(其实就是说不会立即创建线程来执行函数),或者是std::launch::asyc,这表明会立即创建线程来运行函数,又或者是std::launch::async|std::launch::deferred,以表明可以由具体实现来选择,最后一个选项是默认的,如果函数调用有延迟,它可能永远都不会实际运行。这个具体实现指的是async函数本身的实现方式,永远不会执行是指,在函数是按照延迟调用模式下进行的,那么没有调用get的话就不会执行。标准并没有规定async具体是采用哪种方式启动线程,所以在实际使用时最好先测试一下,实测,VS2015是异步的。

3.2.2将任务与future关联

std::packaged_task<>将一个future绑定到一个函数或可调用对象上。当std::packaged_task<>对象被调用时,它就调用相关联的函数或可调用对象,并且让future就绪,将返回值作为关联数据存储。

这个机制的优势在于,如果一个大型操作可以被划分为若干子操作,那么每一个子任务都可以被封装在std::packaged_task<>实例中,然后将该实例传给任务调度器或线程池,这样调度程序仅需处理std::packaged_task<>实例,而非各个函数。

std::packaged_task<> 类模板的模板参数为函数签名,比如void()表示无参数无返回值的函数,或是像int(std::string & ,doube *) 表示可接受对std::string的非const引用和指向double的指针,并返回int的函数。当构造std::packaged_task实例时,必须传入一个函数或者可调用对象,它可以接受指定的参数并且返回指定的返回类型。类型无需严格匹配,但是要相容。

指定的函数签名的返回类型确定了从get_future()成员函数返回的std::future<>的类型,而函数签名的参数列表用来指定封装任务的函数调用运算符的签名。用法示例及其讲解参见P75。


喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址