FreeSWITCH 1.10 源码阅读(1)-服务启动及 event_socket 模块工作原理

发布于:2022-12-15 ⋅ 阅读:(941) ⋅ 点赞:(0)

1. 前言

FreeSWITCH 是一个开源的电话软交换平台,使用广泛,功能强大。本文基于 FreeSWITCH 1.10 版本,读者如有兴趣可以自行点击链接进入 github 下载源码。下图为 FreeSWITCH 服务启动及 Event Socket 模块运行工作的源代码时序,下文将对源码流程进行代码分析

在这里插入图片描述

2. 源码分析

2.1 服务的启动

2.1.1 FreeSWITCH 核心的启动流程

  1. FreeSWITCH 是用 C 语言写的,服务启动的入口为 switch.c#main() 函数。这个函数非常长,不过主要的处理大致分为以下几步:

    1. 调用 switch_core.c#switch_core_set_globals() 函数设置重要的运行时文件夹路径,比如模块文件的文件夹等
    2. 调用 switch_core.c#switch_core_init_and_modload() 函数初始化系统并加载模块,这部分是重点
    3. 调用 switch_core.c#switch_core_runtime_loop() 函数开启主线程循环保持系统运行
    /*
     * Process entry point (abridged excerpt: every "......" line marks code
     * that the article omitted from the original function).
     *
     * Visible steps: set up global paths, build the pid file path and pid
     * string, create an APR pool, initialise the core and load modules, then
     * enter the runtime loop.
     */
    int main(int argc, char *argv[])
    {
     
     ......
    
     /* Must run before SWITCH_GLOBAL_dirs.run_dir is read below. */
     switch_core_set_globals();
    
     pid = getpid();
    
     /* pid_path = <run_dir><separator><pfile>; pid_buffer = decimal pid text. */
     memset(pid_buffer, 0, sizeof(pid_buffer));
     switch_snprintf(pid_path, sizeof(pid_path), "%s%s%s", SWITCH_GLOBAL_dirs.run_dir, SWITCH_PATH_SEPARATOR, pfile);
     switch_snprintf(pid_buffer, sizeof(pid_buffer), "%d", pid);
     pid_len = strlen(pid_buffer);
    
     apr_pool_create(&pool, NULL);
    
     ......
    
     /* The console argument is the inverse of nc: a non-zero nc passes SWITCH_FALSE. */
     if (switch_core_init_and_modload(flags, nc ? SWITCH_FALSE : SWITCH_TRUE, &err) != SWITCH_STATUS_SUCCESS) {
     	fprintf(stderr, "Cannot Initialize [%s]\n", err);
     	return 255;
     }
    
     ......
    
     /* nc is handed to the runtime loop as its "bg" argument. */
     switch_core_runtime_loop(nc);
    
     ......
    
    }
    
  2. switch_core.c#switch_core_init_and_modload() 函数是初始化重要组件和模块的入口,不过本文关注的主要是以下几个函数调用,FreeSWITCH 核心启动的主要逻辑其实就是拉起关键组件及模块

    1. switch_core.c#switch_core_init() 函数负责初始化 FreeSWITCH 核心的重要组件
    2. switch_loadable_module.c#switch_loadable_module_init() 函数将加载 FreeSWITCH 核心之外的可插拔模块
    /*
     * Initialise the core and then load the loadable modules (abridged
     * excerpt: "......" marks omitted code).
     *
     * flags   - forwarded unchanged to switch_core_init().
     * console - forwarded unchanged to switch_core_init().
     * err     - out: receives a static error string on failure.
     *
     * Returns SWITCH_STATUS_GENERR when core init or module loading fails, and
     * SWITCH_STATUS_SUCCESS early when runtime.runlevel is already above 1.
     */
    SWITCH_DECLARE(switch_status_t) switch_core_init_and_modload(switch_core_flag_t flags, switch_bool_t console, const char **err)
    {
     ......
    
     if (switch_core_init(flags, console, err) != SWITCH_STATUS_SUCCESS) {
     	return SWITCH_STATUS_GENERR;
     }
    
     /* Module loading already happened once for this process; do not repeat it. */
     if (runtime.runlevel > 1) {
     	/* one per customer */
     	return SWITCH_STATUS_SUCCESS;
     }
    
     runtime.runlevel++;
     runtime.events_use_dispatch = 1;
    
     switch_core_set_signal_handlers();
     switch_load_network_lists(SWITCH_FALSE);
    
     switch_msrp_init();
    
     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Bringing up environment.\n");
     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Loading Modules.\n");
     /* SWITCH_TRUE is the "autoload" argument: load the modules named in the config files. */
     if (switch_loadable_module_init(SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) {
     	*err = "Cannot load modules";
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Error: %s\n", *err);
     	return SWITCH_STATUS_GENERR;
     }
    
     /* Network lists and signal handlers are set up a second time after module loading. */
     switch_load_network_lists(SWITCH_FALSE);
    
     switch_load_core_config("post_load_switch.conf");
    
     switch_core_set_signal_handlers();
    
     ......
    
    }
    
    
  3. 此时回到本节步骤1第3步,switch_core.c#switch_core_runtime_loop() 函数其实就是根据前后台启动方式来开启主线程循环保持系统运行,至此宏观上 FreeSWITCH 核心启动的流程基本结束

    如果 FreeSWITCH 以前台方式启动,则调用 switch_console.c#switch_console_loop() 函数启动控制台循环,接收控制台输入的命令并处理

    /*
     * Keep the calling thread occupied for the lifetime of the process.
     *
     * bg == 0: run the interactive console loop.
     * bg != 0: on WIN32, block on a named event "Global\Freeswitch.<pid>";
     *          elsewhere, sleep via switch_yield(1000000) for as long as
     *          runtime.running stays non-zero.
     */
    SWITCH_DECLARE(void) switch_core_runtime_loop(int bg)
    {
    #ifdef WIN32
     char event_name[256] = "";
     HANDLE stop_event;
    #endif
    
     if (!bg) {
     	/* foreground: wait for console input */
     	switch_console_loop();
     	return;
     }
    
    #ifdef WIN32
     switch_snprintf(event_name, sizeof(event_name), "Global\\Freeswitch.%d", getpid());
     stop_event = CreateEvent(NULL, FALSE, FALSE, event_name);
     if (stop_event) {
     	WaitForSingleObject(stop_event, INFINITE);
     }
    #else
     for (;;) {
     	if (!runtime.running) {
     		break;
     	}
     	switch_yield(1000000);
     }
    #endif
    }
    

2.1.2 事件分发组件的初始化

  1. 2.1.1节步骤2第1步 中,笔者提到 switch_core.c#switch_core_init() 函数会初始化 FreeSWITCH 核心的重要组件和运行时属性,其大致源码如下,本文涉及的主要是以下几部分:

    1. 调用函数 switch_core_session.c#switch_core_session_init() 初始化 Session 管理器并创建对应的事件队列,这部分本文暂不深入
    2. 调用 switch_event.c#switch_event_init() 函数初始化事件分发组件,包括启动事件分发线程及队列创建等
    3. 执行 switch_core.c#switch_load_core_config() 函数解析 FreeSWITCH 的核心配置文件 switch.conf.xml,将配置属性加载进内存中
    4. 执行 switch_scheduler.c#switch_scheduler_task_thread_start() 函数启动内部定时任务线程,后续可以添加心跳通知等定时任务到内部列表,等待触发执行
    /*
     * One-time initialisation of the core runtime (abridged excerpt: "......"
     * marks omitted code).
     *
     * flags, console - not referenced in the visible part of this excerpt.
     * err            - out: receives a static error string on failure.
     *
     * Returns SWITCH_STATUS_SUCCESS immediately when runtime.runlevel is
     * already above 0, and SWITCH_STATUS_MEMERR when SQLite, APR or the core
     * memory pool cannot be initialised.
     */
    SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switch_bool_t console, const char **err)
    {
     switch_uuid_t uuid;
     char guess_ip[256];
     int mask = 0;
     struct in_addr in;
    
    
     /* Already initialised: nothing to do. */
     if (runtime.runlevel > 0) {
     	/* one per customer */
     	return SWITCH_STATUS_SUCCESS;
     }
    
     /* Start from a zeroed runtime structure, then fill in defaults. */
     memset(&runtime, 0, sizeof(runtime));
     gethostname(runtime.hostname, sizeof(runtime.hostname));
    
     runtime.shutdown_cause = SWITCH_CAUSE_SYSTEM_SHUTDOWN;
     runtime.max_db_handles = 50;
     runtime.db_handle_timeout = 5000000;
     runtime.event_heartbeat_interval = 20;
     
     ......
    
     /* Fall back to a single CPU when no count was determined. */
     if (!runtime.cpu_count) runtime.cpu_count = 1;
    
     if (sqlite3_initialize() != SQLITE_OK) {
     	*err = "FATAL ERROR! Could not initialize SQLite\n";
     	return SWITCH_STATUS_MEMERR;
     }
    
     /* INIT APR and Create the pool context */
     if (apr_initialize() != SWITCH_STATUS_SUCCESS) {
     	*err = "FATAL ERROR! Could not initialize APR\n";
     	return SWITCH_STATUS_MEMERR;
     }
    
     if (!(runtime.memory_pool = switch_core_memory_init())) {
     	*err = "FATAL ERROR! Could not allocate memory pool\n";
     	return SWITCH_STATUS_MEMERR;
     }
    
     ......
    
     /* Session and event subsystems both receive the core memory pool. */
     switch_core_session_init(runtime.memory_pool);
     
     ......
    
     switch_event_init(runtime.memory_pool);
    
     ......
    
     switch_load_core_config("switch.conf");
    
     ......
    
     switch_scheduler_task_thread_start();
    
     ......
    
     /* Register the "heartbeat" and "check_ip" callbacks with the scheduler; check_ip additionally carries SSHF_OWN_THREAD. */
     switch_scheduler_add_task(switch_epoch_time_now(NULL), heartbeat_callback, "heartbeat", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL);
    
     switch_scheduler_add_task(switch_epoch_time_now(NULL), check_ip_callback, "check_ip", "core", 0, NULL, SSHF_NONE | SSHF_NO_DEL | SSHF_OWN_THREAD);
    
     ......
    
     return SWITCH_STATUS_SUCCESS;
    }
    
    
  2. switch_event.c#switch_event_init() 函数会根据当前机器的核心数计算确定事件分发线程的最大数量MAX_DISPATCH,核心流程是调用 switch_event.c#check_dispatch() 函数创建并启动事件分发线程

    /*
     * Initialise the event subsystem: locks, hashes, the event channel manager
     * and the dispatch queue with its first dispatch thread.
     *
     * pool - memory pool used for every lock and queue created here; must not
     *        be NULL (asserted).
     *
     * Always returns SWITCH_STATUS_SUCCESS. When the SCF_MINIMAL core flag is
     * set, only the basic locks and CUSTOM_HASH are created.
     */
    SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
    {
    
     /* don't need any more dispatch threads than we have CPU's*/
     /* Upper bound is cpu_count / 2 + 1, with a floor of 2. */
     MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1;
     if (MAX_DISPATCH < 2) {
     	MAX_DISPATCH = 2;
     }
    
     switch_assert(pool != NULL);
     THRUNTIME_POOL = RUNTIME_POOL = pool;
     switch_thread_rwlock_create(&RWLOCK, RUNTIME_POOL);
     switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
     switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
     switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
     switch_mutex_init(&CUSTOM_HASH_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
     switch_core_hash_init(&CUSTOM_HASH);
    
     /* Minimal mode stops here: no channel manager, no dispatch queue, no threads. */
     if (switch_core_test_flag(SCF_MINIMAL)) {
     	return SWITCH_STATUS_SUCCESS;
     }
    
     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n");
    
     switch_core_hash_init(&event_channel_manager.lahash);
     switch_mutex_init(&event_channel_manager.lamutex, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
    
     switch_thread_rwlock_create(&event_channel_manager.rwlock, RUNTIME_POOL);
     switch_core_hash_init(&event_channel_manager.hash);
     switch_core_hash_init(&event_channel_manager.perm_hash);
     event_channel_manager.ID = 1;
    
     /* SYSTEM_RUNNING is -1 while dispatch is being brought up and becomes 1 at the end of this function. */
     switch_mutex_lock(EVENT_QUEUE_MUTEX);
     SYSTEM_RUNNING = -1;
     switch_mutex_unlock(EVENT_QUEUE_MUTEX);
    
     //switch_threadattr_create(&thd_attr, pool);
     switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), NULL, AF_INET);
     switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), NULL, AF_INET6);
    
    
    #ifdef SWITCH_EVENT_RECYCLE
     switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
     switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL);
    #endif
    
     /* Creates the dispatch queue and launches the first dispatch thread. */
     check_dispatch();
    
     switch_mutex_lock(EVENT_QUEUE_MUTEX);
     SYSTEM_RUNNING = 1;
     switch_mutex_unlock(EVENT_QUEUE_MUTEX);
    
     return SWITCH_STATUS_SUCCESS;
    }
    
  3. switch_event.c#check_dispatch() 函数会创建事件分发队列EVENT_DISPATCH_QUEUE,并设置队列的容量为DISPATCH_QUEUE_LEN * MAX_DISPATCH,最后通过 switch_event.c#switch_event_launch_dispatch_threads() 函数创建 1 个事件分发线程

    /*
     * Create the event dispatch queue on first use and start one dispatch
     * thread for it. Calls made once the queue exists return immediately.
     */
    static void check_dispatch(void)
    {
     /* Unlocked fast path: the queue has already been created. */
     if (EVENT_DISPATCH_QUEUE) {
     	return;
     }
    
     switch_mutex_lock(BLOCK);
    
     /* Re-test under the lock before creating the queue. */
     if (!EVENT_DISPATCH_QUEUE) {
     	switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, THRUNTIME_POOL);
     	switch_event_launch_dispatch_threads(1);
    
     	/* Spin until THREAD_COUNT becomes non-zero. */
     	for (;;) {
     		if (THREAD_COUNT) {
     			break;
     		}
     		switch_cond_next();
     	}
     }
    
     switch_mutex_unlock(BLOCK);
    }
    
    
  4. switch_event.c#switch_event_launch_dispatch_threads() 函数会调用库函数创建事件分发线程,并将 switch_event.c#switch_event_dispatch_thread() 函数设置为线程任务。需注意此处会维护一个事件分发线程的数组 EVENT_DISPATCH_QUEUE_THREADS,如果数组下标上已经有线程存在了,则不会重复创建,至此事件分发组件的初始化基本结束

    /*
     * Raise the number of event dispatch threads to `max`.
     *
     * Nothing is done when `max` exceeds MAX_DISPATCH or is below the current
     * SOFT_MAX_DISPATCH. Otherwise every empty slot in
     * EVENT_DISPATCH_QUEUE_THREADS from SOFT_MAX_DISPATCH up to `max` gets a
     * new thread running switch_event_dispatch_thread on EVENT_DISPATCH_QUEUE,
     * and SOFT_MAX_DISPATCH is advanced to the final slot index.
     */
    SWITCH_DECLARE(void) switch_event_launch_dispatch_threads(uint32_t max)
    {
     switch_memory_pool_t *thread_pool = RUNTIME_POOL;
     switch_threadattr_t *attr;
     uint32_t spins_left = 200;	/* shared wait budget across all slots */
     uint32_t slot = 0;
     int created = 0;
    
     check_dispatch();
    
     if (max > MAX_DISPATCH || max < SOFT_MAX_DISPATCH) {
     	return;
     }
    
     for (slot = SOFT_MAX_DISPATCH; slot < max && slot < MAX_DISPATCH; slot++) {
     	if (!EVENT_DISPATCH_QUEUE_THREADS[slot]) {
     		switch_threadattr_create(&attr, thread_pool);
     		switch_threadattr_stacksize_set(attr, SWITCH_THREAD_STACKSIZE);
     		switch_threadattr_priority_set(attr, SWITCH_PRI_REALTIME);
     		switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[slot], attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, thread_pool);
    
     		/* Wait for the slot's running flag, yielding 10000 per step, until the budget runs out. */
     		while (--spins_left && !EVENT_DISPATCH_QUEUE_RUNNING[slot]) {
     			switch_yield(10000);
     		}
    
     		if (slot != 1) {
     			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create additional event dispatch thread %d\n", slot);
     		} else {
     			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Create event dispatch thread %d\n", slot);
     		}
     		created++;
     	}
     }
    
     SOFT_MAX_DISPATCH = slot;
    }
    

2.1.3 模块 mod_event_socket 的加载运行

  1. 2.1.1节步骤2第2步 中,执行 switch_loadable_module.c#switch_loadable_module_init() 函数将会加载配置文件中的配置的模块,其关键点如下:

    1. 模块配置由三个配置文件 pre_load_modules.conf.xml、modules.conf.xml、post_load_modules.conf.xml 共同组成,三者主要是加载顺序的区别,因为有些模块相互之间可能存在依赖,其他并无不同
    2. 在加载完前置模块之后,先调用 switch_core_sqldb.c#switch_core_sqldb_init() 函数初始化 sql 数据库
    3. 加载模块的核心是 switch_loadable_module.c#switch_loadable_module_load_module_ex() 函数,这个函数会将指定文件夹下的模块文件加载进内存
    4. 模块加载进内存后,会调用 switch_loadable_module.c#switch_loadable_module_runtime() 函数,为每一个定义了 runtime 函数的模块新起线程执行该 runtime 函数
    /*
     * Initialise the loadable-module registry and, when autoload is true, load
     * the modules listed in the three module configuration files.
     *
     * Order of work when autoload is true:
     *   1. built-in core modules (soft timer, PCM, speex);
     *   2. modules from pre_load_modules.conf, with their events held in event_hash;
     *   3. switch_core_sqldb_init(), then the held events are fired;
     *   4. modules from modules.conf, then post_load_modules.conf;
     *   5. if no <load> entry was processed, every library file in mod_dir;
     *   6. runtime threads for the loaded modules, then the chat thread.
     *
     * Returns SWITCH_STATUS_GENERR when sqldb initialisation fails or the
     * module directory cannot be opened in "load all" mode. A failed module
     * marked critical in the pre-load or common config calls abort().
     */
    SWITCH_DECLARE(switch_status_t) switch_loadable_module_init(switch_bool_t autoload)
    {
    
     apr_finfo_t finfo = { 0 };
     apr_dir_t *module_dir_handle = NULL;
     apr_int32_t finfo_flags = APR_FINFO_DIRENT | APR_FINFO_TYPE | APR_FINFO_NAME;
     char *precf = "pre_load_modules.conf";
     char *cf = "modules.conf";
     char *pcf = "post_load_modules.conf";
     switch_xml_t cfg, xml;
     unsigned char all = 0;
     unsigned int count = 0;
     const char *err;
     switch_hash_t *event_hash;
     switch_hash_index_t *hi;
     void *hash_val;
     switch_event_t *event;
    
    
     /* Platform library suffix, lower and upper case, used to validate names. */
    #ifdef WIN32
     const char *ext = ".dll";
     const char *EXT = ".DLL";
    #elif defined (MACOSX) || defined (DARWIN)
     const char *ext = ".dylib";
     const char *EXT = ".DYLIB";
    #else
     const char *ext = ".so";
     const char *EXT = ".SO";
    #endif
    
     memset(&loadable_modules, 0, sizeof(loadable_modules));
     switch_core_new_memory_pool(&loadable_modules.pool);
    
    
    #ifdef WIN32
     switch_loadable_module_path_init();
    #endif
    
     /* One lookup table per interface kind; most are created with the _nocase variant. */
     switch_core_hash_init(&loadable_modules.module_hash);
     switch_core_hash_init_nocase(&loadable_modules.endpoint_hash);
     switch_core_hash_init_nocase(&loadable_modules.codec_hash);
     switch_core_hash_init_nocase(&loadable_modules.timer_hash);
     switch_core_hash_init_nocase(&loadable_modules.application_hash);
     switch_core_hash_init_nocase(&loadable_modules.chat_application_hash);
     switch_core_hash_init_nocase(&loadable_modules.api_hash);
     switch_core_hash_init_nocase(&loadable_modules.json_api_hash);
     switch_core_hash_init(&loadable_modules.file_hash);
     switch_core_hash_init_nocase(&loadable_modules.speech_hash);
     switch_core_hash_init_nocase(&loadable_modules.asr_hash);
     switch_core_hash_init_nocase(&loadable_modules.directory_hash);
     switch_core_hash_init_nocase(&loadable_modules.chat_hash);
     switch_core_hash_init_nocase(&loadable_modules.say_hash);
     switch_core_hash_init_nocase(&loadable_modules.management_hash);
     switch_core_hash_init_nocase(&loadable_modules.limit_hash);
     switch_core_hash_init_nocase(&loadable_modules.database_hash);
     switch_core_hash_init_nocase(&loadable_modules.dialplan_hash);
     switch_core_hash_init(&loadable_modules.secondary_recover_hash);
     switch_mutex_init(&loadable_modules.mutex, SWITCH_MUTEX_NESTED, loadable_modules.pool);
    
     /* Registry only: the caller did not ask for modules to be loaded. */
     if (!autoload) return SWITCH_STATUS_SUCCESS;
     
     /*
     	switch_core_sqldb_init() is not yet ready and is executed after starting modules from pre_load_modules.conf
     	Modules loading procedure generates events used by sqldb.
     	This is why we should hold those events (storing in the event_hash) not firing them until sqldb is ready.
     */
     switch_core_hash_init(&event_hash);
    
     /* 
     	Pre-load core modules.
     	Do not pre-load modules which may use databases,
     	use appropriate section.
     */
     switch_loadable_module_load_module_ex("", "CORE_SOFTTIMER_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
     switch_loadable_module_load_module_ex("", "CORE_PCM_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
     switch_loadable_module_load_module_ex("", "CORE_SPEEX_MODULE", SWITCH_FALSE, SWITCH_FALSE, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, event_hash);
    
     /*
     	Loading pre-load modules.
     	Database modules must be loaded here.
     */
     if ((xml = switch_xml_open_cfg(precf, &cfg, NULL))) {
     	switch_xml_t mods, ld;
     	if ((mods = switch_xml_child(cfg, "modules"))) {
     		for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
     			switch_bool_t global = SWITCH_FALSE;
     			const char *val = switch_xml_attr_soft(ld, "module");
     			const char *path = switch_xml_attr_soft(ld, "path");
     			const char *critical = switch_xml_attr_soft(ld, "critical");
     			const char *sglobal = switch_xml_attr_soft(ld, "global");
    
     			/* Reject empty names and names containing '.' without the platform suffix. */
     			if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
     				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
     				continue;
     			}
     			global = switch_true(sglobal);
    
     			/* An empty path attribute falls back to the global module directory. */
     			if (path && zstr(path)) {
     				path = SWITCH_GLOBAL_dirs.mod_dir;
     			}
     			if (switch_loadable_module_load_module_ex((char *)path, (char *)val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_PRELOAD, event_hash) == SWITCH_STATUS_GENERR) {
     				if (critical && switch_true(critical)) {
     					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
     					switch_core_hash_destroy(&event_hash);
    
     					abort();
     				}
     			}
     			/* Counted whether or not the load succeeded. */
     			count++;
     		}
     	}
     	switch_xml_free(xml);
    
     }
     else {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", precf);
     }
    
     if (switch_core_sqldb_init(&err) != SWITCH_STATUS_SUCCESS)
     {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Loading modules interrupted. [Error: %s]\n", err);
     	switch_core_hash_destroy(&event_hash);
     	return SWITCH_STATUS_GENERR;
     }
    
     /* sqldb is ready. Fire holding events! */
     for (hi = switch_core_hash_first(event_hash); hi; hi = switch_core_hash_next(&hi)) {
     	switch_core_hash_this(hi, NULL, NULL, &hash_val);
     	event = (switch_event_t *)hash_val;
     	switch_event_fire(&event);
     }
    
     switch_core_hash_destroy(&event_hash);
    
     /*
     	To prevent locking.
     	Core modules which may use databases should be pre-loaded here
     	(databases are loaded already).
     */
    #ifdef SWITCH_HAVE_YUV
    #ifdef SWITCH_HAVE_VPX
     switch_loadable_module_load_module("", "CORE_VPX_MODULE", SWITCH_FALSE, &err);
    #endif
    #endif
    
     /* Loading common modules */
     /* From here on modules are loaded with a NULL event_hash, i.e. their events are no longer held. */
     if ((xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
     	switch_xml_t mods, ld;
     	if ((mods = switch_xml_child(cfg, "modules"))) {
     		for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
     			switch_bool_t global = SWITCH_FALSE;
     			const char *val = switch_xml_attr_soft(ld, "module");
     			const char *path = switch_xml_attr_soft(ld, "path");
     			const char *critical = switch_xml_attr_soft(ld, "critical");
     			const char *sglobal = switch_xml_attr_soft(ld, "global");
     			if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
     				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
     				continue;
     			}
     			global = switch_true(sglobal);
    
     			if (path && zstr(path)) {
     				path = SWITCH_GLOBAL_dirs.mod_dir;
     			}
     			if (switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_COMMON, NULL) == SWITCH_STATUS_GENERR) {
     				if (critical && switch_true(critical)) {
     					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to load critical module '%s', abort()\n", val);
     					abort();
     				}
     			}
     			count++;
     		}
     	}
     	switch_xml_free(xml);
    
     } else {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", cf);
     }
    
     /* Post-load modules: same parsing, but no "critical" attribute and the load result is ignored. */
     if ((xml = switch_xml_open_cfg(pcf, &cfg, NULL))) {
     	switch_xml_t mods, ld;
    
     	if ((mods = switch_xml_child(cfg, "modules"))) {
     		for (ld = switch_xml_child(mods, "load"); ld; ld = ld->next) {
     			switch_bool_t global = SWITCH_FALSE;
     			const char *val = switch_xml_attr_soft(ld, "module");
     			const char *path = switch_xml_attr_soft(ld, "path");
     			const char *sglobal = switch_xml_attr_soft(ld, "global");
     			if (zstr(val) || (strchr(val, '.') && !strstr(val, ext) && !strstr(val, EXT))) {
     				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Invalid extension for %s\n", val);
     				continue;
     			}
     			global = switch_true(sglobal);
    
     			if (path && zstr(path)) {
     				path = SWITCH_GLOBAL_dirs.mod_dir;
     			}
     			switch_loadable_module_load_module_ex(path, val, SWITCH_FALSE, global, &err, SWITCH_LOADABLE_MODULE_TYPE_POSTLOAD, NULL);
     			count++;
     		}
     	}
     	switch_xml_free(xml);
    
     } else {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "open of %s failed\n", pcf);
     }
    
     /* No valid <load> entry in any of the three files: fall back to loading everything in mod_dir. */
     if (!count) {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "No modules loaded, assuming 'load all'\n");
     	all = 1;
     }
    
     if (all) {
     	if (apr_dir_open(&module_dir_handle, SWITCH_GLOBAL_dirs.mod_dir, loadable_modules.pool) != APR_SUCCESS) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Can't open directory: %s\n", SWITCH_GLOBAL_dirs.mod_dir);
     		return SWITCH_STATUS_GENERR;
     	}
    
     	while (apr_dir_read(&finfo, finfo_flags, module_dir_handle) == APR_SUCCESS) {
     		const char *fname = finfo.fname;
    
     		/* Only regular files are candidates. */
     		if (finfo.filetype != APR_REG) {
     			continue;
     		}
    
     		if (!fname) {
     			fname = finfo.name;
     		}
    
     		if (!fname) {
     			continue;
     		}
    
     		/* Skip entries whose name does not contain the platform library suffix. */
     		if (zstr(fname) || (!strstr(fname, ext) && !strstr(fname, EXT))) {
     			continue;
     		}
    
     		switch_loadable_module_load_module(SWITCH_GLOBAL_dirs.mod_dir, fname, SWITCH_FALSE, &err);
     	}
     	apr_dir_close(module_dir_handle);
     }
    
     /* Start a thread for every loaded module that defines a runtime function. */
     switch_loadable_module_runtime();
    
     /* chat_globals shares the module registry's memory pool. */
     memset(&chat_globals, 0, sizeof(chat_globals));
     chat_globals.running = 1;
     chat_globals.pool = loadable_modules.pool;
     switch_mutex_init(&chat_globals.mutex, SWITCH_MUTEX_NESTED, chat_globals.pool);
    
     chat_thread_start(1);
    
     return SWITCH_STATUS_SUCCESS;
    }
    
    
  2. switch_loadable_module.c#switch_loadable_module_load_module_ex() 函数的核心处理如下:

    1. 首先根据传入的名称得到去掉扩展名的模块名,并据此拼出对应的动态库文件路径,然后通过 switch_core_hash.c#switch_core_hash_find_locked() 方法查找哈希表 loadable_modules.module_hash 中是否存在该模块名,如果不存在说明模块还没有加载过,需要加载
    2. switch_loadable_module.c#switch_loadable_module_load_file() 函数负责加载动态库文件
    3. 如果加载动态库文件成功,则调用 switch_loadable_module.c#switch_loadable_module_process() 将加载完成的模块插入哈希表 loadable_modules.module_hash,并生成模块加载的事件将其推入到 FreeSWITCH 事件组件中
    /*
     * Load a single module, register it, and optionally start its runtime thread.
     *
     * dir        - directory used to build the library path when fname is not a file path.
     * fname      - module name or file path; text from the first '.' onward is dropped to form the name.
     * runtime    - when true, launch the module's runtime function in a new thread after registration.
     * global     - forwarded to switch_loadable_module_load_file().
     * err        - out: "" on entry, replaced by a static message on failure.
     * type       - stored in the new module's type field.
     * event_hash - forwarded to switch_loadable_module_process().
     *
     * Returns SWITCH_STATUS_FALSE on allocation failure or when the module name
     * is already in loadable_modules.module_hash; otherwise the status of the
     * load-file or process step.
     */
    static switch_status_t switch_loadable_module_load_module_ex(const char *dir, const char *fname, switch_bool_t runtime, switch_bool_t global, const char **err, switch_loadable_module_type_t type, switch_hash_t *event_hash)
    {
    #ifdef WIN32
     const char *ext = ".dll";
    #else
     const char *ext = ".so";
    #endif
     switch_loadable_module_t *loaded = NULL;
     switch_status_t status;
     char *lib_path;
     char *mod_name;
     char *suffix;
    
     *err = "";
    
     mod_name = switch_core_strdup(loadable_modules.pool, fname);
     if (!mod_name) {
     	*err = "allocation error";
     	return SWITCH_STATUS_FALSE;
     }
    
     if (switch_is_file_path(mod_name)) {
     	/* Caller gave a path: keep it as the library path and reduce the name to its last component. */
     	lib_path = switch_core_strdup(loadable_modules.pool, mod_name);
     	mod_name = (char *) switch_cut_path(mod_name);
     	suffix = strchr(mod_name, '.');
     	if (suffix) {
     		*suffix = '\0';
     	}
     } else {
     	switch_size_t cap;
    
     	suffix = strchr(mod_name, '.');
     	if (suffix) {
     		*suffix = '\0';
     	}
    
     	/* dir + name + 8 bytes for separator, extension and terminator. */
     	cap = strlen(switch_str_nil(dir));
     	cap += strlen(mod_name);
     	cap += 8;
     	lib_path = (char *) switch_core_alloc(loadable_modules.pool, cap);
     	switch_snprintf(lib_path, cap, "%s%s%s%s", switch_str_nil(dir), SWITCH_PATH_SEPARATOR, mod_name, ext);
     }
    
     if (switch_core_hash_find_locked(loadable_modules.module_hash, mod_name, loadable_modules.mutex)) {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Module %s Already Loaded!\n", mod_name);
     	*err = "Module already loaded";
     	return SWITCH_STATUS_FALSE;
     }
    
     status = switch_loadable_module_load_file(lib_path, mod_name, global, &loaded);
     if (status != SWITCH_STATUS_SUCCESS) {
     	*err = "module load file routine returned an error";
     	return status;
     }
    
     loaded->type = type;
    
     status = switch_loadable_module_process(mod_name, loaded, event_hash);
     if (status != SWITCH_STATUS_SUCCESS) {
     	*err = "module load routine returned an error";
     	return status;
     }
    
     if (runtime && loaded->switch_module_runtime) {
     	loaded->thread = switch_core_launch_thread(switch_loadable_module_exec, loaded, loaded->pool);
     }
    
     return status;
    
    }
    
    
  3. switch_loadable_module.c#switch_loadable_module_load_file() 函数的核心处理步骤如下:

    1. 首先调用 switch_dso.c#switch_dso_open() 函数打开库文件,随后调用 switch_dso.c#switch_dso_data_sym() 函数在动态库中查找名为 <模块名>_module_interface 的符号,这部分涉及到平台库函数,不做深入讨论
    2. 如果符号表加载成功,则将其赋给函数表 interface_struct_handle,最终将其赋给函数表结构体 mod_interface_functions
    3. 如果当前加载的模块定义了 load 函数,则需要通过函数指针load_func_ptr回调执行,以 Event Socket 模块为例,这里将回调到 mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) 函数
    4. 以上处理完毕,生成结构体 switch_loadable_module_t 实例 module 保存模块中的函数等信息,最终通过传入的出参指针 new_module 将其返回给调用方
    /*
     * Open a module library, locate its "<filename>_module_interface" function
     * table, run the module's load function and wrap the result in a newly
     * allocated switch_loadable_module_t.
     *
     * path       - library file to open; must not be NULL (asserted).
     * filename   - module name used to build the interface symbol name.
     * global     - initial "global" flag passed to switch_dso_open().
     * new_module - out: set to NULL on entry and to the new module on success.
     *
     * Returns SWITCH_STATUS_SUCCESS, or SWITCH_STATUS_GENERR after logging the
     * error and destroying the library handle and the per-module pool.
     */
    static switch_status_t switch_loadable_module_load_file(char *path, char *filename, switch_bool_t global, switch_loadable_module_t **new_module)
    {
     switch_loadable_module_t *module = NULL;
     switch_dso_lib_t dso = NULL;
     apr_status_t status = SWITCH_STATUS_SUCCESS;
     switch_loadable_module_function_table_t *interface_struct_handle = NULL;
     switch_loadable_module_function_table_t *mod_interface_functions = NULL;
     char *struct_name = NULL;
     switch_module_load_t load_func_ptr = NULL;
     int loading = 1;
     switch_loadable_module_interface_t *module_interface = NULL;
     char *derr = NULL;
     const char *err = NULL;
     switch_memory_pool_t *pool = NULL;
     switch_bool_t load_global = global;
    
     switch_assert(path != NULL);
    
     /* Each module gets its own pool; on success it becomes module->pool. */
     switch_core_new_memory_pool(&pool);
     *new_module = NULL;
    
     struct_name = switch_core_sprintf(pool, "%s_module_interface", filename);
    
     /*
      * First attempt: look for the interface symbol in the core library rather
      * than in the module file itself.
      * NOTE(review): the NULL path in the generic branch presumably refers to
      * the running process image - confirm against switch_dso_open().
      */
    #ifdef WIN32
     dso = switch_dso_open("FreeSwitch.dll", load_global, &derr);
    #elif defined (MACOSX) || defined(DARWIN)
     {
     	char *lib_path = switch_mprintf("%s/libfreeswitch.dylib", SWITCH_GLOBAL_dirs.lib_dir);
     	dso = switch_dso_open(lib_path, load_global, &derr);
     	switch_safe_free(lib_path);
     }
    #else
     dso = switch_dso_open(NULL, load_global, &derr);
    #endif
     if (!derr && dso) {
     	interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
     }
    
     /* Errors from the first attempt are discarded; only the result matters. */
     switch_safe_free(derr);
    
     /* Second attempt: open the module file given by path. */
     if (!interface_struct_handle) {
     	if (dso) switch_dso_destroy(&dso);
     	dso = switch_dso_open(path, load_global, &derr);
     }
    
     /*
      * Single-pass loop: "break" leaves with err set, and "continue" is used
      * once to retry after reopening the library with global symbols.
      */
     while (loading) {
     	if (derr) {
     		err = derr;
     		break;
     	}
    
     	if (!interface_struct_handle) {
     		interface_struct_handle = switch_dso_data_sym(dso, struct_name, &derr);
     	}
    
     	if (derr) {
     		err = derr;
     		break;
     	}
    
     	if (interface_struct_handle && interface_struct_handle->switch_api_version != SWITCH_API_VERSION) {
     		err = "Trying to load an out of date module, please rebuild the module.";
     		break;
     	}
    
     	/* The module requests global symbols via SMODF_GLOBAL_SYMBOLS: reopen it that way and start over. */
     	if (!load_global && interface_struct_handle && switch_test_flag(interface_struct_handle, SMODF_GLOBAL_SYMBOLS)) {
     		load_global = SWITCH_TRUE;
     		switch_dso_destroy(&dso);
     		interface_struct_handle = NULL;
     		dso = switch_dso_open(path, load_global, &derr);
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loading module with global namespace at request of module\n");
     		continue;
     	}
    
     	if (interface_struct_handle) {
     		mod_interface_functions = interface_struct_handle;
     		load_func_ptr = mod_interface_functions->load;
     	}
    
     	if (load_func_ptr == NULL) {
     		err = "Cannot locate symbol 'switch_module_load' please make sure this is a valid module.";
     		break;
     	}
    
     	/* Run the module's own load function; it is expected to fill in module_interface. */
     	status = load_func_ptr(&module_interface, pool);
    
     	if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_NOUNLOAD) {
     		err = "Module load routine returned an error";
     		module_interface = NULL;
     		break;
     	}
    
     	if (!module_interface) {
     		err = "Module failed to initialize its module_interface. Is this a valid module?";
     		break;
     	}
    
     	if ((module = switch_core_alloc(pool, sizeof(switch_loadable_module_t))) == 0) {
     		abort();
     	}
    
     	/* A load function returning SWITCH_STATUS_NOUNLOAD bumps the module's perm counter. */
     	if (status == SWITCH_STATUS_NOUNLOAD) {
     		module->perm++;
     	}
    
     	loading = 0;
     }
    
    
     /* Failure path: release the library handle and the per-module pool before reporting. */
     if (err) {
    
     	if (dso) {
     		switch_dso_destroy(&dso);
     	}
    
     	if (pool) {
     		switch_core_destroy_memory_pool(&pool);
     	}
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error Loading module %s\n**%s**\n", path, err);
     	switch_safe_free(derr);
     	return SWITCH_STATUS_GENERR;
     }
    
     module->pool = pool;
     module->filename = switch_core_strdup(module->pool, path);
     module->module_interface = module_interface;
     module->switch_module_load = load_func_ptr;
    
     if (mod_interface_functions) {
     	module->switch_module_shutdown = mod_interface_functions->shutdown;
     	module->switch_module_runtime = mod_interface_functions->runtime;
     }
    
     module->lib = dso;
    
     *new_module = module;
     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Successfully Loaded [%s]\n", module_interface->module_name);
    
     switch_core_set_signal_handlers();
    
     return SWITCH_STATUS_SUCCESS;
    
    }
    
  4. mod_event_socket.c#SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) 函数源码中重要的处理如下:

    1. 调用 switch_event.c#switch_event_bind_removable() 注册事件监听,这里是监听 SWITCH_EVENT_ALL 所有事件,并将函数 mod_event_socket.c#event_handler() 作为回调传入
    2. 通过宏 switch_loadable_module.h#SWITCH_ADD_APP() 将 mod_event_socket.c#socket_function() 函数作为一个名为 socket 的 Application 注册到 FreeSWITCH 核心中供系统调用,也就是外联 Outbound 模式主动连接外部的入口
    /*
     * Load function of mod_event_socket: resets module state, subscribes to
     * all events, attaches a log hook and registers the "socket" application
     * and the "event_sink" API.
     *
     * NOTE(review): pool, module_interface and modname are not declared in
     * this block - presumably supplied by SWITCH_MODULE_LOAD_FUNCTION and
     * file-level definitions; confirm against the module source.
     *
     * Returns SWITCH_STATUS_GENERR when the event binding fails, otherwise
     * SWITCH_STATUS_SUCCESS.
     */
    SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
    {
     switch_application_interface_t *app_interface;
     switch_api_interface_t *api_interface;
    
     memset(&globals, 0, sizeof(globals));
    
     switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
    
     memset(&listen_list, 0, sizeof(listen_list));
     switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool);
    
     /* Subscribe event_handler to every event type and subclass; the node is kept in globals.node. */
     if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
     	return SWITCH_STATUS_GENERR;
     }
    
     /* Register socket_logger as a log hook at SWITCH_LOG_DEBUG level. */
     switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
    
     /* connect my internal structure to the blank pointer passed to me */
     *module_interface = switch_loadable_module_create_module_interface(pool, modname);
     SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
     SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, "<web data>");
    
     /* indicate that the module should continue to be loaded */
     return SWITCH_STATUS_SUCCESS;
    }
    
  5. switch_event.c#switch_event_bind_removable() 函数的处理比较直观,可以看到核心就是将调用方传入的回调等信息封装为一个 switch_event_node_t 结构体实例,并将其加入到监听节点数组 EVENT_NODES

    /*
     * Register a callback for an event type and return the created node so the
     * caller can remove the binding later.
     *
     * id            - identifier of the binder; copied into the node.
     * event         - event type to listen for; must be <= SWITCH_EVENT_ALL.
     * subclass_name - optional subclass filter; reserved on the fly when it is
     *                 not yet present in CUSTOM_HASH. May be NULL.
     * callback      - function stored in the node.
     * user_data     - opaque pointer stored in the node.
     * node          - optional out: NULL on failure, the new node on success.
     *
     * Returns SWITCH_STATUS_SUCCESS on success, SWITCH_STATUS_FALSE when the
     * subclass cannot be reserved, and SWITCH_STATUS_MEMERR when `event` is
     * out of range.
     */
    SWITCH_DECLARE(switch_status_t) switch_event_bind_removable(const char *id, switch_event_types_t event, const char *subclass_name,
     														switch_event_callback_t callback, void *user_data, switch_event_node_t **node)
    {
     switch_event_node_t *event_node;
     switch_event_subclass_t *subclass = NULL;
    
     switch_assert(BLOCK != NULL);
     switch_assert(RUNTIME_POOL != NULL);
    
     if (node) {
     	*node = NULL;
     }
    
     if (subclass_name) {
     	switch_mutex_lock(CUSTOM_HASH_MUTEX);
    
     	if (!(subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) {
     		switch_event_reserve_subclass_detailed(id, subclass_name);
     		/*
     		 * The reservation can fail, in which case the lookup still yields
     		 * NULL. Only touch the entry when it exists, so the error path
     		 * below is reached instead of dereferencing NULL.
     		 */
     		if ((subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) {
     			subclass->bind = 1;
     		}
     	}
    
     	switch_mutex_unlock(CUSTOM_HASH_MUTEX);
    
     	if (!subclass) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not reserve subclass. '%s'\n", subclass_name);
     		return SWITCH_STATUS_FALSE;
     	}
     }
    
     if (event <= SWITCH_EVENT_ALL) {
     	switch_zmalloc(event_node, sizeof(*event_node));
     	switch_thread_rwlock_wrlock(RWLOCK);
     	switch_mutex_lock(BLOCK);
     	/* <LOCKED> ----------------------------------------------- */
     	event_node->id = DUP(id);
     	event_node->event_id = event;
     	if (subclass_name) {
     		event_node->subclass_name = DUP(subclass_name);
     	}
     	event_node->callback = callback;
     	event_node->user_data = user_data;
    
     	/* Push the node onto the head of the per-event list. */
     	if (EVENT_NODES[event]) {
     		event_node->next = EVENT_NODES[event];
     	}
    
     	EVENT_NODES[event] = event_node;
     	switch_mutex_unlock(BLOCK);
     	switch_thread_rwlock_unlock(RWLOCK);
     	/* </LOCKED> ----------------------------------------------- */
    
     	if (node) {
     		*node = event_node;
     	}
    
     	return SWITCH_STATUS_SUCCESS;
     }
    
     /* Out-of-range event type; status code kept as-is for existing callers. */
     return SWITCH_STATUS_MEMERR;
    }
    
    
  6. 此时模块的加载流程基本结束,回到本节步骤1第4步 switch_loadable_module.c#switch_loadable_module_runtime() 函数调用,可以看到这个函数的主要逻辑是判断加载的模块是否有定义 runtime 函数,有的话就调用 switch_core.c#switch_core_launch_thread() 函数新建线程,将 switch_loadable_module.c#switch_loadable_module_exec() 函数作为线程任务传入

    /*
     * Walk every entry of loadable_modules.module_hash while holding
     * loadable_modules.mutex and launch switch_loadable_module_exec() in a new
     * thread for each module that provides a runtime function.
     */
    static void switch_loadable_module_runtime(void)
    {
     switch_hash_index_t *cursor;
    
     switch_mutex_lock(loadable_modules.mutex);
    
     cursor = switch_core_hash_first(loadable_modules.module_hash);
     while (cursor) {
     	void *entry;
     	switch_loadable_module_t *mod;
    
     	switch_core_hash_this(cursor, NULL, NULL, &entry);
     	mod = (switch_loadable_module_t *) entry;
    
     	/* only modules that define a runtime function get a thread */
     	if (mod->switch_module_runtime) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting runtime thread for %s\n", mod->module_interface->module_name);
     		mod->thread = switch_core_launch_thread(switch_loadable_module_exec, mod, loadable_modules.pool);
     	}
    
     	cursor = switch_core_hash_next(&cursor);
     }
    
     switch_mutex_unlock(loadable_modules.mutex);
    }
    
  7. switch_loadable_module.c#switch_loadable_module_exec() 函数逻辑清晰,主要逻辑就是执行模块的 runtime 函数,以 Event Socket 模块为例,这里将回调到 mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) 函数,至此加载进内存的 Event Socket 模块已经运行起来,其运行原理将在下节分析

    /*
     * Thread body for a module runtime: keeps calling the module's runtime function
     * until it returns SWITCH_STATUS_TERM or the module is flagged as shutting down,
     * then destroys the pool carried by the thread-session object (if any) and exits.
     *
     * thread - the thread running this function
     * obj    - a switch_core_thread_session_t whose objs[0] is the module
     */
    static void *SWITCH_THREAD_FUNC switch_loadable_module_exec(switch_thread_t *thread, void *obj)
    {
     switch_core_thread_session_t *ctx = obj;
     switch_loadable_module_t *mod = ctx->objs[0];
     switch_status_t rc = SWITCH_STATUS_SUCCESS;
    
     switch_assert(thread != NULL);
     switch_assert(mod != NULL);
    
     /* re-enter the runtime function whenever it returns anything but TERM */
     while (rc != SWITCH_STATUS_TERM && !mod->shutting_down) {
     	rc = mod->switch_module_runtime();
     }
    
     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Thread ended for %s\n", mod->module_interface->module_name);
    
     if (ctx->pool) {
     	/* destroy through a local copy of the pool pointer */
     	switch_memory_pool_t *owned = ctx->pool;
    
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroying Pool for %s\n", mod->module_interface->module_name);
     	switch_core_destroy_memory_pool(&owned);
     }
    
     switch_thread_exit(thread, 0);
     return NULL;
    }
    

2.2 mod_event_socket 的运行原理

FreeSWITCH 中的 EventSocket 有如下图两种交互模式,从功能上来说二者几乎没有差别,主要区别是 FreeSWITCH 在连接中担任的角色不同

在这里插入图片描述

2.2.1 事件订阅命令的处理

  1. mod_event_socket.c#SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) 函数是 Event Socket 模块运行的关键,其核心处理如下:

    1. 首先调用 mod_event_socket.c#config() 函数读取配置文件的属性,将其加载到内存中
    2. 根据配置属性,绑定监听本机端口,通过 switch_apr.c#switch_socket_accept() 调用底层接口等待远程 Inbound 连接
    3. 一旦接收到远程 Inbound 连接,则将其封装为 listener_t 结构体实例 listener 并添加到 socket 监听列表中
    4. 最后调用 mod_event_socket.c#launch_listener_thread() 函数创建一个线程单独处理这条 Inbound 连接上的数据交互
    /*
     * Runtime function of mod_event_socket.
     *
     * Loads the configuration, binds and listens on prefs.ip:prefs.port (retrying
     * every 100ms unless prefs.stop_on_bind_error is set), then accepts connections
     * until prefs.done is set. Every accepted connection gets its own memory pool, a
     * listener_t allocated from that pool, and a thread started through
     * launch_listener_thread().
     *
     * Always returns SWITCH_STATUS_TERM.
     */
    SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
    {
     switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
     switch_status_t rv;
     switch_sockaddr_t *sa;
     switch_socket_t *inbound_socket = NULL;
     listener_t *listener;
     uint32_t x = 0;
     uint32_t errs = 0;
    
     if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
     	return SWITCH_STATUS_TERM;
     }
    
     config();
    
     /* NOTE(review): the `goto fail` paths inside this setup loop return without
      * releasing `pool`; left untouched because the socket stored in
      * listen_list.sock is created from that pool. */
     while (!prefs.done) {
     	rv = switch_sockaddr_info_get(&sa, prefs.ip, SWITCH_UNSPEC, prefs.port, 0, pool);
     	if (rv) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot get information about IP address %s\n", prefs.ip);
     		goto fail;
     	}
     	rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool);
     	if (rv)
     		goto sock_fail;
     	rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1);
     	if (rv)
     		goto sock_fail;
    #ifdef WIN32
     	/* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
     	if (switch_sockaddr_get_family(sa) == AF_INET6) {
     		rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_IPV6_V6ONLY, 0);
     		if (rv) goto sock_fail;
     	}
    #endif
     	rv = switch_socket_bind(listen_list.sock, sa);
     	if (rv)
     		goto sock_fail;
     	rv = switch_socket_listen(listen_list.sock, 5);
     	if (rv)
     		goto sock_fail;
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", prefs.ip, prefs.port);
    
     	if (prefs.nat_map) {
     		switch_nat_add_mapping(prefs.port, SWITCH_NAT_TCP, NULL, SWITCH_FALSE);
     	}
    
     	break;
       sock_fail:
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", prefs.ip, prefs.port);
     	if (prefs.stop_on_bind_error) {
     		prefs.done = 1;
     		goto fail;
     	}
     	switch_yield(100000);
     }
    
     listen_list.ready = 1;
    
    
     while (!prefs.done) {
     	if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
     		/* the listening socket is up at this point, so run the normal teardown */
     		goto end;
     	}
    
     	/* never carry a socket from a previous iteration into this one */
     	inbound_socket = NULL;
    
     	if ((rv = switch_socket_accept(&inbound_socket, listen_list.sock, listener_pool))) {
     		if (prefs.done) {
     			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n");
     			goto end;
     		} else {
     			/* I wish we could use strerror_r here but its not defined everywhere =/ */
     			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
     			if (++errs > 100) {
     				goto end;
     			}
    
     			/* Nothing was accepted: drop the pool prepared for this attempt
     			 * and try again instead of building a listener around a socket
     			 * that does not belong to a new connection. */
     			switch_core_destroy_memory_pool(&listener_pool);
     			listener_pool = NULL;
     			continue;
     		}
     	} else {
     		errs = 0;
     	}
    
    
     	if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
     		break;
     	}
    
     	switch_thread_rwlock_create(&listener->rwlock, listener_pool);
     	switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, listener_pool);
     	switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, listener_pool);
    
     	listener->sock = inbound_socket;
     	listener->pool = listener_pool;
     	/* ownership of the pool moves to the listener */
     	listener_pool = NULL;
     	listener->format = EVENT_FORMAT_PLAIN;
     	switch_set_flag(listener, LFLAG_FULL);
     	switch_set_flag(listener, LFLAG_ALLOW_LOG);
    
     	switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
     	switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
    
     	switch_core_hash_init(&listener->event_hash);
     	switch_socket_create_pollset(&listener->pollfd, listener->sock, SWITCH_POLLIN | SWITCH_POLLERR, listener->pool);
    
    
    
     	if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
     		switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
     		if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
     			if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
     				continue;
     		}
     	}
    
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initilizing connection\n");
     	close_socket(&listener->sock);
     	expire_listener(&listener);
    
     }
    
     end:
    
     close_socket(&listen_list.sock);
    
     if (prefs.nat_map && switch_nat_get_type()) {
     	switch_nat_del_mapping(prefs.port, SWITCH_NAT_TCP);
     }
    
     if (pool) {
     	switch_core_destroy_memory_pool(&pool);
     }
    
     if (listener_pool) {
     	switch_core_destroy_memory_pool(&listener_pool);
     }
    
    
     for (x = 0; x < prefs.acl_count; x++) {
     	switch_safe_free(prefs.acl[x]);
     }
    
    fail:
     return SWITCH_STATUS_TERM;
    }
    
    
  2. mod_event_socket.c#config() 函数逻辑比较简洁,就是读取配置文件 event_socket.conf.xml 属性,并将其保存到 prefs 结构体中

    static int config(void)
    {
     char *cf = "event_socket.conf";
     switch_xml_t cfg, xml, settings, param;
    
     memset(&prefs, 0, sizeof(prefs));
    
     if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
     	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
     } else {
     	if ((settings = switch_xml_child(cfg, "settings"))) {
     		for (param = switch_xml_child(settings, "param"); param; param = param->next) {
     			char *var = (char *) switch_xml_attr_soft(param, "name");
     			char *val = (char *) switch_xml_attr_soft(param, "value");
    
     			if (!strcmp(var, "listen-ip")) {
     				set_pref_ip(val);
     			} else if (!strcmp(var, "debug")) {
     				globals.debug = atoi(val);
     			} else if (!strcmp(var, "nat-map")) {
     				if (switch_true(val) && switch_nat_get_type()) {
     					prefs.nat_map = 1;
     				}
     			} else if (!strcmp(var, "listen-port")) {
     				prefs.port = (uint16_t) atoi(val);
     			} else if (!strcmp(var, "password")) {
     				set_pref_pass(val);
     			} else if (!strcasecmp(var, "apply-inbound-acl") && ! zstr(val)) {
     				if (prefs.acl_count < MAX_ACL) {
     					prefs.acl[prefs.acl_count++] = strdup(val);
     				} else {
     					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Max acl records of %d reached\n", MAX_ACL);
     				}
     			} else if (!strcasecmp(var, "stop-on-bind-error")) {
     				prefs.stop_on_bind_error = switch_true(val) ? 1 : 0;
     			}
     		}
     	}
     	switch_xml_free(xml);
     }
    
     if (zstr(prefs.ip)) {
     	set_pref_ip("127.0.0.1");
     }
    
     if (zstr(prefs.password)) {
     	set_pref_pass("ClueCon");
     }
    
     if (!prefs.nat_map) {
     	prefs.nat_map = 0;
     }
    
     if (!prefs.acl_count) {
     	prefs.acl[prefs.acl_count++] = strdup("loopback.auto");
     }
    
     if (prefs.nat_map) {
     	prefs.nat_map = 0;
     }
    
     if (!prefs.port) {
     	prefs.port = 8021;
     }
    
     return 0;
    }
    
    
  3. 远程 Inbound 连接到来时,mod_event_socket.c#launch_listener_thread() 函数将被调用,可以看到此处核心处理是新建线程,并将 mod_event_socket.c#listener_run() 函数作为线程任务执行

    /*
     * Start a detached thread running listener_run() for `listener`. The thread
     * attributes and the thread are created from listener->pool.
     *
     * Returns the status reported by switch_thread_create().
     */
    static switch_status_t launch_listener_thread(listener_t *listener)
    {
     switch_threadattr_t *attr = NULL;
     switch_thread_t *worker;
     switch_status_t rc;
    
     switch_threadattr_create(&attr, listener->pool);
     switch_threadattr_detach_set(attr, 1);
     switch_threadattr_stacksize_set(attr, SWITCH_THREAD_STACKSIZE);
    
     rc = switch_thread_create(&worker, attr, listener_run, listener, listener->pool);
    
     return rc;
    }
    
  4. mod_event_socket.c#listener_run() 函数源码如下,可以看到其关键的处理分为以下几步:

    1. 首先是尝试对连接进行 acl 检查,实际上只会对 Inbound 连接进行 acl 检查,Outbound 连接跳过这步
    2. 接下来调用 mod_event_socket.c#add_listener() 函数将当前 listener 添加到内部 listen_list 列表
    3. 接着是用户授权认证,同样是只会对 Inbound 连接进行,Outbound 连接默认已经授权。授权完成后,这个连接可以正式开始交互,则在 while 循环中不断调用 mod_event_socket.c#read_packet() 函数读取对端数据
    4. 读取到对端数据后,再调用 mod_event_socket.c#parse_command() 函数解析传输过来的命令并执行
    /*
     * Thread body serving one event socket connection.
     *
     * Steps: read-lock the attached session (if any), apply the configured ACLs to
     * the remote address, register the listener, run the initial exchange (first
     * command for an already authorised session listener, auth request otherwise),
     * then loop on read_packet()/parse_command() until the listener stops running.
     * The `done` section unregisters the listener and releases what it holds.
     *
     * thread - the thread running this function (unused)
     * obj    - the listener_t to serve
     *
     * Always returns NULL.
     */
    static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
    {
     listener_t *listener = (listener_t *) obj;
     char buf[1024];
     switch_size_t len;
     switch_status_t status;
     /* initialised so the `!event` test below never reads an indeterminate value */
     switch_event_t *event = NULL;
     char reply[512] = "";
     switch_core_session_t *session = NULL;
     switch_channel_t *channel = NULL;
     switch_event_t *revent = NULL;
     const char *var;
     int locked = 1;
    
     switch_mutex_lock(globals.listener_mutex);
     prefs.threads++;
     switch_mutex_unlock(globals.listener_mutex);
    
     switch_assert(listener != NULL);
    
     if ((session = listener->session)) {
     	if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
     		locked = 0;
     		session = NULL;
     		goto done;
     	}
    
     	listener->lock_acquired = 1;
     }
    
     if (!listener->sock) {
     	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Listener socket is null!\n");
     	switch_clear_flag_locked(listener, LFLAG_RUNNING);
     	goto done;
     }
    
     switch_socket_opt_set(listener->sock, SWITCH_SO_TCP_NODELAY, TRUE);
     switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
    
     /* the ACLs are only evaluated when a remote address is known */
     if (prefs.acl_count && listener->sa && !zstr(listener->remote_ip)) {
     	uint32_t x = 0;
    
     	for (x = 0; x < prefs.acl_count; x++) {
     		if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) {
     			const char message[] = "Access Denied, go away.\n";
     			int mlen = (int)strlen(message);
    
     			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "IP %s Rejected by acl \"%s\"\n", listener->remote_ip,
     							  prefs.acl[x]);
    
     			switch_snprintf(buf, sizeof(buf), "Content-Type: text/rude-rejection\nContent-Length: %d\n\n", mlen);
     			len = strlen(buf);
     			switch_socket_send(listener->sock, buf, &len);
     			len = mlen;
     			switch_socket_send(listener->sock, message, &len);
     			goto done;
     		}
     	}
     }
    
     if (globals.debug > 0) {
     	if (zstr(listener->remote_ip)) {
     		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open\n");
     	} else {
     		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Open from %s:%d\n", listener->remote_ip,
     						  listener->remote_port);
     	}
     }
    
     switch_socket_opt_set(listener->sock, SWITCH_SO_NONBLOCK, TRUE);
     switch_set_flag_locked(listener, LFLAG_RUNNING);
     add_listener(listener);
    
     if (session && switch_test_flag(listener, LFLAG_AUTHED)) {
     	/* session listener that is already authorised: handle its first command */
     	switch_event_t *ievent = NULL;
    
     	switch_set_flag_locked(listener, LFLAG_SESSION);
     	status = read_packet(listener, &ievent, 25);
    
     	if (status != SWITCH_STATUS_SUCCESS || !ievent) {
     		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Socket Error!\n");
     		switch_clear_flag_locked(listener, LFLAG_RUNNING);
     		goto done;
     	}
    
    
     	if (parse_command(listener, &ievent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
     		switch_clear_flag_locked(listener, LFLAG_RUNNING);
     		goto done;
     	}
    
    
     } else {
     	/* ask the peer to authenticate and process the first packet it sends */
     	switch_snprintf(buf, sizeof(buf), "Content-Type: auth/request\n\n");
    
     	len = strlen(buf);
     	switch_socket_send(listener->sock, buf, &len);
    
     	while (!switch_test_flag(listener, LFLAG_AUTHED)) {
     		status = read_packet(listener, &event, 25);
     		if (status != SWITCH_STATUS_SUCCESS) {
     			goto done;
     		}
     		if (!event) {
     			continue;
     		}
    
     		if (parse_command(listener, &event, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
     			switch_clear_flag_locked(listener, LFLAG_RUNNING);
     			goto done;
     		}
     		if (*reply != '\0') {
     			if (*reply == '~') {
     				switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
     			} else {
     				switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
     			}
     			len = strlen(buf);
     			switch_socket_send(listener->sock, buf, &len);
     		}
     		break;
     	}
     }
    
     /* main command loop */
     while (!prefs.done && switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) {
     	len = sizeof(buf);
     	memset(buf, 0, len);
     	status = read_packet(listener, &revent, 0);
    
     	if (status != SWITCH_STATUS_SUCCESS) {
     		break;
     	}
    
     	if (!revent) {
     		continue;
     	}
    
     	if (parse_command(listener, &revent, reply, sizeof(reply)) != SWITCH_STATUS_SUCCESS) {
     		switch_clear_flag_locked(listener, LFLAG_RUNNING);
     		break;
     	}
    
     	if (revent) {
     		switch_event_destroy(&revent);
     	}
    
     	if (*reply != '\0') {
     		/* a leading '~' means the reply already carries its own headers */
     		if (*reply == '~') {
     			switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\n%s", reply + 1);
     		} else {
     			switch_snprintf(buf, sizeof(buf), "Content-Type: command/reply\nReply-Text: %s\n\n", reply);
     		}
     		len = strlen(buf);
     		switch_socket_send(listener->sock, buf, &len);
     	}
    
     }
    
     done:
    
     if (revent) {
     	switch_event_destroy(&revent);
     }
    
     remove_listener(listener);
    
     if (globals.debug > 0) {
     	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
     }
    
     switch_thread_rwlock_wrlock(listener->rwlock);
     flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE);
     switch_mutex_lock(listener->filter_mutex);
     if (listener->filters) {
     	switch_event_destroy(&listener->filters);
     }
     switch_mutex_unlock(listener->filter_mutex);
    
     if (listener->session && locked) {
     	channel = switch_core_session_get_channel(listener->session);
     }
    
     if (channel && switch_channel_get_state(channel) != CS_HIBERNATE &&
     	!switch_channel_test_flag(channel, CF_REDIRECT) && !switch_channel_test_flag(channel, CF_TRANSFER) && !switch_channel_test_flag(channel, CF_RESET) &&
     	(switch_test_flag(listener, LFLAG_RESUME) || ((var = switch_channel_get_variable(channel, "socket_resume")) && switch_true(var)))) {
     	switch_channel_set_state(channel, CS_RESET);
     }
    
     if (listener->sock) {
     	send_disconnect(listener, "Disconnected, goodbye.\nSee you at ClueCon! http://www.cluecon.com/\n");
     	close_socket(&listener->sock);
     }
    
     switch_thread_rwlock_unlock(listener->rwlock);
    
     if (globals.debug > 0) {
     	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Connection Closed\n");
     }
    
     switch_core_hash_destroy(&listener->event_hash);
    
     if (listener->allowed_event_hash) {
     	switch_core_hash_destroy(&listener->allowed_event_hash);
     }
    
     if (listener->allowed_api_hash) {
     	switch_core_hash_destroy(&listener->allowed_api_hash);
     }
    
     if (listener->session) {
     	if (locked) {
     		switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
     	}
     	switch_clear_flag_locked(listener, LFLAG_SESSION);
     	if (locked) {
     		switch_core_session_rwunlock(listener->session);
     	}
     } else if (listener->pool) {
     	switch_memory_pool_t *pool = listener->pool;
    
     	/* For connections accepted by mod_event_socket_runtime the listener itself
     	 * is allocated from listener->pool, so mark it finished before the pool
     	 * goes away and do not touch it afterwards. */
     	listener->finished = 1;
     	switch_core_destroy_memory_pool(&pool);
     	listener = NULL;
     }
    
     switch_mutex_lock(globals.listener_mutex);
     prefs.threads--;
     switch_mutex_unlock(globals.listener_mutex);
    
     if (listener) {
     	listener->finished = 1;
     }
    
     return NULL;
    }
    
  5. mod_event_socket.c#read_packet() 函数体比较琐碎,简单来说关键处理是以下几步:

    1. 通过 switch_apr.c#switch_socket_recv() 函数调用底层接口读取 socket 数据,有数据则将其处理后封装到 switch_event_t 结构体实例 event 中,跳出循环
    2. 如果没有读到数据,并且当前 listener 对应的远端已经订阅了事件,则将 listener->event_queue 队列中的事件出队,按照订阅方的订阅格式通过 switch_apr.c#switch_socket_send() 函数发送给对端
    /*
     * Excerpt of read_packet(); the "......" lines mark parts elided by the article.
     *
     * Visible behaviour: while the listener has a socket and prefs.done is not set,
     * one byte at a time is received into mbuf (grown by block_len when full). Once
     * two consecutive newlines were seen, or max_len bytes were collected, the buffer
     * is split into lines: the first becomes the "Command" header of a new event, the
     * others are added as "name: value" headers, and a positive content-length makes
     * the function read that many body bytes. While the buffer is empty, queued
     * events are serialised and sent to the peer when LFLAG_EVENTS is set.
     */
    static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout)
    {
     
     ......
    
     while (listener->sock && !prefs.done) {
     	uint8_t do_sleep = 1;
     	mlen = 1;
    
     	/* buffer full: grow it and re-base the write pointer */
     	if (bytes == buf_len - 1) {
     		char *tmp;
     		int pos;
    
     		pos = (int)(ptr - mbuf);
     		buf_len += block_len;
     		tmp = realloc(mbuf, buf_len);
     		switch_assert(tmp);
     		mbuf = tmp;
     		memset(mbuf + bytes, 0, buf_len - bytes);
     		ptr = (mbuf + pos);
    
     	}
    
     	status = switch_socket_recv(listener->sock, ptr, &mlen);
    
     	if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) {
     		switch_goto_status(SWITCH_STATUS_FALSE, end);
     	}
    
     	if (mlen) {
     		bytes += mlen;
     		do_sleep = 0;
    
     		/* a line break at the very start of the buffer is discarded */
     		if (*mbuf == '\r' || *mbuf == '\n') {	/* bah */
     			ptr = mbuf;
     			mbuf[0] = '\0';
     			bytes = 0;
     			continue;
     		}
    
     		/* count consecutive '\n' (ignoring '\r'); two in a row end the header block */
     		if (*ptr == '\n') {
     			crcount++;
     		} else if (*ptr != '\r') {
     			crcount = 0;
     		}
     		ptr++;
    
     		/* size limit reached: force parsing of what was collected so far */
     		if (bytes >= max_len) {
     			crcount = 2;
     		}
    
     		if (crcount == 2) {
     			char *next;
     			char *cur = mbuf;
     			while (cur) {
     				if ((next = strchr(cur, '\r')) || (next = strchr(cur, '\n'))) {
     					while (*next == '\r' || *next == '\n') {
     						next++;
     					}
     				}
     				count++;
     				if (count == 1) {
     					/* first line: create the event and store the buffer as "Command" */
     					switch_event_create(event, SWITCH_EVENT_CLONE);
     					switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Command", mbuf);
     				} else if (cur) {
     					char *var, *val;
     					var = cur;
     					strip_cr(var);
     					if (!zstr(var)) {
     						/* split "name: value" at the first ':' and skip leading spaces */
     						if ((val = strchr(var, ':'))) {
     							*val++ = '\0';
     							while (*val == ' ') {
     								val++;
     							}
     						}
     						if (val) {
     							switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, var, val);
     							if (!strcasecmp(var, "content-length")) {
     								clen = atoi(val);
    
     								if (clen > 0) {
     									char *body;
     									char *p;
    
     									switch_zmalloc(body, clen + 1);
    
     									/* keep receiving until clen body bytes were read */
     									p = body;
     									while (clen > 0) {
     										mlen = clen;
    
     										status = switch_socket_recv(listener->sock, p, &mlen);
    
     										if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) {
     											free(body);
     											switch_goto_status(SWITCH_STATUS_FALSE, end);
     										}
    
     										/*
     										   if (channel && !switch_channel_ready(channel)) {
     										   status = SWITCH_STATUS_FALSE;
     										   break;
     										   }
     										 */
    
     										clen -= (int) mlen;
     										p += mlen;
     									}
    
     									switch_event_add_body(*event, "%s", body);
     									free(body);
     								}
     							}
     						}
     					}
     				}
    
     				cur = next;
     			}
     			break;
     		}
     	}
     	/* nothing buffered from the peer: use the time to push pending events out */
     	if (!*mbuf) {
         
             ......
    
     		/* move events diverted to the session into the listener's own queue */
     		if (listener->session) {
     			switch_channel_t *chan = switch_core_session_get_channel(listener->session);
     			if (switch_channel_get_state(chan) < CS_HANGUP && switch_channel_test_flag(chan, CF_DIVERT_EVENTS)) {
     				switch_event_t *e = NULL;
     				while (switch_core_session_dequeue_event(listener->session, &e, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
     					if (switch_queue_trypush(listener->event_queue, e) != SWITCH_STATUS_SUCCESS) {
     						switch_core_session_queue_event(listener->session, &e);
     						break;
     					}
     				}
     			}
     		}
    
     		/* drain the event queue, serialising each event in the listener's format */
     		if (switch_test_flag(listener, LFLAG_EVENTS)) {
     			while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
     				char hbuf[512];
     				switch_event_t *pevent = (switch_event_t *) pop;
     				char *etype;
    
     				do_sleep = 0;
     				if (listener->format == EVENT_FORMAT_PLAIN) {
     					etype = "plain";
     					switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
     				} else if (listener->format == EVENT_FORMAT_JSON) {
     					etype = "json";
     					switch_event_serialize_json(pevent, &listener->ebuf);
     				} else {
     					switch_xml_t xml;
     					etype = "xml";
    
     					if ((xml = switch_event_xmlize(pevent, SWITCH_VA_NONE))) {
     						listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
     						switch_xml_free(xml);
     					} else {
     						switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_ERROR, "XML ERROR!\n");
     						goto endloop;
     					}
     				}
    
     				switch_assert(listener->ebuf);
    
     				len = strlen(listener->ebuf);
    
     				/* header block first (length and type of the payload), then the payload */
     				switch_snprintf(hbuf, sizeof(hbuf), "Content-Length: %" SWITCH_SSIZE_T_FMT "\n" "Content-Type: text/event-%s\n" "\n", len, etype);
    
     				len = strlen(hbuf);
     				switch_socket_send(listener->sock, hbuf, &len);
    
     				len = strlen(listener->ebuf);
     				switch_socket_send(listener->sock, listener->ebuf, &len);
    
     				switch_safe_free(listener->ebuf);
    
     			  endloop:
    
     				switch_event_destroy(&pevent);
     			}
     		}
     	}
    
     	......
    
    }
    
  6. 此时回到本节步骤4第4步,mod_event_socket.c#parse_command() 函数非常长,主要逻辑是各个命令的处理,可以看到其关键处理如下:

    1. 首先调用 switch_event.h#switch_event_get_header() 函数从上一步封装好的 switch_event_t 结构体中获取到命令名称
    2. 命令的处理非常繁冗,此处以 event 订阅事件命令为例进行解析。该命令的主要处理逻辑是解析命令参数,将订阅的事件记录在 listener->event_list 标记数组中,并给 listener 添加 LFLAG_EVENTS 标记位,至此 Event Socket 事件订阅处理基本结束
    /*
     * Excerpt of parse_command(); the "......" lines mark parts elided by the article.
     *
     * Looks up the "command" header of *event and handles it, writing the textual
     * answer into `reply` (at most reply_len bytes). Only the "event" subscription
     * command is shown in full. The event passed in is destroyed before returning.
     *
     * Returns `status`, which is SWITCH_STATUS_SUCCESS on every path visible here.
     */
    static switch_status_t parse_command(listener_t *listener, switch_event_t **event, char *reply, uint32_t reply_len)
    {
     switch_status_t status = SWITCH_STATUS_SUCCESS;
     char *cmd = NULL;
     char unload_cheat[] = "api bgapi unload mod_event_socket";
     char reload_cheat[] = "api bgapi reload mod_event_socket";
    
     *reply = '\0';
    
     /* without a command header the listener is stopped and an error is reported */
     if (!event || !*event || !(cmd = switch_event_get_header(*event, "command"))) {
     	switch_clear_flag_locked(listener, LFLAG_RUNNING);
     	switch_snprintf(reply, reply_len, "-ERR command parse error.");
     	goto done;
     }
    
     ......
     
     if (!strncasecmp(cmd, "sendevent", 9)) {
         
     ......
     
     } else if (!strncasecmp(cmd, "event", 5)) {
     	char *next, *cur;
     	uint32_t count = 0, key_count = 0;
     	uint8_t custom = 0;
    
     	strip_cr(cmd);
     	cur = cmd + 5;
    
     	/* walk the space separated arguments that follow the command name */
     	if ((cur = strchr(cur, ' '))) {
     		for (cur++; cur; count++) {
     			switch_event_types_t type;
    
     			if ((next = strchr(cur, ' '))) {
     				*next++ = '\0';
     			}
    
     			/* the first argument may select the output format */
     			if (!count) {
     				if (!strcasecmp(cur, "xml")) {
     					listener->format = EVENT_FORMAT_XML;
     					goto end;
     				} else if (!strcasecmp(cur, "plain")) {
     					listener->format = EVENT_FORMAT_PLAIN;
     					goto end;
     				} else if (!strcasecmp(cur, "json")) {
     					listener->format = EVENT_FORMAT_JSON;
     					goto end;
     				}
     			}
    
    
     			/* once CUSTOM was seen, the remaining arguments go into event_hash */
     			if (custom) {
     				if (!listener->allowed_event_hash || switch_core_hash_find(listener->allowed_event_hash, cur)) {
     					switch_core_hash_insert(listener->event_hash, cur, MARKER);
     				} else {
     					switch_snprintf(reply, reply_len, "-ERR permission denied");
     					goto done;
     				}
     			} else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) {
     				if (switch_test_flag(listener, LFLAG_AUTH_EVENTS) && !listener->allowed_event_list[type] &&
     					!switch_test_flag(listener, LFLAG_ALL_EVENTS_AUTHED)) {
     					switch_snprintf(reply, reply_len, "-ERR permission denied");
     					goto done;
     				}
    
     				key_count++;
     				/* ALL marks every event type and also sets up the custom events */
     				if (type == SWITCH_EVENT_ALL) {
     					uint32_t x = 0;
     					for (x = 0; x < SWITCH_EVENT_ALL; x++) {
     						listener->event_list[x] = 1;
     					}
    
     					if (!listener->allowed_event_hash) {
     						set_all_custom(listener);
     					} else {
     						set_allowed_custom(listener);
     					}
    
     				}
     				if (type <= SWITCH_EVENT_ALL) {
     					listener->event_list[type] = 1;
     				}
     				if (type == SWITCH_EVENT_CUSTOM) {
     					custom++;
     				}
     			}
    
     		  end:
     			cur = next;
     		}
     	}
    
     	/* a format keyword alone does not count as a subscription */
     	if (!key_count) {
     		switch_snprintf(reply, reply_len, "-ERR no keywords supplied");
     		goto done;
     	}
    
     	if (!switch_test_flag(listener, LFLAG_EVENTS)) {
     		switch_set_flag_locked(listener, LFLAG_EVENTS);
     	}
    
     	switch_snprintf(reply, reply_len, "+OK event listener enabled %s", format2str(listener->format));
    
     }
     
     ......
    
    done:
    
     /* no handler produced a reply: report the command as unknown */
     if (zstr(reply)) {
     	switch_snprintf(reply, reply_len, "-ERR command not found");
     }
    
     done_noreply:
    
     if (event) {
     	switch_event_destroy(event);
     }
    
     return status;
    }
    
    

2.2.2 事件分发的流程

  1. 经过上一节步骤5第2步的分析,我们知道远程连接监听的事件其实都来自于 listener 的内部队列,所以要了解事件的分发流程,关键是要知道队列中的数据是从哪里来的。此时回顾2.1.3节步骤4第1步,Event Socket 模块在加载时向 FreeSWITCH 的事件组件注册了监听,而事件组件的事件来源其实是宏定义 switch_event.h#switch_event_fire() 调用 switch_event.c#switch_event_fire_detailed() 函数。该函数比较简洁,核心处理如下:

    1. 根据 runtime.events_use_dispatch 属性决定外部调用方投递的事件的最终目的地,该属性默认值为1,也就是默认会调用 switch_event.c#switch_event_queue_dispatch_event() 投递到 EVENT_DISPATCH_QUEUE
    2. switch_event.c#switch_event_deliver_thread_pool() 函数执行时会将事件投递到 session_manager.thread_queue 队列,并根据线程池当前空闲状态决定是否创建新线程
    /*
     * Hand an event over to the event subsystem.
     *
     * file, func, line - call site information (not used in this body)
     * event            - event to fire
     * user_data        - optional pointer stored in the event's event_user_data
     *
     * When the system is not running the event is destroyed and SUCCESS is returned.
     * Otherwise the event goes to the dispatch queue (runtime.events_use_dispatch
     * set) or to switch_event_deliver_thread_pool(). Returns SWITCH_STATUS_FALSE,
     * after destroying the event, only when queueing for dispatch fails.
     */
    SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, const char *func, int line, switch_event_t **event, void *user_data)
    {
     switch_assert(BLOCK != NULL);
     switch_assert(RUNTIME_POOL != NULL);
     switch_assert(EVENT_QUEUE_MUTEX != NULL);
    
     /* sorry we're closed */
     if (SYSTEM_RUNNING <= 0) {
     	switch_event_destroy(event);
     	return SWITCH_STATUS_SUCCESS;
     }
    
     if (user_data) {
     	(*event)->event_user_data = user_data;
     }
    
     /* delivery without the dispatch queue */
     if (!runtime.events_use_dispatch) {
     	switch_event_deliver_thread_pool(event);
     	return SWITCH_STATUS_SUCCESS;
     }
    
     check_dispatch();
    
     if (switch_event_queue_dispatch_event(event) == SWITCH_STATUS_SUCCESS) {
     	return SWITCH_STATUS_SUCCESS;
     }
    
     switch_event_destroy(event);
     return SWITCH_STATUS_FALSE;
    }
    
    
  2. switch_event.c#switch_event_queue_dispatch_event() 函数的核心处理如下:

    1. 检查 EVENT_DISPATCH_QUEUE 队列的积压情况:如果队列中的事件数已经超过阈值(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT),则检查当前事件分发线程数量是否达到最大值,最终决定是否需要新建事件分发线程
    2. 如果需要新建线程,则执行 switch_event.c#switch_event_launch_dispatch_threads() 函数启动新线程
    3. 最后调用函数 switch_apr.c#switch_queue_push() 将事件添加到 EVENT_DISPATCH_QUEUE 队列
    /*
     * Push *eventp onto EVENT_DISPATCH_QUEUE, first launching one more dispatch
     * thread when the queue holds more than DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT
     * entries, no launch is pending and the thread limit allows it.
     *
     * On success *eventp is set to NULL. Returns SWITCH_STATUS_FALSE when the system
     * is not running, SWITCH_STATUS_SUCCESS otherwise (including for a NULL event,
     * which is simply ignored).
     */
    static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp)
    {
     switch_event_t *queued = *eventp;
     int need_thread = 0;
    
     if (!SYSTEM_RUNNING) {
     	return SWITCH_STATUS_FALSE;
     }
    
     if (!queued) {
     	return SWITCH_STATUS_SUCCESS;
     }
    
     /* decide under the mutex whether this caller is the one launching a thread */
     switch_mutex_lock(EVENT_QUEUE_MUTEX);
     if (!PENDING && switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT) &&
     	SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
     	need_thread = 1;
     	PENDING++;
     }
     switch_mutex_unlock(EVENT_QUEUE_MUTEX);
    
     if (need_thread) {
     	/* the limit is checked again because it was last read under the mutex */
     	if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
     		switch_event_launch_dispatch_threads(SOFT_MAX_DISPATCH + 1);
     	}
    
     	switch_mutex_lock(EVENT_QUEUE_MUTEX);
     	PENDING--;
     	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
     }
    
     *eventp = NULL;
     switch_queue_push(EVENT_DISPATCH_QUEUE, queued);
    
     return SWITCH_STATUS_SUCCESS;
    }
    
  3. 事件已经入队,实际对 EVENT_DISPATCH_QUEUE 队列进行处理的是在 2.1.2节步骤4 提到的线程任务 switch_event.c#switch_event_dispatch_thread()。可以看到这个函数的核心操作是在 for 无限循环中将 EVENT_DISPATCH_QUEUE 队列中的事件逐个取出,随后调用 switch_event.c#switch_event_deliver() 函数进行处理

    /*
     * Body of an event dispatch thread: pops events from the queue passed in `obj`
     * and hands each one to switch_event_deliver() until the system stops running or
     * a NULL entry is popped.
     *
     * thread - the thread running this function; used to find its slot in
     *          EVENT_DISPATCH_QUEUE_THREADS
     * obj    - the switch_queue_t to consume
     *
     * Returns NULL. A thread that cannot find its slot returns right away without
     * being counted in THREAD_COUNT / DISPATCH_THREAD_COUNT.
     */
    static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj)
    {
     switch_queue_t *queue = (switch_queue_t *) obj;
     int my_id = 0;
    
     switch_mutex_lock(EVENT_QUEUE_MUTEX);
    
     for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) {
     	if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) {
     		break;
     	}
     }
    
     if ( my_id >= MAX_DISPATCH_VAL ) {
     	switch_mutex_unlock(EVENT_QUEUE_MUTEX);
     	return NULL;
     }
    
     /* Count the thread only once it is known to run: the early return above has no
      * matching decrement, so counting before it would leave both counters raised. */
     THREAD_COUNT++;
     DISPATCH_THREAD_COUNT++;
    
     EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
     switch_mutex_unlock(EVENT_QUEUE_MUTEX);
    
    
     for (;;) {
     	void *pop = NULL;
     	switch_event_t *event = NULL;
    
     	if (!SYSTEM_RUNNING) {
     		break;
     	}
    
     	if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
     		continue;
     	}
    
     	/* a NULL entry ends the thread */
     	if (!pop) {
     		break;
     	}
    
     	event = (switch_event_t *) pop;
     	switch_event_deliver(&event);
     	switch_os_yield();
     }
    
    
     switch_mutex_lock(EVENT_QUEUE_MUTEX);
     EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0;
     THREAD_COUNT--;
     DISPATCH_THREAD_COUNT--;
     switch_mutex_unlock(EVENT_QUEUE_MUTEX);
    
     switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id);
     return NULL;
    
    }
    
  4. switch_event.c#switch_event_deliver() 函数非常简洁,核心处理就是遍历 EVENT_NODES 列表,调用注册方的回调函数将事件分发到订阅方。此处结合2.1.3节步骤4第1步,可以知道会调用到 mod_event_socket.c#event_handler() 函数

    /*
     * Deliver an event to every matching node bound in EVENT_NODES, then
     * destroy the event.
     *
     * The nodes bound to the event's own event_id are visited first, followed
     * by the nodes bound to SWITCH_EVENT_ALL. When the event_id is already
     * SWITCH_EVENT_ALL that list is visited only once. Nothing is delivered
     * when SYSTEM_RUNNING is not set, but the event is destroyed either way.
     *
     * event: address of the event pointer; passed to switch_event_destroy()
     *        before returning.
     */
    SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event)
    {
     switch_event_types_t bucket;
     switch_event_node_t *subscriber;
     int finished = 0;
    
     if (SYSTEM_RUNNING) {
     	switch_thread_rwlock_rdlock(RWLOCK);
    
     	bucket = (*event)->event_id;
     	do {
     		for (subscriber = EVENT_NODES[bucket]; subscriber; subscriber = subscriber->next) {
     			if (!switch_events_match(*event, subscriber)) {
     				continue;
     			}
     			/* Expose the node's user data to its callback through the event. */
     			(*event)->bind_user_data = subscriber->user_data;
     			subscriber->callback(*event);
     		}
    
     		/* After the specific bucket, make one more pass over SWITCH_EVENT_ALL. */
     		finished = (bucket == SWITCH_EVENT_ALL);
     		bucket = SWITCH_EVENT_ALL;
     	} while (!finished);
    
     	switch_thread_rwlock_unlock(RWLOCK);
     }
    
     switch_event_destroy(event);
    }
    
  5. mod_event_socket.c#event_handler() 函数源码比较繁琐,不过核心处理并不复杂,关键如下:

    1. 遍历全局列表 listen_list.listeners,判断当前事件是否是当前 listener 订阅的事件
    2. 如果是,再检查 listener 上是否设置了 filter:有 filter 时需先经过 filter 过滤,最终确定需要发送该事件后,将其拷贝一份,并通过 switch_apr.c#switch_queue_trypush() 函数添加到 listener 私属的事件队列。至此事件已经入队,结合2.2.1节步骤5第2步,事件最终发送给了外部订阅方,事件分发流程的分析基本结束
    /*
     * Event callback: for every listener in listen_list.listeners, decide
     * whether this event should be sent to it and, if so, push a duplicate of
     * the event onto that listener's event_queue.
     *
     * Returns immediately when listen_list.ready is not set. The whole walk is
     * done while holding globals.listener_mutex. The incoming event itself is
     * never queued; only copies made with switch_event_dup() are.
     *
     * event: the event being delivered; must not be NULL (asserted).
     */
    static void event_handler(switch_event_t *event)
    {
     switch_event_t *clone = NULL;
     /* l: listener being examined, lp: next listener, last: previous listener kept in the list */
     listener_t *l, *lp, *last = NULL;
     time_t now = switch_epoch_time_now(NULL);
     switch_status_t qstatus;
    
     switch_assert(event != NULL);
    
     if (!listen_list.ready) {
     	return;
     }
    
     switch_mutex_lock(globals.listener_mutex);
    
     lp = listen_list.listeners;
    
     while (lp) {
     	int send = 0;
    
     	/* Advance lp first so that l can be unlinked below without losing the position. */
     	l = lp;
     	lp = lp->next;
    
     	/*
     	 * Stateful listener with expire_time set, or whose timeout has elapsed
     	 * since last_flush: try to expire it and, on success, unlink it from
     	 * the list (from the head when no earlier listener was kept).
     	 */
     	if (switch_test_flag(l, LFLAG_STATEFUL) && (l->expire_time || (l->timeout && now - l->last_flush > l->timeout))) {
     		if (expire_listener(&l) == SWITCH_STATUS_SUCCESS) {
     			if (last) {
     				last->next = lp;
     			} else {
     				listen_list.listeners = lp;
     			}
     			continue;
     		}
     	}
    
     	/* Listeners with expire_time set or without LFLAG_EVENTS receive nothing. */
     	if (l->expire_time || !switch_test_flag(l, LFLAG_EVENTS)) {
     		last = l;
     		continue;
     	}
    
     	/*
     	 * Subscription check: either the SWITCH_EVENT_ALL entry is set, or the
     	 * entry for this event_id is set. For SWITCH_EVENT_CUSTOM events that
     	 * carry a subclass_name, the subclass must also be in l->event_hash.
     	 */
     	if (l->event_list[SWITCH_EVENT_ALL]) {
     		send = 1;
     	} else if ((l->event_list[event->event_id])) {
     		if (event->event_id != SWITCH_EVENT_CUSTOM || !event->subclass_name || (switch_core_hash_find(l->event_hash, event->subclass_name))) {
     			send = 1;
     		}
     	}
    
     	if (send) {
     		/* Filters are evaluated while holding the listener's filter_mutex. */
     		switch_mutex_lock(l->filter_mutex);
    
     		if (l->filters && l->filters->headers) {
     			switch_event_header_t *hp;
     			const char *hval;
    
     			/* With filters present the event must be re-admitted by a positive match. */
     			send = 0;
    
     			for (hp = l->filters->headers; hp; hp = hp->next) {
     				/* Only filter headers that also exist on the event are considered. */
     				if ((hval = switch_event_get_header(event, hp->name))) {
     					const char *comp_to = hp->value;
     					/* pos: 1 = positive filter, 0 = negative filter; cmp: 1 when the value matched */
     					int pos = 1, cmp = 0;
    
     					/*
     					 * Skip the leading '+', '-' and ' ' characters of the filter
     					 * value; '+' selects a positive and '-' a negative filter
     					 * (the last sign seen wins).
     					 */
     					while (comp_to && *comp_to) {
     						if (*comp_to == '+') {
     							pos = 1;
     						} else if (*comp_to == '-') {
     							pos = 0;
     						} else if (*comp_to != ' ') {
     							break;
     						}
     						comp_to++;
     					}
    
     					/* Already admitted by an earlier positive filter: skip further positive filters. */
     					if (send && pos) {
     						continue;
     					}
    
     					/* Filter header without a value. */
     					if (!comp_to) {
     						continue;
     					}
    
     					/*
     					 * Regex matching is selected by the first character of the raw
     					 * filter value (hp->value), not of comp_to; every other value is
     					 * compared case-insensitively.
     					 */
     					if (*hp->value == '/') {
     						switch_regex_t *re = NULL;
     						int ovector[30];
     						/* !! reduces the regex result to 0 or 1. */
     						cmp = !!switch_regex_perform(hval, comp_to, &re, ovector, sizeof(ovector) / sizeof(ovector[0]));
     						switch_regex_safe_free(re);
     					} else {
     						cmp = !strcasecmp(hval, comp_to);
     					}
    
     					/* A positive match admits the event; a negative match rejects it and ends the scan. */
     					if (cmp) {
     						if (pos) {
     							send = 1;
     						} else {
     							send = 0;
     							break;
     						}
     					}
     				}
     			}
     		}
    
     		switch_mutex_unlock(l->filter_mutex);
     	}
    
     	/*
     	 * LFLAG_MYEVENTS: drop the event when it has no "unique-id" header or
     	 * when that header differs from the uuid of the listener's session;
     	 * re-admit it when its "Job-Owner-UUID" header equals the session uuid.
     	 */
     	if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
     		char *uuid = switch_event_get_header(event, "unique-id");
     		if (!uuid || (l->session && strcmp(uuid, switch_core_session_get_uuid(l->session)))) {
     			send = 0;
     		}
     		/* NOTE(review): l->session is used here without the NULL check applied in the condition above - confirm it cannot be NULL on this path. */
     		if (!strcmp(switch_core_session_get_uuid(l->session), switch_event_get_header_nil(event, "Job-Owner-UUID"))) {
     		    send = 1;
     		}
     	}
    
     	if (send) {
     		/* The listener gets its own copy of the event. */
     		if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
     			qstatus = switch_queue_trypush(l->event_queue, clone); 
     			if (qstatus == SWITCH_STATUS_SUCCESS) {
     				/* First successful push after losses: report the lost count and reset it. */
     				if (l->lost_events) {
     					int le = l->lost_events;
     					l->lost_events = 0;
     					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u]\n", le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN);
     				}
     			} else {
     				/* Push failed: log it, count the loss, and kill the listener once more than MAX_MISSED events are lost. */
     				char errbuf[512] = {0};
     				unsigned int qsize = switch_queue_size(l->event_queue);
     				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, 
     						"Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n", 
     						(int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN)?"Max queue size reached":"");
     				if (++l->lost_events > MAX_MISSED) {
     					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events. Lost [%d] Queue size[%u/%u]\n", l->lost_events, qsize, MAX_QUEUE_LEN);
     					kill_listener(l, "killed listener because of lost events\n");
     				}
     				/* The copy was not queued, so it is destroyed here. */
     				switch_event_destroy(&clone);
     			}
     		} else {
     			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_ERROR, "Memory Error!\n");
     		}
     	}
     	/* This listener stays in the list; remember it as the predecessor for a later unlink. */
     	last = l;
     }
     switch_mutex_unlock(globals.listener_mutex);
    }
    
    
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到