多线程并发
总览
- 创建线程
- 使用互斥量
- 异步线程
- 原子类型
- 生产者消费者模型
- 线程池
创建线程
#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
void task_func(int &n) {
std::this_thread::sleep_for(std::chrono::microseconds(n));
printf("in task_func, thread id: %d\n", std::this_thread::get_id());
n = 30;
}
int main() {
int n = 50;
auto func = std::bind(task_func, std::ref(n));
//std::thread th(func);
std::thread th(func, &n); //也行
printf("in main, son thread id: %d\n", th.get_id());
printf("support num: %d\n", th.hardware_concurrency());
printf("cur n: %d\n", n);
th.join();
return 0;
}
互斥量
#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
int globalVal = 0;
std::mutex lock;
void task_func() {
for (int i = 0; i < 100000; ++i) {
lock.lock();
globalVal++;
lock.unlock();
}
}
int main() {
std::thread t1(task_func);
std::thread t2(task_func);
t1.join();
t2.join();
std::cout << globalVal << std::endl;
return 0;
}
运行结果符合预期, 这就解决了吗? 问题一: 未能正常解锁. 真实场景下, lock和unlock之间还有大量逻辑, 出现异常后, unlock是不会执行的, 因此可能造成死锁. 该问题可以通过lock_guard/ unique_lock
解决, 利用C++的RAII机制, 中途抛出异常, 会调用析构函数.
#include<iostream>
using namespace std;
class A {
public:
A() {
printf("initialize\n");
}
~A() {
printf("destroy\n");
}
};
int exception_maker(int) {
try {
throw -1;
}
catch (int) {
throw -1;
}
}
int main(void) {
A obj;
try{
exception_maker(0);
}catch(int){
printf("exception occurs\n");
}
return 0;
}
-------------------------------------
initialize
exception occurs
destroy
另一种问题: 上锁顺序的不同造成死锁
void task_func1() {
for (int i = 0; i < 100000; ++i) {
lock1.lock();
lock2.lock();
globalVal++;
lock2.unlock();
lock1.unlock();
}
}
void task_func2() {
for (int i = 0; i < 100000; ++i) {
lock2.lock();
lock1.lock();
globalVal++;
lock1.unlock();
lock2.unlock();
}
}
第二个问题解决: std::lock(lock1, lock2); 顺序无所谓
条件变量
#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <queue>
#include <condition_variable>
std::mutex mtx;
std::queue<int> queue;
std::condition_variable cv;
void task_producer() {
int i = 0;
while (true) {
std::unique_lock<std::mutex> lock(mtx);
queue.push(i);
std::cout << "put data: " << i << std::endl;
lock.unlock();
cv.notify_all();
if (i < 1000000) {
i++;
} else {
i = 0;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
void task_consumer1() {
int data;
while (true) {
std::unique_lock<std::mutex> lock(mtx);
while(queue.empty()) {
cv.wait(lock);
}
data = queue.front();
queue.pop();
std::cout << "Get data: " << data << std::endl;
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds (50));
}
}
void task_consumer2() {
int data;
while (true) {
std::unique_lock<std::mutex> lock(mtx);
while(queue.empty()) {
cv.wait(lock);
// lock.unlock(); cv.wait();
}
data = queue.front();
queue.pop();
std::cout << "Get data: " << data << std::endl;
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds (50));
}
}
int main() {
std::thread t1(task_producer);
std::thread t2(task_consumer1);
std::thread t3(task_consumer2);
t1.join();
t2.join();
t3.join();
return 0;
}
promise和future
解决线程不知何时返回问题
先看一个简单的案例: 子线程计算a, b的平方和, 返回给主线程定义的变量ret. 由于主线程和子线程都可能操作ret, 故上锁.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
void task(int a, int b, int &ret){
// 模拟准备工作
std::this_thread::sleep_for(std::chrono::milliseconds (100));
std::unique_lock<std::mutex> lock(mtx);
ret = a * a + b * b;
lock.unlock();
cv.notify_one();
// 模拟之后处理的任务
std::this_thread::sleep_for(std::chrono::milliseconds (2000));
}
int main() {
int ret = 0;
std::cout << "ret before:" << ret << std::endl;
std::thread t(task,1,2,std::ref(ret));
std::unique_lock<std::mutex>lock(mtx);
cv.wait(lock);
std::cout << "current ret is:" << ret << std::endl;
t.join();
return 0;
}
为何以这么麻烦, 又加锁又用条件变量? 因为主线程不知道子线程什么时候会将计算结果给ret, 故需要条件变量通知主线程. 可如此简单的任务, 实现也太麻烦了, 解决方法: promise和future结合
#include <iostream>
#include <thread>
#include <future>
void task(int a, int b, std::promise<int> &p) {
// 模拟准备工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
p.set_value(a * a + b * b);
// 模拟之后处理的任务
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
int main() {
std::promise<int> p;
std::future<int> f = p.get_future(); // 联系起来
std::cout << f.valid() << std::endl;
std::thread t(task, 1, 2, std::ref(p));
std::cout << f.get()<<std::endl; //阻塞,直到set_value被调用,只能get一次
std::cout << f.valid() << std::endl;
t.join();
return 0;
}
解决入参滞后传递, 线程先创建的问题
线程有个参数需要通过计算, 之后才能得到, 但想先创建线程, 在线程里等待参数传递过来, 怎么办? 还是借助promise和future
void task(int a, std::future<int>& b, std::promise<int> &p) {
// 模拟准备工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if(b.valid()){
int tmp = b.get();
p.set_value(a * a + tmp * tmp);
}
// 模拟之后处理的任务
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
int main() {
std::promise<int> pout;
std::future<int> fout = pout.get_future(); // 联系起来
std::promise<int> pin;
std::future<int> fin = pin.get_future(); // 联系起来
std::thread t(task,1,std::ref(fin),std::ref(pout));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pin.set_value(3); // 给入参赋值
std::cout << "ret value:" << fout.get() << std::endl; // 阻塞等待结果
t.join();
return 0;
}
----------------------------
ret value:10
一个滞后入参需传递给多个线程-- shared_future
注意promise的set_value也只能调用一次, 否则抛异常.
#include <iostream>
#include <thread>
#include <future>
void task(int a, std::shared_future<int> &s_f, std::promise<int> &p) {
// 模拟准备工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
int tmp = s_f.get();
p.set_value(a * a + tmp * tmp);
// 模拟之后处理的任务
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
int main() {
std::promise<int> pout1;
std::future<int> fout1 = pout1.get_future(); // 联系起来
std::promise<int> pout2;
std::future<int> fout2 = pout2.get_future(); // 联系起来
std::promise<int> pout3;
std::future<int> fout3 = pout3.get_future(); // 联系起来
std::promise<int> pin;
std::future<int> fin = pin.get_future();
std::shared_future<int> s_f = fin.share(); // 联系起来
std::thread t1(task, 1, std::ref(s_f), std::ref(pout1));
std::thread t2(task, 1, std::ref(s_f), std::ref(pout2));
std::thread t3(task, 1, std::ref(s_f), std::ref(pout3));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pin.set_value(3);
std::cout << "ret value:" << fout1.get() + fout2.get() + fout3.get() << std::endl;
t1.join();
t2.join();
t3.join();
return 0;
}
async创建异步任务
#include <iostream>
#include <thread>
#include <future>
int task(int a, int b) {
std::cout << std::this_thread::get_id() << std::endl;
return a * a + b * b;
}
int main() {
std::cout << std::this_thread::get_id() << std::endl;
// 必须放在新线程
// std::future<int> f = std::async(std::launch::async, task, 3, 4);
// 延迟调用, 不会在线程中, 调用get时才执行
// std::future<int> f = std::async(std::launch::deferred, task, 3, 4);
// 默认等价于 std::launch::async | std::launch::deferred
//std::future<int> f = std::async(std::launch::async | std::launch::deferred, task, 3, 4);
std::future<int> f = std::async(task, 3, 4);
std::cout << f.get() << std::endl;
return 0;
}
packaged_task
作用: 任务打包. 主要就是和future联系起来.
#include <iostream>
#include <thread>
#include <future>
#include <functional>
int task(int a, int b) {
std::cout << "son thread: " << std::this_thread::get_id() << std::endl;
return a * a + b * b;
}
int main() {
std::cout << "main thread: " << std::this_thread::get_id() << std::endl;
// 打包任务
std::packaged_task<int()> pt(std::bind(task, 3, 4));
std::future<int> fu = pt.get_future();
pt();
std::cout << fu.get() << std::endl;
// 重置
pt.reset();
fu = pt.get_future();
std::thread t(std::move(pt));
// pt();
std::cout << fu.get() << std::endl;
t.join();
std::packaged_task<int()> pt2(std::bind(task, 3, 4));
fu = pt2.get_future();
std::async(std::move(pt2));
std::cout << fu.get() << std::endl;
return 0;
}
标签:11,std,include,int,lock,C++,task,future,多线程
From: https://www.cnblogs.com/shmilyt/p/17054572.html