在使用 c + lua + mongodb 开发游戏服务器的时候,在上面实现了一套数据库读写方案,做到每帧落地,使用起来感觉还行。
首先是从数据库读数据,这个就比较简单了,lua的数据结构和mongodb基本就是一致的,数据读取出来后就是一个table,基本上直接设置上metatable,包装成一类就可以使用。
这里需要注意的一个地方就是mongodb中的Array:如果luatable使用number做key,在保存的时候会被转换为Array,如果key值不连续还比较大,在保存的时候就会生成一个极大的Array,所以在保存的时候将key转换为string是比较好的做法,读取的时候再转换回来,我使用的bson库是云风的https://github.com/cloudwu/lua-bson 修改了一点,实现了上述的转换,不在lua层做这个事情,lua的代码也可以简洁不少。
然后是如何保存数据,我采用的是记录数据的变化(增加/删除/修改),然后在帧尾统一保存。
因为使用的是mongodb,数据在保存的时候并不需要进行转换,直接通过upsert的方式就可以写入,通过upsert就覆盖了增加/修改两个场景,如何记录删除呢,特别是在一帧中对数据进行多次操作的时候?
例如有数据 a = {b = {c = 1}},
先在b下添加 a = {b = {c1 = 1}},语句是 set “a.b.c1” = 1
然后删除b:a = {},语句是 unset “a.b” = 1
再添加b:a = {b = {c2 = 1}},语句是 set “a.b” = {c2=1},这里注意上一层unset过的话,再设置数据上去就不应该只处理子节点了。
从上面可以看出,增删是有层级关系的,上层的删操作会覆盖下层,所以需要一个数据对应的结构来记录每层进行过的操作。考虑到需要保存的数据理论上只是数据,不应该存在meta,这里为了简单点,就直接在数据上使用meta来打标记。
记录变化的主要代码如下
-- 变化的数据
__save_data = {}
-- 标记数据变化的meta
__mt_pending = __mt_pending or {}
__mt_pending.__index = function(self, k)
local t = setmetatable({}, __mt_pending)
self[k] = t
return t
end
__mt_delete = __mt_delete or {}
__mt_delete.__index = function(self, k)
local t = setmetatable({}, __mt_pending)
self[k] = t
return t
end
-- 如果需要的话,不同的handle映射不同的数据库,比如分为本服数据库,全局数据库
-- Save = new_handler('local')
-- 添加
-- Save.Add('player', 1, pid, 100)
-- 删除
-- Save.Del('player', 1, pid)
-- Add相对用的更多,通过__index实现一个更简单的用法
-- Save.player[1].pid = 100
function new_handler(tip)
local handle = {
__tip = tip,
__pending = setmetatable({}, __mt_pending), -- 当前帧改动数据
__raw_cmds = {}, -- 当前帧直接的指令(优先于pending执行)
__frame_data = {}, -- 还未入库的改动
__frame_queue = {}, -- 顺序的frame_data
__ack_window = {}, -- 流控窗口(基于顺序写入是顺序返回的)
Add = pending_add, -- 添加
Del = pending_del, -- 删除
}
setmetatable(handle, {
__index = function(t, k)
return handle.__pending[k]
end
})
_save_data[tip] = handle
return handle
end
-- 通过 __index触发函数调用,在__pending上自动生成记录层级,并为数据挂上meta,
-- 有meta的数据即表示有改动,通过所有的meta描述了一个改动的树结构
function _pending_add(t, k, v, tail, ...)
if tail == nil then
t[k] = v
else
if not t[k] then
t[k] = setmetatable({}, __mt_pending)
end
_pending_add(t[k], v, tail, ...)
end
end
function pending_add(data, ...)
local args = {...}
assert(#args == table.size(args), string.fmt('%q', args))
_pending_add(data.__pending, ...)
end
-- 标记对应的节点被删除
function _pending_del(t, k, tail, ...)
if not t[k] then
t[k] = setmetatable({}, __mt_pending)
end
if not getmetatable(t) then
setmetatable(t, __mt_pending)
end
if not tail then
t[k] = setmetatable({}, __mt_delete)
return
end
_pending_del(t[k], tail, ...)
end
function pending_del(data, ...)
local args = {...}
assert(#args == table.size(args))
_pending_del(data.__pending, ...)
end
有了上述代码,我们就可以标记数据的变化,然后在帧尾将变化转换为数据库操作,其实就是通过打包为一个大的bson,一次性发送多条cmd给数据库。
WRITE_CONCERN = {w=1,wtimeout=5000}
_save_data = {}
function save()
for k, v in pairs(_save_data) do
save_data(v)
end
end
function save_data(data, frame)
if table.is_empty(data.__raw_cmds) and table.is_empty(data.__pending) then
-- nothing to do
return
end
local frame_data = data.__frame_data[frame]
if not frame_data then
frame_data = {
__frame = frame, -- 帧序号
__cmds = {}, -- pending进来的
__raws = {}, -- 直接加的
__ncmd = 0, -- 总数
__nsend = 0, -- 发出去的请求数量
__nrecv = 0, -- 收到的ack数量
}
data.__frame_data[frame_data.__frame] = frame_data
table.insert(data.__frame_queue, frame_data)
end
-- raw_cmds记录进来
table.insert(frame_data.__raws, data.__raw_cmds)
frame_data.__ncmd = frame_data.__ncmd + #data.__raw_cmds
data.__raw_cmds = {}
-- 遍历重置__pending,生成数据库操作
local pending = data.__pending
data.__pending = setmetatable({}, __mt_pending)
for collname, coll in pairs(pending) do
local upds = {}
local dels = {}
for recid, rec in pairs(coll) do
local __mt = getmetatable(rec)
if __mt == __mt_delete then
if table.is_empty(dels) then
table.insert(dels, {
q = {_id = recid},
limit = 1,
})
else
table.insert(upds, {
q = {_id = recid},
u = {
['$set'] = rec,
},
upsert = true,
multi = false,
})
end
elseif __mt == __mt_pending then
local chgs = {set={}, unset={}}
for k, v in pairs(rec) do
check_value_chgs(chgs, k, v)
end
table.insert(upds, {
q = {_id = recid},
u = {
['$set'] = (not table.is_empty(chgs.set)) and chgs.set or nil,
['$unset'] = (not table.is_empty(chgs.unset)) and chgs.unset or nil,
},
upsert = true,
multi = false,
})
else
table.insert(upds, {
q = {_id = recid},
u = {
['$set'] = rec,
},
upsert = true,
multi = false,
})
end
-- debug.dump(upds, 'dbmng.upds')
-- debug.dump(dels, 'dbmng.dels')
-- 每1024个改动生成一个语句,所有改动生成一个超大的cmd并不正确,另外bson也是有长度限制的,
if #upds >= 1024 then
build_bson_cmd_update(frame_data.__cmds, collname, upds)
upds = {}
end
if #dels >= 1024 then
build_bson_cmd_delete(frame_data.__cmds, collname, dels)
dels = {}
end
end
build_bson_cmd_update(frame_data.__cmds, collname, upds)
build_bson_cmd_delete(frame_data.__cmds, collname, dels)
end
frame_data.__ncmd = frame_data.__ncmd + #frame_data.__cmds
-- loop_write(data)
end
function check_value_chgs(chgs, key, val)
if type(val) == 'table' then
local __mt = getmetatable(val)
if __mt == __mt_pending then
for k, v in pairs(val) do
check_value_chgs(chgs, string.format('%s.%s',key, k), v)
end
elseif __mt == __mt_delete then
if table.is_empty(val) then
chgs.unset[key] = 1
else
for k, v in pairs(val) do
check_value_chgs(chgs, string.format('%s.%s',key, k), v)
end
end
else
chgs.set[key] = val
end
else
chgs.set[key] = val
end
end
function build_bson_cmd_update(cmds, collname, docs)
if table.is_empty(docs) then
return
end
local ret, cmd = xpcall(bson.encode_order, __G__TRACEBACK__, 'update', collname, 'updates', docs, 'ordered', false, 'writeConcern', WRITE_CONCERN)
if not ret then
-- 生成失败的话拆分成二分后再次尝试生成,尽量排除掉生成失败的代码
local len = #docs
if len > 1 then
build_bson_cmd_update(cmds, collname, tool.take(docs, 1, math.floor(len/2)))
build_bson_cmd_update(cmds, collname, tool.take(docs, math.ceil(len/2), len))
else
-- ERROR('build_bson_cmd_update, failed')
-- debug.dump(cmd)
end
else
table.insert(cmds, cmd)
end
end
function build_bson_cmd_delete(cmds, collname, docs)
if table.is_empty(docs) then
return
end
local ret, cmd = xpcall(bson.encode_order, __G__TRACEBACK__, 'delete', collname, 'deletes', docs, 'ordered', false, 'writeConcern', WRITE_CONCERN)
if not ret then
local len = #docs
if len > 1 then
build_bson_cmd_delete(cmds, collname, tool.take(docs, 1, math.floor(len/2)))
build_bson_cmd_delete(cmds, collname, tool.take(docs, math.ceil(len/2), len))
else
-- ERROR('build_bson_cmd_delete, failed')
-- debug.dump(cmd)
end
else
table.insert(cmds, cmd)
end
end
最后就是保存数据,发送消息,等待回复。当然需要实现一定的流控和重试策略,在数据库异常情况下不至于丢失数据。
这里并没有去考虑说数据库不在了就写到文件之类的方式,在我看来意义不大。这里的策略是只要数据库最终能连上来,就不会丢数据。
-- 持有的数据库连接
_dbconn_map = _dbconn_map or {}
-- 流控,允许同时进行多少个写入帧,超过后不再继续发送写操作
ACK_WINDOW = 2
SAVE_STATE = {
NONE = 0,
SENT = 1,
}
function conn_new(conn, collection)
local t = {
tip = conn.tip,
conn = conn,
db = collection,
}
if not _dbconn_map[t.tip] then
_dbconn_map[t.tip] = {}
end
table.insert(_dbconn_map[t.tip], t)
gCoPool:resume_lru('wait_mongo')
end
function rand_idx(total, policy)
if total == 0 then
return nil
end
policy = policy or total
return (policy % total) + 1
end
function get_db(tip, policy)
local conns = _dbconn_map[tip] or {}
if #conns == 0 then
WARN('dbmng.get_db, not ready, tip=%s', tip)
gCoPool:yield_lru('wait_mongo')
end
local conn = conns[rand_idx(#conns, policy)]
return conn.db
end
-- 这里用了一些协程处理,作用是在等待数据库回复的时候让出,执行其他逻辑,
-- loop_write也是在协程中执行的
function loop_write(data)
while true do
local frame_data = data.__frame_queue[1]
if not frame_data then
break
end
if #data.__ack_window > ACK_WINDOW then
local sz = #data.__frame_queue
local msg = string.format('dbmng.loop_write, limit by ACK_WINDOW, nwait=%s', #data.__frame_queue)
if sz > ACK_WINDOW * 50 and sz % 50 == 0 then
ERROR(msg)
elseif sz > ACK_WINDOW * 5 and sz % 5 == 0 then
WARN(msg)
else
INFO(msg)
end
break
end
table.insert(data.__ack_window, table.remove(data.__frame_queue, 1))
local db = dbmng.get_db(data.__tip)
for _, v in ipairs(frame_data.__raws) do
for _, cmd in ipairs(v) do
gCoPool:pull(write_thread, data, frame_data, db, cmd)
end
end
for _, cmd in pairs(frame_data.__cmds) do
gCoPool:pull(write_thread, data, frame_data, db, cmd)
end
frame_data.__state = SAVE_STATE.SENT
end
end
function write_thread(data, frame_data, db, bson_cmd)
frame_data.__nsend = frame_data.__nsend + 1
local info = db:runCommandBson(bson_cmd)
frame_data.__nrecv = frame_data.__nrecv + 1
local ack_window = data.__ack_window[1]
if ack_window ~= frame_data then
CRIT('dbmng.write_thread, ack_window, missing??, sendframe=%s, gFrame=%s, ncmd=%s, nsend=%s, nrecv=%s'
, frame_data.__frame, gFrame, frame_data.__ncmd, frame_data.__nsend, frame_data.__nrecv)
elseif frame_data.__nrecv >= frame_data.__nsend then
DEBUG('dbmng.save, finish, frame=%s, gFrame=%s', frame_data.__frame, gFrame)
table.remove(data.__ack_window, 1)
end
--TODO: redo when return specific fail code
debug.dump(info)
if info.ok ~= 1 then
ERROR('dbmng.save, failed, ok=%s, code=%s, errmsg=%s', info.ok, info.code, info.errmsg)
else
end
end
有了这套落库方式,处理数据库读写简单了不少,更进一步,考虑我们要保存数据其实就是要检测到数据的变动,luatable和mongodb又挺一致的,如果我们能实现检测luatable的属性变动,自动调用Save,在写逻辑时就方便了不少,这个通过meta很容易实现,我这里采用的方式是实现一个可以检查变动的对象,所有需要保存的数据实现包装为此对象,就可以自动保存。
-- class实现了类,单继承结构,mark是用于标记同类的对象创建了多少个的,通过在lua c实现中注入代码来实现,这里忽略并不影响主题
-- 包含了两个创建方法,deliver/new和wrap,
-- deliver/new用于直接创建,
-- wrap是把数据包装成类,主要就是用于从数据库读取的数据中恢复类对象
-- 拥有两个静态属性_base和_example
-- _example用于标记哪些数据需要检测变动,这些数据在类中被保存到_pro上,通过meta触发检查
-- _base指向基类
local function _trans(d, r, example)
for k, v in pairs(r) do
if example and example[k] ~= nil then
d._pro[k] = v
else
rawset(d, k, v)
end
end
end
-- 我使用的是lua5.3,并且开启了module兼容,既可以使用module也可以使用env
-- 使用module的原因是module在热更新的时候很好用,也比较习惯了
-- 使用lua5.3的原因是lua5.1 attempt to yield across metamethod/C-call boundary,我大量使用了协程,很难受,升级是必须的
function class(env, name, mark)
if name then
local base = table.find(_G, table.unpack(string.split(name, '%.')))
assert(base, string.format("base class not found, %s", name))
env._base = setmetatable({}, {
__index=function(t, k)
if base[k] then return base[k] end
if base._base then return base._base[k] end
end,
})
end
for k, v in pairs(env._base and env._base._example or {}) do
if not env._example then env._example = {} end
env._example[k] = env._base._example[k]
end
env.init = function(self)
if env._base then env._base.init(self) end
end
env._meta = {
__index=function(t, k)
if not rawget(t, '_pro') then
error('_pro is missing')
elseif t._pro[k] ~= nil then
return t._pro[k]
else
local v = rawget(t, k) or rawget(env, k)
if v then return v end
return env._base and env._base[k]
end
end,
__newindex=function(t, k, v)
if env._example and env._example[k] ~= nil then
if (not t._init) and t.on_value_change then
t:on_value_change(k, t._pro[k], v)
end
t._pro[k] = v
return
end
rawset(t, k, v)
end
}
env.new = function(...)
local o = mark and mark() or {}
o._pro = table.copy(env._example) or {}
setmetatable(o, env._meta)
o._init = true
o:init()
if o.ctor then o:ctor(...) end
o._init = nil
return o
end
env.wrap = function(t)
local o = mark and mark() or {}
o._pro = table.copy(env._example) or {}
setmetatable(o, env._meta)
o._init = true
o.init(o)
local pro = rawget(t, '_pro')
if pro then
_trans(o, pro, env._example)
rawset(t, "_pro", nil)
end
_trans(o, t, env._example)
o._init = nil
return o
end
end
使用方式如下
module("player", package.seeall)
_example = {
_id = 0,
pid = 0,
name = '',
data = {},
}
class(_ENV, nil, nil)
function init(self)
self.temp = 1
end
function on_value_change(self, key, old, new)
Save.player[self._id][key] = new
end
--
local A = player.new()
-- 修改name会触发meta,调用on_value_change,最终落库
A.name = 'hello'
-- 这个并不会触发meta
A.temp = 2
-- 这样并不会触发
A.data.key = 'value'
-- 我们可以这样强制触发
A.data = A.data
-- 或者手动调用Save
Save.player[A._id].data.key = A.data.key
到这里,实现了一个比较好用的落库做法,可以看到还是又些不完美的地方,比如在处理子层级的时候我们还是需要手动调用save,但是解决了80%的问题。游戏服务器里面数据并不是很复杂,层级不多,层级内的数据也不太多,通常直接A.data = A.data简单粗暴。
因为class的实现多了一个间层,然后每帧都会去写数据库,性能可能跟不上,但是实测没啥影响,性能热点不在这里,每帧的数据变动也没有想象那么多,反而因为落地即时,即使服务器挂掉也只会丢失几帧数据,这点上来说还是很重要。
不知道有没有其他更好的方案?