消息传递框架与完整的ATM示例

消息传递框架与完整的ATM示例

Why

   为了避开共享易变数据,通过 CSP(Communicating Sequential Processer) 将信息在不同线程间进行传递。
每个线程都有一个状态机。当线程收到一条信息,它将会以某种方式更新其状态,并且可能向其他线程发出一条或多条信息,
对于消息的处理依赖于线程的初始化状态。

What

消息传递框架与完整的ATM示例

How to build

g++ -std=c++11 atm.cpp -o atm  -lpthread

Code

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
#include <thread>
#include <iostream>


namespace messaging
{
    struct message_base
    {
        virtual ~message_base()
        {}
    };

    template<typename Msg>
        struct wrapped_message: message_base
    {
        Msg contents;
        explicit wrapped_message(Msg const& contents_):
            contents(contents_)
        {}
    };

    class queue
    {
        std::mutex m;
        std::condition_variable c;
        std::queue<std::shared_ptr<message_base> > q;
        public:
            template<typename T>
                void push(T const& msg)
                {
                    std::lock_guard<std::mutex> lk(m);
                    q.push(std::make_shared<wrapped_message<T> >(msg));
                    c.notify_all();
                }

            std::shared_ptr<message_base> wait_and_pop()
            {
                std::unique_lock<std::mutex> lk(m);
                c.wait(lk,[&]{return !q.empty();});
                auto res=q.front();
                q.pop();
                return res;
            }
    };
}


namespace messaging
{
    class sender
    {
        queue*q;
        public:
        sender(): q(nullptr) {}
        explicit sender(queue*q_): q(q_) {}

        template<typename Message>
        void send(Message const& msg) {
            if(q) {
                q->push(msg);
            }
        }
    };
}

namespace messaging
{
    template<typename PreviousDispatcher,typename Msg,typename Func>
    class TemplateDispatcher
    {
        queue* q;
        PreviousDispatcher* prev;
        Func f;
        bool chained;
        TemplateDispatcher(TemplateDispatcher const&)=delete;
        TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;

        template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
        friend class TemplateDispatcher; // 所有特化的 TemplateDispatcher类型实例都是友元类

        void wait_and_dispatch() {
            for(;;) {
                auto msg=q->wait_and_pop();
                if(dispatch(msg)) // 1 如果消息处理过后,会跳出循环
                    break;
            }
        }

        bool dispatch(std::shared_ptr<message_base> const& msg) {
            if(wrapped_message<Msg>* wrapper=dynamic_cast<wrapped_message<Msg>*>(msg.get())) {
                f(wrapper->contents);
                return true;
            } else {
                return prev->dispatch(msg); // 3 链接到之前的调度器上
            }
        }

        public:
        TemplateDispatcher(TemplateDispatcher&& other):
                q(other.q),prev(other.prev), f(std::move(other.f)), chained(other.chained) {
            other.chained=true;
        }

        TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
            q(q_),prev(prev_),f(std::forward<Func> (f_)),chained(false) {
            prev_->chained=true;
        }

        template<typename OtherMsg, typename OtherFunc>
        TemplateDispatcher<TemplateDispatcher, OtherMsg, OtherFunc> handle(OtherFunc&& of) {
            return TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>(q, this, std::forward<OtherFunc>(of));
        }


        ~TemplateDispatcher() noexcept(false) { // 5 这个析构函数也是
            if(!chained) {
                wait_and_dispatch();
            }
        }
    };
}

namespace messaging
{
    class close_queue
    {};

    class dispatcher
    {
        queue* q;
        bool chained;
        dispatcher(dispatcher const&)=delete; // dispatcher实例不能被拷贝
        dispatcher& operator=(dispatcher const&)=delete;

        template< typename Dispatcher, typename Msg, typename Func>
        friend class TemplateDispatcher;

        void wait_and_dispatch() {
            for(;;) {
                auto msg=q->wait_and_pop();
                dispatch(msg);
            }
        }

        bool dispatch(std::shared_ptr<message_base> const& msg) { // 2 dispatch()会检查close_queue消息,然后抛出
            if(dynamic_cast<wrapped_message<close_queue>*>(msg.get())) {
                throw close_queue();
            }
            return false;
        }

        public:

        dispatcher(dispatcher&& other): q(other.q), chained(other.chained) {
            other.chained=true; // 源不能等待消息
        }

        explicit dispatcher(queue* q_): q(q_),chained(false)
        {}

        template<typename Message,typename Func>
        TemplateDispatcher<dispatcher, Message, Func>
        handle(Func&& f) {
            return TemplateDispatcher<dispatcher,Message,Func>(q,this,std::forward<Func>(f));
        }

        ~dispatcher() noexcept(false) {
            if(!chained) {
                wait_and_dispatch();
            }
        }
    };
}

namespace messaging
{
    class receiver
    {
        queue q;

        public:

        operator sender() {
            return sender(&q);
        }

        dispatcher wait() {
            return dispatcher(&q);
        }
    };
}




struct withdraw
{
    std::string account;
    unsigned amount;
    mutable messaging::sender atm_queue;
    withdraw(std::string const& account_,
            unsigned amount_,
            messaging::sender atm_queue_):
        account(account_),amount(amount_),
        atm_queue(atm_queue_)
    {}
};

struct withdraw_ok
{};

struct withdraw_denied
{};

struct cancel_withdrawal
{
    std::string account;
    unsigned amount;
    cancel_withdrawal(std::string const& account_, unsigned amount_):
        account(account_),amount(amount_)
    {}
};

struct withdrawal_processed
{

    std::string account;
    unsigned amount;
    withdrawal_processed(std::string const& account_, unsigned amount_):
        account(account_),amount(amount_)
    {}
};

struct card_inserted
{
    std::string account;
    explicit card_inserted(std::string const& account_): account(account_)
    {}
};

struct digit_pressed
{
    char digit;
    explicit digit_pressed(char digit_): digit(digit_)
    {}
};

struct clear_last_pressed
{};

struct eject_card
{};


struct withdraw_pressed
{
    unsigned amount;
    explicit withdraw_pressed(unsigned amount_):
        amount(amount_)
    {}
};

struct cancel_pressed
{};

struct issue_money
{
    unsigned amount;
    issue_money(unsigned amount_):
        amount(amount_)
    {}
};

struct verify_pin
{
    std::string account;
    std::string pin;
    mutable messaging::sender atm_queue;
    verify_pin(std::string const& account_,std::string const&
            pin_,
            messaging::sender atm_queue_):
        account(account_),pin(pin_),atm_queue(atm_queue_)
    {}
};

struct pin_verified
{};
struct pin_incorrect
{};
struct display_enter_pin
{};
struct display_enter_card
{};
struct display_insufficient_funds
{};
struct display_withdrawal_cancelled
{};

struct display_pin_incorrect_message
{};
struct display_withdrawal_options
{};
struct get_balance
{
    std::string account;
    mutable messaging::sender atm_queue;
    get_balance(std::string const& account_,messaging::sender
            atm_queue_):
        account(account_),atm_queue(atm_queue_)
    {}
};
struct balance
{
    unsigned amount;
    explicit balance(unsigned amount_):
        amount(amount_)
    {}
};
struct display_balance
{
    unsigned amount;
    explicit display_balance(unsigned amount_):
        amount(amount_)
    {}
};
struct balance_pressed
{};
class atm
{
    messaging::receiver incoming;
    messaging::sender bank;
    messaging::sender interface_hardware;
    void (atm::*state)();
    std::string account;
    unsigned withdrawal_amount;
    std::string pin;

    void process_withdrawal() {
        incoming.wait()
            .handle<withdraw_ok>(
                [&](withdraw_ok const& msg) {
                    interface_hardware.send(issue_money(withdrawal_amount));
                    bank.send(withdrawal_processed(account,withdrawal_amount));
                    state=&atm::done_processing;
                }
            )
            .handle<withdraw_denied>(
                    [&](withdraw_denied const& msg) {
                        interface_hardware.send(display_insufficient_funds());
                        state=&atm::done_processing;
                    }
            )
            .handle<cancel_pressed>(
                    [&](cancel_pressed const& msg) {
                        bank.send(cancel_withdrawal(account,withdrawal_amount));
                        interface_hardware.send(display_withdrawal_cancelled());
                        state=&atm::done_processing;
                    }
            );
    }

    void process_balance() {
        incoming.wait()
            .handle<balance>(
                    [&](balance const& msg)
                    {
                    interface_hardware.send(display_balance(msg.amount));
                    state=&atm::wait_for_action;
                    })
        .handle<cancel_pressed>(
                [&](cancel_pressed const& msg)
                {
                state=&atm::done_processing;
                });
    }
    void wait_for_action() {
        interface_hardware.send(display_withdrawal_options());
        incoming.wait()
            .handle<withdraw_pressed>(
                    [&](withdraw_pressed const& msg)
                    {
                    withdrawal_amount=msg.amount;
                    bank.send(withdraw(account,msg.amount,incoming));
                    state=&atm::process_withdrawal;
                    })
        .handle<balance_pressed>(
                [&](balance_pressed const& msg)
                {
                bank.send(get_balance(account,incoming));
                state=&atm::process_balance;
                })
        .handle<cancel_pressed>(

                [&](cancel_pressed const& msg)
                {
                state=&atm::done_processing;
                });
    }

    void verifying_pin() {
        incoming.wait()
            .handle<pin_verified>(
                    [&](pin_verified const& msg)
                    {
                    state=&atm::wait_for_action;
                    })
        .handle<pin_incorrect>(
                [&](pin_incorrect const& msg)
                {
                interface_hardware.send(display_pin_incorrect_message());
                state=&atm::done_processing;
                })
        .handle<cancel_pressed>(
                [&](cancel_pressed const& msg)
                {
                state=&atm::done_processing;
                });
    }

    void getting_pin()
    {
        incoming.wait()
            .handle<digit_pressed>(
                    [&](digit_pressed const& msg)
                    {
                        unsigned const pin_length=4;
                        pin+=msg.digit;
                        if(pin.length()==pin_length) {
                            bank.send(verify_pin(account,pin,incoming));
                            state=&atm::verifying_pin;
                        }
                    })
        .handle<clear_last_pressed>(
                [&](clear_last_pressed const& msg)
                {
                    if(!pin.empty()) {
                        pin.pop_back();
                }
                })
        .handle<cancel_pressed>(
                [&](cancel_pressed const& msg)
                {
                    state=&atm::done_processing;
                });
    }

    void waiting_for_card() {
        interface_hardware.send(display_enter_card());
        incoming.wait()
            .handle<card_inserted>(
                [&](card_inserted const& msg) {
                    account=msg.account;
                    pin="";
                    interface_hardware.send(display_enter_pin());
                    state=&atm::getting_pin;
                }
            );
    }

    void done_processing()
    {
        interface_hardware.send(eject_card());
        state=&atm::waiting_for_card;
    }

    atm(atm const&)=delete;

    atm& operator=(atm const&)=delete;

    public:
    atm(messaging::sender bank_, messaging::sender interface_hardware_):
        bank(bank_),interface_hardware(interface_hardware_)
    {}

    void done() {
        get_sender().send(messaging::close_queue());
    }

    void run() {
        state=&atm::waiting_for_card;
        try {
            for(;;) {
                (this->*state)();
            }
        } catch(messaging::close_queue const&) {
        }
    }

    messaging::sender get_sender() {
        return incoming;
    }
};

class bank_machine
{
    messaging::receiver incoming;
    unsigned balance;
    public:

    bank_machine(): balance(199) {}

    void done() {
        get_sender().send(messaging::close_queue());
    }

    void run() {
        try {
            for(;;) {
                incoming.wait()
                    .handle<verify_pin>(
                            [&](verify_pin const& msg) {
                                if(msg.pin=="1937") {
                                    msg.atm_queue.send(pin_verified());
                                } else {
                                    msg.atm_queue.send(pin_incorrect());
                                }
                            }
                            )
                    .handle<withdraw>(
                            [&](withdraw const& msg) {
                                if(balance>=msg.amount) {
                                    msg.atm_queue.send(withdraw_ok());
                                    balance-=msg.amount;
                                } else {
                                    msg.atm_queue.send(withdraw_denied());
                                }
                            }
                            )
                    .handle<get_balance>(
                            [&](get_balance const& msg) {
                                msg.atm_queue.send(::balance(balance));
                            }
                            )
                    .handle<withdrawal_processed>(
                            [&](withdrawal_processed const& msg) {}
                            )
                    .handle<cancel_withdrawal>(
                            [&](cancel_withdrawal const& msg) {}
                            );
            }
        }
        catch(messaging::close_queue const&) {
        }
    }

    messaging::sender get_sender() {
        return incoming;
    }
};

class interface_machine
{
    messaging::receiver incoming;
    std::mutex iom;

    public:
    void done() {
        get_sender().send(messaging::close_queue());
    }

    void run()
    {
        try {
            for(;;) {
                incoming.wait()
                    .handle<issue_money>(
                            [&](issue_money const& msg)
                            {
                            {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Issuing "
                            <<msg.amount<<std::endl;
                            }
                            })
                .handle<display_insufficient_funds>(
                        [&](display_insufficient_funds const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout<<"Insufficient funds"<<std::endl;
                        }
                        })
                .handle<display_enter_pin>(
                        [&](display_enter_pin const& msg) {
                            {
                                std::lock_guard<std::mutex> lk(iom);
                                std::cout<<"Please enter your PIN (0-9)" <<std::endl;
                            }
                        }
                        )
                .handle<display_enter_card>(
                        [&](display_enter_card const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout<<"Please enter your card (I)"
                        <<std::endl;
                        }

                        })
                .handle<display_balance>(
                        [&](display_balance const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout
                        <<"The balance of your account is "
                        <<msg.amount<<std::endl;
                        }
                        })
                .handle<display_withdrawal_options>(
                        [&](display_withdrawal_options const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout<<"Withdraw 50? (w)"<<std::endl;
                        std::cout<<"Display Balance? (b)"
                        <<std::endl;
                        std::cout<<"Cancel? (c)"<<std::endl;
                        }
                        })
                .handle<display_withdrawal_cancelled>(
                        [&](display_withdrawal_cancelled const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout<<"Withdrawal cancelled"
                        <<std::endl;
                        }
                        })
                .handle<display_pin_incorrect_message>(
                        [&](display_pin_incorrect_message const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout<<"PIN incorrect"<<std::endl;
                        }
                        })
                .handle<eject_card>(

                        [&](eject_card const& msg)
                        {
                        {
                        std::lock_guard<std::mutex> lk(iom);
                        std::cout<<"Ejecting card"<<std::endl;
                        }
                        });
            }
        } catch(messaging::close_queue&) {
        }
    }

    messaging::sender get_sender() {
        return incoming;
    }
};

int main()
{
    bank_machine bank;
    interface_machine interface_hardware;
    atm machine(bank.get_sender(), interface_hardware.get_sender());
    std::thread bank_thread(&bank_machine::run,&bank);
    std::thread if_thread(&interface_machine::run,&interface_hardware);
    std::thread atm_thread(&atm::run,&machine);
    messaging::sender atmqueue(machine.get_sender());

    bool quit_pressed=false;
    while(!quit_pressed) {
        char c=getchar();
        switch(c) {
            case '0':
            case '1':
            case '2':
            case '3':
            case '4':
            case '5':
            case '6':
            case '7':
            case '8':
            case '9':
                atmqueue.send(digit_pressed(c));
                break;
            case 'b':
                atmqueue.send(balance_pressed());
                break;
            case 'w':
                atmqueue.send(withdraw_pressed(50));
                break;
            case 'c':
                atmqueue.send(cancel_pressed());
                break;
            case 'q':
                quit_pressed=true;
                break;
            case 'i':
                atmqueue.send(card_inserted("acc1234"));
                break;
        }
    }
    bank.done();
    machine.done();
    interface_hardware.done();
    atm_thread.join();
    bank_thread.join();
    if_thread.join();
}