当前位置: 首页 > news >正文

基于skynet框架业务中的gateway实现分析

基于skynet框架业务中的gateway实现分析

  • gate.lua
    • 关键数据结构
    • 消息转发逻辑
    • 连接管理命令
  • gateserver.lua
    • 关键代码分析
    • 消息处理机制
  • gateway.lua
    • 增加的连接管理
    • 智能消息路由
    • 用户注册机制
    • 超时检查机制
  • 调用链路分析
    • 启动流程
    • 新连接到达
    • 消息处理流程
    • 用户登录后转发
    • 连接关闭

gate.lua

基于gateserver构建的通用网关服务,负责连接管理和消息转发

local skynet = require "skynet"
local gateserver = require "snax.gateserver"local watchdog
local connection = {}   -- fd -> connection : { fd , client, agent , ip, mode }skynet.register_protocol {name = "client",id = skynet.PTYPE_CLIENT,
}local handler = {}function handler.open(source, conf)watchdog = conf.watchdog or sourcereturn conf.address, conf.port
endfunction handler.message(fd, msg, sz)-- recv a package, forward itlocal c = connection[fd]local agent = c.agentif agent then-- It's safe to redirect msg directly , gateserver framework will not free msg.skynet.redirect(agent, c.client, "client", fd, msg, sz)elseskynet.send(watchdog, "lua", "socket", "data", fd, skynet.tostring(msg, sz))-- skynet.tostring will copy msg to a string, so we must free msg here.skynet.trash(msg,sz)end
endfunction handler.connect(fd, addr)local c = {fd = fd,ip = addr,}connection[fd] = cskynet.send(watchdog, "lua", "socket", "open", fd, inspect(addr))
endlocal function unforward(c)if c.agent thenc.agent = nilc.client = nilend
endlocal function close_fd(fd)local c = connection[fd]if c thenunforward(c)connection[fd] = nilend
endfunction handler.disconnect(fd)close_fd(fd)skynet.send(watchdog, "lua", "socket", "close", fd)
endfunction handler.error(fd, msg)close_fd(fd)skynet.send(watchdog, "lua", "socket", "error", fd, msg)
endfunction handler.warning(fd, size)skynet.send(watchdog, "lua", "socket", "warning", fd, size)
endlocal CMD = {}function CMD.forward(source, fd, client, address)local c = assert(connection[fd])unforward(c)c.client = client or 0c.agent = address or sourcegateserver.openclient(fd)
endfunction CMD.accept(source, fd)local c = assert(connection[fd])unforward(c)gateserver.openclient(fd)
endfunction CMD.kick(source, fd)gateserver.closeclient(fd)
endfunction handler.command(cmd, source, ...)local f = assert(CMD[cmd])return f(source, ...)
endgateserver.start(handler)

关键数据结构

local connection = {} -- fd -> connection : { fd, client, agent, ip, mode }
-- fd: 文件描述符
-- client: 客户端地址
-- agent: 代理服务地址  
-- ip: 客户端IP

消息转发逻辑

其通过skynet.register_protocol注册了"client"类型协议,核心转发逻辑如下:

function handler.message(fd, msg, sz)local c = connection[fd]local agent = c.agentif agent then-- 已分配agent,直接转发消息skynet.redirect(agent, c.client, "client", fd, msg, sz)else-- 未分配agent,发给watchdog处理skynet.send(watchdog, "lua", "socket", "data", fd, skynet.tostring(msg, sz))skynet.trash(msg,sz) -- 释放内存endend

连接管理命令

local CMD = {}function CMD.forward(source, fd, client, address)-- 将连接转发给指定agentlocal c = assert(connection[fd])unforward(c)c.client = client or 0c.agent = address or sourcegateserver.openclient(fd) -- 开始接收数据
endfunction CMD.kick(source, fd)-- 踢掉客户端gateserver.closeclient(fd)
end

gateserver.lua

这是最底层的网关服务器实现,封装了Skynet的socket操作,提供TCP连接管理的基础框架,其内部注册了"socket"类型的协议,通过gate将gate中的"socket"类型调用到gateserver中的对于指令

local skynet = require "skynet"
local netpack = require "skynet.netpack"
local socketdriver = require "skynet.socketdriver"local gateserver = {}local socket    -- listen socket
local queue     -- message queue
local maxclient -- max client
local client_number = 0
local CMD = setmetatable({}, { __gc = function() netpack.clear(queue) end })
local nodelay = falselocal connection = {}
-- true : connected
-- nil : closed
-- false : close readfunction gateserver.openclient(fd)if connection[fd] thensocketdriver.start(fd)end
endfunction gateserver.closeclient(fd)local c = connection[fd]if c ~= nil thenconnection[fd] = nilsocketdriver.close(fd)end
endfunction gateserver.start(handler)assert(handler.message)assert(handler.connect)local listen_context = {}function CMD.open( source, conf )assert(not socket)local address = conf.address or "0.0.0.0"local port = assert(conf.port)local backlog = assert(conf.backlog)maxclient = conf.maxclient or 1024nodelay = conf.nodelayskynet.error(string.format("Listen on %s:%d", address, port))socket = socketdriver.listen(address, port, backlog)listen_context.co = coroutine.running()listen_context.fd = socketskynet.wait(listen_context.co)conf.address = listen_context.addrconf.port = listen_context.portlisten_context = nilsocketdriver.start(socket)if handler.open thenreturn handler.open(source, conf)endendfunction CMD.close()assert(socket)socketdriver.close(socket)endlocal MSG = {}local function dispatch_msg(fd, msg, sz)if connection[fd] thenhandler.message(fd, msg, sz)elseskynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))endendMSG.data = dispatch_msglocal function dispatch_queue()local fd, msg, sz = netpack.pop(queue)if fd then-- may dispatch even the handler.message blocked-- If the handler.message never block, the queue should be empty, so only fork once and then exit.skynet.fork(dispatch_queue)dispatch_msg(fd, msg, sz)for fd, msg, sz in netpack.pop, queue dodispatch_msg(fd, msg, sz)endendendMSG.more = dispatch_queuefunction MSG.open(fd, msg)client_number = client_number + 1if client_number >= maxclient thensocketdriver.shutdown(fd)returnendif nodelay thensocketdriver.nodelay(fd)endconnection[fd] = truehandler.connect(fd, msg)endfunction MSG.close(fd)if fd ~= socket thenclient_number = client_number - 1if connection[fd] thenconnection[fd] = false  -- close readendif handler.disconnect thenhandler.disconnect(fd)endelsesocket = nilendendfunction MSG.error(fd, msg)if fd == socket thenskynet.error("gateserver accept error:",msg)elsesocketdriver.shutdown(fd)if handler.error thenhandler.error(fd, msg)endendendfunction MSG.warning(fd, size)if handler.warning thenhandler.warning(fd, size)endendfunction MSG.init(id, addr, port)if listen_context thenlocal co = listen_context.coif co thenassert(id == listen_context.fd)listen_context.addr = addrlisten_context.port = portskynet.wakeup(co)listen_context.co = nilendendendskynet.register_protocol {name = "socket",id = skynet.PTYPE_SOCKET,   -- PTYPE_SOCKET = 6unpack = function ( msg, sz )return netpack.filter( queue, msg, sz)end,dispatch = function (_, _, q, type, ...)queue = qif type thenMSG[type](...)endend}local function init()skynet.dispatch("lua", function (_, address, cmd, ...)local f = CMD[cmd]if f thenskynet.ret(skynet.pack(f(address, ...)))elseskynet.ret(skynet.pack(handler.command(cmd, address, ...)))endend)endif handler.embed theninit()elseskynet.start(init)end
endreturn gateserver

关键代码分析

-- 核心数据结构
local connection = {} -- 连接状态管理:true=已连接, nil=已关闭, false=关闭读
local queue -- 消息队列
local maxclient -- 最大客户端数-- 主要函数
function gateserver.openclient(fd)-- 开启客户端连接的数据接收if connection[fd] thensocketdriver.start(fd)end
endfunction gateserver.closeclient(fd)  -- 关闭客户端连接local c = connection[fd]if c ~= nil thenconnection[fd] = nilsocketdriver.close(fd)end
end

消息处理机制

主要分2类,一类是处理socket类型消息,一类是lua类型消息

-- socket消息类型处理
local MSG = {}
MSG.data = function(fd, msg, sz) -- 数据到达if connection[fd] thenhandler.message(fd, msg, sz) -- 调用上层handlerend
endMSG.open = function(fd, msg) -- 新连接client_number = client_number + 1if client_number >= maxclient thensocketdriver.shutdown(fd) -- 超过最大连接数,拒绝returnendconnection[fd] = truehandler.connect(fd, msg) -- 通知上层
endMSG.close = function(fd) -- 连接关闭if fd ~= socket thenclient_number = client_number - 1if connection[fd] thenconnection[fd] = false -- 标记为关闭读endif handler.disconnect thenhandler.disconnect(fd) -- 通知上层endelsesocket = nil -- 监听socket关闭end
end

gateway.lua

业务级别网关服务实例,增加了业务逻辑,如用户认证、超时检查等,启动网关服务时每个节点只需要启动一个就行,因为网络线程只有一个,启动后可以调用"lua"类型消息open,开启网关服务

local skynet = require "skynet"
local ignoreret = skynet.ignoreretlocal log = require "log"
local liblogin
local libcenter
local libclientagent
local libpublishlocal gateserver = require "gateserver"  
local inspect = inspect
local tonumber, os_date = tonumber, os.date
local self_addr = skynet.self()
local EACH = 50local connection = {} -- fd -> { fd , ip, uid(登录后有)game(登录后有)key(登录后有)}
local to_check = {} -- 该table会复用,所以需要使用begin to end的方式遍历
local minute = tonumber(os_date("%M"))skynet.register_protocol {name = "client",id = skynet.PTYPE_CLIENT,
}local handler = {}function handler.open(source, conf)log.info("start listen port: %d", conf.port)
endlocal function split(input, delimiter)input = tostring(input)delimiter = tostring(delimiter)if (delimiter=='') then return false endlocal pos,arr = 0, {}-- for each divider foundfor st,sp in function() return string.find(input, delimiter, pos, true) end dotable.insert(arr, string.sub(input, pos, st - 1))pos = sp + 1endtable.insert(arr, string.sub(input, pos))return arr
endfunction handler.connect(fd, addr)DEBUG("New client from: ", addr, " fd: ", fd)local ips = split(addr, ":")local c = {fd = fd,ip = ips[1],uid = nil,agent = nil,lastop_ts = minute,}connection[fd] = cgateserver.openclient(fd)endfunction handler.message(fd, msg, sz)ignoreret() -- session is fd, don't call skynet.retlocal c = connection[fd]local uid = c.uidif uid then--fd为session,特殊用法c.lastop_ts = minuteskynet.redirect(c.agent, self_addr, "client", uid, msg, sz)elselocal login = liblogin.fetch_login()--fd为session,特殊用法skynet.redirect(login, self_addr, "client", fd, msg, sz)endendlocal CMD = {}--true/false
function CMD.register(source, data)log.info("Begin CMD.register %d", source)local c = connection[data.fd]if not c thenreturn falseendlog.info("ok CMD.register %d", source)c.uid = data.uidc.agent = data.agentc.key = data.keylibpublish.register(data.fd, data.uid)return true
endfunction CMD.get_connection(source, fd)log.info("Begin CMD.get_connection %d", source)local c = connection[fd]if not c thenreturn falseendlog.info("ok CMD.get_connection %d,fd=%d", source,fd)return c
endlocal function close_agent(fd)local c = connection[fd]DEBUG("gate server close_agent:", inspect(c))if c thenconnection[fd] = nilif c.uid thenlibcenter.logout(c.uid, c.key)libclientagent.kick(c.uid, fd)endlibpublish.unregister(fd)gateserver.closeclient(fd)endreturn true
end --true/false
function CMD.kick(source, fd)DEBUG("cmd.kick fd:", fd)return close_agent(fd)
end--true/false
function CMD.kick_all(source)DEBUG("cmd.kick_all")for fd, info in pairs(connection) dopcall(close_agent, fd)endreturn true
endfunction handler.disconnect(fd)DEBUG("handler.disconnect fd:", fd)return close_agent(fd)
endfunction handler.error(fd, msg)DEBUG("handler.error:", msg)handler.disconnect(fd)
endfunction handler.warning(fd, size)DEBUG("handler.warning fd:", fd, " size:", size)
endfunction handler.command(cmd, source, ...)DEBUG("gate server handler command:", cmd)local f = assert(CMD[cmd])return f(source, ...)
endgateserver.start(handler)local function do_check()local idx = 0for fd, c in pairs(connection) doidx = idx + 1to_check[idx] = fdendlocal fd, c = nil, nilfor i=1, idx dofd = to_check[i]c = connection[fd]if c thenif (minute - c.lastop_ts) > 1 thenclose_agent(fd)endendif i % EACH == 0 thenskynet.yield()endend
endlocal function start_init()liblogin = require "liblogin"libcenter = require "libcenter"libclientagent = require "libclientagent"libpublish = require "libpublish"
endskynet.init(function ()skynet.timeout(10, start_init)skynet.fork(function()while true doskynet.sleep(3000)minute = tonumber(os_date("%M"))endend)skynet.fork(function()while true doskynet.sleep(6000)minute = tonumber(os_date("%M")) -- check前也更新下当前minutepcall(do_check)endend)
end)

增加的连接管理

local connection = {} 
-- fd -> { fd, ip, uid, agent, key, lastop_ts }
-- 增加了用户ID、最后操作时间等业务字段local to_check = {} -- 用于超时检查的临时表
local minute = tonumber(os_date("%M")) -- 当前分钟数

智能消息路由

通过登录时绑定的fd和agent,使用skynet.redirect对用户进行消息的转发,因为使用skynet.redirect派发"client"类型消息,所以需要skynet.register_protocol注册client协议

function handler.message(fd, msg, sz)local c = connection[fd]local uid = c.uidif uid then-- 已登录用户:消息转发到对应agentc.lastop_ts = minuteskynet.redirect(c.agent, self_addr, "client", uid, msg, sz)else-- 未登录用户:消息转发到登录服务local login = liblogin.fetch_login()skynet.redirect(login, self_addr, "client", fd, msg, sz)end
end

用户注册机制

例如可以在用户login时调用register绑定agent和fd

function CMD.register(source, data)local c = connection[data.fd]if not c thenreturn falseend-- 注册用户信息c.uid = data.uidc.agent = data.agentc.key = data.keylibpublish.register(data.fd, data.uid) -- 发布注册事件return true
end

超时检查机制

local function do_check()local idx = 0-- 收集所有连接for fd, c in pairs(connection) doidx = idx + 1to_check[idx] = fdend-- 检查超时连接for i=1, idx dolocal fd = to_check[i]local c = connection[fd]if c and (minute - c.lastop_ts) > 1 thenclose_agent(fd) -- 关闭超时连接endif i % EACH == 0 thenskynet.yield() -- 每50个连接让出CPU,避免阻塞endend
end

调用链路分析

启动流程

gateserver.start(handler) → CMD.open() → socketdriver.listen()→ handler.open() 

新连接到达

socket消息 → MSG.open() → handler.connect() → 记录connection信息→ gateserver.openclient()开始接收数据

消息处理流程

客户端发送数据 → socket消息 → MSG.data() → handler.message() → 根据是否登录路由到agent或login服务

用户登录后转发

login服务 → CMD.register() → 更新connection信息→ 后续消息直接转发到agent

连接关闭

socket关闭 → MSG.close() → handler.disconnect()close_agent()清理资源

可以把这三个文件想象成一个公司的前台接待系统:
gateserver.lua - 大楼门禁系统

  • 功能:管理谁可以进大楼,基本的进出控制
  • 职责:开门、关门、统计人数、防止超员

gate.lua - 前台接待员

  • 功能:登记访客信息,决定把访客引导到哪里
  • 职责:记录访客信息、分配接待部门、传递消息

gateway.lua - 智能接待系统

  • 功能:在基础接待上增加了业务逻辑
  • 职责:验证身份、记录最后活动时间、自动清理长时间不活动的访客

完整工作流程:

  1. 访客到达(连接建立)→ 门禁系统记录 → 前台登记
  2. 访客说话(消息到达)→ 前台根据情况:
  • 未认证:转给认证部门(login服务)
  • 已认证:直接转给对应负责人(agent服务)
  1. 认证成功 → 前台更新记录,后续直接转接
  2. 访客离开(连接断开)→ 前台清理记录,通知相关部门
http://www.dtcms.com/a/426804.html

相关文章:

  • OpenCV基础操作与图像处理
  • 北京高端网站建设图片大全dede做手机网站
  • 关于Pycharm的conda虚拟环境包更改路径问题的配置问题
  • 从Docker到K8s:MySQL容器化部署的终极进化论
  • Windows Server 2022离线搭建Gitlab
  • iPhone 用户如何通过鼠标提升操作体验?
  • 传统小型企业做网站的好处wordpress的主题切换不成功
  • 开个小网站要怎么做网络培训中心
  • 【Linux】库的制作与原理(2)
  • 制作英文网站费用wordpress添加网站
  • synchronized底层原理+锁升级
  • VGG改进(12):PositionAttentionModule 源码解析与设计思想
  • OpenCV项目实战——数字识别代码及食用指南
  • Promise详细解析
  • 【从Vue3到React】Day 1: React基础概念
  • Hotfixes数据库工作原理、机制和应用流程
  • 网站建设面试表wordpress建m域名网站
  • Node.js面试题及详细答案120题(93-100) -- 错误处理与调试篇
  • pc端js动态调用提示音音频报错的问题解决
  • 网站的建设特色网站开发培训哪家好
  • C# 中的 简单工厂模式 (Simple Factory)
  • Docker linux 离线部署springcloud
  • 第 2 天:搭建 C 语言开发环境 ——VS Code/Dev-C++/Code::Blocks 安装与配置全指南
  • 基于 Celery 的分布式文件监控系统
  • CATIA二次开发(2)C#启用AOT
  • Linux 驱动开发与内核通信机制——超详细教程
  • 【langgraph】本地部署方法及实例分析
  • Linux入门指南:从零掌握基础指令
  • 做笔记的网站源码江永网站建设
  • 是时候重启了:AIGC将如何重构UI设计师的学习路径与知识体系?