首页 > 编程语言 >深入理解C++中的同步并发操作(c++ concurrency in action 第四章总结)

深入理解C++中的同步并发操作(c++ concurrency in action 第四章总结)

时间:2024-09-01 18:53:16浏览次数:15  
标签:std const atm c++ queue amount C++ concurrency msg

深入理解C++中的同步并发操作(c++ concurrency in action 第四章总结)

第四章详细介绍了C++中的各种并发工具,包括条件变量、std::future 和 std::async、带超时的wait、std::packaged_task 和 std::promise,以及如何使用这些工具来简化代码。本总结将依次介绍这些工具的用法和应用场景,并结合实际代码进行说明。

4.1 使用std::condition_variable来等待事件

std::condition_variable

  • 基本概念:条件变量用于阻塞一个或多个线程,直到某个条件为真。
  • 通常,一个线程等待条件变量,另一个线程在条件满足后通知等待的线程继续执行。
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>

std::condition_variable cv;
std::mutex mtx;
bool ready = false;

void do_work() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [] { return ready; });
    std::cout << "Work done!" << std::endl;
}

void set_ready() {
    std::lock_guard<std::mutex> lock(mtx);
    ready = true;
    cv.notify_one();
}

int main() {
    std::thread worker(do_work);
    std::this_thread::sleep_for(std::chrono::seconds(1));  // 模拟一些准备工作
    set_ready();
    worker.join();
    return 0;
}

在这个示例中,do_work线程在调用条件变量的wait函数后进入阻塞状态,直到其他线程设置ready标志并调用notify_one。cv.wait确保线程在锁定互斥锁的同时等待条件满足,而notify_one通知一个等待的线程继续执行。这样可以避免线程忙等待,提高程序的效率。

线程安全queue的实现

在并发环境中,线程安全的队列是实现生产者-消费者模型的基础。我们可以通过 std::mutex 和 std::condition_variable 来实现一个简单的线程安全队列。

// thread_safe_queue.cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

template<typename T>
class threadsafe_queue {
private:
    std::queue<T> data_queue;
    mutable std::mutex mut;
    std::condition_variable data_cond;

public:
    void push(T new_value) {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

4.2 使用 std::future 和 std::async 进行异步任务处理

std::future 和 std::async 是C++11中引入的异步任务处理工具,允许开发者在程序中执行异步操作,并获取这些操作的结果。

  • std::future 用于存储异步操作的结果。
  • std::async 用于启动一个异步任务,并返回一个 std::future 对象。
#include <future>
#include <iostream>

constexpr int find_a_value() noexcept {
  return 1;
}
void do_something() {
  std::cout << "do something!" << std::endl;
}

struct X {
  void foo(int, std::string const&);
  std::string bar(std::string const&);
};

X x;
auto f1 = std::async(&X::foo, &x, 42, "hello");  // 调用 p->foo(42, "hello"),其中 p 是 &x
auto f2 = std::async(&X::bar, x, "goodbye");     // 调用 tmpx.bar("goodbye"),其中 tmpx 是 x 的副本

struct Y {
  double operator()(double);
};

Y y;
auto f3 = std::async(Y(), 3.141);          // 调用 tmpy(3.141),其中 tmpy 是从 Y() 移动构造的
auto f4 = std::async(std::ref(y), 2.718);  // 调用 y(2.718)
X baz(X&);
auto f5 = std::async (baz, std::ref(x));  // 调用 baz(x)

class move_only {
 public:
  move_only();
  move_only(move_only&&);
  move_only(move_only const&) = delete;
  move_only& operator=(move_only&&);
  move_only& operator=(move_only const&) = delete;
  void operator()();
};

auto f6 = std::async(move_only());  // 调用 tmp(),其中 tmp 是通过 std::move(move_only()) 构造的
auto f7 = std::async(std::launch::async, Y(), 1.2);                                  // 在新线程中运行
auto f8 = std::async(std::launch::deferred, baz, std::ref(x));                       // 在 wait() 或 get() 中运行
auto f9 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x));  // 实现选择
auto f10 = std::async(baz, std::ref(x));                                              // 实现选择
// f8.wait();                                                                      // 调用延迟函数

int main() {
  std::future<int> the_answer(std::async(find_a_value));
  do_something();
  std::cout << "the answer is: " << the_answer.get() << std::endl;
  return 0;
}

4.3 使用 std::packaged_task和 std::promise 进行任务管理

使用 promise 进行线程间通信

std::promise 和 std::future 配合使用,可以在不同线程之间传递值或通知事件。promise 用于设置值,而 future 则用于接收该值。

// promise.cpp
#include <future>
#include <iostream>
#include <thread>

void compute(std::promise<int>&& p) {
    p.set_value(42);  
}

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread t(compute, std::move(p));
    std::cout << "Result: " << f.get() << std::endl;  
    t.join();
    return 0;
}

在这段代码中,compute 函数在线程中执行计算,并通过 promise 将结果设置为 42。主线程通过 future 获取该结果。

#include <future>

void process_connections(connection_set& connections) {
  while (!done(connections)) {
    for (connection_iterator connection = connections.begin(), end = connections.end(); connection != end;
         ++connection) {
      if (connection->has_incoming_data()) {
        data_packet data              = connection->incoming();
        std::promise<payload_type>& p = connection->get_promise(data.id);
        p.set_value(data.payload);
      }
      if (connection->has_outgoing_data()) {
        outgoing_packet data = connection->top_of_outgoing_queue();
        connection->send(data.payload);
        data.promise.set_value(true);
      }
    }
  }
}

std::packaged_task的使用

#include <future>
#include <iostream>
#include <thread>

int calculate(int x) {
    return x * 2;
}

int main() {
    std::packaged_task<int(int)> task(calculate);
    std::future<int> result = task.get_future();
    std::thread(std::move(task), 5).detach();
    std::cout << "Result: " << result.get() << std::endl;
    return 0;
}

#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread() {
  while (!gui_shutdown_message_received()) {
    get_and_process_gui_message();
    std::packaged_task<void()> task;
    {
      std::lock_guard<std::mutex> lk(m);
      if (tasks.empty()) {
        continue;
      }
      task = std::move(tasks.front());
      tasks.pop_front();
    }
    task();
  }
}

std::thread gui_bg_thread(gui_thread);

template <typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
  std::packaged_task<void()> task(f);
  std::future<void> res = task.get_future();
  std::lock_guard<std::mutex> lk(m);
  tasks.push_back(std::move(task));
  return res;
}

spawn_task:封装 packaged_task 和 thread

在有些情况下,我们需要对 std::packaged_task 和 std::thread 进行封装,以便更灵活地管理任务。这种封装可以通过创建一个 spawn_task 函数来实现。

#include <future>
#include <type_traits>
template <typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a) {
  typedef std::result_of<F(A&&)>::type result_type;
 std::packaged_task<result_type(A&&)>
 task(std::move(f)));
 std::future<result_type> res(task.get_future());
 std::thread t(std::move(task), std::move(a));
 t.detach();
 return res;
}

4.4使用带时间限制的wait

在实际开发中,往往需要在等待某个任务完成时设置一个超时时间,以防止程序无限期地阻塞。

  • 等待超时与时钟:标准库提供了std::condition_variable和std::future的wait_for与wait_until方法,这些方法可以指定等待的时间段或绝对时间。
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>

std::condition_variable cv;
std::mutex mtx;
bool ready = false;

void do_work() {
    std::unique_lock<std::mutex> lock(mtx);
    if(cv.wait_for(lock, std::chrono::seconds(1), [] { return ready; })) {
        std::cout << "Work done within timeout!" << std::endl;
    } else {
        std::cout << "Work timeout!" << std::endl;
    }
}

int main() {
    std::thread worker(do_work);
    std::this_thread::sleep_for(std::chrono::seconds(2));  // 模拟较长的准备工作
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
    }
    cv.notify_one();
    worker.join();
    return 0;
}


在这个例子中,如果ready标志在1秒内没有被设置,do_work函数将超时并输出“Work timeout!”。使用带时间限制的等待,可以防止线程无限期地等待,从而提高程序的鲁棒性。

4.5使用同步机制简化代码

通过将同步机制抽象化,我们可以更简洁地实现复杂的并发逻辑。例如,通过消息传递或使用std::future的扩展功能,可以更容易地实现并发任务的协调。

函数式编程

通过std::future和std::promise,我们可以构建一种函数式编程风格的并发系统,避免直接共享数据,从而降低复杂性。例如,在实现快速排序算法时,可以利用std::future实现并行计算。

std::list<int> parallel_quick_sort(std::list<int> input) {
    if(input.empty()) {
        return input;
    }
    auto pivot = *input.begin();
    auto divide_point = std::partition(input.begin(), input.end(), [pivot](int const& t) { return t < pivot; });

    std::list<int> lower_part(input.begin(), divide_point);
    std::future<std::list<int>> new_lower(std::async(parallel_quick_sort, std::move(lower_part)));

    auto new_higher = parallel_quick_sort(std::list<int>(divide_point, input.end()));

    std::list<int> result;
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());

    return result;
}

4.6使用消息传递模型实现并发系统:以ATM机为例

在并发编程中,如何在多个线程之间进行有效的通信和同步非常重要。传统的方式往往涉及到锁、互斥量等复杂的同步机制,而这些机制很容易导致死锁、竞争条件等问题。为了解决这些问题,《C++ Concurrency in Action》一书中介绍了一种使用消息传递模型(Message Passing)的方式,并通过一个ATM机的例子,展示了如何设计一个更为简洁和高效的并发系统。

消息传递与CSP模型

消息传递模型的核心思想来源于CSP(Communicating Sequential Processes),这是一种不共享数据的并发编程模型。在这种模型中,每个线程都可以被视为一个独立的状态机(State Machine)。当一个线程接收到消息时,它会根据当前的状态处理该消息,并可能改变其状态或发送消息给其他线程。

ATM机的状态机设计

为了演示消息传递模型,书中设计了一个ATM机的状态机模型,展示了如何使用消息队列来处理ATM机的不同状态和操作。ATM机的主要功能包括等待用户插卡、处理用户输入PIN码、处理取款请求、显示账户余额和完成交易。每个操作对应一个状态,并通过消息在状态之间进行转换。

1. 等待用户插入卡片

ATM机首先处于等待状态,直到用户插入卡片。当卡片插入后,ATM机会读取账户信息,并进入输入PIN码的状态。

2. 处理用户输入PIN码

在这个状态下,用户需要输入4位PIN码。如果输入的PIN码正确,ATM机会进入处理取款请求或显示余额的状态;如果不正确,ATM机会显示错误信息并返回初始状态。

3. 处理取款请求

当用户选择取款时,ATM机会与银行服务器进行通信,确认用户的账户余额。如果余额充足,ATM机会发放现金;如果余额不足,则显示错误信息并取消交易。

4. 显示账户余额

用户可以选择查看账户余额,ATM机会与银行服务器通信,并显示用户的当前余额。

5. 完成交易

在交易完成后,ATM机会退回用户的卡片,并回到等待下一位用户插卡的状态。

消息队列的实现

消息队列是整个消息传递系统的核心。在这个实现中,消息被封装在一个基类指针中,并通过模板类进行具体消息类型的处理。消息可以被压入队列,并在需要时被线程弹出进行处理。

发送与接收消息

消息的发送通过sender类完成,这个类是对消息队列的简单封装,只允许其他线程向队列发送消息。而消息的接收则通过receiver类来实现,receiver类持有消息队列的所有权,并提供等待和分发消息的功能。具体的消息处理逻辑通过dispatcher和TemplateDispatcher类来实现,这些类负责将接收到的消息分发给对应的处理函数。

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
#include <iostream>
#include <thread>
#include <string>

namespace messaging {
    struct message_base {
        virtual ~message_base() {}
    };

    template<typename Msg>
    struct wrapped_message: message_base {
        Msg contents;
        explicit wrapped_message(Msg const& contents_): contents(contents_) {}
    };

    class queue {
        std::mutex m;
        std::condition_variable c;
        std::queue<std::shared_ptr<message_base> > q; 
    public:
        template<typename T>
        void push(T const& msg) {
            std::lock_guard<std::mutex> lk(m);
            q.push(std::make_shared<wrapped_message<T> >(msg)); 
            c.notify_all();
        }

        std::shared_ptr<message_base> wait_and_pop() {
            std::unique_lock<std::mutex> lk(m);
            c.wait(lk,[&]{return !q.empty();}); 
            auto res=q.front();
            q.pop();
            return res;
        }
    };

    class sender {
        queue* q; 
    public:
        sender(): q(nullptr) {}
        explicit sender(queue*q_): q(q_) {}

        template<typename Message>
        void send(Message const& msg) {
            if(q) {
                q->push(msg); 
            }
        }
    };

    class close_queue {};

    class dispatcher {
        queue* q;
        bool chained;
        dispatcher(dispatcher const&)=delete; 
        dispatcher& operator=(dispatcher const&)=delete;

        template<typename Dispatcher, typename Msg, typename Func> 
        friend class TemplateDispatcher;

        void wait_and_dispatch() {
            for(;;) {
                auto msg=q->wait_and_pop();
                dispatch(msg);
            }
        }

        bool dispatch(std::shared_ptr<message_base> const& msg) {
            if(dynamic_cast<wrapped_message<close_queue>*>(msg.get())) {
                throw close_queue();
            }
            return false;
        }
    public:
        dispatcher(dispatcher&& other): q(other.q),chained(other.chained) {
            other.chained=true; 
        }

        explicit dispatcher(queue* q_): q(q_),chained(false) {}

        template<typename Message,typename Func>
        TemplateDispatcher<dispatcher,Message,Func> handle(Func&& f) {
            return TemplateDispatcher<dispatcher,Message,Func>(q,this,std::forward<Func>(f));
        }

        ~dispatcher() noexcept(false) {
            if(!chained) {
                wait_and_dispatch();
            }
        }
    };

    template<typename PreviousDispatcher,typename Msg,typename Func>
    class TemplateDispatcher {
        queue* q;
        PreviousDispatcher* prev;
        Func f;
        bool chained;
        TemplateDispatcher(TemplateDispatcher const&)=delete;
        TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;
        template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
        friend class TemplateDispatcher;

        void wait_and_dispatch() {
            for(;;) {
                auto msg=q->wait_and_pop();
                if(dispatch(msg)) 
                    break;
            }
        }

        bool dispatch(std::shared_ptr<message_base> const& msg) {
            if(wrapped_message<Msg>* wrapper=dynamic_cast<wrapped_message<Msg>*>(msg.get())) {
                f(wrapper->contents); 
                return true;
            } else {
                return prev->dispatch(msg); 
            }
        }
    public:
        TemplateDispatcher(TemplateDispatcher&& other):
            q(other.q),prev(other.prev),f(std::move(other.f)),chained(other.chained) {
            other.chained=true;
        }

        TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
            q(q_),prev(prev_),f(std::forward<Func>(f_)),chained(false) {
            prev_->chained=true;
        }

        template<typename OtherMsg,typename OtherFunc>
        TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc> handle(OtherFunc&& of) {
            return TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>(
                q,this,std::forward<OtherFunc>(of));
        }

        ~TemplateDispatcher() noexcept(false) {
            if(!chained) {
                wait_and_dispatch();
            }
        }
    };

    class receiver {
        queue q; 
    public:
        operator sender() {
            return sender(&q);
        }

        dispatcher wait() {
            return dispatcher(&q);
        }
    };
}

struct withdraw {
    std::string account;
    unsigned amount;
    mutable messaging::sender atm_queue;
    withdraw(std::string const& account_, unsigned amount_, messaging::sender atm_queue_):
        account(account_),amount(amount_), atm_queue(atm_queue_) {}
};

struct withdraw_ok {};
struct withdraw_denied {};
struct cancel_withdrawal {
    std::string account;
    unsigned amount;
    cancel_withdrawal(std::string const& account_, unsigned amount_):
        account(account_),amount(amount_) {}
};

struct withdrawal_processed {
    std::string account;
    unsigned amount;
    withdrawal_processed(std::string const& account_, unsigned amount_):
        account(account_),amount(amount_) {}
};

struct card_inserted {
    std::string account;
    explicit card_inserted(std::string const& account_): account(account_) {}
};

struct digit_pressed {
    char digit;
    explicit digit_pressed(char digit_): digit(digit_) {}
};

struct clear_last_pressed {};
struct eject_card {};
struct withdraw_pressed {
    unsigned amount;
    explicit withdraw_pressed(unsigned amount_): amount(amount_) {}
};

struct cancel_pressed {};
struct issue_money {
    unsigned amount;
    issue_money(unsigned amount_): amount(amount_) {}
};

struct verify_pin {
    std::string account;
    std::string pin;
    mutable messaging::sender atm_queue;
    verify_pin(std::string const& account_,std::string const& pin_, messaging::sender atm_queue_):
        account(account_),pin(pin_),atm_queue(atm_queue_) {}
};

struct pin_verified {};
struct pin_incorrect {};
struct display_enter_pin {};
struct display_enter_card {};
struct display_insufficient_funds {};
struct display_withdrawal_cancelled {};
struct display_pin_incorrect_message {};
struct display_withdrawal_options {};
struct get_balance {
    std::string account;
    mutable messaging::sender atm_queue;
    get_balance(std::string const& account_,messaging::sender atm_queue_):
        account(account_),atm_queue(atm_queue_) {}
};

struct balance {
    unsigned amount;
    explicit balance(unsigned amount_): amount(amount_) {}
};

struct display_balance {
    unsigned amount;
    explicit display_balance(unsigned amount_): amount(amount_) {}
};

struct balance_pressed {};

class atm {
    messaging::receiver incoming;
    messaging::sender bank;
    messaging::sender interface_hardware;
    void (atm::*state)();
    std::string account;
    unsigned withdrawal_amount;
    std::string pin;

    void process_withdrawal() {
        incoming.wait()
        .handle<withdraw_ok>(
            [&](withdraw_ok const& msg) {
                interface_hardware.send(issue_money(withdrawal_amount));
                bank.send(withdrawal_processed(account,withdrawal_amount));
                state=&atm::done_processing;
            }
        )
        .handle<withdraw_denied>(
            [&](withdraw_denied const& msg) {
                interface_hardware.send(display_insufficient_funds());
                state=&atm::done_processing;
            }
        )
        .handle<cancel_pressed>(
            [&](cancel_pressed const& msg) {
                bank.send(cancel_withdrawal(account,withdrawal_amount));
                interface_hardware.send(display_withdrawal_cancelled());
                state=&atm::done_processing;
            }
        );
    }

    void process_balance() {
        incoming.wait()
        .handle<balance>(
            [&](balance const& msg) {
                interface_hardware.send(display_balance(msg.amount));
                state=&atm::wait_for_action;
            }
        )
        .handle<cancel_pressed>(
            [&](cancel_pressed const& msg) {
                state=&atm::done_processing;
            }
        );
    }

    void wait_for_action() {
        interface_hardware.send(display_withdrawal_options());
        incoming.wait()
        .handle<withdraw_pressed>(
            [&](withdraw_pressed const& msg) {
                withdrawal_amount=msg.amount;
                bank.send(withdraw(account,msg.amount,incoming));
                state=&atm::process_withdrawal;
            }
        )
        .handle<balance_pressed>(
            [&](balance_pressed const& msg) {
                bank.send(get_balance(account,incoming));
                state=&atm::process_balance;
            }
        )
        .handle<cancel_pressed>(
            [&](cancel_pressed const& msg) {
                state=&atm::done_processing;
            }
        );
    }

    void verifying_pin() {
        incoming.wait()
        .handle<pin_verified>(
            [&](pin_verified const& msg) {
                state=&atm::wait_for_action;
            }
        )
        .handle<pin_incorrect>(
            [&](pin_incorrect const& msg) {
                interface_hardware.send(display_pin_incorrect_message());
                state=&atm::done_processing;
            }
        )
        .handle<cancel_pressed>(
            [&](cancel_pressed const& msg) {
                state=&atm::done_processing;
            }
        );
    }

    void getting_pin() {
        incoming.wait()
        .handle<digit_pressed>(
            [&](digit_pressed const& msg) {
                unsigned const pin_length=4;
                pin+=msg.digit;
                if(pin.length()==pin_length) {
                    bank.send(verify_pin(account,pin,incoming));
                    state=&atm::verifying_pin;
                }
            }
        )
        .handle<clear_last_pressed>(
            [&](clear_last_pressed const& msg) {
                if(!pin.empty()) {
                    pin.pop_back();
                }
            }
        )
        .handle<cancel_pressed>(
            [&](cancel_pressed const& msg) {
                state=&atm::done_processing;
            }
        );
    }

    void waiting_for_card() {
        interface_hardware.send(display_enter_card());
        incoming.wait()
        .handle<card_inserted>(
            [&](card_inserted const& msg) {
                account=msg.account;
                pin="";
                interface_hardware.send(display_enter_pin());
                state=&atm::getting_pin;
            }
        );
    }

    void done_processing() {
        interface_hardware.send(eject_card());
        state=&atm::waiting_for_card;
    }

    atm(atm const&)=delete;
    atm& operator=(atm const&)=delete;

public:
    atm(messaging::sender bank_, messaging::sender interface_hardware_):
        bank(bank_),interface_hardware(interface_hardware_) {}

    void done() {
        get_sender().send(messaging::close_queue());
    }

    void run() {
        state=&atm::waiting_for_card;
        try {
            for(;;) {
                (this->*state)();
            }
        } catch(messaging::close_queue const&) {}
    }

    messaging::sender get_sender() {
        return incoming;
    }
};

class bank_machine {
    messaging::receiver incoming;
    unsigned balance;

public:
    bank_machine(): balance(199) {}

    void done() {
        get_sender().send(messaging::close_queue());
    }

    void run() {
        try {
            for(;;) {
                incoming.wait()
                .handle<verify_pin>(
                    [&](verify_pin const& msg) {
                        if(msg.pin=="1937") {
                            msg.atm_queue.send(pin_verified());
                        } else {
                            msg.atm_queue.send(pin_incorrect());
                        }
                    }
                )
                .handle<withdraw>(
                    [&](withdraw const& msg) {
                        if(balance>=msg.amount) {
                            msg.atm_queue.send(withdraw_ok());
                            balance-=msg.amount;
                        } else {
                            msg.atm_queue.send(withdraw_denied());
                        }
                    }
                )
                .handle<get_balance>(
                    [&](get_balance const& msg) {
                        msg.atm_queue.send(::balance(balance));
                    }
                )
                .handle<withdrawal_processed>(
                    [&](withdrawal_processed const& msg) {}
                )
                .handle<cancel_withdrawal>(
                    [&](cancel_withdrawal const& msg) {}
                );
            }
        } catch(messaging::close_queue const&) {}
    }

    messaging::sender get_sender() {
        return incoming;
    }
};

class interface_machine {
    messaging::receiver incoming;

public:
    void done() {
        get_sender().send(messaging::close_queue());
    }

    void run() {
        try {
            for(;;) {
                incoming.wait()
                .handle<issue_money>(
                    [&](issue_money const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Issuing "<<msg.amount<<std::endl;
                        }
                    }
                )
                .handle<display_insufficient_funds>(
                    [&](display_insufficient_funds const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Insufficient funds"<<std::endl;
                        }
                    }
                )
                .handle<display_enter_pin>(
                    [&](display_enter_pin const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Please enter your PIN (0-9)"<<std::endl;
                        }
                    }
                )
                .handle<display_enter_card>(
                    [&](display_enter_card const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Please enter your card (I)"<<std::endl;
                        }
                    }
                )
                .handle<display_balance>(
                    [&](display_balance const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"The balance of your account is "<<msg.amount<<std::endl;
                        }
                    }
                )
                .handle<display_withdrawal_options>(
                    [&](display_withdrawal_options const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Withdraw 50? (w)"<<std::endl;
                            std::cout<<"Display Balance? (b)"<<std::endl;
                            std::cout<<"Cancel? (c)"<<std::endl;
                        }
                    }
                )
                .handle<display_withdrawal_cancelled>(
                    [&](display_withdrawal_cancelled const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Withdrawal cancelled"<<std::endl;
                        }
                    }
                )
                .handle<display_pin_incorrect_message>(
                    [&](display_pin_incorrect_message const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"PIN incorrect"<<std::endl;
                        }
                    }
                )
                .handle<eject_card>(
                    [&](eject_card const& msg) {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Ejecting card"<<std::endl;
                        }
                    }
                );
            }
        } catch(messaging::close_queue&) {}
    }

    messaging::sender get_sender() {
        return incoming;
    } 
};

int main() {
    bank_machine bank;
    interface_machine interface_hardware;
    atm machine(bank.get_sender(),interface_hardware.get_sender());

    std::thread bank_thread(&bank_machine::run,&bank);
    std::thread if_thread(&interface_machine::run,&interface_hardware);
    std::thread atm_thread(&atm::run,&machine);

    messaging::sender atmqueue(machine.get_sender());
    bool quit_pressed=false;

    while(!quit_pressed) {
        char c=getchar();
        switch(c) {
            case '0':
            case '1':
            case '2':
            case '3':
            case '4':
            case '5':
            case '6':
            case '7':
            case '8':
            case '9':
                atmqueue.send(digit_pressed(c));
                break;
            case 'b':
                atmqueue.send(balance_pressed());
                break;
            case 'w':
                atmqueue.send(withdraw_pressed(50));
                break;
            case 'c':
                atmqueue.send(cancel_pressed());
                break;
            case 'q':
                quit_pressed=true;
                break;
            case 'i':
                atmqueue.send(card_inserted("acc1234"));
                break;
        }
    }

    bank.done();
    machine.done();
    interface_hardware.done();

    atm_thread.join();
    bank_thread.join();
    if_thread.join();
}

标签:std,const,atm,c++,queue,amount,C++,concurrency,msg
From: https://blog.csdn.net/uuumrr/article/details/141675956

相关文章

  • 深度解析:引用 vs 指针,C++程序员必须掌握的核心概念
    引用(Reference)和指针(Pointer)都是C++中非常重要的概念,它们都用于间接访问变量或对象,但在使用方式、语法、内存管理等方面存在显著区别。下面详细解释什么是引用,并展开讨论它与指针的区别。一、什么是引用?引用(Reference)是C++中的一种变量类型,它是已存在变量的一个别名。引用本......
  • C++菜鸟教程 - 从入门到精通 第一节
    一.C++简介C++是一种编程语言,它是由BjarneStroustrup于1979年在贝尔实验室开始设计开发的。C++进一步扩充和完善了C语言,是一种面向对象的程序设计语言。C++可运行于多种平台上,如Windows、MAC操作系统以及 UNIX的各种版本。C++是一种静态类型的、编译式的、......
  • C++头文件<algorithm>中常用函数简介
     概述头文件algorithm(算法库)中主要提供了一些对容器操作的函数,如排序、搜索、复制、比较等,因此使用频率还是很高的,由于主要是操作容器,所以函数的语法也很类似:algorithm_name(container.begin(),container.end(),...);其中,container.begin()和container.end()分......
  • CLR/C++程序找不到DLL最有效方法?
     使用C++17及以上,支持filesystem处理不需要程序集强签名,也不用注册全局程序集缓存,非常有效。#include<iostream>#include<filesystem>usingnamespacestd::filesystem;namespacefs=std::filesystem;//-----------------------------------------------------......
  • Modern C++——不准确“类型声明”引发的非必要性能损耗
    大纲案例代码地址C++是一种强类型语言。我们在编码时就需要明确指出每个变量的类型,进而让编译器可以正确的编译。看似C++编译器比其他弱类型语言的编译器要死板,实则它也做了很多“隐藏”的操作。它会在尝试针对一些非预期类型进行相应转换,以符合预期,比如《C++拾趣——......
  • c++ I/O
    1.flush刷新缓存,endl刷新缓存并换行cout<<"Hello"<<fulsh;cout<<"Wait<<endl;2.hex,oct,dec输出16进制,8进制,10进制cout<<hexcout<<octcout<<dec3.使用width调节宽度cout.width(12);//width函数只影响下一个要显示的item4.使用fill填充字符。C++默认......
  • C++ 标准输入输出 -- <iostream>
    <iostream>库是C++标准库中用于输入输出操作的头文件。<iostream>定义了几个常用的流类和操作符,允许程序与标准输入输出设备(如键盘和屏幕)进行交互。以下是<iostream>库的详细使用说明,包括其主要类和常见用法示例。主要类std::istream:用于输入操作的抽象基类。std::ostre......
  • C++奇迹之旅:深度解析list的模拟实现
    文章目录......
  • C++:std::thread 和 pthread
            在C++中,线程的实现主要有两种方式:使用C++11标准库中的std::thread和POSIX线程库(pthread)。这两种方式各有优缺点,适用于不同的场景。以下是对这两种方式的详细比较和示例代码。std::thread示例代码#include<iostream>#include<thread>#include<chrono>......