skynet源码学习-skynet_timer定时器
核心数据结构
struct timer {
struct link_list near[256]; // 近期定时器(0-255个单位)表示0~255个单位时间
struct link_list t[4][64]; // 4级时间轮(每级64槽)
struct spinlock lock; // 自旋锁,保护定时器结构
uint32_t time; // 当前时间(单位:10ms)从启动开始计数
uint32_t starttime; // 系统启动时间(秒)(从1970年开始的秒数)
uint64_t current; // 当前时间(10ms单位)与time相同,但类型是64位
uint64_t current_point; // 最近一次调用的实际时间(单位:10ms,从CLOCK_MONOTONIC获取)
};
时间轮分为5个数组:
- near: 一个大小为256的数组,表示最近256个单位时间(10ms)内将要触发的定时器
- t[4][64]: 4个层级,每个层级64个槽位,每个层级的时间跨度分别是:
- 第一层:256 * 64 = 16384 个单位(即16384*10ms≈163.84s)
- 第二层:16384 * 64 = 1048576 个单位(约10485.76s≈2.9小时)
- 第三层:1048576 * 64 = 67108864 个单位(约671088.64s≈7.77天)
- 第四层:67108864 * 64 = 4294967296 个单位(约42949672.96s≈497天)
接口功能详解
1. skynet_timer_init()
void
skynet_timer_init(void) {
TI = timer_create_timer();
uint32_t current = 0;
systime(&TI->starttime, ¤t);
TI->current = current;
TI->current_point = gettime();
}
- 功能:初始化定时器系统
- 流程:
- 创建时间轮结构体
- 初始化所有链表
- 记录系统启动时间
- 设置初始时间点
- 调用时机:Skynet 启动时
2. skynet_timeout(handle, time, session)
int
skynet_timeout(uint32_t handle, int time, int session) {
if (time <= 0) {
struct skynet_message message;
message.source = 0;
message.session = session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
if (skynet_context_push(handle, &message)) {
return -1;
}
} else {
struct timer_event event;
event.handle = handle;
event.session = session;
timer_add(TI, &event, sizeof(event), time);
}
return session;
}
- 功能:设置定时器,参数包括目标服务句柄(handle)、定时时间(单位是10ms,即time个10ms后触发)、以及会话号(session),返回session,如果设置失败返回-1
- 参数:
- handle:目标服务句柄
- time:延迟时间(单位:10ms)
- session:会话ID
- 流程:
3. timer_add(T, arg, sz, time)
static void
timer_add(struct timer *T,void *arg,size_t sz,int time) {
struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
memcpy(node+1,arg,sz);
SPIN_LOCK(T);
node->expire=time+T->time;
add_node(T,node);
SPIN_UNLOCK(T);
}
功能:添加一个定时器
- 分配一个timer_node节点,后面跟着用户数据(即timer_event)
- 将节点插入到时间轮的合适位置(通过add_node函数)
- 插入时加锁
4. add_node(T, arg, sz, time)
static void
add_node(struct timer *T,struct timer_node *node) {
uint32_t time=node->expire;
uint32_t current_time=T->time;
if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
link(&T->near[time&TIME_NEAR_MASK],node);
} else {
int i;
uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
for (i=0;i<3;i++) {
if ((time|(mask-1))==(current_time|(mask-1))) {
break;
}
mask <<= TIME_LEVEL_SHIFT;
}
link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
}
}
功能:根据定时器的过期时间,将其插入到时间轮的相应层级
- 如果定时器的过期时间在最近256个单位时间内,则插入到near数组的相应槽位
- 否则,计算应该插入到哪个层级和槽位
5. timer_shift(T)
static void
timer_shift(struct timer *T) {
int mask = TIME_NEAR;
uint32_t ct = ++T->time;
if (ct == 0) {
move_list(T, 3, 0);
} else {
uint32_t time = ct >> TIME_NEAR_SHIFT;
int i=0;
while ((ct & (mask-1))==0) {
int idx=time & TIME_LEVEL_MASK;
if (idx!=0) {
move_list(T, i, idx);
break;
}
mask <<= TIME_LEVEL_SHIFT;
time >>= TIME_LEVEL_SHIFT;
++i;
}
}
}
功能:时间推进时,将高层的定时器重新分配到低层
- 每次调用,当前时间T->time加1
- 检查是否有需要降级的定时器(比如,当time的256槽位满了,就需要将下一层(第一层)的第一个槽位的定时器重新分配到near和第一层的其他槽位)
6. timer_execute(T)
static inline void
timer_execute(struct timer *T) {
int idx = T->time & TIME_NEAR_MASK;
while (T->near[idx].head.next) {
struct timer_node *current = link_clear(&T->near[idx]);
SPIN_UNLOCK(T);
// dispatch_list don't need lock T
dispatch_list(current);
SPIN_LOCK(T);
}
}
功能:执行当前时间槽的所有定时器
- 从near数组中取出当前时间槽的所有定时器
- 然后调用dispatch_list发送消息
7. dispatch_list(current)
static inline void
dispatch_list(struct timer_node *current) {
do {
struct timer_event * event = (struct timer_event *)(current+1);
struct skynet_message message;
message.source = 0;
message.session = event->session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
skynet_context_push(event->handle, &message);
struct timer_node * temp = current;
current=current->next;
skynet_free(temp);
} while (current);
}
功能:处理定时器触发事件
- 每个定时器节点后面跟着一个timer_event(包含handle和session)
- 构造一个消息(类型为PTYPE_RESPONSE,数据为NULL),然后通过skynet_context_push推送到目标服务的消息队列
8. skynet_updatetime()
void
skynet_updatetime(void) {
uint64_t cp = gettime(); // 获取当前时间
if(cp < TI->current_point) {
skynet_error(NULL, "time diff error: change from %lld to %lld", cp, TI->current_point);
TI->current_point = cp;
} else if (cp != TI->current_point) {
uint32_t diff = (uint32_t)(cp - TI->current_point);
TI->current_point = cp;
TI->current += diff;
int i;
for (i=0;i<diff;i++) {
timer_update(TI); // 逐单位更新时间
}
}
}
- 功能:驱动定时器更新,这个函数需要由系统定期调用(通常在主循环中),用于驱动定时器的更新
9. timer_update(T)
static void
timer_update(struct timer *T) {
SPIN_LOCK(T);
// try to dispatch timeout 0 (rare condition)
timer_execute(T);
// shift time first, and then dispatch timer message
timer_shift(T);
timer_execute(T);
SPIN_UNLOCK(T);
}
功能:更新时间轮,包括执行当前时间的定时器和时间轮的移位操作
- 先执行当前时间槽的定时器(timer_execute)
- 然后调用timer_shift推进时间轮
- 再次执行新的当前时间槽的定时器(因为移位操作可能将一些定时器移到了near数组的当前槽)
- 定时器更新核心:
10. skynet_now()
uint64_t
skynet_now(void) {
return TI->current;
}
- 功能:获取当前时间(单位:10ms)
- 返回值:从启动开始的10ms计数
11. skynet_starttime()
uint32_t
skynet_starttime(void) {
return TI->starttime;
}
- 功能:获取系统启动时间戳(秒)
12. skynet_thread_time()
#define NANOSEC 1000000000
#define MICROSEC 1000000
uint64_t
skynet_thread_time(void) {
struct timespec ti;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ti);
return (uint64_t)ti.tv_sec * MICROSEC + (uint64_t)ti.tv_nsec / (NANOSEC / MICROSEC);
}
- 功能:获取线程CPU时间(微秒级)
- 用途:性能分析
时间轮设计优势
1. 五级时间轮结构
- 近期轮(256槽):0-255个单位(2.56秒)
- 第一层:256 * 64 = 16384 个单位(即16384*10ms≈163.84s)
- 第二层:16384 * 64 = 1048576 个单位(约10485.76s≈2.9小时)
- 第三层:1048576 * 64 = 67108864 个单位(约671088.64s≈7.77天)
- 第四层:67108864 * 64 = 4294967296 个单位(约42949672.96s≈497天)
2. 锁优化设计
- 使用细粒度自旋锁(非互斥锁)
- 锁只保护时间轮操作
- 事件派发时不持锁
3. 分层管理策略
- 近期事件:直接存储在near数组
- 远期事件:存储在分层时间轮
- 时间推进时自动降级迁移
4. 常数级操作效率
- 添加/删除定时器:直接哈希定位,无需遍历
- 触发处理:仅处理当前槽位链表
5. 低内存开销
- 链表节点内存复用
- 固定大小的数组结构
6. 高精度与长周期平衡
- 10ms基础时间单位
- 支持长达数年的定时
- 自动管理定时器迁移
7. 无遍历设计
- 传统方案:每次扫描所有定时器(O(n))
- 本方案:仅处理到期槽位(O(1))
8. 时间跳变容错
- 检测时间回退:if(cp < TI->current_point)
- 自动校正时间基准
工作流程示意图
Skynet 定时器业务使用注意事项与潜在隐患
使用注意事项
1. 时间单位精度
- 定时器使用 10ms(1/100秒) 作为基础时间单位
- 业务层设置时间参数时需换算:
-- Lua层设置1秒定时器
skynet.timeout(100, callback) -- 100个单位 = 1000ms
- 无法设置小于10ms的定时器
2. 定时器不可取消
- 框架未提供取消接口,业务层需自行处理:
local active_timers = {}
function set_timer(id, time, cb)
local session = skynet.timeout(time, function()
if active_timers[id] then
cb()
active_timers[id] = nil
end
end)
active_timers[id] = session
end
function cancel_timer(id)
active_timers[id] = nil
end
3. 消息处理顺序
- 定时器消息类型为 PTYPE_RESPONSE
- 需在消息处理中区分普通响应和定时触发:
function on_message(msg, sz, session, msg_type)
if msg_type == skynet.PTYPE_RESPONSE then
-- 定时器触发处理
else
-- 其他消息处理
end
end
4. 时间同步问题
- 使用 skynet.now() 获取框架时间而非系统时间
- 跨节点时间需自行同步:
local start = skynet.starttime() -- 系统启动UTC时间
local offset = start + skynet.now()/100 -- 当前UTC秒数
潜在隐患与解决方案
1. 定时器风暴(危险指数:★★★★)
- 场景:大量定时器在同一时间点触发
- 表现:
- 消息队列瞬间暴涨
- 服务消息处理延迟增加
- 可能触发消息队列溢出
- 解决方案:
– 添加随机偏移分散触发
function safe_timeout(time, cb)
local jitter = math.random(1, 20) -- ±200ms随机抖动
skynet.timeout(time + jitter, cb)
end
2. 定时器泄漏(危险指数:★★★)
- 场景:服务退出时未清理定时器
- 后果:
- 框架继续向已销毁服务发送消息
- 增加无效消息处理开销
- 解决方案:
function service:exit()
-- 清理所有关联定时器
for id, _ in pairs(self.timers) do
self.timers[id] = nil
end
skynet.exit()
end
3. 时间漂移问题(危险指数:★★)
- 原因:
- 系统负载高导致 skynet_updatetime() 调用延迟,定时器的触发依赖于skynet_updatetime的调用,该函数通常在Skynet的主线程中调用(在timer线程中),如果系统负载过高,可能导致skynet_updatetime不能及时执行,从而造成定时器触发延迟
- 时间轮推进滞后
- 检测方法:
// 在skynet_updatetime中
if (diff > 100) { // 超过1秒未更新
skynet_error(NULL, "timer lag %d units", diff);
}
- 缓解措施:
- 优化高负载业务逻辑
- 避免单次处理消耗过大CPU时间
4. 跨节点时间不一致
- 问题:集群环境下各节点时间轮不同步
- 解决方案:
– 使用集中式时间服务
function cluster_timeout(node, handle, time, session)
if node == self_node then
skynet.timeout(time, ...)
else
cluster.send(node, "timer", "set", ...)
end
end
5. 长时间定时器精度损失
- 现象:设置在远期(如几天后)的定时器误差增大
- 原因:高层时间轮迁移时精度降低
- 建议:
- 超过1小时的定时器应使用数据库+轮询
- 或分解为:长期存储 + 短期定时器刷新
6. 最长定时器限制:实际最长定时器约497天(1.36年),超长定时器处理:超过1年的定时器最好有特殊处理
-- 超过1年的定时器应使用持久化存储
function set_long_timer(days, callback)
local expire_time = os.time() + days*86400
db:save_timer(expire_time, callback)
-- 设置短期检查器
skynet.timeout(8640000, function() check_persistent_timers() end)
end
7. 时间回绕处理:32位整数约497天后会溢出归零
// 在timer_shift函数中
if (ct == 0) {
move_list(T, 3, 0); // 处理时间溢出
}
可以优化点思考(视实际需求而定)
1. 批量定时器处理
– 避免大量独立定时器
local function batch_scheduler()
while active do
process_batch() -- 处理一批任务
skynet.sleep(200) -- 休眠2秒
end
end
2. 时间轮缓存优化
// 在timer_update中增加批处理
#define MAX_DISPATCH 50
void timer_execute(struct timer *T) {
int count = 0;
while (T->near[idx].head.next && count++ < MAX_DISPATCH) {
// 每次最多处理50个
}
}
3. 定时器类型分级
-- 关键定时器:使用独立时间轮
function critical_timeout(time, cb)
-- 单独实现高精度时间轮
end
典型错误案例
案例:分布式锁超时不同步
-- 节点A设置锁超时
lock_timeout = skynet.timeout(300, release_lock)
-- 节点B检查超时
if os.time() > lock.expire then -- 错误!使用了系统时间
take_lock()
end
正确做法:
local lock = {
expire = skynet.now() + 300, -- 使用框架时间
node = skynet.self_node()
}
-- 检查时
if skynet.now() > lock.expire then
take_lock()
end