深入理解C++中的同步并发操作(c++ concurrency in action 第四章总结)
第四章详细介绍了C++中的各种并发工具,包括条件变量、std::future 和 std::async、带超时的wait、std::packaged_task 和 std::promise,以及如何使用这些工具来简化代码。本总结将依次介绍这些工具的用法和应用场景,并结合实际代码进行说明。
4.1 使用std::condition_variable来等待事件
std::condition_variable
- 基本概念:条件变量用于阻塞一个或多个线程,直到某个条件为真。
- 通常,一个线程等待条件变量,另一个线程在条件满足后通知等待的线程继续执行。
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
void do_work() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return ready; });
std::cout << "Work done!" << std::endl;
}
void set_ready() {
std::lock_guard<std::mutex> lock(mtx);
ready = true;
cv.notify_one();
}
int main() {
std::thread worker(do_work);
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟一些准备工作
set_ready();
worker.join();
return 0;
}
在这个示例中,do_work线程在调用条件变量的wait函数后进入阻塞状态,直到其他线程设置ready标志并调用notify_one。cv.wait确保线程在锁定互斥锁的同时等待条件满足,而notify_one通知一个等待的线程继续执行。这样可以避免线程忙等待,提高程序的效率。
线程安全queue的实现
在并发环境中,线程安全的队列是实现生产者-消费者模型的基础。我们可以通过 std::mutex 和 std::condition_variable 来实现一个简单的线程安全队列。
// thread_safe_queue.cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
template<typename T>
class threadsafe_queue {
private:
std::queue<T> data_queue;
mutable std::mutex mut;
std::condition_variable data_cond;
public:
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(new_value));
data_cond.notify_one();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
4.2 使用 std::future 和 std::async 进行异步任务处理
std::future 和 std::async 是C++11中引入的异步任务处理工具,允许开发者在程序中执行异步操作,并获取这些操作的结果。
- std::future 用于存储异步操作的结果。
- std::async 用于启动一个异步任务,并返回一个 std::future 对象。
#include <future>
#include <iostream>
constexpr int find_a_value() noexcept {
return 1;
}
void do_something() {
std::cout << "do something!" << std::endl;
}
struct X {
void foo(int, std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用 p->foo(42, "hello"),其中 p 是 &x
auto f2 = std::async(&X::bar, x, "goodbye"); // 调用 tmpx.bar("goodbye"),其中 tmpx 是 x 的副本
struct Y {
double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141); // 调用 tmpy(3.141),其中 tmpy 是从 Y() 移动构造的
auto f4 = std::async(std::ref(y), 2.718); // 调用 y(2.718)
X baz(X&);
auto f5 = std::async (baz, std::ref(x)); // 调用 baz(x)
class move_only {
public:
move_only();
move_only(move_only&&);
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
};
auto f6 = std::async(move_only()); // 调用 tmp(),其中 tmp 是通过 std::move(move_only()) 构造的
auto f7 = std::async(std::launch::async, Y(), 1.2); // 在新线程中运行
auto f8 = std::async(std::launch::deferred, baz, std::ref(x)); // 在 wait() 或 get() 中运行
auto f9 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x)); // 实现选择
auto f10 = std::async(baz, std::ref(x)); // 实现选择
// f8.wait(); // 调用延迟函数
int main() {
std::future<int> the_answer(std::async(find_a_value));
do_something();
std::cout << "the answer is: " << the_answer.get() << std::endl;
return 0;
}
4.3 使用 std::packaged_task和 std::promise 进行任务管理
使用 promise 进行线程间通信
std::promise 和 std::future 配合使用,可以在不同线程之间传递值或通知事件。promise 用于设置值,而 future 则用于接收该值。
// promise.cpp
#include <future>
#include <iostream>
#include <thread>
void compute(std::promise<int>&& p) {
p.set_value(42);
}
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(compute, std::move(p));
std::cout << "Result: " << f.get() << std::endl;
t.join();
return 0;
}
在这段代码中,compute 函数在线程中执行计算,并通过 promise 将结果设置为 42。主线程通过 future 获取该结果。
#include <future>
void process_connections(connection_set& connections) {
while (!done(connections)) {
for (connection_iterator connection = connections.begin(), end = connections.end(); connection != end;
++connection) {
if (connection->has_incoming_data()) {
data_packet data = connection->incoming();
std::promise<payload_type>& p = connection->get_promise(data.id);
p.set_value(data.payload);
}
if (connection->has_outgoing_data()) {
outgoing_packet data = connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}
std::packaged_task的使用
#include <future>
#include <iostream>
#include <thread>
int calculate(int x) {
return x * 2;
}
int main() {
std::packaged_task<int(int)> task(calculate);
std::future<int> result = task.get_future();
std::thread(std::move(task), 5).detach();
std::cout << "Result: " << result.get() << std::endl;
return 0;
}
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() {
while (!gui_shutdown_message_received()) {
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if (tasks.empty()) {
continue;
}
task = std::move(tasks.front());
tasks.pop_front();
}
task();
}
}
std::thread gui_bg_thread(gui_thread);
template <typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
std::packaged_task<void()> task(f);
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}
spawn_task:封装 packaged_task 和 thread
在有些情况下,我们需要对 std::packaged_task 和 std::thread 进行封装,以便更灵活地管理任务。这种封装可以通过创建一个 spawn_task 函数来实现。
#include <future>
#include <type_traits>
template <typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a) {
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)>
task(std::move(f)));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task), std::move(a));
t.detach();
return res;
}
4.4使用带时间限制的wait
在实际开发中,往往需要在等待某个任务完成时设置一个超时时间,以防止程序无限期地阻塞。
- 等待超时与时钟:标准库提供了std::condition_variable和std::future的wait_for与wait_until方法,这些方法可以指定等待的时间段或绝对时间。
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
void do_work() {
std::unique_lock<std::mutex> lock(mtx);
if(cv.wait_for(lock, std::chrono::seconds(1), [] { return ready; })) {
std::cout << "Work done within timeout!" << std::endl;
} else {
std::cout << "Work timeout!" << std::endl;
}
}
int main() {
std::thread worker(do_work);
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟较长的准备工作
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_one();
worker.join();
return 0;
}
在这个例子中,如果ready标志在1秒内没有被设置,do_work函数将超时并输出“Work timeout!”。使用带时间限制的等待,可以防止线程无限期地等待,从而提高程序的鲁棒性。
4.5使用同步机制简化代码
通过将同步机制抽象化,我们可以更简洁地实现复杂的并发逻辑。例如,通过消息传递或使用std::future的扩展功能,可以更容易地实现并发任务的协调。
函数式编程
通过std::future和std::promise,我们可以构建一种函数式编程风格的并发系统,避免直接共享数据,从而降低复杂性。例如,在实现快速排序算法时,可以利用std::future实现并行计算。
std::list<int> parallel_quick_sort(std::list<int> input) {
if(input.empty()) {
return input;
}
auto pivot = *input.begin();
auto divide_point = std::partition(input.begin(), input.end(), [pivot](int const& t) { return t < pivot; });
std::list<int> lower_part(input.begin(), divide_point);
std::future<std::list<int>> new_lower(std::async(parallel_quick_sort, std::move(lower_part)));
auto new_higher = parallel_quick_sort(std::list<int>(divide_point, input.end()));
std::list<int> result;
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}
4.6使用消息传递模型实现并发系统:以ATM机为例
在并发编程中,如何在多个线程之间进行有效的通信和同步非常重要。传统的方式往往涉及到锁、互斥量等复杂的同步机制,而这些机制很容易导致死锁、竞争条件等问题。为了解决这些问题,《C++ Concurrency in Action》一书中介绍了一种使用消息传递模型(Message Passing)的方式,并通过一个ATM机的例子,展示了如何设计一个更为简洁和高效的并发系统。
消息传递与CSP模型
消息传递模型的核心思想来源于CSP(Communicating Sequential Processes),这是一种不共享数据的并发编程模型。在这种模型中,每个线程都可以被视为一个独立的状态机(State Machine)。当一个线程接收到消息时,它会根据当前的状态处理该消息,并可能改变其状态或发送消息给其他线程。
ATM机的状态机设计
为了演示消息传递模型,书中设计了一个ATM机的状态机模型,展示了如何使用消息队列来处理ATM机的不同状态和操作。ATM机的主要功能包括等待用户插卡、处理用户输入PIN码、处理取款请求、显示账户余额和完成交易。每个操作对应一个状态,并通过消息在状态之间进行转换。
1. 等待用户插入卡片
ATM机首先处于等待状态,直到用户插入卡片。当卡片插入后,ATM机会读取账户信息,并进入输入PIN码的状态。
2. 处理用户输入PIN码
在这个状态下,用户需要输入4位PIN码。如果输入的PIN码正确,ATM机会进入处理取款请求或显示余额的状态;如果不正确,ATM机会显示错误信息并返回初始状态。
3. 处理取款请求
当用户选择取款时,ATM机会与银行服务器进行通信,确认用户的账户余额。如果余额充足,ATM机会发放现金;如果余额不足,则显示错误信息并取消交易。
4. 显示账户余额
用户可以选择查看账户余额,ATM机会与银行服务器通信,并显示用户的当前余额。
5. 完成交易
在交易完成后,ATM机会退回用户的卡片,并回到等待下一位用户插卡的状态。
消息队列的实现
消息队列是整个消息传递系统的核心。在这个实现中,消息被封装在一个基类指针中,并通过模板类进行具体消息类型的处理。消息可以被压入队列,并在需要时被线程弹出进行处理。
发送与接收消息
消息的发送通过sender类完成,这个类是对消息队列的简单封装,只允许其他线程向队列发送消息。而消息的接收则通过receiver类来实现,receiver类持有消息队列的所有权,并提供等待和分发消息的功能。具体的消息处理逻辑通过dispatcher和TemplateDispatcher类来实现,这些类负责将接收到的消息分发给对应的处理函数。
#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
#include <iostream>
#include <thread>
#include <string>
namespace messaging {
struct message_base {
virtual ~message_base() {}
};
template<typename Msg>
struct wrapped_message: message_base {
Msg contents;
explicit wrapped_message(Msg const& contents_): contents(contents_) {}
};
class queue {
std::mutex m;
std::condition_variable c;
std::queue<std::shared_ptr<message_base> > q;
public:
template<typename T>
void push(T const& msg) {
std::lock_guard<std::mutex> lk(m);
q.push(std::make_shared<wrapped_message<T> >(msg));
c.notify_all();
}
std::shared_ptr<message_base> wait_and_pop() {
std::unique_lock<std::mutex> lk(m);
c.wait(lk,[&]{return !q.empty();});
auto res=q.front();
q.pop();
return res;
}
};
class sender {
queue* q;
public:
sender(): q(nullptr) {}
explicit sender(queue*q_): q(q_) {}
template<typename Message>
void send(Message const& msg) {
if(q) {
q->push(msg);
}
}
};
class close_queue {};
class dispatcher {
queue* q;
bool chained;
dispatcher(dispatcher const&)=delete;
dispatcher& operator=(dispatcher const&)=delete;
template<typename Dispatcher, typename Msg, typename Func>
friend class TemplateDispatcher;
void wait_and_dispatch() {
for(;;) {
auto msg=q->wait_and_pop();
dispatch(msg);
}
}
bool dispatch(std::shared_ptr<message_base> const& msg) {
if(dynamic_cast<wrapped_message<close_queue>*>(msg.get())) {
throw close_queue();
}
return false;
}
public:
dispatcher(dispatcher&& other): q(other.q),chained(other.chained) {
other.chained=true;
}
explicit dispatcher(queue* q_): q(q_),chained(false) {}
template<typename Message,typename Func>
TemplateDispatcher<dispatcher,Message,Func> handle(Func&& f) {
return TemplateDispatcher<dispatcher,Message,Func>(q,this,std::forward<Func>(f));
}
~dispatcher() noexcept(false) {
if(!chained) {
wait_and_dispatch();
}
}
};
template<typename PreviousDispatcher,typename Msg,typename Func>
class TemplateDispatcher {
queue* q;
PreviousDispatcher* prev;
Func f;
bool chained;
TemplateDispatcher(TemplateDispatcher const&)=delete;
TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;
template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
friend class TemplateDispatcher;
void wait_and_dispatch() {
for(;;) {
auto msg=q->wait_and_pop();
if(dispatch(msg))
break;
}
}
bool dispatch(std::shared_ptr<message_base> const& msg) {
if(wrapped_message<Msg>* wrapper=dynamic_cast<wrapped_message<Msg>*>(msg.get())) {
f(wrapper->contents);
return true;
} else {
return prev->dispatch(msg);
}
}
public:
TemplateDispatcher(TemplateDispatcher&& other):
q(other.q),prev(other.prev),f(std::move(other.f)),chained(other.chained) {
other.chained=true;
}
TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
q(q_),prev(prev_),f(std::forward<Func>(f_)),chained(false) {
prev_->chained=true;
}
template<typename OtherMsg,typename OtherFunc>
TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc> handle(OtherFunc&& of) {
return TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>(
q,this,std::forward<OtherFunc>(of));
}
~TemplateDispatcher() noexcept(false) {
if(!chained) {
wait_and_dispatch();
}
}
};
class receiver {
queue q;
public:
operator sender() {
return sender(&q);
}
dispatcher wait() {
return dispatcher(&q);
}
};
}
struct withdraw {
std::string account;
unsigned amount;
mutable messaging::sender atm_queue;
withdraw(std::string const& account_, unsigned amount_, messaging::sender atm_queue_):
account(account_),amount(amount_), atm_queue(atm_queue_) {}
};
struct withdraw_ok {};
struct withdraw_denied {};
struct cancel_withdrawal {
std::string account;
unsigned amount;
cancel_withdrawal(std::string const& account_, unsigned amount_):
account(account_),amount(amount_) {}
};
struct withdrawal_processed {
std::string account;
unsigned amount;
withdrawal_processed(std::string const& account_, unsigned amount_):
account(account_),amount(amount_) {}
};
struct card_inserted {
std::string account;
explicit card_inserted(std::string const& account_): account(account_) {}
};
struct digit_pressed {
char digit;
explicit digit_pressed(char digit_): digit(digit_) {}
};
struct clear_last_pressed {};
struct eject_card {};
struct withdraw_pressed {
unsigned amount;
explicit withdraw_pressed(unsigned amount_): amount(amount_) {}
};
struct cancel_pressed {};
struct issue_money {
unsigned amount;
issue_money(unsigned amount_): amount(amount_) {}
};
struct verify_pin {
std::string account;
std::string pin;
mutable messaging::sender atm_queue;
verify_pin(std::string const& account_,std::string const& pin_, messaging::sender atm_queue_):
account(account_),pin(pin_),atm_queue(atm_queue_) {}
};
struct pin_verified {};
struct pin_incorrect {};
struct display_enter_pin {};
struct display_enter_card {};
struct display_insufficient_funds {};
struct display_withdrawal_cancelled {};
struct display_pin_incorrect_message {};
struct display_withdrawal_options {};
struct get_balance {
std::string account;
mutable messaging::sender atm_queue;
get_balance(std::string const& account_,messaging::sender atm_queue_):
account(account_),atm_queue(atm_queue_) {}
};
struct balance {
unsigned amount;
explicit balance(unsigned amount_): amount(amount_) {}
};
struct display_balance {
unsigned amount;
explicit display_balance(unsigned amount_): amount(amount_) {}
};
struct balance_pressed {};
class atm {
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::*state)();
std::string account;
unsigned withdrawal_amount;
std::string pin;
void process_withdrawal() {
incoming.wait()
.handle<withdraw_ok>(
[&](withdraw_ok const& msg) {
interface_hardware.send(issue_money(withdrawal_amount));
bank.send(withdrawal_processed(account,withdrawal_amount));
state=&atm::done_processing;
}
)
.handle<withdraw_denied>(
[&](withdraw_denied const& msg) {
interface_hardware.send(display_insufficient_funds());
state=&atm::done_processing;
}
)
.handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
bank.send(cancel_withdrawal(account,withdrawal_amount));
interface_hardware.send(display_withdrawal_cancelled());
state=&atm::done_processing;
}
);
}
void process_balance() {
incoming.wait()
.handle<balance>(
[&](balance const& msg) {
interface_hardware.send(display_balance(msg.amount));
state=&atm::wait_for_action;
}
)
.handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state=&atm::done_processing;
}
);
}
void wait_for_action() {
interface_hardware.send(display_withdrawal_options());
incoming.wait()
.handle<withdraw_pressed>(
[&](withdraw_pressed const& msg) {
withdrawal_amount=msg.amount;
bank.send(withdraw(account,msg.amount,incoming));
state=&atm::process_withdrawal;
}
)
.handle<balance_pressed>(
[&](balance_pressed const& msg) {
bank.send(get_balance(account,incoming));
state=&atm::process_balance;
}
)
.handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state=&atm::done_processing;
}
);
}
void verifying_pin() {
incoming.wait()
.handle<pin_verified>(
[&](pin_verified const& msg) {
state=&atm::wait_for_action;
}
)
.handle<pin_incorrect>(
[&](pin_incorrect const& msg) {
interface_hardware.send(display_pin_incorrect_message());
state=&atm::done_processing;
}
)
.handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state=&atm::done_processing;
}
);
}
void getting_pin() {
incoming.wait()
.handle<digit_pressed>(
[&](digit_pressed const& msg) {
unsigned const pin_length=4;
pin+=msg.digit;
if(pin.length()==pin_length) {
bank.send(verify_pin(account,pin,incoming));
state=&atm::verifying_pin;
}
}
)
.handle<clear_last_pressed>(
[&](clear_last_pressed const& msg) {
if(!pin.empty()) {
pin.pop_back();
}
}
)
.handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state=&atm::done_processing;
}
);
}
void waiting_for_card() {
interface_hardware.send(display_enter_card());
incoming.wait()
.handle<card_inserted>(
[&](card_inserted const& msg) {
account=msg.account;
pin="";
interface_hardware.send(display_enter_pin());
state=&atm::getting_pin;
}
);
}
void done_processing() {
interface_hardware.send(eject_card());
state=&atm::waiting_for_card;
}
atm(atm const&)=delete;
atm& operator=(atm const&)=delete;
public:
atm(messaging::sender bank_, messaging::sender interface_hardware_):
bank(bank_),interface_hardware(interface_hardware_) {}
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
state=&atm::waiting_for_card;
try {
for(;;) {
(this->*state)();
}
} catch(messaging::close_queue const&) {}
}
messaging::sender get_sender() {
return incoming;
}
};
class bank_machine {
messaging::receiver incoming;
unsigned balance;
public:
bank_machine(): balance(199) {}
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
try {
for(;;) {
incoming.wait()
.handle<verify_pin>(
[&](verify_pin const& msg) {
if(msg.pin=="1937") {
msg.atm_queue.send(pin_verified());
} else {
msg.atm_queue.send(pin_incorrect());
}
}
)
.handle<withdraw>(
[&](withdraw const& msg) {
if(balance>=msg.amount) {
msg.atm_queue.send(withdraw_ok());
balance-=msg.amount;
} else {
msg.atm_queue.send(withdraw_denied());
}
}
)
.handle<get_balance>(
[&](get_balance const& msg) {
msg.atm_queue.send(::balance(balance));
}
)
.handle<withdrawal_processed>(
[&](withdrawal_processed const& msg) {}
)
.handle<cancel_withdrawal>(
[&](cancel_withdrawal const& msg) {}
);
}
} catch(messaging::close_queue const&) {}
}
messaging::sender get_sender() {
return incoming;
}
};
class interface_machine {
messaging::receiver incoming;
public:
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
try {
for(;;) {
incoming.wait()
.handle<issue_money>(
[&](issue_money const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Issuing "<<msg.amount<<std::endl;
}
}
)
.handle<display_insufficient_funds>(
[&](display_insufficient_funds const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Insufficient funds"<<std::endl;
}
}
)
.handle<display_enter_pin>(
[&](display_enter_pin const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Please enter your PIN (0-9)"<<std::endl;
}
}
)
.handle<display_enter_card>(
[&](display_enter_card const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Please enter your card (I)"<<std::endl;
}
}
)
.handle<display_balance>(
[&](display_balance const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"The balance of your account is "<<msg.amount<<std::endl;
}
}
)
.handle<display_withdrawal_options>(
[&](display_withdrawal_options const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Withdraw 50? (w)"<<std::endl;
std::cout<<"Display Balance? (b)"<<std::endl;
std::cout<<"Cancel? (c)"<<std::endl;
}
}
)
.handle<display_withdrawal_cancelled>(
[&](display_withdrawal_cancelled const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Withdrawal cancelled"<<std::endl;
}
}
)
.handle<display_pin_incorrect_message>(
[&](display_pin_incorrect_message const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"PIN incorrect"<<std::endl;
}
}
)
.handle<eject_card>(
[&](eject_card const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout<<"Ejecting card"<<std::endl;
}
}
);
}
} catch(messaging::close_queue&) {}
}
messaging::sender get_sender() {
return incoming;
}
};
int main() {
bank_machine bank;
interface_machine interface_hardware;
atm machine(bank.get_sender(),interface_hardware.get_sender());
std::thread bank_thread(&bank_machine::run,&bank);
std::thread if_thread(&interface_machine::run,&interface_hardware);
std::thread atm_thread(&atm::run,&machine);
messaging::sender atmqueue(machine.get_sender());
bool quit_pressed=false;
while(!quit_pressed) {
char c=getchar();
switch(c) {
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
atmqueue.send(digit_pressed(c));
break;
case 'b':
atmqueue.send(balance_pressed());
break;
case 'w':
atmqueue.send(withdraw_pressed(50));
break;
case 'c':
atmqueue.send(cancel_pressed());
break;
case 'q':
quit_pressed=true;
break;
case 'i':
atmqueue.send(card_inserted("acc1234"));
break;
}
}
bank.done();
machine.done();
interface_hardware.done();
atm_thread.join();
bank_thread.join();
if_thread.join();
}
标签:std,const,atm,c++,queue,amount,C++,concurrency,msg
From: https://blog.csdn.net/uuumrr/article/details/141675956