libco源码剖析(2)-协程生命周期与协程调度

发布于:2022-12-21 ⋅ 阅读:(197) ⋅ 点赞:(0)

libco源码剖析(2)-协程生命周期与协程调度

前言

libco源码剖析(1) - 共享栈与协程的创建

本文会继续介绍libco协程的生命周期以及核心调度函数。

协程生命周期

协程的切换在(1)中已经提到了,这里会介绍协程的启动、挂起、释放。

  • 协程启动
void co_resume( stCoRoutine_t *co )
{
	stCoRoutineEnv_t *env = co->env;

	// 找到当前运行的协程, 从数组最后一位拿出当前运行的协程,如果目前没有协程,那就是主线程
	stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];

	if( !co->cStart )
	{
		// 如果当前协程还没有开始运行,为其构建上下文
		coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co, 0 );
		co->cStart = 1;
	}

	// 将指定协程放入线程的协程队列末尾
	env->pCallStack[ env->iCallStackSize++ ] = co;
	
	// 将当前运行的上下文保存到lpCurrRoutine中,同时将协程co的上下文替换进去
	// 执行完这一句,当前的运行环境就被替换为 co 了
	co_swap( lpCurrRoutine, co );
}

resume 一个协程,这可能是第一次启动该协程,也可以是要准备重新运行挂起的协程。我们可以认为在 libco 里面协程只有两种状态,即 running 和 pending。当创建一个协程并调用 resume 之后便进入了 running 状态,之后协 程可能通过 yield 让出 CPU,这就进入了 pending 状态。不断在这两个状态间循环往复。

A 协程调用 co_resume(B) 启动了 B 协程,本质上是一种特殊的过程调用关系,A 调用 B 进入了 B 过程内部,这是一种串行执行的关系。co_resume() 调用后进入了被调协程执行控制流,那么 co_resume() 函数本身何时返回?这就要等被调协程主动让出 CPU 了。

协程状态转换

  • 协程挂起

    在非对称协程理论,yield 与 resume 是个相对的操作。A 协程 resume 启动了 B 协 程,那么只有当 B 协程执行 yield 操作时才会返回到 A 协程。函数 co_swap() 会执行被调协程的代码。只有被调协程 yield 让出 CPU,调用者协程的 co_swap() 函数才能返回到原点,即返回到原来 co_resume() 内的位置。

    在被调协程要让出 CPU 时,会将它的 stCoRoutine_t 从 pCallStack 弹出,“栈指针”iCallStackSize 减 1,然后 co_swap() 切换 CPU 上下文到原来 被挂起的调用者协程恢复执行。这里“被挂起的调用者协程”,即是调用者 co_resume() 中切换 CPU 上下文被挂起的那个协程。下面我们来看一下 co_yield_env() 函数代码:

    /*
    *
    * 主动将当前运行的协程挂起,并恢复到上一层的协程
    *
    * @param env 协程管理器 
    */
    void co_yield_env( stCoRoutineEnv_t *env )
    {
    	// 这里直接取了iCallStackSize - 2,那么万一icallstacksize < 2呢?
    	// 所以这里实际上有个约束,就是co_yield之前必须先co_resume, 这样就不会造成这个问题了
    
    	// last就是 找到上次调用co_resume(curr)的协程
    	stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
    
    	// 当前栈
    	stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
    
    	env->iCallStackSize--;
    
    	// 把上下文当前的存储到curr中,并切换成last的上下文
    	co_swap( curr, last);
    }
    

    注意到这个函数为什么叫 co_yield_env 而 不是 co_yield 呢?这个也很简单。我们知道 co_resume 是有明确目的对象的,而且可以 通过 resume 将 CPU 交给任意协程。但 yield 则不一样,你只能 yield 给当前协程的调用 者。而当前协程的调用者,即最初 resume 当前协程的协程,是保存在 stCoRoutineEnv_t 的 pCallStack 中的。因此你只能 yield 给“env”,yield 给调用者协程;而不能随意 yield 给任意协程。

    libco 提供了一个 co_yield(stCoRoutine_t*) 的函数。看起来你似乎可以将 CPU 让给任意协程。实际上并非如此:

    void co_yield( stCoRoutine_t *co )
    {
    	co_yield_env( co->env );
    }
    

    使用的较多的是另外一个函数— co_yield_ct(),其实本质上作用都是一样的:

    void co_yield_ct()
    {
    	co_yield_env( co_get_curr_thread_env() );
    }
    
  • 协程释放

    void co_free( stCoRoutine_t *co )
    {
        if (!co->cIsShareStack) 
        {    
            free(co->stack_mem->stack_buffer);
            free(co->stack_mem);
        }   
        free( co );
    }
    
    void co_release( stCoRoutine_t *co )
    {
        co_free( co );
    }
    

协程调度

相关结构体:

// 自己管理的epoll结构体
struct stCoEpoll_t
{
	int iEpollFd;	// epoll的id

	static const int _EPOLL_SIZE = 1024 * 10;

	struct stTimeout_t *pTimeout;  // 超时管理器

	struct stTimeoutItemLink_t *pstTimeoutList; // 目前已超时的事件,仅仅作为中转使用,最后会合并到active上

	struct stTimeoutItemLink_t *pstActiveList; // 正在处理的事件

	co_epoll_res *result; 
};

struct co_epoll_res
{
	int size;
	struct epoll_event *events;
	struct kevent *eventlist;
};

下面函数是libco的核心调度

在此处调度三种事件:

    1. 被hook的io事件,该io事件是通过co_poll_inner注册进来的
    1. 超时事件
    1. 用户主动使用poll的事件

主要是使用epoll_wait来监测事件发生,收集超时事件,然后依次切换到事件所对应的协程进行执行,如此反复。

/*
* @param ctx epoll管理器
* @param pfn 每轮事件循环的最后会调用该函数
* @param arg pfn的参数
*/
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
	if( !ctx->result )
	{
		ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
	}

	co_epoll_res *result = ctx->result;

	for(;;)
	{
		// 最大超时时间设置为 1 ms
		// 所以最长1ms,epoll_wait就会被唤醒
		int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );

		stTimeoutItemLink_t *active = (ctx->pstActiveList);
		stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);

		memset( timeout,0,sizeof(stTimeoutItemLink_t) );

		// 处理active事件
		for(int i=0;i<ret;i++)
		{
			// 取出本次epoll响应的事件所对应的stTimeoutItem_t
			stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
			
			// 如果定义了预处理函数,则首先进行预处理。
			// 如果是co_poll_inner/co_poll或者是被hook的函数,则这个函数是OnPollPreparePfn
			if( item->pfnPrepare )
			{
				item->pfnPrepare( item,result->events[i], active );
			}
			else
			{
				// 否则将其加到active的链表中
				AddTail( active,item );
			}
		}

		// 获取当前时刻
		unsigned long long now = GetTickMS();

		// 以当前时间为超时截止点
		// 取出所有的超时事件,放入timeout 链表中
		TakeAllTimeout( ctx->pTimeout, now, timeout );

		// 遍历所有的项,将bTimeout置为true
		stTimeoutItem_t *lp = timeout->head;
		while( lp )
		{
			//printf("raise timeout %p\n",lp);
			lp->bTimeout = true;
			lp = lp->pNext;
		}

		// 将timeout合并到active上面
		Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

		lp = active->head;
		while( lp )
		{
			PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
			if( lp->pfnProcess )
			{
				 /*
				   处理该事件,默认为OnPollProcessEvent
				   在OnPollProcessEvent中
				   会使用co_resume恢复协程

				   协程会回到co_poll_inner里面
				 */
				lp->pfnProcess( lp );
			}

			lp = active->head;
		}

		// 每轮事件循环的最后调用该函数
		if( pfn )
		{
			if( -1 == pfn( arg ) )
			{
				break;
			}
		}

	}
}

参考资料

https://github.com/chenyahui/AnnotatedCode/tree/master/libco

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到