
System.IO.Pipelines and “Zero-Copy”: Building a High-Throughput Binary RPC in .NET



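Contents

  • System.IO.Pipelines and “Zero-Copy”: Building a High-Throughput Binary RPC in .NET 🚀
    • 0. TL;DR: why Pipelines 🎯
    • 1. Frame protocol 📦
    • 2. Shared code: frame encoding/decoding 🧑‍💻
    • 3. Demo A: TCP + Pipelines 🌐
    • 4. Demo B: HTTP/1.1 + Pipelines 🌍
    • 5. Backpressure and using `AdvanceTo` correctly 💡
    • 6. Pooling and zero-allocation checklist 🛠
    • 7. Concurrency and flow control ⏱
    • 8. Load testing and comparison 🧑‍💻
    • 9. Observability and troubleshooting 🔍
    • 10. Error handling and security 🛡️
    • 11. Repository layout 🗂️
    • 12. Choosing an approach 📝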


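0. TL;DR: why Pipelines 🎯

  • PipeReader.ReadAsync() returns a ReadResult whose Buffer is a ReadOnlySequence<byte>. Because that sequence can span several segments, a frame split across reads simply stays buffered until it is complete. AdvanceTo(consumed, examined) reports what was used and what was looked at, and this is the mechanism behind backpressure.
  • SequenceReader<byte> parses protocol fields without first copying them into a managed array. BinaryPrimitives reads and writes fixed-width integers with explicit endianness directly on Span/ReadOnlySpan.
  • Kestrel exposes BodyReader/BodyWriter, so the HTTP variant gets the same benefits from pipes.
  • Combined with memory pooling, write coalescing, and concurrency limits, this approach can clearly beat a traditional Stream-based implementation on throughput, latency, and allocations. Measure it on your own workload (see Section 8).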

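1. Frame protocol 📦

Big-endian (network byte order), with a fixed 8-byte header:

```
len:uint32   // total length, including the header
type:uint16  // 0=Ping, 1=Echo, 2=Sum, 0xFFFF=Error
flags:uint16 // bit0=compressed; other bits reserved
payload: len-8 bytes
```
  • Ping: empty payload
  • Echo: returns the payload unchanged (the demo echoes payload.FirstSpan, which is only the first segment)
  • Sum: the payload is N int32 values (BE); the response is their sum as an int32 (BE)
  • Error: returns an error code/message (plain UTF-8 text in this demo)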
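A worked example of the layout above follows. It is a minimal, self-contained sketch written as a top-level C# program (.NET 5+ for Convert.ToHexString). The helper `BuildFrame` is hypothetical and not part of the demo code. The program encodes a Sum request for the int32 values 1, 2 and 3, then builds the response the server should send back.

```csharp
using System;
using System.Buffers.Binary;

// Sum request: the payload is N big-endian int32 values.
int[] values = { 1, 2, 3 };
var payload = new byte[values.Length * 4];
for (int i = 0; i < values.Length; i++)
    BinaryPrimitives.WriteInt32BigEndian(payload.AsSpan(i * 4), values[i]);
byte[] request = BuildFrame(2, 0, payload);            // type 2 = Sum

// What the server does: add the int32 values after the 8-byte header, reply with one int32.
int sum = 0;
for (int off = 8; off + 4 <= request.Length; off += 4)
    sum += BinaryPrimitives.ReadInt32BigEndian(request.AsSpan(off));
var sumBytes = new byte[4];
BinaryPrimitives.WriteInt32BigEndian(sumBytes, sum);
byte[] response = BuildFrame(2, 0, sumBytes);

Console.WriteLine(Convert.ToHexString(request));   // prints 0000001400020000000000010000000200000003
Console.WriteLine(Convert.ToHexString(response));  // prints 0000000C0002000000000006

// Hypothetical helper: one frame = big-endian len | type | flags, followed by the payload.
static byte[] BuildFrame(ushort type, ushort flags, ReadOnlySpan<byte> body)
{
    var frame = new byte[8 + body.Length];
    BinaryPrimitives.WriteUInt32BigEndian(frame, (uint)frame.Length); // len counts the header too
    BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(4), type);
    BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(6), flags);
    body.CopyTo(frame.AsSpan(8));
    return frame;
}
```

Because len includes the header, the request is 8 + 3 × 4 = 20 (0x14) bytes and the response is 8 + 4 = 12 (0x0C) bytes.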

2. Shared code: frame encoding/decoding 🧑‍💻

src/Rpc.Protocol/Frame.cs

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;

namespace Rpc.Protocol;

public static class Frame
{
    public const int HeaderSize = 8;
    // Upper bound on len (header + payload): oversized frames are rejected from the header alone,
    // before the server buffers megabytes of data for them.
    public const int MaxFrameSize = HeaderSize + 4 * 1024 * 1024;

    public const ushort TypePing  = 0;
    public const ushort TypeEcho  = 1;
    public const ushort TypeSum   = 2;
    public const ushort TypeError = 0xFFFF;

    // Returns false for a partial frame (buffer left untouched). On success, payload is a slice
    // of the original buffer (no copy) and buffer is advanced past the frame.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static bool TryParseFrame(ref ReadOnlySequence<byte> buffer,
                                     out ushort type, out ushort flags,
                                     out ReadOnlySequence<byte> payload)
    {
        type = 0; flags = 0; payload = default;
        if (buffer.Length < HeaderSize) return false;

        // Only the 8-byte header is copied (to the stack), even if it straddles two segments.
        Span<byte> header = stackalloc byte[HeaderSize];
        buffer.Slice(0, HeaderSize).CopyTo(header);
        uint len = BinaryPrimitives.ReadUInt32BigEndian(header);
        type  = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));
        flags = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(6));

        if (len < HeaderSize || len > MaxFrameSize)
            throw new InvalidOperationException($"Invalid length {len}");
        if (buffer.Length < len) return false; // partial frame: wait for more data

        var frame = buffer.Slice(0, len);
        payload = frame.Slice(HeaderSize, len - HeaderSize);
        buffer = buffer.Slice(len);
        return true;
    }

    public static void WriteFrame(PipeWriter writer, ushort type, ushort flags, ReadOnlySpan<byte> payload)
    {
        int len = HeaderSize + payload.Length;
        Span<byte> span = writer.GetSpan(len);   // one span large enough for header + payload
        BinaryPrimitives.WriteUInt32BigEndian(span, (uint)len);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(4), type);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(6), flags);
        payload.CopyTo(span.Slice(HeaderSize));
        writer.Advance(len);                     // commit once; flushing is left to the caller
    }
}
```

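Key points

  • Parsing copies only the header, and only onto the stack. The payload is always a slice of the original ReadOnlySequence<byte> (zero-copy). That slice stays valid only until AdvanceTo is called on the reader, so anything that must outlive the read loop (for example, a request queued for another task) needs its own copy.
  • Writing obtains one span large enough for the whole frame, which keeps the number of Advance/Flush calls down.
  • Checking len against its lower bound (the header size) defends against malformed input.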
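The sketch below isolates the "one GetSpan, one Advance" write path. For the block to compile on its own, it copies the WriteFrame logic from Frame.cs into a local function. In the project you would call Frame.WriteFrame instead. The program writes a Ping and an Echo frame into an in-memory Pipe, flushes once, and reads the raw bytes back.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();
WriteFrame(pipe.Writer, 0, 0, ReadOnlySpan<byte>.Empty);       // Ping
WriteFrame(pipe.Writer, 1, 0, Encoding.UTF8.GetBytes("hi"));   // Echo "hi"
await pipe.Writer.FlushAsync();     // a single flush publishes both frames to the reader
await pipe.Writer.CompleteAsync();

ReadResult result = await pipe.Reader.ReadAsync();
byte[] wire = result.Buffer.ToArray();
pipe.Reader.AdvanceTo(result.Buffer.End);
await pipe.Reader.CompleteAsync();

Console.WriteLine(Convert.ToHexString(wire));   // prints 00000008000000000000000A000100006869

// Same logic as Frame.WriteFrame: one GetSpan for header + payload, then one Advance.
static void WriteFrame(PipeWriter writer, ushort type, ushort flags, ReadOnlySpan<byte> payload)
{
    int len = 8 + payload.Length;
    Span<byte> span = writer.GetSpan(len);           // guaranteed to be at least len bytes
    BinaryPrimitives.WriteUInt32BigEndian(span, (uint)len);
    BinaryPrimitives.WriteUInt16BigEndian(span.Slice(4), type);
    BinaryPrimitives.WriteUInt16BigEndian(span.Slice(6), flags);
    payload.CopyTo(span.Slice(8));
    writer.Advance(len);                              // commit exactly the bytes written
}
```

Advance only commits bytes to the writer. Nothing becomes visible to the reader until FlushAsync, which is why the server code has to flush after writing responses.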

3. Demo A: TCP + Pipelines 🌐

src/Rpc.TcpServer/Program.cs

```csharp
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Channels;
using Rpc.Protocol;

var listener = new TcpListener(IPAddress.Loopback, 5001);
listener.Start();
Console.WriteLine("TCP RPC listening on 127.0.0.1:5001");

while (true)
{
    var client = await listener.AcceptTcpClientAsync();
    _ = Task.Run(() => Handle(client));   // one handler per connection
}

static async Task Handle(TcpClient client)
{
    const int MaxInFlight = 32;
    // Bounded per-connection queue: when it is full, WriteAsync waits, the read loop stops
    // reading, and TCP flow control pushes back on the client.
    var workQueue = Channel.CreateBounded<(ushort type, ReadOnlySequence<byte> payload)>(
        new BoundedChannelOptions(MaxInFlight) { SingleReader = true, SingleWriter = true });

    using var _ = client;
    client.NoDelay = true;
    var stream = client.GetStream();
    var reader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: 64 * 1024));
    var networkWriter = PipeWriter.Create(stream,
        new StreamPipeWriterOptions(MemoryPool<byte>.Shared, 64 * 1024, leaveOpen: true));

    // In-memory send pipe: the worker writes responses into it, the pump batches them to the socket.
    var sendPipe = new Pipe(new PipeOptions(pool: MemoryPool<byte>.Shared,
        pauseWriterThreshold: 256 * 1024, resumeWriterThreshold: 128 * 1024));
    var pumpTask = PumpSendAsync(sendPipe.Reader, networkWriter);
    var workerTask = Task.Run(() => WorkerAsync(workQueue.Reader, sendPipe.Writer));

    try
    {
        while (true)
        {
            var result = await reader.ReadAsync();
            var buf = result.Buffer;
            bool badFrame = false;
            try
            {
                while (Frame.TryParseFrame(ref buf, out var type, out var flags, out var payload))
                {
                    // payload points into the reader's buffer, which is recycled after AdvanceTo.
                    // The worker runs later, so it must get its own copy.
                    await workQueue.Writer.WriteAsync((type, new ReadOnlySequence<byte>(payload.ToArray())));
                }
            }
            catch (Exception ex)
            {
                await workQueue.Writer.WriteAsync((Frame.TypeError, ErrorPayload("Bad frame: " + ex.Message)));
                badFrame = true;
            }
            reader.AdvanceTo(buf.Start, buf.End);       // consume parsed frames, keep a partial frame
            if (badFrame || result.IsCompleted) break;  // after a bad header the stream cannot resync
        }
    }
    finally
    {
        workQueue.Writer.TryComplete();
        await workerTask;
        await sendPipe.Writer.CompleteAsync();
        await pumpTask;
        await networkWriter.CompleteAsync();
        await reader.CompleteAsync();
        stream.Close();
    }
}

static async Task WorkerAsync(ChannelReader<(ushort type, ReadOnlySequence<byte> payload)> reader, PipeWriter sendWriter)
{
    await foreach (var (type, payload) in reader.ReadAllAsync())
    {
        try
        {
            HandleRequest(type, payload, sendWriter);
        }
        catch (Exception ex)
        {
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, Encoding.UTF8.GetBytes("Handler error: " + ex.Message));
        }
        // Publish the response to the pump. This also waits while the send pipe is above pauseWriterThreshold.
        var flush = await sendWriter.FlushAsync();
        if (flush.IsCompleted) break;   // the pump has stopped (connection is closing)
    }
}

// Synchronous on purpose: Span/stackalloc and SequenceReader cannot be locals in async methods (before C# 13).
static void HandleRequest(ushort type, ReadOnlySequence<byte> payload, PipeWriter w)
{
    switch (type)
    {
        case Frame.TypePing:
            Frame.WriteFrame(w, Frame.TypePing, 0, ReadOnlySpan<byte>.Empty);
            break;
        case Frame.TypeEcho:
            Frame.WriteFrame(w, Frame.TypeEcho, 0, payload.FirstSpan);
            break;
        case Frame.TypeSum:
        {
            var sr = new SequenceReader<byte>(payload);
            int sum = 0;
            while (sr.TryReadBigEndian(out int v)) sum += v;   // trailing bytes (< 4) are ignored
            Span<byte> tmp = stackalloc byte[4];
            BinaryPrimitives.WriteInt32BigEndian(tmp, sum);
            Frame.WriteFrame(w, Frame.TypeSum, 0, tmp);
            break;
        }
        default:
            Frame.WriteFrame(w, Frame.TypeError, 0, Encoding.UTF8.GetBytes("Unknown type"));
            break;
    }
}

static async Task PumpSendAsync(PipeReader from, PipeWriter to)
{
    try
    {
        while (true)
        {
            // ReadAsync returns everything flushed so far, so responses produced while the previous
            // socket flush was in flight are coalesced into one batch (and one socket write).
            var r = await from.ReadAsync();
            foreach (var seg in r.Buffer)
                to.Write(seg.Span);
            from.AdvanceTo(r.Buffer.End);
            var fr = await to.FlushAsync();
            if (r.IsCompleted || fr.IsCompleted) break;
        }
    }
    finally
    {
        await from.CompleteAsync();
    }
}

static ReadOnlySequence<byte> ErrorPayload(string msg) => new(Encoding.UTF8.GetBytes(msg));
```

4. Demo B: HTTP/1.1 + Pipelines 🌍

src/Rpc.HttpServer/Program.cs

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Hosting;
using Rpc.Protocol;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.MapPost("/rpc", async (HttpContext ctx) =>
{
    ctx.Response.ContentType = "application/octet-stream";
    var reader = ctx.Request.BodyReader;
    var writer = ctx.Response.BodyWriter;

    // Same pipeline as Demo A: parse -> bounded queue -> single worker -> send pipe -> pump -> BodyWriter.
    var sendPipe = new Pipe(new PipeOptions(pool: MemoryPool<byte>.Shared,
        pauseWriterThreshold: 256 * 1024, resumeWriterThreshold: 128 * 1024));
    var pump = PumpSendAsync(sendPipe.Reader, writer);

    const int MaxInFlight = 32;
    var workQueue = Channel.CreateBounded<(ushort type, ReadOnlySequence<byte> payload)>(
        new BoundedChannelOptions(MaxInFlight) { SingleReader = true, SingleWriter = true });
    var worker = WorkerAsync(workQueue.Reader, sendPipe.Writer);

    try
    {
        while (true)
        {
            var result = await reader.ReadAsync(ctx.RequestAborted);
            var buf = result.Buffer;
            bool badFrame = false;
            try
            {
                while (Frame.TryParseFrame(ref buf, out var type, out var flags, out var payload))
                {
                    // Copy: BodyReader reuses this memory after AdvanceTo, but the worker runs later.
                    await workQueue.Writer.WriteAsync((type, new ReadOnlySequence<byte>(payload.ToArray())));
                }
            }
            catch (Exception ex)
            {
                await workQueue.Writer.WriteAsync((Frame.TypeError, ErrorPayload("Bad frame: " + ex.Message)));
                badFrame = true;
            }
            reader.AdvanceTo(buf.Start, buf.End);
            if (badFrame || result.IsCompleted) break;
        }
    }
    finally
    {
        workQueue.Writer.TryComplete();
        await worker;
        await sendPipe.Writer.CompleteAsync();
        await pump;   // the pump has already flushed BodyWriter after every batch
    }
});

app.Run("http://0.0.0.0:5000");

static async Task WorkerAsync(ChannelReader<(ushort type, ReadOnlySequence<byte> payload)> reader, PipeWriter sendWriter)
{
    await foreach (var (type, payload) in reader.ReadAllAsync())
    {
        try
        {
            HandleRequest(type, payload, sendWriter);
        }
        catch (Exception ex)
        {
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, Encoding.UTF8.GetBytes("Handler error: " + ex.Message));
        }
        var flush = await sendWriter.FlushAsync();   // make the response visible to the pump
        if (flush.IsCompleted) break;
    }
}

// Synchronous so it may use Span/stackalloc and SequenceReader.
static void HandleRequest(ushort type, ReadOnlySequence<byte> payload, PipeWriter w)
{
    switch (type)
    {
        case Frame.TypePing:
            Frame.WriteFrame(w, Frame.TypePing, 0, ReadOnlySpan<byte>.Empty);
            break;
        case Frame.TypeEcho:
            Frame.WriteFrame(w, Frame.TypeEcho, 0, payload.FirstSpan);
            break;
        case Frame.TypeSum:
        {
            var sr = new SequenceReader<byte>(payload);
            int sum = 0;
            while (sr.TryReadBigEndian(out int v)) sum += v;
            Span<byte> tmp = stackalloc byte[4];
            BinaryPrimitives.WriteInt32BigEndian(tmp, sum);
            Frame.WriteFrame(w, Frame.TypeSum, 0, tmp);
            break;
        }
        default:
            Frame.WriteFrame(w, Frame.TypeError, 0, Encoding.UTF8.GetBytes("Unknown type"));
            break;
    }
}

static async Task PumpSendAsync(PipeReader from, PipeWriter to)
{
    try
    {
        while (true)
        {
            var r = await from.ReadAsync();       // everything the worker has flushed so far
            foreach (var seg in r.Buffer)
                to.Write(seg.Span);
            from.AdvanceTo(r.Buffer.End);
            var fr = await to.FlushAsync();       // one flush per batch
            if (r.IsCompleted || fr.IsCompleted) break;
        }
    }
    finally
    {
        await from.CompleteAsync();
    }
}

static ReadOnlySequence<byte> ErrorPayload(string msg) => new(Encoding.UTF8.GetBytes(msg));
```

5. Backpressure and using AdvanceTo correctly 💡

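Flowchart: read data → is it a complete frame? → no: keep the partial frame; yes: consume and advance with AdvanceTo(consumed, examined) → process the next frame.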
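  • Partial frames: when parsing stops because the data is incomplete (a partial frame), do not move consumed past it. Set examined to the current batch's buffer.End so the next ReadAsync waits for the underlying source to deliver more data.
  • Avoid spinning: if examined stops short of buffer.End, the next ReadAsync returns immediately with the same data even though nothing new has arrived. A loop that keeps passing the same (consumed, examined) pair therefore busy-waits.
  • Where the thresholds apply: pauseWriterThreshold/resumeWriterThreshold only affect a Pipe you create yourself (here, the send-aggregation pipe). StreamPipeReaderOptions/StreamPipeWriterOptions configure other things (memory pool, buffer sizes, whether the underlying stream is left open) and have no backpressure thresholds.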
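The partial-frame rule can be observed with an in-memory Pipe. This is a minimal sketch that assumes a top-level program on .NET 6+. `TryReadFrame` and `ReadOnceAsync` are hypothetical helpers that follow the Section 1 header. They read it with SequenceReader<byte>.TryReadBigEndian rather than the stackalloc copy used in Frame.cs, and copy the payload out to a string for display. The first write delivers only 5 of the 12 bytes, as a TCP read might.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();
var frames = new List<(ushort Type, string Payload)>();

// Echo frame: len = 12 (8-byte header + "ping"), type = 1, flags = 0.
byte[] frame = { 0, 0, 0, 12, 0, 1, 0, 0, (byte)'p', (byte)'i', (byte)'n', (byte)'g' };

await pipe.Writer.WriteAsync(frame.AsMemory(0, 5));   // only a half frame has arrived
int firstBatch = await ReadOnceAsync();               // parses nothing, keeps the 5 bytes
await pipe.Writer.WriteAsync(frame.AsMemory(5));      // the rest arrives
int secondBatch = await ReadOnceAsync();              // the full frame is now parsed
await pipe.Writer.CompleteAsync();
await pipe.Reader.CompleteAsync();

Console.WriteLine($"{firstBatch} then {secondBatch} frame(s): {frames[0].Payload}");  // prints 0 then 1 frame(s): ping

async Task<int> ReadOnceAsync()
{
    ReadResult result = await pipe.Reader.ReadAsync();
    ReadOnlySequence<byte> buffer = result.Buffer;
    int parsed = 0;
    while (TryReadFrame(ref buffer, out ushort type, out string payload))
    {
        frames.Add((type, payload));
        parsed++;
    }
    // consumed = start of the unparsed remainder (the partial frame, if any);
    // examined = everything received, so the next ReadAsync waits for new bytes instead of spinning.
    pipe.Reader.AdvanceTo(buffer.Start, buffer.End);
    return parsed;
}

static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, out ushort type, out string payload)
{
    type = 0;
    payload = "";
    var reader = new SequenceReader<byte>(buffer);   // reads across segment boundaries
    if (!reader.TryReadBigEndian(out int len) || !reader.TryReadBigEndian(out short rawType))
        return false;                                // header incomplete
    if (len < 8) throw new InvalidDataException("len is smaller than the header");
    if (buffer.Length < len) return false;           // payload incomplete
    type = (ushort)rawType;
    payload = Encoding.UTF8.GetString(buffer.Slice(8, len - 8).ToArray());
    buffer = buffer.Slice(len);                      // step past the parsed frame
    return true;
}
```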

6. Pooling and zero-allocation checklist 🛠

Diagram: ArrayPool.Shared → Rent/Return → fewer allocations; MemoryPool → IMemoryOwner → lifetime management → memory is always returned.
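  • Prefer slices: walk the original buffer with ReadOnlySequence<byte> + Slice/SequenceReader and avoid ToArray(). The exception is data that must outlive AdvanceTo, which needs a (preferably pooled) copy.

  • Pooling strategy

    • Small, short-lived buffers: ArrayPool<byte>.Shared.Rent/Return
    • Large blocks and pipes: MemoryPool<byte>.Shared, using the IMemoryOwner<byte> lifetime to guarantee the memory is returned.
  • Get enough space at once: PipeWriter.GetSpan(expected) → fill → Advance(bytesWritten). Flush less, coalesce more.

  • Return on error paths: every path that can throw must still return pooled objects and complete pipes (try/finally or using).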
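Here is a short sketch of both pooling styles together with the "return on every path" rule. It makes two assumptions. First, the payload is a hard-coded ReadOnlySequence<byte> standing in for a slice of a PipeReader buffer. Second, the summing logic only mirrors the Sum handler for illustration.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;

// Stand-in for a payload slice from a PipeReader buffer: two big-endian int32 values (5 and 7).
var payload = new ReadOnlySequence<byte>(new byte[] { 0, 0, 0, 5, 0, 0, 0, 7 });
int length = (int)payload.Length;

// 1) Short-lived scratch buffer: ArrayPool. Rent may return a LARGER array,
//    so only the first `length` bytes are meaningful.
byte[] rented = ArrayPool<byte>.Shared.Rent(length);
int sum = 0;
try
{
    payload.CopyTo(rented.AsSpan());
    for (int off = 0; off + 4 <= length; off += 4)
        sum += BinaryPrimitives.ReadInt32BigEndian(rented.AsSpan(off, 4));
}
finally
{
    ArrayPool<byte>.Shared.Return(rented);   // runs on success and when an exception is thrown
}

// 2) Memory whose ownership may be handed on (e.g. queued for a worker): MemoryPool + IMemoryOwner.
//    Disposing the owner returns the memory; `using` makes that exception-safe.
int copied;
using (IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(length))
{
    Memory<byte> memory = owner.Memory.Slice(0, length);   // owner.Memory may also be larger
    payload.CopyTo(memory.Span);
    copied = memory.Length;
}

Console.WriteLine($"sum={sum}, copied={copied}");   // prints sum=12, copied=8
```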


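7. Concurrency and flow control ⏱

  • Per-connection concurrency limit: a bounded Channel<T> carries parsed requests to the business handler, so backpressure does not turn into a "thread storm".
  • Number of workers: the sample uses a single consumer, which keeps writes to sendPipe.Writer single-threaded. To process requests with several workers concurrently, serialize the writes (for example with an extra "send queue" or a single write lock). Responses may then also complete out of order, and this frame format carries no request ID to match them up.
  • Timeouts and cancellation: add CancellationToken and timeouts to ReadAsync, FlushAsync, and business handling as needed, so that nothing hangs forever.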
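A bounded Channel<T> produces backpressure because of how its two write methods differ. TryWrite fails immediately when the channel is full, while WriteAsync waits for a free slot. The minimal sketch below uses a capacity of 2 as a stand-in for MaxInFlight. BoundedChannelFullMode.Wait is the default and is set explicitly here for clarity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var queue = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    SingleReader = true,
    SingleWriter = true,
    FullMode = BoundedChannelFullMode.Wait
});

bool fit1 = queue.Writer.TryWrite(1);
bool fit2 = queue.Writer.TryWrite(2);
bool fit3 = queue.Writer.TryWrite(3);            // false: the channel is full and TryWrite does not wait

ValueTask pending = queue.Writer.WriteAsync(3);   // waits for space: this is the backpressure
bool blockedWhileFull = !pending.IsCompleted;

int first = await queue.Reader.ReadAsync();       // the consumer frees one slot...
await pending;                                    // ...which lets the pending write complete
queue.Writer.Complete();

var drained = new List<int> { first };
await foreach (int item in queue.Reader.ReadAllAsync())
    drained.Add(item);

Console.WriteLine($"{fit1} {fit2} {fit3} blocked={blockedWhileFull} -> {string.Join(",", drained)}");
// prints True True False blocked=True -> 1,2,3
```

In the servers, an awaited WriteAsync in the read loop means that a full queue stops the loop from calling ReadAsync. TCP flow control then slows the client down.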

8. Load testing and comparison 🧑‍💻

Lua script: scripts/wrk/rpc.lua

```lua
-- wrk embeds LuaJIT (Lua 5.1 syntax), which has no >> / & operators, so use plain arithmetic.
local function be32(n)
  return string.char(math.floor(n / 16777216) % 256, math.floor(n / 65536) % 256,
                     math.floor(n / 256) % 256, n % 256)
end

local function be16(n)
  return string.char(math.floor(n / 256) % 256, n % 256)
end

-- 8-byte header (len includes the header; flags = 0) followed by the payload
local function build_frame(msg_type, payload)
  local len = 8 + #payload
  return be32(len) .. be16(msg_type) .. be16(0) .. payload
end

wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

request = function()
  if math.random() < 0.5 then
    local p = string.rep("A", 16)          -- Echo
    return wrk.format(nil, "/rpc", nil, build_frame(1, p))
  else
    local N = 8                            -- Sum
    local buf = {}
    for i = 1, N do
      local v = math.random(1, 1000)
      buf[#buf + 1] = be32(v)
    end
    return wrk.format(nil, "/rpc", nil, build_frame(2, table.concat(buf)))
  end
end
```

Command:

```
wrk -t8 -c256 -d30s --latency -s scripts/wrk/rpc.lua http://127.0.0.1:5000/rpc
```

Metrics to watch: RPS, p50/p95/p99 latency, socket errors, CPU, GC (allocations/pause time), and the number of Flush calls (an indirect measure of syscalls).

Baseline: implement a "traditional Stream" version (BinaryReader / NetworkStream.ReadExactly) with identical functionality to serve as the control group.
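A frame reader for the baseline might look like the sketch below. This is an assumption about what the control group could contain, not code from the original repository. It uses Stream.ReadExactly (.NET 7+), and a MemoryStream stands in for NetworkStream so the program runs without a server. Every frame allocates a fresh byte[] for its payload, which is one of the costs the Pipelines version avoids.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Text;

// Two frames back to back: a Ping, then an Echo carrying "ok".
var ms = new MemoryStream(new byte[]
{
    0, 0, 0, 8,  0, 0, 0, 0,
    0, 0, 0, 10, 0, 1, 0, 0, (byte)'o', (byte)'k',
});
var ping = ReadFrame(ms);
var echo = ReadFrame(ms);
Console.WriteLine($"{ping.Type} {ping.Payload.Length} | {echo.Type} {Encoding.ASCII.GetString(echo.Payload)}");
// prints 0 0 | 1 ok

// ReadExactly loops until the buffer is full, or throws EndOfStreamException.
static (ushort Type, byte[] Payload) ReadFrame(Stream stream)
{
    Span<byte> header = stackalloc byte[8];
    stream.ReadExactly(header);
    uint len = BinaryPrimitives.ReadUInt32BigEndian(header);
    ushort type = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));
    if (len < 8 || len > 8 + 4 * 1024 * 1024) throw new InvalidDataException($"bad len {len}");
    var payload = new byte[len - 8];   // one allocation per frame
    stream.ReadExactly(payload);
    return (type, payload);
}
```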


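9. Observability and troubleshooting 🔍

  • dotnet-counters

    • System.Runtime: GC heap size, Gen0/1/2 collection counts, allocation rate, thread pool queue length/throughput, etc.
    • Microsoft.AspNetCore.Hosting (HTTP variant)
  • Custom metrics

    • Active requests per connection, work-queue length, bytes accumulated by the coalescing writer, number of flushes
    • Parse time and business-handling time
  • Key log points: Accept → Read → Parse → Enqueue → Handle → Write → Flush, each tagged with a ConnectionId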
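Custom metrics can be published with System.Diagnostics.Metrics, which lets tools such as dotnet-counters or an OpenTelemetry exporter observe them. The meter name `Rpc.Server` and the instrument names below are hypothetical. The in-process MeterListener exists only so the sketch can show what was recorded. UpDownCounter requires .NET 7+.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

using var meter = new Meter("Rpc.Server");
Counter<long> flushes = meter.CreateCounter<long>("rpc.flushes");
Counter<long> bytesSent = meter.CreateCounter<long>("rpc.bytes_sent");
UpDownCounter<int> inFlight = meter.CreateUpDownCounter<int>("rpc.in_flight");

// Sum every measurement per instrument so the results can be printed.
var totals = new Dictionary<string, long>();
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "Rpc.Server") l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((inst, value, tags, state) =>
    totals[inst.Name] = totals.GetValueOrDefault(inst.Name) + value);
listener.SetMeasurementEventCallback<int>((inst, value, tags, state) =>
    totals[inst.Name] = totals.GetValueOrDefault(inst.Name) + value);
listener.Start();

// Where the demo might record: Enqueue -> +1, response written -> -1, each pump FlushAsync -> +1.
inFlight.Add(1);
inFlight.Add(1);
inFlight.Add(-1);
flushes.Add(1);
bytesSent.Add(24);

foreach (var (name, total) in totals) Console.WriteLine($"{name} = {total}");
```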


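10. Error handling and security 🛡️

  • Input validation: an upper bound on len and a whitelist of type values; add a checksum or HMAC if needed.
  • DoS protection: limit concurrent connections, queue length, and per-connection rate/byte totals; apply idle and read/write timeouts.
  • TLS: the TCP variant uses SslStream (server certificate, plus client certificates if required); the HTTP variant leaves TLS to Kestrel.
  • Cleanup rule: on both normal and exceptional paths, CompleteAsync() must always run and pooled objects must always be returned.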
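Idle and read timeouts can be implemented by passing a CancellationToken to ReadAsync. This minimal sketch assumes a hypothetical `ReadWithIdleTimeoutAsync` helper, and in-memory Pipes take the place of sockets. The 100 ms window is an arbitrary demo value. One pipe never receives data and times out. The other receives a Ping and reads normally.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

var idlePipe = new Pipe();   // peer is connected but silent
var busyPipe = new Pipe();   // peer sends a Ping

bool idleGotData = await ReadWithIdleTimeoutAsync(idlePipe.Reader, TimeSpan.FromMilliseconds(100));

await busyPipe.Writer.WriteAsync(new byte[] { 0, 0, 0, 8, 0, 0, 0, 0 });
bool busyGotData = await ReadWithIdleTimeoutAsync(busyPipe.Reader, TimeSpan.FromSeconds(5));

Console.WriteLine($"idle: {idleGotData}, busy: {busyGotData}");   // prints idle: False, busy: True

// Hypothetical helper: one read that gives up if nothing arrives within `idle`.
static async Task<bool> ReadWithIdleTimeoutAsync(PipeReader reader, TimeSpan idle)
{
    using var cts = new CancellationTokenSource(idle);
    try
    {
        ReadResult result = await reader.ReadAsync(cts.Token);
        reader.AdvanceTo(result.Buffer.End);
        return true;
    }
    catch (OperationCanceledException)
    {
        return false;   // a server would send an Error frame and/or close the connection here
    }
}
```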

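11. Repository layout 🗂️

```
pipelines-rpc/
  src/
    Rpc.Protocol/            # frame definitions, parser, serialization helpers (Frame.cs)
    Rpc.TcpServer/           # Demo A: TcpListener + Pipelines (+ coalesced writes + concurrency limit)
    Rpc.HttpServer/          # Demo B: Kestrel + BodyReader/BodyWriter (same protocol, same strategy)
    Rpc.BaselineStream/      # optional: traditional Stream baseline (control group)
  scripts/
    wrk/rpc.lua              # builds binary frames; Echo/Sum mix
    perf-collect.ps1         # dotnet-counters collection script (optional)
  README.md                  # how to run and benchmark, plus a template for expected results
```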

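12. Choosing an approach 📝

  • Prefer Pipelines for custom binary protocols, complex framing or multi-segment buffers, high RPS / low latency requirements, and gateway/proxy cores.
  • Stay with Stream when throughput needs are moderate, development cost comes first, and the protocol and processing are simple.
  • Coexist with gRPC: use gRPC for public-facing business APIs (ecosystem, maintainability) and Pipelines for internal hot paths or proxy cores (maximum performance).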

http://www.dtcms.com/a/347966.html
