libco源码剖析(2)-协程生命周期与协程调度
前言
本文会继续介绍libco协程的生命周期以及核心调度函数。
协程生命周期
协程的切换在(1)中已经提到了,这里会介绍协程的启动、挂起、释放。
- 协程启动
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// 找到当前运行的协程, 从数组最后一位拿出当前运行的协程,如果目前没有协程,那就是主线程
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
// 如果当前协程还没有开始运行,为其构建上下文
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co, 0 );
co->cStart = 1;
}
// 将指定协程放入线程的协程队列末尾
env->pCallStack[ env->iCallStackSize++ ] = co;
// 将当前运行的上下文保存到lpCurrRoutine中,同时将协程co的上下文替换进去
// 执行完这一句,当前的运行环境就被替换为 co 了
co_swap( lpCurrRoutine, co );
}
resume 一个协程,这可能是第一次启动该协程,也可以是要准备重新运行挂起的协程。我们可以认为在 libco 里面协程只有两种状态,即 running 和 pending。当创建一个协程并调用 resume 之后便进入了 running 状态,之后协 程可能通过 yield 让出 CPU,这就进入了 pending 状态。不断在这两个状态间循环往复。
A 协程调用 co_resume(B) 启动了 B 协程,本质上是一种特殊的过程调用关系,A 调用 B 进入了 B 过程内部,这是一种串行执行的关系。co_resume() 调用后进入了被调协程执行控制流,那么 co_resume() 函数本身何时返回?这就要等被调协程主动让出 CPU 了。
协程挂起
在非对称协程理论,yield 与 resume 是个相对的操作。A 协程 resume 启动了 B 协 程,那么只有当 B 协程执行 yield 操作时才会返回到 A 协程。函数 co_swap() 会执行被调协程的代码。只有被调协程 yield 让出 CPU,调用者协程的 co_swap() 函数才能返回到原点,即返回到原来 co_resume() 内的位置。
在被调协程要让出 CPU 时,会将它的 stCoRoutine_t 从 pCallStack 弹出,“栈指针”iCallStackSize 减 1,然后 co_swap() 切换 CPU 上下文到原来 被挂起的调用者协程恢复执行。这里“被挂起的调用者协程”,即是调用者 co_resume() 中切换 CPU 上下文被挂起的那个协程。下面我们来看一下 co_yield_env() 函数代码:
/* * * 主动将当前运行的协程挂起,并恢复到上一层的协程 * * @param env 协程管理器 */ void co_yield_env( stCoRoutineEnv_t *env ) { // 这里直接取了iCallStackSize - 2,那么万一icallstacksize < 2呢? // 所以这里实际上有个约束,就是co_yield之前必须先co_resume, 这样就不会造成这个问题了 // last就是 找到上次调用co_resume(curr)的协程 stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ]; // 当前栈 stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ]; env->iCallStackSize--; // 把上下文当前的存储到curr中,并切换成last的上下文 co_swap( curr, last); }
注意到这个函数为什么叫 co_yield_env 而 不是 co_yield 呢?这个也很简单。我们知道 co_resume 是有明确目的对象的,而且可以 通过 resume 将 CPU 交给任意协程。但 yield 则不一样,你只能 yield 给当前协程的调用 者。而当前协程的调用者,即最初 resume 当前协程的协程,是保存在 stCoRoutineEnv_t 的 pCallStack 中的。因此你只能 yield 给“env”,yield 给调用者协程;而不能随意 yield 给任意协程。
libco 提供了一个 co_yield(stCoRoutine_t*) 的函数。看起来你似乎可以将 CPU 让给任意协程。实际上并非如此:
void co_yield( stCoRoutine_t *co ) { co_yield_env( co->env ); }
使用的较多的是另外一个函数— co_yield_ct(),其实本质上作用都是一样的:
void co_yield_ct() { co_yield_env( co_get_curr_thread_env() ); }
协程释放
void co_free( stCoRoutine_t *co ) { if (!co->cIsShareStack) { free(co->stack_mem->stack_buffer); free(co->stack_mem); } free( co ); } void co_release( stCoRoutine_t *co ) { co_free( co ); }
协程调度
相关结构体:
// 自己管理的epoll结构体
struct stCoEpoll_t
{
int iEpollFd; // epoll的id
static const int _EPOLL_SIZE = 1024 * 10;
struct stTimeout_t *pTimeout; // 超时管理器
struct stTimeoutItemLink_t *pstTimeoutList; // 目前已超时的事件,仅仅作为中转使用,最后会合并到active上
struct stTimeoutItemLink_t *pstActiveList; // 正在处理的事件
co_epoll_res *result;
};
struct co_epoll_res
{
int size;
struct epoll_event *events;
struct kevent *eventlist;
};
下面函数是libco的核心调度
在此处调度三种事件:
-
- 被hook的io事件,该io事件是通过co_poll_inner注册进来的
-
- 超时事件
-
- 用户主动使用poll的事件
主要是使用epoll_wait来监测事件发生,收集超时事件,然后依次切换到事件所对应的协程进行执行,如此反复。
/*
* @param ctx epoll管理器
* @param pfn 每轮事件循环的最后会调用该函数
* @param arg pfn的参数
*/
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
if( !ctx->result )
{
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;)
{
// 最大超时时间设置为 1 ms
// 所以最长1ms,epoll_wait就会被唤醒
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
// 处理active事件
for(int i=0;i<ret;i++)
{
// 取出本次epoll响应的事件所对应的stTimeoutItem_t
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
// 如果定义了预处理函数,则首先进行预处理。
// 如果是co_poll_inner/co_poll或者是被hook的函数,则这个函数是OnPollPreparePfn
if( item->pfnPrepare )
{
item->pfnPrepare( item,result->events[i], active );
}
else
{
// 否则将其加到active的链表中
AddTail( active,item );
}
}
// 获取当前时刻
unsigned long long now = GetTickMS();
// 以当前时间为超时截止点
// 取出所有的超时事件,放入timeout 链表中
TakeAllTimeout( ctx->pTimeout, now, timeout );
// 遍历所有的项,将bTimeout置为true
stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
// 将timeout合并到active上面
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
lp = active->head;
while( lp )
{
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if( lp->pfnProcess )
{
/*
处理该事件,默认为OnPollProcessEvent
在OnPollProcessEvent中
会使用co_resume恢复协程
协程会回到co_poll_inner里面
*/
lp->pfnProcess( lp );
}
lp = active->head;
}
// 每轮事件循环的最后调用该函数
if( pfn )
{
if( -1 == pfn( arg ) )
{
break;
}
}
}
}
参考资料
https://github.com/chenyahui/AnnotatedCode/tree/master/libco