当前位置: 首页 > wzjs >正文

网站建设 cn3万网站建设费会计分录

网站建设 cn,3万网站建设费会计分录,网站建设的安全威胁,个人备案网站可以做支付吗一、使用 Express 操作 Redis 的完整示例 涵盖 Redis 各种核心数据类型(String、Hash、List、Set、Sorted Set)及常见操作 1️⃣ 概念介绍 Redis 是一个开源的高性能键值对数据库,支持多种数据结构,非常适合缓存、排行榜、会话管…

一、使用 Express 操作 Redis 的完整示例

涵盖 Redis 各种核心数据类型(String、Hash、List、Set、Sorted Set)及常见操作


1️⃣ 概念介绍

Redis 是一个开源的高性能键值对数据库,支持多种数据结构,非常适合缓存、排行榜、会话管理等场景。

Redis 支持的数据类型包括:

类型简介
String最基本的数据类型,支持数字/字符串存取
Hash类似对象,适合存储用户信息、对象结构
List链表结构,可做消息队列
Set无序唯一集合,常用于去重
Sorted Set有序唯一集合,常用于排行榜(带分数排序)

Node.js 下最常用的 Redis 客户端是 ioredis,支持连接池、集群等高级功能。


2️⃣ 安装依赖

npm install express ioredis

3️⃣ 初始化 Express + Redis 应用结构

project/
├── index.js
├── redisClient.js
└── package.json

4️⃣ 创建 Redis 客户端 redisClient.js

const Redis = require('ioredis');// 默认连接本地 Redis,如果你有密码或远程地址可以传入配置对象
const redis = new Redis({host: '127.0.0.1',port: 6379,// password: 'your-password', // 如果有密码
});module.exports = redis;

5️⃣ Express 应用 index.js:演示各种 Redis 数据类型操作

const express = require('express');
const redis = require('./redisClient');
const app = express();
app.use(express.json());/*** ============ String 类型 ============*/// 设置字符串
app.post('/string/set', async (req, res) => {const { key, value } = req.body;await redis.set(key, value);res.send(`设置成功:${key}=${value}`);
});// 获取字符串
app.get('/string/get/:key', async (req, res) => {const value = await redis.get(req.params.key);res.send(`值为:${value}`);
});/*** ============ Hash 类型 ============*/// 设置哈希字段
app.post('/hash/set', async (req, res) => {const { key, field, value } = req.body;await redis.hset(key, field, value);res.send(`哈希字段设置成功:${key}.${field}=${value}`);
});// 获取哈希字段
app.get('/hash/get', async (req, res) => {const { key, field } = req.query;const value = await redis.hget(key, field);res.send(`哈希字段值为:${value}`);
});// 获取整个哈希
app.get('/hash/all/:key', async (req, res) => {const hash = await redis.hgetall(req.params.key);res.json(hash);
});/*** ============ List 类型 ============*/// 从左侧推入列表
app.post('/list/leftpush', async (req, res) => {const { key, value } = req.body;await redis.lpush(key, value);res.send(`左侧推入成功:${value}`);
});// 从右侧弹出列表
app.get('/list/rightpop/:key', async (req, res) => {const value = await redis.rpop(req.params.key);res.send(`右侧弹出值:${value}`);
});/*** ============ Set 类型 ============*/// 添加到 Set 集合
app.post('/set/add', async (req, res) => {const { key, value } = req.body;await redis.sadd(key, value);res.send(`添加到集合成功:${value}`);
});// 获取 Set 所有成员
app.get('/set/members/:key', async (req, res) => {const members = await redis.smembers(req.params.key);res.json(members);
});/*** ============ Sorted Set 类型 ============*/// 添加成员和分数
app.post('/zset/add', async (req, res) => {const { key, score, member } = req.body;await redis.zadd(key, score, member);res.send(`添加到有序集合成功:${member}(分数${score})`);
});// 获取排行榜
app.get('/zset/range/:key', async (req, res) => {const list = await redis.zrevrange(req.params.key, 0, 9, 'WITHSCORES');res.json(list);
});const PORT = 3000;
app.listen(PORT, () => {console.log(`服务器启动:http://localhost:${PORT}`);
});

6️⃣ Postman 测试示例

使用 POSTGET 请求调用以下接口:

  • POST /string/set + JSON { "key": "user:1:name", "value": "Alice" }

  • GET /string/get/user:1:name

  • POST /hash/set + { "key": "user:2", "field": "email", "value": "a@example.com" }

  • GET /hash/all/user:2

  • POST /list/leftpush + { "key": "messages", "value": "hi" }

  • GET /list/rightpop/messages

  • POST /set/add + { "key": "tags", "value": "web3" }

  • GET /set/members/tags

  • POST /zset/add + { "key": "ranking", "score": 98, "member": "Bob" }

  • GET /zset/range/ranking


✅ 总结

数据类型操作方法(部分)使用场景
Stringset, get, incr缓存数据、计数器
Hashhset, hget, hgetall用户信息、配置项
Listlpush, rpop, lrange消息队列、时间轴
Setsadd, smembers, sismember去重、用户标签
Sorted Setzadd, zrange, zrevrange排行榜、权重排序

如需进一步加上缓存失效时间(EX)、分布式锁、订阅发布等高级操作,也可以继续告诉我,我可以补充完整!

二、Redis 在 Express 项目中实现 缓存失效时间(EX)分布式锁发布/订阅机制

1️⃣ 缓存失效时间(EX)

概念介绍

Redis 支持为 key 设置生存时间(TTL),超时后自动删除该键,常用于缓存数据。


示例代码

// 设置字符串并指定过期时间(单位:秒)
app.post('/cache/setex', async (req, res) => {const { key, value, ttl } = req.body;await redis.set(key, value, 'EX', ttl); // 'EX' 表示秒级过期时间res.send(`设置缓存成功:${key}=${value}(过期时间:${ttl}秒)`);
});// 查询 key 的剩余时间
app.get('/cache/ttl/:key', async (req, res) => {const ttl = await redis.ttl(req.params.key);res.send(`剩余时间:${ttl} 秒`);
});

2️⃣ 分布式锁

概念介绍

Redis 提供 SET key value NX EX ttl 命令可用于实现简易分布式锁。

  • NX: 仅当 key 不存在时设置,保证只有一个客户端加锁成功。

  • EX ttl: 设置锁过期时间,防止死锁。


示例代码

// 尝试获取锁
app.post('/lock/acquire', async (req, res) => {const { lockKey, lockValue, ttl } = req.body;const success = await redis.set(lockKey, lockValue, 'NX', 'EX', ttl);if (success) {res.send('✅ 成功获取锁');} else {res.status(423).send('❌ 获取锁失败,资源已被占用');}
});// 释放锁(注意:必须验证持有者)
app.post('/lock/release', async (req, res) => {const { lockKey, lockValue } = req.body;// 用 Lua 脚本确保只有锁持有者可以释放const luaScript = `if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("del", KEYS[1])elsereturn 0end`;const result = await redis.eval(luaScript, 1, lockKey, lockValue);res.send(result === 1 ? '✅ 成功释放锁' : '⚠️ 无权释放锁');
});

3️⃣ 发布/订阅(Pub/Sub)

概念介绍

Redis 提供消息发布订阅功能,常用于:

  • 服务间通信

  • 实时通知

  • 消息驱动架构


示例代码

消费端(订阅频道)

// 单独开一个 Redis 订阅连接
const Redis = require('ioredis');
const subClient = new Redis();subClient.subscribe('news', () => {console.log('📡 订阅了 news 频道');
});subClient.on('message', (channel, message) => {console.log(`📨 接收到频道 ${channel} 的消息: ${message}`);
});

生产端(发布消息)

// 发布消息到 news 频道
app.post('/publish/news', async (req, res) => {const { message } = req.body;await redis.publish('news', message);res.send('📤 消息已发布');
});

✅ 总结表

功能核心命令应用场景
缓存失效SET key value EX ttl缓存自动清理,热点数据缓存
分布式锁SET key val NX EX ttl + Lua控制资源访问,避免并发冲突
发布订阅PUBLISH/SUBSCRIBE实时通信、系统内事件通知


如需更高级用法(如 Redlock、Redis Stream 消息队列、异步任务持久化等),欢迎继续提问!也可以让我帮你把这些整合进一个生产级项目模板。

三、Express实现Redlock、Redis Stream 消息队列、异步任务持久化

下面详细讲解 Redlock 分布式锁算法Redis Streams 消息队列异步任务持久化 的概念、使用场景与详细代码示例(基于 Express + Node.js + ioredis):


1️⃣ Redlock 分布式锁算法

概念介绍

Redlock 是 Redis 官方推荐的分布式锁算法,适用于多 Redis 实例构建高可靠性锁。

  • 使用多个 Redis 节点写入同一锁键;

  • 获取大多数实例同意才算加锁成功;

  • 自动设置超时,防止死锁。

示例代码(基于 redlock 库)

npm install redlock ioredis
const Redis = require('ioredis');
const Redlock = require('redlock');const clients = [new Redis(), new Redis(), new Redis()]; // 模拟多个实例
const redlock = new Redlock(clients, {retryCount: 3,retryDelay: 200, // ms
});app.post('/redlock', async (req, res) => {try {const lock = await redlock.acquire(['locks:resource'], 10000); // 10秒锁console.log('✅ 获取Redlock成功');setTimeout(async () => {await lock.release();console.log('🔓 Redlock释放成功');}, 5000);res.send('Redlock锁定成功');} catch (err) {res.status(500).send('🔒 Redlock锁获取失败');}
});

2️⃣ Redis Streams 消息队列

概念介绍

Redis Stream 是 Redis 5.0+ 引入的持久化消息队列,支持:

  • 消息追加(XADD

  • 消费者组(XGROUP

  • 消费与确认(XREADGROUP, XACK

适合实现 可靠队列、异步任务处理、事件流系统


示例代码(任务生产+消费)

const streamKey = 'mystream';
const consumerGroup = 'mygroup';
const consumerName = 'consumer1';(async () => {try {await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');} catch (e) {if (!e.message.includes('BUSYGROUP')) throw e;}
})();// 生产消息
app.post('/stream/produce', async (req, res) => {const { orderId } = req.body;await redis.xadd(streamKey, '*', 'orderId', orderId);res.send('⏩ 消息已推入流');
});// 消费消息(自动应答)
setInterval(async () => {const result = await redis.xreadgroup('GROUP', consumerGroup, consumerName,'BLOCK', 2000,'COUNT', 1,'STREAMS', streamKey, '>');if (result) {const [[stream, messages]] = result;for (const [id, fields] of messages) {const orderId = fields[1];console.log('✅ 消费订单:', orderId);await redis.xack(streamKey, consumerGroup, id); // 标记已处理}}
}, 3000);

3️⃣ 异步任务持久化(基于队列)

概念介绍

将任务加入 Redis 队列并异步处理,可实现:

  • 防止阻塞主线程

  • 保证任务持久化存储

  • 支持失败重试、延时执行等

推荐使用 bull 或 bullmq,内置持久化、并发控制、失败重试等机制。


示例代码(使用 bull

npm install bull
const Queue = require('bull');
const jobQueue = new Queue('emailQueue', { redis: { port: 6379, host: '127.0.0.1' } });// 添加任务
app.post('/async/send-email', async (req, res) => {const { to, content } = req.body;await jobQueue.add({ to, content });res.send('📨 已加入异步发送任务队列');
});// 处理任务
jobQueue.process(async (job) => {console.log(`发送邮件给:${job.data.to},内容:${job.data.content}`);// 真实场景:集成 nodemailer、Mailgun、Sendgrid 等发送
});// 失败重试配置
jobQueue.on('failed', (job, err) => {console.error(`❌ 邮件发送失败:${job.id}`, err.message);
});

📌 总结对比表

技术类型特点与应用场景
Redlock分布式锁多节点锁协调,防止并发冲突与死锁
Redis Stream消息队列原生持久化、消费者组、高吞吐异步任务
Bull/BullMQ队列框架丰富功能,失败重试、延迟队列、状态可视化


如你需要我将三种方式整合成完整的 Express 微服务框架示例,也可以继续提问!

四、Express 使用布隆过滤器

下面是关于 Express 使用布隆过滤器(Bloom Filter) 的概念、详细代码示例和讲解,帮助你理解它在实际 Web 应用中的应用场景:


1️⃣ 概念介绍

布隆过滤器(Bloom Filter)是一种空间效率极高的概率型数据结构,用于判断一个元素是否存在于集合中:

  • 优点:节省内存、查询速度快,适用于大规模去重、黑名单校验等场景。

  • 缺点:可能误判为存在(假阳性),但不会漏判(不会漏掉存在的元素)。

常用于:

  • 防止缓存穿透(查询不存在数据频繁打数据库)

  • 用户是否已提交过表单、请求

  • 黑名单过滤、URL 去重


2️⃣ 安装依赖

我们使用 bloom-filters 库:

npm install bloom-filters

3️⃣ 示例代码(Express + BloomFilter)

以下示例演示如何使用布隆过滤器拦截重复请求或缓存穿透:

// 引入布隆过滤器
const express = require('express');
const { BloomFilter } = require('bloom-filters');
const app = express();app.use(express.json());// 创建布隆过滤器,参数: 预计插入数量、可接受误差率
const bloom = new BloomFilter(1000, 0.01);// 示例:用户请求时检查是否访问过该资源
app.post('/api/resource', (req, res) => {const { userId } = req.body;// 检查是否已经请求过if (bloom.has(userId)) {return res.status(429).json({ message: '该用户已处理过请求,拒绝重复操作。' });}// 第一次访问,插入布隆过滤器bloom.add(userId);// 模拟处理业务逻辑console.log(`处理用户 ${userId} 的请求`);res.json({ message: '请求处理成功' });
});

4️⃣ 与 Redis 配合使用(持久化布隆过滤器)

如需跨服务共享布隆过滤器状态,可使用 Redis 存储:

npm install redis
const redis = require('redis');
const { BloomFilter } = require('bloom-filters');
const client = redis.createClient();// 将布隆过滤器序列化为 JSON 保存
const saveToRedis = async (key, filter) => {const json = JSON.stringify(filter.saveAsJSON());await client.set(key, json);
};// 从 Redis 读取布隆过滤器
const loadFromRedis = async (key) => {const json = await client.get(key);if (json) {return BloomFilter.fromJSON(JSON.parse(json));}return new BloomFilter(1000, 0.01);
};

5️⃣ 使用场景举例

使用场景示例
防止缓存穿透数据库中没有的 key 被频繁查询
请求重复提交拦截用户提交重复订单、重复点赞
URL 去重爬虫系统中防止重复抓取
黑名单判断IP、邮箱、手机号是否在黑名单中


✅ 小结

  • 布隆过滤器适合在高并发系统中快速过滤“不存在”的请求

  • 它不能代替数据库查重,但能大幅减少无效请求打到数据库/缓存

  • 结合 Redis 可以实现布隆过滤器的持久化与多节点共享。

五、Express 使用 RabbitMQ

以下是关于 Express 使用 RabbitMQ 的完整指南,包括:

  • 概念介绍

  • 安装依赖

  • 完整代码示例(发送、消费、连接关闭、持久化、确认机制等)

  • 实战讲解与注释


1️⃣ 概念介绍

RabbitMQ 是一个高性能的消息中间件(消息队列),用于实现异步通信、解耦模块、削峰填谷等。

它基于 AMQP(Advanced Message Queuing Protocol)协议,主要组件包括:

组件说明
Producer消息生产者,发送消息到队列
Queue消息队列,缓存待处理消息
Consumer消费者,监听并处理队列中的消息
Exchange交换机,决定消息路由到哪个队列(direct、fanout、topic、headers)
Routing Key消息携带的路由标识


2️⃣ 安装依赖

我们使用 amqplib 模块:

npm install amqplib

3️⃣ Express 集成 RabbitMQ:发送与消费消息(全流程)

项目结构:

/rabbitmq-demo
├── producer.js        # 发送消息
├── consumer.js        # 消费消息
└── app.js             # Express 接口入口

3.1 配置 RabbitMQ 地址

// config.js
module.exports = {RABBITMQ_URL: 'amqp://localhost', // 默认端口5672QUEUE_NAME: 'task_queue'
};

3.2 消息发送(Producer)

// producer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');async function sendToQueue(msg) {const conn = await amqp.connect(RABBITMQ_URL);const channel = await conn.createChannel();// 保证队列存在(幂等操作)await channel.assertQueue(QUEUE_NAME, { durable: true });// 发送消息(Buffer)channel.sendToQueue(QUEUE_NAME, Buffer.from(msg), { persistent: true });console.log(`[x] 发送消息: ${msg}`);// 延迟关闭连接(避免连接还没写完就关闭)setTimeout(() => {channel.close();conn.close();}, 500);
}module.exports = sendToQueue;

3.3 消息消费(Consumer)

// consumer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');async function startConsumer() {const conn = await amqp.connect(RABBITMQ_URL);const channel = await conn.createChannel();// 保证队列存在await channel.assertQueue(QUEUE_NAME, { durable: true });// 每次只处理一个消息(限流)channel.prefetch(1);console.log('[*] 等待接收消息...');channel.consume(QUEUE_NAME, async (msg) => {const content = msg.content.toString();console.log(`[x] 收到消息: ${content}`);// 模拟处理耗时await new Promise(res => setTimeout(res, 2000));// 确认消息处理完毕channel.ack(msg);console.log(`[✓] 处理完成: ${content}`);});
}// 启动消费者
startConsumer().catch(console.error);

3.4 Express 中调用 Producer

// app.js
const express = require('express');
const sendToQueue = require('./producer');
const app = express();app.use(express.json());app.post('/send', async (req, res) => {const { message } = req.body;if (!message) return res.status(400).json({ error: 'message is required' });await sendToQueue(message);res.json({ status: 'Message sent to queue' });
});app.listen(3000, () => {console.log('Server running at http://localhost:3000');
});

示例请求(用 Postman 或 curl):

curl -X POST http://localhost:3000/send -H "Content-Type: application/json" -d '{"message": "Hello, RabbitMQ!"}'

4️⃣ 进阶功能(操作补全)

✅ 持久化消息

  • 使用 { durable: true } 创建队列

  • 使用 { persistent: true } 发送消息
    可以防止 RabbitMQ 重启丢失消息。

✅ 消息确认机制(ack)

  • 如果 channel.ack(msg) 不执行,消息会保留在队列,确保消息不丢失。

✅ 消费者限流(prefetch)

channel.prefetch(1); // 每次只处理一个消息,避免过载

✅ 多消费者场景(水平扩展)

可以开启多个 consumer.js 实例,共享队列并均衡消费消息。


5️⃣ 常见应用场景

场景示例
异步任务处理用户上传图片,异步压缩处理
邮件通知注册后异步发送邮件
流量削峰高并发请求写入队列,逐步处理
订单系统下单后异步库存扣减、发货处理
区块链事件队列DApp 收到链上事件后异步处理与持久化


✅ 总结

优点说明
解耦模块发送方不关心处理逻辑
提升系统性能异步处理、削峰填谷
提高可靠性可持久化、失败重试、限流
适用于微服务架构服务之间通过消息队列通信


如你想进一步实现 Topic 交换机、死信队列(DLX)、延迟队列等,也可以告诉我,我将继续补充对应场景。

六、Express实现Topic 交换机、死信队列(DLX)、延迟队列

下面是关于 RabbitMQ 的高级用法:Topic 交换机、死信队列(DLX)、延迟队列 的完整实现与讲解,包括 概念 + 示例代码(Node.js + amqplib)+ 使用方式


🧩 1️⃣ Topic 交换机

🔷 概念介绍

Topic 交换机(topic)可以根据通配符匹配路由键,适合复杂的消息路由规则。

  • *:匹配一个单词

  • #:匹配零个或多个单词

示例路由键:

路由键匹配规则
order.created匹配 order.*
order.us.created匹配 order.#
user.deleted不匹配 order.*


✅ 示例代码(Producer + Consumer)

Producer:发送带有路由键的消息
// topicProducer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';async function sendMessage(routingKey, msg) {const conn = await amqp.connect('amqp://localhost');const channel = await conn.createChannel();await channel.assertExchange(exchange, 'topic', { durable: true });channel.publish(exchange, routingKey, Buffer.from(msg));console.log(`[x] Sent '${msg}' with key '${routingKey}'`);setTimeout(() => {channel.close();conn.close();}, 500);
}sendMessage('order.created', '订单已创建');

Consumer:监听带通配符的 key
// topicConsumer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';async function startConsumer(pattern) {const conn = await amqp.connect('amqp://localhost');const channel = await conn.createChannel();await channel.assertExchange(exchange, 'topic', { durable: true });const q = await channel.assertQueue('', { exclusive: true });await channel.bindQueue(q.queue, exchange, pattern);console.log(`[x] Waiting for messages with pattern: ${pattern}`);channel.consume(q.queue, msg => {console.log(`[✓] Received (${msg.fields.routingKey}): ${msg.content.toString()}`);}, { noAck: true });
}startConsumer('order.#');

🧨 2️⃣ 死信队列(DLX)

🔷 概念介绍

死信队列(Dead Letter Exchange) 用于接收未被正常消费的消息,比如:

  • 消费失败未 ack

  • 队列 TTL 超时

  • 队列满

  • 拒绝(nack/reject 且不 requeue)


✅ 示例代码(TTL + DLX)

Producer:声明带 TTL 的主队列 + DLX
// dlxProducer.js
const amqp = require('amqplib');const DLX_EXCHANGE = 'dlx-ex';
const NORMAL_EXCHANGE = 'normal-ex';
const QUEUE = 'normal-queue';
const DLX_QUEUE = 'dead-letter-queue';async function setup() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();// 1. 声明死信交换机与队列await ch.assertExchange(DLX_EXCHANGE, 'fanout', { durable: true });await ch.assertQueue(DLX_QUEUE, { durable: true });await ch.bindQueue(DLX_QUEUE, DLX_EXCHANGE, '');// 2. 声明主交换机和带 DLX 属性的队列await ch.assertExchange(NORMAL_EXCHANGE, 'direct', { durable: true });await ch.assertQueue(QUEUE, {durable: true,deadLetterExchange: DLX_EXCHANGE, // 设置死信交换机messageTtl: 5000                   // 设置消息 TTL 为 5s});await ch.bindQueue(QUEUE, NORMAL_EXCHANGE, 'task');// 3. 发送消息ch.publish(NORMAL_EXCHANGE, 'task', Buffer.from('This will expire!'));console.log('[x] Sent message with TTL');setTimeout(() => {ch.close();conn.close();}, 1000);
}setup();

Consumer:不消费,让其过期 → 死信队列接收
// dlxConsumer.js
const amqp = require('amqplib');async function startDLXConsumer() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();await ch.assertQueue('dead-letter-queue', { durable: true });ch.consume('dead-letter-queue', msg => {console.log(`[DLX] Received expired message: ${msg.content.toString()}`);}, { noAck: true });
}startDLXConsumer();

⏳ 3️⃣ 延迟队列(基于 TTL + DLX 实现)

🔷 概念介绍

RabbitMQ 本身不支持“精确到某时间点的延迟消息”,但可以组合:

消息 TTL + DLX 死信交换机 模拟延迟队列。


✅ 延迟队列示例

// delayQueue.js
const amqp = require('amqplib');const DELAY_EX = 'delay-ex';
const DELAY_QUEUE = 'delay-queue';
const TARGET_EX = 'real-task-ex';
const TARGET_QUEUE = 'real-task-queue';async function setupDelayQueue() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();// 1. 目标真实处理交换机/队列await ch.assertExchange(TARGET_EX, 'fanout', { durable: true });await ch.assertQueue(TARGET_QUEUE, { durable: true });await ch.bindQueue(TARGET_QUEUE, TARGET_EX, '');// 2. 延迟队列,消息过期后转发到目标交换机await ch.assertExchange(DELAY_EX, 'direct', { durable: true });await ch.assertQueue(DELAY_QUEUE, {durable: true,messageTtl: 10000, // 延迟10秒deadLetterExchange: TARGET_EX});await ch.bindQueue(DELAY_QUEUE, DELAY_EX, 'delay');// 3. 发送消息ch.publish(DELAY_EX, 'delay', Buffer.from('Hello after 10s'));console.log('[x] Message sent to delay queue');setTimeout(() => {ch.close();conn.close();}, 1000);
}setupDelayQueue();

目标消费(真正业务执行)

// taskConsumer.js
const amqp = require('amqplib');async function consumeRealTask() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();await ch.assertQueue('real-task-queue', { durable: true });ch.consume('real-task-queue', msg => {console.log(`[✓] 延迟后收到任务: ${msg.content.toString()}`);}, { noAck: true });
}consumeRealTask();

🧠 总结对比

功能用途技术实现
Topic 交换机多维匹配路由键topic 类型交换机 + 通配符
死信队列异常消息托底设置 x-dead-letter-exchange
延迟队列定时消息、延时执行TTL + DLX 模拟,或用插件实现精确延时


如需:
✅ 多级死信队列(死信消息再次进入延迟)
✅ RabbitMQ 插件方式实现精确延迟(rabbitmq_delayed_message_exchange
✅ NestJS 集成 RabbitMQ 的完整封装

七、RabbitMQ 的可靠性投递消息的幂等性设计

🧩 1️⃣ 可靠性投递(Reliability Delivery)

🟦 概念介绍

可靠投递的目标是:确保消息从生产者 → RabbitMQ → 消费者 三段都不丢失、不重复

涉及三段关键机制:

阶段机制/方法
生产者 → RabbitMQ事务机制、Confirm 模式
RabbitMQ → 队列消息持久化、队列持久化
队列 → 消费者ack 确认消费 + nack 补偿


✅ 关键配置与代码说明(生产者 Confirm 模式)

// producer_confirm.js
const amqp = require('amqplib');async function sendReliableMessage() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createConfirmChannel(); // 👈 Confirm 模式const exchange = 'reliable-ex';await ch.assertExchange(exchange, 'direct', { durable: true });const routingKey = 'reliable';const message = 'Hello with confirm!';ch.publish(exchange, routingKey, Buffer.from(message), { persistent: true }, (err, ok) => {if (err) {console.error('消息发送失败', err);} else {console.log('[✓] 消息成功投递到交换机');}ch.close();conn.close();});
}sendReliableMessage();

📝 persistent: true 确保消息写入磁盘
📝 createConfirmChannel() 可确认 RabbitMQ 是否真正收到了消息(比事务高效)


🧠 2️⃣ 消息的幂等性(Idempotency)

🟦 概念介绍

幂等性是指:无论接收同一条消息多少次,结果都不变,避免重复消费导致的数据错误

✅ 常用方法

方法说明
全局唯一 ID(msgId每条消息带唯一 ID,消费前判断是否处理过
Redis Set/Hash 缓存存储 msgId 已处理记录
数据库唯一约束 / 乐观锁保证写入唯一性或控制版本


✅ 消费者示例(Redis 保证幂等性)

// consumer_idempotent.js
const amqp = require('amqplib');
const Redis = require('ioredis');
const redis = new Redis(); // 默认连接 127.0.0.1:6379async function consumeWithIdempotency() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();const queue = 'task-queue';await ch.assertQueue(queue, { durable: true });ch.consume(queue, async msg => {const msgId = msg.properties.messageId; // 👈 消息唯一标识const key = `processed:${msgId}`;const alreadyProcessed = await redis.get(key);if (alreadyProcessed) {console.log(`[⚠️] 已处理跳过 msgId: ${msgId}`);ch.ack(msg);return;}const content = msg.content.toString();console.log(`[✓] 处理消息: ${content}`);// TODO: 执行业务逻辑...// 标记为已处理,设置过期防止 Redis 爆炸await redis.set(key, '1', 'EX', 86400); // 1天过期ch.ack(msg);}, { noAck: false });
}consumeWithIdempotency();

✅ Producer 设置 messageId

// producer_with_id.js
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');async function sendMessage() {const conn = await amqp.connect('amqp://localhost');const ch = await conn.createChannel();await ch.assertQueue('task-queue', { durable: true });const msg = 'Buy 1 BTC';const msgId = uuidv4(); // 👈 唯一 IDch.sendToQueue('task-queue', Buffer.from(msg), {persistent: true,messageId: msgId});console.log(`[x] Sent with msgId: ${msgId}`);ch.close();conn.close();
}sendMessage();

✅ 小结一览

项目目的推荐技术
投递可靠性消息不丢Confirm 模式 + 持久化
幂等性处理消息不重复处理Redis + msgId 唯一标识
消费异常重试/补偿消息不丢 & 不乱处理nack + 死信/重投机制


如果你需要结合 NestJS、BullMQ、Kafka 或使用 Redlock 进行分布式幂等处理,也可以继续问我,我可以为你生成分布式架构模板方案。


文章转载自:

http://mhUz4w2N.dyxLj.cn
http://DsJojhGg.dyxLj.cn
http://fAMEr8a2.dyxLj.cn
http://V0VxKw3l.dyxLj.cn
http://NMKlbnak.dyxLj.cn
http://oHc3Goy0.dyxLj.cn
http://EJmr5xwi.dyxLj.cn
http://qxc7e0LX.dyxLj.cn
http://EUfgIgwm.dyxLj.cn
http://KpGDe368.dyxLj.cn
http://5arFKHsx.dyxLj.cn
http://wdgpo5QL.dyxLj.cn
http://s9akrCtC.dyxLj.cn
http://ATDv53IA.dyxLj.cn
http://Om9cC2ft.dyxLj.cn
http://AQoq3lxz.dyxLj.cn
http://e2o6ODWH.dyxLj.cn
http://3fnMZHsm.dyxLj.cn
http://4z2itq66.dyxLj.cn
http://1mS1cWdW.dyxLj.cn
http://5YhU3BDM.dyxLj.cn
http://egt0BgKh.dyxLj.cn
http://1XoLxUL0.dyxLj.cn
http://9ok5BydZ.dyxLj.cn
http://LLC2d0pJ.dyxLj.cn
http://ZVq4RjGC.dyxLj.cn
http://ckl1hOjI.dyxLj.cn
http://eL9Rptal.dyxLj.cn
http://pFcoxnPK.dyxLj.cn
http://EHTLx5gs.dyxLj.cn
http://www.dtcms.com/wzjs/642153.html

相关文章:

  • 婚礼摄影作品网站湖南省建设厅气源适配性目录2022
  • access数据库网站开发杭州网站开发制作公司
  • 网站背景音乐网站xml
  • 湖南城乡建设部网站江门当地的免费网站优化
  • 网站建站历史建站网址
  • p2p金融网站建设手机版网站案例
  • 广科网站开发怎么做干果网站
  • 大连模板网站制作推荐软件工程学费
  • 怎样使用网站后台的模板网站建设项目立项登记 表
  • 昆明医院网站建设纺织服装板块上市公司网站建设
  • nodejs做网站容易被攻击吗wordpress支持什么语言
  • 建设企业网站的深圳建筑工程交易服务中心网
  • 企业网站建设的上市公司怎么创建一个论坛
  • 陕西省医院网站建设管理做地图特效的网站
  • 松江做网站的公司智能建站程序
  • 做网站系统论坛网页设计
  • 西安商城网站开发制作重庆黄埔seo整站优化
  • android下载安装app网站优化推广费用
  • 网站架构组成部分北京vi设计
  • 手机网站有什么区别吗黄冈商城网站建设
  • 湖南省住房建设厅网站信息产业部网站备案查询
  • 那里有专做粮食的网站网站如何运营管理
  • 网站目录做跳转西安网站建设设计公司
  • 网站建设平台硬件要求宁波建设公司网站
  • 商家在携程旅游网站怎样做宣传做打牌的网站怎么办
  • 沈阳网站优化 唐朝网络石家庄做外贸网站推广
  • html5网站链接标签网站主办者是谁
  • 科技网络网站建设普通话的顺口溜6句
  • html代码网站wordpress 过滤html
  • 企业微信网站开发公司做外贸那个网站好