skynet源码学习-skynet_timer定时器

发布于:2025-06-11 ⋅ 阅读:(21) ⋅ 点赞:(0)

核心数据结构

struct timer {
    struct link_list near[256];    // 近期定时器(0-255个单位)表示0~255个单位时间
    struct link_list t[4][64];     // 4级时间轮(每级64槽)
    struct spinlock lock;           // 自旋锁,保护定时器结构
    uint32_t time;                 // 当前时间(单位:10ms)从启动开始计数
    uint32_t starttime;            // 系统启动时间(秒)(从1970年开始的秒数)
    uint64_t current;              // 当前时间(10ms单位)与time相同,但类型是64位
    uint64_t current_point;        // 最近一次调用的实际时间(单位:10ms,从CLOCK_MONOTONIC获取)
};

时间轮分为5个数组:

  • near: 一个大小为256的数组,表示最近256个单位时间(10ms)内将要触发的定时器
  • t[4][64]: 4个层级,每个层级64个槽位,每个层级的时间跨度分别是:
  • 第一层:256 * 64 = 16384 个单位(即16384*10ms≈163.84s)
  • 第二层:16384 * 64 = 1048576 个单位(约10485.76s≈2.9小时)
  • 第三层:1048576 * 64 = 67108864 个单位(约671088.64s≈7.77天)
  • 第四层:67108864 * 64 = 4294967296 个单位(约42949672.96s≈497天)
    在这里插入图片描述

接口功能详解

1. skynet_timer_init()

void 
skynet_timer_init(void) {
    TI = timer_create_timer();
    uint32_t current = 0;
    systime(&TI->starttime, &current);
    TI->current = current;
    TI->current_point = gettime();
}
  • 功能:初始化定时器系统
  • 流程:
    1. 创建时间轮结构体
    2. 初始化所有链表
    3. 记录系统启动时间
    4. 设置初始时间点
  • 调用时机:Skynet 启动时

2. skynet_timeout(handle, time, session)

int
skynet_timeout(uint32_t handle, int time, int session) {
    if (time <= 0) {
        struct skynet_message message;
        message.source = 0;
        message.session = session;
        message.data = NULL;
        message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

        if (skynet_context_push(handle, &message)) {
            return -1;
        }
    } else {
        struct timer_event event;
        event.handle = handle;
        event.session = session;
        timer_add(TI, &event, sizeof(event), time); 
    }

    return session;
}
  • 功能:设置定时器,参数包括目标服务句柄(handle)、定时时间(单位是10ms,即time个10ms后触发)、以及会话号(session),返回session,如果设置失败返回-1
  • 参数:
    • handle:目标服务句柄
    • time:延迟时间(单位:10ms)
    • session:会话ID
  • 流程:
    在这里插入图片描述

3. timer_add(T, arg, sz, time)

static void
timer_add(struct timer *T,void *arg,size_t sz,int time) {
    struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
    memcpy(node+1,arg,sz);

    SPIN_LOCK(T);

        node->expire=time+T->time;
        add_node(T,node);

    SPIN_UNLOCK(T);
}

功能:添加一个定时器

  • 分配一个timer_node节点,后面跟着用户数据(即timer_event)
  • 将节点插入到时间轮的合适位置(通过add_node函数)
  • 插入时加锁

4. add_node(T, arg, sz, time)

static void
add_node(struct timer *T,struct timer_node *node) {
    uint32_t time=node->expire;
    uint32_t current_time=T->time;
    
    if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
        link(&T->near[time&TIME_NEAR_MASK],node);
    } else {
        int i;
        uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
        for (i=0;i<3;i++) {
            if ((time|(mask-1))==(current_time|(mask-1))) {
                break;
            }
            mask <<= TIME_LEVEL_SHIFT;
        }

        link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
    }
}

功能:根据定时器的过期时间,将其插入到时间轮的相应层级

  • 如果定时器的过期时间在最近256个单位时间内,则插入到near数组的相应槽位
  • 否则,计算应该插入到哪个层级和槽位

5. timer_shift(T)

static void
timer_shift(struct timer *T) {
    int mask = TIME_NEAR;
    uint32_t ct = ++T->time;
    if (ct == 0) {
        move_list(T, 3, 0);
    } else {
        uint32_t time = ct >> TIME_NEAR_SHIFT;
        int i=0;

        while ((ct & (mask-1))==0) {
            int idx=time & TIME_LEVEL_MASK;
            if (idx!=0) {
                move_list(T, i, idx);
                break;              
            }
            mask <<= TIME_LEVEL_SHIFT;
            time >>= TIME_LEVEL_SHIFT;
            ++i;
        }
    }
}

功能:时间推进时,将高层的定时器重新分配到低层

  • 每次调用,当前时间T->time加1
  • 检查是否有需要降级的定时器(比如,当time的256槽位满了,就需要将下一层(第一层)的第一个槽位的定时器重新分配到near和第一层的其他槽位)

6. timer_execute(T)

static inline void
timer_execute(struct timer *T) {
    int idx = T->time & TIME_NEAR_MASK;
    
    while (T->near[idx].head.next) {
        struct timer_node *current = link_clear(&T->near[idx]);
        SPIN_UNLOCK(T);
        // dispatch_list don't need lock T
        dispatch_list(current);
        SPIN_LOCK(T);
    }
}

功能:执行当前时间槽的所有定时器

  • 从near数组中取出当前时间槽的所有定时器
  • 然后调用dispatch_list发送消息

7. dispatch_list(current)

static inline void
dispatch_list(struct timer_node *current) {
    do {
        struct timer_event * event = (struct timer_event *)(current+1);
        struct skynet_message message;
        message.source = 0;
        message.session = event->session;
        message.data = NULL;
        message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

        skynet_context_push(event->handle, &message);
        
        struct timer_node * temp = current;
        current=current->next;
        skynet_free(temp);  
    } while (current);
}

功能:处理定时器触发事件

  • 每个定时器节点后面跟着一个timer_event(包含handle和session)
  • 构造一个消息(类型为PTYPE_RESPONSE,数据为NULL),然后通过skynet_context_push推送到目标服务的消息队列

8. skynet_updatetime()

void
skynet_updatetime(void) {
    uint64_t cp = gettime(); // 获取当前时间
    if(cp < TI->current_point) {
        skynet_error(NULL, "time diff error: change from %lld to %lld", cp, TI->current_point);
        TI->current_point = cp;
    } else if (cp != TI->current_point) {
        uint32_t diff = (uint32_t)(cp - TI->current_point);
        TI->current_point = cp;
        TI->current += diff;
        int i;
        for (i=0;i<diff;i++) {
            timer_update(TI); // 逐单位更新时间
        }
    }
}
  • 功能:驱动定时器更新,这个函数需要由系统定期调用(通常在主循环中),用于驱动定时器的更新

9. timer_update(T)

static void 
timer_update(struct timer *T) {
    SPIN_LOCK(T);

    // try to dispatch timeout 0 (rare condition)
    timer_execute(T);

    // shift time first, and then dispatch timer message
    timer_shift(T);

    timer_execute(T);

    SPIN_UNLOCK(T);
}

功能:更新时间轮,包括执行当前时间的定时器和时间轮的移位操作

  • 先执行当前时间槽的定时器(timer_execute)
  • 然后调用timer_shift推进时间轮
  • 再次执行新的当前时间槽的定时器(因为移位操作可能将一些定时器移到了near数组的当前槽)
  • 定时器更新核心:
    在这里插入图片描述

10. skynet_now()

uint64_t 
skynet_now(void) {
    return TI->current;
}
  • 功能:获取当前时间(单位:10ms)
  • 返回值:从启动开始的10ms计数

11. skynet_starttime()

uint32_t
skynet_starttime(void) {
    return TI->starttime;
}
  • 功能:获取系统启动时间戳(秒)

12. skynet_thread_time()

#define NANOSEC 1000000000
#define MICROSEC 1000000

uint64_t
skynet_thread_time(void) {
    struct timespec ti;
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ti);

    return (uint64_t)ti.tv_sec * MICROSEC + (uint64_t)ti.tv_nsec / (NANOSEC / MICROSEC);
}
  • 功能:获取线程CPU时间(微秒级)
  • 用途:性能分析

时间轮设计优势

1. 五级时间轮结构

  • 近期轮(256槽):0-255个单位(2.56秒)
    • 第一层:256 * 64 = 16384 个单位(即16384*10ms≈163.84s)
    • 第二层:16384 * 64 = 1048576 个单位(约10485.76s≈2.9小时)
    • 第三层:1048576 * 64 = 67108864 个单位(约671088.64s≈7.77天)
    • 第四层:67108864 * 64 = 4294967296 个单位(约42949672.96s≈497天)

2. 锁优化设计

  • 使用细粒度自旋锁(非互斥锁)
  • 锁只保护时间轮操作
  • 事件派发时不持锁

3. 分层管理策略

  • 近期事件:直接存储在near数组
  • 远期事件:存储在分层时间轮
  • 时间推进时自动降级迁移

4. 常数级操作效率

  • 添加/删除定时器:直接哈希定位,无需遍历
  • 触发处理:仅处理当前槽位链表

5. 低内存开销

  • 链表节点内存复用
  • 固定大小的数组结构

6. 高精度与长周期平衡

  • 10ms基础时间单位
  • 支持长达数年的定时
  • 自动管理定时器迁移

7. 无遍历设计

  • 传统方案:每次扫描所有定时器(O(n))
  • 本方案:仅处理到期槽位(O(1))

8. 时间跳变容错

  • 检测时间回退:if(cp < TI->current_point)
  • 自动校正时间基准

工作流程示意图

在这里插入图片描述

Skynet 定时器业务使用注意事项与潜在隐患

使用注意事项

1. 时间单位精度

  • 定时器使用 10ms(1/100秒) 作为基础时间单位
  • 业务层设置时间参数时需换算:
-- Lua层设置1秒定时器
skynet.timeout(100, callback) -- 100个单位 = 1000ms
  • 无法设置小于10ms的定时器

2. 定时器不可取消

  • 框架未提供取消接口,业务层需自行处理:
local active_timers = {}

function set_timer(id, time, cb)
    local session = skynet.timeout(time, function()
        if active_timers[id] then
            cb()
            active_timers[id] = nil
        end
    end)
    active_timers[id] = session
end

function cancel_timer(id)
    active_timers[id] = nil
end

3. 消息处理顺序

  • 定时器消息类型为 PTYPE_RESPONSE
  • 需在消息处理中区分普通响应和定时触发:
function on_message(msg, sz, session, msg_type)
    if msg_type == skynet.PTYPE_RESPONSE then
        -- 定时器触发处理
    else
        -- 其他消息处理
    end
end

4. 时间同步问题

  • 使用 skynet.now() 获取框架时间而非系统时间
  • 跨节点时间需自行同步:
local start = skynet.starttime()  -- 系统启动UTC时间
local offset = start + skynet.now()/100  -- 当前UTC秒数

潜在隐患与解决方案

1. 定时器风暴(危险指数:★★★★)

  • 场景:大量定时器在同一时间点触发
  • 表现:
    • 消息队列瞬间暴涨
    • 服务消息处理延迟增加
    • 可能触发消息队列溢出
  • 解决方案:
    – 添加随机偏移分散触发
function safe_timeout(time, cb)
    local jitter = math.random(1, 20) -- ±200ms随机抖动
    skynet.timeout(time + jitter, cb)
end

2. 定时器泄漏(危险指数:★★★)

  • 场景:服务退出时未清理定时器
  • 后果:
    • 框架继续向已销毁服务发送消息
    • 增加无效消息处理开销
  • 解决方案:
function service:exit()
    -- 清理所有关联定时器
    for id, _ in pairs(self.timers) do
        self.timers[id] = nil
    end
    skynet.exit()
end

3. 时间漂移问题(危险指数:★★)

  • 原因:
    • 系统负载高导致 skynet_updatetime() 调用延迟,定时器的触发依赖于skynet_updatetime的调用,该函数通常在Skynet的主线程中调用(在timer线程中),如果系统负载过高,可能导致skynet_updatetime不能及时执行,从而造成定时器触发延迟
    • 时间轮推进滞后
  • 检测方法:
// 在skynet_updatetime中
if (diff > 100) { // 超过1秒未更新
    skynet_error(NULL, "timer lag %d units", diff);
}
  • 缓解措施:
    • 优化高负载业务逻辑
    • 避免单次处理消耗过大CPU时间

4. 跨节点时间不一致

  • 问题:集群环境下各节点时间轮不同步
  • 解决方案:
    – 使用集中式时间服务
function cluster_timeout(node, handle, time, session)
    if node == self_node then
        skynet.timeout(time, ...)
    else
        cluster.send(node, "timer", "set", ...)
    end
end

5. 长时间定时器精度损失

  • 现象:设置在远期(如几天后)的定时器误差增大
  • 原因:高层时间轮迁移时精度降低
  • 建议:
    • 超过1小时的定时器应使用数据库+轮询
    • 或分解为:长期存储 + 短期定时器刷新

6. 最长定时器限制:实际最长定时器约497天(1.36年),超长定时器处理:超过1年的定时器最好有特殊处理

-- 超过1年的定时器应使用持久化存储
function set_long_timer(days, callback)
    local expire_time = os.time() + days*86400
    db:save_timer(expire_time, callback)
    
    -- 设置短期检查器
    skynet.timeout(8640000, function() check_persistent_timers() end)
end

7. 时间回绕处理:32位整数约497天后会溢出归零

// 在timer_shift函数中
if (ct == 0) {
    move_list(T, 3, 0); // 处理时间溢出
}

可以优化点思考(视实际需求而定)

1. 批量定时器处理
– 避免大量独立定时器

local function batch_scheduler()
    while active do
        process_batch()  -- 处理一批任务
        skynet.sleep(200) -- 休眠2秒
    end
end

2. 时间轮缓存优化

// 在timer_update中增加批处理
#define MAX_DISPATCH 50
void timer_execute(struct timer *T) {
    int count = 0;
    while (T->near[idx].head.next && count++ < MAX_DISPATCH) {
        // 每次最多处理50个
    }
}

3. 定时器类型分级

-- 关键定时器:使用独立时间轮
function critical_timeout(time, cb)
    -- 单独实现高精度时间轮
end

典型错误案例

案例:分布式锁超时不同步

-- 节点A设置锁超时
lock_timeout = skynet.timeout(300, release_lock)

-- 节点B检查超时
if os.time() > lock.expire then -- 错误!使用了系统时间
    take_lock()
end

正确做法:

local lock = {
    expire = skynet.now() + 300, -- 使用框架时间
    node = skynet.self_node()
}

-- 检查时
if skynet.now() > lock.expire then
    take_lock()
end