使用条件变量和互斥锁实现信号量
layout: post title: 使用条件变量和互斥锁实现信号量 categories: cpp_concurrency description: C++并发编程简介 keywords: c++, 并发编程,std::condition_variable,std::mutex,条件变量,信号量
- 使用条件变量和互斥锁实现信号量
- keywords: c++, 并发编程,std::condition\_variable,std::mutex,条件变量,信号量
- 信号量实现
- 测试代码
C++11 没有提供信号量,但是可以使用条件变量和互斥锁很容易的实现信号量。信号量是用来在多线程中进行资源同步的。信号量内部维护资源的数量,并且提供2个操作——wait和signal,wait的时候获取资源并减少计数器,signal的时候释放资源并增加计数器。只有当计数器的数目>0的情况下去wait才能够获取到资源。
注意:
使用信号量的一个原则是只有获取资源才wait,释放资源才调用signal!!
信号量实现
#include <condition_variable>
#include <thread>
class semaphore
{
public:
semaphore(int count) :m_counter(count){}
~semaphore(){}
void wait()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_counter--;
// while (m_counter < 0)// 防止假唤醒
// {
// m_cv.wait(lock);
// }
m_cv.wait(lock,[this](){
return this->m_counter >= 0; //
});
}
void signal()
{
std::unique_lock<std::mutex> lock(m_mutex);
if (++m_counter <= 0) // 在执行++之后<=0,表明有其他线程在等待资源,因此执行唤醒
{
m_cv.notify_one();
}
}
int m_counter;
std::condition_variable m_cv;
std::mutex m_mutex;
};
测试代码
下面的示例,我们使用信号量来限制并发数。我们将信号量数目设置为5,线程数量设置为8。线程while循环并进行wait获取资源,只有获取到了资源的线程才会继续往下执行。因此最多同时只有5个线程能获取到资源,未获取到资源的线程将陷入等待,直到其他线程调用signal释放资源,将信号量的值加一。
#include <algorithm>标签:11,std,int,count,C++,信号量,互斥,include,wait From: https://blog.51cto.com/u_6650004/5913054
#include <vector>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <iostream>
#include <chrono>
#include "fxtime.h"
int main()
{
semaphore s(5); // 信号量,资源数定义为5,因此最大允许5个资源被同时访问
int task_count = 1000;
int i = 0;
std::vector<int> queue(task_count);
std::transform(queue.begin(), queue.end(), queue.begin(), [&i](int &n) {i++; return i; });
int thread_count = 8; // 线程个数
std::vector<std::thread> threads;
std::mutex m;
int sub_index = 0;
fxtime t;
for (int i = 0; i < thread_count; ++i)
{
auto t = std::thread([&queue, &sub_index,&s,&m, task_count]() {
do
{
s.wait();// 获取资源,这里通过信号量限制同时允许5个线程运行
std::unique_lock<std::mutex> lock(m);
if (sub_index >= task_count)
{
std::cout << "thread return" << std::endl;
return;
}
std::cout << ",q[" << sub_index << "]=" << queue[sub_index] << ";counter = [" << s.m_counter << "]"<< std::endl;
sub_index++;
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100+10));
s.signal();// 释放资源
} while (true);
});
threads.push_back(std::move(t));
//s.wait(); // 这里进行wait是一个明显的错误,因为这里并没有获取资源!!!
}
std::cout << ";counter = [" << s.m_counter << "]" << std::endl;
int exit_thread_count = 0;
for (auto& t : threads)
{
t.join();
exit_thread_count++;
std::cout << "exit thread count = " << exit_thread_count << std::endl;
}
std::cout << "dont wait>time espaced:" << t.escaped_miliseconds() << "ms" << std::endl;
std::cout << ";counter = [" << s.m_counter << "]" << std::endl;
return 0;
}