System.IO.Pipelines and "Zero-Copy": Building a High-Throughput Binary RPC in .NET 🚀
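Contents
- System.IO.Pipelines and "Zero-Copy": Building a High-Throughput Binary RPC in .NET 🚀
- 0. TL;DR: Why Pipelines 🎯
- 1. Frame Protocol 📦
- 2. Shared Code: Frame Encoding and Decoding 🧑‍💻
- 3. Demo A: TCP + Pipelines 🌐
- 4. Demo B: HTTP/1.1 + Pipelines 🌍
- 5. Backpressure and Using `AdvanceTo` Correctly 💡
- 6. Pooling and Zero-Allocation Checklist 🛠
- 7. Concurrency and Flow Control ⏱
- 8. Load Testing and Comparison 🧑‍💻
- 9. Observability and Troubleshooting 🔍
- 10. Error Handling and Security 🛡️
- 11. Repository Layout 🗂️
- 12. Selection Guidance 📝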
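0. TL;DR: Why Pipelines 🎯
- `PipeReader.ReadAsync()` returns a `ReadOnlySequence<byte>`, which natively supports multi-segment buffers and partial frames; `AdvanceTo(consumed, examined)` tells the pipe how much was consumed and examined, which is what backpressure builds on.
- `SequenceReader<byte>` can parse protocol fields without copying them into a managed array; `BinaryPrimitives` working on `Span`/`ReadOnlySpan` is more efficient than hand-rolled byte shuffling.
- Kestrel exposes `BodyReader`/`BodyWriter`, so the HTTP variant gets the same benefits from Pipes.
- Combined with memory pooling, write coalescing, and concurrency limits, this approach can clearly outperform a traditional `Stream` on throughput, latency, and allocations.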
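To make the `SequenceReader<byte>` point concrete, here is a minimal sketch that reads the three header fields directly from a sequence that may span several segments. `HeaderPeek`, `Split`, and `Segment` are hypothetical names introduced only for this illustration; they are not part of the article's repository.

```csharp
using System;
using System.Buffers;

public static class HeaderPeek
{
    // Reads len/type/flags (big-endian) straight from the sequence,
    // even when the header straddles two segments. No array is allocated.
    public static bool TryReadHeader(in ReadOnlySequence<byte> buffer,
        out uint len, out ushort type, out ushort flags)
    {
        len = 0; type = 0; flags = 0;
        var reader = new SequenceReader<byte>(buffer);
        if (!reader.TryReadBigEndian(out int rawLen)) return false;
        if (!reader.TryReadBigEndian(out short rawType)) return false;
        if (!reader.TryReadBigEndian(out short rawFlags)) return false;
        len = (uint)rawLen; type = (ushort)rawType; flags = (ushort)rawFlags;
        return true;
    }

    // Illustration helper: splits one array into a two-segment sequence.
    public static ReadOnlySequence<byte> Split(byte[] data, int at)
    {
        var first = new Segment(data.AsMemory(0, at), 0);
        var second = new Segment(data.AsMemory(at), at);
        first.SetNext(second);
        return new ReadOnlySequence<byte>(first, 0, second, second.Memory.Length);
    }

    private sealed class Segment : ReadOnlySequenceSegment<byte>
    {
        public Segment(ReadOnlyMemory<byte> memory, long runningIndex)
        {
            Memory = memory;
            RunningIndex = runningIndex;
        }

        public void SetNext(Segment next) => Next = next;
    }
}
```

Calling `HeaderPeek.TryReadHeader(HeaderPeek.Split(bytes, 3), ...)` exercises the case where the 4-byte length field is cut in the middle by a segment boundary.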
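1. Frame Protocol 📦
A fixed 8-byte header in big-endian (network) byte order:
len:uint32 // total length, header included
type:uint16 // 0=Ping, 1=Echo, 2=Sum, 0xFFFF=Error
flags:uint16 // bit0=compressed; other bits reserved
payload: len-8 bytes
- Ping: empty payload
- Echo: returns the payload unchanged (the demo echoes only the first segment's `Span`)
- Sum: the payload is N int32 values (BE); the response is their int32 (BE) sum
- Error: returns an error code/message (plain text in this demo)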
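As a worked example of the layout above, the following sketch builds a Sum request by hand so the bytes on the wire can be inspected. `FrameExample` and its methods are hypothetical helpers for this illustration only.

```csharp
using System;
using System.Buffers.Binary;

public static class FrameExample
{
    // Builds one frame: 8-byte big-endian header followed by the payload.
    public static byte[] Build(ushort type, ushort flags, ReadOnlySpan<byte> payload)
    {
        var frame = new byte[8 + payload.Length];
        BinaryPrimitives.WriteUInt32BigEndian(frame, (uint)frame.Length); // len includes the header
        BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(4), type);
        BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(6), flags);
        payload.CopyTo(frame.AsSpan(8));
        return frame;
    }

    // Builds a Sum (type 2) request whose payload is the given int32 values in big-endian order.
    public static byte[] BuildSum(params int[] values)
    {
        var payload = new byte[values.Length * 4];
        for (int i = 0; i < values.Length; i++)
            BinaryPrimitives.WriteInt32BigEndian(payload.AsSpan(i * 4), values[i]);
        return Build(2, 0, payload);
    }
}
```

`Convert.ToHexString(FrameExample.BuildSum(1, 2))` yields `00000010000200000000000100000002`: length 0x10 (16 bytes), type 2, flags 0, then the two operands.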
2. Shared Code: Frame Encoding and Decoding 🧑‍💻
src/Rpc.Protocol/Frame.cs
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;

namespace Rpc.Protocol;

public static class Frame
{
    public const int HeaderSize = 8;
    public const ushort TypePing = 0;
    public const ushort TypeEcho = 1;
    public const ushort TypeSum = 2;
    public const ushort TypeError = 0xFFFF;

    // Tries to slice one complete frame off the front of `buffer`.
    // Returns false when more data is needed (partial frame); throws on an invalid length.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static bool TryParseFrame(
        ref ReadOnlySequence<byte> buffer,
        out ushort type,
        out ushort flags,
        out ReadOnlySequence<byte> payload)
    {
        type = 0; flags = 0; payload = default;
        if (buffer.Length < HeaderSize) return false;

        // Only the 8-byte header is copied (to the stack).
        Span<byte> header = stackalloc byte[HeaderSize];
        buffer.Slice(0, HeaderSize).CopyTo(header);
        uint len = BinaryPrimitives.ReadUInt32BigEndian(header);
        type = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));
        flags = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(6));

        if (len < HeaderSize) throw new InvalidOperationException("Invalid length");
        if (buffer.Length < len) return false; // partial frame

        // The payload is a slice of the original sequence, not a copy.
        payload = buffer.Slice(HeaderSize, len - HeaderSize);
        buffer = buffer.Slice(len); // move past this frame
        return true;
    }

    // Writes header and payload into one contiguous span; the caller decides when to flush.
    public static void WriteFrame(PipeWriter writer, ushort type, ushort flags, ReadOnlySpan<byte> payload)
    {
        int len = HeaderSize + payload.Length;
        Span<byte> span = writer.GetSpan(len);
        BinaryPrimitives.WriteUInt32BigEndian(span, (uint)len);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(4), type);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(6), flags);
        payload.CopyTo(span.Slice(HeaderSize));
        writer.Advance(len);
    }
}
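Key points
- Parsing copies only the header to the stack; the payload is always a slice of the original `ReadOnlySequence<byte>` (zero-copy). The slice is valid only until the reader calls `AdvanceTo`.
- Writing requests a large enough `Span` in one call, which reduces the number of `Advance`/`Flush` calls.
- The lower-bound check on `len` guards against malformed input.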
3. Demo A: TCP + Pipelines 🌐
src/Rpc.TcpServer/Program.cs
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading.Channels;
using System.Buffers.Binary;
using Rpc.Protocol;

var listener = new TcpListener(IPAddress.Loopback, 5001);
listener.Start();
Console.WriteLine("TCP RPC listening on 127.0.0.1:5001");

while (true)
{
    var client = await listener.AcceptTcpClientAsync();
    _ = Task.Run(() => Handle(client));
}

static async Task Handle(TcpClient client)
{
    const int MaxInFlight = 32;
    var workQueue = Channel.CreateBounded<(ushort type, ReadOnlySequence<byte> payload)>(
        new BoundedChannelOptions(MaxInFlight) { SingleReader = true, SingleWriter = true });

    using var connection = client;
    client.NoDelay = true;
    var stream = client.GetStream();
    var reader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: 64 * 1024));
    var networkWriter = PipeWriter.Create(stream,
        new StreamPipeWriterOptions(MemoryPool<byte>.Shared, 64 * 1024, leaveOpen: true));
    var sendPipe = new Pipe(new PipeOptions(
        pool: MemoryPool<byte>.Shared,
        pauseWriterThreshold: 256 * 1024,
        resumeWriterThreshold: 128 * 1024));

    var pumpTask = PumpSendAsync(sendPipe.Reader, networkWriter);
    var workerTask = Task.Run(() => WorkerAsync(workQueue.Reader, sendPipe.Writer));

    try
    {
        while (true)
        {
            var result = await reader.ReadAsync();
            var buf = result.Buffer;
            bool badFrame = false;
            try
            {
                while (Frame.TryParseFrame(ref buf, out var type, out _, out var payload))
                {
                    if (payload.Length > 4 * 1024 * 1024)
                    {
                        await workQueue.Writer.WriteAsync((Frame.TypeError, BuildErrorPayload("Payload too large")));
                        continue;
                    }
                    // The slice is only valid until AdvanceTo below, but the worker runs on another
                    // task, so hand it an owned copy. Zero-copy holds only for inline handling.
                    // WriteAsync waits while the queue is full, which is the real backpressure.
                    await workQueue.Writer.WriteAsync((type, new ReadOnlySequence<byte>(payload.ToArray())));
                }
            }
            catch (Exception ex)
            {
                // A bad length means the stream cannot be resynchronized: report it and stop reading.
                await workQueue.Writer.WriteAsync((Frame.TypeError, BuildErrorPayload("Bad frame: " + ex.Message)));
                badFrame = true;
            }
            reader.AdvanceTo(buf.Start, buf.End);
            if (badFrame || result.IsCompleted) break;
        }
    }
    finally
    {
        workQueue.Writer.TryComplete();
        await workerTask;
        await sendPipe.Writer.CompleteAsync();
        await pumpTask;
        await networkWriter.CompleteAsync();
        await reader.CompleteAsync();
        stream.Close();
    }
}

static async Task WorkerAsync(ChannelReader<(ushort type, ReadOnlySequence<byte> payload)> reader, PipeWriter sendWriter)
{
    await foreach (var (type, payload) in reader.ReadAllAsync())
    {
        try
        {
            HandleFrame(type, payload, sendWriter);
        }
        catch (Exception ex)
        {
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0,
                System.Text.Encoding.UTF8.GetBytes("Handler error: " + ex.Message));
        }
        // Advance alone does not publish data; FlushAsync makes it visible to the pump
        // and waits when pauseWriterThreshold is exceeded.
        await sendWriter.FlushAsync();
    }
}

// Synchronous on purpose: Span and SequenceReader cannot be locals of an async method before C# 13.
static void HandleFrame(ushort type, ReadOnlySequence<byte> payload, PipeWriter sendWriter)
{
    switch (type)
    {
        case Frame.TypePing:
            Frame.WriteFrame(sendWriter, Frame.TypePing, 0, ReadOnlySpan<byte>.Empty);
            break;
        case Frame.TypeEcho:
        case Frame.TypeError: // error frames queued by the read loop are forwarded as-is
            Frame.WriteFrame(sendWriter, type, 0, payload.FirstSpan);
            break;
        case Frame.TypeSum:
            int sum = 0;
            var sr = new SequenceReader<byte>(payload);
            while (sr.TryReadBigEndian(out int value)) sum += value;
            Span<byte> tmp = stackalloc byte[4];
            BinaryPrimitives.WriteInt32BigEndian(tmp, sum);
            Frame.WriteFrame(sendWriter, Frame.TypeSum, 0, tmp);
            break;
        default:
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, System.Text.Encoding.UTF8.GetBytes("Unknown type"));
            break;
    }
}

// Copies whatever the send pipe has accumulated to the network writer and flushes once per batch.
// Responses written while a flush is in flight are coalesced into the next batch, and no response
// is left waiting for a timer or a size threshold.
static async Task PumpSendAsync(PipeReader from, PipeWriter to)
{
    try
    {
        while (true)
        {
            var r = await from.ReadAsync();
            var buf = r.Buffer;
            foreach (var seg in buf)
            {
                seg.Span.CopyTo(to.GetSpan(seg.Length));
                to.Advance(seg.Length);
            }
            from.AdvanceTo(buf.End);
            var fr = await to.FlushAsync();
            if (r.IsCompleted || fr.IsCompleted) break;
        }
    }
    finally
    {
        await from.CompleteAsync();
    }
}

static ReadOnlySequence<byte> BuildErrorPayload(string msg)
{
    return new ReadOnlySequence<byte>(System.Text.Encoding.UTF8.GetBytes(msg));
}
4. Demo B: HTTP/1.1 + Pipelines 🌍
src/Rpc.HttpServer/Program.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Hosting;
using Rpc.Protocol;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.MapPost("/rpc", async (HttpContext ctx) =>
{
    ctx.Response.ContentType = "application/octet-stream";
    var reader = ctx.Request.BodyReader;
    var writer = ctx.Response.BodyWriter;

    var sendPipe = new Pipe(new PipeOptions(
        pool: MemoryPool<byte>.Shared,
        pauseWriterThreshold: 256 * 1024,
        resumeWriterThreshold: 128 * 1024));
    var pump = PumpSendAsync(sendPipe.Reader, writer);

    const int MaxInFlight = 32;
    var workQueue = Channel.CreateBounded<(ushort type, ReadOnlySequence<byte> payload)>(
        new BoundedChannelOptions(MaxInFlight) { SingleReader = true, SingleWriter = true });
    var worker = WorkerAsync(workQueue.Reader, sendPipe.Writer);

    try
    {
        while (true)
        {
            var result = await reader.ReadAsync(ctx.RequestAborted);
            var buf = result.Buffer;
            bool badFrame = false;
            try
            {
                while (Frame.TryParseFrame(ref buf, out var type, out _, out var payload))
                {
                    if (payload.Length > 4 * 1024 * 1024)
                    {
                        await workQueue.Writer.WriteAsync((Frame.TypeError, ErrorPayload("Payload too large")));
                        continue;
                    }
                    // The slice is only valid until AdvanceTo below; the worker needs an owned copy.
                    await workQueue.Writer.WriteAsync((type, new ReadOnlySequence<byte>(payload.ToArray())));
                }
            }
            catch (Exception ex)
            {
                // A bad length cannot be recovered from: report it and stop reading the body.
                await workQueue.Writer.WriteAsync((Frame.TypeError, ErrorPayload("Bad frame: " + ex.Message)));
                badFrame = true;
            }
            reader.AdvanceTo(buf.Start, buf.End);
            if (badFrame || result.IsCompleted) break;
        }
    }
    finally
    {
        workQueue.Writer.TryComplete();
        await worker;
        await sendPipe.Writer.CompleteAsync();
        await pump;
        await writer.FlushAsync();
    }
});

app.Run("http://0.0.0.0:5000");

static ReadOnlySequence<byte> ErrorPayload(string msg)
    => new ReadOnlySequence<byte>(System.Text.Encoding.UTF8.GetBytes(msg));

static async Task WorkerAsync(ChannelReader<(ushort type, ReadOnlySequence<byte> payload)> queue, PipeWriter sendWriter)
{
    await foreach (var (type, payload) in queue.ReadAllAsync())
    {
        try
        {
            HandleFrame(type, payload, sendWriter);
        }
        catch (Exception ex)
        {
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0,
                System.Text.Encoding.UTF8.GetBytes("Handler error: " + ex.Message));
        }
        // FlushAsync publishes the written bytes to the pump and applies the pipe's backpressure.
        await sendWriter.FlushAsync();
    }
}

// Synchronous on purpose: Span and SequenceReader cannot be locals of an async method before C# 13.
static void HandleFrame(ushort type, ReadOnlySequence<byte> payload, PipeWriter sendWriter)
{
    switch (type)
    {
        case Frame.TypePing:
            Frame.WriteFrame(sendWriter, Frame.TypePing, 0, ReadOnlySpan<byte>.Empty);
            break;
        case Frame.TypeEcho:
        case Frame.TypeError: // error frames queued by the read loop are forwarded as-is
            Frame.WriteFrame(sendWriter, type, 0, payload.FirstSpan);
            break;
        case Frame.TypeSum:
            int sum = 0;
            var sr = new SequenceReader<byte>(payload);
            while (sr.TryReadBigEndian(out int value)) sum += value;
            Span<byte> tmp = stackalloc byte[4];
            BinaryPrimitives.WriteInt32BigEndian(tmp, sum);
            Frame.WriteFrame(sendWriter, Frame.TypeSum, 0, tmp);
            break;
        default:
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, System.Text.Encoding.UTF8.GetBytes("Unknown type"));
            break;
    }
}

// Flushes once per batch; responses written during a flush are coalesced into the next batch.
static async Task PumpSendAsync(PipeReader from, PipeWriter to)
{
    try
    {
        while (true)
        {
            var r = await from.ReadAsync();
            var buf = r.Buffer;
            foreach (var seg in buf)
            {
                seg.Span.CopyTo(to.GetSpan(seg.Length));
                to.Advance(seg.Length);
            }
            from.AdvanceTo(buf.End);
            var fr = await to.FlushAsync();
            if (r.IsCompleted || fr.IsCompleted) break;
        }
    }
    finally
    {
        await from.CompleteAsync();
    }
}
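5. Backpressure and Using `AdvanceTo` Correctly 💡
- Partial frames: when parsing fails or there is not enough data (a partial frame), do not advance `consumed`; set `examined` to the `buffer.End` of the current read so the pipe can keep filling the buffer.
- Avoid spinning: if `examined` stops short of the end and the same `(consumed, examined)` pair is passed again and again, `ReadAsync` returns immediately each time and the loop busy-waits.
- Where thresholds apply: `pauseWriterThreshold`/`resumeWriterThreshold` only take effect on a `Pipe` you create yourself (here, the send-coalescing pipe). `StreamPipeReaderOptions`/`StreamPipeWriterOptions` are a different kind of configuration (pool, buffer size, whether to leave the underlying stream open) and have no backpressure thresholds.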
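The partial-frame rule can be observed on an in-memory `Pipe`. The sketch below (the `PartialFrameDemo` class is a hypothetical name for this illustration) writes half of a 10-byte frame, marks it as examined but not consumed, then writes the rest and reads again.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PartialFrameDemo
{
    // Returns how many bytes were visible on the first and on the second read.
    public static async Task<(long first, long second)> RunAsync()
    {
        var pipe = new Pipe();
        byte[] frame = { 0, 0, 0, 10, 0, 1, 0, 0, (byte)'h', (byte)'i' }; // len=10, type=Echo

        // Only the first 5 bytes have arrived: not even a full header.
        await pipe.Writer.WriteAsync(frame.AsMemory(0, 5));
        ReadResult r1 = await pipe.Reader.ReadAsync();
        long first = r1.Buffer.Length;

        // Consume nothing, but report everything as examined so the next
        // ReadAsync waits for new data instead of returning immediately.
        pipe.Reader.AdvanceTo(r1.Buffer.Start, r1.Buffer.End);

        // The rest arrives; the unconsumed bytes are handed back together with it.
        await pipe.Writer.WriteAsync(frame.AsMemory(5));
        ReadResult r2 = await pipe.Reader.ReadAsync();
        long second = r2.Buffer.Length;
        pipe.Reader.AdvanceTo(r2.Buffer.End);

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return (first, second);
    }
}
```

The first read sees 5 bytes and the second sees all 10, because nothing was consumed in between.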
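6. Pooling and Zero-Allocation Checklist 🛠
- Prefer slices: `ReadOnlySequence<byte>` with `Slice`/`SequenceReader` walks the original buffer. Avoid `ToArray()` on the hot path; copy only when the data must outlive the `AdvanceTo` call.
- Pooling strategy:
  - Small buffers: `ArrayPool<byte>.Shared.Rent/Return`.
  - Large blocks and Pipes: `MemoryPool<byte>.Shared`, with the `IMemoryOwner<byte>` lifetime ensuring the memory is returned.
- Reserve once: `PipeWriter.GetSpan(expected)` → fill → `Advance(expected)` → fewer flushes, more coalescing.
- Return on error paths: every path that can throw must still return pooled objects and complete the pipes.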
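A minimal sketch of the rent/return pattern from the checklist, for the case where a contiguous array really is needed (for example, to call an API that only accepts `byte[]`). `PooledSum` is a hypothetical name; the demos themselves use `SequenceReader<byte>` and need no copy for this computation.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;

public static class PooledSum
{
    // Sums big-endian int32 values after copying the payload into a rented buffer.
    // Trailing bytes that do not form a full int32 are ignored.
    public static int SumInt32BigEndian(in ReadOnlySequence<byte> payload)
    {
        int length = checked((int)payload.Length);
        byte[] rented = ArrayPool<byte>.Shared.Rent(length); // may be longer than requested
        try
        {
            Span<byte> data = rented.AsSpan(0, length); // use only the part we asked for
            payload.CopyTo(data);

            int sum = 0;
            for (int i = 0; i + 4 <= data.Length; i += 4)
                sum += BinaryPrimitives.ReadInt32BigEndian(data.Slice(i, 4));
            return sum;
        }
        finally
        {
            // Runs on success and on exceptions alike, so the buffer always goes back to the pool.
            ArrayPool<byte>.Shared.Return(rented);
        }
    }
}
```

Slicing the rented array to `length` matters because `Rent` is allowed to hand back a larger array whose tail holds stale data.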
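7. Concurrency and Flow Control ⏱
- Per-connection concurrency cap: use a bounded `Channel<T>` to hand parsed requests to the handler, so backpressure does not turn into a "thread storm".
- Worker count: the example uses a single consumer, which guarantees single-threaded writes to `sendPipe.Writer`. To process with multiple workers, serialize the writes (for example, add a "send queue" or take a single write lock).
- Timeouts and cancellation: add `CancellationToken` and timeouts to `ReadAsync`/`FlushAsync`/business handling as needed, so nothing hangs indefinitely.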
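One way to realize the "single write lock" option is a thin wrapper around the `PipeWriter`. This is a sketch under the assumption that each worker produces a complete, already encoded frame; `SerializedFrameWriter` is a hypothetical class, not part of the article's repository.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public sealed class SerializedFrameWriter
{
    private readonly PipeWriter _writer;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public SerializedFrameWriter(PipeWriter writer) => _writer = writer;

    // Writes one complete frame. Only one caller at a time touches the PipeWriter,
    // so frames from different workers never interleave.
    public async ValueTask WriteAsync(ReadOnlyMemory<byte> frame, CancellationToken ct = default)
    {
        await _gate.WaitAsync(ct);
        try
        {
            // PipeWriter.WriteAsync copies the bytes and flushes, honoring the pipe's backpressure.
            await _writer.WriteAsync(frame, ct);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Passing a `CancellationToken` created from `new CancellationTokenSource(TimeSpan.FromSeconds(5))` would also bound how long a worker can wait on a stalled connection, which covers the timeout bullet.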
8. Load Testing and Comparison 🧑‍💻
Lua script (`scripts/wrk/rpc.lua`)
-- wrk embeds LuaJIT (Lua 5.1), which has no >> or & operators, so bytes are extracted arithmetically.
local function byte_at(n, shift) return math.floor(n / 2^shift) % 256 end
local function be32(n) return string.char(byte_at(n, 24), byte_at(n, 16), byte_at(n, 8), byte_at(n, 0)) end
local function be16(n) return string.char(byte_at(n, 8), byte_at(n, 0)) end

local function build_frame(msg_type, payload)
  local len = 8 + #payload
  return be32(len) .. be16(msg_type) .. be16(0) .. payload
end

wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

request = function()
  if math.random() < 0.5 then
    local p = string.rep("A", 16) -- Echo
    return wrk.format(nil, "/rpc", nil, build_frame(1, p))
  else
    local N = 8 -- Sum
    local buf = {}
    for i = 1, N do
      local v = math.random(1, 1000)
      buf[#buf + 1] = be32(v)
    end
    return wrk.format(nil, "/rpc", nil, build_frame(2, table.concat(buf)))
  end
end
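Command:
wrk -t8 -c256 -d30s --latency -s scripts/wrk/rpc.lua http://127.0.0.1:5000/rpc
Metrics to watch: RPS, p50/p95/p99 latency, `socket errors`, CPU, GC (allocations and pause time), and the number of `Flush` calls (as a proxy for syscalls).
Baseline: implement a "traditional `Stream`" version (`BinaryReader`/`NetworkStream.ReadExactly`) with identical functionality to serve as the control group.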
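For orientation, the read side of such a baseline could look like the sketch below. It assumes .NET 7 or later (for `Stream.ReadAtLeast` and `Stream.ReadExactly`); `BaselineStreamFrame` is a hypothetical name, and the article does not show its own baseline code.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

public static class BaselineStreamFrame
{
    public const int HeaderSize = 8;

    // Blocking read of one frame from a Stream. Returns null on a clean end of stream.
    // Each frame allocates a fresh payload array, which is the cost the Pipelines version avoids.
    public static (ushort type, ushort flags, byte[] payload)? ReadFrame(Stream stream)
    {
        Span<byte> header = stackalloc byte[HeaderSize];
        int got = stream.ReadAtLeast(header, HeaderSize, throwOnEndOfStream: false);
        if (got == 0) return null;                                  // peer closed between frames
        if (got < HeaderSize) throw new EndOfStreamException("Truncated header");

        uint len = BinaryPrimitives.ReadUInt32BigEndian(header);
        ushort type = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));
        ushort flags = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(6));
        if (len < HeaderSize) throw new InvalidDataException("Invalid length");

        var payload = new byte[len - HeaderSize];
        stream.ReadExactly(payload);                                // throws if the stream ends early
        return (type, flags, payload);
    }
}
```

A production baseline would also cap `len` before allocating; that check is left out here to keep the comparison with `TryParseFrame` one-to-one.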
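9. Observability and Troubleshooting 🔍
- dotnet-counters:
  - `System.Runtime`: GC heap size, Gen0/1/2 collection counts, allocation rate, thread pool queue length and throughput, and so on
  - `Microsoft.AspNetCore.Hosting` (HTTP variant)
- Custom metrics:
  - Active requests per connection, work queue length, coalesced bytes written, and number of flushes
  - Parse time and handler time
- Key log points: `Accept → Read → Parse → Enqueue → Handle → Write → Flush`, each tagged with a `ConnectionId`.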
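The custom metrics listed above could be published through `System.Diagnostics.Metrics`, as in the following sketch. The meter name `Rpc.Pipelines` and the instrument names are assumptions made for this example.

```csharp
using System.Diagnostics.Metrics;

public static class RpcMetrics
{
    // "Rpc.Pipelines" is a made-up meter name; pick one that matches your service.
    private static readonly Meter Meter = new Meter("Rpc.Pipelines", "1.0.0");

    public static readonly Counter<long> FlushCount =
        Meter.CreateCounter<long>("rpc.flush.count");
    public static readonly Counter<long> BytesSent =
        Meter.CreateCounter<long>("rpc.bytes.sent", unit: "By");
    public static readonly Histogram<double> HandleDuration =
        Meter.CreateHistogram<double>("rpc.handle.duration", unit: "ms");

    // Call once after each successful flush of the network writer.
    public static void RecordFlush(long bytes)
    {
        FlushCount.Add(1);
        BytesSent.Add(bytes);
    }
}
```

With this in place, the send pump would call `RpcMetrics.RecordFlush(n)` after each `FlushAsync`, and the meter can then be watched by name with `dotnet-counters monitor --counters Rpc.Pipelines`.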
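10. Error Handling and Security 🛡️
- Input validation: an upper bound on `len` and a whitelist for `type`; add a checksum or HMAC where needed.
- DoS protection: limit concurrent connections, queue length, and per-connection rate/byte caps; enforce idle and read/write timeouts.
- TLS: the TCP variant uses `SslStream` (server and client certificates as required); the HTTP variant leaves TLS to Kestrel.
- Cleanup rule: on both normal and error paths, make sure `CompleteAsync()` and the return of pooled objects always happen.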
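The input-validation bullet can be sketched as a header check that runs as soon as the 8 header bytes are available, before the rest of the frame is buffered. `FrameValidation` is a hypothetical helper, and the 4 MiB cap is an assumption that mirrors the payload limit used in the demos.

```csharp
using System;

public static class FrameValidation
{
    public const int HeaderSize = 8;

    // Assumed limit: header plus a 4 MiB payload, matching the demos' payload cap.
    public const uint MaxFrameSize = HeaderSize + 4 * 1024 * 1024;

    // Validates the header fields alone, so an oversized or unknown frame
    // can be rejected without waiting for (or buffering) its payload.
    public static bool IsValidHeader(uint len, ushort type, out string error)
    {
        if (len < HeaderSize) { error = "Frame shorter than header"; return false; }
        if (len > MaxFrameSize) { error = "Frame too large"; return false; }
        if (type is not (0 or 1 or 2)) { error = "Unknown type"; return false; } // Ping, Echo, Sum
        error = "";
        return true;
    }
}
```

Rejecting on the header matters for DoS protection: a check that runs only after the whole frame has arrived has already paid the memory cost it was meant to prevent.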
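11. Repository Layout 🗂️
pipelines-rpc/
  src/
    Rpc.Protocol/        # frame definition, parser, serialization helpers (Frame.cs)
    Rpc.TcpServer/       # Demo A: TcpListener + Pipelines (+ write coalescing + concurrency limit)
    Rpc.HttpServer/      # Demo B: Kestrel + BodyReader/Writer (same protocol, same strategy)
    Rpc.BaselineStream/  # optional: traditional Stream baseline (control group)
  scripts/
    wrk/rpc.lua          # builds binary frames; mixed Echo/Sum workload
    perf-collect.ps1     # dotnet-counters collection script (optional)
  README.md              # startup and load-test instructions, expected-results template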
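12. Selection Guidance 📝
- Prefer Pipelines: custom binary protocols, complex frames or multi-segment buffers, high RPS and low latency, gateway or proxy cores.
- Stay with Stream: moderate throughput needs, cost-sensitive projects, simple protocols and processing.
- Coexist with gRPC: use gRPC for open business-facing interfaces (ecosystem, maintainability) and Pipelines for internal hot paths or proxy cores (maximum performance).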