消息传递模型实现
消息传递模型实现
文章目录
- 消息传递模型实现
- 优化说明
- 另一个消息队列结合并发编程的例子
- ① 对这份 ATM 重构代码的评价
- ② 消息队列简介
- **“ATM 消息队列工作流程图”**
- 展示“取款操作从插卡到取钱”的完整交互过程
- 使用标准库的
<queue>
和条件变量实现线程安全的消息队列 - 明确定义消息类型,而不仅仅是函数对象
- 实现更清晰的状态机和消息处理分离
- 添加了更完整的用户交互模拟
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <map>// 消息类型枚举
enum class MessageType {INSERT_CARD,ENTER_PIN,PROCESS_TRANSACTION,TRANSACTION_COMPLETE,EJECT_CARD
};// 消息结构体
struct Message {MessageType type;std::string data;Message(MessageType t, const std::string& d = "") : type(t), data(d) {}
};// 线程安全的消息队列
class MessageQueue {
private:std::queue<Message> messages;std::mutex mtx;std::condition_variable cv;public:// 推送消息到队列void push(const Message& msg) {{std::lock_guard<std::mutex> lock(mtx);messages.push(msg);}cv.notify_one();}// 从队列获取消息(阻塞直到有消息)Message pop() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this] { return !messages.empty(); });Message msg = messages.front();messages.pop();return msg;}// 检查队列是否为空bool empty() const {std::lock_guard<std::mutex> lock(mtx);return messages.empty();}
};// ATM状态机
class ATM {
private:enum class State { IDLE, WAITING_FOR_PIN, PROCESSING, COMPLETING };State currentState;MessageQueue queue;std::string accountNumber;std::string pin;// 处理消息的主循环void processMessages() {while (true) {Message msg = queue.pop();switch (currentState) {case State::IDLE:handleIdleState(msg);break;case State::WAITING_FOR_PIN:handlePinState(msg);break;case State::PROCESSING:handleProcessingState(msg);break;case State::COMPLETING:handleCompletingState(msg);break;}}}void handleIdleState(const Message& msg) {if (msg.type == MessageType::INSERT_CARD) {std::cout << "卡已插入\n";accountNumber = msg.data;std::cout << "账户: " << accountNumber << "\n";std::cout << "请输入4位PIN码: ";currentState = State::WAITING_FOR_PIN;}}void handlePinState(const Message& msg) {if (msg.type == MessageType::ENTER_PIN) {pin = msg.data;std::cout << "\nPIN码已接收\n";std::cout << "处理中...\n";// 模拟处理延迟std::this_thread::sleep_for(std::chrono::seconds(1));if (pin == "1234") { // 简单验证std::cout << "PIN验证成功\n";queue.push(Message(MessageType::PROCESS_TRANSACTION));} else {std::cout << "PIN验证失败\n";std::cout << "请取回您的卡\n";currentState = State::IDLE;return;}currentState = State::PROCESSING;}}void handleProcessingState(const Message& msg) {if (msg.type == MessageType::PROCESS_TRANSACTION) {std::cout << "交易处理中...\n";// 模拟交易处理std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "交易完成\n";queue.push(Message(MessageType::TRANSACTION_COMPLETE));currentState = State::COMPLETING;}}void handleCompletingState(const Message& msg) {if (msg.type == MessageType::TRANSACTION_COMPLETE) {std::cout << "请取回您的卡\n";std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "欢迎再次使用\n\n";currentState = State::IDLE;}}public:ATM() : currentState(State::IDLE) {std::cout << "ATM初始化完成,等待银行卡插入...\n";}void start() {std::thread([this] { processMessages(); }).detach();}void insertCard(const std::string& account) {queue.push(Message(MessageType::INSERT_CARD, account));}void enterPin(const std::string& pinCode) {queue.push(Message(MessageType::ENTER_PIN, pinCode));}
};// 模拟用户操作
void simulateUser(ATM& atm) {std::this_thread::sleep_for(std::chrono::seconds(1));// 插入银行卡std::cout << "用户插入银行卡...\n";atm.insertCard("1234-5678-9012-3456");// 等待ATM提示输入PINstd::this_thread::sleep_for(std::chrono::seconds(2));// 输入PINstd::cout << "用户输入PIN码: 1234\n";atm.enterPin("1234");// 等待交易完成std::this_thread::sleep_for(std::chrono::seconds(5));// 开始新的交易std::cout << "\n开始新的交易...\n";std::this_thread::sleep_for(std::chrono::seconds(1));// 插入另一张银行卡std::cout << "用户插入银行卡...\n";atm.insertCard("9876-5432-1098-7654");// 等待ATM提示输入PINstd::this_thread::sleep_for(std::chrono::seconds(2));// 输入错误PINstd::cout << "用户输入错误PIN码: 9999\n";atm.enterPin("9999");
}int main() {ATM atm;atm.start();// 启动用户模拟simulateUser(atm);// 保持程序运行std::this_thread::sleep_for(std::chrono::seconds(10));return 0;
}
优化说明
-
明确定义的消息类型:
- 使用
MessageType
枚举明确区分不同类型的消息 - 使用
Message
结构体封装消息内容和类型
- 使用
-
改进的消息队列:
- 使用标准库的
queue
容器存储消息 - 使用条件变量实现高效的消息等待机制
- 确保线程安全的操作
- 使用标准库的
-
清晰的状态机设计:
- 明确定义ATM的各个状态
- 每个状态有专门的消息处理函数
- 状态转换逻辑清晰
-
更好的用户交互模拟:
- 模拟完整的用户操作流程
- 包含正确和错误的PIN码输入场景
这个实现更好地展示了消息传递模型的核心概念:不同的线程通过传递消息进行通信,而不是共享状态。ATM状态机和用户模拟在不同的逻辑上下文中运行,通过消息队列进行交互。
另一个消息队列结合并发编程的例子
#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <map>
#include <atomic>
#include <sstream>
#include <iomanip>
#include <random>
#include <memory>
#include <future>
#include <functional>
#include <unordered_map>
#include <unordered_set>
class ErrorState;
class PrintingState;
class DispensingState;
class ProcessingState;
class WaitingForPinState;
class MaintenanceState;
class ReadingCardState;
class IdleState;
// ========================= Async Logger =========================
class Logger {
public:enum class Level { DEBUG, INFO, WARNING, ERROR };explicit Logger(Level lvl = Level::INFO) : level(lvl), running(true) {worker = std::thread([this] { this->process(); });}~Logger() {running = false;cv.notify_all();if (worker.joinable()) worker.join();}void setLevel(Level lvl) { level = lvl; }void log(Level lvl, const std::string& msg) {if (lvl < level) return;std::lock_guard<std::mutex> lk(mtx);queue.emplace(format(lvl, msg));cv.notify_one();}private:Level level;std::atomic<bool> running{ false };std::queue<std::string> queue;std::mutex mtx;std::condition_variable cv;std::thread worker;static const char* to_cstr(Level l) {switch (l) {case Level::DEBUG: return "DEBUG";case Level::INFO: return "INFO";case Level::WARNING: return "WARN";case Level::ERROR: return "ERROR";}return "UNK";}static std::string now_str() {using namespace std::chrono;auto now = system_clock::now();auto t = system_clock::to_time_t(now);auto ms = duration_cast<milliseconds>(now.time_since_epoch()) % 1000;std::ostringstream oss;oss << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S")<< "." << std::setw(3) << std::setfill('0') << ms.count();return oss.str();}static std::string format(Level lvl, const std::string& msg) {std::ostringstream oss;oss << "[" << now_str() << "] [" << to_cstr(lvl) << "] " << msg;return oss.str();}void process() {std::unique_lock<std::mutex> lk(mtx);while (running) {cv.wait(lk, [this] { return !queue.empty() || !running; });while (!queue.empty()) {std::cout << queue.front() << std::endl;queue.pop();}}while (!queue.empty()) {std::cout << queue.front() << std::endl;queue.pop();}}
};// ========================= Messages =========================
enum class MessageType {INSERT_CARD,ENTER_PIN,PROCESS_TRANSACTION,TRANSACTION_COMPLETE,EJECT_CARD,TIMEOUT,CANCEL,SELECT_AMOUNT,DISPENSE_CASH,PRINT_RECEIPT,SYSTEM_ERROR,MAINTENANCE_MODE,STOP
};struct Message {MessageType type;std::string data;std::chrono::system_clock::time_point ts{ std::chrono::system_clock::now() };uint64_t seq{ 0 };
};// ========================= Stable Priority Queue =========================
class PriorityMessageQueue {struct Item {Message msg;int prio; // lower number = higher priorityuint64_t order; // tie-breaker: FIFO within same prioritybool operator<(const Item& other) const {if (prio == other.prio) return order > other.order; // min-heap behaviorreturn prio > other.prio;}};std::priority_queue<Item> pq;std::mutex mtx;std::condition_variable cv;std::atomic<uint64_t> counter{ 0 };
public:void push(const Message& m, int prio = 5) {std::lock_guard<std::mutex> lk(mtx);pq.push(Item{ m, prio, counter++ });cv.notify_one();}Message pop() {std::unique_lock<std::mutex> lk(mtx);cv.wait(lk, [this] { return !pq.empty(); });auto it = pq.top(); pq.pop();return it.msg;}void notify() { cv.notify_all(); }
};// ========================= Timer (cancelable) =========================
class Timer {
public:Timer() = default;~Timer() { cancel(); }template<class Rep, class Period, class Fn>void start_after(std::chrono::duration<Rep, Period> d, Fn f) {cancel();stop = false;th = std::thread([this, d, f] {std::unique_lock<std::mutex> lk(mtx);if (cv.wait_for(lk, d, [this] { return stop.load(); })) return; // canceledtry { f(); }catch (...) {}});}void cancel() {stop = true;cv.notify_all();if (th.joinable()) th.join();}private:std::thread th;std::mutex mtx;std::condition_variable cv;std::atomic<bool> stop{ false };
};// ========================= Domain: Account/Bank/Hardware =========================
class Account {
public:Account() = default;Account(std::string num, std::string pin, double bal): number(std::move(num)), pinCode(std::move(pin)), balance(bal) {}bool verifyPin(const std::string& pin) const { return pin == pinCode; }bool withdraw(double amt) { if (amt > balance) return false; balance -= amt; return true; }double getBalance() const { return balance; }const std::string& getNumber() const { return number; }
private:std::string number;std::string pinCode;double balance{ 0.0 };
};class BankService {
public:BankService() {accounts.emplace("123456789", Account("123456789", "1234", 1000.0));accounts.emplace("987654321", Account("987654321", "4321", 2500.0));accounts.emplace("111111111", Account("111111111", "1111", 500.0));}bool verifyAccount(const std::string& acc, const std::string& pin) {std::lock_guard<std::mutex> lk(mtx);auto it = accounts.find(acc);simulate_latency(120);return it != accounts.end() && it->second.verifyPin(pin);}bool processWithdrawal(const std::string& acc, double amt) {std::lock_guard<std::mutex> lk(mtx);auto it = accounts.find(acc);simulate_latency(220);return it != accounts.end() && it->second.withdraw(amt);}double getBalance(const std::string& acc) {std::lock_guard<std::mutex> lk(mtx);auto it = accounts.find(acc);simulate_latency(140);return it == accounts.end() ? -1.0 : it->second.getBalance();}private:std::map<std::string, Account> accounts;std::mutex mtx;std::default_random_engine gen{ std::random_device{}() };std::uniform_int_distribution<int> jitter{ 0,60 };void simulate_latency(int base_ms) { std::this_thread::sleep_for(std::chrono::milliseconds(base_ms + jitter(gen))); }
};class HardwareInterface {
public:bool ejectCard() { std::cout << "硬件: 弹出银行卡" << std::endl; return true; }bool dispenseCash(double amt) {std::cout << "硬件: 分发现金 $" << amt << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2));return true;}bool printReceipt(const std::string& details) { std::cout << "硬件: 打印收据 - " << details << std::endl; return true; }bool readCard(std::string& acc) { std::this_thread::sleep_for(std::chrono::milliseconds(400)); acc = "123456789"; return true; }
};// Forward decl
class ATM;// ========================= State Pattern =========================
class IATMState {
public:virtual ~IATMState() = default;virtual void handle(ATM& atm, const Message& msg) = 0;virtual const char* name() const = 0;
};// ========================= ATM =========================
class ATM {
public:ATM() : logger(Logger::Level::INFO) {setState(make_state<IdleState>());worker = std::thread([this] { loop(); });}~ATM() {running = false;// Cancel timers and wake queuefor (auto& kv : timers) kv.second.cancel();msgQueue.push(Message{ MessageType::STOP }, 0);msgQueue.notify();if (worker.joinable()) worker.join();// wait async tasksfor (auto& f : futures) f.wait();}// API exposed to outside worldvoid insertCard() { logger.log(Logger::Level::INFO, "用户插入银行卡"); post({ MessageType::INSERT_CARD }); }void enterPin(const std::string& pin) { logger.log(Logger::Level::INFO, "用户输入PIN码"); post({ MessageType::ENTER_PIN, pin }); }void cancel() { logger.log(Logger::Level::INFO, "用户取消操作"); post({ MessageType::CANCEL }); }void setMaintenanceMode(bool enable) { logger.log(Logger::Level::INFO, enable ? "切换到维护模式" : "退出维护模式"); post({ MessageType::MAINTENANCE_MODE, enable ? "on" : "off" }, 0); }// State/Helpers used by statestemplate<class S, class...Args>std::unique_ptr<IATMState> make_state(Args&&...args) { return std::make_unique<S>(std::forward<Args>(args)...); }void setState(std::unique_ptr<IATMState> st) {const char* old = state ? state->name() : "<none>";const char* nw = st ? st->name() : "<none>";logger.log(Logger::Level::INFO, std::string("状态转换: ") + old + " -> " + nw);// cancel state-scoped timersfor (auto& k : stateTimerKeys) timers[k].cancel();stateTimerKeys.clear();state = std::move(st);}void post(const Message& m, int prio = 5) { msgQueue.push(m, prio); }// Async ops used by states (managed, no detach)void readCardAsync() {futures.emplace_back(std::async(std::launch::async, [this] {std::string acc;if (hardware.readCard(acc)) post(Message{ MessageType::ENTER_PIN, acc }, 1);else post(Message{ MessageType::SYSTEM_ERROR }, 1);}));}void verifyPinAsync(const std::string& acc, const std::string& pin) {futures.emplace_back(std::async(std::launch::async, [this, acc, pin] {if (bank.verifyAccount(acc, pin)) post(Message{ MessageType::PROCESS_TRANSACTION }, 1);else { std::cout << "PIN码错误,请取回您的卡" << std::endl; hardware.ejectCard(); post(Message{ MessageType::EJECT_CARD }, 1); }}));}void getBalanceAsync(const std::string& acc, double want) {futures.emplace_back(std::async(std::launch::async, [this, acc, want] {double bal = bank.getBalance(acc);if (bal >= want) post(Message{ MessageType::PROCESS_TRANSACTION }, 1);else { std::cout << "余额不足,当前余额: $" << bal << "\n请选择较小金额或取消交易" << std::endl; }}));}void withdrawAsync(const std::string& acc, double amt, const std::string& txnId) {futures.emplace_back(std::async(std::launch::async, [this, acc, amt, txnId] {if (bank.processWithdrawal(acc, amt)) post(Message{ MessageType::DISPENSE_CASH, txnId }, 1);else { std::cout << "交易失败,余额不足" << std::endl; post(Message{ MessageType::EJECT_CARD }, 1); }}));}void dispenseAsync(double amt) {futures.emplace_back(std::async(std::launch::async, [this, amt] {if (hardware.dispenseCash(amt)) post(Message{ MessageType::PRINT_RECEIPT }, 1);else post(Message{ MessageType::SYSTEM_ERROR }, 1);}));}void printReceiptAsync(const std::string& details) {futures.emplace_back(std::async(std::launch::async, [this, details] {if (hardware.printReceipt(details)) post(Message{ MessageType::TRANSACTION_COMPLETE }, 1);else post(Message{ MessageType::SYSTEM_ERROR }, 1);}));}// State-scoped timer utilitiestemplate<class Rep, class Period>void startStateTimer(const std::string& key, std::chrono::duration<Rep, Period> d, MessageType type) {stateTimerKeys.insert(key);timers[key].start_after(d, [this, type] { post(Message{ type }, 2); });}void cancelStateTimer(const std::string& key) { timers[key].cancel(); stateTimerKeys.erase(key); }// Expose components to states (read-only / controlled)Logger logger; // public for convenience inside statesHardwareInterface hardware;BankService bank;// session datastd::string accountNumber;std::string pin;double amount{ 0.0 };// Utilitystd::string newTxnId() {static std::atomic<uint64_t> c{ 0 };auto now = std::chrono::system_clock::now();auto t = std::chrono::system_clock::to_time_t(now);std::ostringstream ss;ss << "TXN" << std::put_time(std::localtime(&t), "%Y%m%d%H%M%S") << std::setw(6) << std::setfill('0') << c++;return ss.str();}private:std::unique_ptr<IATMState> state;std::atomic<bool> running{ true };PriorityMessageQueue msgQueue;std::thread worker;std::vector<std::future<void>> futures;std::unordered_map<std::string, Timer> timers;std::unordered_set<std::string> stateTimerKeys;void loop() {logger.log(Logger::Level::INFO, "ATM消息处理循环启动");while (running) {Message m = msgQueue.pop();if (m.type == MessageType::STOP) break;if (state) state->handle(*this, m);}logger.log(Logger::Level::INFO, "ATM消息处理循环结束");}
};// ========================= Concrete States =========================
class IdleState : public IATMState {
public:const char* name() const override { return "IDLE"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::INSERT_CARD) {atm.logger.log(Logger::Level::INFO, "检测到银行卡插入");atm.setState(atm.make_state<ReadingCardState>());atm.startStateTimer("readcard_timeout", std::chrono::seconds(5), MessageType::TIMEOUT);atm.readCardAsync();}else if (msg.type == MessageType::MAINTENANCE_MODE && msg.data == "on") {atm.setState(atm.make_state<MaintenanceState>());}}
};class ReadingCardState : public IATMState {
public:const char* name() const override { return "READING_CARD"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::ENTER_PIN) {atm.cancelStateTimer("readcard_timeout");atm.accountNumber = msg.data;std::cout << "请请输入4位PIN码: " << std::flush;atm.setState(atm.make_state<WaitingForPinState>());atm.startStateTimer("pin_timeout", std::chrono::seconds(10), MessageType::TIMEOUT);}else if (msg.type == MessageType::TIMEOUT) {std::cout << "读卡超时,请取回您的卡" << std::endl;atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}else if (msg.type == MessageType::SYSTEM_ERROR) {atm.setState(atm.make_state<ErrorState>());}}
};class WaitingForPinState : public IATMState {
public:const char* name() const override { return "WAITING_FOR_PIN"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::ENTER_PIN) {atm.cancelStateTimer("pin_timeout");atm.pin = msg.data;atm.setState(atm.make_state<ProcessingState>());atm.verifyPinAsync(atm.accountNumber, atm.pin);}else if (msg.type == MessageType::TIMEOUT) {std::cout << "输入超时,请取回您的卡" << std::endl;atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}else if (msg.type == MessageType::CANCEL) {std::cout << "操作已取消,请取回您的卡" << std::endl;atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}}
};class ProcessingState : public IATMState {
public:const char* name() const override { return "PROCESSING"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::PROCESS_TRANSACTION) {// 真实系统为UI输入,这里固定100atm.amount = 100.0;atm.logger.log(Logger::Level::INFO, std::string("交易金额选择: $") + std::to_string(atm.amount));// 检查余额 -> 提现std::string txn = atm.newTxnId();atm.withdrawAsync(atm.accountNumber, atm.amount, txn);atm.setState(atm.make_state<DispensingState>());}else if (msg.type == MessageType::EJECT_CARD) {atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}}
};class ErrorState : public IATMState {
public:const char* name() const override { return "ERROR"; }void handle(ATM& atm, const Message& msg) override {(void)msg;std::cout << "系统错误,请联系银行" << std::endl;atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}
};class DispensingState : public IATMState {
public:const char* name() const override { return "DISPENSING"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::DISPENSE_CASH) {atm.dispenseAsync(atm.amount);atm.setState(atm.make_state<PrintingState>());}else if (msg.type == MessageType::SYSTEM_ERROR) {atm.setState(atm.make_state<ErrorState>());}else if (msg.type == MessageType::EJECT_CARD) {atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}}
};class CompletingState : public IATMState {
public:const char* name() const override { return "COMPLETING"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::TRANSACTION_COMPLETE) {atm.logger.log(Logger::Level::INFO, "交易完成");std::cout << "交易完成,请取回您的卡和现金" << std::endl;atm.hardware.ejectCard();atm.setState(atm.make_state<IdleState>());}}
};class MaintenanceState : public IATMState {
public:const char* name() const override { return "MAINTENANCE"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::MAINTENANCE_MODE && msg.data == "off") {atm.setState(atm.make_state<IdleState>());}}
};
class PrintingState : public IATMState {
public:const char* name() const override { return "PRINTING"; }void handle(ATM& atm, const Message& msg) override {if (msg.type == MessageType::PRINT_RECEIPT) {std::string details = std::string("交易号: ") + atm.newTxnId() + ", 账户: " + atm.accountNumber + ", 金额: $" + std::to_string(atm.amount);atm.printReceiptAsync(details);atm.setState(atm.make_state<CompletingState>());}else if (msg.type == MessageType::SYSTEM_ERROR) {atm.setState(atm.make_state<ErrorState>());}}
};// ========================= Simulation =========================
static void simulateUser(ATM& atm) {using namespace std::chrono_literals;std::this_thread::sleep_for(1s);std::cout << "用户插入银行卡..." << std::endl;atm.insertCard();std::this_thread::sleep_for(2s);std::cout << "用户输入PIN码: 1234" << std::endl;atm.enterPin("1234");std::this_thread::sleep_for(10s);std::cout << "\n开始新的交易..." << std::endl;std::this_thread::sleep_for(1s);std::cout << "用户插入银行卡..." << std::endl;atm.insertCard();std::this_thread::sleep_for(2s);std::cout << "用户输入错误PIN码: 9999" << std::endl;atm.enterPin("9999");std::this_thread::sleep_for(2s);std::cout << "用户取消操作" << std::endl;atm.cancel();
}int main() {ATM atm;std::thread user([&] { simulateUser(atm); });using namespace std::chrono_literals;std::this_thread::sleep_for(25s);std::cout << "技术人员启用维护模式" << std::endl;atm.setMaintenanceMode(true);std::this_thread::sleep_for(5s);std::cout << "技术人员禁用维护模式" << std::endl;atm.setMaintenanceMode(false);user.join();std::this_thread::sleep_for(5s);return 0;
}
好的 👍
我来分两部分回答:
① 对这份 ATM 重构代码的评价
这份代码已经相当规范,体现了工业级设计思路:
-
职责分离清晰
Logger
独立为异步组件,避免阻塞业务。PriorityMessageQueue
只负责消息的有序调度,不掺杂业务逻辑。ATM
使用 状态模式,每个状态独立类封装逻辑,替代了臃肿的switch-case
。
-
并发处理合理
- 异步任务 (
std::async
) 用于模拟银行后台和硬件操作,避免阻塞主消息循环。 - 消息队列是 单一入口,保证了状态机的线程安全。
- 不再使用
detach
,析构时能安全等待所有异步任务结束。
- 异步任务 (
-
扩展性好
- 新增消息类型(如转账、查询余额)只需加枚举 + 状态类,不必修改庞大函数。
- 可以方便地插入日志策略(写文件/网络)、硬件驱动模拟或真实实现。
-
潜在改进点
- 目前消息优先级是整数(0=最高),可以考虑用枚举或
constexpr
让优先级更语义化。 std::async
在高并发时可能创建过多线程,后续可以替换成线程池。- 日志现在只输出到
stdout
,可以扩展为多后端(文件、syslog)。 - 没有实现交易失败后的“吞卡/锁卡”逻辑。
- 目前消息优先级是整数(0=最高),可以考虑用枚举或
整体评价:代码结构清晰,线程安全,工程化思维强,适合作为教学或真实 ATM 仿真系统的原型。
② 消息队列简介
在这份代码里,ATM 的核心通信机制就是 PriorityMessageQueue
。它的作用是:
-
异步解耦
- 用户操作(插卡/输入 PIN)、硬件事件(吐钞/打印收据)、后台返回(验证成功/失败)都会转化为一个 消息对象。
- 这些消息不直接调用状态机,而是放进队列,由专门的线程统一处理。
-
线程安全
- 队列内部用
std::mutex
+std::condition_variable
保护。 - 多个线程可以安全地
push
消息,但只有 ATM 的主循环在pop
。 - 避免了多线程同时修改 ATM 状态的风险。
- 队列内部用
-
优先级控制
-
不同消息有不同优先级,例如:
- 系统错误、维护模式 → 最高优先级
- 正常交易 → 中优先级
- 超时提醒 → 低优先级
-
使用 小顶堆改造的大顶堆(
priority_queue
)保证高优先级消息先被处理。
-
-
时序保证
- 同一优先级下,按入队顺序处理(利用了消息的
sequenceId
)。 - 这样既保证了高优先级抢占,又不会乱序。
- 同一优先级下,按入队顺序处理(利用了消息的
👉 这样做的好处是:
ATM 不管有多少个线程在跑(硬件、银行服务、用户输入),都不会直接打断 ATM 的状态机逻辑,而是通过队列串行化调度,状态机只需要关心“下一条消息是什么”,大大降低了并发复杂度。
“ATM 消息队列工作流程图”
好嘞 ✅ 我帮你画一张 ATM 消息队列工作流程图,用 Mermaid 表示:
flowchart TDsubgraph 用户与硬件A1[插卡事件]A2[输入 PIN]A3[取款请求]A4[超时/错误信号]endsubgraph 消息队列B1[Push 消息\n(带优先级)]B2[(PriorityMessageQueue\n线程安全队列)]B3[Pop 消息]endsubgraph ATM 状态机C1[等待插卡状态]C2[验证 PIN 状态]C3[选择交易状态]C4[取款状态]C5[结束/返回等待]endsubgraph 后台服务D1[银行验证 PIN]D2[银行账户扣款]end%% 事件流A1 -->|生成消息| B1A2 -->|生成消息| B1A3 -->|生成消息| B1A4 -->|生成消息| B1B1 --> B2 --> B3 -->|dispatch| C1B3 -->|dispatch| C2B3 -->|dispatch| C3B3 -->|dispatch| C4B3 -->|dispatch| C5C2 -->|请求验证| D1 -->|结果消息入队| B1C4 -->|请求扣款| D2 -->|结果消息入队| B1
这张图描述了整个消息流转过程:
- 用户或硬件产生事件(插卡、输入 PIN、取款请求、超时等)。
- 事件封装成 消息对象,
push
到 线程安全优先级消息队列。 - ATM 的 主循环 从队列
pop
消息,分发给对应的 状态对象。 - 状态机根据消息决定下一步逻辑,可能向 后台银行服务 发起请求。
- 银行返回的结果再次通过 消息队列 送回 ATM 状态机,进入下一步。
要不要我再帮你画一个 时序图(Mermaid sequenceDiagram),展示“取款操作从插卡到取钱”的完整交互过程?
展示“取款操作从插卡到取钱”的完整交互过程
好的 👍 那我再帮你画一个 取款操作的时序图,用 Mermaid 表示:
这个时序图展示了 一次完整取款流程 的消息交互:
- 用户插卡 → 消息队列转发给 ATM 状态机 → 状态机进入验证 PIN。
- 用户输入 PIN → 状态机转发到银行后台 → 银行返回结果 → 状态机进入选择交易。
- 用户选择取款金额 → ATM 状态机向银行扣款 → 银行返回结果 → 状态机控制硬件吐钞。
- 最后 ATM 回到等待插卡状态。
要不要我再帮你扩展一下,加上 超时消息(比如用户输入 PIN 超时返回初始状态),让流程图更贴近真实 ATM?