当前位置: 首页 > news >正文

.net 8压榨rabbitMq性能

dotnet 8压榨rabbitMq性能


文章目录

  • dotnet 8压榨rabbitMq性能
  • 前言
    • 测试环境
      • 项目数据:
      • 测试程序所在机器配置:
      • 远程rabbimq服务器配置:
  • 一、单队列读写测试
    • 通过消息条数控制批次
      • 实现代码
      • 1.1 生产消费分开测试
        • 网络开销情景
          • 非压缩
          • 压缩
        • 无网络开销情景
          • 压缩
          • 非压缩
      • 1.2 生产消费同时进行
        • 网络开销情景
          • 非压缩
          • 压缩
      • 1.3 生产消费同时进行(本机测试)
        • 无网络开销情景
          • 压缩
          • 非压缩
      • 结论
    • 通过消息体大小控制批次
        • 实现代码
      • 1.1 生产消费分开测试
        • 无网络开销情景
          • 非压缩
  • 二、多队列读写测试
    • 硬件信息
    • 数据量级
    • 测试代码
      • 非压缩模式
        • 2个队列
          • 数据量级
        • 3个队列
          • 数据量级
        • 4个队列
          • 数据量级
        • 5个队列
          • 数据量级
        • 6个队列
        • 7个队列
        • 8个队列
      • 压缩模式
        • 2个队列
          • 数据量级
        • 3个队列
          • 数据量级
        • 6个队列
          • 优化后
  • 总结


前言

本文主要讲对rabbitMq的性能测试,以及在dotnetCore里如果正确的使用rabbitmq,废话不多说直接看数据


测试环境

项目数据:

  • 项目框架采用.net 8
  • 与rabbitMQ交互采用RabbitMq.Client(7.1.2)
  • 序列化与反序列化采用MessagePack(3.1.4)

测试目标:写入和读取速度不得小于10000条/s

测试程序所在机器配置:

  • CPU: i5-12400 (2.5GHz)

  • 运行内存:16GB

  • 可用内存:4.6GB

远程rabbimq服务器配置:

  • CPU: Intel® Xeon® CPU E5-2667 v4 @ 3.20GHz
  • 运行内存:7.57GB
  • 可用内存(5.67GB)

一、单队列读写测试

  • 测试数据数量:100万
  • 测试数据大小:1024MB(1GB)

通过消息条数控制批次

实现代码

这里只是生产者的部分实现代码,完整代码后续会分享下载连接

/// <summary>
/// 持续写入
/// </summary>
/// <returns></returns>
private async Task ContinuousWriteAsync2()
{try{using var channel = await this._conn.CreateChannelAsync(this._queueName);using MemoryStream stream = new(524288);// 512KB缓冲区var lastErrValues = new ConcurrentBag<T>();var retryCount = 0;//重试次数byte[]? buffer = null;byte[]? messageBuffer = null;//实际发送数据var properties = new BasicProperties(){Persistent = true, // 根据需求可设置为false提高性能(但会牺牲持久性)};do{var values = lastErrValues.Count > 0 ? [.. lastErrValues] : this.GetValues();try{if (values.Count == 0){// 优化等待策略,使用超时等待避免不必要的循环await this._reading.WaitAsync(50); // 减少等待时间提高响应速度continue;}// 优化内存使用stream.SetLength(0);await this._objectSerializer.SerializeAsync(stream, values);// 获取序列化后的数据长度int dataLength = (int)stream.Position;if (buffer == null || buffer.Length < dataLength){buffer = ArrayPool<byte>.Shared.Rent(dataLength);}stream.Position = 0;stream.ReadExactly(buffer, 0, dataLength);messageBuffer = buffer;if (_conn.Options.Compression){if (buffer.Length > 1024)//大于1kb再压缩{properties = new BasicProperties{Persistent = true,Headers = new Dictionary<string, object?>{["compressed"] = true,  // 用于消费者识别["original_size"] = buffer.Length}};messageBuffer = this._objectSerializer.Compress(buffer);}}var messageBody = new ReadOnlyMemory<byte>(messageBuffer, 0, messageBuffer.Length);// mandatory设置为false,不可路由数据直接丢了await channel.BasicPublishAsync(exchange: string.Empty, routingKey: this._queueName, mandatory: false, basicProperties: properties,body: messageBody);lastErrValues.Clear();retryCount = 0;}catch (Exception ex) when (retryCount < 2)//最多重试2次{this._logger.LogError(ex, "RabbitMQ Publish Message Exception{NewLine}Source:{@Values}", Environment.NewLine, values);lastErrValues = values;await Task.Delay(1000 * (int)Math.Pow(2, retryCount++)); // 指数退避 }catch (Exception finalEx)//尝试5次后依然失败{//写入二进制文件this._logger.LogError(finalEx, "RabbitMQ Publish Message Exception{NewLine}Source:{@Values}", Environment.NewLine, values);//写文件之后清空lastErrValues.Clear();}finally{// 释放ArrayPool中的缓冲区if (buffer != null){ArrayPool<byte>.Shared.Return(buffer);buffer = null;}}} while (true);}catch (Exception ex){this._logger.LogError(ex, "{QueueName} ContinuousWriteAsync Exception", this._queueName);}
}
private ConcurrentBag<T> GetValues()
{// 快速检查是否有数据if (!this._queue.Reader.TryPeek(out _)){return [];}var list = new ConcurrentBag<T>();int itemsRead = 0;while (itemsRead < this._conn.Options.SingleMaxCount && this._queue.Reader.TryRead(out var item)){list.Add(item);itemsRead++;}return list;
}

1.1 生产消费分开测试

网络开销情景

网络环境为,本机(192.168.1.181)请求 RabbitMQ服务器(192.168.1.88)
该项测试跟但是网络环境有关系,存在小幅波动,结果取的中间值。

注意 :下文的压缩处理时自己实现的不是rabbitmq自带的

非压缩
操作耗时速率
生产数据30303.8937ms33333条/秒
消费数据30970.8971ms32289条/秒
压缩
操作耗时速率
生产数据20241.7993ms50000条/秒
消费数据18522.4573ms54054条/秒
无网络开销情景
压缩
操作耗时速率
生产数据2454.051ms407498条/秒
消费数据5698.1219ms175500条/秒
非压缩
操作耗时速率
生产数据1424.0384ms714285条/秒
消费数据5038.9806ms200000条/秒

1.2 生产消费同时进行

生产和消费同时进行时记录时间的节点为:
开始时间:第一条数据开始插入
结束时间:最后一条数据读取结束

网络开销情景

网络环境为,本机(192.168.1.181)请求 RabbitMQ服务器(192.168.1.88)
该项测试跟但是网络环境有关系,存在小幅波动,结果取的中间值。

非压缩
耗时速率
50356.5951ms20000条/秒
压缩
耗时速率
27989.461ms35714条/秒

1.3 生产消费同时进行(本机测试)

无网络开销情景
压缩
耗时速率
8341.0121ms120482条/秒
非压缩
耗时速率
7706.8663ms129870条/秒

结论

在本机运行时关闭压缩可提高处理速度,当处于需要网络开销的环境时,开启压缩能提高处理速度

通过消息体大小控制批次

实现代码

这里只是生产者的部分实现代码,完整代码后续会分享下载连接

 private async Task ContinuousWriteAsync(CancellationToken cancellationToken){const decimal MaxBatchSize = 4 * 1024 * 1024; // 4MBconst int TimeoutMs = 1000;using var channel = await this._conn.CreateChannelAsync(this._queueName);var properties = new BasicProperties { Persistent = true };var batch = new List<T>(); // 用于收集当前批次var tempStream = new MemoryStream(); // 临时流用于序列化检查decimal count = 0;try{while (!cancellationToken.IsCancellationRequested){// 等待新数据或超时var timeoutTask = Task.Delay(TimeoutMs, cancellationToken);var readTask = _queue.Reader.WaitToReadAsync(cancellationToken).AsTask();var completedTask = await Task.WhenAny(readTask, timeoutTask);// 超时且有数据 → 发送if (completedTask == timeoutTask && batch.Count > 0){await PublishBatchAsync(batch, channel, properties, cancellationToken);batch.Clear();continue;}if (cancellationToken.IsCancellationRequested) break;// 读取一个 itemif (await readTask && _queue.Reader.TryRead(out T? item) && item != null){//计算需要几个对象填充才能到限制大小if (batch.Count == 0){tempStream.SetLength(0);tempStream.Position = 0;await MessagePackSerializer.SerializeAsync(tempStream, item);count = Math.Ceiling(MaxBatchSize / tempStream.Length);}batch.Add(item);if(batch.Count==count){tempStream.SetLength(0);tempStream.Position = 0;await MessagePackSerializer.SerializeAsync(tempStream, batch);await PublishBatchAsync(batch, channel, properties, cancellationToken);batch.Clear();count = 0;}}}}catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested){// 正常退出}catch (Exception ex){_logger.LogError(ex, "Error in ContinuousWriteAsync");}finally{// 发送剩余数据if (batch.Count > 0){await PublishBatchAsync(batch, channel, properties, cancellationToken);}tempStream.Dispose();}}private async Task PublishBatchAsync(List<T> batch,IChannel channel,BasicProperties properties,CancellationToken cancellationToken){if (batch.Count == 0) return;try{using var stream = new MemoryStream();await MessagePackSerializer.SerializeAsync(stream, batch, cancellationToken: cancellationToken);var body = stream.ToArray(); // 或直接用 stream.TryGetBuffer()await channel.BasicPublishAsync(exchange: string.Empty,routingKey: this._queueName,mandatory: false,basicProperties: properties,body: new ReadOnlyMemory<byte>(body),cancellationToken: cancellationToken);}catch (Exception ex){_logger.LogError(ex, "Failed to publish batch of {Count} items", batch.Count);// TODO: 落盘或重试逻辑}}

1.1 生产消费分开测试

无网络开销情景
非压缩
操作耗时速率
生产数据4563.7953ms222222条/秒
消费数据4963.4871ms204081条/秒

由这里的数据和上面用条数控制的数据对比发现,这种使用包大小控制的方式性能不如用条数控制,因此后续测试直接不再做了直接放弃这种方式;

二、多队列读写测试

由于上面但队列测试体现出用数据条数控量的方式性能更好,所以我们后续都是采用以数据条数控制的方式进行测试。测试采用本地测试

硬件信息

  • CPU: i5-12400 (2.5GHz)

  • 运行内存:16GB

  • 可用内存:10GB

数据量级

队列数量:6个
每个队列数据量:100万(总共600万)
每个队列数据大小:1G(总共6G)

测试代码

Progran中的代码

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ_Server;
using RabbitMq_Test;
using System.Diagnostics;var host = Host.CreateDefaultBuilder(args).ConfigureServices((context, services) =>{ services.AddRabbitMQ(context.Configuration.GetSection("AppSettings"));services.AddSingleton<TestFunc>();services.AddSingleton<TestFunc2>();services.AddSingleton<TestFunc3>();services.AddSingleton<TestFunc4>();services.AddSingleton<TestFunc5>();services.AddSingleton<TestFunc6>();}).Build();  
var testFunc = host.Services.GetRequiredService<TestFunc>();
var testFunc2 = host.Services.GetRequiredService<TestFunc2>();
var testFunc3 = host.Services.GetRequiredService<TestFunc3>();
var testFunc4 = host.Services.GetRequiredService<TestFunc4>();
var testFunc5 = host.Services.GetRequiredService<TestFunc5>();
var testFunc6 = host.Services.GetRequiredService<TestFunc6>();
var logger = host.Services.GetRequiredService<ILogger<Program>>();try
{Console.WriteLine("[1]单队列消费生产同时进行耗时统计");Console.WriteLine("[2]2个队列消费生产同时进行耗时统计");Console.WriteLine("[3]3个队列消费生产同时进行耗时统计");Console.WriteLine("[4]4个队列消费生产同时进行耗时统计");Console.WriteLine("[5]5个队列消费生产同时进行耗时统计");Console.WriteLine("[6]6个队列消费生产同时进行耗时统计");Console.WriteLine("[Q] 退出");Console.WriteLine("请输入选项:");string? val=Console.ReadLine();if(!string.IsNullOrWhiteSpace(val)){val = val.Trim().ToLower();if (val == "q") return;switch (val){case "1":await Func1();break;case "2":await Func2();break;case "3":await Func3();break;case "4":await Func4();break;case "5":await Func5();break;case "6":await Func6(); break;default: break;}}}
catch (Exception ex)
{logger.LogError(ex, "调用 AddData 时发生错误");
}// ========== 测试函数定义 ==========Task Func1()
{Console.WriteLine("开始生成数据");var data = TestEntity.Generate(1000000);var stopwatch = Stopwatch.StartNew();Console.WriteLine("生成数据完成");var task1 = Task.Run(() => testFunc.AddData(data, CancellationToken.None));var task2 = Task.Run(() => testFunc.ReadData(stopwatch));var task3 = Task.Run(() => testFunc.ReadDataToChannel(CancellationToken.None));var task4 = Task.Run(() => testFunc.PrintData(CancellationToken.None));return Task.WhenAll(task1, task2, task3, task4);
}Task Func2()
{Console.WriteLine("开始生成数据");var data1 = TestEntity.Generate(1000000);var data2 = TestEntity2.Generate(1000000);Console.WriteLine("生成数据完成");var sw1 = Stopwatch.StartNew();var sw2 = Stopwatch.StartNew();var tasks = new[]{Task.Run(() => testFunc.AddData(data1, CancellationToken.None)),Task.Run(() => testFunc.ReadData(sw1)),Task.Run(() => testFunc.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc.PrintData(CancellationToken.None)),Task.Run(() => testFunc2.AddData(data2, CancellationToken.None)),Task.Run(() => testFunc2.ReadData(sw2)),Task.Run(() => testFunc2.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc2.PrintData(CancellationToken.None))};return Task.WhenAll(tasks);
}Task Func3()
{Console.WriteLine("开始生成数据");var data1 = TestEntity.Generate(1000000);var data2 = TestEntity2.Generate(1000000);var data3 = TestEntity3.Generate(1000000);Console.WriteLine("生成数据完成");var sw1 = Stopwatch.StartNew();var sw2 = Stopwatch.StartNew();var sw3 = Stopwatch.StartNew();var tasks = new List<Task>{// Func1Task.Run(() => testFunc.AddData(data1, CancellationToken.None)),Task.Run(() => testFunc.ReadData(sw1)),Task.Run(() => testFunc.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc.PrintData(CancellationToken.None)),// Func2Task.Run(() => testFunc2.AddData(data2, CancellationToken.None)),Task.Run(() => testFunc2.ReadData(sw2)),Task.Run(() => testFunc2.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc2.PrintData(CancellationToken.None)),// Func3Task.Run(() => testFunc3.AddData(data3, CancellationToken.None)),Task.Run(() => testFunc3.ReadData(sw3)),Task.Run(() => testFunc3.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc3.PrintData(CancellationToken.None))};return Task.WhenAll(tasks);
}// 类似地实现 Func4 ~ Func6
Task Func4()
{Console.WriteLine("开始生成数据");var data1 = TestEntity.Generate(1000000);var data2 = TestEntity2.Generate(1000000);var data3 = TestEntity3.Generate(1000000);var data4 = TestEntity4.Generate(1000000);Console.WriteLine("生成数据完成");var sw1 = Stopwatch.StartNew();var sw2 = Stopwatch.StartNew();var sw3 = Stopwatch.StartNew();var sw4 = Stopwatch.StartNew(); var tasks = new List<Task>{// Func1Task.Run(() => testFunc.AddData(data1, CancellationToken.None)),Task.Run(() => testFunc.ReadData(sw1)),Task.Run(() => testFunc.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc.PrintData(CancellationToken.None)),// Func2Task.Run(() => testFunc2.AddData(data2, CancellationToken.None)),Task.Run(() => testFunc2.ReadData(sw2)),Task.Run(() => testFunc2.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc2.PrintData(CancellationToken.None)),// Func3Task.Run(() => testFunc3.AddData(data3, CancellationToken.None)),Task.Run(() => testFunc3.ReadData(sw3)),Task.Run(() => testFunc3.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc3.PrintData(CancellationToken.None)),//Func4Task.Run(() => testFunc4.AddData(data4, CancellationToken.None)),Task.Run(() => testFunc4.ReadData(sw4)),Task.Run(() => testFunc4.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc4.PrintData(CancellationToken.None))};return Task.WhenAll(tasks);
}Task Func5()
{Console.WriteLine("开始生成数据");var data1 = TestEntity.Generate(1000000);var data2 = TestEntity2.Generate(1000000);var data3 = TestEntity3.Generate(1000000); var data4 = TestEntity4.Generate(1000000); var data5 = TestEntity5.Generate(1000000);Console.WriteLine("生成数据完成");var sw1 = Stopwatch.StartNew();var sw2 = Stopwatch.StartNew();var sw3 = Stopwatch.StartNew();var sw4 = Stopwatch.StartNew();var sw5 = Stopwatch.StartNew();var tasks = new List<Task>{// Func1Task.Run(() => testFunc.AddData(data1, CancellationToken.None)),Task.Run(() => testFunc.ReadData(sw1)),Task.Run(() => testFunc.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc.PrintData(CancellationToken.None)),// Func2Task.Run(() => testFunc2.AddData(data2, CancellationToken.None)),Task.Run(() => testFunc2.ReadData(sw2)),Task.Run(() => testFunc2.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc2.PrintData(CancellationToken.None)),// Func3Task.Run(() => testFunc3.AddData(data3, CancellationToken.None)),Task.Run(() => testFunc3.ReadData(sw3)),Task.Run(() => testFunc3.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc3.PrintData(CancellationToken.None)),//Func4Task.Run(() => testFunc4.AddData(data4, CancellationToken.None)),Task.Run(() => testFunc4.ReadData(sw4)),Task.Run(() => testFunc4.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc4.PrintData(CancellationToken.None)),//Func5Task.Run(() => testFunc5.AddData(data5, CancellationToken.None)),Task.Run(() => testFunc5.ReadData(sw5)),Task.Run(() => testFunc5.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc5.PrintData(CancellationToken.None))};return Task.WhenAll(tasks);
}Task Func6()
{Console.WriteLine("开始生成数据");var data1 = TestEntity.Generate(1000000);var data2 = TestEntity2.Generate(1000000);var data3 = TestEntity3.Generate(1000000); var data4 = TestEntity4.Generate(1000000); var data5 = TestEntity5.Generate(1000000); var data6 = TestEntity6.Generate(1000000);Console.WriteLine("生成数据完成");var sw1 = Stopwatch.StartNew();var sw2 = Stopwatch.StartNew();var sw3 = Stopwatch.StartNew();var sw4 = Stopwatch.StartNew();var sw5 = Stopwatch.StartNew();var sw6 = Stopwatch.StartNew();var tasks = new List<Task>{// Func1Task.Run(() => testFunc.AddData(data1, CancellationToken.None)),Task.Run(() => testFunc.ReadData(sw1)),Task.Run(() => testFunc.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc.PrintData(CancellationToken.None)),// Func2Task.Run(() => testFunc2.AddData(data2, CancellationToken.None)),Task.Run(() => testFunc2.ReadData(sw2)),Task.Run(() => testFunc2.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc2.PrintData(CancellationToken.None)),// Func3Task.Run(() => testFunc3.AddData(data3, CancellationToken.None)),Task.Run(() => testFunc3.ReadData(sw3)),Task.Run(() => testFunc3.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc3.PrintData(CancellationToken.None)),//Func4Task.Run(() => testFunc4.AddData(data4, CancellationToken.None)),Task.Run(() => testFunc4.ReadData(sw4)),Task.Run(() => testFunc4.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc4.PrintData(CancellationToken.None)),//Func5Task.Run(() => testFunc5.AddData(data5, CancellationToken.None)),Task.Run(() => testFunc5.ReadData(sw5)),Task.Run(() => testFunc5.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc5.PrintData(CancellationToken.None)),//Func6Task.Run(() => testFunc6.AddData(data6, CancellationToken.None)),Task.Run(() => testFunc6.ReadData(sw6)),Task.Run(() => testFunc6.ReadDataToChannel(CancellationToken.None)),Task.Run(() => testFunc6.PrintData(CancellationToken.None)),};return Task.WhenAll(tasks);
}
await host.RunAsync();

非压缩模式

2个队列
数据量级

队列数量:2个
每个队列数据量:100万(总共200万)
每个队列数据大小:1G(总共2G)

队列名称耗时速率
RabbitMq_Test.TestEntity27109.9938ms140845条/秒
RabbitMq_Test.TestEntity7260.0543ms138888条/秒
3个队列
数据量级

队列数量:3个
每个队列数据量:100万(总共300万)
每个队列数据大小:1G(总共3G)

队列名称耗时速率
RabbitMq_Test.TestEntity312269.7164ms81967条/秒
RabbitMq_Test.TestEntity212433.5706ms80645条/秒
RabbitMq_Test.TestEntity12289.7275ms81967条/秒
4个队列
数据量级

队列数量:4个
每个队列数据量:100万(总共400万)
每个队列数据大小:1G(总共4G)

队列名称耗时速率
RabbitMq_Test.TestEntity417991.7397ms55865条/秒
RabbitMq_Test.TestEntity318265.5919ms54749条/秒
RabbitMq_Test.TestEntity218059.6052ms55555条/秒
RabbitMq_Test.TestEntity19922.9437ms50251条/秒
5个队列
数据量级

队列数量:5个
每个队列数据量:100万(总共500万)
每个队列数据大小:1G(总共5G)

队列名称耗时速率
RabbitMq_Test.TestEntity524362.8939ms41047条/秒
RabbitMq_Test.TestEntity424503.1196ms40811条/秒
RabbitMq_Test.TestEntity324427.8169ms40938条/秒
RabbitMq_Test.TestEntity224382.5936ms41013条/秒
RabbitMq_Test.TestEntity29531.8912ms33862条/秒
6个队列

队列数量:6个
每个队列数据量:100万(总共600万)
每个队列数据大小:1G(总共6G)

队列名称耗时速率
RabbitMq_Test.TestEntity646230.9052ms21630条/秒
RabbitMq_Test.TestEntity546066.7173ms21739条/秒
RabbitMq_Test.TestEntity446598.328ms21460条/秒
RabbitMq_Test.TestEntity346625.4741ms21447条/秒
RabbitMq_Test.TestEntity249083.6873ms20408条/秒
RabbitMq_Test.TestEntity46938.8805ms21304条/秒
7个队列

队列数量:7个
每个队列数据量:100万(总共700万)
每个队列数据大小:1G(总共7G)

队列名称耗时速率
RabbitMq_Test.TestEntity747649.6311ms20986条/秒
RabbitMq_Test.TestEntity653631.2966ms18645条/秒
RabbitMq_Test.TestEntity554405.6605ms18380条/秒
RabbitMq_Test.TestEntity455427.3113ms18041条/秒
RabbitMq_Test.TestEntity355610.2502ms17982条/秒
RabbitMq_Test.TestEntity247273.7346ms21153条/秒
RabbitMq_Test.TestEntity54895.6962ms18216条/秒
8个队列

队列数量:8个
每个队列数据量:100万(总共800万)
每个队列数据大小:1G(总共8G)

队列名称耗时速率
RabbitMq_Test.TestEntity759807.4547ms16720条/秒
RabbitMq_Test.TestEntity658438.524ms17112条/秒
RabbitMq_Test.TestEntity544158.4624ms22645条/秒
RabbitMq_Test.TestEntity458413.4717ms17119条/秒
RabbitMq_Test.TestEntity359909.4938ms16691条/秒
RabbitMq_Test.TestEntity284004.0464ms11904条/秒
RabbitMq_Test.TestEntity34604.4826ms28898条/秒
RabbitMq_Test.TestEntity854438.524ms18369条/秒

压缩模式

2个队列
数据量级

队列数量:2个
每个队列数据量:100万(总共200万)
每个队列数据大小:1G(总共2G)

队列名称耗时速率
RabbitMq_Test.TestEntity27375.0325ms136986条/秒
RabbitMq_Test.TestEntity7806.1246ms128205条/秒
3个队列
数据量级

队列数量:3个
每个队列数据量:100万(总共300万)
每个队列数据大小:1G(总共3G)

队列名称耗时速率
RabbitMq_Test.TestEntity312699.2807ms78746条/秒
RabbitMq_Test.TestEntity212860.3425ms77760条/秒
RabbitMq_Test.TestEntity12289.7275ms81373条/秒
6个队列

队列数量:6个
每个队列数据量:100万(总共600万)
每个队列数据大小:1G(总共6G)

队列名称耗时速率
RabbitMq_Test.TestEntity639127.6586ms25557条/秒
RabbitMq_Test.TestEntity539006.5023ms25641条/秒
RabbitMq_Test.TestEntity439045.5548ms25611条/秒
RabbitMq_Test.TestEntity338941.9413ms25679条/秒
RabbitMq_Test.TestEntity238648.9858ms25874条/秒
RabbitMq_Test.TestEntity39351.7071ms25412条/秒

优化前生产者代码

private async Task ContinuousWriteAsync()
{try{using var channel = await this._conn.CreateChannelAsync(this._queueName);using MemoryStream stream = new(524288);// 512KB缓冲区var lastErrValues = new ConcurrentBag<T>();var retryCount = 0;//重试次数byte[]? buffer = null;byte[]? messageBuffer = null;//实际发送数据var properties = new BasicProperties(){Persistent = true, // 根据需求可设置为false提高性能(但会牺牲持久性)};do{var values = lastErrValues.Count > 0 ? [.. lastErrValues] : this.GetValues();try{if (values.Count == 0){// 优化等待策略,使用超时等待避免不必要的循环await this._reading.WaitAsync(50); // 减少等待时间提高响应速度continue;}// 优化内存使用stream.SetLength(0);await this._objectSerializer.SerializeAsync(stream, values);// 获取序列化后的数据长度int dataLength = (int)stream.Position;if (buffer == null || buffer.Length < dataLength){buffer = ArrayPool<byte>.Shared.Rent(dataLength);}stream.Position = 0;stream.ReadExactly(buffer, 0, dataLength);messageBuffer = buffer;if (_conn.Options.Compression){if (buffer.Length > 1024)//大于1kb再压缩{properties = new BasicProperties{Persistent = true,Headers = new Dictionary<string, object?>{["compressed"] = true,  // 用于消费者识别["original_size"] = buffer.Length}};messageBuffer = this._objectSerializer.Compress(buffer);}}var messageBody = new ReadOnlyMemory<byte>(messageBuffer, 0, messageBuffer.Length);// mandatory设置为false,不可路由数据直接丢了await channel.BasicPublishAsync(exchange: string.Empty, routingKey: this._queueName, mandatory: false, basicProperties: properties,body: messageBody);lastErrValues.Clear();retryCount = 0;}catch (Exception ex) when (retryCount < 2)//最多重试2次{this._logger.LogError(ex, "RabbitMQ Publish Message Exception{NewLine}Source:{@Values}", Environment.NewLine, values);lastErrValues = values;await Task.Delay(1000 * (int)Math.Pow(2, retryCount++)); // 指数退避 }catch (Exception finalEx)//尝试5次后依然失败{//写入二进制文件this._logger.LogError(finalEx, "RabbitMQ Publish Message Exception{NewLine}Source:{@Values}", Environment.NewLine, values);//写文件之后清空lastErrValues.Clear();}finally{// 释放ArrayPool中的缓冲区if (buffer != null){ArrayPool<byte>.Shared.Return(buffer);buffer = null;}}} while (true);}catch (Exception ex){this._logger.LogError(ex, "{QueueName} ContinuousWriteAsync Exception", this._queueName);}
}
private ConcurrentBag<T> GetValues()
{// 快速检查是否有数据if (!this._queue.Reader.TryPeek(out _)){return [];}var list = new ConcurrentBag<T>();int itemsRead = 0;while (itemsRead < this._conn.Options.SingleMaxCount && this._queue.Reader.TryRead(out var item)){list.Add(item);itemsRead++;}return list;
}

这里我发现代码有几个点可以优化,

  • ConcurrentBag(这里不会出现线程竞争不需要用它)
  • 压缩时properties 在循环里创建
优化后

队列数量:6个
每个队列数据量:100万(总共600万)
每个队列数据大小:1G(总共6G)

队列名称优化后耗时优化前速率优化后速率
RabbitMq_Test.TestEntity634094.1119ms25557条/秒29330条/秒
RabbitMq_Test.TestEntity534107.0528ms25641条/秒29319条/秒
RabbitMq_Test.TestEntity435083.1341ms25611条/秒28503条/秒
RabbitMq_Test.TestEntity335171.1465ms25679条/秒28432条/秒
RabbitMq_Test.TestEntity235447.8285ms25874条/秒28211条/秒
RabbitMq_Test.TestEntity34792.7855ms25412条/秒28742条/秒

优化后生产者代码

private async Task ContinuousWriteAsync(CancellationToken cancellationToken)
{try{var channel = await this._conn.CreateChannelAsync(this._queueName);MemoryStream stream = new(524288);// 512KB缓冲区var lastErrValues = new List<T>();var retryCount = 0;//重试次数byte[]? buffer = null;byte[]? messageBuffer = null;//实际发送数据var properties = new BasicProperties(){Persistent = true, // 根据需求可设置为false提高性能(但会牺牲持久性)};var  compressedProperties= new BasicProperties{Persistent = true,Headers = new Dictionary<string, object?>{["compressed"] = true,  // 用于消费者识别}};do{List<T> values;if (lastErrValues.Count > 0){values = lastErrValues;lastErrValues = new List<T>(); // 清空并准备下次使用}else{values = this.GetValues();}try{if (values.Count == 0){// 优化等待策略,使用超时等待避免不必要的循环await this._reading.WaitAsync(50); // 减少等待时间提高响应速度continue;}// 优化内存使用stream.SetLength(0);await this._objectSerializer.SerializeAsync(stream, values);// 获取序列化后的数据长度int dataLength = (int)stream.Position;if (buffer == null || buffer.Length < dataLength){buffer = ArrayPool<byte>.Shared.Rent(dataLength);}stream.Position = 0;stream.ReadExactly(buffer, 0, dataLength);messageBuffer = buffer;if (_conn.Options.Compression){if (buffer.Length > 1024)//大于1kb再压缩{properties = compressedProperties;messageBuffer = this._objectSerializer.Compress(buffer);}}var messageBody = new ReadOnlyMemory<byte>(messageBuffer, 0, messageBuffer.Length);// mandatory设置为false,不可路由数据直接丢了await channel.BasicPublishAsync(exchange: string.Empty, routingKey: this._queueName, mandatory: false, basicProperties: properties,body: messageBody);retryCount = 0;}catch (Exception ex) when (retryCount < 2)//最多重试2次{this._logger.LogError(ex, "RabbitMQ Publish Message Exception{NewLine} Retry:{retryCount}", Environment.NewLine, retryCount);lastErrValues = values;await Task.Delay(1000 * (int)Math.Pow(2, retryCount++)); // 指数退避 }catch (Exception finalEx)//尝试5次后依然失败{//写入二进制文件this._logger.LogError(finalEx, "RabbitMQ Publish Message Exception{NewLine}Source:{@Values}", Environment.NewLine, values);//写文件之后清空lastErrValues.Clear();}finally{// 释放ArrayPool中的缓冲区if (buffer != null){ArrayPool<byte>.Shared.Return(buffer);buffer = null;}}} while (!cancellationToken.IsCancellationRequested);}catch (Exception ex){this._logger.LogError(ex, "{QueueName} ContinuousWriteAsync Exception", this._queueName);}
}
private List<T> GetValues()
{// 快速检查是否有数据if (!this._queue.Reader.TryPeek(out _)){return [];}var list = new List<T>(Math.Min(this._conn.Options.SingleMaxCount, 1024));int itemsRead = 0;while (itemsRead < this._conn.Options.SingleMaxCount && this._queue.Reader.TryRead(out var item)){list.Add(item);itemsRead++;}return list;
}

此时就体现出当数据量大时开启压缩会比不压缩的性能好,接下来我们再加数据量再进行测试,看压缩的优势会体现得更明显吗?由于时间问题我后面就没有再测试了,大家感兴趣可以去测一测

总结

以上就是我的测试数据,分享给大家希望对大家有帮助。测试代码我后续会上传到csdn上供大家下载,需要注意的是大家测试时使用的电脑配置不同可能得出的数据也会不同

http://www.dtcms.com/a/545701.html

相关文章:

  • 关于jupyter notebook调用GPU
  • 网站的建设课程做网站的实训报告
  • 商业网站的设计与推广系统湖南做网站
  • Adobe Lightroom Classic下载与安装教程(附安装包) 2025最新版详细图文安装教程
  • 仓颉语言赋能鸿蒙应用开发:UI主题样式定制的深度实践
  • 什么是 Adobe Experience Platform (AEP)?
  • 男孩子怎么做网站推广查询域名是否备案?
  • 帝国cms 关闭网站企业管理咨询心得体会
  • StarRocks 在 Cisco Webex 的探索与实践
  • 线程等待、终止与资源回收
  • NestJS 系列教程(十一):集成 Swagger 实现自动 API 文档与接口测试
  • 深圳招聘网站推荐上海华东建设发展设计有限公司网站
  • 网站建设 应酷wordpress4.0安装教程
  • SQlite:电影院售票系统中的主键(单列,复合)约束应用
  • 美橙互联网站建设涟水建设银行网站
  • 【推荐系统】深度学习训练框架(二):深入剖析Spark Cluster模式下DDP网络配置解析
  • 左右左右网站深圳云网站建站公司
  • npm error code ERR_SSL_TLSV1_UNRECOGNIZED_NAME
  • 规模大的企业建站wordpress是是什么技术
  • 从 “不会” 到 “会写”:Rust 入门基础实战,用一个小项目串完所有核心基础
  • 织梦网站图标更换网站开发教程百度云
  • SpringBoot14-集成Redis
  • Maven 下载和 Spring Boot 搭建
  • 怎么花最少的钱做网站上海建设工程招标网
  • 分布式锁Redis、ZooKeeper 和数据库实现分布式锁的优缺点、实现方式以及适用场景
  • 《创作一周年有感》
  • Rust:异步锁(Mutex、RwLock)的设计
  • EG1195S 带使能降压开关电源控制芯片技术解析
  • 关于解决stm32cubeIDE打开现有工程失败的方法:
  • 代码随想录 669.修剪二叉搜索树