skynet.newservice接口分析
skynet.newservice接口分析
- Launcher 服务实现
- LAUNCH 命令处理
- 核心:launch_service 函数
- skynet.launch 的底层实现
- 服务启动的异步流程
- 服务初始化完成的通知
- LAUNCHOK 和 ERROR 处理
- 关键数据结构说明
- 完整的启动流程总结
- 错误处理机制
function skynet.newservice(name, ...)return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end
执行流程:
- 向 .launcher 服务发送一个 Lua 调用
- 命令:LAUNCH
- 参数:“snlua” + 服务名 + 其他参数
- 使用 skynet.call 同步等待结果
Launcher 服务实现
local skynet = require "skynet"
local core = require "skynet.core"
require "skynet.manager" -- import manager apis
local string = stringlocal services = {}
local command = {}
local instance = {} -- for confirm (function command.LAUNCH / command.ERROR / command.LAUNCHOK)
local launch_session = {} -- for command.QUERY, service_address -> sessionlocal function handle_to_address(handle)return tonumber("0x" .. string.sub(handle , 2))
endlocal NORET = {}function command.LIST()local list = {}for k,v in pairs(services) dolist[skynet.address(k)] = vendreturn list
endlocal function list_srv(ti, fmt_func, ...)local list = {}local sessions = {}local req = skynet.request()for addr in pairs(services) dolocal r = { addr, "debug", ... }req:add(r)sessions[r] = addrendfor req, resp in req:select(ti) dolocal addr = req[1]if resp thenlocal stat = resp[1]list[skynet.address(addr)] = fmt_func(stat, addr)elselist[skynet.address(addr)] = fmt_func("ERROR", addr)endsessions[req] = nilendfor session, addr in pairs(sessions) dolist[skynet.address(addr)] = fmt_func("TIMEOUT", addr)endreturn list
endfunction command.STAT(addr, ti)return list_srv(ti, function(v) return v end, "STAT")
endfunction command.KILL(_, handle)skynet.kill(handle)local ret = { [skynet.address(handle)] = tostring(services[handle]) }services[handle] = nilreturn ret
endfunction command.MEM(addr, ti)return list_srv(ti, function(kb, addr)local v = services[addr]if type(kb) == "string" thenreturn string.format("%s (%s)", kb, v)elsereturn string.format("%.2f Kb (%s)",kb,v)endend, "MEM")
endfunction command.GC(addr, ti)for k,v in pairs(services) doskynet.send(k,"debug","GC")endreturn command.MEM(addr, ti)
endfunction command.REMOVE(_, handle, kill)services[handle] = nillocal response = instance[handle]if response then-- instance is deadresponse(not kill) -- return nil to caller of newservice, when kill == falseinstance[handle] = nillaunch_session[handle] = nilend-- don't return (skynet.ret) because the handle may exitreturn NORET
endlocal function launch_service(service, ...)local param = table.concat({...}, " ")local inst = skynet.launch(service, param)local session = skynet.context()local response = skynet.response()if inst thenservices[inst] = service .. " " .. paraminstance[inst] = responselaunch_session[inst] = sessionelseresponse(false)returnendreturn inst
endfunction command.LAUNCH(_, service, ...)launch_service(service, ...)return NORET
endfunction command.LOGLAUNCH(_, service, ...)local inst = launch_service(service, ...)if inst thencore.command("LOGON", skynet.address(inst))endreturn NORET
endfunction command.ERROR(address)-- see serivce-src/service_lua.c-- init failedlocal response = instance[address]if response thenresponse(false)launch_session[address] = nilinstance[address] = nilendservices[address] = nilreturn NORET
endfunction command.LAUNCHOK(address)-- init noticelocal response = instance[address]if response thenresponse(true, address)instance[address] = nillaunch_session[address] = nilendreturn NORET
endfunction command.QUERY(_, request_session)for address, session in pairs(launch_session) doif session == request_session thenreturn addressendend
end-- for historical reasons, launcher support text command (for C service)skynet.register_protocol {name = "text",id = skynet.PTYPE_TEXT,unpack = skynet.tostring,dispatch = function(session, address , cmd)if cmd == "" thencommand.LAUNCHOK(address)elseif cmd == "ERROR" thencommand.ERROR(address)elseerror ("Invalid text command " .. cmd)endend,
}skynet.dispatch("lua", function(session, address, cmd , ...)cmd = string.upper(cmd)local f = command[cmd]if f thenlocal ret = f(address, ...)if ret ~= NORET thenskynet.ret(skynet.pack(ret))endelseskynet.ret(skynet.pack {"Unknown command"} )end
end)skynet.start(function() end)
在 launcher.lua 中:
skynet.start(function() end)
这是一个空的启动函数,即launcher 服务启动后立即进入消息循环状态,等待处理命令
LAUNCH 命令处理
function command.LAUNCH(_, service, ...)launch_service(service, ...)return NORET
end
NORET 是一个特殊标记,表示这个命令不会立即返回结果
核心:launch_service 函数
local function launch_service(service, ...)-- 1. 构建参数字符串local param = table.concat({...}, " ")-- 2. 启动服务(核心调用)local inst = skynet.launch(service, param)-- 3. 获取当前会话上下文local session = skynet.context()-- 4. 创建响应对象local response = skynet.response()if inst then-- 5. 记录服务信息services[inst] = service .. " " .. paraminstance[inst] = response -- 保存响应回调launch_session[inst] = session -- 保存会话ID-- 注意:这里没有立即返回!else-- 启动失败,立即返回 falseresponse(false)returnendreturn inst
end
skynet.launch 的底层实现
skynet.launch 是 C 函数,在 skynet_server.c 中实现:
static const char *
cmd_launch(struct skynet_context * context, const char * param) {size_t sz = strlen(param);char tmp[sz+1];strcpy(tmp,param);char * args = tmp;char * mod = strsep(&args, " \t\r\n");args = strsep(&args, "\r\n");struct skynet_context * inst = skynet_context_new(mod,args);if (inst == NULL) {return NULL;} else {id_to_hex(context->result, inst->handle);return context->result;}
}
它调用底层的 skynet_command 函数,命令为 “LAUNCH”
服务启动的异步流程
关键点: 服务启动是异步的!
时序图:
caller -> launcher(LAUNCH) -> C层创建服务 -> 新服务初始化 -> 通知launcher| | | | ||-- 等待响应 --| | | || |-- 记录pending响应 -| | || | |-- 启动snlua服务 -| || | | |-- 执行init --|| | | |-- 成功: 发送"" --|| | | |-- 失败: 发送"ERROR"|| |<- 收到通知,调用response ------------||<- 收到响应 --|
服务初始化完成的通知
当新服务初始化完成后,会通过文本协议通知 launcher:
skynet.register_protocol {name = "text",id = skynet.PTYPE_TEXT,unpack = skynet.tostring,dispatch = function(session, address, cmd)if cmd == "" thencommand.LAUNCHOK(address) -- 启动成功elseif cmd == "ERROR" thencommand.ERROR(address) -- 启动失败elseerror ("Invalid text command " .. cmd)endend,
}
LAUNCHOK 和 ERROR 处理
function command.LAUNCHOK(address)-- 初始化成功通知local response = instance[address]if response thenresponse(true, address) -- 回调:返回 true 和地址instance[address] = nil -- 清理launch_session[address] = nilendreturn NORET
endfunction command.ERROR(address)-- 初始化失败local response = instance[address]if response thenresponse(false) -- 回调:返回 falselaunch_session[address] = nilinstance[address] = nilendservices[address] = nil -- 从服务列表中移除return NORET
end
关键数据结构说明
-- 全局服务表:地址 -> 服务描述
local services = {}-- 待确认的服务实例:地址 -> response回调函数
local instance = {}-- 启动会话映射:地址 -> 会话ID
local launch_session = {}
完整的启动流程总结
- 调用阶段:newservice → launcher:LAUNCH
- 记录阶段:launcher 记录 pending 状态,不立即返回
- 创建阶段:C 层创建新服务进程
- 初始化阶段:新服务执行初始化代码
- 通知阶段:新服务向 launcher 发送启动结果
- 回调阶段:launcher 调用对应的 response 回调
- 返回阶段:原始调用方收到启动结果
-- 用户调用
local my_service = skynet.newservice("myservice", "param1", "param2")-- 实际执行:
-- 1. 调用 .launcher:LAUNCH("snlua", "myservice", "param1", "param2")
-- 2. launcher 调用 skynet.launch("snlua", "myservice param1 param2")
-- 3. 新服务 myservice 开始初始化
-- 4. 初始化成功:myservice 向 launcher 发送空字符串 ""
-- 5. launcher 调用 response(true, address)
-- 6. 用户收到服务地址
错误处理机制
-- 如果 skynet.launch 返回 nil,立即失败
if inst then-- 正常流程
elseresponse(false) -- 立即返回失败return
end-- 如果服务初始化过程中出错
function command.ERROR(address)response(false) -- 异步返回失败services[address] = nil
end
为什么使用异步设计?
- 服务初始化可能需要时间(加载文件、连接数据库等)
- 避免阻塞 launcher 服务处理其他请求
- 支持服务的依赖初始化