当前位置: 首页 > news >正文

WEB3全栈开发——面试专业技能点P5中间件

一、使用 Express 操作 Redis 的完整示例

涵盖 Redis 各种核心数据类型(String、Hash、List、Set、Sorted Set)及常见操作


1️⃣ 概念介绍

Redis 是一个开源的高性能键值对数据库,支持多种数据结构,非常适合缓存、排行榜、会话管理等场景。

Redis 支持的数据类型包括:

类型简介
String最基本的数据类型,支持数字/字符串存取
Hash类似对象,适合存储用户信息、对象结构
List链表结构,可做消息队列
Set无序唯一集合,常用于去重
Sorted Set有序唯一集合,常用于排行榜(带分数排序)

Node.js 下最常用的 Redis 客户端是 ioredis,支持连接池、集群等高级功能。


2️⃣ 安装依赖

npm install express ioredis

3️⃣ 初始化 Express + Redis 应用结构

project/
├── index.js
├── redisClient.js
└── package.json

4️⃣ 创建 Redis 客户端 redisClient.js

const Redis = require('ioredis');// 默认连接本地 Redis,如果你有密码或远程地址可以传入配置对象
const redis = new Redis({host: '127.0.0.1',port: 6379,// password: 'your-password', // 如果有密码
});module.exports = redis;

5️⃣ Express 应用 index.js:演示各种 Redis 数据类型操作

const express = require('express');
const redis = require('./redisClient');
const app = express();
app.use(express.json());/*** ============ String 类型 ============*/// 设置字符串
app.post('/string/set', async (req, res) => {const { key, value } = req.body;await redis.set(key, value);res.send(`设置成功:${key}=${value}`);
});// 获取字符串
app.get('/string/get/:key', async (req, res) => {const value = await redis.get(req.params.key);res.send(`值为:${value}`);
});/*** ============ Hash 类型 ============*/// 设置哈希字段
app.post('/hash/set', async (req, res) => {const { key, field, value } = req.body;await redis.hset(key, field, value);res.send(`哈希字段设置成功:${key}.${field}=${value}`);
});// 获取哈希字段
app.get('/hash/get', async (req, res) => {const { key, field } = req.query;const value = await redis.hget(key, field);res.send(`哈希字段值为:${value}`);
});// 获取整个哈希
app.get('/hash/all/:key', async (req, res) => {const hash = await redis.hgetall(req.params.key);res.json(hash);
});/*** ============ List 类型 ============*/// 从左侧推入列表
app.post('/list/leftpush', async (req, res) => {const { key, value } = req.body;await redis.lpush(key, value);res.send(`左侧推入成功:${value}`);
});// 从右侧弹出列表
app.get('/list/rightpop/:key', async (req, res) => {const value = await redis.rpop(req.params.key);res.send(`右侧弹出值:${value}`);
});/*** ============ Set 类型 ============*/// 添加到 Set 集合
app.post('/set/add', async (req, res) => {const { key, value } = req.body;await redis.sadd(key, value);res.send(`添加到集合成功:${value}`);
});// 获取 Set 所有成员
app.get('/set/members/:key', async (req, res) => {const members = await redis.smembers(req.params.key);res.json(members);
});/*** ============ Sorted Set 类型 ============*/// 添加成员和分数
app.post('/zset/add', async (req, res) => {const { key, score, member } = req.body;await redis.zadd(key, score, member);res.send(`添加到有序集合成功:${member}(分数${score})`);
});// 获取排行榜
app.get('/zset/range/:key', async (req, res) => {const list = await redis.zrevrange(req.params.key, 0, 9, 'WITHSCORES');res.json(list);
});const PORT = 3000;
app.listen(PORT, () => {console.log(`服务器启动:http://localhost:${PORT}`);
});

6️⃣ Postman 测试示例

使用 POSTGET 请求调用以下接口:

  • POST /string/set + JSON { "key": "user:1:name", "value": "Alice" }

  • GET /string/get/user:1:name

  • POST /hash/set + { "key": "user:2", "field": "email", "value": "a@example.com" }

  • GET /hash/all/user:2

  • POST /list/leftpush + { "key": "messages", "value": "hi" }

  • GET /list/rightpop/messages

  • POST /set/add + { "key": "tags", "value": "web3" }

  • GET /set/members/tags

  • POST /zset/add + { "key": "ranking", "score": 98, "member": "Bob" }

  • GET /zset/range/ranking


✅ 总结

数据类型操作方法(部分)使用场景
Stringset, get, incr缓存数据、计数器
Hashhset, hget, hgetall用户信息、配置项
Listlpush, rpop, lrange消息队列、时间轴
Setsadd, smembers, sismember去重、用户标签
Sorted Setzadd, zrange, zrevrange排行榜、权重排序

如需进一步加上缓存失效时间(EX)、分布式锁、订阅发布等高级操作,也可以继续告诉我,我可以补充完整!

二、Redis 在 Express 项目中实现 缓存失效时间(EX)分布式锁发布/订阅机制

1️⃣ 缓存失效时间(EX)

概念介绍

Redis 支持为 key 设置生存时间(TTL),超时后自动删除该键,常用于缓存数据。


示例代码

// 设置字符串并指定过期时间(单位:秒)
app.post('/cache/setex', async (req, res) => {const { key, value, ttl } = req.body;await redis.set(key, value, 'EX', ttl); // 'EX' 表示秒级过期时间res.send(`设置缓存成功:${key}=${value}(过期时间:${ttl}秒)`);
});// 查询 key 的剩余时间
app.get('/cache/ttl/:key', async (req, res) => {const ttl = await redis.ttl(req.params.key);res.send(`剩余时间:${ttl} 秒`);
});

2️⃣ 分布式锁

概念介绍

Redis 提供 SET key value NX EX ttl 命令可用于实现简易分布式锁。

  • NX: 仅当 key 不存在时设置,保证只有一个客户端加锁成功。

  • EX ttl: 设置锁过期时间,防止死锁。


示例代码

// 尝试获取锁
app.post('/lock/acquire', async (req, res) => {const { lockKey, lockValue, ttl } = req.body;const success = await redis.set(lockKey, lockValue, 'NX', 'EX', ttl);if (success) {res.send('✅ 成功获取锁');} else {res.status(423).send('❌ 获取锁失败,资源已被占用');}
});// 释放锁(注意:必须验证持有者)
app.post('/lock/release', async (req, res) => {const { lockKey, lockValue } = req.body;// 用 Lua 脚本确保只有锁持有者可以释放const luaScript = `if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("del", KEYS[1])elsereturn 0end`;const result = await redis.eval(luaScript, 1, lockKey, lockValue);res.send(result === 1 ? '✅ 成功释放锁' : '⚠️ 无权释放锁');
});

3️⃣ 发布/订阅(Pub/Sub)

概念介绍

Redis 提供消息发布订阅功能,常用于:

  • 服务间通信

  • 实时通知

  • 消息驱动架构


示例代码

消费端(订阅频道)

// 单独开一个 Redis 订阅连接
const Redis = require('ioredis');
const subClient = new Redis();subClient.subscribe('news', () => {console.log('📡 订阅了 news 频道');
});subClient.on('message', (channel, message) => {console.log(`📨 接收到频道 ${channel} 的消息: ${message}`);
});

生产端(发布消息)

// 发布消息到 news 频道
app.post('/publish/news', async (req, res) => {const { message } = req.body;await redis.publish('news', message);res.send('📤 消息已发布');
});

✅ 总结表

功能核心命令应用场景
缓存失效SET key value EX ttl缓存自动清理,热点数据缓存
分布式锁SET key val NX EX ttl + Lua控制资源访问,避免并发冲突
发布订阅PUBLISH/SUBSCRIBE实时通信、系统内事件通知


如需更高级用法(如 Redlock、Redis Stream 消息队列、异步任务持久化等),欢迎继续提问!也可以让我帮你把这些整合进一个生产级项目模板。

三、Express实现Redlock、Redis Stream 消息队列、异步任务持久化

下面详细讲解 Redlock 分布式锁算法Redis Streams 消息队列异步任务持久化 的概念、使用场景与详细代码示例(基于 Express + Node.js + ioredis):


1️⃣ Redlock 分布式锁算法

概念介绍

Redlock 是 Redis 官方推荐的分布式锁算法,适用于多 Redis 实例构建高可靠性锁。

  • 使用多个 Redis 节点写入同一锁键;

  • 获取大多数实例同意才算加锁成功;

  • 自动设置超时,防止死锁。

示例代码(基于 redlock 库)

npm install redlock ioredis
const Redis = require('ioredis');
const Redlock = require('redlock');const clients = [new Redis(), new Redis(), new Redis()]; // 模拟多个实例
const redlock = new Redlock(clients, {retryCount: 3,retryDelay: 200, // ms
});app.post('/redlock', async (req, res) => {try {const lock = await redlock.acquire(['locks:resource'], 10000); // 10秒锁console.log('✅ 获取Redlock成功');setTimeout(async () => {await lock.release();console.log('🔓 Redlock释放成功');}, 5000);res.send('Redlock锁定成功');} catch (err) {res.status(500).send('🔒 Redlock锁获取失败');}
});

2️⃣ Redis Streams 消息队列

概念介绍

Redis Stream 是 Redis 5.0+ 引入的持久化消息队列,支持:

  • 消息追加(XADD

  • 消费者组(XGROUP

  • 消费与确认(XREADGROUP, XACK

适合实现 可靠队列、异步任务处理、事件流系统


示例代码(任务生产+消费)

const streamKey = 'mystream';
const consumerGroup = 'mygroup';
const consumerName = 'consumer1';(async () => {try {await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');} catch (e) {if (!e.message.includes('BUSYGROUP')) throw e;}
})();// 生产消息
app.post('/stream/produce', async (req, res) => {const { orderId } = req.body;await redis.xadd(streamKey, '*', 'orderId', orderId);res.send('⏩ 消息已推入流');
});// 消费消息(自动应答)
setInterval(async () => {const result = await redis.xreadgroup('GROUP', consumerGroup, consumerName,'BLOCK', 2000,'COUNT', 1,'STREAMS', streamKey, '>');if (result) {const [[stream, messages]] = result;for (const [id, fields] of messages) {const orderId = fields[1];console.log('✅ 消费订单:', orderId);await redis.xack(streamKey, consumerGroup, id); // 标记已处理}}
}, 3000);

3️⃣ 异步任务持久化(基于队列)

概念介绍

将任务加入 Redis 队列并异步处理,可实现:

  • 防止阻塞主线程

  • 保证任务持久化存储

  • 支持失败重试、延时执行等

推荐使用 bull 或 bullmq,内置持久化、并发控制、失败重试等机制。


示例代码(使用 bull

npm install bull
const Queue = require('bull');
const jobQueue = new Queue('emailQueue', { redis: { port: 6379, host: '127.0.0.1' } });// 添加任务
app.post('/async/send-email', async (req, res) => {const { to, content } = req.body;await jobQueue.add({ to, content });res.send('📨 已加入异步发送任务队列');
});// 处理任务
jobQueue.process(async (job) => {console.log(`发送邮件给:${job.data.to},内容:${job.data.content}`);// 真实场景:集成 nodemailer、Mailgun、Sendgrid 等发送
});// 失败重试配置
jobQueue.on('failed', (job, err) => {console.error(`❌ 邮件发送失败:${job.id}`, err.message);
});

📌 总结对比表

技术类型特点与应用场景
Redlock分布式锁多节点锁协调,防止并发冲突与死锁
Redis Stream消息队列原生持久化、消费者组、高吞吐异步任务
Bull/BullMQ队列框架丰富功能,失败重试、延迟队列、状态可视化


如你需要我将三种方式整合成完整的 Express 微服务框架示例,也可以继续提问!

四、Express 使用布隆过滤器

下面是关于 Express 使用布隆过滤器(Bloom Filter) 的概念、详细代码示例和讲解,帮助你理解它在实际 Web 应用中的应用场景:


1️⃣ 概念介绍

布隆过滤器(Bloom Filter)是一种空间效率极高的概率型数据结构,用于判断一个元素是否存在于集合中:

  • 优点:节省内存、查询速度快,适用于大规模去重、黑名单校验等场景。

  • 缺点:可能误判为存在(假阳性),但不会漏判(不会漏掉存在的元素)。

常用于:

  • 防止缓存穿透(查询不存在数据频繁打数据库)

  • 用户是否已提交过表单、请求

  • 黑名单过滤、URL 去重


2️⃣ 安装依赖

我们使用 bloom-filters 库:

npm install bloom-filters

3️⃣ 示例代码(Express + BloomFilter)

以下示例演示如何使用布隆过滤器拦截重复请求或缓存穿透:

// 引入布隆过滤器
const express = require('express');
const { BloomFilter } = require('bloom-filters');
const app = express();app.use(express.json());// 创建布隆过滤器,参数: 预计插入数量、可接受误差率
const bloom = new BloomFilter(1000, 0.01);// 示例:用户请求时检查是否访问过该资源
app.post('/api/resource', (req, res) => {const { userId } = req.body;// 检查是否已经请求过if (bloom.has(userId)) {return res.status(429).json({ message: '该用户已处理过请求,拒绝重复操作。' });}// 第一次访问,插入布隆过滤器bloom.add(userId);// 模拟处理业务逻辑console.log(`处理用户 ${userId} 的请求`);res.json({ message: '请求处理成功' });
});

4️⃣ 与 Redis 配合使用(持久化布隆过滤器)

如需跨服务共享布隆过滤器状态,可使用 Redis 存储:

npm install redis
const redis = require('redis');
const { BloomFilter } = require('bloom-filters');
const client = redis.createClient();// 将布隆过滤器序列化为 JSON 保存
const saveToRedis = async (key, filter) => {const json = JSON.stringify(filter.saveAsJSON());await client.set(key, json);
};// 从 Redis 读取布隆过滤器
const loadFromRedis = async (key) => {const json = await client.get(key);if (json) {return BloomFilter.fromJSON(JSON.parse(json));}return new BloomFilter(1000, 0.01);
};

5️⃣ 使用场景举例

使用场景示例
防止缓存穿透数据库中没有的 key 被频繁查询
请求重复提交拦截用户提交重复订单、重复点赞
URL 去重爬虫系统中防止重复抓取
黑名单判断IP、邮箱、手机号是否在黑名单中


✅ 小结

  • 布隆过滤器适合在高并发系统中快速过滤“不存在”的请求

  • 它不能代替数据库查重,但能大幅减少无效请求打到数据库/缓存

  • 结合 Redis 可以实现布隆过滤器的持久化与多节点共享。

五、Express 使用 RabbitMQ

以下是关于 Express 使用 RabbitMQ 的完整指南,包括:

  • 概念介绍

  • 安装依赖

  • 完整代码示例(发送、消费、连接关闭、持久化、确认机制等)

  • 实战讲解与注释


1️⃣ 概念介绍

RabbitMQ 是一个高性能的消息中间件(消息队列),用于实现异步通信、解耦模块、削峰填谷等。

它基于 AMQP(Advanced Message Queuing Protocol)协议,主要组件包括:

组件说明
Producer消息生产者,发送消息到队列
Queue消息队列,缓存待处理消息
Consumer消费者,监听并处理队列中的消息
Exchange交换机,决定消息路由到哪个队列(direct、fanout、topic、headers)
Routing Key消息携带的路由标识


2️⃣ 安装依赖

我们使用 amqplib 模块:

npm install amqplib

3️⃣ Express 集成 RabbitMQ:发送与消费消息(全流程)

项目结构:

/rabbitmq-demo
├── producer.js        # 发送消息
├── consumer.js        # 消费消息
└── app.js             # Express 接口入口

3.1 配置 RabbitMQ 地址

// config.js
module.exports = {RABBITMQ_URL: 'amqp://localhost', // 默认端口5672QUEUE_NAME: 'task_queue'
};

3.2 消息发送(Producer)

// producer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');async function sendToQueue(msg) {const conn = await amqp.connect(RABBITMQ_URL);const channel = await conn.createChannel();// 保证队列存在(幂等操作)await channel.assertQueue(QUEUE_NAME, { durable: true });// 发送消息(Buffer)channel.sendToQueue(QUEUE_NAME, Buffer.from(msg), { persistent: true });console.log(`[x] 发送消息: ${msg}`);// 延迟关闭连接(避免连接还没写完就关闭)setTimeout(() => {channel.close();conn.close();}, 500);
}module.exports = sendToQueue;

3.3 消息消费(Consumer)

// consumer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');async function startConsumer() {const conn = await amqp.connect(RABBITMQ_URL);const channel = await conn.createChannel();// 保证队列存在await channel.assertQueue(QUEUE_NAME, { durable: true });// 每次只处理一个消息(限流)channel.prefetch(1);console.log('[*] 等待接收消息...');channel.consume(QUEUE_NAME, async (msg) => {const content = msg.content.toString();console.log(`[x] 收到消息: ${content}`);// 模拟处理耗时await new Promise(res => setTimeout(res, 2000));// 确认消息处理完毕channel.ack(msg);console.log(`[✓] 处理完成: ${content}`);});
}// 启动消费者
startConsumer().catch(console.error);

3.4 Express 中调用 Producer

// app.js
const express = require('express');
const sendToQueue = require('./producer');
const app = express();app.use(express.json());app.post('/send', async (req, res) => {const { message } = req.body;if (!message) return res.status(400).json({ error: 'message is required' });await sendToQueue(message);res.json({ status: 'Message sent to queue' });
});app.listen(3000, () => {console.log('Server running at http://localhost:3000');
});

示例请求(用 Postman 或 curl):

curl -X POST http://localhost:3000/send -H "Content-Type: application/json" -d '{"message": "Hello, RabbitMQ!"}'

4️⃣ 进阶功能(操作补全)

✅ 持久化消息

  • 使用 { durable: true } 创建队列

  • 使用 { persistent: true } 发送消息
    可以防止 RabbitMQ 重启丢失消息。

✅ 消息确认机制(ack)

  • 如果 channel.ack(msg) 不执行,消息会保留在队列,确保消息不丢失。

✅ 消费者限流(prefetch)

channel.prefetch(1); // 每次只处理一个消息,避免过载

✅ 多消费者场景(水平扩展)

可以开启多个 consumer.js 实例,共享队列并均衡消费消息。


5️⃣ 常见应用场景

场景示例
异步任务处理用户上传图片,异步压缩处理
邮件通知注册后异步发送邮件
流量削峰高并发请求写入队列,逐步处理
订单系统下单后异步库存扣减、发货处理
区块链事件队列DApp 收到链上事件后异步处理与持久化


✅ 总结

优点说明
解耦模块发送方不关心处理逻辑
提升系统性能异步处理、削峰填谷
提高可靠性可持久化、失败重试、限流
适用于微服务架构服务之间通过消息队列通信


如你想进一步实现 Topic 交换机、死信队列(DLX)、延迟队列等,也可以告诉我,我将继续补充对应场景。

六、Express实现Topic 交换机、死信队列(DLX)、延迟队列

下面是关于 RabbitMQ 的高级用法:Topic 交换机、死信队列(DLX)、延迟队列 的完整实现与讲解,包括 概念 + 示例代码(Node.js + amqplib)+ 使用方式


🧩 1️⃣ Topic 交换机

🔷 概念介绍

Topic 交换机(topic)可以根据通配符匹配路由键,适合复杂的消息路由规则。

  • *:匹配一个单词

  • #:匹配零个或多个单词

示例路由键:

路由键匹配规则
order.created匹配 order.*
order.us.created匹配 order.#
user.deleted不匹配 order.*


✅ 示例代码(Producer + Consumer)

Producer:发送带有路由键的消息
// topicProducer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';async function sendMessage(routingKey, msg) {const conn = await amqp.connect('amqp://localhost');const channel = await conn.createChannel();await channel.assertExchange(exchange, 'topic', { durable: true });channel.publish(exchange, routingKey, Buffer.from(msg));console.log(`[x] Sent '${msg}' with key '${routingKey}'`);setTimeout(() => {channel.close();conn.close();}, 500);
}sendMessage('order.created', '订单已创建');

Consumer:监听带通配符的 key
// topicConsumer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';async function startConsumer(pattern) {const conn = await amqp.connect('amqp://localhost');const channel = await conn.createChannel();await channel.assertExchange(exchange, 'topic', { durable: true });const q = await channel.assertQueue('', { exclusive: true });await channel.bindQueue(q.queue, exchange, pattern);console.log(`[x] Waiting for messages with pattern: ${pattern}`);channel.consume(q.queue, msg => {console.log(`[✓] Received (${msg.fields.routingKey}): ${msg.content.toString()}`);}, { noAck: true });
}startConsumer('order.#');

🧨 2️⃣ 死信队列(DLX)

🔷 概念介绍

死信队列(Dead Letter Exchange) 用于接收未被正常消费的消息,比如:

  • 消费失败未 ack

  • 队列 TTL 超时

  • 队列满

  • 拒绝(nack/reject 且不 requeue)


✅ 示例代码(TTL + DLX)

Producer:声明带 TTL 的主队列 + DLX
// dlxProducer.js
const amqp = require('amqplib');const DLX_EXCHANGE = 'dlx-ex';
const NORMAL_EXCHANGE = 'normal-ex';
const QUEUE = 'normal-queue';
const DLX_QUEUE = 'dead-letter-queue';async function setup() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();// 1. 声明死信交换机与队列await ch.assertExchange(DLX_EXCHANGE, 'fanout', { durable: true });await ch.assertQueue(DLX_QUEUE, { durable: true });await ch.bindQueue(DLX_QUEUE, DLX_EXCHANGE, '');// 2. 声明主交换机和带 DLX 属性的队列await ch.assertExchange(NORMAL_EXCHANGE, 'direct', { durable: true });await ch.assertQueue(QUEUE, {durable: true,deadLetterExchange: DLX_EXCHANGE, // 设置死信交换机messageTtl: 5000                   // 设置消息 TTL 为 5s});await ch.bindQueue(QUEUE, NORMAL_EXCHANGE, 'task');// 3. 发送消息ch.publish(NORMAL_EXCHANGE, 'task', Buffer.from('This will expire!'));console.log('[x] Sent message with TTL');setTimeout(() => {ch.close();conn.close();}, 1000);
}setup();

Consumer:不消费,让其过期 → 死信队列接收
// dlxConsumer.js
const amqp = require('amqplib');async function startDLXConsumer() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();await ch.assertQueue('dead-letter-queue', { durable: true });ch.consume('dead-letter-queue', msg => {console.log(`[DLX] Received expired message: ${msg.content.toString()}`);}, { noAck: true });
}startDLXConsumer();

⏳ 3️⃣ 延迟队列(基于 TTL + DLX 实现)

🔷 概念介绍

RabbitMQ 本身不支持“精确到某时间点的延迟消息”,但可以组合:

消息 TTL + DLX 死信交换机 模拟延迟队列。


✅ 延迟队列示例

// delayQueue.js
const amqp = require('amqplib');const DELAY_EX = 'delay-ex';
const DELAY_QUEUE = 'delay-queue';
const TARGET_EX = 'real-task-ex';
const TARGET_QUEUE = 'real-task-queue';async function setupDelayQueue() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();// 1. 目标真实处理交换机/队列await ch.assertExchange(TARGET_EX, 'fanout', { durable: true });await ch.assertQueue(TARGET_QUEUE, { durable: true });await ch.bindQueue(TARGET_QUEUE, TARGET_EX, '');// 2. 延迟队列,消息过期后转发到目标交换机await ch.assertExchange(DELAY_EX, 'direct', { durable: true });await ch.assertQueue(DELAY_QUEUE, {durable: true,messageTtl: 10000, // 延迟10秒deadLetterExchange: TARGET_EX});await ch.bindQueue(DELAY_QUEUE, DELAY_EX, 'delay');// 3. 发送消息ch.publish(DELAY_EX, 'delay', Buffer.from('Hello after 10s'));console.log('[x] Message sent to delay queue');setTimeout(() => {ch.close();conn.close();}, 1000);
}setupDelayQueue();

目标消费(真正业务执行)

// taskConsumer.js
const amqp = require('amqplib');async function consumeRealTask() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();await ch.assertQueue('real-task-queue', { durable: true });ch.consume('real-task-queue', msg => {console.log(`[✓] 延迟后收到任务: ${msg.content.toString()}`);}, { noAck: true });
}consumeRealTask();

🧠 总结对比

功能用途技术实现
Topic 交换机多维匹配路由键topic 类型交换机 + 通配符
死信队列异常消息托底设置 x-dead-letter-exchange
延迟队列定时消息、延时执行TTL + DLX 模拟,或用插件实现精确延时


如需:
✅ 多级死信队列(死信消息再次进入延迟)
✅ RabbitMQ 插件方式实现精确延迟(rabbitmq_delayed_message_exchange
✅ NestJS 集成 RabbitMQ 的完整封装

七、RabbitMQ 的可靠性投递消息的幂等性设计

🧩 1️⃣ 可靠性投递(Reliability Delivery)

🟦 概念介绍

可靠投递的目标是:确保消息从生产者 → RabbitMQ → 消费者 三段都不丢失、不重复

涉及三段关键机制:

阶段机制/方法
生产者 → RabbitMQ事务机制、Confirm 模式
RabbitMQ → 队列消息持久化、队列持久化
队列 → 消费者ack 确认消费 + nack 补偿


✅ 关键配置与代码说明(生产者 Confirm 模式)

// producer_confirm.js
const amqp = require('amqplib');async function sendReliableMessage() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createConfirmChannel(); // 👈 Confirm 模式const exchange = 'reliable-ex';await ch.assertExchange(exchange, 'direct', { durable: true });const routingKey = 'reliable';const message = 'Hello with confirm!';ch.publish(exchange, routingKey, Buffer.from(message), { persistent: true }, (err, ok) => {if (err) {console.error('消息发送失败', err);} else {console.log('[✓] 消息成功投递到交换机');}ch.close();conn.close();});
}sendReliableMessage();

📝 persistent: true 确保消息写入磁盘
📝 createConfirmChannel() 可确认 RabbitMQ 是否真正收到了消息(比事务高效)


🧠 2️⃣ 消息的幂等性(Idempotency)

🟦 概念介绍

幂等性是指:无论接收同一条消息多少次,结果都不变,避免重复消费导致的数据错误

✅ 常用方法

方法说明
全局唯一 ID(msgId每条消息带唯一 ID,消费前判断是否处理过
Redis Set/Hash 缓存存储 msgId 已处理记录
数据库唯一约束 / 乐观锁保证写入唯一性或控制版本


✅ 消费者示例(Redis 保证幂等性)

// consumer_idempotent.js
const amqp = require('amqplib');
const Redis = require('ioredis');
const redis = new Redis(); // 默认连接 127.0.0.1:6379async function consumeWithIdempotency() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();const queue = 'task-queue';await ch.assertQueue(queue, { durable: true });ch.consume(queue, async msg => {const msgId = msg.properties.messageId; // 👈 消息唯一标识const key = `processed:${msgId}`;const alreadyProcessed = await redis.get(key);if (alreadyProcessed) {console.log(`[⚠️] 已处理跳过 msgId: ${msgId}`);ch.ack(msg);return;}const content = msg.content.toString();console.log(`[✓] 处理消息: ${content}`);// TODO: 执行业务逻辑...// 标记为已处理,设置过期防止 Redis 爆炸await redis.set(key, '1', 'EX', 86400); // 1天过期ch.ack(msg);}, { noAck: false });
}consumeWithIdempotency();

✅ Producer 设置 messageId

// producer_with_id.js
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');async function sendMessage() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();await ch.assertQueue('task-queue', { durable: true });const msg = 'Buy 1 BTC';const msgId = uuidv4(); // 👈 唯一 IDch.sendToQueue('task-queue', Buffer.from(msg), {persistent: true,messageId: msgId});console.log(`[x] Sent with msgId: ${msgId}`);ch.close();conn.close();
}sendMessage();

✅ 小结一览

项目目的推荐技术
投递可靠性消息不丢Confirm 模式 + 持久化
幂等性处理消息不重复处理Redis + msgId 唯一标识
消费异常重试/补偿消息不丢 & 不乱处理nack + 死信/重投机制


如果你需要结合 NestJS、BullMQ、Kafka 或使用 Redlock 进行分布式幂等处理,也可以继续问我,我可以为你生成分布式架构模板方案。

相关文章:

  • CppCon 2015 学习:Simple, Extensible Pattern Matching in C++14
  • 中文分词双向匹配
  • .Net 优秀框架 ABP全面详解
  • 【JMeter】接口断言
  • “一张网,万般用”——聊聊网络虚拟化到底怎么实现的
  • 数据库管理与高可用-MySQL故障排查与生产环境优化
  • 当文化遇见科技:探秘国际数字影像创新生态高地
  • 华硕a豆14 Air香氛版,美学与科技的馨香融合
  • 北京智乐活科技有限公司 适趣ai 二面 全栈
  • FAISS:高性能向量库
  • 佰力博科技与您探讨热释电测量的几种方法
  • RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
  • 博睿数据×华为, 共筑智慧金融新未来
  • React Native 是什么?为什么学它?
  • 井云科技|智能体变现新路径:从开发到盈利的关键跨越
  • day51 python CBAM注意力
  • 前端面试题 微信小程序兼容性问题与组件适配策略
  • Mysql8 忘记密码重置,以及问题解决
  • OpenGL-什么是软OpenGL/软渲染/软光栅?
  • MFC 抛体运动模拟:常见问题解决与界面美化
  • 购物网站app制作/今天发生的重大新闻
  • 寻花问柳-一个专做男人的网站/阿拉善盟seo
  • 具有价值的做pc端网站/杭州网站
  • wordpress新站注意事项/lpl赛区战绩
  • 中国建设银行网站首/宁德市安全教育平台
  • 网站建设公司兴田德润i简介/阿里云域名查询