精读 C++20 设计模式:行为型设计模式——观察者模式
精读 C++20 设计模式:行为型设计模式——观察者模式
前言
观察者!这个是一个很有名的设计模式——简而言之,我们这个模式在关心对象的变化。当对象变化的时候,我们要触发点事情,这个怎么做呢?我们要放一个观察者,看着它:嘿对象变了处理点事情!这就是这个设计模式在做的事情。
Observer<T>
现在我们很关心Person的Age变化,甚至要求它变化的时候咱们就做点事情:
class Person
{int age;
public:void setAge(const int _age);
};
那根据前言,咱们就做点事情:
template<typename T>
struct Observer
{virtual void monitor_change(T& p, const std::string& what_changed) = 0;
};
之后咱们就可以:
struct PersonObserver : Observer<Person>
{void monitor_change(Person& p, const std::string& what_changed) override{if(what_changed == "age"){// process the sessions, like, print}}
};
甚至如果我们想要监控更多的属性,就可以采用多继承了(虽然不太建议)
Observable<T>
被监视对象也要支持被监控!这个事情很简单:
template<typename T>
struct Observeable
{void notify(T& src, const std::string& f_n){for(auto& o : obs)o->monitor_change(src, f_n);}// push/pop <-> subsrcibe / unsubsrcibe the obs vector
private:vector<Observer<T>*> obs;
};
连续观察者 + 被观察者
连续观察者是一个经典的设计场景:说白了就是:A 观察 B,B 观察 C;当 C 变化,B 收到通知并更新,从而触发 B 对 A 的通知 —— 这就是“连续观察者/被观察者”。听着没啥?但是问题没那么简单。
依赖问题(cycles / bounce)
第一个问题——能成为链式的话,能不能成为环呢?显然这是有风险的。A 观察 B,B 又观察 A。如果 A 改变 => 通知 B,B 的回调修改 A => 再通知 B => 无限循环。实战中这类问题会导致栈溢出或持续 CPU 占用。咋办呢?
- 我们完全可以触严苛化触发条件变化——在
setX()
前比较新旧值,只有真正变化才通知(我们在Person::setAge
中演示)。这是首选且最有效的方式。 - 我们根据自己的场景进行合并化:合并多个变化后一次性发出通知(coalesce),比如
begin_update()
/end_update()
模式,只有end_update()
时才通知。 - 在某些更新路径临时禁用通知(例如:
ScopedNotificationDisable
),完成后恢复并可选择是否发一次最终通知。 - 在 notify 路径中维护最大嵌套深度或使用版本号来防止同一事件反复传播(但往往是权宜之计,Overflow 检测机制)。
- 设计时避免互相观察;如果必需,明确哪端是“主要数据源”并在被动端做好防护
取消订阅 + 线程安全(并发场景)
// observable.hpp — 一个可复用的 Observable 实现
#pragma once
#include <functional>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <unordered_set>
#include <cstddef>
#include <memory>template<typename T>
class Observable {
public:using Callback = std::function<void(T&, const std::string&)>;// Subscription:RAII 风格(析构时自动取消)class Subscription {public:Subscription() = default;Subscription(size_t id, Observable* owner) : id_(id), owner_(owner) {}Subscription(const Subscription&) = delete;Subscription& operator=(const Subscription&) = delete;Subscription(Subscription&& o) noexcept { id_ = o.id_; owner_ = o.owner_; o.owner_ = nullptr; o.id_ = 0; }Subscription& operator=(Subscription&& o) noexcept {if (this != &o) { unsubscribe(); id_ = o.id_; owner_ = o.owner_; o.owner_ = nullptr; o.id_ = 0; }return *this;}~Subscription() { unsubscribe(); }void unsubscribe() {if (owner_) { owner_->unsubscribe(id_); owner_ = nullptr; id_ = 0; }}bool valid() const { return owner_ != nullptr; }private:size_t id_ = 0;Observable* owner_ = nullptr;};Observable() = default;~Observable() = default;// 订阅,返回 Subscription,析构或手动调用 unsubscribe 取消Subscription subscribe(Callback cb) {std::lock_guard lock(mutex_);const size_t id = next_id_++;if (in_notify_ > 0) {// 在 notify 中订阅,延迟加入(避免修改当前观察者集合)pending_add_.emplace_back(id, std::move(cb));} else {observers_.emplace(id, std::move(cb));}return Subscription{id, this};}// 直接按 id 取消(Subscription 会调用它)void unsubscribe(size_t id) {std::lock_guard lock(mutex_);if (in_notify_ > 0) {pending_remove_.insert(id);} else {observers_.erase(id);}}// 通知所有观察者(线程安全,可重入)void notify(T& src, const std::string& what_changed) {std::vector<Callback> snapshot;{std::lock_guard lock(mutex_);++in_notify_;snapshot.reserve(observers_.size());for (auto &kv : observers_) snapshot.push_back(kv.second);}// 调用回调(在外部 unlocked)for (auto &cb : snapshot) {try {cb(src, what_changed);} catch (...) {// 任意异常策略:不要让单个 observer 崩掉整个流程// 这里简单吞掉,也可记录日志}}// 结束通知,若是最外层 notify,应用挂起的增删{std::lock_guard lock(mutex_);--in_notify_;if (in_notify_ == 0) apply_pending_locked();}}private:void apply_pending_locked() {// 必须在持锁状态下调用for (auto &id : pending_remove_) observers_.erase(id);pending_remove_.clear();for (auto &p : pending_add_) observers_.emplace(p.first, std::move(p.second));pending_add_.clear();}private:std::mutex mutex_;std::unordered_map<size_t, Callback> observers_;std::vector<std::pair<size_t, Callback>> pending_add_;std::unordered_set<size_t> pending_remove_;size_t next_id_ = 1;int in_notify_ = 0; // notify 嵌套计数
};
上面的 Observable
实现已经做了线程安全的基本保障:subscribe
/ unsubscribe
/ notify
用 mutex
保护共享状态。notify
在外面调用回调,避免回调期间持锁(防止回调里阻塞导致其他线程无法订阅)。在 notify 中退订/订阅的请求会被延迟处理(放到 pending 集合),避免在迭代 observers_
时修改容器。
可重入性(Reentrancy)与嵌套通知
观察者在其回调中可能会再次修改 subject(例如 UI 在收到 age
更新后又调用 setAge()
进行校正)。这会导致嵌套 notify()
调用。我们的实现支持嵌套通知(in_notify_
计数器),并把对订阅集合的修改延迟到最外层通知完成。这样避免了在迭代容器时的并发修改崩溃。
但嵌套通知仍需要注意:
- 嵌套 notify 会再次发送 snapshot(包括可能仍存在的观察者),从而产生更深的调用栈与复杂的执行顺序。
- 若没有做好变更检测或抑制,很容易进入无限循环(见依赖问题)。
- 有时我们希望“递归通知即时看到新订阅”,有时又希望“通知期间新增的订阅不接收当前正在进行的事件”。上面实现选择后者(snapshot 在 notify 开始时产生),这是常见且可预期的行为。若你需要前者,设计会更复杂(需要在 notify 中读取到 pending add),但会导致回调里新增的观察者在本轮也收到通知,可能制造惊喜或风险。一般不推荐。
可选的“延迟操作队列”策略(示例思路):
- 把所有 subscribe/unsubscribe/其他修改放到队列中,在
notify
完成后、或在安全点统一执行。 - 这可以避免竞态并让通知视作原子操作,但也会增加延迟(订阅在本轮不会立即生效)。这是常见的 trade-off。
订阅并缓存状态的只读代理
在 GUI/渲染或大型系统中,一个常见模式是 View:订阅被观察对象并维护一份本地缓存,用于快速读取(避免每次访问都加锁或计算)。View 是观察者的一种具体用途。
#include <atomic>
#include <iostream>struct PersonView {std::atomic<int> cached_age{0};std::optional<Observable<Person>::Subscription> sub;void attach(Person& p) {// 订阅并更新缓存sub = p.changes.subscribe([this](Person& who, const std::string& f){if (f == "age") cached_age.store(who.age, std::memory_order_relaxed);});// 初始化缓存cached_age.store(p.age, std::memory_order_relaxed);}int age() const { return cached_age.load(std::memory_order_relaxed); }
};
优点:
- 快速读(无需每次从主对象加锁或计算)。
- 视图可以把更新批量化、格式化或做额外的衍生计算(例如显示字符串形式)。
注意:
- 缓存有时会过期(滞后),设计时需保证接受可接受的最终一致性。
- 若缓存需要严格一致性(强一致),就不能单纯用这种异步订阅方式,需要同步读取或在更新时做同步通知/等待。
总结
我们试图解决的问题
- 希望在对象状态变化时,通知多个关心该变化的组件(解耦发送者与接收者)。
- 要解决关注点分离(数据变化 VS. 响应逻辑),并支持运行时灵活绑定/解绑观察者。
- 需要在多线程 / UI / 组件化系统中安全、可控地传播变化。
我们如何解决
- 提供
Observable
抽象,允许注册回调/观察者,变更时notify
所有注册的观察者。 - 通过 RAII
Subscription
实现自动退订;通过weak_ptr
协助管理生命周期;通过 snapshot + pending queues 实现线程安全与 reentrancy-safe 的 notify。 - 通过变更检测、事务/抑制或批量通知应对循环依赖与性能问题。视图(View)模式把观察者的职责扩展为“订阅并缓存”以便快速读取。
优点
- 解耦:发送端不知道谁在监听;观察者可以独立演化。
- 灵活:运行时绑定/解绑,便于插件化架构与模块热插拔。
- 表达力强:适合 UI、事件总线、发布/订阅场景。
缺点(以及缓解)
- 生命周期与悬指针问题:观察者或被观察者被销毁会造成回调访问已释放内存。
- 缓解:使用
Subscription
(RAII),回调内使用weak_ptr
检查,或者在对象析构时先统一退订。
- 缓解:使用
- 循环依赖/无限回调:观察链可能产生循环触发。
- 缓解:做好变更检测、事务/批量更新、或明确禁止双向观察。
- 并发与性能问题:大量观察者和频繁通知可能导致拷贝开销或锁竞争。
- 缓解:snapshot + 延迟 apply 是通用的安全折中;性能敏感场景考虑 RCU/lock-free 数据结构或降低通知频率(采样/限流)。
- 语义复杂性(何时生效):新增订阅在当前通知中是否能收到事件,订阅/退订是否即时生效——不同实现会有不同语义,需在设计中明确。
- 缓解:在文档中明确语义(例如:本实现保证“订阅在本轮通知不会收到正在进行的事件”;退订被延迟到最外层通知完成时生效)