Orleans 自定义二进制协议在 TCP 上层实现的完整过程
Orleans 自定义二进制协议实现详解
1. 协议栈架构
┌─────────────────────────────────────────┐
│ Orleans 自定义协议层 │ ← 应用层协议
│ ┌─────────┬─────────┬─────────┬──────┐ │
│ │Grain路由│方法调度 │异常处理 │超时控制│ │
│ └─────────┴─────────┴─────────┴──────┘ │
├─────────────────────────────────────────┤
│ TCP 传输层 │ ← 可靠传输
│ (Socket 连接管理) │
└─────────────────────────────────────────┘
2. 核心组件实现
(1) 消息结构定义 (Message.cs)
[Id(101)]
internal sealed class Message : ISpanFormattable
{public const int LENGTH_HEADER_SIZE = 8; // 帧长度头public const int LENGTH_META_HEADER = 4; // 元数据头// 核心字段public object BodyObject { get; set; } // 消息体public PackedHeaders _headers; // 压缩头信息public CorrelationId _id; // 消息IDpublic GrainId _targetGrain; // 目标Grainpublic GrainId _sendingGrain; // 发送Grainpublic SiloAddress _targetSilo; // 目标Silopublic SiloAddress _sendingSilo; // 发送Silopublic ushort _interfaceVersion; // 接口版本public GrainInterfaceType _interfaceType; // 接口类型
}
(2) 二进制协议序列化 (MessageSerializer.cs)
public (int RequiredBytes, int HeaderLength, int BodyLength) TryRead(ref ReadOnlySequence<byte> input, out Message? message)
{if (input.Length < FramingLength){message = default;return (FramingLength, 0, 0);}// 1. 读取帧长度头 (8字节)Span<byte> lengthBytes = stackalloc byte[FramingLength];input.Slice(input.Start, FramingLength).CopyTo(lengthBytes);var headerLength = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes);var bodyLength = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes[4..]);// 2. 验证长度有效性ThrowIfLengthsInvalid(headerLength, bodyLength);var requiredBytes = FramingLength + headerLength + bodyLength;if (input.Length < requiredBytes){message = default;return (requiredBytes, 0, 0);}
}
(3) 消息头序列化实现
private void Serialize<TBufferWriter>(ref Writer<TBufferWriter> writer, Message value, PackedHeaders headers) where TBufferWriter : IBufferWriter<byte>
{writer.WriteUInt32((uint)headers); // 压缩头标志writer.WriteInt64(value.Id.ToInt64()); // 消息IDWriteGrainId(ref writer, value.SendingGrain); // 发送Grain IDWriteGrainId(ref writer, value.TargetGrain); // 目标Grain ID_writerSiloAddressCodec.WriteRaw(ref writer, value.SendingSilo); // 发送Silo地址_writerSiloAddressCodec.WriteRaw(ref writer, value.TargetSilo); // 目标Silo地址if (headers.HasFlag(MessageFlags.HasTimeToLive)){writer.WriteInt32((int)value.GetTimeToLiveMilliseconds()); // 超时控制}if (headers.HasFlag(MessageFlags.HasInterfaceType)){_idSpanCodec.WriteRaw(ref writer, value.InterfaceType.Value); // 接口类型}
}
3. TCP 连接管理
(1) 连接管理器 (ConnectionManager.cs)
public ValueTask<Connection> GetConnection(SiloAddress endpoint)
{if (this.connections.TryGetValue(endpoint, out var entry) && entry.NextConnection() is { } connection){if (!entry.HasSufficientConnections(connectionOptions) && entry.PendingConnection is null){this.GetConnectionAsync(endpoint).Ignore();}return new(connection); // 返回现有连接}return new(this.GetConnectionAsync(endpoint)); // 创建新连接
}
(2) Socket 连接工厂 (SocketConnectionFactory.cs)
public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken)
{var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp){LingerState = new LingerOption(true, 0),NoDelay = _options.NoDelay, // 禁用Nagle算法};if (_options.KeepAlive){socket.EnableKeepAlive(timeSeconds: _options.KeepAliveTimeSeconds,intervalSeconds: _options.KeepAliveIntervalSeconds,retryCount: _options.KeepAliveRetryCount);}socket.EnableFastPath(); // 启用快速路径优化
}
4. 消息路由和处理流程
(1) 消息发送 (MessageCenter.cs)
if (targetSilo.Matches(_siloAddress))
{// 本地消息 - 直接处理this.ReceiveMessage(msg);
}
else
{if (this.connectionManager.TryGetConnection(targetSilo, out var existingConnection)){existingConnection.Send(msg); // 通过TCP发送return;}else if (this.siloStatusOracle.IsDeadSilo(targetSilo)){// 目标Silo已死亡 - 发送拒绝消息this.SendRejection(msg, Message.RejectionTypes.Transient, "Target silo is known to be dead");return;}else{// 异步建立连接并发送var connectionTask = this.connectionManager.GetConnection(targetSilo);if (connectionTask.IsCompletedSuccessfully){var sender = connectionTask.Result;sender.Send(msg);}}
}
(2) 消息接收和路由 (MessageCenter.cs)
public void ReceiveMessage(Message msg)
{try{this.messagingTrace.OnIncomingMessageAgentReceiveMessage(msg);if (TryDeliverToProxy(msg)){return; // 代理消息处理}else if (msg.Direction == Message.Directions.Response){this.catalog.RuntimeClient.ReceiveResponse(msg); // 响应消息处理}else{// 请求消息 - 路由到目标Grainvar targetActivation = catalog.GetOrCreateActivation(msg.TargetGrain,msg.RequestContextData,rehydrationContext: null);if (targetActivation is null){ProcessMessageToNonExistentActivation(msg);return;}targetActivation.ReceiveMessage(msg); // 转发到Grain激活}}catch (Exception ex){HandleReceiveFailure(msg, ex);}
}
(3) Grain 方法调用 (InsideRuntimeClient.cs)
public async Task Invoke(IGrainContext target, Message message)
{try{if (message.IsExpired){this.messagingTrace.OnDropExpiredMessage(message, MessagingInstruments.Phase.Invoke);return;}Response response;try{switch (message.BodyObject){case IInvokable invokable:{invokable.SetTarget(target);// 应用Grain调用过滤器if (GrainCallFilters is { Count: > 0 } || target.GrainInstance is IIncomingGrainCallFilter){var invoker = new GrainMethodInvoker(message, target, invokable, GrainCallFilters, this.interfaceToImplementationMapping, this.responseCopier);await invoker.Invoke();response = invoker.Response;}else{response = await invokable.Invoke(); // 直接调用response = this.responseCopier.Copy(response);}break;}}}catch (Exception ex){// 异常处理response = Response.FromException(ex);}}
}
5. 关键特性实现
(1) 超时控制 (InvokeMethodOptions.cs)
[Flags]
[GenerateSerializer]
public enum InvokeMethodOptions
{None = 0,OneWay = 1 << 0, // 单向调用 - 无响应ReadOnly = 1 << 1, // 只读操作 - 可并发AlwaysInterleave = 1 << 2, // 允许交错 - 可与其他请求并发Unordered = 1 << 3, // 无序消息 - 可优化传输
}
(2) 连接前导码 (ConnectionPreamble.cs)
[GenerateSerializer, Immutable]
internal sealed class ConnectionPreamble
{[Id(0)]public NetworkProtocolVersion NetworkProtocolVersion { get; init; }[Id(1)]public GrainId NodeIdentity { get; init; }[Id(2)]public SiloAddress SiloAddress { get; init; }[Id(3)]public string ClusterId { get; init; }
}
6. 完整通信流程
-
调用发起:
- Grain A 调用 Grain B 的方法
- 创建
Message对象,包含目标Grain ID、方法参数等 - 设置调用选项(超时、单向等)
-
消息序列化:
- 使用
MessageSerializer将消息序列化为二进制格式 - 添加帧长度头(8字节)+ 消息头 + 消息体
- 压缩头信息以减少网络开销
- 使用
-
TCP 传输:
- 通过
ConnectionManager获取到目标Silo的TCP连接 - 使用
Socket发送二进制数据 - 支持连接池和自动重连
- 通过
-
消息接收:
- 目标Silo的TCP服务器接收数据
- 反序列化消息,解析帧长度和消息内容
- 根据
TargetGrain路由到对应的Grain激活
-
方法执行:
- 通过
GrainMethodInvoker调用目标方法 - 处理异常和超时
- 序列化响应并返回
- 通过
-
响应处理:
- 调用方接收响应消息
- 反序列化结果或异常
- 完成异步调用
这个设计实现了高性能、可靠的分布式Grain通信,同时保持了良好的可扩展性和容错能力。
