当前位置: 首页 > news >正文

湖南省建设工程造价管理协会网站资阳抖音搜索优化

湖南省建设工程造价管理协会网站,资阳抖音搜索优化,网站维护中 页面,和田网站建设示例一、Routing exchange类型direct,根据消息的routekey将消息直接转发到指定队列。producer.ts 生产者主要发送消息,consumer.ts负责接收消息,同时也都可以创建exchange交换机,创建队列,为队列绑定exchange&#xff…

示例一、Routing

exchange类型direct,根据消息的routekey将消息直接转发到指定队列。producer.ts 生产者主要发送消息,consumer.ts负责接收消息,同时也都可以创建exchange交换机,创建队列,为队列绑定exchange,为避免重复简化代码,提高可维护性,队列相关操作移动到消费者端。队列,exchange交换机推荐在启动程序前手动创建好。

producer.ts 

import RabbitMQ from 'amqplib/callback_api';function start() {RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {if (err0) {console.error("[AMQP]", err0.message);return setTimeout(start, 1000);}conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");conn.createChannel(async (err2, channel) => {if (err2) {console.error("[AMQP]", err2.message);return setTimeout(start, 1000);}const exchangeName = 'exchange1';channel.assertExchange(exchangeName,'direct',{durable: true},(err, ok) => {if (err) {console.log('exchange路由转发创建失败', err);} else {let args = ['info', 'warn', 'error'];for (let i = 0; i < 10; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));const routeKey = args[Math.floor(Math.random() * 3)];console.log('消息发送是否成功', channel.publish(exchangeName,routeKey,Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}`),{ persistent: true },));}}});});setTimeout(() => {conn.close();process.exit(0);}, 1000);});
}start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName = 'queue1';channel.assertQueue(queueName, { durable: true }, (err2) => {if (err2) {console.log('队列创建失败', err2);return;}console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);channel.bindQueue(queueName, 'exchange1', 'info', {}, (err3, ok) => {console.log(queueName, '队列绑定结果', err3, ok);});channel.bindQueue(queueName, 'exchange1', 'warn', {}, (err3, ok) => {console.log(queueName, '队列绑定结果', err3, ok);});channel.bindQueue(queueName, 'exchange1', 'error', {}, (err3, ok) => {console.log(queueName, '队列绑定结果', err3, ok);});channel.consume(queueName,function (msg) {console.log('接收到的消息', msg?.content.toString());/*// 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {//   channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},(err3: any, ok: Replies.Empty) => {console.log(err3, ok);},);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

consumer2.ts

import RabbitMQ from 'amqplib';const conn = await RabbitMQ.connect('amqp://admin:admin1234@localhost:5672');conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}
});conn.on("close", function () {console.error("[AMQP] reconnecting");
});const channel = await conn.createChannel();const queueName = 'queue2';await channel.assertQueue(queueName, { durable: true });console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载
await channel.prefetch(1);await channel.bindQueue(queueName, 'exchange1', 'error', {});channel.consume(queueName,function (msg) {console.log('接收到的消息', msg?.content.toString());/*// 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {//   channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},
);

示例二、Topic

exchange的topic类型和direct类似,使用的仍然是routeKey进行匹配转发,topic支持通过*和#进行模糊查询。*代码一个具体单词,#代码0或多个单词。

producer.ts

import RabbitMQ from 'amqplib';async function start() {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});try {const channel = await conn.createChannel();console.log("[AMQP] connected");const exchangeName = 'exchange4';await channel.assertExchange(exchangeName, 'topic', { durable: true });let args = ['123.orange.456', '123.456.rabbit', 'lazy', 'lazy.123', 'lazy.123.456'];for (let i = 0; i < 20; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));const routeKey = args[Math.floor(Math.random() * args.length)];console.log('消息发送是否成功', channel.publish(exchangeName,routeKey,Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}`),{ persistent: true },));}} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}setTimeout(() => {conn.close();process.exit(0);}, 1000);
}start();

consumer.ts

import RabbitMQ from 'amqplib';const conn = await RabbitMQ.connect('amqp://admin:admin1234@localhost:5672');conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}
});conn.on("close", function () {console.error("[AMQP] reconnecting");
});const channel = await conn.createChannel();const queueName = 'queue1';channel.assertQueue(queueName, { durable: true });console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载
await channel.prefetch(1);// *代码一个具体单词,#代码0或多个单词
await channel.bindQueue(queueName, 'exchange4', '*.orange.*', {});channel.consume(queueName, function (msg) {console.log('接收到的消息', msg?.content.toString());// 手动确认取消channel.ack(msg);设置noAck:false,// 自动确认消息noAck:true,不需要channel.ack(msg);try {if (msg) {channel.ack(msg);}} catch (err) {if (msg) {// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);}console.log(err);}
}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false
});

consumer2.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName = 'queue2';channel.assertQueue(queueName, { durable: true });console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);channel.bindQueue(queueName, 'exchange4', '*.*.rabbit', {}, (err, ok) => {console.log(queueName, '队列绑定结果', err, ok);});channel.bindQueue(queueName, 'exchange4', 'lazy.#', {}, (err, ok) => {console.log(queueName, '队列绑定结果', err, ok);});channel.consume(queueName, function (msg) {console.log('接收到的消息', msg?.content.toString());// 手动确认取消channel.ack(msg);设置noAck:false,// 自动确认消息noAck:true,不需要channel.ack(msg);try {if (msg) {channel.ack(msg);}} catch (err) {if (msg) {// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);}console.log(err);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false});// return,error事件不会把消息重新放回队列channel.on('return', (msg) => {console.error('消息发送失败:', msg);});channel.on('error', (err) => {console.error('通道发生错误:', err);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

示例三、Headers

 exchange类型headers,根据传递的头部信息进行转发,头部信息类型为object对象。在头部信息中要设置x-match属性,'x-match': 'any', any,下方消息匹配上一个就可以。all,下方消息要全部匹配。

producer.ts

import RabbitMQ from 'amqplib/callback_api';function start() {RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {if (err0) {console.error("[AMQP]", err0.message);return setTimeout(start, 1000);}conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");conn.createChannel(async (err2, channel) => {if (err2) {console.error("[AMQP]", err2.message);return setTimeout(start, 1000);}const exchangeName = 'exchange5';channel.assertExchange(exchangeName,'headers',{durable: true},(err, ok) => {if (err) {console.log('exchange路由转发创建失败', err);} else {let args = [{// 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配'loglevel': 'info',// 'buslevel': 'product',// 'syslevel': 'admin'},{// 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配// 'loglevel': 'info','buslevel': 'product','syslevel': 'admin'},{// 'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配// 'loglevel': 'info','buslevel': 'product',// 'syslevel': 'admin'},{// 'x-match': 'all', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配'loglevel': 'info','buslevel': 'product','syslevel': 'admin'},];for (let i = 0; i < 20; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));const routeKey = args[Math.floor(Math.random() * args.length)];console.log('消息发送是否成功', routeKey, channel.publish(exchangeName,'',Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)},${JSON.stringify(routeKey)}`),{persistent: true,headers: routeKey},));}}});});setTimeout(() => {conn.close();process.exit(0);}, 1000);});
}start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName = 'queue1';channel.assertQueue(queueName, { durable: true });console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// *代码一个具体单词,#代码0或多个单词channel.bindQueue(queueName,'exchange5','',{'x-match': 'any', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配'loglevel': 'info','buslevel': 'product','syslevel': 'admin'},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);channel.consume(queueName, function (msg) {console.log('接收到的消息', msg?.content.toString());// 手动确认取消channel.ack(msg);设置noAck:false,// 自动确认消息noAck:true,不需要channel.ack(msg);try {if (msg) {channel.ack(msg);}} catch (err) {if (msg) {// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);}console.log(err);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});// return,error事件不会把消息重新放回队列channel.on('return', (msg) => {console.error('消息发送失败:', msg);});channel.on('error', (err) => {console.error('通道发生错误:', err);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

consumer2.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName = 'queue2';channel.assertQueue(queueName, { durable: true });console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);channel.bindQueue(queueName,'exchange5','',{'x-match': 'all', // any,下方消息匹配上一个就可以; all,下方消息要全部匹配'loglevel': 'info','buslevel': 'product','syslevel': 'admin'},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);channel.consume(queueName,function (msg) {console.log('接收到的消息', msg?.content.toString());/*// 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {//   channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},(err: any, ok: Replies.Empty) => {console.log(err, ok);},);// return,error事件不会把消息重新放回队列channel.on('return', (msg) => {console.error('消息发送失败:', msg);});channel.on('error', (err) => {console.error('通道发生错误:', err);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});
http://www.dtcms.com/a/455830.html

相关文章:

  • 专业建设外贸网站定制网站设计高端网站建设
  • 升阳广州做网站公司好看的论坛源码
  • 物流的网站模板免费下载微网站 一键拨号
  • php做网站难么国内免费视频素材无水印素材网站
  • 有凡客模版怎么建设网站做钓鱼网站会被抓吗
  • 中国网站设计模板下载兰州西固区公司网站建设
  • 免费网站代码大全免费网站托管平台
  • 四川城乡住房城乡建设厅网站网站整体色调
  • 梧州网站优化价格做网站都需要租服务器吗
  • 政务网站建设相关文件工程装修
  • 局域网手动vxlan集中式网关整体配置方案
  • 如何访问服务器上的网站苏州免费网站制作
  • 沧州外贸网站建设青岛 茶叶网站建设
  • 国外优秀的平面设计网站云南省建设厅网站查询
  • 在哪里找做网站的客户食品网站网页设计
  • 常州公诚建设项目管理有限公司官方网站佛山网络科技公司有哪些
  • 湖南网站推广建设公司网站注销备案
  • 哪个网站做新中式技术研发流程的六个阶段
  • 开源快速网站搭建平台山东港基建设集团网站
  • 网站优化 书wordpress能建什么网站
  • 毕设DW做网站的过程妇幼保健院人流价格表
  • 浙江省城乡住房建设厅网站信息中心完成网站建设
  • 做网站卖高仿什么是网站建设技术
  • 建设论坛网站自学上海工程建设咨询有限公司
  • 查询网站备案进度查询乐清网站改版公司
  • 建设银行网站不能登录密码北京市建设工程交易服务平台
  • wordpress 中英文网站网站建设企业邮箱制作网站
  • 什么网站可以做相册视频去哪里学习wordpress
  • 企业 宣传 还要网站吗自己做微网站
  • 网站维护中要多久才能重新进入北京注册公司代理机构