当前位置: 首页 > news >正文

Python10天突击--Day 2: 实现观察者模式

以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性:


1. 经典观察者模式实现

from abc import ABC, abstractmethod
from typing import List, Any

class Observer(ABC):
    """观察者抽象基类"""
    @abstractmethod
    def update(self, subject: Any) -> None:
        pass

class Subject:
    """被观察对象基类"""
    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        try:
            self._observers.remove(observer)
        except ValueError:
            pass

    def notify(self) -> None:
        """同步通知所有观察者"""
        for observer in self._observers:
            observer.update(self)

# 使用示例
class TemperatureSensor(Subject):
    """具体被观察者:温度传感器"""
    def __init__(self):
        super().__init__()
        self._temperature = 0.0

    @property
    def temperature(self) -> float:
        return self._temperature

    @temperature.setter
    def temperature(self, value: float) -> None:
        self._temperature = value
        self.notify()  # 温度变化时通知观察者

class Display(Observer):
    """具体观察者:显示屏"""
    def update(self, subject: TemperatureSensor) -> None:
        print(f"当前温度: {subject.temperature}°C")

# 客户端代码
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)

sensor.temperature = 25.5  # 输出: 当前温度: 25.5°C

2. 线程安全增强版

import threading
from typing import List, Any

class ThreadSafeSubject:
    """线程安全的被观察对象"""
    def __init__(self):
        self._observers: List[Observer] = []
        self._lock = threading.RLock()

    def attach(self, observer: Observer) -> None:
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        with self._lock:
            try:
                self._observers.remove(observer)
            except ValueError:
                pass

    def notify(self) -> None:
        """线程安全的通知"""
        with self._lock:
            observers = self._observers.copy()
        
        for observer in observers:
            observer.update(self)

3. 异步观察者模式

import asyncio
from abc import ABC, abstractmethod
from typing import List, Any

class AsyncObserver(ABC):
    """异步观察者接口"""
    @abstractmethod
    async def update(self, subject: Any) -> None:
        pass

class AsyncSubject:
    """支持异步通知的被观察对象"""
    def __init__(self):
        self._observers: List[AsyncObserver] = []

    def attach(self, observer: AsyncObserver) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    async def notify(self) -> None:
        """异步通知所有观察者"""
        await asyncio.gather(
            *[observer.update(self) for observer in self._observers]
        )

# 使用示例
class AsyncTemperatureSensor(AsyncSubject):
    def __init__(self):
        super().__init__()
        self._temp = 0.0

    async def set_temperature(self, value: float) -> None:
        self._temp = value
        await self.notify()

class CloudLogger(AsyncObserver):
    async def update(self, subject: AsyncTemperatureSensor) -> None:
        print(f"云端记录温度: {subject._temp}°C")
        await asyncio.sleep(0.1)  # 模拟网络请求

async def main():
    sensor = AsyncTemperatureSensor()
    sensor.attach(CloudLogger())
    await sensor.set_temperature(28.5)  # 输出: 云端记录温度: 28.5°C

asyncio.run(main())

4. 事件总线实现(发布-订阅模式)

from typing import Dict, List, Callable, Any
import inspect

class EventBus:
    """事件总线(高级观察者模式)"""
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super().__new__(cls)
            cls._instance._subscriptions: Dict[str, List[Callable]] = {}
        return cls._instance

    def subscribe(self, event_type: str, callback: Callable) -> None:
        if not inspect.iscoroutinefunction(callback):
            callback = self._sync_to_async(callback)
            
        if event_type not in self._subscriptions:
            self._subscriptions[event_type] = []
        self._subscriptions[event_type].append(callback)

    async def publish(self, event_type: str, **data) -> None:
        if event_type in self._subscriptions:
            await asyncio.gather(
                *[callback(**data) for callback in self._subscriptions[event_type]]
            )

    @staticmethod
    def _sync_to_async(func: Callable) -> Callable:
        async def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper

# 使用示例
bus = EventBus()

@bus.subscribe("temperature_change")
async def log_temp_change(value: float):
    print(f"温度变化记录: {value}°C")

async def trigger_events():
    await bus.publish("temperature_change", value=30.0)

asyncio.run(trigger_events())  # 输出: 温度变化记录: 30.0°C

5. 带过滤器的观察者模式

from typing import Callable, Any

class FilteredObserver:
    """带条件过滤的观察者"""
    def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):
        self.callback = callback
        self.filter = filter_condition

    def update(self, subject: Any) -> None:
        if self.filter(subject):
            self.callback(subject)

# 使用示例
sensor = TemperatureSensor()

def alert(temp: float):
    print(f"警报!当前温度过高: {temp}°C")

# 只接收温度>30的通知
high_temp_observer = FilteredObserver(
    callback=alert,
    filter_condition=lambda s: s.temperature > 30
)

sensor.attach(high_temp_observer)
sensor.temperature = 25  # 无输出
sensor.temperature = 35  # 输出: 警报!当前温度过高: 35°C

方案对比

实现方式特点适用场景
经典实现简单直接单线程简单场景
线程安全版避免竞态条件多线程环境
异步实现非阻塞通知I/O密集型应用
事件总线松耦合,支持多对多复杂事件系统
过滤观察者条件触发需要选择性通知的场景

最佳实践建议

  1. 生命周期管理

    # 使用上下文管理器自动取消注册
    class ObserverContext:
        def __init__(self, subject: Subject, observer: Observer):
            self.subject = subject
            self.observer = observer
            
        def __enter__(self):
            self.subject.attach(self.observer)
            return self
            
        def __exit__(self, *args):
            self.subject.detach(self.observer)
    
    with ObserverContext(sensor, display):
        sensor.temperature = 20
    
  2. 性能优化

    • 对于高频事件,考虑使用弱引用(weakref.WeakSet
    • 批量通知时使用@dataclass封装事件数据
  3. 异常处理

    def safe_notify(self):
        for observer in self._observers:
            try:
                observer.update(self)
            except Exception as e:
                print(f"Observer failed: {e}")
    
  4. 与Python生态集成

    • 使用PyPubSub等现成库
    • 结合asyncio.Queue实现生产者-消费者模式

根据项目复杂度选择合适实现,简单场景用经典模式即可,分布式系统建议使用事件总线架构。

相关文章:

  • 【LeetCode 热题100】二叉树构造题精讲:前序 + 中序建树 有序数组构造 BST(力扣105 / 108)(Go语言版)
  • 基于SpringBoot的宠物健康咨询系统(源码+数据库+万字文档)
  • OpenHarmony5.0.2 USB摄像头适配
  • win11安装更新报错:我们无法更新系统保留分区
  • 【频域分析】包络分析
  • 【Scratch编程系列】程序积木-声音类
  • 【响应式编程】Reactor 常用操作符与使用指南
  • 资深词源学家提示词
  • VirtualBox虚拟机转换到VMware
  • 波束形成(BF)从算法仿真到工程源码实现-第六节-广义旁瓣消除算法(GSC)
  • Android Compose 权限申请完整指南
  • Embracing your shadows reveals the wholeness of your light.
  • Spring Cloud-负载均衡
  • docker进行打包
  • Vue3+Element Plus如何实现左树右表页面案例:即根据左边的树筛选右侧表功能实现
  • DIP支付方式改革下各种疾病医疗费用的影响以及分析方法研究综述
  • 【XCP实战】AUTOSAR架构下XCP从0到1开发配置实践
  • SDHC接口协议底层传输数据是安全的
  • Git 远程仓库
  • 设计模式(8)——SOLID原则之依赖倒置原则
  • 蒲慕明院士:未来数十年不是AI取代人,而是会用AI的人取代不会用的
  • 张国清将赴俄罗斯举行中俄“长江—伏尔加河”地方合作理事会第五次会议和“东北—远东”政府间合作委员会双方主席会晤
  • 中国军网:带你揭开3所新调整组建军队院校的神秘面纱
  • 龚正市长调研闵行区,更加奋发有为地稳增长促转型,久久为功增强发展后劲
  • 澳大利亚首例“漂绿”诉讼开庭:能源巨头因“碳中和”承诺遭起诉
  • 创同期历史新高!1至4月全国铁路发送旅客14.6亿人次