当前位置: 首页 > wzjs >正文

个人建设图片分享网站sem优化软件选哪家

个人建设图片分享网站,sem优化软件选哪家,html5做图网站,沈阳做平板网站生产者:批量发送消息(每批10条),每条消息附带唯一 correlationId,并监听确认队列(ackQueue)。 消费者:处理消息后,通过 ackQueue 返回确认消息(携带原 corre…
  • 生产者:批量发送消息(每批10条),每条消息附带唯一 correlationId,并监听确认队列(ackQueue)。

  • 消费者:处理消息后,通过 ackQueue 返回确认消息(携带原 correlationId)。

  • 超时重试:若某批消息在指定时间内未全部确认,未确认的消息会重新加入待发送队列。

producer.ts

import amqp from 'amqplib';async function start() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });async function produce(limit: number, data: string[], timeout: number = 10000) {let message = [...data];if (message.length > limit) {message = message.slice(0, limit);} else if (message.length < limit) {limit = message.length;}// 消息确认let cache: Array<{correlationId: string,message: string,isDelete: boolean,}> = new Array(limit).fill(null).map((_, index) => {return {correlationId: Math.random().toString().slice(2, -1),message: message[index],isDelete: false,};});for (let i = 0; i < limit; ++i) {channel.sendToQueue(queue, Buffer.from(cache[i].message), {correlationId: cache[i].correlationId,replyTo: ackQueue});}const consume = await channel.consume(ackQueue, (message) => {if (!message) {console.error('message is null', message);return;}let index = cache.findIndex((item) => item.correlationId === message.properties.correlationId);if (index !== -1) {cache[index].isDelete = true;console.log('confirmed success:', `"${message.content.toString()}"`, cache.every(item => item.isDelete));} else {console.log('confirmed fail:', `"${message.content.toString()}"`, cache, cache.every(item => item.isDelete), message.properties.correlationId);}channel.ack(message);});const sleep = (time: number) => {return new Promise<void>(resolve => setTimeout(() => resolve(), time));}let stop = false;const interval = async () => {await sleep(0);if (cache.every(item => item.isDelete) || stop) {return;} else {await interval();}}await Promise.race([interval(), // 监听本批次消息是否已经处理完成sleep(timeout), // 本批次消息最长处理时间]);stop = true;await channel.cancel(consume.consumerTag);// 没有收到确认的消息返回下一批处理继续处理return cache.filter(item => !item.isDelete).map(item => item.message);}// 发送1000条数据,分100批,每批10个let msg = new Array(100).fill(null).map((_, index) => `${index} message ${Math.random().toString().slice(2, -1)}`);while (msg.length) {let res = await produce(10, msg.slice(0, 10), 6000);msg = [...res, ...msg.slice(10, msg.length)];console.log('完成一批:', msg.length, '发送结果:', res.length, res);}
}start();

consumer.ts

import amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });channel.consume(queue, (message) => {if (message) {console.log(message?.content.toString(), message?.properties?.replyTo, message?.properties?.correlationId);// 消息处理完后,向 ackQueue 发送确认消息channel.sendToQueue(ackQueue, message?.content, {// 使用相同的 correlationId 来标识确认消息correlationId: message?.properties?.correlationId,// 将原 replyTo 信息传递回来// replyTo: queue,});// 确认 queue11 中的消息channel.ack(message);} else {console.error('message is null', message);}}, { noAck: false });
}produce();

 

http://www.dtcms.com/wzjs/102637.html

相关文章:

  • 软件前端开发工程师宁波网站推广网站优化
  • 虚拟机做的网站怎么让外网访问不了网今日新闻最新消息50字
  • 男人女人做那个网站互动营销案例
  • 欧亚专线兰州网站seo优化
  • 使用redis做视频网站缓存免费行情软件app网站下载大全
  • 做网站推销话术网络推广营销技巧
  • 豪圣建设项目管理网站百度推广账号登录入口
  • wordpress utf8企业网站seo诊断报告
  • 在线音乐网站开发php珠海seo推广
  • 内部劵网站怎么做磁力
  • 游戏网站怎么做seo最近的头条新闻
  • wap网站有哪些电商网页
  • 怎么做网站文件百度云搜索资源入口
  • 商务网站建设与管理读后感seo怎么做排名
  • 智慧团建网站注册东莞百度快照优化排名
  • 软文广告平台网站seo诊断技巧
  • 山东青岛网站制作公司百度域名
  • 手机网站主机如何创建一个网站
  • wordpress新建子域名多站点百度地图关键词排名优化
  • 政府网站建设 责任感搜索引擎优化方法有哪几种
  • 旅游网站开发 目的及必要性怎么让百度收录网址
  • 网站域名被抢注做商标西安网站快速排名提升
  • 网站的收费系统怎么做上海专业的网络推广
  • 易瑞通网站建设千锋培训机构官网
  • 中国建设招标网是个假网站企业推广的网站
  • web程序设计与实践做网站如何自己做网络推广
  • 网站建设平台信息google浏览器官网
  • 广州模板建站系统搜索引擎优化人员优化
  • 腾讯云做视频网站吗游戏推广赚佣金平台
  • 产品做推广一般上什么网站搜索网站有哪几个