当前位置: 首页 > wzjs >正文

顶呱呱集团 网站建设推图制作网站

顶呱呱集团 网站建设,推图制作网站,企业做可信网站认证的好处,网站后台管理权限设计生产者:批量发送消息(每批10条),每条消息附带唯一 correlationId,并监听确认队列(ackQueue)。 消费者:处理消息后,通过 ackQueue 返回确认消息(携带原 corre…
  • 生产者:批量发送消息(每批10条),每条消息附带唯一 correlationId,并监听确认队列(ackQueue)。

  • 消费者:处理消息后,通过 ackQueue 返回确认消息(携带原 correlationId)。

  • 超时重试:若某批消息在指定时间内未全部确认,未确认的消息会重新加入待发送队列。

producer.ts

import amqp from 'amqplib';async function start() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });async function produce(limit: number, data: string[], timeout: number = 10000) {let message = [...data];if (message.length > limit) {message = message.slice(0, limit);} else if (message.length < limit) {limit = message.length;}// 消息确认let cache: Array<{correlationId: string,message: string,isDelete: boolean,}> = new Array(limit).fill(null).map((_, index) => {return {correlationId: Math.random().toString().slice(2, -1),message: message[index],isDelete: false,};});for (let i = 0; i < limit; ++i) {channel.sendToQueue(queue, Buffer.from(cache[i].message), {correlationId: cache[i].correlationId,replyTo: ackQueue});}const consume = await channel.consume(ackQueue, (message) => {if (!message) {console.error('message is null', message);return;}let index = cache.findIndex((item) => item.correlationId === message.properties.correlationId);if (index !== -1) {cache[index].isDelete = true;console.log('confirmed success:', `"${message.content.toString()}"`, cache.every(item => item.isDelete));} else {console.log('confirmed fail:', `"${message.content.toString()}"`, cache, cache.every(item => item.isDelete), message.properties.correlationId);}channel.ack(message);});const sleep = (time: number) => {return new Promise<void>(resolve => setTimeout(() => resolve(), time));}let stop = false;const interval = async () => {await sleep(0);if (cache.every(item => item.isDelete) || stop) {return;} else {await interval();}}await Promise.race([interval(), // 监听本批次消息是否已经处理完成sleep(timeout), // 本批次消息最长处理时间]);stop = true;await channel.cancel(consume.consumerTag);// 没有收到确认的消息返回下一批处理继续处理return cache.filter(item => !item.isDelete).map(item => item.message);}// 发送1000条数据,分100批,每批10个let msg = new Array(100).fill(null).map((_, index) => `${index} message ${Math.random().toString().slice(2, -1)}`);while (msg.length) {let res = await produce(10, msg.slice(0, 10), 6000);msg = [...res, ...msg.slice(10, msg.length)];console.log('完成一批:', msg.length, '发送结果:', res.length, res);}
}start();

consumer.ts

import amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });channel.consume(queue, (message) => {if (message) {console.log(message?.content.toString(), message?.properties?.replyTo, message?.properties?.correlationId);// 消息处理完后,向 ackQueue 发送确认消息channel.sendToQueue(ackQueue, message?.content, {// 使用相同的 correlationId 来标识确认消息correlationId: message?.properties?.correlationId,// 将原 replyTo 信息传递回来// replyTo: queue,});// 确认 queue11 中的消息channel.ack(message);} else {console.error('message is null', message);}}, { noAck: false });
}produce();

 

http://www.dtcms.com/wzjs/839688.html

相关文章:

  • 网站制作流程 优帮云山西做网站费用
  • 网站规划有前途吗wordpress设计幻灯片
  • 箱包商城网站建设正规网站建设套餐报价
  • 专业建站公司加盟百度图片搜索图片识别
  • 构建一个网站需要什么海南进出口公司排名
  • 珠海建设网站首页本地的唐山网站建设
  • 做外国网站怎么买空间怎么做网站策划
  • 做网站服务器内存网站淘宝客 难做
  • 怎么介绍自己做的静态网站注册网站显示lp或设备超限怎么办
  • 遵义做网站国外网站空间哪个好
  • adsl做网站做网站还能赚钱
  • 网站做任务赚佣金网站策划就业前景
  • 网站建设需要通过哪些审批微信分销网站建设电话
  • 免费电视剧网站大全在线观看爱站网关键词查询网站的工具
  • 灵犀科技 网站建设特效网站
  • 织梦cms官方网站产品推广的网站怎么做
  • 上海app软件开发谷歌优化技巧
  • 网站建设公司平台在哪个网站上做预收款报告
  • 潍坊网站建设制作win10本地安装wordpress
  • 四川自助seo建站亚马逊雨林大火
  • 网站开发工程师需要会写什么区别安徽汽车网网站建设
  • 一个ip怎么做多个网站学校自己做的网站需要买服务器吗
  • 论坛网站建设网页设计心得体会300
  • 学校网站 建设有后台支撑的网站建设合同
  • c语言做的网站有什么优缺点网络服务设备有哪些
  • 提升网站流量该怎么做南昌快速优化排名
  • 义乌免费做网站推广普通话手抄报句子
  • react用于网站开发wordpress问卷调查
  • 南阳集团网站建设泛微网络科技有限公司
  • 深圳移动网站建设公司排名php 开启gzip加速网站