当前位置: 首页 > news >正文

Orleans 自定义二进制协议在 TCP 上层实现的完整过程

Orleans 自定义二进制协议实现详解

1. 协议栈架构

┌─────────────────────────────────────────┐
│         Orleans 自定义协议层              │  ← 应用层协议
│  ┌─────────┬─────────┬─────────┬──────┐  │
│  │Grain路由│方法调度 │异常处理 │超时控制│  │
│  └─────────┴─────────┴─────────┴──────┘  │
├─────────────────────────────────────────┤
│            TCP 传输层                    │  ← 可靠传输
│        (Socket 连接管理)                 │
└─────────────────────────────────────────┘

2. 核心组件实现

(1) 消息结构定义 (Message.cs)
[Id(101)]
internal sealed class Message : ISpanFormattable
{public const int LENGTH_HEADER_SIZE = 8;  // 帧长度头public const int LENGTH_META_HEADER = 4;  // 元数据头// 核心字段public object BodyObject { get; set; }           // 消息体public PackedHeaders _headers;                   // 压缩头信息public CorrelationId _id;                        // 消息IDpublic GrainId _targetGrain;                     // 目标Grainpublic GrainId _sendingGrain;                    // 发送Grainpublic SiloAddress _targetSilo;                  // 目标Silopublic SiloAddress _sendingSilo;                 // 发送Silopublic ushort _interfaceVersion;                 // 接口版本public GrainInterfaceType _interfaceType;        // 接口类型
}
(2) 二进制协议序列化 (MessageSerializer.cs)
public (int RequiredBytes, int HeaderLength, int BodyLength) TryRead(ref ReadOnlySequence<byte> input, out Message? message)
{if (input.Length < FramingLength){message = default;return (FramingLength, 0, 0);}// 1. 读取帧长度头 (8字节)Span<byte> lengthBytes = stackalloc byte[FramingLength];input.Slice(input.Start, FramingLength).CopyTo(lengthBytes);var headerLength = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes);var bodyLength = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes[4..]);// 2. 验证长度有效性ThrowIfLengthsInvalid(headerLength, bodyLength);var requiredBytes = FramingLength + headerLength + bodyLength;if (input.Length < requiredBytes){message = default;return (requiredBytes, 0, 0);}
}
(3) 消息头序列化实现
private void Serialize<TBufferWriter>(ref Writer<TBufferWriter> writer, Message value, PackedHeaders headers) where TBufferWriter : IBufferWriter<byte>
{writer.WriteUInt32((uint)headers);                    // 压缩头标志writer.WriteInt64(value.Id.ToInt64());                // 消息IDWriteGrainId(ref writer, value.SendingGrain);         // 发送Grain IDWriteGrainId(ref writer, value.TargetGrain);          // 目标Grain ID_writerSiloAddressCodec.WriteRaw(ref writer, value.SendingSilo);  // 发送Silo地址_writerSiloAddressCodec.WriteRaw(ref writer, value.TargetSilo);   // 目标Silo地址if (headers.HasFlag(MessageFlags.HasTimeToLive)){writer.WriteInt32((int)value.GetTimeToLiveMilliseconds());  // 超时控制}if (headers.HasFlag(MessageFlags.HasInterfaceType)){_idSpanCodec.WriteRaw(ref writer, value.InterfaceType.Value);  // 接口类型}
}

3. TCP 连接管理

(1) 连接管理器 (ConnectionManager.cs)
public ValueTask<Connection> GetConnection(SiloAddress endpoint)
{if (this.connections.TryGetValue(endpoint, out var entry) && entry.NextConnection() is { } connection){if (!entry.HasSufficientConnections(connectionOptions) && entry.PendingConnection is null){this.GetConnectionAsync(endpoint).Ignore();}return new(connection);  // 返回现有连接}return new(this.GetConnectionAsync(endpoint));  // 创建新连接
}
(2) Socket 连接工厂 (SocketConnectionFactory.cs)
public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken)
{var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp){LingerState = new LingerOption(true, 0),NoDelay = _options.NoDelay,  // 禁用Nagle算法};if (_options.KeepAlive){socket.EnableKeepAlive(timeSeconds: _options.KeepAliveTimeSeconds,intervalSeconds: _options.KeepAliveIntervalSeconds,retryCount: _options.KeepAliveRetryCount);}socket.EnableFastPath();  // 启用快速路径优化
}

4. 消息路由和处理流程

(1) 消息发送 (MessageCenter.cs)
if (targetSilo.Matches(_siloAddress))
{// 本地消息 - 直接处理this.ReceiveMessage(msg);
}
else
{if (this.connectionManager.TryGetConnection(targetSilo, out var existingConnection)){existingConnection.Send(msg);  // 通过TCP发送return;}else if (this.siloStatusOracle.IsDeadSilo(targetSilo)){// 目标Silo已死亡 - 发送拒绝消息this.SendRejection(msg, Message.RejectionTypes.Transient, "Target silo is known to be dead");return;}else{// 异步建立连接并发送var connectionTask = this.connectionManager.GetConnection(targetSilo);if (connectionTask.IsCompletedSuccessfully){var sender = connectionTask.Result;sender.Send(msg);}}
}
(2) 消息接收和路由 (MessageCenter.cs)
public void ReceiveMessage(Message msg)
{try{this.messagingTrace.OnIncomingMessageAgentReceiveMessage(msg);if (TryDeliverToProxy(msg)){return;  // 代理消息处理}else if (msg.Direction == Message.Directions.Response){this.catalog.RuntimeClient.ReceiveResponse(msg);  // 响应消息处理}else{// 请求消息 - 路由到目标Grainvar targetActivation = catalog.GetOrCreateActivation(msg.TargetGrain,msg.RequestContextData,rehydrationContext: null);if (targetActivation is null){ProcessMessageToNonExistentActivation(msg);return;}targetActivation.ReceiveMessage(msg);  // 转发到Grain激活}}catch (Exception ex){HandleReceiveFailure(msg, ex);}
}
(3) Grain 方法调用 (InsideRuntimeClient.cs)
public async Task Invoke(IGrainContext target, Message message)
{try{if (message.IsExpired){this.messagingTrace.OnDropExpiredMessage(message, MessagingInstruments.Phase.Invoke);return;}Response response;try{switch (message.BodyObject){case IInvokable invokable:{invokable.SetTarget(target);// 应用Grain调用过滤器if (GrainCallFilters is { Count: > 0 } || target.GrainInstance is IIncomingGrainCallFilter){var invoker = new GrainMethodInvoker(message, target, invokable, GrainCallFilters, this.interfaceToImplementationMapping, this.responseCopier);await invoker.Invoke();response = invoker.Response;}else{response = await invokable.Invoke();  // 直接调用response = this.responseCopier.Copy(response);}break;}}}catch (Exception ex){// 异常处理response = Response.FromException(ex);}}
}

5. 关键特性实现

(1) 超时控制 (InvokeMethodOptions.cs)
[Flags]
[GenerateSerializer]
public enum InvokeMethodOptions
{None = 0,OneWay = 1 << 0,          // 单向调用 - 无响应ReadOnly = 1 << 1,        // 只读操作 - 可并发AlwaysInterleave = 1 << 2, // 允许交错 - 可与其他请求并发Unordered = 1 << 3,       // 无序消息 - 可优化传输
}
(2) 连接前导码 (ConnectionPreamble.cs)
[GenerateSerializer, Immutable]
internal sealed class ConnectionPreamble
{[Id(0)]public NetworkProtocolVersion NetworkProtocolVersion { get; init; }[Id(1)]public GrainId NodeIdentity { get; init; }[Id(2)]public SiloAddress SiloAddress { get; init; }[Id(3)]public string ClusterId { get; init; }
}

6. 完整通信流程

  1. 调用发起

    • Grain A 调用 Grain B 的方法
    • 创建 Message 对象,包含目标Grain ID、方法参数等
    • 设置调用选项(超时、单向等)
  2. 消息序列化

    • 使用 MessageSerializer 将消息序列化为二进制格式
    • 添加帧长度头(8字节)+ 消息头 + 消息体
    • 压缩头信息以减少网络开销
  3. TCP 传输

    • 通过 ConnectionManager 获取到目标Silo的TCP连接
    • 使用 Socket 发送二进制数据
    • 支持连接池和自动重连
  4. 消息接收

    • 目标Silo的TCP服务器接收数据
    • 反序列化消息,解析帧长度和消息内容
    • 根据 TargetGrain 路由到对应的Grain激活
  5. 方法执行

    • 通过 GrainMethodInvoker 调用目标方法
    • 处理异常和超时
    • 序列化响应并返回
  6. 响应处理

    • 调用方接收响应消息
    • 反序列化结果或异常
    • 完成异步调用

这个设计实现了高性能、可靠的分布式Grain通信,同时保持了良好的可扩展性和容错能力。

http://www.dtcms.com/a/538141.html

相关文章:

  • 宁波网站建设工作室重庆手机网站制作价格
  • 那个做图网站叫什么贵州做网站怎么推广
  • PostgreSQL认证含金量如何?适合哪些人?
  • ZSet 与实时排行榜:从应用到原理的深度解析
  • 网站获取访客手机号源码百度软文推广公司
  • 辽宁省建设厅网站河南网站建设工作室
  • html5电影网站源码php网站开发费用清单
  • 湖北微网站建设多少钱品牌网站建设美丽
  • 龙华品牌网站制作公众号 上传 wordpress
  • 吉林市网站建设公司做网站哪里学
  • 宁波英文网站建设国产wordpress模板
  • 28-4.1继承
  • opencart网站国外wordpress电影模板
  • 成都网站维护公司装修公司网站开发
  • 如何识别一个网站是否做的好wordpress 优秀主题
  • 电子商务网站开发基本流程图黄页 网站模板
  • 开发青年网站一个做二维码问卷调查的网站
  • 网页站点什么意思网站建设运营公司推荐
  • 北京网站seo排名在哪里可以做个人网站
  • Java基础一文速过
  • 南宁希噢网站开发工作室爱淘苗网站开发模式
  • 单位的网站的建设wordpress页首文件
  • 天龙八部TLBB系列 - 客户端技术整体性分析
  • 140.72bit ddr测试时ddr_dm脚如果硬件存在,则需要拉低处理
  • 石家庄网站建设电话咨询代做淘宝联盟网站
  • 农村pc网站开发wordpress 男科医院主题
  • Playwright中BrowserContext深度解析-BrowserContext方法速查手册
  • 企业网站seo诊断工具哪个网站用帝国cms做的
  • 外贸商城建站个人网站内容有哪些内容
  • 深圳室内设计公司50强织梦网站seo