当前位置: 首页 > news >正文

并发设计模式实战系列(9):消息传递(Message Passing)

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第九章消息传递(Message Passing),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 消息传递架构

2. 并发控制关键

二、生活化类比:邮局系统

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键配置说明

四、横向对比表格

1. 并发通信模式对比

2. 消息队列实现对比

五、高级优化技巧

1. 批量消息处理

2. 优先级消息处理

3. 死信队列处理

4. 监控指标

六、分布式消息传递系统设计

1. 跨节点通信架构

2. 关键设计考量

七、消息模式高级变体

1. 发布/订阅模式实现

2. 请求-响应模式实现

八、性能优化深度策略

1. 零拷贝传输优化

2. 批处理与压缩

九、容错与可靠性保障

1. 消息持久化方案

2. 事务消息处理

十、现代消息模式演进

1. 响应式消息流

2. 事件溯源模式

十一、性能监控指标体系

1. 关键监控指标

2. 健康检查实现

十二、典型应用场景案例

1. 电商订单处理系统

2. 物联网数据处理


一、核心原理深度拆解

1. 消息传递架构

┌───────────────┐      ┌───────────────┐      ┌───────────────┐
│   Producer    │───>  │  Message Queue │───>  │   Consumer    │
│ (并发发送消息) │<───  │ (线程安全缓冲)  │<───  │ (并发处理消息)  │
└───────────────┘      └───────────────┘      └───────────────┘
  • 生产者-消费者解耦:通过消息队列实现松耦合
  • 线程安全通信:消息队列作为共享数据的唯一通道
  • 流量控制:队列容量限制防止系统过载

2. 并发控制关键

  • 无共享状态:各线程通过消息通信而非共享内存
  • 异步处理:生产者不等待消费者处理完成
  • 背压机制:队列满时生产者阻塞或采取其他策略

二、生活化类比:邮局系统

系统组件

现实类比

核心行为

Producer

寄信人

写好信投入邮箱

Message Queue

邮局分拣中心

暂存和分类信件

Consumer

邮递员

按区域派送信件

  • 突发流量处理:节假日大量信件 → 邮局暂存 → 邮递员按能力派送
  • 失败处理:无法投递的信件退回或进入死信队列

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.concurrent.*;
import java.util.function.Consumer;public class MessagePassingSystem<T> {// 消息队列(设置容量防止OOM)private final BlockingQueue<Message<T>> messageQueue;// 消费者线程池private final ExecutorService consumerPool;// 消息封装static class Message<T> {final T content;final Consumer<T> onSuccess;final Consumer<Exception> onError;Message(T content, Consumer<T> onSuccess, Consumer<Exception> onError) {this.content = content;this.onSuccess = onSuccess;this.onError = onError;}}public MessagePassingSystem(int queueSize, int consumerThreads) {this.messageQueue = new LinkedBlockingQueue<>(queueSize);this.consumerPool = Executors.newFixedThreadPool(consumerThreads, r -> {Thread t = new Thread(r);t.setDaemon(true);return t;});// 启动消费者startConsumers();}// 生产者APIpublic void send(T message, Consumer<T> onSuccess, Consumer<Exception> onError) throws InterruptedException {Message<T> msg = new Message<>(message, onSuccess, onError);messageQueue.put(msg); // 阻塞直到队列有空位}// 消费者处理private void startConsumers() {for (int i = 0; i < consumerPool.getCorePoolSize(); i++) {consumerPool.execute(() -> {while (!Thread.currentThread().isInterrupted()) {try {Message<T> message = messageQueue.take();processMessage(message);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});}}private void processMessage(Message<T> message) {try {// 模拟业务处理System.out.println("处理消息: " + message.content);Thread.sleep(100); // 模拟处理耗时// 处理成功回调if (message.onSuccess != null) {message.onSuccess.accept(message.content);}} catch (Exception e) {// 处理失败回调if (message.onError != null) {message.onError.accept(e);}}}public void shutdown() {consumerPool.shutdown();}public static void main(String[] args) throws Exception {// 创建消息系统:队列容量100,4个消费者线程MessagePassingSystem<String> mps = new MessagePassingSystem<>(100, 4);// 模拟生产者ExecutorService producers = Executors.newFixedThreadPool(8);for (int i = 0; i < 100; i++) {final int msgId = i;producers.execute(() -> {try {mps.send("消息-" + msgId, result -> System.out.println("处理成功: " + result),error -> System.err.println("处理失败: " + msgId + ", " + error));} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}producers.shutdown();producers.awaitTermination(1, TimeUnit.MINUTES);mps.shutdown();}
}

2. 关键配置说明

// 队列选择策略
BlockingQueue<Message<T>> queue = new LinkedBlockingQueue<>(100); // 有界队列
// 或
BlockingQueue<Message<T>> queue = new ArrayBlockingQueue<>(100); // 固定大小// 消费者线程池配置
ExecutorService consumerPool = new ThreadPoolExecutor(4, // 核心线程数8, // 最大线程数60, TimeUnit.SECONDS,new SynchronousQueue<>(), // 直接传递new ThreadPoolExecutor.CallerRunsPolicy() // 饱和策略
);

四、横向对比表格

1. 并发通信模式对比

模式

数据共享方式

线程安全保证

适用场景

共享内存

直接内存访问

需显式同步

高性能计算

消息传递

通过消息队列

队列内部保证

分布式/并发系统

Actor模型

通过消息邮箱

每个Actor单线程

高并发系统

数据流

通过流管道

管道内部保证

流处理系统

2. 消息队列实现对比

实现方式

特点

适用场景

BlockingQueue

JVM内内存队列

单机多线程通信

Kafka/RabbitMQ

分布式持久化队列

分布式系统

Disruptor

高性能无锁队列

低延迟高吞吐系统

ZeroMQ

网络消息库

跨进程通信

五、高级优化技巧

1. 批量消息处理

// 批量消费消息提升吞吐量
List<Message<T>> batch = new ArrayList<>(100);
messageQueue.drainTo(batch, 100); // 批量取出
if (!batch.isEmpty()) {consumerPool.execute(() -> processBatch(batch));
}

2. 优先级消息处理

// 使用优先级队列
BlockingQueue<Message<T>> queue = new PriorityBlockingQueue<>(100, (m1, m2) -> m1.priority - m2.priority
);

3. 死信队列处理

// 失败消息转入死信队列
private final BlockingQueue<Message<T>> deadLetterQueue = new LinkedBlockingQueue<>();private void processMessage(Message<T> message) {try {// ...正常处理} catch (Exception e) {deadLetterQueue.put(message); // 转入死信队列}
}

4. 监控指标

// 监控队列积压
int backlog = messageQueue.size();// 监控处理延迟
long enqueueTime = System.currentTimeMillis();
// 在消息处理时记录延迟
long latency = System.currentTimeMillis() - message.enqueueTime;

六、分布式消息传递系统设计

1. 跨节点通信架构

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Node 1     │───>│  Message    │───>│  Node 2     │
│ (Producer)  │<───│  Broker     │<───│ (Consumer)  │
└─────────────┘    └─────────────┘    └─────────────┘
  • 消息代理中间件:Kafka/RabbitMQ/RocketMQ
  • 序列化协议:Protobuf/Avro/JSON
  • 网络传输:TCP长连接/HTTP2/gRPC

2. 关键设计考量

// 分布式消息发送示例(伪代码)
public class DistributedSender {private final MessageQueueClient client;public void send(String topic, byte[] payload) {// 消息属性设置Message msg = new Message(topic, payload,System.currentTimeMillis(),DeliveryGuarantee.AT_LEAST_ONCE);// 异步发送+回调client.sendAsync(msg, new Callback() {@Overridepublic void onComplete(Result result) {if(!result.isSuccess()) {// 重试或记录死信retryOrDeadLetter(msg);}}});}
}

七、消息模式高级变体

1. 发布/订阅模式实现

// 基于Topic的消息路由
public class PubSubSystem {private final Map<String, List<Consumer>> topicSubscribers = new ConcurrentHashMap<>();public void subscribe(String topic, Consumer consumer) {topicSubscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(consumer);}public void publish(String topic, Message message) {List<Consumer> consumers = topicSubscribers.get(topic);if(consumers != null) {consumers.forEach(c -> c.accept(message));}}
}

2. 请求-响应模式实现

// 带关联ID的请求响应处理
public class RequestReplySystem {private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();private final ConcurrentMap<String, CompletableFuture<Response>> pendingRequests = new ConcurrentHashMap<>();// 请求方public CompletableFuture<Response> request(Request req) {CompletableFuture<Response> future = new CompletableFuture<>();pendingRequests.put(req.getCorrelationId(), future);requestQueue.offer(req);return future;}// 响应处理线程private void processResponse(Response resp) {CompletableFuture<Response> future = pendingRequests.remove(resp.getCorrelationId());if(future != null) {future.complete(resp);}}
}

八、性能优化深度策略

1. 零拷贝传输优化

// 使用ByteBuffer减少内存拷贝
public class ZeroCopyMessage {private final ByteBuffer payload;public void send(SocketChannel channel) throws IOException {while(payload.hasRemaining()) {channel.write(payload);}}
}

2. 批处理与压缩

// 消息批量压缩处理
public class BatchCompressor {public byte[] compressBatch(List<Message> messages) {try(ByteArrayOutputStream baos = new ByteArrayOutputStream();GZIPOutputStream gzip = new GZIPOutputStream(baos)) {for(Message msg : messages) {gzip.write(msg.serialize());}gzip.finish();return baos.toByteArray();}}
}

九、容错与可靠性保障

1. 消息持久化方案

// 基于WAL的持久化存储
public class MessageJournal {private final RandomAccessFile journalFile;private final AtomicLong writeOffset = new AtomicLong(0);public void append(Message msg) throws IOException {byte[] data = msg.serialize();synchronized(journalFile) {journalFile.seek(writeOffset.get());journalFile.writeInt(data.length);journalFile.write(data);journalFile.getFD().sync();writeOffset.addAndGet(4 + data.length);}}
}

2. 事务消息处理

// 两阶段提交实现
public class TransactionalProcessor {public void processWithTransaction(Message msg) {try {// 阶段1:预备boolean prepared = prepareResources(msg);// 阶段2:提交/回滚if(prepared) {commitTransaction(msg);sendAck(msg.getId());} else {rollbackTransaction(msg);sendNack(msg.getId());}} catch(Exception e) {// 补偿处理compensateTransaction(msg);}}
}

十、现代消息模式演进

1. 响应式消息流

// 基于Reactive Streams的实现
public class ReactiveMessageSystem {private final Flux<Message> messageStream;public ReactiveMessageSystem(MessageSource source) {this.messageStream = Flux.fromIterable(source).publishOn(Schedulers.parallel()).filter(msg -> isValid(msg)).map(this::transform);}public void subscribe(Consumer<Message> subscriber) {messageStream.subscribe(subscriber);}
}

2. 事件溯源模式

// 基于事件日志的状态重建
public class EventSourcedRepository {private final EventStore eventStore;public Entity get(String id) {List<Event> events = eventStore.getEvents(id);return recreateEntity(events);}private Entity recreateEntity(List<Event> events) {Entity entity = new Entity();events.forEach(e -> entity.apply(e));return entity;}
}

十一、性能监控指标体系

1. 关键监控指标

指标类别

具体指标

采集方式

吞吐量

消息发送/消费速率(msg/s)

计数器+时间窗口

延迟

端到端处理延迟(ms)

时间戳差值统计

可靠性

消息投递成功率(%)

成功/失败计数器

资源使用

内存/CPU/网络占用

系统监控接口

2. 健康检查实现

public class HealthChecker {private final MessageQueue queue;private final ThreadPoolExecutor executor;public HealthCheckResult check() {return new HealthCheckResult(queue.size() < queue.capacity() * 0.9,executor.getActiveCount() < executor.getMaximumPoolSize(),System.currentTimeMillis() - lastMessageTime < 5000);}
}

十二、典型应用场景案例

1. 电商订单处理系统

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ 订单创建服务 │───>│ 订单消息队列 │───>│ 订单处理服务 │
└─────────────┘    └─────────────┘    └─────────────┘↓┌─────────────┐│ 支付服务     │└─────────────┘

2. 物联网数据处理

// 设备消息处理流水线
public class IoTMessagePipeline {public void buildPipeline() {// 1. 设备原始数据接收MessageSource source = new MqttSource();// 2. 构建处理流水线Pipeline pipeline = new PipelineBuilder().addStage(new DataValidation()).addStage(new DataNormalization()).addStage(new AnomalyDetection()).addStage(new StorageWriter()).build();// 3. 启动处理source.subscribe(pipeline::process);}
}

相关文章:

  • 废品回收小程序:全链路数字化解决方案,赋能绿色未来
  • TDengine 订阅不到数据问题排查
  • unity在编辑器模式调试音频卡顿电流声
  • 10.学习笔记-MyBatisPlus(P105-P110)
  • 水库现代化建设指南-水库运管矩阵管理系统建设方案
  • Android WIFI体系
  • Liunx安装Apache Tomcat
  • 数据一致性巡检总结:基于分桶采样的设计与实现
  • Linux CentOS 7 安装Apache 部署html页面
  • 松下机器人快速入门指南(2025年更新版)
  • python:sklearn 决策树(Decision Tree)
  • 当前HPLC载波无法满足全量数据分钟级采集需求的主要原因
  • 绿色版的notepad++怎么加入到右键菜单里
  • 深挖Java基础之:认识Java(创立空间/先导:Java认识)
  • 【Linux】第十四章 提高命令行效率
  • 使用 OpenCV 实现图像中心旋转
  • Nacos源码—2.Nacos服务注册发现分析三
  • DeepSeek 联手 Word,开启办公开挂模式
  • mac word接入deepseek
  • 经典算法 最长单调递增子序列
  • 坚持科技创新引领,赢得未来发展新优势
  • 俄罗斯纪念卫国战争胜利80周年阅兵式首次彩排在莫斯科举行
  • 首映|“凤凰传奇”曾毅:拍电影,我是认真的
  • 初步结果显示,卡尼领导的加拿大自由党在联邦众议院选举中获胜
  • 《奇袭白虎团》原型人物之一赵顺合辞世,享年95岁
  • 俄罗斯称已收复库尔斯克州