当前位置: 首页 > news >正文

在nodejs中使用RabbitMQ(六)sharding消息分片

RabbitMQ 的分片插件(rabbitmq_sharding)允许将消息分布到多个队列中,这在消息量很大或处理速度要求高的情况下非常有用。分片功能通过将消息拆分到多个队列中来平衡负载,从而提升消息处理的吞吐量和可靠性。它能够在多个队列之间分配负载,避免单个队列过载。(注:不能单独消费分片消息。消息分片不利于消息顺序区分)

启用消息分片插件。 

rabbitmq-plugins enable rabbitmq_sharding 

示例

通过rabbitmq management添加策略,用于分片消息匹配转发。

或者通过命令添加策略 

CTL set_policy images-shard "queue10" '{"shards-per-node": 3, "routing-key": "sharding"}'

producer.ts

import RabbitMQ from 'amqplib';


async function start() {
  try {
    const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60");

    conn.on("error", function (err1) {
      if (err1.message !== "Connection closing") {
        console.error("[AMQP] conn error", err1.message);
      }
    });

    conn.on("close", function () {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });

    console.log("[AMQP] connected");

    let channel = null;
    try {
      channel = await conn.createChannel();
    } catch (err) {
      console.error("[AMQP]", err);
      return setTimeout(start, 1000);
    }

    const exchangeName = 'exchange_queue10';
    await channel.assertExchange(
      exchangeName,
      'x-modulus-hash',
      {
        durable: true,
        arguments: {
          'x-modulus': 3 // 分片数量(需与队列分片数匹配)
        }
      },
    );

    let routeKey = '';
    for (let i = 0; i < 1000; ++i) {
      // console.log('message send!', channel.sendToQueue(
      //   queueName,
      //   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
      //   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
      //   // (err: any, ok: Replies.Empty)=>{}
      // ));

      let num = Math.ceil(Math.random() * 100000);
      console.log('消息发送是否成功', num, routeKey, channel.publish(
        exchangeName,
        `${routeKey}${i}`,
        Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),
        {
          persistent: true,
        },
      ));
    }

    setTimeout(() => {
      conn.close();
      process.exit(0);
    }, 1000);
  } catch (err) {
    console.error("[AMQP]", err);
    return setTimeout(start, 1000);
  }
}

start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';


RabbitMQ.connect('amqp://admin:admin1234@localhost:5672//mirror', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    console.log('[*] waiting...');

    const exchangeName = 'exchange_queue10';

    channel.prefetch(32);

    // for(let i=0;i<3;++i){
    //   channel.assertQueue(queueName, { durable: true }, () => {
    //     channel.bindQueue(queueName, exchangeName, `shard_${shardId}`);
    //   });
    // }

    channel.consume(exchangeName, function (msg) {
      if(msg){
        console.log(`队列'${exchangeName}'接收到的消息`, msg?.content.toString());
        // 第二个参数,false拒绝当前消息
        // 第二个参数,true拒绝小于等于当前消息
        // 第三个参数,3false从队列中清除
        // 第三个参数,4true从新在队列中排队
        channel.nack(msg, false, false);
      }
    }, {
      // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
      noAck: false,
      arguments: {}
    }, (err: any, ok: Replies.Empty) => {
      console.log(err, ok);
    });

  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});

相关文章:

  • Pandas数据填充(fill)中的那些坑:避免机器学习中的数据泄露
  • Arduino 第四章:数字输出 —— 深入解析引脚差异与 LED 顺序点亮实践
  • 人生的转折点反而迷失了方向
  • 分布式技术
  • 【C++】C++-教师信息管理系统(含源码+数据文件)【独一无二】
  • LabVIEW用户界面设计原则
  • 【Elasticsearch】文本分析Text analysis概述
  • 面向 Data+AI 的新一代智能数仓平台
  • 实现Tree 树形控件的鼠标拖拽功能
  • USB Flash闪存驱动器安全分析(第一部分)
  • java nio 原理 非阻塞IO Netty
  • sql注入中information_schema被过滤的问题
  • Android车机DIY开发之软件篇(十二) AOSP12下载编译
  • UI用例调试_元素能定位到且不在frame内_无法点击/录入文本
  • 利用Firewalld和Iptables实现IP端口限制与开放
  • SQL最佳实践(笔记)
  • 12.项目结构
  • [Android] 【汽车OBD软件】Torque Pro (OBD 2 Car)
  • 降本增效 - VGF 构建轻量高性能日志管理平台
  • DeepSeek v3 技术报告阅读笔记
  • 平安资管总经理罗水权因个人工作原因辞职
  • 圆桌|如何应对特朗普政府的关税霸凌?一种联合国视角的思考
  • 来伊份一季度净利减少近八成,今年集中精力帮助加盟商成功
  • 演员刘美含二手集市被曝售假,本人道歉
  • 西湖大学本科招生新增三省两市,首次面向上海招生
  • 江苏银行一季度净赚近98亿增逾8%,不良贷款率微降